Version: v1.2.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2021 License: MPL-2.0 Imports: 18 Imported by: 0



Package manager contains the management code for EliasDB's clustering feature.

The management code deals with cluster building, general communication between cluster members, verification of communicating peers and monitoring of members.

The cluster structure is pure peer-to-peer design with no single point of failure. All members of the cluster share a versioned cluster state which is persisted. Members have to manually be added or removed from the cluster. Each member also has a member info object which can be used by the application which uses the cluster to store additional member related information.

Temporary failures are detected automatically. Every member of the cluster monitors the state of all its peers by sending ping requests to them on a regular schedule.



View Source
const (
	StateInfoTS      = "ts"          // Timestamp of state info
	StateInfoTSOLD   = "tsold"       // Previous timestamp of state info
	StateInfoMEMBERS = "members"     // List of known cluster members
	StateInfoFAILED  = "failed"      // List of failed peers
	StateInfoREPFAC  = "replication" // Replication factor of the cluster

Known StateInfo entries

View Source
const (
	MemberInfoError   = "error"   // Error message if a member was not reachable
	MemberInfoTermURL = "termurl" // URL to the cluster terminal of the member

Known MemberInfo entries

View Source
const (
	RPCPing      RPCFunction = "Ping"
	RPCSIRequest             = "StateInfoRequest"
	RPCMIRequest             = "MemberInfoRequest"

	RPCAcquireLock = "AcquireLock"
	RPCReleaseLock = "ReleaseLock"

	RPCJoinCluster = "JoinCluster"
	RPCAddMember   = "AddMember"
	RPCEjectMember = "EjectMember"

	RPCUpdateStateInfo = "UpdateStateInfo"

	RPCDataRequest = "DataRequest"

List of all possible RPC functions. The list includes all RPC callable functions in this file.

View Source
const (
	ClusterLockUpdateStateInfo = "ClusterLockUpdateStateInfo"

Known cluster locks

View Source
const ConfigClusterSecret = "ClusterSecret"

ConfigClusterSecret is the secret which authorizes a cluster member (the secret must never be send directly over the network)

View Source
const ConfigMemberName = "ClusterMemberName"

ConfigMemberName is the name of the cluster member

View Source
const ConfigRPC = "ClusterMemberRPC"

ConfigRPC is the PRC network interface for the local cluster manager

View Source
const ConfigReplicationFactor = "ReplicationFactor"

ConfigReplicationFactor is the number of times a given datum must be stored redundently. The cluster can suffer n-1 member losses before it becomes inoperational. The value is set once in the configuration and becomes afterwards part of the global cluster state info (once this is there the config value is ignored).


View Source
var (
	ErrMemberComm    = errors.New("Network error")
	ErrMemberError   = errors.New("Member error")
	ErrClusterConfig = errors.New("Cluster configuration error")
	ErrClusterState  = errors.New("Cluster state error")
	ErrUnknownPeer   = errors.New("Unknown peer member")
	ErrUnknownTarget = errors.New("Unknown target member")
	ErrInvalidToken  = errors.New("Invalid member token")
	ErrNotMember     = errors.New("Client is not a cluster member")
	ErrLockTaken     = errors.New("Requested lock is already taken")
	ErrLockNotOwned  = errors.New("Requested lock not owned")

Cluster related error types

View Source
var DefaultConfig = map[string]interface{}{
	ConfigRPC:               "",
	ConfigMemberName:        "member1",
	ConfigClusterSecret:     "secret123",
	ConfigReplicationFactor: 1.0,

DefaultConfig is the defaut configuration

View Source
var DialTimeout = 10 * time.Second

DialTimeout is the dial timeout for RPC connections

View Source
var FreqHousekeeping float64 = 1000

FreqHousekeeping is the frequency of running housekeeping tasks (ms)

View Source
var LogDebug = Logger(LogNull)

LogDebug is called if a debug message is logged in the cluster code (by default disabled)

View Source
var LogInfo = Logger(log.Print)

LogInfo is called if an info message is logged in the cluster code

View Source
var LogNull = func(v ...interface{}) {

LogNull is a discarding logger to be used for disabling loggers

View Source
var MemberErrorExceptions map[string][]string

MemberErrorExceptions map to exclude members from simulated member errors (only used for testing)

View Source
var MemberErrors map[string]error

MemberErrors map for simulated member errors (only used for testing)

View Source
var MsiRetFlush error

MsiRetFlush nil or the error which should be returned by a Flush call


This section is empty.


type Client

type Client struct {
	// contains filtered or unexported fields

Client is the client for the RPC cluster API of a cluster member.

func (*Client) FailedPeerErrors

func (mc *Client) FailedPeerErrors() []string

FailedPeerErrors returns the same list as FailedPeers but with error messages.

func (*Client) FailedPeers

func (mc *Client) FailedPeers() []string

FailedPeers returns a list of failed members.

func (*Client) FailedTotal

func (mc *Client) FailedTotal() int

FailedTotal returns the total number of failed members.

func (*Client) IsFailed

func (mc *Client) IsFailed(name string) bool

IsFailed checks if the given member is in the failed state.

func (*Client) OperationalPeers

func (mc *Client) OperationalPeers() ([]string, error)

OperationalPeers returns all operational peers and an error if too many cluster members have failed.

func (*Client) SendAcquireClusterLock

func (mc *Client) SendAcquireClusterLock(lockName string) error

SendAcquireClusterLock tries to acquire a named lock on all members of the cluster. It fails if the lock is alread acquired or if not enough cluster members can be reached.

func (*Client) SendDataRequest

func (mc *Client) SendDataRequest(member string, reqdata interface{}) (interface{}, error)

SendDataRequest sends a data request to a member and returns its response.

func (*Client) SendEjectMember

func (mc *Client) SendEjectMember(member string, memberToEject string) error

SendEjectMember sends a request to eject a member from the cluster.

func (*Client) SendJoinCluster

func (mc *Client) SendJoinCluster(targetMember string, targetMemberRPC string) (map[string]interface{}, error)

SendJoinCluster sends a request to a cluster member to join the caller to the cluster. Pure clients cannot use this function as this call requires the Client.rpc field to be set.

func (*Client) SendMemberInfoRequest

func (mc *Client) SendMemberInfoRequest(member string) (map[string]interface{}, error)

SendMemberInfoRequest requests the static member info of a member and returns it.

func (*Client) SendPing

func (mc *Client) SendPing(member string, rpc string) ([]string, error)

SendPing sends a ping to a member and returns the result. Second argument is optional if the target member is not a known peer. Should be an empty string in all other cases.

func (*Client) SendReleaseClusterLock

func (mc *Client) SendReleaseClusterLock(lockName string) error

SendReleaseClusterLock tries to release a named lock on all members of the cluster. It is not an error if a lock is not takfen (or has expired) on this member or any other target member.

func (*Client) SendRequest

func (mc *Client) SendRequest(member string, remoteCall RPCFunction,
	args map[RequestArgument]interface{}) (interface{}, error)

SendRequest sends a request to another cluster member. Not reachable members get an entry in the failed map and the error return is ErrMemberComm. All other error returns should be considered serious errors.

func (*Client) SendStateInfoRequest

func (mc *Client) SendStateInfoRequest(member string) (map[string]interface{}, error)

SendStateInfoRequest requests the state info of a member and returns it.

type DefaultStateInfo

type DefaultStateInfo struct {
	// contains filtered or unexported fields

DefaultStateInfo is the default state info which uses a file to persist its data.

func (*DefaultStateInfo) Flush

func (dsi *DefaultStateInfo) Flush() error

Flush persists the state info.

func (*DefaultStateInfo) Get

func (dsi *DefaultStateInfo) Get(key string) (interface{}, bool)

Get retrieves some data from the state info.

func (*DefaultStateInfo) Map

func (dsi *DefaultStateInfo) Map() map[string]interface{}

Map returns the state info as a map.

func (*DefaultStateInfo) Put

func (dsi *DefaultStateInfo) Put(key string, value interface{})

Put stores some data in the state info.

type Error

type Error struct {
	Type   error  // Error type (to be used for equal checks)
	Detail string // Details of this error

Error is a cluster related error

func (*Error) Error

func (ge *Error) Error() string

Error returns a human-readable string representation of this error.

type Logger

type Logger func(v ...interface{})

Logger is a function which processes log messages from the cluster

type MemStateInfo

type MemStateInfo struct {
	// contains filtered or unexported fields

MemStateInfo is a state info object which does not persist its data.

func (*MemStateInfo) Flush

func (msi *MemStateInfo) Flush() error

Flush does not do anything :-)

func (*MemStateInfo) Get

func (msi *MemStateInfo) Get(key string) (interface{}, bool)

Get retrieves some data from the state info.

func (*MemStateInfo) Map

func (msi *MemStateInfo) Map() map[string]interface{}

Map returns the state info as a map.

func (*MemStateInfo) Put

func (msi *MemStateInfo) Put(key string, value interface{})

Put stores some data in the state info.

type MemberManager

type MemberManager struct {
	StopHousekeeping bool // Flag to temporarily stop housekeeping

	Client *Client // RPC client object
	// contains filtered or unexported fields

MemberManager is the management object for a cluster member.

This is the main object of the clustering code it contains the main API. A member registers itself to the rpc server which is the global ManagerServer (server) object. Each cluster member needs to have a unique name. Communication between members is secured by using a secret string which is never exchanged over the network and a hash generated token which identifies a member.

Each MemberManager object contains a Client object which can be used to communicate with other cluster members. This object should be used by pure clients - code which should communicate with the cluster without running an actual member.

func NewMemberManager

func NewMemberManager(rpcInterface string, name string, secret string, stateInfo StateInfo) *MemberManager

NewMemberManager create a new MemberManager object.

func (*MemberManager) EjectMember

func (mm *MemberManager) EjectMember(memberToEject string) error

EjectMember ejects a member from the current cluster. Trying to remove a non-existent member has no effect.

func (*MemberManager) HousekeepingWorker

func (mm *MemberManager) HousekeepingWorker()

HousekeepingWorker is the background thread which handles various tasks to provide "eventual" consistency for the cluster.

func (*MemberManager) JoinCluster

func (mm *MemberManager) JoinCluster(newMemberName string, newMemberRPC string) error

JoinCluster lets this member try to join an existing cluster. The secret must be correct otherwise the member will be rejected.

func (*MemberManager) JoinNewMember

func (mm *MemberManager) JoinNewMember(newMemberName string, newMemberRPC string) error

JoinNewMember joins a new member to the current cluster. It is assumed that the new members token has already been verified.

func (*MemberManager) LogInfo

func (mm *MemberManager) LogInfo(v ...interface{})

LogInfo logs a member related message at info level.

func (*MemberManager) MemberInfo

func (mm *MemberManager) MemberInfo() map[string]interface{}

MemberInfo returns the current static member info. Clients may modify the returned map. Member info can be used to store additional information on every member (e.g. a member specific URL).

func (*MemberManager) MemberInfoCluster

func (mm *MemberManager) MemberInfoCluster() map[string]map[string]interface{}

MemberInfoCluster returns the current static member info for every known cluster member. This calls every member in the cluster.

func (*MemberManager) Members

func (mm *MemberManager) Members() []string

Members returns a list of all cluster members.

func (*MemberManager) Name

func (mm *MemberManager) Name() string

Name returns the member name.

func (*MemberManager) NetAddr

func (mm *MemberManager) NetAddr() string

NetAddr returns the network address of the member.

func (*MemberManager) SetEventHandler

func (mm *MemberManager) SetEventHandler(notifyStateUpdate func(), notifyHouseKeeping func())

SetEventHandler sets event handler funtions which are called when the state info is updated or when housekeeping has been done.

func (*MemberManager) SetHandleDataRequest

func (mm *MemberManager) SetHandleDataRequest(handleDataRequest func(interface{}, *interface{}) error)

SetHandleDataRequest sets the data request handler.

func (*MemberManager) Shutdown

func (mm *MemberManager) Shutdown() error

Shutdown shuts the member manager rpc server for this cluster member down.

func (*MemberManager) Start

func (mm *MemberManager) Start() error

Start starts the manager process for this cluster member.

func (*MemberManager) StateInfo

func (mm *MemberManager) StateInfo() StateInfo

StateInfo returns the current state info.

func (*MemberManager) UpdateClusterStateInfo

func (mm *MemberManager) UpdateClusterStateInfo() error

UpdateClusterStateInfo updates the members state info and sends it to all members in the cluster.

type MemberToken

type MemberToken struct {
	MemberName string
	MemberAuth string

MemberToken is used to authenticate a member in the cluster

type RPCFunction

type RPCFunction string

RPCFunction is used to identify the called function in a RPC call

type RequestArgument

type RequestArgument int

RequestArgument is used to identify arguments in a RPC call

const (
	RequestTARGET       RequestArgument = iota // Required argument which identifies the target cluster member
	RequestTOKEN                               // Client token which is used for authorization checks
	RequestLOCK                                // Lock name which a member requests to take
	RequestMEMBERNAME                          // Name for a member
	RequestMEMBERRPC                           // Rpc address and port for a member
	RequestSTATEINFOMAP                        // StateInfo object as a map
	RequestDATA                                // Data request object

List of all possible arguments in a RPC request. There are usually no checks which give back an error if a required argument is missing. The RPC API is an internal API and might change without backwards compatibility.

type Server

type Server struct {
	// contains filtered or unexported fields

Server is the RPC exposed cluster API of a cluster member. Server is a singleton and will route incoming (authenticated) requests to registered MemberManagers. The calling member is referred to as source member and the called member is referred to as target member.

func (*Server) AcquireLock

func (ms *Server) AcquireLock(request map[RequestArgument]interface{},
	response *interface{}) error

AcquireLock tries to acquire a named lock for the source member on the target member. It fails if the lock is alread acquired by a different member. The lock can only be held for a limited amount of time.

func (*Server) AddMember

func (ms *Server) AddMember(request map[RequestArgument]interface{},
	response *interface{}) error

AddMember adds a new member on the target member.

func (*Server) DataRequest

func (ms *Server) DataRequest(request map[RequestArgument]interface{},
	response *interface{}) error

DataRequest handles a data request.

func (*Server) EjectMember

func (ms *Server) EjectMember(request map[RequestArgument]interface{},
	response *interface{}) error

EjectMember can be called by a cluster member to eject itself or another cluster member.

func (*Server) JoinCluster

func (ms *Server) JoinCluster(request map[RequestArgument]interface{},
	response *interface{}) error

JoinCluster is used by a new member if it wants to join the cluster.

func (*Server) MemberInfoRequest

func (ms *Server) MemberInfoRequest(request map[RequestArgument]interface{},
	response *interface{}) error

MemberInfoRequest answers with the member's static info.

func (*Server) Ping

func (ms *Server) Ping(request map[RequestArgument]interface{},
	response *interface{}) error

Ping answers with a Pong if the given client token was verified and the local cluster member exists.

func (*Server) ReleaseLock

func (ms *Server) ReleaseLock(request map[RequestArgument]interface{},
	response *interface{}) error

ReleaseLock releases a lock. Only the member which holds the lock can release it.

func (*Server) StateInfoRequest

func (ms *Server) StateInfoRequest(request map[RequestArgument]interface{},
	response *interface{}) error

StateInfoRequest answers with the member's state info.

func (*Server) UpdateStateInfo

func (ms *Server) UpdateStateInfo(request map[RequestArgument]interface{},
	response *interface{}) error

UpdateStateInfo updates the state info of the target member.

type StateInfo

type StateInfo interface {

		Put stores some data in the state info.
	Put(key string, value interface{})

		Get retrievtes some data from the state info.
	Get(key string) (interface{}, bool)

		Map returns the state info as a map.
	Map() map[string]interface{}

		Flush persists the state info.
	Flush() error

StateInfo models a state object which stores cluster related data. This information is exchanged between cluster members. It is not expected that the info changes frequently.

func NewDefaultStateInfo

func NewDefaultStateInfo(filename string) (StateInfo, error)

NewDefaultStateInfo creates a new DefaultStateInfo.

func NewMemStateInfo

func NewMemStateInfo() StateInfo

NewMemStateInfo creates a new MemStateInfo.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL