Documentation ¶
Index ¶
- Constants
- Variables
- func ComputeForkChoiceHash(nodeID proto.NodeID, hlc *timestamppb.Timestamp) []byte
- func ForkChoiceLess(a, b []byte) bool
- func GenerateKeyPair() ([]byte, []byte, error)
- func MarshalEffect(eff *pb.Effect) ([]byte, error)
- func Merge2(a, b *pb.ReducedEffect) *pb.ReducedEffect
- func MergeN(branches []*pb.ReducedEffect) *pb.ReducedEffect
- func ReduceBranch(effects []*pb.Effect) *pb.ReducedEffect
- func ReduceChain(seed *pb.ReducedEffect, effects []*pb.Effect) *pb.ReducedEffect
- func UnmarshalEffect(data []byte, eff *pb.Effect) error
- type Broadcaster
- type BunnyStorage
- type Context
- func (c *Context) Abort()
- func (c *Context) BeginTx()
- func (c *Context) CheckWatches() bool
- func (c *Context) ClearWatches()
- func (c *Context) Emit(eff *pb.Effect, snapshotTips ...[]Tip) error
- func (c *Context) Flush() error
- func (c *Context) GetSnapshot(key string) (*pb.ReducedEffect, []Tip, error)
- func (c *Context) PendingKeys() []string
- func (c *Context) RestoreSavepoint(sp *ContextSavepoint)
- func (c *Context) SetTraceCtx(ctx context.Context)
- func (c *Context) TakeSavepoint() *ContextSavepoint
- func (c *Context) TraceCtx() context.Context
- func (c *Context) Watch(key string)
- type ContextSavepoint
- type Encryptor
- type Engine
- func (e *Engine) CheckSerializationLeader(key string) *pb.NodeID
- func (e *Engine) Close() error
- func (e *Engine) EffectCache() *clox.CloxCache[Tip, *pb.Effect]
- func (e *Engine) FlushIndex()
- func (e *Engine) Forward(commandName string, args [][]byte, keys []string, username string) []byte
- func (e *Engine) ForwardExec(commands []ForwardCommand, watchedKeys []string, username string) []byte
- func (e *Engine) GetLock(key string) *sync.Mutex
- func (e *Engine) GetSnapshot(key string) (*pb.ReducedEffect, []Tip, int, error)
- func (e *Engine) HandleNack(nack *pb.NackNotify) error
- func (e *Engine) HandleRemote(notify *pb.OffsetNotify) ([]*pb.NackNotify, error)
- func (e *Engine) KeyCount() int64
- func (e *Engine) MatchKeys(pattern string) []string
- func (e *Engine) NewContext() *Context
- func (e *Engine) NodeID() pb.NodeID
- func (e *Engine) ReconvergeAllKeys()
- func (e *Engine) RecordSerializationActivity(key string)
- func (e *Engine) ScanKeys(after string, pattern string, fn func(key string) bool)
- func (e *Engine) SetBroadcaster(b Broadcaster)
- func (e *Engine) SetRTTProvider(p PeerRTTProvider)
- func (e *Engine) StartAntiEntropy(interval time.Duration)
- func (e *Engine) UpdateSafetyRules(defaultMode SafetyMode, rules []KeyRangeRule)
- type EngineConfig
- type ForwardCommand
- type HorizonSet
- type KeyRangeRule
- type MemoryStorage
- type ObjectStorage
- type PeerRTTProvider
- type SafetyMode
- type StateCache
- type Tip
Constants ¶
const DefaultSerializationThreshold = 3
DefaultSerializationThreshold is the number of consecutive aborts on a key before escalating to serialized coordination.
const FlushKey = "\x00"
FlushKey is the special key used to signal a full index wipe (FLUSHDB/FLUSHALL).
Variables ¶
var ErrBootstrapIncomplete = errors.New("bootstrap incomplete: some peers unreachable")
ErrBootstrapIncomplete is returned by ensureSubscribed when the bootstrap could not fetch the full causal chain because some peers are unreachable. A background retry continues until the chain is complete.
var ErrRegionPartitioned = errors.New("region partitioned: not all same-region peers are reachable")
ErrRegionPartitioned is returned by Flush in SafeMode when not all same-region peers are reachable.
var ErrTxnAborted = errors.New("transaction aborted")
ErrTxnAborted is returned by Flush when a transaction loses FWW or encounters a real conflict that cannot be resolved.
Functions ¶
func ComputeForkChoiceHash ¶
func ComputeForkChoiceHash(nodeID proto.NodeID, hlc *timestamppb.Timestamp) []byte
ComputeForkChoiceHash computes SHA-256(nodeID_LE8 || hlc_LE8). The result is a deterministic 32-byte hash used for fork-choice tiebreaking. Lower hash wins, eliminating systematic advantage from raw HLC comparison.
func ForkChoiceLess ¶
func ForkChoiceLess(a, b []byte) bool
ForkChoiceLess returns true if hash a is lexicographically less than hash b. Lower hash wins the fork-choice election.
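A minimal sketch of the hash-then-compare scheme described for ComputeForkChoiceHash and ForkChoiceLess, using only the standard library. Treating both the node ID and the HLC as plain uint64 values is an assumption of this sketch; how the package flattens a timestamppb.Timestamp into 8 bytes is not stated here.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// forkChoiceHash computes SHA-256 over two 8-byte little-endian fields,
// nodeID followed by hlc. The uint64 inputs are an assumption.
func forkChoiceHash(nodeID, hlc uint64) []byte {
	var buf [16]byte
	binary.LittleEndian.PutUint64(buf[:8], nodeID)
	binary.LittleEndian.PutUint64(buf[8:], hlc)
	sum := sha256.Sum256(buf[:])
	return sum[:]
}

// forkChoiceLess reports whether a sorts lexicographically before b.
func forkChoiceLess(a, b []byte) bool {
	return bytes.Compare(a, b) < 0
}

func main() {
	a := forkChoiceHash(1, 1000)
	b := forkChoiceHash(2, 1000)
	// The winner depends on the hash bytes, not on which node ID or HLC is larger.
	fmt.Printf("hash length=%d, a wins=%v\n", len(a), forkChoiceLess(a, b))
}
```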
func GenerateKeyPair ¶
func GenerateKeyPair() ([]byte, []byte, error)
GenerateKeyPair generates a new HPKE MLKEM768X25519 keypair. Returns (publicKeyBytes, privateKeyBytes, error).
func MarshalEffect ¶ added in v0.2.0
func MarshalEffect(eff *pb.Effect) ([]byte, error)
MarshalEffect serializes an Effect to its wire bytes, hex-encoding any map<string, X> keys inside an embedded ReducedEffect (Snapshot state). Proto3 requires string map keys to be valid UTF-8, but Redis-style collections (hashes, sets, zsets, streams) accept arbitrary byte sequences as element IDs — and the cluster membership encodes a uint64 as raw little-endian bytes. Without encoding, proto.Marshal silently rejects snapshots whose state has any binary-keyed elements.
The encoding is hex (always-on, no discriminator). The reverse step is in UnmarshalEffect. The original *pb.Effect is not mutated; if any sanitization is required the snapshot state is cloned first.
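The key transformation can be sketched in isolation. The example below hex-encodes the keys of a plain Go map and reverses the step, without touching the input map. The helper names and the map[string][]byte value type are illustrative assumptions; the real functions operate on protobuf messages.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"unicode/utf8"
)

// encodeKeys returns a copy of m with every key hex-encoded.
// The input map is left untouched.
func encodeKeys(m map[string][]byte) map[string][]byte {
	out := make(map[string][]byte, len(m))
	for k, v := range m {
		out[hex.EncodeToString([]byte(k))] = v
	}
	return out
}

// decodeKeys reverses encodeKeys. It fails if a key is not valid hex.
func decodeKeys(m map[string][]byte) (map[string][]byte, error) {
	out := make(map[string][]byte, len(m))
	for k, v := range m {
		raw, err := hex.DecodeString(k)
		if err != nil {
			return nil, fmt.Errorf("decode key %q: %w", k, err)
		}
		out[string(raw)] = v
	}
	return out, nil
}

func main() {
	binaryKey := string([]byte{0xff, 0x00, 0x01}) // not valid UTF-8
	state := map[string][]byte{binaryKey: []byte("v")}

	encoded := encodeKeys(state)
	for k := range encoded {
		fmt.Println(k, utf8.ValidString(k)) // ff0001 true
	}

	decoded, err := decodeKeys(encoded)
	_, ok := decoded[binaryKey]
	fmt.Println(ok, err) // true <nil>
}
```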
func Merge2 ¶
func Merge2(a, b *pb.ReducedEffect) *pb.ReducedEffect
Merge2 merges two reduced effects from concurrent branches into one. The result is always a freshly constructed *pb.ReducedEffect — inputs are never mutated, making reduced effects safe to treat as snapshots.
Decision tree:
- Both commutative, same collection → accumulate (sum/max/union)
- Both non-commutative → HLC tiebreaker
- Mixed (comm + non-comm), compatible → apply comm on top of non-comm
- Mixed, incompatible → non-comm wins
- Cross-collection → HLC tiebreaker
func MergeN ¶
func MergeN(branches []*pb.ReducedEffect) *pb.ReducedEffect
MergeN merges N reduced effects from concurrent branches using canonical merge ordering. This replaces pairwise Merge2 for multi-tip reconstruction.
Canonical ordering:
- Non-commutative branches first. Among them, LWW applies — highest HLC wins. Losing non-commutative branches are dead and skipped entirely.
- Commutative branches second. All commutative branches accumulate on top of the non-commutative winner (or on top of each other if no non-comm).
The result is deterministic: any node with the same set of branches computes the same result regardless of the order branches are provided. Inputs are never mutated.
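The canonical ordering can be illustrated on a toy counter. In the sketch below a non-commutative branch is an absolute write and a commutative branch is a delta; the branch type, the single-key model, and the tie-break on equal HLCs are all assumptions made for the example and say nothing about the real *pb.ReducedEffect layout.

```go
package main

import "fmt"

// branch is a toy stand-in for a reduced effect on one counter key.
type branch struct {
	Commutative bool   // true: Value is a delta; false: Value is an absolute write
	HLC         uint64 // logical timestamp of the branch
	Value       int64
}

// mergeN picks the non-commutative branch with the highest HLC, drops the
// other non-commutative branches, then adds every commutative delta on top.
// The input slice is only read, never modified.
func mergeN(branches []branch) int64 {
	var winner *branch
	for i := range branches {
		b := &branches[i]
		if b.Commutative {
			continue
		}
		// Equal HLCs fall back to Value so the choice does not depend on
		// input order (an assumption of this sketch).
		if winner == nil || b.HLC > winner.HLC ||
			(b.HLC == winner.HLC && b.Value > winner.Value) {
			winner = b
		}
	}
	var result int64
	if winner != nil {
		result = winner.Value
	}
	for _, b := range branches {
		if b.Commutative {
			result += b.Value
		}
	}
	return result
}

func main() {
	branches := []branch{
		{Commutative: true, HLC: 5, Value: 2},   // increment by 2
		{Commutative: false, HLC: 3, Value: 10}, // write 10, loses LWW
		{Commutative: false, HLC: 7, Value: 40}, // write 40, wins LWW
		{Commutative: true, HLC: 1, Value: 3},   // increment by 3
	}
	fmt.Println(mergeN(branches)) // 45
}
```

Because the winner is chosen by comparison and addition is order-independent, shuffling the input slice yields the same result.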
func ReduceBranch ¶
func ReduceBranch(effects []*pb.Effect) *pb.ReducedEffect
ReduceBranch reduces a linear chain of effects (oldest-first) into a single ReducedEffect.
func ReduceChain ¶
func ReduceChain(seed *pb.ReducedEffect, effects []*pb.Effect) *pb.ReducedEffect
ReduceChain reduces effects sequentially on top of a seed ReducedEffect. If seed is nil, behaves identically to ReduceBranch. This is used at DAG merge points: the seed is the merged result of concurrent dep subtrees, and effects are the linear chain above. The seed is never mutated; a clone is made before any modifications.
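A minimal sketch of the seed-plus-chain fold, assuming a toy state type (a map of counters) and a toy effect type (add a delta to a field). Neither type reflects the real protobuf messages; the point is only the clone-before-apply behaviour and the nil-seed equivalence with ReduceBranch.

```go
package main

import "fmt"

// state is a toy stand-in for *pb.ReducedEffect: field name to value.
type state map[string]int64

// effect is a toy stand-in for *pb.Effect: add Delta to Field.
type effect struct {
	Field string
	Delta int64
}

// reduceChain applies effects oldest-first on top of a copy of seed.
// A nil seed starts from empty state.
func reduceChain(seed state, effects []effect) state {
	out := make(state, len(seed))
	for k, v := range seed {
		out[k] = v
	}
	for _, e := range effects {
		out[e.Field] += e.Delta
	}
	return out
}

// reduceBranch is the nil-seed case of reduceChain.
func reduceBranch(effects []effect) state {
	return reduceChain(nil, effects)
}

func main() {
	seed := state{"hits": 10}
	chain := []effect{{"hits", 1}, {"hits", 2}, {"misses", 1}}

	fmt.Println(reduceChain(seed, chain)) // map[hits:13 misses:1]
	fmt.Println(seed)                     // map[hits:10]
	fmt.Println(reduceBranch(chain))      // map[hits:3 misses:1]
}
```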
Types ¶
type Broadcaster ¶
type Broadcaster interface {
Broadcast(notify *pb.OffsetNotify)
BroadcastWithData(notify *pb.OffsetNotify, effectData []byte)
Replicate(notify *pb.OffsetNotify, wireData []byte) error
// ReplicateTo sends to a specific peer and waits for ACK/NACK.
// Returns NackNotify slice on conflict, nil on ACK.
ReplicateTo(notify *pb.OffsetNotify, wireData []byte, targetNodeID pb.NodeID) ([]*pb.NackNotify, error)
// SendNack sends an enriched NACK to the originator.
SendNack(nack *pb.NackNotify, targetNodeID pb.NodeID)
FetchFromAny(ref *pb.EffectRef) ([]byte, error)
Fetch(ref *pb.EffectRef) ([]byte, error)
PeerIDs() []pb.NodeID
// AllRegionPeersReachable returns true if every same-region peer is
// alive and has a verified symmetric path. Used by SafeMode to gate
// writes: a key must not be written unless all region peers are reachable.
AllRegionPeersReachable() bool
// InMajorityPartition returns true if this node can reach a strict
// majority of same-region nodes (including itself). Used by SafeMode
// to block transactions when in a minority partition.
InMajorityPartition() bool
// ForwardTransaction sends a transaction to a specific peer for execution
// (adaptive serialization §5). Returns the leader's response.
ForwardTransaction(ctx context.Context, targetNodeID pb.NodeID, tx *pb.ForwardedTransaction) (*pb.ForwardedResponse, error)
}
Broadcaster sends effect notifications to cluster peers. Nil means standalone mode.
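The two reachability predicates differ only in their threshold. The sketch below expresses them as arithmetic on peer counts; the function names and the count-based inputs are assumptions, since a real Broadcaster also has to verify liveness and path symmetry.

```go
package main

import "fmt"

// inMajorityPartition reports whether this node plus its reachable
// same-region peers form a strict majority. regionSize counts this node.
func inMajorityPartition(reachablePeers, regionSize int) bool {
	return 2*(reachablePeers+1) > regionSize
}

// allRegionPeersReachable reports whether every other node in the region
// is reachable.
func allRegionPeersReachable(reachablePeers, regionSize int) bool {
	return reachablePeers == regionSize-1
}

func main() {
	// Five-node region, two peers reachable: 3 of 5 is a strict majority,
	// but not every peer is reachable.
	fmt.Println(inMajorityPartition(2, 5), allRegionPeersReachable(2, 5)) // true false

	// Four-node region split 2/2: neither side holds a strict majority.
	fmt.Println(inMajorityPartition(1, 4)) // false
}
```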
type BunnyStorage ¶
type BunnyStorage struct {
// contains filtered or unexported fields
}
BunnyStorage implements ObjectStorage using Bunny CDN's storage API.
func NewBunnyStorage ¶
func NewBunnyStorage(uploadEndpoint, cdnEndpoint, accessKey string) *BunnyStorage
NewBunnyStorage creates a BunnyStorage instance.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context tracks per-key state for dep chaining (Emit) and deferred index update + broadcast (Flush). One Context per command invocation.
func (*Context) Abort ¶
func (c *Context) Abort()
Abort discards pending index updates and broadcasts. Effects already written to the log remain durable but invisible (index not updated).
func (*Context) BeginTx ¶
func (c *Context) BeginTx()
BeginTx marks subsequent effects as transactional. This is called by read-modify-write commands (INCR, LPUSH, etc.) for atomicity AND by handleExec for MULTI/EXEC. Watch processing is NOT done here — use CheckWatches after BeginTx for EXEC.
func (*Context) CheckWatches ¶
func (c *Context) CheckWatches() bool
CheckWatches validates WATCH observations and emits transactional NOOPs for the Bind's read set. Must be called AFTER BeginTx (so NOOPs get IsTransactional=true). Only called from handleExec.
Returns false if any watched key was modified — the caller should abort the transaction and return a null array to the client.
func (*Context) ClearWatches ¶
func (c *Context) ClearWatches()
ClearWatches removes all watched keys without affecting transaction state.
func (*Context) Emit ¶
func (c *Context) Emit(eff *pb.Effect, snapshotTips ...[]Tip) error
Emit writes a single effect to the log. The context tracks per-key state so that consecutive effects on the same key form a dep chain. The index is NOT updated until Flush.
For read-modify-write commands, pass the tip offsets returned by GetSnapshot as snapshotTips so that the first effect depends on the tips the handler actually read, not whatever the index contains now. Pure writes (SET, LPUSH) omit snapshotTips and Emit reads the index.
func (*Context) Flush ¶
func (c *Context) Flush() error
Flush updates the index for all touched keys and broadcasts all notifications per key for durability, then resets the context for reuse.
func (*Context) GetSnapshot ¶
func (c *Context) GetSnapshot(key string) (*pb.ReducedEffect, []Tip, error)
GetSnapshot returns the current materialized state of a key, including any unflushed effects from this context. Within MULTI/EXEC, earlier commands' effects are in the log but not yet in the index; this method reconstructs from the log so commands within the same transaction can see each other's writes.
When a txSnapshot is active (MULTI/EXEC with SSI), reads for keys not yet in the context use the snapshot instead of the live index, and a NOOP is emitted to record the read in the causal structure.
func (*Context) PendingKeys ¶
func (c *Context) PendingKeys() []string
PendingKeys returns the names of all keys with pending effects in this Context. Callers use this to acquire per-key locks before calling Flush: Flush's fork-choice critical section races with any other Flush on the same key, and the handler layer (redis handler, sql handler) is where that serialization belongs.
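One way a handler might take those per-key locks is sketched below. The lockTable type is hypothetical; Engine.GetLock appears in the index above, but its semantics are not documented here, so the sketch keeps its own registry. Locking in sorted key order is an assumption added to avoid deadlock between two handlers that touch the same keys in different orders.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// lockTable is a hypothetical per-key mutex registry for the handler layer.
type lockTable struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

// get returns the mutex for key, creating it on first use.
func (t *lockTable) get(key string) *sync.Mutex {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.locks == nil {
		t.locks = make(map[string]*sync.Mutex)
	}
	m, ok := t.locks[key]
	if !ok {
		m = &sync.Mutex{}
		t.locks[key] = m
	}
	return m
}

// lockAll locks every distinct key in sorted order and returns a function
// that releases the locks in reverse order.
func (t *lockTable) lockAll(keys []string) (unlock func()) {
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)
	var held []*sync.Mutex
	for i, k := range sorted {
		if i > 0 && k == sorted[i-1] {
			continue // skip duplicates: sync.Mutex is not reentrant
		}
		m := t.get(k)
		m.Lock()
		held = append(held, m)
	}
	return func() {
		for i := len(held) - 1; i >= 0; i-- {
			held[i].Unlock()
		}
	}
}

func main() {
	var table lockTable
	pending := []string{"user:2", "user:1", "user:2"} // as if returned by PendingKeys
	unlock := table.lockAll(pending)
	fmt.Println("locks held; Flush would run here")
	unlock()
}
```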
func (*Context) RestoreSavepoint ¶
func (c *Context) RestoreSavepoint(sp *ContextSavepoint)
RestoreSavepoint replaces the Context's pending-key state with the snapshot. Any Emit issued after TakeSavepoint is discarded from the Context; its log offsets remain in the engine's log but never reach the index (identical to what happens to all effects on Abort).
func (*Context) SetTraceCtx ¶
func (c *Context) SetTraceCtx(ctx context.Context)
SetTraceCtx sets the OTel trace context for this command.
func (*Context) TakeSavepoint ¶
func (c *Context) TakeSavepoint() *ContextSavepoint
TakeSavepoint captures a deep-enough copy of the Context's pending key state that RestoreSavepoint can revert every Emit issued between Take and Restore. Cheap: each contextKey is a small struct with slice fields that we clone by reference (the underlying values — notifies, elementIDs — are immutable once added).
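The take/restore pair can be sketched with a toy context that tracks pending log offsets per key. The toyContext and savepoint types are illustrative assumptions; the sketch shows one way to share slice storage safely, by capping each slice's capacity so that later appends reallocate instead of writing into shared memory.

```go
package main

import "fmt"

// toyContext is a stand-in for Context: pending log offsets per key.
type toyContext struct {
	pending map[string][]int
}

// savepoint is a stand-in for ContextSavepoint.
type savepoint struct {
	pending map[string][]int
}

// emit records a pending offset for key.
func (c *toyContext) emit(key string, offset int) {
	if c.pending == nil {
		c.pending = make(map[string][]int)
	}
	c.pending[key] = append(c.pending[key], offset)
}

// takeSavepoint copies the map and shares the slice storage. Each slice is
// capped at its current length, so the savepoint never observes elements
// appended afterwards.
func (c *toyContext) takeSavepoint() *savepoint {
	cp := make(map[string][]int, len(c.pending))
	for k, v := range c.pending {
		cp[k] = v[:len(v):len(v)]
	}
	return &savepoint{pending: cp}
}

// restoreSavepoint replaces the pending state with a copy of the savepoint,
// so the same savepoint can be restored more than once.
func (c *toyContext) restoreSavepoint(sp *savepoint) {
	restored := make(map[string][]int, len(sp.pending))
	for k, v := range sp.pending {
		restored[k] = v
	}
	c.pending = restored
}

func main() {
	c := &toyContext{}
	c.emit("a", 1)
	sp := c.takeSavepoint()
	c.emit("a", 2) // discarded by the restore below
	c.emit("b", 3) // discarded by the restore below
	c.restoreSavepoint(sp)
	fmt.Println(c.pending) // map[a:[1]]
}
```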
func (*Context) TraceCtx ¶
func (c *Context) TraceCtx() context.Context
TraceCtx returns the OTel trace context, or context.Background() if none set.
func (*Context) Watch ¶
func (c *Context) Watch(key string)
Watch records a key for optimistic locking by emitting a NoopEffect immediately and flushing it. This records the observation in the causal log so all nodes can verify whether the key was modified between WATCH and EXEC. The NOOP offset and post-flush TipSet pointer are stored for comparison when CheckWatches runs during EXEC.
type ContextSavepoint ¶
type ContextSavepoint struct {
// contains filtered or unexported fields
}
ContextSavepoint is an opaque snapshot of a Context's pending per-key state. Restore it (via Context.RestoreSavepoint) to discard every Emit made after the snapshot was taken; already-written log offsets become orphans (never indexed), same as Abort. Used by higher-level SQL SAVEPOINT semantics.
Savepoints do NOT snapshot the engine's committed index — the engine's state is unchanged until Flush, so reverting the Context's pending keys is sufficient to roll back.
type Encryptor ¶
type Encryptor struct {
// contains filtered or unexported fields
}
Encryptor handles HPKE encryption/decryption with zstd compression. Uses MLKEM768X25519 KEM for post-quantum hybrid safety, HKDF-SHA256 for KDF, and AES-256-GCM for AEAD.
func NewEncryptor ¶
NewEncryptor creates an Encryptor from raw public/private key bytes. privKey may be nil for encrypt-only nodes.
func (*Encryptor) OpenAndDecompress ¶
OpenAndDecompress decrypts with HPKE, then decompresses with zstd.
type Engine ¶
type Engine struct {
// Notification callbacks — fired after effects are durable
OnKeyDataAdded func(key string) // wake oldest waiter (data inserted)
OnKeyDeleted func(key string) // wake all waiters (key removed)
OnFlushAll func() // wake all waiters across all keys
// contains filtered or unexported fields
}
Engine is the central coordinator for the causal effect log. Lock-free: the log uses CAS, the index manages its own concurrency, and safety config is swapped atomically.
func NewEngine ¶
func NewEngine(cfg EngineConfig) *Engine
NewEngine creates a new Engine from the given configuration.
func NewTestEngine ¶
func NewTestEngine() *Engine
NewTestEngine creates a minimal Engine for use in tests outside this package.
func (*Engine) CheckSerializationLeader ¶
func (e *Engine) CheckSerializationLeader(key string) *pb.NodeID
CheckSerializationLeader returns the serialization leader for a key, or nil if no serialization is active.
func (*Engine) Close ¶
func (e *Engine) Close() error
Close performs graceful shutdown of the engine and its background components.
func (*Engine) EffectCache ¶
func (e *Engine) EffectCache() *clox.CloxCache[Tip, *pb.Effect]
EffectCache returns the engine's deserialized effect cache for use by the fetch handler (serves effects from cache when the log is unavailable).
func (*Engine) FlushIndex ¶
func (e *Engine) FlushIndex()
FlushIndex deletes all keys from the index and evicts all cache entries.
func (*Engine) Forward ¶
func (e *Engine) Forward(commandName string, args [][]byte, keys []string, username string) []byte
Forward checks if any key has a serialization leader on another node. If so, forwards the command and returns raw RESP bytes. Returns nil if the command should execute locally.
func (*Engine) ForwardExec ¶
func (e *Engine) ForwardExec(commands []ForwardCommand, watchedKeys []string, username string) []byte
ForwardExec checks if any queued command touches a serialized key. If so, forwards the entire transaction and returns raw RESP bytes. Returns nil for local execution.
func (*Engine) GetSnapshot ¶
func (e *Engine) GetSnapshot(key string) (*pb.ReducedEffect, []Tip, int, error)
GetSnapshot returns the current materialized state of a key and the tip offsets the snapshot was derived from. Callers that perform read-modify-write (SETBIT, INCR, etc.) must pass the returned tips to Emit so the first effect depends on the tips the snapshot was actually computed from, not whatever the index contains at Emit time.
Cache hit returns immediately. On miss, walks the causal DAG from the index tip set and reconstructs via ReduceBranch + canonical merge.
func (*Engine) HandleNack ¶
func (e *Engine) HandleNack(nack *pb.NackNotify) error
HandleNack processes a NACK from a remote peer.
func (*Engine) HandleRemote ¶
func (e *Engine) HandleRemote(notify *pb.OffsetNotify) ([]*pb.NackNotify, error)
HandleRemote processes a remote effect notification: stores the effect in the log, updates the index, and returns NACKs if deps don't match tips.
EffectData may be in wire format [4-byte LE keyLen][key][protoData] or raw proto bytes. Both are handled transparently.
Returns all NACKs generated (one per diverged key) so the caller can send them synchronously as the ReplicateTo response.
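The framed layout [4-byte LE keyLen][key][protoData] can be sketched as an encode/decode pair. The function names are illustrative, and the sketch covers only the framed form; how the engine tells a framed payload apart from raw proto bytes is not documented here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errShortFrame = errors.New("wire frame shorter than its declared key length")

// encodeWire builds [4-byte little-endian key length][key][payload].
func encodeWire(key string, payload []byte) []byte {
	buf := make([]byte, 4, 4+len(key)+len(payload))
	binary.LittleEndian.PutUint32(buf, uint32(len(key)))
	buf = append(buf, key...)
	return append(buf, payload...)
}

// decodeWire splits a frame into key and payload, rejecting frames that are
// too short for the declared key length. The payload aliases the input.
func decodeWire(frame []byte) (string, []byte, error) {
	if len(frame) < 4 {
		return "", nil, errShortFrame
	}
	keyLen := int(binary.LittleEndian.Uint32(frame))
	if keyLen < 0 || keyLen > len(frame)-4 {
		return "", nil, errShortFrame
	}
	return string(frame[4 : 4+keyLen]), frame[4+keyLen:], nil
}

func main() {
	frame := encodeWire("user:1", []byte{0x0a, 0x01})
	fmt.Println(len(frame)) // 12

	key, payload, err := decodeWire(frame)
	fmt.Println(key, len(payload), err) // user:1 2 <nil>
}
```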
func (*Engine) MatchKeys ¶
func (e *Engine) MatchKeys(pattern string) []string
MatchKeys returns all keys matching the glob pattern. Uses a point-in-time snapshot for consistency.
func (*Engine) NewContext ¶
func (e *Engine) NewContext() *Context
NewContext creates a new write context bound to the engine.
func (*Engine) ReconvergeAllKeys ¶
func (e *Engine) ReconvergeAllKeys()
ReconvergeAllKeys triggers a debounced background anti-entropy pass to fetch any effects missed during a partition. Multiple calls within the debounce window are coalesced into a single pass. This replaces the previous approach of deleting all subscriptions, which caused a thundering herd of blocking re-bootstraps on the read path and cascading peer timeouts.
func (*Engine) RecordSerializationActivity ¶
func (e *Engine) RecordSerializationActivity(key string)
RecordSerializationActivity updates the last activity timestamp for a serialized key. Called when the leader processes a forwarded command or performs a local write on a serialized key.
func (*Engine) ScanKeys ¶
func (e *Engine) ScanKeys(after string, pattern string, fn func(key string) bool)
ScanKeys iterates over keys matching the glob pattern, starting after `after`. Pass empty string for `after` to start from the beginning. Return false from fn to stop iteration.
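The cursor-and-callback contract can be sketched over a plain slice of keys. Sorted iteration order and path.Match glob syntax are assumptions of this sketch; the engine's own ordering and pattern dialect may differ.

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// scanKeys visits, in sorted order, every key strictly greater than after
// that matches pattern. Iteration stops as soon as fn returns false.
func scanKeys(keys []string, after, pattern string, fn func(key string) bool) {
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)

	start := 0
	if after != "" {
		start = sort.SearchStrings(sorted, after)
		if start < len(sorted) && sorted[start] == after {
			start++ // resume after the cursor, not at it
		}
	}
	for _, k := range sorted[start:] {
		if ok, err := path.Match(pattern, k); err != nil || !ok {
			continue
		}
		if !fn(k) {
			return
		}
	}
}

func main() {
	keys := []string{"user:3", "user:1", "order:9", "user:2"}
	scanKeys(keys, "user:1", "user:*", func(k string) bool {
		fmt.Println(k) // prints user:2, then user:3
		return true
	})
}
```

Passing the last key seen as `after` on the next call gives simple pagination.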
func (*Engine) SetBroadcaster ¶
func (e *Engine) SetBroadcaster(b Broadcaster)
SetBroadcaster sets the broadcaster for replicating effects to peers. Must be called before any Emit/Flush if cluster mode is desired.
Also lazy-inits the horizon set if not already present: NewEngine gates horizon on cfg.Broadcaster != nil, but the bootstrap path in beacon/runtime.go constructs the engine first (with a nil Broadcaster) and wires the PeerManager as broadcaster afterwards (chicken-and-egg: PeerManager needs engine to build the effect handler). Without this lazy init, horizon stays nil for the life of the engine and bind-arrival visibility isn't deferred — peers see aborting-txn effects before fork-choice has settled, which surfaces as Elle :incompatible-order on cross-node reads.
func (*Engine) SetRTTProvider ¶
func (e *Engine) SetRTTProvider(p PeerRTTProvider)
SetRTTProvider sets the RTT provider for adaptive serialization leader selection.
func (*Engine) StartAntiEntropy ¶
func (e *Engine) StartAntiEntropy(interval time.Duration)
StartAntiEntropy launches a background goroutine that periodically exchanges tips with peers and fetches missing effect chains. This ensures effects missed during partitions are eventually discovered without polluting the log with redundant subscription effects. Also starts the reconvergence debounce loop that coalesces peer recovery events into background anti-entropy passes.
func (*Engine) UpdateSafetyRules ¶
func (e *Engine) UpdateSafetyRules(defaultMode SafetyMode, rules []KeyRangeRule)
UpdateSafetyRules atomically replaces the key-range safety configuration.
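An atomic replacement of this kind is commonly built on a pointer swap. The sketch below uses sync/atomic's generic Pointer type (Go 1.19 or later); the engine and safetyConfig types are hypothetical and the real implementation may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// safetyConfig is a hypothetical immutable snapshot of the safety settings.
type safetyConfig struct {
	DefaultMode string
	Patterns    []string
}

// engine holds the current configuration behind an atomic pointer, so
// readers never observe a half-updated rule set.
type engine struct {
	safety atomic.Pointer[safetyConfig]
}

// updateSafetyRules builds a fresh config and publishes it in one store.
// The slice is copied so later caller-side changes cannot leak in.
func (e *engine) updateSafetyRules(defaultMode string, patterns []string) {
	cfg := &safetyConfig{
		DefaultMode: defaultMode,
		Patterns:    append([]string(nil), patterns...),
	}
	e.safety.Store(cfg)
}

// currentSafety returns the latest published config, or nil if none is set.
func (e *engine) currentSafety() *safetyConfig {
	return e.safety.Load()
}

func main() {
	var e engine
	e.updateSafetyRules("safe", []string{"cache:*"})
	old := e.currentSafety()
	e.updateSafetyRules("unsafe", nil)
	// A reader holding the old snapshot keeps a consistent view.
	fmt.Println(old.DefaultMode, e.currentSafety().DefaultMode) // safe unsafe
}
```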
type EngineConfig ¶
type EngineConfig struct {
NodeID pb.NodeID
Index keytrie.KeyIndex
Cache StateCache // nil disables caching
Broadcaster Broadcaster // nil for standalone
RTTProvider PeerRTTProvider // nil disables RTT-based leader selection
DefaultMode SafetyMode
KeyRangeRules []KeyRangeRule
MemoryLimit int64 // memory budget for effect cache (0 = 10MB default)
// MemoryLimitPercent, when non-zero, expresses the cache budget as a
// fraction (0,1] of the memory available to this process. Takes
// precedence over MemoryLimit. The cache enforces this via a live
// re-evaluating tick so cgroup/system changes propagate.
MemoryLimitPercent float64
HorizonTimeout time.Duration // horizon wait for bind visibility (0 = 500ms default)
}
EngineConfig holds configuration for creating an Engine.
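The defaulting rules stated in the field comments can be sketched as two small helpers. The helper names are hypothetical, "10MB" is read as 10 MiB here (an assumption), and MemoryLimitPercent values outside (0,1] are simply ignored in this sketch.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// defaultMemoryLimit interprets the documented 10MB default as 10 MiB.
	defaultMemoryLimit int64 = 10 << 20
	// defaultHorizonTimeout is the documented 500ms default.
	defaultHorizonTimeout = 500 * time.Millisecond
)

// effectiveCacheBudget resolves the cache budget in bytes. A percent in
// (0,1] takes precedence over limit; a zero limit selects the default.
func effectiveCacheBudget(limit int64, percent float64, available int64) int64 {
	if percent > 0 && percent <= 1 {
		return int64(percent * float64(available))
	}
	if limit == 0 {
		return defaultMemoryLimit
	}
	return limit
}

// effectiveHorizonTimeout substitutes the default for a zero timeout.
func effectiveHorizonTimeout(d time.Duration) time.Duration {
	if d == 0 {
		return defaultHorizonTimeout
	}
	return d
}

func main() {
	fmt.Println(effectiveCacheBudget(0, 0, 1<<30))         // 10485760
	fmt.Println(effectiveCacheBudget(64<<20, 0.25, 1<<30)) // 268435456
	fmt.Println(effectiveHorizonTimeout(0))                // 500ms
}
```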
func (*EngineConfig) ModeForKey ¶
func (c *EngineConfig) ModeForKey(key string) SafetyMode
ModeForKey returns the safety mode for a key. First matching rule wins.
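First-match-wins lookup can be sketched as a linear scan. The lower-case types mirror the exported ones for illustration only; path.Match glob syntax and the fallback to the default mode when no rule matches are assumptions of this sketch.

```go
package main

import (
	"fmt"
	"path"
)

type safetyMode int

const (
	safeMode safetyMode = iota
	unsafeMode
)

type keyRangeRule struct {
	Pattern string
	Mode    safetyMode
}

// modeForKey returns the mode of the first rule whose pattern matches key.
// Later rules are never consulted once one matches. Rules with malformed
// patterns are skipped.
func modeForKey(key string, defaultMode safetyMode, rules []keyRangeRule) safetyMode {
	for _, r := range rules {
		if ok, err := path.Match(r.Pattern, key); err == nil && ok {
			return r.Mode
		}
	}
	return defaultMode
}

func main() {
	rules := []keyRangeRule{
		{Pattern: "cache:*", Mode: unsafeMode}, // specific rule first
		{Pattern: "*", Mode: safeMode},         // catch-all last
	}
	fmt.Println(modeForKey("cache:home", safeMode, rules) == unsafeMode) // true
	fmt.Println(modeForKey("account:7", unsafeMode, rules) == safeMode)  // true
}
```

Rule order matters: placing the catch-all first would shadow every later rule.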
type ForwardCommand ¶
ForwardCommand describes a single command to be forwarded.
type HorizonSet ¶
type HorizonSet struct {
// contains filtered or unexported fields
}
HorizonSet tracks Binds in their horizon wait period. Effects from invisible Binds are excluded from reconstruction until the horizon completes (timer fires or fast-path all-ACK).
func (*HorizonSet) Add ¶
func (h *HorizonSet) Add(txnID string, bindOffset Tip, bind *pb.TransactionalBindEffect, relevantPeers []pb.NodeID)
Add registers a Bind in the invisible set. If the Bind's keys overlap with an existing group (shared consumed tips), the Bind joins that group and the timer is reset. Otherwise a new group is created.
relevantPeers is the set of peers whose concurrent binds we need to wait for before making this bind visible. On the origin that's the subscribers we just replicated to. On a remote-arrival it's all currently-alive peers (any of them may hold a competing bind we haven't seen yet). Timeout is computed from their RTTs.
func (*HorizonSet) IsInvisible ¶
func (h *HorizonSet) IsInvisible(txnID string) bool
IsInvisible returns true if the given txnID is in the invisible set.
func (*HorizonSet) MakeVisible ¶
func (h *HorizonSet) MakeVisible(txnID string)
MakeVisible promotes an entire group: removes all txnIDs from the invisible set, cleans up pendingTxTips, evicts cache, and fires OnKeyDataAdded callbacks.
func (*HorizonSet) StopAll ¶
func (h *HorizonSet) StopAll()
StopAll stops all active timers. Called during Engine.Close.
type KeyRangeRule ¶
type KeyRangeRule struct {
Pattern string
Mode SafetyMode
}
KeyRangeRule maps a key pattern to a safety mode.
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage is an in-memory ObjectStorage implementation for testing.
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates a new MemoryStorage.
func (*MemoryStorage) Keys ¶
func (m *MemoryStorage) Keys() []string
Keys returns all stored paths (for testing).
type ObjectStorage ¶
type ObjectStorage interface {
Upload(ctx context.Context, path string, data []byte) error
Download(ctx context.Context, path string) ([]byte, error)
}
ObjectStorage abstracts uploading/downloading blobs to/from object storage.
type PeerRTTProvider ¶
type PeerRTTProvider interface {
// GetRTT returns the estimated round-trip time to the given peer.
// Returns 0 if the peer is unknown or RTT has not been measured.
GetRTT(nodeID pb.NodeID) time.Duration
// AlivePeerIDs returns the IDs of all peers that are currently alive.
AlivePeerIDs() []pb.NodeID
}
PeerRTTProvider supplies round-trip-time measurements for peers, used for adaptive serialization leader selection.
type SafetyMode ¶
type SafetyMode int
SafetyMode determines write behavior during network partitions.
const (
SafeMode SafetyMode = iota // blocks writes when quorum is unreachable
UnsafeMode // allows writes during partitions, forming branches
)
type StateCache ¶
type StateCache interface {
Get(key string) (*pb.ReducedEffect, bool)
Put(key string, value *pb.ReducedEffect)
Evict(key string)
}
StateCache caches materialized ReducedEffect values by key. Implementations must be safe for concurrent use.
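A minimal concurrency-safe implementation might look like the sketch below. The reducedEffect type is a placeholder for *pb.ReducedEffect, and the lower-case interface mirrors StateCache for illustration; the map has no size bound or eviction policy, which a production cache would likely need.

```go
package main

import (
	"fmt"
	"sync"
)

// reducedEffect is a placeholder for *pb.ReducedEffect.
type reducedEffect struct{ Value string }

// stateCache mirrors the StateCache method set with the placeholder type.
type stateCache interface {
	Get(key string) (*reducedEffect, bool)
	Put(key string, value *reducedEffect)
	Evict(key string)
}

// mapCache guards a plain map with a read-write mutex.
type mapCache struct {
	mu      sync.RWMutex
	entries map[string]*reducedEffect
}

func newMapCache() *mapCache {
	return &mapCache{entries: make(map[string]*reducedEffect)}
}

func (c *mapCache) Get(key string) (*reducedEffect, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.entries[key]
	return v, ok
}

func (c *mapCache) Put(key string, value *reducedEffect) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = value
}

func (c *mapCache) Evict(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, key)
}

// Compile-time check that mapCache satisfies the interface.
var _ stateCache = (*mapCache)(nil)

func main() {
	c := newMapCache()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := fmt.Sprintf("k%d", i)
			c.Put(key, &reducedEffect{Value: key})
		}(i)
	}
	wg.Wait()

	v, ok := c.Get("k3")
	fmt.Println(v.Value, ok) // k3 true
	c.Evict("k3")
	_, ok = c.Get("k3")
	fmt.Println(ok) // false
}
```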