copycat

Published: Aug 1, 2018 License: Apache-2.0 Imports: 28 Imported by: 1

README

Welcome to CopyCat

CopyCat is a library providing long-range (cross data center) data transport with ordering and exactly-once guarantees. On top of that, CopyCat makes an effort to keep your data highly available by creating replicas.

The basic promise of CopyCat is that users can implement arbitrary data structures and CopyCat copies them consistently across multiple other nodes in order to make them highly available. For that purpose CopyCat relies heavily on etcd's excellent raft consensus implementation. The foundation of CoreOS' work is the paper outlining the raft protocol as an understandable consensus algorithm. CopyCat does not require a central repository for metadata (such as Zookeeper); instead it trades its metadata via the gossip-like SWIM protocol.

What is CopyCat not?

  • CopyCat is not a data structure server (like Redis). CopyCat is not a service at all! It's just a library. The responsibility to serve up data in a particular format lies with the consumer of CopyCat. It would actually be fairly easy to build a data structure server on top of CopyCat. Further down in this readme we will do exactly that.
  • CopyCat does not offer consistency between its nodes.

Sample Code

In this section we will explain how to use CopyCat by walking through an example. Piece by piece, we will build a distributed hashmap on top of CopyCat.

First we need to start a CopyCat node by creating the CopyCat config object and passing the config to CopyCat.

ccc := copycat.DefaultConfig()
// Sets advertised host name within the CopyCat cluster.
// This host needs to be reachable via this host name.
ccc.Hostname = "127.0.0.1"
ccc.CopyCatPort = 7458
ccc.GossipPort = 7459
ccc.CopyCatDataDir = "./copy-cat-db/"
// Starting CopyCat with no peers to contact (nil or empty slice)
// will cause this node to start a new cluster
// If you want this node to connect to an existing node,
// you need to provide the gossip port of at least one other node in the cluster.
// This would look like so:
// ccc.PeersToContact = []string{"127.0.0.1:17459", "127.0.0.1:27459"}
ccc.PeersToContact = nil
err := os.MkdirAll(ccc.CopyCatDataDir, os.ModePerm)
if err != nil {
  log.Panicf("Can't create CopyCat directory: %s", err)
}

cc, err := copycat.NewCopyCat(ccc)
if err != nil {
  log.Panicf("Can't start CopyCat: %s", err.Error())
}

After CopyCat is started, the instance is used to connect to all data structures that should be managed by CopyCat.

Subscribing to a data structure managed by CopyCat works like this:

type theMap struct {
	data             map[string]string
	mutex            *sync.RWMutex
	// used to propose changes to the data structure
	proposeCh        chan<- []byte
	// if proposed changes are committed, they will be replayed to you via this channel
	commitCh         <-chan []byte
	// if proposed changes cause an error, it will be replayed via this channel
	errorCh          <-chan error
	snapshotConsumer copycat.SnapshotConsumer
}

func newMap(cc copycat.CopyCat) (*theMap, error) {
	var err error
	m := &theMap{
		data:  make(map[string]string),
		mutex: &sync.RWMutex{},
	}

	m.proposeCh, m.commitCh, m.errorCh, m.snapshotConsumer, err = cc.SubscribeToDataStructureWithStringID("01CFR5R4JJE9KNRAJFFEAQ2SAJ", m.snapshotProvider)
	if err != nil {
		return nil, err
	}

	go m.serveChannels()
	return m, nil
}

Subscribing to a data structure returns a few channels and includes passing of function pointers. Let's take a look at them one by one (don't worry, we will look at code later too):

  • Propose Channel: The propose channel is a write-only channel consuming arbitrary byte slices. These byte slices can contain pretty much anything. Under the covers, CopyCat takes each byte slice, forwards it to all other rafts in the respective raft group, and makes sure there is consensus about the existence of these byte slices and the order in which they were consumed. Once all rafts in the raft group agree, a change is considered committed.
  • Commit Channel: The commit channel is a read-only channel conveying all committed byte slices in the exact order in which they were sent to (and committed via) the propose channel. In addition, CopyCat guarantees that each byte slice is relayed to this channel exactly once. This channel emits a nil value instead of a byte slice when CopyCat received a new snapshot that is supposed to be loaded.
  • Error Channel: This read-only channel relays errors that occur while trying to commit change proposals.
  • Snapshot Provider Function: At regular intervals CopyCat will ask you to provide a consistent snapshot of your data structure. It indicates that by calling this function. This function is supposed to serialize your data structure into a byte slice and return it (back to CopyCat).
  • Snapshot Consumer Function: You will receive this function pointer from CopyCat. When the commit channel emits a nil value, CopyCat has a new snapshot ready for you to load, which is done by calling this function. Essentially this function provides the same byte slice you created in the snapshot provider. You're supposed to deserialize this byte slice and set your data structure to the state found in it.

If all of this sounds complicated, don't be worried. We will look at what to do with these channels and functions in our example.

Put and get method implementations for our hashmap would look like this:

func (m *theMap) get(key string) string {
	// nothing fancy here
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	return m.data[key]
}

func (m *theMap) put(key, value string) {
	// take key and value and create an operation
	item := &pb.Operation{
		Key:   key,
		Value: value,
	}

	// marshal operation into byte slice
	bites, err := item.Marshal()
	if err != nil {
		log.Errorf("Can't marshal op: %s", err.Error())
		return
	}

	// send byte slice to propose channel
	m.proposeCh <- bites
}

Here's how to serve the channels we received from CopyCat:

func (m *theMap) serveChannels() {
	for {
		select {
		case data, ok := <-m.commitCh:
			if !ok {
				return
			}

			// the committed entry is nil, that means we need to reload the
			// data structure from a consistent snapshot
			if data == nil {
				// retrieve the byte slice containing the snapshot from CopyCat
				bites, err := m.snapshotConsumer()
				if err != nil {
					log.Errorf("Error getting snapshot: %s", err.Error())
					continue
				}

				// deserialize the byte slice
				tcData := &pb.ReplicatMap{}
				err = proto.Unmarshal(bites, tcData)
				if err != nil {
					log.Errorf("Error unmarshaling snapshot: %s", err.Error())
					continue
				}

				// set internal structure to reflect the new state
				m.mutex.Lock()
				m.data = tcData.M
				m.mutex.Unlock()
			} else {
				// data is not nil and therefore contains a valid byte slice
				// in this case an operation
				// unmarshal byte slice into operation
				newOp := &pb.Operation{}
				err := newOp.Unmarshal(data)
				if err != nil {
					log.Errorf("Can't unmarshal operation: %s", err.Error())
					continue
				}

				// apply operation
				m.mutex.Lock()
				if newOp.Value == "" {
					// empty value means "delete"
					delete(m.data, newOp.Key)
				} else {
					// non-empty value means "set"
					m.data[newOp.Key] = newOp.Value
				}
				m.mutex.Unlock()
			}

		case err, ok := <-m.errorCh:
			if !ok {
				return
			}
			log.Errorf("Ran into error: %s", err.Error())
		}
	}
}
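The empty-value-means-delete convention from the commit handler above can be sketched in isolation. This is a standalone illustration of that convention, not CopyCat code:

```go
package main

import "fmt"

// applyOp mirrors the commit handler above: an empty value deletes the key,
// a non-empty value sets it.
func applyOp(data map[string]string, key, value string) {
	if value == "" {
		delete(data, key)
	} else {
		data[key] = value
	}
}

func main() {
	data := map[string]string{}
	applyOp(data, "a", "1")
	applyOp(data, "b", "2")
	applyOp(data, "a", "") // delete "a"
	fmt.Println(len(data), data["b"]) // prints "1 2"
}
```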

The last piece is the snapshot provider.

func (m *theMap) snapshotProvider() ([]byte, error) {
	// take the current state and marshal it into a byte slice
	// hold the read lock so the snapshot is consistent
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	daMap := &pb.ReplicatMap{M: m.data}
	return daMap.Marshal()
}
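The snapshot provider and snapshot consumer form a serialize/deserialize round trip. Here is a minimal standalone sketch of that round trip, using encoding/json in place of the protobuf marshaling from the example:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// snapshot serializes the map, mirroring the snapshot provider.
func snapshot(data map[string]string) ([]byte, error) {
	return json.Marshal(data)
}

// restore deserializes a snapshot, mirroring what the commit loop does
// when it sees a nil entry and calls the snapshot consumer.
func restore(bites []byte) (map[string]string, error) {
	data := make(map[string]string)
	err := json.Unmarshal(bites, &data)
	return data, err
}

func main() {
	orig := map[string]string{"hello": "world"}
	bites, err := snapshot(orig)
	if err != nil {
		panic(err)
	}
	restored, err := restore(bites)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored["hello"]) // prints "world"
}
```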

Internals

This section will go into more detail about the inner workings of CopyCat.

Terminology

There is no point in reading further if you don't know how the raft protocol works. Luckily there's a great illustrative explanation here.

  • Raft Node (or just "a raft"): A collection of threads with a defined API running the raft protocol. A raft does not map to a single machine (or container); multiple rafts can be colocated in the same process. Rafts come in two flavors: detached and interactive. Detached rafts are only available via CopyCat's internal API and the consumer doesn't know of their existence. The main purpose of detached rafts is to provide high availability. These rafts are not reachable via a public API and cannot be interacted with. Interactive rafts are exposed via the API and can be manipulated by the consumer of CopyCat.
  • Raft Group: A group of rafts that own a particular piece of data and keep consensus amongst them about what the current state is. A raft group is the unit of leader election and consistency in CopyCat. As long as at least one member of a raft group is alive, your data is safe. If you lose an entire raft group, your data becomes unavailable (or will be lost, depending on your setup).
  • Channels: Channels are the major API primitive of CopyCat. Items within a channel have a natural order. Making channels CopyCat's API primitive is CopyCat's way of telling you that the order of events you send to CopyCat is your problem. At the same time, channels are great for scaling the processing of items.
  • Gossip: As CopyCat has no central coordination mechanism, metadata about nodes and rafts in the cluster is traded between nodes using gossip. No consumer data is ever transported via gossip; only metadata about the existence and location of data is traded.
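The channel-shaped API described above can be mimicked with plain Go channels. This standalone sketch shows directional propose/commit channels that preserve order; a trivial echo loop stands in for the consensus machinery (no CopyCat involved):

```go
package main

import "fmt"

// loop relays every proposed byte slice to the commit channel in order,
// standing in for the raft consensus machinery.
func loop(propose <-chan []byte, commit chan<- []byte) {
	for p := range propose {
		commit <- p
	}
	close(commit)
}

func main() {
	propose := make(chan []byte)
	commit := make(chan []byte)
	go loop(propose, commit)

	go func() {
		propose <- []byte("a")
		propose <- []byte("b")
		close(propose)
	}()

	// commits arrive in the exact order they were proposed
	for c := range commit {
		fmt.Println(string(c))
	}
}
```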

TODOs

  • link full source code example

Documentation

Index

Constants

This section is empty.

Variables

var ErrCantFindEnoughReplicas = errors.New("Couldn't find enough remote peers for raft creation and timed out")

ErrCantFindEnoughReplicas is returned if the number of replicas for an allocation request can't be fulfilled.

Functions

This section is empty.

Types

type AllocationOption

type AllocationOption func(*allocationOptions)

func WithDataCenterReplicas

func WithDataCenterReplicas(numDataCenterReplicas int) AllocationOption
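AllocationOption follows Go's functional-options pattern. Below is a standalone sketch of how such an option typically works; the options struct and its field name are illustrative assumptions, not CopyCat internals. With the real API, this would look like cc.AllocateNewDataStructure(copycat.WithDataCenterReplicas(2)).

```go
package main

import "fmt"

// allocationOptions is an illustrative stand-in for CopyCat's unexported
// options struct; the field name is an assumption.
type allocationOptions struct {
	numDataCenterReplicas int
}

type AllocationOption func(*allocationOptions)

// WithDataCenterReplicas returns an option that sets the desired
// number of data center replicas.
func WithDataCenterReplicas(n int) AllocationOption {
	return func(o *allocationOptions) {
		o.numDataCenterReplicas = n
	}
}

// apply folds all options into a config, the way a constructor would.
func apply(opts ...AllocationOption) *allocationOptions {
	o := &allocationOptions{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := apply(WithDataCenterReplicas(3))
	fmt.Println(o.numDataCenterReplicas) // prints 3
}
```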

type Config

type Config struct {
	// Advertised host name
	Hostname string
	// Port of the CopyCat server. The server runs the management interface and the raft state machine.
	CopyCatPort int
	// Port on which CopyCat gossips about cluster metadata.
	GossipPort int
	// Directory under which this CopyCat instance will put all data it's collecting.
	CopyCatDataDir string
	// Simple address of at least one CopyCat peer to contact. The format is "<machine>:<copycat_port>".
	// If you leave this empty or nil, this node will start a brand-new cluster.
	PeersToContact []string
	// Arbitrary string identifying a geo location. CopyCat will try to distribute data across multiple
	// geo locations to protect data against dependent failures such as power outages, etc.
	// Even though the API doesn't prescribe a string size, shorter is better.
	// A data center is such an arbitrary geo location.
	DataCenter string
	// A rack is a section within a data center. Even though you might not know which rack your nodes live in,
	// this can be used to create a second hierarchy for data placement.
	Rack string
	// contains filtered or unexported fields
}

Config is the public configuration struct that needs to be passed into the constructor function.

func DefaultConfig

func DefaultConfig() *Config

func (*Config) String

func (c *Config) String() string

type CopyCat

type CopyCat interface {
	// Acquires a new CopyCat data structure id. The consumer is responsible for keeping this id safe.
	// If the id is lost, the data structure can't be accessed anymore.
	NewDataStructureID() (*ID, error)
	// AllocateNewDataStructure creates a new data structure and returns its unique ID.
	// The consumer is responsible for keeping this id safe. If the id is lost, the data structure can't be accessed anymore.
	// TODO - change the API to return the id right away
	// and a channel that is closed when the raft is being created
	// this way consumers can move on with storing the ID somewhere while they wait for their raft groups to come up
	AllocateNewDataStructure(opts ...AllocationOption) (*ID, error)
	// Convenience wrapper to connect to a data structure with the string representation of an id.
	SubscribeToDataStructureWithStringID(id string, provider SnapshotProvider) (chan<- []byte, <-chan []byte, <-chan error, SnapshotConsumer, error)
	// SubscribeToDataStructure allows the consumer to connect to a data structure identified by id.
	// In addition to that a SnapshotProvider needs to be passed in to enable CopyCat to create consistent snapshots.
	// CopyCat responds with a write only proposal channel, a read-only commit channel, a read-only error channel, and
	// a SnapshotConsumer that is used to retrieve consistent snapshots from CopyCat.
	SubscribeToDataStructure(id *ID, provider SnapshotProvider) (chan<- []byte, <-chan []byte, <-chan error, SnapshotConsumer, error)
	Shutdown()
}

CopyCat is the interface consumers use to create distributed data structures.

func NewCopyCat

func NewCopyCat(config *Config) (CopyCat, error)

NewCopyCat initiates and starts the copycat framework.

type ID

type ID [16]byte

IDs are handles to data structures. The only two valid operations on an id are to convert it to a string format (e.g. to save it somewhere) or to compare it with other ids.

func (*ID) CompareTo

func (id *ID) CompareTo(other *ID) int

CompareTo returns an integer comparing id and other lexicographically. The result will be 0 if id==other, -1 if id < other, and +1 if id > other.
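The documented contract of CompareTo can be sketched for a [16]byte id with bytes.Compare from the standard library. This is an illustration of the contract, not necessarily CopyCat's actual implementation:

```go
package main

import (
	"bytes"
	"fmt"
)

type ID [16]byte

// CompareTo returns 0 if id == other, -1 if id < other, and +1 if id > other,
// comparing the raw bytes lexicographically.
func (id *ID) CompareTo(other *ID) int {
	return bytes.Compare(id[:], other[:])
}

func main() {
	a := &ID{1, 2, 3}
	b := &ID{1, 2, 4}
	fmt.Println(a.CompareTo(b), a.CompareTo(a)) // prints "-1 0"
}
```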

func (*ID) String

func (id *ID) String() string

String returns a lexicographically sortable, string-encoded ULID (26 characters, non-standard base 32), e.g. 01CFR5R4JJE9KNRAJFFEAQ2SAJ. Format: tttttttttteeeeeeeeeeeeeeee, where t is time and e is entropy.

type SnapshotConsumer

type SnapshotConsumer func() ([]byte, error)

SnapshotConsumers are used to transport a serialized representation of a data structure out of CopyCat.

type SnapshotProvider

type SnapshotProvider func() ([]byte, error)

SnapshotProviders are used to transport a serialized representation of a data structure into CopyCat.

Directories

Path Synopsis
Package pb is a generated protocol buffer package.
