raft

package module
v0.0.0-...-ccc19f7
Published: Jul 21, 2019 License: Apache-2.0 Imports: 29 Imported by: 0

README


raft

Yet another implementation of Raft, in Go.

This raft package may be useful to any application run as a cluster of coordinating instances requiring a distributed replicated log of events. An identical sequence of events will be presented to each member instance. Each member instance can contribute to that distributed log. No third party system is required.

Member instances can use the replicated log to drive a deterministic state machine: the result is that all instances arrive at the same state by traversing the same sequence of states.

The best (and only) place to start if you are planning to embed this package in your application is the package documentation here.

Extensive examples are included, including how to set up TLS with mutual authentication to protect gRPC intra-cluster communication. The rest of this README.md is largely about the implementation of the package itself.

A helm chart is also provided to facilitate deploying a demonstration application cluster on Kubernetes (e.g. on Google Kubernetes Engine in Google Cloud). This demo application includes metrics export and a Grafana dashboard, and can be deployed using a single helm install invocation.

The focus of this implementation is an all-batteries-included, production quality, extensively tested, complete implementation of the core Raft specification. The Raft enhancements which are still pending are: graceful leader shutdown, cluster membership extensions for dynamic membership, log compaction, exactly-once guarantees for clients and, beyond Raft, Tangaroa extensions. The Report Card section below includes details of these enhancements.

Intra-cluster connectivity is implemented over gRPC with support for TLS protection with mutual authentication.

Observability is central to a production-quality implementation; structured logging (using the Uber zap library) and metrics export (using the Prometheus library, with gRPC interceptors for both metrics and logging) are an integral part of the implementation. The basic dashboard below is included in the helm chart and shows the state of each node in the application cluster: follower, candidate or leader role; who is leader (and how leadership changed) from the perspective of each node; and the term, committed index and applied index at each node.

(Figure: Grafana dashboard for the demo application cluster)

Unit test coverage is high and it is a goal to keep it so; unit test code itself is a key component of the implementation.

Test Application

The app application source is a skeleton demonstration showing simple features of the raft package in action. Test Application instances running in a cluster will all see a single version of a distributed log and each contribute random log commands to that log. Each command is made up of an originating node and a UUID.

Sample output from an instance in the application cluster would look like this:

Node0:29463c76-159a-43bd-8aa0-c0c654e67f69
Node0:832a806b-2cab-47d5-9a79-2a075f56324e
Node1:54caaacb-b585-4b01-8db4-9c3739d1c4ba
Node2:2740efcb-df3f-4c04-b3d0-b1c7ed163bc3
Node0:d6adc157-27c3-45de-9c91-f4d83ea2d19f
Node1:00bdafa3-30d4-4be8-bb06-3472927ad00a
Node0:3c732fc7-a31e-4a8d-8a01-c8199df058fd

A simple multistage Dockerfile is provided, together with a helm chart, to enable deployment of the application cluster as kubernetes deployments.

Instructions are also included for application cluster deployment in a cloud environment (e.g. Google Kubernetes Engine).

Intracluster Messaging

Intracluster messaging in this package relies on gRPC. The gRPC server and client options can be configured by the application. By default, the client and server are configured with aggressive settings for maximum concurrent streams per transport, and for keepalive parameters and enforcement policy.

Both the server side and the client side are set up with Prometheus and zap logging interceptors to provide consistent logging and metrics collection for gRPC calls. The detailed flag of the WithMetrics option, under the control of the application, determines whether the latency distribution of RPC calls is tracked.

TLS: Protecting Intracluster Messaging

The Raft package supports protecting intra-cluster traffic with mutually authenticated TLS. Client dial (grpc.DialOptions) and server (grpc.ServerOptions) options can be provided by the application as part of MakeNode() initialisation. Godoc provides an example of how to run with TLS enabled and with mutual authentication between server and client.

Metrics

The Raft package accepts a prometheus metrics registry and uses it to track key statistics of the Raft operation, including client and server side RPC metrics (using gRPC middleware interceptors). WithMetrics can be called without a registry, in which case metrics are collected against the default registry. If the WithMetrics option is not used, no metrics are collected.
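
For illustration, a minimal sketch of enabling metrics against a dedicated registry (assuming ctx, wg, cfg and localNodeIndex are set up as in the MakeNode examples later in this document; the namespace "myapp" is an illustrative choice):

	registry := prometheus.NewRegistry()
	node, err := MakeNode(ctx, &wg, cfg, localNodeIndex,
		WithMetrics(registry, "myapp", true)) // detailed metrics enabled
	if err != nil {
		// handle error
	}
	// Serving the registry is up to the application, e.g. via
	// promhttp.HandlerFor(registry, promhttp.HandlerOpts{}).
	// Passing nil instead of registry would collect against the default prometheus registry.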

A Grafana dashboard is provided for the raft package. The kubernetes/helm based deployment of the example application provides a live view of the Grafana raft dashboard.

Logging

Logging is primarily at three levels: Info, which logs very minimally and captures significant events like role (leader/follower/candidate) changes; Debug, which is far more verbose; and Error.

Logs are named raft.NODE<node index>. GRPC server and client message logs are named raft.NODE<node index>.GRPC_<C|S> so they can be easily filtered in or out. GRPC message logs are logged at debug level only. Log names are appended to whatever name the application uses if it provides the logger through the WithLogger option.

By default, the raft package will set up a customised production zap log: logging Info level and above, structured and JSON formatted, with sampling and caller disabled, stacktrace enabled for errors, and logging to stdout. The logger field is set (using logger.Named()) to unambiguously indicate that logs are coming from the raft package. Logs, by default, look like this:

{"level":"info","ts":"2019-07-07T10:20:23.859+0100","logger":"raft.NODE0","msg":"raft package, hello (logging can be customised or disabled using WithLogger options)"}
{"level":"info","ts":"2019-07-07T10:20:23.866+0100","logger":"raft.NODE0","msg":"raftEngine, start running","currentTerm":0,"commitIndex":0,"appliedIndex":0,"state":"not set","VotedFor":0,"currentLeader":-1}
{"level":"info","ts":"2019-07-07T10:20:23.866+0100","logger":"raft.NODE0","msg":"ROLE CHANGE: from UNINIT to FOLLOWER","currentTerm":0,"commitIndex":0,"appliedIndex":0,"state":"follower","VotedFor":0,"currentLeader":-1}
{"level":"info","ts":"2019-07-07T10:20:35.814+0100","logger":"raft.NODE0","msg":"ROLE CHANGE: from FOLLOWER to CANDIDATE","currentTerm":0,"commitIndex":0,"appliedIndex":0,"state":"candidate","VotedFor":0,"currentLeader":-1}
{"level":"info","ts":"2019-07-07T10:20:35.818+0100","logger":"raft.NODE0","msg":"ROLE CHANGE: from CANDIDATE to LEADER","currentTerm":1,"commitIndex":0,"appliedIndex":0,"state":"leader","VotedFor":0,"currentLeader":0}

If the log encoding is set to console instead of JSON (unusual in production environments, but useful if looking at logs directly instead of through some log aggregating tool like Splunk or Elastic), then Info level logs look like this:

2019-07-07T16:05:33.104+0100	info	raft.NODE0	raft package, hello (logging can be customised or disabled using WithLogger options)
2019-07-07T16:05:33.115+0100	info	raft.NODE0	raftEngine, start running	{"currentTerm": 0, "commitIndex": 0, "appliedIndex": 0, "state": "uninit", "VotedFor": 0, "currentLeader": -1}
2019-07-07T16:05:33.115+0100	info	raft.NODE0	ROLE CHANGE: from UNINIT to FOLLOWER	{"currentTerm": 0, "commitIndex": 0, "appliedIndex": 0, "state": "follower", "VotedFor": 0, "currentLeader": -1}
2019-07-07T16:05:45.064+0100	info	raft.NODE0	ROLE CHANGE: from FOLLOWER to CANDIDATE	{"currentTerm": 0, "commitIndex": 0, "appliedIndex": 0, "state": "candidate", "VotedFor": 0, "currentLeader": -1}
2019-07-07T16:05:45.070+0100	info	raft.NODE0	ROLE CHANGE: from CANDIDATE to LEADER	{"currentTerm": 1, "commitIndex": 0, "appliedIndex": 0, "state": "leader", "VotedFor": 0, "currentLeader": 0}
2019-07-07T16:05:45.187+0100	info	raft.NODE0	application is requesting a shutdown
2019-07-07T16:05:45.187+0100	info	raft.NODE0	raftEngine, stop running	{"currentTerm": 1, "commitIndex": 0, "appliedIndex": 0, "state": "leader", "VotedFor": 0, "currentLeader": 0}
2019-07-07T16:05:45.187+0100	info	raft.NODE0	raft package, goodbye

Raft logging is customisable in the MakeNode call; the application controls the logger through the WithLogger option. See the godoc example for details. The WithLogger option allows the application to disable logging, provide its own zap logger, or, in conjunction with DefaultZapLoggerConfig, start from the default logging configuration in the raft package, modify it based on application need, and activate it.
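
A brief sketch of that last workflow (assuming ctx, wg, cfg and localNodeIndex are set up as in the MakeNode examples later in this document):

	loggerCfg := DefaultZapLoggerConfig()        // start from the raft package default zap configuration
	loggerCfg.Level.SetLevel(zapcore.DebugLevel) // e.g. raise verbosity while troubleshooting
	logger, err := loggerCfg.Build()
	if err != nil {
		// handle error
	}
	n, err := MakeNode(ctx, &wg, cfg, localNodeIndex, WithLogger(logger, false))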

Error Handling

Errors returned from the raft package across APIs attempt to provide the necessary context to clarify the circumstances leading to the error (in the usual form of a string returned via Error() method on error). The same error can also be squeezed for a root cause using the errors.Cause() method to support programmatic handling of the error if required. If the root cause is internal to raft, then the sentinel error will be one of those defined here and can be tested programmatically. If the root cause originated in some package downstream of raft, then the downstream error is propagated explicitly. Godoc documentation includes an example of how the errors can be handled programmatically. In essence:

	n, err := MakeNode(ctx, &wg, cfg, localNodeIndex, WithLogger(mylog))
	if err != nil {

		switch errors.Cause(err) {
		case RaftErrorBadMakeNodeOption:
			//
			// Handle specific sentinel in whichever way we see fit.
			// ...
		default:
			// Root cause is not a handled sentinel.
		}
		// err itself renders the full context not just the sentinel.
		fmt.Println(err)
	}

Report Card

Lots to go, but do come inside and have a look.

Completed so far:

  • General package infra: gRPC client and server setup, logging, metrics, UT.
  • Core Raft Implementation (i.e. leadership election and log replication).
  • Basic metrics export, and comprehensive structured logging.
  • Basic helm deployment of demo test application for experimentation.

Todo next:

The target is to eventually cover all of the Raft functionality, including cluster membership extensions, log compaction, exactly-once guarantees to clients and, beyond Raft, to bring Byzantine fault tolerance via Tangaroa.

  • election: performance and negative testing; more functional testing around recovery
  • graceful handover on leader before shutdown
  • refactor TestLogReplication()
  • errors/utilisation/saturation metrics
  • add dashboard to cover gRPC metrics, and add configmap to deploy dashboard automatically using the approach described here.
  • docker compose based example for those running locally
  • add config exchange as part of voting protocol to protect/detect misconfiguration (duplicate node ids, sufficiently mismatched pre-jittered election timeouts etc)
  • set up an anti-affinity example in the helm deployment as an example of how to avoid application cluster nodes being serviced by the same underlying kubernetes node

Dependencies

Other than the usual dependencies (i.e. a Go installation), protoc needs to be installed as described here and the directory hosting protoc should be in the PATH. Running go generate will automatically regenerate generated source.
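
Purely as an illustration (the actual directive and proto file names in the repository may differ), a go:generate directive driving protoc for gRPC bindings typically looks something like this:

	//go:generate protoc --go_out=plugins=grpc:. raft.proto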

Raft, The Consensus Algorithm

Key references for the implementation are:

  1. Diego Ongaro and John Ousterhout, In Search of an Understandable Consensus Algorithm. 2014 (ISUCA)

  2. Diego Ongaro. Consensus: Bridging Theory and Practice. Stanford University Ph.D. Dissertation. Aug. 2014. (CBTP)

  3. Heidi Howard. ARC: analysis of Raft consensus. University of Cambridge, Computer Laboratory, Jul. 2014. (ARC)

References to the above in code use the acronyms indicated (i.e. ISUCA, CBTP, ARC).

For an excellent short presentation about Raft, see: https://www.usenix.org/node/184041

Key assertions:

  • a raft node can be in one of three roles: Follower, Candidate and Leader.
  • log entries only ever flow from leader to followers.
  • a term will only ever have one leader.
  • followers never accept log entries from leaders on a smaller term.
  • leaders never remove entries, and followers only remove entries which conflict with the leader (and are by definition uncommitted).
  • voters only grant their vote if the candidate's log is at least as up-to-date as their own.

Raft is not Byzantine fault tolerant. A Raft variant called Tangaroa proposes extensions to make it so.

Core Raft provides clients with at-least-once guarantees (Section 6.3 CBTP). This is because a leader may fail once a proposal has been committed but before an acknowledgment is sent to the client. The same section proposes a method to support exactly-once guarantees for a proposal even in the context of concurrent writes from the same session.

Implementation Notes

The figure below represents the main components in the raft package and the interactions with the application.

(Figure: Raft Package Overview - main components and their interactions with the application)

ASIDE: the choice of the term 'client' can lead to confusion - almost invariably 'client' refers to the gRPC client functionality which a Node uses to interact with the other nodes' servers in the cluster. The term 'application' is used to refer to the entity reading and consuming log updates. In the Raft specifications, 'application' is called the client.

From the raftEngine to the gRPC client goroutines, we never block - if the channel is full, the raftEngine side carries on. The client side recovers by pulling the outstanding state once the channel starts to drain. This constraint must always be satisfied because it is what ensures that we never livelock with a client goroutine pushing to the raftEngine while the raftEngine is trying to push to that client goroutine.
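
A minimal sketch of this non-blocking hand-off (the names below are illustrative, not the package's internal identifiers):

	// raftEngine side: never block when handing work to a gRPC client goroutine.
	select {
	case clientChan <- msg:
		// handed off to the client goroutine
	default:
		// channel full: carry on; the client goroutine catches up by pulling
		// outstanding state once the channel drains
	}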

Notes about Flows

The life-of-a-packet for various paths through the system...

Application learns about newly committed log command from raft package:

Trigger: a new commitIndex is loaded. This could happen when we learn of a new commit index as a follower from the leader, or as the leader when an acknowledgement for an AppendEntry update is received. Action: the local publisher is notified (assuming a notification is not pending already). The local publisher strives to keep the local application in sync with committed log commands.

Application pushes new log command on follower node:

Trigger: the application pushes a log command to the log producer via LogProduce(). The log producer hands off a logCommandContainer to the raftEngine and blocks waiting for a response on the response channel. This means we will only have one log command in flight at any one point if the client serialises calls to LogProduce. (Eventually we may publish results asynchronously to improve performance.) Action: the raftEngine hands the logCommandContainer to the client for the leader on the logCommand channel as a logCmdEvent. The log command is propagated to the leader over gRPC and handled there. The entries are added to the log, and an ack tracker is set up to watch for when the committedIndex moves to this index. When that happens, the tracker acknowledges, which releases the response to the remote node. When the result is received at the originating node, it is passed back to the log producer and the producing application through the return channel in the logCommandContainer. In the case where the log command was sourced locally to the leader, the acker feeds the acknowledgement directly to the logProducer and the application.

raftEngine receives a new log command on leader node:

Trigger: a remote (or local) logCommand message with a new log command. Action: the raftEngine installs the new log command in the log, and notifies clients (if no notification is pending) one by one. Clients will pull and doggedly attempt to get the missing log commands to the remote nodes. The raftEngine leader tracks each client's matchIndex and nextIndex. Any rejections received for AppendEntry messages reset the nextIndex. When a node becomes leader it assumes that followers are in sync by setting the nextIndex to one ahead of the index of the last entry in the leader's log. The keepalive includes the latest prevLogIndex and prevLogTerm. Followers which are not in sync will nak the keepalive, resulting in their nextIndex being rolled back to hunt for the common point in the log between leader and follower.
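
A simplified sketch of the per-follower bookkeeping described above (types and fields are illustrative of the standard Raft mechanism, not the package's internals):

	type followerProgress struct {
		nextIndex  int64 // next log index the leader will send to this follower
		matchIndex int64 // highest log index known to be replicated on this follower
	}

	// Called on the leader when an AppendEntry response arrives from the follower.
	func (p *followerProgress) onAppendEntryReply(accepted bool, lastSentIndex int64) {
		if accepted {
			p.matchIndex = lastSentIndex
			p.nextIndex = lastSentIndex + 1
			return
		}
		// Rejection: roll nextIndex back and retry, hunting for the point at which
		// the leader's and follower's logs agree.
		if p.nextIndex > 1 {
			p.nextIndex--
		}
	}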

Unrecoverable Errors

When the raft package encounters failures which are unrecoverable (e.g. persisted content it cannot unmarshal, or a persistent store failure), it signals catastrophic failure to the application. The expectation is that the application does whatever is necessary and appropriate to draw operator attention (e.g. shut down).

Concurrency and Synchronisation

The package uses multiple goroutines:

  • a group of goroutines offloads communication to the other cluster nodes (the local node acting as a gRPC client to each of the other nodes in the cluster). Each goroutine is fed through a buffered channel carrying the messages which need to be communicated to its remote node; it handles the call, blocks waiting for the response, and, on receipt, delivers the response back to the raft engine goroutine.
  • gRPC servers handle RPCs initiated by remote nodes in the cluster.
  • the central goroutine handles the raft state machine. Messages received from other goroutines and timer events are the main inputs to the state machine. This goroutine never blocks on other goroutines; this keeps reasoning about concurrency in the raft package a little less painful, and ensures we avoid deadlocks.
  • while in leader state, an independent goroutine handles acknowledging log command requests from local or remote applications when the log command is committed.
  • an application facing goroutine is responsible for feeding the channel of 'applied' log entries to the application.

Synchronisation is mostly lock free and largely message passing based. Other synchronisation primitives used include atomic updates and mutexes held briefly in leaf functions.

Documentation

Overview

Package raft is yet another implementation of Raft, in Go.

This raft package is intended to be embeddable in any application run as a cluster of coordinating instances and wishing to benefit from a distributed replicated log.

For an overview of the package, and the current state of the implementation, see: https://github.com/ccassar/raft/blob/master/README.md

Consuming Distributed Log

The application receives distributed log commands over a channel it provides at initialisation time. This channel is used by the raft package to publish committed log commands. The channel can be consumed at the convenience of the application and does not affect the operation of the raft protocol. This is made possible through a dedicated goroutine whose responsibility is to synchronise the application to the tail of the distributed log by fetching any pending log commands whenever the channel has capacity. The application provides the channel; a buffered channel ensures that the producer does not underrun the application unnecessarily when the application is not yet synchronised.
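
A minimal sketch of consuming committed log commands (assuming cfg is the NodeConfig handed to MakeNode as in the examples below, and apply is a hypothetical application function):

	go func() {
		for cmd := range cfg.LogCmds {
			// Apply the committed log command to the application state machine.
			// The command bytes are opaque to the raft package.
			apply(cmd)
		}
	}()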

Publishing To Distributed Log

Any application instance in the application cluster can publish to the distributed log. The `Node` function `LogProduce` provides this facility. The API is blocking and will return no error on commit, or an error for any other outcome. Errors include wrapped sentinel errors for programmatic handling of both follower and leader side errors. Note that errors reflect Raft propagation errors, not state machine errors once the log command is applied - the state machine is completely invisible to the raft package. The package only ensures a consistent distributed log of commands, each of which can be applied independently and consistently (for success or failure) across all instances of the application cluster.

Embedding Raft Package

The code to embed and initialise a local raft instance is straightforward. Numerous examples are included. The common pattern across examples goes like this:

MakeNode() is called to fire up the local node. MakeNode will set up the local node to communicate with the rest of the remote nodes in the cluster. MakeNode takes a mandatory configuration block in the form of NodeConfig. All the fields in NodeConfig must be set.

NodeConfig primarily dictates the composition of the remote cluster, the location of the boltdb file where logs and metadata are persisted, and the channel to use to communicate committed log commands to the application.

MakeNode also takes a series of options; these options control, for example, whether and how to protect intra-cluster gRPC communication, logging and metrics collection.

The code to run the local node (without TLS protection) for a three node cluster with default logging, and metrics registered against the default prometheus registry would look like this:

 var wg sync.WaitGroup
 ctx, cancel := context.WithCancel(context.Background())

 cfg := NodeConfig{
 	Nodes: []string{"node1.example.com:443", "node2.example.com:443", "node3.example.com:443"},
 	LogDB: "/data/myboltdbfile",
 	LogCmds: make(chan []byte, 32),
 }

 // index 2 suggests we are running node3.example.com.
 wg.Add(1)
 node, err := MakeNode(ctx, &wg, cfg, 2, WithMetrics(nil, "appFoo", true))
 if err != nil {
	// Handle unrecoverable error
 }

 //
 // At this point we're all set up. We can go about our business and start to receive committed distributed log
 // commands irrespective of which node generates them (i.e. local or remote). We also want to learn about and handle
 // any underlying unrecoverable failures. (The probability of such errors is expected to be vanishingly small.)
 raftUnrecoverableError := node.FatalErrorChannel()
 for {
	 select {
		case logCmd := <- cfg.LogCmds:
		// Handle committed distributed log commands
		case err := <- raftUnrecoverableError:
		// Raft took some underlying error. Handle as appropriate (fail to orchestrator, restart, etc).
		...
	 }
 }

 // We produce log commands to the distributed log using the LogProduce API.
 ctxWithTimeout, cancelMsg := context.WithTimeout(ctx, logCmdTimeout)
 err = node.LogProduce(ctxWithTimeout, msg)
 if err != nil {
	// Message was refused. Retry (e.g. using a backoff package)
 }
 cancelMsg()

 //
 // When we are done with the local node, we can shut it down and wait until it cleans up. This shutdown sequence
 // can be followed irrespective of whether raft returned an error from MakeNode, asynchronously via the fatal
 // error channel, or no errors at all.
 cancel()
 wg.Wait()

Slightly more is required when initialising MakeNode in order to set up TLS with mutual authentication. It is of course possible to set up variations in between; e.g. where the client verifies the server certificate but the server does not validate the client, or even to skip certificate verification on both sides. The changes involve setting up a function to return the gRPC server options on request, and similarly for the client options. Note that in the client options callback we return a remote server name for the dial options of a connection from a client to a server. This server name would be expected to match the Common Name in the X509 certificate. (Note how in the example we map from the cluster node name specified in the Nodes configuration to the common name, in case they differ.)

 clientDialOptionsFn = func(local, remote string) []grpc.DialOption {
		tlsCfg := &tls.Config{
			ServerName:   serverToName[remote],
			Certificates: []tls.Certificate{localCert},
			// If RootCAs is not set, host OS root CA set is used to validate server certificate.
			// Alternatively, custom Cert CA pool to use to validate server certificate would be set up here.
			RootCAs: certPool,
		}
		return []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))}
	}
 serverOptionsFn = func(local string) []grpc.ServerOption {
		tlsCfg := &tls.Config{
			ClientAuth:   tls.RequireAndVerifyClientCert,
			Certificates: []tls.Certificate{localCert},
			// If ClientCAs is not set, host OS root CA set is used to validate client certificate.
			// Alternatively, a custom CA cert pool to use to validate client certificates would be set up here.
			// ClientCAs pool does NOT need to be the same as RootCAs pool.
			ClientCAs: certPool,
		}
		return []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
	}

These options would be passed in using WithClientDialOptionsFn and WithServerOptionsFn respectively.

Constants

const RaftErrorBadLocalNodeIndex = Error(raftSentinel + "bad localNodeIndex option")

RaftErrorBadLocalNodeIndex is returned (extracted using errors.Cause(err)) if localNodeIndex provided is incorrect - typically out-of-bounds of Nodes in cluster. See ExampleMakeNode for an example of how to extract and test against sentinel.

const RaftErrorBadMakeNodeOption = Error(raftSentinel + "bad MakeNode option")

RaftErrorBadMakeNodeOption is returned (extracted using errors.Cause(err)) if options provided at start up fail to apply. See ExampleMakeNode for an example of how to extract and test against sentinel.

const RaftErrorClientConnectionUnrecoverable = Error(
	raftSentinel + "gRPC client connection failed in an unrecoverable way. Check NodeConfig is correct.")

RaftErrorClientConnectionUnrecoverable is the sentinel returned (extracted using errors.Cause(err)) if client gRPC connection to remote node failed. See ExampleMakeNode for an example of how to extract and test against sentinel.

const RaftErrorLeaderTransitionInTerm = Error(raftSentinel + "mid term leader transition")

RaftErrorLeaderTransitionInTerm is returned (extracted using errors.Cause(err)) if a transition in leader happens without a change in term. This is a catastrophic unexpected error and would cause a shutdown of raft package if it occurred.

const RaftErrorLogCommandLocalDrop = Error(raftSentinel + "log command dropped locally, please retry")

RaftErrorLogCommandLocalDrop is returned (extracted using errors.Cause(err)) if the local raft package drops the log command before we even try to push it to the cluster leader.

const RaftErrorLogCommandRejected = Error(raftSentinel + "log command failed to commit")

RaftErrorLogCommandRejected is returned (extracted using errors.Cause(err)) if we fail to commit a log command requested by the application. See ExampleMakeNode for an example of how to extract and test against sentinel.

const RaftErrorMissingLogger = Error(raftSentinel + "no logger setup")

RaftErrorMissingLogger is returned (extracted using errors.Cause(err)) if options provided at start up fail to apply. See ExampleMakeNode for an example of how to extract and test against sentinel.

const RaftErrorMissingNodeConfig = Error(raftSentinel + "node config insufficient")

RaftErrorMissingNodeConfig is returned (extracted using errors.Cause(err)) if NodeConfig options provided at start are expected but missing. See ExampleMakeNode for an example of how to extract and test against sentinel.

const RaftErrorMustFailed = Error(raftSentinel + "raft internal assertion, shutting down local node")

RaftErrorMustFailed is returned (extracted using errors.Cause(err)) if the local raft package hits an assertion failure. This will cause the raft package to signal a catastrophic failure and shut itself down.

const RaftErrorNodePersistentData = Error(raftSentinel + "node persistent data failed")

RaftErrorNodePersistentData is returned (extracted using errors.Cause(err)) if we fail a bolt operation on the persistent node data in BoltDB. See ExampleMakeNode for an example of how to extract and test against sentinel.

const RaftErrorOutOfBoundsClient = Error(raftSentinel + "node index outside bounds of known clients")

RaftErrorOutOfBoundsClient is returned (extracted using errors.Cause(err)) if logic produces a client index for a client which does not exist. See ExampleMakeNode for an example of how to extract and test against sentinel.

const RaftErrorServerNotSetup = Error(raftSentinel + "local server side not set up yet")

RaftErrorServerNotSetup is the sentinel returned (extracted using errors.Cause(err)) if local address (server side) is not set up when expected. See ExampleMakeNode for an example of how to extract and test against sentinel.


Functions

func DefaultZapLoggerConfig

func DefaultZapLoggerConfig() zap.Config

DefaultZapLoggerConfig provides a production logger configuration (logs Info and above, JSON to stderr, with stacktrace, caller and sampling disabled) which can be customised by the application to produce its own logger based on the raft configuration. Any logger provided by the application will also have its name extended by the raft package to clearly identify that the log message comes from raft. For example, if the application log is named "foo", then the raft logs will be labelled with key "logger" value "foo.raft".

func NewFlushableEventChannel

func NewFlushableEventChannel(size int32) flushableEventChannel

Types

type Error

type Error string

Error implements the error interface and represents sentinel errors for the raft package (as per https://dave.cheney.net/2016/04/07/constant-errors).

func (Error) Error

func (e Error) Error() string

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node tracks the state and configuration of this local node. Public access to services provided by node are concurrency safe. Node structure carries the state of the local running raft instance.

func MakeNode

func MakeNode(
	ctx context.Context,
	wg *sync.WaitGroup,
	cfg NodeConfig,
	localNodeIndex int32,
	opts ...NodeOption) (*Node, error)

MakeNode starts the raft node according to configuration provided.

Node is returned, and public methods associated with Node can be used to interact with Node from multiple go routines e.g. specifically in order to access the replicated log.

Context can be cancelled to signal exit. WaitGroup wg should have 1 added to it prior to calling MakeNode and should be waited on by the caller before exiting following cancellation. Whether MakeNode returns successfully or not, WaitGroup will be marked Done() by the time the Node has cleaned up.

The configuration block NodeConfig, along with localNodeIndex, determines the configuration required to join the cluster. The localNodeIndex determines the identity of the local node as an index into the list of nodes in the cluster as specified in the NodeConfig Nodes field.

If MakeNode returns without error, then over its lifetime it will be striving to maintain the node as a raft member in the raft cluster, and maintaining its replica of the replicated log.

If a fatal error is encountered at any point in the life of the node after MakeNode has returned, error will be signalled over the fatalError channel. A buffered channel of errors is provided to allow for raft package to signal fatal errors upstream and allow client to determine best course of action; typically close context to shutdown. As in the normal shutdown case, following receipt of a fatal error, caller should cancel context and wait for wait group before exiting. FatalErrorChannel method on the returned Node returns the error channel the application should consume.

MakeNode also accepts various options including gRPC server and dial options, logging and metrics (see functions returning NodeOption like WithMetrics, WithLogger etc).

For logging, we would like to support structured logging. This makes specifying a useful logger interface a little messier. Instead we depend on Uber zap and its interface, but allow the user to either provide a configured zap logger of its own, allow raft to use its default logging setup, or disable logging altogether. Customisation is achieved through the WithLogger option.

For metrics, raft package tries to adhere to the USE method as described here: (http://www.brendangregg.com/usemethod.html) - in summary 'For every resource, track utilization, saturation and errors.'. On top of that we expect to be handed a metrics registry, and if one is provided, then we register our metrics against that. Metrics registry and customisation can be provided through the WithMetrics option.

Example

ExampleMakeNode provides a simple example of how we kick off the Raft package, and also how we can programmatically handle errors if we prefer to. It also shows how asynchronous fatal errors in raft can be received and handled.

var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())

cfg := NodeConfig{
	Nodes:   []string{"node1.example.com:443", "node2.example.com:443", "node3.example.com:443"},
	LogCmds: make(chan []byte, 32),
	LogDB:   "mydb.bbolt",
}
wg.Add(1)
localIndex := int32(2) // say, if we are node3.example.com
n, err := MakeNode(ctx, &wg, cfg, localIndex)
if err != nil {

	switch errors.Cause(err) {
	case RaftErrorBadMakeNodeOption:
		//
		// Handle specific sentinel in whichever way we see fit.
		// ...
	default:
		// Root cause is not a sentinel.
	}
	// err itself renders the full context not just the sentinel.
	fmt.Println(err)

} else {

	fmt.Printf("node started with config [%v]", n.config)

	// Handle any fatal signals from below as appropriate... either by starting a new instance or exiting and letting
	// the orchestrator handle the failure.
	fatalSignal := n.FatalErrorChannel()

	//...
	// Once we are done, we can signal shutdown and wait for raft to clean up and exit.
	select {
	case err := <-fatalSignal:
		// handle fatal error as appropriate.
		fmt.Println(err)

	case <-ctx.Done():
		//...
	}

}

cancel()
wg.Wait()
Output:

Example (WithCustomisedLogLevel)

ExampleMakeNodeWithCustomisedLogLevel provides a simple example of how we kick off the Raft package with a logger provided by the application. The application chooses to base its log configuration on the default raft log configuration, and tweaks that configuration with on-the-fly logging level setting. Finally, the application also requests that the raft package redirect underlying grpc package logging to zap.

var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())

cfg := NodeConfig{
	Nodes:   []string{"node1.example.com:443", "node2.example.com:443", "node3.example.com:443"},
	LogCmds: make(chan []byte, 32),
	LogDB:   "mydb.bbolt",
}

loggerCfg := DefaultZapLoggerConfig()
logger, err := loggerCfg.Build( /* custom options can be provided here */ )
if err != nil {
	//...
}

wg.Add(1)
n, err := MakeNode(ctx, &wg, cfg, 2,
	WithLogger(logger, false),
	WithLeaderTimeout(time.Second))
if err != nil {
	/// handle error
	return
}

//
// At any point, the logging level can be safely and concurrently changed.
loggerCfg.Level.SetLevel(zapcore.InfoLevel)

fmt.Printf("node started with config [%v]", n.config)
//...
// Once we are done, we can signal shutdown and wait for raft to clean up and exit.
cancel()
wg.Wait()
Output:

Example (WithDedicatedMetricsRegistry)
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

l, err := DefaultZapLoggerConfig().Build()
if err != nil {
	// handle err
}

cfg := NodeConfig{
	Nodes:   []string{"node1.example.com:443", "node2.example.com:443", "node3.example.com:443"},
	LogCmds: make(chan []byte, 32),
	LogDB:   "mydb.bbolt",
}

myregistry := prometheus.NewRegistry()
// Do remember to serve metrics by setting up the server which serves the prometheus handler
// obtained by handler := promhttp.HandlerFor(myregistry, promhttp.HandlerOpts{})

wg.Add(1)
_, err = MakeNode(ctx, &wg, cfg, 1, // say we are node2.example.com
	WithMetrics(myregistry, "appFoo", true),
	WithLogger(l, false))
if err != nil {
	/// handle error
}

cancel()
wg.Wait()
Output:

Example (WithDefaultMetricsRegistry)
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

l, err := DefaultZapLoggerConfig().Build()
if err != nil {
	// handle err
}

cfg := NodeConfig{
	Nodes:   []string{"node1.example.com:443", "node2.example.com:443", "node3.example.com:443"},
	LogCmds: make(chan []byte, 32),
	LogDB:   "mydb.bbolt",
}
wg.Add(1)
_, err = MakeNode(ctx, &wg, cfg, 1, // say if we are node2.example.com
	WithMetrics(nil, "appFoo", true),
	WithLogger(l, false))
if err != nil {
	// handle error...
}

// Do remember to serve the metrics registered with the prometheus DefaultRegistry:
// e.g. as described here: https://godoc.org/github.com/prometheus/client_golang/prometheus

cancel()
wg.Wait()
Output:

Example (WithTLSConfiguration)

ExampleMakeNodeWithTLS is a simple example showing how TLS protection with mutual authentication can be set up between raft cluster nodes.

// Used when node operates as client to validate remote node name (as provided in Nodes) to certificate Common
// Name.
serverToName := map[string]string{
	"node1.example.com:443": "node1",
	"node2.example.com:443": "node2",
	"node3.example.com:443": "node3",
}

certPool := x509.NewCertPool()
// Populate the cert pool with root CAs which can validate server and client certs. e.g.
c, err := ioutil.ReadFile("rootCA.pem")
if err != nil {
	// handle error
}
certPool.AppendCertsFromPEM(c)

localCert, err := tls.LoadX509KeyPair("localnode.crt", "localnode.key")
if err != nil {
	// handle error
}

// We set up a configuration to enforce authentication of TLS clients connecting to this node, and to validate
// server certificates in all client connections to remote cluster nodes.
nc := NodeConfig{
	Nodes:   []string{"node1.example.com:443", "node2.example.com:443", "node3.example.com:443"},
	LogCmds: make(chan []byte, 32),
	LogDB:   "mydb.bbolt",
}

clientDialOptionsFn := func(local, remote string) []grpc.DialOption {
	tlsCfg := &tls.Config{
		ServerName:   serverToName[remote],
		Certificates: []tls.Certificate{localCert},
		// If RootCAs is not set, host OS root CA set is used to validate server certificate.
		// Alternatively, custom Cert CA pool to use to validate server certificate would be set up here.
		RootCAs: certPool,
	}
	return []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))}
}

serverOptionsFn := func(local string) []grpc.ServerOption {
	tlsCfg := &tls.Config{
		ClientAuth:   tls.RequireAndVerifyClientCert,
		Certificates: []tls.Certificate{localCert},
		// If ClientCAs is not set, host OS root CA set is used to validate client certificate.
		// Alternatively, a custom CA cert pool to use to validate client certificates would be set up here.
		// ClientCAs pool does NOT need to be the same as RootCAs pool.
		ClientCAs: certPool,
	}
	return []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
}

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

l, err := DefaultZapLoggerConfig().Build()
if err != nil {
	// handle err
}

wg.Add(1)
// if we are starting up node1.example.com, index would be 0
n, err := MakeNode(ctx, &wg, nc, 0,
	WithClientDialOptionsFn(clientDialOptionsFn),
	WithServerOptionsFn(serverOptionsFn),
	WithLogger(l, true))
if err != nil {
	// handle err
	return
}

fmt.Printf("node started with config [%v]", n.config)

cancel()
wg.Wait()
Output:

func (*Node) FatalErrorChannel

func (n *Node) FatalErrorChannel() chan error

FatalErrorChannel returns an error channel which is used by the raft Node to signal an unrecoverable failure asynchronously to the application. Such errors are expected to occur with vanishingly small probability. An example of such an error would be if the dial options or gRPC server options provided make it impossible for the client to successfully connect with the server (RaftErrorClientConnectionUnrecoverable). When a fatal error is registered, raft package will stop operating, and will mark the root wait group done.

func (*Node) LogProduce

func (n *Node) LogProduce(ctx context.Context, data []byte) error

node.LogProduce is a blocking call which accepts a log command request from the application, and returns an error if log command failed to commit. The implementation takes care of proxying the request and finding and forwarding the request to the current leader.

LogProduce can carry a batch of commands as data. These are treated atomically by raft. This is a slightly cheeky and effortless way of improving throughput through the system.

Example
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())

cfg := NodeConfig{
	Nodes:   []string{"node1.example.com:443", "node2.example.com:443", "node3.example.com:443"},
	LogCmds: make(chan []byte, 32),
	LogDB:   "mydb.bbolt",
}
wg.Add(1)
localIndex := int32(2) // say, if we are node3.example.com
n, err := MakeNode(ctx, &wg, cfg, localIndex)
if err != nil {
	// handle error
}

// Kick off handling of LogCmds and FatalErrorChannel().
// ...
//
// Produce log command to distributed log.
ctxLogProduce, cancelLogCmd := context.WithTimeout(ctx, 3*time.Second)
err = n.LogProduce(ctxLogProduce, []byte("I'm sorry Dave..."))
cancelLogCmd()
if err != nil {
	// Do retry here, preferably using exponential backoff.
}

// When we're done...
cancel()
wg.Wait()
Output:

type NodeConfig

type NodeConfig struct {
	//
	// Raft cluster node addresses. Nodes should include all the node addresses including the local one, in the
	// form address:port. This makes configuration easy in that all nodes can share the same configuration.
	//
	// The order of the nodes (ignoring the local node) is also interpreted as the order of preference to transfer
	// leadership to in case we need to transfer. Note that this tie breaker only kicks in amongst nodes
	// which have a log matched to ours.
	Nodes []string
	//
	// Application provides a channel over which committed log commands are published for the application to consume.
	// The log commands are opaque to the raft package.
	LogCmds chan []byte
	//
	// LogDB points at file location which is the home of the persisted logDB.
	LogDB string
	// contains filtered or unexported fields
}

NodeConfig is the mandatory configuration for the local node. Every field in NodeConfig must be provided by application using the raft package. NodeConfig is passed in when starting up the node using MakeNode. (Optional configuration can be passed in using WithX NodeOption options).

type NodeOption

type NodeOption func(*Node) error

A NodeOption operates on a Node to manage configuration.

func WithChannelDepthToClientOffload

func WithChannelDepthToClientOffload(depth int32) NodeOption

WithChannelDepthToClientOffload allows the application to override the channel depth between the raft core engine and the goroutine which handles messaging to the remote nodes. A sensible default value (32) is used if this option is not specified.

func WithClientDialOptionsFn

func WithClientDialOptionsFn(fn func(local, remote string) []grpc.DialOption) NodeOption

WithClientDialOptionsFn sets up callback used to allow application to specify client side grpc options for local raft node. These client options will be merged in with default options, with default options overwritten if provided by callback. The callback passes in the local/remote node pair in the form specified in the Nodes configuration. Client side options could be used, for example, to set up mutually authenticated TLS protection of exchanges with other nodes.

func WithLeaderTimeout

func WithLeaderTimeout(leaderTimeout time.Duration) NodeOption

WithLeaderTimeout used with MakeNode to specify leader timeout: leader timeout is used to determine the maximum period which can elapse without getting AppendEntry messages from the leader without forcing a new election. On the follower side, a random value between leaderTimeout and 2*leaderTimeout is used by the raft package to determine when to force an election. On the leader side, the leader will attempt to send at least one AppendEntry (possibly empty if necessary) within every leaderTimeout period. Randomized election timeouts minimise the probability of split votes and resolve them quickly when they happen, as described in Section 3.4 of CBTP. LeaderTimeout defaults to 2s if not set.

func WithLogCommandBatchSize

func WithLogCommandBatchSize(depth int32) NodeOption

WithLogCommandBatchSize sets the maximum number of log commands which can be sent in one AppendEntryRequest RPC. Defaults to a sensible value (32) if not set.

func WithLogger

func WithLogger(logger *zap.Logger, verboseLogging bool) NodeOption

WithLogger option is invoked by the application to provide a customised zap logger option, or to disable logging. The NodeOption returned by WithLogger is passed in to MakeNode to control logging; e.g. to provide a preconfigured application logger. If logger passed in is nil, raft will disable logging.

If WithLogger generated NodeOption is not passed in, package uses its own configured zap logger.

Finally, if application wishes to derive its logger as some variant of the default raft logger, application can invoke DefaultZapLoggerConfig() to fetch a default logger configuration. It can use that configuration (modified as necessary) to build a new logger directly through zap library. That new logger can then be passed into WithLogger to generate the appropriate node option. An example of exactly this use case is available in the godoc examples. In the example, the logger configuration is set up to allow for on-the-fly changes to logging level.

verboseLogging controls whether the raft package redirects underlying gRPC middleware logging to the zap log and includes ultra low level debugging messages including keepalives. This makes debug very noisy, and unless in-depth low level message troubleshooting is required, verboseLogging should be set to false.

func WithMetrics

func WithMetrics(registry *prometheus.Registry, namespace string, detailed bool) NodeOption

WithMetrics option used with MakeNode to specify the metrics registry we should count in. Argument namespace specifies the namespace for the metrics. This is useful if the application prefixes all its metrics with a prefix. For example, if namespace is 'foo', then all raft package metrics will be prefixed with 'foo.raft.'. Argument detailed controls whether detailed (and more expensive) metrics are tracked (e.g. grpc latency distribution). If nil is passed in for the registry, the default registry prometheus.DefaultRegisterer is used. Do note that the package does not set up the serving of metrics; that is up to the application. Examples are included to show how to set up a custom metrics registry to log against. If the WithMetrics NodeOption is NOT passed in to MakeNode, metrics collection is disabled.

func WithServerOptionsFn

func WithServerOptionsFn(fn func(local string) []grpc.ServerOption) NodeOption

WithServerOptionsFn sets up callback used to allow application to specify server side grpc options for local raft node. These server options will be merged in with default options, with default options overwritten if provided by callback. The callback passes in the local node as specified in the Nodes configuration. Server side options could be used, for example, to set up mutually authenticated TLS protection of exchanges with other nodes.

func WithUnaryGRPCTimeout

func WithUnaryGRPCTimeout(d time.Duration) NodeOption

WithUnaryGRPCTimeout used with MakeNode to specify timeout in gRPC RPC commands. By default, gRPC timeouts are set to 5s.
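
As an illustrative sketch (assuming ctx, wg and cfg are prepared as in the MakeNode examples above), these tuning options are simply passed to MakeNode alongside any others:

	_, err := MakeNode(ctx, &wg, cfg, 0,
		WithUnaryGRPCTimeout(10*time.Second),  // allow slower RPCs than the 5s default
		WithChannelDepthToClientOffload(64),   // deeper offload channel than the default 32
		WithLogCommandBatchSize(64))           // larger AppendEntry batches than the default 32
	if err != nil {
		// handle error
	}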

Directories

Path Synopsis
internal
