effects

package
v0.2.1 (Latest)
Warning: This package is not in the latest version of its module.

Published: May 12, 2026 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Constants

const DefaultSerializationThreshold = 3

DefaultSerializationThreshold is the number of consecutive aborts on a key before escalating to serialized coordination.
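A minimal sketch of a consecutive-abort counter that escalates once a key reaches the threshold. The `abortTracker` type, its methods, and the rule that a successful commit resets the count are assumptions for illustration; only the threshold value of 3 comes from this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// defaultSerializationThreshold mirrors DefaultSerializationThreshold (3).
const defaultSerializationThreshold = 3

// abortTracker is a hypothetical per-key counter of consecutive aborts.
type abortTracker struct {
	mu        sync.Mutex
	threshold int
	aborts    map[string]int
}

func newAbortTracker(threshold int) *abortTracker {
	return &abortTracker{threshold: threshold, aborts: make(map[string]int)}
}

// RecordAbort increments the consecutive-abort count for key and reports
// whether the key should now escalate to serialized coordination.
func (t *abortTracker) RecordAbort(key string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.aborts[key]++
	return t.aborts[key] >= t.threshold
}

// RecordCommit resets the count (assumption: only consecutive aborts count).
func (t *abortTracker) RecordCommit(key string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.aborts, key)
}

func main() {
	t := newAbortTracker(defaultSerializationThreshold)
	fmt.Println(t.RecordAbort("k"), t.RecordAbort("k")) // false false
	t.RecordCommit("k")
	fmt.Println(t.RecordAbort("k"), t.RecordAbort("k"), t.RecordAbort("k")) // false false true
}
```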

const FlushKey = "\x00"

FlushKey is the special key used to signal a full index wipe (FLUSHDB/FLUSHALL).

Variables

var ErrBootstrapIncomplete = errors.New("bootstrap incomplete: some peers unreachable")

ErrBootstrapIncomplete is returned by ensureSubscribed when the bootstrap could not fetch the full causal chain because some peers are unreachable. A background retry continues until the chain is complete.

var ErrRegionPartitioned = errors.New("region partitioned: not all same-region peers are reachable")

ErrRegionPartitioned is returned by Flush in SafeMode when not all same-region peers are reachable.

var ErrTxnAborted = errors.New("transaction aborted")

ErrTxnAborted is returned by Flush when a transaction loses FWW or encounters a real conflict that cannot be resolved.

Functions

func ComputeForkChoiceHash

func ComputeForkChoiceHash(nodeID proto.NodeID, hlc *timestamppb.Timestamp) []byte

ComputeForkChoiceHash computes SHA-256(nodeID_LE8 || hlc_LE8), where _LE8 denotes an 8-byte little-endian encoding. The result is a deterministic 32-byte hash used for fork-choice tiebreaking. The lower hash wins, which eliminates the systematic advantage a node would otherwise gain from raw HLC comparison.

func ForkChoiceLess

func ForkChoiceLess(a, b []byte) bool

ForkChoiceLess returns true if hash a is lexicographically less than hash b. Lower hash wins the fork-choice election.

func GenerateKeyPair

func GenerateKeyPair() ([]byte, []byte, error)

GenerateKeyPair generates a new HPKE MLKEM768X25519 keypair. Returns (publicKeyBytes, privateKeyBytes, error).

func MarshalEffect added in v0.2.0

func MarshalEffect(eff *pb.Effect) ([]byte, error)

MarshalEffect serializes an Effect to its wire bytes, hex-encoding any map<string, X> keys inside an embedded ReducedEffect (Snapshot state). Proto3 requires string map keys to be valid UTF-8, but Redis-style collections (hashes, sets, zsets, streams) accept arbitrary byte sequences as element IDs, and the cluster membership encodes a uint64 as raw little-endian bytes. Without this encoding, proto.Marshal returns an error for any snapshot whose state contains a binary-keyed element.

The encoding is always-on hex with no discriminator, so every key is encoded, including keys that are already valid UTF-8. UnmarshalEffect performs the reverse step. The original *pb.Effect is not mutated; if any sanitization is required, the snapshot state is cloned first.

func Merge2

func Merge2(a, b *pb.ReducedEffect) *pb.ReducedEffect

Merge2 merges two reduced effects from concurrent branches into one. The result is always a freshly constructed *pb.ReducedEffect — inputs are never mutated, making reduced effects safe to treat as snapshots.

Decision tree:

  • Both commutative, same collection → accumulate (sum/max/union)
  • Both non-commutative → HLC tiebreaker
  • Mixed (comm + non-comm), compatible → apply comm on top of non-comm
  • Mixed, incompatible → non-comm wins
  • Cross-collection → HLC tiebreaker
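The decision tree can be modeled on a deliberately tiny state: one integer per collection, where commutative effects carry a delta (summed here, one of the sum/max/union options) and non-commutative effects carry an absolute value. The `reduced` type, the node-ID tiebreak for equal HLCs, and treating "compatible" as "same collection" are all assumptions of this sketch.

```go
package main

import "fmt"

// reduced is a hypothetical, heavily simplified stand-in for pb.ReducedEffect.
type reduced struct {
	Collection  string // stands in for the collection type, e.g. "counter"
	Commutative bool   // true for deltas (INCRBY), false for overwrites (SET)
	HLC         uint64
	Node        uint64 // tiebreaker for equal HLCs (assumption)
	Value       int64
}

// later reports whether a wins an HLC tiebreak against b.
func later(a, b reduced) bool {
	if a.HLC != b.HLC {
		return a.HLC > b.HLC
	}
	return a.Node > b.Node
}

// merge2 follows the decision tree; arguments are passed by value, so the
// inputs cannot be mutated.
func merge2(a, b reduced) reduced {
	sameColl := a.Collection == b.Collection
	switch {
	case a.Commutative && b.Commutative && sameColl:
		out := a // both commutative, same collection: accumulate
		out.Value += b.Value
		if later(b, a) {
			out.HLC, out.Node = b.HLC, b.Node
		}
		return out
	case !a.Commutative && !b.Commutative:
		if later(a, b) { // both non-commutative: HLC tiebreaker
			return a
		}
		return b
	case a.Commutative != b.Commutative && sameColl:
		base, delta := a, b // mixed, compatible: delta on top of overwrite
		if a.Commutative {
			base, delta = b, a
		}
		base.Value += delta.Value
		return base
	case a.Commutative != b.Commutative:
		if a.Commutative { // mixed, incompatible: non-commutative wins
			return b
		}
		return a
	default:
		if later(a, b) { // cross-collection: HLC tiebreaker
			return a
		}
		return b
	}
}

func main() {
	set := reduced{Collection: "counter", HLC: 10, Node: 1, Value: 5}                     // SET k 5
	incr := reduced{Collection: "counter", Commutative: true, HLC: 11, Node: 2, Value: 3} // INCRBY k 3
	fmt.Println(merge2(set, incr).Value)                                                  // 8
}
```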

func MergeN

func MergeN(branches []*pb.ReducedEffect) *pb.ReducedEffect

MergeN merges N reduced effects from concurrent branches using canonical merge ordering. This replaces pairwise Merge2 for multi-tip reconstruction.

Canonical ordering:

  1. Non-commutative branches first. Among them, LWW applies — highest HLC wins. Losing non-commutative branches are dead and skipped entirely.
  2. Commutative branches second. All commutative branches accumulate on top of the non-commutative winner (or on top of each other if no non-comm).

The result is deterministic: any node with the same set of branches computes the same result regardless of the order branches are provided. Inputs are never mutated.
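A sketch of the canonical ordering on the same kind of simplified integer state (hypothetical `branch` type; ties on HLC broken by a node ID, which is an assumption). Because the LWW winner is a maximum and the commutative deltas are a sum, neither depends on the order of the input slice, which is what makes the result deterministic.

```go
package main

import "fmt"

// branch is a hypothetical simplified stand-in for pb.ReducedEffect.
type branch struct {
	Commutative bool
	HLC         uint64
	Node        uint64 // tiebreaker for equal HLCs (assumption)
	Value       int64  // absolute value if non-commutative, delta if commutative
}

// mergeN picks the LWW winner among non-commutative branches (losers are
// skipped entirely), then adds every commutative delta on top.
func mergeN(branches []branch) branch {
	var out branch
	haveBase := false
	for _, b := range branches {
		if b.Commutative {
			continue
		}
		if !haveBase || b.HLC > out.HLC || (b.HLC == out.HLC && b.Node > out.Node) {
			out, haveBase = b, true
		}
	}
	if !haveBase {
		// No non-commutative branch: deltas accumulate onto a zero base.
		out = branch{Commutative: true}
	}
	for _, b := range branches {
		if b.Commutative {
			out.Value += b.Value
		}
	}
	return out
}

func main() {
	bs := []branch{
		{Commutative: true, HLC: 5, Node: 1, Value: 2}, // INCRBY 2
		{HLC: 7, Node: 2, Value: 100},                  // SET 100 (LWW winner)
		{HLC: 6, Node: 3, Value: 50},                   // SET 50 (dead branch)
		{Commutative: true, HLC: 8, Node: 4, Value: 3}, // INCRBY 3
	}
	fmt.Println(mergeN(bs).Value) // 105
}
```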

func ReduceBranch

func ReduceBranch(effects []*pb.Effect) *pb.ReducedEffect

ReduceBranch reduces a linear chain of effects (oldest-first) into a single ReducedEffect.

func ReduceChain

func ReduceChain(seed *pb.ReducedEffect, effects []*pb.Effect) *pb.ReducedEffect

ReduceChain reduces effects sequentially on top of a seed ReducedEffect. If seed is nil, behaves identically to ReduceBranch. This is used at DAG merge points: the seed is the merged result of concurrent dep subtrees, and effects are the linear chain above. The seed is never mutated; a clone is made before any modifications.
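The seed-then-chain fold can be sketched as follows. The `effect` and `state` types are hypothetical simplifications (an overwrite or an integer delta); the point is that the seed is copied before any effect is applied, so the merged result the caller holds is never changed.

```go
package main

import "fmt"

// effect is a hypothetical simplified effect: an overwrite or a delta.
type effect struct {
	Set   bool // true: overwrite with Value; false: add Value
	Value int64
}

// state is a hypothetical stand-in for pb.ReducedEffect.
type state struct {
	Value   int64
	Applied int // number of effects folded in
}

// reduceChain folds effects (oldest-first) onto a copy of seed.
// A nil seed starts from the zero state, like reduceBranch.
func reduceChain(seed *state, effects []effect) *state {
	out := &state{}
	if seed != nil {
		clone := *seed // a shallow copy suffices: state has no reference fields
		out = &clone
	}
	for _, e := range effects {
		if e.Set {
			out.Value = e.Value
		} else {
			out.Value += e.Value
		}
		out.Applied++
	}
	return out
}

func reduceBranch(effects []effect) *state { return reduceChain(nil, effects) }

func main() {
	merged := &state{Value: 10} // e.g. the merge of concurrent dep subtrees
	got := reduceChain(merged, []effect{{Value: 5}, {Set: true, Value: 1}, {Value: 2}})
	fmt.Println(got.Value, merged.Value) // 3 10
}
```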

func UnmarshalEffect added in v0.2.0

func UnmarshalEffect(data []byte, eff *pb.Effect) error

UnmarshalEffect parses wire bytes into eff and reverses the hex encoding applied by MarshalEffect on any embedded ReducedEffect (Snapshot state).

Types

type Broadcaster

type Broadcaster interface {
	Broadcast(notify *pb.OffsetNotify)
	BroadcastWithData(notify *pb.OffsetNotify, effectData []byte)
	Replicate(notify *pb.OffsetNotify, wireData []byte) error
	// ReplicateTo sends to a specific peer and waits for ACK/NACK.
	// Returns NackNotify slice on conflict, nil on ACK.
	ReplicateTo(notify *pb.OffsetNotify, wireData []byte, targetNodeID pb.NodeID) ([]*pb.NackNotify, error)
	// SendNack sends an enriched NACK to the originator.
	SendNack(nack *pb.NackNotify, targetNodeID pb.NodeID)
	FetchFromAny(ref *pb.EffectRef) ([]byte, error)
	Fetch(ref *pb.EffectRef) ([]byte, error)
	PeerIDs() []pb.NodeID
	// AllRegionPeersReachable returns true if every same-region peer is
	// alive and has a verified symmetric path. Used by SafeMode to gate
	// writes: a key must not be written unless all region peers are reachable.
	AllRegionPeersReachable() bool
	// InMajorityPartition returns true if this node can reach a strict
	// majority of same-region nodes (including itself). Used by SafeMode
	// to block transactions when in a minority partition.
	InMajorityPartition() bool
	// ForwardTransaction sends a transaction to a specific peer for execution
	// (adaptive serialization §5). Returns the leader's response.
	ForwardTransaction(ctx context.Context, targetNodeID pb.NodeID, tx *pb.ForwardedTransaction) (*pb.ForwardedResponse, error)
}

Broadcaster sends effect notifications to cluster peers. A nil Broadcaster means standalone mode.

type BunnyStorage

type BunnyStorage struct {
	// contains filtered or unexported fields
}

BunnyStorage implements ObjectStorage using Bunny CDN's storage API.

func NewBunnyStorage

func NewBunnyStorage(uploadEndpoint, cdnEndpoint, accessKey string) *BunnyStorage

NewBunnyStorage creates a BunnyStorage instance.

func (*BunnyStorage) Download

func (b *BunnyStorage) Download(ctx context.Context, path string) ([]byte, error)

func (*BunnyStorage) Upload

func (b *BunnyStorage) Upload(ctx context.Context, path string, data []byte) error

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context tracks per-key state for dep chaining (Emit) and deferred index update + broadcast (Flush). One Context per command invocation.

func (*Context) Abort

func (c *Context) Abort()

Abort discards pending index updates and broadcasts. Effects already written to the log remain durable but invisible (index not updated).

func (*Context) BeginTx

func (c *Context) BeginTx()

BeginTx marks subsequent effects as transactional. This is called by read-modify-write commands (INCR, LPUSH, etc.) for atomicity AND by handleExec for MULTI/EXEC. Watch processing is NOT done here — use CheckWatches after BeginTx for EXEC.

func (*Context) CheckWatches

func (c *Context) CheckWatches() bool

CheckWatches validates WATCH observations and emits transactional NOOPs for the Bind's read set. Must be called AFTER BeginTx (so NOOPs get IsTransactional=true). Only called from handleExec.

Returns false if any watched key was modified — the caller should abort the transaction and return a null array to the client.

func (*Context) ClearWatches

func (c *Context) ClearWatches()

ClearWatches removes all watched keys without affecting transaction state.

func (*Context) Emit

func (c *Context) Emit(eff *pb.Effect, snapshotTips ...[]Tip) error

Emit writes a single effect to the log. The context tracks per-key state so that consecutive effects on the same key form a dep chain. The index is NOT updated until Flush.

For read-modify-write commands, pass the tip offsets returned by GetSnapshot as snapshotTips so that the first effect depends on the tips the handler actually read, not whatever the index contains now. Pure writes (SET, LPUSH) omit snapshotTips and Emit reads the index.

func (*Context) Flush

func (c *Context) Flush() error

Flush updates the index for all touched keys and broadcasts all notifications per key for durability, then resets the context for reuse.

func (*Context) GetSnapshot

func (c *Context) GetSnapshot(key string) (*pb.ReducedEffect, []Tip, error)

GetSnapshot returns the current materialized state of a key, including any unflushed effects from this context. Within MULTI/EXEC, earlier commands' effects are in the log but not yet in the index; this method reconstructs from the log so commands within the same transaction can see each other's writes.

When a txSnapshot is active (MULTI/EXEC with SSI), reads for keys not yet in the context use the snapshot instead of the live index, and a NOOP is emitted to record the read in the causal structure.

func (*Context) PendingKeys

func (c *Context) PendingKeys() []string

PendingKeys returns the names of all keys with pending effects in this Context. Callers use it to acquire per-key locks before calling Flush: Flush's fork-choice critical section races with any other Flush on the same key, and that serialization belongs in the handler layer (redis handler, sql handler).

func (*Context) RestoreSavepoint

func (c *Context) RestoreSavepoint(sp *ContextSavepoint)

RestoreSavepoint replaces the Context's pending-key state with the snapshot. Any Emit issued after TakeSavepoint is discarded from the Context; its log offsets remain in the engine's log but never reach the index (identical to what happens to all effects on Abort).

func (*Context) SetTraceCtx

func (c *Context) SetTraceCtx(ctx context.Context)

SetTraceCtx sets the OTel trace context for this command.

func (*Context) TakeSavepoint

func (c *Context) TakeSavepoint() *ContextSavepoint

TakeSavepoint captures a copy of the Context's pending key state that is deep enough for RestoreSavepoint to revert every Emit issued between Take and Restore. It is cheap: each contextKey is a small struct whose slice fields are copied by reference, which is safe because the underlying values (notifies, elementIDs) are immutable once added.

func (*Context) TraceCtx

func (c *Context) TraceCtx() context.Context

TraceCtx returns the OTel trace context, or context.Background() if none set.

func (*Context) Watch

func (c *Context) Watch(key string)

Watch records a key for optimistic locking by emitting a NoopEffect immediately and flushing it. This records the observation in the causal log so all nodes can verify whether the key was modified between WATCH and EXEC. The NOOP offset and post-flush TipSet pointer are stored for comparison when CheckWatches runs during EXEC.

type ContextSavepoint

type ContextSavepoint struct {
	// contains filtered or unexported fields
}

ContextSavepoint is an opaque snapshot of a Context's pending per-key state. Restore it (via Context.RestoreSavepoint) to discard every Emit made after the snapshot was taken; already-written log offsets become orphans (never indexed), same as Abort. Used by higher-level SQL SAVEPOINT semantics.

Savepoints do NOT snapshot the engine's committed index — the engine's state is unchanged until Flush, so reverting the Context's pending keys is sufficient to roll back.
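A toy model of cheap savepoints, assuming pending state is a map from key to append-only slices of log offsets. `toyContext` and `savepoint` are hypothetical. The full slice expression `s[:len(s):len(s)]` keeps the copy cheap (no elements are copied) while guaranteeing that appends after a restore reallocate rather than overwrite entries a savepoint still references.

```go
package main

import "fmt"

// toyContext models pending per-key state: log offsets emitted since Flush.
type toyContext struct {
	pending map[string][]uint64
}

type savepoint struct {
	pending map[string][]uint64
}

func (c *toyContext) Emit(key string, offset uint64) {
	c.pending[key] = append(c.pending[key], offset)
}

// TakeSavepoint copies the map and the slice headers, not the elements.
// Capping capacity makes any later append on these slices reallocate.
func (c *toyContext) TakeSavepoint() *savepoint {
	m := make(map[string][]uint64, len(c.pending))
	for k, s := range c.pending {
		m[k] = s[:len(s):len(s)]
	}
	return &savepoint{pending: m}
}

// RestoreSavepoint drops every Emit made after sp was taken, including keys
// first touched afterwards. Copying the map again keeps sp reusable.
func (c *toyContext) RestoreSavepoint(sp *savepoint) {
	m := make(map[string][]uint64, len(sp.pending))
	for k, s := range sp.pending {
		m[k] = s[:len(s):len(s)]
	}
	c.pending = m
}

func main() {
	c := &toyContext{pending: map[string][]uint64{}}
	c.Emit("a", 1)
	sp := c.TakeSavepoint()
	c.Emit("a", 2)
	c.Emit("b", 3)
	c.RestoreSavepoint(sp)
	fmt.Println(c.pending["a"], len(c.pending["b"])) // [1] 0
}
```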

type Encryptor

type Encryptor struct {
	// contains filtered or unexported fields
}

Encryptor handles HPKE encryption/decryption with zstd compression. Uses MLKEM768X25519 KEM for post-quantum hybrid safety, HKDF-SHA256 for KDF, and AES-256-GCM for AEAD.

func NewEncryptor

func NewEncryptor(pubKeyBytes, privKeyBytes []byte) (*Encryptor, error)

NewEncryptor creates an Encryptor from raw public/private key bytes. privKeyBytes may be nil for encrypt-only nodes.

func (*Encryptor) OpenAndDecompress

func (enc *Encryptor) OpenAndDecompress(sealed, info []byte) ([]byte, error)

OpenAndDecompress decrypts with HPKE, then decompresses with zstd.

func (*Encryptor) SealAndCompress

func (enc *Encryptor) SealAndCompress(plaintext, info []byte) ([]byte, error)

SealAndCompress compresses with zstd, then encrypts with HPKE. The info parameter enables domain separation (e.g. "effect" vs "tip-recovery").

type Engine

type Engine struct {

	// Notification callbacks — fired after effects are durable
	OnKeyDataAdded func(key string) // wake oldest waiter (data inserted)
	OnKeyDeleted   func(key string) // wake all waiters (key removed)
	OnFlushAll     func()           // wake all waiters across all keys
	// contains filtered or unexported fields
}

Engine is the central coordinator for the causal effect log. Lock-free: the log uses CAS, the index manages its own concurrency, and safety config is swapped atomically.

func NewEngine

func NewEngine(cfg EngineConfig) *Engine

NewEngine creates a new Engine from the given configuration.

func NewTestEngine

func NewTestEngine() *Engine

NewTestEngine creates a minimal Engine for use in tests outside this package.

func (*Engine) CheckSerializationLeader

func (e *Engine) CheckSerializationLeader(key string) *pb.NodeID

CheckSerializationLeader returns the serialization leader for a key, or nil if no serialization is active.

func (*Engine) Close

func (e *Engine) Close() error

Close performs graceful shutdown of the engine and its background components.

func (*Engine) EffectCache

func (e *Engine) EffectCache() *clox.CloxCache[Tip, *pb.Effect]

EffectCache returns the engine's deserialized effect cache for use by the fetch handler (serves effects from cache when the log is unavailable).

func (*Engine) FlushIndex

func (e *Engine) FlushIndex()

FlushIndex deletes all keys from the index and evicts all cache entries.

func (*Engine) Forward

func (e *Engine) Forward(commandName string, args [][]byte, keys []string, username string) []byte

Forward checks if any key has a serialization leader on another node. If so, forwards the command and returns raw RESP bytes. Returns nil if the command should execute locally.

func (*Engine) ForwardExec

func (e *Engine) ForwardExec(commands []ForwardCommand, watchedKeys []string, username string) []byte

ForwardExec checks if any queued command touches a serialized key. If so, forwards the entire transaction and returns raw RESP bytes. Returns nil for local execution.

func (*Engine) GetLock

func (e *Engine) GetLock(key string) *sync.Mutex

GetLock returns a striped lock for the given key using FNV-1a hashing.

func (*Engine) GetSnapshot

func (e *Engine) GetSnapshot(key string) (*pb.ReducedEffect, []Tip, int, error)

GetSnapshot returns the current materialized state of a key and the tip offsets the snapshot was derived from. Callers that perform read-modify-write (SETBIT, INCR, etc.) must pass the returned tips to Emit so the first effect depends on the tips the snapshot was actually computed from, not whatever the index contains at Emit time.

On a cache hit it returns immediately. On a miss it walks the causal DAG from the index tip set and reconstructs the state via ReduceBranch and canonical merge.

func (*Engine) HandleNack

func (e *Engine) HandleNack(nack *pb.NackNotify) error

HandleNack processes a NACK from a remote peer.

func (*Engine) HandleRemote

func (e *Engine) HandleRemote(notify *pb.OffsetNotify) ([]*pb.NackNotify, error)

HandleRemote processes a remote effect notification: stores the effect in the log, updates the index, and returns NACKs if deps don't match tips.

EffectData may be in wire format [4-byte LE keyLen][key][protoData] or raw proto bytes. Both are handled transparently.

Returns all NACKs generated (one per diverged key) so the caller can send them synchronously as the ReplicateTo response.

func (*Engine) KeyCount

func (e *Engine) KeyCount() int64

KeyCount returns the number of keys in the index.

func (*Engine) MatchKeys

func (e *Engine) MatchKeys(pattern string) []string

MatchKeys returns all keys matching the glob pattern. Uses a point-in-time snapshot for consistency.

func (*Engine) NewContext

func (e *Engine) NewContext() *Context

NewContext creates a new write context bound to the engine.

func (*Engine) NodeID

func (e *Engine) NodeID() pb.NodeID

func (*Engine) ReconvergeAllKeys

func (e *Engine) ReconvergeAllKeys()

ReconvergeAllKeys triggers a debounced background anti-entropy pass to fetch any effects missed during the partition. Multiple calls within the debounce window are coalesced into a single pass. This replaces the previous approach of deleting all subscriptions, which caused a thundering herd of blocking re-bootstraps on the read path and cascading peer timeouts.

func (*Engine) RecordSerializationActivity

func (e *Engine) RecordSerializationActivity(key string)

RecordSerializationActivity updates the last activity timestamp for a serialized key. Called when the leader processes a forwarded command or performs a local write on a serialized key.

func (*Engine) ScanKeys

func (e *Engine) ScanKeys(after string, pattern string, fn func(key string) bool)

ScanKeys iterates over keys matching the glob pattern, starting after `after`. Pass empty string for `after` to start from the beginning. Return false from fn to stop iteration.

func (*Engine) SetBroadcaster

func (e *Engine) SetBroadcaster(b Broadcaster)

SetBroadcaster sets the broadcaster for replicating effects to peers. Must be called before any Emit/Flush if cluster mode is desired.

SetBroadcaster also lazily initializes the horizon set if it is not already present. NewEngine creates the horizon set only when cfg.Broadcaster != nil, but the bootstrap path in beacon/runtime.go constructs the engine first (with a nil Broadcaster) and wires the PeerManager in as broadcaster afterwards, because the PeerManager needs the engine to build the effect handler. Without this lazy init, horizon would stay nil for the life of the engine and bind-arrival visibility would not be deferred: peers would see aborting-txn effects before fork-choice has settled, which surfaces as Elle :incompatible-order anomalies on cross-node reads.

func (*Engine) SetRTTProvider

func (e *Engine) SetRTTProvider(p PeerRTTProvider)

SetRTTProvider sets the RTT provider for adaptive serialization leader selection.

func (*Engine) StartAntiEntropy

func (e *Engine) StartAntiEntropy(interval time.Duration)

StartAntiEntropy launches a background goroutine that periodically exchanges tips with peers and fetches missing effect chains. This ensures effects missed during partitions are eventually discovered without polluting the log with redundant subscription effects. Also starts the reconvergence debounce loop that coalesces peer recovery events into background anti-entropy passes.

func (*Engine) UpdateSafetyRules

func (e *Engine) UpdateSafetyRules(defaultMode SafetyMode, rules []KeyRangeRule)

UpdateSafetyRules atomically replaces the key-range safety configuration.

type EngineConfig

type EngineConfig struct {
	NodeID        pb.NodeID
	Index         keytrie.KeyIndex
	Cache         StateCache      // nil disables caching
	Broadcaster   Broadcaster     // nil for standalone
	RTTProvider   PeerRTTProvider // nil disables RTT-based leader selection
	DefaultMode   SafetyMode
	KeyRangeRules []KeyRangeRule
	MemoryLimit   int64 // memory budget for effect cache (0 = 10MB default)
	// MemoryLimitPercent, when non-zero, expresses the cache budget as a
	// fraction (0,1] of the memory available to this process. Takes
	// precedence over MemoryLimit. The cache enforces this via a live
	// re-evaluating tick so cgroup/system changes propagate.
	MemoryLimitPercent float64
	HorizonTimeout     time.Duration // horizon wait for bind visibility (0 = 500ms default)
}

EngineConfig holds configuration for creating an Engine.

func (*EngineConfig) ModeForKey

func (c *EngineConfig) ModeForKey(key string) SafetyMode

ModeForKey returns the safety mode for a key. First matching rule wins.
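A sketch of first-match-wins resolution (hypothetical names; `path.Match` again stands in for the real pattern syntax). Falling back to a default mode when no rule matches mirrors EngineConfig.DefaultMode, but that fallback is an assumption about ModeForKey.

```go
package main

import (
	"fmt"
	"path"
)

type safetyMode int

const (
	safeMode safetyMode = iota
	unsafeMode
)

type keyRangeRule struct {
	Pattern string
	Mode    safetyMode
}

// modeForKey returns the mode of the first matching rule, else def.
// A malformed pattern is treated as a non-match.
func modeForKey(def safetyMode, rules []keyRangeRule, key string) safetyMode {
	for _, r := range rules {
		if ok, _ := path.Match(r.Pattern, key); ok {
			return r.Mode
		}
	}
	return def
}

func main() {
	rules := []keyRangeRule{
		{Pattern: "cache:session:*", Mode: safeMode},
		{Pattern: "cache:*", Mode: unsafeMode},
	}
	fmt.Println(modeForKey(safeMode, rules, "cache:session:42") == safeMode) // true
	fmt.Println(modeForKey(safeMode, rules, "cache:page:7") == unsafeMode)   // true
}
```

Because the first match wins, narrower patterns belong before broader ones.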

type ForwardCommand

type ForwardCommand struct {
	Name string
	Args [][]byte
	Keys []string
}

ForwardCommand describes a single command to be forwarded.

type HorizonSet

type HorizonSet struct {
	// contains filtered or unexported fields
}

HorizonSet tracks Binds in their horizon wait period. Effects from invisible Binds are excluded from reconstruction until the horizon completes (timer fires or fast-path all-ACK).

func (*HorizonSet) Add

func (h *HorizonSet) Add(txnID string, bindOffset Tip, bind *pb.TransactionalBindEffect, relevantPeers []pb.NodeID)

Add registers a Bind in the invisible set. If the Bind's keys overlap with an existing group (shared consumed tips), the Bind joins that group and the timer is reset. Otherwise a new group is created.

relevantPeers is the set of peers whose concurrent binds we need to wait for before making this bind visible. On the origin that's the subscribers we just replicated to. On a remote-arrival it's all currently-alive peers (any of them may hold a competing bind we haven't seen yet). Timeout is computed from their RTTs.

func (*HorizonSet) IsInvisible

func (h *HorizonSet) IsInvisible(txnID string) bool

IsInvisible returns true if the given txnID is in the invisible set.

func (*HorizonSet) MakeVisible

func (h *HorizonSet) MakeVisible(txnID string)

MakeVisible promotes an entire group: removes all txnIDs from the invisible set, cleans up pendingTxTips, evicts cache, and fires OnKeyDataAdded callbacks.

func (*HorizonSet) StopAll

func (h *HorizonSet) StopAll()

StopAll stops all active timers. Called during Engine.Close.

type KeyRangeRule

type KeyRangeRule struct {
	Pattern string
	Mode    SafetyMode
}

KeyRangeRule maps a key pattern to a safety mode.

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is an in-memory ObjectStorage implementation for testing.

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage creates a new MemoryStorage.

func (*MemoryStorage) Download

func (m *MemoryStorage) Download(_ context.Context, path string) ([]byte, error)

func (*MemoryStorage) Keys

func (m *MemoryStorage) Keys() []string

Keys returns all stored paths (for testing).

func (*MemoryStorage) Upload

func (m *MemoryStorage) Upload(_ context.Context, path string, data []byte) error

type ObjectStorage

type ObjectStorage interface {
	Upload(ctx context.Context, path string, data []byte) error
	Download(ctx context.Context, path string) ([]byte, error)
}

ObjectStorage abstracts uploading/downloading blobs to/from object storage.

type PeerRTTProvider

type PeerRTTProvider interface {
	// GetRTT returns the estimated round-trip time to the given peer.
	// Returns 0 if the peer is unknown or RTT has not been measured.
	GetRTT(nodeID pb.NodeID) time.Duration
	// AlivePeerIDs returns the IDs of all peers that are currently alive.
	AlivePeerIDs() []pb.NodeID
}

PeerRTTProvider provides RTT measurements to peers for optimal leader selection.

type SafetyMode

type SafetyMode int

SafetyMode determines write behavior during network partitions.

const (
	SafeMode   SafetyMode = iota // blocks writes when quorum is unreachable
	UnsafeMode                   // allows writes during partitions, forming branches
)

type StateCache

type StateCache interface {
	Get(key string) (*pb.ReducedEffect, bool)
	Put(key string, value *pb.ReducedEffect)
	Evict(key string)
}

StateCache caches materialized ReducedEffect values by key. Implementations must be safe for concurrent use.

type Tip

type Tip = keytrie.EffectRef
