Package cluster is a simplistic clustering implementaion built on top of

    The assumption behind this package is that you have identical nodes, each responsible for a certain part of the data, a datum, identified by an integer id, and any node forwards requests to the node designated for the datum. The designation is determined by a simple mod operation of datum id against the number of nodes, therefore id distribution matters. There is no leader.

    If a node must terminate, it is given an opportunity to save the data it is responsible for, then signal the nodes now responsible that they can take over the processing.

    Any cluster change triggers a "transition". During a transition each datum is "relinquished", and upon the relinquish the next responsible node is notified. All this is managed by Cluster, all that is required from the application is to enure that each datum implements the DistDatum interface.




    This section is empty.


    This section is empty.


    This section is empty.


    type Cluster

    type Cluster struct {
    	// contains filtered or unexported fields

      Cluster is based on Memberlist and adds some functionality on top of it such as the notion of a node being "ready".

      Example (Output)

        This example joins a sole node cluster, and shows how to watch cluster changes and trigger transitions.


        c, err := NewCluster()
        if err != nil {
        	fmt.Printf("Error creating cluster (address in use?): %v\n", err)
        if err = c.Join([]string{}); err != nil {
        	fmt.Printf("Error joining cluster: %v\n", err)
        clusterChgCh := c.NotifyClusterChanges()
        var wg sync.WaitGroup
        go func() {
        	defer wg.Done()
        	for {
        		_, ok := <-clusterChgCh
        		if !ok {
        		fmt.Printf("A cluster change occurred, running a transition.\n")
        		if err := c.Transition(1 * time.Second); err != nil {
        			fmt.Printf("Transition error: %v", err)
        // Leave the cluster (this triggers a cluster change event)
        c.Leave(1 * time.Second)
        // This will cause the goroutine to exit
        A cluster change occurred, running a transition.

        func NewCluster

        func NewCluster() (*Cluster, error)

          NewCluster creates a new Cluster with reasonable defaults.

          func NewClusterBind

          func NewClusterBind(baddr string, bport int, aaddr string, aport int, rpcport int, name string) (*Cluster, error)

            NewClusterBind creates a new Cluster while allowing for specification of the address/port to bind to, the address/port to advertize to the other nodes (use zero values for default) as well as the hostname. (This is useful if your app is running in a Docker container where it is impossible to figure out the outside IP addresses and the hostname can be the same).

            func (*Cluster) Copies

            func (c *Cluster) Copies(n int

              Set the number of copies of DistDatims that the Cluster will keep. The default is 1. You can only set it while the cluster is empty.

              func (*Cluster) GetBroadcasts

              func (c *Cluster) GetBroadcasts(overhead, limit int) [][]byte

              func (*Cluster) Join

              func (c *Cluster) Join(existing []string) error

                Join joins a cluster given at least one node address/port. NB: You can always join yourself if this is a cluster of one node.

                func (*Cluster) List

                func (c *Cluster) List() map[string]*ddEntry

                func (*Cluster) LoadDistData

                func (c *Cluster) LoadDistData(f func() ([]DistDatum, error)) error

                  LoadDistData will trigger a load of DistDatum's. Its argument is a function which performs the actual load and returns the list, while also providing the data to the application in whatever way is needed by the user-side. This action has to be triggered from the user-side. You should LoadDistData prior to marking your node as ready.

                  func (*Cluster) LocalNode

                  func (c *Cluster) LocalNode() *Node

                    LocalNode returns a pointer to the local node.

                    func (*Cluster) LocalState

                    func (c *Cluster) LocalState(join bool) []byte

                    func (*Cluster) Members

                    func (c *Cluster) Members() []*Node

                      Members lists cluster members (ready or not).

                      func (*Cluster) MergeRemoteState

                      func (c *Cluster) MergeRemoteState(buf []byte, join bool)

                      func (*Cluster) NodeMeta

                      func (c *Cluster) NodeMeta(limit int) []byte

                      func (*Cluster) NodesForDistDatum

                      func (c *Cluster) NodesForDistDatum(dd DistDatum) []*Node

                        NodesForDistDatum returns the nodes responsible for this DistDatum. The first node is the one responsible for Relinquish(), the rest are up to the user to decide. The nodes are cached, the call doesn't compute anything. The idea is that a NodesForDistDatum() should be pretty fast so that you can call it a lot, e.g. for every incoming data point.

                        func (*Cluster) NotifyClusterChanges

                        func (c *Cluster) NotifyClusterChanges() chan bool

                          NotifyClusterChanges returns a bool channel which will be sent true any time a cluster change happens (nodes join or leave, or node metadata changes).

                          func (*Cluster) NotifyJoin

                          func (c *Cluster) NotifyJoin(n *memberlist.Node)

                          func (*Cluster) NotifyLeave

                          func (c *Cluster) NotifyLeave(n *memberlist.Node)

                          func (*Cluster) NotifyMsg

                          func (c *Cluster) NotifyMsg(b []byte)

                          func (*Cluster) NotifyUpdate

                          func (c *Cluster) NotifyUpdate(n *memberlist.Node)

                          func (*Cluster) Ready

                          func (c *Cluster) Ready(status bool) error

                            Ready sets the Node status in the metadata and broadcasts a change notification to the cluster.

                            func (*Cluster) RegisterMsgType

                            func (c *Cluster) RegisterMsgType() (snd, rcv chan *Msg)

                              RegisterMsgType makes sending messages across nodes simpler. It returns two channels, one to send the other to receive a *Msg structure. The nodes of the cluster must call RegisterMsgType in exact same order because that is what determines the internal message id and the channel to which it will be passed. The message is sent to the destination specified in Msg.Dst. Messages are compressed using flate.

                              func (*Cluster) SetMetaData

                              func (c *Cluster) SetMetaData(b []byte) error

                                Sets the metadata and broadcasts an UpdateNode message to the cluster.

                                func (*Cluster) Shutdown

                                func (c *Cluster) Shutdown() error

                                func (*Cluster) SortedNodes

                                func (c *Cluster) SortedNodes() ([]*Node, error)

                                  SortedNodes returns nodes ordered by process start time

                                  func (*Cluster) Transition

                                  func (c *Cluster) Transition(timeout time.Duration) error

                                    Transition() provides the transition on cluster changes. Transitions should be triggered by user-land after receiving a cluster change event from a channel returned by NotifyClusterChanges(). The transition will call Relinquish() on all DistDatums that are transferring to other nodes and wait for confirmation of Relinquish() from other nodes for DistDatums transferring to this node. Generally a node should be buffering all the data it receives during a transition.

                                    type ClusterRPC

                                    type ClusterRPC struct {
                                    	// contains filtered or unexported fields

                                    func (*ClusterRPC) Message

                                    func (rpc *ClusterRPC) Message(msg Msg, reply *Msg) error

                                    type DistDatum

                                    type DistDatum interface {
                                    	// Id returns an integer that uniquely identifies this datum for
                                    	// this type. Datum -> node designation is determined by id %
                                    	// numNodes, which means id distribution matters.
                                    	Id() int64
                                    	// Type returns a string that identifies the type. The value
                                    	// doesn't matter, so long as the type:id conbination uniquely
                                    	// identifies this DistDatum. (A good practice is to just use the
                                    	// type name as a string).
                                    	Type() string
                                    	// Reqlinquish is a chance to persist the data before the datum
                                    	// can be assigned to another node. On a cluater change that
                                    	// requires a reassignment, the receiving node will wait for the
                                    	// Relinquish operation to complete (up to a configurable
                                    	// timeout).
                                    	Relinquish() error
                                    	// Acquire is chance to do something just before we can start
                                    	// processing data for this DistDatum (which normally would just
                                    	// be Relinquished by another node).
                                    	Acquire() error
                                    	// This is only used for logging/debugging. It should return some
                                    	// kind of a meaningful symbolic name for this datum, if any.
                                    	GetName() string

                                      DistDatum is an interface for a piece of data distributed across the cluster. More preciesely, each DistDatum belongs to a node, and nodes are responsible for forwarding requests to the responsible node.

                                      type Msg

                                      type Msg struct {
                                      	Id       int
                                      	Dst, Src *Node
                                      	Body     []byte

                                        Msg is the structure that should be passed to channels returned by c.RegisterMsgType().

                                        func NewMsg

                                        func NewMsg(dest *Node, payload interface{}) (*Msg, error)

                                          NewMsg creates a Msg from a payload which is gob-encodable

                                          func (*Msg) Decode

                                          func (m *Msg) Decode(dst interface{}) error

                                            implement gob.GobDecoder interface.

                                            type Node

                                            type Node struct {
                                            	// contains filtered or unexported fields

                                            func (*Node) Meta

                                            func (n *Node) Meta() ([]byte, error)

                                              Meta() will return the user part of the node metadata. (Cluster uses the beginning bytes to store its internal stuff such as the ready status of a node, trailed by user part).

                                              func (*Node) Name

                                              func (n *Node) Name() string

                                                Name returns the node name or "<nil>" if the pointer is nil.

                                                func (*Node) Ready

                                                func (n *Node) Ready() bool

                                                  Ready returns the status of a node.

                                                  func (*Node) SanitizedAddr

                                                  func (n *Node) SanitizedAddr() string

                                                  Source Files