Documentation
¶
Overview ¶
Package notify provides per-user real-time notification streams.
Notifications are delivered in real-time to connected users via Stream and persisted in a Store for backfill on reconnect. The Notifier coordinates between presence tracking, notification storage, and optional cross-instance routing.
Architecture ¶
The notification system uses the event bus with AsWorker (worker model), meaning each mailbox event is processed by exactly one instance:
Mailbox Event (MessageReceived, etc.)
→ Event Bus (AsWorker — one worker per event)
→ Notifier.Push() checks presence
→ Store.Save() (for backfill on reconnect)
→ Local stream delivery (if user connected to this instance)
→ Router.Route() (if user connected to another instance)
Basic Usage ¶
Configure the service with a notifier:
notifier := notify.NewNotifier(
notify.WithStore(notifyStore),
notify.WithPresence(tracker),
)
svc, _ := mailbox.New(mailbox.Config{},
mailbox.WithStore(store),
mailbox.WithNotifier(notifier),
)
Open a notification stream (e.g., in an SSE handler):
// Register presence
reg, _ := tracker.Register(ctx, userID, presence.WithRouting(...))
defer reg.Unregister(ctx)
// Open stream with backfill from last seen event
stream, _ := svc.Notifications(ctx, userID, lastEventID)
defer stream.Close()
for {
evt, err := stream.Next(r.Context())
if err != nil {
return
}
fmt.Fprintf(w, "id: %s\ndata: %s\n\n", evt.ID, evt.Payload)
flusher.Flush()
}
Cross-Instance Delivery ¶
Without a Router, remote instances discover events via store polling (configurable interval, default 2s). With a Router and presence routing info, events are forwarded directly to the instance holding the connection:
notifier := notify.NewNotifier(
notify.WithStore(store),
notify.WithPresence(tracker),
notify.WithRouter(myRouter),
notify.WithInstanceID("web-server-3"),
)
Implementations ¶
- github.com/rbaliyan/mailbox/notify/memory: In-memory notification store for testing.
Index ¶
- Constants
- Variables
- type BatchSaver
- type DroppedCounter
- type Event
- type Notifier
- func (n *Notifier) Close(_ context.Context) error
- func (n *Notifier) Deliver(userID string, evt Event)
- func (n *Notifier) Push(ctx context.Context, userID string, evt Event) error
- func (n *Notifier) PushMulti(ctx context.Context, userIDs []string, evt Event) (failed int, err error)
- func (n *Notifier) Subscribe(ctx context.Context, userID string, lastEventID string) (Stream, error)
- type Option
- func WithBufferSize(n int) Option
- func WithInstanceID(id string) Option
- func WithLogger(l *slog.Logger) Option
- func WithMeterProvider(mp metric.MeterProvider) Option
- func WithPollInterval(d time.Duration) Option
- func WithPresence(t presence.Tracker) Option
- func WithRouter(r Router) Option
- func WithStore(s Store) Option
- type Router
- type RoutingInfo
- type Store
- type Stream
- type StreamStore
Constants ¶
const ( DefaultPollInterval = 2 * time.Second DefaultBufferSize = 64 )
Default configuration values.
Variables ¶
var ( // ErrStreamClosed is returned when reading from a closed stream. ErrStreamClosed = errors.New("notify: stream closed") // ErrNotifierClosed is returned when the notifier has been closed. ErrNotifierClosed = errors.New("notify: notifier closed") // ErrStoreClosed is returned when the notification store has been closed. ErrStoreClosed = errors.New("notify: store closed") )
Sentinel errors for the notify package.
Functions ¶
This section is empty.
Types ¶
type BatchSaver ¶ added in v0.6.5
type BatchSaver interface {
// SaveBatch persists multiple events atomically or in a pipeline.
// Each event may target a different user. The implementation assigns event IDs.
SaveBatch(ctx context.Context, events []*Event) error
}
BatchSaver is an optional interface that Store implementations can implement to support saving multiple events in a single round-trip. When the notifier's store implements BatchSaver, PushMulti uses it for efficient multi-recipient delivery (e.g., Redis pipeline).
type DroppedCounter ¶ added in v0.6.5
type DroppedCounter interface {
// Dropped returns the number of events dropped since the stream was opened.
Dropped() int64
}
DroppedCounter is an optional interface that Stream implementations can provide to expose the number of events dropped due to slow consumption. SSE handlers can check this to detect and disconnect slow clients.
type Event ¶
type Event struct {
// ID is the store-assigned event ID, used for Last-Event-ID resume.
ID string `json:"id"`
// Type identifies the event kind (e.g., "mailbox.message.received").
Type string `json:"type"`
// UserID is the target user for this notification.
UserID string `json:"user_id"`
// Payload is the JSON-encoded event-specific data.
Payload []byte `json:"payload"`
// Timestamp is when the event occurred.
Timestamp time.Time `json:"timestamp"`
}
Event is a notification delivered to a user's stream.
type Notifier ¶
type Notifier struct {
// contains filtered or unexported fields
}
Notifier manages per-user notification delivery and persistence. It coordinates between the event bus, presence tracking, notification store, routing, and local SSE streams.
The typical flow:
- Service event handler calls Push (runs on the AsWorker instance).
- Push checks presence — if user is offline, the event is dropped.
- Push saves to Store (for backfill on reconnect).
- Push delivers to local streams (if user is connected to this instance).
- If user is on another instance and a Router is configured, route there.
- Otherwise, the remote instance discovers events via store polling.
func NewNotifier ¶
NewNotifier creates a new Notifier with the given options.
func (*Notifier) Close ¶
Close shuts down the notifier and all active streams. It cancels all stream contexts and waits for every pollLoop goroutine to exit.
func (*Notifier) Deliver ¶
Deliver pushes an event directly to a local stream, bypassing presence checks and store persistence. This is used by Router implementations on the receiving side of cross-instance delivery.
func (*Notifier) Push ¶
Push sends a notification to a user.
When presence tracking is configured and the user is offline, Push returns immediately without saving or routing — the event is dropped. When presence is not configured, or when the user is online, the event is persisted to the store (for backfill on reconnect) and then delivered either to a local stream (user connected to this instance) or forwarded via the Router (user connected to another instance).
func (*Notifier) PushMulti ¶ added in v0.6.5
func (n *Notifier) PushMulti(ctx context.Context, userIDs []string, evt Event) (failed int, err error)
PushMulti sends a notification to multiple users efficiently. When the store implements BatchSaver, all events are saved in a single pipeline. Otherwise, falls back to individual Push calls.
func (*Notifier) Subscribe ¶
func (n *Notifier) Subscribe(ctx context.Context, userID string, lastEventID string) (Stream, error)
Subscribe opens a notification stream for the user. If lastEventID is non-empty, the stream replays events after that ID from the store before switching to live delivery. The returned Stream must be closed by the caller.
Subscribe does NOT register presence — the caller should manage presence registration separately (e.g., at the SSE handler level) since presence is an independent module.
type Option ¶
type Option func(*options)
Option configures a Notifier.
func WithBufferSize ¶
WithBufferSize sets the channel buffer size for local event delivery. Default is 64.
func WithInstanceID ¶
WithInstanceID sets this instance's identifier. Used to compare with presence routing info to determine whether a user is connected locally or on a remote instance.
func WithMeterProvider ¶ added in v0.7.5
func WithMeterProvider(mp metric.MeterProvider) Option
WithMeterProvider sets the OpenTelemetry meter provider for the notifier. When provided, the notifier records mailbox.notify.streams.active metrics.
func WithPollInterval ¶
WithPollInterval sets how often streams poll the store for events that were written by other instances. Default is 2 seconds.
func WithPresence ¶
WithPresence sets the presence tracker. When provided, Push skips offline users entirely (no store write, no delivery).
func WithRouter ¶
WithRouter sets the cross-instance event router. When both a Router and Presence (with routing info) are configured, the notifier routes events directly to the instance holding the user's connection instead of relying solely on store polling.
type Router ¶
type Router interface {
// Route delivers an event to the instance identified by the routing info.
// Returning an error causes the notifier to fall back to store persistence
// (the remote instance will pick it up via polling).
Route(ctx context.Context, info RoutingInfo, evt Event) error
}
Router delivers notification events to remote instances. When presence tracking includes routing information, the notifier uses a Router to forward events to the instance holding the user's connection, avoiding store polling latency.
Implementations might use HTTP, gRPC, Redis Pub/Sub, or any other inter-instance communication mechanism.
type RoutingInfo ¶
type RoutingInfo struct {
// InstanceID identifies the target server instance.
InstanceID string `json:"instance_id,omitempty"`
// Metadata holds arbitrary routing data (e.g., address, port).
Metadata map[string]string `json:"metadata,omitempty"`
}
RoutingInfo describes where to deliver a notification. Mirrors presence.RoutingInfo but decoupled to avoid a dependency from Router implementations back to the presence package.
type Store ¶
type Store interface {
// Save persists a notification event. The implementation assigns the event ID.
Save(ctx context.Context, evt *Event) error
// List returns notifications for a user after the given event ID, ordered
// by ID ascending. Pass "" for afterID to list from the beginning.
// Limit caps the number of returned events (0 for implementation default).
List(ctx context.Context, userID string, afterID string, limit int) ([]Event, error)
// Cleanup removes notifications older than the given time.
Cleanup(ctx context.Context, olderThan time.Time) error
// Close releases resources held by the store.
Close(ctx context.Context) error
}
Store persists notifications per user for backfill on reconnect. Implementations must be safe for concurrent use.
type Stream ¶
type Stream interface {
// Next blocks until the next event is available or ctx is cancelled.
// Returns ErrStreamClosed if the stream has been closed.
Next(ctx context.Context) (Event, error)
// Close releases resources associated with this stream.
// After Close, Next returns ErrStreamClosed.
Close() error
}
Stream is a per-user notification stream. Callers read events by calling Next in a loop until the context is cancelled or the stream is closed.
type StreamStore ¶ added in v0.6.2
type StreamStore interface {
Store
// Subscribe returns a Stream backed by native streaming (e.g., XREAD BLOCK).
// If lastEventID is non-empty, missed events are replayed before live delivery.
Subscribe(ctx context.Context, userID string, lastEventID string) (Stream, error)
}
StreamStore is an optional interface that Store implementations can implement when they support native event streaming. When the notifier's store implements StreamStore, Subscribe delegates to the store's native streaming instead of using channel-based delivery with polling.
Redis Streams is the canonical example: XADD persists events and XREAD BLOCK delivers them, collapsing store and stream into a single data structure — no channels, no poll loops, no Router needed.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package memory provides an in-memory notify.Store for testing.
|
Package memory provides an in-memory notify.Store for testing. |
|
Package redis provides a Redis Streams-backed notify.Store and notify.StreamStore.
|
Package redis provides a Redis Streams-backed notify.Store and notify.StreamStore. |
|
Package webhook provides a notify.Router that delivers notification events to configured HTTP endpoints via signed JSON POST requests.
|
Package webhook provides a notify.Router that delivers notification events to configured HTTP endpoints via signed JSON POST requests. |