Documentation
¶
Index ¶
- Constants
- Variables
- func EntryPointers(entries []rt.Entry) []*rt.Entry
- func EntryValues(entries []*rt.Entry) []rt.Entry
- func IsConfStateEqual(a, b rt.ConfState) bool
- func IsEmptyPersistentState(state rt.PersistentState) bool
- func IsEmptySnapshot(snapshot rt.Snapshot) bool
- func IsPersistentStateEqual(a, b rt.PersistentState) bool
- func ResetDefaultLogger()
- func SetLogger(l Logger)
- func SetSLoggerLevel(l Level)
- func TMarshal(data thrift.TStruct) ([]byte, error)
- func TUnmarshal(data []byte, v thrift.TStruct) error
- type BasicStatus
- type ClusterConfig
- type Config
- type FLogger
- func (fl *FLogger) Debug(_ string, _ ...any)
- func (fl *FLogger) Debugf(format string, args ...any)
- func (fl *FLogger) EnableDebug()
- func (fl *FLogger) Error(_ string, _ ...any)
- func (fl *FLogger) Errorf(format string, args ...any)
- func (fl *FLogger) Fatal(_ string, _ ...any)
- func (fl *FLogger) Fatalf(format string, args ...any)
- func (fl *FLogger) Info(_ string, _ ...any)
- func (fl *FLogger) Infof(format string, args ...any)
- func (fl *FLogger) Panic(_ string, _ ...any)
- func (fl *FLogger) Panicf(format string, args ...any)
- func (fl *FLogger) Warn(_ string, _ ...any)
- func (fl *FLogger) Warnf(format string, args ...any)
- type JointConfig
- type Level
- type Logger
- type MajorityConfig
- type MemoryStorage
- func (ms *MemoryStorage) Append(entries []rt.Entry) error
- func (ms *MemoryStorage) ApplySnapshot(snapshot rt.Snapshot) error
- func (ms *MemoryStorage) Compact(idx int64) error
- func (ms *MemoryStorage) CreateSnapshot(idx int64, cs *rt.ConfState, data []byte) (rt.Snapshot, error)
- func (ms *MemoryStorage) Entries(low, high int64) ([]rt.Entry, error)
- func (ms *MemoryStorage) FirstIndex() (int64, error)
- func (ms *MemoryStorage) InitialState() (rt.PersistentState, rt.ConfState, error)
- func (ms *MemoryStorage) LastIndex() (int64, error)
- func (ms *MemoryStorage) SetPersistentState(ps rt.PersistentState) error
- func (ms *MemoryStorage) Snapshot() (rt.Snapshot, error)
- func (ms *MemoryStorage) Term(idx int64) (int64, error)
- type Node
- type NodeState
- type Peer
- type Progress
- type RawNode
- func (rn *RawNode) Advance()
- func (rn *RawNode) ApplyConfChange(cc rt.ConfChange) *rt.ConfState
- func (rn *RawNode) BasicStatus() BasicStatus
- func (rn *RawNode) Bootstrap(peers []Peer) error
- func (rn *RawNode) Campaign() error
- func (rn *RawNode) HasReady() bool
- func (rn *RawNode) Propose(data []byte) error
- func (rn *RawNode) ProposeConfChange(cc rt.ConfChange) error
- func (rn *RawNode) Ready() Ready
- func (rn *RawNode) Status() Status
- func (rn *RawNode) Step(msg rt.Message) error
- func (rn *RawNode) Tick()
- type Ready
- type SLogger
- func (sl *SLogger) Debug(msg string, args ...any)
- func (sl *SLogger) Debugf(_ string, _ ...any)
- func (sl *SLogger) Error(msg string, args ...any)
- func (sl *SLogger) Errorf(_ string, _ ...any)
- func (sl *SLogger) Fatal(msg string, args ...any)
- func (sl *SLogger) Fatalf(_ string, _ ...any)
- func (sl *SLogger) Info(msg string, args ...any)
- func (sl *SLogger) Infof(_ string, _ ...any)
- func (sl *SLogger) Panic(msg string, args ...any)
- func (sl *SLogger) Panicf(_ string, _ ...any)
- func (sl *SLogger) Warn(msg string, args ...any)
- func (sl *SLogger) Warnf(_ string, _ ...any)
- type State
- type StateMachine
- type Status
- type Storage
- type VolatileState
- type VoteResult
Constants ¶
const ( LevelDebug = slog.Level(-4) LevelInfo = slog.Level(0) LevelWarn = slog.Level(4) LevelError = slog.Level(8) LevelFatal = slog.Level(12) LevelPanic = slog.Level(16) )
SLogger levels
const ( VotePending VoteLost VoteWon )
const ( None int64 = 0 Infinite int64 = math.MaxInt64 )
None should not be used as a simply number zero
const ( Name = "raft-foiver" Version = "v0.1.1" )
Variables ¶
var ( ErrNonePeer = errors.New("error none peer provided to boostrap") ErrNonemptyStorage = errors.New("error can not boostrap nonempty storage") )
var ( ErrProgressNotMatch = errors.New("progress does not match cluster config") ErrNoneNilVoters = errors.New("voters[1] must be nil when not joint") ErrTrueAutoLeave = errors.New("autoLeave must be false when not joint") ErrAlreadyJoint = errors.New("config is already joint") ErrZeroVoterJoint = errors.New("can not make a zero-voter config joint") ErrInvalidConfChangeType = errors.New("invalid configure change type") ErrApplyIncomingChanges = errors.New("apply incoming changes failed") ErrLeaveNonJointConfig = errors.New("can not leave a non-joint config") ErrApplySimpleToJoint = errors.New("can not apply simple config change to joint config") ErrMoreThanOneChange = errors.New("more than one voters changed by simple config change") )
var ( ErrNoneID = errors.New("cannot use none as id") ErrHeartbeatTick = errors.New("heartbeat tick must be greater than 0") ErrElectionTick = errors.New("election tick must be greater than heartbeat tick") ErrNilStorage = errors.New("storage cannot be nil") )
var ( FLog = &FLogger{ Logger: stdlog.New(os.Stderr, _flogPrefix, stdlog.LstdFlags), } // SLog TODO: optimize SLog = &SLogger{ Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: levelVar, ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { if a.Key == slog.LevelKey { level := a.Value.Any().(slog.Level) levelLabel, ok := levels[level] if !ok { levelLabel = level.String() } a.Value = slog.StringValue(levelLabel) } return a }, })).With(slog.String("prefix", _slogPrefix)), } )
var ( ErrProposalDropped = errors.New("error drop proposal") ErrEmptySnapshot = errors.New("error empty snapshot") ErrRestoreConfig = errors.New("error restore config") )
var ( ErrStepInternalMsg = errors.New("error step internal message") ErrStepPeerNotFound = errors.New("error step peer not found") )
var ( ErrCompacted = errors.New("request index is unavailable due to compaction") ErrSnapshotOutOfDate = errors.New("request index is older than existing snapshot") )
var ErrStopped = errors.New("error raft stopped")
Functions ¶
func IsConfStateEqual ¶
func IsEmptyPersistentState ¶
func IsEmptyPersistentState(state rt.PersistentState) bool
func IsEmptySnapshot ¶
func IsPersistentStateEqual ¶
func IsPersistentStateEqual(a, b rt.PersistentState) bool
func ResetDefaultLogger ¶
func ResetDefaultLogger()
func SetSLoggerLevel ¶
func SetSLoggerLevel(l Level)
Types ¶
type BasicStatus ¶
type BasicStatus struct { ID int64 NodeState VolatileState rt.PersistentState }
type ClusterConfig ¶
type ClusterConfig struct { Voters JointConfig AutoLeave bool }
ClusterConfig is Config (trk.Config)
func (*ClusterConfig) String ¶
func (c *ClusterConfig) String() string
type Config ¶
type Config struct { ID int64 // broadcastTime << electionTimeout << MTBF (Mean Time Between Failures) // // - broadcastTime: Average time it takes a server to send RPCs in parallel to every server in the cluster and receive their responses // - MTBF: average time between failures for a single server // // Typically broadcastTime range from 0.5ms to 20ms, depending on storage technology // electionTimeout range from 10ms to 500ms // // electionTimeout occurs when number of calling Node.Tick but without receiving any RPC from current leader node ElectionTick int // number of calling Node.Tick needed to represent one Raft heartbeat HeartbeatTick int // Storage is an interface for the storage of Raft log // // Persistent state on all servers: // - currentTerm // - voteFor // - log[] // // The implementation can be non-persistent when using with WAL // // The implementation of etcd use a memory storage for Storage but with the use of WAL, // they can ensure that the Raft log be stored persistently in WAL and can recover after reboot // even if the Storage is base on memory Storage Storage // LastApplied is the last applied index. It should only be set when restarting // raft. raft will not return entries to the application smaller or equal to // LastApplied. If LastApplied is unset when restarting, raft might return previous // applied entries. This is a very application dependent configuration. LastApplied int64 Logger Logger }
Config is the struct used to config the raft library
NOTE: Config is not represent the cluster config in the raft concept
type FLogger ¶
func (*FLogger) EnableDebug ¶
func (fl *FLogger) EnableDebug()
type JointConfig ¶
type JointConfig [2]MajorityConfig
func (JointConfig) IDs ¶
func (jc JointConfig) IDs() map[int64]struct{}
func (JointConfig) String ¶
func (jc JointConfig) String() string
func (JointConfig) VoteResult ¶
func (jc JointConfig) VoteResult(votes map[int64]bool) VoteResult
type Logger ¶
type Logger interface { Debug(msg string, args ...any) Debugf(format string, args ...any) Info(msg string, args ...any) Infof(format string, args ...any) Warn(msg string, args ...any) Warnf(format string, args ...any) Error(msg string, args ...any) Errorf(format string, args ...any) Fatal(msg string, args ...any) Fatalf(format string, args ...any) Panic(msg string, args ...any) Panicf(format string, args ...any) }
type MajorityConfig ¶
type MajorityConfig map[int64]struct{}
func (MajorityConfig) Slice ¶
func (mc MajorityConfig) Slice() []int64
func (MajorityConfig) String ¶
func (mc MajorityConfig) String() string
func (MajorityConfig) VoteResult ¶
func (mc MajorityConfig) VoteResult(votes map[int64]bool) VoteResult
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage
entries[0] is a dummy entry used to sync with raft log index because the first index of raft log is 1 To make is easier to understand, consider the following example
Without compacted entries: Index [0 1 2 3] Term [0 1 2 2] offset = 0
With compacted entries: Index [6 7 8 9] Term [4 4 5 5] offset = 6
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage initializes an empty MemoryStorage
func (*MemoryStorage) ApplySnapshot ¶
func (ms *MemoryStorage) ApplySnapshot(snapshot rt.Snapshot) error
func (*MemoryStorage) Compact ¶
func (ms *MemoryStorage) Compact(idx int64) error
Compact discards all log entries prior to the idx
EXAMPLE: before compact: Index [6 7 8 9] Term [4 4 5 5] after compact (idx = 8): Index [8 9] Term [5 5]
NOTE: It is the application's responsibility to not attempt to compact an index greater than committed index
func (*MemoryStorage) CreateSnapshot ¶
func (*MemoryStorage) Entries ¶
func (ms *MemoryStorage) Entries(low, high int64) ([]rt.Entry, error)
func (*MemoryStorage) FirstIndex ¶
func (ms *MemoryStorage) FirstIndex() (int64, error)
func (*MemoryStorage) InitialState ¶
func (ms *MemoryStorage) InitialState() (rt.PersistentState, rt.ConfState, error)
func (*MemoryStorage) LastIndex ¶
func (ms *MemoryStorage) LastIndex() (int64, error)
func (*MemoryStorage) SetPersistentState ¶
func (ms *MemoryStorage) SetPersistentState(ps rt.PersistentState) error
type Node ¶
type Node interface { // StateMachine // Raft is essentially a state machine StateMachine // Status return the status of the Raft state machine Status() Status // Ready is an interface to receive command from the Raft module // after retrieving the state returned by Ready, call Advance // // e.g. persistence Raft log, send RPC request // // NOTE: No committed entries from the next Ready may be applied until all committed entries // and snapshots from the previous one have finished. Ready() <-chan Ready // Advance notifies the Node that the application has saved progress up to the last Ready // It prepares the node to return the next available Ready // // However, as an optimization, the application may call Advance while it is applying the // commands. For example. when the last Ready contains a snapshot, the application might take // a long time to apply the snapshot data. To continue receiving Ready without blocking raft // progress, it can call Advance before finishing applying the last ready. Advance() // Step advances the state machine using the given message. ctx.Err() will be returned, if any. // // e.g. When followers receives RPC requests from leader, it will submit the messages to the Raft module through Step // the user only responsible for the message transport through network Step(ctx context.Context, msg rt.Message) error // Campaign causes the Node to transition to candidate state and start campaigning to become leader Campaign(ctx context.Context) error // Propose proposes that data be appended to the log // // NOTE: Proposals can be lost without notice, therefore it is user's job to ensure proposal retries Propose(ctx context.Context, data []byte) error ProposeConfChange(ctx context.Context, cc rt.ConfChange) error // ApplyConfChange applies a config change (previously passed to // ProposeConfChange) to the node. This must be called whenever a config // change is observed in Ready.CommittedEntries, except when the app decides // to reject the configuration change (i.e. treats it as a noop instead), in // which case it must not be called. ApplyConfChange(cc rt.ConfChange) *rt.ConfState // Tick increments the internal logical clock for this Node. Election timeouts // and heartbeat timeouts are in units of ticks. Tick() // Stop the node immediately Stop() }
type RawNode ¶
type RawNode struct {
// contains filtered or unexported fields
}
RawNode is only responsible for handing the logic of Raft It does not care about the message transport and other thing not related to the Raft core
func NewRawNode ¶
func (*RawNode) ApplyConfChange ¶
func (rn *RawNode) ApplyConfChange(cc rt.ConfChange) *rt.ConfState
ApplyConfChange apply config change to local node
func (*RawNode) BasicStatus ¶
func (rn *RawNode) BasicStatus() BasicStatus
func (*RawNode) ProposeConfChange ¶
func (rn *RawNode) ProposeConfChange(cc rt.ConfChange) error
ProposeConfChange propose config change to local node
type Ready ¶
type StateMachine ¶
type StateMachine any
type Status ¶
type Status struct { BasicStatus Config ClusterConfig Progress map[int64]Progress }
type Storage ¶
type Storage interface { InitialState() (rt.PersistentState, rt.ConfState, error) Entries(low, high int64) ([]rt.Entry, error) Term(idx int64) (int64, error) FirstIndex() (int64, error) LastIndex() (int64, error) // Snapshot returns the most recent snapshot Snapshot() (rt.Snapshot, error) }
Storage log storage
NOTE: All the idx here represent the raft log index
type VolatileState ¶
VolatileState Volatile state on all servers
type VoteResult ¶
type VoteResult uint32
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
kitex_gen/raft/rpc
Code generated by Kitex v0.10.3.
|
Code generated by Kitex v0.10.3. |