kite

Version: v0.1.0
Published: Apr 28, 2024 License: MIT Imports: 19 Imported by: 0

README

Kite

Kite is a Go library that manages cluster membership, anti-entropy and failure detection.

Each node maintains its own local state containing key-value pairs. Kite then propagates this state to all nodes in the cluster, meaning each node has an eventually consistent view of the other nodes' state.

Kite is based on the Scuttlebutt protocol, which is an efficient gossip-based anti-entropy mechanism.
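
As a rough illustration of the idea behind Scuttlebutt-style anti-entropy, the sketch below has each peer summarise what it knows as a digest (the highest version seen per node) and reply with only the entries that are newer than the requester's digest. The names `entry`, `store`, `digest`, `delta` and `apply` are hypothetical and chosen for this example; this is a simplified model, not Kite's implementation or wire format.

```go
package main

import "fmt"

// entry is a hypothetical versioned key-value pair owned by one node.
type entry struct {
	Key     string
	Value   string
	Version uint64
}

// store maps a node ID to the entries known for that node.
type store map[string][]entry

// digest returns the highest version known for each node.
func (s store) digest() map[string]uint64 {
	d := make(map[string]uint64, len(s))
	for id, entries := range s {
		var highest uint64
		for _, e := range entries {
			if e.Version > highest {
				highest = e.Version
			}
		}
		d[id] = highest
	}
	return d
}

// delta returns the entries in s that are newer than the versions in the
// remote digest. Nodes missing from the digest have version 0, so all of
// their entries are included.
func (s store) delta(remote map[string]uint64) store {
	out := store{}
	for id, entries := range s {
		for _, e := range entries {
			if e.Version > remote[id] {
				out[id] = append(out[id], e)
			}
		}
	}
	return out
}

// apply merges a delta into s, keeping the highest version of each key.
func (s store) apply(d store) {
	for id, entries := range d {
		for _, e := range entries {
			s[id] = upsert(s[id], e)
		}
	}
}

// upsert replaces the entry with the same key if e is newer, or appends e.
func upsert(entries []entry, e entry) []entry {
	for i, cur := range entries {
		if cur.Key == e.Key {
			if e.Version > cur.Version {
				entries[i] = e
			}
			return entries
		}
	}
	return append(entries, e)
}

func main() {
	a := store{"a": {{"status", "active", 2}}}
	b := store{"a": {{"status", "loading", 1}}, "b": {{"version", "1.5.4", 1}}}

	// Each side sends its digest and applies the delta it gets back.
	b.apply(a.delta(b.digest()))
	a.apply(b.delta(a.digest()))

	fmt.Println(a["a"][0].Value, b["a"][0].Value, len(a["b"]))
}
```

Because only entries above the digest version are sent, a round between two peers that are already in sync exchanges no entries at all.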

Usage

See docs at godoc.

Nodes are identified by a unique ID.

When a node joins a cluster, it configures a set of addresses of existing nodes to join. Kite will attempt to sync with those nodes, so the local node sends its state to the remote nodes and learns the state of the other nodes in the cluster.

The addresses to join may be domain names that resolve to the addresses of nodes in the cluster. For example, in Kubernetes this may be a headless service whose domain resolves to the IP addresses of the pods in the service. When a joined address is a domain, Kite will periodically re-resolve the domain and attempt to gossip with any discovered nodes.

A node's state may then be updated and the update will be propagated to the other nodes in the cluster. Kite will also detect when other nodes are down.

var opts []kite.Option
// Watch for updates to node state.
opts = append(opts, kite.WithWatcher(watcher))
// ...

// Named k so the variable does not shadow the kite package.
k, err := kite.New(opts...)
if err != nil {
	panic("failed to start kite: " + err.Error())
}
defer k.Close()

// Add application state for the local node to propagate to the other nodes.
k.UpsertLocal("rpc_addr", "10.26.104.15")
k.UpsertLocal("version", "1.5.4")
k.UpsertLocal("status", "loading")

// Join an existing cluster by domain.
nodeIDs, err := k.Join([]string{"pico.prod-pico-ns.svc.cluster.local"})
if err != nil {
	panic("failed to join cluster: " + err.Error())
}
if len(nodeIDs) != 0 {
	fmt.Println("joined nodes:", nodeIDs)
}

for _, metadata := range k.Nodes() {
	state, _ := k.Node(metadata.ID)
	fmt.Printf(`node_id: %s
  addr: %s
  state:
    version: %s
    status: %s
`, metadata.ID, metadata.Addr, state.Get("version"), state.Get("status"))
}

// Update the local state which will be propagated across the cluster.
k.UpsertLocal("status", "active")

// ...

// Gracefully leave the cluster.
if err = k.Leave(); err != nil {
	panic("failed to leave cluster: " + err.Error())
}

Examples

See example.

Docs

For details on the design of Kite, see design.md.

Comparison

memberlist

memberlist is a great library for cluster membership and failure detection and is often the right fit. However, there are use cases where each member maintains its own state that should be propagated to the other members.

memberlist provides mechanisms for sending messages to nodes and broadcasting updates, though these mechanisms can be difficult to use.

In contrast, each node in Kite maintains a local state as key-value pairs, which Kite propagates to the other members.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Entry

type Entry struct {
	Key     string `json:"key" codec:"key"`
	Value   string `json:"value" codec:"value"`
	Version uint64 `json:"version" codec:"version"`

	// Internal indicates whether this is an internal entry.
	Internal bool `json:"internal" codec:"internal"`
	// Deleted indicates whether this entry represents a deleted key.
	Deleted bool `json:"tombstone" codec:"tombstone"`
}

Entry represents a versioned key-value pair state.
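
To illustrate how the `Deleted` and `Internal` flags might be interpreted by application code, here is a minimal sketch that looks up a key in a list of entries. The `Entry` type below mirrors the documented fields (struct tags omitted) so the example compiles on its own, and `lookup` is a hypothetical helper, not part of Kite. It assumes there is at most one entry per key.

```go
package main

import "fmt"

// Entry mirrors the documented kite.Entry fields so this sketch is
// self-contained.
type Entry struct {
	Key      string
	Value    string
	Version  uint64
	Internal bool
	Deleted  bool
}

// lookup is a hypothetical helper that returns the value of the entry with
// the given key. Internal entries are skipped, and a tombstone (Deleted)
// is reported as "not found".
func lookup(entries []Entry, key string) (string, bool) {
	for _, e := range entries {
		if e.Key != key || e.Internal {
			continue
		}
		if e.Deleted {
			return "", false
		}
		return e.Value, true
	}
	return "", false
}

func main() {
	entries := []Entry{
		{Key: "version", Value: "1.5.4", Version: 1},
		{Key: "status", Value: "active", Version: 3},
		{Key: "rpc_addr", Version: 4, Deleted: true},
	}
	status, ok := lookup(entries, "status")
	fmt.Println(status, ok)
	_, ok = lookup(entries, "rpc_addr")
	fmt.Println(ok)
}
```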

type Kite

type Kite struct {
	// contains filtered or unexported fields
}

Kite manages cluster membership, anti-entropy and failure detection for the local node.

Each node maintains its own local state containing key-value pairs. Kite then propagates this state to all nodes in the cluster, meaning each node has an eventually consistent view of the other nodes' state.

func New

func New(opts ...Option) (*Kite, error)

New creates a new Kite node that listens for incoming gossip traffic.

Kite won't actively attempt to join a cluster until Join is called.

func (*Kite) Close

func (k *Kite) Close() error

Close stops gossiping and closes all listeners.

To leave gracefully, first call Leave; otherwise other nodes in the cluster will detect this node as failed rather than as having left.

func (*Kite) DeleteLocal

func (k *Kite) DeleteLocal(key string)

DeleteLocal deletes the local state entry with the given key.

func (*Kite) Join

func (k *Kite) Join(addrs []string) ([]string, error)

Join attempts to join an existing cluster by synchronising with the nodes at the given addresses.

The addresses may contain either IP addresses or domain names. When a domain name is used, the domain is resolved and each resolved IP address is attempted. Kite will also periodically re-resolve the joined domains and attempt to gossip with any unknown nodes. If the port is omitted, the default bind port is used.

Returns the IDs of the joined nodes. If addresses were provided but no nodes could be joined, an error is returned. Note that if a domain was provided that resolved only to the current node, Join returns nil.
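
The port-defaulting rule above can be sketched with the standard `net` package. `withDefaultPort` is a hypothetical helper written for this example (it is not a Kite function), and the port `7000` is taken from the documented default bind address `':7000'`.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// withDefaultPort is a hypothetical helper: it returns addr unchanged when
// it already contains a port, and appends defaultPort otherwise.
func withDefaultPort(addr, defaultPort string) string {
	if _, _, err := net.SplitHostPort(addr); err == nil {
		return addr
	}
	// Strip the brackets from a bare IPv6 literal such as "[::1]" so that
	// JoinHostPort does not add a second pair.
	host := strings.TrimSuffix(strings.TrimPrefix(addr, "["), "]")
	return net.JoinHostPort(host, defaultPort)
}

func main() {
	for _, addr := range []string{
		"10.26.104.15",
		"pico.prod-pico-ns.svc.cluster.local",
		"pico.prod-pico-ns.svc.cluster.local:8000",
	} {
		fmt.Println(withDefaultPort(addr, "7000"))
	}
}
```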

func (*Kite) Leave

func (k *Kite) Leave() error

Leave gracefully leaves the cluster.

This blocks while it attempts to notify up to 3 nodes in the cluster that the node is leaving, to ensure the status update is propagated.

After the node has left, its state should not be updated again.

Returns an error if no nodes could be notified.

func (*Kite) LocalNode

func (k *Kite) LocalNode() *NodeState

LocalNode returns the state of the local node.

func (*Kite) Metrics

func (k *Kite) Metrics() *Metrics

Metrics returns the Kite metrics which may be registered with Prometheus.

func (*Kite) Node

func (k *Kite) Node(id string) (*NodeState, bool)

Node returns the known state for the node with the given ID.

func (*Kite) Nodes

func (k *Kite) Nodes() []NodeMetadata

Nodes returns the known metadata of each node in the cluster.

func (*Kite) UpsertLocal

func (k *Kite) UpsertLocal(key, value string)

UpsertLocal updates the local node state entry with the given key.

type Logger

type Logger interface {
	Debug(msg string, fields ...zap.Field)
	Info(msg string, fields ...zap.Field)
	Warn(msg string, fields ...zap.Field)
	Error(msg string, fields ...zap.Field)
}

Logger is an interface for logging that is compatible with *zap.Logger.

Note *zap.Logger is not used directly to support custom logger implementations.

type Metrics

type Metrics struct {
	// ConnectionsInbound is the total number of incoming stream
	// connections.
	ConnectionsInbound prometheus.Counter

	// StreamBytesInbound is the total number of read bytes via a stream
	// connection.
	StreamBytesInbound prometheus.Counter

	// PacketBytesInbound is the total number of read bytes via a packet
	// connection.
	PacketBytesInbound prometheus.Counter

	// ConnectionsOutbound is the total number of outgoing stream
	// connections.
	ConnectionsOutbound prometheus.Counter

	// StreamBytesOutbound is the total number of written bytes via a stream
	// connection.
	StreamBytesOutbound prometheus.Counter

	// PacketBytesOutbound is the total number of written bytes via a packet
	// connection.
	PacketBytesOutbound prometheus.Counter
}

func (*Metrics) Register

func (m *Metrics) Register(reg *prometheus.Registry)

type NodeMetadata

type NodeMetadata struct {
	// ID is a unique identifier for the node.
	ID string `json:"id"`

	// Addr is the gossip address of the node.
	Addr string `json:"addr"`

	// Version is the latest known version of the node.
	Version uint64 `json:"version"`

	// Left indicates whether the node has left the cluster.
	Left bool `json:"left"`

	// Down indicates whether the node is considered down.
	Down bool `json:"down"`

	// Expiry contains the time the node state will expire. This is only set
	// if the node is considered left or down until the expiry.
	Expiry time.Time
}

NodeMetadata contains the known metadata about the node.
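
As an example of how the `Left` and `Down` flags might be used, the sketch below filters a list of node metadata down to the nodes that are still considered live. `NodeMetadata` is redeclared here (without struct tags) so the example is self-contained, and `liveNodes` is a hypothetical helper rather than part of Kite.

```go
package main

import (
	"fmt"
	"time"
)

// NodeMetadata mirrors the documented kite.NodeMetadata fields so this
// sketch compiles on its own.
type NodeMetadata struct {
	ID      string
	Addr    string
	Version uint64
	Left    bool
	Down    bool
	Expiry  time.Time
}

// liveNodes is a hypothetical helper that returns the nodes that have
// neither left the cluster nor been marked as down.
func liveNodes(nodes []NodeMetadata) []NodeMetadata {
	var live []NodeMetadata
	for _, n := range nodes {
		if n.Left || n.Down {
			continue
		}
		live = append(live, n)
	}
	return live
}

func main() {
	nodes := []NodeMetadata{
		{ID: "cm8m5nbfs", Addr: "10.26.104.14:7000"},
		{ID: "x7k2p9qra", Addr: "10.26.104.15:7000", Down: true},
	}
	for _, n := range liveNodes(nodes) {
		fmt.Println(n.ID, n.Addr)
	}
}
```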

type NodeState

type NodeState struct {
	NodeMetadata

	Entries []Entry
}

NodeState contains the known state for the node.

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAdvertiseAddr

func WithAdvertiseAddr(addr string) Option

WithAdvertiseAddr sets the address to advertise to other members of the cluster.

By default, if the bind address includes an IP, that IP is advertised. If the bind address does not include an IP (such as ':7000'), the node's private IP is used. For example, a bind address of ':7000' may have an advertise address of '10.26.104.14:7000'.
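
The default described above can be sketched as follows. `advertiseAddr` is a hypothetical helper for illustration only. It takes the private IP as a parameter, since how Kite discovers the node's private IP is not covered here.

```go
package main

import (
	"fmt"
	"net"
)

// advertiseAddr is a hypothetical helper that derives an advertise address
// from a bind address. If the bind address has no host part, privateIP is
// used instead.
func advertiseAddr(bindAddr, privateIP string) (string, error) {
	host, port, err := net.SplitHostPort(bindAddr)
	if err != nil {
		return "", err
	}
	if host == "" {
		host = privateIP
	}
	return net.JoinHostPort(host, port), nil
}

func main() {
	for _, bind := range []string{":7000", "10.26.104.15:7000"} {
		addr, err := advertiseAddr(bind, "10.26.104.14")
		if err != nil {
			fmt.Println("invalid bind address:", err)
			continue
		}
		fmt.Println(bind, "->", addr)
	}
}
```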

func WithBindAddr

func WithBindAddr(addr string) Option

WithBindAddr sets the address to listen on for gossip traffic.

Defaults to ':7000'.

func WithCompactThreshold

func WithCompactThreshold(threshold int) Option

WithCompactThreshold sets the minimum number of deleted keys in the local node state before Kite will compact the entries to remove deleted keys.

Compacting means re-versioning and re-propagating all non-deleted keys, which has an overhead, so there is a tradeoff between the memory cost of storing deleted keys and the overhead of compacting.

Defaults to 500.
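
To make the tradeoff concrete, here is a simplified sketch of threshold-triggered compaction: once enough tombstones have accumulated, they are dropped and every surviving entry is given a fresh version so that it is propagated again. The `Entry` type is a cut-down mirror of the documented type, and `compact` and `nextVersion` are hypothetical names. This is an assumption about the general approach, not Kite's actual implementation.

```go
package main

import "fmt"

// Entry is a cut-down mirror of the documented kite.Entry type.
type Entry struct {
	Key     string
	Value   string
	Version uint64
	Deleted bool
}

// compact is a hypothetical sketch. If fewer than threshold entries are
// deleted it returns the input unchanged. Otherwise it drops the deleted
// entries and re-versions the rest starting at nextVersion. It returns the
// resulting entries and the next unused version.
func compact(entries []Entry, nextVersion uint64, threshold int) ([]Entry, uint64) {
	deleted := 0
	for _, e := range entries {
		if e.Deleted {
			deleted++
		}
	}
	if deleted < threshold {
		return entries, nextVersion
	}

	live := make([]Entry, 0, len(entries)-deleted)
	for _, e := range entries {
		if e.Deleted {
			continue
		}
		// A new version marks the entry as changed so peers fetch it again.
		e.Version = nextVersion
		nextVersion++
		live = append(live, e)
	}
	return live, nextVersion
}

func main() {
	entries := []Entry{
		{Key: "version", Value: "1.5.4", Version: 1},
		{Key: "tmp-1", Version: 2, Deleted: true},
		{Key: "tmp-2", Version: 3, Deleted: true},
		{Key: "status", Value: "active", Version: 4},
	}
	compacted, next := compact(entries, 5, 2)
	fmt.Println(len(compacted), next)
}
```

A low threshold keeps memory use small but re-sends live keys more often, while a high threshold does the opposite.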

func WithGossipInterval

func WithGossipInterval(interval time.Duration) Option

WithGossipInterval sets the interval to initiate gossip with a known peer in the cluster.

Defaults to 500ms.

func WithLogger

func WithLogger(log Logger) Option

func WithMaxPacketSize

func WithMaxPacketSize(size int) Option

WithMaxPacketSize sets the maximum size of any packet sent.

Defaults to 1400. Depending on your network's MTU, you may be able to increase this to include more data in each packet.

func WithNodeID

func WithNodeID(id string) Option

WithNodeID sets a unique identifier for this node in the cluster.

Even if a node restarts, it must be given a new unique ID rather than reusing the old ID.

Defaults to a 9 byte random ID, such as 'cm8m5nbfs' (using crypto/rand).

func WithStreamTimeout

func WithStreamTimeout(t time.Duration) Option

WithStreamTimeout sets the timeout for establishing a stream connection to a remote node and for read/write operations.

Defaults to 10 seconds.

func WithSuspicionThreshold

func WithSuspicionThreshold(threshold int) Option

WithSuspicionThreshold sets the suspicion level above which a node is considered down.

The suspicion level is computed using the "Phi Accrual Failure Detector".

A higher suspicion threshold reduces false positives but takes longer to detect down nodes, and a lower suspicion threshold detects down nodes faster but is more likely to produce false positives.

Defaults to 20.
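
For intuition about how a suspicion level relates to the threshold, the sketch below computes phi under a common simplification of the phi accrual detector: it assumes the intervals between messages from a node are exponentially distributed, so phi = -log10(P(interval > t)) = (t / mean) * log10(e). The functions `phi` and `suspected` are hypothetical, and the distribution Kite actually uses may differ, so treat the numbers as illustrative only.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// phi estimates the suspicion level for a node that has been silent for
// sinceLast, given the mean interval between its previous messages. It
// assumes exponentially distributed intervals (a simplification).
func phi(sinceLast, meanInterval time.Duration) float64 {
	if meanInterval <= 0 {
		return 0
	}
	return float64(sinceLast) / float64(meanInterval) * math.Log10E
}

// suspected reports whether the suspicion level exceeds the threshold.
func suspected(level float64, threshold int) bool {
	return level > float64(threshold)
}

func main() {
	mean := time.Second
	for _, silent := range []time.Duration{time.Second, 10 * time.Second, 60 * time.Second} {
		level := phi(silent, mean)
		fmt.Printf("silent=%v phi=%.2f down=%v\n", silent, level, suspected(level, 20))
	}
}
```

Under this model phi grows linearly with the time since the last message, which is why raising the threshold delays detection and lowering it increases the chance of a false positive.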

func WithWatcher

func WithWatcher(w Watcher) Option

WithWatcher adds a watcher that is notified when a remote member's state changes (join, leave, update and down).

type Watcher

type Watcher interface {
	// OnJoin notifies that a new node joined the cluster.
	OnJoin(nodeID string)

	// OnLeave notifies that a node gracefully left the cluster.
	OnLeave(nodeID string)

	// OnUp notifies that a node that was considered down has recovered.
	OnUp(nodeID string)

	// OnDown notifies that a node is considered as down.
	OnDown(nodeID string)

	// OnUpsertKey notifies that a node's state key has been updated.
	OnUpsertKey(nodeID, key, value string)

	// OnDeleteKey notifies that a node's state key has been deleted.
	OnDeleteKey(nodeID, key string)

	// OnExpired notifies that a node's state has expired and been removed.
	OnExpired(nodeID string)
}

Watcher is used to receive notifications when the known remote node state changes.

Implementations of Watcher must not block. Watcher is also called with the state mutex held, so it must not call back into Kite.
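
One way to satisfy both constraints is to have the watcher only enqueue events on a buffered channel and do the real work elsewhere. The sketch below redeclares the `Watcher` interface so it compiles on its own. The `event` and `chanWatcher` types are hypothetical, and dropping events when the buffer is full is a design choice of this example, not something Kite prescribes.

```go
package main

import "fmt"

// Watcher mirrors the documented kite.Watcher interface so this sketch is
// self-contained.
type Watcher interface {
	OnJoin(nodeID string)
	OnLeave(nodeID string)
	OnUp(nodeID string)
	OnDown(nodeID string)
	OnUpsertKey(nodeID, key, value string)
	OnDeleteKey(nodeID, key string)
	OnExpired(nodeID string)
}

// event is a hypothetical record of a single notification.
type event struct {
	Kind, NodeID, Key, Value string
}

// chanWatcher forwards notifications to a buffered channel.
type chanWatcher struct {
	events chan event
}

func newChanWatcher(size int) *chanWatcher {
	return &chanWatcher{events: make(chan event, size)}
}

// emit never blocks: if the buffer is full the event is dropped.
func (w *chanWatcher) emit(e event) {
	select {
	case w.events <- e:
	default:
	}
}

func (w *chanWatcher) OnJoin(id string)    { w.emit(event{Kind: "join", NodeID: id}) }
func (w *chanWatcher) OnLeave(id string)   { w.emit(event{Kind: "leave", NodeID: id}) }
func (w *chanWatcher) OnUp(id string)      { w.emit(event{Kind: "up", NodeID: id}) }
func (w *chanWatcher) OnDown(id string)    { w.emit(event{Kind: "down", NodeID: id}) }
func (w *chanWatcher) OnExpired(id string) { w.emit(event{Kind: "expired", NodeID: id}) }

func (w *chanWatcher) OnUpsertKey(id, key, value string) {
	w.emit(event{Kind: "upsert", NodeID: id, Key: key, Value: value})
}

func (w *chanWatcher) OnDeleteKey(id, key string) {
	w.emit(event{Kind: "delete", NodeID: id, Key: key})
}

// Compile-time check that chanWatcher implements Watcher.
var _ Watcher = (*chanWatcher)(nil)

func main() {
	w := newChanWatcher(16)
	w.OnJoin("cm8m5nbfs")
	w.OnUpsertKey("cm8m5nbfs", "status", "active")
	close(w.events)
	for e := range w.events {
		fmt.Println(e.Kind, e.NodeID, e.Key, e.Value)
	}
}
```

In a real program a separate goroutine would drain `events`. Because that goroutine runs outside the watcher callback, it is a reasonable place for slower work.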
