memberlist

package
Version: v1.8.9
Published: May 29, 2021 License: Apache-2.0 Imports: 43 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrInvalidLengthKv = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowKv   = fmt.Errorf("proto: integer overflow")
)

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements the kv.Client interface on top of memberlist.KV.

func NewClient

func NewClient(kv *KV, codec codec.Codec) (*Client, error)

NewClient creates a new client instance. The supplied codec must already be registered in KV.

func (*Client) CAS

func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error

CAS is part of kv.Client interface.

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, key string) error

Delete is part of kv.Client interface.

func (*Client) Get

func (c *Client) Get(ctx context.Context, key string) (interface{}, error)

Get is part of kv.Client interface.

func (*Client) List

func (c *Client) List(ctx context.Context, prefix string) ([]string, error)

List is part of kv.Client interface.

func (*Client) WatchKey

func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool)

WatchKey is part of kv.Client interface.

func (*Client) WatchPrefix

func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)

WatchPrefix calls f whenever any value stored under prefix changes. Part of kv.Client interface.

type KV

type KV struct {
	services.Service
	// contains filtered or unexported fields
}

KV implements a key-value store on top of the memberlist library. The KV store has an API similar to kv.Client, except that its methods also need an explicit codec for each operation. KV is a Service: it must be started first, and it is usable only once it enters the Running state. If joining the cluster is configured, the join happens in the Running state; if the join fails and the AbortIfJoinFails flag is set, the service fails.

func NewKV

func NewKV(cfg KVConfig, logger log.Logger) *KV

NewKV creates a new gossip-based KV service. The service must be started before it initializes its gossiping part; it gossips only once it is in the Running state. Starting the service also triggers connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, an error is returned and the service enters the Failed state.

func (*KV) CAS

func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error

CAS implements Compare-And-Set/Swap operation.

CAS expects that value returned by 'f' function implements Mergeable interface. If it doesn't, CAS fails immediately.

This method combines Compare-And-Swap with Merge: it calls the 'f' function to get a new state, then merges this new state into the current state to find out what changed. The resulting updated state is then CAS-ed into the KV store, and the change is broadcast to cluster peers. Merge is called with the CAS flag on, so that it can detect removals. If Merge doesn't result in any change (returns nil), the operation fails and is retried. After too many failed retries, this method returns an error.
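The CAS-with-merge loop described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the `counters` type, the `store` type, the `maxCASRetries` limit, and the way the `retry` flag is handled are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// counters is a hypothetical mergeable value: merging keeps the larger
// number for every key.
type counters map[string]int64

// merge folds other into c and returns the entries that changed, or nil.
func (c counters) merge(other counters) counters {
	change := counters{}
	for k, v := range other {
		if cur, ok := c[k]; !ok || v > cur {
			c[k] = v
			change[k] = v
		}
	}
	if len(change) == 0 {
		return nil
	}
	return change
}

// clone returns a copy that the callback may modify freely.
func (c counters) clone() counters {
	out := make(counters, len(c))
	for k, v := range c {
		out[k] = v
	}
	return out
}

// maxCASRetries is an assumed limit, not the library's value.
const maxCASRetries = 10

type store struct {
	mu   sync.Mutex
	data counters
	sent []counters // changes that would be broadcast to peers
}

// cas asks f for a new state, merges it into the current state, and records
// the resulting change. A merge that changes nothing counts as a failed attempt.
func (s *store) cas(f func(in counters) (out counters, retry bool, err error)) error {
	for i := 0; i < maxCASRetries; i++ {
		s.mu.Lock()
		in := s.data.clone()
		s.mu.Unlock()

		out, retry, err := f(in)
		if err != nil {
			if retry { // assumed handling of the retry flag
				continue
			}
			return err
		}

		s.mu.Lock()
		change := s.data.merge(out)
		if change != nil {
			s.sent = append(s.sent, change)
		}
		s.mu.Unlock()
		if change != nil {
			return nil
		}
	}
	return errors.New("too many failed CAS retries")
}

// demo runs one effective update and one no-op update.
func demo() (value int64, broadcasts int, noopFailed bool) {
	s := &store{data: counters{}}
	if err := s.cas(func(in counters) (counters, bool, error) {
		in["a"]++
		return in, true, nil
	}); err != nil {
		return 0, 0, false
	}
	err := s.cas(func(in counters) (counters, bool, error) { return in, true, nil })
	return s.data["a"], len(s.sent), err != nil
}

func main() {
	fmt.Println(demo()) // prints: 1 1 true
}
```

The second call never produces a change, so it exhausts its retries and returns an error, which mirrors the failure mode described for CAS.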

func (*KV) Collect

func (m *KV) Collect(ch chan<- prometheus.Metric)

Collect returns extra metrics via the supplied channel.

func (*KV) Describe

func (m *KV) Describe(ch chan<- *prometheus.Desc)

Describe returns Prometheus descriptions via the supplied channel.

func (*KV) Get

func (m *KV) Get(key string, codec codec.Codec) (interface{}, error)

Get returns current value associated with given key. No communication with other nodes in the cluster is done here.

func (*KV) GetBroadcasts

func (m *KV) GetBroadcasts(overhead, limit int) [][]byte

GetBroadcasts is a method from the Memberlist Delegate interface. It returns all pending broadcasts (within the size limit).

func (*KV) GetCodec

func (m *KV) GetCodec(codecID string) codec.Codec

GetCodec returns codec for given ID or nil.

func (*KV) GetListeningPort

func (m *KV) GetListeningPort() int

GetListeningPort returns port used for listening for memberlist communication. Useful when BindPort is set to 0. This call is only valid after KV service has been started.

func (*KV) JoinMembers

func (m *KV) JoinMembers(members []string) (int, error)

JoinMembers joins the cluster with the given members. See https://godoc.org/github.com/hashicorp/memberlist#Memberlist.Join for details. This call is only valid after the KV service has been started and while it is still running.

func (*KV) List

func (m *KV) List(prefix string) []string

List returns all known keys under a given prefix. No communication with other nodes in the cluster is done here.

func (*KV) LocalState

func (m *KV) LocalState(join bool) []byte

LocalState is a method from the Memberlist Delegate interface.

This is "pull" part of push/pull sync (either periodic, or when new node joins the cluster). Here we dump our entire state -- all keys and their values. There is no limit on message size here, as Memberlist uses 'stream' operations for transferring this state.

func (*KV) MergeRemoteState

func (m *KV) MergeRemoteState(data []byte, join bool)

MergeRemoteState is a method from the Memberlist Delegate interface.

This is 'push' part of push/pull sync. We merge incoming KV store (all keys and values) with ours.

Data is full state of remote KV store, as generated by LocalState method (run on another node).
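The push/pull exchange between LocalState and MergeRemoteState can be illustrated roughly as follows. This is a sketch under stated assumptions: the `node` and `pair` types are hypothetical, JSON stands in for the package's protobuf encoding, and "keep the larger number" stands in for a real Merge.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pair is a hypothetical wire format for one key and its value.
type pair struct {
	Key   string `json:"key"`
	Value int64  `json:"value"`
}

// node holds a toy store where a larger number always wins on merge.
type node struct {
	data map[string]int64
}

// localState dumps every key and value, with no size limit.
func (n *node) localState() []byte {
	pairs := make([]pair, 0, len(n.data))
	for k, v := range n.data {
		pairs = append(pairs, pair{Key: k, Value: v})
	}
	buf, err := json.Marshal(pairs)
	if err != nil {
		return nil
	}
	return buf
}

// mergeRemoteState merges a full dump produced by another node's localState.
func (n *node) mergeRemoteState(data []byte) error {
	var pairs []pair
	if err := json.Unmarshal(data, &pairs); err != nil {
		return err
	}
	for _, p := range pairs {
		if cur, ok := n.data[p.Key]; !ok || p.Value > cur {
			n.data[p.Key] = p.Value
		}
	}
	return nil
}

// demo exchanges full state in both directions between two nodes.
func demo() (a, b map[string]int64, err error) {
	na := &node{data: map[string]int64{"x": 1, "y": 5}}
	nb := &node{data: map[string]int64{"y": 2, "z": 7}}
	if err := nb.mergeRemoteState(na.localState()); err != nil {
		return nil, nil, err
	}
	if err := na.mergeRemoteState(nb.localState()); err != nil {
		return nil, nil, err
	}
	return na.data, nb.data, nil
}

func main() {
	a, b, err := demo()
	fmt.Println(a, b, err)
	// prints: map[x:1 y:5 z:7] map[x:1 y:5 z:7] <nil>
}
```

After one exchange in each direction both toy nodes hold the same keys and values, which is the convergence the full-state sync is meant to provide.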

func (*KV) NodeMeta

func (m *KV) NodeMeta(limit int) []byte

NodeMeta is a method from the Memberlist Delegate interface.

func (*KV) NotifyMsg

func (m *KV) NotifyMsg(msg []byte)

NotifyMsg is a method from the Memberlist Delegate interface. It is called when a single message is received, i.e. what our broadcastNewValue has sent.

func (*KV) WatchKey

func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool)

WatchKey watches for value changes for given key. When value changes, 'f' function is called with the latest value. Notifications that arrive while 'f' is running are coalesced into one subsequent 'f' call.

Watching ends when 'f' returns false, context is done, or this client is shut down.
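One common way to get the coalescing behaviour described for WatchKey is a signal channel with capacity one. The sketch below shows that technique only; the `watcher` type and its methods are hypothetical and are not taken from the package.

```go
package main

import (
	"context"
	"fmt"
)

// watcher is a hypothetical helper that coalesces change notifications.
type watcher struct {
	pending chan struct{} // capacity 1: at most one signal is queued
}

func newWatcher() *watcher {
	return &watcher{pending: make(chan struct{}, 1)}
}

// notify records that the value changed. If a signal is already queued,
// the new one is folded into it.
func (w *watcher) notify() {
	select {
	case w.pending <- struct{}{}:
	default:
	}
}

// run calls f with the latest value for each queued signal. It stops when
// f returns false or the context is done.
func (w *watcher) run(ctx context.Context, latest func() string, f func(string) bool) {
	for {
		select {
		case <-w.pending:
			if !f(latest()) {
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

// demo changes the value three times before the watcher runs.
func demo() []string {
	w := newWatcher()
	value := ""
	for _, v := range []string{"a", "b", "c"} {
		value = v
		w.notify()
	}
	var seen []string
	w.run(context.Background(), func() string { return value }, func(v string) bool {
		seen = append(seen, v)
		return false // stop watching after the first call
	})
	return seen
}

func main() {
	fmt.Println(demo()) // prints: [c]
}
```

Three changes result in a single callback that sees only the latest value, because the callback reads the current value when it runs rather than receiving each intermediate one.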

func (*KV) WatchPrefix

func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, f func(string, interface{}) bool)

WatchPrefix watches for any change of values stored under keys with given prefix. When change occurs, function 'f' is called with key and current value. Each change of the key results in one notification. If there are too many pending notifications ('f' is slow), some notifications may be lost.

Watching ends when 'f' returns false, context is done, or this client is shut down.
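The possibility of lost notifications in WatchPrefix follows naturally from a bounded queue with a non-blocking send. The sketch below illustrates that idea; the `prefixWatcher` type, the buffer size of 2, and the `dropped` counter are assumptions for the example, not details of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// prefixWatcher is a hypothetical watcher with a bounded notification queue.
type prefixWatcher struct {
	prefix  string
	ch      chan string // keys with pending notifications
	dropped int         // notifications lost because the queue was full
}

// notify queues one notification per matching key change. When the queue is
// full (the consumer is slow), the notification is dropped, not blocked on.
func (w *prefixWatcher) notify(key string) {
	if !strings.HasPrefix(key, w.prefix) {
		return
	}
	select {
	case w.ch <- key:
	default:
		w.dropped++
	}
}

// demo sends four changes to a watcher whose queue holds two notifications.
func demo() (delivered []string, dropped int) {
	w := &prefixWatcher{prefix: "ring/", ch: make(chan string, 2)}
	for _, key := range []string{"ring/a", "ring/b", "ring/c", "other/x"} {
		w.notify(key)
	}
	close(w.ch)
	for key := range w.ch {
		delivered = append(delivered, key)
	}
	return delivered, w.dropped
}

func main() {
	fmt.Println(demo()) // prints: [ring/a ring/b] 1
}
```

Because a dropped notification is not retried, a consumer of this kind of watcher should treat each callback as a hint to re-read state, not as a complete change log.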

type KVConfig

type KVConfig struct {
	// Memberlist options.
	NodeName            string        `yaml:"node_name"`
	RandomizeNodeName   bool          `yaml:"randomize_node_name"`
	StreamTimeout       time.Duration `yaml:"stream_timeout"`
	RetransmitMult      int           `yaml:"retransmit_factor"`
	PushPullInterval    time.Duration `yaml:"pull_push_interval"`
	GossipInterval      time.Duration `yaml:"gossip_interval"`
	GossipNodes         int           `yaml:"gossip_nodes"`
	GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"`
	DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"`

	// List of members to join
	JoinMembers      flagext.StringSlice `yaml:"join_members"`
	MinJoinBackoff   time.Duration       `yaml:"min_join_backoff"`
	MaxJoinBackoff   time.Duration       `yaml:"max_join_backoff"`
	MaxJoinRetries   int                 `yaml:"max_join_retries"`
	AbortIfJoinFails bool                `yaml:"abort_if_cluster_join_fails"`
	RejoinInterval   time.Duration       `yaml:"rejoin_interval"`

	// Remove LEFT ingesters from ring after this timeout.
	LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"`

	// Timeout used when leaving the memberlist cluster.
	LeaveTimeout time.Duration `yaml:"leave_timeout"`

	// How much space to use to keep received and sent messages in memory (for troubleshooting).
	MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes"`

	TCPTransport TCPTransportConfig `yaml:",inline"`

	// Where to put custom metrics. Metrics are not registered, if this is nil.
	MetricsRegisterer prometheus.Registerer `yaml:"-"`
	MetricsNamespace  string                `yaml:"-"`

	// Codecs to register. Codecs need to be registered before joining other members.
	Codecs []codec.Codec `yaml:"-"`
}

KVConfig is a config for memberlist.KV

func (*KVConfig) RegisterFlags

func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string)

RegisterFlags registers flags.

type KVInitService

type KVInitService struct {
	services.Service
	// contains filtered or unexported fields
}

This service initializes memberlist.KV on the first call to GetMemberlistKV and starts it. On stop, KV is stopped too. If KV fails, the error is reported from the service.

func NewKVInitService

func NewKVInitService(cfg *KVConfig, logger log.Logger) *KVInitService

func (*KVInitService) GetMemberlistKV

func (kvs *KVInitService) GetMemberlistKV() (*KV, error)

This method initializes memberlist.KV on the first call and adds it to the service failure watcher.

func (*KVInitService) ServeHTTP

func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request)

type KeyValuePair

type KeyValuePair struct {
	Key   string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
	Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	// ID of the codec used to write the value
	Codec string `protobuf:"bytes,3,opt,name=codec,proto3" json:"codec,omitempty"`
}

Single Key-Value pair. Key must be non-empty.

func (*KeyValuePair) Descriptor

func (*KeyValuePair) Descriptor() ([]byte, []int)

func (*KeyValuePair) Equal

func (this *KeyValuePair) Equal(that interface{}) bool

func (*KeyValuePair) GetCodec

func (m *KeyValuePair) GetCodec() string

func (*KeyValuePair) GetKey

func (m *KeyValuePair) GetKey() string

func (*KeyValuePair) GetValue

func (m *KeyValuePair) GetValue() []byte

func (*KeyValuePair) GoString

func (this *KeyValuePair) GoString() string

func (*KeyValuePair) Marshal

func (m *KeyValuePair) Marshal() (dAtA []byte, err error)

func (*KeyValuePair) MarshalTo

func (m *KeyValuePair) MarshalTo(dAtA []byte) (int, error)

func (*KeyValuePair) MarshalToSizedBuffer

func (m *KeyValuePair) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*KeyValuePair) ProtoMessage

func (*KeyValuePair) ProtoMessage()

func (*KeyValuePair) Reset

func (m *KeyValuePair) Reset()

func (*KeyValuePair) Size

func (m *KeyValuePair) Size() (n int)

func (*KeyValuePair) String

func (this *KeyValuePair) String() string

func (*KeyValuePair) Unmarshal

func (m *KeyValuePair) Unmarshal(dAtA []byte) error

func (*KeyValuePair) XXX_DiscardUnknown

func (m *KeyValuePair) XXX_DiscardUnknown()

func (*KeyValuePair) XXX_Marshal

func (m *KeyValuePair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*KeyValuePair) XXX_Merge

func (m *KeyValuePair) XXX_Merge(src proto.Message)

func (*KeyValuePair) XXX_Size

func (m *KeyValuePair) XXX_Size() int

func (*KeyValuePair) XXX_Unmarshal

func (m *KeyValuePair) XXX_Unmarshal(b []byte) error

type KeyValueStore

type KeyValueStore struct {
	Pairs []*KeyValuePair `protobuf:"bytes,1,rep,name=pairs,proto3" json:"pairs,omitempty"`
}

KV Store is just a series of key-value pairs.

func (*KeyValueStore) Descriptor

func (*KeyValueStore) Descriptor() ([]byte, []int)

func (*KeyValueStore) Equal

func (this *KeyValueStore) Equal(that interface{}) bool

func (*KeyValueStore) GetPairs

func (m *KeyValueStore) GetPairs() []*KeyValuePair

func (*KeyValueStore) GoString

func (this *KeyValueStore) GoString() string

func (*KeyValueStore) Marshal

func (m *KeyValueStore) Marshal() (dAtA []byte, err error)

func (*KeyValueStore) MarshalTo

func (m *KeyValueStore) MarshalTo(dAtA []byte) (int, error)

func (*KeyValueStore) MarshalToSizedBuffer

func (m *KeyValueStore) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*KeyValueStore) ProtoMessage

func (*KeyValueStore) ProtoMessage()

func (*KeyValueStore) Reset

func (m *KeyValueStore) Reset()

func (*KeyValueStore) Size

func (m *KeyValueStore) Size() (n int)

func (*KeyValueStore) String

func (this *KeyValueStore) String() string

func (*KeyValueStore) Unmarshal

func (m *KeyValueStore) Unmarshal(dAtA []byte) error

func (*KeyValueStore) XXX_DiscardUnknown

func (m *KeyValueStore) XXX_DiscardUnknown()

func (*KeyValueStore) XXX_Marshal

func (m *KeyValueStore) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*KeyValueStore) XXX_Merge

func (m *KeyValueStore) XXX_Merge(src proto.Message)

func (*KeyValueStore) XXX_Size

func (m *KeyValueStore) XXX_Size() int

func (*KeyValueStore) XXX_Unmarshal

func (m *KeyValueStore) XXX_Unmarshal(b []byte) error

type Mergeable

type Mergeable interface {
	// Merge with other value in place. Returns change, that can be sent to other clients.
	// If merge doesn't result in any change, returns nil.
	// Error can be returned if merging with given 'other' value is not possible.
	//
	// In order for state merging to work correctly, Merge function must have some properties. When talking about the
	// result of the merge in the following text, we don't mean the return value ("change"), but the
	// end-state of receiver. That means Result of A.Merge(B) is end-state of A.
	//
	// Idempotency:
	// 		Result of applying the same state "B" to state "A" (A.Merge(B)) multiple times has the same effect as
	// 		applying it only once. Only first Merge will return non-empty change.
	//
	// Commutativity:
	//		A.Merge(B) has the same result as B.Merge(A) (result being the end state of A or B respectively)
	//
	// Associativity:
	//		calling B.Merge(C), and then A.Merge(B) has the same result as
	//		calling A.Merge(B) first, and then calling A.Merge(C),
	//		that is, A will end up in the same state in both cases.
	//
	// LocalCAS flag is used when doing Merge as part of local CAS operation on KV store. It can be used to detect
	// missing entries, and generate tombstones. (This breaks commutativity and associativity [!] so it can *only* be
	// used when doing CAS operation)
	Merge(other Mergeable, localCAS bool) (change Mergeable, error error)

	// Describes the content of this mergeable value. Used by memberlist client to decide if
	// one change-value can invalidate some other value, that was received previously.
	// Invalidation can happen only if output of MergeContent is a superset of some other MergeContent.
	MergeContent() []string

	// Remove tombstones older than given limit from this mergeable.
	// If limit is zero time, remove all tombstones. Memberlist client calls this method with zero limit each
	// time when client is accessing value from the store. It can be used to hide tombstones from the clients.
	RemoveTombstones(limit time.Time)
}

Mergeable is an interface that values used in gossiping KV Client must implement. It allows merging of different states, obtained via gossiping or CAS function.
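To make the Merge contract more concrete, here is a small standalone sketch of a value whose merge is idempotent and commutative. It deliberately simplifies the interface: `versionedSet` is a hypothetical type, and its `merge` method omits the localCAS flag, the error return, and tombstone handling.

```go
package main

import "fmt"

// versionedSet is a hypothetical value type: each member carries a version,
// and merging keeps the highest version seen for every member.
type versionedSet map[string]int64

// merge folds other into s in place and returns only the entries that
// changed s, or nil when nothing changed.
func (s versionedSet) merge(other versionedSet) versionedSet {
	change := versionedSet{}
	for k, v := range other {
		if cur, ok := s[k]; !ok || v > cur {
			s[k] = v
			change[k] = v
		}
	}
	if len(change) == 0 {
		return nil
	}
	return change
}

// sameState reports whether two sets hold identical members and versions.
func sameState(a, b versionedSet) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if bv, ok := b[k]; !ok || bv != v {
			return false
		}
	}
	return true
}

// demo checks idempotency and commutativity on one pair of values.
func demo() (firstChange int, secondIsNil bool, commutes bool) {
	a := versionedSet{"x": 1, "y": 2}
	b := versionedSet{"y": 3, "z": 1}

	// Keep independent copies so B.merge(A) starts from the original states.
	a2 := versionedSet{"x": 1, "y": 2}
	b2 := versionedSet{"y": 3, "z": 1}

	first := a.merge(b)  // a picks up y=3 and z=1
	second := a.merge(b) // idempotent: nothing left to apply
	b2.merge(a2)         // merge in the opposite direction

	return len(first), second == nil, sameState(a, b2)
}

func main() {
	fmt.Println(demo()) // prints: 2 true true
}
```

Only the first merge returns a non-empty change, and merging in either direction ends in the same state, which matches the idempotency and commutativity properties listed in the interface comment.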

type TCPTransport

type TCPTransport struct {
	// contains filtered or unexported fields
}

TCPTransport is a memberlist.Transport implementation that uses TCP for both packet and stream operations ("packet" and "stream" are terms used by memberlist). It uses a new TCP connection for each operation; there is no connection reuse.
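The "one connection per operation" approach can be sketched with the standard net package. This is an illustration of the pattern only: `sendPacket` is a hypothetical helper, it applies no framing or TLS, and it reuses one timeout for both dialing and writing, which the real transport configures separately.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// sendPacket dials a fresh TCP connection, writes one payload, and closes
// the connection. A zero timeout means no timeout.
func sendPacket(addr string, payload []byte, timeout time.Duration) error {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return err
	}
	defer conn.Close()

	if timeout > 0 {
		if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
			return err
		}
	}
	_, err = conn.Write(payload)
	return err
}

// demo sends two packets to a local listener and counts the bytes received.
func demo() (int, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer ln.Close()

	received := make(chan []byte, 2)
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			// Each connection carries exactly one packet: read until EOF.
			data, _ := io.ReadAll(conn)
			conn.Close()
			received <- data
		}
	}()

	for _, msg := range []string{"ping", "ack"} {
		if err := sendPacket(ln.Addr().String(), []byte(msg), time.Second); err != nil {
			return 0, err
		}
	}

	total := 0
	for i := 0; i < 2; i++ {
		total += len(<-received)
	}
	return total, nil
}

func main() {
	fmt.Println(demo()) // prints: 7 <nil>
}
```

Opening a connection per packet costs a handshake each time, but it keeps the sender stateless and lets the receiver treat end-of-stream as the packet boundary.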

func NewTCPTransport

func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTransport, error)

NewTCPTransport returns a new tcp-based transport with the given configuration. On success all the network listeners will be created and listening.

func (*TCPTransport) DialTimeout

func (t *TCPTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error)

DialTimeout is used to create a connection that allows memberlist to perform two-way communication with a peer.

func (*TCPTransport) FinalAdvertiseAddr

func (t *TCPTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)

FinalAdvertiseAddr is given the user's configured values (which might be empty) and returns the desired IP and port to advertise to the rest of the cluster. (Copied from memberlist's net_transport.go.)

func (*TCPTransport) GetAutoBindPort

func (t *TCPTransport) GetAutoBindPort() int

GetAutoBindPort returns the bind port that was automatically given by the kernel, if a bind port of 0 was given.
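The kernel-assigned port behaviour can be seen directly with the standard library: listening on port 0 lets the kernel choose a free port, which can then be read back from the listener's address. The helper name `listenOnAnyPort` is hypothetical and is not part of this package.

```go
package main

import (
	"fmt"
	"net"
)

// listenOnAnyPort binds to port 0 on the loopback interface and reports the
// port the kernel actually assigned.
func listenOnAnyPort() (net.Listener, int, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, 0, err
	}
	addr, ok := ln.Addr().(*net.TCPAddr)
	if !ok {
		ln.Close()
		return nil, 0, fmt.Errorf("unexpected address type %T", ln.Addr())
	}
	return ln, addr.Port, nil
}

func main() {
	ln, port, err := listenOnAnyPort()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ln.Close()
	fmt.Println("kernel assigned port:", port)
}
```

The assigned port differs from run to run, so anything that needs to advertise it has to query it after the listener is created.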

func (*TCPTransport) PacketCh

func (t *TCPTransport) PacketCh() <-chan *memberlist.Packet

PacketCh returns a channel that can be read to receive incoming packets from other peers.

func (*TCPTransport) Shutdown

func (t *TCPTransport) Shutdown() error

Shutdown is called when memberlist is shutting down; this gives the transport a chance to clean up any listeners.

func (*TCPTransport) StreamCh

func (t *TCPTransport) StreamCh() <-chan net.Conn

StreamCh returns a channel that can be read to handle incoming stream connections from other peers.

func (*TCPTransport) WriteTo

func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error)

WriteTo is a packet-oriented interface that fires off the given payload to the given address.

type TCPTransportConfig

type TCPTransportConfig struct {
	// BindAddrs is a list of addresses to bind to.
	BindAddrs flagext.StringSlice `yaml:"bind_addr"`

	// BindPort is the port to listen on, for each address above.
	BindPort int `yaml:"bind_port"`

	// Timeout used when making connections to other nodes to send packet.
	// Zero = no timeout
	PacketDialTimeout time.Duration `yaml:"packet_dial_timeout"`

	// Timeout for writing packet data. Zero = no timeout.
	PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"`

	// Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on
	TransportDebug bool `yaml:"-"`

	// Where to put custom metrics. nil = don't register.
	MetricsRegisterer prometheus.Registerer `yaml:"-"`
	MetricsNamespace  string                `yaml:"-"`

	TLSEnabled bool                 `yaml:"tls_enabled"`
	TLS        tlsutil.ClientConfig `yaml:",inline"`
}

TCPTransportConfig is a configuration structure for creating new TCPTransport.

func (*TCPTransportConfig) RegisterFlags

func (cfg *TCPTransportConfig) RegisterFlags(f *flag.FlagSet, prefix string)

RegisterFlags registers flags.
