raft

v0.0.0-...-730cdd4 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 22, 2013 License: MIT Imports: 16 Imported by: 0

README

go-raft

Overview

This is a Go implementation of the Raft distributed consensus protocol. Raft is a protocol by which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log.

For more details on Raft, you can read In Search of an Understandable Consensus Algorithm by Diego Ongaro and John Ousterhout.

The Raft Protocol

Overview

Maintaining state in a single process on a single server is easy. Your process is a single point of authority so there are no conflicts when reading and writing state. Even multi-threaded processes can rely on locks or coroutines to serialize access to the data.

However, in a distributed system there is no single point of authority. Servers can crash, the network between two machines can become unavailable, or any number of other problems can occur.

A distributed consensus protocol is used for maintaining a consistent state across multiple servers in a cluster. Many distributed systems are built upon the Paxos protocol, but Paxos can be difficult to understand and there are many gaps between Paxos and real-world implementations.

An alternative is the Raft distributed consensus protocol by Diego Ongaro and John Ousterhout. Raft is a protocol built with understandability as a primary tenet and it centers around two things:

  1. Leader Election
  2. Replicated Log

With these two constructs, you can build a system that can maintain state across multiple servers -- even in the event of multiple failures.

Leader Election

The Raft protocol effectively works as a master-slave system whereby state changes are written to a single server in the cluster and are distributed out to the rest of the servers in the cluster. This simplifies the protocol since there is only one data authority and conflicts will not have to be resolved.

Raft ensures that there is only one leader at a time. It does this by performing elections among the nodes in the cluster and requiring that a node must receive a majority of the votes in order to become leader. For example, if you have 3 nodes in your cluster then a single node would need 2 votes in order to become the leader. For a 5 node cluster, a server would need 3 votes to become leader.
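The majority rule above comes down to a one-line calculation. The following is a minimal sketch and not go-raft's own code; the function name majority is a hypothetical helper introduced only for illustration.

```go
package main

import "fmt"

// majority returns the number of votes a candidate needs to win an
// election in a cluster of n nodes: strictly more than half of them.
// Hypothetical helper for illustration; not part of go-raft.
func majority(n int) int {
	return n/2 + 1
}

func main() {
	for _, n := range []int{3, 5} {
		fmt.Printf("%d nodes need %d votes\n", n, majority(n))
	}
}
```

Note that a 4 node cluster needs 3 votes, so it tolerates only one failed node, the same as a 3 node cluster.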

Replicated Log

To maintain state, a log of commands is maintained. Each command makes a change to the state of the server and the command is deterministic. By ensuring that this log is replicated identically between all the nodes in the cluster we can replicate the state at any point in time in the log by running each command sequentially.
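The idea of rebuilding state by replaying deterministic commands can be sketched without any Raft machinery. The command type and the replay function below are hypothetical and exist only for this example; they are not go-raft's Command or Log types.

```go
package main

import "fmt"

// command is a deterministic state change: it adds Delta to the counter
// stored under Key. Hypothetical type for this sketch.
type command struct {
	Key   string
	Delta int
}

// replay applies the first n commands of the log, in order, to an empty
// state and returns the resulting state.
func replay(log []command, n int) map[string]int {
	state := make(map[string]int)
	for _, c := range log[:n] {
		state[c.Key] += c.Delta
	}
	return state
}

func main() {
	log := []command{{"x", 1}, {"y", 5}, {"x", 2}}

	// Two nodes holding the same log reach the same state.
	a := replay(log, len(log))
	b := replay(log, len(log))
	fmt.Println(a["x"] == b["x"] && a["y"] == b["y"])

	// Replaying a prefix reproduces the state at that point in the log.
	fmt.Println(replay(log, 2)["x"])
}
```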

Replicating the log under normal conditions is done by sending an AppendEntries RPC from the leader to each of the other servers in the cluster (called Peers). Each peer appends the entries from the leader through a 2-phase commit process, which ensures that a majority of servers in the cluster have the entries written to their logs.
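One way to picture the majority requirement: given the last index each server has written, the highest index held by a majority is the one that is safe to commit. This is a simplified sketch under that assumption; it ignores the term checks that the Raft paper also requires, and committedIndex is a hypothetical helper, not a go-raft function.

```go
package main

import (
	"fmt"
	"sort"
)

// committedIndex returns the highest log index that is stored on a
// majority of servers. indexes holds the last index written by each
// server in the cluster, including the leader.
// Hypothetical helper for illustration; not part of go-raft.
func committedIndex(indexes []uint64) uint64 {
	if len(indexes) == 0 {
		return 0
	}
	// Copy before sorting so the caller's slice is left untouched.
	sorted := append([]uint64(nil), indexes...)
	// Sort descending so that sorted[k] is held by at least k+1 servers.
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	quorum := len(sorted)/2 + 1
	return sorted[quorum-1]
}

func main() {
	// The leader is at index 7; four peers have written up to 7, 5, 4 and 2.
	fmt.Println(committedIndex([]uint64{7, 7, 5, 4, 2}))
}
```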

For a more detailed explanation of the failover process and election terms, please see the full paper describing the protocol: In Search of an Understandable Consensus Algorithm.

Project Status

The go-raft library is feature complete but in alpha. There is a reference implementation called raftd that demonstrates how to use the library.

The library will be considered experimental until it has significant production usage. I'm writing the library for the purpose of including distributed processing in my behavioral analytics database called Sky. However, I hope other projects can benefit from having a distributed consensus protocol, so the go-raft library is available under the MIT license.

If you have a project that you're using go-raft in, please add it to this README and send a pull request so others can see implementation examples. If you have any questions on implementing go-raft in your project, feel free to contact me on GitHub, Twitter or by e-mail at ben@skylandlabs.com.

Documentation

Constants

const (
	Stopped   = "stopped"
	Follower  = "follower"
	Candidate = "candidate"
	Leader    = "leader"
)
const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)

Variables

var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var NotLeaderError = errors.New("raft.Server: Not current leader")

Functions

func RegisterCommand

func RegisterCommand(command Command)

Registers a command by storing a reference to an instance of it.

Types

type AppendEntriesRequest

type AppendEntriesRequest struct {
	Term         uint64      `json:"term"`
	LeaderName   string      `json:"leaderName"`
	PrevLogIndex uint64      `json:"prevLogIndex"`
	PrevLogTerm  uint64      `json:"prevLogTerm"`
	Entries      []*LogEntry `json:"entries"`
	CommitIndex  uint64      `json:"commitIndex"`
}

The request sent to a server to append entries to the log.

func NewAppendEntriesRequest

func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest

Creates a new AppendEntries request.

type AppendEntriesResponse

type AppendEntriesResponse struct {
	Term        uint64 `json:"term"`
	Success     bool   `json:"success"`
	CommitIndex uint64 `json:"commitIndex"`
}

The response returned from a server appending entries to the log.
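The struct tags above determine the field names used when a response is serialized as JSON. The sketch below re-declares a local copy of the struct so that it runs without importing go-raft. Whether a given deployment actually sends JSON over the wire depends on the Transporter supplied by the host application, so treat this as an illustration of the tags only. The helper encodeResponse is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// appendEntriesResponse is a local copy of the fields and tags listed
// above, declared here so the sketch runs without importing go-raft.
type appendEntriesResponse struct {
	Term        uint64 `json:"term"`
	Success     bool   `json:"success"`
	CommitIndex uint64 `json:"commitIndex"`
}

// encodeResponse marshals a response using the struct tags.
// Hypothetical helper for illustration.
func encodeResponse(r appendEntriesResponse) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

func main() {
	s, err := encodeResponse(appendEntriesResponse{Term: 2, Success: true, CommitIndex: 5})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"term":2,"success":true,"commitIndex":5}
}
```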

func NewAppendEntriesResponse

func NewAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse

Creates a new AppendEntries response.

type Command

type Command interface {
	CommandName() string
	Apply(server *Server) error
}

A command represents an action to be taken on the replicated state machine.

func NewCommand

func NewCommand(name string) (Command, error)

Creates a new instance of a command by name.
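A name-based registry of this kind lets a log reader turn a stored command name back into a concrete Go value. The sketch below shows one way such a registry could work using reflection. It is an assumption about the general technique, not a copy of go-raft's internals: Server and Command are local stand-ins, and register, newCommand and setCommand are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

// Server and Command are local stand-ins mirroring the declarations
// above, so the sketch runs without importing go-raft.
type Server struct{}

type Command interface {
	CommandName() string
	Apply(server *Server) error
}

// registry maps a command name to a prototype instance of that command.
var registry = map[string]Command{}

// register stores a reference to an instance under its command name.
func register(c Command) { registry[c.CommandName()] = c }

// newCommand builds a fresh, zero-valued command of the registered type.
func newCommand(name string) (Command, error) {
	proto, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("unregistered command: %s", name)
	}
	// Allocate a new value of the prototype's underlying struct type.
	v := reflect.New(reflect.Indirect(reflect.ValueOf(proto)).Type())
	c, ok := v.Interface().(Command)
	if !ok {
		return nil, fmt.Errorf("type registered as %s is not a Command", name)
	}
	return c, nil
}

// setCommand is a hypothetical command used only in this example.
type setCommand struct {
	Key string `json:"key"`
}

func (c *setCommand) CommandName() string        { return "set" }
func (c *setCommand) Apply(server *Server) error { return nil }

func main() {
	register(&setCommand{Key: "a"})
	c, err := newCommand("set")
	if err != nil {
		panic(err)
	}
	fmt.Println(c.CommandName())
}
```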

type Log

type Log struct {
	ApplyFunc func(Command) error
	// contains filtered or unexported fields
}

A log is a collection of log entries that are persisted to durable storage.

func NewLog

func NewLog() *Log

Creates a new log.

func (*Log) AppendEntries

func (l *Log) AppendEntries(entries []*LogEntry) error

Appends a series of entries to the log. These entries are not written to disk until SetCommitIndex() is called.

func (*Log) AppendEntry

func (l *Log) AppendEntry(entry *LogEntry) error

Appends a single entry to the log.

func (*Log) Close

func (l *Log) Close()

Closes the log file.

func (*Log) CommitIndex

func (l *Log) CommitIndex() uint64

The last committed index in the log.

func (*Log) CommitInfo

func (l *Log) CommitInfo() (index uint64, term uint64)

Retrieves the last index and term that has been committed to the log.

func (*Log) Compact

func (l *Log) Compact(index uint64, term uint64) error

Compacts the log before the given index.

func (*Log) ContainsEntry

func (l *Log) ContainsEntry(index uint64, term uint64) bool

Checks if the log contains a given index/term combination.

func (*Log) CreateEntry

func (l *Log) CreateEntry(term uint64, command Command) *LogEntry

Creates a log entry associated with this log.

func (*Log) CurrentIndex

func (l *Log) CurrentIndex() uint64

The current index in the log.

func (*Log) CurrentTerm

func (l *Log) CurrentTerm() uint64

The current term in the log.

func (*Log) GetEntriesAfter

func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64)

Retrieves a list of entries after a given index. This function also returns the term of the index provided.

func (*Log) GetEntryError

func (l *Log) GetEntryError(entry *LogEntry) error

Retrieves the error returned from an entry. The error can only exist after the entry has been committed.

func (*Log) IsEmpty

func (l *Log) IsEmpty() bool

Determines if the log contains zero entries.

func (*Log) LastCommandName

func (l *Log) LastCommandName() string

The name of the last command in the log.

func (*Log) NextIndex

func (l *Log) NextIndex() uint64

The next index in the log.

func (*Log) Open

func (l *Log) Open(path string) error

Opens the log file and reads existing entries. The log can remain open and continue to append entries to the end of the log.

func (*Log) SetCommitIndex

func (l *Log) SetCommitIndex(index uint64) error

Updates the commit index and writes entries after that index to the stable storage.

func (*Log) SetStartIndex

func (l *Log) SetStartIndex(i uint64)

func (*Log) SetStartTerm

func (l *Log) SetStartTerm(t uint64)

func (*Log) StartIndex

func (l *Log) StartIndex() uint64

func (*Log) Truncate

func (l *Log) Truncate(index uint64, term uint64) error

Truncates the log to the given index and term. This only works if the log at the index has not been committed.
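The restriction on truncation can be illustrated with a small in-memory log. This is a sketch under stated assumptions, not go-raft's implementation: memLog and entry are hypothetical types, indexes start at 1, and the entry at the given index is assumed to be kept while everything after it is dropped.

```go
package main

import (
	"errors"
	"fmt"
)

// entry and memLog are hypothetical types for this sketch.
type entry struct {
	Index uint64
	Term  uint64
}

type memLog struct {
	entries     []entry // entries[i].Index == i+1
	commitIndex uint64
}

// truncate drops every entry after index. It refuses to cut into the
// committed part of the log and requires the entry at index to have the
// given term.
func (l *memLog) truncate(index, term uint64) error {
	if index < l.commitIndex {
		return errors.New("index is before the commit index")
	}
	if index > uint64(len(l.entries)) {
		return errors.New("index is past the end of the log")
	}
	if index > 0 && l.entries[index-1].Term != term {
		return errors.New("term mismatch at index")
	}
	l.entries = l.entries[:index]
	return nil
}

func main() {
	l := &memLog{
		entries:     []entry{{1, 1}, {2, 1}, {3, 2}, {4, 2}},
		commitIndex: 2,
	}
	// Allowed: index 3 is after the commit index and has term 2.
	fmt.Println(l.truncate(3, 2), len(l.entries))
	// Rejected: index 1 is already committed.
	fmt.Println(l.truncate(1, 1))
}
```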

func (*Log) UpdateCommitIndex

func (l *Log) UpdateCommitIndex(index uint64)

Updates the commit index.

type LogEntry

type LogEntry struct {
	Index   uint64  `json:"index"`
	Term    uint64  `json:"term"`
	Command Command `json:"command"`
	// contains filtered or unexported fields
}

A log entry stores a single item in the log.

func NewLogEntry

func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry

Creates a new log entry associated with a log.

func (*LogEntry) Decode

func (e *LogEntry) Decode(r io.Reader) (pos int, err error)

Decodes the log entry from a buffer. Returns the number of bytes read.

func (*LogEntry) Encode

func (e *LogEntry) Encode(w io.Writer) error

Encodes the log entry to a buffer.

func (*LogEntry) MarshalJSON

func (e *LogEntry) MarshalJSON() ([]byte, error)

Encodes a log entry into JSON.

func (*LogEntry) UnmarshalJSON

func (e *LogEntry) UnmarshalJSON(data []byte) error

Decodes a log entry from a JSON byte array.

type Peer

type Peer struct {
	// contains filtered or unexported fields
}

A peer is a reference to another server involved in the consensus protocol.

func NewPeer

func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer

Creates a new peer.

func (*Peer) HeartbeatTimeout

func (p *Peer) HeartbeatTimeout() time.Duration

Retrieves the heartbeat timeout.

func (*Peer) Name

func (p *Peer) Name() string

Retrieves the name of the peer.

func (*Peer) SetHeartbeatTimeout

func (p *Peer) SetHeartbeatTimeout(duration time.Duration)

Sets the heartbeat timeout.

type RequestVoteRequest

type RequestVoteRequest struct {
	Term          uint64 `json:"term"`
	CandidateName string `json:"candidateName"`
	LastLogIndex  uint64 `json:"lastLogIndex"`
	LastLogTerm   uint64 `json:"lastLogTerm"`
	// contains filtered or unexported fields
}

The request sent to a server to vote for a candidate to become a leader.

func NewRequestVoteRequest

func NewRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64) *RequestVoteRequest

Creates a new RequestVote request.

type RequestVoteResponse

type RequestVoteResponse struct {
	Term        uint64 `json:"term"`
	VoteGranted bool   `json:"voteGranted"`
	// contains filtered or unexported fields
}

The response returned from a server after a vote for a candidate to become a leader.

func NewRequestVoteResponse

func NewRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse

Creates a new RequestVote response.

type Server

type Server struct {
	// contains filtered or unexported fields
}

A server is involved in the consensus protocol and can act as a follower, candidate or a leader.

func NewServer

func NewServer(name string, path string, transporter Transporter, context interface{}) (*Server, error)

Creates a new server with a log at the given path.

func (*Server) AddPeer

func (s *Server) AddPeer(name string) error

Adds a peer to the server. This should be called from within a system's join command so that it runs within the context of the server lock.

func (*Server) AppendEntries

func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesResponse, error)

Appends a log entry from the leader to this server.

func (*Server) Context

func (s *Server) Context() interface{}

Retrieves the context passed into the constructor.

func (*Server) Do

func (s *Server) Do(command Command) error

Attempts to execute a command and replicate it. The function will return when the command has been successfully committed or an error has occurred.

func (*Server) ElectionTimeout

func (s *Server) ElectionTimeout() time.Duration

Retrieves the election timeout.

func (*Server) HeartbeatTimeout

func (s *Server) HeartbeatTimeout() time.Duration

Retrieves the heartbeat timeout.

func (*Server) Initialize

func (s *Server) Initialize() error

Initializes the server to become leader of a new cluster. This function will fail if there is an existing log or the server is already a member in an existing cluster.

func (*Server) IsLogEmpty

func (s *Server) IsLogEmpty() bool

Retrieves whether the server's log has no entries.

func (*Server) LastCommandName

func (s *Server) LastCommandName() string

A reference to the command name of the last entry.

func (*Server) Leader

func (s *Server) Leader() string

func (*Server) LoadSnapshot

func (s *Server) LoadSnapshot() error

Loads a snapshot at restart.

func (*Server) LogEntries

func (s *Server) LogEntries() []*LogEntry

A list of all the log entries. This should only be used for debugging purposes.

func (*Server) LogPath

func (s *Server) LogPath() string

Retrieves the log path for the server.

func (*Server) MemberCount

func (s *Server) MemberCount() int

Retrieves the number of member servers in the consensus.

func (*Server) Name

func (s *Server) Name() string

Retrieves the name of the server.

func (*Server) Path

func (s *Server) Path() string

Retrieves the storage path for the server.

func (*Server) QuorumSize

func (s *Server) QuorumSize() int

Retrieves the number of servers required to make a quorum.

func (*Server) RemovePeer

func (s *Server) RemovePeer(name string) error

Removes a peer from the server. This should be called from within a command applied by the system so that it runs within the context of the server lock.

func (*Server) RequestVote

func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, error)

Requests a vote from a server. A vote can be obtained if the request's term equals the server's current term and the server has not yet voted in that term. A vote can also be obtained if the request's term is greater than the server's current term.
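The term rule described above can be written as a small decision function. This sketch covers only that rule; the Raft paper additionally compares the candidate's last log index and term against the voter's log, which is omitted here. The voter type and its requestVote method are hypothetical and are not go-raft's Server.

```go
package main

import "fmt"

// voter holds the two pieces of state the term rule depends on.
// Hypothetical type for this sketch.
type voter struct {
	currentTerm uint64
	votedFor    string
}

// requestVote reports whether the vote is granted and records it.
func (v *voter) requestVote(term uint64, candidate string) bool {
	switch {
	case term < v.currentTerm:
		// Stale request from an older term.
		return false
	case term > v.currentTerm:
		// A newer term resets any earlier vote.
		v.currentTerm = term
		v.votedFor = candidate
		return true
	case v.votedFor == "":
		// Same term and no vote cast yet.
		v.votedFor = candidate
		return true
	default:
		// Same term and a vote has already been cast.
		return false
	}
}

func main() {
	v := &voter{currentTerm: 2}
	fmt.Println(v.requestVote(1, "a"))
	fmt.Println(v.requestVote(2, "a"))
	fmt.Println(v.requestVote(2, "b"))
	fmt.Println(v.requestVote(3, "b"))
}
```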

func (*Server) Running

func (s *Server) Running() bool

Checks if the server is currently running.

func (*Server) SetElectionTimeout

func (s *Server) SetElectionTimeout(duration time.Duration)

Sets the election timeout.

func (*Server) SetHeartbeatTimeout

func (s *Server) SetHeartbeatTimeout(duration time.Duration)

Sets the heartbeat timeout.

func (*Server) Snapshot

func (s *Server) Snapshot()

The background snapshot function

func (*Server) SnapshotPath

func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string

Retrieves the snapshot path for the given last index and last term.

func (*Server) SnapshotRecovery

func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, error)

func (*Server) Start

func (s *Server) Start() error

Starts the server with a log at the given path.

func (*Server) State

func (s *Server) State() string

Retrieves the current state of the server.

func (*Server) Stop

func (s *Server) Stop()

Shuts down the server.

func (*Server) Transporter

func (s *Server) Transporter() Transporter

Retrieves the object that transports requests.

func (*Server) VotedFor

func (s *Server) VotedFor() string

Retrieves the name of the candidate this server voted for in this term.

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

The in-memory snapshot struct. TODO: add cluster configuration.

func (*Snapshot) Remove

func (ss *Snapshot) Remove() error

Removes the snapshot file.

func (*Snapshot) Save

func (ss *Snapshot) Save() error

Saves the snapshot to a file.

type SnapshotRequest

type SnapshotRequest struct {
	LeaderName string `json:"leaderName"`
	LastIndex  uint64 `json:"lastTerm"`
	LastTerm   uint64 `json:"lastIndex"`
	State      []byte `json:"state"`
}

The request sent to a server to start from the snapshot.

func NewSnapshotRequest

func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest

Creates a new Snapshot request.

type SnapshotResponse

type SnapshotResponse struct {
	Term        uint64 `json:"term"`
	Success     bool   `json:"success"`
	CommitIndex uint64 `json:"commitIndex"`
}

The response returned from a server to a snapshot request.

func NewSnapshotResponse

func NewSnapshotResponse(term uint64, success bool, commitIndex uint64) *SnapshotResponse

Creates a new Snapshot response.

type StateMachine

type StateMachine interface {
	Save() ([]byte, error)
	Recovery([]byte) error
}

StateMachine is the interface that allows the host application to save and recover the state machine.
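A host application could satisfy this interface by serializing its state to bytes and restoring it later. The following is a minimal sketch, assuming the state is a simple key-value map encoded as JSON. The interface is re-declared locally so the example runs on its own, and kvStore is a hypothetical type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StateMachine mirrors the interface declared above.
type StateMachine interface {
	Save() ([]byte, error)
	Recovery([]byte) error
}

// kvStore is a hypothetical application state: a string map.
type kvStore struct {
	data map[string]string
}

// Save serializes the whole map as JSON.
func (s *kvStore) Save() ([]byte, error) {
	return json.Marshal(s.data)
}

// Recovery replaces the current state with the decoded snapshot.
func (s *kvStore) Recovery(b []byte) error {
	data := make(map[string]string)
	if err := json.Unmarshal(b, &data); err != nil {
		return err
	}
	s.data = data
	return nil
}

// Compile-time check that kvStore satisfies the interface.
var _ StateMachine = (*kvStore)(nil)

func main() {
	src := &kvStore{data: map[string]string{"a": "1"}}
	snapshot, err := src.Save()
	if err != nil {
		panic(err)
	}
	dst := &kvStore{}
	if err := dst.Recovery(snapshot); err != nil {
		panic(err)
	}
	fmt.Println(dst.data["a"])
}
```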

type TestCommand1

type TestCommand1 struct {
	Val string `json:"val"`
	I   int    `json:"i"`
}

func (TestCommand1) Apply

func (c TestCommand1) Apply(server *Server) error

func (TestCommand1) CommandName

func (c TestCommand1) CommandName() string

type TestCommand2

type TestCommand2 struct {
	X int `json:"x"`
}

func (TestCommand2) Apply

func (c TestCommand2) Apply(server *Server) error

func (TestCommand2) CommandName

func (c TestCommand2) CommandName() string

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

The timer wraps the internal Go timer and provides the ability to pause, reset and stop. It also allows for the duration of the timer to be a random number between a min and max duration.

func NewTimer

func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer

Creates a new timer. Panics if a non-positive duration is used.
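Picking a random duration between a minimum and a maximum is what lets servers time out at different moments, which in Raft reduces the chance of several candidates splitting the vote. The sketch below shows one way to pick such a duration. It is not go-raft's Timer code: randomDuration is a hypothetical helper, and using twice the 150 ms default election timeout as the upper bound is an assumption made for the example.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomDuration returns a duration chosen uniformly from [min, max].
// Like the constructor described above, it panics on a non-positive
// duration. Hypothetical helper for illustration; not part of go-raft.
func randomDuration(min, max time.Duration) time.Duration {
	if min <= 0 || max <= 0 {
		panic("durations must be positive")
	}
	if max <= min {
		return min
	}
	// Int63n returns a value in [0, n), so add 1 to make max reachable.
	return min + time.Duration(rand.Int63n(int64(max-min)+1))
}

func main() {
	timeout := 150 * time.Millisecond
	d := randomDuration(timeout, 2*timeout)
	fmt.Println(d >= timeout && d <= 2*timeout)
}
```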

func (*Timer) C

func (t *Timer) C() chan time.Time

Retrieves the timer's channel.

func (*Timer) MaxDuration

func (t *Timer) MaxDuration() time.Duration

Retrieves the maximum duration of the timer.

func (*Timer) MinDuration

func (t *Timer) MinDuration() time.Duration

Retrieves the minimum duration of the timer.

func (*Timer) Pause

func (t *Timer) Pause()

Stops the timer.

func (*Timer) Reset

func (t *Timer) Reset()

Stops the timer if it is running and restarts it.

func (*Timer) Running

func (t *Timer) Running() bool

Checks if the timer is currently running.

func (*Timer) SetDuration

func (t *Timer) SetDuration(duration time.Duration)

Sets the minimum and maximum duration of the timer.

func (*Timer) SetMaxDuration

func (t *Timer) SetMaxDuration(duration time.Duration)

Sets the maximum duration of the timer.

func (*Timer) SetMinDuration

func (t *Timer) SetMinDuration(duration time.Duration)

Sets the minimum duration of the timer.

func (*Timer) Stop

func (t *Timer) Stop()

Stops the timer and closes the channel.

type Transporter

type Transporter interface {
	SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error)
	SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
	SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error)
}

Transporter is the interface for allowing the host application to transport requests to other nodes.
