raft

package
v0.9.0-rc19
Warning

This package is not in the latest version of its module.

Published: Apr 2, 2015 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package raft implements a streaming version of the Raft protocol.

The Raft distributed consensus protocol is used for maintaining consistency in a distributed system. It implements a replicated log between multiple nodes. This log is a series of entries that represent commands to be executed on the node's state. By executing the same commands on every node in the same order, the state of each node is consistent.

For a more detailed understanding of the Raft protocol, please see the original Raft paper by Diego Ongaro:

https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf

Streaming Raft

Standard Raft defines two RPC calls between the leader and the followers: AppendEntries and RequestVote. In this package, RequestVote is the same as in the standard implementation, while the AppendEntries RPC is split into two calls: a streaming log and a heartbeat.

One issue with the original Raft implementation is that each AppendEntries call had to wait for the previous call to complete. Each wait adds a delay of only a few milliseconds, but it ultimately limits the throughput that Raft can achieve. This can be mitigated by pipelining requests so they don't have to wait for one another. Pipelining helps significantly, but there is still overhead in tracking each request, and there is a limit on the number of outstanding requests.

Another issue with AppendEntries is that it combines log replication with the heartbeat mechanism. This is beneficial in that log replication problems, such as disk write errors, surface as heartbeat errors. However, it also means that replicating too many log entries at once can cause a heartbeat to be missed.

Streaming Raft tries to mitigate these issues by separating log replication from heartbeats. When a node becomes a leader, it begins sending heartbeats to establish dominance. When a follower node recognizes a new leader, it connects to the leader using a long-running connection that simply streams the log from where the follower left off. The handshake process required by the standard Raft implementation on every AppendEntries request is now handled once upon connection.
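The idea of resuming a stream "from where the follower left off" can be sketched in a few lines. This is only an illustration, not the package's implementation: the `entry` type, `streamFrom`, and `collect` are hypothetical names, and the sketch assumes the follower asks for every entry after the last index it already holds.

```go
package main

import "fmt"

// entry is a hypothetical, simplified log entry used only for this sketch.
type entry struct {
	Index uint64
	Data  string
}

// streamFrom sends every entry with an index greater than lastIndex on the
// returned channel, mimicking a follower resuming a stream where it left off.
// The channel is closed once the known entries have been sent.
func streamFrom(log []entry, lastIndex uint64) <-chan entry {
	ch := make(chan entry)
	go func() {
		defer close(ch)
		for _, e := range log {
			if e.Index > lastIndex {
				ch <- e
			}
		}
	}()
	return ch
}

// collect drains a stream into a slice.
func collect(ch <-chan entry) []entry {
	var out []entry
	for e := range ch {
		out = append(out, e)
	}
	return out
}

func main() {
	log := []entry{{1, "a"}, {2, "b"}, {3, "c"}}

	// The follower already has index 1, so only entries 2 and 3 are streamed.
	for _, e := range collect(streamFrom(log, 1)) {
		fmt.Println(e.Index, e.Data)
	}
}
```

A real stream would stay open and keep delivering new entries as they are written; the sketch closes the channel early only to keep the example finite.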

Constants

const (
	// DefaultApplyInterval is the default time between checks to apply commands.
	DefaultApplyInterval = 10 * time.Millisecond

	// DefaultElectionTimeout is the default time before starting an election.
	DefaultElectionTimeout = 1 * time.Second

	// DefaultHeartbeatInterval is the default time to wait between heartbeats.
	DefaultHeartbeatInterval = 100 * time.Millisecond

	// DefaultReconnectTimeout is the default time to wait before reconnecting.
	DefaultReconnectTimeout = 10 * time.Millisecond
)
const WaitInterval = 1 * time.Millisecond

WaitInterval represents the amount of time between checks to the applied index. This is used by clients wanting to wait until a given index is processed.
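A polling loop of the kind described here might look roughly like the following. This is a sketch under assumptions: `waitForIndex`, the `applied` callback, and the timeout parameter are hypothetical and not part of the package, whose own Wait method takes only an index.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// waitInterval mirrors the documented WaitInterval constant.
const waitInterval = 1 * time.Millisecond

// waitForIndex polls applied() every waitInterval until it reaches idx.
// The timeout is an addition for this sketch so the loop cannot spin forever.
func waitForIndex(applied func() uint64, idx uint64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for applied() < idx {
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for index")
		}
		time.Sleep(waitInterval)
	}
	return nil
}

func main() {
	var applied uint64

	// Simulate a state machine that applies one entry every 2ms.
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(2 * time.Millisecond)
			atomic.AddUint64(&applied, 1)
		}
	}()

	err := waitForIndex(func() uint64 { return atomic.LoadUint64(&applied) }, 3, time.Second)
	fmt.Println("wait finished, err =", err)
}
```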

Variables

var (
	// ErrClosed is returned when the log is closed.
	ErrClosed = errors.New("log closed")

	// ErrOpen is returned when opening a log that is already open.
	ErrOpen = errors.New("log already open")

	// ErrInitialized is returned when initializing a log that is already a
	// member of a cluster.
	ErrInitialized = errors.New("log already initialized")

	// ErrURLRequired is returned when opening a log without a URL set.
	ErrURLRequired = errors.New("url required")

	// ErrLogExists is returned when initializing an already existing log.
	ErrLogExists = errors.New("log exists")

	// ErrNotLeader is returned when performing leader operations on a non-leader.
	ErrNotLeader = errors.New("not leader")

	// ErrStaleTerm is returned when a term is before the current term.
	ErrStaleTerm = errors.New("stale term")

	// ErrOutOfDateLog is returned when a candidate's log is not up to date.
	ErrOutOfDateLog = errors.New("out of date log")

	// ErrAlreadyVoted is returned when a vote has already been cast for
	// a different candidate in the same election term.
	ErrAlreadyVoted = errors.New("already voted")

	// ErrNodeNotFound is returned when referencing a non-existent node.
	ErrNodeNotFound = errors.New("node not found")

	// ErrInvalidNodeID is returned when using a node id of zero.
	ErrInvalidNodeID = errors.New("invalid node id")

	// ErrNodeURLRequired is returned when a node config has no URL set.
	ErrNodeURLRequired = errors.New("node url required")

	// ErrDuplicateNodeID is returned when adding a node with an existing id.
	ErrDuplicateNodeID = errors.New("duplicate node id")

	// ErrDuplicateNodeURL is returned when adding a node with an existing URL.
	ErrDuplicateNodeURL = errors.New("duplicate node url")

	// ErrSnapshotRequired is returned when reading from an out-of-order log.
	// The snapshot will be retrieved on the next reader request.
	ErrSnapshotRequired = errors.New("snapshot required")
)
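Because these are sentinel errors, callers can branch on them by identity. The sketch below shows one way to do that; the two error variables are local stand-ins for the package's values, and `apply` and `shouldRedirect` are hypothetical helpers. It uses `errors.Is`, which needs Go 1.13 or later; a plain `==` comparison also works when the error has not been wrapped.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, declared here only so
// the sketch compiles on its own.
var (
	ErrNotLeader = errors.New("not leader")
	ErrClosed    = errors.New("log closed")
)

// apply is a hypothetical stand-in for a leader-only call such as Log.Apply.
// It wraps the sentinel to show that errors.Is still matches it.
func apply(isLeader bool) error {
	if !isLeader {
		return fmt.Errorf("apply command: %w", ErrNotLeader)
	}
	return nil
}

// shouldRedirect reports whether the caller should retry against the leader.
func shouldRedirect(err error) bool {
	return errors.Is(err, ErrNotLeader)
}

func main() {
	fmt.Println(shouldRedirect(apply(false))) // wrapped ErrNotLeader
	fmt.Println(shouldRedirect(ErrClosed))    // a different sentinel
}
```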

Functions

This section is empty.

Types

type Clock

type Clock struct {
	ApplyInterval     time.Duration
	ElectionTimeout   time.Duration
	HeartbeatInterval time.Duration
	ReconnectTimeout  time.Duration
}

Clock implements an interface to the real-time clock.

func NewClock

func NewClock() *Clock

NewClock returns an instance of Clock with defaults set.

func (*Clock) AfterApplyInterval

func (c *Clock) AfterApplyInterval() <-chan chan struct{}

AfterApplyInterval returns a channel that fires after the apply interval.

func (*Clock) AfterElectionTimeout

func (c *Clock) AfterElectionTimeout() <-chan chan struct{}

AfterElectionTimeout returns a channel that fires after a duration that is between the election timeout and double the election timeout.
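A randomized delay in that range can be computed as shown below. This is a sketch, not the package's code: `electionDelay` is a hypothetical helper, and it assumes the random source returns non-negative values, as `math/rand.Int63` does. Randomizing the delay is the usual Raft technique for reducing the chance that several followers start an election at the same moment.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// electionDelay returns a duration in the range [timeout, 2*timeout).
// rnd is assumed to return non-negative values, like math/rand.Int63.
func electionDelay(timeout time.Duration, rnd func() int64) time.Duration {
	if timeout <= 0 {
		return 0
	}
	return timeout + time.Duration(rnd()%int64(timeout))
}

func main() {
	d := electionDelay(time.Second, rand.Int63)
	fmt.Println(d >= time.Second && d < 2*time.Second)
}
```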

func (*Clock) AfterHeartbeatInterval

func (c *Clock) AfterHeartbeatInterval() <-chan chan struct{}

AfterHeartbeatInterval returns a channel that fires after the heartbeat interval.

func (*Clock) AfterReconnectTimeout

func (c *Clock) AfterReconnectTimeout() <-chan chan struct{}

AfterReconnectTimeout returns a channel that fires after the reconnection timeout.

func (*Clock) Now

func (c *Clock) Now() time.Time

Now returns the current wall clock time.

type Config

type Config struct {
	// Cluster identifier. Used to prevent separate clusters from
	// accidentally communicating with one another.
	ClusterID uint64

	// Index is the last log index when the configuration was updated.
	Index uint64

	// MaxNodeID is the largest node identifier generated for this config.
	MaxNodeID uint64

	// List of nodes in the cluster.
	Nodes []*ConfigNode
}

Config represents the configuration for the log.

func (*Config) AddNode

func (c *Config) AddNode(id uint64, u url.URL) error

AddNode adds a new node to the config.

func (*Config) Clone

func (c *Config) Clone() *Config

Clone returns a deep copy of the configuration.

func (*Config) NodeByID

func (c *Config) NodeByID(id uint64) *ConfigNode

NodeByID returns a node by identifier.

func (*Config) NodeByURL

func (c *Config) NodeByURL(u url.URL) *ConfigNode

NodeByURL returns a node by URL.

func (*Config) RemoveNode

func (c *Config) RemoveNode(id uint64) error

RemoveNode removes a node by id. Returns ErrNodeNotFound if the node does not exist.
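The membership checks implied by AddNode, RemoveNode, and the error variables above could be sketched like this. The lowercase types, error values, and `mustURL` helper are hypothetical stand-ins, and the exact order in which the package validates its inputs is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// Local stand-ins for the package's sentinel errors.
var (
	errInvalidNodeID    = errors.New("invalid node id")
	errDuplicateNodeID  = errors.New("duplicate node id")
	errDuplicateNodeURL = errors.New("duplicate node url")
	errNodeNotFound     = errors.New("node not found")
)

// node and config are simplified, hypothetical versions of ConfigNode and Config.
type node struct {
	ID  uint64
	URL url.URL
}

type config struct {
	Nodes []*node
}

// addNode rejects a zero id and any id or URL already present in the config.
func (c *config) addNode(id uint64, u url.URL) error {
	if id == 0 {
		return errInvalidNodeID
	}
	for _, n := range c.Nodes {
		if n.ID == id {
			return errDuplicateNodeID
		}
		if n.URL.String() == u.String() {
			return errDuplicateNodeURL
		}
	}
	c.Nodes = append(c.Nodes, &node{ID: id, URL: u})
	return nil
}

// removeNode deletes the node with the given id, if it exists.
func (c *config) removeNode(id uint64) error {
	for i, n := range c.Nodes {
		if n.ID == id {
			c.Nodes = append(c.Nodes[:i], c.Nodes[i+1:]...)
			return nil
		}
	}
	return errNodeNotFound
}

// mustURL parses s and panics on failure; it is for examples only.
func mustURL(s string) url.URL {
	u, err := url.Parse(s)
	if err != nil {
		panic(err)
	}
	return *u
}

func main() {
	c := &config{}
	fmt.Println(c.addNode(1, mustURL("http://localhost:9000")))
	fmt.Println(c.addNode(1, mustURL("http://localhost:9001")))
	fmt.Println(c.removeNode(2))
}
```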

type ConfigDecoder

type ConfigDecoder struct {
	// contains filtered or unexported fields
}

ConfigDecoder decodes a config from a reader.

func NewConfigDecoder

func NewConfigDecoder(r io.Reader) *ConfigDecoder

NewConfigDecoder returns a new instance of ConfigDecoder attached to a reader.

func (*ConfigDecoder) Decode

func (dec *ConfigDecoder) Decode(c *Config) error

Decode unmarshals the configuration from the decoder's reader.

type ConfigEncoder

type ConfigEncoder struct {
	// contains filtered or unexported fields
}

ConfigEncoder encodes a config to a writer.

func NewConfigEncoder

func NewConfigEncoder(w io.Writer) *ConfigEncoder

NewConfigEncoder returns a new instance of ConfigEncoder attached to a writer.

func (*ConfigEncoder) Encode

func (enc *ConfigEncoder) Encode(c *Config) error

Encode marshals the configuration to the encoder's writer.

type ConfigNode

type ConfigNode struct {
	ID  uint64
	URL url.URL
}

ConfigNode represents a single machine in the raft configuration.

type FSM

type FSM interface {
	// These implement the snapshot and restore.
	io.WriterTo
	io.ReaderFrom

	// Executes a log entry against the state machine.
	// Non-repeatable errors such as system and disk errors must panic.
	Apply(*LogEntry) error

	// Returns the applied index saved to the state machine.
	Index() uint64
}

FSM represents the state machine that the log is applied to. The FSM must maintain the highest index that it has seen.
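To make the contract concrete, here is a minimal sketch of a state machine that satisfies this interface. The `LogEntry` and `FSM` declarations are trimmed local copies so the example compiles on its own, and `counterFSM`, its 16-byte snapshot layout, and the `restore` helper are all hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"sync"
)

// LogEntry mirrors only the fields of the package's LogEntry used here.
type LogEntry struct {
	Index uint64
	Term  uint64
	Data  []byte
}

// FSM restates the documented interface so the sketch is self-contained.
type FSM interface {
	io.WriterTo
	io.ReaderFrom
	Apply(*LogEntry) error
	Index() uint64
}

// counterFSM is a hypothetical state machine that counts applied commands.
type counterFSM struct {
	mu    sync.Mutex
	index uint64 // highest index applied so far
	count uint64 // number of commands applied
}

var _ FSM = (*counterFSM)(nil) // compile-time interface check

// Apply records the entry's index and bumps the command counter.
func (f *counterFSM) Apply(e *LogEntry) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.count++
	f.index = e.Index
	return nil
}

// Index returns the highest index the state machine has seen.
func (f *counterFSM) Index() uint64 {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.index
}

// WriteTo writes a 16-byte snapshot: index followed by count, big-endian.
func (f *counterFSM) WriteTo(w io.Writer) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	var buf [16]byte
	binary.BigEndian.PutUint64(buf[0:8], f.index)
	binary.BigEndian.PutUint64(buf[8:16], f.count)
	n, err := w.Write(buf[:])
	return int64(n), err
}

// ReadFrom restores the state from a snapshot produced by WriteTo.
func (f *counterFSM) ReadFrom(r io.Reader) (int64, error) {
	var buf [16]byte
	n, err := io.ReadFull(r, buf[:])
	if err != nil {
		return int64(n), err
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	f.index = binary.BigEndian.Uint64(buf[0:8])
	f.count = binary.BigEndian.Uint64(buf[8:16])
	return int64(n), nil
}

// restore copies src into a fresh FSM through an in-memory snapshot.
func restore(src *counterFSM) (*counterFSM, error) {
	var buf bytes.Buffer
	if _, err := src.WriteTo(&buf); err != nil {
		return nil, err
	}
	dst := &counterFSM{}
	if _, err := dst.ReadFrom(&buf); err != nil {
		return nil, err
	}
	return dst, nil
}

func main() {
	f := &counterFSM{}
	f.Apply(&LogEntry{Index: 1, Term: 1, Data: []byte("a")})
	f.Apply(&LogEntry{Index: 2, Term: 1, Data: []byte("b")})

	g, err := restore(f)
	if err != nil {
		fmt.Println("restore failed:", err)
		return
	}
	fmt.Println(g.Index(), g.count)
}
```

Storing the index inside the snapshot is what lets a restored state machine report where it left off, which matches the requirement that the FSM maintain the highest index it has seen.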

type HTTPTransport

type HTTPTransport struct{}

HTTPTransport represents a transport for sending RPCs over the HTTP protocol.

func (*HTTPTransport) Heartbeat

func (t *HTTPTransport) Heartbeat(uri url.URL, term, commitIndex, leaderID uint64) (uint64, error)

Heartbeat checks the status of a follower.

func (*HTTPTransport) Join

func (t *HTTPTransport) Join(uri url.URL, nodeURL url.URL) (uint64, uint64, *Config, error)

Join requests membership into a node's cluster.

func (*HTTPTransport) Leave

func (t *HTTPTransport) Leave(uri url.URL, id uint64) error

Leave removes a node from a cluster's membership.

func (*HTTPTransport) ReadFrom

func (t *HTTPTransport) ReadFrom(uri url.URL, id, term, index uint64) (io.ReadCloser, error)

ReadFrom streams the log from a leader.

func (*HTTPTransport) RequestVote

func (t *HTTPTransport) RequestVote(uri url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error

RequestVote requests a vote for a candidate in a given term.

type Handler

type Handler struct {
	Log interface {
		AddPeer(u url.URL) (id uint64, leaderID uint64, config *Config, err error)
		RemovePeer(id uint64) error
		Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)
		WriteEntriesTo(w io.Writer, id, term, index uint64) error
		RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) error
	}
}

Handler represents an HTTP endpoint for Raft to communicate over.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP handles all incoming HTTP requests.

type Log

type Log struct {

	// The state machine that log entries will be applied to.
	FSM FSM

	// The transport used to communicate with other nodes in the cluster.
	Transport interface {
		Join(u url.URL, nodeURL url.URL) (id uint64, leaderID uint64, config *Config, err error)
		Leave(u url.URL, id uint64) error
		Heartbeat(u url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error)
		ReadFrom(u url.URL, id, term, index uint64) (io.ReadCloser, error)
		RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error
	}

	// Clock is an abstraction of time.
	Clock interface {
		Now() time.Time
		AfterApplyInterval() <-chan chan struct{}
		AfterElectionTimeout() <-chan chan struct{}
		AfterHeartbeatInterval() <-chan chan struct{}
		AfterReconnectTimeout() <-chan chan struct{}
	}

	// Rand returns a random number.
	Rand func() int64

	// Sets whether trace messages are logged.
	DebugEnabled bool

	// This logs some asynchronous errors that occur within the log.
	Logger *log.Logger
	// contains filtered or unexported fields
}

Log represents a replicated log of commands based on the Raft protocol.

The log can exist in one of four states that transition based on the following rules:

         ┌───────────┐
      ┌─▶│  Stopped  │
      │  └───────────┘
      │        │
      │        ▼
      │  ┌───────────┐
      ├──│ Follower  │◀─┐
      │  └───────────┘  │
close │        │        │
 log  │        ▼        │
      │  ┌───────────┐  │
      ├──│ Candidate │──┤ higher
      │  └───────────┘  │  term
      │        │        │
      │        ▼        │
      │  ┌───────────┐  │
      └──│  Leader   │──┘
         └───────────┘

- Stopped moves to Follower when initialized or joined.
- Follower moves to Candidate after election timeout.
- Candidate moves to Leader after a quorum of votes.
- Leader or Candidate moves to Follower if higher term seen.
- Any state moves to Stopped if log is closed.
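The transition rules above can be written as a small pure function. This is a sketch of the rules as listed, not the package's internal logic: the `event` type, its constants, and the strings returned by `String` are hypothetical.

```go
package main

import "fmt"

// State mirrors the documented State type and its constants.
type State int

const (
	Stopped State = iota
	Follower
	Candidate
	Leader
)

// String returns a readable name; the exact strings are an assumption.
func (s State) String() string {
	switch s {
	case Stopped:
		return "stopped"
	case Follower:
		return "follower"
	case Candidate:
		return "candidate"
	case Leader:
		return "leader"
	}
	return "unknown"
}

// event is a hypothetical trigger for a state change.
type event int

const (
	initialized     event = iota // log initialized or joined to a cluster
	electionTimeout              // no heartbeat before the election timeout
	quorumReached                // candidate received a quorum of votes
	higherTerm                   // a higher term was observed
	closed                       // log was closed
)

// next applies the listed rules; unmatched combinations leave the state as-is.
func next(s State, ev event) State {
	switch {
	case ev == closed:
		return Stopped
	case s == Stopped && ev == initialized:
		return Follower
	case s == Follower && ev == electionTimeout:
		return Candidate
	case s == Candidate && ev == quorumReached:
		return Leader
	case (s == Leader || s == Candidate) && ev == higherTerm:
		return Follower
	}
	return s
}

func main() {
	s := Stopped
	for _, ev := range []event{initialized, electionTimeout, quorumReached, higherTerm, closed} {
		s = next(s, ev)
		fmt.Println(s)
	}
}
```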

func NewLog

func NewLog() *Log

NewLog creates a new instance of Log with reasonable defaults.

func (*Log) AddPeer

func (l *Log) AddPeer(u url.URL) (uint64, uint64, *Config, error)

AddPeer creates a new peer in the cluster. Returns the new peer's identifier, the leader's identifier, and the current configuration.

func (*Log) Apply

func (l *Log) Apply(command []byte) (uint64, error)

Apply executes a command against the log. This function returns once the command has been committed to the log.

func (*Log) Close

func (l *Log) Close() error

Close closes the log.

func (*Log) ClusterID

func (l *Log) ClusterID() uint64

ClusterID returns the identifier for the cluster. Returns zero if the cluster has not been initialized yet.

func (*Log) CommitIndex

func (l *Log) CommitIndex() uint64

CommitIndex returns the highest committed index.

func (*Log) Config

func (l *Log) Config() *Config

Config returns the log's current configuration.

func (*Log) Flush

func (l *Log) Flush()

Flush pushes out buffered data for all open writers.

func (*Log) Heartbeat

func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)

Heartbeat establishes dominance by the current leader. Returns the highest written log entry index.

func (*Log) ID

func (l *Log) ID() uint64

ID returns the log's identifier.

func (*Log) Initialize

func (l *Log) Initialize() error

Initialize initializes a new log. Returns an error if log data already exists.

func (*Log) IsLeader

func (l *Log) IsLeader() bool

IsLeader returns true if the log is the current leader.

func (*Log) Join

func (l *Log) Join(u url.URL) error

Join contacts a node in the cluster to request membership. A log cannot join a cluster if it has already been initialized.

func (*Log) LastLogIndexTerm

func (l *Log) LastLogIndexTerm() (index, term uint64)

LastLogIndexTerm returns the last index & term from the log.

func (*Log) Leader

func (l *Log) Leader() (id uint64, u url.URL)

Leader returns the id and URL associated with the current leader. Returns zero if there is no current leader.

func (*Log) Leave

func (l *Log) Leave() error

Leave removes the log from cluster membership and removes the log data.

func (*Log) Open

func (l *Log) Open(path string) error

Open initializes the log from a path. If the path does not exist then it is created.

func (*Log) Opened

func (l *Log) Opened() bool

Opened returns true if the log is currently open.

func (*Log) Path

func (l *Log) Path() string

Path returns the data path of the Raft log. Returns an empty string if the log is closed.

func (*Log) ReadFrom

func (l *Log) ReadFrom(r io.ReadCloser) error

ReadFrom continually reads log entries from a reader.

func (*Log) RemovePeer

func (l *Log) RemovePeer(id uint64) error

RemovePeer removes an existing peer from the cluster by id.

func (*Log) RequestVote

func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (err error)

RequestVote requests a vote from the log.

func (*Log) SetURL

func (l *Log) SetURL(u url.URL)

SetURL sets the URL for the log. This must be set before opening.

func (*Log) State

func (l *Log) State() State

State returns the current state.

func (*Log) Term

func (l *Log) Term() uint64

Term returns the current term.

func (*Log) URL

func (l *Log) URL() url.URL

URL returns the URL for the log.

func (*Log) URLs

func (l *Log) URLs() []url.URL

URLs returns a list of all URLs in the cluster.

func (*Log) Wait

func (l *Log) Wait(idx uint64) error

Wait blocks until a given index is applied.

func (*Log) WriteEntriesTo

func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error

WriteEntriesTo attaches a writer to the log from a given index. The index specified must be a committed index.

type LogEntry

type LogEntry struct {
	Type  LogEntryType
	Index uint64
	Term  uint64
	Data  []byte
}

LogEntry represents a single command within the log.

type LogEntryDecoder

type LogEntryDecoder struct {
	// contains filtered or unexported fields
}

LogEntryDecoder decodes entries from a reader.

func NewLogEntryDecoder

func NewLogEntryDecoder(r io.Reader) *LogEntryDecoder

NewLogEntryDecoder returns a new instance of the LogEntryDecoder that will decode from a reader.

func (*LogEntryDecoder) Decode

func (dec *LogEntryDecoder) Decode(e *LogEntry) error

Decode reads a log entry from the decoder's reader.

type LogEntryEncoder

type LogEntryEncoder struct {
	// contains filtered or unexported fields
}

LogEntryEncoder encodes entries to a writer.

func NewLogEntryEncoder

func NewLogEntryEncoder(w io.Writer) *LogEntryEncoder

NewLogEntryEncoder returns a new instance of the LogEntryEncoder that will encode to a writer.

func (*LogEntryEncoder) Encode

func (enc *LogEntryEncoder) Encode(e *LogEntry) error

Encode writes a log entry to the encoder's writer.
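The encoder and decoder pair implies a framed binary format, although this page does not describe it. The sketch below shows one way such a round trip could work. The 21-byte header layout, the `entry` type, and the `encode`, `decode`, and `roundTrip` helpers are all hypothetical and should not be taken as the package's actual wire format.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// entry is a simplified, hypothetical version of LogEntry.
type entry struct {
	Type  uint8
	Index uint64
	Term  uint64
	Data  []byte
}

// headerSize covers type (1), index (8), term (8), and data length (4).
const headerSize = 21

// encode writes a fixed-size header followed by the entry's data.
func encode(w io.Writer, e *entry) error {
	var hdr [headerSize]byte
	hdr[0] = e.Type
	binary.BigEndian.PutUint64(hdr[1:9], e.Index)
	binary.BigEndian.PutUint64(hdr[9:17], e.Term)
	binary.BigEndian.PutUint32(hdr[17:21], uint32(len(e.Data)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(e.Data)
	return err
}

// decode reads one header and then exactly as many data bytes as it declares.
func decode(r io.Reader, e *entry) error {
	var hdr [headerSize]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return err
	}
	e.Type = hdr[0]
	e.Index = binary.BigEndian.Uint64(hdr[1:9])
	e.Term = binary.BigEndian.Uint64(hdr[9:17])
	e.Data = make([]byte, binary.BigEndian.Uint32(hdr[17:21]))
	_, err := io.ReadFull(r, e.Data)
	return err
}

// roundTrip encodes e into memory and decodes it back.
func roundTrip(e entry) (entry, error) {
	var buf bytes.Buffer
	if err := encode(&buf, &e); err != nil {
		return entry{}, err
	}
	var out entry
	err := decode(&buf, &out)
	return out, err
}

func main() {
	out, err := roundTrip(entry{Type: 0, Index: 7, Term: 2, Data: []byte("set x=1")})
	fmt.Println(out.Index, out.Term, string(out.Data), err)
}
```

Prefixing each entry with its length is what allows a reader on a long-running stream to know where one entry ends and the next begins.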

type LogEntryType

type LogEntryType uint8

LogEntryType serves as an internal marker for log entries. Non-command entry types are handled by the library itself.

const (
	LogEntryCommand LogEntryType = iota
	LogEntryNop
	LogEntryInitialize
	LogEntryAddPeer
	LogEntryRemovePeer
)

type State

type State int

State represents whether the log is stopped, a follower, a candidate, or a leader.

const (
	Stopped State = iota
	Follower
	Candidate
	Leader
)

func (State) String

func (s State) String() string

String returns the string representation of the state.
