eventbus

package
v0.8.0
Published: May 10, 2026 License: MPL-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package eventbus is a small in-process fan-out broadcaster used by the management HTTP server's SSE endpoint to push topology updates to every connected operator.

Design points:

  • **Drop-on-full publish.** Each subscriber owns a small buffered channel. If a subscriber is slow and its buffer fills, Publish drops the event for that subscriber rather than blocking: the SWIM heartbeat loop publishes events and must never wait on an unresponsive consumer. Drops show up in the bus-wide Dropped() counter and as gaps in Event.Version; a subscriber that sees a gap can reconnect to get a fresh snapshot.

  • **Context-driven unsubscribe.** Subscribe takes a context; when the context is canceled, the bus reaps the subscription and closes the channel. Callers should keep receiving until the channel is closed: the bus uses non-blocking sends, so no goroutine is left blocked on an undrained channel, but events buffered before cancellation remain readable until then.

  • **No external deps.** Pure stdlib (context, sync, sync/atomic, time). Testable in isolation.

Out of scope: persistence, replay, fan-in, cross-process distribution. This is the simplest thing that works for the monitor's SSE consumer.
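
The drop-on-full design point can be illustrated with a small stand-alone sketch. The `miniBus` type, its `subscribe` and `publish` methods, and the buffer sizes below are hypothetical stand-ins chosen for the example; they are not this package's implementation, only one plausible way to get a non-blocking send with a drop counter.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniBus is a hypothetical stand-in for Bus, used only to show the
// drop-on-full idea. It is not the package's actual implementation.
type miniBus struct {
	mu      sync.Mutex
	subs    []chan int
	dropped atomic.Uint64
}

// subscribe registers a subscriber whose channel buffers up to buf values.
func (b *miniBus) subscribe(buf int) <-chan int {
	ch := make(chan int, buf)
	b.mu.Lock()
	b.subs = append(b.subs, ch)
	b.mu.Unlock()
	return ch
}

// publish offers v to every subscriber and never blocks.
func (b *miniBus) publish(v int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		select {
		case ch <- v:
			// The buffer had room; the value is queued.
		default:
			// The buffer is full; drop for this subscriber only.
			b.dropped.Add(1)
		}
	}
}

// demo publishes five values to a roomy subscriber (buffer 8) and a
// slow one (buffer 2) that never reads, then reports what was queued.
func demo() (fastLen, slowLen int, dropped uint64) {
	b := &miniBus{}
	fast := b.subscribe(8)
	slow := b.subscribe(2)
	for i := 1; i <= 5; i++ {
		b.publish(i)
	}
	return len(fast), len(slow), b.dropped.Load()
}

func main() {
	fastLen, slowLen, dropped := demo()
	fmt.Println(fastLen, slowLen, dropped) // prints "5 2 3"
}
```

The slow subscriber keeps only its first two values and loses the remaining three, while the publisher loop finishes without waiting and the fast subscriber is unaffected.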

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is a fan-out broadcaster. The zero value is NOT usable — always construct via New().

func New

func New() *Bus

New constructs an empty Bus.

func (*Bus) Dropped

func (b *Bus) Dropped() uint64

Dropped returns the cumulative count of events dropped because a subscriber's buffer was full. Used for telemetry / sanity checks in tests; not part of the SSE wire format.

func (*Bus) Publish

func (b *Bus) Publish(evt Event) uint64

Publish broadcasts evt to every current subscriber. The publish is fire-and-forget: if a subscriber's buffer is full, the event is dropped for that subscriber and the global drop counter increments. Returns the version assigned to the event (useful for tests pinning ordering).

Publish does NOT block on slow subscribers — by design the SWIM heartbeat goroutine drives publishes and must keep ticking at its configured interval regardless of consumer health.

func (*Bus) Subscribe

func (b *Bus) Subscribe(ctx context.Context) (<-chan Event, func())

Subscribe registers a new subscriber. Returns a receive-only channel of events and an Unsubscribe function. The subscription is also reaped automatically when ctx is canceled (typical shape: caller derives ctx from the request context, the bus cleans up when the request ends).

Callers SHOULD defer the returned unsubscribe function; ctx-driven reaping is the safety net for cases where that call runs late or is never made. Calling unsubscribe more than once is safe (idempotent).
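
As a rough illustration of how an idempotent unsubscribe and ctx-driven reaping can share one cleanup path, here is a self-contained sketch. The `registry` type and its methods are hypothetical and exist only for this example; the real Bus may be structured differently. The sketch assumes a `sync.Once` guards cleanup and that a small watcher goroutine triggers it on cancellation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for Bus; only the cleanup path
// is sketched here, not publishing.
type registry struct {
	mu   sync.Mutex
	subs map[chan string]struct{}
}

func newRegistry() *registry {
	return &registry{subs: make(map[chan string]struct{})}
}

// subscribe returns a channel and an idempotent unsubscribe function.
// A watcher goroutine calls the same function when ctx is canceled.
func (r *registry) subscribe(ctx context.Context) (<-chan string, func()) {
	ch := make(chan string, 4)
	r.mu.Lock()
	r.subs[ch] = struct{}{}
	r.mu.Unlock()

	done := make(chan struct{})
	var once sync.Once
	unsubscribe := func() {
		once.Do(func() {
			r.mu.Lock()
			delete(r.subs, ch)
			close(ch) // closed under the lock, so a publisher holding it never sends on a closed channel
			r.mu.Unlock()
			close(done)
		})
	}
	go func() {
		select {
		case <-ctx.Done():
			unsubscribe()
		case <-done:
			// Explicit unsubscribe already ran; let the watcher exit.
		}
	}()
	return ch, unsubscribe
}

// count reports how many subscribers are still registered.
func (r *registry) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.subs)
}

// demo exercises both cleanup paths and returns the subscriber count
// observed after each one.
func demo() (afterCancel, afterUnsub int) {
	r := newRegistry()

	ctx, cancel := context.WithCancel(context.Background())
	ch, _ := r.subscribe(ctx)
	cancel()
	for range ch {
		// Drain until the watcher closes the channel.
	}
	afterCancel = r.count()

	ch2, unsub := r.subscribe(context.Background())
	unsub()
	unsub() // second call is a no-op
	for range ch2 {
	}
	afterUnsub = r.count()
	return afterCancel, afterUnsub
}

func main() {
	fmt.Println(demo()) // prints "0 0"
}
```

Receiving until the channel is closed doubles as a synchronization point in this sketch: once the loop exits, the subscription has already been removed from the registry.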

func (*Bus) SubscriberCount

func (b *Bus) SubscriberCount() int

SubscriberCount returns the number of currently registered subscribers. Useful for tests asserting clean unsubscribe behavior on context cancellation.

type Event

type Event struct {
	Type      string
	Payload   any
	Version   uint64
	Timestamp time.Time
}

Event is the message carried over the bus. Type names the event shape ("members" or "heartbeat" today; new types are additive), Payload is the value the SSE handler marshals to JSON for the wire, and Version is the bus-monotonic sequence number that lets subscribers detect gaps caused by drops.
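
A subscriber-side gap check based on Version might look like the sketch below. The `gapTracker` type and `countMissed` helper are hypothetical and not part of this package. The sketch also assumes that versions increase by exactly one per published event; the documentation only states that Version is monotonic, so treat the skipped-count arithmetic as illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// Event mirrors the documented struct so the sketch compiles on its own.
type Event struct {
	Type      string
	Payload   any
	Version   uint64
	Timestamp time.Time
}

// gapTracker is a hypothetical subscriber-side helper, not part of the
// package. It assumes versions advance by one per published event.
type gapTracker struct {
	last   uint64
	seen   bool
	missed uint64
}

// observe records evt and returns how many versions were skipped since
// the previously observed event.
func (g *gapTracker) observe(evt Event) uint64 {
	var skipped uint64
	if g.seen && evt.Version > g.last+1 {
		skipped = evt.Version - g.last - 1
	}
	g.seen = true
	g.last = evt.Version
	g.missed += skipped
	return skipped
}

// countMissed feeds a sequence of versions through a tracker and
// returns the total number of versions that never arrived.
func countMissed(versions []uint64) uint64 {
	var g gapTracker
	for _, v := range versions {
		g.observe(Event{Type: "members", Version: v, Timestamp: time.Now()})
	}
	return g.missed
}

func main() {
	// Versions 3, 4, 7 and 8 are absent from this sequence.
	fmt.Println(countMissed([]uint64{1, 2, 5, 6, 9})) // prints 4
}
```

A consumer could use a nonzero return from `observe` as the trigger to reconnect and fetch a fresh snapshot, as the overview suggests.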
