raft

package
v0.0.0-...-2fcb348 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2026 License: MIT Imports: 28 Imported by: 1

Documentation

Overview

Package raft implements Timebox persistence semantics using Raft for the replicated commit path, together with a durable write-ahead log and a local materialized read store.

The core correctness boundary remains the Timebox append contract: aggregate-local optimistic concurrency, atomic event-batch append, and aligned derived index updates. Timebox snapshots are stored through the replicated state machine as accelerative state, not as the authoritative source of truth.

Archive lifecycle support is not implemented by this backend. Store archive calls therefore return timebox.ErrArchivingDisabled.

Index

Constants

View Source
// Command type tags. NOTE(review): presumably these are the values that
// Command.Type reports for payloads built by MakeAppendCommand and
// MakeSnapshotCommand respectively; confirm against the encoder. If they are
// written into the replicated log, the numeric values must stay stable
const (
	// CmdTypeAppend tags an append command
	CmdTypeAppend   = 0
	// CmdTypeSnapshot tags a snapshot command
	CmdTypeSnapshot = 1
)
View Source
const (
	// DefaultLogTailSize is the default hot retained WAL cache size.
	// NOTE(review): presumably counted in entries, like Config.LogTailSize;
	// confirm
	DefaultLogTailSize = 20480

	// MinLogTailSize is the smallest allowed hot retained WAL cache size.
	// NOTE(review): presumably the lower bound for Config.LogTailSize;
	// confirm where it is enforced
	MinLogTailSize = 2048
)
View Source
const DefaultApplyTimeout = 10 * time.Second

DefaultApplyTimeout bounds one local proposal round trip

Variables

View Source
// Sentinel errors related to the FSM handling of replicated commands.
// Being errors.New values, they can be matched with errors.Is
var (
	// ErrUnexpectedApplyResult indicates the FSM returned an unexpected result
	ErrUnexpectedApplyResult = errors.New("unexpected raft apply result")

	// ErrCommandTypeUnknown indicates the FSM received an unknown command type
	ErrCommandTypeUnknown = errors.New("unknown command type")
)
View Source
// Sentinel errors describing an unusable node configuration.
// NOTE(review): presumably these are what Config.Validate returns; confirm
var (
	// ErrLocalIDRequired indicates the local Raft server ID is required
	ErrLocalIDRequired = errors.New("raft local ID is required")

	// ErrDataDirRequired indicates durable local storage must be configured
	ErrDataDirRequired = errors.New("raft data directory is required")

	// ErrAddressRequired indicates the Raft TCP listener is required
	ErrAddressRequired = errors.New("raft address is required")

	// ErrInvalidAddress indicates a raft address must be a valid host:port
	ErrInvalidAddress = errors.New("raft address must be a valid host:port")

	// ErrBootstrapMissingLocalServer indicates the bootstrap voter set must
	// include the local node
	ErrBootstrapMissingLocalServer = errors.New(
		"bootstrap servers must include the local raft ID",
	)
)
View Source
var (
	// ErrTransportClosed is the sentinel error with the message
	// "transport closed". NOTE(review): presumably returned by the Raft
	// transport after it has been shut down; confirm against the transport
	// implementation
	ErrTransportClosed = errors.New("transport closed")
)

Functions

func AggregateMetaKey

func AggregateMetaKey(encodedID string) []byte

AggregateMetaKey returns the metadata key for one encoded aggregate ID

func AggregateMetaPrefix

func AggregateMetaPrefix() []byte

AggregateMetaPrefix returns the key prefix for aggregate metadata

Types

type AggregateMeta

type AggregateMeta struct {
	// CurrentSequence: NOTE(review): presumably the sequence reached by the
	// most recent append for this aggregate; confirm
	CurrentSequence  int64
	// BaseSequence: NOTE(review): presumably the first sequence still
	// retained (it may move when events are trimmed); confirm
	BaseSequence     int64
	// SnapshotSequence: NOTE(review): presumably the sequence covered by the
	// latest stored snapshot; confirm
	SnapshotSequence int64
	// Status: NOTE(review): presumably the derived status that
	// GetAggregateStatus returns; confirm
	Status           string
	// StatusAt: NOTE(review): presumably when Status was set; the unit and
	// epoch are not visible here, so confirm
	StatusAt         int64
	// Labels: NOTE(review): presumably the label/value pairs behind
	// ListAggregatesByLabel and ListLabelValues; confirm
	Labels           map[string]string
}

AggregateMeta stores the derived aggregate state needed for reads

type ApplyResult

type ApplyResult struct {
	// Append is the append request carried by the result.
	// NOTE(review): presumably nil for non-append commands; confirm
	Append *timebox.AppendRequest
	// Error is the error carried by the result.
	// NOTE(review): presumably nil when the command applied cleanly; confirm
	Error  error
}

ApplyResult reports the local outcome of one applied Raft command

type Command

type Command []byte

Command is the encoded form of one replicated Timebox mutation

func MakeAppendCommand

func MakeAppendCommand(
	proposalID uint64, req *timebox.AppendRequest,
) (Command, error)

MakeAppendCommand encodes one append mutation into a Raft command

func MakeSnapshotCommand

func MakeSnapshotCommand(proposalID uint64, sc *SnapshotCommand) Command

MakeSnapshotCommand encodes one snapshot mutation into a Raft command

func (Command) AppendRequest

func (c Command) AppendRequest() (*timebox.AppendRequest, error)

AppendRequest decodes an append request from the command payload

func (Command) ProposalID

func (c Command) ProposalID() (uint64, error)

ProposalID returns the encoded proposal ID

func (Command) SnapshotRequest

func (c Command) SnapshotRequest() (*SnapshotCommand, error)

SnapshotRequest decodes a snapshot request from the command payload

func (Command) Type

func (c Command) Type() int

Type returns the encoded command type

type Config

type Config struct {
	// Local state
	//
	// LocalID is the local Raft server ID; ErrLocalIDRequired documents that
	// it is required
	LocalID string
	// DataDir is the raft data directory for durable local storage;
	// ErrDataDirRequired documents that it must be configured
	DataDir string

	// LogTailSize is the hot retained WAL suffix cache size in entries
	// (see DefaultLogTailSize and MinLogTailSize)
	LogTailSize int

	// Cluster identity
	//
	// Address is the Raft TCP listener address; ErrInvalidAddress documents
	// that it must be a valid host:port
	Address string
	// Servers is the bootstrap voter set; ErrBootstrapMissingLocalServer
	// documents that it must include the local node
	Servers []Server

	// Publisher reports committed events after they are durably applied.
	// NOTE(review): whether a nil Publisher is allowed is not visible here;
	// confirm
	Publisher Publisher
}

Config defines one opinionated Raft persistence node

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the opinionated defaults for one Raft node

func (Config) LocalServer

func (c Config) LocalServer() Server

LocalServer returns the local server entry derived from this config

func (Config) Validate

func (c Config) Validate() error

Validate checks whether the config is usable

func (Config) With

func (c Config) With(other Config) Config

With merges another config into this config

type Persistence

type Persistence struct {
	// Config is embedded, so its exported fields and methods are promoted
	// onto Persistence
	Config
	// contains filtered or unexported fields
}

Persistence applies Timebox writes through Raft and serves reads from local materialized state

func NewPersistence

func NewPersistence(cfgs ...Config) (*Persistence, error)

NewPersistence opens one Raft persistence node

func (*Persistence) Append

func (p *Persistence) Append(req timebox.AppendRequest) error

Append proposes one append mutation through the local Raft node

func (*Persistence) Close

func (p *Persistence) Close() error

Close stops raft and closes local durable state

func (*Persistence) GetAggregateStatus

func (p *Persistence) GetAggregateStatus(
	id timebox.AggregateID,
) (string, error)

GetAggregateStatus returns the current derived status for one aggregate

func (*Persistence) LeaderWithID

func (p *Persistence) LeaderWithID() (ServerAddress, ServerID)

LeaderWithID returns the current leader address and server ID

func (*Persistence) ListAggregates

func (p *Persistence) ListAggregates(
	id timebox.AggregateID,
) ([]timebox.AggregateID, error)

ListAggregates lists known aggregate IDs that share the given prefix

func (*Persistence) ListAggregatesByLabel

func (p *Persistence) ListAggregatesByLabel(
	label, value string,
) ([]timebox.AggregateID, error)

ListAggregatesByLabel lists aggregates currently indexed by label/value

func (*Persistence) ListAggregatesByStatus

func (p *Persistence) ListAggregatesByStatus(
	status string,
) ([]timebox.StatusEntry, error)

ListAggregatesByStatus lists aggregates currently indexed by status

func (*Persistence) ListLabelValues

func (p *Persistence) ListLabelValues(label string) ([]string, error)

ListLabelValues lists the current indexed values for one label

func (*Persistence) LoadEvents

func (p *Persistence) LoadEvents(
	req timebox.LoadEventsRequest,
) (*timebox.EventsResult, error)

LoadEvents loads events for one aggregate starting at the requested sequence

func (*Persistence) LoadSnapshot

func (p *Persistence) LoadSnapshot(
	req timebox.LoadSnapshotRequest,
) (*timebox.SnapshotRecord, error)

LoadSnapshot returns the latest snapshot and tail events for one aggregate

func (*Persistence) NewStore

func (p *Persistence) NewStore(cfg timebox.Config) (*timebox.Store, error)

NewStore creates a Store using the current Raft Persistence

func (*Persistence) Ready

func (p *Persistence) Ready() <-chan struct{}

Ready closes once the node is ready to serve leader-directed traffic

func (*Persistence) SaveSnapshot

func (p *Persistence) SaveSnapshot(req timebox.SnapshotRequest) error

SaveSnapshot proposes one Timebox snapshot mutation through Raft

func (*Persistence) State

func (p *Persistence) State() State

State returns the current local Raft role

type Publisher

type Publisher func(...*timebox.Event)

Publisher reports committed events after they are durably applied

type Server

type Server struct {
	// ID is the voter's Raft ID. NOTE(review): presumably compared with
	// Config.LocalID to find the local node in the bootstrap set; confirm
	ID      string
	// Address is the voter's Raft address. NOTE(review): presumably a
	// host:port like Config.Address; confirm
	Address string
}

Server identifies one voter in the bootstrap configuration

type ServerAddress

type ServerAddress string

ServerAddress is the advertised Raft transport address for one node

type ServerID

type ServerID string

ServerID identifies one Raft voter

type SnapshotCommand

type SnapshotCommand struct {
	// ID is the aggregate the snapshot mutation targets
	ID         timebox.AggregateID
	// Data is the snapshot payload. NOTE(review): presumably opaque to this
	// package; confirm
	Data       []byte
	// Sequence: NOTE(review): presumably the event sequence the snapshot
	// reflects; confirm
	Sequence   int64
	// TrimEvents: NOTE(review): presumably asks for events covered by the
	// snapshot to be dropped; confirm the exact trimming rule
	TrimEvents bool
}

SnapshotCommand carries one Timebox snapshot mutation through Raft

type State

type State string

State is the current local Raft role

// Values of State, the local Raft role type returned by Persistence.State
const (
	// StateFollower marks a follower node
	StateFollower State = "follower"

	// StateCandidate marks a candidate or pre-candidate node
	StateCandidate State = "candidate"

	// StateLeader marks the current leader node
	StateLeader State = "leader"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL