Documentation ¶
Overview ¶
Package subscription provides guaranteed event delivery for event-sourced systems.
Delivery Guarantee ¶
The core invariant: if an event is written to the store, every registered consumer WILL process it. This is achieved through:
- Durable checkpoints track each consumer's position
- Catch-up on startup replays from checkpoint to head
- Periodic polling ensures no event is missed (belt and suspenders)
- Optional notifiers provide low-latency delivery
The Sequence Gap Problem ¶
When events are stored with auto-increment global sequences, concurrent writers can cause gaps: transaction A gets sequence 10, transaction B gets sequence 11, and B commits first. A consumer that reads 11 at that moment and checkpoints there will never see 10 once A commits.
Four approaches, in order of recommendation:
## 1. Single Writer (Recommended)
Use the CommandBus for single-writer-per-stream. One goroutine per stream means global sequences are always monotonic with no holes. No gaps, no complexity, fastest possible throughput.
This is the right answer for most applications. The other approaches exist as escape hatches for systems that can't use single writer.
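The single-writer idea can be illustrated with a minimal sketch. This is not the CommandBus itself: `appendRequest`, `runWriter`, and `appendEvent` are hypothetical names, and the "store" is just a slice. The point is that one goroutine owns sequence assignment, so events become visible in the order their sequences were handed out.

```go
package main

import "fmt"

// appendRequest is a hypothetical message asking the writer to store one event.
type appendRequest struct {
	data  string
	reply chan uint64 // receives the assigned global sequence
}

// runWriter is the only goroutine that stores events and assigns sequences,
// so a sequence is never handed out before all lower ones are stored.
func runWriter(requests <-chan appendRequest) {
	var stored []string
	for req := range requests {
		stored = append(stored, req.data)
		req.reply <- uint64(len(stored))
	}
}

// appendEvent sends one event to the writer and waits for its sequence.
func appendEvent(requests chan<- appendRequest, data string) uint64 {
	reply := make(chan uint64, 1)
	requests <- appendRequest{data: data, reply: reply}
	return <-reply
}

func main() {
	requests := make(chan appendRequest)
	go runWriter(requests)

	fmt.Println(appendEvent(requests, "OrderCreated")) // 1
	fmt.Println(appendEvent(requests, "OrderPaid"))    // 2
	close(requests)
}
```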
## 2. Published Sequence Table
After committing events, update a separate "published_sequence" table with the highest committed sequence. Consumers read from this table instead of trusting the events table directly.
Trade-offs:
- Extra write per commit (small overhead)
- Reads are trivial — no gap detection needed
- Works with any relational database
- Slight delivery delay (published after commit)
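On the consumer side, reading through a published sequence might look like the sketch below. It assumes the published value is only advanced once every sequence at or below it is committed; how writers maintain that is not shown. The `event` type and `readPublished` function are hypothetical stand-ins for the events table and the consumer query.

```go
package main

import "fmt"

// event is a hypothetical row from the events table.
type event struct {
	Seq  uint64
	Data string
}

// readPublished returns events after checkpoint but never beyond the
// published sequence, so rows above the published mark stay invisible.
func readPublished(events []event, checkpoint, published uint64) []event {
	var out []event
	for _, e := range events {
		if e.Seq > checkpoint && e.Seq <= published {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	// Sequence 10 is still in flight, so only 9 has been published.
	visible := []event{{9, "a"}, {11, "c"}}
	fmt.Println(len(readPublished(visible, 8, 9))) // 1

	// Later, 10 has committed and the published sequence moved to 11.
	visible = []event{{9, "a"}, {10, "b"}, {11, "c"}}
	fmt.Println(len(readPublished(visible, 9, 11))) // 2
}
```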
## 3. Postgres Transaction Visibility (pg_snapshot)
Postgres exposes transaction visibility via system columns:
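SELECT *
FROM events
WHERE global_seq > $checkpoint
  AND xmin::text::bigint < pg_snapshot_xmin(pg_current_snapshot())::text::bigint
ORDER BY global_seq;
This reads only rows written by transactions older than every transaction still in progress. It relies on transaction visibility rather than heuristics or timeouts. Note that the xmin system column is a 32-bit xid while pg_snapshot_xmin returns a 64-bit xid8, so the comparison as written assumes the transaction counter has not wrapped around.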
Trade-offs:
- Postgres-specific (not portable)
- Unknown performance impact — the xmin/snapshot functions may be expensive under high write load. Benchmark for your workload.
- Requires understanding Postgres MVCC internals
## 4. Gap Detection with Timeout (Generic Fallback)
The subscription detects sequence gaps and waits for them to fill:
- Gap detected → wait GapTimeout (default 500ms)
- Gap fills within timeout → process normally
- Gap persists → assume rollback, skip it, log warning
- Known gaps tracked to avoid re-waiting
Trade-offs:
- Works with any store (generic)
- Adds latency on gaps (up to GapTimeout)
- Heuristic — could theoretically skip a very slow transaction (mitigated by generous timeout)
- Simple to understand and debug
This is the built-in fallback used by EventSubscription when gap detection is enabled (GapTimeout > 0).
Notifiers ¶
StoreNotifier is an optimization for low-latency delivery. Without it, subscriptions rely on periodic polling (default 500ms). With it, events are delivered within milliseconds.
Available notifiers:
- ChannelNotifier: in-process, for testing and memory stores
- (Future) PostgresNotifier: LISTEN/NOTIFY
- (Future) SQLiteNotifier: data_version polling
- (Future) NATSNotifier: subject pub/sub
Package subscription provides guaranteed event delivery with checkpoint-based catch-up and live notification support.
The core guarantee: if an event is written to the store, every registered consumer WILL process it. This works through:
- Checkpoint: each consumer tracks its last processed global sequence
- Catch-up: on start, replay from checkpoint to head
- Live: after caught up, use notifications + periodic polling
- Retry: failed events retry with backoff, then go to DLQ
Single-writer (via CommandBus) eliminates sequence gaps. For multi-writer scenarios, gap detection handles out-of-order commits. See GapDetector.
Index ¶
- func Add[E any](m *SubscriptionManager, sub *EventSubscription[E]) error
- type BatchHandler
- type ChannelNotifier
- type Checkpoint
- type Config
- type EventSubscription
- type GlobalEvent
- type GlobalReader
- type Handler
- type MemoryCheckpoint
- type MemoryGlobalReader
- func (r *MemoryGlobalReader[E]) Append(streamID string, data ...E) []uint64
- func (r *MemoryGlobalReader[E]) AppendTyped(streamID string, events ...) []uint64
- func (r *MemoryGlobalReader[E]) AppendWithSequence(seq uint64, streamID string, data E)
- func (r *MemoryGlobalReader[E]) LatestSequence(_ context.Context) (uint64, error)
- func (r *MemoryGlobalReader[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
- type Option
- type StoreAdapter
- type StoreNotifier
- type StoreReader
- type SubscriptionManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Add ¶
func Add[E any](m *SubscriptionManager, sub *EventSubscription[E]) error
Add registers and starts a subscription. It returns an error if a subscription with the same consumer ID is already registered.
Types ¶
type BatchHandler ¶
type BatchHandler[E any] func(ctx context.Context, events []GlobalEvent[E]) error
BatchHandler processes a batch of events. Checkpoint advances only after the entire batch succeeds.
type ChannelNotifier ¶
type ChannelNotifier struct {
// contains filtered or unexported fields
}
ChannelNotifier is an in-process StoreNotifier backed by a Go channel. Use for in-memory stores and testing. Call Signal() after appending events.
func NewChannelNotifier ¶
func NewChannelNotifier() *ChannelNotifier
NewChannelNotifier creates a new in-process notifier.
func (*ChannelNotifier) Close ¶
func (n *ChannelNotifier) Close() error
Close shuts down the notifier and closes all listener channels.
func (*ChannelNotifier) Notify ¶
func (n *ChannelNotifier) Notify(ctx context.Context) <-chan uint64
Notify returns a channel that receives signals when new events are appended. The channel is closed when the notifier is closed or ctx is cancelled.
func (*ChannelNotifier) Signal ¶
func (n *ChannelNotifier) Signal(sequence uint64)
Signal notifies all listeners that new events are available at the given sequence. Non-blocking: if a listener's buffer is full, the signal is dropped (polling catches it).
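The non-blocking behaviour described for Signal is typically written in Go with a `select` that has a `default` case. The sketch below shows that pattern on plain channels. The `signal` function is a hypothetical stand-in, not ChannelNotifier's actual code.

```go
package main

import "fmt"

// signal offers seq to every listener without blocking and reports how many
// listeners had a full buffer and therefore missed the signal.
func signal(listeners []chan uint64, seq uint64) (dropped int) {
	for _, ch := range listeners {
		select {
		case ch <- seq:
		default:
			dropped++ // buffer full; the listener's next poll catches up
		}
	}
	return dropped
}

func main() {
	busy := make(chan uint64, 1)
	busy <- 1 // buffer already full
	idle := make(chan uint64, 1)

	fmt.Println(signal([]chan uint64{busy, idle}, 7)) // 1
	fmt.Println(<-idle)                               // 7
}
```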
type Checkpoint ¶
type Checkpoint interface {
// Load returns the last processed sequence for a consumer. Returns 0 if new.
Load(ctx context.Context, consumerID string) (uint64, error)
// Save persists the consumer's position. Must be atomic.
Save(ctx context.Context, consumerID string, sequence uint64) error
}
Checkpoint tracks a consumer's last processed global sequence. Implementations must be durable for production use (SQLite, Postgres, etc).
type Config ¶
type Config[E any] struct {
	// ConsumerID uniquely identifies this consumer. Required.
	ConsumerID string

	// Reader provides access to the global event stream. Required.
	Reader GlobalReader[E]

	// Checkpoint tracks consumed position. Required.
	Checkpoint Checkpoint

	// Handler processes individual events. Either Handler or BatchHandler required.
	Handler Handler[E]

	// BatchHandler processes events in batches. Mutually exclusive with Handler.
	BatchHandler BatchHandler[E]

	// Notifier provides low-latency event notifications. Optional.
	// Without it, the subscription relies on polling only.
	Notifier StoreNotifier

	// PollInterval is the fallback polling interval. Default: 500ms.
	// Belt and suspenders — ensures delivery even if notifier drops signals.
	PollInterval time.Duration

	// BatchSize is the max events to read per poll. Default: 100.
	BatchSize int

	// MaxRetries before sending to DLQ. Default: 5.
	MaxRetries int

	// RetryBaseDelay is the base delay for exponential backoff. Default: 100ms.
	RetryBaseDelay time.Duration

	// RetryMaxDelay caps the backoff. Default: 10s.
	RetryMaxDelay time.Duration

	// OnDLQ is called when an event exhausts retries. Optional.
	// If nil, the error is logged and the event is skipped.
	OnDLQ func(ctx context.Context, event GlobalEvent[E], err error)

	// Logger for operational logging. Optional.
	Logger *slog.Logger

	// GapTimeout is how long to wait for a sequence gap to fill before
	// assuming it was a rollback. Default: 500ms.
	// Set to 0 to disable gap detection (single-writer mode).
	GapTimeout time.Duration

	// MaxGapWait is the total time willing to wait for gaps before moving on.
	// Default: 5s.
	MaxGapWait time.Duration

	// EventTypes filters events by their EventType field. Only events whose
	// EventType is in this list will be delivered to the handler.
	// If empty or nil, all events are delivered (no filtering).
	// Checkpoint still advances past filtered-out events to ensure progress.
	EventTypes []string
}
Config configures an EventSubscription.
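The EventTypes rule, where filtered-out events are not delivered but the checkpoint still moves past them, can be sketched like this. The `event` type and `deliver` function are hypothetical simplifications. The real subscription works on GlobalEvent values and also handles retries.

```go
package main

import "fmt"

// event is a hypothetical, trimmed-down GlobalEvent.
type event struct {
	Seq       uint64
	EventType string
}

// deliver calls handle for events whose type is in allowed (or for all events
// when allowed is empty) and returns the sequence of the last event seen.
// It returns 0 when events is empty.
func deliver(events []event, allowed []string, handle func(event)) (checkpoint uint64) {
	want := make(map[string]bool, len(allowed))
	for _, t := range allowed {
		want[t] = true
	}
	for _, e := range events {
		if len(want) == 0 || want[e.EventType] {
			handle(e)
		}
		checkpoint = e.Seq // advance even when the event was filtered out
	}
	return checkpoint
}

func main() {
	events := []event{
		{1, "OrderCreated"},
		{2, "OrderShipped"},
		{3, "OrderCancelled"},
	}
	cp := deliver(events, []string{"OrderCreated"}, func(e event) {
		fmt.Println("handled", e.Seq)
	})
	fmt.Println("checkpoint", cp) // checkpoint 3
}
```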
func FromStateView ¶
func FromStateView[E any](view eskit.StateView[E], reader GlobalReader[E], checkpoint Checkpoint, opts ...Option[E]) Config[E]
FromStateView creates a subscription Config from a StateView. This is the standard way to wire a projection into the event stream.
The StateView's Evolve function receives eskit.Event[E], which is adapted from the subscription's GlobalEvent[E]. If the StateView has a Setup function, it is called before the subscription starts processing events.
type EventSubscription ¶
type EventSubscription[E any] struct {
	// contains filtered or unexported fields
}
EventSubscription is a durable event consumer with guaranteed delivery. It catches up from its checkpoint on start, then switches to live mode using notifier signals (if available) and periodic polling.
func New ¶
func New[E any](cfg Config[E]) (*EventSubscription[E], error)
New creates a new EventSubscription. Call Start to begin processing.
func (*EventSubscription[E]) Lag ¶
func (s *EventSubscription[E]) Lag(ctx context.Context) (uint64, error)
Lag returns approximately how many events the consumer is behind the head of the store.
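One plausible way to compute such a figure, assuming lag is the distance between the store's latest sequence and the consumer's checkpoint, is shown below. The `lag` function is hypothetical. The result is only approximate because rolled-back or skipped sequences count as distance without being events.

```go
package main

import "fmt"

// lag returns latest minus checkpoint, clamped at 0 so an unsigned
// subtraction can never wrap around.
func lag(latest, checkpoint uint64) uint64 {
	if checkpoint >= latest {
		return 0
	}
	return latest - checkpoint
}

func main() {
	fmt.Println(lag(120, 100)) // 20
	fmt.Println(lag(100, 100)) // 0
}
```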
func (*EventSubscription[E]) Start ¶
func (s *EventSubscription[E]) Start(ctx context.Context) error
Start begins processing events. Blocks until ctx is cancelled or Stop is called. Typically run in a goroutine.
func (*EventSubscription[E]) Stop ¶
func (s *EventSubscription[E]) Stop()
Stop signals the subscription to shut down gracefully.
func (*EventSubscription[E]) Wait ¶
func (s *EventSubscription[E]) Wait()
Wait blocks until the subscription has fully stopped.
type GlobalEvent ¶
type GlobalEvent[E any] struct {
	// GlobalSequence is the store-wide monotonic position. Never reused.
	GlobalSequence uint64

	// StreamID identifies which stream this event belongs to.
	StreamID string

	// EventType is the string name of the event (e.g., "OrderCreated").
	// Available when the store has an EventRegistry configured.
	EventType string

	// Version is the per-stream version number.
	Version int

	// Data is the domain event payload.
	Data E

	// Timestamp is when the event was recorded.
	Timestamp time.Time
}
GlobalEvent wraps a domain event with its global sequence number. The global sequence is a monotonically increasing number assigned at append time, used by consumers to track position and detect gaps.
type GlobalReader ¶
type GlobalReader[E any] interface {
	// ReadFrom returns events starting from the given global sequence (inclusive),
	// up to limit events. Returns events in global sequence order.
	ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)

	// LatestSequence returns the highest global sequence in the store, or 0 if empty.
	LatestSequence(ctx context.Context) (uint64, error)
}
GlobalReader reads events by global sequence. Event stores must implement this to support subscriptions.
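The ReadFrom contract (inclusive start, at most limit events, sequence order) is sketched below over a sorted slice. The `event` type and `readFrom` function are hypothetical, and a real store would run a query instead of a binary search.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a hypothetical, trimmed-down GlobalEvent.
type event struct {
	Seq uint64
}

// readFrom returns up to limit events with Seq >= from.
// It assumes events is sorted by Seq in ascending order.
func readFrom(events []event, from uint64, limit int) []event {
	if limit <= 0 {
		return nil
	}
	start := sort.Search(len(events), func(i int) bool { return events[i].Seq >= from })
	end := start + limit
	if end > len(events) {
		end = len(events)
	}
	return events[start:end]
}

func main() {
	events := []event{{1}, {2}, {4}, {5}}
	fmt.Println(readFrom(events, 2, 2)) // [{2} {4}]
	fmt.Println(readFrom(events, 3, 10)) // [{4} {5}]
}
```

Because the start is inclusive, a caller paging through the stream would pass the last sequence it processed plus one on the next call.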
type Handler ¶
type Handler[E any] func(ctx context.Context, event GlobalEvent[E]) error
Handler processes events. Return nil to advance the checkpoint. Return an error to trigger retry logic.
type MemoryCheckpoint ¶
type MemoryCheckpoint struct {
// contains filtered or unexported fields
}
MemoryCheckpoint is an in-memory checkpoint for testing.
func NewMemoryCheckpoint ¶
func NewMemoryCheckpoint() *MemoryCheckpoint
type MemoryGlobalReader ¶
type MemoryGlobalReader[E any] struct {
	// contains filtered or unexported fields
}
MemoryGlobalReader is an in-memory GlobalReader for testing. Events are appended manually and assigned monotonic global sequences.
func NewMemoryGlobalReader ¶
func NewMemoryGlobalReader[E any]() *MemoryGlobalReader[E]
NewMemoryGlobalReader creates a new in-memory global reader.
func (*MemoryGlobalReader[E]) Append ¶
func (r *MemoryGlobalReader[E]) Append(streamID string, data ...E) []uint64
Append adds events and assigns global sequences. Returns the assigned sequences.
func (*MemoryGlobalReader[E]) AppendTyped ¶
func (r *MemoryGlobalReader[E]) AppendTyped(streamID string, events ...struct {
	Data      E
	EventType string
}) []uint64
AppendTyped adds events with explicit event types and assigns global sequences.
func (*MemoryGlobalReader[E]) AppendWithSequence ¶
func (r *MemoryGlobalReader[E]) AppendWithSequence(seq uint64, streamID string, data E)
AppendWithSequence adds an event with a specific sequence number. Used for testing gap scenarios.
func (*MemoryGlobalReader[E]) LatestSequence ¶
func (r *MemoryGlobalReader[E]) LatestSequence(_ context.Context) (uint64, error)
func (*MemoryGlobalReader[E]) ReadFrom ¶
func (r *MemoryGlobalReader[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
type Option ¶
Option configures a subscription created from a StateView.
func WithNotifier ¶
func WithNotifier[E any](n StoreNotifier) Option[E]
WithNotifier sets the store notifier for low-latency event delivery.
type StoreAdapter ¶
type StoreAdapter[E any] struct {
	// contains filtered or unexported fields
}
StoreAdapter adapts any StoreReader (eskit.Event-based) into a subscription.GlobalReader (GlobalEvent-based). This bridges the type gap between the event store layer and the subscription system.
func NewStoreAdapter ¶
func NewStoreAdapter[E any](store StoreReader[E]) *StoreAdapter[E]
NewStoreAdapter wraps a store's global reader into a subscription.GlobalReader.
func (*StoreAdapter[E]) LatestSequence ¶
func (a *StoreAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)
LatestSequence delegates to the underlying store.
func (*StoreAdapter[E]) ReadFrom ¶
func (a *StoreAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
ReadFrom reads events from the store and converts them to GlobalEvents.
type StoreNotifier ¶
StoreNotifier signals when new events are available in the store. This is an optimization for low latency — subscriptions work without it via periodic polling. The value on the channel is the latest global sequence (or 0 if unknown).
type StoreReader ¶
type StoreReader[E any] interface {
	ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
	LatestSequence(ctx context.Context) (uint64, error)
}
StoreReader is the interface that event stores implement for global reads. Both MemoryStore and sqlitestore.Store satisfy this via their ReadFrom/LatestSequence methods.
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
SubscriptionManager manages multiple subscriptions with lifecycle control.
func NewManager ¶
func NewManager() *SubscriptionManager
NewManager creates a new subscription manager.
func (*SubscriptionManager) ConsumerIDs ¶
func (m *SubscriptionManager) ConsumerIDs() []string
ConsumerIDs returns the IDs of all active subscriptions.
func (*SubscriptionManager) Remove ¶
func (m *SubscriptionManager) Remove(consumerID string) error
Remove stops and removes a subscription.
func (*SubscriptionManager) StopAll ¶
func (m *SubscriptionManager) StopAll()
StopAll stops all subscriptions and waits for them to finish.