Documentation
¶
Overview ¶
Package raft implements Timebox persistence semantics using Raft for the replicated commit path, a durable write-ahead log, and a local materialized read store.
The core correctness boundary remains the Timebox append contract: aggregate-local optimistic concurrency, atomic event-batch append, and aligned derived index updates. Timebox snapshots are stored through the replicated state machine as accelerative state, not as the authoritative source of truth.
Archive lifecycle support is not implemented by this backend. Store archive calls therefore return timebox.ErrArchivingDisabled.
Index ¶
- Constants
- Variables
- func AggregateMetaKey(encodedID string) []byte
- func AggregateMetaPrefix() []byte
- type AggregateMeta
- type ApplyResult
- type Command
- type Config
- type Persistence
- func (p *Persistence) Append(req timebox.AppendRequest) error
- func (p *Persistence) Close() error
- func (p *Persistence) GetAggregateStatus(id timebox.AggregateID) (string, error)
- func (p *Persistence) LeaderWithID() (ServerAddress, ServerID)
- func (p *Persistence) ListAggregates(id timebox.AggregateID) ([]timebox.AggregateID, error)
- func (p *Persistence) ListAggregatesByLabel(label, value string) ([]timebox.AggregateID, error)
- func (p *Persistence) ListAggregatesByStatus(status string) ([]timebox.StatusEntry, error)
- func (p *Persistence) ListLabelValues(label string) ([]string, error)
- func (p *Persistence) LoadEvents(req timebox.LoadEventsRequest) (*timebox.EventsResult, error)
- func (p *Persistence) LoadSnapshot(req timebox.LoadSnapshotRequest) (*timebox.SnapshotRecord, error)
- func (p *Persistence) NewStore(cfg timebox.Config) (*timebox.Store, error)
- func (p *Persistence) Ready() <-chan struct{}
- func (p *Persistence) SaveSnapshot(req timebox.SnapshotRequest) error
- func (p *Persistence) State() State
- type Publisher
- type Server
- type ServerAddress
- type ServerID
- type SnapshotCommand
- type State
Constants ¶
const (
	CmdTypeAppend   = 0
	CmdTypeSnapshot = 1
)
const (
	// DefaultLogTailSize is the default hot retained WAL cache size
	DefaultLogTailSize = 20480

	// MinLogTailSize is the smallest allowed hot retained WAL cache size
	MinLogTailSize = 2048
)
const DefaultApplyTimeout = 10 * time.Second
DefaultApplyTimeout bounds one local proposal round trip
Variables ¶
var (
	// ErrUnexpectedApplyResult indicates the FSM returned an unexpected result
	ErrUnexpectedApplyResult = errors.New("unexpected raft apply result")

	// ErrCommandTypeUnknown indicates the FSM received an unknown command type
	ErrCommandTypeUnknown = errors.New("unknown command type")
)
var (
	// ErrLocalIDRequired indicates the local Raft server ID is required
	ErrLocalIDRequired = errors.New("raft local ID is required")

	// ErrDataDirRequired indicates durable local storage must be configured
	ErrDataDirRequired = errors.New("raft data directory is required")

	// ErrAddressRequired indicates the Raft TCP listener is required
	ErrAddressRequired = errors.New("raft address is required")

	// ErrInvalidAddress indicates a raft address must be a valid host:port
	ErrInvalidAddress = errors.New("raft address must be a valid host:port")

	// ErrBootstrapMissingLocalServer indicates the bootstrap voter set must
	// include the local node
	ErrBootstrapMissingLocalServer = errors.New(
		"bootstrap servers must include the local raft ID",
	)
)
var (
ErrTransportClosed = errors.New("transport closed")
)
Functions ¶
func AggregateMetaKey ¶
func AggregateMetaKey(encodedID string) []byte
AggregateMetaKey returns the metadata key for one encoded aggregate ID
func AggregateMetaPrefix ¶
func AggregateMetaPrefix() []byte
AggregateMetaPrefix returns the key prefix for aggregate metadata
Types ¶
type AggregateMeta ¶
type AggregateMeta struct {
CurrentSequence int64
BaseSequence int64
SnapshotSequence int64
Status string
StatusAt int64
Labels map[string]string
}
AggregateMeta stores the derived aggregate state needed for reads
type ApplyResult ¶
type ApplyResult struct {
Append *timebox.AppendRequest
Error error
}
ApplyResult reports the local outcome of one applied Raft command
type Command ¶
type Command []byte
Command is the encoded form of one replicated Timebox mutation
func MakeAppendCommand ¶
func MakeAppendCommand(proposalID uint64, req *timebox.AppendRequest) (Command, error)
MakeAppendCommand encodes one append mutation into a Raft command
func MakeSnapshotCommand ¶
func MakeSnapshotCommand(proposalID uint64, sc *SnapshotCommand) Command
MakeSnapshotCommand encodes one snapshot mutation into a Raft command
func (Command) AppendRequest ¶
func (c Command) AppendRequest() (*timebox.AppendRequest, error)
AppendRequest decodes an append request from the command payload
func (Command) ProposalID ¶
ProposalID returns the encoded proposal ID
func (Command) SnapshotRequest ¶
func (c Command) SnapshotRequest() (*SnapshotCommand, error)
SnapshotRequest decodes a snapshot request from the command payload
type Config ¶
type Config struct {
// Local state
LocalID string
DataDir string
// LogTailSize is the hot retained WAL suffix cache size in entries
LogTailSize int
// Cluster identity
Address string
Servers []Server
Publisher Publisher
}
Config defines one opinionated Raft persistence node
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns the opinionated defaults for one Raft node
func (Config) LocalServer ¶
LocalServer returns the local server entry derived from this config
type Persistence ¶
type Persistence struct {
Config
// contains filtered or unexported fields
}
Persistence applies Timebox writes through Raft and serves reads from local materialized state
func NewPersistence ¶
func NewPersistence(cfgs ...Config) (*Persistence, error)
NewPersistence opens one Raft persistence node
func (*Persistence) Append ¶
func (p *Persistence) Append(req timebox.AppendRequest) error
Append proposes one append mutation through the local Raft node
func (*Persistence) Close ¶
func (p *Persistence) Close() error
Close stops raft and closes local durable state
func (*Persistence) GetAggregateStatus ¶
func (p *Persistence) GetAggregateStatus(id timebox.AggregateID) (string, error)
GetAggregateStatus returns the current derived status for one aggregate
func (*Persistence) LeaderWithID ¶
func (p *Persistence) LeaderWithID() (ServerAddress, ServerID)
LeaderWithID returns the current leader address and server ID
func (*Persistence) ListAggregates ¶
func (p *Persistence) ListAggregates(id timebox.AggregateID) ([]timebox.AggregateID, error)
ListAggregates lists known aggregate IDs that share the given prefix
func (*Persistence) ListAggregatesByLabel ¶
func (p *Persistence) ListAggregatesByLabel(label, value string) ([]timebox.AggregateID, error)
ListAggregatesByLabel lists aggregates currently indexed by label/value
func (*Persistence) ListAggregatesByStatus ¶
func (p *Persistence) ListAggregatesByStatus(status string) ([]timebox.StatusEntry, error)
ListAggregatesByStatus lists aggregates currently indexed by status
func (*Persistence) ListLabelValues ¶
func (p *Persistence) ListLabelValues(label string) ([]string, error)
ListLabelValues lists the current indexed values for one label
func (*Persistence) LoadEvents ¶
func (p *Persistence) LoadEvents(req timebox.LoadEventsRequest) (*timebox.EventsResult, error)
LoadEvents loads events for one aggregate starting at the requested sequence
func (*Persistence) LoadSnapshot ¶
func (p *Persistence) LoadSnapshot(req timebox.LoadSnapshotRequest) (*timebox.SnapshotRecord, error)
LoadSnapshot returns the latest snapshot and tail events for one aggregate
func (*Persistence) Ready ¶
func (p *Persistence) Ready() <-chan struct{}
Ready closes once the node is ready to serve leader-directed traffic
func (*Persistence) SaveSnapshot ¶
func (p *Persistence) SaveSnapshot(req timebox.SnapshotRequest) error
SaveSnapshot proposes one Timebox snapshot mutation through Raft
func (*Persistence) State ¶
func (p *Persistence) State() State
State returns the current local Raft role
type ServerAddress ¶
type ServerAddress string
ServerAddress is the advertised Raft transport address for one node
type SnapshotCommand ¶
type SnapshotCommand struct {
ID timebox.AggregateID
Data []byte
Sequence int64
TrimEvents bool
}
SnapshotCommand carries one Timebox snapshot mutation through Raft