raft

package module
v0.0.0-...-d85b35c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2025 License: Unlicense Imports: 11 Imported by: 2

Documentation

Overview

Core Raft implementation - Consensus Module.

Eli Bendersky [https://eli.thegreenplace.net] This code is in the public domain.

Server container for a Raft Consensus Module. Exposes Raft to the network and enables RPCs between Raft peers.

Eli Bendersky [https://eli.thegreenplace.net] This code is in the public domain.

Eli Bendersky [https://eli.thegreenplace.net] This code is in the public domain.

Test harness for writing tests for Raft.

Eli Bendersky [https://eli.thegreenplace.net] This code is in the public domain.

Index

Constants

View Source
const DebugCM = 1

Variables

This section is empty.

Functions

This section is empty.

Types

type AppendEntriesArgs

type AppendEntriesArgs struct {
	Term     int
	LeaderId int

	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry
	LeaderCommit int
}

See figure 2 in the paper.

type AppendEntriesReply

type AppendEntriesReply struct {
	Term    int
	Success bool

	// Faster conflict resolution optimization (described near the end of section
	// 5.3 in the paper.)
	ConflictIndex int
	ConflictTerm  int
}

type CMState

type CMState int
const (
	Follower CMState = iota
	Candidate
	Leader
	Dead
)

func (CMState) String

func (s CMState) String() string

type CommitEntry

type CommitEntry struct {
	// Command is the client command being committed.
	Command any

	// Index is the log index at which the client command is committed.
	Index int

	// Term is the Raft term at which the client command is committed.
	Term int
}

CommitEntry is the data reported by Raft to the commit channel. Each commit entry notifies the client that consensus was reached on a command and it can be applied to the client's state machine.

type ConsensusModule

type ConsensusModule struct {
	// contains filtered or unexported fields
}

ConsensusModule (CM) implements a single node of Raft consensus.

func NewConsensusModule

func NewConsensusModule(id int, peerIds []int, server *Server, storage Storage, ready <-chan any, commitChan chan<- CommitEntry) *ConsensusModule

NewConsensusModule creates a new CM with the given ID, list of peer IDs and server. The ready channel signals the CM that all peers are connected and it's safe to start its state machine. commitChan is going to be used by the CM to send log entries that have been committed by the Raft cluster.

func (*ConsensusModule) AppendEntries

func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error

func (*ConsensusModule) Report

func (cm *ConsensusModule) Report() (id int, term int, isLeader bool)

Report reports the state of this CM.

func (*ConsensusModule) RequestVote

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error

RequestVote RPC.

func (*ConsensusModule) Stop

func (cm *ConsensusModule) Stop()

Stop stops this CM, cleaning up its state. This method returns quickly, but it may take a bit of time (up to ~election timeout) for all goroutines to exit.

func (*ConsensusModule) Submit

func (cm *ConsensusModule) Submit(command any) int

Submit submits a new command to the CM. This function doesn't block; clients read the commit channel passed in the constructor to be notified of new committed entries. If this CM is the leader, Submit returns the log index where the command is submitted. Otherwise, it returns -1

type Harness

type Harness struct {
	// contains filtered or unexported fields
}

func NewHarness

func NewHarness(t *testing.T, n int) *Harness

NewHarness creates a new test Harness, initialized with n servers connected to each other.

func (*Harness) CheckCommitted

func (h *Harness) CheckCommitted(cmd int) (nc int, index int)

CheckCommitted verifies that all connected servers have cmd committed with the same index. It also verifies that all commands *before* cmd in the commit sequence match. For this to work properly, all commands submitted to Raft should be unique positive ints. Returns the number of servers that have this command committed, and its log index.

func (*Harness) CheckCommittedN

func (h *Harness) CheckCommittedN(cmd int, n int)

CheckCommittedN verifies that cmd was committed by exactly n connected servers.

func (*Harness) CheckNoLeader

func (h *Harness) CheckNoLeader()

CheckNoLeader checks that no connected server considers itself the leader.

func (*Harness) CheckNotCommitted

func (h *Harness) CheckNotCommitted(cmd int)

CheckNotCommitted verifies that no command equal to cmd has been committed by any of the active servers yet.

func (*Harness) CheckSingleLeader

func (h *Harness) CheckSingleLeader() (int, int)

CheckSingleLeader checks that only a single server thinks it's the leader. Returns the leader's id and term. It retries several times if no leader is identified yet.

func (*Harness) CrashPeer

func (h *Harness) CrashPeer(id int)

CrashPeer "crashes" a server by disconnecting it from all peers and then asking it to shut down. We're not going to use the same server instance again, but its storage is retained.

func (*Harness) DisconnectPeer

func (h *Harness) DisconnectPeer(id int)

DisconnectPeer disconnects a server from all other servers in the cluster.

func (*Harness) PeerDontDropCalls

func (h *Harness) PeerDontDropCalls(id int)

PeerDontDropCalls instructs peer `id` to stop dropping calls.

func (*Harness) PeerDropCallsAfterN

func (h *Harness) PeerDropCallsAfterN(id int, n int)

PeerDropCallsAfterN instructs peer `id` to drop calls after the next `n` are made.

func (*Harness) ReconnectPeer

func (h *Harness) ReconnectPeer(id int)

ReconnectPeer connects a server to all other servers in the cluster.

func (*Harness) RestartPeer

func (h *Harness) RestartPeer(id int)

RestartPeer "restarts" a server by creating a new Server instance and giving it the appropriate storage, reconnecting it to peers.

func (*Harness) Shutdown

func (h *Harness) Shutdown()

Shutdown shuts down all the servers in the harness and waits for them to stop running.

func (*Harness) SubmitToServer

func (h *Harness) SubmitToServer(serverId int, cmd any) int

SubmitToServer submits the command to serverId.

type LogEntry

type LogEntry struct {
	Command any
	Term    int
}

type MapStorage

type MapStorage struct {
	// contains filtered or unexported fields
}

MapStorage is a simple in-memory implementation of Storage for testing.

func NewMapStorage

func NewMapStorage() *MapStorage

func (*MapStorage) Get

func (ms *MapStorage) Get(key string) ([]byte, bool)

func (*MapStorage) HasData

func (ms *MapStorage) HasData() bool

func (*MapStorage) Set

func (ms *MapStorage) Set(key string, value []byte)

type RPCProxy

type RPCProxy struct {
	// contains filtered or unexported fields
}

RPCProxy is a pass-thru proxy server for ConsensusModule's RPC methods. It serves RPC requests made to a CM and manipulates them before forwarding to the CM itself.

It's useful for things like:

  • Simulating dropping of RPC calls
  • Simulating a small delay in RPC transmission.
  • Simulating possible unreliable connections by delaying some messages significantly and dropping others when RAFT_UNRELIABLE_RPC is set.

func NewProxy

func NewProxy(cm *ConsensusModule) *RPCProxy

func (*RPCProxy) AppendEntries

func (rpp *RPCProxy) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error

func (*RPCProxy) Call

func (rpp *RPCProxy) Call(peer *rpc.Client, method string, args any, reply any) error

func (*RPCProxy) DontDropCalls

func (rpp *RPCProxy) DontDropCalls()

func (*RPCProxy) DropCallsAfterN

func (rpp *RPCProxy) DropCallsAfterN(n int)

DropCallsAfterN instruct the proxy to drop calls after n are made from this point.

func (*RPCProxy) RequestVote

func (rpp *RPCProxy) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error

type RequestVoteArgs

type RequestVoteArgs struct {
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

See figure 2 in the paper.

type RequestVoteReply

type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server wraps a raft.ConsensusModule along with a rpc.Server that exposes its methods as RPC endpoints. It also manages the peers of the Raft server. The main goal of this type is to simplify the code of raft.Server for presentation purposes. raft.ConsensusModule has a *Server to do its peer communication and doesn't have to worry about the specifics of running an RPC server.

func NewServer

func NewServer(serverId int, peerIds []int, storage Storage, ready <-chan any, commitChan chan<- CommitEntry) *Server

func (*Server) Call

func (s *Server) Call(id int, serviceMethod string, args any, reply any) error

func (*Server) ConnectToPeer

func (s *Server) ConnectToPeer(peerId int, addr net.Addr) error

func (*Server) DisconnectAll

func (s *Server) DisconnectAll()

DisconnectAll closes all the client connections to peers for this server.

func (*Server) DisconnectPeer

func (s *Server) DisconnectPeer(peerId int) error

DisconnectPeer disconnects this server from the peer identified by peerId.

func (*Server) GetListenAddr

func (s *Server) GetListenAddr() net.Addr

func (*Server) IsLeader

func (s *Server) IsLeader() bool

IsLeader checks if s thinks it's the leader in the Raft cluster.

func (*Server) Proxy

func (s *Server) Proxy() *RPCProxy

Proxy provides access to the RPC proxy this server is using; this is only for testing purposes to simulate faults.

func (*Server) Serve

func (s *Server) Serve()

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown closes the server and waits for it to shut down properly.

func (*Server) Submit

func (s *Server) Submit(cmd any) int

Submit wraps the underlying CM's Submit; see that method for documentation.

type Storage

type Storage interface {
	Set(key string, value []byte)

	Get(key string) ([]byte, bool)

	// HasData returns true iff any Sets were made on this Storage.
	HasData() bool
}

Storage is an interface implemented by stable storage providers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL