whisper

package module
v0.0.14
Published: Apr 27, 2026 License: MIT Imports: 24 Imported by: 0

README

Whisper — Topic-Based Gossip Engine

go get github.com/bbmumford/whisper

Related: Aether (wire protocol) · Ledger (state directory)

Whisper is a generic, topic-based gossip engine for peer-to-peer state propagation. It provides the pub/sub, delta sync, and topology routing layers that sit between a wire protocol and application state. Transport-agnostic — it operates on any bidirectional byte stream (Aether streams, raw TCP, WebSocket, gRPC bidi) but knows nothing about what it carries.


Status

Area Status
Topic registry + three modes (BroadcastOnly, StatefulMerge, RequestResponse) Stable
G1/G2/G3 wire framing (full exchange / digest probe / rumor push) Stable
Delta sync with per-peer HLC watermarks Stable
Hypercube routing (O(log N) dimension-ordered propagation) Stable
Peer exchange (PEX) with signed address advertisements Stable
Adaptive cadence (backs off when idle, accelerates during convergence) Stable
Backpressure signalling between peers Stable
RTT tracking (per-peer round-trip measurement) Stable
Protobuf envelope with opaque payload Stable
Rate limiting + rumor tracker Stable
v0.0.1 release Shipped (2026-04-20)

In production use for HSTLES mesh directory gossip across bootstrap.hstles.com, node.hstles.com, and every ORBTR endpoint, carrying Ledger records between peers; delta sync cuts gossip bandwidth by ~90% versus full-state exchange.


What Whisper Provides

Capability Description
Topic registry Named topics with typed payloads, per-topic configuration
Three gossip modes BroadcastOnly (firehose), StatefulMerge (delta sync + merge), RequestResponse (solicit/reply)
G1/G2/G3 wire framing Full exchange (G1), digest probe (G2), rumor push (G3)
Delta sync Per-peer hybrid-logical-clock watermarks for ~90% bandwidth reduction
Hypercube routing O(log N) dimension-ordered routing for rumor propagation
Peer exchange (PEX) Signed peer address advertisements
Adaptive interval Gossip timing that backs off when idle, accelerates during convergence
Backpressure signaling Generic congestion signaling between peers
RTT tracking Per-peer round-trip time measurement
Protobuf envelope Generic GossipEnvelope with opaque repeated bytes payload
Connection offers Signaling for NAT traversal and peer introduction
Fingerprint matching Cache fingerprint comparison to skip redundant exchanges

Architecture

┌─────────────────────────────────────────────────┐
│  Applications                                   │
│      HSTLES / ORBTR      custom app             │
│             │                  │                │
│             └──────┬───────────┘                │
│                    │ RegisterTopic()            │
│            ┌───────▼─────────────┐              │
│            │      Whisper        │              │
│            │                     │              │
│            │  BroadcastOnly      │              │
│            │  StatefulMerge      │              │
│            │  RequestResponse    │              │
│            └───────┬─────────────┘              │
│                    │ bidirectional byte stream  │
│            ┌───────▼─────────────┐              │
│            │   Aether / TCP /    │              │
│            │   WS / gRPC / etc.  │              │
│            └─────────────────────┘              │
└─────────────────────────────────────────────────┘

Consumer Examples

// Engine setup (options elided — see EngineOption)
engine := whisper.NewEngine( /* ... */ )

// HSTLES — mesh directory (delta sync + merge via Ledger)
engine.RegisterTopic("member", whisper.TopicConfig{
    Mode:  whisper.StatefulMerge,
    Store: ladCache, // implements whisper.StateStore
    Proto: &pb.LADRecord{},
})

// Application-level broadcast topic (no state, no merge)
engine.RegisterTopic("app.announcements", whisper.TopicConfig{
    Mode:  whisper.BroadcastOnly,
    Proto: &pb.Announcement{},
})

// Application-level request/response topic
engine.RegisterTopic("app.query", whisper.TopicConfig{
    Mode:    whisper.RequestResponse,
    Proto:   &pb.Query{},
    Handler: queryHandler,
})

Depends On

  • Aether — wire protocol, stream multiplexing, encryption (whisper runs on Aether streams in typical deployments)

Depended On By

  • Ledger — registers StatefulMerge topics for mesh directory records

What Whisper Is NOT

  • Not a state store — Whisper propagates; Ledger (or whatever StateStore backend you provide) stores and merges
  • Not a wire protocol — Whisper uses any bidirectional byte stream; it doesn't define low-level frame formats
  • Not application-specific — Whisper knows about topics and bytes, not domain objects

API Docs

Generated API reference (pkgsite) for each tagged release is published to GitHub Pages.


License

MIT

Copyright (c) 2026 HSTLES / ORBTR Pty Ltd

Documentation

Overview

Package whisper is a generic, topic-based gossip engine for peer-to-peer state propagation. It provides the pub/sub, delta sync, and topology routing layers that sit between a wire protocol (Aether) and application state (Ledger, Minerva, Mercury).

Whisper is transport-agnostic — it operates on Aether streams but knows nothing about what it carries.

Three gossip modes:

  • BroadcastOnly: firehose pub/sub, no state, no merge
  • StatefulMerge: delta sync with merge-on-apply via StateStore
  • RequestResponse: solicit/reply pattern

Three wire frame types:

  • G1 (magic 0x4731): full bidirectional exchange with ExchangeMeta

  • G2 (magic 0x4732): 12-byte digest probe for fingerprint comparison

  • G3 (magic 0x4733): immediate rumor push via hypercube routing

Index

Constants

const (
	// DefaultPeerMessagesPerSec is the steady-state per-peer message allowance.
	DefaultPeerMessagesPerSec = 100.0

	// DefaultPeerBurst is the per-peer burst capacity.
	DefaultPeerBurst = 500

	// DefaultGlobalEgressBytesPerSec is the total outbound throughput cap.
	DefaultGlobalEgressBytesPerSec = 10 * 1024 * 1024 // 10 MB/s

	// DefaultGlobalEgressBurst is the burst capacity for the egress limiter.
	DefaultGlobalEgressBurst = 10 * 1024 * 1024 // 10 MB burst

	// DefaultOverloadQueueThreshold is the outbound queue size above which
	// Publish returns ErrBackpressure. 0 disables the check.
	DefaultOverloadQueueThreshold = 1024
)

Default rate-limiting parameters. All are overridable via engine options.
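
For example, a minimal sketch of overriding these defaults at construction with the documented options (the values here are illustrative, not recommendations):

engine := whisper.NewEngine(
    whisper.WithPeerRateLimit(50.0, 200),       // halve the per-peer allowance
    whisper.WithGlobalEgressLimit(5*1024*1024), // 5 MB/s total egress cap
    whisper.WithOverloadQueueThreshold(512),    // trip ErrBackpressure earlier
)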

const DefaultBackpressureInitialBackoff = 30 * time.Second

DefaultBackpressureInitialBackoff is the starting backoff when a peer first signals backpressure.

const DefaultBackpressureMaxBackoff = 120 * time.Second

DefaultBackpressureMaxBackoff is the maximum backoff duration a sender will apply when a peer repeatedly signals backpressure.

const DefaultBackpressureThreshold = 5

DefaultBackpressureThreshold is the number of concurrent pending gossip exchanges before a node signals backpressure to its peers.

const DefaultPullResponseMaxSize uint32 = 1 << 20

DefaultPullResponseMaxSize is the response-size cap used when the requester does not supply one (maxResponseSize=0). Matches the G1 push-delta cap (1 MB) so the pull path doesn't accidentally become a larger-payload channel than the push path.

const DigestMagic uint16 = 0x4732 // "G2"
const (
	FlagDigestMatch uint16 = 1 << 0 // responder sets when fingerprints match
)

Digest frame flags.

const GossipMagic uint16 = 0x4731 // "G1"

GossipMagic is the 2-byte prefix on all G1 gossip frames for corruption detection. Wire format: [2-byte magic 0x4731][4-byte big-endian length][payload]

const JitterFraction = 0.15

JitterFraction is the ±fraction applied to each computed gossip interval to desynchronise peers that came up together. Without this a fleet-restart produces synchronised gossip cycles every 10 s, which spikes cross-region egress at the same wall-clock boundary for every peer. ±15% spreads the spike over ~1.5 s without changing the effective rate.

Applied in Next() after the base/idle/convergence arithmetic picks the unjittered value; also applied in ApplyBackpressure() so an overloaded fleet doesn't all back off in lockstep. Uses math/rand/v2 so the randomness is goroutine-safe without an explicit seed.
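
A minimal sketch of the jitter arithmetic described above, assuming only "time" and "math/rand/v2" — the engine applies this internally in Next and ApplyBackpressure, so this is illustrative rather than the package's own code:

// jittered draws a uniform sample in [base*(1-j), base*(1+j)].
// math/rand/v2's top-level Float64 is goroutine-safe and needs no seed.
func jittered(base time.Duration, j float64) time.Duration {
    f := 1 + j*(2*rand.Float64()-1)
    return time.Duration(float64(base) * f)
}

With j = JitterFraction, a 10 s base spreads over 8.5–11.5 s.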

const MaxPullTopicLen = 255

MaxPullTopicLen caps the advertised topic name at 255 bytes. Any legitimate topic is far shorter; an oversized field is a protocol violation.

const MaxRumorPayloadBytes = 256 * 1024

MaxRumorPayloadBytes caps the size of a payload admitted into the rumor inbound queue. Oversized payloads are dropped with a warning — they still propagate through the delta (G1) path; rumor fast-push is best-effort and must not pin large slices alive in the bounded inbound channel.

const PEXInterval = 5 * time.Minute

PEXInterval is the minimum time between PEX sends to the same peer.

const PEXMaxAge = 1 * time.Hour

PEXMaxAge is the maximum age of a PEX entry before it's treated as stale.

const PEXMaxEntriesPerExchange = 10

PEXMaxEntriesPerExchange caps entries piggybacked on a single gossip exchange.

const PEXMaxKnownPeers = 500

PEXMaxKnownPeers caps the in-memory PEX peer table to prevent unbounded growth.

const PEXMaxRateLimitEntries = 500

PEXMaxRateLimitEntries caps the rate limiter's per-peer tracking map.

const PullMagic uint16 = 0x4734 // "G4"

Pull-based gossip (G4) — the dual of G1 push-delta. A peer with a stale watermark for some topic explicitly asks a neighbour for records newer than that watermark, instead of waiting for the next adaptive-interval G1 tick.

Use cases:

  1. Short-lived browser peers that connect briefly, pull current LAD state to catch up, then disconnect. No gossip-cycle participation needed — they just need a one-shot sync.
  2. Post-partition convergence — after a netsplit heals, a lagging peer can pull to catch up in one round-trip instead of waiting up to the full adaptive-full-sync cycle.
  3. Diagnostic pulls — operator tooling can pull from any peer for on-demand LAD state without disturbing the peer's gossip cadence.

Wire format (G4 request):

[2-byte magic 0x4734]
[1-byte topicLen]
[topicLen-byte topic]
[8-byte sinceWatermark — unix nanoseconds, 0 = full sync]
[4-byte maxResponseSize — responder caps at min(this, its own cap)]

The RESPONSE shape is implementation-defined (it typically passes through the same G1 payload encoder). The caller wires its own response handler via HandlePullRequest — the package provides the wire-format primitives and per-peer rate-limiting hooks only, not an opinion on the payload shape.

const ReconcileMagic uint16 = 0x4734 // "G4"

ReconcileMagic is the 2-byte prefix on G4 set-reconciliation frames. Coexists with G1 (full/delta), G2 (digest), G3 (rumor) as a fourth gossip mode. Capability-gated via PeerCapabilities — peers without `Reconciliation` advertised continue to use G1.

const RumorACKMagic uint16 = 0x4735 // "G5" pun on the 5th gossip frame

RumorACKMagic is the 2-byte prefix on G3-ACK frames sent back from rumor receiver to rumor sender to confirm delivery + apply. Lives on the same bidirectional rumor stream as G3 rumors themselves — receiver-side handlers write an ACK after a successful apply; sender-side reader loop dispatches ACKs into the pusher's pending-ACK tracker.

Frame format: [magic 2][rumor_id 32] = 34 bytes total. Rumor IDs are content hashes (BLAKE3 / SHA-256 truncated) — the same value the rumor pusher's dedup tracker uses, so receiver and sender share key space.

const RumorMagic uint16 = 0x4733 // "G3"

const SnapshotMagic uint16 = 0x4736 // "G6" pun on the 6th magic

SnapshotMagic is the 2-byte prefix on G6 cold-start snapshot frames. Coexists with G1 (full/delta), G2 (digest), G3 (rumor), G4 (reconciliation). Capability-gated via PeerCapabilities — peers without `Snapshot` advertised fall back to G1 full sync for cold-start.

Variables

var (
	// ErrTopicNotFound is returned when a topic is not registered.
	ErrTopicNotFound = errors.New("whisper: topic not found")

	// ErrTopicExists is returned when registering a duplicate topic.
	ErrTopicExists = errors.New("whisper: topic already registered")

	// ErrStoreMissing is returned when a StatefulMerge topic has no StateStore.
	ErrStoreMissing = errors.New("whisper: StatefulMerge topic requires a StateStore")

	// ErrHandlerMissing is returned when a RequestResponse topic has no TopicHandler.
	ErrHandlerMissing = errors.New("whisper: RequestResponse topic requires a TopicHandler")
)
var ErrBackpressure = errors.New("whisper: publish rejected — engine overloaded")

ErrBackpressure is returned by Publish when the engine is overloaded and cannot accept more outbound messages. Callers decide whether to retry, drop, or bubble up to their caller.

var ErrPeerThrottled = errors.New("whisper: peer rate limit exceeded")

ErrPeerThrottled is returned (internally, via dropped sends) when a single peer has exceeded its rate limit. The publish for that peer is skipped but other peers continue to receive the message.

Functions

func HandlePullRequest added in v0.0.4

func HandlePullRequest(req PullRequest, allowFn func() bool, fetch PullFetchFn) ([]byte, error)

HandlePullRequest validates the request's rate limit, applies the response-size cap, and calls the consumer-supplied fetch function. Returns the response bytes (ready to write back on the stream) or an error if rate-limited, malformed, or the fetch failed.

The rate limiter is the existing per-peer PEXRateLimiter-style limiter consumers already run for G1 gossip — pulls reuse that mechanism so a peer can't bypass normal gossip backpressure by spamming pulls.
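
A hedged sketch of the responder side, assuming req is a whisper.PullRequest already decoded upstream (after the multiplexer consumed the magic) and reusing Engine.AllowPeer as the rate-limit check; fetchFn stands in for a consumer-supplied PullFetchFn, whose exact signature isn't reproduced here:

resp, err := whisper.HandlePullRequest(req,
    func() bool { return engine.AllowPeer(peerNodeID) }, // reuse the gossip limiter
    fetchFn, // consumer-supplied PullFetchFn
)
if err != nil {
    return // rate-limited, malformed, or fetch failed — drop the pull
}
_, err = conn.Write(resp) // response bytes are ready to write back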

func HandleRumorFrame

func HandleRumorFrame(r io.Reader, tracker *RumorTracker, pusher *RumorPusher, senderNodeID string, rumorIDFn RumorIDFunc, applyFn func([]byte) error)

HandleRumorFrame processes an incoming G3 rumor frame in the responder. Called from the responder multiplexer after the 2-byte magic is consumed. senderNodeID is excluded from forwarding to prevent back-propagation. applyFn is the consumer's function to apply the payload to local state.

func IsStalePEXEntry

func IsStalePEXEntry(entry PEXEntry) bool

IsStalePEXEntry returns true if the entry is older than PEXMaxAge.

func ReadDigestBody

func ReadDigestBody(r io.Reader) (fingerprint uint64, flags uint16, err error)

ReadDigestBody reads the remaining 10 bytes of a digest frame after the 2-byte magic has already been consumed by the multiplexer.

func ReadRumorACKBody added in v0.0.10

func ReadRumorACKBody(r io.Reader) ([]byte, error)

ReadRumorACKBody reads the rumor ID following an already-consumed G3-ACK magic. Mirrors ReadRumorBody's contract.

func ReadRumorBody

func ReadRumorBody(r io.Reader) (payload []byte, hopCount uint8, fromDimension uint8, err error)

ReadRumorBody reads the remaining bytes of a G3 frame after the 2-byte magic has been consumed by the multiplexer. Returns the opaque payload bytes.

func ShouldInitiateDial

func ShouldInitiateDial(localNodeID, peerNodeID string) bool

ShouldInitiateDial implements the deterministic tiebreaker for simultaneous offers. When both sides include WantDirectConnect in simultaneous gossip exchanges, the node with the HIGHER NodeID initiates the dial. The lower NodeID defers and waits for an inbound connection. This ensures exactly one connection attempt per pair.

Returns true if localNodeID should initiate the dial to peerNodeID.
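
A small usage sketch; dialPeer and waitForInbound are hypothetical consumer hooks:

if whisper.ShouldInitiateDial(localNodeID, offer.SenderNodeID) {
    go dialPeer(offer) // higher NodeID wins the tiebreak and dials
} else {
    waitForInbound(offer.SenderNodeID) // lower NodeID defers
}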

func VerifyPEXEntry

func VerifyPEXEntry(entry PEXEntry, publicKey ed25519.PublicKey) bool

VerifyPEXEntry checks the ed25519 signature on a PEX entry. The publicKey must correspond to the entry's NodeID (looked up from LAD member records).
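
Combining the two PEX checks, a hedged sketch of filtering advertisements arriving on an ExchangeMeta; lookupMemberKey and admitPeer are hypothetical consumer helpers (the NodeID-to-key lookup from LAD member records is the consumer's responsibility):

for _, entry := range meta.PEXEntries {
    if whisper.IsStalePEXEntry(entry) {
        continue // older than PEXMaxAge
    }
    pk, ok := lookupMemberKey(entry) // hypothetical: NodeID -> ed25519.PublicKey
    if !ok || !whisper.VerifyPEXEntry(entry, pk) {
        continue // unknown signer or bad signature
    }
    admitPeer(entry) // hypothetical sink into the local peer table
}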

func WriteDigestProbe

func WriteDigestProbe(conn net.Conn, fingerprint uint64, flags uint16) error

WriteDigestProbe writes a 12-byte digest frame to conn.

func WritePullRequest added in v0.0.4

func WritePullRequest(conn net.Conn, req PullRequest) error

WritePullRequest writes a G4 pull request frame to conn. Counterpart of ReadPullRequestBody (which expects the caller to have consumed the 2-byte magic already, matching the responder's multiplexer contract used by G1/G2/G3).

func WriteRumor

func WriteRumor(conn net.Conn, payload []byte, hopCount uint8, fromDimension uint8) error

WriteRumor writes a G3 rumor frame to conn. The payload is opaque bytes that the consumer has already serialised. Uses net.Buffers to hand the header and payload to the OS writev path (where supported), avoiding the per-send copy into a merged buffer.

func WriteRumorACK added in v0.0.10

func WriteRumorACK(conn net.Conn, rumorID []byte) error

WriteRumorACK serialises a G3-ACK frame to conn. Called by the rumor receiver after a successful apply to confirm delivery to the originator.

rumorID should be exactly rumorIDLen bytes; shorter IDs are right-padded and longer IDs are truncated to keep the frame size constant.

Types

type AdaptiveInterval

type AdaptiveInterval struct {
	// contains filtered or unexported fields
}

AdaptiveInterval adjusts gossip exchange timing based on exchange results. When records are applied (convergence), interval snaps to min (2s). When caches match (idle), interval backs off exponentially to max (60s). When records are sent but not applied (peer already had them), interval returns to base (10s).

Stateless — reads only GossipResult fields, no direct state store dependency.

func NewAdaptiveInterval

func NewAdaptiveInterval(base, min, max time.Duration) *AdaptiveInterval

NewAdaptiveInterval creates an adaptive interval with the given bounds. base is the normal interval, min is the fastest (during convergence), max is the slowest (when idle).

func (*AdaptiveInterval) ApplyBackpressure

func (a *AdaptiveInterval) ApplyBackpressure(overloaded bool) time.Duration

ApplyBackpressure grows the adaptive interval when global backpressure is active. Called by the engine between exchanges so over-subscribed nodes slow their G1 cycles until load clears. Returns the jittered sample around the backpressured base for the same reason Next() does: prevent an overloaded fleet from all backing off in lockstep.

func (*AdaptiveInterval) Current

func (a *AdaptiveInterval) Current() time.Duration

Current returns the most recently computed interval.

func (*AdaptiveInterval) Next

func (a *AdaptiveInterval) Next(result GossipResult) time.Duration

Next returns the interval for the next exchange based on the last result. The returned interval includes ±JitterFraction randomisation so peers that came up together don't produce synchronised gossip bursts. The stored a.current is the UNJITTERED value; jitter is applied per-call so every call to Next() returns a fresh jittered sample around it.

func (*AdaptiveInterval) Stats added in v0.0.3

func (a *AdaptiveInterval) Stats() AdaptiveStats

Stats returns a snapshot for metrics / diagnostics.
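
Putting the bounds together, a hedged sketch of a timer-driven exchange loop; doExchange is a hypothetical single G1 exchange that yields a GossipResult:

ai := whisper.NewAdaptiveInterval(10*time.Second, 2*time.Second, 60*time.Second)
for ctx.Err() == nil {
    result, err := doExchange(ctx, conn) // hypothetical G1 exchange
    if err != nil {
        break
    }
    // Next snaps to min on applied records, backs off when idle,
    // and returns a fresh jittered sample on every call.
    select {
    case <-ctx.Done():
    case <-time.After(ai.Next(result)):
    }
}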

type AdaptivePolicy added in v0.0.10

type AdaptivePolicy struct {
	// contains filtered or unexported fields
}

AdaptivePolicy is a NetworkPolicy that synthesises a profile from multiple input signals — LinkType, RTT trends, loss rate, battery state, time-of-day. Used by agents on roaming devices where a fixed-profile policy would be wrong (constant cellular profile even on wifi, or vice versa).

Profile transitions are smooth gradients for bandwidth values (30-sec ramp on direction changes) and immediate for cadence values (next gossip in 10 min vs 30 min has no meaningful gradient). Battery-low and predictive transitions short-circuit directly to conservative profiles regardless of LinkType.

func NewAdaptivePolicy added in v0.0.10

func NewAdaptivePolicy() *AdaptivePolicy

NewAdaptivePolicy returns an AdaptivePolicy seeded with the default profile. The synthesizer recomputes whenever UpdateSignals is called; consumers wire UpdateSignals to platform monitors (NetworkMonitor.onChanged, battery API, RTT histogram).

func (*AdaptivePolicy) PeerOverride added in v0.0.10

func (p *AdaptivePolicy) PeerOverride(peer string) (NetworkProfile, bool)

PeerOverride returns a per-peer override or false.

func (*AdaptivePolicy) Profile added in v0.0.10

func (p *AdaptivePolicy) Profile() NetworkProfile

Profile returns the synthesised profile. Bandwidth/pacing fields are ramped between current and target on transition.

func (*AdaptivePolicy) SetEventEmitter added in v0.0.10

func (p *AdaptivePolicy) SetEventEmitter(fn func(GossipEvent))

SetEventEmitter wires a callback that fires on every committed profile transition. Pass nil to disable. Used by consumers to surface link/battery-driven behavior changes in mesh-debug, OTLP, or operator dashboards without polling Profile() on a clock.

func (*AdaptivePolicy) SetPeerOverride added in v0.0.10

func (p *AdaptivePolicy) SetPeerOverride(peer string, profile NetworkProfile)

SetPeerOverride installs a per-peer override.

func (*AdaptivePolicy) Signals added in v0.0.10

func (p *AdaptivePolicy) Signals() NetworkSignals

Signals returns the most recent signal vector.

func (*AdaptivePolicy) Subscribe added in v0.0.10

func (p *AdaptivePolicy) Subscribe(fn func(NetworkProfile))

Subscribe registers a callback fired on every profile change.

func (*AdaptivePolicy) UpdateSignals added in v0.0.10

func (p *AdaptivePolicy) UpdateSignals(s NetworkSignals)

UpdateSignals re-runs the synthesizer with new signals. Triggers a profile transition if any change crosses a hysteresis threshold. Safe for concurrent use; subscribers are called outside the lock.
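
A sketch of the wiring the constructor doc suggests; onNetworkChanged is a hypothetical platform-monitor hook, and the NetworkSignals fields aren't reproduced here:

policy := whisper.NewAdaptivePolicy()
policy.Subscribe(func(p whisper.NetworkProfile) {
    log.Printf("gossip profile transition: %+v", p)
})
onNetworkChanged(func(s whisper.NetworkSignals) { // hypothetical monitor hook
    policy.UpdateSignals(s) // recompute; subscribers fire on threshold crossings
})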

type AdaptiveStats added in v0.0.3

type AdaptiveStats struct {
	Current       time.Duration // most recently computed interval
	Base          time.Duration
	Min           time.Duration
	Max           time.Duration
	IdleExchanges int    // consecutive exchanges with matching fingerprint
	Fingerprint   uint64 // last peer-cache fingerprint seen
}

AdaptiveStats is a point-in-time snapshot of adaptive-interval state.

type BackpressureMonitor

type BackpressureMonitor struct {
	// contains filtered or unexported fields
}

BackpressureMonitor tracks gossip processing load for a node. When the number of concurrent (pending) gossip exchanges exceeds the threshold, the node sets BackpressureSignal=true in its outbound ExchangeMeta, telling peers to slow down.

func NewBackpressureMonitor

func NewBackpressureMonitor() *BackpressureMonitor

NewBackpressureMonitor creates a monitor with the default threshold (5).

func NewBackpressureMonitorWithThreshold

func NewBackpressureMonitorWithThreshold(threshold int) *BackpressureMonitor

NewBackpressureMonitorWithThreshold creates a monitor with a custom threshold.

func (*BackpressureMonitor) Enter

func (bp *BackpressureMonitor) Enter() bool

Enter increments the pending exchange count. Returns true if backpressure should be signaled (pending count exceeds threshold).

func (*BackpressureMonitor) Exit

func (bp *BackpressureMonitor) Exit()

Exit decrements the pending exchange count.

func (*BackpressureMonitor) IsOverloaded

func (bp *BackpressureMonitor) IsOverloaded() bool

IsOverloaded returns true if the node should signal backpressure.

func (*BackpressureMonitor) Pending

func (bp *BackpressureMonitor) Pending() int

Pending returns the current number of pending exchanges.
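
A hedged sketch of the Enter/Exit pairing around one exchange, propagating the signal through ExchangeMeta as described above (bp is the node's BackpressureMonitor):

signal := bp.Enter() // true once pending exceeds the threshold
defer bp.Exit()

meta := &whisper.ExchangeMeta{
    BackpressureSignal: signal, // peers back off 30s -> 60s -> 120s while set
}
// ... perform the exchange with meta ...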

type CPUGate added in v0.0.4

type CPUGate struct {
	// contains filtered or unexported fields
}

CPUGate provides a defer-or-drop decision based on process CPU load. It is queried before a gossip publish or response is enqueued; when load is above the configured threshold, the caller receives ErrBackpressure (or similar) and can shed work instead of piling up queue depth.

Two signal sources, max-merged so either one can trip the gate:

  1. Tick-skew (zero-dep, always-on). A background goroutine wakes on a 50 ms ticker and records the actual elapsed time since the last wake. When the Go runtime scheduler is stressed (CPU saturation, GC thrash, huge goroutine count), elapsed balloons well above the nominal interval. Observed skew / expected interval gives a ratio; max over a rolling window is the load estimate.

  2. Pluggable reader (optional). A consumer-supplied readFn returning a load ratio in [0, 1] — typically from a gopsutil-based probe or a /sys/fs/cgroup/cpu.stat reader. Added via SetReader so callers can plug in higher-fidelity signals when they have them, without making the whisper package itself depend on gopsutil.

The gate's reported load is max(tickSkewLoad, readerLoad). Allow() returns true when load < threshold, false otherwise.

func NewCPUGate added in v0.0.4

func NewCPUGate(cfg CPUGateConfig) *CPUGate

NewCPUGate starts a gate with the given config and spawns the tick goroutine. Call Close to stop it.

func (*CPUGate) Allow added in v0.0.4

func (g *CPUGate) Allow() bool

Allow reports whether new work should be admitted. Returns true when current Load() is below the threshold.

func (*CPUGate) Close added in v0.0.4

func (g *CPUGate) Close()

Close stops the tick goroutine. Idempotent.

func (*CPUGate) Load added in v0.0.4

func (g *CPUGate) Load() float64

Load returns the current max-merged load estimate in [0, 1].

func (*CPUGate) SetReader added in v0.0.4

func (g *CPUGate) SetReader(fn func() float64)

SetReader installs a pluggable load-reader. Pass nil to clear. The reader runs on Allow()'s goroutine — keep it cheap; a slow reader blocks the caller.

func (*CPUGate) SetThreshold added in v0.0.4

func (g *CPUGate) SetThreshold(t float64)

SetThreshold adjusts the gate threshold at runtime. Useful for operator tuning via a diagnostics endpoint. Values are clamped to [0, 1]; 0 disables the gate (always allow); 1 forces it on (never allow above zero load).

func (*CPUGate) Threshold added in v0.0.4

func (g *CPUGate) Threshold() float64

Threshold returns the current threshold.

type CPUGateConfig added in v0.0.4

type CPUGateConfig struct {
	Threshold    float64       // 0..1, fraction above which Allow returns false. Default 0.8.
	TickInterval time.Duration // how often the tick-skew probe fires. Default 50 ms.
	WindowSize   int           // rolling window of skew samples. Default 10 (→ 500 ms at 50 ms ticks).
}

CPUGateConfig controls construction. Zero-value → sensible defaults (threshold 0.8, 50 ms tick, 500 ms rolling window).
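
A minimal usage sketch with zero-value defaults, shedding work the way the CPUGate doc describes:

gate := whisper.NewCPUGate(whisper.CPUGateConfig{}) // threshold 0.8, 50 ms tick
defer gate.Close()

if !gate.Allow() {
    return whisper.ErrBackpressure // shed instead of growing queue depth
}
// ... enqueue the gossip publish/response ...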

type ConnectionOffer

type ConnectionOffer struct {
	// WantDirectConnect signals that the sender wants a direct connection.
	WantDirectConnect bool `json:"want_direct,omitempty"`

	// OfferedTransports lists transports the sender can accept (e.g., "noise-udp", "websocket").
	// Empty means "any available".
	OfferedTransports []string `json:"offered_transports,omitempty"`

	// SenderNodeID is the offering node's identity. Populated by the exchange layer,
	// not set by the caller.
	SenderNodeID string `json:"-"`

	// Timestamp when the offer was created. Used for deduplication.
	Timestamp time.Time `json:"offer_ts,omitempty"`
}

ConnectionOffer represents a request from a peer to establish a direct connection. Included in gossip exchange metadata when a node wants to upgrade from indirect gossip to a direct transport connection.

type DebugLogger

type DebugLogger struct {
	// contains filtered or unexported fields
}

DebugLogger provides conditional debug logging for Whisper subsystems. Enabled by setting the DEBUG environment variable to a comma-separated list of subsystem names (e.g., "whisper.gossip,whisper.rumor"). Use "whisper.*" to enable all Whisper debug output.

func (*DebugLogger) Printf

func (d *DebugLogger) Printf(format string, args ...interface{})

Printf logs a formatted message if this subsystem is enabled.

type DecodedExchange added in v0.0.8

type DecodedExchange struct {
	Records     [][]byte
	Meta        *ExchangeMeta
	SeenNodeIDs []string
}

DecodedExchange is the G1Codec's decoded view of an inbound G1 frame body. SeenNodeIDs is optional — codecs that can't cheaply extract identity leave it nil and liveness falls back to coarser signals.

type DeltaPeerStats

type DeltaPeerStats struct {
	Watermark        time.Time
	ExchangeCount    int
	NextFullSync     int // exchanges until next forced full sync (per-peer adaptive)
	AdaptiveInterval int // current full-sync interval for this peer
	FullSyncCount    int // total full-syncs performed for this peer
	AppliedOnFull    int // full-syncs that applied > 0 records
}

DeltaPeerStats is debug info for a single peer's delta state.

type DeltaPersistence added in v0.0.10

type DeltaPersistence interface {
	// Save persists the snapshot to durable storage. Errors are logged
	// but don't stop the tracker — a failed save means the next
	// restart costs more, not that the running process breaks.
	Save([]PersistedPeerState) error

	// Load returns the most recent snapshot. An empty slice + nil err
	// means "nothing persisted yet" (first-ever boot). A non-nil error
	// means the backend is broken; the tracker logs and continues
	// with empty state (degrades to today's behavior).
	Load() ([]PersistedPeerState, error)
}

DeltaPersistence is the storage backend the consumer supplies for surviving DeltaTracker state across process restarts. Without this, every fly redeploy wipes per-peer watermarks and the next gossip exchange to every peer is a full snapshot — the convergence-burst pattern that drains stream credit windows.

Implementations:

  • Library: JSON file under <dataDir>/.mesh/watermarks.json with atomic write (temp + rename).
  • Agent: row in the existing SQLite keystore (encrypts at rest when ColumnEncryptor is wired).

Save / Load run on the DeltaTracker's lifecycle ticks (every 30 sec by default plus on shutdown); both are called with the tracker's mutex NOT held so backends can do whatever IO they need.
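
A hedged sketch of the JSON-file backend described above (atomic temp + rename), assuming encoding/json, errors, and os; the path and permissions are illustrative and PersistedPeerState is treated opaquely:

type filePersistence struct{ path string }

func (f filePersistence) Save(states []whisper.PersistedPeerState) error {
    b, err := json.Marshal(states)
    if err != nil {
        return err
    }
    tmp := f.path + ".tmp"
    if err := os.WriteFile(tmp, b, 0o600); err != nil {
        return err
    }
    return os.Rename(tmp, f.path) // atomic replace on POSIX filesystems
}

func (f filePersistence) Load() ([]whisper.PersistedPeerState, error) {
    b, err := os.ReadFile(f.path)
    if errors.Is(err, os.ErrNotExist) {
        return nil, nil // first-ever boot: nothing persisted yet
    }
    if err != nil {
        return nil, err
    }
    var states []whisper.PersistedPeerState
    return states, json.Unmarshal(b, &states)
}

Such a backend would be wired via DeltaTracker.AttachPersistence (below) with the 30-second default save interval.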

type DeltaSyncMeta

type DeltaSyncMeta struct {
	IsFullSync  bool      `json:"full"`         // true if this is a full dump
	Watermark   time.Time `json:"wm,omitempty"` // sender's watermark for this peer
	RecordCount int       `json:"count"`        // number of records in this exchange
}

DeltaSyncMeta carries delta sync metadata in the gossip envelope.

type DeltaTracker

type DeltaTracker struct {
	// contains filtered or unexported fields
}

DeltaTracker maintains per-peer sync watermarks for delta gossip. Each peer connection tracks the highest record timestamp successfully exchanged. On each gossip cycle, only records newer than the watermark are sent. Every Nth exchange triggers a full sync for consistency.

The full-sync cadence is per-peer and adaptive: noisy (lossy / high-disagreement) peers get full syncs more often; quiet (consistent) peers get them less often. The baseline is fullSyncInterval; the actual threshold is scaled per peer via peerScale ∈ [0.5, 2.0] based on how many full-syncs have actually APPLIED records recently (a signal that the delta path alone is missing state).

Callers are expected to call Reset(peerNodeID) from their peer-disconnect hook so stale state doesn't accumulate across reconnects. A background sweeper (started by StartSweeper) evicts entries older than deltaMaxPeerAge every deltaSweepInterval as a safety net.

func NewDeltaTracker

func NewDeltaTracker() *DeltaTracker

NewDeltaTracker creates a tracker with the default full sync interval (10).

func NewDeltaTrackerWithInterval

func NewDeltaTrackerWithInterval(fullSyncInterval int) *DeltaTracker

NewDeltaTrackerWithInterval creates a tracker with a custom full sync interval.

func (*DeltaTracker) AttachPersistence added in v0.0.10

func (dt *DeltaTracker) AttachPersistence(ctx context.Context, p DeltaPersistence, saveInterval time.Duration, maxAge time.Duration)

AttachPersistence wires a persistence backend to the tracker. On attach, the backend's Load is invoked synchronously; persisted entries with watermarks older than maxAge are discarded (they refer to records the peer has likely evicted, so the watermark would suppress a needed full sync).

The save loop runs on saveInterval ticks until ctx is cancelled. On ctx cancellation a final Save fires so graceful shutdowns preserve state. Crash-stops lose at most saveInterval of progress — acceptable for an optimisation layer (worst case is one full sync per crashed peer pair, which Phase 0a's credit-leak fix tolerates).

Idempotent — multiple calls overwrite the persistence reference but only one save loop runs (guarded by sweeperOnce-like state).

func (*DeltaTracker) RecordFullSyncResult added in v0.0.3

func (dt *DeltaTracker) RecordFullSyncResult(peerID string, appliedRecords int)

RecordFullSyncResult is called by the engine after a full-sync exchange completes. appliedRecords is the number of records the peer's cache accepted from our full dump. Drives the adaptive scale in peerFullSyncIntervalLocked.

func (*DeltaTracker) RecordWatermark

func (dt *DeltaTracker) RecordWatermark(peerID string) time.Time

RecordWatermark returns the last successful sync timestamp for a peer. Returns zero time if no prior exchange exists (triggers full sync).

func (*DeltaTracker) Reset

func (dt *DeltaTracker) Reset(peerID string)

Reset clears the state for a peer. Callers should invoke this from their peer-disconnect hook — the tracker doesn't have its own view of connection lifecycle.

func (*DeltaTracker) ShouldFullSync

func (dt *DeltaTracker) ShouldFullSync(peerID string) bool

ShouldFullSync returns true if this peer should receive a full cache dump. True when:

  • The peer has no watermark (first exchange)
  • The exchange count is a multiple of the peer's ADAPTIVE interval

Adaptive interval = baseline × peerScale, where peerScale is derived from the ratio of applied-on-full to total full-syncs for this peer:

  • high ratio (≥0.5): delta is missing state on this peer — scale 0.5 (full-sync at half the baseline interval, i.e. more often)
  • mid ratio (0.1-0.5): baseline scale 1.0
  • low ratio (<0.1): delta is keeping up fine — scale 2.0 (less often)

Scale factors are clamped to [0.5, 2.0] so the adaptive interval stays in [5, 20] with the default 10-baseline — never far from the static default but responsive to per-peer reality.

func (*DeltaTracker) StartSweeper added in v0.0.5

func (dt *DeltaTracker) StartSweeper()

StartSweeper launches a background goroutine that periodically evicts entries older than deltaMaxPeerAge. Safe to call multiple times — only the first call starts the goroutine. The sweeper is a safety net; the primary eviction path is caller-driven Reset on peer disconnect.

func (*DeltaTracker) Stats

func (dt *DeltaTracker) Stats() map[string]DeltaPeerStats

Stats returns debug information about delta tracking for all peers.

func (*DeltaTracker) StopSweeper added in v0.0.5

func (dt *DeltaTracker) StopSweeper()

StopSweeper signals the background sweeper to exit. Idempotent — safe to call before StartSweeper or multiple times.

func (*DeltaTracker) UpdateWatermark

func (dt *DeltaTracker) UpdateWatermark(peerID string, timestamp time.Time)

UpdateWatermark sets the watermark for a peer after a successful exchange.

type Engine

type Engine struct {

	// ExchangeFunc is called by the engine to perform a single G1 exchange.
	// Consumers provide this — it encapsulates the wire-format-specific
	// serialization (protobuf, JSON, etc.) and the actual send/receive over conn.
	// Returns the exchange result for adaptive interval tuning.
	ExchangeFunc func(ctx context.Context, conn net.Conn, meta *ExchangeMeta) (GossipResult, error)
	// contains filtered or unexported fields
}

Engine is the central gossip orchestrator. It manages topic registration, coordinates exchange timing via AdaptiveInterval, routes rumors via Hypercube, and provides the Publish/Subscribe API for consumers.

The exchange loop (G1/G2/G3) runs over a net.Conn provided by the consumer. Consumers wire the Engine to their transport layer (e.g., Aether streams via adapter.StreamConn).
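
A hedged end-to-end sketch of constructing and running an engine; exchangeOnce is a hypothetical consumer ExchangeFunc and conn is a consumer-provided net.Conn (an Aether stream adapter in typical deployments):

engine := whisper.NewEngine(
    whisper.WithAdaptive(whisper.NewAdaptiveInterval(10*time.Second, 2*time.Second, 60*time.Second)),
    whisper.WithDelta(whisper.NewDeltaTracker()),
    whisper.WithBackpressure(whisper.NewBackpressureMonitor()),
    whisper.WithExchangeFunc(exchangeOnce), // hypothetical wire-format exchange
)

err := engine.Run(ctx, conn, peerNodeID, func(res whisper.GossipResult) {
    // per-exchange hook: metrics, liveness bookkeeping, ...
})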

func NewEngine

func NewEngine(opts ...EngineOption) *Engine

NewEngine creates a gossip engine with the given options.

func (*Engine) AllowPeer

func (e *Engine) AllowPeer(peerID string) bool

AllowPeer is a convenience shortcut that returns true if the engine's rate limiter permits a message to the named peer right now. Fanout paths call this before writing to skip throttled peers without tearing down the publish to other peers.

func (*Engine) Backpressure

func (e *Engine) Backpressure() *BackpressureMonitor

Backpressure returns the backpressure monitor.

func (*Engine) Delta

func (e *Engine) Delta() *DeltaTracker

Delta returns the delta tracker.

func (*Engine) EventBus added in v0.0.10

func (e *Engine) EventBus() *eventBus

EventBus returns the engine's event bus, lazily creating it. Used by sibling components (rumor pusher, reconcile driver, snapshot driver) that emit events outside the responder loop.

func (*Engine) Hypercube

func (e *Engine) Hypercube() *Hypercube

Hypercube returns the hypercube router (may be nil).

func (*Engine) PEXManager

func (e *Engine) PEXManager() *PEXManager

PEXManager returns the PEX manager attached to this engine, or nil if PEX was not enabled. Consumers (e.g. Minerva's discovery layer) use this to register PEXDiscoverer hooks without reaching into engine internals.

func (*Engine) Publish

func (e *Engine) Publish(topic string, payload []byte) error

Publish sends a payload to all subscribers of a BroadcastOnly topic. For StatefulMerge topics, use the StateStore.Apply path instead.

Returns ErrBackpressure if the engine is overloaded. Low-priority topics (Priority <= 0) are rejected first when the global egress bucket is saturated; high-priority topics are allowed through. Callers decide whether to retry, drop, or bubble up.

func (*Engine) Query

func (e *Engine) Query(ctx context.Context, topic string, payload []byte) ([][]byte, error)

Query sends a request to matching peers and collects responses. For RequestResponse topics only.

func (*Engine) RTT

func (e *Engine) RTT() *NetworkRTTMeasurer

RTT returns the RTT measurer.

func (*Engine) RateLimiter

func (e *Engine) RateLimiter() *RateLimiter

RateLimiter returns the engine rate limiter. Peers and fanout loops use this to enforce per-peer token buckets before writing messages.

func (*Engine) RegisterFrameKind added in v0.0.6

func (e *Engine) RegisterFrameKind(magic uint16, handler FrameHandler) error

RegisterFrameKind installs a custom FrameHandler for a specific 2-byte magic. Returns an error if:

  • the magic is DigestMagic or RumorMagic (built-ins that the consumer must not override — use the With* options to customise their behaviour instead);
  • the magic is already registered (double-register is almost always a consumer wiring bug);
  • handler is nil.

GossipMagic (G1) is intentionally allowed because the G1 wire format depends on the consumer's record codec — no built-in G1 handler exists, so the consumer MUST register one for G1 to work.

Call before RunResponder — concurrent registration during responder loop execution is a race.
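
A hedged sketch of registering a custom frame kind; the magic value and readCustomBody are invented for illustration:

const customMagic uint16 = 0x5857 // hypothetical example magic

err := engine.RegisterFrameKind(customMagic, whisper.FrameHandlerFunc(
    func(ctx context.Context, conn net.Conn, peerNodeID string) whisper.FrameAction {
        // magic already consumed — read this frame's body from conn
        if err := readCustomBody(conn); err != nil { // hypothetical body reader
            return whisper.FrameFail // stream state unknown: tear down
        }
        return whisper.FrameContinue // keep the responder loop reading
    }))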

func (*Engine) RegisterTopic

func (e *Engine) RegisterTopic(name string, config TopicConfig) error

RegisterTopic registers a topic with the engine's registry.

func (*Engine) Registry

func (e *Engine) Registry() *TopicRegistry

Registry returns the topic registry for external access.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, conn net.Conn, peerNodeID string, onExchange func(GossipResult)) error

Run starts the gossip exchange loop on the given connection. Blocks until ctx is cancelled, the connection closes, or Stop is called. The ExchangeFunc must be set before calling Run.

func (*Engine) RunResponder added in v0.0.6

func (e *Engine) RunResponder(ctx context.Context, conn net.Conn, peerNodeID string) error

RunResponder runs the responder-side loop on conn: read a 2-byte magic, dispatch to the registered FrameHandler, repeat until the context cancels or a handler returns FrameReturn/FrameFail.

Unlike Run (which is timer-driven and initiator-only), RunResponder is purely reactive — it blocks reading frames until the peer sends one. Consumers typically run ONE side as initiator (via Run) and the OTHER side as responder (via RunResponder) over the same transport; some protocols (library LAD gossip) run responder on both sides and accept either-side initiation.

The built-in G1/G2/G3 handlers are installed on first call so the standard gossip protocol works without custom RegisterFrameKind calls. Consumers that want to extend the protocol register before calling RunResponder.
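
A brief pairing sketch — each side of the transport runs one of the two loops (protocols that accept either-side initiation run RunResponder on both ends):

// Initiator end: timer-driven exchanges.
go func() { _ = engine.Run(ctx, conn, peerNodeID, onExchange) }()

// Responder end (on the remote peer): purely reactive frame dispatch.
go func() { _ = engine.RunResponder(ctx, conn, peerNodeID) }()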

func (*Engine) Stats added in v0.0.3

func (e *Engine) Stats() EngineStats

Stats returns a snapshot of aggregate engine state. Cheap — one atomic read per subsystem + a few locks. Intended for diagnostics endpoints (e.g. /api/monitoring/mesh-debug) that want a single call instead of pulling from each subsystem individually.

func (*Engine) Stop

func (e *Engine) Stop()

Stop signals the engine to stop all exchange loops.

func (*Engine) Subscribe

func (e *Engine) Subscribe(topic string, sub Subscriber)

Subscribe registers a subscriber for broadcast messages on a topic.

func (*Engine) SubscribeEvents added in v0.0.6

func (e *Engine) SubscribeEvents(ch chan<- GossipEvent)

SubscribeEvents registers a non-blocking subscriber for gossip events. Unbuffered or slow channels drop events. Safe for concurrent use.
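
A small subscription sketch — the channel is buffered because slow or unbuffered channels drop events:

events := make(chan whisper.GossipEvent, 64)
engine.SubscribeEvents(events)
go func() {
    for ev := range events {
        if ev.Kind == whisper.EventFrameError {
            log.Printf("frame error from %s: %s", ev.PeerNodeID, ev.Err)
        }
    }
}()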

type EngineOption

type EngineOption func(*Engine)

EngineOption configures the engine at creation time.

func WithAdaptive

func WithAdaptive(a *AdaptiveInterval) EngineOption

WithAdaptive sets the adaptive interval tuner.

func WithBackpressure

func WithBackpressure(bp *BackpressureMonitor) EngineOption

WithBackpressure enables backpressure signaling.

func WithBufferPool added in v0.0.6

func WithBufferPool(p *sync.Pool) EngineOption

WithBufferPool shares a single buffer pool across gossip inbound reads. Useful when the consumer already owns a pool tuned for its typical payload size distribution.

func WithDelta

func WithDelta(dt *DeltaTracker) EngineOption

WithDelta enables delta sync with per-peer watermarks.

func WithExchangeDeadline added in v0.0.6

func WithExchangeDeadline(d time.Duration) EngineOption

WithExchangeDeadline caps each G1 exchange at the given duration.

func WithExchangeFunc

func WithExchangeFunc(fn func(ctx context.Context, conn net.Conn, meta *ExchangeMeta) (GossipResult, error)) EngineOption

WithExchangeFunc sets the wire-format-specific exchange function.

func WithFatalErrorClassifier added in v0.0.6

func WithFatalErrorClassifier(fn func(error) bool) EngineOption

WithFatalErrorClassifier replaces the default terminal-error check. The classifier returns true when the error means "session gone, exit loop" and false when "transient, keep trying."

func WithFingerprintProvider added in v0.0.6

func WithFingerprintProvider(fp FingerprintProvider) EngineOption

WithFingerprintProvider wires the G2 digest probe handler's state source. Without this, G2 probes are dropped and G1 always runs.

func WithFirstFrameGrace added in v0.0.6

func WithFirstFrameGrace(d time.Duration) EngineOption

WithFirstFrameGrace sets the grace period for the very first inbound frame on a responder connection. See responderConfig.firstFrameGrace.

func WithG1Codec added in v0.0.8

func WithG1Codec(c G1Codec) EngineOption

WithG1Codec wires the record serialisation used by the native G1 handler. Required alongside WithG1Store — without a codec the handler can't translate between StateStore records ([]byte) and wire bytes.

func WithG1ExchangeObserver added in v0.0.8

func WithG1ExchangeObserver(obs G1ExchangeObserver) EngineOption

WithG1ExchangeObserver registers a callback fired after every successful G1 exchange on this engine. Observer invocations are synchronous on the responder goroutine — keep them quick or dispatch to a worker.

func WithG1Store added in v0.0.8

func WithG1Store(s StateStore) EngineOption

WithG1Store wires the state store whose records drive G1 exchanges. Required for the native G1 handler to activate.

func WithGlobalEgressLimit

func WithGlobalEgressLimit(bytesPerSec int) EngineOption

WithGlobalEgressLimit sets the total outbound gossip throughput cap (bytes/sec). Exceeded traffic is prioritised by TopicConfig.Priority.

func WithHypercube

func WithHypercube(h *Hypercube) EngineOption

WithHypercube enables structured rumor routing.

func WithMaxPayloadSize added in v0.0.6

func WithMaxPayloadSize(n uint32) EngineOption

WithMaxPayloadSize rejects inbound frames declaring a length above the threshold. Prevents malicious or malformed peers from forcing unbounded allocations.

func WithMetaProvider added in v0.0.6

func WithMetaProvider(fn func() *ExchangeMeta) EngineOption

WithMetaProvider supplies the outbound ExchangeMeta builder for G1 handlers. Called once per exchange tick; nil = send an empty meta.

func WithNetworkPolicy added in v0.0.10

func WithNetworkPolicy(p NetworkPolicy) EngineOption

WithNetworkPolicy wires the engine's policy-aware components (rumor retry timing, gossip cadence ceiling, reconciliation pacing) to consult a shared NetworkPolicy. Without this option every component falls back to its hard-coded defaults — same behavior as today.

func WithOverloadQueueThreshold

func WithOverloadQueueThreshold(n int) EngineOption

WithOverloadQueueThreshold sets the outbound queue depth above which Publish returns ErrBackpressure.

func WithPEX

func WithPEX(pex *PEXManager) EngineOption

WithPEX enables peer exchange.

func WithPeerRateLimit

func WithPeerRateLimit(per float64, burst int) EngineOption

WithPeerRateLimit configures the per-peer token bucket. per is messages per second (steady state), burst is the burst capacity. If the engine already has a rate limiter, these values are applied on the next NewEngine boot only — create the limiter via NewRateLimiter to reuse across restarts.

func WithRTT

func WithRTT(rtt *NetworkRTTMeasurer) EngineOption

WithRTT enables RTT measurement.

func WithRateLimiter

func WithRateLimiter(rl *RateLimiter) EngineOption

WithRateLimiter installs a fully-configured engine rate limiter. For most use cases, prefer WithPeerRateLimit and WithGlobalEgressLimit.

func WithReconcile added in v0.0.10

func WithReconcile(d *ReconcileDriver) EngineOption

WithReconcile wires a ReconcileDriver into the engine so the G4 frame handler dispatches inbound TableFrames into the driver's RunResponderRound. Without this option the engine's responder loop returns "unknown frame magic" on G4 frames — peers must fall back to G1 + DeltaTracker.

func WithRumor

func WithRumor(r *RumorPusher) EngineOption

WithRumor enables rumor-mongering (G3 immediate push).

func WithRumorApplyFn added in v0.0.6

func WithRumorApplyFn(fn func([]byte) error) EngineOption

WithRumorApplyFn sets the record-apply callback for the default G3 handler. Consumers typically route by magic-byte prefix or unmarshal as their record type and apply to the local store.

func WithRumorDedupKeyFunc added in v0.0.6

func WithRumorDedupKeyFunc(fn RumorIDFunc) EngineOption

WithRumorDedupKeyFunc sets the dedup key function for the default G3 handler. Defaults to a sha256 of the payload when nil.

func WithSeenIDTracking added in v0.0.6

func WithSeenIDTracking(enabled bool) EngineOption

WithSeenIDTracking enables/disables NodeID collection on G1 applies (for liveness tracking).

func WithSnapshot added in v0.0.10

func WithSnapshot(d *SnapshotDriver) EngineOption

WithSnapshot wires a SnapshotDriver into the engine so the G6 frame handler dispatches inbound ManifestRequest / ShardRequest frames into the driver. Without this option G6 frames return "unknown magic" and cold-start falls back to G1 full sync.

type EngineStats added in v0.0.3

type EngineStats struct {
	Topics          int             // registered topic count
	Subscribers     int             // total subscribers across topics
	Adaptive        *AdaptiveStats  // current interval + idle counter
	Backpressure    bool            // currently overloaded?
	PendingExchange int             // current pendingExchanges on backpressure monitor
	Rumor           *RumorStats     // rumor-push effectiveness
	RateLimiter     *RateLimitStats // queue depth, threshold, peer count
	RTT             time.Duration   // recent avg RTT from NetworkRTTMeasurer (0 if unmeasured)
	HypercubeValid  bool            // hypercube has usable state
	HypercubeDim    int             // current hypercube dimension
}

EngineStats aggregates observable state from all attached subsystems. Nil fields correspond to subsystems not wired on this engine.

type ExchangeMeta

type ExchangeMeta struct {
	// Offer signals desire for a direct connection.
	Offer *ConnectionOffer `json:"conn_offer,omitempty"`

	// ConnectionCounts is the gossip-propagated connection map.
	// Maps nodeID -> current inbound+outbound connection count.
	ConnectionCounts map[string]int `json:"conn_counts,omitempty"`

	// PEXEntries carries signed peer advertisements piggybacked on gossip.
	// Max PEXMaxEntriesPerExchange entries per exchange, rate-limited per peer.
	PEXEntries []PEXEntry `json:"pex_entries,omitempty"`

	// CacheFingerprint is an order-independent XOR hash of all cache keys.
	// When two peers exchange fingerprints and they differ, the next exchange
	// should be a full sync (not delta) to reconcile the divergence.
	CacheFingerprint uint64 `json:"cache_fp,omitempty"`

	// BackpressureSignal indicates the sender is overwhelmed and cannot process
	// gossip exchanges fast enough. When true, receiving peers should
	// apply exponential backoff to their gossip interval for this peer:
	//   - Initial backoff: 30s
	//   - Doubles each consecutive signal: 30s -> 60s -> 120s (capped)
	//   - Resets to 0 when the peer stops signaling backpressure
	// The signal is set by BackpressureMonitor when pendingExchanges exceeds
	// the threshold (default 5). The field is omitempty so peers that never
	// set it are treated as healthy (no backoff applied).
	BackpressureSignal bool `json:"backpressure,omitempty"`
}

ExchangeMeta extends gossip exchange metadata with connection signaling fields and connection map data. This is the wire format addition to each gossip exchange.

type FingerprintProvider added in v0.0.6

type FingerprintProvider interface {
	Fingerprint() uint64
}

FingerprintProvider is implemented by any StateStore whose entire state reduces to a single 64-bit fingerprint (fast equality probe before a full G1 exchange). When a consumer's store satisfies this interface AND the consumer registers the default G2 handler, peers can skip the full G1 exchange when fingerprints match.
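
A hedged sketch of satisfying the interface with the order-independent XOR-of-keys scheme ExchangeMeta.CacheFingerprint describes; the store shape and hashKey helper are hypothetical:

func (c *recordCache) Fingerprint() uint64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    var fp uint64
    for key := range c.records {
        fp ^= hashKey(key) // hypothetical 64-bit key hash; XOR ignores order
    }
    return fp
}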

type FixedProfilePolicy added in v0.0.10

type FixedProfilePolicy struct {
	// contains filtered or unexported fields
}

FixedProfilePolicy is a NetworkPolicy that always returns the same profile. Used by library mesh nodes on stable wired links and by any consumer that wants explicit control over cadence without signal-driven adaptation. Thread-safe.

func NewFixedProfilePolicy added in v0.0.10

func NewFixedProfilePolicy(p NetworkProfile) *FixedProfilePolicy

NewFixedProfilePolicy returns a policy backed by the given profile. Pass DefaultProfile() for the standard wired-network behavior.

func (*FixedProfilePolicy) PeerOverride added in v0.0.10

func (p *FixedProfilePolicy) PeerOverride(peer string) (NetworkProfile, bool)

PeerOverride returns the override for a specific peer, or false if none is configured. Allows individual peers to deviate from the default profile (e.g. tighter retry timings for a known-flaky peer).

func (*FixedProfilePolicy) Profile added in v0.0.10

func (p *FixedProfilePolicy) Profile() NetworkProfile

Profile returns the current default profile.

func (*FixedProfilePolicy) SetPeerOverride added in v0.0.10

func (p *FixedProfilePolicy) SetPeerOverride(peer string, profile NetworkProfile)

SetPeerOverride installs a per-peer profile override. Empty peer string clears all overrides.

func (*FixedProfilePolicy) SetProfile added in v0.0.10

func (p *FixedProfilePolicy) SetProfile(profile NetworkProfile)

SetProfile updates the default profile and notifies subscribers. Used by signal-driven adapters layered on top of a FixedProfilePolicy (the agent's adapter to wire.NetworkMonitor).

func (*FixedProfilePolicy) Signals added in v0.0.10

func (p *FixedProfilePolicy) Signals() NetworkSignals

Signals returns an empty signal set — fixed-profile policies don't observe network conditions.

func (*FixedProfilePolicy) Subscribe added in v0.0.10

func (p *FixedProfilePolicy) Subscribe(fn func(NetworkProfile))

Subscribe registers a callback fired on every profile change. Callbacks run synchronously in SetProfile's caller goroutine — slow callbacks delay subsequent profile transitions.

type FrameAction added in v0.0.6

type FrameAction int

FrameAction is returned by a FrameHandler to drive the responder loop's next step. The handler has already consumed whatever body follows the frame magic; the action tells the loop whether to keep reading the next frame, exit cleanly (peer closed, no error), or exit with a failure (e.g. malformed frame that may leave the stream in an undefined state).

const (
	// FrameContinue keeps the responder loop running — read the next
	// frame magic.
	FrameContinue FrameAction = iota

	// FrameReturn exits the loop cleanly without error. Used when the
	// handler determines the peer-side has no more frames to send
	// (e.g. graceful goaway).
	FrameReturn

	// FrameFail exits the loop with an error — the stream is in an
	// unrecoverable state and the caller should tear down the
	// connection.
	FrameFail
)

type FrameHandler added in v0.0.6

type FrameHandler interface {
	Handle(ctx context.Context, conn net.Conn, peerNodeID string) FrameAction
}

FrameHandler handles a single frame whose 2-byte magic has already been read by the responder loop. The handler reads the remaining bytes of the frame from conn, processes it, and returns a FrameAction to continue or exit.

peerNodeID is the remote session's identity — threaded through from Engine.RunResponder so handlers can discriminate per-peer without a separate lookup.

type FrameHandlerFunc added in v0.0.6

type FrameHandlerFunc func(ctx context.Context, conn net.Conn, peerNodeID string) FrameAction

FrameHandlerFunc adapts a free function into a FrameHandler.

func (FrameHandlerFunc) Handle added in v0.0.6

func (f FrameHandlerFunc) Handle(ctx context.Context, conn net.Conn, peerNodeID string) FrameAction

Handle delegates to the underlying function.

type G1Codec added in v0.0.8

type G1Codec interface {
	// EncodeExchange serialises records + meta into the G1 body.
	EncodeExchange(records [][]byte, meta *ExchangeMeta) ([]byte, error)

	// DecodeExchange extracts records, the peer's meta, and an
	// optional list of node IDs referenced by those records (for
	// liveness tracking via GossipResult.SeenNodeIDs). Unknown
	// fields must be tolerated so forward-compatible meta
	// additions don't break existing peers.
	DecodeExchange(body []byte) (DecodedExchange, error)
}

G1Codec serialises and deserialises the body of a G1 exchange frame. Whisper owns the [magic][length] framing; the codec owns everything inside so consumers can choose protobuf, JSON, CBOR, or a custom envelope that carries both records and ExchangeMeta piggyback data.

The codec MUST round-trip without loss: DecodeExchange(EncodeExchange(r, m)) yields an equivalent DecodedExchange. Errors from either direction are surfaced to the native G1 handler, which logs and skips the frame rather than tearing down the session.
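
A round-trip check in a test is a cheap way to hold a codec to this contract (sketch; field-by-field comparison depends on DecodedExchange's shape, documented elsewhere in this reference):

body, err := codec.EncodeExchange(records, meta)
if err != nil {
	t.Fatal(err)
}
dec, err := codec.DecodeExchange(body)
if err != nil {
	t.Fatal(err)
}
// Compare dec against the inputs here; any loss means the native
// G1 handler will log and skip frames in production.
_ = dec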

type G1ExchangeObserver added in v0.0.8

type G1ExchangeObserver func(res GossipResult, peerNodeID string)

G1ExchangeObserver is invoked once per successful G1 exchange with the observed result and the peer's identity. Consumers use this for latency metrics, per-peer liveness bookkeeping, and callbacks that want to publish ledger records (e.g. round-trip telemetry) after each exchange.

type GossipEvent added in v0.0.6

type GossipEvent struct {
	Kind       GossipEventKind
	PeerNodeID string
	Time       time.Time
	Topic      string // optional — populated by topic-aware events
	Bytes      int64  // optional — payload size
	Records    int    // optional — record count
	DurationMs int64  // optional — operation duration
	Err        string // optional — error description
}

GossipEvent is the payload for SubscribeEvents subscribers. Optional fields are populated when the producing handler has the data; consumers use zero-value detection to know what's available.

The shape was kept compact for the original four event kinds; the later additions accept it as-is rather than introduce a parallel MeshEvent type. Callers that need richer per-kind detail attach the fields they care about (e.g. reconciliation handlers populate Bytes + Records + DurationMs).

type GossipEventKind added in v0.0.6

type GossipEventKind int

GossipEventKind identifies a protocol-level event consumers can subscribe to via Engine.SubscribeEvents. The kind enum is open-ended — new kinds are added by Phase as the protocol grows (rumor ACK, reconciliation, snapshot, hypercube re-shape, network policy transition). Existing kinds keep their numeric values to stay wire-stable across mixed-version meshes.

const (

	// EventDigestMatch fires when a G2 probe found local and peer
	// fingerprints equal — consumers typically update per-peer
	// watermarks to avoid re-exchanging already-converged state.
	EventDigestMatch GossipEventKind = iota

	// EventRecordApplied fires when the G1 handler applies at least
	// one record from a peer exchange.
	EventRecordApplied

	// EventRumorReceived fires when a G3 rumor frame was accepted
	// (passed the dedup tracker and was applied).
	EventRumorReceived

	// EventFrameError fires on any frame-level error the loop
	// recovered from (non-fatal; fatal errors exit the loop
	// without producing an event).
	EventFrameError

	// EventRumorSent fires when the rumor pusher emitted a G3 frame
	// to a peer.
	EventRumorSent

	// EventRumorAcked fires when a G3-ACK was received for an
	// outstanding rumor — confirms delivery.
	EventRumorAcked

	// EventRumorRetry fires when ACK timeout triggered a retry via
	// an alternate hypercube edge.
	EventRumorRetry

	// EventRumorDropped fires when a rumor exhausted its retry budget
	// (typically 3) and was deferred to the next freshness sweep.
	EventRumorDropped

	// EventFreshnessSignal fires when a digest mismatch on the
	// freshness bus triggered an on-demand exchange.
	EventFreshnessSignal

	// EventReconcileStart fires when an IBLT reconciliation round
	// began with a peer.
	EventReconcileStart

	// EventReconcileComplete fires when reconciliation decoded
	// successfully and records were exchanged.
	EventReconcileComplete

	// EventReconcileDecodeFailure fires when peeling decode failed,
	// triggering a mode-flip from rate-1.5 to rateless or a
	// fallback to G1 + watermark.
	EventReconcileDecodeFailure

	// EventSnapshotStart fires when a cold-start snapshot exchange
	// began.
	EventSnapshotStart

	// EventSnapshotComplete fires when snapshot reconstruction
	// finished and records were applied.
	EventSnapshotComplete

	// EventSnapshotShardFailure fires on Reed-Solomon shard
	// recovery — observable signal that a neighbor missed but
	// the parity shard saved the transfer.
	EventSnapshotShardFailure

	// EventHypercubeRebuild fires on every cube re-shape (lazy or
	// eager).
	EventHypercubeRebuild

	// EventHypercubeNeighborPromoted fires when an ambient session
	// became a hypercube neighbor.
	EventHypercubeNeighborPromoted

	// EventHypercubeNeighborDemoted fires when a hypercube neighbor
	// was demoted to ambient (still connected, no longer a
	// proactive-dial target).
	EventHypercubeNeighborDemoted

	// EventNetworkPolicyTransition fires when the policy synthesizer
	// produced a new profile in response to signal changes.
	EventNetworkPolicyTransition

	// EventCapabilityNegotiated fires when a peer's capability
	// advertisement was applied to the per-peer feature set.
	EventCapabilityNegotiated
)

type GossipResult

type GossipResult struct {
	RTT            time.Duration // Total exchange time (serialize+write+read+deserialize)
	NetworkRTT     time.Duration // Network round-trip from header exchange timing
	RecordsSent    int           // Number of records we sent
	RecordsApplied int           // Number of records applied from peer
	SeenNodeIDs    []string      // All unique NodeIDs received (for liveness tracking)
	PeerMeta       *ExchangeMeta // metadata from the peer's exchange
	DigestSkipped  bool          // true when G2 digest matched — no data exchanged
}

GossipResult captures the outcome of a single gossip exchange round.

type Hypercube

type Hypercube struct {
	// contains filtered or unexported fields
}

Hypercube implements a virtual hypercube overlay for structured rumor routing.

Each node is assigned a position based on its index in the sorted LAD member list. Neighbors are computed by flipping one bit at a time (one per dimension). Dimension-ordered routing ensures each node receives a rumor exactly once:

  • Origin sends to ALL dimensions (fromDimension = 0xFF → -1)
  • Each receiver forwards only on dimensions GREATER than the one it received on
  • Total messages = N-1 (optimal, zero redundancy)

Example: 4 nodes (2D), positions 00, 01, 10, 11

Node 00 originates: → dim0→01, dim1→10
Node 01 received on dim0: → dim1→11 (only dims > 0)
Node 10 received on dim1: → stop (no dims > 1)
Node 11 received on dim1: → stop
Total: 3 messages for 4 nodes. Optimal.

func NewHypercube

func NewHypercube(selfID string) *Hypercube

NewHypercube creates a hypercube for the given local node ID. Call Rebuild() with the current member list to initialize.

selfID is required and immutable after construction — pass the local node's canonical NodeID. Consumers that don't know their NodeID until after an async step (e.g. an agent waiting on enrollment) should defer construction until the identity is available rather than mutating it later; the immutable design prevents the "empty selfID → nil Neighbors → silent routing fallback" class of bug that arose from constructor-time uncertainty.

func (*Hypercube) Dimension

func (h *Hypercube) Dimension() int

Dimension returns the cube dimension count.

func (*Hypercube) DimensionNeighbor

func (h *Hypercube) DimensionNeighbor(d int) string

DimensionNeighbor returns the neighbor in a specific dimension, or "" if none. Fully bounds-checked: rejects negative d, d ≥ dimension, selfPos < 0, and the neighbor-position-exceeds-members case that arises when member count is not a power of 2. Safe to call with any int from any source.

func (*Hypercube) MemberCount

func (h *Hypercube) MemberCount() int

MemberCount returns the number of members in the hypercube.

func (*Hypercube) Neighbors

func (h *Hypercube) Neighbors() []string

Neighbors returns all neighbor node IDs (one per dimension). Returns nil if this node is not in the hypercube.

func (*Hypercube) Position

func (h *Hypercube) Position() int

Position returns this node's hypercube address (-1 if not in cube).

func (*Hypercube) Rebuild

func (h *Hypercube) Rebuild(memberIDs []string)

Rebuild recomputes the hypercube from the current member list. All nodes produce the same topology from the same sorted member list.

func (*Hypercube) RouteRumor

func (h *Hypercube) RouteRumor(fromDimension int) []int

RouteRumor returns the dimensions to forward on based on dimension-ordered routing. fromDimension = -1 (or 0xFF as uint8) means origin — forward on ALL dimensions. Otherwise, forward on dimensions strictly greater than fromDimension.
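
Example (runnable sketch of the four-node walkthrough above, using only the documented API):

h := NewHypercube("node-00")
h.Rebuild([]string{"node-00", "node-01", "node-10", "node-11"})

// Origin: forward on all dimensions.
for _, dim := range h.RouteRumor(-1) {
	fmt.Println("origin forwards on dim", dim, "to", h.DimensionNeighbor(dim))
}

// A node that received the rumor on dimension 0 forwards only on dims > 0.
for _, dim := range h.RouteRumor(0) {
	fmt.Println("receiver forwards on dim", dim)
}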

func (*Hypercube) Valid added in v0.0.3

func (h *Hypercube) Valid() bool

Valid reports whether this hypercube has usable state (self is a member, dimension is consistent with the members list). Intended for pre-flight checks by callers that loop over dimensions — a single Valid call amortises what would otherwise be repeated bounds checks.

func (*Hypercube) Version

func (h *Hypercube) Version() uint64

Version returns the fingerprint of the member list.

type HypercubeExt added in v0.0.10

type HypercubeExt struct {
	// contains filtered or unexported fields
}

HypercubeExt wraps the base Hypercube with the Phase 6 extension state. Co-exists with the original Hypercube — consumers that don't want extensions continue to use the original NewHypercube directly.

func NewHypercubeExt added in v0.0.10

func NewHypercubeExt(selfID string, opts HypercubeOptions) *HypercubeExt

NewHypercubeExt returns an extended hypercube wrapping a primary Hypercube. opts.RegionOf, MaxDimensionsOf, etc. are honored on every Rebuild call.

func (*HypercubeExt) DimensionWeight added in v0.0.10

func (h *HypercubeExt) DimensionWeight(dim int) float64

DimensionWeight returns the current adaptive weight for a dimension. Used by fanout selection — lower-weight dims get fewer rumors. Weights are in [0.1, 1.0]; default 1.0.

func (*HypercubeExt) Dual added in v0.0.10

func (h *HypercubeExt) Dual() *Hypercube

Dual returns the secondary (hash-sorted) cube, or nil if EnableDualCube wasn't set.

func (*HypercubeExt) EffectiveDimensions added in v0.0.10

func (h *HypercubeExt) EffectiveDimensions(peerID string) int

EffectiveDimensions returns the dimension cap to use for a specific peer. Combines self.MaxDimensions, peer.MaxDimensions, link.SupportedDimensions (asymmetric-dim + per-link scaling). 0 = no cap.

func (*HypercubeExt) MarkDirty added in v0.0.10

func (h *HypercubeExt) MarkDirty()

MarkDirty signals that membership has changed. Under HypercubeRebuildLazy, increments the dirty count; the next Rebuild only actually runs if dirtyCount > threshold or staleness elapsed.

func (*HypercubeExt) Primary added in v0.0.10

func (h *HypercubeExt) Primary() *Hypercube

Primary returns the underlying primary cube. Used when the HypercubeExt isn't visible but the base API is needed.

func (*HypercubeExt) Rebuild added in v0.0.10

func (h *HypercubeExt) Rebuild(members []string)

Rebuild recomputes the cube(s) from the member list. Honors region-aware ordering when opts.RegionOf is set; honors per-node dimension caps when opts.MaxDimensionsOf is set; honors lazy policy when configured. Excludes members for which opts.IsEphemeral returns true — they remain reachable as gossip leaves but aren't selected as cube neighbors.

func (*HypercubeExt) RecordDimensionLoss added in v0.0.10

func (h *HypercubeExt) RecordDimensionLoss(dim int)

RecordDimensionLoss is called by rumor pusher when a fanout to a specific dimension fails (ACK timeout, write error). Bumps the dimension's adaptive weight down so subsequent fanout preferentially picks healthier dimensions.

func (*HypercubeExt) RecordDimensionSuccess added in v0.0.10

func (h *HypercubeExt) RecordDimensionSuccess(dim int)

RecordDimensionSuccess is called when a fanout succeeds. Recovers the dimension's weight toward 1.0.

func (*HypercubeExt) RouteToFar added in v0.0.10

func (h *HypercubeExt) RouteToFar(targetNodeID string) (nextHop string, ok bool)

RouteToFar returns the next-hop neighbor for an RPC targeting a peer that isn't a direct hypercube neighbor. Picks the neighbor whose hypercube position is closest (by XOR distance) to the target's position. Loop-free by hypercube's dimension-ordered rule — forward-only-on-greater-dimensions guarantees acyclic routing.

Returns ("", false) if no useful neighbor exists (target is already a direct neighbor, or self isn't in the cube).

type HypercubeOptions added in v0.0.10

type HypercubeOptions struct {
	// RegionOf returns a region key for a NodeID. Used by
	// region-aware dimension ordering. nil = no region awareness
	// (today's behavior).
	RegionOf func(nodeID string) string

	// MaxDimensionsOf returns the per-node dimension cap. Used by
	// asymmetric-dimensions support. nil = no cap.
	MaxDimensionsOf func(nodeID string) int

	// LinkSupportedDimsOf returns the per-link dimension support
	// (effectiveDims = min(self.Max, peer.Max, link.Supported)).
	// nil = no link-level cap.
	LinkSupportedDimsOf func(nodeID string) int

	// RebuildPolicy: eager (default) or lazy.
	RebuildPolicy HypercubeRebuildPolicy

	// DirtyThreshold: lazy rebuild fires after N MarkDirty calls.
	// 0 = use default (10).
	DirtyThreshold int

	// Staleness: lazy rebuild fires after wall-clock interval
	// since last rebuild. 0 = use default (60 sec).
	Staleness time.Duration

	// EnableDualCube: maintain a second cube (hash-sorted)
	// alongside the primary (NodeID-sorted) for redundancy.
	EnableDualCube bool

	// IsEphemeral returns true for peers that should be excluded from
	// hypercube neighbor selection (browsers, admin CLIs, low-battery
	// cellular agents). Excluded members are NOT included in the
	// member list passed to the cube; they participate as gossip
	// leaves only.
	//
	// nil = no ephemerals (all members are cube candidates).
	IsEphemeral func(nodeID string) bool
}

HypercubeOptions extends the original Hypercube with all 7 extensions from Phase 6 of the redesign:

  1. Region-aware dimension ordering — sort dimensions so early-dim hops stay in-region, late-dim hops cross regions.
  2. Asymmetric dimensions — per-node MaxDimensions cap.
  3. Lazy rebuild — rebuild on dirty threshold or staleness.
  4. RPC routing — hypercube-routed RPC for far peers.
  5. Adaptive dimensional weighting — bump down flaky dimensions in fanout selection.
  6. Per-link dimensionality scaling — link advertises supported dimensions; effective = min(self, peer, link).
  7. Dual-hypercube redundancy — two cubes (NodeID-sorted + hash-sorted) for instant fault-tolerance.

All extensions are optional / additive — a Hypercube without any of these set behaves exactly as today's implementation.
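
Example (sketch; the lookup functions and member list are illustrative placeholders):

ext := NewHypercubeExt(selfID, HypercubeOptions{
	RegionOf:        func(nodeID string) string { return regionTable[nodeID] }, // placeholder table
	MaxDimensionsOf: func(nodeID string) int { return 0 },                      // no per-node cap
	RebuildPolicy:   HypercubeRebuildLazy,
	DirtyThreshold:  20,
	Staleness:       2 * time.Minute,
	EnableDualCube:  true,
	IsEphemeral:     func(nodeID string) bool { return strings.HasPrefix(nodeID, "browser-") }, // placeholder rule
})
ext.Rebuild(memberIDs)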

type HypercubeRebuildPolicy added in v0.0.10

type HypercubeRebuildPolicy uint8

HypercubeRebuildPolicy defines when to rebuild the cube. Eager rebuilds on every membership change (today's behavior). Lazy defers rebuilds until a configurable threshold of dirty events or a wall-clock interval.

const (
	// HypercubeRebuildEager: rebuild on every Rebuild() call.
	// Default for backward compatibility.
	HypercubeRebuildEager HypercubeRebuildPolicy = iota

	// HypercubeRebuildLazy: rebuild only when MarkDirty has been
	// called more than `dirtyThreshold` times OR when more than
	// `staleness` has elapsed since the last rebuild. Cuts cube-
	// rebuild thrash on a churning fleet.
	HypercubeRebuildLazy
)

type NetworkPolicy added in v0.0.10

type NetworkPolicy interface {
	Profile() NetworkProfile
	Signals() NetworkSignals
	PeerOverride(peer string) (NetworkProfile, bool)
	Subscribe(func(NetworkProfile))
}

NetworkPolicy is the consumer-facing surface every cadence/pacing decision reads. Implementations may be static (library mesh-node) or signal-driven (agent). Subscribe is non-blocking — slow subscribers miss profile transitions, which is fine because every caller re-reads Profile() before each interval/pacing decision.

type NetworkProfile added in v0.0.10

type NetworkProfile struct {
	GossipMin          time.Duration // 2s — convergence floor
	GossipBase         time.Duration // 10s — base interval
	GossipMax          time.Duration // 60s — idle ceiling (wifi default)
	FreshnessProbeMin  time.Duration // 30s — between probes per peer
	FreshnessProbeMax  time.Duration // 5min — coalesce repeated stale
	RumorRetryInitial  time.Duration // 200ms — first retry delay
	RumorRetryMax      time.Duration // 5s — final retry cap
	RumorFanout        int           // 1 — hypercube dimensions
	ReconcilePacing    int           // 0 = unlimited; cellular: 8 KB/s
	SnapshotChunkMax   int           // 65536 — bytes per snapshot chunk
	SnapshotShardCount int           // 2 — k-of-N for cold-start
	HypercubeMaxDims   int           // 0 = unlimited
}

NetworkProfile is the synthesised configuration consumers read for every cadence / pacing / sizing decision. All durations and byte counts are absolute values — consumers don't need to know which signals produced them.

Defaults (zero values) are chosen so a consumer that never updates the policy still gets sensible behavior (matches the existing AdaptiveHWPInterval defaults from 2026-04-09 Task 10a + the 2026-04-23 freshness 30-sec emit + RumorPusher defaults).

func CellularProfile added in v0.0.10

func CellularProfile() NetworkProfile

CellularProfile returns a NetworkProfile suitable for metered cellular links — wider intervals, paced reconciliation, smaller snapshot chunks. Used by the multi-signal synthesizer when LinkType=cellular and one of the secondary signals (battery low, RTT high) confirms the transition.

func DefaultProfile added in v0.0.10

func DefaultProfile() NetworkProfile

DefaultProfile returns a NetworkProfile suitable for stable wired connections (fly nodes, ethernet agents). Consumers without a signal source can use this directly.
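
Example (sketch; how the consumer detects its link type is out of scope here):

profile := DefaultProfile()
if onCellular { // placeholder link-type check
	profile = CellularProfile()
}
gossipTicker := time.NewTicker(profile.GossipBase)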

type NetworkRTTMeasurer

type NetworkRTTMeasurer struct {
	// contains filtered or unexported fields
}

NetworkRTTMeasurer tracks per-connection network RTT using header exchange timing. During a gossip exchange, both sides simultaneously write their length-prefixed header (4 bytes). The time from our write to receiving the peer's header approximates one network round-trip.

Samples live in a fixed-size ring buffer indexed by idx%len(ring). No slice copy on roll-over, and the backing array never grows — matches cpu_gate.go's rolling-window pattern.

func NewNetworkRTTMeasurer

func NewNetworkRTTMeasurer(maxSamples int) *NetworkRTTMeasurer

NewNetworkRTTMeasurer creates a measurer that keeps the last N samples.

func (*NetworkRTTMeasurer) AvgRTT

func (m *NetworkRTTMeasurer) AvgRTT() time.Duration

AvgRTT returns the average of recent RTT samples. Returns 0 if no samples.

func (*NetworkRTTMeasurer) LastRTT

func (m *NetworkRTTMeasurer) LastRTT() time.Duration

LastRTT returns the most recent RTT sample. Returns 0 if no samples.

func (*NetworkRTTMeasurer) Record

func (m *NetworkRTTMeasurer) Record(rtt time.Duration)

Record adds a new RTT sample.

func (*NetworkRTTMeasurer) SampleCount

func (m *NetworkRTTMeasurer) SampleCount() int

SampleCount returns how many RTT samples are currently held in the window. Caps at the ring capacity once the ring has wrapped.
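
Example (sketch; writeHeader and readPeerHeader stand in for the real 4-byte header exchange):

m := NewNetworkRTTMeasurer(32) // keep the last 32 samples

start := time.Now()
writeHeader(conn)    // our length-prefixed header (placeholder)
readPeerHeader(conn) // peer's header arrives about one round-trip later (placeholder)
m.Record(time.Since(start))

fmt.Println("last:", m.LastRTT(), "avg:", m.AvgRTT(), "samples:", m.SampleCount())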

type NetworkSignals added in v0.0.10

type NetworkSignals struct {
	LinkType        int           // 0=unknown, 1=ethernet, 2=wifi, 3=cellular, 4=vpn
	AvgRTT          time.Duration // exponential moving average of observed RTT
	LossRate        float64       // recent rumor-ACK miss rate, 0.0-1.0
	EventBurstRate  float64       // events/min on the address-change bus
	BatteryLevel    float64       // 0.0-1.0; -1.0 = AC power / unknown
	BatteryDraining bool          // explicit signal independent of level
	TimeOfDay       int           // 0-23 hour, local; 0 = unknown
	LinkStableSince time.Duration // how long current LinkType has held
	PeerCount       int           // active sessions
}

NetworkSignals is the multi-signal input vector that drives a NetworkProfile. Consumers update signals asynchronously; the policy synthesizer reads them whenever a profile is requested.

Not every signal is meaningful for every consumer:

  • Library mesh nodes on fly run on stable wired links, so LinkType is always "ethernet-equivalent" and battery fields are zero / N/A. The fixed-profile policy ignores most signals.
  • Agents on roaming devices populate every field — LinkType transitions, RTT histograms, loss-rate, battery state all drive profile transitions.

type PEXEntry

type PEXEntry struct {
	NodeID    string    `json:"node_id"`
	Addresses []string  `json:"addresses"`
	Region    string    `json:"region"`
	Signature []byte    `json:"signature"` // ed25519 sig over NodeID+Addresses+SignedAt
	SignedAt  time.Time `json:"signed_at"`
}

PEXEntry represents a single peer advertisement in a Peer Exchange response. Entries are signed by the advertising node's ed25519 key to prove ownership.

func SignPEXEntry

func SignPEXEntry(nodeID string, addresses []string, region string, privateKey ed25519.PrivateKey) PEXEntry

SignPEXEntry creates a signed PEX entry for the local node.

type PEXManager

type PEXManager struct {
	// contains filtered or unexported fields
}

PEXManager coordinates PEX entry creation, verification, and sharing for a node.

func NewPEXManager

func NewPEXManager(
	nodeID string,
	addresses []string,
	region string,
	privateKey ed25519.PrivateKey,
	publicKeyLookup func(nodeID string) ed25519.PublicKey,
) *PEXManager

NewPEXManager creates a PEX manager for the local node.

func (*PEXManager) BuildPEXEntries

func (m *PEXManager) BuildPEXEntries(peerNodeID string) []PEXEntry

BuildPEXEntries returns up to PEXMaxEntriesPerExchange entries to piggyback on a gossip exchange to the given peer. Rate-limited per peer.

func (*PEXManager) KnownPeers

func (m *PEXManager) KnownPeers() []PEXEntry

KnownPeers returns all verified, non-stale PEX entries.

func (*PEXManager) ProcessPEXEntries

func (m *PEXManager) ProcessPEXEntries(entries []PEXEntry) []string

ProcessPEXEntries validates and stores inbound PEX entries from a peer. Returns the list of newly discovered node IDs.

func (*PEXManager) RefreshLocalEntry

func (m *PEXManager) RefreshLocalEntry(nodeID string, addresses []string, region string)

RefreshLocalEntry re-signs our PEX entry (call when addresses change).

func (*PEXManager) RemovePeer added in v0.0.3

func (m *PEXManager) RemovePeer(peerNodeID string)

RemovePeer drops a disconnected peer's state — both the known-peers entry AND the rate-limiter entry. Previously only known-peers was touched (via pruneKnownPeers' age-based cleanup), leaving the rate-limiter's lastSend map with a stale entry that only timed out after 2× PEXInterval. Call this from the ConnectionManager teardown path so PEX state tracks peer lifecycle exactly.
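
Example (sketch of the PEX lifecycle; key generation and the key registry are illustrative):

pub, priv, _ := ed25519.GenerateKey(rand.Reader)
keys := map[string]ed25519.PublicKey{"node-a": pub} // placeholder registry

m := NewPEXManager("node-a", []string{"198.51.100.7:4242"}, "iad", priv,
	func(nodeID string) ed25519.PublicKey { return keys[nodeID] })

// Piggyback entries on an outbound exchange (rate-limited per peer).
outbound := m.BuildPEXEntries("node-b")

// Ingest a peer's advertisements; dial any newly discovered nodes.
newIDs := m.ProcessPEXEntries(inboundEntries)

// Tear down alongside the connection so rate-limit state doesn't linger.
m.RemovePeer("node-b")
_, _ = outbound, newIDs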

type PEXRateLimiter

type PEXRateLimiter struct {
	// contains filtered or unexported fields
}

PEXRateLimiter enforces rate limits on PEX sends per peer. Each peer is allowed one PEX send per PEXInterval.

func NewPEXRateLimiter

func NewPEXRateLimiter() *PEXRateLimiter

NewPEXRateLimiter creates a rate limiter with the default interval.

func (*PEXRateLimiter) Allow

func (rl *PEXRateLimiter) Allow(peerNodeID string) bool

Allow returns true if a PEX send to peerNodeID is allowed.

func (*PEXRateLimiter) Forget added in v0.0.3

func (rl *PEXRateLimiter) Forget(peerNodeID string)

Forget removes a peer's rate-limit state. Called from PEXManager.RemovePeer so a disconnected peer's lastSend entry doesn't linger (otherwise it only clears via age-based prune, which ignores stale peer identity). Safe no-op if the peer is unknown.

type PEXRequest

type PEXRequest struct {
	RequestingNodeID string `json:"requesting_node_id"`
	MaxEntries       int    `json:"max_entries,omitempty"` // 0 = use default (20)
}

PEXRequest is sent by a node to request peer lists from a connected peer.

type PEXResponse

type PEXResponse struct {
	Entries []PEXEntry `json:"entries"`
}

PEXResponse contains the peer list from the responding node.

type PeerBackoff

type PeerBackoff struct {
	// contains filtered or unexported fields
}

PeerBackoff tracks exponential backoff state for a single peer that has signaled backpressure. Used by the sender side of the gossip loop.

func NewPeerBackoff

func NewPeerBackoff() *PeerBackoff

NewPeerBackoff creates a new per-peer backoff tracker.

func (*PeerBackoff) CurrentBackoff

func (pb *PeerBackoff) CurrentBackoff() time.Duration

CurrentBackoff returns the current backoff duration (0 if not backing off).

func (*PeerBackoff) RecordSignal

func (pb *PeerBackoff) RecordSignal(peerSignaledBackpressure bool)

RecordSignal processes a backpressure signal from a peer exchange result. If the peer signaled backpressure, the backoff doubles (exponential). If the peer did NOT signal backpressure, the backoff resets to zero.

func (*PeerBackoff) ShouldSkip

func (pb *PeerBackoff) ShouldSkip() bool

ShouldSkip returns true if the peer is currently in backoff and gossip should be skipped for this tick.
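
Example (sketch; gossipWith and the signal extraction are placeholders, with the peer's backpressure flag read from the exchange's PeerMeta):

pb := NewPeerBackoff()

// On each gossip tick for this peer:
if pb.ShouldSkip() {
	return // still backing off; skip this round
}
res := gossipWith(peer)                    // placeholder for the real exchange
pb.RecordSignal(signaledBackpressure(res)) // placeholder read of PeerMeta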

type PeerRateLimiter

type PeerRateLimiter struct {
	// contains filtered or unexported fields
}

PeerRateLimiter tracks token-bucket state plus violation accounting for a single peer. When violations exceed the threshold in the window, the peer is placed in cooldown and skipped by the fanout.

func (*PeerRateLimiter) Allow

func (p *PeerRateLimiter) Allow() bool

Allow returns true if a single message is allowed for this peer right now. A false return indicates a violation (either throttled or in cooldown) and increments the violation counter. When the counter crosses the threshold within violationWindow, the peer is put in cooldown.

func (*PeerRateLimiter) InCooldown

func (p *PeerRateLimiter) InCooldown() bool

InCooldown reports whether the peer is currently cooling down.

func (*PeerRateLimiter) Violations

func (p *PeerRateLimiter) Violations() int

Violations returns the current violation count in the rolling window.

type PersistedPeerState added in v0.0.10

type PersistedPeerState struct {
	PeerID        string    `json:"peer_id"`
	Watermark     time.Time `json:"watermark"`
	ExchangeCount int       `json:"exchange_count"`
	LastActivity  time.Time `json:"last_activity"`
	AppliedOnFull int       `json:"applied_on_full"`
	FullSyncCount int       `json:"full_sync_count"`
}

PersistedPeerState is the over-the-wire / over-the-disk representation of a single peer's delta state, used by DeltaPersistence to round-trip state across process restarts. Keep field tags stable — they're the disk schema for any backend that serialises this struct.

type PullFetchFn added in v0.0.4

type PullFetchFn func(topic string, since time.Time, maxBytes int) ([]byte, error)

PullFetchFn fetches records matching the pull request. Returns the serialised response payload (implementation-specific — typically a G1 records block), or an error. The returned byte slice is already size-capped per the request's MaxResponseSize (or DefaultPullResponseMaxSize when zero) — HandlePullRequest validates the cap before calling.

type PullRequest added in v0.0.4

type PullRequest struct {
	Topic           string    // topic to pull
	SinceWatermark  time.Time // records strictly newer than this; zero = full sync
	MaxResponseSize uint32    // responder-side cap (bytes); 0 = use DefaultPullResponseMaxSize
}

PullRequest is the decoded form of a G4 PULL_REQUEST frame.

func ReadPullRequestBody added in v0.0.4

func ReadPullRequestBody(r io.Reader) (PullRequest, error)

ReadPullRequestBody reads the body of a G4 frame after the 2-byte magic has been consumed by the responder multiplexer. Symmetric with ReadGossipBody / ReadDigestBody / ReadRumorBody.

type RateLimitStats added in v0.0.3

type RateLimitStats struct {
	QueueDepth     int
	QueueThreshold int
	Peers          int
	Overloaded     bool
}

RateLimitStats is an inlined view of RateLimiter state for EngineStats.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter is the engine-level coordinator: per-peer buckets plus a global egress (bytes/sec) limiter plus overload queue tracking.

func NewRateLimiter

func NewRateLimiter(peerPerSec float64, peerBurst int, egressBytesPerSec int, queueThreshold int) *RateLimiter

NewRateLimiter builds an engine rate limiter using the provided settings. Zero values fall back to package defaults.

func (*RateLimiter) AllowPeer

func (r *RateLimiter) AllowPeer(peerID string) bool

AllowPeer returns true if a publish to peerID is currently allowed. A false return means the message should be dropped for that peer (other peers still receive it). Unknown peers are lazily registered.

func (*RateLimiter) DecQueue

func (r *RateLimiter) DecQueue()

DecQueue decrements the outbound queue counter.

func (*RateLimiter) EgressSaturated

func (r *RateLimiter) EgressSaturated(n int) bool

EgressSaturated returns true if the global egress bucket has no tokens available for the given payload size. Used by the engine to decide whether to hold low-priority traffic in favor of high-priority topics.

func (*RateLimiter) IncQueue

func (r *RateLimiter) IncQueue()

IncQueue increments the outbound queue counter. Used by the engine to signal that a publish is in flight.

func (*RateLimiter) Overloaded

func (r *RateLimiter) Overloaded() bool

Overloaded returns true if the outbound queue has exceeded the threshold. When Overloaded is true, Publish returns ErrBackpressure.

func (*RateLimiter) PeerCount

func (r *RateLimiter) PeerCount() int

PeerCount returns the number of peers currently tracked.

func (*RateLimiter) PeerInCooldown

func (r *RateLimiter) PeerInCooldown(peerID string) bool

PeerInCooldown reports whether the peer is currently cooling down after sustained violations.

func (*RateLimiter) QueueDepth

func (r *RateLimiter) QueueDepth() int

QueueDepth returns the current outbound queue depth (test observability).

func (*RateLimiter) QueueThreshold added in v0.0.3

func (r *RateLimiter) QueueThreshold() int

QueueThreshold returns the current overload threshold. Useful for dashboards showing headroom (depth vs threshold) and for operator validation after startup.

func (*RateLimiter) RemovePeer

func (r *RateLimiter) RemovePeer(peerID string)

RemovePeer drops per-peer rate-limit state. Called when a peer disconnects.

func (*RateLimiter) ReserveEgress

func (r *RateLimiter) ReserveEgress(n int) bool

ReserveEgress attempts to consume n bytes from the global egress bucket. Returns true if allowed immediately; false means egress is saturated and the caller should defer or drop (priority-aware callers pick by priority).

func (*RateLimiter) SetQueueThreshold added in v0.0.3

func (r *RateLimiter) SetQueueThreshold(n int)

SetQueueThreshold adjusts the overload threshold at runtime. Values ≤ 0 are ignored (keeps previous value). Intended for operator tuning via a diagnostics endpoint — e.g. temporarily raise the threshold during a known convergence storm without bouncing the process.
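
Example (sketch of a publish path through the limiter; the defer/drop policy is the caller's):

rl := NewRateLimiter(0, 0, 0, 0) // zeros fall back to package defaults

if rl.Overloaded() {
	return ErrBackpressure // queue over threshold; reject the publish
}
rl.IncQueue()
defer rl.DecQueue()

for _, peer := range peers {
	if !rl.AllowPeer(peer.ID) {
		continue // throttled or cooling down; drop for this peer only
	}
	if !rl.ReserveEgress(len(payload)) {
		break // global egress saturated; hold low-priority traffic
	}
	send(peer, payload) // placeholder
}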

type ReconcileCodec added in v0.0.10

type ReconcileCodec interface {
	// EncodeTable serialises an IBLT into a wire body.
	EncodeTable(t *iblt.IBLT, mode ReconcileMode) ([]byte, error)

	// DecodeTable deserialises an IBLT from a wire body. Receiver-
	// side path; mode is decoded from the body.
	DecodeTable(body []byte) (*iblt.IBLT, ReconcileMode, error)

	// EncodeReply packs records the responder is sending back to
	// the initiator (records the initiator was missing). Records
	// are opaque to whisper; codec defines the per-record format.
	EncodeReply(records [][]byte) ([]byte, error)

	// DecodeReply unpacks a reply body.
	DecodeReply(body []byte) ([][]byte, error)

	// EncodeRequest packs a list of record IDs (16-byte content
	// hashes) the responder is asking the initiator to send.
	EncodeRequest(ids []iblt.Key) ([]byte, error)

	// DecodeRequest unpacks a request body into IDs.
	DecodeRequest(body []byte) ([]iblt.Key, error)
}

ReconcileCodec is the consumer-supplied serialiser for the bodies of G4 frames. Whisper owns the [magic][length][flags] framing; the codec owns everything inside.

type ReconcileDriver added in v0.0.10

type ReconcileDriver struct {
	// contains filtered or unexported fields
}

ReconcileDriver is the per-process protocol driver. One instance shared across all peer connections; per-peer state lives on ReconcilePeer entries.

func NewReconcileDriver added in v0.0.10

func NewReconcileDriver(codec ReconcileCodec, store ReconcileStore, policy NetworkPolicy) *ReconcileDriver

NewReconcileDriver returns a driver wiring the consumer's codec and store into whisper's protocol logic. policy is optional — when present, RunRound uses NetworkPolicy.Profile().ReconcilePacing to throttle cell streams on metered links.

func (*ReconcileDriver) Codec added in v0.0.10

func (d *ReconcileDriver) Codec() ReconcileCodec

Codec exposes the codec for handlers that need to decode frames outside the driver's run loops.

func (*ReconcileDriver) PeerCount added in v0.0.10

func (d *ReconcileDriver) PeerCount() int

PeerCount is the number of registered peers.

func (*ReconcileDriver) Policy added in v0.0.10

func (d *ReconcileDriver) Policy() NetworkPolicy

Policy returns the NetworkPolicy reference if one was wired.

func (*ReconcileDriver) RegisterPeer added in v0.0.10

func (d *ReconcileDriver) RegisterPeer(p *ReconcilePeer)

RegisterPeer adds a peer. Idempotent.

func (*ReconcileDriver) RunInitiatorRound added in v0.0.10

func (d *ReconcileDriver) RunInitiatorRound(nodeID string, timeout time.Duration) (applied, sent int, err error)

RunInitiatorRound drives one reconciliation round as the initiator. Builds an IBLT from the store, sizes per the peer's current mode, sends the table frame, waits for the reply + request, applies received records, and serves requested records.

Returns the count of records applied (received from peer) and records sent (peer was missing). Errors are wire-level — protocol-level failures (decode incomplete) trigger mode flips internally and return success with 0 counts.
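
Example (sketch; myCodec and myStore stand in for consumer implementations of ReconcileCodec and ReconcileStore):

d := NewReconcileDriver(myCodec, myStore, policy) // policy may be nil
d.RegisterPeer(&ReconcilePeer{NodeID: "node-b", Conn: conn})

applied, sent, err := d.RunInitiatorRound("node-b", 5*time.Second)
if err != nil {
	return err // wire-level failure; decode failures flip modes internally instead
}
log.Printf("reconcile with node-b: applied=%d sent=%d", applied, sent)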

func (*ReconcileDriver) RunResponderRound added in v0.0.10

func (d *ReconcileDriver) RunResponderRound(conn net.Conn, body []byte) error

RunResponderRound is called by a frame-handler when it has just consumed a G4 TableFrame's [magic][length][flags=0x00] header and the body bytes. Builds the symmetric difference and writes back the reply + request frames.

func (*ReconcileDriver) Store added in v0.0.10

func (d *ReconcileDriver) Store() ReconcileStore

Store exposes the store for handlers that need to read/apply records outside the driver's run loops.

func (*ReconcileDriver) UnregisterPeer added in v0.0.10

func (d *ReconcileDriver) UnregisterPeer(nodeID string)

UnregisterPeer drops a peer's state.

type ReconcileFlags added in v0.0.10

type ReconcileFlags uint8

ReconcileFlags is the per-frame flag byte in the G4 body header. Distinguishes the three frame variants:

0x00 = TableFrame (initiator → responder): the IBLT itself
0x01 = ReplyFrame (responder → initiator): records the
       initiator is missing
0x02 = RequestFrame (responder → initiator): rumor IDs the
       responder is missing (initiator pulls these from its
       store and sends them as a follow-up TableFrame variant)

const (
	ReconcileFlagTable   ReconcileFlags = 0x00
	ReconcileFlagReply   ReconcileFlags = 0x01
	ReconcileFlagRequest ReconcileFlags = 0x02

	// ReconcileFlagDecodeFailed is sent by the responder when its IBLT
	// subtract did not converge — body is empty, used purely to signal
	// the initiator that the empty reply is a decode failure (not "we
	// were already in sync"). Without this flag, the initiator can't
	// distinguish "0 records both ways because of failure" from "0
	// records both ways because diff was empty," which means the
	// mode-flip state machine never advances out of rate-1.5 even when
	// the table consistently fails to decode.
	ReconcileFlagDecodeFailed ReconcileFlags = 0x03
)

type ReconcileMode added in v0.0.10

type ReconcileMode uint8

ReconcileMode is the per-peer mode the protocol driver runs in.

const (
	// ReconcileRate15 is rate-1.5 mode: sender ships a single
	// fixed-size IBLT (cells = 1.5 × d_max). Receiver decodes once.
	// Most efficient when d_max is a good estimate.
	ReconcileRate15 ReconcileMode = iota

	// ReconcileRateless is rateless mode: sender streams cells;
	// receiver attempts decode after each batch. Adapts to any
	// |d| at the cost of slightly higher bandwidth.
	ReconcileRateless
)

type ReconcilePeer added in v0.0.10

type ReconcilePeer struct {
	NodeID string
	Conn   net.Conn
	// contains filtered or unexported fields
}

ReconcilePeer represents a peer the driver can run a reconciliation round with. Wraps the per-peer wire conn plus state for mode-flip decisions.

type ReconcileStore added in v0.0.10

type ReconcileStore interface {
	// Snapshot returns the full set of (content_hash, record_bytes)
	// pairs currently in the store. Used to seed the IBLT.
	Snapshot() ([]iblt.Key, [][]byte)

	// FetchByID returns the record bytes for a given content hash,
	// or false if the store doesn't have it (peer is asking for
	// something we evicted; benign).
	FetchByID(id iblt.Key) ([]byte, bool)

	// Apply ingests an inbound record. Same contract as
	// StateStore.Apply — merge/tombstone resolution is the
	// consumer's responsibility.
	Apply(record []byte) error
}

ReconcileStore is the consumer's record store, keyed by content hash. Whisper's reconciliation driver calls Snapshot to populate the IBLT, FetchByID to pull records the peer is missing, and Apply to ingest records the peer sent.

type ReedSolomonErasure added in v0.0.10

type ReedSolomonErasure struct{}

ReedSolomonErasure is a default Reed-Solomon implementation of the erasure-coding portion of SnapshotCodec. Codec implementations embed it (or call directly) to satisfy EncodeErasure / DecodeErasure without re-implementing the byte-slice marshalling that pads variable-length record shards into the equal-length buffers reedsolomon.Encode requires.

The wire format inside each shard is:

[count uint32][len uint32][record_bytes][len uint32][record_bytes]...

Padded with zero bytes to the shard buffer length. The first 4 bytes (count) on a parity shard are not record bytes — parity shards are opaque to the consumer; only the n-of-n decode path reads them.
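
Example (sketch of the embedding pattern; the codec's other methods are elided):

type mySnapshotCodec struct {
	ReedSolomonErasure // provides EncodeErasure / DecodeErasure
	// manifest/shard (de)serialisation methods elided ...
}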

func (ReedSolomonErasure) DecodeErasure added in v0.0.10

func (ReedSolomonErasure) DecodeErasure(shards [][][]byte, k int) ([][][]byte, error)

DecodeErasure reconstructs the original k data shards from any k-of-n shards. Missing shards in the input MUST be nil entries (their inner slice empty or the outer entry nil). The output is the first k reconstructed shards as record-byte-slice slices.

func (ReedSolomonErasure) EncodeErasure added in v0.0.10

func (ReedSolomonErasure) EncodeErasure(dataShards [][][]byte, parityCount int) ([][][]byte, error)

EncodeErasure flattens k data shards (each carrying its own records) into equal-length buffers, runs reed-solomon over them to produce parityCount parity shards, and returns all k+parityCount shards in a stable order: data first, parity after.

Output shape: [][][]byte where each outer entry is one shard's flattened bytes split into a single inner [][]byte (the codec keeps a slice-of-slices interface for symmetry with the input). Concretely each output shard's inner slice has length 1 — the flattened buffer.

type RumorConfig

type RumorConfig struct {
	MaxHops            int     // max hop count before rumor dies (default 4)
	Fanout             int     // number of peers to forward to (default 3)
	ForwardProbability float64 // base probability of forwarding (1.0 = always)
	Enabled            bool    // master switch
}

RumorConfig configures rumor-mongering behavior.

func AgentMeshRumorConfig

func AgentMeshRumorConfig() RumorConfig

AgentMeshRumorConfig returns config optimized for large meshes (hundreds of devices).

func ServiceMeshRumorConfig

func ServiceMeshRumorConfig() RumorConfig

ServiceMeshRumorConfig returns config optimized for small meshes (11 nodes).

type RumorIDFunc

type RumorIDFunc func(payload []byte) string

RumorIDFunc generates a dedup key from a rumor payload. Consumers provide their own implementation based on their record structure. The returned string must be deterministic for the same payload.
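
Example (sketch; a content-hash key, similar in spirit to the hash-based default NewRumorPusher installs when handed nil):

idFn := RumorIDFunc(func(payload []byte) string {
	sum := sha256.Sum256(payload) // deterministic for identical payloads
	return hex.EncodeToString(sum[:16])
})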

type RumorMessage

type RumorMessage struct {
	ID      string // dedup key (generated by consumer's RumorIDFunc)
	Payload []byte // opaque serialised record
}

RumorMessage is a payload with its dedup key, queued for push.

type RumorPeerConn

type RumorPeerConn interface {
	PeerNodeID() string
	WriteRumor(payload []byte, hopCount uint8, fromDimension uint8) error
}

RumorPeerConn is the interface for sending rumors to a peer.

type RumorPusher

type RumorPusher struct {
	// contains filtered or unexported fields
}

RumorPusher pushes new records to peers immediately via hypercube or random.

func NewRumorPusher

func NewRumorPusher(cfg RumorConfig, rumorIDFn RumorIDFunc) *RumorPusher

NewRumorPusher creates a pusher with the given config. rumorIDFn generates dedup keys from payloads; if nil, a hash-based default is used.

func (*RumorPusher) EnableACKs added in v0.0.10

func (rp *RumorPusher) EnableACKs(timeout time.Duration, maxRetries int)

EnableACKs activates the rumor ACK protocol on this pusher. Without it, PushRumor / PushRumorExcluding behave exactly as before (best-effort, no delivery confirmation). With it active:

  • every per-peer rumor send registers an entry in the ACK tracker
  • the receiver-of-ACK loop on the rumor stream calls HandleACK to dispatch incoming G3-ACK frames into the tracker
  • a sweeper goroutine polls for ACK timeouts on the configured interval; timed-out rumors retry via an alternate hypercube edge until the retry budget is exhausted

Capability gating MUST happen at the registration boundary: only peers that advertise RumorACK should be passed to RegisterPeer when ACKs are enabled, otherwise their rumors will time out every cycle and waste retry budget. Mixed-version meshes register non-ACK peers separately or run with EnableACKs disabled per-peer.

Idempotent — multiple calls update timeout/retry parameters but only one sweeper runs.

func (*RumorPusher) HandleACK added in v0.0.10

func (rp *RumorPusher) HandleACK(rumorID, peerID string)

HandleACK is called by the ACK reader loop on each peer's rumor stream when a G3-ACK frame arrives. Looks up the (rumorID, peerID) pair in the tracker; if matched, clears the entry, bumps the acksReceived counter, and emits EventRumorAcked. Unknown ACKs are silently ignored — duplicates or late arrivals after retry-budget exhaustion.

func (*RumorPusher) HypercubeExt added in v0.0.10

func (rp *RumorPusher) HypercubeExt() *HypercubeExt

HypercubeExt returns the configured extended cube, or nil if only a plain Hypercube has been wired.

func (*RumorPusher) NotifyNewPayload

func (rp *RumorPusher) NotifyNewPayload(payload []byte)

NotifyNewPayload queues an opaque payload for rumor push (non-blocking). Payloads larger than MaxRumorPayloadBytes are dropped; they still reach peers via the G1 delta path, but holding them in the bounded inbound queue would pin (capacity × payload-size) bytes resident in the worst case. The QueueFull stat covers both overflow and oversized-drop cases.

func (*RumorPusher) PushRumor

func (rp *RumorPusher) PushRumor(payload []byte, hopCount uint8, fromDimension uint8)

PushRumor sends a payload to selected peers via hypercube or random fallback.

func (*RumorPusher) PushRumorExcluding

func (rp *RumorPusher) PushRumorExcluding(payload []byte, hopCount uint8, fromDimension uint8, excludeNodeID string)

PushRumorExcluding is like PushRumor but excludes a specific peer (the sender) to prevent back-propagation of rumors to the node that sent them.

func (*RumorPusher) RegisterPeer

func (rp *RumorPusher) RegisterPeer(nodeID string, conn RumorPeerConn)

RegisterPeer adds a peer connection for rumor delivery.

func (*RumorPusher) Run

func (rp *RumorPusher) Run(ctx context.Context)

Run processes inbound new-record notifications and pushes rumors.
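
Example (sketch of wiring a pusher; conn must implement RumorPeerConn, and cube construction is shown under Hypercube):

rp := NewRumorPusher(ServiceMeshRumorConfig(), nil) // nil = hash-based dedup IDs
rp.SetHypercube(cube)
rp.RegisterPeer("node-b", conn)
rp.EnableACKs(2*time.Second, 3)

go rp.Run(ctx)

// On local mutation, queue the record for immediate push (non-blocking).
rp.NotifyNewPayload(recordBytes)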

func (*RumorPusher) SetEventEmitter added in v0.0.10

func (rp *RumorPusher) SetEventEmitter(fn func(GossipEvent))

SetEventEmitter wires the rumor pusher to a MeshEvent producer callback. Lifecycle events (RumorSent, RumorAcked, RumorRetry, RumorDropped) emit through the callback when set; without it the pusher updates only its atomic counters. Function-typed so consumers can fan events into their own subscribers (connection-history, mesh-debug, OTLP) without depending on whisper's internal event-bus type.

func (*RumorPusher) SetHypercube

func (rp *RumorPusher) SetHypercube(cube *Hypercube)

SetHypercube sets the structured overlay for dimension-ordered routing.

func (*RumorPusher) SetHypercubeExt added in v0.0.10

func (rp *RumorPusher) SetHypercubeExt(ext *HypercubeExt)

SetHypercubeExt sets the extended hypercube. When set, it supersedes any plain Hypercube wired via SetHypercube — the pusher uses the primary cube for the canonical routing decision and (when the extended options enable it) the dual cube for redundant fanout.

Adaptive dimension weighting, RouteToFar dispatch, region-aware ordering and ephemeral exclusion all read from the extended cube; the plain Hypercube path retains today's behavior for callers that haven't migrated.

func (*RumorPusher) SetNetworkPolicy added in v0.0.10

func (rp *RumorPusher) SetNetworkPolicy(p NetworkPolicy)

SetNetworkPolicy wires the rumor pusher to consult a NetworkPolicy for retry timing and fanout overrides. Subsequent pushes read `Profile().RumorRetryInitial` and `Profile().RumorFanout` to override the fixed values supplied to EnableACKs / RumorConfig. Without this, fixed defaults apply.

func (*RumorPusher) SetPeerCapabilityCheck added in v0.0.10

func (rp *RumorPusher) SetPeerCapabilityCheck(fn func(peerID string) bool)

SetPeerCapabilityCheck wires a per-peer capability lookup. The pusher only registers ACK tracking for peers that the callback returns true for. Pass nil to disable gating (every peer is treated as capable; fleet-wide ACK semantics).

func (*RumorPusher) Stats added in v0.0.3

func (rp *RumorPusher) Stats() RumorStats

Stats returns a snapshot of rumor-push effectiveness. Cheap: atomic reads for counters + one RLock for peer/cache sizes.

func (*RumorPusher) Tracker

func (rp *RumorPusher) Tracker() *RumorTracker

Tracker returns the underlying RumorTracker (for responder G3 handling).

func (*RumorPusher) UnregisterPeer

func (rp *RumorPusher) UnregisterPeer(nodeID string)

UnregisterPeer removes a peer connection.

type RumorStats added in v0.0.3

type RumorStats struct {
	Notified        uint64 // NotifyNewPayload calls
	Deduped         uint64 // skipped because already-seen
	QueueFull       uint64 // notifies dropped because inbound channel full
	PushesHypercube uint64 // frame writes via hypercube overlay
	PushesRandom    uint64 // frame writes via random-peer fallback
	WriteErrors     uint64 // peer.WriteRumor failures
	AcksReceived    uint64 // G3-ACK confirmations matched to outstanding rumors
	RumorRetries    uint64 // rumor re-pushes after ACK timeout
	RumorDropped    uint64 // rumors that exhausted retry budget
	PendingAcks     int    // outstanding (rumor, peer) pairs awaiting ACK
	Peers           int    // current peer count (RegisterPeer − UnregisterPeer)
	SeenCache       int    // current size of the tracker's seen map
}

RumorStats is a point-in-time snapshot of rumor-push effectiveness.

type RumorTracker

type RumorTracker struct {
	// contains filtered or unexported fields
}

RumorTracker deduplicates rumors using an O(1) LRU. The doubly-linked list orders entries by recency (front = newest, back = oldest); the map provides O(1) lookup into that list. MarkSeen moves an existing entry to the front or pushes a new one; eviction pops from the back.

func NewRumorTracker

func NewRumorTracker(cfg RumorConfig) *RumorTracker

NewRumorTracker creates a tracker with the given config.

func (*RumorTracker) IsSeen

func (rt *RumorTracker) IsSeen(rumorID string) bool

IsSeen returns true if this rumor has been processed before.

func (*RumorTracker) MarkSeen

func (rt *RumorTracker) MarkSeen(rumorID string)

MarkSeen records that this rumor has been processed. Existing keys are moved to the front (most-recent); new keys are pushed and the tail is evicted if capacity is exceeded.

func (*RumorTracker) ShouldForward

func (rt *RumorTracker) ShouldForward(hopCount uint8) bool

ShouldForward returns true based on probability decay: P / (1 + hopCount). For example, with ForwardProbability P = 1.0, a rumor at hop 3 forwards with probability 1.0 / 4 = 0.25.

type SnapshotChunk added in v0.0.10

type SnapshotChunk struct {
	ShardIndex  uint8
	ChunkIndex  uint16
	TotalChunks uint16 // 0 = "more coming"; >0 = "this is final, total=N"
	Body        []byte
}

SnapshotChunk is a single wire chunk of a shard. The shard's record bytes are split into chunks bounded by the stream's available credit and SnapshotChunkMax — the only place where chunked-on-the-wire transfer lives.

type SnapshotCodec added in v0.0.10

type SnapshotCodec interface {
	EncodeManifest(m SnapshotManifest) ([]byte, error)
	DecodeManifest(body []byte) (SnapshotManifest, error)

	EncodeShardRequest(spec SnapshotShardSpec) ([]byte, error)
	DecodeShardRequest(body []byte) (SnapshotShardSpec, error)

	EncodeShardChunk(c SnapshotChunk) ([]byte, error)
	DecodeShardChunk(body []byte) (SnapshotChunk, error)

	// PartitionForShard returns the records belonging to a specific
	// shard. Deterministic — every peer with the same record set
	// produces the same partitioning. Used by the responder to
	// extract its slice of the encoded snapshot.
	PartitionForShard(records [][]byte, spec SnapshotShardSpec) [][]byte

	// Erasure encodes k data shards into n total shards (n-k
	// parity) using Reed-Solomon. n=k means no erasure coding
	// (single neighbor, no parity); n>k tolerates n-k missing
	// shards.
	EncodeErasure(dataShards [][][]byte, parityCount int) ([][][]byte, error)

	// DecodeErasure reconstructs the full k data shards from any
	// k-of-n received shards. Missing shards are nil entries in
	// the input slice.
	DecodeErasure(shards [][][]byte, k int) ([][][]byte, error)
}

SnapshotCodec is the consumer-supplied serialiser for the four G5 body shapes. Whisper owns the [magic][length][kind] framing; the codec owns the per-kind body encoding.

type SnapshotDriver added in v0.0.10

type SnapshotDriver struct {
	// contains filtered or unexported fields
}

SnapshotDriver runs the cold-start protocol. One instance per process, shared across peers.

Cold-start sequence (initiator side):

  1. Connect to N=3 hypercube neighbors.
  2. Parallel SnapshotManifestRequest to each.
  3. Pick freshest by HLC; verify N≥2 agree on fingerprint (consensus check — outlier flagged in MeshEvents).
  4. Pick k=ShardCount fastest neighbors that agreed; one extra for parity.
  5. Parallel SnapshotShardRequest to all k+parity neighbors.
  6. Reassemble chunks per shard; Reed-Solomon decode across.
  7. Apply records to local store.
  8. On any single-shard failure: parity shard saves the transfer.
  9. On 2+ shard failures: retry against next-best neighbor pair.

On metered cellular (NetworkPolicy.SnapshotShardCount = 1), the driver falls back to single-neighbor-with-retry (the original Option A) — parallel transfer over cellular costs more in connection setup than it saves.

func NewSnapshotDriver added in v0.0.10

func NewSnapshotDriver(codec SnapshotCodec, store SnapshotStore, policy NetworkPolicy) *SnapshotDriver

NewSnapshotDriver returns a driver wiring the consumer's codec and store into the G5 protocol. policy is optional; nil falls back to default sizing (3-shard fan-in with one parity).

func (*SnapshotDriver) HandleManifestRequest added in v0.0.10

func (d *SnapshotDriver) HandleManifestRequest(conn net.Conn) error

HandleManifestRequest is called by the engine's frame handler when a SnapshotKindManifestRequest arrives. Builds a manifest from the local store and writes it back as SnapshotKindManifest.

func (*SnapshotDriver) HandleShardRequest added in v0.0.10

func (d *SnapshotDriver) HandleShardRequest(conn net.Conn, body []byte) error

HandleShardRequest is called when a SnapshotKindShardRequest arrives. Extracts the requested shard's records from the store, chunks them per the policy's SnapshotChunkMax, and streams the chunks back as SnapshotKindShard frames.

func (*SnapshotDriver) RunInitiator added in v0.0.10

func (d *SnapshotDriver) RunInitiator(neighbors []net.Conn) (applied int, manifest SnapshotManifest, err error)

RunInitiator drives the cold-start sequence. neighbors is the pool of hypercube peers to probe; the driver picks freshest + fastest. Returns the count of records applied and the manifest it converged on (for telemetry).

Three-step protocol:

  1. Parallel manifest probe — fan out N requests, collect responses within probeTimeout. Filter out dead peers. Pick the freshest by HLC; require a fingerprint quorum (≥ ceil(N/2) peers agree) before committing — guards against an outlier with stale data.
  2. Shard fan-in — request shard i from peer i (round-robin) so bandwidth fans out across ShardCount streams. Reed-Solomon parity tolerates one lost neighbor mid-transfer.
  3. Apply records to the local store as chunks arrive.

On metered cellular (NetworkPolicy.SnapshotShardCount = 1), the driver collapses to single-neighbor sequential transfer so the connection-setup-overhead penalty doesn't dominate transfer time.
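
Example (sketch; myCodec and myStore stand in for consumer implementations, and neighborConns are pre-established hypercube peer connections):

d := NewSnapshotDriver(myCodec, myStore, policy) // nil policy = default sizing
applied, manifest, err := d.RunInitiator(neighborConns)
if err != nil {
	return err // candidate neighbor pairs exhausted
}
log.Printf("cold start: %d records applied, fingerprint=%x", applied, manifest.Fingerprint)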

type SnapshotFrameKind added in v0.0.10

type SnapshotFrameKind uint8

SnapshotFrameKind discriminates the four frame types in the G5 protocol. Each frame starts with [magic][length] common header then a one-byte kind discriminator.

const (
	// SnapshotKindManifestRequest: probe to a hypercube neighbor
	// asking for its current snapshot manifest. Body: empty.
	SnapshotKindManifestRequest SnapshotFrameKind = 0x10

	// SnapshotKindManifest: response carrying HLC, fingerprint,
	// total record count, total bytes, shard layout proposal.
	// Initiator picks the freshest of N parallel manifests.
	SnapshotKindManifest SnapshotFrameKind = 0x11

	// SnapshotKindShardRequest: initiator asks for a specific shard
	// of the snapshot (hash-prefix bucket). Body: [shard_index][shard_count].
	SnapshotKindShardRequest SnapshotFrameKind = 0x12

	// SnapshotKindShard: streaming chunks for a single shard. Body
	// carries chunk_index + total_chunks + record bytes; receiver
	// reassembles, then Reed-Solomon decodes across shards.
	SnapshotKindShard SnapshotFrameKind = 0x13
)

type SnapshotManifest added in v0.0.10

type SnapshotManifest struct {
	HLC         uint64 // peer's current HLC
	Fingerprint uint64 // cache fingerprint (XOR of content hashes)
	RecordCount uint32 // total records the peer has
	TotalBytes  uint64 // approximate total wire size
	ShardCount  uint8  // proposed shard count for k-of-N transfer
	IsAnchor    bool   // peer advertises anchor service
}

SnapshotManifest is the per-peer "what I have" advertisement returned by SnapshotKindManifest. Tiny (~200 bytes); cheap to fetch from N=3 peers in parallel before committing to a transfer.

type SnapshotShardSpec added in v0.0.10

type SnapshotShardSpec struct {
	ShardIndex uint8
	ShardCount uint8
}

SnapshotShardSpec describes one shard of an erasure-coded snapshot. ShardIndex is the position (0..ShardCount-1); ShardCount is the total shards (k data + parity = N total). For a 2-of-3 layout (current default), ShardCount=3 and shards 0+1 are data while shard 2 is parity.

type SnapshotStore added in v0.0.10

type SnapshotStore interface {
	Snapshot() (records [][]byte, hlc uint64, fingerprint uint64)
	Apply(record []byte) error
}

SnapshotStore is the consumer's record store, used by the responder side to populate snapshots. Same Snapshot interface as ReconcileStore — most consumers implement both with the same underlying type.

type StateStore

type StateStore interface {
	// Fingerprint returns a 64-bit summary of current state. Used by
	// G2 digest probes — equality means both sides agree on every
	// record and the follow-up G1 can be skipped. Must change on any
	// mutation to avoid false-positive matches.
	Fingerprint() uint64

	// Snapshot returns every live record as opaque bytes. Called by
	// the initiator when no per-peer watermark is available and by
	// the responder on forced full-sync cycles.
	Snapshot() [][]byte

	// Delta returns records mutated since the given time. A zero
	// time is equivalent to Snapshot. Implementations may return the
	// full snapshot when they can't cheaply compute a delta — the
	// initiator tolerates over-sending.
	Delta(since time.Time) [][]byte

	// Apply ingests a single inbound record. Merge/tombstone/LWW
	// resolution is the consumer's responsibility; Whisper just
	// hands over the bytes and trusts the return for success
	// counting.
	Apply(data []byte) error
}

StateStore is the read/write surface Whisper drives during G1/G2 exchanges and rumor push. Consumers implement this by wrapping their authoritative record store (e.g., Ledger's DirectoryCache).

Records are opaque byte slices from Whisper's perspective — the consumer's G1Codec handles (de)serialisation at the exchange boundary. Apply is the single ingest point for both G1 responses and G3 rumors so merge semantics live in one place.
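
A minimal in-memory StateStore sketch. Keying records on their content hash is an illustrative shortcut (real consumers key on a record ID and resolve LWW/tombstones in Apply), and the XOR-of-FNV-hashes fingerprint mirrors the convention noted on SnapshotManifest.

type memStore struct {
	mu      sync.RWMutex
	records map[uint64]memEntry // keyed on content hash; illustrative only
}

type memEntry struct {
	data []byte
	at   time.Time
}

func newMemStore() *memStore {
	return &memStore{records: make(map[uint64]memEntry)}
}

func hashRecord(data []byte) uint64 {
	h := fnv.New64a()
	h.Write(data)
	return h.Sum64()
}

func (s *memStore) Fingerprint() uint64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var fp uint64
	for k := range s.records {
		fp ^= k // XOR of content hashes; changes on every mutation
	}
	return fp
}

func (s *memStore) Snapshot() [][]byte { return s.Delta(time.Time{}) }

func (s *memStore) Delta(since time.Time) [][]byte {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([][]byte, 0, len(s.records))
	for _, e := range s.records {
		if since.IsZero() || e.at.After(since) {
			out = append(out, e.data)
		}
	}
	return out
}

func (s *memStore) Apply(data []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records[hashRecord(data)] = memEntry{
		data: append([]byte(nil), data...),
		at:   time.Now(),
	}
	return nil
}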

type Subscriber

type Subscriber interface {
	OnBroadcast(topic string, from string, payload []byte)
}

Subscriber receives broadcast messages for subscribed topics.
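
A Subscriber can be as small as a logging shim:

type logSubscriber struct{}

func (logSubscriber) OnBroadcast(topic, from string, payload []byte) {
	log.Printf("broadcast on %q from %s (%d bytes)", topic, from, len(payload))
}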

type TopicConfig

type TopicConfig struct {
	Mode     TopicMode
	Proto    proto.Message // type hint for unmarshal (nil = opaque bytes)
	Store    StateStore    // required for StatefulMerge, nil for others
	Handler  TopicHandler  // required for RequestResponse, optional for others
	TTL      time.Duration // BroadcastOnly: message expiry (0 = no expiry)
	Rumor    bool          // StatefulMerge: enable G3 rumor push on mutation (default true)
	Priority int           // dispatch priority: 0 = default, higher = preferred under egress pressure
}

TopicConfig configures a registered topic.

type TopicHandler

type TopicHandler interface {
	// OnMessage is called when a message arrives for this topic.
	// For BroadcastOnly: informational (message already propagated).
	// For RequestResponse: must return response bytes (nil = no response).
	OnMessage(ctx context.Context, from string, topic string, payload []byte) (response []byte, err error)
}

TopicHandler handles incoming messages for a topic.
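
For RequestResponse topics the returned bytes are the reply. A hypothetical ping responder:

type pingHandler struct{}

func (pingHandler) OnMessage(ctx context.Context, from, topic string, payload []byte) ([]byte, error) {
	if string(payload) == "ping" {
		return []byte("pong"), nil
	}
	return nil, nil // nil response: this peer stays silent
}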

type TopicMode

type TopicMode int

TopicMode defines the gossip semantics for a topic.

const (
	// BroadcastOnly is firehose pub/sub. No state, no merge, no delta sync.
	// Messages propagate to all subscribers and are not stored.
	BroadcastOnly TopicMode = iota

	// StatefulMerge uses delta sync with merge-on-apply.
	// Records are stored in a StateStore. Delta sync uses per-peer watermarks.
	// Merge conflicts are resolved by the store's registered MergeFunc.
	StatefulMerge

	// RequestResponse is solicit/reply. One peer sends a query, matching
	// peers reply. No state stored. No propagation beyond responders.
	RequestResponse
)

func (TopicMode) String

func (m TopicMode) String() string

String returns the name of the topic mode.

type TopicRegistry

type TopicRegistry struct {
	// contains filtered or unexported fields
}

TopicRegistry manages registered topics and their configurations.

func NewTopicRegistry

func NewTopicRegistry() *TopicRegistry

NewTopicRegistry creates an empty topic registry.

func (*TopicRegistry) All

func (r *TopicRegistry) All() []string

All returns all registered topic names.

func (*TopicRegistry) Count

func (r *TopicRegistry) Count() int

Count returns the number of registered topics.

func (*TopicRegistry) Get

func (r *TopicRegistry) Get(name string) (TopicConfig, bool)

Get returns the config for a topic; the boolean reports whether the topic is registered.

func (*TopicRegistry) Register

func (r *TopicRegistry) Register(name string, config TopicConfig) error

Register adds a topic to the registry. Returns an error if the topic already exists or if required fields are missing for the mode.
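
Typical registration for a StatefulMerge topic; the topic name and store variable are placeholders:

reg := whisper.NewTopicRegistry()
if err := reg.Register("mesh.directory", whisper.TopicConfig{
	Mode:  whisper.StatefulMerge,
	Store: store, // the consumer's StateStore implementation
	Rumor: true,  // push G3 rumors on local mutation
}); err != nil {
	log.Fatal(err) // duplicate name, or Store missing for StatefulMerge
}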

Directories

Path Synopsis
iblt Package iblt implements an Invertible Bloom Lookup Table for set reconciliation.
