Documentation
Overview ¶
Copyright (c) 2026 HSTLES / ORBTR Pty Ltd. All Rights Reserved.
Queries: licensing@hstles.com
Package whisper is a generic, topic-based gossip engine for peer-to-peer state propagation. It provides the pub/sub, delta sync, and topology routing layers that sit between a wire protocol (Aether) and application state (Ledger, Minerva, Mercury).
Whisper is transport-agnostic — it operates on Aether streams but knows nothing about what it carries.
Three gossip modes:
- BroadcastOnly: firehose pub/sub, no state, no merge
- StatefulMerge: delta sync with merge-on-apply via StateStore
- RequestResponse: solicit/reply pattern
Three wire frame types:
G1 (magic 0x4731): full bidirectional exchange with ExchangeMeta
G2 (magic 0x4732): 12-byte digest probe for fingerprint comparison
G3 (magic 0x4733): immediate rumor push via hypercube routing
Index ¶
- Constants
- Variables
- func HandlePullRequest(req PullRequest, allowFn func() bool, fetch PullFetchFn) ([]byte, error)
- func HandleRumorFrame(r io.Reader, tracker *RumorTracker, pusher *RumorPusher, senderNodeID string, ...)
- func IsStalePEXEntry(entry PEXEntry) bool
- func ReadDigestBody(r io.Reader) (fingerprint uint64, flags uint16, err error)
- func ReadRumorACKBody(r io.Reader) ([]byte, error)
- func ReadRumorBody(r io.Reader) (payload []byte, hopCount uint8, fromDimension uint8, err error)
- func ShouldInitiateDial(localNodeID, peerNodeID string) bool
- func VerifyPEXEntry(entry PEXEntry, publicKey ed25519.PublicKey) bool
- func WriteDigestProbe(conn net.Conn, fingerprint uint64, flags uint16) error
- func WritePullRequest(conn net.Conn, req PullRequest) error
- func WriteRumor(conn net.Conn, payload []byte, hopCount uint8, fromDimension uint8) error
- func WriteRumorACK(conn net.Conn, rumorID []byte) error
- type AdaptiveInterval
- type AdaptivePolicy
- func (p *AdaptivePolicy) PeerOverride(peer string) (NetworkProfile, bool)
- func (p *AdaptivePolicy) Profile() NetworkProfile
- func (p *AdaptivePolicy) SetEventEmitter(fn func(GossipEvent))
- func (p *AdaptivePolicy) SetPeerOverride(peer string, profile NetworkProfile)
- func (p *AdaptivePolicy) Signals() NetworkSignals
- func (p *AdaptivePolicy) Subscribe(fn func(NetworkProfile))
- func (p *AdaptivePolicy) UpdateSignals(s NetworkSignals)
- type AdaptiveStats
- type BackpressureMonitor
- type CPUGate
- type CPUGateConfig
- type ConnectionOffer
- type DebugLogger
- type DecodedExchange
- type DeltaPeerStats
- type DeltaPersistence
- type DeltaSyncMeta
- type DeltaTracker
- func (dt *DeltaTracker) AttachPersistence(ctx context.Context, p DeltaPersistence, saveInterval time.Duration, ...)
- func (dt *DeltaTracker) RecordFullSyncResult(peerID string, appliedRecords int)
- func (dt *DeltaTracker) RecordWatermark(peerID string) time.Time
- func (dt *DeltaTracker) Reset(peerID string)
- func (dt *DeltaTracker) ShouldFullSync(peerID string) bool
- func (dt *DeltaTracker) StartSweeper()
- func (dt *DeltaTracker) Stats() map[string]DeltaPeerStats
- func (dt *DeltaTracker) StopSweeper()
- func (dt *DeltaTracker) UpdateWatermark(peerID string, timestamp time.Time)
- type Engine
- func (e *Engine) AllowPeer(peerID string) bool
- func (e *Engine) Backpressure() *BackpressureMonitor
- func (e *Engine) Delta() *DeltaTracker
- func (e *Engine) EventBus() *eventBus
- func (e *Engine) Hypercube() *Hypercube
- func (e *Engine) PEXManager() *PEXManager
- func (e *Engine) Publish(topic string, payload []byte) error
- func (e *Engine) Query(ctx context.Context, topic string, payload []byte) ([][]byte, error)
- func (e *Engine) RTT() *NetworkRTTMeasurer
- func (e *Engine) RateLimiter() *RateLimiter
- func (e *Engine) RegisterFrameKind(magic uint16, handler FrameHandler) error
- func (e *Engine) RegisterTopic(name string, config TopicConfig) error
- func (e *Engine) Registry() *TopicRegistry
- func (e *Engine) Run(ctx context.Context, conn net.Conn, peerNodeID string, ...) error
- func (e *Engine) RunResponder(ctx context.Context, conn net.Conn, peerNodeID string) error
- func (e *Engine) Stats() EngineStats
- func (e *Engine) Stop()
- func (e *Engine) Subscribe(topic string, sub Subscriber)
- func (e *Engine) SubscribeEvents(ch chan<- GossipEvent)
- type EngineOption
- func WithAdaptive(a *AdaptiveInterval) EngineOption
- func WithBackpressure(bp *BackpressureMonitor) EngineOption
- func WithBufferPool(p *sync.Pool) EngineOption
- func WithDelta(dt *DeltaTracker) EngineOption
- func WithExchangeDeadline(d time.Duration) EngineOption
- func WithExchangeFunc(...) EngineOption
- func WithFatalErrorClassifier(fn func(error) bool) EngineOption
- func WithFingerprintProvider(fp FingerprintProvider) EngineOption
- func WithFirstFrameGrace(d time.Duration) EngineOption
- func WithG1Codec(c G1Codec) EngineOption
- func WithG1ExchangeObserver(obs G1ExchangeObserver) EngineOption
- func WithG1Store(s StateStore) EngineOption
- func WithGlobalEgressLimit(bytesPerSec int) EngineOption
- func WithHypercube(h *Hypercube) EngineOption
- func WithMaxPayloadSize(n uint32) EngineOption
- func WithMetaProvider(fn func() *ExchangeMeta) EngineOption
- func WithNetworkPolicy(p NetworkPolicy) EngineOption
- func WithOverloadQueueThreshold(n int) EngineOption
- func WithPEX(pex *PEXManager) EngineOption
- func WithPeerRateLimit(per float64, burst int) EngineOption
- func WithRTT(rtt *NetworkRTTMeasurer) EngineOption
- func WithRateLimiter(rl *RateLimiter) EngineOption
- func WithReconcile(d *ReconcileDriver) EngineOption
- func WithRumor(r *RumorPusher) EngineOption
- func WithRumorApplyFn(fn func([]byte) error) EngineOption
- func WithRumorDedupKeyFunc(fn RumorIDFunc) EngineOption
- func WithSeenIDTracking(enabled bool) EngineOption
- func WithSnapshot(d *SnapshotDriver) EngineOption
- type EngineStats
- type ExchangeMeta
- type FingerprintProvider
- type FixedProfilePolicy
- func (p *FixedProfilePolicy) PeerOverride(peer string) (NetworkProfile, bool)
- func (p *FixedProfilePolicy) Profile() NetworkProfile
- func (p *FixedProfilePolicy) SetPeerOverride(peer string, profile NetworkProfile)
- func (p *FixedProfilePolicy) SetProfile(profile NetworkProfile)
- func (p *FixedProfilePolicy) Signals() NetworkSignals
- func (p *FixedProfilePolicy) Subscribe(fn func(NetworkProfile))
- type FrameAction
- type FrameHandler
- type FrameHandlerFunc
- type G1Codec
- type G1ExchangeObserver
- type GossipEvent
- type GossipEventKind
- type GossipResult
- type Hypercube
- func (h *Hypercube) Dimension() int
- func (h *Hypercube) DimensionNeighbor(d int) string
- func (h *Hypercube) MemberCount() int
- func (h *Hypercube) Neighbors() []string
- func (h *Hypercube) Position() int
- func (h *Hypercube) Rebuild(memberIDs []string)
- func (h *Hypercube) RouteRumor(fromDimension int) []int
- func (h *Hypercube) Valid() bool
- func (h *Hypercube) Version() uint64
- type HypercubeExt
- func (h *HypercubeExt) DimensionWeight(dim int) float64
- func (h *HypercubeExt) Dual() *Hypercube
- func (h *HypercubeExt) EffectiveDimensions(peerID string) int
- func (h *HypercubeExt) MarkDirty()
- func (h *HypercubeExt) Primary() *Hypercube
- func (h *HypercubeExt) Rebuild(members []string)
- func (h *HypercubeExt) RecordDimensionLoss(dim int)
- func (h *HypercubeExt) RecordDimensionSuccess(dim int)
- func (h *HypercubeExt) RouteToFar(targetNodeID string) (nextHop string, ok bool)
- type HypercubeOptions
- type HypercubeRebuildPolicy
- type NetworkPolicy
- type NetworkProfile
- type NetworkRTTMeasurer
- type NetworkSignals
- type PEXEntry
- type PEXManager
- func (m *PEXManager) BuildPEXEntries(peerNodeID string) []PEXEntry
- func (m *PEXManager) KnownPeers() []PEXEntry
- func (m *PEXManager) ProcessPEXEntries(entries []PEXEntry) []string
- func (m *PEXManager) RefreshLocalEntry(nodeID string, addresses []string, region string)
- func (m *PEXManager) RemovePeer(peerNodeID string)
- type PEXRateLimiter
- type PEXRequest
- type PEXResponse
- type PeerBackoff
- type PeerRateLimiter
- type PersistedPeerState
- type PullFetchFn
- type PullRequest
- type RateLimitStats
- type RateLimiter
- func (r *RateLimiter) AllowPeer(peerID string) bool
- func (r *RateLimiter) DecQueue()
- func (r *RateLimiter) EgressSaturated(n int) bool
- func (r *RateLimiter) IncQueue()
- func (r *RateLimiter) Overloaded() bool
- func (r *RateLimiter) PeerCount() int
- func (r *RateLimiter) PeerInCooldown(peerID string) bool
- func (r *RateLimiter) QueueDepth() int
- func (r *RateLimiter) QueueThreshold() int
- func (r *RateLimiter) RemovePeer(peerID string)
- func (r *RateLimiter) ReserveEgress(n int) bool
- func (r *RateLimiter) SetQueueThreshold(n int)
- type ReconcileCodec
- type ReconcileDriver
- func (d *ReconcileDriver) Codec() ReconcileCodec
- func (d *ReconcileDriver) PeerCount() int
- func (d *ReconcileDriver) Policy() NetworkPolicy
- func (d *ReconcileDriver) RegisterPeer(p *ReconcilePeer)
- func (d *ReconcileDriver) RunInitiatorRound(nodeID string, timeout time.Duration) (applied, sent int, err error)
- func (d *ReconcileDriver) RunResponderRound(conn net.Conn, body []byte) error
- func (d *ReconcileDriver) Store() ReconcileStore
- func (d *ReconcileDriver) UnregisterPeer(nodeID string)
- type ReconcileFlags
- type ReconcileMode
- type ReconcilePeer
- type ReconcileStore
- type ReedSolomonErasure
- type RumorConfig
- type RumorIDFunc
- type RumorMessage
- type RumorPeerConn
- type RumorPusher
- func (rp *RumorPusher) EnableACKs(timeout time.Duration, maxRetries int)
- func (rp *RumorPusher) HandleACK(rumorID, peerID string)
- func (rp *RumorPusher) HypercubeExt() *HypercubeExt
- func (rp *RumorPusher) NotifyNewPayload(payload []byte)
- func (rp *RumorPusher) PushRumor(payload []byte, hopCount uint8, fromDimension uint8)
- func (rp *RumorPusher) PushRumorExcluding(payload []byte, hopCount uint8, fromDimension uint8, excludeNodeID string)
- func (rp *RumorPusher) RegisterPeer(nodeID string, conn RumorPeerConn)
- func (rp *RumorPusher) Run(ctx context.Context)
- func (rp *RumorPusher) SetEventEmitter(fn func(GossipEvent))
- func (rp *RumorPusher) SetHypercube(cube *Hypercube)
- func (rp *RumorPusher) SetHypercubeExt(ext *HypercubeExt)
- func (rp *RumorPusher) SetNetworkPolicy(p NetworkPolicy)
- func (rp *RumorPusher) SetPeerCapabilityCheck(fn func(peerID string) bool)
- func (rp *RumorPusher) Stats() RumorStats
- func (rp *RumorPusher) Tracker() *RumorTracker
- func (rp *RumorPusher) UnregisterPeer(nodeID string)
- type RumorStats
- type RumorTracker
- type SnapshotChunk
- type SnapshotCodec
- type SnapshotDriver
- type SnapshotFrameKind
- type SnapshotManifest
- type SnapshotShardSpec
- type SnapshotStore
- type StateStore
- type Subscriber
- type TopicConfig
- type TopicHandler
- type TopicMode
- type TopicRegistry
Constants ¶
const (
	// DefaultPeerMessagesPerSec is the steady-state per-peer message allowance.
	DefaultPeerMessagesPerSec = 100.0
	// DefaultPeerBurst is the per-peer burst capacity.
	DefaultPeerBurst = 500
	// DefaultGlobalEgressBytesPerSec is the total outbound throughput cap.
	DefaultGlobalEgressBytesPerSec = 10 * 1024 * 1024 // 10 MB/s
	// DefaultGlobalEgressBurst is the burst capacity for the egress limiter.
	DefaultGlobalEgressBurst = 10 * 1024 * 1024 // 10 MB burst
	// DefaultOverloadQueueThreshold is the outbound queue size above which
	// Publish returns ErrBackpressure. 0 disables the check.
	DefaultOverloadQueueThreshold = 1024
)
Default rate-limiting parameters. All are overridable via engine options.
const DefaultBackpressureInitialBackoff = 30 * time.Second
DefaultBackpressureInitialBackoff is the starting backoff when a peer first signals backpressure.
const DefaultBackpressureMaxBackoff = 120 * time.Second
DefaultBackpressureMaxBackoff is the maximum backoff duration a sender will apply when a peer repeatedly signals backpressure.
const DefaultBackpressureThreshold = 5
DefaultBackpressureThreshold is the number of concurrent pending gossip exchanges before a node signals backpressure to its peers.
const DefaultPullResponseMaxSize uint32 = 1 << 20
DefaultPullResponseMaxSize is the response-size cap used when the requester does not supply one (maxResponseSize=0). Matches the G1 push-delta cap (1 MB) so the pull path doesn't accidentally become a larger-payload channel than the push path.
const DigestMagic uint16 = 0x4732 // "G2"
const (
FlagDigestMatch uint16 = 1 << 0 // responder sets when fingerprints match
)
Digest frame flags.
const GossipMagic uint16 = 0x4731 // "G1"
GossipMagic is the 2-byte prefix on all G1 gossip frames for corruption detection. Wire format: [2-byte magic 0x4731][4-byte big-endian length][payload]
const JitterFraction = 0.15
JitterFraction is the ±fraction applied to each computed gossip interval to desynchronise peers that came up together. Without this a fleet-restart produces synchronised gossip cycles every 10 s, which spikes cross-region egress at the same wall-clock boundary for every peer. ±15% spreads the spike over ~1.5 s without changing the effective rate.
Applied in Next() after the base/idle/convergence arithmetic picks the unjittered value; also applied in ApplyBackpressure() so an overloaded fleet doesn't all back off in lockstep. Uses math/rand/v2 so the randomness is goroutine-safe without an explicit seed.
const MaxPullTopicLen = 255
MaxPullTopicLen caps the advertised topic name at 255 bytes. Any legitimate topic is far shorter; an oversized field is a protocol violation.
const MaxRumorPayloadBytes = 256 * 1024
MaxRumorPayloadBytes caps the size of a payload admitted into the rumor inbound queue. Oversized payloads are dropped with a warning — they still propagate through the delta (G1) path; rumor fast-push is best-effort and must not pin large slices alive in the bounded inbound channel.
const PEXInterval = 5 * time.Minute
PEXInterval is the minimum time between PEX sends to the same peer.
const PEXMaxAge = 1 * time.Hour
PEXMaxAge is the maximum age of a PEX entry before it's treated as stale.
const PEXMaxEntriesPerExchange = 10
PEXMaxEntriesPerExchange caps entries piggybacked on a single gossip exchange.
const PEXMaxKnownPeers = 500
PEXMaxKnownPeers caps the in-memory PEX peer table to prevent unbounded growth.
const PEXMaxRateLimitEntries = 500
PEXMaxRateLimitEntries caps the rate limiter's per-peer tracking map.
const PullMagic uint16 = 0x4734 // "G4"
Pull-based gossip (G4) — the dual of G1 push-delta. A peer with a stale watermark for some topic explicitly asks a neighbour for records newer than that watermark, instead of waiting for the next adaptive-interval G1 tick.
Use cases:
- Short-lived browser peers that connect briefly, pull current LAD state to catch up, then disconnect. No gossip-cycle participation needed — they just need a one-shot sync.
- Post-partition convergence — after a netsplit heals, a lagging peer can pull to catch up in one round-trip instead of waiting up to the full adaptive-full-sync cycle.
- Diagnostic pulls — operator tooling can pull from any peer for on-demand LAD state without disturbing the peer's gossip cadence.
Wire format (G4 request):
    [2-byte magic 0x4734]
    [1-byte topicLen]
    [topicLen-byte topic]
    [8-byte sinceWatermark — unix nanoseconds, 0 = full sync]
    [4-byte maxResponseSize — responder caps at min(this, its own cap)]
The RESPONSE shape is implementation-defined (passes through the same G1 payload encoder typically). The caller wires its own response handler via HandlePullRequest — the package provides the wire-format primitives and per-peer rate-limiting hooks only, not an opinion on the payload shape.
const ReconcileMagic uint16 = 0x4734 // "G4"
ReconcileMagic is the 2-byte prefix on G4 set-reconciliation frames. Coexists with G1 (full/delta), G2 (digest), G3 (rumor) as a fourth gossip mode. Capability-gated via PeerCapabilities — peers without `Reconciliation` advertised continue to use G1.
const RumorACKMagic uint16 = 0x4735 // "G5" pun on the 5th gossip frame
RumorACKMagic is the 2-byte prefix on G3-ACK frames sent back from rumor receiver to rumor sender to confirm delivery + apply. Lives on the same bidirectional rumor stream as G3 rumors themselves — receiver-side handlers write an ACK after a successful apply; sender-side reader loop dispatches ACKs into the pusher's pending-ACK tracker.
Frame format: [magic 2][rumor_id 32] = 34 bytes total. Rumor IDs are content hashes (BLAKE3 / SHA-256 truncated) — the same value the rumor pusher's dedup tracker uses, so receiver and sender share key space.
const RumorMagic uint16 = 0x4733 // "G3"
const SnapshotMagic uint16 = 0x4736 // "G6" pun on the 6th magic
SnapshotMagic is the 2-byte prefix on G6 cold-start snapshot frames. Coexists with G1 (full/delta), G2 (digest), G3 (rumor), G4 (reconciliation). Capability-gated via PeerCapabilities — peers without `Snapshot` advertised fall back to G1 full sync for cold-start.
Variables ¶
var (
	// ErrTopicNotFound is returned when a topic is not registered.
	ErrTopicNotFound = errors.New("whisper: topic not found")
	// ErrTopicExists is returned when registering a duplicate topic.
	ErrTopicExists = errors.New("whisper: topic already registered")
	// ErrStoreMissing is returned when a StatefulMerge topic has no StateStore.
	ErrStoreMissing = errors.New("whisper: StatefulMerge topic requires a StateStore")
	// ErrHandlerMissing is returned when a RequestResponse topic has no TopicHandler.
	ErrHandlerMissing = errors.New("whisper: RequestResponse topic requires a TopicHandler")
)
var ErrBackpressure = errors.New("whisper: publish rejected — engine overloaded")
ErrBackpressure is returned by Publish when the engine is overloaded and cannot accept more outbound messages. Callers decide whether to retry, drop, or bubble up to their caller.
var ErrPeerThrottled = errors.New("whisper: peer rate limit exceeded")
ErrPeerThrottled is returned (internally, via dropped sends) when a single peer has exceeded its rate limit. The publish for that peer is skipped but other peers continue to receive the message.
Functions ¶
func HandlePullRequest ¶ added in v0.0.4
func HandlePullRequest(req PullRequest, allowFn func() bool, fetch PullFetchFn) ([]byte, error)
HandlePullRequest validates the request's rate limit, applies the response-size cap, and calls the consumer-supplied fetch function. Returns the response bytes (ready to write back on the stream) or an error if rate-limited, malformed, or the fetch failed.
The rate limiter is the existing per-peer PEXRateLimiter-style limiter consumers already run for G1 gossip — pulls reuse that mechanism so a peer can't bypass normal gossip backpressure by spamming pulls.
func HandleRumorFrame ¶
func HandleRumorFrame(r io.Reader, tracker *RumorTracker, pusher *RumorPusher, senderNodeID string, rumorIDFn RumorIDFunc, applyFn func([]byte) error)
HandleRumorFrame processes an incoming G3 rumor frame in the responder. Called from the responder multiplexer after the 2-byte magic is consumed. senderNodeID is excluded from forwarding to prevent back-propagation. applyFn is the consumer's function to apply the payload to local state.
func IsStalePEXEntry ¶
IsStalePEXEntry returns true if the entry is older than PEXMaxAge.
func ReadDigestBody ¶
ReadDigestBody reads the remaining 10 bytes of a digest frame after the 2-byte magic has already been consumed by the multiplexer.
func ReadRumorACKBody ¶ added in v0.0.10
ReadRumorACKBody reads the rumor ID following an already-consumed G3-ACK magic. Mirrors ReadRumorBody's contract.
func ReadRumorBody ¶
ReadRumorBody reads the remaining bytes of a G3 frame after the 2-byte magic has been consumed by the multiplexer. Returns the opaque payload bytes.
func ShouldInitiateDial ¶
ShouldInitiateDial implements the deterministic tiebreaker for simultaneous offers. When both sides include WantDirectConnect in simultaneous gossip exchanges, the node with the HIGHER NodeID initiates the dial. The lower NodeID defers and waits for an inbound connection. This ensures exactly one connection attempt per pair.
Returns true if localNodeID should initiate the dial to peerNodeID.
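The tiebreaker reduces to a single comparison; a sketch (assuming "higher" means a plain lexicographic comparison of NodeID strings):

```go
package main

import "fmt"

// shouldInitiateDial mirrors the rule above: of the two nodes that both
// offered WantDirectConnect, the lexicographically higher NodeID dials;
// the lower one defers and waits for the inbound connection.
func shouldInitiateDial(localNodeID, peerNodeID string) bool {
	return localNodeID > peerNodeID
}

func main() {
	fmt.Println(shouldInitiateDial("node-b", "node-a")) // true: node-b dials
	fmt.Println(shouldInitiateDial("node-a", "node-b")) // false: node-a waits
}
```

Because both sides evaluate the same deterministic comparison, exactly one of the pair initiates, avoiding the duplicate-connection race.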
func VerifyPEXEntry ¶
VerifyPEXEntry checks the ed25519 signature on a PEX entry. The publicKey must correspond to the entry's NodeID (looked up from LAD member records).
func WriteDigestProbe ¶
WriteDigestProbe writes a 12-byte digest frame to conn.
func WritePullRequest ¶ added in v0.0.4
func WritePullRequest(conn net.Conn, req PullRequest) error
WritePullRequest writes a G4 pull request frame to conn. Counterpart of ReadPullRequestBody (which expects the caller to have consumed the 2-byte magic already, matching the responder's multiplexer contract used by G1/G2/G3).
func WriteRumor ¶
WriteRumor writes a G3 rumor frame to conn. The payload is opaque bytes that the consumer has already serialised. Uses net.Buffers to hand the header and payload to the OS writev path (where supported), avoiding the per-send copy into a merged buffer.
func WriteRumorACK ¶ added in v0.0.10
WriteRumorACK serialises a G3-ACK frame to conn. Called by the rumor receiver after a successful apply to confirm delivery to the originator.
rumorID must be exactly rumorIDLen bytes; shorter IDs are right-padded and longer IDs are truncated so the frame size stays constant.
Types ¶
type AdaptiveInterval ¶
type AdaptiveInterval struct {
// contains filtered or unexported fields
}
AdaptiveInterval adjusts gossip exchange timing based on exchange results. When records are applied (convergence), interval snaps to min (2s). When caches match (idle), interval backs off exponentially to max (60s). When records are sent but not applied (peer already had them), interval returns to base (10s).
Stateless — reads only GossipResult fields, no direct state store dependency.
func NewAdaptiveInterval ¶
func NewAdaptiveInterval(base, min, max time.Duration) *AdaptiveInterval
NewAdaptiveInterval creates an adaptive interval with the given bounds. base is the normal interval, min is the fastest (during convergence), max is the slowest (when idle).
func (*AdaptiveInterval) ApplyBackpressure ¶
func (a *AdaptiveInterval) ApplyBackpressure(overloaded bool) time.Duration
ApplyBackpressure grows the adaptive interval when global backpressure is active. Called by the engine between exchanges so over-subscribed nodes slow their G1 cycles until load clears. Returns the jittered sample around the backpressured base for the same reason Next() does: prevent an overloaded fleet from all backing off in lockstep.
func (*AdaptiveInterval) Current ¶
func (a *AdaptiveInterval) Current() time.Duration
Current returns the most recently computed interval.
func (*AdaptiveInterval) Next ¶
func (a *AdaptiveInterval) Next(result GossipResult) time.Duration
Next returns the interval for the next exchange based on the last result. The returned interval includes ±JitterFraction randomisation so peers that came up together don't produce synchronised gossip bursts. The stored a.current is the UNJITTERED value; jitter is applied per-call so every call to Next() returns a fresh jittered sample around it.
func (*AdaptiveInterval) Stats ¶ added in v0.0.3
func (a *AdaptiveInterval) Stats() AdaptiveStats
Stats returns a snapshot for metrics / diagnostics.
type AdaptivePolicy ¶ added in v0.0.10
type AdaptivePolicy struct {
// contains filtered or unexported fields
}
AdaptivePolicy is a NetworkPolicy that synthesises a profile from multiple input signals — LinkType, RTT trends, loss rate, battery state, time-of-day. Used by agents on roaming devices where a fixed-profile policy would be wrong (constant cellular profile even on wifi, or vice versa).
Profile transitions are smooth gradients for bandwidth values (30-sec ramp on direction changes) and immediate for cadence values (next gossip in 10 min vs 30 min has no meaningful gradient). Battery-low and predictive transitions short-circuit directly to conservative profiles regardless of LinkType.
func NewAdaptivePolicy ¶ added in v0.0.10
func NewAdaptivePolicy() *AdaptivePolicy
NewAdaptivePolicy returns an AdaptivePolicy seeded with the default profile. The synthesizer recomputes whenever UpdateSignals is called; consumers wire UpdateSignals to platform monitors (NetworkMonitor.onChanged, battery API, RTT histogram).
func (*AdaptivePolicy) PeerOverride ¶ added in v0.0.10
func (p *AdaptivePolicy) PeerOverride(peer string) (NetworkProfile, bool)
PeerOverride returns a per-peer override or false.
func (*AdaptivePolicy) Profile ¶ added in v0.0.10
func (p *AdaptivePolicy) Profile() NetworkProfile
Profile returns the synthesised profile. Bandwidth/pacing fields are ramped between current and target on transition.
func (*AdaptivePolicy) SetEventEmitter ¶ added in v0.0.10
func (p *AdaptivePolicy) SetEventEmitter(fn func(GossipEvent))
SetEventEmitter wires a callback that fires on every committed profile transition. Pass nil to disable. Used by consumers to surface link/battery-driven behavior changes in mesh-debug, OTLP, or operator dashboards without polling Profile() on a clock.
func (*AdaptivePolicy) SetPeerOverride ¶ added in v0.0.10
func (p *AdaptivePolicy) SetPeerOverride(peer string, profile NetworkProfile)
SetPeerOverride installs a per-peer override.
func (*AdaptivePolicy) Signals ¶ added in v0.0.10
func (p *AdaptivePolicy) Signals() NetworkSignals
Signals returns the most recent signal vector.
func (*AdaptivePolicy) Subscribe ¶ added in v0.0.10
func (p *AdaptivePolicy) Subscribe(fn func(NetworkProfile))
Subscribe registers a callback fired on every profile change.
func (*AdaptivePolicy) UpdateSignals ¶ added in v0.0.10
func (p *AdaptivePolicy) UpdateSignals(s NetworkSignals)
UpdateSignals re-runs the synthesizer with new signals. Triggers a profile transition if any change crosses a hysteresis threshold. Safe for concurrent use; subscribers are called outside the lock.
type AdaptiveStats ¶ added in v0.0.3
type AdaptiveStats struct {
Current time.Duration // most recently computed interval
Base time.Duration
Min time.Duration
Max time.Duration
IdleExchanges int // consecutive exchanges with matching fingerprint
Fingerprint uint64 // last peer-cache fingerprint seen
}
AdaptiveStats is a point-in-time snapshot of adaptive-interval state.
type BackpressureMonitor ¶
type BackpressureMonitor struct {
// contains filtered or unexported fields
}
BackpressureMonitor tracks gossip processing load for a node. When the number of concurrent (pending) gossip exchanges exceeds the threshold, the node sets BackpressureSignal=true in its outbound ExchangeMeta, telling peers to slow down.
func NewBackpressureMonitor ¶
func NewBackpressureMonitor() *BackpressureMonitor
NewBackpressureMonitor creates a monitor with the default threshold (5).
func NewBackpressureMonitorWithThreshold ¶
func NewBackpressureMonitorWithThreshold(threshold int) *BackpressureMonitor
NewBackpressureMonitorWithThreshold creates a monitor with a custom threshold.
func (*BackpressureMonitor) Enter ¶
func (bp *BackpressureMonitor) Enter() bool
Enter increments the pending exchange count. Returns true if backpressure should be signaled (pending count exceeds threshold).
func (*BackpressureMonitor) Exit ¶
func (bp *BackpressureMonitor) Exit()
Exit decrements the pending exchange count.
func (*BackpressureMonitor) IsOverloaded ¶
func (bp *BackpressureMonitor) IsOverloaded() bool
IsOverloaded returns true if the node should signal backpressure.
func (*BackpressureMonitor) Pending ¶
func (bp *BackpressureMonitor) Pending() int
Pending returns the current number of pending exchanges.
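The Enter/Exit contract reduces to a threshold-gated counter; a minimal sketch (the default threshold of 5 is from the doc above, the rest is illustrative):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// monitor sketches the pending-exchange counter: Enter reports whether
// the node should now set BackpressureSignal in its outbound meta.
type monitor struct {
	pending   atomic.Int64
	threshold int64
}

func (m *monitor) Enter() bool { return m.pending.Add(1) > m.threshold }
func (m *monitor) Exit()       { m.pending.Add(-1) }

func main() {
	m := &monitor{threshold: 5}
	for i := 0; i < 5; i++ {
		fmt.Println(m.Enter()) // false for the first five exchanges
	}
	fmt.Println(m.Enter()) // the sixth pending exchange trips the signal
	m.Exit()
}
```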
type CPUGate ¶ added in v0.0.4
type CPUGate struct {
// contains filtered or unexported fields
}
CPUGate provides a defer-or-drop decision based on process CPU load. It is queried before a gossip publish or response is enqueued; when load is above the configured threshold, the caller receives ErrBackpressure (or similar) and can shed work instead of piling up queue depth.
Two signal sources, max-merged so either one can trip the gate:
Tick-skew (zero-dep, always-on). A background goroutine wakes on a 50 ms ticker and records the actual elapsed time since the last wake. When the Go runtime scheduler is stressed (CPU saturation, GC thrash, huge goroutine count), elapsed balloons well above the nominal interval. Observed skew / expected interval gives a ratio; max over a rolling window is the load estimate.
Pluggable reader (optional). A consumer-supplied readFn returning a load ratio in [0, 1] — typically from a gopsutil-based probe or a /sys/fs/cgroup/cpu.stat reader. Added via SetReader so callers can plug in higher-fidelity signals when they have them, without making the whisper package itself depend on gopsutil.
The gate's reported load is max(tickSkewLoad, readerLoad). Allow() returns true when load < threshold, false otherwise.
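The tick-skew estimate can be sketched as below. How a skew ratio maps onto a [0, 1] load is an assumption for illustration — here on-time ticks (ratio 1.0) map to 0 and ticks arriving twice as late (ratio 2.0) saturate at 1:

```go
package main

import "fmt"

// loadFromSkew takes a rolling window of (observed elapsed / expected
// interval) samples and returns a load estimate in [0, 1]: the window
// max, shifted so ratio 1.0 is zero load, clamped at 1.
func loadFromSkew(samples []float64) float64 {
	maxRatio := 1.0
	for _, r := range samples {
		if r > maxRatio {
			maxRatio = r
		}
	}
	load := maxRatio - 1.0
	if load > 1 {
		load = 1
	}
	return load
}

func main() {
	fmt.Println(loadFromSkew([]float64{1.0, 1.25, 1.5})) // lightly loaded: 0.5
	fmt.Println(loadFromSkew([]float64{1.0, 1.8, 3.2}))  // saturated: clamped to 1
}
```

The final gate load would then be the max of this value and any pluggable reader's, so either signal can trip Allow().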
func NewCPUGate ¶ added in v0.0.4
func NewCPUGate(cfg CPUGateConfig) *CPUGate
NewCPUGate starts a gate with the given config and spawns the tick goroutine. Call Close to stop it.
func (*CPUGate) Allow ¶ added in v0.0.4
Allow reports whether new work should be admitted. Returns true when current Load() is below the threshold.
func (*CPUGate) Close ¶ added in v0.0.4
func (g *CPUGate) Close()
Close stops the tick goroutine. Idempotent.
func (*CPUGate) SetReader ¶ added in v0.0.4
SetReader installs a pluggable load-reader. Pass nil to clear. The reader runs on Allow()'s goroutine — keep it cheap; a slow reader blocks the caller.
func (*CPUGate) SetThreshold ¶ added in v0.0.4
SetThreshold adjusts the gate threshold at runtime. Useful for operator tuning via a diagnostics endpoint. Values are clamped to [0, 1]; 0 disables the gate (always allow); 1 forces it on (never allow above zero load).
type CPUGateConfig ¶ added in v0.0.4
type CPUGateConfig struct {
Threshold float64 // 0..1, fraction above which Allow returns false. Default 0.8.
TickInterval time.Duration // how often the tick-skew probe fires. Default 50 ms.
WindowSize int // rolling window of skew samples. Default 10 (→ 500 ms at 50 ms ticks).
}
CPUGateConfig controls construction. Zero-value → sensible defaults (threshold 0.8, 50 ms tick, 500 ms rolling window).
type ConnectionOffer ¶
type ConnectionOffer struct {
// WantDirectConnect signals that the sender wants a direct connection.
WantDirectConnect bool `json:"want_direct,omitempty"`
// OfferedTransports lists transports the sender can accept (e.g., "noise-udp", "websocket").
// Empty means "any available".
OfferedTransports []string `json:"offered_transports,omitempty"`
// SenderNodeID is the offering node's identity. Populated by the exchange layer,
// not set by the caller.
SenderNodeID string `json:"-"`
// Timestamp when the offer was created. Used for deduplication.
Timestamp time.Time `json:"offer_ts,omitempty"`
}
ConnectionOffer represents a request from a peer to establish a direct connection. Included in gossip exchange metadata when a node wants to upgrade from indirect gossip to a direct transport connection.
type DebugLogger ¶
type DebugLogger struct {
// contains filtered or unexported fields
}
DebugLogger provides conditional debug logging for Whisper subsystems. Enabled by setting the DEBUG environment variable to a comma-separated list of subsystem names (e.g., "whisper.gossip,whisper.rumor"). Use "whisper.*" to enable all Whisper debug output.
func (*DebugLogger) Printf ¶
func (d *DebugLogger) Printf(format string, args ...interface{})
Printf logs a formatted message if this subsystem is enabled.
type DecodedExchange ¶ added in v0.0.8
type DecodedExchange struct {
Records [][]byte
Meta *ExchangeMeta
SeenNodeIDs []string
}
DecodedExchange is the G1Codec's decoded view of an inbound G1 frame body. SeenNodeIDs is optional — codecs that can't cheaply extract identity leave it nil and liveness falls back to coarser signals.
type DeltaPeerStats ¶
type DeltaPeerStats struct {
Watermark time.Time
ExchangeCount int
NextFullSync int // exchanges until next forced full sync (per-peer adaptive)
AdaptiveInterval int // current full-sync interval for this peer
FullSyncCount int // total full-syncs performed for this peer
AppliedOnFull int // full-syncs that applied > 0 records
}
DeltaPeerStats is debug info for a single peer's delta state.
type DeltaPersistence ¶ added in v0.0.10
type DeltaPersistence interface {
// Save persists the snapshot to durable storage. Errors are logged
// but don't stop the tracker — a failed save means the next
// restart costs more, not that the running process breaks.
Save([]PersistedPeerState) error
// Load returns the most recent snapshot. An empty slice + nil err
// means "nothing persisted yet" (first-ever boot). A non-nil error
// means the backend is broken; the tracker logs and continues
// with empty state (degrades to today's behavior).
Load() ([]PersistedPeerState, error)
}
DeltaPersistence is the storage backend the consumer supplies for surviving DeltaTracker state across process restarts. Without this, every fly redeploy wipes per-peer watermarks and the next gossip exchange to every peer is a full snapshot — the convergence-burst pattern that drains stream credit windows.
Implementations:
- Library: JSON file under <dataDir>/.mesh/watermarks.json with atomic write (temp + rename).
- Agent: row in the existing SQLite keystore (encrypts at rest when ColumnEncryptor is wired).
Save / Load run on the DeltaTracker's lifecycle ticks (every 30 sec by default plus on shutdown); both are called with the tracker's mutex NOT held so backends can do whatever IO they need.
type DeltaSyncMeta ¶
type DeltaSyncMeta struct {
IsFullSync bool `json:"full"` // true if this is a full dump
Watermark time.Time `json:"wm,omitempty"` // sender's watermark for this peer
RecordCount int `json:"count"` // number of records in this exchange
}
DeltaSyncMeta carries delta sync metadata in the gossip envelope.
type DeltaTracker ¶
type DeltaTracker struct {
// contains filtered or unexported fields
}
DeltaTracker maintains per-peer sync watermarks for delta gossip. Each peer connection tracks the highest record timestamp successfully exchanged. On each gossip cycle, only records newer than the watermark are sent. Every Nth exchange triggers a full sync for consistency.
The full-sync cadence is per-peer and adaptive: noisy (lossy / high-disagreement) peers get full syncs more often; quiet (consistent) peers get them less often. The baseline is fullSyncInterval; the actual threshold is scaled per peer via peerScale ∈ [0.5, 2.0] based on how many full-syncs have actually APPLIED records recently (a signal that the delta path alone is missing state).
Callers are expected to call Reset(peerNodeID) from their peer-disconnect hook so stale state doesn't accumulate across reconnects. A background sweeper (started by StartSweeper) evicts entries older than deltaMaxPeerAge every deltaSweepInterval as a safety net.
func NewDeltaTracker ¶
func NewDeltaTracker() *DeltaTracker
NewDeltaTracker creates a tracker with the default full sync interval (10).
func NewDeltaTrackerWithInterval ¶
func NewDeltaTrackerWithInterval(fullSyncInterval int) *DeltaTracker
NewDeltaTrackerWithInterval creates a tracker with a custom full sync interval.
func (*DeltaTracker) AttachPersistence ¶ added in v0.0.10
func (dt *DeltaTracker) AttachPersistence(ctx context.Context, p DeltaPersistence, saveInterval time.Duration, maxAge time.Duration)
AttachPersistence wires a persistence backend to the tracker. On attach, the backend's Load is invoked synchronously; persisted entries with watermarks older than maxAge are discarded (they refer to records the peer has likely evicted, so the watermark would suppress a needed full sync).
The save loop runs on saveInterval ticks until ctx is cancelled. On ctx cancellation a final Save fires so graceful shutdowns preserve state. Crash-stops lose at most saveInterval of progress — acceptable for an optimisation layer (worst case is one full sync per crashed peer pair, which Phase 0a's credit-leak fix tolerates).
Idempotent — multiple calls overwrite the persistence reference but only one save loop runs (guarded by sweeperOnce-like state).
func (*DeltaTracker) RecordFullSyncResult ¶ added in v0.0.3
func (dt *DeltaTracker) RecordFullSyncResult(peerID string, appliedRecords int)
RecordFullSyncResult is called by the engine after a full-sync exchange completes. appliedRecords is the number of records the peer's cache accepted from our full dump. Drives the adaptive scale in peerFullSyncIntervalLocked.
func (*DeltaTracker) RecordWatermark ¶
func (dt *DeltaTracker) RecordWatermark(peerID string) time.Time
RecordWatermark returns the last successful sync timestamp for a peer. Returns zero time if no prior exchange exists (triggers full sync).
func (*DeltaTracker) Reset ¶
func (dt *DeltaTracker) Reset(peerID string)
Reset clears the state for a peer. Callers should invoke this from their peer-disconnect hook — the tracker doesn't have its own view of connection lifecycle.
func (*DeltaTracker) ShouldFullSync ¶
func (dt *DeltaTracker) ShouldFullSync(peerID string) bool
ShouldFullSync returns true if this peer should receive a full cache dump. True when:
- The peer has no watermark (first exchange)
- The exchange count is a multiple of the peer's ADAPTIVE interval
Adaptive interval = baseline × peerScale, where peerScale is derived from the ratio of applied-on-full to total full-syncs for this peer:
- high ratio (≥0.5): delta is missing state on this peer — scale 0.5 (full-sync at half the baseline interval, i.e. more often)
- mid ratio (0.1-0.5): baseline scale 1.0
- low ratio (<0.1): delta is keeping up fine — scale 2.0 (less often)
Scale factors are clamped to [0.5, 2.0] so the adaptive interval stays in [5, 20] with the default 10-baseline — never far from the static default but responsive to per-peer reality.
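The ratio-to-scale mapping described above reduces to a small pure function. This is a sketch mirroring the documented thresholds, not the package's `peerFullSyncIntervalLocked` itself:

```go
package main

import "fmt"

// peerScale maps the applied-on-full ratio to a full-sync interval
// multiplier, following the documented thresholds for ShouldFullSync:
// high ratio means deltas miss state (sync more often), low ratio means
// deltas keep up (sync less often).
func peerScale(appliedOnFull, fullSyncCount int) float64 {
	if fullSyncCount == 0 {
		return 1.0 // no history yet: baseline cadence
	}
	ratio := float64(appliedOnFull) / float64(fullSyncCount)
	switch {
	case ratio >= 0.5:
		return 0.5 // delta path is missing state: halve the interval
	case ratio < 0.1:
		return 2.0 // delta path keeps up: double the interval
	default:
		return 1.0 // baseline
	}
}

func main() {
	const baseline = 10
	fmt.Println("noisy peer interval:", int(baseline*peerScale(6, 10))) // 5
	fmt.Println("quiet peer interval:", int(baseline*peerScale(0, 10))) // 20
}
```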
func (*DeltaTracker) StartSweeper ¶ added in v0.0.5
func (dt *DeltaTracker) StartSweeper()
StartSweeper launches a background goroutine that periodically evicts entries older than deltaMaxPeerAge. Safe to call multiple times — only the first call starts the goroutine. The sweeper is a safety net; the primary eviction path is caller-driven Reset on peer disconnect.
func (*DeltaTracker) Stats ¶
func (dt *DeltaTracker) Stats() map[string]DeltaPeerStats
Stats returns debug information about delta tracking for all peers.
func (*DeltaTracker) StopSweeper ¶ added in v0.0.5
func (dt *DeltaTracker) StopSweeper()
StopSweeper signals the background sweeper to exit. Idempotent — safe to call before StartSweeper or multiple times.
func (*DeltaTracker) UpdateWatermark ¶
func (dt *DeltaTracker) UpdateWatermark(peerID string, timestamp time.Time)
UpdateWatermark sets the watermark for a peer after a successful exchange.
type Engine ¶
type Engine struct {
// ExchangeFunc is called by the engine to perform a single G1 exchange.
// Consumers provide this — it encapsulates the wire-format-specific
// serialization (protobuf, JSON, etc.) and the actual send/receive over conn.
// Returns the exchange result for adaptive interval tuning.
ExchangeFunc func(ctx context.Context, conn net.Conn, meta *ExchangeMeta) (GossipResult, error)
// contains filtered or unexported fields
}
Engine is the central gossip orchestrator. It manages topic registration, coordinates exchange timing via AdaptiveInterval, routes rumors via Hypercube, and provides the Publish/Subscribe API for consumers.
The exchange loop (G1/G2/G3) runs over a net.Conn provided by the consumer. Consumers wire the Engine to their transport layer (e.g., Aether streams via adapter.StreamConn).
func NewEngine ¶
func NewEngine(opts ...EngineOption) *Engine
NewEngine creates a gossip engine with the given options.
func (*Engine) AllowPeer ¶
AllowPeer is a convenience shortcut that returns true if the engine's rate limiter permits a message to the named peer right now. Fanout paths call this before writing to skip throttled peers without tearing down the publish to other peers.
func (*Engine) Backpressure ¶
func (e *Engine) Backpressure() *BackpressureMonitor
Backpressure returns the backpressure monitor.
func (*Engine) EventBus ¶ added in v0.0.10
func (e *Engine) EventBus() *eventBus
EventBus returns the engine's event bus, lazily creating it. Used by sibling components (rumor pusher, reconcile driver, snapshot driver) that emit events outside the responder loop.
func (*Engine) PEXManager ¶
func (e *Engine) PEXManager() *PEXManager
PEXManager returns the PEX manager attached to this engine, or nil if PEX was not enabled. Consumers (e.g. Minerva's discovery layer) use this to register PEXDiscoverer hooks without reaching into engine internals.
func (*Engine) Publish ¶
Publish sends a payload to all subscribers of a BroadcastOnly topic. For StatefulMerge topics, use the StateStore.Apply path instead.
Returns ErrBackpressure if the engine is overloaded. Low-priority topics (Priority <= 0) are rejected first when the global egress bucket is saturated; high-priority topics are allowed through. Callers decide whether to retry, drop, or bubble up.
func (*Engine) Query ¶
Query sends a request to matching peers and collects responses. For RequestResponse topics only.
func (*Engine) RateLimiter ¶
func (e *Engine) RateLimiter() *RateLimiter
RateLimiter returns the engine rate limiter. Peers and fanout loops use this to enforce per-peer token buckets before writing messages.
func (*Engine) RegisterFrameKind ¶ added in v0.0.6
func (e *Engine) RegisterFrameKind(magic uint16, handler FrameHandler) error
RegisterFrameKind installs a custom FrameHandler for a specific 2-byte magic. Returns an error if:
- the magic is DigestMagic or RumorMagic (built-ins that the consumer must not override — use the With* options to customise their behaviour instead);
- the magic is already registered (double-register is almost always a consumer wiring bug);
- handler is nil.
GossipMagic (G1) is intentionally allowed because the G1 wire format depends on the consumer's record codec — no built-in G1 handler exists, so the consumer MUST register one for G1 to work.
Call before RunResponder — concurrent registration during responder loop execution is a race.
func (*Engine) RegisterTopic ¶
func (e *Engine) RegisterTopic(name string, config TopicConfig) error
RegisterTopic registers a topic with the engine's registry.
func (*Engine) Registry ¶
func (e *Engine) Registry() *TopicRegistry
Registry returns the topic registry for external access.
func (*Engine) Run ¶
func (e *Engine) Run(ctx context.Context, conn net.Conn, peerNodeID string, onExchange func(GossipResult)) error
Run starts the gossip exchange loop on the given connection. Blocks until ctx is cancelled, the connection closes, or Stop is called. The ExchangeFunc must be set before calling Run.
func (*Engine) RunResponder ¶ added in v0.0.6
RunResponder runs the responder-side loop on conn: read a 2-byte magic, dispatch to the registered FrameHandler, repeat until the context cancels or a handler returns FrameReturn/FrameFail.
Unlike Run (which is timer-driven and initiator-only), RunResponder is purely reactive — it blocks reading frames until the peer sends one. Consumers typically run ONE side as initiator (via Run) and the OTHER side as responder (via RunResponder) over the same transport; some protocols (library LAD gossip) run responder on both sides and accept either-side initiation.
The built-in G1/G2/G3 handlers are installed on first call so the standard gossip protocol works without custom RegisterFrameKind calls. Consumers that want to extend the protocol register before calling RunResponder.
func (*Engine) Stats ¶ added in v0.0.3
func (e *Engine) Stats() EngineStats
Stats returns a snapshot of aggregate engine state. Cheap — one atomic read per subsystem + a few locks. Intended for diagnostics endpoints (e.g. /api/monitoring/mesh-debug) that want a single call instead of pulling from each subsystem individually.
func (*Engine) Subscribe ¶
func (e *Engine) Subscribe(topic string, sub Subscriber)
Subscribe registers a subscriber for broadcast messages on a topic.
func (*Engine) SubscribeEvents ¶ added in v0.0.6
func (e *Engine) SubscribeEvents(ch chan<- GossipEvent)
SubscribeEvents registers a non-blocking subscriber for gossip events. Unbuffered or slow channels drop events. Safe for concurrent use.
type EngineOption ¶
type EngineOption func(*Engine)
EngineOption configures the engine at creation time.
func WithAdaptive ¶
func WithAdaptive(a *AdaptiveInterval) EngineOption
WithAdaptive sets the adaptive interval tuner.
func WithBackpressure ¶
func WithBackpressure(bp *BackpressureMonitor) EngineOption
WithBackpressure enables backpressure signaling.
func WithBufferPool ¶ added in v0.0.6
func WithBufferPool(p *sync.Pool) EngineOption
WithBufferPool shares a single buffer pool across gossip inbound reads. Useful when the consumer already owns a pool tuned for its typical payload size distribution.
func WithDelta ¶
func WithDelta(dt *DeltaTracker) EngineOption
WithDelta enables delta sync with per-peer watermarks.
func WithExchangeDeadline ¶ added in v0.0.6
func WithExchangeDeadline(d time.Duration) EngineOption
WithExchangeDeadline caps each G1 exchange at the given duration.
func WithExchangeFunc ¶
func WithExchangeFunc(fn func(ctx context.Context, conn net.Conn, meta *ExchangeMeta) (GossipResult, error)) EngineOption
WithExchangeFunc sets the wire-format-specific exchange function.
func WithFatalErrorClassifier ¶ added in v0.0.6
func WithFatalErrorClassifier(fn func(error) bool) EngineOption
WithFatalErrorClassifier replaces the default terminal-error check. The classifier returns true when the error means "session gone, exit loop" and false when "transient, keep trying."
func WithFingerprintProvider ¶ added in v0.0.6
func WithFingerprintProvider(fp FingerprintProvider) EngineOption
WithFingerprintProvider wires the G2 digest probe handler's state source. Without this, G2 probes are dropped and G1 always runs.
func WithFirstFrameGrace ¶ added in v0.0.6
func WithFirstFrameGrace(d time.Duration) EngineOption
WithFirstFrameGrace sets the grace period for the very first inbound frame on a responder connection. See responderConfig.firstFrameGrace.
func WithG1Codec ¶ added in v0.0.8
func WithG1Codec(c G1Codec) EngineOption
WithG1Codec wires the record serialisation used by the native G1 handler. Required alongside WithG1Store — without a codec the handler can't translate between StateStore records ([]byte) and wire bytes.
func WithG1ExchangeObserver ¶ added in v0.0.8
func WithG1ExchangeObserver(obs G1ExchangeObserver) EngineOption
WithG1ExchangeObserver registers a callback fired after every successful G1 exchange on this engine. Observer invocations are synchronous on the responder goroutine — keep them quick or dispatch to a worker.
func WithG1Store ¶ added in v0.0.8
func WithG1Store(s StateStore) EngineOption
WithG1Store wires the state store whose records drive G1 exchanges. Required for the native G1 handler to activate.
func WithGlobalEgressLimit ¶
func WithGlobalEgressLimit(bytesPerSec int) EngineOption
WithGlobalEgressLimit sets the total outbound gossip throughput cap (bytes/sec). Exceeded traffic is prioritised by TopicConfig.Priority.
func WithHypercube ¶
func WithHypercube(h *Hypercube) EngineOption
WithHypercube enables structured rumor routing.
func WithMaxPayloadSize ¶ added in v0.0.6
func WithMaxPayloadSize(n uint32) EngineOption
WithMaxPayloadSize rejects inbound frames declaring a length above the threshold. Prevents malicious or malformed peers from forcing unbounded allocations.
func WithMetaProvider ¶ added in v0.0.6
func WithMetaProvider(fn func() *ExchangeMeta) EngineOption
WithMetaProvider supplies the outbound ExchangeMeta builder for G1 handlers. Called once per exchange tick; nil = send an empty meta.
func WithNetworkPolicy ¶ added in v0.0.10
func WithNetworkPolicy(p NetworkPolicy) EngineOption
WithNetworkPolicy wires the engine's policy-aware components (rumor retry timing, gossip cadence ceiling, reconciliation pacing) to consult a shared NetworkPolicy. Without this option every component falls back to its hard-coded defaults — same behavior as today.
func WithOverloadQueueThreshold ¶
func WithOverloadQueueThreshold(n int) EngineOption
WithOverloadQueueThreshold sets the outbound queue depth above which Publish returns ErrBackpressure.
func WithPeerRateLimit ¶
func WithPeerRateLimit(per float64, burst int) EngineOption
WithPeerRateLimit configures the per-peer token bucket. per is messages per second (steady state), burst is the burst capacity. If the engine already has a rate limiter, these values are applied on the next NewEngine boot only — create the limiter via NewRateLimiter to reuse across restarts.
func WithRateLimiter ¶
func WithRateLimiter(rl *RateLimiter) EngineOption
WithRateLimiter installs a fully-configured engine rate limiter. For most use cases, prefer WithPeerRateLimit and WithGlobalEgressLimit.
func WithReconcile ¶ added in v0.0.10
func WithReconcile(d *ReconcileDriver) EngineOption
WithReconcile wires a ReconcileDriver into the engine so the G4 frame handler dispatches inbound TableFrames into the driver's RunResponderRound. Without this option the engine's responder loop returns "unknown frame magic" on G4 frames — peers must fall back to G1 + DeltaTracker.
func WithRumor ¶
func WithRumor(r *RumorPusher) EngineOption
WithRumor enables rumor-mongering (G3 immediate push).
func WithRumorApplyFn ¶ added in v0.0.6
func WithRumorApplyFn(fn func([]byte) error) EngineOption
WithRumorApplyFn sets the record-apply callback for the default G3 handler. Consumers typically route by magic-byte prefix or unmarshal as their record type and apply to the local store.
func WithRumorDedupKeyFunc ¶ added in v0.0.6
func WithRumorDedupKeyFunc(fn RumorIDFunc) EngineOption
WithRumorDedupKeyFunc sets the dedup key function for the default G3 handler. Defaults to a sha256 of the payload when nil.
func WithSeenIDTracking ¶ added in v0.0.6
func WithSeenIDTracking(enabled bool) EngineOption
WithSeenIDTracking enables/disables NodeID collection on G1 applies (for liveness tracking).
func WithSnapshot ¶ added in v0.0.10
func WithSnapshot(d *SnapshotDriver) EngineOption
WithSnapshot wires a SnapshotDriver into the engine so the G5 frame handler dispatches inbound ManifestRequest / ShardRequest frames into the driver. Without this option G5 frames return "unknown magic" and cold-start falls back to G1 full sync.
type EngineStats ¶ added in v0.0.3
type EngineStats struct {
Topics int // registered topic count
Subscribers int // total subscribers across topics
Adaptive *AdaptiveStats // current interval + idle counter
Backpressure bool // currently overloaded?
PendingExchange int // current pendingExchanges on backpressure monitor
Rumor *RumorStats // rumor-push effectiveness
RateLimiter *RateLimitStats // queue depth, threshold, peer count
RTT time.Duration // recent avg RTT from NetworkRTTMeasurer (0 if unmeasured)
HypercubeValid bool // hypercube has usable state
HypercubeDim int // current hypercube dimension
}
EngineStats aggregates observable state from all attached subsystems. Nil fields correspond to subsystems not wired on this engine.
type ExchangeMeta ¶
type ExchangeMeta struct {
// Offer signals desire for a direct connection.
Offer *ConnectionOffer `json:"conn_offer,omitempty"`
// ConnectionCounts is the gossip-propagated connection map.
// Maps nodeID -> current inbound+outbound connection count.
ConnectionCounts map[string]int `json:"conn_counts,omitempty"`
// PEXEntries carries signed peer advertisements piggybacked on gossip.
// Max PEXMaxEntriesPerExchange entries per exchange, rate-limited per peer.
PEXEntries []PEXEntry `json:"pex_entries,omitempty"`
// CacheFingerprint is an order-independent XOR hash of all cache keys.
// When two peers exchange fingerprints and they differ, the next exchange
// should be a full sync (not delta) to reconcile the divergence.
CacheFingerprint uint64 `json:"cache_fp,omitempty"`
// BackpressureSignal indicates the sender is overwhelmed and cannot process
// gossip exchanges fast enough. When true, receiving peers should
// apply exponential backoff to their gossip interval for this peer:
// - Initial backoff: 30s
// - Doubles each consecutive signal: 30s -> 60s -> 120s (capped)
// - Resets to 0 when the peer stops signaling backpressure
// The signal is set by BackpressureMonitor when pendingExchanges exceeds
// the threshold (default 5). The field is omitempty so peers that never
// set it are treated as healthy (no backoff applied).
BackpressureSignal bool `json:"backpressure,omitempty"`
}
ExchangeMeta extends gossip exchange metadata with connection signaling fields and connection map data. This is the wire format addition to each gossip exchange.
type FingerprintProvider ¶ added in v0.0.6
type FingerprintProvider interface {
Fingerprint() uint64
}
FingerprintProvider is implemented by any StateStore whose entire state reduces to a single 64-bit fingerprint (fast equality probe before a full G1 exchange). When a consumer's store satisfies this interface AND the consumer registers the default G2 handler, peers can skip the full G1 exchange when fingerprints match.
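The order-independent XOR scheme mentioned on ExchangeMeta.CacheFingerprint is one natural way to satisfy this interface. A sketch (hash choice and names are assumptions, not the package's): XOR the hash of every cache key, so insertion order doesn't matter and adding or removing a key is a constant-time incremental update.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// xorFingerprint folds every cache key into one 64-bit value via XOR.
// The fold is commutative, so two caches holding the same key set
// produce the same fingerprint regardless of iteration order.
func xorFingerprint(keys []string) uint64 {
	var fp uint64
	for _, k := range keys {
		h := fnv.New64a() // any stable 64-bit hash works here
		h.Write([]byte(k))
		fp ^= h.Sum64()
	}
	return fp
}

func main() {
	a := xorFingerprint([]string{"k1", "k2", "k3"})
	b := xorFingerprint([]string{"k3", "k1", "k2"})
	fmt.Println("order independent:", a == b)                       // true
	fmt.Println("diverged caches match:", a == xorFingerprint([]string{"k1"})) // false
}
```

A store keeping this value incrementally can answer Fingerprint() with one atomic load, which is what makes the G2 probe cheap enough to run before every exchange.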
type FixedProfilePolicy ¶ added in v0.0.10
type FixedProfilePolicy struct {
// contains filtered or unexported fields
}
FixedProfilePolicy is a NetworkPolicy that always returns the same profile. Used by library mesh nodes on stable wired links and by any consumer that wants explicit control over cadence without signal-driven adaptation. Thread-safe.
func NewFixedProfilePolicy ¶ added in v0.0.10
func NewFixedProfilePolicy(p NetworkProfile) *FixedProfilePolicy
NewFixedProfilePolicy returns a policy backed by the given profile. Pass DefaultProfile() for the standard wired-network behavior.
func (*FixedProfilePolicy) PeerOverride ¶ added in v0.0.10
func (p *FixedProfilePolicy) PeerOverride(peer string) (NetworkProfile, bool)
PeerOverride returns the override for a specific peer, or false if none is configured. Allows individual peers to deviate from the default profile (e.g. tighter retry timings for a known-flaky peer).
func (*FixedProfilePolicy) Profile ¶ added in v0.0.10
func (p *FixedProfilePolicy) Profile() NetworkProfile
Profile returns the current default profile.
func (*FixedProfilePolicy) SetPeerOverride ¶ added in v0.0.10
func (p *FixedProfilePolicy) SetPeerOverride(peer string, profile NetworkProfile)
SetPeerOverride installs a per-peer profile override. Empty peer string clears all overrides.
func (*FixedProfilePolicy) SetProfile ¶ added in v0.0.10
func (p *FixedProfilePolicy) SetProfile(profile NetworkProfile)
SetProfile updates the default profile and notifies subscribers. Used by signal-driven adapters layered on top of a FixedProfilePolicy (the agent's adapter to wire.NetworkMonitor).
func (*FixedProfilePolicy) Signals ¶ added in v0.0.10
func (p *FixedProfilePolicy) Signals() NetworkSignals
Signals returns an empty signal set — fixed-profile policies don't observe network conditions.
func (*FixedProfilePolicy) Subscribe ¶ added in v0.0.10
func (p *FixedProfilePolicy) Subscribe(fn func(NetworkProfile))
Subscribe registers a callback fired on every profile change. Callbacks run synchronously in SetProfile's caller goroutine — slow callbacks delay subsequent profile transitions.
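The Subscribe/SetProfile contract can be modeled with a mutex-guarded value plus synchronous callbacks. The types below (`profile`, `fixedPolicy`) are local stand-ins for illustration, not the package's:

```go
package main

import (
	"fmt"
	"sync"
)

// profile stands in for NetworkProfile; one field for illustration.
type profile struct{ GossipInterval string }

// fixedPolicy sketches the FixedProfilePolicy shape: a mutex-guarded
// current profile plus subscriber callbacks fired synchronously on
// every change, in the SetProfile caller's goroutine.
type fixedPolicy struct {
	mu   sync.Mutex
	cur  profile
	subs []func(profile)
}

func (p *fixedPolicy) Subscribe(fn func(profile)) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.subs = append(p.subs, fn)
}

func (p *fixedPolicy) SetProfile(np profile) {
	p.mu.Lock()
	p.cur = np
	subs := append([]func(profile){}, p.subs...) // snapshot under lock
	p.mu.Unlock()
	for _, fn := range subs {
		fn(np) // synchronous: a slow callback delays the caller
	}
}

func main() {
	var p fixedPolicy
	p.Subscribe(func(np profile) { fmt.Println("transition to", np.GossipInterval) })
	p.SetProfile(profile{GossipInterval: "5s"})
}
```

Running callbacks outside the lock (as above) is what lets a subscriber call back into the policy without deadlocking, at the cost of the "slow callbacks delay transitions" caveat documented on Subscribe.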
type FrameAction ¶ added in v0.0.6
type FrameAction int
FrameAction is returned by a FrameHandler to drive the responder loop's next step. The handler has already consumed whatever body follows the frame magic; the action tells the loop whether to keep reading the next frame, exit cleanly (peer closed, no error), or exit with a failure (e.g. malformed frame that may leave the stream in an undefined state).
const (
	// FrameContinue keeps the responder loop running — read the next
	// frame magic.
	FrameContinue FrameAction = iota
	// FrameReturn exits the loop cleanly without error. Used when the
	// handler determines the peer-side has no more frames to send
	// (e.g. graceful goaway).
	FrameReturn
	// FrameFail exits the loop with an error — the stream is in an
	// unrecoverable state and the caller should tear down the
	// connection.
	FrameFail
)
type FrameHandler ¶ added in v0.0.6
type FrameHandler interface {
Handle(ctx context.Context, conn net.Conn, peerNodeID string) FrameAction
}
FrameHandler handles a single frame whose 2-byte magic has already been read by the responder loop. The handler reads the remaining bytes of the frame from conn, processes it, and returns a FrameAction to continue or exit.
peerNodeID is the remote session's identity — threaded through from Engine.RunResponder so handlers can discriminate per-peer without a separate lookup.
type FrameHandlerFunc ¶ added in v0.0.6
FrameHandlerFunc adapts a free function into a FrameHandler.
func (FrameHandlerFunc) Handle ¶ added in v0.0.6
func (f FrameHandlerFunc) Handle(ctx context.Context, conn net.Conn, peerNodeID string) FrameAction
Handle delegates to the underlying function.
type G1Codec ¶ added in v0.0.8
type G1Codec interface {
// EncodeExchange serialises records + meta into the G1 body.
EncodeExchange(records [][]byte, meta *ExchangeMeta) ([]byte, error)
// DecodeExchange extracts records, the peer's meta, and an
// optional list of node IDs referenced by those records (for
// liveness tracking via GossipResult.SeenNodeIDs). Unknown
// fields must be tolerated so forward-compatible meta
// additions don't break existing peers.
DecodeExchange(body []byte) (DecodedExchange, error)
}
G1Codec serialises and deserialises the body of a G1 exchange frame. Whisper owns the [magic][length] framing; the codec owns everything inside so consumers can choose protobuf, JSON, CBOR, or a custom envelope that carries both records and ExchangeMeta piggyback data.
The codec MUST round-trip without loss: DecodeExchange(EncodeExchange(r, m)) yields an equivalent DecodedExchange. Errors from either direction are surfaced to the native G1 handler, which logs and skips the frame rather than tearing down the session.
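A JSON envelope is the simplest codec satisfying this contract. The sketch below uses local stand-in types (`exchangeMeta`, `decodedExchange`) rather than the package's; note that `json.Unmarshal` silently ignores unknown fields, which is what makes the forward-compatibility requirement cheap to meet in JSON:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// exchangeMeta and decodedExchange stand in for whisper's types.
type exchangeMeta struct {
	CacheFingerprint uint64 `json:"cache_fp,omitempty"`
}

type decodedExchange struct {
	Records [][]byte
	Meta    *exchangeMeta
}

// envelope is the wire body: records plus piggybacked meta.
// []byte fields round-trip through JSON as base64 strings.
type envelope struct {
	Records [][]byte      `json:"records"`
	Meta    *exchangeMeta `json:"meta,omitempty"`
}

// jsonCodec is a minimal G1Codec sketch; whisper still owns the outer
// [magic][length] framing around the bytes this codec produces.
type jsonCodec struct{}

func (jsonCodec) EncodeExchange(records [][]byte, meta *exchangeMeta) ([]byte, error) {
	return json.Marshal(envelope{Records: records, Meta: meta})
}

func (jsonCodec) DecodeExchange(body []byte) (decodedExchange, error) {
	var env envelope
	if err := json.Unmarshal(body, &env); err != nil {
		return decodedExchange{}, err
	}
	return decodedExchange{Records: env.Records, Meta: env.Meta}, nil
}

func main() {
	c := jsonCodec{}
	body, _ := c.EncodeExchange([][]byte{[]byte("rec1")}, &exchangeMeta{CacheFingerprint: 42})
	dec, _ := c.DecodeExchange(body)
	fmt.Println(len(dec.Records), dec.Meta.CacheFingerprint) // 1 42
}
```

This sketch leaves SeenNodeIDs nil, which the DecodedExchange docs explicitly permit: liveness then falls back to coarser signals.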
type G1ExchangeObserver ¶ added in v0.0.8
type G1ExchangeObserver func(res GossipResult, peerNodeID string)
G1ExchangeObserver is invoked once per successful G1 exchange with the observed result and the peer's identity. Consumers use this for latency metrics, per-peer liveness bookkeeping, and callbacks that want to publish ledger records (e.g. round-trip telemetry) after each exchange.
type GossipEvent ¶ added in v0.0.6
type GossipEvent struct {
Kind GossipEventKind
PeerNodeID string
Time time.Time
Topic string // optional — populated by topic-aware events
Bytes int64 // optional — payload size
Records int // optional — record count
DurationMs int64 // optional — operation duration
Err string // optional — error description
}
GossipEvent is the payload for SubscribeEvents subscribers. Optional fields are populated when the producing handler has the data; consumers use zero-value detection to know what's available.
The shape was kept compact for the original four event kinds; the later additions accept it as-is rather than introduce a parallel MeshEvent type. Callers that need richer per-kind detail attach the fields they care about (e.g. reconciliation handlers populate Bytes + Records + DurationMs).
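The "non-blocking subscriber" contract from SubscribeEvents comes down to a select with a default branch. A standalone sketch (the `gossipEvent` type and `emit` helper are illustrative): a full or unbuffered channel drops the event rather than stalling the gossip path.

```go
package main

import (
	"fmt"
	"time"
)

// gossipEvent is a stand-in for whisper's GossipEvent.
type gossipEvent struct {
	Kind string
	Peer string
	Time time.Time
}

// emit sketches the non-blocking publish: if the subscriber's channel
// can't accept the event right now, the event is dropped rather than
// blocking the producing handler.
func emit(ch chan<- gossipEvent, ev gossipEvent) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false // slow or unbuffered subscriber: dropped
	}
}

func main() {
	ch := make(chan gossipEvent, 1)
	fmt.Println(emit(ch, gossipEvent{Kind: "record-applied", Peer: "n1"})) // true
	fmt.Println(emit(ch, gossipEvent{Kind: "rumor-received", Peer: "n2"})) // false: buffer full
	ev := <-ch
	fmt.Println(ev.Kind) // record-applied
}
```

This is why SubscribeEvents warns about unbuffered channels: subscribers that care about completeness should size their buffer for their drain rate.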
type GossipEventKind ¶ added in v0.0.6
type GossipEventKind int
GossipEventKind identifies a protocol-level event consumers can subscribe to via Engine.SubscribeEvents. The kind enum is open-ended — new kinds are added by Phase as the protocol grows (rumor ACK, reconciliation, snapshot, hypercube re-shape, network policy transition). Existing kinds keep their numeric values to stay wire-stable across mixed-version meshes.
const (
	// EventDigestMatch fires when a G2 probe found local and peer
	// fingerprints equal — consumers typically update per-peer
	// watermarks to avoid re-exchanging already-converged state.
	EventDigestMatch GossipEventKind = iota
	// EventRecordApplied fires when the G1 handler applies at least
	// one record from a peer exchange.
	EventRecordApplied
	// EventRumorReceived fires when a G3 rumor frame was accepted
	// (passed the dedup tracker and was applied).
	EventRumorReceived
	// EventFrameError fires on any frame-level error the loop
	// recovered from (non-fatal; fatal errors exit the loop
	// without producing an event).
	EventFrameError
	// EventRumorSent fires when the rumor pusher emitted a G3 frame
	// to a peer.
	EventRumorSent
	// EventRumorAcked fires when a G3-ACK was received for an
	// outstanding rumor — confirms delivery.
	EventRumorAcked
	// EventRumorRetry fires when ACK timeout triggered a retry via
	// an alternate hypercube edge.
	EventRumorRetry
	// EventRumorDropped fires when a rumor exhausted its retry budget
	// (typically 3) and was deferred to the next freshness sweep.
	EventRumorDropped
	// EventFreshnessSignal fires when a digest mismatch on the
	// freshness bus triggered an on-demand exchange.
	EventFreshnessSignal
	// EventReconcileStart fires when an IBLT reconciliation round
	// began with a peer.
	EventReconcileStart
	// EventReconcileComplete fires when reconciliation decoded
	// successfully and records were exchanged.
	EventReconcileComplete
	// EventReconcileDecodeFailure fires when peeling decode failed,
	// triggering a mode-flip from rate-1.5 to rateless or a
	// fallback to G1 + watermark.
	EventReconcileDecodeFailure
	// EventSnapshotStart fires when a cold-start snapshot exchange
	// began.
	EventSnapshotStart
	// EventSnapshotComplete fires when snapshot reconstruction
	// finished and records were applied.
	EventSnapshotComplete
	// EventSnapshotShardFailure fires on Reed-Solomon shard
	// recovery — observable signal that a neighbor missed but
	// the parity shard saved the transfer.
	EventSnapshotShardFailure
	// EventHypercubeRebuild fires on every cube re-shape (lazy or
	// eager).
	EventHypercubeRebuild
	// EventHypercubeNeighborPromoted fires when an ambient session
	// became a hypercube neighbor.
	EventHypercubeNeighborPromoted
	// EventHypercubeNeighborDemoted fires when a hypercube neighbor
	// was demoted to ambient (still connected, no longer a
	// proactive-dial target).
	EventHypercubeNeighborDemoted
	// EventNetworkPolicyTransition fires when the policy synthesizer
	// produced a new profile in response to signal changes.
	EventNetworkPolicyTransition
	// EventCapabilityNegotiated fires when a peer's capability
	// advertisement was applied to the per-peer feature set.
	EventCapabilityNegotiated
)
type GossipResult ¶
type GossipResult struct {
RTT time.Duration // Total exchange time (serialize+write+read+deserialize)
NetworkRTT time.Duration // Network round-trip from header exchange timing
RecordsSent int // Number of records we sent
RecordsApplied int // Number of records applied from peer
SeenNodeIDs []string // All unique NodeIDs received (for liveness tracking)
PeerMeta *ExchangeMeta // metadata from the peer's exchange
DigestSkipped bool // true when G2 digest matched — no data exchanged
}
GossipResult captures the outcome of a single gossip exchange round.
type Hypercube ¶
type Hypercube struct {
// contains filtered or unexported fields
}
Hypercube implements a virtual hypercube overlay for structured rumor routing.
Each node is assigned a position based on its index in the sorted LAD member list. Neighbors are computed by flipping one bit at a time (one per dimension). Dimension-ordered routing ensures each node receives a rumor exactly once:
- Origin sends to ALL dimensions (fromDimension = 0xFF → -1)
- Each receiver forwards only on dimensions GREATER than the one it received on
- Total messages = N-1 (optimal, zero redundancy)
Example: 4 nodes (2D), positions 00, 01, 10, 11
Node 00 originates:       → dim0→01, dim1→10
Node 01 received on dim0: → dim1→11 (only dims > 0)
Node 10 received on dim1: → stop (no dims > 1)
Node 11 received on dim1: → stop
Total: 3 messages for 4 nodes. Optimal.
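The forward-only-on-greater-dimensions rule can be checked mechanically. The sketch below is independent of whisper's internals: it simulates a broadcast from position 0 over a full 2^d cube and counts messages, confirming the N-1 total.

```go
package main

import "fmt"

// routeDims returns the dimensions a node forwards on under
// dimension-ordered routing: the origin (fromDim == -1) forwards on
// all dimensions; every other node forwards only on dimensions
// strictly greater than the one it received on.
func routeDims(fromDim, dims int) []int {
	var out []int
	for d := 0; d < dims; d++ {
		if d > fromDim {
			out = append(out, d)
		}
	}
	return out
}

// countMessages simulates a broadcast from position 0 over a full
// 2^dims cube and returns the total number of messages sent.
func countMessages(dims int) int {
	total := 0
	var walk func(pos, fromDim int)
	walk = func(pos, fromDim int) {
		for _, d := range routeDims(fromDim, dims) {
			total++
			walk(pos^(1<<d), d) // flip bit d to reach the neighbor
		}
	}
	walk(0, -1)
	return total
}

func main() {
	fmt.Println(countMessages(2)) // 3 messages for 4 nodes, as above
}
```

Each node is reached exactly once because a node received on dimension d can only be re-reached by flipping a dimension ≤ d, which the rule forbids.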
func NewHypercube ¶
NewHypercube creates a hypercube for the given local node ID. Call Rebuild() with the current member list to initialize.
selfID is required and immutable after construction — pass the local node's canonical NodeID. Consumers that don't know their NodeID until after an async step (e.g. an agent waiting on enrollment) should defer construction until the identity is available rather than mutating it later; the immutable design prevents the "empty selfID → nil Neighbors → silent routing fallback" class of bug that arose from constructor-time uncertainty.
func (*Hypercube) DimensionNeighbor ¶
DimensionNeighbor returns the neighbor in a specific dimension, or "" if none. Fully bounds-checked: rejects negative d, d ≥ dimension, selfPos < 0, and the neighbor-position-exceeds-members case that arises when member count is not a power of 2. Safe to call with any int from any source.
func (*Hypercube) MemberCount ¶
MemberCount returns the number of members in the hypercube.
func (*Hypercube) Neighbors ¶
Neighbors returns all neighbor node IDs (one per dimension). Returns nil if this node is not in the hypercube.
func (*Hypercube) Rebuild ¶
Rebuild recomputes the hypercube from the current member list. All nodes produce the same topology from the same sorted member list.
func (*Hypercube) RouteRumor ¶
RouteRumor returns the dimensions to forward on based on dimension-ordered routing. fromDimension = -1 (or 0xFF as uint8) means origin — forward on ALL dimensions. Otherwise, forward on dimensions strictly greater than fromDimension.
func (*Hypercube) Valid ¶ added in v0.0.3
Valid reports whether this hypercube has usable state (self is a member, dimension is consistent with the members list). Intended for pre-flight checks by callers that loop over dimensions — a single Valid call amortises what would otherwise be repeated bounds checks.
type HypercubeExt ¶ added in v0.0.10
type HypercubeExt struct {
// contains filtered or unexported fields
}
HypercubeExt wraps the base Hypercube with the Phase 6 extension state. Co-exists with the original Hypercube — consumers that don't want extensions continue to use the original NewHypercube directly.
func NewHypercubeExt ¶ added in v0.0.10
func NewHypercubeExt(selfID string, opts HypercubeOptions) *HypercubeExt
NewHypercubeExt returns an extended hypercube wrapping a primary Hypercube. opts.RegionOf, MaxDimensionsOf, etc. are honored on every Rebuild call.
func (*HypercubeExt) DimensionWeight ¶ added in v0.0.10
func (h *HypercubeExt) DimensionWeight(dim int) float64
DimensionWeight returns the current adaptive weight for a dimension. Used by fanout selection — lower-weight dims get fewer rumors. Weights are in [0.1, 1.0]; default 1.0.
func (*HypercubeExt) Dual ¶ added in v0.0.10
func (h *HypercubeExt) Dual() *Hypercube
Dual returns the secondary (hash-sorted) cube, or nil if EnableDualCube wasn't set.
func (*HypercubeExt) EffectiveDimensions ¶ added in v0.0.10
func (h *HypercubeExt) EffectiveDimensions(peerID string) int
EffectiveDimensions returns the dimension cap to use for a specific peer. Combines self.MaxDimensions, peer.MaxDimensions, link.SupportedDimensions (asymmetric-dim + per-link scaling). 0 = no cap.
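The min-combine rule with 0 meaning "no cap" can be sketched standalone (an illustrative reimplementation, not whisper's code):

```go
package main

import "fmt"

// effectiveDims combines the three documented caps. A cap of 0 means
// "unlimited", so the result is the minimum of the non-zero caps,
// or 0 when none is set.
func effectiveDims(selfMax, peerMax, linkMax int) int {
	eff := 0
	for _, c := range []int{selfMax, peerMax, linkMax} {
		if c > 0 && (eff == 0 || c < eff) {
			eff = c
		}
	}
	return eff
}

func main() {
	// Self allows 4 dims, peer is uncapped, the link supports 3:
	// the link cap wins.
	fmt.Println(effectiveDims(4, 0, 3)) // 3
}
```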
func (*HypercubeExt) MarkDirty ¶ added in v0.0.10
func (h *HypercubeExt) MarkDirty()
MarkDirty signals that membership has changed. Under HypercubeRebuildLazy, increments the dirty count; the next Rebuild only actually runs if dirtyCount > threshold or staleness elapsed.
func (*HypercubeExt) Primary ¶ added in v0.0.10
func (h *HypercubeExt) Primary() *Hypercube
Primary returns the underlying primary cube. Used when the HypercubeExt isn't visible but the base API is needed.
func (*HypercubeExt) Rebuild ¶ added in v0.0.10
func (h *HypercubeExt) Rebuild(members []string)
Rebuild recomputes the cube(s) from the member list. Honors region-aware ordering when opts.RegionOf is set; honors per-node dimension caps when opts.MaxDimensionsOf is set; honors lazy policy when configured. Excludes members for which opts.IsEphemeral returns true — they remain reachable as gossip leaves but aren't selected as cube neighbors.
func (*HypercubeExt) RecordDimensionLoss ¶ added in v0.0.10
func (h *HypercubeExt) RecordDimensionLoss(dim int)
RecordDimensionLoss is called by rumor pusher when a fanout to a specific dimension fails (ACK timeout, write error). Bumps the dimension's adaptive weight down so subsequent fanout preferentially picks healthier dimensions.
func (*HypercubeExt) RecordDimensionSuccess ¶ added in v0.0.10
func (h *HypercubeExt) RecordDimensionSuccess(dim int)
RecordDimensionSuccess is called when a fanout succeeds. Recovers the dimension's weight toward 1.0.
func (*HypercubeExt) RouteToFar ¶ added in v0.0.10
func (h *HypercubeExt) RouteToFar(targetNodeID string) (nextHop string, ok bool)
RouteToFar returns the next-hop neighbor for an RPC targeting a peer that isn't a direct hypercube neighbor. Picks the neighbor whose hypercube position is closest (by XOR distance) to the target's position. Loop-free by hypercube's dimension-ordered rule — forward-only-on-greater-dimensions guarantees acyclic routing.
Returns ("", false) if no useful neighbor exists (target is already a direct neighbor, or self isn't in the cube).
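The closest-by-XOR selection can be illustrated in isolation. This standalone sketch operates on integer positions rather than node IDs, which is an assumption about shape only:

```go
package main

import "fmt"

// nearestNeighbor picks, among our hypercube neighbors' positions,
// the one with the smallest XOR distance to the target's position.
// Returns -1 when there are no neighbors.
func nearestNeighbor(neighborPos []int, targetPos int) int {
	best, bestDist := -1, -1
	for _, n := range neighborPos {
		if d := n ^ targetPos; bestDist < 0 || d < bestDist {
			best, bestDist = n, d
		}
	}
	return best
}

func main() {
	// Self at 000 in a 3D cube has neighbors 001, 010, 100.
	// Routing toward 110 picks 100: its XOR distance (010) is the
	// smallest, so each hop fixes one more differing bit.
	fmt.Println(nearestNeighbor([]int{1, 2, 4}, 6)) // 4
}
```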
type HypercubeOptions ¶ added in v0.0.10
type HypercubeOptions struct {
// RegionOf returns a region key for a NodeID. Used by
// region-aware dimension ordering. nil = no region awareness
// (today's behavior).
RegionOf func(nodeID string) string
// MaxDimensionsOf returns the per-node dimension cap. Used by
// asymmetric-dimensions support. nil = no cap.
MaxDimensionsOf func(nodeID string) int
// LinkSupportedDimsOf returns the per-link dimension support
// (effectiveDims = min(self.Max, peer.Max, link.Supported)).
// nil = no link-level cap.
LinkSupportedDimsOf func(nodeID string) int
// RebuildPolicy: eager (default) or lazy.
RebuildPolicy HypercubeRebuildPolicy
// DirtyThreshold: lazy rebuild fires after N MarkDirty calls.
// 0 = use default (10).
DirtyThreshold int
// Staleness: lazy rebuild fires after wall-clock interval
// since last rebuild. 0 = use default (60 sec).
Staleness time.Duration
// EnableDualCube: maintain a second cube (hash-sorted)
// alongside the primary (NodeID-sorted) for redundancy.
EnableDualCube bool
// IsEphemeral returns true for peers that should be excluded from
// hypercube neighbor selection (browsers, admin CLIs, low-battery
// cellular agents). Excluded members are NOT included in the
// member list passed to the cube; they participate as gossip
// leaves only.
//
// nil = no ephemerals (all members are cube candidates).
IsEphemeral func(nodeID string) bool
}
HypercubeOptions extends the original Hypercube with all 7 extensions from Phase 6 of the redesign:
- Region-aware dimension ordering — sort dimensions so early-dim hops stay in-region, late-dim hops cross regions.
- Asymmetric dimensions — per-node MaxDimensions cap.
- Lazy rebuild — rebuild on dirty threshold or staleness.
- RPC routing — hypercube-routed RPC for far peers.
- Adaptive dimensional weighting — bump down flaky dimensions in fanout selection.
- Per-link dimensionality scaling — link advertises supported dimensions; effective = min(self, peer, link).
- Dual-hypercube redundancy — two cubes (NodeID-sorted + hash-sorted) for instant fault-tolerance.
All extensions are optional / additive — a Hypercube without any of these set behaves exactly as today's implementation.
type HypercubeRebuildPolicy ¶ added in v0.0.10
type HypercubeRebuildPolicy uint8
HypercubeRebuildPolicy defines when to rebuild the cube. Eager rebuilds on every membership change (today's behavior). Lazy defers rebuilds until a configurable threshold of dirty events or a wall-clock interval.
const (
	// HypercubeRebuildEager: rebuild on every Rebuild() call.
	// Default for backward compatibility.
	HypercubeRebuildEager HypercubeRebuildPolicy = iota

	// HypercubeRebuildLazy: rebuild only when MarkDirty has been
	// called more than `dirtyThreshold` times OR when more than
	// `staleness` has elapsed since the last rebuild. Cuts
	// cube-rebuild thrash on a churning fleet.
	HypercubeRebuildLazy
)
type NetworkPolicy ¶ added in v0.0.10
type NetworkPolicy interface {
Profile() NetworkProfile
Signals() NetworkSignals
PeerOverride(peer string) (NetworkProfile, bool)
Subscribe(func(NetworkProfile))
}
NetworkPolicy is the consumer-facing surface every cadence/pacing decision reads. Implementations may be static (library mesh-node) or signal-driven (agent). Subscribe is non-blocking — slow subscribers miss profile transitions, which is fine because every caller re-reads Profile() before each interval/pacing decision.
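A fixed-profile implementation is nearly trivial, which is the point of the re-read contract. The sketch below uses local stand-in types that only mirror the documented shape (the real interface also has Signals() and PeerOverride()):

```go
package main

import "fmt"

// profile is a stand-in for NetworkProfile, reduced to one field.
type profile struct {
	GossipBaseSecs int
}

// staticPolicy is a fixed-profile policy, the kind a library
// mesh-node on a stable wired link would use.
type staticPolicy struct{ p profile }

// Profile is re-read by callers before every cadence decision, so a
// static implementation just returns its fixed value.
func (s staticPolicy) Profile() profile { return s.p }

// Subscribe must never block. A static policy has no transitions to
// announce, so it can drop the callback entirely; subscribers that
// miss transitions lose nothing because they re-read Profile().
func (s staticPolicy) Subscribe(func(profile)) {}

func main() {
	pol := staticPolicy{p: profile{GossipBaseSecs: 10}}
	pol.Subscribe(func(profile) {}) // no-op, returns immediately
	fmt.Println(pol.Profile().GossipBaseSecs)
}
```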
type NetworkProfile ¶ added in v0.0.10
type NetworkProfile struct {
GossipMin time.Duration // 2s — convergence floor
GossipBase time.Duration // 10s — base interval
GossipMax time.Duration // 60s — idle ceiling (wifi default)
FreshnessProbeMin time.Duration // 30s — between probes per peer
FreshnessProbeMax time.Duration // 5min — coalesce repeated stale
RumorRetryInitial time.Duration // 200ms — first retry delay
RumorRetryMax time.Duration // 5s — final retry cap
RumorFanout int // 1 — hypercube dimensions
ReconcilePacing int // 0 = unlimited; cellular: 8 KB/s
SnapshotChunkMax int // 65536 — bytes per snapshot chunk
SnapshotShardCount int // 2 — k-of-N for cold-start
HypercubeMaxDims int // 0 = unlimited
}
NetworkProfile is the synthesised configuration consumers read for every cadence / pacing / sizing decision. All durations and byte counts are absolute values — consumers don't need to know which signals produced them.
Defaults (zero values) are chosen so a consumer that never updates the policy still gets sensible behavior (matches the existing AdaptiveHWPInterval defaults from 2026-04-09 Task 10a + the 2026-04-23 freshness 30-sec emit + RumorPusher defaults).
func CellularProfile ¶ added in v0.0.10
func CellularProfile() NetworkProfile
CellularProfile returns a NetworkProfile suitable for metered cellular links — wider intervals, paced reconciliation, smaller snapshot chunks. Used by the multi-signal synthesizer when LinkType=cellular and one of the secondary signals (battery low, RTT high) confirms the transition.
func DefaultProfile ¶ added in v0.0.10
func DefaultProfile() NetworkProfile
DefaultProfile returns a NetworkProfile suitable for stable wired connections (fly nodes, ethernet agents). Consumers without a signal source can use this directly.
type NetworkRTTMeasurer ¶
type NetworkRTTMeasurer struct {
// contains filtered or unexported fields
}
NetworkRTTMeasurer tracks per-connection network RTT using header exchange timing. During a gossip exchange, both sides simultaneously write their length-prefixed header (4 bytes). The time from our write to receiving the peer's header approximates one network round-trip.
Samples live in a fixed-size ring buffer indexed by idx%len(ring). No slice copy on roll-over, and the backing array never grows — matches cpu_gate.go's rolling-window pattern.
func NewNetworkRTTMeasurer ¶
func NewNetworkRTTMeasurer(maxSamples int) *NetworkRTTMeasurer
NewNetworkRTTMeasurer creates a measurer that keeps the last N samples.
func (*NetworkRTTMeasurer) AvgRTT ¶
func (m *NetworkRTTMeasurer) AvgRTT() time.Duration
AvgRTT returns the average of recent RTT samples. Returns 0 if no samples.
func (*NetworkRTTMeasurer) LastRTT ¶
func (m *NetworkRTTMeasurer) LastRTT() time.Duration
LastRTT returns the most recent RTT sample. Returns 0 if no samples.
func (*NetworkRTTMeasurer) Record ¶
func (m *NetworkRTTMeasurer) Record(rtt time.Duration)
Record adds a new RTT sample.
func (*NetworkRTTMeasurer) SampleCount ¶
func (m *NetworkRTTMeasurer) SampleCount() int
SampleCount returns how many RTT samples are currently held in the window. Caps at the ring capacity once the ring has wrapped.
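The fixed-ring pattern the measurer uses can be sketched standalone (int64 nanoseconds stand in for time.Duration; this mirrors the documented idx%len(ring) behavior, not whisper's actual code):

```go
package main

import "fmt"

// ring keeps the last N samples in a fixed backing array: writes go
// to idx % len(buf), so roll-over overwrites the oldest slot with no
// slice copy and no growth.
type ring struct {
	buf []int64
	idx int // total writes; the write slot is idx % len(buf)
	n   int // samples held; caps at len(buf) once wrapped
}

func newRing(max int) *ring { return &ring{buf: make([]int64, max)} }

func (r *ring) record(v int64) {
	r.buf[r.idx%len(r.buf)] = v
	r.idx++
	if r.n < len(r.buf) {
		r.n++
	}
}

func (r *ring) avg() int64 {
	if r.n == 0 {
		return 0 // mirrors AvgRTT's zero-on-empty contract
	}
	var sum int64
	for i := 0; i < r.n; i++ {
		sum += r.buf[i]
	}
	return sum / int64(r.n)
}

func main() {
	r := newRing(2)
	for _, v := range []int64{10, 20, 30} {
		r.record(v)
	}
	// The third write overwrote the first; only 20 and 30 remain.
	fmt.Println(r.avg()) // 25
}
```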
type NetworkSignals ¶ added in v0.0.10
type NetworkSignals struct {
LinkType int // 0=unknown, 1=ethernet, 2=wifi, 3=cellular, 4=vpn
AvgRTT time.Duration // exponential moving average of observed RTT
LossRate float64 // recent rumor-ACK miss rate, 0.0-1.0
EventBurstRate float64 // events/min on the address-change bus
BatteryLevel float64 // 0.0-1.0; -1.0 = AC power / unknown
BatteryDraining bool // explicit signal independent of level
TimeOfDay int // 0-23 hour, local; 0 = unknown
LinkStableSince time.Duration // how long current LinkType has held
PeerCount int // active sessions
}
NetworkSignals is the multi-signal input vector that drives a NetworkProfile. Consumers update signals asynchronously; the policy synthesizer reads them whenever a profile is requested.
Not every signal is meaningful for every consumer:
- Library mesh nodes on fly run on stable wired links, so LinkType is always "ethernet-equivalent" and battery fields are zero / N/A. The fixed-profile policy ignores most signals.
- Agents on roaming devices populate every field — LinkType transitions, RTT histograms, loss-rate, battery state all drive profile transitions.
type PEXEntry ¶
type PEXEntry struct {
NodeID string `json:"node_id"`
Addresses []string `json:"addresses"`
Region string `json:"region"`
Signature []byte `json:"signature"` // ed25519 sig over NodeID+Addresses+SignedAt
SignedAt time.Time `json:"signed_at"`
}
PEXEntry represents a single peer advertisement in a Peer Exchange response. Entries are signed by the advertising node's ed25519 key to prove ownership.
func SignPEXEntry ¶
func SignPEXEntry(nodeID string, addresses []string, region string, privateKey ed25519.PrivateKey) PEXEntry
SignPEXEntry creates a signed PEX entry for the local node.
type PEXManager ¶
type PEXManager struct {
// contains filtered or unexported fields
}
PEXManager coordinates PEX entry creation, verification, and sharing for a node.
func NewPEXManager ¶
func NewPEXManager(
	nodeID string,
	addresses []string,
	region string,
	privateKey ed25519.PrivateKey,
	publicKeyLookup func(nodeID string) ed25519.PublicKey,
) *PEXManager
NewPEXManager creates a PEX manager for the local node.
func (*PEXManager) BuildPEXEntries ¶
func (m *PEXManager) BuildPEXEntries(peerNodeID string) []PEXEntry
BuildPEXEntries returns up to PEXMaxEntriesPerExchange entries to piggyback on a gossip exchange to the given peer. Rate-limited per peer.
func (*PEXManager) KnownPeers ¶
func (m *PEXManager) KnownPeers() []PEXEntry
KnownPeers returns all verified, non-stale PEX entries.
func (*PEXManager) ProcessPEXEntries ¶
func (m *PEXManager) ProcessPEXEntries(entries []PEXEntry) []string
ProcessPEXEntries validates and stores inbound PEX entries from a peer. Returns the list of newly discovered node IDs.
func (*PEXManager) RefreshLocalEntry ¶
func (m *PEXManager) RefreshLocalEntry(nodeID string, addresses []string, region string)
RefreshLocalEntry re-signs our PEX entry (call when addresses change).
func (*PEXManager) RemovePeer ¶ added in v0.0.3
func (m *PEXManager) RemovePeer(peerNodeID string)
RemovePeer drops a disconnected peer's state — both the known-peers entry AND the rate-limiter entry. Previously only known-peers was touched (via pruneKnownPeers' age-based cleanup), leaving the rate-limiter's lastSend map with a stale entry that only timed out after 2× PEXInterval. Call this from the ConnectionManager teardown path so PEX state tracks peer lifecycle exactly.
type PEXRateLimiter ¶
type PEXRateLimiter struct {
// contains filtered or unexported fields
}
PEXRateLimiter enforces rate limits on PEX sends per peer. Each peer is allowed one PEX send per PEXInterval.
func NewPEXRateLimiter ¶
func NewPEXRateLimiter() *PEXRateLimiter
NewPEXRateLimiter creates a rate limiter with the default interval.
func (*PEXRateLimiter) Allow ¶
func (rl *PEXRateLimiter) Allow(peerNodeID string) bool
Allow returns true if a PEX send to peerNodeID is allowed.
func (*PEXRateLimiter) Forget ¶ added in v0.0.3
func (rl *PEXRateLimiter) Forget(peerNodeID string)
Forget removes a peer's rate-limit state. Called from PEXManager.RemovePeer so a disconnected peer's lastSend entry doesn't linger (otherwise it only clears via age-based prune, which ignores stale peer identity). Safe no-op if the peer is unknown.
type PEXRequest ¶
type PEXRequest struct {
RequestingNodeID string `json:"requesting_node_id"`
MaxEntries int `json:"max_entries,omitempty"` // 0 = use default (20)
}
PEXRequest is sent by a node to request peer lists from a connected peer.
type PEXResponse ¶
type PEXResponse struct {
Entries []PEXEntry `json:"entries"`
}
PEXResponse contains the peer list from the responding node.
type PeerBackoff ¶
type PeerBackoff struct {
// contains filtered or unexported fields
}
PeerBackoff tracks exponential backoff state for a single peer that has signaled backpressure. Used by the sender side of the gossip loop.
func NewPeerBackoff ¶
func NewPeerBackoff() *PeerBackoff
NewPeerBackoff creates a new per-peer backoff tracker.
func (*PeerBackoff) CurrentBackoff ¶
func (pb *PeerBackoff) CurrentBackoff() time.Duration
CurrentBackoff returns the current backoff duration (0 if not backing off).
func (*PeerBackoff) RecordSignal ¶
func (pb *PeerBackoff) RecordSignal(peerSignaledBackpressure bool)
RecordSignal processes a backpressure signal from a peer exchange result. If the peer signaled backpressure, the backoff doubles (exponential). If the peer did NOT signal backpressure, the backoff resets to zero.
func (*PeerBackoff) ShouldSkip ¶
func (pb *PeerBackoff) ShouldSkip() bool
ShouldSkip returns true if the peer is currently in backoff and gossip should be skipped for this tick.
type PeerRateLimiter ¶
type PeerRateLimiter struct {
// contains filtered or unexported fields
}
PeerRateLimiter tracks token-bucket state plus violation accounting for a single peer. When violations exceed the threshold in the window, the peer is placed in cooldown and skipped by the fanout.
func (*PeerRateLimiter) Allow ¶
func (p *PeerRateLimiter) Allow() bool
Allow returns true if a single message is allowed for this peer right now. A false return indicates a violation (either throttled or in cooldown) and increments the violation counter. When the counter crosses the threshold within violationWindow, the peer is put in cooldown.
func (*PeerRateLimiter) InCooldown ¶
func (p *PeerRateLimiter) InCooldown() bool
InCooldown reports whether the peer is currently cooling down.
func (*PeerRateLimiter) Violations ¶
func (p *PeerRateLimiter) Violations() int
Violations returns the current violation count in the rolling window.
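The interaction between the token bucket, the violation counter, and cooldown can be sketched standalone. Refill and the rolling violation window are elided here for brevity, which the real limiter does implement:

```go
package main

import "fmt"

// peerLimiter is a reduced sketch of per-peer rate limiting:
// every denied allow() counts as a violation, and crossing the
// threshold trips a cooldown that denies everything afterward.
type peerLimiter struct {
	tokens     float64
	violations int
	threshold  int
	cooldown   bool
}

func (p *peerLimiter) allow() bool {
	if p.cooldown || p.tokens < 1 {
		p.violations++
		if p.violations >= p.threshold {
			p.cooldown = true
		}
		return false
	}
	p.tokens--
	return true
}

func main() {
	p := &peerLimiter{tokens: 2, threshold: 3}
	for i := 0; i < 5; i++ {
		fmt.Print(p.allow(), " ") // true true false false false
	}
	fmt.Println(p.cooldown) // true: sustained violations tripped cooldown
}
```

Once in cooldown the fanout simply skips the peer, so a single noisy peer can't consume the shared egress budget.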
type PersistedPeerState ¶ added in v0.0.10
type PersistedPeerState struct {
PeerID string `json:"peer_id"`
Watermark time.Time `json:"watermark"`
ExchangeCount int `json:"exchange_count"`
LastActivity time.Time `json:"last_activity"`
AppliedOnFull int `json:"applied_on_full"`
FullSyncCount int `json:"full_sync_count"`
}
PersistedPeerState is the over-the-wire / over-the-disk representation of a single peer's delta state, used by DeltaPersistence to round-trip state across process restarts. Keep field tags stable — they're the disk schema for any backend that serialises this struct.
type PullFetchFn ¶ added in v0.0.4
PullFetchFn fetches records matching the pull request. Returns the serialised response payload (implementation-specific — typically a G1 records block), or an error. The returned byte slice is already size- capped per the request's MaxResponseSize (or DefaultPullResponseMaxSize when zero) — HandlePullRequest validates the cap before calling.
type PullRequest ¶ added in v0.0.4
type PullRequest struct {
Topic string // topic to pull
SinceWatermark time.Time // records strictly newer than this; zero = full sync
MaxResponseSize uint32 // responder-side cap (bytes); 0 = use DefaultPullResponseMaxSize
}
PullRequest is the decoded form of a G4 PULL_REQUEST frame.
func ReadPullRequestBody ¶ added in v0.0.4
func ReadPullRequestBody(r io.Reader) (PullRequest, error)
ReadPullRequestBody reads the body of a G4 frame after the 2-byte magic has been consumed by the responder multiplexer. Symmetric with ReadGossipBody / ReadDigestBody / ReadRumorBody.
type RateLimitStats ¶ added in v0.0.3
RateLimitStats is an inlined view of RateLimiter state for EngineStats.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter is the engine-level coordinator: per-peer buckets plus a global egress (bytes/sec) limiter plus overload queue tracking.
func NewRateLimiter ¶
func NewRateLimiter(peerPerSec float64, peerBurst int, egressBytesPerSec int, queueThreshold int) *RateLimiter
NewRateLimiter builds an engine rate limiter using the provided settings. Zero values fall back to package defaults.
func (*RateLimiter) AllowPeer ¶
func (r *RateLimiter) AllowPeer(peerID string) bool
AllowPeer returns true if a publish to peerID is currently allowed. A false return means the message should be dropped for that peer (other peers still receive it). Unknown peers are lazily registered.
func (*RateLimiter) DecQueue ¶
func (r *RateLimiter) DecQueue()
DecQueue decrements the outbound queue counter.
func (*RateLimiter) EgressSaturated ¶
func (r *RateLimiter) EgressSaturated(n int) bool
EgressSaturated returns true if the global egress bucket has no tokens available for the given payload size. Used by the engine to decide whether to hold low-priority traffic in favor of high-priority topics.
func (*RateLimiter) IncQueue ¶
func (r *RateLimiter) IncQueue()
IncQueue increments the outbound queue counter. Used by the engine to signal that a publish is in flight.
func (*RateLimiter) Overloaded ¶
func (r *RateLimiter) Overloaded() bool
Overloaded returns true if the outbound queue has exceeded the threshold. When Overloaded is true, Publish returns ErrBackpressure.
func (*RateLimiter) PeerCount ¶
func (r *RateLimiter) PeerCount() int
PeerCount returns the number of peers currently tracked.
func (*RateLimiter) PeerInCooldown ¶
func (r *RateLimiter) PeerInCooldown(peerID string) bool
PeerInCooldown reports whether the peer is currently cooling down after sustained violations.
func (*RateLimiter) QueueDepth ¶
func (r *RateLimiter) QueueDepth() int
QueueDepth returns the current outbound queue depth (test observability).
func (*RateLimiter) QueueThreshold ¶ added in v0.0.3
func (r *RateLimiter) QueueThreshold() int
QueueThreshold returns the current overload threshold. Useful for dashboards showing headroom (depth vs threshold) and for operator validation after startup.
func (*RateLimiter) RemovePeer ¶
func (r *RateLimiter) RemovePeer(peerID string)
RemovePeer drops per-peer rate-limit state. Called when a peer disconnects.
func (*RateLimiter) ReserveEgress ¶
func (r *RateLimiter) ReserveEgress(n int) bool
ReserveEgress attempts to consume n bytes from the global egress bucket. Returns true if allowed immediately; false means egress is saturated and the caller should defer or drop (priority-aware callers pick by priority).
func (*RateLimiter) SetQueueThreshold ¶ added in v0.0.3
func (r *RateLimiter) SetQueueThreshold(n int)
SetQueueThreshold adjusts the overload threshold at runtime. Values ≤ 0 are ignored (keeps previous value). Intended for operator tuning via a diagnostics endpoint — e.g. temporarily raise the threshold during a known convergence storm without bouncing the process.
type ReconcileCodec ¶ added in v0.0.10
type ReconcileCodec interface {
// EncodeTable serialises an IBLT into a wire body.
EncodeTable(t *iblt.IBLT, mode ReconcileMode) ([]byte, error)
// DecodeTable deserialises an IBLT from a wire body. Receiver-
// side path; mode is decoded from the body.
DecodeTable(body []byte) (*iblt.IBLT, ReconcileMode, error)
// EncodeReply packs records the responder is sending back to
// the initiator (records the initiator was missing). Records
// are opaque to whisper; codec defines the per-record format.
EncodeReply(records [][]byte) ([]byte, error)
// DecodeReply unpacks a reply body.
DecodeReply(body []byte) ([][]byte, error)
// EncodeRequest packs a list of record IDs (16-byte content
// hashes) the responder is asking the initiator to send.
EncodeRequest(ids []iblt.Key) ([]byte, error)
// DecodeRequest unpacks a request body into IDs.
DecodeRequest(body []byte) ([]iblt.Key, error)
}
ReconcileCodec is the consumer-supplied serialiser for the bodies of G4 frames. Whisper owns the [magic][length][flags] framing; the codec owns everything inside.
type ReconcileDriver ¶ added in v0.0.10
type ReconcileDriver struct {
// contains filtered or unexported fields
}
ReconcileDriver is the per-process protocol driver. One instance shared across all peer connections; per-peer state lives on ReconcilePeer entries.
func NewReconcileDriver ¶ added in v0.0.10
func NewReconcileDriver(codec ReconcileCodec, store ReconcileStore, policy NetworkPolicy) *ReconcileDriver
NewReconcileDriver returns a driver wiring the consumer's codec and store into whisper's protocol logic. policy is optional — when present, RunRound uses NetworkPolicy.Profile().ReconcilePacing to throttle cell streams on metered links.
func (*ReconcileDriver) Codec ¶ added in v0.0.10
func (d *ReconcileDriver) Codec() ReconcileCodec
Codec exposes the codec for handlers that need to decode frames outside the driver's run loops.
func (*ReconcileDriver) PeerCount ¶ added in v0.0.10
func (d *ReconcileDriver) PeerCount() int
PeerCount is the number of registered peers.
func (*ReconcileDriver) Policy ¶ added in v0.0.10
func (d *ReconcileDriver) Policy() NetworkPolicy
Policy returns the NetworkPolicy reference if one was wired.
func (*ReconcileDriver) RegisterPeer ¶ added in v0.0.10
func (d *ReconcileDriver) RegisterPeer(p *ReconcilePeer)
RegisterPeer adds a peer. Idempotent.
func (*ReconcileDriver) RunInitiatorRound ¶ added in v0.0.10
func (d *ReconcileDriver) RunInitiatorRound(nodeID string, timeout time.Duration) (applied, sent int, err error)
RunInitiatorRound drives one reconciliation round as the initiator. Builds an IBLT from the store, sizes per the peer's current mode, sends the table frame, waits for the reply + request, applies received records, and serves requested records.
Returns the count of records applied (received from peer) and records sent (peer was missing). Errors are wire-level — protocol-level failures (decode incomplete) trigger mode flips internally and return success with 0 counts.
func (*ReconcileDriver) RunResponderRound ¶ added in v0.0.10
func (d *ReconcileDriver) RunResponderRound(conn net.Conn, body []byte) error
RunResponderRound is called by a frame-handler when it has just consumed a G4 TableFrame's [magic][length][flags=0x00] header and the body bytes. Builds the symmetric difference and writes back the reply + request frames.
func (*ReconcileDriver) Store ¶ added in v0.0.10
func (d *ReconcileDriver) Store() ReconcileStore
Store exposes the store for handlers that need to read/apply records outside the driver's run loops.
func (*ReconcileDriver) UnregisterPeer ¶ added in v0.0.10
func (d *ReconcileDriver) UnregisterPeer(nodeID string)
UnregisterPeer drops a peer's state.
type ReconcileFlags ¶ added in v0.0.10
type ReconcileFlags uint8
ReconcileFlags is the per-frame flag byte in the G4 body header. Distinguishes the three frame variants:
0x00 = TableFrame (initiator → responder): the IBLT itself
0x01 = ReplyFrame (responder → initiator): records the
initiator is missing
0x02 = RequestFrame (responder → initiator): rumor IDs the
responder is missing (initiator pulls these from its
store and sends them as a follow-up TableFrame variant)
const (
	ReconcileFlagTable   ReconcileFlags = 0x00
	ReconcileFlagReply   ReconcileFlags = 0x01
	ReconcileFlagRequest ReconcileFlags = 0x02

	// ReconcileFlagDecodeFailed is sent by the responder when its IBLT
	// subtract did not converge — body is empty, used purely to signal
	// the initiator that the empty reply is a decode failure (not "we
	// were already in sync"). Without this flag, the initiator can't
	// distinguish "0 records both ways because of failure" from "0
	// records both ways because diff was empty," which means the
	// mode-flip state machine never advances out of rate-1.5 even when
	// the table consistently fails to decode.
	ReconcileFlagDecodeFailed ReconcileFlags = 0x03
)
type ReconcileMode ¶ added in v0.0.10
type ReconcileMode uint8
ReconcileMode is the per-peer mode the protocol driver runs in.
const (
	// ReconcileRate15 is rate-1.5 mode: sender ships a single
	// fixed-size IBLT (cells = 1.5 × d_max). Receiver decodes once.
	// Most efficient when d_max is a good estimate.
	ReconcileRate15 ReconcileMode = iota

	// ReconcileRateless is rateless mode: sender streams cells;
	// receiver attempts decode after each batch. Adapts to any
	// |d| at the cost of slightly higher bandwidth.
	ReconcileRateless
)
type ReconcilePeer ¶ added in v0.0.10
ReconcilePeer represents a peer the driver can run a reconciliation round with. Wraps the per-peer wire conn plus state for mode-flip decisions.
type ReconcileStore ¶ added in v0.0.10
type ReconcileStore interface {
// Snapshot returns the full set of (content_hash, record_bytes)
// pairs currently in the store. Used to seed the IBLT.
Snapshot() ([]iblt.Key, [][]byte)
// FetchByID returns the record bytes for a given content hash,
// or false if the store doesn't have it (peer is asking for
// something we evicted; benign).
FetchByID(id iblt.Key) ([]byte, bool)
// Apply ingests an inbound record. Same contract as
// StateStore.Apply — merge/tombstone resolution is the
// consumer's responsibility.
Apply(record []byte) error
}
ReconcileStore is the consumer's record store, keyed by content hash. Whisper's reconciliation driver calls Snapshot to populate the IBLT, FetchByID to pull records the peer is missing, and Apply to ingest records the peer sent.
type ReedSolomonErasure ¶ added in v0.0.10
type ReedSolomonErasure struct{}
ReedSolomonErasure is a default Reed-Solomon implementation of the erasure-coding portion of SnapshotCodec. Codec implementations embed it (or call directly) to satisfy EncodeErasure / DecodeErasure without re-implementing the byte-slice marshalling that pads variable-length record shards into the equal-length buffers reedsolomon.Encode requires.
The wire format inside each shard is:
[count uint32][len uint32][record_bytes][len uint32][record_bytes]...
Padded with zero bytes to the shard buffer length. The first 4 bytes (count) on a parity shard are not record bytes — parity shards are opaque to the consumer; only the n-of-n decode path reads them.
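The layout round-trips as follows. Big-endian integer order is an assumption here; the docs don't state the byte order, only the field sequence:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// flattenShard packs records into the documented layout,
// [count u32][len u32][record_bytes]..., then zero-pads to the
// shard buffer length. Illustrative sketch of the wire format.
func flattenShard(records [][]byte, size int) []byte {
	buf := binary.BigEndian.AppendUint32(make([]byte, 0, size), uint32(len(records)))
	for _, r := range records {
		buf = binary.BigEndian.AppendUint32(buf, uint32(len(r)))
		buf = append(buf, r...)
	}
	return append(buf, make([]byte, size-len(buf))...)
}

// unflattenShard reverses flattenShard; trailing zero padding is
// never reached because count bounds the walk.
func unflattenShard(buf []byte) [][]byte {
	count := binary.BigEndian.Uint32(buf)
	buf = buf[4:]
	out := make([][]byte, 0, count)
	for i := uint32(0); i < count; i++ {
		n := binary.BigEndian.Uint32(buf)
		out = append(out, buf[4:4+n])
		buf = buf[4+n:]
	}
	return out
}

func main() {
	shard := flattenShard([][]byte{[]byte("aa"), []byte("b")}, 32)
	fmt.Println(len(shard), string(unflattenShard(shard)[0])) // 32 aa
}
```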
func (ReedSolomonErasure) DecodeErasure ¶ added in v0.0.10
func (ReedSolomonErasure) DecodeErasure(shards [][][]byte, k int) ([][][]byte, error)
DecodeErasure reconstructs the original k data shards from any k-of-n shards. Missing shards in the input MUST be nil entries (their inner slice empty or the outer entry nil). The output is the first k reconstructed shards as record-byte-slice slices.
func (ReedSolomonErasure) EncodeErasure ¶ added in v0.0.10
func (ReedSolomonErasure) EncodeErasure(dataShards [][][]byte, parityCount int) ([][][]byte, error)
EncodeErasure flattens k data shards (each carrying its own records) into equal-length buffers, runs reed-solomon over them to produce parityCount parity shards, and returns all k+parityCount shards in a stable order: data first, parity after.
Output shape: [][][]byte, where each outer entry is one shard's flattened bytes wrapped in a single-element inner [][]byte (the codec keeps the slice-of-slices shape for symmetry with the input). Concretely, each output shard's inner slice has length 1: the flattened buffer.
type RumorConfig ¶
type RumorConfig struct {
MaxHops int // max hop count before rumor dies (default 4)
Fanout int // number of peers to forward to (default 3)
ForwardProbability float64 // base probability of forwarding (1.0 = always)
Enabled bool // master switch
}
RumorConfig configures rumor-mongering behavior.
func AgentMeshRumorConfig ¶
func AgentMeshRumorConfig() RumorConfig
AgentMeshRumorConfig returns config optimized for large meshes (hundreds of devices).
func ServiceMeshRumorConfig ¶
func ServiceMeshRumorConfig() RumorConfig
ServiceMeshRumorConfig returns config optimized for small meshes (11 nodes).
type RumorIDFunc ¶
RumorIDFunc generates a dedup key from a rumor payload. Consumers provide their own implementation based on their record structure. The returned string must be deterministic for the same payload.
type RumorMessage ¶
type RumorMessage struct {
ID string // dedup key (generated by consumer's RumorIDFunc)
Payload []byte // opaque serialised record
}
RumorMessage is a payload with its dedup key, queued for push.
type RumorPeerConn ¶
type RumorPeerConn interface {
PeerNodeID() string
WriteRumor(payload []byte, hopCount uint8, fromDimension uint8) error
}
RumorPeerConn is the interface for sending rumors to a peer.
type RumorPusher ¶
type RumorPusher struct {
// contains filtered or unexported fields
}
RumorPusher pushes new records to peers immediately via hypercube or random.
func NewRumorPusher ¶
func NewRumorPusher(cfg RumorConfig, rumorIDFn RumorIDFunc) *RumorPusher
NewRumorPusher creates a pusher with the given config. rumorIDFn generates dedup keys from payloads; if nil, a hash-based default is used.
func (*RumorPusher) EnableACKs ¶ added in v0.0.10
func (rp *RumorPusher) EnableACKs(timeout time.Duration, maxRetries int)
EnableACKs activates the rumor ACK protocol on this pusher. Without it, PushRumor / PushRumorExcluding behave exactly as before (best-effort, no delivery confirmation). With it active:
- every per-peer rumor send registers an entry in the ACK tracker
- the receiver-of-ACK loop on the rumor stream calls HandleACK to dispatch incoming G3-ACK frames into the tracker
- a sweeper goroutine polls for ACK timeouts on the configured interval; timed-out rumors retry via an alternate hypercube edge until the retry budget is exhausted
Capability gating MUST happen at the registration boundary: only peers that advertise RumorACK should be passed to RegisterPeer when ACKs are enabled; otherwise their rumors time out every cycle and waste retry budget. In mixed-version meshes, register non-ACK peers separately or leave ACKs disabled for them.
Idempotent — multiple calls update timeout/retry parameters but only one sweeper runs.
func (*RumorPusher) HandleACK ¶ added in v0.0.10
func (rp *RumorPusher) HandleACK(rumorID, peerID string)
HandleACK is called by the ACK reader loop on each peer's rumor stream when a G3-ACK frame arrives. Looks up the (rumorID, peerID) pair in the tracker; if matched, clears the entry, bumps the acksReceived counter, and emits EventRumorAcked. Unknown ACKs (duplicates, or late arrivals after retry-budget exhaustion) are silently ignored.
func (*RumorPusher) HypercubeExt ¶ added in v0.0.10
func (rp *RumorPusher) HypercubeExt() *HypercubeExt
HypercubeExt returns the configured extended cube, or nil if only a plain Hypercube has been wired.
func (*RumorPusher) NotifyNewPayload ¶
func (rp *RumorPusher) NotifyNewPayload(payload []byte)
NotifyNewPayload queues an opaque payload for rumor push (non-blocking). Payloads larger than MaxRumorPayloadBytes are dropped; they still reach peers via the G1 delta path, but holding them in the bounded inbound queue would pin (capacity × payload-size) bytes resident in the worst case. The QueueFull counter covers both overflow and oversized-drop cases.
func (*RumorPusher) PushRumor ¶
func (rp *RumorPusher) PushRumor(payload []byte, hopCount uint8, fromDimension uint8)
PushRumor sends a payload to selected peers via hypercube or random fallback.
func (*RumorPusher) PushRumorExcluding ¶
func (rp *RumorPusher) PushRumorExcluding(payload []byte, hopCount uint8, fromDimension uint8, excludeNodeID string)
PushRumorExcluding is like PushRumor but excludes a specific peer (the sender) to prevent back-propagation of rumors to the node that sent them.
func (*RumorPusher) RegisterPeer ¶
func (rp *RumorPusher) RegisterPeer(nodeID string, conn RumorPeerConn)
RegisterPeer adds a peer connection for rumor delivery.
func (*RumorPusher) Run ¶
func (rp *RumorPusher) Run(ctx context.Context)
Run processes inbound new-record notifications and pushes rumors.
func (*RumorPusher) SetEventEmitter ¶ added in v0.0.10
func (rp *RumorPusher) SetEventEmitter(fn func(GossipEvent))
SetEventEmitter wires the rumor pusher to a MeshEvent producer callback. Lifecycle events (RumorSent, RumorAcked, RumorRetry, RumorDropped) emit through the callback when set; without it the pusher updates only its atomic counters. Function-typed so consumers can fan events into their own subscribers (connection-history, mesh-debug, OTLP) without depending on whisper's internal event-bus type.
func (*RumorPusher) SetHypercube ¶
func (rp *RumorPusher) SetHypercube(cube *Hypercube)
SetHypercube sets the structured overlay for dimension-ordered routing.
func (*RumorPusher) SetHypercubeExt ¶ added in v0.0.10
func (rp *RumorPusher) SetHypercubeExt(ext *HypercubeExt)
SetHypercubeExt sets the extended hypercube. When set, it supersedes any plain Hypercube wired via SetHypercube — the pusher uses the primary cube for the canonical routing decision and (when the extended options enable it) the dual cube for redundant fanout.
Adaptive dimension weighting, RouteToFar dispatch, region-aware ordering and ephemeral exclusion all read from the extended cube; the plain Hypercube path retains today's behavior for callers that haven't migrated.
func (*RumorPusher) SetNetworkPolicy ¶ added in v0.0.10
func (rp *RumorPusher) SetNetworkPolicy(p NetworkPolicy)
SetNetworkPolicy wires the rumor pusher to consult a NetworkPolicy for retry timing and fanout overrides. Subsequent pushes read `Profile().RumorRetryInitial` and `Profile().RumorFanout` to override the fixed values supplied to EnableACKs / RumorConfig. Without this, fixed defaults apply.
func (*RumorPusher) SetPeerCapabilityCheck ¶ added in v0.0.10
func (rp *RumorPusher) SetPeerCapabilityCheck(fn func(peerID string) bool)
SetPeerCapabilityCheck wires a per-peer capability lookup. The pusher only registers ACK tracking for peers that the callback returns true for. Pass nil to disable gating (every peer is treated as capable; fleet-wide ACK semantics).
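A sketch of such a callback, built from a static capability table (the map shape is an assumption for illustration; real consumers would read advertised capabilities from their handshake metadata). "RumorACK" is the capability name the EnableACKs docs reference:

```go
package main

import "fmt"

// ackCapable builds the func(peerID string) bool that SetPeerCapabilityCheck
// expects, returning true only for peers advertising RumorACK.
func ackCapable(caps map[string][]string) func(peerID string) bool {
	return func(peerID string) bool {
		for _, c := range caps[peerID] {
			if c == "RumorACK" {
				return true
			}
		}
		return false
	}
}

func main() {
	check := ackCapable(map[string][]string{
		"node-a": {"RumorACK"},
		"node-b": {}, // older peer: no ACK support, gets no ACK tracking
	})
	fmt.Println(check("node-a"), check("node-b")) // true false
}
```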
func (*RumorPusher) Stats ¶ added in v0.0.3
func (rp *RumorPusher) Stats() RumorStats
Stats returns a snapshot of rumor-push effectiveness. Cheap: atomic reads for counters + one RLock for peer/cache sizes.
func (*RumorPusher) Tracker ¶
func (rp *RumorPusher) Tracker() *RumorTracker
Tracker returns the underlying RumorTracker (for responder G3 handling).
func (*RumorPusher) UnregisterPeer ¶
func (rp *RumorPusher) UnregisterPeer(nodeID string)
UnregisterPeer removes a peer connection.
type RumorStats ¶ added in v0.0.3
type RumorStats struct {
Notified uint64 // NotifyNewPayload calls
Deduped uint64 // skipped because already-seen
QueueFull uint64 // notifies dropped because inbound channel full
PushesHypercube uint64 // frame writes via hypercube overlay
PushesRandom uint64 // frame writes via random-peer fallback
WriteErrors uint64 // peer.WriteRumor failures
AcksReceived uint64 // G3-ACK confirmations matched to outstanding rumors
RumorRetries uint64 // rumor re-pushes after ACK timeout
RumorDropped uint64 // rumors that exhausted retry budget
PendingAcks int // outstanding (rumor, peer) pairs awaiting ACK
Peers int // current peer count (RegisterPeer − UnregisterPeer)
SeenCache int // current size of the tracker's seen map
}
RumorStats is a point-in-time snapshot of rumor-push effectiveness.
type RumorTracker ¶
type RumorTracker struct {
// contains filtered or unexported fields
}
RumorTracker deduplicates rumors using an O(1) LRU. The doubly-linked list orders entries by recency (front = newest, back = oldest); the map provides O(1) lookup into that list. MarkSeen moves an existing entry to the front or pushes a new one; eviction pops from the back.
func NewRumorTracker ¶
func NewRumorTracker(cfg RumorConfig) *RumorTracker
NewRumorTracker creates a tracker with the given config.
func (*RumorTracker) IsSeen ¶
func (rt *RumorTracker) IsSeen(rumorID string) bool
IsSeen returns true if this rumor has been processed before.
func (*RumorTracker) MarkSeen ¶
func (rt *RumorTracker) MarkSeen(rumorID string)
MarkSeen records that this rumor has been processed. Existing keys are moved to the front (most-recent); new keys are pushed and the tail is evicted if capacity is exceeded.
func (*RumorTracker) ShouldForward ¶
func (rt *RumorTracker) ShouldForward(hopCount uint8) bool
ShouldForward returns true based on probability decay: P / (1 + hopCount).
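The decay rule reduces to a one-liner; a sketch (the function name is ours, not the package's):

```go
package main

import "fmt"

// forwardProbability applies the documented decay: base ForwardProbability P
// divided by (1 + hopCount), so rumors become progressively less likely to
// be re-forwarded as they travel.
func forwardProbability(base float64, hopCount uint8) float64 {
	return base / (1 + float64(hopCount))
}

func main() {
	fmt.Println(forwardProbability(1.0, 0)) // 1
	fmt.Println(forwardProbability(1.0, 3)) // 0.25
}
```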
type SnapshotChunk ¶ added in v0.0.10
type SnapshotChunk struct {
ShardIndex uint8
ChunkIndex uint16
TotalChunks uint16 // 0 = "more coming"; >0 = "this is final, total=N"
Body []byte
}
SnapshotChunk is a single wire chunk of a shard. The shard's record bytes are split into chunks bounded by the stream's available credit and SnapshotChunkMax — the only place where chunked-on-the-wire transfer lives.
type SnapshotCodec ¶ added in v0.0.10
type SnapshotCodec interface {
EncodeManifest(m SnapshotManifest) ([]byte, error)
DecodeManifest(body []byte) (SnapshotManifest, error)
EncodeShardRequest(spec SnapshotShardSpec) ([]byte, error)
DecodeShardRequest(body []byte) (SnapshotShardSpec, error)
EncodeShardChunk(c SnapshotChunk) ([]byte, error)
DecodeShardChunk(body []byte) (SnapshotChunk, error)
// PartitionForShard returns the records belonging to a specific
// shard. Deterministic — every peer with the same record set
// produces the same partitioning. Used by the responder to
// extract its slice of the encoded snapshot.
PartitionForShard(records [][]byte, spec SnapshotShardSpec) [][]byte
// Erasure encodes k data shards into n total shards (n-k
// parity) using Reed-Solomon. n=k means no erasure coding
// (single neighbor, no parity); n>k tolerates n-k missing
// shards.
EncodeErasure(dataShards [][][]byte, parityCount int) ([][][]byte, error)
// DecodeErasure reconstructs the full k data shards from any
// k-of-n received shards. Missing shards are nil entries in
// the input slice.
DecodeErasure(shards [][][]byte, k int) ([][][]byte, error)
}
SnapshotCodec is the consumer-supplied serialiser for the four G5 body shapes. Whisper owns the [magic][length][kind] framing; the codec owns the per-kind body encoding.
type SnapshotDriver ¶ added in v0.0.10
type SnapshotDriver struct {
// contains filtered or unexported fields
}
SnapshotDriver runs the cold-start protocol. One instance per process, shared across peers.
Cold-start sequence (initiator side):
- Connect to N=3 hypercube neighbors.
- Parallel SnapshotManifestRequest to each.
- Pick freshest by HLC; verify N≥2 agree on fingerprint (consensus check — outlier flagged in MeshEvents).
- Pick k=ShardCount fastest neighbors that agreed; one extra for parity.
- Parallel SnapshotShardRequest to all k+parity neighbors.
- Reassemble chunks per shard; Reed-Solomon decode across shards.
- Apply records to local store.
- On any single-shard failure: parity shard saves the transfer.
- On 2+ shard failures: retry against next-best neighbor pair.
On metered cellular (NetworkPolicy.SnapshotShardCount = 1), the driver falls back to single-neighbor-with-retry (the original Option A) — parallel transfer over cellular costs more in connection setup than it saves.
func NewSnapshotDriver ¶ added in v0.0.10
func NewSnapshotDriver(codec SnapshotCodec, store SnapshotStore, policy NetworkPolicy) *SnapshotDriver
NewSnapshotDriver returns a driver wiring the consumer's codec and store into the G5 protocol. policy is optional; nil falls back to default sizing (3-shard fan-in with one parity).
func (*SnapshotDriver) HandleManifestRequest ¶ added in v0.0.10
func (d *SnapshotDriver) HandleManifestRequest(conn net.Conn) error
HandleManifestRequest is called by the engine's frame handler when a SnapshotKindManifestRequest arrives. Builds a manifest from the local store and writes it back as SnapshotKindManifest.
func (*SnapshotDriver) HandleShardRequest ¶ added in v0.0.10
func (d *SnapshotDriver) HandleShardRequest(conn net.Conn, body []byte) error
HandleShardRequest is called when a SnapshotKindShardRequest arrives. Extracts the requested shard's records from the store, chunks them per the policy's SnapshotChunkMax, and streams the chunks back as SnapshotKindShard frames.
func (*SnapshotDriver) RunInitiator ¶ added in v0.0.10
func (d *SnapshotDriver) RunInitiator(neighbors []net.Conn) (applied int, manifest SnapshotManifest, err error)
RunInitiator drives the cold-start sequence. neighbors is the pool of hypercube peers to probe; the driver picks freshest + fastest. Returns the count of records applied and the manifest it converged on (for telemetry).
Three-step protocol:
- Parallel manifest probe — fan out N requests, collect responses within probeTimeout. Filter out dead peers. Pick the freshest by HLC; require a fingerprint quorum (≥ ceil(N/2) peers agree) before committing — guards against an outlier with stale data.
- Shard fan-in — request shard i from peer i (round-robin) so bandwidth fans out across ShardCount streams. Reed-Solomon parity tolerates one lost neighbor mid-transfer.
- Apply records to the local store as chunks arrive.
On metered cellular (NetworkPolicy.SnapshotShardCount = 1), the driver collapses to single-neighbor sequential transfer so the connection-setup-overhead penalty doesn't dominate transfer time.
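The freshest-plus-quorum decision from step one can be sketched as a pure function over the collected manifests (a simplification: the real driver also tracks per-peer latency for the "fastest" pick and emits MeshEvents for outliers):

```go
package main

import (
	"fmt"
	"math"
)

// manifest mirrors the two fields the probe decision reads from
// SnapshotManifest.
type manifest struct {
	HLC         uint64
	Fingerprint uint64
}

// pickFreshest picks the highest-HLC manifest, then requires at least
// ceil(N/2) of the N responders to agree on its fingerprint before
// committing. ok=false means no quorum: retry or flag the outlier.
func pickFreshest(ms []manifest) (manifest, bool) {
	var best manifest
	for _, m := range ms {
		if m.HLC > best.HLC {
			best = m
		}
	}
	agree := 0
	for _, m := range ms {
		if m.Fingerprint == best.Fingerprint {
			agree++
		}
	}
	quorum := int(math.Ceil(float64(len(ms)) / 2))
	return best, agree >= quorum
}

func main() {
	ms := []manifest{
		{HLC: 10, Fingerprint: 7},
		{HLC: 12, Fingerprint: 7},
		{HLC: 9, Fingerprint: 3}, // stale outlier
	}
	best, ok := pickFreshest(ms)
	fmt.Println(best.HLC, ok) // 12 true
}
```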
type SnapshotFrameKind ¶ added in v0.0.10
type SnapshotFrameKind uint8
SnapshotFrameKind discriminates the four frame types in the G5 protocol. Each frame starts with [magic][length] common header then a one-byte kind discriminator.
const (
	// SnapshotKindManifestRequest: probe to a hypercube neighbor
	// asking for its current snapshot manifest. Body: empty.
	SnapshotKindManifestRequest SnapshotFrameKind = 0x10
	// SnapshotKindManifest: response carrying HLC, fingerprint,
	// total record count, total bytes, shard layout proposal.
	// Initiator picks the freshest of N parallel manifests.
	SnapshotKindManifest SnapshotFrameKind = 0x11
	// SnapshotKindShardRequest: initiator asks for a specific shard
	// of the snapshot (hash-prefix bucket). Body: [shard_index][shard_count].
	SnapshotKindShardRequest SnapshotFrameKind = 0x12
	// SnapshotKindShard: streaming chunks for a single shard. Body
	// carries chunk_index + total_chunks + record bytes; receiver
	// reassembles, then Reed-Solomon decodes across shards.
	SnapshotKindShard SnapshotFrameKind = 0x13
)
type SnapshotManifest ¶ added in v0.0.10
type SnapshotManifest struct {
HLC uint64 // peer's current HLC
Fingerprint uint64 // cache fingerprint (XOR of content hashes)
RecordCount uint32 // total records the peer has
TotalBytes uint64 // approximate total wire size
ShardCount uint8 // proposed shard count for k-of-N transfer
IsAnchor bool // peer advertises anchor service
}
SnapshotManifest is the per-peer "what I have" advertisement returned by SnapshotKindManifest. Tiny (~200 bytes); cheap to fetch from N=3 peers in parallel before committing to a transfer.
type SnapshotShardSpec ¶ added in v0.0.10
SnapshotShardSpec describes one shard of an erasure-coded snapshot. ShardIndex is the position (0..ShardCount-1); ShardCount is the total shards (k data + parity = N total). For a 2-of-3 layout (current default), ShardCount=3 and shards 0+1 are data while shard 2 is parity.
type SnapshotStore ¶ added in v0.0.10
type SnapshotStore interface {
Snapshot() (records [][]byte, hlc uint64, fingerprint uint64)
Apply(record []byte) error
}
SnapshotStore is the consumer's record store, used by the responder side to populate snapshots. Same Snapshot interface as ReconcileStore — most consumers implement both with the same underlying type.
type StateStore ¶
type StateStore interface {
// Fingerprint returns a 64-bit summary of current state. Used by
// G2 digest probes — equality means both sides agree on every
// record and the follow-up G1 can be skipped. Must change on any
// mutation to avoid false-positive matches.
Fingerprint() uint64
// Snapshot returns every live record as opaque bytes. Called by
// the initiator when no per-peer watermark is available and by
// the responder on forced full-sync cycles.
Snapshot() [][]byte
// Delta returns records mutated since the given time. A zero
// time is equivalent to Snapshot. Implementations may return the
// full snapshot when they can't cheaply compute a delta — the
// initiator tolerates over-sending.
Delta(since time.Time) [][]byte
// Apply ingests a single inbound record. Merge/tombstone/LWW
// resolution is the consumer's responsibility; Whisper just
// hands over the bytes and trusts the return for success
// counting.
Apply(data []byte) error
}
StateStore is the read/write surface Whisper drives during G1/G2 exchanges and rumor push. Consumers implement this by wrapping their authoritative record store (e.g., Ledger's DirectoryCache).
Records are opaque byte slices from Whisper's perspective — the consumer's G1Codec handles (de)serialisation at the exchange boundary. Apply is the single ingest point for both G1 responses and G3 rumors so merge semantics live in one place.
type Subscriber ¶
Subscriber receives broadcast messages for subscribed topics.
type TopicConfig ¶
type TopicConfig struct {
Mode TopicMode
Proto proto.Message // type hint for unmarshal (nil = opaque bytes)
Store StateStore // required for StatefulMerge, nil for others
Handler TopicHandler // required for RequestResponse, optional for others
TTL time.Duration // BroadcastOnly: message expiry (0 = no expiry)
Rumor bool // StatefulMerge: enable G3 rumor push on mutation (default true)
Priority int // dispatch priority: 0 = default, higher = preferred under egress pressure
}
TopicConfig configures a registered topic.
type TopicHandler ¶
type TopicHandler interface {
// OnMessage is called when a message arrives for this topic.
// For BroadcastOnly: informational (message already propagated).
// For RequestResponse: must return response bytes (nil = no response).
OnMessage(ctx context.Context, from string, topic string, payload []byte) (response []byte, err error)
}
TopicHandler handles incoming messages for a topic.
type TopicMode ¶
type TopicMode int
TopicMode defines the gossip semantics for a topic.
const (
	// BroadcastOnly is firehose pub/sub. No state, no merge, no delta sync.
	// Messages propagate to all subscribers and are not stored.
	BroadcastOnly TopicMode = iota
	// StatefulMerge uses delta sync with merge-on-apply.
	// Records are stored in a StateStore. Delta sync uses per-peer watermarks.
	// Merge conflicts are resolved by the store's registered MergeFunc.
	StatefulMerge
	// RequestResponse is solicit/reply. One peer sends a query, matching
	// peers reply. No state stored. No propagation beyond responders.
	RequestResponse
)
</const>
type TopicRegistry ¶
type TopicRegistry struct {
// contains filtered or unexported fields
}
TopicRegistry manages registered topics and their configurations.
func NewTopicRegistry ¶
func NewTopicRegistry() *TopicRegistry
NewTopicRegistry creates an empty topic registry.
func (*TopicRegistry) All ¶
func (r *TopicRegistry) All() []string
All returns all registered topic names.
func (*TopicRegistry) Count ¶
func (r *TopicRegistry) Count() int
Count returns the number of registered topics.
func (*TopicRegistry) Get ¶
func (r *TopicRegistry) Get(name string) (TopicConfig, bool)
Get returns the config for a topic, or false if not found.
func (*TopicRegistry) Register ¶
func (r *TopicRegistry) Register(name string, config TopicConfig) error
Register adds a topic to the registry. Returns an error if the topic already exists or if required fields are missing for the mode.
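The per-mode validation can be sketched with local stand-ins for whisper's types (this mirrors the documented contract — duplicate names and missing mode-required fields are rejected — not the package's actual implementation):

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for whisper's TopicMode / TopicConfig.
type TopicMode int

const (
	BroadcastOnly TopicMode = iota
	StatefulMerge
	RequestResponse
)

type TopicConfig struct {
	Mode    TopicMode
	Store   any // stands in for StateStore (required for StatefulMerge)
	Handler any // stands in for TopicHandler (required for RequestResponse)
}

type registry struct{ topics map[string]TopicConfig }

// Register rejects duplicates and enforces the mode-specific requirements.
func (r *registry) Register(name string, cfg TopicConfig) error {
	if _, dup := r.topics[name]; dup {
		return errors.New("topic already registered: " + name)
	}
	switch cfg.Mode {
	case StatefulMerge:
		if cfg.Store == nil {
			return errors.New("StatefulMerge requires a Store")
		}
	case RequestResponse:
		if cfg.Handler == nil {
			return errors.New("RequestResponse requires a Handler")
		}
	}
	r.topics[name] = cfg
	return nil
}

func main() {
	r := &registry{topics: map[string]TopicConfig{}}
	fmt.Println(r.Register("presence", TopicConfig{Mode: StatefulMerge}) != nil) // true: missing Store
	fmt.Println(r.Register("chat", TopicConfig{Mode: BroadcastOnly}) == nil)     // true
	fmt.Println(r.Register("chat", TopicConfig{Mode: BroadcastOnly}) != nil)     // true: duplicate
}
```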