raft

package
v0.9.3
Published: May 19, 2021 License: MIT Imports: 20 Imported by: 0

README

Raft library

Raft is a protocol with which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log. For more details on Raft, see "In Search of an Understandable Consensus Algorithm" (https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.

This Raft library is stable and feature complete. As of 2016, it is the most widely used Raft library in production, serving tens of thousands of clusters each day. It powers distributed systems such as etcd, Kubernetes, Docker Swarm, Cloud Foundry Diego, CockroachDB, TiDB, Project Calico, Flannel, and more.

Most Raft implementations have a monolithic design, including storage handling, messaging serialization, and network transport. This library instead follows a minimalistic design philosophy by only implementing the core raft algorithm. This minimalism buys flexibility, determinism, and performance.

To keep the codebase small as well as provide flexibility, the library only implements the Raft algorithm; both network and disk IO are left to the user. Library users must implement their own transportation layer for message passing between Raft peers over the wire. Similarly, users must implement their own storage layer to persist the Raft log and state.

In order to easily test the Raft library, its behavior should be deterministic. To achieve this determinism, the library models Raft as a state machine. The state machine takes a Message as input. A message can either be a local timer update or a network message sent from a remote peer. The state machine's output is a 3-tuple {[]Messages, []LogEntries, NextState} consisting of an array of Messages, log entries, and Raft state changes. For state machines with the same state, the same state machine input should always generate the same state machine output.
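The model described above can be illustrated with a toy state machine. This is only a sketch under stated assumptions: the types toyMessage, toyState and toyOutput and the function step are hypothetical names invented for this example and are not part of the package.

```go
package main

import "fmt"

// toyMessage is a hypothetical input: either a local timer update or a
// message from a remote peer. It is not the package's Message type.
type toyMessage struct {
	Tick bool   // true for a local timer update
	From uint64 // sender ID for network messages
	Data string // payload for network messages
}

// toyState is the hypothetical state of the machine.
type toyState struct {
	Elapsed int      // ticks since the last network message
	Log     []string // entries accepted so far
}

// toyOutput mirrors the 3-tuple {[]Messages, []LogEntries, NextState}.
type toyOutput struct {
	Messages []toyMessage
	Entries  []string
	Next     toyState
}

// step is a pure function: it reads no clock, performs no I/O and never
// mutates its arguments, so equal inputs always produce equal outputs.
func step(s toyState, m toyMessage) toyOutput {
	next := toyState{Elapsed: s.Elapsed, Log: append([]string(nil), s.Log...)}
	if m.Tick {
		next.Elapsed++
		return toyOutput{Next: next}
	}
	next.Elapsed = 0
	next.Log = append(next.Log, m.Data)
	ack := toyMessage{Data: "ack:" + m.Data}
	return toyOutput{Messages: []toyMessage{ack}, Entries: []string{m.Data}, Next: next}
}

func main() {
	s := toyState{}
	a := step(s, toyMessage{From: 2, Data: "x"})
	b := step(s, toyMessage{From: 2, Data: "x"})
	// Same state and same input: the two outputs should be identical.
	fmt.Println(fmt.Sprint(a) == fmt.Sprint(b))
}
```

Because time only enters through explicit tick messages, a test can replay any sequence of inputs and compare outputs exactly.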

A simple example application, raftexample, is also available to help illustrate how to use this package in practice: https://github.com/coreos/etcd/tree/master/contrib/raftexample

Features

This Raft implementation is a full-featured implementation of the Raft protocol. Features include:

  • Leader election
  • Log replication
  • Log compaction
  • Membership changes
  • Leadership transfer extension
  • Efficient linearizable read-only queries served by both the leader and followers
      • the leader checks with a quorum and bypasses the Raft log before processing read-only queries
      • followers ask the leader for a safe read index before processing read-only queries
  • More efficient lease-based linearizable read-only queries served by both the leader and followers
      • the leader bypasses the Raft log and processes read-only queries locally
      • followers ask the leader for a safe read index before processing read-only queries
      • this approach relies on the clocks of all the machines in the Raft group

This raft implementation also includes a few optional enhancements:

  • Optimistic pipelining to reduce log replication latency
  • Flow control for log replication
  • Batching Raft messages to reduce synchronized network I/O calls
  • Batching log entries to reduce disk synchronized I/O
  • Writing to leader's disk in parallel
  • Internal proposal redirection from followers to leader
  • Automatic stepping down when the leader loses quorum

Notable Users

  • cockroachdb A Scalable, Survivable, Strongly-Consistent SQL Database
  • dgraph A Scalable, Distributed, Low Latency, High Throughput Graph Database
  • etcd A distributed reliable key-value store
  • tikv A Distributed transactional key value database powered by Rust and Raft
  • swarmkit A toolkit for orchestrating distributed systems at any scale.

Usage

The primary object in raft is a Node. Either start a Node from scratch using raft.StartNode or start a Node from some initial state using raft.RestartNode.

To start a three-node cluster:

  storage := raft.NewMemoryStorage()
  c := &raft.Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }
  // Set peer list to the other nodes in the cluster.
  // Note that they need to be started separately as well.
  n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

Start a single node cluster, like so:

  // Create storage and config as shown above.
  // Set peer list to itself, so this node can become the leader of this single-node cluster.
  peers := []raft.Peer{{ID: 0x01}}
  n := raft.StartNode(c, peers)

To allow a new node to join this cluster, do not pass in any peers. First, add the node to the existing cluster by calling ProposeConfChange on any existing node inside the cluster. Then, start the node with an empty peer list, like so:

  // Create storage and config as shown above.
  n := raft.StartNode(c, nil)

To restart a node from previous state:

  storage := raft.NewMemoryStorage()

  // Recover the in-memory storage from persistent snapshot, state and entries.
  storage.ApplySnapshot(snapshot)
  storage.SetHardState(state)
  storage.Append(entries)

  c := &raft.Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }

  // Restart raft without peer information.
  // Peer information is already included in the storage.
  n := raft.RestartNode(c)

After creating a Node, the user has a few responsibilities:

First, read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2.

  1. Write HardState, Entries, and Snapshot to persistent storage if they are not empty. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded.

  2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until the latest HardState and all Entries written by any previous Ready batch have been persisted to disk (Messages may be sent while entries from the same batch are being persisted). To reduce the I/O latency, an optimization can be applied to make the leader write to disk in parallel with its followers (as explained in section 10.2.1 of the Raft thesis). If any Message has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be large). Note: Marshalling messages is not thread-safe; it is important to make sure that no new entries are persisted while marshalling. The easiest way to achieve this is to serialise the messages directly inside the main raft loop.

  3. Apply Snapshot (if any) and CommittedEntries to the state machine. If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange() to apply it to the node. The configuration change may be cancelled at this point by setting the NodeID field to zero before calling ApplyConfChange (but ApplyConfChange must be called one way or the other, and the decision to cancel must be based solely on the state machine and not external information such as the observed health of the node).

  4. Call Node.Advance() to signal readiness for the next batch of updates. This may be done at any time after step 1, although all updates must be processed in the order they were returned by Ready.
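The truncation rule in step 1 can be sketched with a plain slice. This is a minimal illustration, assuming entries are kept sorted by contiguous Index; the entry type and the appendDiscarding helper are hypothetical and do not belong to this package.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for a raft log entry.
type entry struct {
	Index uint64
	Term  uint64
}

// appendDiscarding returns the persisted log after writing newEnts.
// Any previously persisted entry whose Index is >= the first new Index
// is discarded before the new entries are appended.
func appendDiscarding(persisted, newEnts []entry) []entry {
	if len(newEnts) == 0 {
		return persisted
	}
	first := newEnts[0].Index
	keep := 0
	for keep < len(persisted) && persisted[keep].Index < first {
		keep++
	}
	out := make([]entry, 0, keep+len(newEnts))
	out = append(out, persisted[:keep]...)
	return append(out, newEnts...)
}

func main() {
	log := []entry{{1, 1}, {2, 1}, {3, 1}}
	// Entries 2 and 3 from term 1 are replaced by entry 2 from term 2.
	log = appendDiscarding(log, []entry{{2, 2}})
	fmt.Println(log)
}
```

A disk-backed store would perform the same truncation, typically as part of the write that persists the new entries.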

Second, all persisted log entries must be made available via an implementation of the Storage interface. The provided MemoryStorage type can be used for this (if repopulating its state upon a restart), or a custom disk-backed implementation can be supplied.

Third, after receiving a message from another node, pass it to Node.Step:

	func recvRaftRPC(ctx context.Context, m raftpb.Message) {
		n.Step(ctx, m)
	}

Finally, call Node.Tick() at regular intervals (probably via a time.Ticker). Raft has two important timeouts: heartbeat and the election timeout. However, internally to the raft package time is represented by an abstract "tick".
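Since both timeouts are expressed in ticks, their wall-clock values depend on how often Node.Tick() is called. The following sketch shows the arithmetic; the timeouts helper and the 100ms interval are assumptions chosen for illustration, not values prescribed by the package.

```go
package main

import (
	"fmt"
	"time"
)

// timeouts converts tick counts into wall-clock durations for a given
// tick interval, i.e. the period at which the application calls Tick.
func timeouts(tickInterval time.Duration, heartbeatTick, electionTick int) (heartbeat, election time.Duration) {
	return time.Duration(heartbeatTick) * tickInterval,
		time.Duration(electionTick) * tickInterval
}

func main() {
	// With HeartbeatTick: 1 and ElectionTick: 10, as in the examples above.
	hb, el := timeouts(100*time.Millisecond, 1, 10)
	fmt.Println(hb, el) // prints "100ms 1s"
}
```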

The total state machine handling loop will look something like this:

  for {
    select {
    case <-s.Ticker:
      n.Tick()
    case rd := <-s.Node.Ready():
      saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
      send(rd.Messages)
      if !raft.IsEmptySnap(rd.Snapshot) {
        processSnapshot(rd.Snapshot)
      }
      for _, entry := range rd.CommittedEntries {
        process(entry)
        if entry.Type == raftpb.EntryConfChange {
          var cc raftpb.ConfChange
          cc.Unmarshal(entry.Data)
          s.Node.ApplyConfChange(cc)
        }
      }
      s.Node.Advance()
    case <-s.done:
      return
    }
  }

To propose changes to the state machine from a node, take the application data, serialize it into a byte slice and call:

	n.Propose(ctx, data)

If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; the command may have to be reproposed after a timeout.
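One way to handle re-proposal is a small retry loop. The sketch below is an assumption-laden illustration: the proposer interface only mirrors the Propose call shown above, while proposeWithRetry, fakeNode and the committed channel (which the application would signal when it sees the data in CommittedEntries) are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// proposer is the subset of a Node that this sketch relies on.
type proposer interface {
	Propose(ctx context.Context, data []byte) error
}

var errNotCommitted = errors.New("proposal not committed after retries")

// proposeWithRetry proposes data, waits up to wait for the committed
// signal, and proposes again on timeout, at most attempts times.
func proposeWithRetry(ctx context.Context, p proposer, data []byte,
	committed <-chan struct{}, wait time.Duration, attempts int) error {
	for i := 0; i < attempts; i++ {
		if err := p.Propose(ctx, data); err != nil {
			return err
		}
		timer := time.NewTimer(wait)
		select {
		case <-committed:
			timer.Stop()
			return nil
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
			// Not committed in time; fall through and propose again.
		}
	}
	return errNotCommitted
}

// fakeNode commits only the second proposal, to exercise the retry path.
type fakeNode struct {
	calls     int
	committed chan struct{}
}

func (f *fakeNode) Propose(ctx context.Context, data []byte) error {
	f.calls++
	if f.calls == 2 {
		close(f.committed)
	}
	return nil
}

// demo runs the retry loop against fakeNode and reports the call count.
func demo() (int, error) {
	f := &fakeNode{committed: make(chan struct{})}
	err := proposeWithRetry(context.Background(), f, []byte("set x=1"),
		f.committed, 10*time.Millisecond, 3)
	return f.calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err)
}
```

A proposal that timed out may still be committed later, so a retried command can appear more than once in the committed entries. Applications that retry usually need idempotent commands or some form of deduplication.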

To add or remove a node in a cluster, build a ConfChange struct 'cc' and call:

	n.ProposeConfChange(ctx, cc)

After the config change is committed, a committed entry with type raftpb.EntryConfChange will be returned. It must be applied to the node through:

	var cc raftpb.ConfChange
	cc.Unmarshal(data)
	n.ApplyConfChange(cc)

Note: An ID represents a unique node in a cluster for all time. A given ID MUST be used only once even if the old node has been removed. This means that for example IP addresses make poor node IDs since they may be reused. Node IDs must be non-zero.

Implementation notes

This implementation is up to date with the final Raft thesis (https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although this implementation of the membership change protocol differs somewhat from that described in chapter 4. The key invariant that membership changes happen one node at a time is preserved, but in our implementation the membership change takes effect when its entry is applied, not when it is added to the log (so the entry is committed under the old membership instead of the new). This is equivalent in terms of safety, since the old and new configurations are guaranteed to overlap.

To ensure there is no attempt to commit two membership changes at once by matching log positions (which would be unsafe since they should have different quorum requirements), any proposed membership change is simply disallowed while any uncommitted change appears in the leader's log.

This approach introduces a problem when removing a member from a two-member cluster: If one of the members dies before the other one receives the commit of the confchange entry, then the member cannot be removed any more since the cluster cannot make progress. For this reason it is highly recommended to use three or more nodes in every cluster.
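The two-member case follows from simple majority arithmetic, sketched below. The helper names quorum and tolerated are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// quorum returns the number of voters needed for a majority.
func quorum(voters int) int { return voters/2 + 1 }

// tolerated returns how many voters may fail while a majority remains.
func tolerated(voters int) int { return voters - quorum(voters) }

func main() {
	for _, n := range []int{1, 2, 3, 5} {
		fmt.Printf("voters=%d quorum=%d tolerated=%d\n", n, quorum(n), tolerated(n))
	}
}
```

With two voters the quorum is two, so no failure is tolerated. Because the removal entry is committed under the old two-member configuration, both members must stay alive until it commits.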

Documentation

Overview

Package raft sends and receives messages in the Protocol Buffer format defined in the raftpb package.

Raft is a protocol with which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log. For more details on Raft, see "In Search of an Understandable Consensus Algorithm" (https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.

A simple example application, raftexample, is also available to help illustrate how to use this package in practice: https://github.com/coreos/etcd/tree/master/contrib/raftexample

Usage

The primary object in raft is a Node. You either start a Node from scratch using raft.StartNode or start a Node from some initial state using raft.RestartNode.

To start a node from scratch:

storage := raft.NewMemoryStorage()
c := &raft.Config{
  ID:              0x01,
  ElectionTick:    10,
  HeartbeatTick:   1,
  Storage:         storage,
  MaxSizePerMsg:   4096,
  MaxInflightMsgs: 256,
}
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

To restart a node from previous state:

storage := raft.NewMemoryStorage()

// recover the in-memory storage from persistent
// snapshot, state and entries.
storage.ApplySnapshot(snapshot)
storage.SetHardState(state)
storage.Append(entries)

c := &raft.Config{
  ID:              0x01,
  ElectionTick:    10,
  HeartbeatTick:   1,
  Storage:         storage,
  MaxSizePerMsg:   4096,
  MaxInflightMsgs: 256,
}

// restart raft without peer information.
// peer information is already included in the storage.
n := raft.RestartNode(c)

Now that you are holding onto a Node you have a few responsibilities:

First, you must read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2.

1. Write HardState, Entries, and Snapshot to persistent storage if they are not empty. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded.

2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until the latest HardState and all Entries written by any previous Ready batch have been persisted to disk (Messages may be sent while entries from the same batch are being persisted). To reduce the I/O latency, an optimization can be applied to make the leader write to disk in parallel with its followers (as explained in section 10.2.1 of the Raft thesis). If any Message has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be large).

Note: Marshalling messages is not thread-safe; it is important that you make sure that no new entries are persisted while marshalling. The easiest way to achieve this is to serialise the messages directly inside your main raft loop.

3. Apply Snapshot (if any) and CommittedEntries to the state machine. If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange() to apply it to the node. The configuration change may be cancelled at this point by setting the NodeID field to zero before calling ApplyConfChange (but ApplyConfChange must be called one way or the other, and the decision to cancel must be based solely on the state machine and not external information such as the observed health of the node).

4. Call Node.Advance() to signal readiness for the next batch of updates. This may be done at any time after step 1, although all updates must be processed in the order they were returned by Ready.

Second, all persisted log entries must be made available via an implementation of the Storage interface. The provided MemoryStorage type can be used for this (if you repopulate its state upon a restart), or you can supply your own disk-backed implementation.

Third, when you receive a message from another node, pass it to Node.Step:

func recvRaftRPC(ctx context.Context, m raftpb.Message) {
	n.Step(ctx, m)
}

Finally, you need to call Node.Tick() at regular intervals (probably via a time.Ticker). Raft has two important timeouts: heartbeat and the election timeout. However, internally to the raft package time is represented by an abstract "tick".

The total state machine handling loop will look something like this:

for {
  select {
  case <-s.Ticker:
    n.Tick()
  case rd := <-s.Node.Ready():
    saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
    send(rd.Messages)
    if !raft.IsEmptySnap(rd.Snapshot) {
      processSnapshot(rd.Snapshot)
    }
    for _, entry := range rd.CommittedEntries {
      process(entry)
      if entry.Type == raftpb.EntryConfChange {
        var cc raftpb.ConfChange
        cc.Unmarshal(entry.Data)
        s.Node.ApplyConfChange(cc)
      }
    }
    s.Node.Advance()
  case <-s.done:
    return
  }
}

To propose changes to the state machine from your node take your application data, serialize it into a byte slice and call:

n.Propose(ctx, data)

If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; you may have to re-propose after a timeout.

To add or remove a node in a cluster, build a ConfChange struct 'cc' and call:

n.ProposeConfChange(ctx, cc)

After the config change is committed, a committed entry with type raftpb.EntryConfChange will be returned. You must apply it to the node through:

var cc raftpb.ConfChange
cc.Unmarshal(data)
n.ApplyConfChange(cc)

Note: An ID represents a unique node in a cluster for all time. A given ID MUST be used only once even if the old node has been removed. This means that for example IP addresses make poor node IDs since they may be reused. Node IDs must be non-zero.

Implementation notes

This implementation is up to date with the final Raft thesis (https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our implementation of the membership change protocol differs somewhat from that described in chapter 4. The key invariant that membership changes happen one node at a time is preserved, but in our implementation the membership change takes effect when its entry is applied, not when it is added to the log (so the entry is committed under the old membership instead of the new). This is equivalent in terms of safety, since the old and new configurations are guaranteed to overlap.

To ensure that we do not attempt to commit two membership changes at once by matching log positions (which would be unsafe since they should have different quorum requirements), we simply disallow any proposed membership change while any uncommitted change appears in the leader's log.

This approach introduces a problem when you try to remove a member from a two-member cluster: If one of the members dies before the other one receives the commit of the confchange entry, then the member cannot be removed any more since the cluster cannot make progress. For this reason it is highly recommended to use three or more nodes in every cluster.

MessageType

Package raft sends and receives messages in Protocol Buffer format (defined in the raftpb package). Each state (follower, candidate, leader) implements its own 'step' method ('stepFollower', 'stepCandidate', 'stepLeader') when advancing with the given raftpb.Message. Each step is determined by its raftpb.MessageType. Note that every step is checked by one common method 'Step' that safety-checks the terms of the node and the incoming message to prevent stale log entries:

'MsgHup' is used for election. If a node is a follower or candidate, the
'tick' function in 'raft' struct is set as 'tickElection'. If a follower or
candidate has not received any heartbeat before the election timeout, it
passes 'MsgHup' to its Step method and becomes (or remains) a candidate to
start a new election.

'MsgBeat' is an internal type that signals the leader to send a heartbeat of
the 'MsgHeartbeat' type. If a node is a leader, the 'tick' function in
the 'raft' struct is set as 'tickHeartbeat', and triggers the leader to
send periodic 'MsgHeartbeat' messages to its followers.

'MsgProp' proposes to append data to its log entries. This is a special
type to redirect proposals to leader. Therefore, send method overwrites
raftpb.Message's term with its HardState's term to avoid attaching its
local term to 'MsgProp'. When 'MsgProp' is passed to the leader's 'Step'
method, the leader first calls the 'appendEntry' method to append entries
to its log, and then calls 'bcastAppend' method to send those entries to
its peers. When passed to candidate, 'MsgProp' is dropped. When passed to
follower, 'MsgProp' is stored in follower's mailbox(msgs) by the send
method. It is stored with sender's ID and later forwarded to leader by
rafthttp package.

'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
back to follower, because it indicates that there is a valid leader sending
'MsgApp' messages. Candidate and follower respond to this message in
'MsgAppResp' type.

'MsgAppResp' is response to log replication request('MsgApp'). When
'MsgApp' is passed to candidate or follower's Step method, it responds by
calling 'handleAppendEntries' method, which sends 'MsgAppResp' to raft
mailbox.

'MsgVote' requests votes for election. When a node is a follower or
candidate and 'MsgHup' is passed to its Step method, then the node calls
'campaign' method to campaign itself to become a leader. Once 'campaign'
method is called, the node becomes candidate and sends 'MsgVote' to peers
in cluster to request votes. When passed to leader or candidate's Step
method and the message's Term is lower than leader's or candidate's,
'MsgVote' will be rejected ('MsgVoteResp' is returned with Reject true).
If leader or candidate receives 'MsgVote' with higher term, it will revert
back to follower. When 'MsgVote' is passed to follower, it votes for the
sender only when the sender's log is at least as up to date as its own:
the sender's last log term is greater than the follower's last log term,
or the terms are equal and the sender's last log index is greater than or
equal to the follower's.
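The log comparison behind a follower's vote can be sketched as follows. It uses the "at least as up to date" rule from section 5.4.1 of the Raft paper; the logPos type and upToDate function are hypothetical names for this example, not identifiers from the package.

```go
package main

import "fmt"

// logPos identifies the last entry of a node's log.
type logPos struct {
	Term  uint64
	Index uint64
}

// upToDate reports whether the candidate's log is at least as up to
// date as the voter's: a higher last term wins, and with equal last
// terms the longer (or equally long) log wins.
func upToDate(candidate, voter logPos) bool {
	return candidate.Term > voter.Term ||
		(candidate.Term == voter.Term && candidate.Index >= voter.Index)
}

func main() {
	voter := logPos{Term: 3, Index: 10}
	fmt.Println(upToDate(logPos{Term: 4, Index: 2}, voter))  // higher last term
	fmt.Println(upToDate(logPos{Term: 3, Index: 10}, voter)) // identical log position
	fmt.Println(upToDate(logPos{Term: 3, Index: 9}, voter))  // same term, shorter log
}
```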

'MsgVoteResp' contains responses from voting request. When 'MsgVoteResp' is
passed to candidate, the candidate calculates how many votes it has won. If
it has won a majority (quorum), it becomes leader and calls 'bcastAppend'.
If candidate receives a majority of denials, it reverts back to
follower.

'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election
protocol. When Config.PreVote is true, a pre-election is carried out first
(using the same rules as a regular election), and no node increases its term
number unless the pre-election indicates that the campaigning node would win.
This minimizes disruption when a partitioned node rejoins the cluster.

'MsgSnap' requests to install a snapshot message. When a node has just
become a leader or the leader receives 'MsgProp' message, it calls
'bcastAppend' method, which then calls 'sendAppend' method to each
follower. In 'sendAppend', if a leader fails to get term or entries,
the leader requests snapshot by sending 'MsgSnap' type message.

'MsgSnapStatus' tells the result of snapshot install message. When a
follower rejected 'MsgSnap', it indicates that the snapshot request with
'MsgSnap' failed because of network issues that caused the network layer
to fail to send the snapshot to the follower. The leader then considers
the follower's progress as probe. When 'MsgSnap' was not rejected, it
indicates that the snapshot succeeded and the leader sets follower's
progress to probe and resumes its log replication.

'MsgHeartbeat' sends heartbeat from leader. When 'MsgHeartbeat' is passed
to candidate and message's term is higher than candidate's, the candidate
reverts back to follower and updates its committed index from the one in
this heartbeat. And it sends the message to its mailbox. When
'MsgHeartbeat' is passed to follower's Step method and message's term is
higher than follower's, the follower updates its leaderID with the ID
from the message.

'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
is passed to leader's Step method, the leader knows which follower
responded. And only when the leader's last committed index is greater than
follower's Match index, the leader runs 'sendAppend' method.

'MsgUnreachable' tells that request(message) wasn't delivered. When
'MsgUnreachable' is passed to leader's Step method, the leader discovers
that the follower that sent this 'MsgUnreachable' is not reachable, often
indicating 'MsgApp' is lost. When follower's progress state is replicate,
the leader sets it back to probe.
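The progress transitions mentioned for 'MsgSnapStatus' and 'MsgUnreachable' can be summarised in a small sketch. The progressState type and the two handler functions are hypothetical; they only restate the transitions described in the text and are not the package's internal implementation.

```go
package main

import "fmt"

// progressState is a hypothetical model of how the leader tracks a follower.
type progressState int

const (
	stateProbe progressState = iota
	stateReplicate
	stateSnapshot
)

// onUnreachable models 'MsgUnreachable': a follower in the replicate
// state is moved back to probe; other states are left unchanged.
func onUnreachable(s progressState) progressState {
	if s == stateReplicate {
		return stateProbe
	}
	return s
}

// onSnapStatus models 'MsgSnapStatus': whether the snapshot failed or
// succeeded, a follower in the snapshot state goes back to probe.
func onSnapStatus(s progressState) progressState {
	if s == stateSnapshot {
		return stateProbe
	}
	return s
}

func main() {
	fmt.Println(onUnreachable(stateReplicate) == stateProbe)
	fmt.Println(onSnapStatus(stateSnapshot) == stateProbe)
}
```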

Constants

const None uint64 = 0

None is a placeholder node ID used when there is no leader.

Variables

var ErrCompacted = errors.New("requested index is unavailable due to compaction")

ErrCompacted is returned by Storage.Entries/Compact when a requested index is unavailable because it predates the last snapshot.

var ErrProposalQueueTimeout = errors.New("queue proposal timeout")

var ErrProposalQueueTooFull = errors.New("queue proposal too full")

var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot")

ErrSnapOutOfDate is returned by Storage.CreateSnapshot when a requested index is older than the existing snapshot.

var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unavailable")

ErrSnapshotTemporarilyUnavailable is returned by the Storage interface when the required snapshot is temporarily unavailable.

var ErrStepLocalMsg = errors.New("raft: cannot step raft local message")

ErrStepLocalMsg is returned when trying to step a local raft message.

var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")

ErrStepPeerNotFound is returned when trying to step a response message for which no peer is found in raft.prs for that node.

var (

	// ErrStopped is returned by methods on Nodes that have been stopped.
	ErrStopped = errors.New("raft: stopped")
)

var ErrUnavailable = errors.New("requested entry at index is unavailable")

ErrUnavailable is returned by Storage interface when the requested log entries are unavailable.

Functions

func DescribeEntry

func DescribeEntry(e pb.Entry, f EntryFormatter) string

DescribeEntry returns a concise human-readable description of an Entry for debugging.

func DescribeMessage

func DescribeMessage(m pb.Message, f EntryFormatter) string

DescribeMessage returns a concise human-readable description of a Message for debugging.

func IsEmptyHardState

func IsEmptyHardState(st pb.HardState) bool

IsEmptyHardState returns true if the given HardState is empty.

func IsEmptySnap

func IsEmptySnap(sp pb.Snapshot) bool

IsEmptySnap returns true if the given Snapshot is empty.

func IsFromLocalNodeMsg

func IsFromLocalNodeMsg(r *raft, m *pb.Message) bool

func IsLocalMsg

func IsLocalMsg(msgt pb.MessageType) bool

func IsResponseMsg

func IsResponseMsg(msgt pb.MessageType) bool

func MustSync

func MustSync(st, prevst pb.HardState, entsnum int) bool

MustSync returns true if the hard state and count of Raft entries indicate that a synchronous write to persistent storage is required.
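A sketch of the kind of check this describes is shown below. It assumes the rule used by the upstream etcd raft package, where a sync is needed when entries are written or when the term or vote changes; whether this fork implements exactly the same rule is an assumption. The hardState type and mustSyncSketch function are hypothetical stand-ins.

```go
package main

import "fmt"

// hardState is a hypothetical stand-in for pb.HardState.
type hardState struct {
	Term   uint64
	Vote   uint64
	Commit uint64
}

// mustSyncSketch reports whether a synchronous write is needed, assuming
// the rule "new entries, or a changed term or vote, require a sync".
// A change to Commit alone does not require one under this assumption.
func mustSyncSketch(st, prevst hardState, entsnum int) bool {
	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

func main() {
	prev := hardState{Term: 2, Vote: 1, Commit: 5}
	fmt.Println(mustSyncSketch(hardState{Term: 2, Vote: 1, Commit: 6}, prev, 0))
	fmt.Println(mustSyncSketch(hardState{Term: 3, Vote: 1, Commit: 5}, prev, 0))
	fmt.Println(mustSyncSketch(prev, prev, 4))
}
```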

func SetLogger

func SetLogger(l Logger)

Types

type BadgerStorage added in v0.6.0

type BadgerStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex
	// contains filtered or unexported fields
}

BadgerStorage implements the Storage interface backed by badger.

func NewBadgerStorage added in v0.6.0

func NewBadgerStorage(id uint64, gid uint32, dir string) (*BadgerStorage, error)

func (*BadgerStorage) Append added in v0.6.0

func (ms *BadgerStorage) Append(entries []pb.Entry) error

Append the new entries to storage. TODO (xiangli): ensure the entries are continuous and entries[0].Index > ms.entries[0].Index

func (*BadgerStorage) ApplySnapshot added in v0.6.0

func (ms *BadgerStorage) ApplySnapshot(snap pb.Snapshot) error

ApplySnapshot overwrites the contents of this Storage object with those of the given snapshot. It deletes all the entries up to the snapshot index but keeps the raft entry at the snapshot index, to make the logic easier to build, like the dummy entry in BadgerStorage.

func (*BadgerStorage) Close added in v0.6.0

func (ms *BadgerStorage) Close()

func (*BadgerStorage) Compact added in v0.6.0

func (ms *BadgerStorage) Compact(compactIndex uint64) error

Compact discards all log entries prior to compactIndex. It is the application's responsibility to not attempt to compact an index greater than raftLog.applied.
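The effect of compaction, together with the application-side guard on the applied index, can be sketched on a plain slice. This is not how BadgerStorage stores entries; the entry type, the compact function and the error value are hypothetical and serve only to illustrate the contract described above.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a hypothetical stand-in for a raft log entry.
type entry struct {
	Index uint64
	Term  uint64
}

var errCompactBeyondApplied = errors.New("compact index is greater than applied index")

// compact drops every entry whose Index is lower than compactIndex.
// The guard models the application's responsibility not to compact
// past the last applied index.
func compact(ents []entry, compactIndex, applied uint64) ([]entry, error) {
	if compactIndex > applied {
		return ents, errCompactBeyondApplied
	}
	i := 0
	for i < len(ents) && ents[i].Index < compactIndex {
		i++
	}
	return append([]entry(nil), ents[i:]...), nil
}

func main() {
	ents := []entry{{1, 1}, {2, 1}, {3, 2}, {4, 2}}
	kept, err := compact(ents, 3, 4)
	fmt.Println(kept, err)
	_, err = compact(ents, 5, 4)
	fmt.Println(err)
}
```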

func (*BadgerStorage) CreateSnapshot added in v0.6.0

func (ms *BadgerStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error)

CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and can be used to reconstruct the state at that point. If any configuration changes have been made since the last compaction, the result of the last ApplyConfChange must be passed in.

func (*BadgerStorage) Entries added in v0.6.0

func (ms *BadgerStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)

Entries implements the Storage interface.

func (*BadgerStorage) FirstIndex added in v0.6.0

func (ms *BadgerStorage) FirstIndex() (uint64, error)

FirstIndex implements the Storage interface.

func (*BadgerStorage) InitialState added in v0.6.0

func (ms *BadgerStorage) InitialState() (pb.HardState, pb.ConfState, error)

InitialState implements the Storage interface.

func (*BadgerStorage) LastIndex added in v0.6.0

func (ms *BadgerStorage) LastIndex() (uint64, error)

LastIndex implements the Storage interface.

func (*BadgerStorage) NumEntries added in v0.6.0

func (ms *BadgerStorage) NumEntries() (int, error)

NumEntries returns the number of entries in the database.

func (*BadgerStorage) SetHardState added in v0.6.0

func (ms *BadgerStorage) SetHardState(st pb.HardState) error

SetHardState saves the current HardState.

func (*BadgerStorage) Snapshot added in v0.6.0

func (ms *BadgerStorage) Snapshot() (pb.Snapshot, error)

Snapshot implements the Storage interface.

func (*BadgerStorage) Term added in v0.6.0

func (ms *BadgerStorage) Term(idx uint64) (uint64, error)

Term implements the Storage interface.

type CampaignType

type CampaignType string

CampaignType represents the type of campaigning. The type is a string instead of a uint64 because it is simpler to compare and to fill in raft entries.

type Config

type Config struct {
	// ID is the identity of the local raft. ID cannot be 0.
	ID uint64

	Group pb.Group

	// ElectionTick is the number of Node.Tick invocations that must pass between
	// elections. That is, if a follower does not receive any message from the
	// leader of current term before ElectionTick has elapsed, it will become
	// candidate and start an election. ElectionTick must be greater than
	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
	// unnecessary leader switching.
	ElectionTick int
	// HeartbeatTick is the number of Node.Tick invocations that must pass between
	// heartbeats. That is, a leader sends heartbeat messages to maintain its
	// leadership every HeartbeatTick ticks.
	HeartbeatTick int

	// Storage is the storage for raft. raft generates entries and states to be
	// stored in storage. raft reads the persisted entries and states out of
	// Storage when it needs. raft reads out the previous state and configuration
	// out of storage when restarting.
	Storage Storage
	// Applied is the last applied index. It should only be set when restarting
	// raft. raft will not return entries to the application smaller than or equal to
	// Applied. If Applied is unset when restarting, raft might return previous
	// applied entries. This is a very application dependent configuration.
	Applied uint64

	// MaxSizePerMsg limits the max size of each append message. Smaller value
	// lowers the raft recovery cost(initial probing and message lost during normal
	// operation). On the other side, it might affect the throughput during normal
	// replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per
	// message.
	MaxSizePerMsg uint64
	// MaxCommittedSizePerReady limits the size of the committed entries which
	// can be applied.
	MaxCommittedSizePerReady uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during
	// optimistic replication phase. The application transportation layer usually
	// has its own sending buffer over TCP/UDP. Set MaxInflightMsgs to avoid
	// overflowing that sending buffer. TODO (xiangli): feedback to application to
	// limit the proposal rate?
	MaxInflightMsgs int

	// CheckQuorum specifies if the leader should check quorum activity. Leader
	// steps down when quorum is not active for an electionTimeout.
	CheckQuorum bool

	// PreVote enables the Pre-Vote algorithm described in raft thesis section
	// 9.6. This prevents disruption when a node that has been partitioned away
	// rejoins the cluster.
	PreVote bool

	// ReadOnlyOption specifies how the read only request is processed.
	//
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	//
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
	ReadOnlyOption ReadOnlyOption

	// Logger is the logger used for raft logging. For a multinode setup, which
	// can host multiple raft groups, each raft group can have its own logger.
	Logger Logger

	// DisableProposalForwarding set to true means that followers will drop
	// proposals, rather than forwarding them to the leader. One use case for
	// this feature would be in a situation where the Raft leader is used to
	// compute the data of a proposal, for example, adding a timestamp from a
	// hybrid logical clock to data in a monotonically increasing way. Forwarding
	// should be disabled to prevent a follower with an inaccurate hybrid
	// logical clock from assigning the timestamp and then forwarding the data
	// to the leader.
	DisableProposalForwarding bool
	// contains filtered or unexported fields
}

Config contains the parameters to start a raft.
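
The constraints stated in the field comments (ElectionTick greater than HeartbeatTick, CheckQuorum required for ReadOnlyLeaseBased) can be checked before a node is started. The following is a minimal sketch, not the library's own validation: `tickConfig`, `readOnlyOption`, and `validateTicks` are hypothetical names that mirror a few Config fields so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// readOnlyOption mirrors the two ReadOnlyOption values for illustration only.
type readOnlyOption int

const (
	readOnlySafe readOnlyOption = iota
	readOnlyLeaseBased
)

// tickConfig is a hypothetical stand-in for the Config fields checked here.
type tickConfig struct {
	ElectionTick   int
	HeartbeatTick  int
	CheckQuorum    bool
	ReadOnlyOption readOnlyOption
}

// validateTicks enforces the two constraints stated in the Config comments.
func validateTicks(c tickConfig) error {
	if c.ElectionTick <= c.HeartbeatTick {
		return errors.New("ElectionTick must be greater than HeartbeatTick")
	}
	if c.ReadOnlyOption == readOnlyLeaseBased && !c.CheckQuorum {
		return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
	}
	return nil
}

func main() {
	// The suggested ratio: ElectionTick = 10 * HeartbeatTick.
	suggested := tickConfig{ElectionTick: 10, HeartbeatTick: 1}
	fmt.Println(validateTicks(suggested))

	// Lease-based reads without CheckQuorum are rejected.
	lease := tickConfig{ElectionTick: 10, HeartbeatTick: 1, ReadOnlyOption: readOnlyLeaseBased}
	fmt.Println(validateTicks(lease))
}
```

With a tick driven every 100ms by the application, the suggested ratio would give a heartbeat every 100ms and an election timeout of about one second; the tick interval itself is chosen by the caller and is not part of Config.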

type DefaultLogger

type DefaultLogger struct {
	*log.Logger
	// contains filtered or unexported fields
}

DefaultLogger is a default implementation of the Logger interface.

func (*DefaultLogger) Debug

func (l *DefaultLogger) Debug(v ...interface{})

func (*DefaultLogger) Debugf

func (l *DefaultLogger) Debugf(format string, v ...interface{})

func (*DefaultLogger) EnableDebug

func (l *DefaultLogger) EnableDebug()

func (*DefaultLogger) EnableTimestamps

func (l *DefaultLogger) EnableTimestamps()

func (*DefaultLogger) Error

func (l *DefaultLogger) Error(v ...interface{})

func (*DefaultLogger) Errorf

func (l *DefaultLogger) Errorf(format string, v ...interface{})

func (*DefaultLogger) Fatal

func (l *DefaultLogger) Fatal(v ...interface{})

func (*DefaultLogger) Fatalf

func (l *DefaultLogger) Fatalf(format string, v ...interface{})

func (*DefaultLogger) Info

func (l *DefaultLogger) Info(v ...interface{})

func (*DefaultLogger) Infof

func (l *DefaultLogger) Infof(format string, v ...interface{})

func (*DefaultLogger) Panic

func (l *DefaultLogger) Panic(v ...interface{})

func (*DefaultLogger) Panicf

func (l *DefaultLogger) Panicf(format string, v ...interface{})

func (*DefaultLogger) Warning

func (l *DefaultLogger) Warning(v ...interface{})

func (*DefaultLogger) Warningf

func (l *DefaultLogger) Warningf(format string, v ...interface{})

type EntryFormatter

type EntryFormatter func([]byte) string

EntryFormatter can be implemented by the application to provide human-readable formatting of entry data. Nil is a valid EntryFormatter and will use a default format.
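
As an illustration, an application could supply a formatter that prints entry data as truncated hex. This is a sketch under assumptions: `hexFormatter` and `describe` are hypothetical helpers, the type is redeclared locally so the example is self-contained, and the `%q` fallback for a nil formatter is an assumption for illustration rather than a statement about the library's default format.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// EntryFormatter is redeclared here with the same signature as in this package.
type EntryFormatter func([]byte) string

// hexFormatter renders entry data as hex, showing at most the first 8 bytes.
func hexFormatter(data []byte) string {
	const maxBytes = 8
	if len(data) > maxBytes {
		return hex.EncodeToString(data[:maxBytes]) + "..."
	}
	return hex.EncodeToString(data)
}

// describe applies f, falling back to a quoted string when f is nil.
// The fallback format is an assumption made for this example.
func describe(f EntryFormatter, data []byte) string {
	if f == nil {
		return fmt.Sprintf("%q", data)
	}
	return f(data)
}

func main() {
	payload := []byte("put key value")
	fmt.Println(describe(hexFormatter, payload))
	fmt.Println(describe(nil, payload))
}
```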

type IExtRaftStorage added in v0.6.0

type IExtRaftStorage interface {
	Storage
	// Close closes the Storage and performs finalization.
	Close()
	ApplySnapshot(pb.Snapshot) error
	SetHardState(pb.HardState) error
	CreateSnapshot(uint64, *pb.ConfState, []byte) (pb.Snapshot, error)
	Compact(uint64) error
	Append([]pb.Entry) error
}

func NewMemoryStorage

func NewMemoryStorage() IExtRaftStorage

NewMemoryStorage creates a default MemoryStorage. It should only be used in tests.

type Logger

type Logger interface {
	Debug(v ...interface{})
	Debugf(format string, v ...interface{})

	Error(v ...interface{})
	Errorf(format string, v ...interface{})

	Info(v ...interface{})
	Infof(format string, v ...interface{})

	Warning(v ...interface{})
	Warningf(format string, v ...interface{})

	Fatal(v ...interface{})
	Fatalf(format string, v ...interface{})

	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
}
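
An application that wants its own log format can satisfy this interface with a thin wrapper around the standard library logger. The sketch below is one possible shape, not the package's DefaultLogger: `levelLogger`, `raftLogger`, and `capture` are hypothetical names, and the interface is repeated locally only so that the example compiles by itself.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// raftLogger repeats the method set of the Logger interface for this example.
type raftLogger interface {
	Debug(v ...interface{})
	Debugf(format string, v ...interface{})
	Error(v ...interface{})
	Errorf(format string, v ...interface{})
	Info(v ...interface{})
	Infof(format string, v ...interface{})
	Warning(v ...interface{})
	Warningf(format string, v ...interface{})
	Fatal(v ...interface{})
	Fatalf(format string, v ...interface{})
	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
}

// levelLogger is a hypothetical adapter. Embedding *log.Logger supplies Fatal,
// Fatalf, Panic, and Panicf; the remaining methods add a level prefix.
type levelLogger struct {
	*log.Logger
	debug bool // Debug and Debugf are dropped unless this is set
}

// emit writes one prefixed line. Calldepth 3 attributes the line to the
// caller of Info, Error, and so on when file/line flags are enabled.
func (l *levelLogger) emit(level, msg string) {
	l.Output(3, level+": "+msg)
}

func (l *levelLogger) Debug(v ...interface{}) {
	if l.debug {
		l.emit("DEBUG", fmt.Sprint(v...))
	}
}

func (l *levelLogger) Debugf(format string, v ...interface{}) {
	if l.debug {
		l.emit("DEBUG", fmt.Sprintf(format, v...))
	}
}

func (l *levelLogger) Info(v ...interface{})    { l.emit("INFO", fmt.Sprint(v...)) }
func (l *levelLogger) Warning(v ...interface{}) { l.emit("WARN", fmt.Sprint(v...)) }
func (l *levelLogger) Error(v ...interface{})   { l.emit("ERROR", fmt.Sprint(v...)) }

func (l *levelLogger) Infof(format string, v ...interface{}) {
	l.emit("INFO", fmt.Sprintf(format, v...))
}

func (l *levelLogger) Warningf(format string, v ...interface{}) {
	l.emit("WARN", fmt.Sprintf(format, v...))
}

func (l *levelLogger) Errorf(format string, v ...interface{}) {
	l.emit("ERROR", fmt.Sprintf(format, v...))
}

// Compile-time check that levelLogger provides the full method set.
var _ raftLogger = (*levelLogger)(nil)

// capture runs f against a logger that writes to memory and returns the output.
func capture(debug bool, f func(l raftLogger)) string {
	var buf bytes.Buffer
	f(&levelLogger{Logger: log.New(&buf, "", 0), debug: debug})
	return buf.String()
}

func main() {
	fmt.Print(capture(true, func(l raftLogger) {
		l.Infof("became leader at term %d", 3)
		l.Debug("sending heartbeat")
	}))
}
```

A value of this kind could be assigned to a field such as Config.Logger, giving each raft group its own prefix or destination.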

type MemoryStorage

type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex
	// contains filtered or unexported fields
}

MemoryStorage implements the Storage interface backed by an in-memory array.

func NewRealMemoryStorage added in v0.6.0

func NewRealMemoryStorage() *MemoryStorage

func (*MemoryStorage) Append

func (ms *MemoryStorage) Append(entries []pb.Entry) error

Append the new entries to storage. TODO (xiangli): ensure the entries are continuous and entries[0].Index > ms.entries[0].Index

func (*MemoryStorage) ApplySnapshot

func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error

ApplySnapshot overwrites the contents of this Storage object with those of the given snapshot.

func (*MemoryStorage) Close added in v0.6.0

func (ms *MemoryStorage) Close()

func (*MemoryStorage) Compact

func (ms *MemoryStorage) Compact(compactIndex uint64) error

Compact discards all log entries prior to compactIndex. It is the application's responsibility to not attempt to compact an index greater than raftLog.applied.

func (*MemoryStorage) CreateSnapshot

func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error)

CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and can be used to reconstruct the state at that point. If any configuration changes have been made since the last compaction, the result of the last ApplyConfChange must be passed in.

func (*MemoryStorage) Entries

func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)

Entries implements the Storage interface.

func (*MemoryStorage) FirstIndex

func (ms *MemoryStorage) FirstIndex() (uint64, error)

FirstIndex implements the Storage interface.

func (*MemoryStorage) InitialState

func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error)

InitialState implements the Storage interface.

func (*MemoryStorage) LastIndex

func (ms *MemoryStorage) LastIndex() (uint64, error)

LastIndex implements the Storage interface.

func (*MemoryStorage) SetHardState

func (ms *MemoryStorage) SetHardState(st pb.HardState) error

SetHardState saves the current HardState.

func (*MemoryStorage) Snapshot

func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error)

Snapshot implements the Storage interface.

func (*MemoryStorage) Term

func (ms *MemoryStorage) Term(i uint64) (uint64, error)

Term implements the Storage interface.

type MessageQueue added in v0.7.1

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue is the queue used to hold Raft messages.

func NewMessageQueue added in v0.7.1

func NewMessageQueue(size uint64, ch bool, lazyFreeCycle uint64) *MessageQueue

NewMessageQueue creates a new MessageQueue instance.

func (*MessageQueue) Add added in v0.7.1

func (q *MessageQueue) Add(msg raftpb.Message) (bool, bool)

Add adds the specified message to the queue.

func (*MessageQueue) AddSnapshot added in v0.7.1

func (q *MessageQueue) AddSnapshot(msg raftpb.Message) bool

AddSnapshot adds the specified snapshot to the queue.

func (*MessageQueue) Ch added in v0.7.1

func (q *MessageQueue) Ch() <-chan struct{}

Ch returns the notification channel.

func (*MessageQueue) Close added in v0.7.1

func (q *MessageQueue) Close()

Close closes the queue so no further messages can be added.

func (*MessageQueue) Get added in v0.7.1

func (q *MessageQueue) Get() []raftpb.Message

Get returns everything currently in the queue.

func (*MessageQueue) Notify added in v0.7.1

func (q *MessageQueue) Notify()

Notify notifies the notification channel listener that a new message is now available in the queue.

type Node

type Node interface {
	// Tick increments the internal logical clock for the Node by a single tick. Election
	// timeouts and heartbeat timeouts are in units of ticks.
	Tick() bool
	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
	Campaign(ctx context.Context) error
	// Propose proposes that data be appended to the log.
	Propose(ctx context.Context, data []byte) error
	// ProposeWithDrop proposes that data be appended to the log and calls cancel if the proposal is dropped.
	ProposeWithDrop(ctx context.Context, data []byte, cancel context.CancelFunc) error
	ProposeEntryWithDrop(ctx context.Context, e pb.Entry, cancel context.CancelFunc) error
	// ProposeConfChange proposes config change.
	// At most one ConfChange can be in the process of going through consensus.
	// Application needs to call ApplyConfChange when applying EntryConfChange type entry.
	ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
	Step(ctx context.Context, msg pb.Message) error

	// StepNode handles raft events and returns the current point-in-time state.
	// Users of the Node must call Advance after retrieving the state returned by StepNode.
	//
	// NOTE: No committed entries from the next Ready may be applied until all committed entries
	// and snapshots from the previous one have finished.
	StepNode(moreApplyEntries bool, busySnap bool) (Ready, bool)
	// EventNotifyCh is used to notify or receive the notify for the raft event
	EventNotifyCh() chan bool
	// NotifyEventCh will notify the raft loop event to check new event
	NotifyEventCh()

	ConfChangedCh() <-chan pb.ConfChange
	// HandleConfChanged handles a configuration change event.
	HandleConfChanged(cc pb.ConfChange)
	// Advance notifies the Node that the application has saved progress up to the last Ready.
	// It prepares the node to return the next available Ready.
	//
	// The application should generally call Advance after it applies the entries in last Ready.
	//
	// However, as an optimization, the application may call Advance while it is applying the
	// commands. For example, when the last Ready contains a snapshot, the application might take
	// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
	// progress, it can call Advance before finishing applying the last ready.
	Advance(rd Ready)
	// ApplyConfChange applies config change to the local node.
	// Returns an opaque ConfState protobuf which must be recorded
	// in snapshots. Will never return nil; it returns a pointer only
	// to match MemoryStorage.Compact.
	ApplyConfChange(cc pb.ConfChange) *pb.ConfState

	// TransferLeadership attempts to transfer leadership to the given transferee.
	TransferLeadership(ctx context.Context, lead, transferee uint64)

	// ReadIndex requests a read state. The read state will be set in the ready.
	// Read state has a read index. Once the application advances further than the read
	// index, any linearizable read requests issued before the read request can be
	// processed safely. The read state will have the same rctx attached.
	ReadIndex(ctx context.Context, rctx []byte) error

	// Status returns the current status of the raft state machine.
	Status() Status
	// ReportUnreachable reports the given node is not reachable for the last send.
	ReportUnreachable(id uint64, group pb.Group)
	// ReportSnapshot reports the status of the sent snapshot.
	ReportSnapshot(id uint64, group pb.Group, status SnapshotStatus)
	// Stop performs any necessary termination of the Node.
	Stop()
	DebugString() string
}

Node represents a node in a raft cluster.

Example
package main

import (
	pb "github.com/youzan/ZanRedisDB/raft/raftpb"
)

func applyToStore(ents []pb.Entry)    {}
func sendMessages(msgs []pb.Message)  {}
func saveStateToDisk(st pb.HardState) {}
func saveToDisk(ents []pb.Entry)      {}

func main() {
	c := &Config{}
	n := StartNode(c, nil, false)
	defer n.Stop()

	// stuff to n happens in other goroutines

	// the last known state
	var prev pb.HardState
	for {
		// Block until raft signals that there is a new event to process.
		<-n.EventNotifyCh()
		rd, hasEvent := n.StepNode(true, false)
		if !hasEvent {
			continue
		}
		if !isHardStateEqual(prev, rd.HardState) {
			saveStateToDisk(rd.HardState)
			prev = rd.HardState
		}

		saveToDisk(rd.Entries)
		go applyToStore(rd.CommittedEntries)
		sendMessages(rd.Messages)
		// Tell the node this Ready has been handled so it can prepare the next one.
		n.Advance(rd)
	}
}

func RestartNode

func RestartNode(c *Config) Node

RestartNode is similar to StartNode but does not take a list of peers. The current membership of the cluster will be restored from the Storage. If the caller has an existing state machine, pass in the last log index that has been applied to it; otherwise use zero.

func StartNode

func StartNode(c *Config, peers []Peer, isLearner bool) Node

StartNode returns a new Node given configuration and a list of raft peers. It appends a ConfChangeAddNode entry for each given peer to the initial log.

type Peer

type Peer struct {
	NodeID    uint64
	ReplicaID uint64
	Context   []byte
}

type Progress

type Progress struct {
	Match, Next uint64
	// State defines how the leader should interact with the follower.
	//
	// When in ProgressStateProbe, leader sends at most one replication message
	// per heartbeat interval. It also probes actual progress of the follower.
	//
	// When in ProgressStateReplicate, leader optimistically increases next
	// to the latest entry sent after sending replication message. This is
	// an optimized state for fast replicating log entries to the follower.
	//
	// When in ProgressStateSnapshot, leader should have sent out snapshot
	// before and stops sending any replication message.
	State ProgressStateType
	// Paused is used in ProgressStateProbe.
	// When Paused is true, raft should pause sending replication message to this peer.
	Paused bool
	// PendingSnapshot is used in ProgressStateSnapshot.
	// If there is a pending snapshot, the pendingSnapshot will be set to the
	// index of the snapshot. If pendingSnapshot is set, the replication process of
	// this Progress will be paused. raft will not resend the snapshot until the
	// pending one is reported to have failed.
	PendingSnapshot uint64

	// RecentActive is true if the progress is recently active. Receiving any messages
	// from the corresponding follower indicates the progress is active.
	// RecentActive can be reset to false after an election timeout.
	RecentActive bool

	// IsLearner is true if this progress is tracked for a learner.
	IsLearner bool
	// contains filtered or unexported fields
}

Progress represents a follower’s progress in the view of the leader. The leader maintains the progress of every follower and sends entries to each follower based on its progress.

func (*Progress) IsPaused

func (pr *Progress) IsPaused() bool

IsPaused returns whether sending log entries to this node has been paused. A node may be paused because it has rejected recent MsgApps, is currently waiting for a snapshot, or has reached the MaxInflightMsgs limit.
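
The three reasons for pausing line up with the three progress states. The sketch below models that mapping with a reduced, hypothetical `progress` type; the `Inflight` and `MaxInflight` fields are assumptions that stand in for the unexported in-flight window, and the code is not taken from the library.

```go
package main

import "fmt"

type progressState int

const (
	stateProbe progressState = iota
	stateReplicate
	stateSnapshot
)

// progress is a hypothetical, reduced model of Progress.
type progress struct {
	State       progressState
	Paused      bool // only meaningful in stateProbe
	Inflight    int  // append messages sent but not yet acknowledged
	MaxInflight int  // plays the role of MaxInflightMsgs
}

// isPaused reports whether the leader should hold back log entries.
func (p progress) isPaused() bool {
	switch p.State {
	case stateProbe:
		// Probing sends at most one message per heartbeat interval.
		return p.Paused
	case stateReplicate:
		// Optimistic replication stops once the in-flight window is full.
		return p.Inflight >= p.MaxInflight
	case stateSnapshot:
		// Nothing is replicated while a snapshot is pending.
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(progress{State: stateProbe, Paused: true}.isPaused())
	fmt.Println(progress{State: stateReplicate, Inflight: 2, MaxInflight: 4}.isPaused())
	fmt.Println(progress{State: stateSnapshot}.isPaused())
}
```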

func (*Progress) String

func (pr *Progress) String() string

type ProgressStateType

type ProgressStateType uint64
const (
	ProgressStateProbe ProgressStateType = iota
	ProgressStateReplicate
	ProgressStateSnapshot
)

func (ProgressStateType) String

func (st ProgressStateType) String() string

type ProgressType added in v0.6.0

type ProgressType byte

ProgressType indicates the type of replica a Progress corresponds to.

const (
	// ProgressTypePeer accompanies a Progress for a regular peer replica.
	ProgressTypePeer ProgressType = iota
	// ProgressTypeLearner accompanies a Progress for a learner replica.
	ProgressTypeLearner
)

type ProposalQueue added in v0.7.1

type ProposalQueue struct {
	// contains filtered or unexported fields
}

ProposalQueue is the queue used to hold Raft proposal messages.

func NewProposalQueue added in v0.7.1

func NewProposalQueue(size uint64, lazyFreeCycle uint64) *ProposalQueue

NewProposalQueue creates a new ProposalQueue instance.

func (*ProposalQueue) Add added in v0.7.1

func (q *ProposalQueue) Add(msg msgWithDrop) (bool, bool, chan struct{})

Add adds the specified message to the queue.

func (*ProposalQueue) AddWait added in v0.7.1

func (q *ProposalQueue) AddWait(ctx context.Context, msg msgWithDrop,
	to time.Duration, stopC chan struct{}) (bool, bool, error)

AddWait adds the specified message to the queue, waiting if the queue is full.

func (*ProposalQueue) Close added in v0.7.1

func (q *ProposalQueue) Close()

Close closes the queue so no further messages can be added.

func (*ProposalQueue) Get added in v0.7.1

func (q *ProposalQueue) Get() []msgWithDrop

Get returns everything currently in the queue.

type RawNode

type RawNode struct {
	// contains filtered or unexported fields
}

RawNode is a thread-unsafe Node. The methods of this struct correspond to the methods of Node and are described more fully there.

func NewRawNode

func NewRawNode(config *Config, peers []Peer) (*RawNode, error)

NewRawNode returns a new RawNode given configuration and a list of raft peers.

func (*RawNode) Advance

func (rn *RawNode) Advance(rd Ready)

Advance notifies the RawNode that the application has applied and saved progress in the last Ready results.

func (*RawNode) ApplyConfChange

func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState

ApplyConfChange applies a config change to the local node.

func (*RawNode) Campaign

func (rn *RawNode) Campaign() error

Campaign causes this RawNode to transition to candidate state.

func (*RawNode) HasReady

func (rn *RawNode) HasReady() bool

HasReady is called when a RawNode user needs to check whether any Ready is pending. The checking logic in this method should be consistent with Ready.containsUpdates().

func (*RawNode) Propose

func (rn *RawNode) Propose(data []byte) error

Propose proposes data be appended to the raft log.

func (*RawNode) ProposeConfChange

func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error

ProposeConfChange proposes a config change.

func (*RawNode) ReadIndex

func (rn *RawNode) ReadIndex(rctx []byte)

ReadIndex requests a read state. The read state will be set in ready. Read State has a read index. Once the application advances further than the read index, any linearizable read requests issued before the read request can be processed safely. The read state will have the same rctx attached.
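
On the application side, serving a read then comes down to waiting until the applied index has caught up with the read index from the returned ReadState. The following sketch shows only that waiting step. `applyTracker` is a hypothetical helper, and treating "applied index has reached the read index" as sufficient is an assumption of this sketch; an application that wants the stricter reading of the comment can compare with a strict inequality instead.

```go
package main

import (
	"fmt"
	"sync"
)

// applyTracker is a hypothetical helper that records the applied index and
// lets readers wait for it to reach a read index.
type applyTracker struct {
	mu      sync.Mutex
	cond    *sync.Cond
	applied uint64
}

func newApplyTracker() *applyTracker {
	t := &applyTracker{}
	t.cond = sync.NewCond(&t.mu)
	return t
}

// setApplied is called by the apply loop once entries up to i are applied.
func (t *applyTracker) setApplied(i uint64) {
	t.mu.Lock()
	if i > t.applied {
		t.applied = i
	}
	t.mu.Unlock()
	t.cond.Broadcast()
}

// canServe reports whether a read tagged with readIndex may be served now.
func (t *applyTracker) canServe(readIndex uint64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.applied >= readIndex
}

// waitApplied blocks until the applied index has reached readIndex.
func (t *applyTracker) waitApplied(readIndex uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for t.applied < readIndex {
		t.cond.Wait()
	}
}

func main() {
	tracker := newApplyTracker()
	tracker.setApplied(5)

	// Suppose a ReadState with Index 7 came back for our request context.
	const readIndex = 7
	fmt.Println("can serve before apply:", tracker.canServe(readIndex))

	done := make(chan struct{})
	go func() {
		tracker.waitApplied(readIndex)
		close(done)
	}()
	tracker.setApplied(7) // the apply loop catches up
	<-done
	fmt.Println("can serve after apply:", tracker.canServe(readIndex))
}
```

Matching a ReadState to the request that triggered it is done through the rctx bytes, which this sketch leaves out.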

func (*RawNode) Ready

func (rn *RawNode) Ready() Ready

Ready returns the current point-in-time state of this RawNode.

func (*RawNode) ReportSnapshot

func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus)

ReportSnapshot reports the status of the sent snapshot.

func (*RawNode) ReportUnreachable

func (rn *RawNode) ReportUnreachable(id uint64)

ReportUnreachable reports the given node is not reachable for the last send.

func (*RawNode) Status

func (rn *RawNode) Status() *Status

Status returns the current status of the given group.

func (*RawNode) StatusWithoutProgress added in v0.6.0

func (rn *RawNode) StatusWithoutProgress() Status

StatusWithoutProgress returns a Status without populating the Progress field (and returns the Status as a value to avoid forcing it onto the heap). This is more performant if the Progress is not required. See WithProgress for an allocation-free way to introspect the Progress.

func (*RawNode) Step

func (rn *RawNode) Step(m pb.Message) error

Step advances the state machine using the given message.

func (*RawNode) Tick

func (rn *RawNode) Tick()

Tick advances the internal logical clock by a single tick.

func (*RawNode) TickQuiesced

func (rn *RawNode) TickQuiesced()

TickQuiesced advances the internal logical clock by a single tick without performing any other state machine processing. It allows the caller to avoid periodic heartbeats and elections when all of the peers in a Raft group are known to be at the same state. Expected usage is to periodically invoke Tick or TickQuiesced depending on whether the group is "active" or "quiesced".

WARNING: Be very careful about using this method as it subverts the Raft state machine. You should probably be using Tick instead.
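
The expected usage pattern, choosing between Tick and TickQuiesced on each timer fire, can be sketched as below. The `ticker` interface, `countingTicker`, and `tickOnce` are hypothetical and exist only to make the example runnable; how an application decides that a group is quiesced is application-specific and not shown.

```go
package main

import "fmt"

// ticker is a hypothetical interface holding the two RawNode tick methods.
type ticker interface {
	Tick()
	TickQuiesced()
}

// countingTicker stands in for a RawNode and only counts calls.
type countingTicker struct {
	ticks, quiescedTicks int
}

func (c *countingTicker) Tick()         { c.ticks++ }
func (c *countingTicker) TickQuiesced() { c.quiescedTicks++ }

// tickOnce advances the logical clock once, choosing the method based on
// whether the application currently considers the group quiesced.
func tickOnce(t ticker, quiesced bool) {
	if quiesced {
		t.TickQuiesced()
		return
	}
	t.Tick()
}

func main() {
	c := &countingTicker{}
	for i := 0; i < 5; i++ {
		// In this example the group goes quiet after the third tick.
		tickOnce(c, i >= 3)
	}
	fmt.Println("active ticks:", c.ticks, "quiesced ticks:", c.quiescedTicks)
}
```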

func (*RawNode) TransferLeader

func (rn *RawNode) TransferLeader(transferee uint64)

TransferLeader tries to transfer leadership to the given transferee.

func (*RawNode) WithProgress added in v0.6.0

func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress))

WithProgress is a helper to introspect the Progress for this node and its peers.

type ReadOnlyOption

type ReadOnlyOption int
const (
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	ReadOnlySafe ReadOnlyOption = iota
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	ReadOnlyLeaseBased
)

type ReadState

type ReadState struct {
	Index      uint64
	RequestCtx []byte
}

type Ready

type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned ReadState is only valid for the request that requested the read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry
	// Whether there are more committed entries ready to be applied.
	MoreCommittedEntries bool
	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}

Ready encapsulates the entries and messages that are ready to read, be saved to stable storage, committed or sent to other peers. All fields in Ready are read-only.
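
The ordering rules in the field comments (persist HardState and Entries before sending Messages, sync when MustSync is set) can be traced with a small model. This is a sketch under assumptions: `ready` is a hypothetical, reduced stand-in for Ready with string payloads, and `handleReady` only records the order of steps rather than performing IO. The field comments require persistence before sending; placing the apply step after the send step is a choice made for this sketch.

```go
package main

import "fmt"

// ready is a hypothetical, reduced model of Ready. Entries and messages are
// plain strings so the example stays self-contained.
type ready struct {
	HardStateChanged bool
	Entries          []string
	CommittedEntries []string
	Messages         []string
	MustSync         bool
}

// handleReady returns the steps in the order this sketch performs them.
func handleReady(rd ready) []string {
	var steps []string
	// 1. Persist state and new entries before any message leaves the node.
	if rd.HardStateChanged {
		steps = append(steps, "save hard state")
	}
	if len(rd.Entries) > 0 {
		steps = append(steps, fmt.Sprintf("append %d entries", len(rd.Entries)))
	}
	// 2. Only force a synchronous write when raft asks for it.
	if rd.MustSync {
		steps = append(steps, "fsync")
	}
	// 3. Messages go out after the writes above.
	if len(rd.Messages) > 0 {
		steps = append(steps, fmt.Sprintf("send %d messages", len(rd.Messages)))
	}
	// 4. Apply committed entries to the state machine.
	if len(rd.CommittedEntries) > 0 {
		steps = append(steps, fmt.Sprintf("apply %d committed entries", len(rd.CommittedEntries)))
	}
	// 5. Signal that this Ready has been handled.
	return append(steps, "advance")
}

func main() {
	rd := ready{
		HardStateChanged: true,
		Entries:          []string{"e5", "e6"},
		CommittedEntries: []string{"e4"},
		Messages:         []string{"MsgApp to 2", "MsgApp to 3"},
		MustSync:         true,
	}
	for i, s := range handleReady(rd) {
		fmt.Printf("%d. %s\n", i+1, s)
	}
}
```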

type RocksStorage added in v0.6.0

type RocksStorage struct {
	// Protects access to all fields. Most methods of RocksStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex
	// contains filtered or unexported fields
}

RocksStorage implements the Storage interface backed by rocksdb.

func NewRocksStorage added in v0.6.0

func NewRocksStorage(id uint64, gid uint32, shared bool, db engine.KVEngine) *RocksStorage

func (*RocksStorage) Append added in v0.6.0

func (ms *RocksStorage) Append(entries []pb.Entry) error

Append the new entries to storage.

func (*RocksStorage) ApplySnapshot added in v0.6.0

func (ms *RocksStorage) ApplySnapshot(snap pb.Snapshot) error

ApplySnapshot overwrites the contents of this Storage object with those of the given snapshot. It deletes all entries up to the snapshot index but keeps the raft entry at the snapshot index, which simplifies the logic, like the dummy entry in RocksStorage.

func (*RocksStorage) Close added in v0.6.0

func (ms *RocksStorage) Close()

func (*RocksStorage) Compact added in v0.6.0

func (ms *RocksStorage) Compact(compactIndex uint64) error

Compact discards all log entries prior to compactIndex. It is the application's responsibility to not attempt to compact an index greater than raftLog.applied.

func (*RocksStorage) CreateSnapshot added in v0.6.0

func (ms *RocksStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error)

CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and can be used to reconstruct the state at that point. If any configuration changes have been made since the last compaction, the result of the last ApplyConfChange must be passed in.

func (*RocksStorage) Eng added in v0.6.0

func (ms *RocksStorage) Eng() engine.KVEngine

func (*RocksStorage) Entries added in v0.6.0

func (ms *RocksStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)

Entries implements the Storage interface.

func (*RocksStorage) FirstIndex added in v0.6.0

func (ms *RocksStorage) FirstIndex() (uint64, error)

FirstIndex implements the Storage interface.

func (*RocksStorage) InitialState added in v0.6.0

func (ms *RocksStorage) InitialState() (pb.HardState, pb.ConfState, error)

InitialState implements the Storage interface.

func (*RocksStorage) LastIndex added in v0.6.0

func (ms *RocksStorage) LastIndex() (uint64, error)

LastIndex implements the Storage interface.

func (*RocksStorage) NumEntries added in v0.6.0

func (ms *RocksStorage) NumEntries() (int, error)

NumEntries returns the number of all entries in the db.

func (*RocksStorage) SetHardState added in v0.6.0

func (ms *RocksStorage) SetHardState(st pb.HardState) error

SetHardState saves the current HardState.

func (*RocksStorage) Snapshot added in v0.6.0

func (ms *RocksStorage) Snapshot() (pb.Snapshot, error)

Snapshot implements the Storage interface.

func (*RocksStorage) Term added in v0.6.0

func (ms *RocksStorage) Term(idx uint64) (uint64, error)

Term implements the Storage interface.

type SnapshotStatus

type SnapshotStatus int
const (
	SnapshotFinish  SnapshotStatus = 1
	SnapshotFailure SnapshotStatus = 2
)

type SoftState

type SoftState struct {
	Lead      uint64 // must use atomic operations to access; keep 64-bit aligned.
	RaftState StateType
}

SoftState provides state that is useful for logging and debugging. The state is volatile and does not need to be persisted to the WAL.

type StateType

type StateType uint64

StateType represents the role of a node in a cluster.

const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
	StatePreCandidate
)

Possible values for StateType.

func (StateType) MarshalJSON

func (st StateType) MarshalJSON() ([]byte, error)

func (StateType) String

func (st StateType) String() string

type Status

type Status struct {
	ID uint64

	pb.HardState
	SoftState

	Applied        uint64
	Progress       map[uint64]Progress
	LeadTransferee uint64
}

func (Status) MarshalJSON

func (s Status) MarshalJSON() ([]byte, error)

MarshalJSON translates the raft status into JSON. TODO: try to simplify this by introducing ID type into raft

func (Status) String

func (s Status) String() string

type Storage

type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
}

Storage is an interface that may be implemented by the application to retrieve log entries from storage.

If any Storage method returns an error, the raft instance will become inoperable and refuse to participate in elections; the application is responsible for cleanup and recovery in this case.
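
The index conventions in the interface comments ([lo,hi) ranges, a maxSize limit that still yields at least one entry, and a term kept for the entry just before FirstIndex) are easiest to see on a small in-memory log. The sketch below is illustrative only: `memLog`, `entry`, `sampleLog`, and both error values are hypothetical, and entry size is approximated by `len(Data)`, which is an assumption of this example.

```go
package main

import (
	"errors"
	"fmt"
)

type entry struct {
	Index, Term uint64
	Data        []byte
}

// memLog is a hypothetical, reduced in-memory log. ents[0] is a dummy entry
// that records the index and term of the last compacted entry.
type memLog struct {
	ents []entry
}

var (
	errCompacted   = errors.New("requested index is unavailable due to compaction")
	errUnavailable = errors.New("requested entry is unavailable")
)

func (m *memLog) firstIndex() uint64 { return m.ents[0].Index + 1 }
func (m *memLog) lastIndex() uint64  { return m.ents[0].Index + uint64(len(m.ents)) - 1 }

// entries returns the entries in [lo, hi), limited to maxSize bytes of Data
// but always including at least one entry when the range is non-empty.
func (m *memLog) entries(lo, hi, maxSize uint64) ([]entry, error) {
	if lo < m.firstIndex() {
		return nil, errCompacted
	}
	if lo > hi || hi > m.lastIndex()+1 {
		return nil, errUnavailable
	}
	offset := m.ents[0].Index
	out := m.ents[lo-offset : hi-offset]
	var size uint64
	for i, e := range out {
		size += uint64(len(e.Data))
		if i > 0 && size > maxSize {
			return out[:i], nil
		}
	}
	return out, nil
}

// term returns the term of entry i, valid for [firstIndex()-1, lastIndex()].
func (m *memLog) term(i uint64) (uint64, error) {
	offset := m.ents[0].Index
	if i < offset {
		return 0, errCompacted
	}
	if i > m.lastIndex() {
		return 0, errUnavailable
	}
	return m.ents[i-offset].Term, nil
}

// sampleLog holds entries 4..6 after a compaction at index 3.
func sampleLog() *memLog {
	return &memLog{ents: []entry{
		{Index: 3, Term: 1}, // dummy entry
		{Index: 4, Term: 1, Data: []byte("aa")},
		{Index: 5, Term: 2, Data: []byte("bbb")},
		{Index: 6, Term: 2, Data: []byte("c")},
	}}
}

func main() {
	m := sampleLog()
	fmt.Println("first:", m.firstIndex(), "last:", m.lastIndex())

	ents, err := m.entries(4, 7, 0) // maxSize 0 still yields one entry
	fmt.Println(len(ents), err)

	t, err := m.term(3) // the term before firstIndex is retained
	fmt.Println(t, err)

	_, err = m.entries(3, 5, 100) // index 3 has been compacted away
	fmt.Println(err)
}
```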
