Documentation
Overview ¶
Package broadcasting provides a pub/sub abstraction modelled on Laravel's Broadcasting facade.
The Broadcaster interface is intentionally small so a Memory driver can ship in-box (single-process apps, tests) while Redis/NATS/Kafka drivers live in sub-packages without dragging external deps into the core module.
Usage:
    bcast := broadcasting.NewMemory()
    bcast.Subscribe(ctx, "chat.42", func(ctx context.Context, e broadcasting.Event) error {
        fmt.Printf("got %s: %s\n", e.Name, string(e.Payload))
        return nil
    })
    _ = bcast.Publish(ctx, broadcasting.Event{
        Channel: "chat.42",
        Name:    "MessagePosted",
        Payload: []byte(`{"text":"hi"}`),
    })
Compared to events:
- events: synchronous, in-process, and typed via generics. Use it to aggregate domain reactions within a single request.
- broadcasting: many-to-many fan-out, either across processes (when paired with a remote driver) or within a single process. Each subscriber holds its own queue and runs on its own goroutine.
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("broadcasting: closed")
ErrClosed is returned by Publish/Subscribe after Close.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶
type Broadcaster interface {
    // Publish delivers e to every active subscription whose channel
    // matches e.Channel.
    Publish(ctx context.Context, e Event) error

    // Subscribe registers fn to receive every event published on
    // channel. The returned Subscription unbinds fn when Cancel is
    // called.
    Subscribe(ctx context.Context, channel string, fn Handler) (Subscription, error)

    // Close releases all resources. Further Publish/Subscribe calls
    // return ErrClosed.
    Close() error
}
Broadcaster is the transport interface.
type Event ¶
Event is the wire-format unit dispatched through a Broadcaster. Payload is opaque bytes: drivers do not assume JSON, although publish helpers in user code typically pre-encode payloads as JSON.
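The JSON pre-encoding mentioned above can be sketched as a pair of small helpers. This is only an illustration: the local Event type copies the three fields seen in the Usage example and may not match the real broadcasting.Event, and NewJSONEvent, DecodeJSON, and MessagePosted are hypothetical names that are not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors the fields used in the Usage example. It is a local
// stand-in (an assumption), not the real broadcasting.Event.
type Event struct {
	Channel string
	Name    string
	Payload []byte
}

// MessagePosted is a hypothetical application payload.
type MessagePosted struct {
	Text string `json:"text"`
}

// NewJSONEvent (hypothetical helper) encodes v as JSON and wraps the
// resulting bytes in an Event.
func NewJSONEvent(channel, name string, v any) (Event, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return Event{}, fmt.Errorf("encode %s payload: %w", name, err)
	}
	return Event{Channel: channel, Name: name, Payload: b}, nil
}

// DecodeJSON (hypothetical helper) decodes an event payload into a T.
func DecodeJSON[T any](e Event) (T, error) {
	var v T
	err := json.Unmarshal(e.Payload, &v)
	return v, err
}

func main() {
	e, err := NewJSONEvent("chat.42", "MessagePosted", MessagePosted{Text: "hi"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s %s\n", e.Channel, e.Name, e.Payload)
}
```

Keeping the encoding in user code means a driver only ever moves bytes, so a subscriber that expects a different format (for example Protocol Buffers) needs no change in the transport.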
type Handler ¶
Handler is invoked once per delivered event. An error returned by a handler is reported through the driver's logger callback (Memory's default logger is silent, so the error is discarded unless WithLogger is set); it never propagates back to the publisher.
type Memory ¶
type Memory struct {
    // contains filtered or unexported fields
}
Memory is an in-process Broadcaster. Each subscription runs its own goroutine, reading from a buffered channel. Events that arrive while a subscription's buffer is full are dropped; the cumulative drop count is exposed via Dropped() so callers can spot undersized buffers in production.
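The drop-on-full behaviour described above is commonly built from a non-blocking channel send plus an atomic counter. The sketch below shows that general technique only; it is not Memory's actual implementation, and queue, newQueue, and offer are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// queue is a hypothetical stand-in for one subscription's buffer.
type queue struct {
	ch      chan string
	dropped atomic.Uint64
}

func newQueue(buffer int) *queue {
	return &queue{ch: make(chan string, buffer)}
}

// offer tries to enqueue msg without blocking. When the buffer is full
// it counts the drop and reports false instead of waiting.
func (q *queue) offer(msg string) bool {
	select {
	case q.ch <- msg:
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}

func main() {
	q := newQueue(2)

	// No consumer is running yet, so only the first two offers fit.
	for i := 0; i < 5; i++ {
		q.offer(fmt.Sprintf("event-%d", i))
	}
	close(q.ch)

	// The subscriber goroutine drains whatever was accepted.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range q.ch {
			fmt.Println("delivered", msg)
		}
	}()
	wg.Wait()

	fmt.Println("dropped:", q.dropped.Load())
}
```

With a buffer of 2 and five offers made before any consumer starts, two events are accepted and three are counted as dropped. A steadily rising counter of this kind is the signal that the buffer is too small for the burst size or that handlers are too slow.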
func NewMemory ¶
func NewMemory(opts ...MemoryOption) *Memory
NewMemory returns a Memory broadcaster with a default per-subscription buffer of 256 events and a silent logger. Tune both via options.
func (*Memory) Dropped ¶
Dropped returns the cumulative number of events that could not be delivered because some subscriber's queue was full. Useful as a production saturation metric.
type MemoryOption ¶
type MemoryOption func(*Memory)
MemoryOption customises a Memory broadcaster.
func WithBuffer ¶
func WithBuffer(n int) MemoryOption
WithBuffer sets the per-subscription channel buffer.
func WithLogger ¶
func WithLogger(fn func(string, ...any)) MemoryOption
WithLogger installs a structured logger callback (called when a handler returns an error or an event is dropped).
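MemoryOption follows Go's functional options pattern: the constructor starts from defaults and applies each option in order. The sketch below illustrates the pattern with hypothetical lower-case stand-ins (settings, option, withBuffer, withLogger, newSettings); it is not the package's source, and treating the logger's first argument as a printf-style format string is an assumption.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for Memory's unexported fields.
type settings struct {
	buffer int
	logf   func(string, ...any)
}

// option has the same shape as MemoryOption: a function that mutates
// the value under construction.
type option func(*settings)

func withBuffer(n int) option {
	return func(s *settings) { s.buffer = n }
}

func withLogger(fn func(string, ...any)) option {
	return func(s *settings) { s.logf = fn }
}

// newSettings starts from the documented defaults (buffer 256, silent
// logger) and then applies the options in the order given.
func newSettings(opts ...option) *settings {
	s := &settings{
		buffer: 256,
		logf:   func(string, ...any) {}, // silent by default
	}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSettings(
		withBuffer(1024),
		// Assumption: the first argument is a printf-style format.
		withLogger(func(format string, args ...any) {
			fmt.Printf(format+"\n", args...)
		}),
	)
	s.logf("buffer=%d", s.buffer)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field, and calling the constructor with no options yields the defaults.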
type Subscription ¶
type Subscription interface {
    Cancel()
}
Subscription represents one active subscription. Cancel removes the handler and releases its queue.