subscription

package
v0.1.4 Latest
Published: Mar 25, 2026 License: Apache-2.0 Imports: 7 Imported by: 2

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	ID       string
	Position *uint64 // nil means no checkpoint stored yet
}

Checkpoint represents the last processed position for a subscription.

type CheckpointCommitter

type CheckpointCommitter struct {
	// contains filtered or unexported fields
}

CheckpointCommitter batches checkpoint writes with gap detection. It ensures we never commit position N+1 if position N hasn't been processed yet. This prevents data loss when events are processed concurrently and out of order.

func NewCheckpointCommitter

func NewCheckpointCommitter(
	store CheckpointStore,
	subscriptionID string,
	batchSize int,
	interval time.Duration,
) *CheckpointCommitter

NewCheckpointCommitter creates a committer. batchSize is the number of processed events after which a commit is triggered (0 = no batch limit). interval is the maximum time between commits (0 = no time limit).

func (*CheckpointCommitter) Close

func (c *CheckpointCommitter) Close(ctx context.Context) error

Close stops the interval timer and flushes any pending checkpoint.

func (*CheckpointCommitter) Commit

func (c *CheckpointCommitter) Commit(ctx context.Context, position uint64, sequence uint64) error

Commit records that the event at the given position with the given sequence has been processed. The committer tracks these and commits the highest position where all prior sequences have also been processed (gap detection).

Example: if events arrive as sequence 1,2,4 (skipping 3):

  • After sequence 1: commit position of seq 1
  • After sequence 2: commit position of seq 2
  • After sequence 4: DON'T commit (seq 3 is missing, gap detected)
  • When sequence 3 arrives: commit position of seq 4 (all gaps filled)
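
The walkthrough above can be reproduced with a small tracker. This is a minimal, single-goroutine sketch of gap detection, not the committer's actual code: gapTracker, newGapTracker and record are hypothetical names, the positions 100 to 103 are made up, and starting the sequence at 1 simply follows the example.

```go
package main

import "fmt"

// gapTracker is a hypothetical illustration of contiguous-sequence tracking.
type gapTracker struct {
	next     uint64            // next sequence needed to extend the contiguous run
	pending  map[uint64]uint64 // processed sequences waiting behind a gap -> position
	position uint64            // position of the last contiguous sequence
}

func newGapTracker(firstSequence uint64) *gapTracker {
	return &gapTracker{next: firstSequence, pending: make(map[uint64]uint64)}
}

// record marks a sequence as processed. It returns the highest position that
// is safe to commit and whether that position advanced with this call.
func (g *gapTracker) record(position, sequence uint64) (uint64, bool) {
	if sequence < g.next {
		return g.position, false // already covered by the contiguous run
	}
	g.pending[sequence] = position
	advanced := false
	for {
		pos, ok := g.pending[g.next]
		if !ok {
			break // gap: g.next has not been processed yet
		}
		delete(g.pending, g.next)
		g.position = pos
		g.next++
		advanced = true
	}
	return g.position, advanced
}

func main() {
	g := newGapTracker(1)
	events := []struct{ pos, seq uint64 }{{100, 1}, {101, 2}, {103, 4}, {102, 3}}
	for _, e := range events {
		pos, ok := g.record(e.pos, e.seq)
		fmt.Printf("seq %d: advanced=%v committable position=%d\n", e.seq, ok, pos)
	}
	// seq 1: advanced=true committable position=100
	// seq 2: advanced=true committable position=101
	// seq 4: advanced=false committable position=101
	// seq 3: advanced=true committable position=103
}
```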

func (*CheckpointCommitter) Flush

func (c *CheckpointCommitter) Flush(ctx context.Context) error

Flush forces an immediate commit of the highest contiguous position.

type CheckpointStore

type CheckpointStore interface {
	GetCheckpoint(ctx context.Context, id string) (Checkpoint, error)
	StoreCheckpoint(ctx context.Context, checkpoint Checkpoint) error
}

CheckpointStore persists and retrieves checkpoints.

type ConsumeContext

type ConsumeContext struct {
	EventID        uuid.UUID
	EventType      string
	Stream         eventuous.StreamName
	Payload        any // deserialized event, nil if unknown type
	Metadata       eventuous.Metadata
	ContentType    string
	Position       uint64 // position in the source stream
	GlobalPosition uint64 // position in the global log ($all)
	Sequence       uint64 // local sequence number within this subscription
	Created        time.Time
	SubscriptionID string
}

ConsumeContext carries the event data through the subscription pipeline.

type EventHandler

type EventHandler interface {
	HandleEvent(ctx context.Context, msg *ConsumeContext) error
}

EventHandler processes a single event.

func Chain

func Chain(handler EventHandler, mw ...Middleware) EventHandler

Chain applies middleware in order: first middleware is outermost.

Example: Chain(h, A, B, C) produces A(B(C(h))), so execution order is A-before → B-before → C-before → h → C-after → B-after → A-after.
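
The ordering can be checked with tracing middleware. In this sketch the types are trimmed local copies of the documented ones, chain is a stand-in written to match the documented semantics (the real Chain may be implemented differently), and trace and run are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed local copies of the documented types.
type ConsumeContext struct{ EventType string }

type EventHandler interface {
	HandleEvent(ctx context.Context, msg *ConsumeContext) error
}

type HandlerFunc func(ctx context.Context, msg *ConsumeContext) error

func (f HandlerFunc) HandleEvent(ctx context.Context, msg *ConsumeContext) error {
	return f(ctx, msg)
}

type Middleware func(EventHandler) EventHandler

// chain wraps from the last middleware inward so the first ends up outermost.
func chain(handler EventHandler, mw ...Middleware) EventHandler {
	for i := len(mw) - 1; i >= 0; i-- {
		handler = mw[i](handler)
	}
	return handler
}

// trace records a marker before and after calling the next handler.
func trace(name string, log *[]string) Middleware {
	return func(next EventHandler) EventHandler {
		return HandlerFunc(func(ctx context.Context, msg *ConsumeContext) error {
			*log = append(*log, name+"-before")
			err := next.HandleEvent(ctx, msg)
			*log = append(*log, name+"-after")
			return err
		})
	}
}

func run() []string {
	var log []string
	h := HandlerFunc(func(ctx context.Context, msg *ConsumeContext) error {
		log = append(log, "h")
		return nil
	})
	wrapped := chain(h, trace("A", &log), trace("B", &log), trace("C", &log))
	_ = wrapped.HandleEvent(context.Background(), &ConsumeContext{EventType: "OrderPlaced"})
	return log
}

func main() {
	fmt.Println(run())
	// prints [A-before B-before C-before h C-after B-after A-after]
}
```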

type HandlerFunc

type HandlerFunc func(ctx context.Context, msg *ConsumeContext) error

HandlerFunc adapts an ordinary function to the EventHandler interface, like http.HandlerFunc.

func (HandlerFunc) HandleEvent

func (f HandlerFunc) HandleEvent(ctx context.Context, msg *ConsumeContext) error

HandleEvent calls f(ctx, msg).

type Middleware

type Middleware func(EventHandler) EventHandler

Middleware wraps a handler with additional behavior.

func WithConcurrency

func WithConcurrency(limit int) Middleware

WithConcurrency processes events concurrently up to the given limit. It uses a semaphore channel to bound concurrency. The handler blocks until the event is actually processed, which preserves the error semantics needed for correct checkpointing and ack/nack.
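
The semaphore pattern described here looks roughly like the following. This is a sketch under assumptions, not the package's code: withConcurrency and maxInFlight are hypothetical names, the types are trimmed local copies, limit is assumed to be at least 1, and returning ctx.Err() while waiting for a slot is a guess at reasonable behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type ConsumeContext struct{ EventType string }

type EventHandler interface {
	HandleEvent(ctx context.Context, msg *ConsumeContext) error
}

type HandlerFunc func(ctx context.Context, msg *ConsumeContext) error

func (f HandlerFunc) HandleEvent(ctx context.Context, msg *ConsumeContext) error {
	return f(ctx, msg)
}

type Middleware func(EventHandler) EventHandler

// withConcurrency bounds in-flight calls with a buffered channel used as a
// semaphore. Each call blocks until its own event is done, so the caller
// still sees that event's error. Assumes limit >= 1.
func withConcurrency(limit int) Middleware {
	sem := make(chan struct{}, limit)
	return func(next EventHandler) EventHandler {
		return HandlerFunc(func(ctx context.Context, msg *ConsumeContext) error {
			select {
			case sem <- struct{}{}: // acquire a slot
			case <-ctx.Done():
				return ctx.Err()
			}
			defer func() { <-sem }() // release the slot
			return next.HandleEvent(ctx, msg)
		})
	}
}

// maxInFlight fires the given number of concurrent calls and reports the
// highest number of handlers that were running at the same time.
func maxInFlight(limit, events int) int {
	var mu sync.Mutex
	current, peak := 0, 0
	h := HandlerFunc(func(ctx context.Context, msg *ConsumeContext) error {
		mu.Lock()
		current++
		if current > peak {
			peak = current
		}
		mu.Unlock()
		time.Sleep(5 * time.Millisecond) // simulate work
		mu.Lock()
		current--
		mu.Unlock()
		return nil
	})
	wrapped := withConcurrency(limit)(h)
	var wg sync.WaitGroup
	for i := 0; i < events; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = wrapped.HandleEvent(context.Background(), &ConsumeContext{})
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(maxInFlight(3, 20) <= 3) // true
}
```

Note that the middleware itself starts no goroutines in this sketch; concurrency comes from the caller invoking HandleEvent from several goroutines.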

func WithLogging

func WithLogging(logger *slog.Logger) Middleware

WithLogging logs event processing at debug level.

func WithPartitioning

func WithPartitioning(count int, keyFunc func(*ConsumeContext) string) Middleware

WithPartitioning distributes events across N goroutines by partition key. keyFunc extracts the partition key from a ConsumeContext; nil defaults to stream name. Events with the same key always go to the same partition so ordering is preserved per key.

The partition goroutines are started lazily on the first HandleEvent call and run until their input channels are closed (i.e. the returned handler is garbage-collected) or the context passed to HandleEvent is cancelled.
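
The routing step (key to partition index) can be illustrated on its own. This sketch covers only that step, not the goroutines or channels; the FNV-1a hash is an assumption, since the documentation does not name the hash function, and partitionFor and streamKey are hypothetical names. Stream is a plain string here instead of eventuous.StreamName so the example compiles standalone.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Trimmed local stand-in for the documented ConsumeContext.
type ConsumeContext struct {
	Stream string
}

// partitionFor maps a key to a partition index in [0, count).
// The same key always yields the same index. Assumes count > 0.
func partitionFor(key string, count int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key)) // Write on an FNV hash never returns an error
	return int(h.Sum32() % uint32(count))
}

// streamKey mirrors the documented default: partition by stream name.
func streamKey(msg *ConsumeContext) string {
	return msg.Stream
}

func main() {
	events := []*ConsumeContext{
		{Stream: "Order-1"},
		{Stream: "Order-2"},
		{Stream: "Order-1"},
	}
	for _, e := range events {
		// Both Order-1 events print the same partition index.
		fmt.Printf("%s -> partition %d\n", e.Stream, partitionFor(streamKey(e), 4))
	}
}
```

Since each partition is served by a single goroutine, events that share a key are handled one after another, which is what preserves per-key ordering.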

type Subscription

type Subscription interface {
	Start(ctx context.Context) error
}

Subscription consumes events from a source. Start blocks until ctx is cancelled or a fatal error occurs.
