cluster

package
v0.2.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: AGPL-3.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PacketTypeHeartbeat    byte = 0x01
	PacketTypeHeartbeatACK byte = 0x02
	PacketTypeNotify       byte = 0x03
	PacketTypeNotifyACK    byte = 0x04
	PacketTypeNack         byte = 0x05
)

Packet type prefixes for the UDP fast-path protocol.

Variables

View Source
var (
	WritesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_writes_total",
		Help: "Total local write effects emitted",
	})

	ReadsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_reads_total",
		Help: "Total reads served",
	})

	ReadsLocalTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_reads_local_total",
		Help: "Reads served from local log/cache (no fetch)",
	})

	ReadsRemoteFetchTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_reads_remote_fetch_total",
		Help: "Reads that required a remote fetch",
	})

	NotificationsSentTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_notifications_sent_total",
		Help: "OffsetNotify messages broadcast",
	})

	NotificationsReceivedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_notifications_received_total",
		Help: "OffsetNotify messages received",
	})

	FetchesServedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_fetches_served_total",
		Help: "Fetch RPCs served to peers",
	})

	BindsEmittedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_binds_emitted_total",
		Help: "Bind effects emitted (concurrent writes detected)",
	})

	SnapshotsEmittedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cluster_snapshots_emitted_total",
		Help: "Snapshot effects emitted (bind resolution)",
	})
)
View Source
var (
	SegmentActiveBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cluster_segment_active_bytes",
		Help: "Bytes used in the current live segment",
	})

	SegmentActiveSlots = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cluster_segment_active_slots",
		Help: "Slots used in the current live segment (out of 1M)",
	})

	SegmentsSealedTotal = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cluster_segments_sealed_total",
		Help: "Number of sealed segments on this node",
	})

	DiskUsedBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cluster_disk_used_bytes",
		Help: "Total disk used by all log segments",
	})

	DiskCapacityBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cluster_disk_capacity_bytes",
		Help: "Total disk capacity",
	})

	DiskUsageRatio = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cluster_disk_usage_ratio",
		Help: "Disk usage ratio (used / capacity)",
	})
)
View Source
var (
	ErrNoPeers            = errors.New("no alive symmetric peers available for replication")
	ErrReplicationTimeout = errors.New("replication timed out waiting for ACK")
)

Sentinel errors for replication failures.

View Source
var (
	// ErrPeerUnavailable is returned when a peer connection is not established.
	ErrPeerUnavailable = errors.New("peer unavailable")
)

Functions

func DeriveCAFromPassphrase

func DeriveCAFromPassphrase(passphrase string) (ed25519.PrivateKey, *x509.Certificate, error)

DeriveCAFromPassphrase deterministically derives an Ed25519 CA key pair and self-signed CA certificate from a passphrase. Every node with the same passphrase produces identical output.

func GenerateLeafCert

func GenerateLeafCert(caKey crypto.Signer, caCert *x509.Certificate, nodeAddr string) (tls.Certificate, error)

GenerateLeafCert creates an ephemeral leaf certificate signed by the given CA. The leaf key is randomly generated (not deterministic) and the cert is short-lived. nodeAddr is embedded as a DNS SAN (or IP SAN if it parses as IP).

func GeneratePassphrase

func GeneratePassphrase() (string, error)

GeneratePassphrase returns a cryptographically random passphrase suitable for use with DeriveCAFromPassphrase. The result is 32 random bytes encoded as base64 RawURL (43 characters, no padding).

func MarshalHeartbeat

func MarshalHeartbeat(nodeID NodeId, timestamp uint64) []byte

MarshalHeartbeat builds a heartbeat plaintext body:

[1:type][8:nodeID LE][8:timestamp LE]

Encryption (Noise AEAD) is applied by the transport layer.

func MarshalHeartbeatACK

func MarshalHeartbeatACK(nodeID NodeId, echoedTimestamp uint64) []byte

MarshalHeartbeatACK builds a heartbeat ACK plaintext body:

[1:type 0x02][8:nodeID LE][8:timestamp LE]

The timestamp is the original sender's timestamp echoed back unchanged. Encryption (Noise AEAD) is applied by the transport layer.

func MarshalNackPacket

func MarshalNackPacket(nack *pb.NackNotify) ([]byte, error)

MarshalNackPacket builds a NACK plaintext body: [1:type][proto-marshaled NackNotify]

func MarshalNotifyACKPacket

func MarshalNotifyACKPacket(nodeID NodeId, requestID uint64, status byte, credits uint32) []byte

MarshalNotifyACKPacket builds a notify ACK plaintext body: [1:type][8:nodeID LE][8:requestID LE][1:status][4:credits LE]

func MarshalNotifyNACKPacket

func MarshalNotifyNACKPacket(nodeID NodeId, requestID uint64, nacks []*pb.NackNotify, credits uint32) []byte

MarshalNotifyNACKPacket builds a NotifyACK with status=0x01 and embedded NACKs: [1:0x04][8:nodeID LE][8:requestID LE][1:0x01][4:credits LE][4:nack_count LE][for each: [4:len LE][proto NackNotify]]

func MarshalNotifyPacket

func MarshalNotifyPacket(requestID uint64, notify *pb.OffsetNotify) ([]byte, error)

MarshalNotifyPacket builds a notify plaintext body: [1:type][8:requestID LE][proto-marshaled OffsetNotify]

func RecordFetchServed

func RecordFetchServed()

RecordFetchServed increments the fetch served counter.

func RecordHeartbeatReceived

func RecordHeartbeatReceived()

RecordHeartbeatReceived increments the heartbeat received counter.

func RecordHeartbeatsSent

func RecordHeartbeatsSent()

RecordHeartbeatsSent increments the heartbeat sent counter.

func RecordNotificationDropped

func RecordNotificationDropped(peerID NodeId)

RecordNotificationDropped increments the dropped notification counter for a peer.

func RecordNotificationReceived

func RecordNotificationReceived()

RecordNotificationReceived increments the received counter.

func RecordNotificationSent

func RecordNotificationSent()

RecordNotificationSent increments the sent counter for a broadcast.

func RecordPeerAlive

func RecordPeerAlive(peerID NodeId, alive bool)

RecordPeerAlive updates the alive gauge for a peer.

func RecordPeerConnected

func RecordPeerConnected(peerID NodeId)

RecordPeerConnected sets the peer connection gauge to 1.

func RecordPeerDisconnected

func RecordPeerDisconnected(peerID NodeId)

RecordPeerDisconnected sets the peer connection gauge to 0.

func RecordPeerRTT

func RecordPeerRTT(peerID NodeId, rttNanos int64)

RecordPeerRTT updates the RTT gauge for a peer.

func RecordPeerReconnect

func RecordPeerReconnect(peerID NodeId)

RecordPeerReconnect increments the reconnection counter for a peer.

func RecordPeerSymmetric

func RecordPeerSymmetric(peerID NodeId, symmetric bool)

RecordPeerSymmetric updates the symmetry gauge for a peer.

func RecordQUICStreamError

func RecordQUICStreamError()

RecordQUICStreamError increments the QUIC stream error counter.

func RecordQUICStreamOpened

func RecordQUICStreamOpened()

RecordQUICStreamOpened increments the QUIC stream opened counter.

func RecordRetransmissionGiveUp

func RecordRetransmissionGiveUp(peerID NodeId)

RecordRetransmissionGiveUp increments the give-up counter for a peer.

func RecordUDPNotifyACKLatency

func RecordUDPNotifyACKLatency(latencyMs float64)

RecordUDPNotifyACKLatency records the latency of a notification ACK.

Types

type CDNFetcher

type CDNFetcher interface {
	FetchFromCDN(ctx context.Context, offset *pb.EffectRef) ([]byte, error)
}

CDNFetcher can fetch effect data from a CDN / object storage endpoint.

type ClusterConfig

type ClusterConfig struct {
	NodeID             NodeId        // This node's ID
	Nodes              []NodeConfig  // All nodes including self
	TLSPassphrase      string        // Shared passphrase for deterministic mTLS CA derivation
	ReplicationTimeout time.Duration // Max time to wait for replication ACK (default 5s)
	Storage            StorageConfig // Object storage for off-node durability
}

ClusterConfig holds the full cluster topology.

func (*ClusterConfig) BuildClientTLSConfig

func (c *ClusterConfig) BuildClientTLSConfig() (*tls.Config, error)

BuildClientTLSConfig returns a client-side TLS config for mTLS, derived from the shared passphrase. Returns nil, nil if TLSPassphrase is empty (insecure fallback).

func (*ClusterConfig) BuildTLSConfig

func (c *ClusterConfig) BuildTLSConfig() (*tls.Config, error)

BuildTLSConfig returns a server-side TLS config for mTLS, derived from the shared passphrase. Returns nil, nil if TLSPassphrase is empty (insecure fallback).

func (*ClusterConfig) Peers

func (c *ClusterConfig) Peers() []NodeConfig

Peers returns all nodes except self.

func (*ClusterConfig) Self

func (c *ClusterConfig) Self() *NodeConfig

Self returns this node's config, or nil if not found.

type EffectHandler

type EffectHandler interface {
	// HandleOffsetNotify processes a remote effect notification.
	// Returns all NACKs generated (one per diverged key). The caller
	// sends these as the ReplicateTo response so the originator can
	// evaluate fork-choice across all keys.
	HandleOffsetNotify(notify *pb.OffsetNotify) ([]*pb.NackNotify, error)
	// HandleNack processes an enriched NACK from a remote peer.
	HandleNack(nack *pb.NackNotify) error
}

EffectHandler is called when a remote notification or data arrives.

type EngineEffectHandler

type EngineEffectHandler struct {
	// contains filtered or unexported fields
}

EngineEffectHandler adapts an *effects.Engine to the EffectHandler interface used by PeerManager. It forwards OffsetNotify / Nack frames straight to the engine; there's no front-end-specific logic. Previously duplicated in the redis package; shared here so every transport (redis, sql, future ones) builds a PeerManager the same way.

func NewEngineEffectHandler

func NewEngineEffectHandler(engine *effects.Engine) *EngineEffectHandler

NewEngineEffectHandler wraps the given engine so it satisfies the EffectHandler interface.

func (*EngineEffectHandler) HandleNack

func (h *EngineEffectHandler) HandleNack(nack *pb.NackNotify) error

HandleNack forwards NACKs to the engine.

func (*EngineEffectHandler) HandleOffsetNotify

func (h *EngineEffectHandler) HandleOffsetNotify(notify *pb.OffsetNotify) ([]*pb.NackNotify, error)

HandleOffsetNotify delegates to the engine's HandleRemote. The engine handles log storage, keytrie update, cache invalidation, NACK generation, and notification callbacks.

type EngineLogReader

type EngineLogReader struct {
	// contains filtered or unexported fields
}

EngineLogReader adapts the engine's effect cache to the LogReader interface. Previously lived in the redis package; shared here.

Wire format produced by ReadEffect:

[4 bytes LE keyLen][keyLen bytes of key][proto-marshalled effect]

The receiver reconstructs the full log entry including the key, which is stored separately from the effect data in the log.

func NewEngineLogReader

func NewEngineLogReader(cache *clox.CloxCache[effects.Tip, *pb.Effect]) *EngineLogReader

NewEngineLogReader wraps an effect cache.

func (*EngineLogReader) ReadEffect

func (r *EngineLogReader) ReadEffect(ref *pb.EffectRef) ([]byte, error)

ReadEffect looks the effect up in the cache and returns it in wire format.

type ForwardHandler

type ForwardHandler interface {
	HandleForwardedTransaction(tx *pb.ForwardedTransaction) *pb.ForwardedResponse
}

ForwardHandler executes forwarded transactions received from other nodes via the adaptive serialization mechanism (§5). The serialization leader receives the full transaction, executes it locally, and returns the raw RESP wire bytes.

type HeartbeatManager

type HeartbeatManager struct {
	// contains filtered or unexported fields
}

HeartbeatManager manages the heartbeat protocol for all same-region peers.

func NewHeartbeatManager

func NewHeartbeatManager(nodeID NodeId, healthTable *PeerHealthTable) *HeartbeatManager

NewHeartbeatManager creates a new HeartbeatManager.

func (*HeartbeatManager) AddPeer

func (hm *HeartbeatManager) AddPeer(nodeID NodeId, region string)

AddPeer registers a peer for heartbeat monitoring.

func (*HeartbeatManager) ProcessInbound

func (hm *HeartbeatManager) ProcessInbound(peerID NodeId, timestamp uint64)

ProcessInbound handles a received heartbeat packet (already decrypted by Noise). peerID and timestamp are parsed from the packet body. Sends a HeartbeatACK back to the sender echoing the received timestamp for true RTT measurement.

func (*HeartbeatManager) ProcessInboundACK

func (hm *HeartbeatManager) ProcessInboundACK(peerID NodeId, echoedTimestamp uint64)

ProcessInboundACK handles a received HeartbeatACK packet. The echoedTimestamp is the original sender's timestamp echoed back, so RTT = now - echoedTimestamp uses the same clock for true round-trip measurement.

func (*HeartbeatManager) RemovePeer

func (hm *HeartbeatManager) RemovePeer(nodeID NodeId)

RemovePeer unregisters a peer from heartbeat monitoring.

func (*HeartbeatManager) SendHeartbeatTo added in v0.0.8

func (hm *HeartbeatManager) SendHeartbeatTo(peerID NodeId)

SendHeartbeatTo sends a single heartbeat to a specific peer immediately, bypassing the ticker. Used to accelerate symmetry establishment on new connections.

func (*HeartbeatManager) Start

func (hm *HeartbeatManager) Start(sendFunc func(peerID NodeId, data []byte) error)

Start begins the heartbeat send loop and liveness checker.

func (*HeartbeatManager) Stop

func (hm *HeartbeatManager) Stop()

Stop stops the heartbeat manager.

type LogReader

type LogReader interface {
	// ReadEffect reads effect bytes from the local log at the given offset.
	// Returns the data in wire format: [4-byte LE keyLen][key bytes][effect data].
	ReadEffect(offset *pb.EffectRef) ([]byte, error)
}

LogReader reads effect data from the local log by offset. A concrete implementation is supplied at startup; EngineLogReader in this package is one such implementation (it previously lived in the redis package).

Wire format: ReadEffect returns [4 bytes keyLen (little-endian)][key][effectData]. This allows the receiver to reconstruct the full log entry including the key, which is stored separately from the effect data in the log.

type NodeConfig

type NodeConfig struct {
	ID      NodeId
	Address string // QUIC address (host:port)
	Region  string
}

NodeConfig represents a single node in the cluster.

type NodeId

type NodeId = pb.NodeID

func ParseHeartbeat

func ParseHeartbeat(body []byte) (nodeID NodeId, timestamp uint64, err error)

ParseHeartbeat parses a heartbeat plaintext body (already Noise-decrypted). Returns nodeID, timestamp, and an error if the body is too short.

type PacketHandler

type PacketHandler interface {
	HandleHeartbeat(peerID NodeId, timestamp uint64)
	HandleHeartbeatACK(peerID NodeId, timestamp uint64)
	HandleNotify(peerID NodeId, requestID uint64, notify *pb.OffsetNotify)
	HandleNotifyACK(peerID NodeId, requestID uint64, status byte, credits uint32)
	HandleNotifyACKWithData(peerID NodeId, requestID uint64, status byte, credits uint32, fullPacket []byte)
	HandleNackNotify(peerID NodeId, nack *pb.NackNotify)
}

PacketHandler dispatches parsed inbound packets to the appropriate handler.

type PeerConn

type PeerConn struct {
	// contains filtered or unexported fields
}

PeerConn manages a single QUIC connection to a peer node. Used for fetch, forward, and notification transport.

func (*PeerConn) Fetch

func (pc *PeerConn) Fetch(ctx context.Context, offset *pb.EffectRef) ([]byte, error)

Fetch opens a short-lived QUIC stream to request effect data from this peer.

func (*PeerConn) ForwardTransaction

func (pc *PeerConn) ForwardTransaction(ctx context.Context, tx *pb.ForwardedTransaction) (*pb.ForwardedResponse, error)

ForwardTransaction opens a QUIC stream to forward a transaction to this peer (the serialization leader). Returns the leader's response.

func (*PeerConn) GetQuicConn

func (pc *PeerConn) GetQuicConn() *quic.Conn

GetQuicConn returns the current QUIC connection, or nil if not connected.

func (*PeerConn) IsSameRegion

func (pc *PeerConn) IsSameRegion() bool

IsSameRegion returns true if the peer is in the same region as the local node.

func (*PeerConn) IsStreamReady

func (pc *PeerConn) IsStreamReady() bool

IsStreamReady returns true if the QUIC connection is established.

func (*PeerConn) Start

func (pc *PeerConn) Start(ctx context.Context)

Start dials the peer and begins the connection loop in a goroutine.

func (*PeerConn) Stop

func (pc *PeerConn) Stop()

Stop closes the connection.

type PeerHealth

type PeerHealth struct {
	// contains filtered or unexported fields
}

PeerHealth tracks the liveness and path symmetry of a single peer.

func (*PeerHealth) IsReplicationTarget

func (ph *PeerHealth) IsReplicationTarget() bool

IsReplicationTarget returns true if this peer is both alive and has a verified symmetric path — suitable for same-region replication with ACK.

type PeerHealthTable

type PeerHealthTable struct {
	OnPeerRecovered func(peerID NodeId) // called when a peer transitions from dead to alive
	// contains filtered or unexported fields
}

PeerHealthTable maps peer node IDs to their health state.

func NewPeerHealthTable

func NewPeerHealthTable() *PeerHealthTable

NewPeerHealthTable creates an empty health table.

func (*PeerHealthTable) AlivePeerIDs

func (t *PeerHealthTable) AlivePeerIDs() []NodeId

AlivePeerIDs returns the IDs of all peers that are currently alive. Implements effects.PeerRTTProvider.

func (*PeerHealthTable) AliveSymmetricCount

func (t *PeerHealthTable) AliveSymmetricCount(localRegion string, peerRegions map[NodeId]string) int

AliveSymmetricCount returns the count of alive+symmetric peers in the local region.

func (*PeerHealthTable) AliveSymmetricPeers

func (t *PeerHealthTable) AliveSymmetricPeers(localRegion string, peerRegions map[NodeId]string) []NodeId

AliveSymmetricPeers returns the node IDs of all peers that are in the given localRegion and are both alive and symmetric (valid replication targets).

func (*PeerHealthTable) AllRegionPeersReachable

func (t *PeerHealthTable) AllRegionPeersReachable(localRegion string, peerRegions map[NodeId]string) bool

AllRegionPeersReachable returns true if every same-region peer is alive and has a verified symmetric path. Returns true when there are no same-region peers (standalone or cross-region-only deployment).

func (*PeerHealthTable) AllSameRegionPeers

func (t *PeerHealthTable) AllSameRegionPeers(localRegion string, peerRegions map[NodeId]string) []NodeId

AllSameRegionPeers returns the node IDs of all peers in the given local region, regardless of liveness. Used for fire-and-forget replication to dead peers so they receive effects when the partition heals.

func (*PeerHealthTable) CheckLiveness

func (t *PeerHealthTable) CheckLiveness(timeout time.Duration)

CheckLiveness marks peers as dead if their last heartbeat exceeds the timeout.

func (*PeerHealthTable) Get

func (t *PeerHealthTable) Get(nodeID NodeId) *PeerHealth

Get returns the PeerHealth for the given node, or nil if not tracked.

func (*PeerHealthTable) GetOrCreate

func (t *PeerHealthTable) GetOrCreate(nodeID NodeId) *PeerHealth

GetOrCreate returns the PeerHealth for the given node, creating it if needed.

func (*PeerHealthTable) GetRTT

func (t *PeerHealthTable) GetRTT(nodeID NodeId) time.Duration

GetRTT returns the estimated round-trip time to the given peer. Returns 0 if the peer is unknown or RTT has not been measured. Implements effects.PeerRTTProvider.

func (*PeerHealthTable) InMajorityPartition

func (t *PeerHealthTable) InMajorityPartition(localRegion string, peerRegions map[NodeId]string) bool

InMajorityPartition returns true if this node (counting itself) can reach a strict majority of same-region nodes. With N total region nodes (including self), majority requires > N/2 reachable. Returns true when there are no same-region peers (standalone).

func (*PeerHealthTable) PruneUnknownPeers added in v0.1.2

func (t *PeerHealthTable) PruneUnknownPeers(known map[NodeId]struct{})

PruneUnknownPeers removes health table entries for peers not present in the given set of known peer IDs. This prevents unbounded growth from transient inbound connections (e.g. crash-looping joiners).

func (*PeerHealthTable) Remove

func (t *PeerHealthTable) Remove(nodeID NodeId)

Remove removes a peer from the health table.

type PeerManager

type PeerManager struct {
	// contains filtered or unexported fields
}

PeerManager manages all peer connections and the local QUIC listener.

func NewPeerManager

func NewPeerManager(config *ClusterConfig, handler EffectHandler, logReader LogReader) (*PeerManager, error)

NewPeerManager creates a new PeerManager from the cluster configuration.

func (*PeerManager) AllRegionPeersReachable

func (pm *PeerManager) AllRegionPeersReachable() bool

AllRegionPeersReachable returns true if every same-region peer is alive and has a verified symmetric path. Returns true in standalone mode.

func (*PeerManager) Broadcast

func (pm *PeerManager) Broadcast(notify *pb.OffsetNotify)

Broadcast sends an OffsetNotify to all connected peers. Fire-and-forget.

func (*PeerManager) BroadcastWithData

func (pm *PeerManager) BroadcastWithData(notify *pb.OffsetNotify, effectData []byte)

BroadcastWithData sends an OffsetNotify to all connected peers. Routes through the replicator (UDP) when available.

func (*PeerManager) Fetch

func (pm *PeerManager) Fetch(ref *pb.EffectRef) ([]byte, error)

Fetch retrieves effect bytes from a specific peer by slot and offset.

func (*PeerManager) FetchFromAny

func (pm *PeerManager) FetchFromAny(offset *pb.EffectRef) ([]byte, error)

FetchFromAny tries to fetch effect bytes from any connected peer, racing against CDN fetch if available. First successful result wins.

func (*PeerManager) ForwardTransaction

func (pm *PeerManager) ForwardTransaction(ctx context.Context, targetNodeID NodeId, tx *pb.ForwardedTransaction) (*pb.ForwardedResponse, error)

ForwardTransaction sends a transaction to a specific peer for execution. Used by adaptive serialization to route operations through the leader.

func (*PeerManager) HandleHeartbeat

func (pm *PeerManager) HandleHeartbeat(peerID NodeId, timestamp uint64)

HandleHeartbeat routes inbound heartbeats to the heartbeat manager.

func (*PeerManager) HandleHeartbeatACK

func (pm *PeerManager) HandleHeartbeatACK(peerID NodeId, timestamp uint64)

HandleHeartbeatACK routes inbound heartbeat ACKs to the heartbeat manager.

func (*PeerManager) HandleNackNotify

func (pm *PeerManager) HandleNackNotify(peerID NodeId, nack *pb.NackNotify)

HandleNackNotify routes inbound NACKs to the effect handler.

func (*PeerManager) HandleNotify

func (pm *PeerManager) HandleNotify(peerID NodeId, requestID uint64, notify *pb.OffsetNotify)

HandleNotify routes inbound notifications to the replicator.

func (*PeerManager) HandleNotifyACK

func (pm *PeerManager) HandleNotifyACK(peerID NodeId, requestID uint64, status byte, credits uint32)

HandleNotifyACK routes inbound ACKs to the replicator.

func (*PeerManager) HandleNotifyACKWithData

func (pm *PeerManager) HandleNotifyACKWithData(peerID NodeId, requestID uint64, status byte, credits uint32, fullPacket []byte)

func (*PeerManager) HealthTable

func (pm *PeerManager) HealthTable() *PeerHealthTable

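HealthTable returns the peer health table, or nil if UDP/Noise is not configured.

Note: the source comment attached to this method also carries a sentence that describes Broadcast rather than HealthTable: "Broadcast sends an OffsetNotify to all connected peers. Fire-and-forget."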

func (*PeerManager) InMajorityPartition

func (pm *PeerManager) InMajorityPartition() bool

InMajorityPartition returns true if this node can reach a strict majority of same-region nodes. Returns true in standalone mode.

func (*PeerManager) ListenAddr

func (pm *PeerManager) ListenAddr() string

ListenAddr returns the actual address the QUIC listener is bound to. Useful when binding to port 0 in tests.

func (*PeerManager) PeerIDs

func (pm *PeerManager) PeerIDs() []NodeId

PeerIDs returns the IDs of all configured peers.

func (*PeerManager) Replicate

func (pm *PeerManager) Replicate(notify *pb.OffsetNotify, wireData []byte) error

Replicate sends the notification using first-ACK semantics for same-region peers. Blocks until at least one same-region peer ACKs or the replication timeout expires. Returns nil on success, or an error if replication failed (no peers, timeout, etc.). If the replicator is not set up (no UDP transport), falls back to BroadcastWithData.

func (*PeerManager) ReplicateTo

func (pm *PeerManager) ReplicateTo(notify *pb.OffsetNotify, wireData []byte, targetNodeID NodeId) ([]*pb.NackNotify, error)

ReplicateTo sends a notification to a specific peer and waits for an ACK or NACK. Returns a NackNotify slice on conflict, or nil on ACK.

func (*PeerManager) SendNack

func (pm *PeerManager) SendNack(nack *pb.NackNotify, targetNodeID NodeId)

SendNack sends an enriched NACK to a specific peer.

func (*PeerManager) SetCDNFetcher

func (pm *PeerManager) SetCDNFetcher(f CDNFetcher)

SetCDNFetcher sets the CDN fetcher for racing CDN against peer fetch.

func (*PeerManager) SetForwardHandler

func (pm *PeerManager) SetForwardHandler(h ForwardHandler)

SetForwardHandler sets the handler for forwarded transactions (adaptive serialization). Must be called before any forwarded transactions are received.

func (*PeerManager) Start

func (pm *PeerManager) Start(ctx context.Context) error

Start launches the QUIC transport, heartbeat manager, QUIC listener, and dials all peers.

func (*PeerManager) Stop

func (pm *PeerManager) Stop()

Stop shuts down all components.

func (*PeerManager) UpdateTopology

func (pm *PeerManager) UpdateTopology(newCfg *ClusterConfig)

UpdateTopology applies a new cluster configuration, connecting to new peers and disconnecting removed ones.

func (*PeerManager) WaitForAnyPeer

func (pm *PeerManager) WaitForAnyPeer(ctx context.Context) error

WaitForAnyPeer blocks until at least one peer connection is ready for streaming, or the context is cancelled. Must be called after Start.

func (*PeerManager) WaitForPeer

func (pm *PeerManager) WaitForPeer(ctx context.Context, nodeID NodeId) bool

WaitForPeer blocks until the given peer's stream is ready or context is cancelled.

type QUICNotifyTransport

type QUICNotifyTransport struct {
	// contains filtered or unexported fields
}

QUICNotifyTransport replaces the custom UDP transport (Noise + chunking + pacing) with QUIC stream-per-message. Each notification opens a unidirectional stream, writes the plaintext, and closes. QUIC handles encryption (TLS 1.3), retransmission, flow control, and congestion control.

func NewQUICNotifyTransport

func NewQUICNotifyTransport(
	nodeID NodeId,
	handler PacketHandler,
	connFunc func(peerID NodeId) *quic.Conn,
	onPeerIdentified func(peerID NodeId, conn *quic.Conn),
) *QUICNotifyTransport

NewQUICNotifyTransport creates a new QUIC notification transport. onPeerIdentified may be nil when the caller doesn't need inbound connection tracking (e.g. unit tests).

func (*QUICNotifyTransport) AcceptUniStreams

func (t *QUICNotifyTransport) AcceptUniStreams(conn *quic.Conn)

AcceptUniStreams accepts incoming unidirectional streams on a QUIC connection and dispatches them. Called from the accept loop for each accepted connection. Goroutine lifetime is tied to the connection, not the transport — it exits when the connection closes (conn.Context() cancelled).

func (*QUICNotifyTransport) Send

func (t *QUICNotifyTransport) Send(peerID NodeId, plaintext []byte) (wireSize int, err error)

Send opens a unidirectional QUIC stream, writes [2:senderNodeID LE][plaintext], and closes the stream. QUIC handles encryption, retransmission, and flow control. Returns the total bytes written and any error.

func (*QUICNotifyTransport) SendDirect

func (t *QUICNotifyTransport) SendDirect(peerID NodeId, plaintext []byte) error

SendDirect sends a control packet (heartbeat, ACK) via a QUIC uni-stream. With QUIC, there is no distinction between paced and direct sends — each message gets its own independent stream.

func (*QUICNotifyTransport) Start

func (t *QUICNotifyTransport) Start()

Start is a no-op for the QUIC transport (connections are managed by PeerConn).

func (*QUICNotifyTransport) Stop

func (t *QUICNotifyTransport) Stop()

Stop shuts down the transport.

type ReplicationFuture

type ReplicationFuture struct {
	// contains filtered or unexported fields
}

ReplicationFuture represents a pending replication operation. It resolves when the first same-region peer ACKs or when the deadline expires. Callers check Wait()'s return value to distinguish success from failure.

func (*ReplicationFuture) Done

func (f *ReplicationFuture) Done() <-chan struct{}

Done returns a channel that is closed when the future resolves.

func (*ReplicationFuture) Err

func (f *ReplicationFuture) Err() error

Err returns the error stored in the future, or nil on success.

func (*ReplicationFuture) Nacks

func (f *ReplicationFuture) Nacks() []*pb.NackNotify

Nacks returns the NACKs attached to this future, or nil.

func (*ReplicationFuture) Wait

func (f *ReplicationFuture) Wait() error

Wait blocks until the replication future is resolved and returns any error.

type Replicator

type Replicator struct {
	// contains filtered or unexported fields
}

Replicator manages replication with first-ACK semantics for same-region and fire-and-forget for cross-region peers. Uses QUIC stream-per-message transport.

func NewReplicator

func NewReplicator(
	nodeID NodeId,
	localRegion string,
	healthTable *PeerHealthTable,
	transport *QUICNotifyTransport,
	handler EffectHandler,
	replicationTimeout time.Duration,
) *Replicator

NewReplicator creates a new Replicator.

func (*Replicator) AddPeer

func (r *Replicator) AddPeer(nodeID NodeId, region string)

AddPeer registers a peer for replication.

func (*Replicator) AllRegionPeersReachable

func (r *Replicator) AllRegionPeersReachable() bool

AllRegionPeersReachable returns true if every same-region peer is alive and has a verified symmetric path.

func (*Replicator) HandleNotify

func (r *Replicator) HandleNotify(peerID NodeId, requestID uint64, notify *pb.OffsetNotify)

HandleNotify processes an inbound notification from a peer. It applies the effect and sends an ACK or NACK response back via QUIC.

func (*Replicator) HandleNotifyACK

func (r *Replicator) HandleNotifyACK(peerID NodeId, requestID uint64, status byte, credits uint32)

HandleNotifyACK processes an inbound ACK from a peer.

func (*Replicator) HandleNotifyACKWithData

func (r *Replicator) HandleNotifyACKWithData(peerID NodeId, requestID uint64, status byte, credits uint32, fullPacket []byte)

HandleNotifyACKWithData processes the full ACK/NACK packet including payload. Called from dispatch when status=0x01 to extract embedded NACKs.

func (*Replicator) InMajorityPartition

func (r *Replicator) InMajorityPartition() bool

InMajorityPartition returns true if this node can reach a strict majority of same-region nodes (including itself).

func (*Replicator) RemovePeer

func (r *Replicator) RemovePeer(nodeID NodeId)

RemovePeer unregisters a peer.

func (*Replicator) Replicate

func (r *Replicator) Replicate(notify *pb.OffsetNotify, wireData []byte) *ReplicationFuture

Replicate sends a notification to all peers with first-ACK semantics for same-region. Returns a future that resolves when the first same-region ACK arrives, or rejects with ErrNoPeers if there are no alive+symmetric same-region peers. Cross-region peers receive the notification fire-and-forget.

func (*Replicator) ReplicateTo

func (r *Replicator) ReplicateTo(notify *pb.OffsetNotify, wireData []byte, targetPeerID NodeId) ([]*pb.NackNotify, error)

ReplicateTo sends a notification to a specific peer and waits for ACK or NACK. Used by transactional bind to get deliberate per-subscriber responses.

func (*Replicator) SendNack

func (r *Replicator) SendNack(nack *pb.NackNotify, targetPeerID NodeId)

SendNack sends an enriched NACK to a specific peer.

func (*Replicator) Start

func (r *Replicator) Start()

Start begins the timeout sweep loop.

func (*Replicator) Stop

func (r *Replicator) Stop()

Stop shuts down the replicator.

type StorageConfig

type StorageConfig struct {
	UploadEndpoint string // Bunny storage API (e.g. "https://storage.bunnycdn.com/zone")
	CDNEndpoint    string // Bunny CDN read (e.g. "https://cdn.example.com")
	AccessKey      string // Bunny storage access key
	BasePath       string // Node-specific prefix (derived from NodeID if empty)

	// HPKE keypair for post-quantum encryption (MLKEM768X25519 + AES-256-GCM).
	// Base64-encoded. Used as the default for key ranges without specific keys.
	HPKEPublicKey  string
	HPKEPrivateKey string
}

StorageConfig holds configuration for off-node object storage (Bunny CDN).

type TopologyDiff

type TopologyDiff struct {
	Added   []NodeConfig // Peers in new but not old
	Removed []NodeConfig // Peers in old but not new
	Changed []NodeConfig // Peers whose address changed (contains the new config)
}

TopologyDiff describes changes between two cluster configurations.

func DiffTopology

func DiffTopology(old, new *ClusterConfig) TopologyDiff

DiffTopology computes the difference between two cluster configurations. Either old or new may be nil (treated as empty).

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL