Documentation ¶
Overview ¶
Package raft implements a proof-of-concept Raft consensus algorithm (See https://raft.github.io/).
End-users of the raft package should configure the Raft cluster and then run the Raft cluster as follows:
func main() { // Start a 1-node cluster. id := uint64(1) addresses := map[uint64]string{1: "tcp://localhost:8001"} tr, err := raft.NewTransportConfig(id, addresses).Build() if err != nil { log.Fatal(err) } psm, err := raft.NewProtocolConfig(id).Build(tr) if err != nil { log.Fatal(err) } app := newApplication() node, err := raft.NewNodeConfig(id).Build(psm, tr, app) if err != nil { log.Fatal(err) } node.Start() defer node.Stop() // Here, app can interact with node by calling node.Propose() and node.Read() // ... }
See examples/kvstore/server for an example demo of a key-value store backed by a 3-node Raft cluster. Run start3cluster in the provided utils.sh
Index ¶
- Variables
- type Application
- type ChannelMedium
- type Consistency
- type GRPCMedium
- type Lease
- type Medium
- type MemberState
- type Node
- type NodeConfig
- type NodeConfigOption
- type Proposal
- type ProtocolConfig
- type ProtocolConfigOption
- func AddProtocolLogger() ProtocolConfigOption
- func WithConsistency(consistency Consistency) ProtocolConfigOption
- func WithElectionTicker(ticker Ticker) ProtocolConfigOption
- func WithHeartbeatTicker(ticker Ticker) ProtocolConfigOption
- func WithHeartbeatTicks(heartbeatTicks uint) ProtocolConfigOption
- func WithLease(lease time.Duration) ProtocolConfigOption
- func WithMaxElectionTicks(maxElectionTicks uint) ProtocolConfigOption
- func WithMinElectionTicks(minElectionTicks uint) ProtocolConfigOption
- func WithProtocolDebug(debug bool) ProtocolConfigOption
- func WithProtocolLogger(logger *zap.Logger) ProtocolConfigOption
- func WithTickPeriod(tickPeriod time.Duration) ProtocolConfigOption
- type ProtocolStateMachine
- type Read
- type Role
- type State
- type Ticker
- type Transport
- type TransportConfig
- type TransportConfigOption
- func AddTransportLogger() TransportConfigOption
- func WithAddresses(addresses map[uint64]string) TransportConfigOption
- func WithChannelMedium(memberIDs ...uint64) TransportConfigOption
- func WithGRPCCallOption(opt grpc.CallOption) TransportConfigOption
- func WithGRPCDialOption(opt grpc.DialOption) TransportConfigOption
- func WithGRPCServerOption(opt grpc.ServerOption) TransportConfigOption
- func WithSecurity(opt grpc.DialOption) TransportConfigOption
- func WithTransportDebug(debug bool) TransportConfigOption
- func WithTransportLogger(logger *zap.Logger) TransportConfigOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ProtocolConfigTemplate = ProtocolConfig{ TickPeriod: 100 * time.Millisecond, HeartbeatTicks: 1, MinElectionTicks: 10, MaxElectionTicks: 20, Consistency: ConsistencyLease, Lease: 500 * time.Millisecond, }
ProtocolConfigTemplate is a partially filled ProtocolConfig that contains default values. Do not use ProtocolConfigTemplate directly; use NewProtocolConfig() instead.
var TransportConfigTemplate = TransportConfig{ MsgBufferSize: 30, Medium: &GRPCMedium{ DialTimeout: 3 * time.Second, ReconnectDelay: 3 * time.Second, ServerOptions: []grpc.ServerOption{ grpc.KeepaliveParams(keepalive.ServerParameters{ Time: 5 * time.Second, Timeout: 5 * time.Second, }), }, }, }
TransportConfigTemplate is a partially filled TransportConfig that contains default values. Do not use TransportConfigTemplate directly; use NewTransportConfig() instead.
Functions ¶
This section is empty.
Types ¶
type Application ¶
type Application interface { // Apply applies the newly committed entries to the application. Apply(entries []pb.Entry) error }
Application applies the committed raft entries. Applications interfacing with the Raft node must implement this interface.
Example (Register) ¶
package main import ( "context" "fmt" "io/ioutil" "log" "os" "path/filepath" "strconv" "sync" "time" "github.com/ulysseses/raft" "github.com/ulysseses/raft/pb" ) // register is a single-valued store type register struct { sync.RWMutex node *raft.Node x int } // Apply implements Application for register func (r *register) Apply(entries []pb.Entry) error { r.Lock() defer r.Unlock() for _, entry := range entries { if len(entry.Data) == 0 { continue } x, err := strconv.Atoi(string(entry.Data)) if err != nil { return err } r.x = x } return nil } func (r *register) set(ctx context.Context, x int) error { s := strconv.Itoa(x) _, _, err := r.node.Propose(ctx, []byte(s)) return err } func (r *register) get(ctx context.Context) (int, error) { if err := r.node.Read(ctx); err != nil { return 0, err } r.RLock() defer r.RUnlock() return r.x, nil } func main() { tmpDir, err := ioutil.TempDir("", "") if err != nil { log.Fatal(err) } defer os.RemoveAll(tmpDir) id := uint64(1) addresses := map[uint64]string{ 1: fmt.Sprintf("unix://%s", filepath.Join(tmpDir, "ExampleApplication_register.sock")), } // Start up the Raft node. r := ®ister{} tr, err := raft.NewTransportConfig(id, raft.WithAddresses(addresses)).Build() if err != nil { log.Fatal(err) } psm, err := raft.NewProtocolConfig(id).Build(tr) if err != nil { log.Fatal(err) } node, err := raft.NewNodeConfig(id).Build(psm, tr, r) if err != nil { log.Fatal(err) } r.node = node node.Start() defer node.Stop() // Wait for leader election time.Sleep(2 * time.Second) // Commit an entry via the register IDed 1 if err := r.set(context.Background(), 42); err != nil { log.Fatalf("set failed: %v", err) } // Get that committed entry via register IDed 2 if x, err := r.get(context.Background()); err == nil { fmt.Println(x) } else { log.Fatalf("get failed: %v", err) } }
Output: 42
type ChannelMedium ¶
type ChannelMedium struct { // MemberIDs is a slice of the IDs of all the Raft cluster members, including self. MemberIDs []uint64 }
ChannelMedium sends messages over Go channels.
type Consistency ¶
type Consistency uint8
Consistency is the consistency mode that Raft operations should support.
const ( // ConsistencyLease follows the serializable consistency model. If there is a leadership // change, read requests are potentially stale for a maximum of the lease duration amount. ConsistencyLease Consistency = iota // ConsistencyStrict follows the linearizable consistency model. Every read request will // require a quorum's worth of heartbeat acks. ConsistencyStrict // ConsistencyStale follows the serializable consistency model. // Reads can be stale for an arbitrary amount of time. Writes never diverge. ConsistencyStale )
func (Consistency) MarshalJSON ¶
func (c Consistency) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler for Consistency
func (Consistency) String ¶
func (c Consistency) String() string
func (*Consistency) UnmarshalJSON ¶
func (c *Consistency) UnmarshalJSON(b []byte) error
UnmarshalJSON implements json.Unmarshaler for Consistency
type GRPCMedium ¶
type GRPCMedium struct { // Addresses mapping Raft node ID to address to connect to. Addresses map[uint64]string // DialTimeout is the timeout for dialing to peers. // ReconnectDelay is the duration to wait before retrying to dial a connection. DialTimeout, ReconnectDelay time.Duration // ServerOptions is an optional list of grpc.ServerOptions to configure the gRPC server. ServerOptions []grpc.ServerOption // DialOptions is an optional list of grpc.DialOptions to configure dialing to the peer // gRPC servers. DialOptions []grpc.DialOption // CallOptions is an optional list of grpc.CallOptions to configure calling the Communicate RPC. CallOptions []grpc.CallOption }
GRPCMedium sends messages over the gRPC RaftProtocol service (via client stream).
type Lease ¶
type Lease struct { // Timeout is the point in time that a lease should timeout. Timeout int64 // Start marks the beginning of the attempt to extend the lease, i.e. sending out new heartbeats Start int64 // If a majority of heartbeats were acked, the Timeout should be extended to Start + Extension Extension int64 // Acks is the number of acks for the specified Start. Acks int }
Lease contains fields related to lease for read requests. This is used only by leaders in ConsistencyLease mode.
type Medium ¶
type Medium interface {
// contains filtered or unexported methods
}
Medium represents which the type of communication medium that pb.Messages should be sent. ChannelMedium sends messages over Go channels. GRPCMedium sends messages over the gRPC RaftProtocol service (via client stream).
type MemberState ¶
type MemberState struct { // peer node's ID ID uint64 // last known largest index that this peer matches this node's log Match uint64 // index of the prefix log of entries to send in the heartbeat to the peer Next uint64 // whether or not the peer responded to the heartbeat within the election timeout Ack bool // vote was granted to elect us by this peer VoteGranted bool // Read of this member. This is only used by ConsistencyStrict. Read Read }
MemberState contains all info about a member node from the perspective of this node.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is a Raft node that interacts with an Application state machine and network.
func (*Node) Members ¶
func (n *Node) Members() map[uint64]MemberState
Members returns the latest member states.
func (*Node) Propose ¶
Propose should be called by the client/application. This method proposes data to the raft log. If error is non-nil, index and term should be used to check if the proposed entry was committed.
type NodeConfig ¶
type NodeConfig struct { // ID of the Raft node. ID uint64 // Logger, if provided, will be used to log events. Logger *zap.Logger // Debug, if true, will log events at the DEBUG verbosity/granularity. Debug bool }
NodeConfig configures Node-specific configuration.
func NewNodeConfig ¶
func NewNodeConfig(id uint64, opts ...NodeConfigOption) *NodeConfig
NewNodeConfig builds a NodeConfig for a Raft node.
func (*NodeConfig) Build ¶
func (c *NodeConfig) Build( psm *ProtocolStateMachine, tr Transport, a Application, ) (*Node, error)
Build builds a Raft node.
func (*NodeConfig) Verify ¶
func (c *NodeConfig) Verify() error
Verify verifies that the configuration is correct.
type NodeConfigOption ¶
type NodeConfigOption interface{ Transform(*NodeConfig) }
NodeConfigOption provides options to configure Node specifically.
func AddNodeLogger ¶
func AddNodeLogger() NodeConfigOption
AddNodeLogger adds a default production zap.Logger to the configuration.
func WithNodeDebug ¶
func WithNodeDebug(debug bool) NodeConfigOption
WithNodeDebug sets the debug field for the NodeConfig.
func WithNodeLogger ¶
func WithNodeLogger(logger *zap.Logger) NodeConfigOption
WithNodeLogger configures to use a specified logger for the protocol state machine.
type Proposal ¶
type Proposal struct { TID int64 // Index is the proposed index. // Term is the proposed term. Index, Term uint64 // contains filtered or unexported fields }
Proposal is the context associated with a proposal.
type ProtocolConfig ¶
type ProtocolConfig struct { // ID of the Raft node. ID uint64 // TickPeriod is the period of tiem at which the ticker should fire. TickPeriod time.Duration // HeartbeatTicks is the number of tick periods before a heartbeat // should fire. // MinElectionTicks is the minimum number of tick periods before an // election timeout should fire. // MaxElectionTicks is the maximum number of tick periods before an // election timeout should fire. HeartbeatTicks, MinElectionTicks, MaxElectionTicks uint // Consistency is the consistency level to use for the Raft cluster. Consistency Consistency // Lease is the duration of the read request lease. This is used only if ConsistencyLease. Lease time.Duration // HeartbeatTicker is the ticker to use for signaling when to send out heartbeats. If nil, // a default one based on TickPeriod and HeartbeatTicks is created and used. // ElectionTicker is the ticker to use for signaling when to timeout an election. If nil, // a default one based on TickPeriod, MinElectionTicks, and MaxElectionTicks is created and used. HeartbeatTicker, ElectionTicker Ticker // Logger, if provided, will be used to log events. Logger *zap.Logger // Debug, if true, will log events at the DEBUG verbosity/granularity. Debug bool }
ProtocolConfig configures the Raft protocol of the Raft cluster.
func NewProtocolConfig ¶
func NewProtocolConfig(id uint64, opts ...ProtocolConfigOption) *ProtocolConfig
NewProtocolConfig builds a ProtocolConfig for a Raft node.
func (*ProtocolConfig) Build ¶
func (c *ProtocolConfig) Build(tr Transport) (*ProtocolStateMachine, error)
Build builds a ProtocolStateMachine from configuration
func (*ProtocolConfig) Verify ¶
func (c *ProtocolConfig) Verify() error
Verify verifies that the configuration is correct.
type ProtocolConfigOption ¶
type ProtocolConfigOption interface{ Transform(*ProtocolConfig) }
ProtocolConfigOption provides options to configure ProtocolConfig further.
func AddProtocolLogger ¶
func AddProtocolLogger() ProtocolConfigOption
AddProtocolLogger adds a default production zap.Logger to the configuration.
func WithConsistency ¶
func WithConsistency(consistency Consistency) ProtocolConfigOption
WithConsistency sets the consistency mode.
func WithElectionTicker ¶
func WithElectionTicker(ticker Ticker) ProtocolConfigOption
WithElectionTicker configures to use a specified election timeout ticker.
func WithHeartbeatTicker ¶
func WithHeartbeatTicker(ticker Ticker) ProtocolConfigOption
WithHeartbeatTicker configures to use a specified heartbeat ticker.
func WithHeartbeatTicks ¶
func WithHeartbeatTicks(heartbeatTicks uint) ProtocolConfigOption
WithHeartbeatTicks sets the specified heartbeat ticks.
func WithLease ¶
func WithLease(lease time.Duration) ProtocolConfigOption
WithLease sets the lease duration. This should be used if using ConsistencyLease.
func WithMaxElectionTicks ¶
func WithMaxElectionTicks(maxElectionTicks uint) ProtocolConfigOption
WithMaxElectionTicks sets the specified minimum election timeout ticks.
func WithMinElectionTicks ¶
func WithMinElectionTicks(minElectionTicks uint) ProtocolConfigOption
WithMinElectionTicks sets the specified minimum election timeout ticks.
func WithProtocolDebug ¶
func WithProtocolDebug(debug bool) ProtocolConfigOption
WithProtocolDebug sets the debug field for the ProtocolConfig.
func WithProtocolLogger ¶
func WithProtocolLogger(logger *zap.Logger) ProtocolConfigOption
WithProtocolLogger configures to use a specified logger for the protocol state machine.
func WithTickPeriod ¶
func WithTickPeriod(tickPeriod time.Duration) ProtocolConfigOption
WithTickPeriod sets a specified tick period.
type ProtocolStateMachine ¶
type ProtocolStateMachine struct {
// contains filtered or unexported fields
}
ProtocolStateMachine represents the Raft Protocol state machine of a Raft node. It has a central event loop that interacts with a heartbeat ticker, election ticker, and Raft protocol messages sent/received over the transport network.
type Read ¶
type Read struct { // TID is the "transaction ID". It increases monotonically. TID int64 // Index is the read index of the read request. Index uint64 // Acks is the number of acks for the latest read request. Acks int }
Read contains all fields relevant to read requests.
type Role ¶
type Role uint8
Role can be follower, candidate, or leader.
func (Role) MarshalJSON ¶
MarshalJSON implements json.Marshaler for Role
func (*Role) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler for Role
type State ¶
type State struct { // id ID uint64 // Consistency mode Consistency Consistency // quorum size QuorumSize int // cluster size ClusterSize int // role Role Role // current term Term uint64 // who this node thinks currently is the leader. Leader uint64 // committed index Commit uint64 // who this node last voted for VotedFor uint64 // last index of this node's log LastIndex uint64 // largest term of this node's log LogTerm uint64 // context of a proposal, if any. Proposal Proposal // context of a read request originating from this node, if any. // This is used only by ConsistencyStrict. Read Read // lease, if using ConsistencyLease mode. Lease Lease }
State contains all state of a Node.
type Ticker ¶
type Ticker interface { // Start starts the ticker. Start() // Stop stops the ticker. Stop() // C returns the channel to read ticks from. C() <-chan struct{} // Reset resets the ticker. Reset() }
Ticker sends ticks.
type Transport ¶
type Transport interface {
// contains filtered or unexported methods
}
Transport sends and receives Raft protocol messages between Raft nodes of the cluster.
type TransportConfig ¶
type TransportConfig struct { // ID of the Raft node to configure. ID uint64 // MsgBufferSize is the max number of Raft protocol messages per peer node allowed to be buffered // before the Raft node can process/send them out. MsgBufferSize int // Logger, if provided, will be used to log events. Logger *zap.Logger // Debug, if true, will log events at the DEBUG verbosity/granularity. Debug bool // Medium represents which the type of communication medium that pb.Messages should be sent. // The default medium is GRPCMedium. Medium Medium }
TransportConfig configures transport for the Raft cluster.
func NewTransportConfig ¶
func NewTransportConfig( id uint64, opts ...TransportConfigOption, ) *TransportConfig
NewTransportConfig builds a TransportConfig for a Raft node.
func (*TransportConfig) Build ¶
func (c *TransportConfig) Build() (Transport, error)
Build builds a Transport from configuration.
func (*TransportConfig) Verify ¶
func (c *TransportConfig) Verify() error
Verify verifies that the configuration is correct.
type TransportConfigOption ¶
type TransportConfigOption interface{ Transform(*TransportConfig) }
TransportConfigOption provides options to configure TransportConfig further.
func AddTransportLogger ¶
func AddTransportLogger() TransportConfigOption
AddTransportLogger adds a default production zap.Logger to the configuration.
func WithAddresses ¶
func WithAddresses(addresses map[uint64]string) TransportConfigOption
WithAddresses sets the addresses of all Raft cluster nodes.
func WithChannelMedium ¶
func WithChannelMedium(memberIDs ...uint64) TransportConfigOption
WithChannelMedium configures to use Go channels to transport messages instead of default gRPC. This option should only be used in testing!
func WithGRPCCallOption ¶
func WithGRPCCallOption(opt grpc.CallOption) TransportConfigOption
WithGRPCCallOption adds a grpc.CallOption to grpc.NewServer
func WithGRPCDialOption ¶
func WithGRPCDialOption(opt grpc.DialOption) TransportConfigOption
WithGRPCDialOption adds a grpc.DialOption to grpc.NewServer
func WithGRPCServerOption ¶
func WithGRPCServerOption(opt grpc.ServerOption) TransportConfigOption
WithGRPCServerOption adds a grpc.ServerOption to grpc.NewServer
func WithSecurity ¶
func WithSecurity(opt grpc.DialOption) TransportConfigOption
WithSecurity configures gRPC to use security instead of the default grpc.WithInsecure option.
func WithTransportDebug ¶
func WithTransportDebug(debug bool) TransportConfigOption
WithTransportDebug sets the debug field for the TransportConfig.
func WithTransportLogger ¶
func WithTransportLogger(logger *zap.Logger) TransportConfigOption
WithTransportLogger configures to use a specified logger for the protocol state machine.