raft

v0.0.0-...-002c195
Published: May 13, 2023 License: MIT Imports: 16 Imported by: 0

README

Raft

Raft is a well-known consensus algorithm used in distributed systems where fault tolerance and data consensus are important. This is a working implementation of that algorithm that lets clients inject their own systems alongside the raft, whether that is a key-value store or something more complicated.

At the center of this project's development was the raft paper, which thoroughly describes the intricacies of the algorithm and its many sub-problems. Time was spent researching and building a solid understanding of how each problem should be solved before coding the solution.

NOTE: This implementation is purely for educational purposes so that I could continue learning more about distributed systems. It is not intended for production use. If raft is required for production, HashiCorp's library is encouraged instead.

How It's Used

This raft implementation is designed with flexibility in mind, providing a platform to inject your own finite state machine (FSM) and your own persistent storage.

In the example use-case, we create a key-value store that implements the FSM interface. Whenever the raft node needs to apply a task to the FSM, it does so through the methods of that interface.

// kvStore type implements all the methods required for the FSM.
type kvStore struct {
	r    *raft.Raft
	data map[string]string
}
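
As a rough sketch of what implementing the FSM interface could look like for a key-value store, the block below redeclares the FSM interface locally so it compiles on its own. The `setCmd` JSON command format, the `newKVStore` helper, and the mutex are assumptions made for illustration; the real example store also holds a `*raft.Raft` field, which is left out here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// FSM mirrors the interface documented by this package. It is redeclared
// here only so that the sketch compiles without importing the module.
type FSM interface {
	Apply(data []byte) error
	Snapshot() ([]byte, error)
	Restore(cmd []byte) error
}

// setCmd is a hypothetical command format: one key/value pair encoded as JSON.
type setCmd struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

// kvStore is a simplified stand-in for the example store.
type kvStore struct {
	mu   sync.Mutex
	data map[string]string
}

func newKVStore() *kvStore { return &kvStore{data: make(map[string]string)} }

// Apply decodes a committed command and writes it into the map.
func (kv *kvStore) Apply(data []byte) error {
	var c setCmd
	if err := json.Unmarshal(data, &c); err != nil {
		return fmt.Errorf("decode command: %w", err)
	}
	kv.mu.Lock()
	defer kv.mu.Unlock()
	kv.data[c.Key] = c.Value
	return nil
}

// Snapshot serializes the whole map as JSON.
func (kv *kvStore) Snapshot() ([]byte, error) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	return json.Marshal(kv.data)
}

// Restore replaces the map with the contents of a snapshot.
func (kv *kvStore) Restore(cmd []byte) error {
	restored := make(map[string]string)
	if err := json.Unmarshal(cmd, &restored); err != nil {
		return fmt.Errorf("decode snapshot: %w", err)
	}
	kv.mu.Lock()
	defer kv.mu.Unlock()
	kv.data = restored
	return nil
}

// Compile-time check that kvStore satisfies FSM.
var _ FSM = (*kvStore)(nil)

func main() {
	kv := newKVStore()
	if err := kv.Apply([]byte(`{"key":"a","value":"1"}`)); err != nil {
		fmt.Println("apply failed:", err)
		return
	}
	snap, err := kv.Snapshot()
	if err != nil {
		fmt.Println("snapshot failed:", err)
		return
	}
	fmt.Println(string(snap))
}
```

Encoding commands as JSON keeps the sketch short; any encoding works as long as Apply and the code that builds the commands agree on it.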

The client also needs to provide their own implementations of the LogStore and StableStore interfaces. The example use-case uses the provided in-memory store; however, it's important to note that in-memory solutions are NOT meant for actual production use.

memStore := raft.NewMemStore()
// add some data to that memory store if needed

All raft nodes are part of a cluster. A cluster is a group of nodes and their associated addresses. A cluster can be initialized from a configuration file. Configuration files are JSON objects in which each key is a node ID and each value has the form {"id": int, "addr": string}:

{
  "1": {
    "id": 1,
    "addr": ":6001"
  }
}

Open the config file and use it to initialize a cluster.

f, err := os.Open("config.json")
if err != nil {
    log.Fatalln(err)
}
c, err := raft.NewClusterWithConfig(f)
if err != nil {
    log.Fatalln(err)
}
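
To make the config layout concrete, here is a minimal sketch of decoding it with encoding/json. `parseConfig` is a hypothetical helper and not part of this package, the `Node` type is redeclared locally, and the check that each key matches its "id" field is an extra assumption; NewClusterWithConfig may validate differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Node mirrors the Node type documented by this package.
type Node struct {
	ID   uint64 `json:"id"`
	Addr string `json:"addr"`
}

// parseConfig decodes a JSON object keyed by node ID into a map of nodes.
// Hypothetical helper for illustration only.
func parseConfig(raw []byte) (map[uint64]Node, error) {
	nodes := make(map[uint64]Node)
	// encoding/json converts the string keys ("1", "2", ...) into uint64.
	if err := json.Unmarshal(raw, &nodes); err != nil {
		return nil, fmt.Errorf("parse cluster config: %w", err)
	}
	// Assumption: reject a key that disagrees with the "id" field it maps to.
	for key, n := range nodes {
		if key != n.ID {
			return nil, fmt.Errorf("config key %d does not match id %d", key, n.ID)
		}
	}
	return nodes, nil
}

func main() {
	cfg := []byte(`{
	  "1": {"id": 1, "addr": ":6001"},
	  "2": {"id": 2, "addr": ":6002"},
	  "3": {"id": 3, "addr": ":6003"}
	}`)
	nodes, err := parseConfig(cfg)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(nodes), nodes[2].Addr)
}
```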

Once all of the dependencies are initialized, we can now create the raft node and start serving it on the cluster.

// fsm is your FSM implementation (for example, a *kvStore) and raftID is this node's ID.
r, err := raft.New(c, raftID, raft.DefaultOpts, fsm, memStore, memStore)
if err != nil {
	log.Fatalln(err)
}
go func() {
	if err := r.ListenAndServe(":6001"); err != nil {
		log.Println(err)
	}
}()

// regular operation of the application as the raft node runs in a different goroutine.

In Action (Demo)

At initial startup, all raft nodes will start their servers using a defined configuration file. In this example, there are three servers with IDs of 1-3. This startup demo shows how all three servers are capable of choosing a leader through an election.

[Demo recording: raft-startup]

A major part of the raft consensus algorithm is the ability for the majority of nodes to safely persist the same data on their finite state machines. This demo shows how the leader, when given a request, replicates the log and state across all other nodes.

[Demo recording: raft-data-populate]

Features

List of features that have been developed.

  • Raft cluster leader elections.
  • Log committing & replication.
  • Fault tolerance of up to (N-1)/2 failures, rounded down. (With N being the total # of nodes.)
  • Extendable log/stable store interface that lets the client define how they want the data to be persistently stored. (An In-Memory implementation is provided for testing.)
  • Log snapshots, which compact a given number of logs into one log entry. This enables state to be saved while also limiting the length of the log.
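
The fault-tolerance figure follows from majority arithmetic: a cluster keeps making progress only while a majority of its nodes is reachable. The sketch below works that out for a few cluster sizes. The `quorum` and `tolerated` helpers are illustrative names, not functions from this package, and whether the package's Quorum method uses exactly this formula is an assumption.

```go
package main

import "fmt"

// quorum returns the smallest number of nodes that forms a majority of n.
func quorum(n int) int { return n/2 + 1 }

// tolerated returns how many nodes may fail while a majority remains.
func tolerated(n int) int { return n - quorum(n) }

func main() {
	for _, n := range []int{1, 2, 3, 4, 5} {
		fmt.Printf("nodes=%d quorum=%d tolerated=%d\n", n, quorum(n), tolerated(n))
	}
}
```

Note that going from three nodes to four raises the quorum from two to three without raising the number of tolerated failures, which is why odd cluster sizes are the usual choice.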

Connect & Contact

Email - mathewestafanous13@gmail.com

Website - https://mathewestafanous.com

GitHub - https://github.com/Mathew-Estafanous

Documentation

Constants

const (
	Follower  raftState = 'F'
	Candidate raftState = 'C'
	Leader    raftState = 'L'
)

Variables

var (
	Entry    logType = 'E'
	Snapshot logType = 'S'
)
var (
	// ErrRaftShutdown is returned when a client operation has been issued after
	// the raft instance has shut down.
	ErrRaftShutdown = errors.New("raft has already shutdown")

	// DefaultOpts provide a general baseline configuration setting for the raft
	// node such as election timeouts and log threshold.
	DefaultOpts = Options{
		MinElectionTimeout: 150 * time.Millisecond,
		MaxElectionTimout:  300 * time.Millisecond,
		HeartBeatTimout:    100 * time.Millisecond,
		SnapshotTimer:      1 * time.Second,
		LogThreshold:       200,
	}

	SlowOpts = Options{
		MinElectionTimeout: 1 * time.Second,
		MaxElectionTimout:  3 * time.Second,
		HeartBeatTimout:    500 * time.Millisecond,
		SnapshotTimer:      8 * time.Second,
		LogThreshold:       5,
	}
)
var (
	ErrLogNotFound   = errors.New("the log could not be found in the storage")
	ErrFailedToStore = errors.New("new log failed to properly be stored")
)

Functions

This section is empty.

Types

type Cluster

type Cluster interface {
	GetNode(id uint64) (Node, error)
	AllNodes() map[uint64]Node
	Quorum() int
}

Cluster keeps track of all other nodes and their addresses. It also holds agreed-upon constants such as heartbeat time and election timeout.

type DynamicCluster

type DynamicCluster struct {
	// contains filtered or unexported fields
}

func NewDynamicCluster

func NewDynamicCluster(port uint16) (*DynamicCluster, error)

func (*DynamicCluster) AllNodes

func (c *DynamicCluster) AllNodes() map[uint64]Node

func (*DynamicCluster) GetNode

func (c *DynamicCluster) GetNode(id uint64) (Node, error)

func (*DynamicCluster) Join

func (c *DynamicCluster) Join(otherAddr string, raftNode Node) error

func (*DynamicCluster) Leave

func (c *DynamicCluster) Leave() error

func (*DynamicCluster) OnMembershipChange

func (c *DynamicCluster) OnMembershipChange(peer memlist.Node)

func (*DynamicCluster) Quorum

func (c *DynamicCluster) Quorum() int

type FSM

type FSM interface {
	// Apply will be invoked when a log has been successfully committed and
	// should then be applied upon the state of the fsm.
	Apply(data []byte) error

	// Snapshot will create a byte slice representation of all the required data
	// to represent the current state of the machine.
	Snapshot() ([]byte, error)

	// Restore the entire state of the FSM to a starting state.
	Restore(cmd []byte) error
}

FSM (finite state machine) defines an interface that must be implemented by the client to receive commands sent by the raft Cluster.

type InMemStore

type InMemStore struct {
	// contains filtered or unexported fields
}

InMemStore is an implementation of the StableStore and LogStore interface. Since it is in-memory, all data is lost on shutdown.

NOTE: This implementation is meant for testing and example use-cases and is NOT meant to be used in any production environment. It is up to the client to create the persistence store.

func NewMemStore

func NewMemStore() *InMemStore

func (*InMemStore) AllLogs

func (m *InMemStore) AllLogs() ([]*Log, error)

func (*InMemStore) AppendLogs

func (m *InMemStore) AppendLogs(logs []*Log) error

func (*InMemStore) DeleteRange

func (m *InMemStore) DeleteRange(min, max int64) error

func (*InMemStore) Get

func (m *InMemStore) Get(key []byte) ([]byte, error)

func (*InMemStore) GetLog

func (m *InMemStore) GetLog(index int64) (*Log, error)

func (*InMemStore) LastIndex

func (m *InMemStore) LastIndex() int64

func (*InMemStore) LastTerm

func (m *InMemStore) LastTerm() uint64

func (*InMemStore) Set

func (m *InMemStore) Set(key, value []byte) error

type LeaderError

type LeaderError struct {
	LeaderId   uint64
	LeaderAddr string
}

LeaderError is an error that is returned when a request that is only meant for the leader is sent to a follower or candidate.

func NewLeaderError

func NewLeaderError(id uint64, addr string) *LeaderError

func (*LeaderError) Error

func (l *LeaderError) Error() string
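
One way a client might react to a LeaderError is to pull the leader's address out of the error and retry there. The sketch below uses errors.As for that. The `LeaderError` type is a local stand-in with the documented fields, the message produced by its Error method is an assumption, and `leaderAddr` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// LeaderError is a stand-in carrying the documented fields.
type LeaderError struct {
	LeaderId   uint64
	LeaderAddr string
}

// Error formats the redirect hint. The wording is an assumption.
func (l *LeaderError) Error() string {
	return fmt.Sprintf("not the leader; leader is %d at %s", l.LeaderId, l.LeaderAddr)
}

// leaderAddr reports where to retry when err is, or wraps, a *LeaderError.
func leaderAddr(err error) (string, bool) {
	var le *LeaderError
	if errors.As(err, &le) {
		return le.LeaderAddr, true
	}
	return "", false
}

func main() {
	// Wrapping with %w keeps the LeaderError reachable through errors.As.
	err := fmt.Errorf("apply failed: %w", &LeaderError{LeaderId: 2, LeaderAddr: ":6002"})
	if addr, ok := leaderAddr(err); ok {
		fmt.Println("retry against", addr)
	}
}
```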

type Log

type Log struct {
	// Type is the kind of log that this represents.
	Type logType

	// Index represents the index in the list of log entries.
	Index int64

	// Term contains the election term it was added.
	Term uint64

	// Cmd represents the command applied to the FSM.
	Cmd []byte
}

Log entries represent commands that alter the state of the FSM. These entries are replicated across a majority of raft instances before being considered as committed.

func (Log) String

func (l Log) String() string

type LogStore

type LogStore interface {
	// LastIndex will return the index of the last log entry that has
	// been added to the log storage.
	LastIndex() int64

	// LastTerm will return the last log term found in the list of log entries.
	LastTerm() uint64

	// GetLog will return the log found at the given index. An error will
	// be returned if the index exceeds the maximum index in the log.
	//
	// If the index is less than the minimum index, then the log at minimum
	// index will be returned instead.
	GetLog(index int64) (*Log, error)

	// AllLogs retrieves every log entry in the store and returns the result.
	AllLogs() ([]*Log, error)

	// AppendLogs will add the slice of logs to the current list of log entries.
	AppendLogs(logs []*Log) error

	// DeleteRange will remove all log entries starting from the min index all
	// the way to the max index (inclusive).
	DeleteRange(min, max int64) error
}

LogStore defines how a raft's log persistence is handled and the required operations for log replication to be successful.
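
To illustrate the contract described in the comments above, here is a minimal slice-backed sketch of the six methods. `sliceStore` and `errLogNotFound` are hypothetical names, the `Log` type is trimmed to three fields, returning -1 from LastIndex on an empty store is an assumption, and the store is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Log is a trimmed stand-in for the package's Log type.
type Log struct {
	Index int64
	Term  uint64
	Cmd   []byte
}

var errLogNotFound = errors.New("the log could not be found in the storage")

// sliceStore keeps logs ordered by Index in a slice.
type sliceStore struct {
	logs []*Log
}

func (s *sliceStore) LastIndex() int64 {
	if len(s.logs) == 0 {
		return -1 // assumption: -1 marks an empty store
	}
	return s.logs[len(s.logs)-1].Index
}

func (s *sliceStore) LastTerm() uint64 {
	if len(s.logs) == 0 {
		return 0
	}
	return s.logs[len(s.logs)-1].Term
}

// GetLog returns an error above the maximum index and clamps to the
// oldest entry below the minimum index.
func (s *sliceStore) GetLog(index int64) (*Log, error) {
	if len(s.logs) == 0 || index > s.LastIndex() {
		return nil, errLogNotFound
	}
	if index < s.logs[0].Index {
		return s.logs[0], nil
	}
	for _, l := range s.logs {
		if l.Index == index {
			return l, nil
		}
	}
	return nil, errLogNotFound
}

// AllLogs returns a copy of the slice so callers cannot reorder the store.
func (s *sliceStore) AllLogs() ([]*Log, error) {
	return append([]*Log(nil), s.logs...), nil
}

func (s *sliceStore) AppendLogs(logs []*Log) error {
	s.logs = append(s.logs, logs...)
	return nil
}

// DeleteRange drops every entry with min <= Index <= max.
func (s *sliceStore) DeleteRange(min, max int64) error {
	kept := make([]*Log, 0, len(s.logs))
	for _, l := range s.logs {
		if l.Index < min || l.Index > max {
			kept = append(kept, l)
		}
	}
	s.logs = kept
	return nil
}

func main() {
	s := &sliceStore{}
	_ = s.AppendLogs([]*Log{{Index: 0, Term: 1}, {Index: 1, Term: 1}, {Index: 2, Term: 2}})
	_ = s.DeleteRange(0, 1) // inclusive on both ends
	fmt.Println(s.LastIndex(), s.LastTerm(), len(s.logs))
}
```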

type Node

type Node struct {
	// An ID that uniquely identifies the raft in the Cluster.
	ID uint64 `json:"id"`

	// Address of the node, that other rafts can contact.
	Addr string `json:"addr"`
}

type Options

type Options struct {
	// Range of possible timeouts for elections or for
	// no heartbeats from the leader.
	MinElectionTimeout time.Duration
	MaxElectionTimout  time.Duration

	// Set time between heart beats (append entries) that the leader
	// should send out.
	HeartBeatTimout time.Duration

	// SnapshotTimer is the period of time between the raft's attempts at making a
	// snapshot of the current state of the FSM. Although a snapshot is attempted periodically
	// it is not guaranteed that a snapshot will be completed unless the LogThreshold is met.
	SnapshotTimer time.Duration

	// LogThreshold represents the total number of log entries that should be reached
	// before log compaction (snapshot) is triggered.
	LogThreshold uint64
}

Options defines required constants that the raft will use while running.

This library provides some predefined options (DefaultOpts and SlowOpts) that can be used instead of defining your own configuration.
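
The election timeout is given as a range because Raft relies on randomized timeouts to keep nodes from all starting elections at the same moment. Assuming this package draws uniformly between MinElectionTimeout and MaxElectionTimout (the source does not show how the value is picked), the selection could be sketched as follows. `randomTimeout` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomTimeout picks a duration in [min, max). If the range is empty
// it falls back to min.
func randomTimeout(min, max time.Duration) time.Duration {
	if max <= min {
		return min
	}
	return min + time.Duration(rand.Int63n(int64(max-min)))
}

func main() {
	// Bounds taken from DefaultOpts.
	min, max := 150*time.Millisecond, 300*time.Millisecond
	for i := 0; i < 3; i++ {
		fmt.Println(randomTimeout(min, max))
	}
}
```

Keeping the heartbeat interval below the minimum election timeout, as both predefined option sets do, lets a healthy leader reset followers' timers before they expire.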

type Raft

type Raft struct {
	// contains filtered or unexported fields
}

Raft represents a node within the entire raft cluster. It contains the core logic of the consensus algorithm such as keeping track of leaders, replicated logs and other important state.

func New

func New(c Cluster, id uint64, opts Options, fsm FSM, logStr LogStore, stableStr StableStore) (*Raft, error)

New creates a new raft node and registers it with the provided Cluster.

func (*Raft) Apply

func (r *Raft) Apply(cmd []byte) Task

Apply takes a command and attempts to propagate it to the FSM and all other replicas in the raft Cluster. A Task is returned which can be used to wait on the completion of the task.

func (*Raft) ListenAndServe

func (r *Raft) ListenAndServe(addr string) error

ListenAndServe will start the raft instance and listen over TCP on the address provided as an argument. Serving the raft instance works the same way as Serve, so it is best to look into that method as well.

func (*Raft) Serve

func (r *Raft) Serve(l net.Listener) error

Serve (as the name suggests) will start up the raft instance and listen using the provided net.Listener.

This is a blocking operation and will only return when the raft instance has shut down or a fatal error has occurred.

func (*Raft) Shutdown

func (r *Raft) Shutdown()

type StableStore

type StableStore interface {
	Set(key, value []byte) error

	// Get returns the value related to that key. An empty slice is returned if
	// there is no value with that key found.
	Get(key []byte) ([]byte, error)
}

StableStore is used to provide persistence to vital information related to the raft's state.
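
A map-backed sketch shows how small this interface is, including the documented behavior of returning an empty slice for a missing key. `mapStore` and the "currentTerm" key are hypothetical, and like InMemStore this keeps everything in memory, so it only illustrates the contract and offers no real persistence.

```go
package main

import (
	"fmt"
	"sync"
)

// mapStore is a hypothetical StableStore backed by a map.
type mapStore struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newMapStore() *mapStore { return &mapStore{data: make(map[string][]byte)} }

// Set stores a copy of value so later changes by the caller do not leak in.
func (m *mapStore) Set(key, value []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[string(key)] = append([]byte(nil), value...)
	return nil
}

// Get returns an empty slice, not an error, when the key is missing.
func (m *mapStore) Get(key []byte) ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[string(key)]
	if !ok {
		return []byte{}, nil
	}
	return append([]byte(nil), v...), nil
}

func main() {
	s := newMapStore()
	_ = s.Set([]byte("currentTerm"), []byte("4"))
	term, _ := s.Get([]byte("currentTerm"))
	missing, _ := s.Get([]byte("votedFor"))
	fmt.Println(string(term), len(missing))
}
```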

type StaticCluster

type StaticCluster struct {

	// All the nodes within the raft Cluster. Key is a raft id.
	Nodes map[uint64]Node
	// contains filtered or unexported fields
}

StaticCluster is a static definition of all members of the cluster. As such new members cannot be dynamically discovered. All members must be known from the start.

func NewCluster

func NewCluster() *StaticCluster

NewCluster will create an entirely new static cluster that doesn't contain any nodes.

func NewClusterWithConfig

func NewClusterWithConfig(conf io.Reader) (*StaticCluster, error)

NewClusterWithConfig similarly creates a static cluster and adds all the nodes that are defined by the configuration reader. The config is expected to be in JSON format.

func (*StaticCluster) AllNodes

func (c *StaticCluster) AllNodes() map[uint64]Node

func (*StaticCluster) GetNode

func (c *StaticCluster) GetNode(id uint64) (Node, error)

func (*StaticCluster) Quorum

func (c *StaticCluster) Quorum() int

type Task

type Task interface {
	// Error is a blocking operation that will wait until the task has finished
	// before returning the result of the task.
	//
	// A non-nil error will be returned if the task failed to be committed.
	Error() error
}

Task represents an operation that has been sent to the raft Cluster. Every task represents a future operation that returns once the operation has been applied to the other raft replicas.

Directories

Path Synopsis
This is meant as a very simple example of how this raft implementation can be used to create a distributed KV store database.
