broadcast

package
v0.1.0 Latest
Published: May 28, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package broadcast provides a fan-out channel backed by a fixed-size ring buffer.

A Hub hands out a singleton Sender and any number of Receivers. Every value published through the sender is delivered to every live receiver, unlike spmc/mpmc, which distribute each value to a single consumer. The ring is bounded; slow receivers do not block the sender. When a write would wrap onto an unread slot, the oldest unread value is overwritten and the affected receiver sees gochan.ErrLagged on its next Recv before resuming from the oldest still-buffered value.

Send never blocks: it always succeeds, overwrites, or returns gochan.ErrClosed. TrySend exposes the overwrite condition to the publisher by returning gochan.ErrFull when a write would evict an unread value, letting callers self-throttle or implement a drop-newest policy on top of the package's default drop-oldest.

A freshly constructed hub has no receivers; values published before any Hub.Receiver call are dropped without being written to the ring. Late subscribers start at the sender's current write position — they do not see historical values.

Typical uses

Event-stream fan-out (one producer, many listeners), configuration-change notifications, market-data ticks distributed to many strategies, and WebSocket / SSE push systems where slow clients must not back up the publisher.

Unlike spmc/mpmc, every receiver sees every value (unless there's lag). Unlike github.com/amorey/gochan/watch, values are buffered (you see the last N, not just the most recent).

Semantics

Shared singleton sender, fan-out delivery. The send-side handle returned by Hub.Sender is a singleton that is safe to share across goroutines: any number of publishers may call Send / TrySend / SendContext / Close concurrently on the same handle. This is an intentional exception to the "one handle, one goroutine" rule that applies to the queue-style packages (spsc, spmc, mpsc, mpmc): broadcast's Send never parks — it always returns immediately, overwriting on wrap — so there is no parked-Send-races-Close window for shared use to expose. Any number of goroutines may each hold their own *Receiver[T] (obtained from Hub.Receiver) and call Recv/Close on it; the implementation does not synchronize concurrent callers on the same receiver handle — call Hub.Receiver once per subscriber. Every value goes to every live receiver — this is fan-out, not load distribution. Use github.com/amorey/gochan/spmc or github.com/amorey/gochan/mpmc if you want each value delivered to exactly one consumer.

Drop-oldest, with sender-observable pressure. The ring buffer holds at most capacity values. When Send wraps around onto a slot still holding an unread value, the unread value is overwritten and the affected receiver(s) see gochan.ErrLagged on their next Recv. TrySend exposes the same condition to the publisher: it returns gochan.ErrFull and refuses to write when an overwrite would occur, letting publishers self-throttle or drop-newest if they prefer.

Late subscribers see only future values. A receiver obtained via Hub.Receiver starts at the sender's current write position. Values published before registration are not replayed. To make a value durable across a subscribe boundary, publish it again after subscription completes.

Send never blocks. By design, Sender.Send always returns immediately (success, overwrite, or gochan.ErrClosed). This is the opposite of the queue-style packages, where Send blocks under backpressure. If you want backpressure here, use TrySend + a back-off loop, or use github.com/amorey/gochan/mpmc instead.

No empty-hub gating. Unlike spmc / mpmc, broadcast does not block Send on the first Hub.Receiver call. Values published with no subscribers are dropped without being written to the ring — subsequent subscribers start at "now" and don't see them. This package therefore never returns gochan.ErrNotReady.

Hub close-all. Hub.Close closes the sender and every live receiver immediately. Already-buffered values are not drained: the next Recv returns gochan.ErrClosed, and Chan feeder goroutines unblock and close their channel without delivering remaining ring contents. Use Sender.Close instead if you want subscribers to finish consuming already-published values before seeing close.

Patterns

Fire-and-forget telemetry — publisher doesn't care about lag, subscribers handle it:

hub := broadcast.New[Metric](1024)

tx := hub.Sender()
go func() {
    for m := range metricStream {
        tx.Send(m) // never blocks; overwrites on wrap
    }
    tx.Close()
}()

rx := hub.Receiver()
for {
    m, err := rx.Recv()
    var lagged gochan.ErrLagged
    switch {
    case errors.As(err, &lagged):
        log.Warn("dropped metrics", "missed", lagged.Missed)
        continue
    case errors.Is(err, gochan.ErrClosed):
        return
    }
    process(m)
}

Sender-side observability — publisher tracks subscriber lag without changing delivery:

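for state := range stateStream {
    // TrySend writes the value on success, so fall back to Send only on
    // ErrFull; calling both unconditionally would publish state twice.
    if err := tx.TrySend(state); errors.Is(err, gochan.ErrFull) {
        metrics.Inc("broadcast.subscriber_lagging")
        tx.Send(state) // commit anyway; drop-oldest is fine for this stream
    }
}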

User-built drop-newest — preserve older values when the ring is full:

for evt := range events {
    if err := tx.TrySend(evt); errors.Is(err, gochan.ErrFull) {
        droppedCount.Add(1)
        continue // skip the new value; keep older ones in the ring
    }
}

Self-throttling publisher — slow down when subscribers lag:

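for v := range source {
    err := tx.TrySend(v)
    for errors.Is(err, gochan.ErrFull) {
        time.Sleep(backoff) // ring is full of unread values; wait and retry
        err = tx.TrySend(v)
    }
    if err != nil {
        return // gochan.ErrClosed: retrying would spin forever
    }
}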

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Hub

type Hub[T any] struct {
	// contains filtered or unexported fields
}

Hub is the construction handle for a broadcast pipeline.

func New

func New[T any](capacity int) *Hub[T]

New creates a broadcast Hub backed by a ring buffer of the given capacity. capacity <= 0 panics: a ring of size zero cannot hold a value across a sender→receiver handoff without blocking the sender, which contradicts the package's non-blocking promise.

func (*Hub[T]) Close

func (h *Hub[T]) Close()

Close closes the sender and every live receiver. Already-buffered values are not drained — receivers see gochan.ErrClosed on their next operation, and Chan feeder goroutines unblock. Future Hub.Sender calls return the closed singleton; future Hub.Receiver calls return pre-closed handles. Idempotent.

func (*Hub[T]) Receiver

func (h *Hub[T]) Receiver() *Receiver[T]

Receiver returns a new subscriber bound to the ring. The receiver's read position is set to the sender's current write position — only values published after this call are delivered.

func (*Hub[T]) Sender

func (h *Hub[T]) Sender() *Sender[T]

Sender returns the singleton send-side handle. Repeated calls return the same handle. The handle is safe to share across goroutines (see the package doc for why broadcast is an exception to the "one handle, one goroutine" rule). After the hub has been closed the returned handle reports gochan.ErrClosed on use.

type Receiver

type Receiver[T any] struct {
	// contains filtered or unexported fields
}

Receiver is a receive-side handle. Each receiver tracks its own read position and lag accounting; a single *Receiver[T] is intended for use by one consumer goroutine.

func (*Receiver[T]) Chan

func (rx *Receiver[T]) Chan() <-chan T

Chan returns a per-receiver native channel that yields successive values in order, silently advancing past lagged values. Closed when the sender closes and this receiver drains, or when this receiver is closed. Repeated calls return the same channel.

Abandoning the channel without calling Receiver.Close pins the feeder goroutine — it will park forever waiting for the next value or close signal. Always Close the receiver when you stop reading.

func (*Receiver[T]) Close

func (rx *Receiver[T]) Close()

Close closes this receiver only. Other receivers and the sender are unaffected. Idempotent. Closing the last receiver does not close the sender — broadcast lets the publisher keep writing with no subscribers.

func (*Receiver[T]) Recv

func (rx *Receiver[T]) Recv() (T, error)

Recv blocks until the next value is available at this receiver's position, the sender has closed and this receiver has caught up, or this receiver / the hub is closed. Returns the next value, or one of:

  • gochan.ErrLagged — the receiver fell more than capacity values behind the sender; some values were overwritten. The error carries Missed — the number of values dropped before the receiver caught up. The receiver's position is reset to the oldest still-buffered value; the next Recv resumes from there. The receiver is still usable.
  • gochan.ErrClosed — the sender or hub has closed and this receiver has already drained everything still in the ring at or after its position.

func (*Receiver[T]) RecvContext

func (rx *Receiver[T]) RecvContext(ctx context.Context) (T, error)

RecvContext blocks like Receiver.Recv but returns ctx.Err() if ctx is cancelled first. A ready value or gochan.ErrLagged is preferred over a cancelled context.

func (*Receiver[T]) TryRecv

func (rx *Receiver[T]) TryRecv() (T, error)

TryRecv is non-blocking. Returns the next value, gochan.ErrEmpty if caught up, gochan.ErrLagged if behind, or gochan.ErrClosed.

type Sender

type Sender[T any] struct {
	// contains filtered or unexported fields
}

Sender is the singleton send-side handle. Safe to share across goroutines.

func (*Sender[T]) Close

func (tx *Sender[T]) Close()

Close closes the sender. Already-published values remain in the ring and remain receivable (subject to lag) until each receiver catches up. Further Send / TrySend / SendContext calls return gochan.ErrClosed. Idempotent.

func (*Sender[T]) Send

func (tx *Sender[T]) Send(v T) error

Send publishes v to the ring and returns immediately. Send never blocks: if the ring is full of unread values, the oldest unread slot is overwritten and the receiver(s) holding that slot will see gochan.ErrLagged on their next Recv. Returns gochan.ErrClosed if the sender or hub has been closed; on ErrClosed the value is dropped.

func (*Sender[T]) SendContext

func (tx *Sender[T]) SendContext(ctx context.Context, v T) error

SendContext returns ctx.Err() if ctx is already cancelled; otherwise behaves like Send. Send never blocks, so there is nothing for cancellation to interrupt mid-call.

func (*Sender[T]) TrySend

func (tx *Sender[T]) TrySend(v T) error

TrySend is the pressure-aware variant of Sender.Send. Returns gochan.ErrFull — without writing — if publishing v would evict an unread value from at least one live receiver. Returns gochan.ErrClosed if closed. Returns nil on a successful write.

TrySend is the entry point for senders that want to observe subscriber lag (for metrics, back-pressure, or self-throttling) or implement a drop-newest policy on top of the package's default drop-oldest behavior — see the package overview for patterns.

Directories

Path Synopsis
examples/chan (command): broadcast/examples/chan demonstrates the Chan()-based API for a fan-out channel, with subscribers composing Chan() with a cancel signal via select for graceful early shutdown.
examples/recv (command): broadcast/examples/recv demonstrates the Recv()-based API for a fan-out channel: every value goes to every live subscriber.
