Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
Checkpoint represents the last processed position for a subscription.
type CheckpointCommitter ¶
type CheckpointCommitter struct {
// contains filtered or unexported fields
}
CheckpointCommitter batches checkpoint writes with gap detection. It ensures we never commit position N+1 if position N hasn't been processed yet. This prevents data loss when events are processed concurrently and out of order.
func NewCheckpointCommitter ¶
func NewCheckpointCommitter( store CheckpointStore, subscriptionID string, batchSize int, interval time.Duration, ) *CheckpointCommitter
NewCheckpointCommitter creates a committer. batchSize: commit after this many events (0 = no batch limit). interval: commit at least this often (0 = no time limit).
func (*CheckpointCommitter) Close ¶
func (c *CheckpointCommitter) Close(ctx context.Context) error
Close stops the timer and flushes.
func (*CheckpointCommitter) Commit ¶
Commit records that the event at the given position with the given sequence has been processed. The committer tracks these and commits the highest position where all prior sequences have also been processed (gap detection).
Example: if events arrive as sequence 1,2,4 (skipping 3):
- After sequence 1: commit position of seq 1
- After sequence 2: commit position of seq 2
- After sequence 4: DON'T commit (seq 3 is missing, gap detected)
- When sequence 3 arrives: commit position of seq 4 (all gaps filled)
type CheckpointStore ¶
type CheckpointStore interface {
GetCheckpoint(ctx context.Context, id string) (Checkpoint, error)
StoreCheckpoint(ctx context.Context, checkpoint Checkpoint) error
}
CheckpointStore persists and retrieves checkpoints.
type ConsumeContext ¶
type ConsumeContext struct {
EventID uuid.UUID
EventType string
Stream eventuous.StreamName
Payload any // deserialized event, nil if unknown type
Metadata eventuous.Metadata
ContentType string
Position uint64 // position in the source stream
GlobalPosition uint64 // position in the global log ($all)
Sequence uint64 // local sequence number within this subscription
Created time.Time
SubscriptionID string
}
ConsumeContext carries the event data through the subscription pipeline.
type EventHandler ¶
type EventHandler interface {
HandleEvent(ctx context.Context, msg *ConsumeContext) error
}
EventHandler processes a single event.
func Chain ¶
func Chain(handler EventHandler, mw ...Middleware) EventHandler
Chain applies middleware in order: first middleware is outermost.
Example: Chain(h, A, B, C) produces A(B(C(h))), so execution order is A-before → B-before → C-before → h → C-after → B-after → A-after.
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, msg *ConsumeContext) error
HandlerFunc adaptor — like http.HandlerFunc.
func (HandlerFunc) HandleEvent ¶
func (f HandlerFunc) HandleEvent(ctx context.Context, msg *ConsumeContext) error
type Middleware ¶
type Middleware func(EventHandler) EventHandler
Middleware wraps a handler with additional behavior.
func WithConcurrency ¶
func WithConcurrency(limit int) Middleware
WithConcurrency processes events concurrently up to the given limit. Uses a semaphore channel to bound concurrency. The handler blocks until the event is actually processed, preserving error semantics needed for correct checkpointing and ack/nack.
func WithLogging ¶
func WithLogging(logger *slog.Logger) Middleware
WithLogging logs event processing at debug level.
func WithPartitioning ¶
func WithPartitioning(count int, keyFunc func(*ConsumeContext) string) Middleware
WithPartitioning distributes events across N goroutines by partition key. keyFunc extracts the partition key from a ConsumeContext; nil defaults to stream name. Events with the same key always go to the same partition so ordering is preserved per key.
The partition goroutines are started lazily on the first HandleEvent call and run until their input channels are closed (i.e. the returned handler is garbage-collected) or the context passed to HandleEvent is cancelled.
type Subscription ¶
Subscription consumes events from a source. Start blocks until ctx is cancelled or a fatal error occurs.