raft

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

README

raft-foiver

An implementation of the Raft consensus algorithm, used by the FOIVER system.

Usage

Use raft-foiver as a library rather than a framework.

Please refer to the usage example.

Credits

RAFT-FOIVER is inspired by etcd-io/raft, without which this project would not have been possible.

Blogs

License

RAFT-FOIVER is distributed under the Apache License 2.0. The licenses of third party dependencies of RAFT-FOIVER are explained here.

ECOLOGY



RAFT is Part of PROJECT: FOIVER

Documentation

Index

Constants

View Source
// Log levels accepted by SLogger, all of type slog.Level. The first four are
// the standard slog levels; LevelFatal and LevelPanic continue the scale
// above slog.LevelError.
const (
	LevelDebug = slog.LevelDebug
	LevelInfo  = slog.LevelInfo
	LevelWarn  = slog.LevelWarn
	LevelError = slog.LevelError
	LevelFatal = slog.Level(12)
	LevelPanic = slog.Level(16)
)

SLogger levels

View Source
// Outcomes of a vote tally: still pending, lost, or won.
// NOTE(review): no type or iota expression is visible in this excerpt, so the
// type and numeric values of these constants cannot be confirmed from here —
// presumably they continue an unexported leading constant of type VoteResult;
// verify against the source.
const (
	VotePending
	VoteLost
	VoteWon
)
View Source
// Sentinel int64 values: None is zero and Infinite is the largest int64.
const (
	None     = int64(0)
	Infinite = int64(math.MaxInt64)
)

None is a sentinel value; it should not be treated as simply the number zero.

View Source
// Name and Version identify this library and the release it was built from.
const (
	Name    = "raft-foiver"
	Version = "v0.1.1"
)

Variables

View Source
// Sentinel errors for bootstrapping: ErrNonePeer reports that no peer was
// provided, ErrNonemptyStorage that the storage already holds state.
// Compare with errors.Is rather than by message text.
var (
	ErrNonePeer        = errors.New("error none peer provided to bootstrap")
	ErrNonemptyStorage = errors.New("error can not bootstrap nonempty storage")
)
View Source
// Sentinel errors describing an invalid cluster configuration or an invalid
// configuration change (joint vs. non-joint rules, voter counts, change type).
var (
	ErrProgressNotMatch      = errors.New("progress does not match cluster config")
	ErrNoneNilVoters         = errors.New("voters[1] must be nil when not joint")
	ErrTrueAutoLeave         = errors.New("autoLeave must be false when not joint")
	ErrAlreadyJoint          = errors.New("config is already joint")
	ErrZeroVoterJoint        = errors.New("can not make a zero-voter config joint")
	ErrInvalidConfChangeType = errors.New("invalid configure change type")
	ErrApplyIncomingChanges  = errors.New("apply incoming changes failed")
	ErrLeaveNonJointConfig   = errors.New("can not leave a non-joint config")
	ErrApplySimpleToJoint    = errors.New("can not apply simple config change to joint config")
	ErrMoreThanOneChange     = errors.New("more than one voters changed by simple config change")
)
View Source
// Sentinel errors for invalid settings: a None ID, a non-positive heartbeat
// tick, an election tick not greater than the heartbeat tick, or nil storage.
// NOTE(review): presumably returned when a Config is validated — confirm.
var (
	ErrNoneID        = errors.New("cannot use none as id")
	ErrHeartbeatTick = errors.New("heartbeat tick must be greater than 0")
	ErrElectionTick  = errors.New("election tick must be greater than heartbeat tick")
	ErrNilStorage    = errors.New("storage cannot be nil")
)
View Source
var (
	// FLog is the printf-style logger. It wraps a standard-library logger that
	// writes to os.Stderr with the standard date/time flags.
	FLog = &FLogger{
		Logger: stdlog.New(os.Stderr, _flogPrefix, stdlog.LstdFlags),
	}
	// SLog is the structured logger. It writes text records to os.Stdout,
	// filters by levelVar, and tags every record with a "prefix" attribute.
	//
	// SLog TODO: optimize
	SLog = &SLogger{
		Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
			Level: levelVar,
			// ReplaceAttr swaps the level attribute's value for the label in
			// the levels table, falling back to the level's default string.
			ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
				if a.Key != slog.LevelKey {
					return a
				}
				// ReplaceAttr also sees caller-supplied attributes, so a user
				// attribute keyed "level" may hold a non-Level value. A bare
				// type assertion would panic on it; leave it untouched instead.
				level, isLevel := a.Value.Any().(slog.Level)
				if !isLevel {
					return a
				}
				levelLabel, ok := levels[level]
				if !ok {
					levelLabel = level.String()
				}
				a.Value = slog.StringValue(levelLabel)
				return a
			},
		})).With(slog.String("prefix", _slogPrefix)),
	}
)
View Source
// Sentinel errors for a dropped proposal, an empty snapshot, and a failed
// configuration restore.
var (
	ErrProposalDropped = errors.New("error drop proposal")
	ErrEmptySnapshot   = errors.New("error empty snapshot")
	ErrRestoreConfig   = errors.New("error restore config")
)
View Source
// Sentinel errors for stepping a message: the message is an internal one, or
// its peer is not found.
// NOTE(review): presumably returned by Step — confirm.
var (
	ErrStepInternalMsg  = errors.New("error step internal message")
	ErrStepPeerNotFound = errors.New("error step peer not found")
)
View Source
// Sentinel errors for log storage requests: the index was compacted away, the
// entries are not available, or the index is older than the existing snapshot.
var (
	ErrCompacted         = errors.New("request index is unavailable due to compaction")
	ErrUnavailable       = errors.New("entries not available")
	ErrSnapshotOutOfDate = errors.New("request index is older than existing snapshot")
)
View Source
// ErrStopped reports that raft has been stopped.
// NOTE(review): presumably returned by Node methods after Stop — confirm.
var ErrStopped = errors.New("error raft stopped")

Functions

func EntryPointers

func EntryPointers(entries []rt.Entry) []*rt.Entry

func EntryValues

func EntryValues(entries []*rt.Entry) []rt.Entry

func IsConfStateEqual

func IsConfStateEqual(a, b rt.ConfState) bool

func IsEmptyPersistentState

func IsEmptyPersistentState(state rt.PersistentState) bool

func IsEmptySnapshot

func IsEmptySnapshot(snapshot rt.Snapshot) bool

func IsPersistentStateEqual

func IsPersistentStateEqual(a, b rt.PersistentState) bool

func ResetDefaultLogger

func ResetDefaultLogger()

func SetLogger

func SetLogger(l Logger)

func SetSLoggerLevel

func SetSLoggerLevel(l Level)

func TMarshal

func TMarshal(data thrift.TStruct) ([]byte, error)

func TUnmarshal added in v0.1.1

func TUnmarshal(data []byte, v thrift.TStruct) error

Types

type BasicStatus

// BasicStatus combines a node's ID with its embedded NodeState, VolatileState
// and rt.PersistentState. It is returned by RawNode.BasicStatus and embedded
// in Status.
type BasicStatus struct {
	ID int64
	NodeState

	VolatileState
	rt.PersistentState
}

type ClusterConfig

// ClusterConfig describes the voters of the cluster.
type ClusterConfig struct {
	// Voters is a pair of majority configs. Per ErrNoneNilVoters, Voters[1]
	// must be nil when the config is not joint.
	Voters JointConfig
	// AutoLeave must be false when the config is not joint (see ErrTrueAutoLeave).
	// NOTE(review): presumably requests leaving the joint config automatically — confirm.
	AutoLeave bool
}

ClusterConfig is Config (trk.Config)

func (*ClusterConfig) String

func (c *ClusterConfig) String() string

type Config

// Config holds the settings used to start or restart a raft node: identity,
// tick-based timeouts, log storage, the last applied index and a logger.
// It is not the cluster configuration; see ClusterConfig for that.
type Config struct {
	// ID identifies the local node. None cannot be used as an ID (see ErrNoneID).
	ID int64

	// broadcastTime << electionTimeout << MTBF (Mean Time Between Failures)
	//
	// - broadcastTime: the average time it takes a server to send RPCs in parallel
	//   to every server in the cluster and receive their responses
	// - MTBF: the average time between failures for a single server
	//
	// Typically broadcastTime ranges from 0.5ms to 20ms, depending on the storage
	// technology, and electionTimeout ranges from 10ms to 500ms.
	//
	// ElectionTick is the number of Node.Tick calls that may pass without receiving
	// any RPC from the current leader before an election timeout occurs.
	// It must be greater than HeartbeatTick (see ErrElectionTick).
	ElectionTick int
	// HeartbeatTick is the number of Node.Tick calls that make up one Raft heartbeat.
	// It must be greater than 0 (see ErrHeartbeatTick).
	HeartbeatTick int

	// Storage is the interface to the storage of the Raft log. It cannot be nil
	// (see ErrNilStorage).
	//
	// Persistent state on all servers:
	// - currentTerm
	// - voteFor
	// - log[]
	//
	// The implementation may be non-persistent when it is used together with a WAL.
	//
	// etcd, for example, uses an in-memory Storage together with a WAL: the WAL
	// stores the Raft log persistently, so the log can be recovered after a reboot
	// even though the Storage itself is memory-based.
	Storage Storage
	// LastApplied is the last applied index. It should only be set when restarting
	// raft. raft will not return entries to the application smaller or equal to
	// LastApplied. If LastApplied is unset when restarting, raft might return previous
	// applied entries. This is a very application dependent configuration.
	LastApplied int64
	Logger      Logger
}

Config is the struct used to configure the raft library.

NOTE: Config does not represent the cluster configuration in the Raft sense.

type FLogger

// FLogger is a Logger built on an embedded standard-library *log.Logger
// (imported as stdlog). Its printf-style methods (Debugf, Infof, ...) take a
// format and arguments; its plain methods (Debug, Info, ...) declare blank
// parameters and therefore ignore what they are given.
type FLogger struct {
	*stdlog.Logger
	// contains filtered or unexported fields
}

func (*FLogger) Debug

func (fl *FLogger) Debug(_ string, _ ...any)

func (*FLogger) Debugf

func (fl *FLogger) Debugf(format string, args ...any)

func (*FLogger) EnableDebug

func (fl *FLogger) EnableDebug()

func (*FLogger) Error

func (fl *FLogger) Error(_ string, _ ...any)

func (*FLogger) Errorf

func (fl *FLogger) Errorf(format string, args ...any)

func (*FLogger) Fatal

func (fl *FLogger) Fatal(_ string, _ ...any)

func (*FLogger) Fatalf

func (fl *FLogger) Fatalf(format string, args ...any)

func (*FLogger) Info

func (fl *FLogger) Info(_ string, _ ...any)

func (*FLogger) Infof

func (fl *FLogger) Infof(format string, args ...any)

func (*FLogger) Panic

func (fl *FLogger) Panic(_ string, _ ...any)

func (*FLogger) Panicf

func (fl *FLogger) Panicf(format string, args ...any)

func (*FLogger) Warn

func (fl *FLogger) Warn(_ string, _ ...any)

func (*FLogger) Warnf

func (fl *FLogger) Warnf(format string, args ...any)

type JointConfig

// JointConfig is a pair of MajorityConfigs. Per ErrNoneNilVoters, the second
// element must be nil when the configuration is not joint.
type JointConfig [2]MajorityConfig

func (JointConfig) IDs

func (jc JointConfig) IDs() map[int64]struct{}

func (JointConfig) String

func (jc JointConfig) String() string

func (JointConfig) VoteResult

func (jc JointConfig) VoteResult(votes map[int64]bool) VoteResult

type Level

// Level is an alias for slog.Level; it is the argument type of SetSLoggerLevel.
type Level = slog.Level

type Logger

// Logger is the logging interface accepted by SetLogger and Config.Logger.
// Each of the six severities (debug, info, warn, error, fatal, panic) has two
// forms: a plain form taking a message plus extra args, and an "f" form
// taking a format string plus its arguments.
type Logger interface {
	Debug(msg string, args ...any)
	Debugf(format string, args ...any)
	Info(msg string, args ...any)
	Infof(format string, args ...any)
	Warn(msg string, args ...any)
	Warnf(format string, args ...any)
	Error(msg string, args ...any)
	Errorf(format string, args ...any)
	Fatal(msg string, args ...any)
	Fatalf(format string, args ...any)
	Panic(msg string, args ...any)
	Panicf(format string, args ...any)
}

type MajorityConfig

// MajorityConfig is a set of int64 IDs, stored as the keys of a map with
// empty-struct values.
type MajorityConfig map[int64]struct{}

func (MajorityConfig) Slice

func (mc MajorityConfig) Slice() []int64

func (MajorityConfig) String

func (mc MajorityConfig) String() string

func (MajorityConfig) VoteResult

func (mc MajorityConfig) VoteResult(votes map[int64]bool) VoteResult

type MemoryStorage

// MemoryStorage provides every method of the Storage interface plus mutators
// (Append, ApplySnapshot, Compact, CreateSnapshot, SetPersistentState).
// Create one with NewMemoryStorage.
type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage

entries[0] is a dummy entry used to stay in sync with the Raft log index, because the first index of the Raft log is 1. To make this easier to understand, consider the following examples:

Without compacted entries: Index [0 1 2 3] Term [0 1 2 2] offset = 0

With compacted entries: Index [6 7 8 9] Term [4 4 5 5] offset = 6

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage initializes an empty MemoryStorage

func (*MemoryStorage) Append

func (ms *MemoryStorage) Append(entries []rt.Entry) error

func (*MemoryStorage) ApplySnapshot

func (ms *MemoryStorage) ApplySnapshot(snapshot rt.Snapshot) error

func (*MemoryStorage) Compact

func (ms *MemoryStorage) Compact(idx int64) error

Compact discards all log entries prior to the idx

EXAMPLE: before compact: Index [6 7 8 9], Term [4 4 5 5]; after compact (idx = 8): Index [8 9], Term [5 5].

NOTE: It is the application's responsibility to not attempt to compact an index greater than committed index

func (*MemoryStorage) CreateSnapshot

func (ms *MemoryStorage) CreateSnapshot(idx int64, cs *rt.ConfState, data []byte) (rt.Snapshot, error)

func (*MemoryStorage) Entries

func (ms *MemoryStorage) Entries(low, high int64) ([]rt.Entry, error)

func (*MemoryStorage) FirstIndex

func (ms *MemoryStorage) FirstIndex() (int64, error)

func (*MemoryStorage) InitialState

func (ms *MemoryStorage) InitialState() (rt.PersistentState, rt.ConfState, error)

func (*MemoryStorage) LastIndex

func (ms *MemoryStorage) LastIndex() (int64, error)

func (*MemoryStorage) SetPersistentState

func (ms *MemoryStorage) SetPersistentState(ps rt.PersistentState) error

func (*MemoryStorage) Snapshot

func (ms *MemoryStorage) Snapshot() (rt.Snapshot, error)

func (*MemoryStorage) Term

func (ms *MemoryStorage) Term(idx int64) (int64, error)

type Node

// Node is the application-facing interface to a raft node. The application
// feeds it through Tick, Step, Campaign and the Propose methods, and consumes
// its output through Ready and Advance. StartNode and RestartNode return a Node.
type Node interface {
	// StateMachine is embedded because Raft is essentially a state machine.
	StateMachine
	// Status returns the status of the Raft state machine.
	Status() Status
	// Ready returns a channel on which the Raft module hands work to the
	// application, e.g. persisting the Raft log or sending RPC requests.
	// After retrieving the state carried by a Ready, call Advance.
	//
	// NOTE: No committed entries from the next Ready may be applied until all committed entries
	// and snapshots from the previous one have finished.
	Ready() <-chan Ready
	// Advance notifies the Node that the application has saved progress up to the last Ready.
	// It prepares the node to return the next available Ready.
	//
	// However, as an optimization, the application may call Advance while it is applying the
	// commands. For example, when the last Ready contains a snapshot, the application might take
	// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
	// progress, it can call Advance before finishing applying the last Ready.
	Advance()
	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
	//
	// e.g. when a follower receives an RPC request from the leader, it submits the message to the
	// Raft module through Step; the user is only responsible for transporting messages over the network.
	Step(ctx context.Context, msg rt.Message) error
	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
	Campaign(ctx context.Context) error
	// Propose proposes that data be appended to the log.
	//
	// NOTE: Proposals can be lost without notice, therefore it is the user's job to retry proposals.
	Propose(ctx context.Context, data []byte) error
	// ProposeConfChange proposes the config change cc. Once it shows up in
	// Ready.CommittedEntries, apply it with ApplyConfChange.
	ProposeConfChange(ctx context.Context, cc rt.ConfChange) error
	// ApplyConfChange applies a config change (previously passed to
	// ProposeConfChange) to the node. This must be called whenever a config
	// change is observed in Ready.CommittedEntries, except when the app decides
	// to reject the configuration change (i.e. treats it as a noop instead), in
	// which case it must not be called.
	ApplyConfChange(cc rt.ConfChange) *rt.ConfState
	// Tick increments the internal logical clock for this Node. Election timeouts
	// and heartbeat timeouts are in units of ticks.
	Tick()
	// Stop stops the node immediately.
	Stop()
}

func RestartNode

func RestartNode(c *Config) Node

RestartNode restarts a raft node.

func StartNode

func StartNode(c *Config, peers []Peer) Node

StartNode starts a raft node with the given peers.

type NodeState

// NodeState pairs a leader ID with a State. It is embedded in BasicStatus and,
// by pointer, in Ready.
type NodeState struct {
	// LeaderID is the ID of a leader node.
	// NOTE(review): presumably None when no leader is known — confirm.
	LeaderID int64
	// State is one of StateFollower, StateCandidate or StateLeader.
	State State
}

NodeState for rawnode.HasReady

type Peer

// Peer describes one cluster member passed to StartNode or RawNode.Bootstrap.
type Peer struct {
	// ID is the peer's node ID.
	ID int64
	// URL is carried as raw bytes.
	// NOTE(review): presumably the peer's network address — confirm.
	URL []byte
}

type Progress

// Progress holds a pair of log indexes; Status.Progress maps an int64 ID to one.
// NOTE(review): presumably the Raft paper's per-follower nextIndex and
// matchIndex as tracked by the leader — confirm.
type Progress struct {
	NextIndex  int64
	MatchIndex int64
}

type RawNode

// RawNode exposes the Raft operations as plain method calls: Tick, Step,
// Campaign, Propose, ProposeConfChange, ApplyConfChange, HasReady, Ready,
// Advance, Status, BasicStatus and Bootstrap. Create one with NewRawNode.
type RawNode struct {
	// contains filtered or unexported fields
}

RawNode is only responsible for handling the logic of Raft. It does not care about message transport or anything else unrelated to the Raft core.

func NewRawNode

func NewRawNode(c *Config) (*RawNode, error)

func (*RawNode) Advance

func (rn *RawNode) Advance()

func (*RawNode) ApplyConfChange

func (rn *RawNode) ApplyConfChange(cc rt.ConfChange) *rt.ConfState

ApplyConfChange applies a config change to the local node.

func (*RawNode) BasicStatus

func (rn *RawNode) BasicStatus() BasicStatus

func (*RawNode) Bootstrap

func (rn *RawNode) Bootstrap(peers []Peer) error

func (*RawNode) Campaign

func (rn *RawNode) Campaign() error

Campaign starts a leader election, making the node a candidate.

func (*RawNode) HasReady

func (rn *RawNode) HasReady() bool

func (*RawNode) Propose

func (rn *RawNode) Propose(data []byte) error

Propose proposes a command to be appended to the Raft log.

func (*RawNode) ProposeConfChange

func (rn *RawNode) ProposeConfChange(cc rt.ConfChange) error

ProposeConfChange proposes a config change to the local node.

func (*RawNode) Ready

func (rn *RawNode) Ready() Ready

func (*RawNode) Status

func (rn *RawNode) Status() Status

func (*RawNode) Step

func (rn *RawNode) Step(msg rt.Message) error

Step advances the state machine with the given message

func (*RawNode) Tick

func (rn *RawNode) Tick()

Tick advances the internal logical clock by a single tick

type Ready

// Ready is the unit of work handed to the application, via Node.Ready or
// RawNode.Ready. After handling it the application calls Advance.
type Ready struct {
	// NodeState and VolatileState are embedded by pointer.
	// NOTE(review): presumably nil when unchanged since the previous Ready — confirm.
	*NodeState
	*VolatileState
	rt.PersistentState

	// Entries holds log entries.
	// NOTE(review): presumably the entries to persist to storage — confirm.
	Entries []rt.Entry
	// CommittedEntries holds the committed entries for the application to apply.
	// Config changes observed here must be passed to ApplyConfChange.
	CommittedEntries []rt.Entry

	// Messages holds messages.
	// NOTE(review): presumably outbound messages for the application to send to peers — confirm.
	Messages []rt.Message

	// Snapshot holds a snapshot.
	// NOTE(review): presumably one the application must apply — confirm.
	Snapshot rt.Snapshot
}

func (Ready) String

func (r Ready) String() string

type SLogger

// SLogger is a Logger built on an embedded *slog.Logger. Its plain methods
// (Debug, Info, ...) take a message and args; its "f" methods (Debugf,
// Infof, ...) declare blank parameters and therefore ignore what they are given.
type SLogger struct {
	*slog.Logger
}

func (*SLogger) Debug

func (sl *SLogger) Debug(msg string, args ...any)

func (*SLogger) Debugf

func (sl *SLogger) Debugf(_ string, _ ...any)

func (*SLogger) Error

func (sl *SLogger) Error(msg string, args ...any)

func (*SLogger) Errorf

func (sl *SLogger) Errorf(_ string, _ ...any)

func (*SLogger) Fatal

func (sl *SLogger) Fatal(msg string, args ...any)

func (*SLogger) Fatalf

func (sl *SLogger) Fatalf(_ string, _ ...any)

func (*SLogger) Info

func (sl *SLogger) Info(msg string, args ...any)

func (*SLogger) Infof

func (sl *SLogger) Infof(_ string, _ ...any)

func (*SLogger) Panic

func (sl *SLogger) Panic(msg string, args ...any)

func (*SLogger) Panicf

func (sl *SLogger) Panicf(_ string, _ ...any)

func (*SLogger) Warn

func (sl *SLogger) Warn(msg string, args ...any)

func (*SLogger) Warnf

func (sl *SLogger) Warnf(_ string, _ ...any)

type State

// State identifies which of the three Raft states a node is in.
type State int64

// The states number from zero in declaration order: follower, candidate, leader.
const (
	StateFollower = State(iota)
	StateCandidate
	StateLeader
)

type StateMachine

// StateMachine is declared as any, so it has no methods of its own.
// Node embeds it.
type StateMachine any

type Status

// Status extends BasicStatus with the cluster configuration and a Progress
// entry per int64 ID. It is returned by Node.Status and RawNode.Status.
type Status struct {
	BasicStatus
	Config   ClusterConfig
	Progress map[int64]Progress
}

type Storage

// Storage is the interface the library uses to read the Raft log. It declares
// accessor methods only. MemoryStorage provides all of them.
// Every idx, low and high argument is a Raft log index.
type Storage interface {
	// InitialState returns a PersistentState and a ConfState.
	// NOTE(review): presumably the state saved before the last shutdown — confirm.
	InitialState() (rt.PersistentState, rt.ConfState, error)
	// Entries returns the log entries between low and high.
	// NOTE(review): presumably the half-open range [low, high) — confirm.
	Entries(low, high int64) ([]rt.Entry, error)
	// Term returns an int64 for the entry at idx.
	// NOTE(review): presumably that entry's term — confirm.
	Term(idx int64) (int64, error)
	// FirstIndex returns a log index.
	// NOTE(review): presumably the first index still available — confirm.
	FirstIndex() (int64, error)
	// LastIndex returns a log index.
	// NOTE(review): presumably the index of the last entry in the log — confirm.
	LastIndex() (int64, error)
	// Snapshot returns the most recent snapshot
	Snapshot() (rt.Snapshot, error)
}

Storage is the interface for Raft log storage.

NOTE: All the idx here represent the raft log index

type VolatileState

// VolatileState is the volatile state kept on all servers. It is embedded in
// BasicStatus and, by pointer, in Ready.
type VolatileState struct {
	// CommitIndex is a log index.
	// NOTE(review): presumably the highest index known to be committed — confirm.
	CommitIndex int64
	// LastApplied is a log index.
	// NOTE(review): presumably the highest index applied to the state machine — confirm.
	LastApplied int64
}

VolatileState is the volatile state kept on all servers.

type VoteResult

// VoteResult is the result type of JointConfig.VoteResult and
// MajorityConfig.VoteResult.
// NOTE(review): presumably one of VotePending, VoteLost or VoteWon — confirm.
type VoteResult uint32

Directories

Path Synopsis
kitex_gen/raft/rpc
Code generated by Kitex v0.10.3.
Code generated by Kitex v0.10.3.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL