broadcasting

package
v0.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package broadcasting provides a pub/sub abstraction modelled on Laravel's Broadcasting facade.

The Broadcaster interface is intentionally small so a Memory driver can ship in-box (single-process apps, tests) while Redis/NATS/Kafka drivers live in sub-packages without dragging external deps into the core module.

Usage:

bcast := broadcasting.NewMemory()
bcast.Subscribe(ctx, "chat.42", func(ctx context.Context, e broadcasting.Event) error {
    fmt.Printf("got %s: %s\n", e.Name, string(e.Payload))
    return nil
})
_ = bcast.Publish(ctx, broadcasting.Event{
    Channel: "chat.42",
    Name:    "MessagePosted",
    Payload: []byte(`{"text":"hi"}`),
})

Compared to events:

  • events — synchronous, in-process, typed via generics. For aggregating domain reactions during a single request.
  • broadcasting — many-to-many fan-out across processes (when paired with a remote driver) or within a process; subscribers hold a queue and run on their own goroutine.

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("broadcasting: closed")

ErrClosed is returned by Publish/Subscribe after Close.

Functions

This section is empty.

Types

type Broadcaster

type Broadcaster interface {
	// Publish delivers e to every active subscription whose channel
	// matches e.Channel.
	Publish(ctx context.Context, e Event) error
	// Subscribe registers fn to receive every event published on
	// channel. The returned Subscription unbinds fn when Cancel is
	// called.
	Subscribe(ctx context.Context, channel string, fn Handler) (Subscription, error)
	// Close releases all resources. Further Publish/Subscribe calls
	// return ErrClosed.
	Close() error
}

Broadcaster is the transport interface.

type Event

type Event struct {
	Channel string
	Name    string
	Payload []byte
}

Event is the wire-format unit dispatched through a Broadcaster. Payload is opaque bytes — drivers do not assume JSON, but the helper Publish methods in user code typically pre-encode payloads as JSON.

type Handler

type Handler func(ctx context.Context, e Event) error

Handler is invoked once per delivered event. Errors are surfaced via the driver's logger callback (or dropped by Memory) — they do not propagate back to the publisher.

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

Memory is an in-process Broadcaster. Each subscription runs its own goroutine reading from a buffered channel. Bursts beyond the buffer are dropped; the dropped-count is exposed via Dropped() so callers can spot under-sized buffers in production.

func NewMemory

func NewMemory(opts ...MemoryOption) *Memory

NewMemory returns a Broadcaster with default per-subscription buffer 256 and a silent logger. Tune via options.

func (*Memory) Close

func (m *Memory) Close() error

Close releases all subscriptions. Returns nil.

func (*Memory) Dropped

func (m *Memory) Dropped() uint64

Dropped returns the cumulative number of events that could not be delivered because some subscriber's queue was full. Useful as a production saturation metric.

func (*Memory) Publish

func (m *Memory) Publish(_ context.Context, e Event) error

Publish fans the event out to every subscription on its channel. Non-blocking: if a subscriber's queue is full the event is dropped for that subscriber and the global Dropped counter is incremented.

func (*Memory) Subscribe

func (m *Memory) Subscribe(ctx context.Context, channel string, fn Handler) (Subscription, error)

Subscribe binds fn to receive events on channel.

type MemoryOption

type MemoryOption func(*Memory)

MemoryOption customises a Memory broadcaster.

func WithBuffer

func WithBuffer(n int) MemoryOption

WithBuffer sets the per-subscription channel buffer.

func WithLogger

func WithLogger(fn func(string, ...any)) MemoryOption

WithLogger installs a structured logger callback (called when a handler returns an error or an event is dropped).

type Subscription

type Subscription interface {
	Cancel()
}

Subscription represents one active subscription. Cancel removes the handler and releases its queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL