Documentation
¶
Index ¶
- Constants
- Variables
- func DeriveCAFromPassphrase(passphrase string) (ed25519.PrivateKey, *x509.Certificate, error)
- func GenerateLeafCert(caKey crypto.Signer, caCert *x509.Certificate, nodeAddr string) (tls.Certificate, error)
- func GeneratePassphrase() (string, error)
- func MarshalHeartbeat(nodeID NodeId, timestamp uint64) []byte
- func MarshalHeartbeatACK(nodeID NodeId, echoedTimestamp uint64) []byte
- func MarshalNackPacket(nack *pb.NackNotify) ([]byte, error)
- func MarshalNotifyACKPacket(nodeID NodeId, requestID uint64, status byte, credits uint32) []byte
- func MarshalNotifyNACKPacket(nodeID NodeId, requestID uint64, nacks []*pb.NackNotify, credits uint32) []byte
- func MarshalNotifyPacket(requestID uint64, notify *pb.OffsetNotify) ([]byte, error)
- func RecordFetchServed()
- func RecordHeartbeatReceived()
- func RecordHeartbeatsSent()
- func RecordNotificationDropped(peerID NodeId)
- func RecordNotificationReceived()
- func RecordNotificationSent()
- func RecordPeerAlive(peerID NodeId, alive bool)
- func RecordPeerConnected(peerID NodeId)
- func RecordPeerDisconnected(peerID NodeId)
- func RecordPeerRTT(peerID NodeId, rttNanos int64)
- func RecordPeerReconnect(peerID NodeId)
- func RecordPeerSymmetric(peerID NodeId, symmetric bool)
- func RecordQUICStreamError()
- func RecordQUICStreamOpened()
- func RecordRetransmissionGiveUp(peerID NodeId)
- func RecordUDPNotifyACKLatency(latencyMs float64)
- type CDNFetcher
- type ClusterConfig
- type EffectHandler
- type EngineEffectHandler
- type EngineLogReader
- type ForwardHandler
- type HeartbeatManager
- func (hm *HeartbeatManager) AddPeer(nodeID NodeId, region string)
- func (hm *HeartbeatManager) ProcessInbound(peerID NodeId, timestamp uint64)
- func (hm *HeartbeatManager) ProcessInboundACK(peerID NodeId, echoedTimestamp uint64)
- func (hm *HeartbeatManager) RemovePeer(nodeID NodeId)
- func (hm *HeartbeatManager) SendHeartbeatTo(peerID NodeId)
- func (hm *HeartbeatManager) Start(sendFunc func(peerID NodeId, data []byte) error)
- func (hm *HeartbeatManager) Stop()
- type LogReader
- type NodeConfig
- type NodeId
- type PacketHandler
- type PeerConn
- func (pc *PeerConn) Fetch(ctx context.Context, offset *pb.EffectRef) ([]byte, error)
- func (pc *PeerConn) ForwardTransaction(ctx context.Context, tx *pb.ForwardedTransaction) (*pb.ForwardedResponse, error)
- func (pc *PeerConn) GetQuicConn() *quic.Conn
- func (pc *PeerConn) IsSameRegion() bool
- func (pc *PeerConn) IsStreamReady() bool
- func (pc *PeerConn) Start(ctx context.Context)
- func (pc *PeerConn) Stop()
- type PeerHealth
- type PeerHealthTable
- func (t *PeerHealthTable) AlivePeerIDs() []NodeId
- func (t *PeerHealthTable) AliveSymmetricCount(localRegion string, peerRegions map[NodeId]string) int
- func (t *PeerHealthTable) AliveSymmetricPeers(localRegion string, peerRegions map[NodeId]string) []NodeId
- func (t *PeerHealthTable) AllRegionPeersReachable(localRegion string, peerRegions map[NodeId]string) bool
- func (t *PeerHealthTable) AllSameRegionPeers(localRegion string, peerRegions map[NodeId]string) []NodeId
- func (t *PeerHealthTable) CheckLiveness(timeout time.Duration)
- func (t *PeerHealthTable) Get(nodeID NodeId) *PeerHealth
- func (t *PeerHealthTable) GetOrCreate(nodeID NodeId) *PeerHealth
- func (t *PeerHealthTable) GetRTT(nodeID NodeId) time.Duration
- func (t *PeerHealthTable) InMajorityPartition(localRegion string, peerRegions map[NodeId]string) bool
- func (t *PeerHealthTable) PruneUnknownPeers(known map[NodeId]struct{})
- func (t *PeerHealthTable) Remove(nodeID NodeId)
- type PeerManager
- func (pm *PeerManager) AllRegionPeersReachable() bool
- func (pm *PeerManager) Broadcast(notify *pb.OffsetNotify)
- func (pm *PeerManager) BroadcastWithData(notify *pb.OffsetNotify, effectData []byte)
- func (pm *PeerManager) Fetch(ref *pb.EffectRef) ([]byte, error)
- func (pm *PeerManager) FetchFromAny(offset *pb.EffectRef) ([]byte, error)
- func (pm *PeerManager) ForwardTransaction(ctx context.Context, targetNodeID NodeId, tx *pb.ForwardedTransaction) (*pb.ForwardedResponse, error)
- func (pm *PeerManager) HandleHeartbeat(peerID NodeId, timestamp uint64)
- func (pm *PeerManager) HandleHeartbeatACK(peerID NodeId, timestamp uint64)
- func (pm *PeerManager) HandleNackNotify(peerID NodeId, nack *pb.NackNotify)
- func (pm *PeerManager) HandleNotify(peerID NodeId, requestID uint64, notify *pb.OffsetNotify)
- func (pm *PeerManager) HandleNotifyACK(peerID NodeId, requestID uint64, status byte, credits uint32)
- func (pm *PeerManager) HandleNotifyACKWithData(peerID NodeId, requestID uint64, status byte, credits uint32, ...)
- func (pm *PeerManager) HealthTable() *PeerHealthTable
- func (pm *PeerManager) InMajorityPartition() bool
- func (pm *PeerManager) ListenAddr() string
- func (pm *PeerManager) PeerIDs() []NodeId
- func (pm *PeerManager) Replicate(notify *pb.OffsetNotify, wireData []byte) error
- func (pm *PeerManager) ReplicateTo(notify *pb.OffsetNotify, wireData []byte, targetNodeID NodeId) ([]*pb.NackNotify, error)
- func (pm *PeerManager) SendNack(nack *pb.NackNotify, targetNodeID NodeId)
- func (pm *PeerManager) SetCDNFetcher(f CDNFetcher)
- func (pm *PeerManager) SetForwardHandler(h ForwardHandler)
- func (pm *PeerManager) Start(ctx context.Context) error
- func (pm *PeerManager) Stop()
- func (pm *PeerManager) UpdateTopology(newCfg *ClusterConfig)
- func (pm *PeerManager) WaitForAnyPeer(ctx context.Context) error
- func (pm *PeerManager) WaitForPeer(ctx context.Context, nodeID NodeId) bool
- type QUICNotifyTransport
- func (t *QUICNotifyTransport) AcceptUniStreams(conn *quic.Conn)
- func (t *QUICNotifyTransport) Send(peerID NodeId, plaintext []byte) (wireSize int, err error)
- func (t *QUICNotifyTransport) SendDirect(peerID NodeId, plaintext []byte) error
- func (t *QUICNotifyTransport) Start()
- func (t *QUICNotifyTransport) Stop()
- type ReplicationFuture
- type Replicator
- func (r *Replicator) AddPeer(nodeID NodeId, region string)
- func (r *Replicator) AllRegionPeersReachable() bool
- func (r *Replicator) HandleNotify(peerID NodeId, requestID uint64, notify *pb.OffsetNotify)
- func (r *Replicator) HandleNotifyACK(peerID NodeId, requestID uint64, status byte, credits uint32)
- func (r *Replicator) HandleNotifyACKWithData(peerID NodeId, requestID uint64, status byte, credits uint32, ...)
- func (r *Replicator) InMajorityPartition() bool
- func (r *Replicator) RemovePeer(nodeID NodeId)
- func (r *Replicator) Replicate(notify *pb.OffsetNotify, wireData []byte) *ReplicationFuture
- func (r *Replicator) ReplicateTo(notify *pb.OffsetNotify, wireData []byte, targetPeerID NodeId) ([]*pb.NackNotify, error)
- func (r *Replicator) SendNack(nack *pb.NackNotify, targetPeerID NodeId)
- func (r *Replicator) Start()
- func (r *Replicator) Stop()
- type StorageConfig
- type TopologyDiff
Constants ¶
const ( PacketTypeHeartbeat byte = 0x01 PacketTypeHeartbeatACK byte = 0x02 PacketTypeNotify byte = 0x03 PacketTypeNotifyACK byte = 0x04 PacketTypeNack byte = 0x05 )
Packet type prefixes for the UDP fast-path protocol.
Variables ¶
var ( WritesTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_writes_total", Help: "Total local write effects emitted", }) ReadsTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_reads_total", Help: "Total reads served", }) ReadsLocalTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_reads_local_total", Help: "Reads served from local log/cache (no fetch)", }) ReadsRemoteFetchTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_reads_remote_fetch_total", Help: "Reads that required a remote fetch", }) NotificationsSentTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_notifications_sent_total", Help: "OffsetNotify messages broadcast", }) NotificationsReceivedTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_notifications_received_total", Help: "OffsetNotify messages received", }) FetchesServedTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_fetches_served_total", Help: "Fetch RPCs served to peers", }) BindsEmittedTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_binds_emitted_total", Help: "Bind effects emitted (concurrent writes detected)", }) SnapshotsEmittedTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cluster_snapshots_emitted_total", Help: "Snapshot effects emitted (bind resolution)", }) )
var ( SegmentActiveBytes = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cluster_segment_active_bytes", Help: "Bytes used in the current live segment", }) SegmentActiveSlots = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cluster_segment_active_slots", Help: "Slots used in the current live segment (out of 1M)", }) SegmentsSealedTotal = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cluster_segments_sealed_total", Help: "Number of sealed segments on this node", }) DiskUsedBytes = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cluster_disk_used_bytes", Help: "Total disk used by all log segments", }) DiskCapacityBytes = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cluster_disk_capacity_bytes", Help: "Total disk capacity", }) DiskUsageRatio = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cluster_disk_usage_ratio", Help: "Disk usage ratio (used / capacity)", }) )
var ( ErrNoPeers = errors.New("no alive symmetric peers available for replication") ErrReplicationTimeout = errors.New("replication timed out waiting for ACK") )
Sentinel errors for replication failures.
var ( ErrPeerUnavailable = errors.New("peer unavailable") )
Functions ¶
func DeriveCAFromPassphrase ¶
func DeriveCAFromPassphrase(passphrase string) (ed25519.PrivateKey, *x509.Certificate, error)
DeriveCAFromPassphrase deterministically derives an Ed25519 CA key pair and self-signed CA certificate from a passphrase. Every node with the same passphrase produces identical output.
func GenerateLeafCert ¶
func GenerateLeafCert(caKey crypto.Signer, caCert *x509.Certificate, nodeAddr string) (tls.Certificate, error)
GenerateLeafCert creates an ephemeral leaf certificate signed by the given CA. The leaf key is randomly generated (not deterministic) and the cert is short-lived. nodeAddr is embedded as a DNS SAN (or IP SAN if it parses as IP).
func GeneratePassphrase ¶
func GeneratePassphrase() (string, error)
GeneratePassphrase returns a cryptographically random passphrase suitable for use with DeriveCAFromPassphrase. The result is 32 random bytes encoded as base64 RawURL (43 characters, no padding).
func MarshalHeartbeat ¶
func MarshalHeartbeat(nodeID NodeId, timestamp uint64) []byte
MarshalHeartbeat builds a heartbeat plaintext body: [1:type][8:nodeID LE][8:timestamp LE] Encryption (Noise AEAD) is applied by the transport layer.
func MarshalHeartbeatACK ¶
func MarshalHeartbeatACK(nodeID NodeId, echoedTimestamp uint64) []byte
MarshalHeartbeatACK builds a heartbeat ACK plaintext body: [1:type 0x02][8:nodeID LE][8:timestamp LE] The timestamp is the original sender's timestamp echoed back unchanged. Encryption (Noise AEAD) is applied by the transport layer.
func MarshalNackPacket ¶
func MarshalNackPacket(nack *pb.NackNotify) ([]byte, error)
MarshalNackPacket builds a NACK plaintext body: [1:type][proto-marshaled NackNotify]
func MarshalNotifyACKPacket ¶
func MarshalNotifyACKPacket(nodeID NodeId, requestID uint64, status byte, credits uint32) []byte
MarshalNotifyACKPacket builds a notify ACK plaintext body: [1:type][8:nodeID LE][8:requestID LE][1:status][4:credits LE]
func MarshalNotifyNACKPacket ¶
func MarshalNotifyNACKPacket(nodeID NodeId, requestID uint64, nacks []*pb.NackNotify, credits uint32) []byte
MarshalNotifyNACKPacket builds a NotifyACK with status=0x01 and embedded NACKs: [1:0x04][8:nodeID LE][8:requestID LE][1:0x01][4:credits LE][4:nack_count LE][for each: [4:len LE][proto NackNotify]]
func MarshalNotifyPacket ¶
func MarshalNotifyPacket(requestID uint64, notify *pb.OffsetNotify) ([]byte, error)
MarshalNotifyPacket builds a notify plaintext body: [1:type][8:requestID LE][proto-marshaled OffsetNotify]
func RecordFetchServed ¶
func RecordFetchServed()
RecordFetchServed increments the fetch served counter.
func RecordHeartbeatReceived ¶
func RecordHeartbeatReceived()
RecordHeartbeatReceived increments the heartbeat received counter.
func RecordHeartbeatsSent ¶
func RecordHeartbeatsSent()
RecordHeartbeatsSent increments the heartbeat sent counter.
func RecordNotificationDropped ¶
func RecordNotificationDropped(peerID NodeId)
RecordNotificationDropped increments the dropped notification counter for a peer.
func RecordNotificationReceived ¶
func RecordNotificationReceived()
RecordNotificationReceived increments the received counter.
func RecordNotificationSent ¶
func RecordNotificationSent()
RecordNotificationSent increments the sent counter for a broadcast.
func RecordPeerAlive ¶
func RecordPeerAlive(peerID NodeId, alive bool)
RecordPeerAlive updates the alive gauge for a peer.
func RecordPeerConnected ¶
func RecordPeerConnected(peerID NodeId)
RecordPeerConnected sets the peer connection gauge to 1.
func RecordPeerDisconnected ¶
func RecordPeerDisconnected(peerID NodeId)
RecordPeerDisconnected sets the peer connection gauge to 0.
func RecordPeerRTT ¶
func RecordPeerRTT(peerID NodeId, rttNanos int64)
RecordPeerRTT updates the RTT gauge for a peer.
func RecordPeerReconnect ¶
func RecordPeerReconnect(peerID NodeId)
RecordPeerReconnect increments the reconnection counter for a peer.
func RecordPeerSymmetric ¶
func RecordPeerSymmetric(peerID NodeId, symmetric bool)
RecordPeerSymmetric updates the symmetry gauge for a peer.
func RecordQUICStreamError ¶
func RecordQUICStreamError()
RecordQUICStreamError increments the QUIC stream error counter.
func RecordQUICStreamOpened ¶
func RecordQUICStreamOpened()
RecordQUICStreamOpened increments the QUIC stream opened counter.
func RecordRetransmissionGiveUp ¶
func RecordRetransmissionGiveUp(peerID NodeId)
RecordRetransmissionGiveUp increments the give-up counter for a peer.
func RecordUDPNotifyACKLatency ¶
func RecordUDPNotifyACKLatency(latencyMs float64)
RecordUDPNotifyACKLatency records the latency of a notification ACK.
Types ¶
type CDNFetcher ¶
type CDNFetcher interface {
FetchFromCDN(ctx context.Context, offset *pb.EffectRef) ([]byte, error)
}
CDNFetcher can fetch effect data from a CDN / object storage endpoint.
type ClusterConfig ¶
type ClusterConfig struct {
NodeID NodeId // This node's ID
Nodes []NodeConfig // All nodes including self
TLSPassphrase string // Shared passphrase for deterministic mTLS CA derivation
ReplicationTimeout time.Duration // Max time to wait for replication ACK (default 5s)
Storage StorageConfig // Object storage for off-node durability
}
ClusterConfig holds the full cluster topology.
func (*ClusterConfig) BuildClientTLSConfig ¶
func (c *ClusterConfig) BuildClientTLSConfig() (*tls.Config, error)
BuildClientTLSConfig returns a client-side TLS config for mTLS, derived from the shared passphrase. Returns nil, nil if TLSPassphrase is empty (insecure fallback).
func (*ClusterConfig) BuildTLSConfig ¶
func (c *ClusterConfig) BuildTLSConfig() (*tls.Config, error)
BuildTLSConfig returns a server-side TLS config for mTLS, derived from the shared passphrase. Returns nil, nil if TLSPassphrase is empty (insecure fallback).
func (*ClusterConfig) Peers ¶
func (c *ClusterConfig) Peers() []NodeConfig
Peers returns all nodes except self.
func (*ClusterConfig) Self ¶
func (c *ClusterConfig) Self() *NodeConfig
Self returns this node's config, or nil if not found.
type EffectHandler ¶
type EffectHandler interface {
// HandleOffsetNotify processes a remote effect notification.
// Returns all NACKs generated (one per diverged key). The caller
// sends these as the ReplicateTo response so the originator can
// evaluate fork-choice across all keys.
HandleOffsetNotify(notify *pb.OffsetNotify) ([]*pb.NackNotify, error)
// HandleNack processes an enriched NACK from a remote peer.
HandleNack(nack *pb.NackNotify) error
}
EffectHandler is called when a remote notification or data arrives.
type EngineEffectHandler ¶
type EngineEffectHandler struct {
// contains filtered or unexported fields
}
EngineEffectHandler adapts an *effects.Engine to the EffectHandler interface used by PeerManager. It forwards OffsetNotify / Nack frames straight to the engine; there's no front-end-specific logic. Previously duplicated in the redis package; shared here so every transport (redis, sql, future ones) builds a PeerManager the same way.
func NewEngineEffectHandler ¶
func NewEngineEffectHandler(engine *effects.Engine) *EngineEffectHandler
NewEngineEffectHandler wraps the given engine so it satisfies the EffectHandler interface.
func (*EngineEffectHandler) HandleNack ¶
func (h *EngineEffectHandler) HandleNack(nack *pb.NackNotify) error
HandleNack forwards NACKs to the engine.
func (*EngineEffectHandler) HandleOffsetNotify ¶
func (h *EngineEffectHandler) HandleOffsetNotify(notify *pb.OffsetNotify) ([]*pb.NackNotify, error)
HandleOffsetNotify delegates to the engine's HandleRemote. The engine handles log storage, keytrie update, cache invalidation, NACK generation, and notification callbacks.
type EngineLogReader ¶
type EngineLogReader struct {
// contains filtered or unexported fields
}
EngineLogReader adapts the engine's effect cache to the LogReader interface. Previously lived in the redis package; shared here.
Wire format produced by ReadEffect:
[4 bytes LE keyLen][keyLen bytes of key][proto-marshalled effect]
The receiver reconstructs the full log entry including the key, which is stored separately from the effect data in the log.
func NewEngineLogReader ¶
NewEngineLogReader wraps an effect cache.
func (*EngineLogReader) ReadEffect ¶
func (r *EngineLogReader) ReadEffect(ref *pb.EffectRef) ([]byte, error)
ReadEffect looks the effect up in the cache and returns it in wire format.
type ForwardHandler ¶
type ForwardHandler interface {
HandleForwardedTransaction(tx *pb.ForwardedTransaction) *pb.ForwardedResponse
}
ForwardHandler executes forwarded transactions received from other nodes via the adaptive serialization mechanism (§5). The serialization leader receives the full transaction, executes it locally, and returns the raw RESP wire bytes.
type HeartbeatManager ¶
type HeartbeatManager struct {
// contains filtered or unexported fields
}
HeartbeatManager manages the heartbeat protocol for all same-region peers.
func NewHeartbeatManager ¶
func NewHeartbeatManager(nodeID NodeId, healthTable *PeerHealthTable) *HeartbeatManager
NewHeartbeatManager creates a new HeartbeatManager.
func (*HeartbeatManager) AddPeer ¶
func (hm *HeartbeatManager) AddPeer(nodeID NodeId, region string)
AddPeer registers a peer for heartbeat monitoring.
func (*HeartbeatManager) ProcessInbound ¶
func (hm *HeartbeatManager) ProcessInbound(peerID NodeId, timestamp uint64)
ProcessInbound handles a received heartbeat packet (already decrypted by Noise). peerID and timestamp are parsed from the packet body. Sends a HeartbeatACK back to the sender echoing the received timestamp for true RTT measurement.
func (*HeartbeatManager) ProcessInboundACK ¶
func (hm *HeartbeatManager) ProcessInboundACK(peerID NodeId, echoedTimestamp uint64)
ProcessInboundACK handles a received HeartbeatACK packet. The echoedTimestamp is the original sender's timestamp echoed back, so RTT = now - echoedTimestamp uses the same clock for true round-trip measurement.
func (*HeartbeatManager) RemovePeer ¶
func (hm *HeartbeatManager) RemovePeer(nodeID NodeId)
RemovePeer unregisters a peer from heartbeat monitoring.
func (*HeartbeatManager) SendHeartbeatTo ¶ added in v0.0.8
func (hm *HeartbeatManager) SendHeartbeatTo(peerID NodeId)
SendHeartbeatTo sends a single heartbeat to a specific peer immediately, bypassing the ticker. Used to accelerate symmetry establishment on new connections.
func (*HeartbeatManager) Start ¶
func (hm *HeartbeatManager) Start(sendFunc func(peerID NodeId, data []byte) error)
Start begins the heartbeat send loop and liveness checker.
func (*HeartbeatManager) Stop ¶
func (hm *HeartbeatManager) Stop()
Stop stops the heartbeat manager.
type LogReader ¶
type LogReader interface {
// ReadEffect reads effect bytes from the local log at the given offset.
// Returns the data in wire format: [4-byte LE keyLen][key bytes][effect data].
ReadEffect(offset *pb.EffectRef) ([]byte, error)
}
LogReader reads effect data from the local log by offset. A concrete implementation, EngineLogReader, is provided by this package (it previously lived in the redis package).
Wire format: ReadEffect returns [4 bytes keyLen (little-endian)][key][effectData]. This allows the receiver to reconstruct the full log entry including the key, which is stored separately from the effect data in the log.
type NodeConfig ¶
NodeConfig represents a single node in the cluster.
type PacketHandler ¶
type PacketHandler interface {
HandleHeartbeat(peerID NodeId, timestamp uint64)
HandleHeartbeatACK(peerID NodeId, timestamp uint64)
HandleNotify(peerID NodeId, requestID uint64, notify *pb.OffsetNotify)
HandleNotifyACK(peerID NodeId, requestID uint64, status byte, credits uint32)
HandleNotifyACKWithData(peerID NodeId, requestID uint64, status byte, credits uint32, fullPacket []byte)
HandleNackNotify(peerID NodeId, nack *pb.NackNotify)
}
PacketHandler dispatches parsed inbound packets to the appropriate handler.
type PeerConn ¶
type PeerConn struct {
// contains filtered or unexported fields
}
PeerConn manages a single QUIC connection to a peer node. Used for fetch, forward, and notification transport.
func (*PeerConn) Fetch ¶
func (pc *PeerConn) Fetch(ctx context.Context, offset *pb.EffectRef) ([]byte, error)
Fetch opens a short-lived QUIC stream to request effect data from this peer.
func (*PeerConn) ForwardTransaction ¶
func (pc *PeerConn) ForwardTransaction(ctx context.Context, tx *pb.ForwardedTransaction) (*pb.ForwardedResponse, error)
ForwardTransaction opens a QUIC stream to forward a transaction to this peer (the serialization leader). Returns the leader's response.
func (*PeerConn) GetQuicConn ¶
func (pc *PeerConn) GetQuicConn() *quic.Conn
GetQuicConn returns the current QUIC connection, or nil if not connected.
func (*PeerConn) IsSameRegion ¶
func (pc *PeerConn) IsSameRegion() bool
IsSameRegion returns true if the peer is in the same region as the local node.
func (*PeerConn) IsStreamReady ¶
func (pc *PeerConn) IsStreamReady() bool
IsStreamReady returns true if the QUIC connection is established.
type PeerHealth ¶
type PeerHealth struct {
// contains filtered or unexported fields
}
PeerHealth tracks the liveness and path symmetry of a single peer.
func (*PeerHealth) IsReplicationTarget ¶
func (ph *PeerHealth) IsReplicationTarget() bool
IsReplicationTarget returns true if this peer is both alive and has a verified symmetric path — suitable for same-region replication with ACK.
type PeerHealthTable ¶
type PeerHealthTable struct {
OnPeerRecovered func(peerID NodeId) // called when a peer transitions from dead to alive
// contains filtered or unexported fields
}
PeerHealthTable maps peer node IDs to their health state.
func NewPeerHealthTable ¶
func NewPeerHealthTable() *PeerHealthTable
NewPeerHealthTable creates an empty health table.
func (*PeerHealthTable) AlivePeerIDs ¶
func (t *PeerHealthTable) AlivePeerIDs() []NodeId
AlivePeerIDs returns the IDs of all peers that are currently alive. Implements effects.PeerRTTProvider.
func (*PeerHealthTable) AliveSymmetricCount ¶
func (t *PeerHealthTable) AliveSymmetricCount(localRegion string, peerRegions map[NodeId]string) int
AliveSymmetricCount returns the count of alive+symmetric peers in the local region.
func (*PeerHealthTable) AliveSymmetricPeers ¶
func (t *PeerHealthTable) AliveSymmetricPeers(localRegion string, peerRegions map[NodeId]string) []NodeId
AliveSymmetricPeers returns the node IDs of all peers that are in the given localRegion and are both alive and symmetric (valid replication targets).
func (*PeerHealthTable) AllRegionPeersReachable ¶
func (t *PeerHealthTable) AllRegionPeersReachable(localRegion string, peerRegions map[NodeId]string) bool
AllRegionPeersReachable returns true if every same-region peer is alive and has a verified symmetric path. Returns true when there are no same-region peers (standalone or cross-region-only deployment).
func (*PeerHealthTable) AllSameRegionPeers ¶
func (t *PeerHealthTable) AllSameRegionPeers(localRegion string, peerRegions map[NodeId]string) []NodeId
AllSameRegionPeers returns the node IDs of all peers in the given local region, regardless of liveness. Used for fire-and-forget replication to dead peers so they receive effects when the partition heals.
func (*PeerHealthTable) CheckLiveness ¶
func (t *PeerHealthTable) CheckLiveness(timeout time.Duration)
CheckLiveness marks peers as dead if their last heartbeat exceeds the timeout.
func (*PeerHealthTable) Get ¶
func (t *PeerHealthTable) Get(nodeID NodeId) *PeerHealth
Get returns the PeerHealth for the given node, or nil if not tracked.
func (*PeerHealthTable) GetOrCreate ¶
func (t *PeerHealthTable) GetOrCreate(nodeID NodeId) *PeerHealth
GetOrCreate returns the PeerHealth for the given node, creating it if needed.
func (*PeerHealthTable) GetRTT ¶
func (t *PeerHealthTable) GetRTT(nodeID NodeId) time.Duration
GetRTT returns the estimated round-trip time to the given peer. Returns 0 if the peer is unknown or RTT has not been measured. Implements effects.PeerRTTProvider.
func (*PeerHealthTable) InMajorityPartition ¶
func (t *PeerHealthTable) InMajorityPartition(localRegion string, peerRegions map[NodeId]string) bool
InMajorityPartition returns true if this node (counting itself) can reach a strict majority of same-region nodes. With N total region nodes (including self), majority requires > N/2 reachable. Returns true when there are no same-region peers (standalone).
func (*PeerHealthTable) PruneUnknownPeers ¶ added in v0.1.2
func (t *PeerHealthTable) PruneUnknownPeers(known map[NodeId]struct{})
PruneUnknownPeers removes health table entries for peers not present in the given set of known peer IDs. This prevents unbounded growth from transient inbound connections (e.g. crash-looping joiners).
func (*PeerHealthTable) Remove ¶
func (t *PeerHealthTable) Remove(nodeID NodeId)
Remove removes a peer from the health table.
type PeerManager ¶
type PeerManager struct {
// contains filtered or unexported fields
}
PeerManager manages all peer connections and the local QUIC listener.
func NewPeerManager ¶
func NewPeerManager(config *ClusterConfig, handler EffectHandler, logReader LogReader) (*PeerManager, error)
NewPeerManager creates a new PeerManager from the cluster configuration.
func (*PeerManager) AllRegionPeersReachable ¶
func (pm *PeerManager) AllRegionPeersReachable() bool
AllRegionPeersReachable returns true if every same-region peer is alive and has a verified symmetric path. Returns true in standalone mode.
func (*PeerManager) Broadcast ¶
func (pm *PeerManager) Broadcast(notify *pb.OffsetNotify)
Broadcast sends an OffsetNotify to all connected peers. Fire-and-forget.
func (*PeerManager) BroadcastWithData ¶
func (pm *PeerManager) BroadcastWithData(notify *pb.OffsetNotify, effectData []byte)
BroadcastWithData sends an OffsetNotify to all connected peers. Routes through the replicator (UDP) when available.
func (*PeerManager) Fetch ¶
func (pm *PeerManager) Fetch(ref *pb.EffectRef) ([]byte, error)
Fetch retrieves effect bytes from a specific peer by slot and offset.
func (*PeerManager) FetchFromAny ¶
func (pm *PeerManager) FetchFromAny(offset *pb.EffectRef) ([]byte, error)
FetchFromAny tries to fetch effect bytes from any connected peer, racing against CDN fetch if available. First successful result wins.
func (*PeerManager) ForwardTransaction ¶
func (pm *PeerManager) ForwardTransaction(ctx context.Context, targetNodeID NodeId, tx *pb.ForwardedTransaction) (*pb.ForwardedResponse, error)
ForwardTransaction sends a transaction to a specific peer for execution. Used by adaptive serialization to route operations through the leader.
func (*PeerManager) HandleHeartbeat ¶
func (pm *PeerManager) HandleHeartbeat(peerID NodeId, timestamp uint64)
HandleHeartbeat routes inbound heartbeats to the heartbeat manager.
func (*PeerManager) HandleHeartbeatACK ¶
func (pm *PeerManager) HandleHeartbeatACK(peerID NodeId, timestamp uint64)
HandleHeartbeatACK routes inbound heartbeat ACKs to the heartbeat manager.
func (*PeerManager) HandleNackNotify ¶
func (pm *PeerManager) HandleNackNotify(peerID NodeId, nack *pb.NackNotify)
HandleNackNotify routes inbound NACKs to the effect handler.
func (*PeerManager) HandleNotify ¶
func (pm *PeerManager) HandleNotify(peerID NodeId, requestID uint64, notify *pb.OffsetNotify)
HandleNotify routes inbound notifications to the replicator.
func (*PeerManager) HandleNotifyACK ¶
func (pm *PeerManager) HandleNotifyACK(peerID NodeId, requestID uint64, status byte, credits uint32)
HandleNotifyACK routes inbound ACKs to the replicator.
func (*PeerManager) HandleNotifyACKWithData ¶
func (*PeerManager) HealthTable ¶
func (pm *PeerManager) HealthTable() *PeerHealthTable
HealthTable returns the peer health table, or nil if UDP/Noise is not configured.
func (*PeerManager) InMajorityPartition ¶
func (pm *PeerManager) InMajorityPartition() bool
InMajorityPartition returns true if this node can reach a strict majority of same-region nodes. Returns true in standalone mode.
func (*PeerManager) ListenAddr ¶
func (pm *PeerManager) ListenAddr() string
ListenAddr returns the actual address the QUIC listener is bound to. Useful when binding to port 0 in tests.
func (*PeerManager) PeerIDs ¶
func (pm *PeerManager) PeerIDs() []NodeId
PeerIDs returns the IDs of all configured peers.
func (*PeerManager) Replicate ¶
func (pm *PeerManager) Replicate(notify *pb.OffsetNotify, wireData []byte) error
Replicate sends the notification using first-ACK semantics for same-region peers. Blocks until at least one same-region peer ACKs or the replication timeout expires. Returns nil on success, or an error if replication failed (no peers, timeout, etc.). If the replicator is not set up (no UDP transport), falls back to BroadcastWithData.
func (*PeerManager) ReplicateTo ¶
func (pm *PeerManager) ReplicateTo(notify *pb.OffsetNotify, wireData []byte, targetNodeID NodeId) ([]*pb.NackNotify, error)
ReplicateTo sends a notification to a specific peer and waits for ACK or NACK. Returns NackNotify slice on conflict, nil on ACK.
func (*PeerManager) SendNack ¶
func (pm *PeerManager) SendNack(nack *pb.NackNotify, targetNodeID NodeId)
SendNack sends an enriched NACK to a specific peer.
func (*PeerManager) SetCDNFetcher ¶
func (pm *PeerManager) SetCDNFetcher(f CDNFetcher)
SetCDNFetcher sets the CDN fetcher for racing CDN against peer fetch.
func (*PeerManager) SetForwardHandler ¶
func (pm *PeerManager) SetForwardHandler(h ForwardHandler)
SetForwardHandler sets the handler for forwarded transactions (adaptive serialization). Must be called before any forwarded transactions are received.
func (*PeerManager) Start ¶
func (pm *PeerManager) Start(ctx context.Context) error
Start launches the QUIC transport, heartbeat manager, QUIC listener, and dials all peers.
func (*PeerManager) UpdateTopology ¶
func (pm *PeerManager) UpdateTopology(newCfg *ClusterConfig)
UpdateTopology applies a new cluster configuration, connecting to new peers and disconnecting removed ones.
func (*PeerManager) WaitForAnyPeer ¶
func (pm *PeerManager) WaitForAnyPeer(ctx context.Context) error
WaitForAnyPeer blocks until at least one peer connection is ready for streaming, or the context is cancelled. Must be called after Start.
func (*PeerManager) WaitForPeer ¶
func (pm *PeerManager) WaitForPeer(ctx context.Context, nodeID NodeId) bool
WaitForPeer blocks until the given peer's stream is ready or context is cancelled.
type QUICNotifyTransport ¶
type QUICNotifyTransport struct {
// contains filtered or unexported fields
}
QUICNotifyTransport replaces the custom UDP transport (Noise + chunking + pacing) with QUIC stream-per-message. Each notification opens a unidirectional stream, writes the plaintext, and closes. QUIC handles encryption (TLS 1.3), retransmission, flow control, and congestion control.
func NewQUICNotifyTransport ¶
func NewQUICNotifyTransport( nodeID NodeId, handler PacketHandler, connFunc func(peerID NodeId) *quic.Conn, onPeerIdentified func(peerID NodeId, conn *quic.Conn), ) *QUICNotifyTransport
NewQUICNotifyTransport creates a new QUIC notification transport. onPeerIdentified may be nil when the caller doesn't need inbound connection tracking (e.g. unit tests).
func (*QUICNotifyTransport) AcceptUniStreams ¶
func (t *QUICNotifyTransport) AcceptUniStreams(conn *quic.Conn)
AcceptUniStreams accepts incoming unidirectional streams on a QUIC connection and dispatches them. It is called from the accept loop for each accepted connection. The goroutine's lifetime is tied to the connection, not the transport: it exits when the connection closes (conn.Context() is cancelled).
func (*QUICNotifyTransport) Send ¶
func (t *QUICNotifyTransport) Send(peerID NodeId, plaintext []byte) (wireSize int, err error)
Send opens a unidirectional QUIC stream, writes [2:senderNodeID LE][plaintext] (the sender's node ID as 2 little-endian bytes, followed by the plaintext), and closes the stream. QUIC handles encryption, retransmission, and flow control. Send returns the total number of bytes written and any error.
func (*QUICNotifyTransport) SendDirect ¶
func (t *QUICNotifyTransport) SendDirect(peerID NodeId, plaintext []byte) error
SendDirect sends a control packet (heartbeat, ACK) via a QUIC uni-stream. With QUIC there is no distinction between paced and direct sends: each message gets its own independent stream.
func (*QUICNotifyTransport) Start ¶
func (t *QUICNotifyTransport) Start()
Start is a no-op for the QUIC transport (connections are managed by PeerConn).
func (*QUICNotifyTransport) Stop ¶
func (t *QUICNotifyTransport) Stop()
Stop shuts down the transport.
type ReplicationFuture ¶
type ReplicationFuture struct {
// contains filtered or unexported fields
}
ReplicationFuture represents a pending replication operation. It resolves when the first same-region peer ACKs or when the deadline expires. Callers check Wait()'s return value to distinguish success from failure.
func (*ReplicationFuture) Done ¶
func (f *ReplicationFuture) Done() <-chan struct{}
Done returns a channel that is closed when the future resolves.
func (*ReplicationFuture) Err ¶
func (f *ReplicationFuture) Err() error
Err returns the error stored in the future, or nil on success.
func (*ReplicationFuture) Nacks ¶
func (f *ReplicationFuture) Nacks() []*pb.NackNotify
Nacks returns the NACKs attached to this future, or nil.
func (*ReplicationFuture) Wait ¶
func (f *ReplicationFuture) Wait() error
Wait blocks until the replication future is resolved and returns any error.
type Replicator ¶
type Replicator struct {
// contains filtered or unexported fields
}
Replicator manages replication with first-ACK semantics for same-region peers and fire-and-forget delivery for cross-region peers. It uses a QUIC stream-per-message transport.
func NewReplicator ¶
func NewReplicator( nodeID NodeId, localRegion string, healthTable *PeerHealthTable, transport *QUICNotifyTransport, handler EffectHandler, replicationTimeout time.Duration, ) *Replicator
NewReplicator creates a new Replicator.
func (*Replicator) AddPeer ¶
func (r *Replicator) AddPeer(nodeID NodeId, region string)
AddPeer registers a peer for replication.
func (*Replicator) AllRegionPeersReachable ¶
func (r *Replicator) AllRegionPeersReachable() bool
AllRegionPeersReachable returns true if every same-region peer is alive and has a verified symmetric path.
func (*Replicator) HandleNotify ¶
func (r *Replicator) HandleNotify(peerID NodeId, requestID uint64, notify *pb.OffsetNotify)
HandleNotify processes an inbound notification from a peer. It applies the effect and sends an ACK or NACK response back via QUIC.
func (*Replicator) HandleNotifyACK ¶
func (r *Replicator) HandleNotifyACK(peerID NodeId, requestID uint64, status byte, credits uint32)
HandleNotifyACK processes an inbound ACK from a peer.
func (*Replicator) HandleNotifyACKWithData ¶
func (r *Replicator) HandleNotifyACKWithData(peerID NodeId, requestID uint64, status byte, credits uint32, fullPacket []byte)
HandleNotifyACKWithData processes the full ACK/NACK packet, including its payload. It is called from dispatch when status=0x01 to extract embedded NACKs.
func (*Replicator) InMajorityPartition ¶
func (r *Replicator) InMajorityPartition() bool
InMajorityPartition returns true if this node can reach a strict majority of same-region nodes (including itself).
func (*Replicator) RemovePeer ¶
func (r *Replicator) RemovePeer(nodeID NodeId)
RemovePeer unregisters a peer.
func (*Replicator) Replicate ¶
func (r *Replicator) Replicate(notify *pb.OffsetNotify, wireData []byte) *ReplicationFuture
Replicate sends a notification to all peers, using first-ACK semantics for same-region peers. It returns a future that resolves when the first same-region ACK arrives. The future is rejected with ErrNoPeers if there are no same-region peers that are both alive and symmetric. Cross-region peers receive the notification fire-and-forget.
func (*Replicator) ReplicateTo ¶
func (r *Replicator) ReplicateTo(notify *pb.OffsetNotify, wireData []byte, targetPeerID NodeId) ([]*pb.NackNotify, error)
ReplicateTo sends a notification to a specific peer and waits for an ACK or NACK. It is used by transactional bind to obtain deliberate per-subscriber responses.
func (*Replicator) SendNack ¶
func (r *Replicator) SendNack(nack *pb.NackNotify, targetPeerID NodeId)
SendNack sends an enriched NACK to a specific peer.
type StorageConfig ¶
type StorageConfig struct {
UploadEndpoint string // Bunny storage API (e.g. "https://storage.bunnycdn.com/zone")
CDNEndpoint string // Bunny CDN read (e.g. "https://cdn.example.com")
AccessKey string // Bunny storage access key
BasePath string // Node-specific prefix (derived from NodeID if empty)
// HPKE keypair for post-quantum encryption (MLKEM768X25519 + AES-256-GCM).
// Base64-encoded. Used as the default for key ranges without specific keys.
HPKEPublicKey string
HPKEPrivateKey string
}
StorageConfig holds configuration for off-node object storage (Bunny CDN).
type TopologyDiff ¶
type TopologyDiff struct {
Added []NodeConfig // Peers in new but not old
Removed []NodeConfig // Peers in old but not new
Changed []NodeConfig // Peers whose address changed (contains the new config)
}
TopologyDiff describes changes between two cluster configurations.
func DiffTopology ¶
func DiffTopology(old, new *ClusterConfig) TopologyDiff
DiffTopology computes the difference between two cluster configurations. Either old or new may be nil (treated as empty).