v0.0.0-...-116b634 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2021 License: Apache-2.0 Imports: 14 Imported by: 2



Package cluster is a simplistic clustering implementaion built on top of

The assumption behind this package is that you have identical nodes, each responsible for a certain part of the data, a datum, identified by an integer id, and any node forwards requests to the node designated for the datum. The designation is determined by a simple mod operation of datum id against the number of nodes, therefore id distribution matters. There is no leader.

If a node must terminate, it is given an opportunity to save the data it is responsible for, then signal the nodes now responsible that they can take over the processing.

Any cluster change triggers a "transition". During a transition each datum is "relinquished", and upon the relinquish the next responsible node is notified. All this is managed by Cluster, all that is required from the application is to enure that each datum implements the DistDatum interface.




This section is empty.


This section is empty.


This section is empty.


type Cluster

type Cluster struct {
	// contains filtered or unexported fields

Cluster is based on Memberlist and adds some functionality on top of it such as the notion of a node being "ready".

Example (Output)

This example joins a sole node cluster, and shows how to watch cluster changes and trigger transitions.

c, err := NewCluster()
if err != nil {
	fmt.Printf("Error creating cluster (address in use?): %v\n", err)

if err = c.Join([]string{}); err != nil {
	fmt.Printf("Error joining cluster: %v\n", err)

clusterChgCh := c.NotifyClusterChanges()

var wg sync.WaitGroup
go func() {
	defer wg.Done()
	for {
		_, ok := <-clusterChgCh
		if !ok {

		fmt.Printf("A cluster change occurred, running a transition.\n")
		if err := c.Transition(1 * time.Second); err != nil {
			fmt.Printf("Transition error: %v", err)

// Leave the cluster (this triggers a cluster change event)
c.Leave(1 * time.Second)

// This will cause the goroutine to exit

A cluster change occurred, running a transition.

func NewCluster

func NewCluster() (*Cluster, error)

NewCluster creates a new Cluster with reasonable defaults.

func NewClusterBind

func NewClusterBind(baddr string, bport int, aaddr string, aport int, rpcport int, name string) (*Cluster, error)

NewClusterBind creates a new Cluster while allowing for specification of the address/port to bind to, the address/port to advertize to the other nodes (use zero values for default) as well as the hostname. (This is useful if your app is running in a Docker container where it is impossible to figure out the outside IP addresses and the hostname can be the same).

func (*Cluster) Copies

func (c *Cluster) Copies(n int

Set the number of copies of DistDatims that the Cluster will keep. The default is 1. You can only set it while the cluster is empty.

func (*Cluster) GetBroadcasts

func (c *Cluster) GetBroadcasts(overhead, limit int) [][]byte

func (*Cluster) Join

func (c *Cluster) Join(existing []string) error

Join joins a cluster given at least one node address/port. NB: You can always join yourself if this is a cluster of one node.

func (*Cluster) List

func (c *Cluster) List() map[string]*ddEntry

func (*Cluster) LoadDistData

func (c *Cluster) LoadDistData(f func() ([]DistDatum, error)) error

LoadDistData will trigger a load of DistDatum's. Its argument is a function which performs the actual load and returns the list, while also providing the data to the application in whatever way is needed by the user-side. This action has to be triggered from the user-side. You should LoadDistData prior to marking your node as ready.

func (*Cluster) LocalNode

func (c *Cluster) LocalNode() *Node

LocalNode returns a pointer to the local node.

func (*Cluster) LocalState

func (c *Cluster) LocalState(join bool) []byte

func (*Cluster) Members

func (c *Cluster) Members() []*Node

Members lists cluster members (ready or not).

func (*Cluster) MergeRemoteState

func (c *Cluster) MergeRemoteState(buf []byte, join bool)

func (*Cluster) NodeMeta

func (c *Cluster) NodeMeta(limit int) []byte

func (*Cluster) NodesForDistDatum

func (c *Cluster) NodesForDistDatum(dd DistDatum) []*Node

NodesForDistDatum returns the nodes responsible for this DistDatum. The first node is the one responsible for Relinquish(), the rest are up to the user to decide. The nodes are cached, the call doesn't compute anything. The idea is that a NodesForDistDatum() should be pretty fast so that you can call it a lot, e.g. for every incoming data point.

func (*Cluster) NotifyClusterChanges

func (c *Cluster) NotifyClusterChanges() chan bool

NotifyClusterChanges returns a bool channel which will be sent true any time a cluster change happens (nodes join or leave, or node metadata changes).

func (*Cluster) NotifyJoin

func (c *Cluster) NotifyJoin(n *memberlist.Node)

func (*Cluster) NotifyLeave

func (c *Cluster) NotifyLeave(n *memberlist.Node)

func (*Cluster) NotifyMsg

func (c *Cluster) NotifyMsg(b []byte)

func (*Cluster) NotifyUpdate

func (c *Cluster) NotifyUpdate(n *memberlist.Node)

func (*Cluster) Ready

func (c *Cluster) Ready(status bool) error

Ready sets the Node status in the metadata and broadcasts a change notification to the cluster.

func (*Cluster) RegisterMsgType

func (c *Cluster) RegisterMsgType() (snd, rcv chan *Msg)

RegisterMsgType makes sending messages across nodes simpler. It returns two channels, one to send the other to receive a *Msg structure. The nodes of the cluster must call RegisterMsgType in exact same order because that is what determines the internal message id and the channel to which it will be passed. The message is sent to the destination specified in Msg.Dst. Messages are compressed using flate.

func (*Cluster) SetMetaData

func (c *Cluster) SetMetaData(b []byte) error

Sets the metadata and broadcasts an UpdateNode message to the cluster.

func (*Cluster) Shutdown

func (c *Cluster) Shutdown() error

func (*Cluster) SortedNodes

func (c *Cluster) SortedNodes() ([]*Node, error)

SortedNodes returns nodes ordered by process start time

func (*Cluster) Transition

func (c *Cluster) Transition(timeout time.Duration) error

Transition() provides the transition on cluster changes. Transitions should be triggered by user-land after receiving a cluster change event from a channel returned by NotifyClusterChanges(). The transition will call Relinquish() on all DistDatums that are transferring to other nodes and wait for confirmation of Relinquish() from other nodes for DistDatums transferring to this node. Generally a node should be buffering all the data it receives during a transition.

type ClusterRPC

type ClusterRPC struct {
	// contains filtered or unexported fields

func (*ClusterRPC) Message

func (rpc *ClusterRPC) Message(msg Msg, reply *Msg) error

type DistDatum

type DistDatum interface {
	// Id returns an integer that uniquely identifies this datum for
	// this type. Datum -> node designation is determined by id %
	// numNodes, which means id distribution matters.
	Id() int64

	// Type returns a string that identifies the type. The value
	// doesn't matter, so long as the type:id conbination uniquely
	// identifies this DistDatum. (A good practice is to just use the
	// type name as a string).
	Type() string

	// Reqlinquish is a chance to persist the data before the datum
	// can be assigned to another node. On a cluater change that
	// requires a reassignment, the receiving node will wait for the
	// Relinquish operation to complete (up to a configurable
	// timeout).
	Relinquish() error

	// Acquire is chance to do something just before we can start
	// processing data for this DistDatum (which normally would just
	// be Relinquished by another node).
	Acquire() error

	// This is only used for logging/debugging. It should return some
	// kind of a meaningful symbolic name for this datum, if any.
	GetName() string

DistDatum is an interface for a piece of data distributed across the cluster. More preciesely, each DistDatum belongs to a node, and nodes are responsible for forwarding requests to the responsible node.

type Msg

type Msg struct {
	Id       int
	Dst, Src *Node
	Body     []byte

Msg is the structure that should be passed to channels returned by c.RegisterMsgType().

func NewMsg

func NewMsg(dest *Node, payload interface{}) (*Msg, error)

NewMsg creates a Msg from a payload which is gob-encodable

func (*Msg) Decode

func (m *Msg) Decode(dst interface{}) error

implement gob.GobDecoder interface.

type Node

type Node struct {
	// contains filtered or unexported fields

func (*Node) Meta

func (n *Node) Meta() ([]byte, error)

Meta() will return the user part of the node metadata. (Cluster uses the beginning bytes to store its internal stuff such as the ready status of a node, trailed by user part).

func (*Node) Name

func (n *Node) Name() string

Name returns the node name or "<nil>" if the pointer is nil.

func (*Node) Ready

func (n *Node) Ready() bool

Ready returns the status of a node.

func (*Node) SanitizedAddr

func (n *Node) SanitizedAddr() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL