Documentation ¶
Overview ¶
Package forwarder implements the decode → forward pipeline for shard-proxy.
The file egress.go implements the per-worker outbound message batcher. Each Process*/ForwardControl call enqueues outbound datagrams into an Egress; the worker calls Egress.Flush at the end of each receive batch, which dispatches one ipv6.PacketConn.WriteBatch (sendmmsg on Linux) per target. This amortises the egress syscall cost across the entire receive batch instead of paying it per packet per interface.
An Egress is owned by a single worker goroutine and is not safe for concurrent use.
Hot path ¶
Forwarder.Process decodes the ingress frame (BRC-12, BRC-124, or BRC-128), derives the multicast group from the TxID, then for BRC-124/BRC-128 frames conditionally stamps HashKey and SeqNum in-place at raw[40:48] and raw[48:56]:
- If SeqNum (raw[48:56]) is already non-zero the sender pre-stamped the frame; the proxy forwards it verbatim without modification.
- If SeqNum is zero the proxy stamps: HashKey = XXH64(senderIPv6 ∥ groupIdx ∥ subtreeID) (stable per flow); SeqNum = per-(sender, group, subtree) monotonic counter starting at 1. Each subtree therefore owns an independent sequence so loss in one subtree cannot create false gaps in another.
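The conditional stamping rule in the two bullets above can be sketched as follows. This is a minimal illustration, not the package's code: the function name stampIfUnstamped is hypothetical, the big-endian field encoding is an assumption, and the XXH64 computation is left out (the hash is passed in as a plain uint64).

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Field offsets taken from the text: HashKey at raw[40:48], SeqNum at raw[48:56].
const (
	hashKeyOff = 40
	seqNumOff  = 48
	stampEnd   = 56
)

// stampIfUnstamped writes hashKey and seq into raw only when the SeqNum field
// is zero, and reports whether the frame was modified.
// Assumption: fields are big-endian; the real wire encoding may differ.
func stampIfUnstamped(raw []byte, hashKey, seq uint64) bool {
	if len(raw) < stampEnd {
		return false // too short to carry the stamp fields
	}
	if binary.BigEndian.Uint64(raw[seqNumOff:stampEnd]) != 0 {
		return false // sender pre-stamped the frame: forward verbatim
	}
	binary.BigEndian.PutUint64(raw[hashKeyOff:seqNumOff], hashKey)
	binary.BigEndian.PutUint64(raw[seqNumOff:stampEnd], seq)
	return true
}

func main() {
	frame := make([]byte, 64)
	fmt.Println(stampIfUnstamped(frame, 0xABCD, 1)) // true: SeqNum was zero
	fmt.Println(stampIfUnstamped(frame, 0xEEEE, 2)) // false: already stamped
}
```

Because the check looks only at the SeqNum field, a frame stamped once by the proxy is treated as pre-stamped if it passes through the same logic again.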
Per-flow counters live in a striped map (chainStripes) so concurrent workers contend on independent shards rather than a single mutex. Once a flow entry exists, counter increment is lock-free via atomic.Uint64.
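A striped counter map of the kind described above might look like the sketch below. All names (stripedCounters, numStripes, next) are hypothetical and the stripe count is arbitrary. In this sketch the lookup takes a per-stripe read lock and only the increment is atomic; the actual chainStripes implementation may organise the lookup differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const numStripes = 64 // hypothetical; a power of two so the mask below works

type stripe struct {
	mu    sync.RWMutex
	flows map[uint64]*atomic.Uint64 // keyed by the flow's HashKey
}

type stripedCounters struct {
	stripes [numStripes]stripe
}

func newStripedCounters() *stripedCounters {
	c := &stripedCounters{}
	for i := range c.stripes {
		c.stripes[i].flows = make(map[uint64]*atomic.Uint64)
	}
	return c
}

// next returns the next sequence number for the flow, starting at 1.
func (c *stripedCounters) next(hashKey uint64) uint64 {
	s := &c.stripes[hashKey&(numStripes-1)]

	s.mu.RLock()
	ctr, ok := s.flows[hashKey]
	s.mu.RUnlock()
	if !ok {
		// Slow path: create the entry under the stripe's write lock,
		// re-checking in case another worker created it first.
		s.mu.Lock()
		if ctr, ok = s.flows[hashKey]; !ok {
			ctr = new(atomic.Uint64)
			s.flows[hashKey] = ctr
		}
		s.mu.Unlock()
	}
	return ctr.Add(1) // atomic increment, no mutex held
}

func main() {
	c := newStripedCounters()
	var wg sync.WaitGroup
	for w := 0; w < 8; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				c.next(42)
			}
		}()
	}
	wg.Wait()
	fmt.Println(c.next(42)) // 8001
}
```

Workers handling different flows usually hash to different stripes, so they rarely touch the same lock; workers on the same flow contend only on the atomic add.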
BRC-130 fragmentation ¶
When Forwarder.SetFragMTU is called with a positive MTU, BRC-124/BRC-128 frames whose payload exceeds fragDataSize (= MTU − 40 − 8 − 104) are split into K BRC-130 fragment datagrams instead of being forwarded verbatim. Each fragment receives its own HashKey and SeqNum so it is independently cacheable and retransmittable by the retry endpoint. Frames at or below the threshold are forwarded verbatim.
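The threshold arithmetic can be worked through with a short sketch. The constant names and the interpretation of the three overhead terms (IPv6 header, UDP header, per-fragment framing) are assumptions, as is the ceiling division used to obtain K and the handling of an MTU too small to leave any payload room; only the formula MTU − 40 − 8 − 104 comes from the text.

```go
package main

import "fmt"

// Overhead terms from the formula in the text. What each term stands for
// is an assumption of this sketch.
const (
	ipv6HeaderLen = 40  // assumed: fixed IPv6 header
	udpHeaderLen  = 8   // assumed: UDP header
	fragOverhead  = 104 // assumed: per-fragment BRC-130 framing
)

// fragDataSize is the largest payload that is forwarded verbatim.
func fragDataSize(mtu int) int {
	return mtu - ipv6HeaderLen - udpHeaderLen - fragOverhead
}

// fragmentCount returns the number of datagrams emitted for a payload.
// 1 means the frame is forwarded verbatim.
func fragmentCount(mtu, payloadLen int) int {
	if mtu <= 0 {
		return 1 // fragmentation disabled
	}
	size := fragDataSize(mtu)
	if size <= 0 || payloadLen <= size {
		// Assumption: an MTU too small to fit any payload is treated
		// like "no fragmentation" here.
		return 1
	}
	return (payloadLen + size - 1) / size // ceiling division
}

func main() {
	fmt.Println(fragDataSize(1500))        // 1348
	fmt.Println(fragmentCount(1500, 1348)) // 1
	fmt.Println(fragmentCount(1500, 1349)) // 2
	fmt.Println(fragmentCount(1500, 4000)) // 3
}
```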
BRC-12 frames are always forwarded verbatim.
Egress lifecycle ¶
Forwarder.OpenTargets opens one UDP socket per interface with IPV6_MULTICAST_IF applied and wraps it in an ipv6.PacketConn cached on the returned Target for batched WriteBatch (sendmmsg on Linux). Each worker constructs an Egress over the targets and passes it to every Process* call; the worker calls Egress.Flush at the end of each receive batch and once more during graceful shutdown to drain in-flight messages. Sockets are released with CloseTargets.
Index ¶
- func CloseTargets(targets []Target, log *slog.Logger)
- type BridgingEngine
- type Egress
- func (e *Egress) EnqueueControl(raw []byte, dst net.UDPAddr, label string, workerID int)
- func (e *Egress) EnqueueControlPooled(raw []byte, dst net.UDPAddr, label string, workerID int, pooled *[]byte)
- func (e *Egress) EnqueueData(raw []byte, dst net.UDPAddr, groupIdx uint32, workerID int)
- func (e *Egress) EnqueueDataPooled(raw []byte, dst net.UDPAddr, groupIdx uint32, workerID int, pooled *[]byte)
- func (e *Egress) Flush()
- func (e *Egress) FlushVia(fn EgressWriteFunc)
- func (e *Egress) PoolGet() *[]byte
- func (e *Egress) Targets() []Target
- type EgressWriteFunc
- type Forwarder
- func (fw *Forwarder) Bridging() *BridgingEngine
- func (fw *Forwarder) Dispatch(egr *Egress, raw []byte, src net.Addr, workerID int)
- func (fw *Forwarder) EgressPort() int
- func (fw *Forwarder) ForwardControl(egr *Egress, raw []byte, idx shard.GroupIdx, port int)
- func (fw *Forwarder) OpenTargets(ifaces []*net.Interface, probeWorker bool) ([]Target, error)
- func (fw *Forwarder) Process(egr *Egress, raw []byte, src net.Addr, workerID int)
- func (fw *Forwarder) ProcessAnchor(egr *Egress, raw []byte, src net.Addr, workerID int)
- func (fw *Forwarder) ProcessBlock(egr *Egress, raw []byte, src net.Addr, workerID int)
- func (fw *Forwarder) ProcessSubtreeData(egr *Egress, raw []byte, src net.Addr, workerID int)
- func (fw *Forwarder) SetBindSource(ip net.IP)
- func (fw *Forwarder) SetBridging(b *BridgingEngine)
- func (fw *Forwarder) SetEgressHopLimit(n int)
- func (fw *Forwarder) SetFragMTU(mtu int)
- func (fw *Forwarder) SetTxidDedup(d TxidDedup, prefix string)
- type Target
- type TxidDedup
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CloseTargets ¶
CloseTargets closes all egress sockets opened by Forwarder.OpenTargets.
Types ¶
type BridgingEngine ¶ added in v1.12.0
type BridgingEngine struct {
// Secondary is the successor generation's shard engine. Distinct
// ShardBits and (optionally) addressing prefix so the dual-emit
// destination is the SUCCESSOR layout.
Secondary *shard.Engine
// TransitionEpoch (Unix seconds) is the time at which the consumer
// MUST stop dual-emitting; the applier replaces Secondary with the
// new active engine and clears the BridgingEngine pointer.
TransitionEpoch int64
}
BridgingEngine carries the secondary shard engine used during a BRC-139 live-resharding bridging window. When set on a Forwarder via SetBridging, the per-frame emit path computes both the active and the bridging shard indices and emits to both destinations. The listener's per-TxID egress dedup absorbs the duplicate frame on the receive side.
SetBridging is safe to call concurrently with the hot path. A nil pointer means "no bridging in flight" and the hot path is a single emit per frame (the normal, steady-state behaviour).
type Egress ¶
type Egress struct {
// contains filtered or unexported fields
}
func NewEgress ¶
NewEgress constructs an Egress bound to the given targets. batchHint sets the initial capacity reservation for the per-target message queue; growth beyond this is dynamic. The Egress draws fragment buffer memory from a sync.Pool sized to the forwarder's BRC-130 fragment datagram capacity; fragmentation must be enabled on fw for the pool to be initialised.
func (*Egress) EnqueueControl ¶
EnqueueControl queues raw for fan-out to every target as a control-plane datagram. label is the metrics.ControlFrameForwarded label fired per target at Flush.
func (*Egress) EnqueueControlPooled ¶
func (e *Egress) EnqueueControlPooled(raw []byte, dst net.UDPAddr, label string, workerID int, pooled *[]byte)
EnqueueControlPooled is EnqueueControl with a pool-recycled backing buffer.
func (*Egress) EnqueueData ¶
EnqueueData queues raw for fan-out to every target with destination dst. raw must remain valid until the next Flush call. groupIdx and the size of raw are captured for the PacketForwarded metric at Flush time.
func (*Egress) EnqueueDataPooled ¶
func (e *Egress) EnqueueDataPooled(raw []byte, dst net.UDPAddr, groupIdx uint32, workerID int, pooled *[]byte)
EnqueueDataPooled is EnqueueData where raw was obtained via PoolGet. The backing buffer (passed by pointer to the original slice) is returned to the pool after Flush completes.
func (*Egress) Flush ¶
func (e *Egress) Flush()
Flush writes all queued messages to each target via WriteBatch (sendmmsg on Linux; per-packet fallback elsewhere). Per-target write errors are recorded as EgressError; messages beyond WriteBatch's sent-count fire PacketDropped with reason "write_error". Pool buffers are released exactly once each before return.
func (*Egress) FlushVia ¶ added in v1.12.11
func (e *Egress) FlushVia(fn EgressWriteFunc)
FlushVia drains the per-batch egress queue through fn instead of the kernel WriteBatch (sendmmsg) path, then resets the queue and releases pooled buffers exactly like Flush, recording forwarded/dropped metrics identically. It lets an alternative egress transport reuse the forwarder's decode/stamp/addressing pipeline without forking it.
type EgressWriteFunc ¶ added in v1.12.11
EgressWriteFunc delivers one queued datagram (the BRC frame payload raw, addressed to multicast destination dst) for target index target. Returning a non-nil error stops that target's drain and is recorded like a sendmmsg write error.
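The drain semantics described for FlushVia and EgressWriteFunc can be modelled with a small self-contained sketch. Every type and function below (writeFunc, queued, drainVia, failingWriter) is hypothetical and only mirrors the documented behaviour: an error stops the drain for that target alone, and the undelivered remainder for that target counts as dropped. The real EgressWriteFunc signature is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// writeFunc plays the role of EgressWriteFunc. Its signature is an assumption.
type writeFunc func(target int, raw []byte, dst *net.UDPAddr) error

type queued struct {
	raw []byte
	dst *net.UDPAddr
}

type drainStats struct{ forwarded, dropped int }

var errWrite = errors.New("write failed")

// drainVia offers every queued datagram to each target through fn.
// An error ends the drain for that target only; the failing datagram and
// everything after it are counted as dropped for that target.
func drainVia(numTargets int, queue []queued, fn writeFunc) drainStats {
	var st drainStats
	for t := 0; t < numTargets; t++ {
		for i, m := range queue {
			if err := fn(t, m.raw, m.dst); err != nil {
				st.dropped += len(queue) - i
				break
			}
			st.forwarded++
		}
	}
	return st
}

// failingWriter succeeds everywhere except on badTarget, where it fails
// once okBefore datagrams have been delivered.
func failingWriter(badTarget, okBefore int) writeFunc {
	sent := 0
	return func(target int, raw []byte, dst *net.UDPAddr) error {
		if target != badTarget {
			return nil
		}
		if sent >= okBefore {
			return errWrite
		}
		sent++
		return nil
	}
}

func main() {
	dst := &net.UDPAddr{IP: net.ParseIP("ff0e::1"), Port: 9000}
	queue := []queued{{[]byte("a"), dst}, {[]byte("b"), dst}, {[]byte("c"), dst}}
	st := drainVia(2, queue, failingWriter(1, 1))
	fmt.Printf("forwarded=%d dropped=%d\n", st.forwarded, st.dropped) // forwarded=4 dropped=2
}
```

Target 0 receives all three datagrams; target 1 receives one, fails on the second, and its remaining two are counted as dropped.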
type Forwarder ¶
type Forwarder struct {
// contains filtered or unexported fields
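}
Forwarder decodes ingress frames (BRC-12 or BRC-124/BRC-128), derives the multicast destination from the TxID, stamps HashKey/SeqNum for BRC-124/BRC-128 frames, and optionally splits large payloads into BRC-130 fragment datagrams.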
func New ¶
func New(engine *shard.Engine, mcPrefix uint16, mcGroupID uint16, egressPort int, debug bool, rec *metrics.Recorder) *Forwarder
New creates a Forwarder. No sockets are opened here; call Forwarder.OpenTargets in each worker's Run loop.
- engine: immutable shard derivation engine.
- mcPrefix: upper 16-bit scope prefix for control-plane group address derivation.
- mcGroupID: IANA group-id occupying bytes 12–13 (default shard.DefaultGroupID).
- egressPort: UDP destination port written into outgoing multicast datagrams.
- debug: enable per-packet debug logging.
- rec: metrics recorder; may be nil.
func (*Forwarder) Bridging ¶ added in v1.12.0
func (fw *Forwarder) Bridging() *BridgingEngine
Bridging returns the currently-active bridging engine, or nil when the proxy is in steady state. Intended for tests and telemetry.
func (*Forwarder) Dispatch ¶ added in v1.12.7
Dispatch routes one ingress datagram to the correct Process* entry point by its BRC frame-version byte (raw[6]). It is the single source of truth for frame-version routing, shared by the worker receive loop and any alternative ingress that drives the forwarder directly, so version handling cannot drift between callers. raw is the UDP payload (the BRC frame); src is its source; egr is the caller's per-worker egress queue. raw must remain valid until egr.Flush returns.
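Routing on the frame-version byte could be sketched as below. The FrameVer values 0x04, 0x05 and 0x06 are the ones named elsewhere in this documentation for ProcessBlock, ProcessSubtreeData and ProcessAnchor; everything else is an assumption, in particular that all other versions fall through to Process and that a datagram too short to carry raw[6] is dropped. The function returns the name of the entry point rather than calling it, so the sketch stays self-contained.

```go
package main

import "fmt"

const frameVerOffset = 6 // raw[6] carries the BRC frame-version byte

// FrameVer values named in this documentation.
const (
	frameVerBlock       = 0x04 // BRC-131 block control frames
	frameVerSubtreeData = 0x05 // BRC-132 subtree data frames
	frameVerAnchor      = 0x06 // BRC-134 chained anchor transactions
)

// route names the Process* entry point that would handle raw.
func route(raw []byte) string {
	if len(raw) <= frameVerOffset {
		return "drop" // assumption: too short to carry a version byte
	}
	switch raw[frameVerOffset] {
	case frameVerBlock:
		return "ProcessBlock"
	case frameVerSubtreeData:
		return "ProcessSubtreeData"
	case frameVerAnchor:
		return "ProcessAnchor"
	default:
		// Assumption: every other version takes the transaction hot path.
		return "Process"
	}
}

func main() {
	frame := make([]byte, 64)
	frame[frameVerOffset] = frameVerSubtreeData
	fmt.Println(route(frame)) // ProcessSubtreeData
}
```

Keeping the switch in one function is what lets the worker receive loop and any alternative ingress share identical version handling.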
func (*Forwarder) EgressPort ¶
EgressPort returns the configured UDP destination port for multicast egress.
func (*Forwarder) ForwardControl ¶
ForwardControl enqueues a raw BRC-127 control datagram (e.g. SubtreeGroupAnnounce) for the given network-service multicast group index. The destination address is derived using shard.GroupAddr with the engine's configured scope prefix and IANA group-id. Unlike Forwarder.Process, no sequence stamping or frame decoding is performed. raw must remain valid until egr.Flush returns.
func (*Forwarder) OpenTargets ¶
OpenTargets opens one multicast egress UDP socket per interface and wraps each in an ipv6.PacketConn for batched WriteBatch (sendmmsg on Linux) calls. On worker 0 (probeWorker == true) each socket is probed with a zero-byte send to verify multicast egress is functional.
On error, all partially opened sockets are closed before returning.
func (*Forwarder) Process ¶
Process is the hot path: decode raw for routing, conditionally stamp HashKey/SeqNum, then enqueue into egr for batched egress via Egress.Flush.
For BRC-124/BRC-128 frames: if raw[48:56] (SeqNum) is non-zero the sender has pre-stamped the frame and it is forwarded verbatim. If SeqNum is zero the proxy stamps raw[40:48] (HashKey) and raw[48:56] (SeqNum) in-place: HashKey is stable per (sender, group, subtree) flow; SeqNum is a per-flow monotonic counter starting at 1. BRC-12 frames are always forwarded verbatim. workerID is used only for metrics labels.
raw must remain valid until egr.Flush returns. egr may be nil; in that case the frame is decoded and stamped but not enqueued — used by tests that exercise only the stamping logic.
func (*Forwarder) ProcessAnchor ¶
ProcessAnchor handles BRC-134 chained anchor transaction frames (FrameVer 0x06). Anchor transactions are the root of a chain of dependent transactions and must reach every subscriber regardless of shard assignment. They are validated, HashKey/SeqNum-stamped, and enqueued for GroupBlockBroadcast (FF0E::B:FFFE) — the same multicast group as BRC-131 block frames.
raw must remain valid until egr.Flush returns. egr may be nil for tests.
func (*Forwarder) ProcessBlock ¶
ProcessBlock handles BRC-131 block control frames (FrameVer 0x04). It validates the frame, stamps HashKey/SeqNum if needed, optionally fragments large payloads via BRC-130, and enqueues into egr for the GroupBlockBroadcast multicast group instead of a shard group.
raw must remain valid until egr.Flush returns. egr may be nil for tests.
func (*Forwarder) ProcessSubtreeData ¶
ProcessSubtreeData handles BRC-132 subtree data frames (FrameVer 0x05). It validates the frame, stamps HashKey/SeqNum per (sender, 0xFFFB, subtreeID) flow, optionally fragments large payloads via BRC-130, and enqueues for the GroupSubtreeDataAnnounce multicast group.
raw must remain valid until egr.Flush returns. egr may be nil for tests.
func (*Forwarder) SetBindSource ¶ added in v1.11.0
SetBindSource configures the source IPv6 address the kernel will bind for every multicast egress socket. Required when source-mode=ssm so SSM receivers can pre-declare this proxy in their (S,G) join calls. Each proxy replica MUST use a distinct bind source, because anycast or shared source IPs break PIM-SSM RPF.
ip must be a valid IPv6 address (To4() == nil). Pass nil to clear. Must be called before Forwarder.OpenTargets.
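A caller can pre-validate the address with the same To4() == nil rule before handing it to SetBindSource. The helper names checkBindSource and parseBindSource are hypothetical; the sketch also requires To16() to succeed so that malformed net.IP values are rejected, which goes slightly beyond the rule stated above.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

var errNotIPv6 = errors.New("bind source must be an IPv6 address")

// checkBindSource accepts nil (clear the setting) or a genuine IPv6 address.
// IPv4 and IPv4-mapped IPv6 addresses are rejected because To4() is non-nil
// for both.
func checkBindSource(ip net.IP) error {
	if ip == nil {
		return nil
	}
	if ip.To16() == nil || ip.To4() != nil {
		return errNotIPv6
	}
	return nil
}

// parseBindSource parses a textual address and applies checkBindSource.
func parseBindSource(s string) (net.IP, error) {
	ip := net.ParseIP(s)
	if ip == nil {
		return nil, fmt.Errorf("invalid IP literal %q", s)
	}
	if err := checkBindSource(ip); err != nil {
		return nil, err
	}
	return ip, nil
}

func main() {
	for _, s := range []string{"2001:db8::1", "192.0.2.1", "::ffff:192.0.2.1"} {
		_, err := parseBindSource(s)
		fmt.Printf("%s ok=%v\n", s, err == nil)
	}
}
```

Only the first address passes: the other two are an IPv4 address and its IPv4-mapped form.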
func (*Forwarder) SetBridging ¶ added in v1.12.0
func (fw *Forwarder) SetBridging(b *BridgingEngine)
SetBridging publishes (or clears) the BRC-139 live-resharding bridging engine. Safe to call from any goroutine; the per-frame emit path reads the pointer atomically.
Pass nil to exit bridging mode. Bridging-mode operators MUST orchestrate the swap-and-clear at TransitionEpoch (the applier rebuilds the primary engine with the successor's parameters and then calls SetBridging(nil)).
func (*Forwarder) SetEgressHopLimit ¶ added in v1.12.6
SetEgressHopLimit configures IPV6_MULTICAST_HOPS for every egress socket. The default multicast hop limit is 1 (single L2 segment); routed or tunneled mesh fabrics (ip6gre + mc-router) must raise it so per-hop decrement does not drop frames before they reach remote subscribers. 0 leaves the kernel default. Must be called before Forwarder.OpenTargets.
func (*Forwarder) SetFragMTU ¶
SetFragMTU enables BRC-130 fragmentation for the given path MTU. Frames with payload larger than (mtu - 40 - 8 - 104) bytes are split into multiple BRC-130 datagrams. Pass mtu <= 0 to disable fragmentation.
func (*Forwarder) SetTxidDedup ¶
SetTxidDedup attaches a TxID claim store used to suppress ingress duplicates before multicast. prefix is the Redis key prefix associated with the proxy ingress namespace (typically "bsp:tx:"). Pass nil to disable. Must be called before any worker goroutine starts processing.
type Target ¶
Target pairs a network interface with its pre-opened multicast egress socket and the cached ipv6.PacketConn wrapper used for batched WriteBatch (sendmmsg) calls during Egress.Flush.
type TxidDedup ¶
TxidDedup is the minimal interface a TxID claim store must satisfy. It is satisfied by *txidset.Store from shard-common; the forwarder depends only on this interface so that tests can inject lightweight fakes.
Claim must report (true, nil) when the caller wins the claim (proceed), (false, nil) when another claimant already holds the TxID (suppress), or (true, err) on Redis error (fail-open). Errors are surfaced through Recorder callbacks supplied to the Store rather than re-reported by the forwarder.
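The three Claim outcomes can be exercised with a lightweight in-memory fake of the kind the text mentions for tests. The method signature used here, Claim(txid [32]byte) (bool, error), is an assumption made for illustration; the real TxidDedup interface is not shown in this documentation and may take additional parameters. The names claimer, fakeStore and shouldForward are likewise hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// claimer stands in for TxidDedup. The method signature is an assumption.
type claimer interface {
	Claim(txid [32]byte) (bool, error)
}

var errBackend = errors.New("backend unavailable")

// fakeStore is an in-memory claim store for tests.
type fakeStore struct {
	mu   sync.Mutex
	seen map[[32]byte]struct{}
	fail error // when non-nil, every Claim fails open with this error
}

func newFakeStore() *fakeStore {
	return &fakeStore{seen: make(map[[32]byte]struct{})}
}

func (s *fakeStore) Claim(txid [32]byte) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.fail != nil {
		return true, s.fail // fail-open: proceed despite the error
	}
	if _, dup := s.seen[txid]; dup {
		return false, nil // another claimant holds the TxID: suppress
	}
	s.seen[txid] = struct{}{}
	return true, nil // caller wins the claim: proceed
}

// shouldForward applies the contract: forward unless the store reports that
// the TxID is already claimed. The error is ignored here because, per the
// text, errors are surfaced by the store's own callbacks.
func shouldForward(c claimer, txid [32]byte) bool {
	if c == nil {
		return true // dedup disabled
	}
	won, _ := c.Claim(txid)
	return won
}

func main() {
	store := newFakeStore()
	id := [32]byte{1}
	fmt.Println(shouldForward(store, id)) // true: first claim wins
	fmt.Println(shouldForward(store, id)) // false: duplicate suppressed
	store.fail = errBackend
	fmt.Println(shouldForward(store, id)) // true: fail-open on error
}
```

Returning true alongside the error is what makes the store fail-open: a backend outage lets duplicates through instead of blocking traffic.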