Documentation ¶
Overview ¶
The server package contains the networking and coordination logic for the Forest Bus server implementation.
There are several different objects used to implement the server logic.
- ServerNode - This is the root object managing the overall state and coordination of a Forest Bus Server.
- RPCHandler - Entry point for RPC requests into this server.
- ServerNodeConfiguration - This holds configuration of the overall server.
- Node - This is the main Topic object that implements the Raft algorithm.
For each topic (Node) additional objects are used.
- RaftElectionTimer - Manages the leadership timeouts.
- QueueWriteAggregator - Consolidates individual batches of messages from clients for efficient writing to the commitlog.
- Peer - Represents the topic's state on other peers.
Index ¶
- Constants
- Variables
- func CrossOriginReponseHandler(onwardsHandler http.Handler) (handler http.HandlerFunc)
- type AppendEntriesArgs
- type AppendEntriesResults
- type Command
- type CommitIndex
- func (commInd *CommitIndex) FirstIndexOfTerm(firstInd int64, lastSeenCommit int64)
- func (commInd *CommitIndex) GetCommitIndex() int64
- func (commInd *CommitIndex) ReleaseWaitingClients()
- func (commInd *CommitIndex) UpdateCommitIndex(node *Node)
- func (commInd *CommitIndex) WaitOnCommitChange(index int64) int64
- type ConfigChangeRequest
- type ConfigChangeResults
- type ConfigChangeType
- type ConfigPeers
- type ConfigTopic
- type ConfigTopics
- type IdentifyNodeArgs
- type IdentifyNodeResults
- type Node
- func (node *Node) AppendEntries(args *AppendEntriesArgs, results *AppendEntriesResults)
- func (node *Node) ChangePeerConfiguration(newPeers ConfigPeers)
- func (node *Node) ClientReceiveMessages(args *rapi.ReceiveMessagesArgs, results *rapi.ReceiveMessagesResults) error
- func (node *Node) ClientRequestSendMessages(args *rapi.SendMessagesArgs, results *rapi.SendMessagesResults) error
- func (node *Node) ExpVar() interface{}
- func (node *Node) GetCommitIndex() interface{}
- func (node *Node) GetCommitLog() *commitlog.CommitLog
- func (node *Node) GetTopicDetails(args *rapi.GetTopicDetailsArgs, results *rapi.GetTopicDetailsResults) error
- func (node *Node) RequestVote(args *RequestVoteArgs, results *RequestVoteResults)
- func (node *Node) SendLogsToPeers()
- func (node *Node) SetupNode(topic string, server *ServerNode, ourName string, ourPeers ConfigPeers, ...) error
- func (node *Node) Shutdown(notifier *utils.ShutdownNotifier)
- func (node *Node) StartNode()
- type NodeInfo
- type NodeState
- type Peer
- func (peer *Peer) ExpVar() interface{}
- func (peer *Peer) GetLastIndex() int64
- func (peer *Peer) GetLastValidatedIndex() int64
- func (peer *Peer) GetNextIndex() int64
- func (peer *Peer) SendMessage(cmnd Command) bool
- func (peer *Peer) SetLastIndex(newIndex int64)
- func (peer *Peer) SetNextIndex(newIndex int64)
- func (peer *Peer) SetValidatedIndex(validatedIndex, nextIndex int64)
- type PeerInfo
- type QueueWriteAggregator
- type RPCHandler
- func (handler *RPCHandler) AppendEntries(args *AppendEntriesArgs, results *AppendEntriesResults) error
- func (handler *RPCHandler) ConfigurationChange(args *ServerNodeConfiguration, results *ConfigChangeResults) error
- func (handler *RPCHandler) GetClusterDetails(args *rapi.GetClusterDetailsArgs, results *rapi.GetClusterDetailsResults) error
- func (handler *RPCHandler) GetTopicDetails(args *rapi.GetTopicDetailsArgs, results *rapi.GetTopicDetailsResults) error
- func (handler *RPCHandler) IdentifyNode(args *IdentifyNodeArgs, results *IdentifyNodeResults) error
- func (handler *RPCHandler) ReceiveMessages(args *rapi.ReceiveMessagesArgs, results *rapi.ReceiveMessagesResults) error
- func (handler *RPCHandler) RequestVote(args *RequestVoteArgs, results *RequestVoteResults) error
- func (handler *RPCHandler) SendMessages(args *rapi.SendMessagesArgs, results *rapi.SendMessagesResults) error
- type RaftElectionTimer
- type RequestVoteArgs
- type RequestVoteResults
- type ServerNode
- func (srv *ServerNode) ChangeConfiguration(newconfig *ServerNodeConfiguration, results *ConfigChangeResults)
- func (srv *ServerNode) ExpVar() interface{}
- func (srv *ServerNode) GetClusterDetails(args *rapi.GetClusterDetailsArgs, results *rapi.GetClusterDetailsResults) error
- func (srv *ServerNode) GetClusterID() string
- func (srv *ServerNode) GetNode(topic string) *Node
- func (srv *ServerNode) ListenConnections() error
- func (srv *ServerNode) RequestShutdown(reason string)
- func (srv *ServerNode) SendPeerMessage(peerName string, api string, args interface{}, reply interface{}) error
- func (srv *ServerNode) WaitForShutdown()
- type ServerNodeConfiguration
- type ServerNodeInfo
- type ServerPeer
- type ServerPeerInfo
- type TopicPersistentStore
Constants ¶
const (
    // Command to send a vote request to a peer
    ACTION_SEND_VOTE int = iota
    // Command to send the log to a peer
    ACTION_SEND_LOG
    // Send an ACK to ensure everyone knows we are still the leader
    ACTION_SEND_ACK
    // Command to shutdown the peer loop
    ACTION_SHUTDOWN
)
const CANDIDATE_VOTE_TIMEOUT = 200 * time.Millisecond
Candidate timeout for how long to give peers to vote
const CONFIG_FILENAME_CURRENT = "config.cfg"
CONFIG_FILENAME_CURRENT is the name of the currently active configuration file (stored in json)
const CONFIG_FILENAME_NEW = "config.new"
CONFIG_FILENAME_NEW is the name of the new configuration file. Once writing to config.new is complete, the config.cfg file is moved to config.old and then config.new is moved to config.cfg.
const CONFIG_FILENAME_OLD = "config.old"
CONFIG_FILENAME_OLD is the name of the previously active configuration file
const CONFIG_SUB_DIRECTORY = "Config"
CONFIG_SUB_DIRECTORY is the name of the directory within the data path that holds the configuration files.
const DATA_SUB_DIRECTORY = "Data"
The DATA_SUB_DIRECTORY is the name of the data directory in the data path. Each topic will be stored under a directory named after the topic in this directory.
const DEFAULT_AGGREGATION_WINDOW = time.Millisecond * 3
DEFAULT_AGGREGATION_WINDOW is the default amount of time to wait on additional messages arriving for aggregation prior to writing.
const DEFAULT_MAX_BATCH_AGGREGATION = 1500
DEFAULT_MAX_BATCH_AGGREGATION is the default maximum number of batches (client requests) to aggregate into a single write
const DEFAULT_TRIGGER_TOTAL_AGGREGATED_MESSAGES = commitlog.MAX_RETRIEVE_MESSAGE_COUNT * 5
DEFAULT_TRIGGER_TOTAL_AGGREGATED_MESSAGES is the default trigger point beyond which we stop aggregating and do a send to the commit log. This is set to the maximum number of messages that a leader sends to followers in one go.
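The three aggregation defaults above each describe a condition that ends a round of aggregation: the window elapsing, the batch count limit, or the total message limit. The following is a minimal sketch of how such a loop could look, not the package's actual implementation. The `limits` type, its field names and the `aggregate` function are hypothetical, and the limit values used in `main` are illustrative rather than the package defaults.

```go
package main

import (
	"fmt"
	"time"
)

// limits mirrors the three tunables; the type and field names are hypothetical.
type limits struct {
	window      time.Duration // cf. DEFAULT_AGGREGATION_WINDOW
	maxBatches  int           // cf. DEFAULT_MAX_BATCH_AGGREGATION
	maxMessages int           // cf. DEFAULT_TRIGGER_TOTAL_AGGREGATED_MESSAGES
}

// aggregate blocks for the first client batch, then keeps collecting batches
// until the window expires or one of the two size limits is reached.
func aggregate(in <-chan [][]byte, lim limits) [][][]byte {
	first := <-in
	batches := [][][]byte{first}
	total := len(first)

	timer := time.NewTimer(lim.window)
	defer timer.Stop()

	for len(batches) < lim.maxBatches && total < lim.maxMessages {
		select {
		case b := <-in:
			batches = append(batches, b)
			total += len(b)
		case <-timer.C:
			// Window elapsed: write what we have.
			return batches
		}
	}
	return batches
}

func main() {
	in := make(chan [][]byte, 8)
	for i := 0; i < 5; i++ {
		in <- [][]byte{[]byte("a"), []byte("b")}
	}
	lim := limits{window: 3 * time.Millisecond, maxBatches: 3, maxMessages: 100}
	fmt.Println("batches in first write:", len(aggregate(in, lim)))
}
```

A short window keeps latency low for a lone client, while the two size limits bound how much work a single write can accumulate under heavy load.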
const LEADER_ACK_TIMEOUT = 200 * time.Millisecond
Ack idle ping time to avoid leadership timeouts
const MAX_ELECTION_TIMEOUT = 700
const MAX_INCOMING_MESSAGE_SIZE = 5 * 1024 * 1024
MAX_INCOMING_MESSAGE_SIZE defines the maximum amount of data that can be read in a single go
const MIN_ELECTION_TIMEOUT = 500
Times in milliseconds
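Raft implementations usually pick each election timeout at random between a lower and an upper bound so that nodes rarely time out together. The sketch below assumes that MIN_ELECTION_TIMEOUT and MAX_ELECTION_TIMEOUT are both in milliseconds and are used as such bounds; the `randomElectionTimeout` helper is hypothetical, and the page does not state how the package actually draws the value.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Values copied from the package constants; assumed to be milliseconds.
const (
	MIN_ELECTION_TIMEOUT = 500
	MAX_ELECTION_TIMEOUT = 700
)

// randomElectionTimeout is a hypothetical helper. It returns a duration
// chosen uniformly from [MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT) ms.
func randomElectionTimeout() time.Duration {
	spread := MAX_ELECTION_TIMEOUT - MIN_ELECTION_TIMEOUT
	ms := MIN_ELECTION_TIMEOUT + rand.Intn(spread)
	return time.Duration(ms) * time.Millisecond
}

func main() {
	fmt.Println("next election timeout:", randomElectionTimeout())
}
```

Note that both bounds are well above LEADER_ACK_TIMEOUT (200ms), so a healthy leader gets at least two chances to send an ACK before a follower's timeout can fire.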
const NODE_CONNECTION_INTERVAL = time.Second * 2
NODE_CONNECTION_INTERVAL is the amount of time to wait between connection attempts.
const RAFT_NODE_SUBSYSTEM_SHUTDOWN_TIMEOUT = 500 * time.Millisecond
const SHUTDOWN_WAIT_TIMER = time.Second * 5
SHUTDOWN_WAIT_TIMER is the amount of time to wait for a clean shutdown of the server.
const STARTUP_DELAY_TIMEOUT = NODE_CONNECTION_INTERVAL + time.Second
Time to delay the start of the first timeout period. This avoids trying to start an election right at the start of our lifecycle.
Variables ¶
var ERR_INCORRECT_CLUSTER_ID = errors.New("Incorrect cluster ID")
ERR_INCORRECT_CLUSTER_ID is returned if the configuration cluster ID doesn't match the expected one.
var ERR_MISSING_CONFIG_NON_EMPTY_DIR = errors.New("Configuration files not found or could not be read, but the given directory is not empty and so no new configuration created.")
ERR_MISSING_CONFIG_NON_EMPTY_DIR is returned if there are no configuration files present, but the directory given isn't empty.
var ERR_NO_DATA_PATHS = errors.New("No data paths defined")
ERR_NO_DATA_PATHS is returned when no data storage paths are found.
var ERR_NO_PEER_CONNECTION_AVAILABLE = errors.New("Peer connection not available.")
ERR_NO_PEER_CONNECTION_AVAILABLE is returned when there is no connection available for a given peer.
var ERR_PEER_NOT_FOUND = errors.New("Unknown peer")
ERR_PEER_NOT_FOUND is returned when a node connects that is not a known peer.
Functions ¶
func CrossOriginReponseHandler ¶
func CrossOriginReponseHandler(onwardsHandler http.Handler) (handler http.HandlerFunc)
Types ¶
type AppendEntriesArgs ¶
type AppendEntriesResults ¶
type AppendEntriesResults struct {
    // Term is the term of the follower node
    Term int64
    // Success is true if the append worked
    Success bool
    // NextIndex contains the next index that the follower is expecting.
    // If Success is false this can be used by the leader to determine whether to skip back in time
    // to allow faster catchup of new / recovering followers.
    NextIndex int64
}
AppendEntriesResults contains the data returned from a call by a leader to a follower to append messages.
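The NextIndex field lets a leader jump directly to the position a lagging follower expects instead of probing backwards one entry at a time. The sketch below illustrates that idea only. The `nextIndexFor` helper is hypothetical, and two points are assumptions rather than documented behaviour: that NextIndex on success is the index following the appended entries, and that indexes start at 1.

```go
package main

import "fmt"

// AppendEntriesResults mirrors the documented struct.
type AppendEntriesResults struct {
	Term      int64
	Success   bool
	NextIndex int64
}

// nextIndexFor is a hypothetical leader-side helper. current is the index the
// leader last sent from; the result is the index to send from next.
func nextIndexFor(current int64, res AppendEntriesResults) int64 {
	if res.Success {
		// Assumption: the follower reports the index after the new entries.
		return res.NextIndex
	}
	if res.NextIndex < current {
		// The follower is further behind than expected: skip straight back.
		return res.NextIndex
	}
	// No useful hint: fall back to the classic Raft single step back.
	// Assumption: indexes start at 1.
	if current > 1 {
		return current - 1
	}
	return current
}

func main() {
	fmt.Println(nextIndexFor(100, AppendEntriesResults{Success: false, NextIndex: 40}))
	fmt.Println(nextIndexFor(100, AppendEntriesResults{Success: true, NextIndex: 105}))
}
```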
type Command ¶
type Command struct {
    Action int
    ResultChannel chan interface{}
}
Command is an internal structure used to send votes and append messages requests to the goroutines that manage interactions with the peers.
type CommitIndex ¶
type CommitIndex struct {
// contains filtered or unexported fields
}
CommitIndex is used by the Leader Node (topic leader) to calculate what the current commit index is.
func NewCommitIndex ¶
func NewCommitIndex() *CommitIndex
func (*CommitIndex) FirstIndexOfTerm ¶
func (commInd *CommitIndex) FirstIndexOfTerm(firstInd int64, lastSeenCommit int64)
FirstIndexOfTerm is used by a Leader Node after it has won an election to inform the CommitIndex of the current index and the last commit index seen. This is used to implement the Raft commit logic of only using majority votes to commit new messages in a leader's term.
func (*CommitIndex) GetCommitIndex ¶
func (commInd *CommitIndex) GetCommitIndex() int64
func (*CommitIndex) ReleaseWaitingClients ¶
func (commInd *CommitIndex) ReleaseWaitingClients()
ReleaseWaitingClients is used by the Node to notify any clients waiting on the commit index change that we are no longer leaders.
func (*CommitIndex) UpdateCommitIndex ¶
func (commInd *CommitIndex) UpdateCommitIndex(node *Node)
UpdateCommitIndex is used to recalculate the commit index for a topic when replication of messages to a node has been completed.
The first index of the term is used to determine whether the replication is sufficient to trigger the majority rule for commit index.
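The majority rule described above can be illustrated with a small, self-contained calculation. This is a sketch of the general Raft rule, not the package's code: the `majorityCommitIndex` function and its parameters are hypothetical, and it assumes the leader knows the last replicated index for each peer.

```go
package main

import (
	"fmt"
	"sort"
)

// majorityCommitIndex is a hypothetical helper. leaderLast is the leader's own
// last index, peerLast holds the last index known to be replicated on each
// peer, firstOfTerm is the first index written in the leader's current term
// and current is the existing commit index.
func majorityCommitIndex(leaderLast int64, peerLast []int64, firstOfTerm, current int64) int64 {
	all := append([]int64{leaderLast}, peerLast...)
	// Sort descending: all[k] is then present on at least k+1 nodes.
	sort.Slice(all, func(i, j int) bool { return all[i] > all[j] })
	// The entry at position len/2 is held by a strict majority of the cluster.
	candidate := all[len(all)/2]
	// Only advance when the majority has reached an entry from this term;
	// earlier entries are then committed along with it.
	if candidate >= firstOfTerm && candidate > current {
		return candidate
	}
	return current
}

func main() {
	// Three node cluster: leader at 10, peers at 8 and 5, term started at 7.
	fmt.Println(majorityCommitIndex(10, []int64{8, 5}, 7, 6))
	// Same replication state, but the term started at 9: nothing new commits.
	fmt.Println(majorityCommitIndex(10, []int64{8, 5}, 9, 6))
}
```

With no peers the slice holds only the leader's index, so a single node cluster commits its own writes immediately.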
func (*CommitIndex) WaitOnCommitChange ¶
func (commInd *CommitIndex) WaitOnCommitChange(index int64) int64
WaitOnCommitChange returns the new commitIndex when it changes. If the commitIndex is already further on than the index given it returns immediately. The method does not wait until index has been reached - the caller must check whether WaitOnCommitChange is required again.
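Because the method returns on any change rather than on reaching a target, callers need a loop. The sketch below shows that calling pattern against a stand-in type built on `sync.Cond`. The `commitTracker` type and the `waitUntil` helper are hypothetical and are not the package's CommitIndex; a real caller would also need an exit path for loss of leadership or shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

// commitTracker is a hypothetical stand-in for CommitIndex.
type commitTracker struct {
	mu    sync.Mutex
	cond  *sync.Cond
	index int64
}

func newCommitTracker() *commitTracker {
	t := &commitTracker{}
	t.cond = sync.NewCond(&t.mu)
	return t
}

// set stores a new commit index and wakes every waiter.
func (t *commitTracker) set(index int64) {
	t.mu.Lock()
	t.index = index
	t.mu.Unlock()
	t.cond.Broadcast()
}

// waitOnChange returns at once if the commit index is already beyond index.
// Otherwise it blocks until the next change and returns the new value, which
// may still be lower than the caller needs.
func (t *commitTracker) waitOnChange(index int64) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.index <= index {
		t.cond.Wait()
	}
	return t.index
}

// waitUntil keeps waiting until the commit index has reached target.
func waitUntil(t *commitTracker, target int64) int64 {
	var seen int64
	for seen < target {
		seen = t.waitOnChange(seen)
	}
	return seen
}

func main() {
	t := newCommitTracker()
	go func() {
		for i := int64(1); i <= 3; i++ {
			t.set(i)
		}
	}()
	fmt.Println("committed up to", waitUntil(t, 3))
}
```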
type ConfigChangeRequest ¶
type ConfigChangeRequest struct {
    Configuration *ServerNodeConfiguration
    Response chan rapi.ResultInfo
}
A ConfigChangeRequest is used internally within the server to serialise the application of new ServerNodeConfiguration
type ConfigChangeResults ¶
type ConfigChangeResults struct {
Result rapi.ResultInfo
}
type ConfigChangeType ¶
type ConfigChangeType int64
The ConfigChangeType is used by the admin tool to identify what in a configuration request has changed.
const (
    // If CNF_Set_Peers is set then the configuration change includes a redefinition
    // of which Peers are available.
    CNF_Set_Peers ConfigChangeType = 1 << iota
    // If CNF_Set_Topic is set then the configuration change includes ensuring a topic exists.
    CNF_Set_Topic
    // If CNF_Remove_Topic is set then the configuration change lists the topics to be removed.
    CNF_Remove_Topic
)
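Because the constants are declared with `1 << iota`, each one occupies its own bit and several can be combined in a single value. A minimal sketch of combining and testing the flags follows; the type and constants are redeclared locally so the example is self-contained, and the `has` helper is hypothetical.

```go
package main

import "fmt"

// Local copies of the documented type and constants.
type ConfigChangeType int64

const (
	CNF_Set_Peers ConfigChangeType = 1 << iota
	CNF_Set_Topic
	CNF_Remove_Topic
)

// has reports whether every bit of flag is set in scope (hypothetical helper).
func has(scope, flag ConfigChangeType) bool {
	return scope&flag == flag
}

func main() {
	// A change that redefines the peers and removes topics in one request.
	scope := CNF_Set_Peers | CNF_Remove_Topic

	fmt.Println("peers:", has(scope, CNF_Set_Peers))
	fmt.Println("set topic:", has(scope, CNF_Set_Topic))
	fmt.Println("remove topic:", has(scope, CNF_Remove_Topic))
}
```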
type ConfigPeers ¶
type ConfigPeers []string
func (ConfigPeers) Contains ¶
func (peers ConfigPeers) Contains(peer string) bool
func (ConfigPeers) Excluding ¶
func (peers ConfigPeers) Excluding(me string) ConfigPeers
type ConfigTopic ¶
ConfigTopic holds the configuration information for a topic.
func GetDefaultTopicConfiguration ¶
func GetDefaultTopicConfiguration() ConfigTopic
GetDefaultTopicConfiguration returns the default configuration for a topic.
func (*ConfigTopic) String ¶
func (ct *ConfigTopic) String() string
type ConfigTopics ¶
type ConfigTopics map[string]ConfigTopic
func (ConfigTopics) Contains ¶
func (topics ConfigTopics) Contains(topicName string) bool
type IdentifyNodeArgs ¶
type IdentifyNodeArgs struct {
    // Name is the name of the peer
    Name string
    // ClusterID is the ID the peer has been configured with.
    ClusterID string
}
IdentifyNodeArgs contains the arguments for a Node calling identify
type IdentifyNodeResults ¶
type IdentifyNodeResults struct {
Result rapi.ResultInfo
}
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node holds the state of this node (topic) in the cluster.
func (*Node) AppendEntries ¶
func (node *Node) AppendEntries(args *AppendEntriesArgs, results *AppendEntriesResults)
AppendEntries handles all logic for dealing with a leader's request to append entries.
func (*Node) ChangePeerConfiguration ¶
func (node *Node) ChangePeerConfiguration(newPeers ConfigPeers)
ChangePeerConfiguration is used by both StartNode and the ServerNode (when handling configuration changes) to manage the starting and stopping of peer goroutines as the number and identity of peers changes.
func (*Node) ClientReceiveMessages ¶
func (node *Node) ClientReceiveMessages(args *rapi.ReceiveMessagesArgs, results *rapi.ReceiveMessagesResults) error
ClientReceiveMessages returns messages at and beyond ID. If WaitForMessages is true and no messages are currently present, this method blocks until at least one message can be returned.
func (*Node) ClientRequestSendMessages ¶
func (node *Node) ClientRequestSendMessages(args *rapi.SendMessagesArgs, results *rapi.SendMessagesResults) error
ClientRequestSendMessages checks we are the leader and then queues the messages. Results are only sent once the messages are on the queue.
func (*Node) ExpVar ¶
func (node *Node) ExpVar() interface{}
ExpVar provides node stats when requested.
func (*Node) GetCommitIndex ¶
func (node *Node) GetCommitIndex() interface{}
func (*Node) GetCommitLog ¶
GetCommitLog doesn't apply the lock because the CommitLog doesn't change over the lifetime of the Node.
func (*Node) GetTopicDetails ¶
func (node *Node) GetTopicDetails(args *rapi.GetTopicDetailsArgs, results *rapi.GetTopicDetailsResults) error
func (*Node) RequestVote ¶
func (node *Node) RequestVote(args *RequestVoteArgs, results *RequestVoteResults)
RequestVote handles all logic for dealing with a request to vote by another peer.
func (*Node) SendLogsToPeers ¶
func (node *Node) SendLogsToPeers()
SendLogsToPeers is used by the QueueWriteAggregator to inform all peer goroutines that there are new log entries available.
If there are no peers (single node cluster) then this method triggers the commit index update so that messages are available to clients.
func (*Node) SetupNode ¶
func (node *Node) SetupNode(topic string, server *ServerNode, ourName string, ourPeers ConfigPeers, ourLog *commitlog.CommitLog, topicStore TopicPersistentStore) error
StartNode starts up the Raft node implementation for a given topic. The connections are managed by the ServerNode. The Node provides methods that are used by the RPCHandler for interaction with the commit log.
func (*Node) Shutdown ¶
func (node *Node) Shutdown(notifier *utils.ShutdownNotifier)
Shutdown is used by the ServerNode to shutdown this raft node.
type NodeInfo ¶
type NodeInfo struct {
    Topic string
    State string
    CurrentTerm int64
    VotedFor string
    LeaderNode string
    CommitIndex int64
    CommitLog interface{}
    QueueWriteAggregator interface{}
    TopicPeers []interface{}
}
The ExpVar information structure for the Node.
type NodeState ¶
type NodeState int
NodeState tracks what state this node (topic) in the cluster is in.
const (
    // STARTING_UP_NODE is the initial state of a Node (topic) that is loading files, checking logs, etc at start.
    STARTING_UP_NODE NodeState = iota
    // FOLLOWER_NODE is the first working state of a Node. Nodes in this state can serve GET requests, vote and replicate the log.
    FOLLOWER_NODE
    // CANDIDATE_NODE is entered if the Node does not believe there is a valid leader and is asking for votes.
    CANDIDATE_NODE
    // LEADER_NODE is entered if the Node receives a majority of votes for a given term.
    // Leaders accept PUT requests and issue AppendEntries requests to other nodes
    LEADER_NODE
    // ORPHAN_NODE is a node state when this node is not part of the cluster.
    // In this state elections are not requested
    ORPHAN_NODE
    // SHUTDOWN_NODE is a node that is in the process of shutting down
    SHUTDOWN_NODE
)
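The state comments above imply a simple gate on client writes: only a leader accepts PUT requests. The sketch below redeclares the states locally and adds two hypothetical helpers, a `String` method for readable output and an `acceptsPut` check. Neither helper is listed in the package index, and the printed state names are illustrative.

```go
package main

import "fmt"

// Local copies of the documented type and constants.
type NodeState int

const (
	STARTING_UP_NODE NodeState = iota
	FOLLOWER_NODE
	CANDIDATE_NODE
	LEADER_NODE
	ORPHAN_NODE
	SHUTDOWN_NODE
)

// stateNames is a hypothetical lookup table used only for printing.
var stateNames = [...]string{"starting up", "follower", "candidate", "leader", "orphan", "shutdown"}

// String makes NodeState satisfy fmt.Stringer (hypothetical addition).
func (s NodeState) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return fmt.Sprintf("NodeState(%d)", int(s))
	}
	return stateNames[s]
}

// acceptsPut reports whether a node in state s takes client PUT requests.
// Per the state comments, only leaders do.
func acceptsPut(s NodeState) bool {
	return s == LEADER_NODE
}

func main() {
	for s := STARTING_UP_NODE; s <= SHUTDOWN_NODE; s++ {
		fmt.Printf("%d %s put=%v\n", s, s, acceptsPut(s))
	}
}
```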
type Peer ¶
type Peer struct {
    Name string
    // contains filtered or unexported fields
}
Peer is the Node (topic)'s representation of the peers, including the leader's understanding of what the next and last index is on those peers.
func (*Peer) GetLastIndex ¶
func (*Peer) GetLastValidatedIndex ¶
func (*Peer) GetNextIndex ¶
func (*Peer) SendMessage ¶
SendMessage attempts to send a message to this peer. A RequestVote is sent if this node is in Candidate state. An AppendEntries is sent if this node is in Leader state. True is returned if message delivery can be attempted, false otherwise.
func (*Peer) SetLastIndex ¶
func (*Peer) SetNextIndex ¶
func (*Peer) SetValidatedIndex ¶
type QueueWriteAggregator ¶
type QueueWriteAggregator struct {
// contains filtered or unexported fields
}
QueueWriteAggregator is used to aggregate client messages into larger batches, increasing throughput when a large number of clients are sending messages.
func NewQueueWriteAggregator ¶
func NewQueueWriteAggregator(node *Node) *QueueWriteAggregator
NewQueueWriteAggregator is used to create a new instance of the queue aggregator for the given node.
func (*QueueWriteAggregator) ExpVar ¶
func (agg *QueueWriteAggregator) ExpVar() interface{}
func (*QueueWriteAggregator) Queue ¶
func (agg *QueueWriteAggregator) Queue(messages [][]byte) ([]int64, error)
Queue turns a batch of messages into a request in the send queue and waits for the aggregator to respond
func (*QueueWriteAggregator) Shutdown ¶
func (agg *QueueWriteAggregator) Shutdown(notifier *utils.ShutdownNotifier)
Shutdown the aggregator.
type RPCHandler ¶
type RPCHandler struct {
// contains filtered or unexported fields
}
RPCHandler holds all exposed RPC methods.
func NewRPCHandler ¶
func NewRPCHandler(srv *ServerNode) *RPCHandler
func (*RPCHandler) AppendEntries ¶
func (handler *RPCHandler) AppendEntries(args *AppendEntriesArgs, results *AppendEntriesResults) error
AppendEntries is used by leaders to inform followers of new message entries.
func (*RPCHandler) ConfigurationChange ¶
func (handler *RPCHandler) ConfigurationChange(args *ServerNodeConfiguration, results *ConfigChangeResults) error
ConfigurationChange is used by the admin tool to send new configuration details to the nodes.
func (*RPCHandler) GetClusterDetails ¶
func (handler *RPCHandler) GetClusterDetails(args *rapi.GetClusterDetailsArgs, results *rapi.GetClusterDetailsResults) error
GetClusterDetails is a Client RPC method allowing clients to discover details about the cluster configuration.
func (*RPCHandler) GetTopicDetails ¶
func (handler *RPCHandler) GetTopicDetails(args *rapi.GetTopicDetailsArgs, results *rapi.GetTopicDetailsResults) error
GetTopicDetails is a Client RPC method allowing clients to discover details about a topic.
func (*RPCHandler) IdentifyNode ¶
func (handler *RPCHandler) IdentifyNode(args *IdentifyNodeArgs, results *IdentifyNodeResults) error
IdentifyNode is used by other peers in the cluster to identify themselves to this node.
func (*RPCHandler) ReceiveMessages ¶
func (handler *RPCHandler) ReceiveMessages(args *rapi.ReceiveMessagesArgs, results *rapi.ReceiveMessagesResults) error
ReceiveMessages is a Client RPC method allowing clients to get messages from this node.
func (*RPCHandler) RequestVote ¶
func (handler *RPCHandler) RequestVote(args *RequestVoteArgs, results *RequestVoteResults) error
RequestVote is used by peers to request a vote during an election.
func (*RPCHandler) SendMessages ¶
func (handler *RPCHandler) SendMessages(args *rapi.SendMessagesArgs, results *rapi.SendMessagesResults) error
SendMessages is a Client RPC method allowing clients to send new messages to a topic on the leader node.
type RaftElectionTimer ¶
type RaftElectionTimer struct {
// contains filtered or unexported fields
}
The RaftElectionTimer manages the timeout for triggering an election.
func NewElectionTimer ¶
func NewElectionTimer(node *Node) *RaftElectionTimer
func (*RaftElectionTimer) Activity ¶
func (t *RaftElectionTimer) Activity()
The Activity method is used by the Node to record that a leader has sent through a message and that the timeout should be restarted.
func (*RaftElectionTimer) Pause ¶
func (t *RaftElectionTimer) Pause()
Pause temporarily stops the timeout. This is useful when the node is carrying out potentially long-running activities (writing to disk, syncing) that will interrupt the receiving of messages from the leader.
func (*RaftElectionTimer) RunElectionTimer ¶
func (t *RaftElectionTimer) RunElectionTimer()
RunElectionTimer runs the timer logic. This method will also call elections and run the leadership loop.
func (*RaftElectionTimer) Shutdown ¶
func (t *RaftElectionTimer) Shutdown(notifier *utils.ShutdownNotifier)
func (*RaftElectionTimer) Start ¶
func (t *RaftElectionTimer) Start()
Start triggers the running of the timeout loop.
type RequestVoteArgs ¶
type RequestVoteResults ¶
type ServerNode ¶
type ServerNode struct {
// contains filtered or unexported fields
}
A ServerNode is the main object of the server. It holds the master copy of the configuration, the peer connections and the topics.
func NewServerNode ¶
func NewServerNode(address, gobAddress, httpAddress, cborAddress, rootpath, cluster_id string) (*ServerNode, error)
func (*ServerNode) ChangeConfiguration ¶
func (srv *ServerNode) ChangeConfiguration(newconfig *ServerNodeConfiguration, results *ConfigChangeResults)
func (*ServerNode) GetClusterDetails ¶
func (srv *ServerNode) GetClusterDetails(args *rapi.GetClusterDetailsArgs, results *rapi.GetClusterDetailsResults) error
func (*ServerNode) GetClusterID ¶
func (srv *ServerNode) GetClusterID() string
GetClusterID returns the ClusterID that was set when the server started up. Locking is not used as the value will not change after startup.
func (*ServerNode) GetNode ¶
func (srv *ServerNode) GetNode(topic string) *Node
func (*ServerNode) ListenConnections ¶
func (srv *ServerNode) ListenConnections() error
ListenConnections starts monitoring of incoming calls to allow for peer connections.
func (*ServerNode) RequestShutdown ¶
func (srv *ServerNode) RequestShutdown(reason string)
func (*ServerNode) SendPeerMessage ¶
func (srv *ServerNode) SendPeerMessage(peerName string, api string, args interface{}, reply interface{}) error
func (*ServerNode) WaitForShutdown ¶
func (srv *ServerNode) WaitForShutdown()
WaitForShutdown blocks until the server is asked to shutdown, then it orchestrates the shutdown.
The shutdown sequence is:
- Close the RPC and HTTP listeners
- Shut down the configuration goroutine
- Ask each topic's RaftNode to shut down, which in turn:
  - Shuts down the election timer / leader loop
  - Asks each peer goroutine to quit
  - Once each peer's connection is closed, shuts down the write aggregator
  - Once the write aggregator is shut down, notifies the storage of shutdown. Disk storage will:
    - Shut down the cleanup goroutine
    - Shut down the segment close goroutine
    - Close each open segment
- Shut down each ServerPeer connection
- Close main goroutine
Note that open RPC connections from other nodes are not closed, but they will behave safely if any new requests are received.
type ServerNodeConfiguration ¶
type ServerNodeConfiguration struct {
    // Cluster_ID is used as a safety check to ensure that changes made and peers connecting are done against the correct cluster
    // Cluster_ID is recorded in the config file for a node and is passed by command line alone.
    Cluster_ID string
    // The Scope contains a bitmask of configuration changes represented by this data
    Scope ConfigChangeType
    // The list of Peers (name:port) that belong to this cluster
    Peers ConfigPeers
    // The map of Topics that belong to this cluster
    Topics ConfigTopics
    // List of paths data can be stored in - the last one is the default for new topics
    Data_Paths []string
    // contains filtered or unexported fields
}
The ServerNodeConfiguration structure holds the complete configuration for a cluster. It is also used with bitmasks in the Scope field to communicate changes in configuration.
func GetEmptyConfiguration ¶
func GetEmptyConfiguration() *ServerNodeConfiguration
func LoadConfiguration ¶
func LoadConfiguration(clusterID, rootpath string) (*ServerNodeConfiguration, error)
LoadConfiguration is responsible for determining what server configuration is present at the given path.
The logic followed is:
1 - Is the rootpath a directory?
2 - Is the directory empty - if so create a skeleton configuration
3 - If the directory has an entry called "Config" look inside it
4 - If a config.cfg file exists, read it
5 - If a config.cfg file is not present, but a config.old is present, move it to config.cfg
6 - If the configuration cluster_ID is empty, set it to the clusterID
7 - If the configuration cluster_ID is not empty and doesn't match clusterID - error.
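Steps 6 and 7 of the logic above amount to a small reconciliation between the cluster ID stored in the configuration and the one passed at startup. A minimal sketch follows; the `reconcileClusterID` function is hypothetical, and the error value is redeclared locally with the documented message so the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented error value.
var ERR_INCORRECT_CLUSTER_ID = errors.New("Incorrect cluster ID")

// reconcileClusterID is a hypothetical helper covering steps 6 and 7.
// stored is the cluster ID read from the configuration file and given is the
// clusterID argument. It returns the ID the configuration should carry.
func reconcileClusterID(stored, given string) (string, error) {
	if stored == "" {
		// Step 6: adopt the supplied ID when none has been recorded yet.
		return given, nil
	}
	if stored != given {
		// Step 7: refuse to run against a different cluster's configuration.
		return stored, ERR_INCORRECT_CLUSTER_ID
	}
	return stored, nil
}

func main() {
	id, err := reconcileClusterID("", "prod-a")
	fmt.Println(id, err)

	id, err = reconcileClusterID("prod-a", "prod-b")
	fmt.Println(id, err)
}
```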
func (*ServerNodeConfiguration) SaveConfiguration ¶
func (conf *ServerNodeConfiguration) SaveConfiguration() error
SaveConfiguration saves the configuration file as JSON in the conf.root_path
Logic:
1 - Delete the config.new file if it already exists.
2 - Create the new config.new file.
3 - Delete the config.old file if it already exists.
4 - Move the existing config.cfg file to config.old
5 - Move the new config.new file to config.cfg
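The five steps above can be sketched with the standard library alone. This is an illustration under stated assumptions rather than the package's implementation: the `saveConfig` and `runDemo` functions are hypothetical, the configuration is represented by a plain map, and the file permissions are an arbitrary choice.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// File names copied from the package constants.
const (
	CONFIG_FILENAME_CURRENT = "config.cfg"
	CONFIG_FILENAME_NEW     = "config.new"
	CONFIG_FILENAME_OLD     = "config.old"
)

// saveConfig is a hypothetical helper that follows the five documented steps.
func saveConfig(dir string, conf interface{}) error {
	cur := filepath.Join(dir, CONFIG_FILENAME_CURRENT)
	next := filepath.Join(dir, CONFIG_FILENAME_NEW)
	old := filepath.Join(dir, CONFIG_FILENAME_OLD)

	data, err := json.Marshal(conf)
	if err != nil {
		return err
	}
	// Steps 1 and 2: discard any stale config.new, then write a fresh one.
	if err := os.Remove(next); err != nil && !os.IsNotExist(err) {
		return err
	}
	if err := os.WriteFile(next, data, 0o644); err != nil {
		return err
	}
	// Step 3: delete config.old if it exists.
	if err := os.Remove(old); err != nil && !os.IsNotExist(err) {
		return err
	}
	// Step 4: move config.cfg to config.old (absent on the very first save).
	if err := os.Rename(cur, old); err != nil && !os.IsNotExist(err) {
		return err
	}
	// Step 5: promote config.new to config.cfg.
	return os.Rename(next, cur)
}

// runDemo saves two configurations into a temporary directory and checks
// that the first one ends up in config.old and the second in config.cfg.
func runDemo() error {
	dir, err := os.MkdirTemp("", "fbconf")
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir)

	for _, id := range []string{"first", "second"} {
		if err := saveConfig(dir, map[string]string{"Cluster_ID": id}); err != nil {
			return err
		}
	}
	cur, err := os.ReadFile(filepath.Join(dir, CONFIG_FILENAME_CURRENT))
	if err != nil {
		return err
	}
	old, err := os.ReadFile(filepath.Join(dir, CONFIG_FILENAME_OLD))
	if err != nil {
		return err
	}
	if string(cur) != `{"Cluster_ID":"second"}` || string(old) != `{"Cluster_ID":"first"}` {
		return fmt.Errorf("unexpected files: %s / %s", cur, old)
	}
	return nil
}

func main() {
	if err := runDemo(); err != nil {
		fmt.Println("demo failed:", err)
		os.Exit(1)
	}
	fmt.Println("rotation ok")
}
```

Writing the complete file under a temporary name before renaming means a crash mid-save leaves either config.cfg or config.old intact, which is what lets LoadConfiguration fall back to config.old.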
type ServerNodeInfo ¶
type ServerNodeInfo struct {
    Name string
    ServerPeerInfo []interface{}
    TopicsInfo []interface{}
}
ServerNodeInfo is used to provide information on the server to ExpVar.
type ServerPeer ¶
type ServerPeer struct {
// contains filtered or unexported fields
}
The ServerPeer is used by the ServerNode to manage connections to a peer.
func NewServerPeer ¶
func NewServerPeer(ourname, name string) *ServerPeer
NewServerPeer creates a new ServerPeer to hold connections to a peer node.
func (*ServerPeer) ExpVar ¶
func (peer *ServerPeer) ExpVar() interface{}
type ServerPeerInfo ¶
ServerPeerInfo is used for ExpVar reporting on the state of Peer connections.
type TopicPersistentStore ¶
type TopicPersistentStore interface {
    // SetTerm persists the term and vote
    SetTerm(term int64, votedFor string) (err error)
    // Load reads the given topic information from persistent storage
    Load() (term int64, votedFor string, err error)
}
The TopicPersistentStore is an interface used by the node to store persistently the term and who the node last voted for.
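For tests it can be handy to satisfy this interface without touching disk. The following sketch shows an in-memory implementation; the `memoryStore` type is hypothetical and, because it forgets everything on restart, it is not a substitute for real persistent storage, which Raft relies on for the term and vote.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copy of the documented interface.
type TopicPersistentStore interface {
	SetTerm(term int64, votedFor string) (err error)
	Load() (term int64, votedFor string, err error)
}

// memoryStore is a hypothetical in-memory implementation intended for tests.
type memoryStore struct {
	mu       sync.Mutex
	term     int64
	votedFor string
}

// Compile-time check that memoryStore satisfies the interface.
var _ TopicPersistentStore = (*memoryStore)(nil)

// SetTerm records the term and the vote cast in that term.
func (m *memoryStore) SetTerm(term int64, votedFor string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.term, m.votedFor = term, votedFor
	return nil
}

// Load returns the most recently recorded term and vote.
func (m *memoryStore) Load() (int64, string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.term, m.votedFor, nil
}

func main() {
	var store TopicPersistentStore = &memoryStore{}
	if err := store.SetTerm(3, "node-b:3000"); err != nil {
		fmt.Println("set failed:", err)
		return
	}
	term, votedFor, err := store.Load()
	fmt.Println(term, votedFor, err)
}
```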