notify

package
v0.10.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 8, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package notify provides per-user real-time notification streams.

Notifications are delivered in real time to connected users via Stream and persisted in a Store for backfill on reconnect. The Notifier coordinates presence tracking, notification storage, and optional cross-instance routing.

Architecture

The notification system uses the event bus with AsWorker (worker model), meaning each mailbox event is processed by exactly one instance:

Mailbox Event (MessageReceived, etc.)
  → Event Bus (AsWorker — one worker per event)
    → Notifier.Push() checks presence
      → Store.Save() (for backfill on reconnect)
      → Local stream delivery (if user connected to this instance)
      → Router.Route() (if user connected to another instance)

Basic Usage

Configure the service with a notifier:

notifier := notify.NewNotifier(
    notify.WithStore(notifyStore),
    notify.WithPresence(tracker),
)

svc, err := mailbox.New(mailbox.Config{},
    mailbox.WithStore(store),
    mailbox.WithNotifier(notifier),
)
if err != nil {
    log.Fatal(err)
}

Open a notification stream (e.g., in an SSE handler):

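// Register presence so that Push treats the user as online.
reg, err := tracker.Register(ctx, userID, presence.WithRouting(...))
if err != nil {
    return
}
defer reg.Unregister(ctx)

// Open stream with backfill from last seen event
stream, err := svc.Notifications(ctx, userID, lastEventID)
if err != nil {
    return
}
defer stream.Close()

for {
    // Next returns an error once the request context is cancelled
    // or the stream is closed; either way the handler should exit.
    evt, err := stream.Next(r.Context())
    if err != nil {
        return
    }
    fmt.Fprintf(w, "id: %s\ndata: %s\n\n", evt.ID, evt.Payload)
    flusher.Flush()
}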

Cross-Instance Delivery

Without a Router, remote instances discover events via store polling (configurable interval, default 2s). With a Router and presence routing info, events are forwarded directly to the instance holding the connection:

notifier := notify.NewNotifier(
    notify.WithStore(store),
    notify.WithPresence(tracker),
    notify.WithRouter(myRouter),
    notify.WithInstanceID("web-server-3"),
)

Implementations

Constants

const (
	DefaultPollInterval = 2 * time.Second
	DefaultBufferSize   = 64
)

Default configuration values.

Variables

var (
	// ErrStreamClosed is returned when reading from a closed stream.
	ErrStreamClosed = errors.New("notify: stream closed")

	// ErrNotifierClosed is returned when the notifier has been closed.
	ErrNotifierClosed = errors.New("notify: notifier closed")

	// ErrStoreClosed is returned when the notification store has been closed.
	ErrStoreClosed = errors.New("notify: store closed")
)

Sentinel errors for the notify package.

Functions

This section is empty.

Types

type BatchSaver added in v0.6.5

type BatchSaver interface {
	// SaveBatch persists multiple events atomically or in a pipeline.
	// Each event may target a different user. The implementation assigns event IDs.
	SaveBatch(ctx context.Context, events []*Event) error
}

BatchSaver is an optional interface that Store implementations can implement to support saving multiple events in a single round-trip. When the notifier's store implements BatchSaver, PushMulti uses it for efficient multi-recipient delivery (e.g., Redis pipeline).

type DroppedCounter added in v0.6.5

type DroppedCounter interface {
	// Dropped returns the number of events dropped since the stream was opened.
	Dropped() int64
}

DroppedCounter is an optional interface that Stream implementations can provide to expose the number of events dropped due to slow consumption. SSE handlers can check this to detect and disconnect slow clients.
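
A handler might check for this interface as sketched below. The threshold maxDropped, the helper tooSlow, and the fakeStream type are assumptions made for illustration; the package does not prescribe a limit.

```go
package main

import (
	"context"
	"fmt"
)

type Event struct{ ID string }

type Stream interface {
	Next(ctx context.Context) (Event, error)
	Close() error
}

type DroppedCounter interface {
	Dropped() int64
}

// maxDropped is a hypothetical limit chosen for this sketch.
const maxDropped = 10

// tooSlow reports whether the stream exposes a drop count above the limit.
// Streams that do not implement DroppedCounter are never considered slow.
func tooSlow(s Stream) bool {
	dc, ok := s.(DroppedCounter)
	return ok && dc.Dropped() > maxDropped
}

// fakeStream is a stand-in stream with a fixed drop count.
type fakeStream struct{ dropped int64 }

func (f *fakeStream) Next(context.Context) (Event, error) { return Event{}, nil }
func (f *fakeStream) Close() error                        { return nil }
func (f *fakeStream) Dropped() int64                      { return f.dropped }

func main() {
	fmt.Println(tooSlow(&fakeStream{dropped: 3}))
	fmt.Println(tooSlow(&fakeStream{dropped: 25}))
}
```

An SSE handler could call tooSlow after each Next and return when it reports true, which closes the connection to the slow client.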

type Event

type Event struct {
	// ID is the store-assigned event ID, used for Last-Event-ID resume.
	ID string `json:"id"`

	// Type identifies the event kind (e.g., "mailbox.message.received").
	Type string `json:"type"`

	// UserID is the target user for this notification.
	UserID string `json:"user_id"`

	// Payload is the JSON-encoded event-specific data.
	Payload []byte `json:"payload"`

	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"timestamp"`
}

Event is a notification delivered to a user's stream.

type Notifier

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier manages per-user notification delivery and persistence. It coordinates between the event bus, presence tracking, notification store, routing, and local SSE streams.

The typical flow:

  1. Service event handler calls Push (runs on the AsWorker instance).
  2. Push checks presence — if user is offline, the event is dropped.
  3. Push saves to Store (for backfill on reconnect).
  4. Push delivers to local streams (if user is connected to this instance).
  5. If user is on another instance and a Router is configured, route there.
  6. Otherwise, the remote instance discovers events via store polling.
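
The six steps can be pictured as a single decision function. This is a simplified model under stated assumptions, not the Notifier's real code: pusher, its fields, and the returned labels are all hypothetical, and presence is reduced to a map from user ID to instance ID.

```go
package main

import "fmt"

// Event carries only the fields this sketch needs.
type Event struct {
	ID     string
	UserID string
	Type   string
}

// pusher is a hypothetical stand-in for the Notifier's internals.
type pusher struct {
	instanceID string
	online     map[string]string // userID -> instance holding the connection
	hasRouter  bool
	saved      []Event
}

// push returns a label naming the delivery path that was taken.
func (p *pusher) push(evt Event) string {
	target, ok := p.online[evt.UserID]
	if !ok {
		return "dropped" // step 2: offline users get no store write at all
	}
	evt.ID = fmt.Sprint(len(p.saved) + 1) // step 3: the store assigns the ID
	p.saved = append(p.saved, evt)
	switch {
	case target == p.instanceID:
		return "local" // step 4: user is connected to this instance
	case p.hasRouter:
		return "routed" // step 5: forward to the owning instance
	default:
		return "polled" // step 6: the remote instance finds it in the store
	}
}

func main() {
	p := &pusher{
		instanceID: "web-1",
		online:     map[string]string{"alice": "web-1", "bob": "web-2"},
		hasRouter:  true,
	}
	for _, u := range []string{"alice", "bob", "carol"} {
		fmt.Println(u, p.push(Event{UserID: u, Type: "mailbox.message.received"}))
	}
}
```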

func NewNotifier

func NewNotifier(opts ...Option) *Notifier

NewNotifier creates a new Notifier with the given options.

func (*Notifier) Close

func (n *Notifier) Close(_ context.Context) error

Close shuts down the notifier and all active streams. It cancels all stream contexts and waits for every pollLoop goroutine to exit.

func (*Notifier) Deliver

func (n *Notifier) Deliver(userID string, evt Event)

Deliver pushes an event directly to a local stream, bypassing presence checks and store persistence. This is used by Router implementations on the receiving side of cross-instance delivery.

func (*Notifier) Push

func (n *Notifier) Push(ctx context.Context, userID string, evt Event) error

Push sends a notification to a user.

When presence tracking is configured and the user is offline, Push returns immediately without saving or routing — the event is dropped. When presence is not configured, or when the user is online, the event is persisted to the store (for backfill on reconnect) and then delivered either to a local stream (user connected to this instance) or forwarded via the Router (user connected to another instance).

func (*Notifier) PushMulti added in v0.6.5

func (n *Notifier) PushMulti(ctx context.Context, userIDs []string, evt Event) (failed int, err error)

PushMulti sends a notification to multiple users efficiently. When the store implements BatchSaver, all events are saved in a single pipeline. Otherwise, it falls back to individual Push calls.

func (*Notifier) Subscribe

func (n *Notifier) Subscribe(ctx context.Context, userID string, lastEventID string) (Stream, error)

Subscribe opens a notification stream for the user. If lastEventID is non-empty, the stream replays events after that ID from the store before switching to live delivery. The returned Stream must be closed by the caller.

Subscribe does NOT register presence — the caller should manage presence registration separately (e.g., at the SSE handler level) since presence is an independent module.

type Option

type Option func(*options)

Option configures a Notifier.

func WithBufferSize

func WithBufferSize(n int) Option

WithBufferSize sets the channel buffer size for local event delivery. Default is 64.

func WithInstanceID

func WithInstanceID(id string) Option

WithInstanceID sets this instance's identifier. Used to compare with presence routing info to determine whether a user is connected locally or on a remote instance.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger sets a custom logger.

func WithMeterProvider added in v0.7.5

func WithMeterProvider(mp metric.MeterProvider) Option

WithMeterProvider sets the OpenTelemetry meter provider for the notifier. When provided, the notifier records the mailbox.notify.streams.active metric.

func WithPollInterval

func WithPollInterval(d time.Duration) Option

WithPollInterval sets how often streams poll the store for events that were written by other instances. Default is 2 seconds.

func WithPresence

func WithPresence(t presence.Tracker) Option

WithPresence sets the presence tracker. When provided, Push skips offline users entirely (no store write, no delivery).

func WithRouter

func WithRouter(r Router) Option

WithRouter sets the cross-instance event router. When both a Router and Presence (with routing info) are configured, the notifier routes events directly to the instance holding the user's connection instead of relying solely on store polling.

func WithStore

func WithStore(s Store) Option

WithStore sets the notification store for persistence and backfill.

type Router

type Router interface {
	// Route delivers an event to the instance identified by the routing info.
	// Returning an error causes the notifier to fall back to store persistence
	// (the remote instance will pick it up via polling).
	Route(ctx context.Context, info RoutingInfo, evt Event) error
}

Router delivers notification events to remote instances. When presence tracking includes routing information, the notifier uses a Router to forward events to the instance holding the user's connection, avoiding store polling latency.

Implementations might use HTTP, gRPC, Redis Pub/Sub, or any other inter-instance communication mechanism.

type RoutingInfo

type RoutingInfo struct {
	// InstanceID identifies the target server instance.
	InstanceID string `json:"instance_id,omitempty"`
	// Metadata holds arbitrary routing data (e.g., address, port).
	Metadata map[string]string `json:"metadata,omitempty"`
}

RoutingInfo describes where to deliver a notification. It mirrors presence.RoutingInfo but is declared separately so that Router implementations do not depend on the presence package.

type Store

type Store interface {
	// Save persists a notification event. The implementation assigns the event ID.
	Save(ctx context.Context, evt *Event) error

	// List returns notifications for a user after the given event ID, ordered
	// by ID ascending. Pass "" for afterID to list from the beginning.
	// Limit caps the number of returned events (0 for implementation default).
	List(ctx context.Context, userID string, afterID string, limit int) ([]Event, error)

	// Cleanup removes notifications older than the given time.
	Cleanup(ctx context.Context, olderThan time.Time) error

	// Close releases resources held by the store.
	Close(ctx context.Context) error
}

Store persists notifications per user for backfill on reconnect. Implementations must be safe for concurrent use.

type Stream

type Stream interface {
	// Next blocks until the next event is available or ctx is cancelled.
	// Returns ErrStreamClosed if the stream has been closed.
	Next(ctx context.Context) (Event, error)

	// Close releases resources associated with this stream.
	// After Close, Next returns ErrStreamClosed.
	Close() error
}

Stream is a per-user notification stream. Callers read events by calling Next in a loop until the context is cancelled or the stream is closed.

type StreamStore added in v0.6.2

type StreamStore interface {
	Store

	// Subscribe returns a Stream backed by native streaming (e.g., XREAD BLOCK).
	// If lastEventID is non-empty, missed events are replayed before live delivery.
	Subscribe(ctx context.Context, userID string, lastEventID string) (Stream, error)
}

StreamStore is an optional interface that Store implementations can implement when they support native event streaming. When the notifier's store implements StreamStore, Subscribe delegates to the store's native streaming instead of using channel-based delivery with polling.

Redis Streams is the canonical example: XADD persists events and XREAD BLOCK delivers them, collapsing store and stream into a single data structure — no channels, no poll loops, no Router needed.

Directories

Path Synopsis
Package memory provides an in-memory notify.Store for testing.
Package redis provides a Redis Streams-backed notify.Store and notify.StreamStore.
Package webhook provides a notify.Router that delivers notification events to configured HTTP endpoints via signed JSON POST requests.
