subscription

package
v0.0.0-...-ffd6cda

This package is not in the latest version of its module.

Published: Feb 26, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package subscription provides guaranteed event delivery for event-sourced systems.

Delivery Guarantee

The core invariant: if an event is written to the store, every registered consumer WILL process it. This is achieved through:

  • Durable checkpoints track each consumer's position
  • Catch-up on startup replays from checkpoint to head
  • Periodic polling ensures no event is missed (belt and suspenders)
  • Optional notifiers provide low-latency delivery

The Sequence Gap Problem

When events are stored with auto-increment global sequences, concurrent writers can cause gaps: transaction A gets sequence 10, transaction B gets sequence 11, and B commits first. A consumer that reads 11 and checkpoints there will never see 10 when A commits later.
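The race can be reproduced without a database. The sketch below is illustrative only: `visible` and the `committed` map are hypothetical stand-ins for a reader and the store's committed rows, not part of this package.

```go
package main

import "fmt"

// visible returns the committed sequences in (checkpoint, head], in order.
// committed is a hypothetical stand-in for the rows a reader can see.
func visible(committed map[uint64]bool, checkpoint, head uint64) []uint64 {
	var out []uint64
	for seq := checkpoint + 1; seq <= head; seq++ {
		if committed[seq] {
			out = append(out, seq)
		}
	}
	return out
}

func main() {
	committed := map[uint64]bool{}
	var checkpoint uint64 = 9

	// Transaction B (sequence 11) commits before transaction A (sequence 10).
	committed[11] = true
	first := visible(committed, checkpoint, 11)
	checkpoint = first[len(first)-1] // naive consumer checkpoints at 11

	// Transaction A commits late; sequence 10 now sits below the checkpoint.
	committed[10] = true
	second := visible(committed, checkpoint, 11)

	fmt.Println("first read:", first, "second read:", second, "checkpoint:", checkpoint)
}
```

The second read comes back empty even though sequence 10 is now committed, which is the missed event the approaches below are designed to prevent.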

Four approaches, in order of recommendation:

## 1. Single Writer (Recommended)

Use the CommandBus for single-writer-per-stream. One goroutine per stream means global sequences are always monotonic with no holes. No gaps, no complexity, fastest possible throughput.

This is the right answer for most applications. The other approaches exist as escape hatches for systems that can't use single writer.

## 2. Published Sequence Table

After committing events, update a separate "published_sequence" table with the highest committed sequence. Consumers read from this table instead of trusting the events table directly.

Trade-offs:

  • Extra write per commit (small overhead)
  • Reads are trivial — no gap detection needed
  • Works with any relational database
  • Slight delivery delay (published after commit)

## 3. Postgres Transaction Visibility (pg_snapshot)

Postgres exposes transaction visibility via system columns:

SELECT * FROM events
WHERE global_seq > $checkpoint
  AND xmin::text::bigint < pg_snapshot_xmin(pg_current_snapshot())::text::bigint
ORDER BY global_seq;

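This reads only rows whose inserting transaction finished before every transaction that is still in progress, so no lower sequence can become visible later. No heuristics, no timeouts. One caveat: xmin is a 32-bit transaction ID while pg_snapshot_xmin returns a 64-bit xid8, so the comparison as written assumes the cluster has not yet consumed 2^32 transaction IDs.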

Trade-offs:

  • Postgres-specific (not portable); pg_current_snapshot and pg_snapshot_xmin require PostgreSQL 13 or later
  • Unknown performance impact — the xmin/snapshot functions may be expensive under high write load. Benchmark for your workload.
  • Requires understanding Postgres MVCC internals

## 4. Gap Detection with Timeout (Generic Fallback)

The subscription detects sequence gaps and waits for them to fill:

  • Gap detected → wait GapTimeout (default 500ms)
  • Gap fills within timeout → process normally
  • Gap persists → assume rollback, skip it, log warning
  • Known gaps tracked to avoid re-waiting

Trade-offs:

  • Works with any store (generic)
  • Adds latency on gaps (up to GapTimeout)
  • Heuristic — could theoretically skip a very slow transaction (mitigated by generous timeout)
  • Simple to understand and debug

This is the built-in fallback used by EventSubscription when gap detection is enabled (GapTimeout > 0).
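The core of the approach is deciding how much of a batch can be processed before the first hole. The following is a minimal sketch of that decision, not the package's internal code: `contiguousPrefix` and the `skipped` map are hypothetical names, and the timer that waits GapTimeout is left to the caller.

```go
package main

import "fmt"

// contiguousPrefix reports how many leading entries of seqs can be processed
// without crossing an unexplained hole, starting at sequence next. It also
// returns the first missing sequence, or 0 when there is no gap.
// seqs must be strictly increasing. skipped holds gaps that already timed out.
func contiguousPrefix(seqs []uint64, next uint64, skipped map[uint64]bool) (int, uint64) {
	for i, s := range seqs {
		for next < s && skipped[next] {
			next++ // known gap: do not wait for it again
		}
		if s != next {
			return i, next
		}
		next++
	}
	return len(seqs), 0
}

func main() {
	batch := []uint64{10, 12, 13}
	skipped := map[uint64]bool{}

	n, missing := contiguousPrefix(batch, 10, skipped)
	fmt.Println("processable:", n, "waiting for:", missing)

	// Assume GapTimeout elapsed and the gap never filled: treat it as a rollback.
	skipped[missing] = true
	n, missing = contiguousPrefix(batch, 10, skipped)
	fmt.Println("processable:", n, "waiting for:", missing)
}
```

Recording the gap in `skipped` corresponds to the "known gaps tracked" bullet: the second pass moves past sequence 11 without waiting again.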

Notifiers

StoreNotifier is an optimization for low-latency delivery. Without it, subscriptions rely on periodic polling (default 500ms). With it, events are delivered within milliseconds.

Available notifiers:

  • ChannelNotifier: in-process, for testing and memory stores
  • (Future) PostgresNotifier: LISTEN/NOTIFY
  • (Future) SQLiteNotifier: data_version polling
  • (Future) NATSNotifier: subject pub/sub

Package subscription provides guaranteed event delivery with checkpoint-based catch-up and live notification support.

The core guarantee: if an event is written to the store, every registered consumer WILL process it. This works through:

  1. Checkpoint: each consumer tracks its last processed global sequence
  2. Catch-up: on start, replay from checkpoint to head
  3. Live: after caught up, use notifications + periodic polling
  4. Retry: failed events retry with backoff, then go to DLQ
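Step 4 can be sketched as a capped exponential backoff loop. This is an illustration under assumptions: the doubling factor, the way attempts are counted, and the names `backoff`, `deliver`, `noSleep`, and `errTransient` are hypothetical. Only the defaults (5 retries, 100ms base, 10s cap) come from the Config documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// noSleep lets the example run instantly; production code would use time.Sleep.
func noSleep(time.Duration) {}

// backoff returns the delay before retry number attempt (0-based): baseDelay
// doubled once per earlier retry, never exceeding maxDelay.
func backoff(attempt int, baseDelay, maxDelay time.Duration) time.Duration {
	d := baseDelay
	for i := 0; i < attempt && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		return maxDelay
	}
	return d
}

// deliver calls handle once, then retries up to maxRetries times. It returns
// the number of calls made and the last error; a non-nil error means the
// caller should route the event to the dead-letter queue.
func deliver(handle func() error, maxRetries int, baseDelay, maxDelay time.Duration, sleep func(time.Duration)) (int, error) {
	for calls := 1; ; calls++ {
		err := handle()
		if err == nil {
			return calls, nil
		}
		if calls > maxRetries {
			return calls, err // retries exhausted
		}
		sleep(backoff(calls-1, baseDelay, maxDelay))
	}
}

func main() {
	for i := 0; i < 8; i++ {
		fmt.Println("retry", i, "waits", backoff(i, 100*time.Millisecond, 10*time.Second))
	}
	calls, err := deliver(func() error { return errTransient }, 5, 100*time.Millisecond, 10*time.Second, noSleep)
	fmt.Println("calls:", calls, "error:", err)
}
```

Passing the sleep function in keeps the loop testable without real delays.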

Single-writer (via CommandBus) eliminates sequence gaps. For multi-writer scenarios, gap detection handles out-of-order commits. See GapDetector.


Functions

func Add

func Add[E any](m *SubscriptionManager, sub *EventSubscription[E]) error

Add registers and starts a subscription. It returns an error if a subscription with the same consumer ID is already registered.

Types

type BatchHandler

type BatchHandler[E any] func(ctx context.Context, events []GlobalEvent[E]) error

BatchHandler processes a batch of events. Checkpoint advances only after the entire batch succeeds.
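The all-or-nothing checkpoint rule can be illustrated with plain sequence numbers. This is a sketch of the documented behaviour rather than the package's code; `applyBatch` and `errBoom` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

var errBoom = errors.New("database unavailable")

// applyBatch hands the whole batch to handle and returns the checkpoint to
// persist: the last sequence on success, the unchanged checkpoint on failure.
func applyBatch(seqs []uint64, checkpoint uint64, handle func([]uint64) error) (uint64, error) {
	if len(seqs) == 0 {
		return checkpoint, nil
	}
	if err := handle(seqs); err != nil {
		return checkpoint, err // nothing is acknowledged, not even a partial prefix
	}
	return seqs[len(seqs)-1], nil
}

func main() {
	batch := []uint64{4, 5, 6}

	cp, err := applyBatch(batch, 3, func([]uint64) error { return errBoom })
	fmt.Println("after failure:", cp, err)

	cp, err = applyBatch(batch, cp, func([]uint64) error { return nil })
	fmt.Println("after success:", cp, err)
}
```

Because a failed batch leaves the checkpoint untouched, the same events are likely to be delivered again, so a batch handler probably needs to tolerate seeing an event more than once.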

type ChannelNotifier

type ChannelNotifier struct {
	// contains filtered or unexported fields
}

ChannelNotifier is an in-process StoreNotifier backed by a Go channel. Use for in-memory stores and testing. Call Signal() after appending events.

func NewChannelNotifier

func NewChannelNotifier() *ChannelNotifier

NewChannelNotifier creates a new in-process notifier.

func (*ChannelNotifier) Close

func (n *ChannelNotifier) Close() error

Close shuts down the notifier and closes all listener channels.

func (*ChannelNotifier) Notify

func (n *ChannelNotifier) Notify(ctx context.Context) <-chan uint64

Notify returns a channel that receives signals when new events are appended. The channel is closed when the notifier is closed or ctx is cancelled.

func (*ChannelNotifier) Signal

func (n *ChannelNotifier) Signal(sequence uint64)

Signal notifies all listeners that new events are available at the given sequence. Non-blocking: if a listener's buffer is full, the signal is dropped (polling catches it).
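A non-blocking send of this kind is usually written with `select` and a `default` case. The sketch below shows the idiom for a single listener; `trySignal` is a hypothetical helper, not a function exported by this package.

```go
package main

import "fmt"

// trySignal sends seq on ch without blocking and reports whether it was
// delivered. A full buffer means the signal is dropped.
func trySignal(ch chan<- uint64, seq uint64) bool {
	select {
	case ch <- seq:
		return true
	default:
		return false // listener is busy; periodic polling will pick the event up
	}
}

func main() {
	ch := make(chan uint64, 1)
	fmt.Println("first signal delivered:", trySignal(ch, 1))
	fmt.Println("second signal delivered:", trySignal(ch, 2))
	fmt.Println("listener receives:", <-ch)
}
```

Dropping is safe here only because polling remains as a fallback; a slow listener never stalls the writer.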

type Checkpoint

type Checkpoint interface {
	// Load returns the last processed sequence for a consumer. Returns 0 if new.
	Load(ctx context.Context, consumerID string) (uint64, error)

	// Save persists the consumer's position. Must be atomic.
	Save(ctx context.Context, consumerID string, sequence uint64) error
}

Checkpoint tracks a consumer's last processed global sequence. Implementations must be durable for production use (SQLite, Postgres, etc).
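As an illustration of what an implementation involves, here is a file-backed sketch that replaces the stored position with a write-then-rename. Everything in it is an assumption made for the example: `fileCheckpoint` and `run` are hypothetical, the interface is redeclared locally so the block compiles on its own, consumer IDs are assumed to be valid file names, and the code does not fsync, so it is not a substitute for a database-backed checkpoint.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// Checkpoint mirrors the interface documented above.
type Checkpoint interface {
	Load(ctx context.Context, consumerID string) (uint64, error)
	Save(ctx context.Context, consumerID string, sequence uint64) error
}

// fileCheckpoint stores one small file per consumer inside dir.
type fileCheckpoint struct{ dir string }

var _ Checkpoint = fileCheckpoint{}

func (c fileCheckpoint) Load(_ context.Context, consumerID string) (uint64, error) {
	b, err := os.ReadFile(filepath.Join(c.dir, consumerID))
	if errors.Is(err, os.ErrNotExist) {
		return 0, nil // new consumer
	}
	if err != nil {
		return 0, err
	}
	return strconv.ParseUint(strings.TrimSpace(string(b)), 10, 64)
}

func (c fileCheckpoint) Save(_ context.Context, consumerID string, sequence uint64) error {
	final := filepath.Join(c.dir, consumerID)
	tmp := final + ".tmp"
	if err := os.WriteFile(tmp, []byte(strconv.FormatUint(sequence, 10)), 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, final) // readers see the old value or the new one, never a partial write
}

// run saves and reloads a position in a temporary directory.
func run() (before, after uint64, err error) {
	dir, err := os.MkdirTemp("", "checkpoints")
	if err != nil {
		return 0, 0, err
	}
	defer os.RemoveAll(dir)

	ctx := context.Background()
	cp := fileCheckpoint{dir: dir}
	if before, err = cp.Load(ctx, "orders-projection"); err != nil {
		return 0, 0, err
	}
	if err = cp.Save(ctx, "orders-projection", 42); err != nil {
		return 0, 0, err
	}
	after, err = cp.Load(ctx, "orders-projection")
	return before, after, err
}

func main() {
	before, after, err := run()
	fmt.Println(before, after, err)
}
```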

type Config

type Config[E any] struct {
	// ConsumerID uniquely identifies this consumer. Required.
	ConsumerID string

	// Reader provides access to the global event stream. Required.
	Reader GlobalReader[E]

	// Checkpoint tracks consumed position. Required.
	Checkpoint Checkpoint

	// Handler processes individual events. Either Handler or BatchHandler required.
	Handler Handler[E]

	// BatchHandler processes events in batches. Mutually exclusive with Handler.
	BatchHandler BatchHandler[E]

	// Notifier provides low-latency event notifications. Optional.
	// Without it, the subscription relies on polling only.
	Notifier StoreNotifier

	// PollInterval is the fallback polling interval. Default: 500ms.
	// Belt and suspenders — ensures delivery even if notifier drops signals.
	PollInterval time.Duration

	// BatchSize is the max events to read per poll. Default: 100.
	BatchSize int

	// MaxRetries before sending to DLQ. Default: 5.
	MaxRetries int

	// RetryBaseDelay is the base delay for exponential backoff. Default: 100ms.
	RetryBaseDelay time.Duration

	// RetryMaxDelay caps the backoff. Default: 10s.
	RetryMaxDelay time.Duration

	// OnDLQ is called when an event exhausts retries. Optional.
	// If nil, the error is logged and the event is skipped.
	OnDLQ func(ctx context.Context, event GlobalEvent[E], err error)

	// Logger for operational logging. Optional.
	Logger *slog.Logger

	// GapTimeout is how long to wait for a sequence gap to fill before
	// assuming it was a rollback. Default: 500ms.
	// Set to 0 to disable gap detection (single-writer mode).
	GapTimeout time.Duration

	// MaxGapWait is the total time willing to wait for gaps before moving on.
	// Default: 5s.
	MaxGapWait time.Duration

	// EventTypes filters events by their EventType field. Only events whose
	// EventType is in this list will be delivered to the handler.
	// If empty or nil, all events are delivered (no filtering).
	// Checkpoint still advances past filtered-out events to ensure progress.
	EventTypes []string
}

Config configures an EventSubscription.
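The EventTypes rule (filter what the handler sees, but keep the checkpoint moving) can be sketched as follows. The `event` struct is a pared-down local stand-in for GlobalEvent, and `selectEvents` is a hypothetical helper rather than part of the package.

```go
package main

import "fmt"

// event keeps only the GlobalEvent fields this example needs.
type event struct {
	GlobalSequence uint64
	EventType      string
}

// selectEvents returns the events the handler should receive and the
// checkpoint to save once they have been handled. An empty eventTypes list
// disables filtering.
func selectEvents(batch []event, eventTypes []string, checkpoint uint64) ([]event, uint64) {
	allowed := make(map[string]bool, len(eventTypes))
	for _, t := range eventTypes {
		allowed[t] = true
	}
	var deliver []event
	for _, e := range batch {
		if len(allowed) == 0 || allowed[e.EventType] {
			deliver = append(deliver, e)
		}
		checkpoint = e.GlobalSequence // advances even when e is filtered out
	}
	return deliver, checkpoint
}

func main() {
	batch := []event{
		{1, "OrderCreated"},
		{2, "OrderShipped"},
		{3, "OrderCreated"},
	}
	deliver, cp := selectEvents(batch, []string{"OrderShipped"}, 0)
	fmt.Println(deliver, cp)
}
```

Without the unconditional checkpoint update, a consumer that filters out most events would reread the same rows on every poll.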

func FromStateView

func FromStateView[E any](view eskit.StateView[E], reader GlobalReader[E], checkpoint Checkpoint, opts ...Option[E]) Config[E]

FromStateView creates a subscription Config from a StateView. This is the standard way to wire a projection into the event stream.

The StateView's Evolve function receives eskit.Event[E], which is adapted from the subscription's GlobalEvent[E]. If the StateView has a Setup function, it is called before the subscription starts processing events.

type EventSubscription

type EventSubscription[E any] struct {
	// contains filtered or unexported fields
}

EventSubscription is a durable event consumer with guaranteed delivery. It catches up from its checkpoint on start, then switches to live mode using notifier signals (if available) and periodic polling.

func New

func New[E any](cfg Config[E]) (*EventSubscription[E], error)

New creates a new EventSubscription. Call Start to begin processing.

func (*EventSubscription[E]) Lag

func (s *EventSubscription[E]) Lag(ctx context.Context) (uint64, error)

Lag returns approximately how many events the consumer is behind the head of the store.
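How the package computes this is not shown here. A plausible reading, given the GlobalReader and Checkpoint interfaces, is the difference between the store's latest sequence and the saved checkpoint. The sketch below assumes exactly that; `lag` is a hypothetical helper.

```go
package main

import "fmt"

// lag returns how far checkpoint trails latest, clamped at zero.
func lag(latest, checkpoint uint64) uint64 {
	if checkpoint >= latest {
		return 0 // the two values are read separately, so guard against uint64 underflow
	}
	return latest - checkpoint
}

func main() {
	fmt.Println(lag(120, 100))
	fmt.Println(lag(100, 100))
	fmt.Println(lag(100, 101))
}
```

Under this reading the figure is approximate for two reasons: the two values are not read atomically, and sequences lost to rolled-back transactions are counted as if they were events.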

func (*EventSubscription[E]) Start

func (s *EventSubscription[E]) Start(ctx context.Context) error

Start begins processing events. Blocks until ctx is cancelled or Stop is called. Typically run in a goroutine.

func (*EventSubscription[E]) Stop

func (s *EventSubscription[E]) Stop()

Stop signals the subscription to shut down gracefully.

func (*EventSubscription[E]) Wait

func (s *EventSubscription[E]) Wait()

Wait blocks until the subscription has fully stopped.

type GlobalEvent

type GlobalEvent[E any] struct {
	// GlobalSequence is the store-wide monotonic position. Never reused.
	GlobalSequence uint64

	// StreamID identifies which stream this event belongs to.
	StreamID string

	// EventType is the string name of the event (e.g., "OrderCreated").
	// Available when the store has an EventRegistry configured.
	EventType string

	// Version is the per-stream version number.
	Version int

	// Data is the domain event payload.
	Data E

	// Timestamp is when the event was recorded.
	Timestamp time.Time
}

GlobalEvent wraps a domain event with its global sequence number. The global sequence is a monotonically increasing number assigned at append time, used by consumers to track position and detect gaps.

type GlobalReader

type GlobalReader[E any] interface {
	// ReadFrom returns events starting from the given global sequence (inclusive),
	// up to limit events. Returns events in global sequence order.
	ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)

	// LatestSequence returns the highest global sequence in the store, or 0 if empty.
	LatestSequence(ctx context.Context) (uint64, error)
}

GlobalReader reads events by global sequence. Event stores must implement this to support subscriptions.
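Because ReadFrom is inclusive, a consumer that has processed sequence N asks for N+1 next. The catch-up loop below shows that arithmetic. It is a sketch only: `event`, `reader`, `sliceReader`, `catchUp`, and `replayAll` are local stand-ins written for the example, and retries and gap handling are left out.

```go
package main

import (
	"context"
	"fmt"
)

// event and reader are pared-down stand-ins for GlobalEvent and GlobalReader.
type event struct{ GlobalSequence uint64 }

type reader interface {
	ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]event, error)
}

// sliceReader serves events from a slice sorted by GlobalSequence.
type sliceReader []event

func (r sliceReader) ReadFrom(_ context.Context, from uint64, limit int) ([]event, error) {
	var out []event
	for _, e := range r {
		if e.GlobalSequence >= from && len(out) < limit {
			out = append(out, e)
		}
	}
	return out, nil
}

// catchUp replays every event after checkpoint in batches and returns the
// position of the last event handled successfully.
func catchUp(ctx context.Context, r reader, checkpoint uint64, batch int, handle func(event) error) (uint64, error) {
	for {
		events, err := r.ReadFrom(ctx, checkpoint+1, batch) // inclusive start
		if err != nil {
			return checkpoint, err
		}
		if len(events) == 0 {
			return checkpoint, nil // reached head
		}
		for _, e := range events {
			if err := handle(e); err != nil {
				return checkpoint, err
			}
			checkpoint = e.GlobalSequence
		}
	}
}

// replayAll collects the sequences seen while catching up from checkpoint.
func replayAll(r sliceReader, checkpoint uint64, batch int) ([]uint64, uint64, error) {
	var seen []uint64
	cp, err := catchUp(context.Background(), r, checkpoint, batch, func(e event) error {
		seen = append(seen, e.GlobalSequence)
		return nil
	})
	return seen, cp, err
}

func main() {
	store := sliceReader{{1}, {2}, {3}, {4}, {5}}
	seen, cp, err := replayAll(store, 2, 2)
	fmt.Println(seen, cp, err)
}
```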

type Handler

type Handler[E any] func(ctx context.Context, event GlobalEvent[E]) error

Handler processes events. Return nil to advance the checkpoint. Return an error to trigger retry logic.

type MemoryCheckpoint

type MemoryCheckpoint struct {
	// contains filtered or unexported fields
}

MemoryCheckpoint is an in-memory checkpoint for testing.

func NewMemoryCheckpoint

func NewMemoryCheckpoint() *MemoryCheckpoint

func (*MemoryCheckpoint) Load

func (c *MemoryCheckpoint) Load(_ context.Context, id string) (uint64, error)

func (*MemoryCheckpoint) Save

func (c *MemoryCheckpoint) Save(_ context.Context, id string, seq uint64) error

type MemoryGlobalReader

type MemoryGlobalReader[E any] struct {
	// contains filtered or unexported fields
}

MemoryGlobalReader is an in-memory GlobalReader for testing. Events are appended manually and assigned monotonic global sequences.

func NewMemoryGlobalReader

func NewMemoryGlobalReader[E any]() *MemoryGlobalReader[E]

NewMemoryGlobalReader creates a new in-memory global reader.

func (*MemoryGlobalReader[E]) Append

func (r *MemoryGlobalReader[E]) Append(streamID string, data ...E) []uint64

Append adds events and assigns global sequences. Returns the assigned sequences.

func (*MemoryGlobalReader[E]) AppendTyped

func (r *MemoryGlobalReader[E]) AppendTyped(streamID string, events ...struct {
	Data      E
	EventType string
}) []uint64

AppendTyped adds events with explicit event types and assigns global sequences.

func (*MemoryGlobalReader[E]) AppendWithSequence

func (r *MemoryGlobalReader[E]) AppendWithSequence(seq uint64, streamID string, data E)

AppendWithSequence adds an event with a specific sequence number. Used for testing gap scenarios.

func (*MemoryGlobalReader[E]) LatestSequence

func (r *MemoryGlobalReader[E]) LatestSequence(_ context.Context) (uint64, error)

func (*MemoryGlobalReader[E]) ReadFrom

func (r *MemoryGlobalReader[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)

type Option

type Option[E any] func(*Config[E])

Option configures a subscription created from a StateView.

func WithNotifier

func WithNotifier[E any](n StoreNotifier) Option[E]

WithNotifier sets the store notifier for low-latency event delivery.

type StoreAdapter

type StoreAdapter[E any] struct {
	// contains filtered or unexported fields
}

StoreAdapter adapts any StoreReader (eskit.Event-based) into a subscription.GlobalReader (GlobalEvent-based). This bridges the type gap between the event store layer and the subscription system.

func NewStoreAdapter

func NewStoreAdapter[E any](store StoreReader[E]) *StoreAdapter[E]

NewStoreAdapter wraps a store's global reader into a subscription.GlobalReader.

func (*StoreAdapter[E]) LatestSequence

func (a *StoreAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence delegates to the underlying store.

func (*StoreAdapter[E]) ReadFrom

func (a *StoreAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)

ReadFrom reads events from the store and converts them to GlobalEvents.

type StoreNotifier

type StoreNotifier interface {
	Notify(ctx context.Context) <-chan uint64
	Close() error
}

StoreNotifier signals when new events are available in the store. This is an optimization for low latency — subscriptions work without it via periodic polling. The value on the channel is the latest global sequence (or 0 if unknown).

type StoreReader

type StoreReader[E any] interface {
	ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
	LatestSequence(ctx context.Context) (uint64, error)
}

StoreReader is the interface that event stores implement for global reads. Both MemoryStore and sqlitestore.Store satisfy this via their ReadFrom/LatestSequence methods.

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager manages multiple subscriptions with lifecycle control.

func NewManager

func NewManager() *SubscriptionManager

NewManager creates a new subscription manager.

func (*SubscriptionManager) ConsumerIDs

func (m *SubscriptionManager) ConsumerIDs() []string

ConsumerIDs returns the IDs of all active subscriptions.

func (*SubscriptionManager) Remove

func (m *SubscriptionManager) Remove(consumerID string) error

Remove stops and removes a subscription.

func (*SubscriptionManager) StopAll

func (m *SubscriptionManager) StopAll()

StopAll stops all subscriptions and waits for them to finish.
