Documentation
¶
Overview ¶
Package es provides event sourcing and CQRS primitives that can be composed into application-specific aggregates, command handlers, and read models. It is the core package of the synapse toolkit.
The package is intentionally serialization-agnostic: stores deal only in opaque bytes (RawEnvelope), and concrete codecs are registered per event type through a Registry. Concrete codec implementations (encoding/json, protobuf, etc.) live in sibling packages so es itself remains free of third-party dependencies.
Typical usage couples three pieces:
- An Aggregate type (usually embedding AggregateBase) that owns domain state and reacts to events through Apply.
- A Registry populated with EventCodec entries for every event type.
- An EventStore implementation backed by an in-memory map, a database, or a remote service.
A Repository wires those three together so application code can Load and Save aggregates without thinking about serialization or optimistic-concurrency mechanics.
Index ¶
- Variables
- func CausationFrom(ctx context.Context) string
- func CorrelationFrom(ctx context.Context) string
- func Execute[C any, A Aggregate](ctx context.Context, r *Repository[A], id StreamID, cmd C, h Handler[C, A]) error
- func FoldEvents[S any](init S, events iter.Seq2[Envelope, error], step func(S, Envelope) (S, error)) (S, error)
- func Register[E any](r *Registry, eventType string, c TypedCodec[E])
- func RegisterUpcaster[In, Out any](r *Registry, fromType, toType string, fn func(In) (Out, error))
- func SubjectFrom(ctx context.Context) string
- func UnsupportedSeq[T any]() iter.Seq2[T, error]
- func WithCausation(ctx context.Context, id string) context.Context
- func WithCorrelation(ctx context.Context, id string) context.Context
- func WithMetadata(ctx context.Context, meta Metadata) context.Context
- func WithSubject(ctx context.Context, subject string) context.Context
- type Aggregate
- type AggregateBase
- func (b *AggregateBase) ClearPending()
- func (b *AggregateBase) Pending() []Envelope
- func (b *AggregateBase) Record(eventType string, payload any, apply func(Envelope))
- func (b *AggregateBase) ReplayAll(events iter.Seq2[Envelope, error], apply func(Envelope)) error
- func (b *AggregateBase) SetVersion(v uint64)
- func (b *AggregateBase) StreamID() StreamID
- func (b *AggregateBase) Version() uint64
- type CheckpointStore
- type Clock
- type CodecNotFoundError
- type ConflictError
- type ContentType
- type DeadLetterEntry
- type DeadLetterStore
- type DuplicateEventIDError
- type Envelope
- type EventCodec
- type EventStore
- type Handler
- type Metadata
- type Middleware
- type Operation
- type PayloadTypeError
- type Projection
- type RawEnvelope
- type RawSnapshot
- type ReadOptions
- type Registry
- func (r *Registry) Lookup(eventType string) (EventCodec, bool)
- func (r *Registry) LookupUpcaster(fromType string) (Upcaster, bool)
- func (r *Registry) Register(eventType string, c EventCodec)
- func (r *Registry) Types() []string
- func (r *Registry) Upcast(payload any, typeName string) (any, string, error)
- type Repository
- type RepositoryOption
- func WithClock(c Clock) RepositoryOption
- func WithIDGenerator(g idgen.Generator) RepositoryOption
- func WithLogger(l *slog.Logger) RepositoryOption
- func WithMiddleware(mws ...Middleware) RepositoryOption
- func WithSnapshotPolicy(p SnapshotPolicy) RepositoryOption
- func WithSnapshotStore(s SnapshotStore) RepositoryOption
- type Revision
- type SnapshotPolicy
- type SnapshotStore
- type Snapshotter
- type Source
- type StreamID
- type StreamNotFoundError
- type SubscriptionOptions
- type SystemClock
- type TypedCodec
- type Upcaster
- type UpcasterCycleError
- type UpcasterTypeError
Constants ¶
This section is empty.
Variables ¶
var ( // ErrUnsupported is the sentinel a store implementation returns // when it does not support a method on the [EventStore] or // [CheckpointStore] interface. iter-returning methods surface it // via a single terminal yield; see [UnsupportedSeq]. Callers // detect it with errors.Is; the admin RPC layer maps it to // CodeUnimplemented. ErrUnsupported = errors.New("synapse: operation not supported") // ErrShreddedPayload is returned by [Repository.Load] (and the // projection.Runner decode path) when an envelope arrives bearing // [ContentTypeShredded] — a store wrapper has substituted the // payload because the subject's encryption key is destroyed. // Aggregate replay propagates this error (state is unrecoverable); // projections can branch on it via the runner's OnError hook to // skip-and-continue. ErrShreddedPayload = errors.New("synapse: shredded payload") // ErrConflict indicates an optimistic-concurrency violation at // append time. Detailed information is available on // [*ConflictError]. ErrConflict = errors.New("synapse: revision conflict") // ErrStreamNotFound indicates a load against a stream that holds // no events. Detailed information is available on // [*StreamNotFoundError]. ErrStreamNotFound = errors.New("synapse: stream not found") // ErrCodecNotFound indicates no [EventCodec] was registered for // an event Type encountered during marshal or unmarshal. ErrCodecNotFound = errors.New("synapse: codec not registered for event type") // ErrPayloadType indicates a codec received a payload that did // not match the type it was registered for. ErrPayloadType = errors.New("synapse: payload type mismatch") // ErrUpcasterType indicates a registered upcaster received a // payload whose dynamic type did not match the In type it was // registered with. Detailed information is available on // [*UpcasterTypeError]. ErrUpcasterType = errors.New("synapse: upcaster payload type mismatch") // ErrUpcasterCycle indicates the registered upcasters form a cycle // or exceed the per-call hop limit. Detailed information is // available on [*UpcasterCycleError]. ErrUpcasterCycle = errors.New("synapse: upcaster cycle") // ErrDuplicateEventID indicates [EventStore.Append] rejected an // envelope whose [RawEnvelope.EventID] already exists in the // store. Detailed information is available on // [*DuplicateEventIDError]. See ADR-0045. ErrDuplicateEventID = errors.New("synapse: duplicate event id") )
Sentinel errors used for classification with errors.Is.
Typed errors elsewhere in this package wrap these sentinels via Unwrap, so callers can write either:
if errors.Is(err, es.ErrConflict) { ... }
or
var ce *es.ConflictError
if errors.As(err, &ce) { /* use ce.Expected, ce.Actual */ }
var ( // Any allows the append regardless of current stream state. Any = Revision{/* contains filtered or unexported fields */} // NoStream requires the stream to not yet exist. NoStream = Revision{/* contains filtered or unexported fields */} // StreamExists requires the stream to already contain at least one event. StreamExists = Revision{/* contains filtered or unexported fields */} )
Sentinel revisions covering the three open-ended cases.
Functions ¶
func CausationFrom ¶ added in v0.3.0
CausationFrom returns the causation identifier installed by WithCausation, or the empty string if none is set. See CorrelationFrom for intended use.
func CorrelationFrom ¶ added in v0.3.0
CorrelationFrom returns the correlation identifier installed by WithCorrelation, or the empty string if none is set. Intended for observability adapters that need to tag spans, logs, or metrics with the saga identifier carried in ctx.
func Execute ¶
func Execute[C any, A Aggregate]( ctx context.Context, r *Repository[A], id StreamID, cmd C, h Handler[C, A], ) error
Execute is a convenience that loads an aggregate, runs the command handler, and saves any events the handler recorded. The load-handle-save pipeline is wrapped by the Repository's middleware chain (see WithMiddleware), so cross-cutting concerns such as locking and retries apply uniformly across all command types.
When Repository.Load returns an error wrapping ErrStreamNotFound, Execute treats it as "fresh aggregate": it builds one via the Repository's newFn (the same constructor passed to NewRepository) and runs the handler against it. Save then appends with expected revision NoStream, which is the natural shape for a create-style command. Handlers that require an existing aggregate must guard on agg.Version() == 0 and return an error. See ADR-0030.
Any other Load error propagates unchanged. If the handler returns a non-nil error, Execute propagates it without attempting to save.
func FoldEvents ¶
func FoldEvents[S any]( init S, events iter.Seq2[Envelope, error], step func(S, Envelope) (S, error), ) (S, error)
FoldEvents is a generic helper for callers who prefer a pure-reducer style over the Aggregate/AggregateBase approach: given an initial state and a sequence of envelopes, fold step over them and return the resulting state.
FoldEvents is independent of the Repository machinery and is useful for projections, read-model rebuilds, and ad-hoc analysis. The step function may return an error (FoldEvents is not constrained to aggregate semantics; it is a general reducer).
func Register ¶
func Register[E any](r *Registry, eventType string, c TypedCodec[E])
Register adapts a strongly typed TypedCodec into an EventCodec and stores it in r under eventType. It is the preferred entry point when registering codecs from user code:
es.Register(reg, "order.placed", json.For[OrderPlaced]())
func RegisterUpcaster ¶
func RegisterUpcaster[In, Out any]( r *Registry, fromType, toType string, fn func(In) (Out, error), )
RegisterUpcaster registers a typed upcaster on the Registry. The function receives an In value (the decoded payload of an event at fromType) and returns the Out value at the next version, identified by toType.
es.RegisterUpcaster[OrderPlacedV1, OrderPlacedV2](reg,
"order.placed.v1", "order.placed.v2",
func(in OrderPlacedV1) (OrderPlacedV2, error) {
return OrderPlacedV2{Total: in.Amount, Currency: "USD"}, nil
})
The same Registry is shared by event codecs and snapshot codecs, so the same mechanism upcasts both. Registration is typically done at startup; the Registry is safe for concurrent use afterwards.
Registering an upcaster for a fromType that already has one replaces the previous entry.
func SubjectFrom ¶ added in v0.3.0
SubjectFrom returns the data subject identifier installed by WithSubject, or the empty string if none is set. See CorrelationFrom for intended use.
func UnsupportedSeq ¶ added in v0.3.0
UnsupportedSeq returns an iter.Seq2 that yields a single terminal (zero, ErrUnsupported). Stub implementations of iter-returning EventStore / CheckpointStore methods return this.
func WithCausation ¶
WithCausation returns a child context carrying id as the causation identifier. The same precedence rule as for correlation applies: per-event values win over the context value at Repository.Save time.
func WithCorrelation ¶
WithCorrelation returns a child context carrying id as the correlation identifier for events recorded under it. Repository stamps it onto each saved event's Correlation field where that field is otherwise empty; an explicit value on the Envelope takes precedence.
func WithMetadata ¶
WithMetadata returns a child context carrying meta as event metadata. Successive calls merge; on key collision, the later call wins. At Repository.Save time the context map is the base and the per-event [Envelope.Metadata] map overrides on key collision, so callers can establish a baseline (user, trace id) for a request and still tag individual events explicitly.
func WithSubject ¶ added in v0.3.0
WithSubject returns a child context carrying subject as the data subject identifier for events recorded under it. Repository stamps it onto each saved event's Subject field where that field is otherwise empty; a per-event Subject takes precedence.
Use this to bind a request's data subject (customer, account, etc.) once at the transport boundary so every aggregate Save inside the request inherits it without per-call wiring. See ADR-0035.
Types ¶
type Aggregate ¶
type Aggregate interface {
// StreamID returns the stream this aggregate writes to and loads from.
StreamID() StreamID
// Version returns the number of events the aggregate has consumed.
// Newly constructed aggregates report 0.
Version() uint64
// Apply mutates aggregate state in response to a single event. It
// is invoked both during rehydration (events loaded from the
// store) and immediately after a new event is recorded.
//
// Apply must not fail. Events are facts that already happened:
// refusing to apply one during rehydration cannot unmake the past,
// and validation of recordable events belongs in the command method
// that calls Record, before the event is added to the pending
// queue.
Apply(env Envelope)
// SetVersion advances the aggregate's loaded version. The
// [Repository] calls SetVersion after each Apply during
// rehydration so the aggregate's version tracks the stream's head.
// Domain code should not call SetVersion directly; embedders of
// [AggregateBase] get a correct implementation for free.
SetVersion(v uint64)
// Pending returns the events that have been recorded but not yet
// persisted. The returned slice is owned by the aggregate; callers
// must not append to it or mutate its elements. The [Repository]
// reads it during Save and then calls [Aggregate.ClearPending].
Pending() []Envelope
// ClearPending discards the recorded-but-unpersisted events. The
// [Repository] calls this after a successful append.
ClearPending()
}
Aggregate is the unit of consistency in event sourcing. Concrete aggregate types embed AggregateBase to satisfy most of the interface; only Apply needs domain-specific logic.
type Order struct {
*es.AggregateBase
status string
total int
}
func (o *Order) Apply(env es.Envelope) {
switch p := env.Payload.(type) {
case OrderPlaced:
o.status, o.total = "placed", p.Total
case OrderShipped:
o.status = "shipped"
}
}
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is an embeddable struct that supplies the bookkeeping every aggregate needs: stream identity, version tracking, and a pending event buffer.
Embed by pointer so the embedding type can mutate state through (*AggregateBase).Record:
type Order struct { *es.AggregateBase; /* domain fields */ }
func NewOrder(id OrderID) *Order {
return &Order{AggregateBase: es.NewAggregateBase(id.Stream())}
}
func NewAggregateBase ¶
func NewAggregateBase(id StreamID) *AggregateBase
NewAggregateBase returns an AggregateBase bound to id at version 0.
func (*AggregateBase) ClearPending ¶
func (b *AggregateBase) ClearPending()
ClearPending implements Aggregate.
func (*AggregateBase) Pending ¶
func (b *AggregateBase) Pending() []Envelope
Pending implements Aggregate.
func (*AggregateBase) Record ¶
func (b *AggregateBase) Record(eventType string, payload any, apply func(Envelope))
Record stages a new event on the aggregate. It composes an Envelope from the embedder's stream id and the next version number, invokes apply so the aggregate's in-memory state reflects the change, and queues the envelope for the next save.
The apply callback is typically the embedder's own Apply method. Threading it through Record explicitly avoids the runtime cost of reflection while keeping AggregateBase unaware of the concrete aggregate type.
Validation of recordable events should happen in the command method that calls Record, before Record is invoked.
func (*AggregateBase) ReplayAll ¶
ReplayAll advances AggregateBase across events loaded from history, invoking apply for each one. It does not enqueue events for persistence; rehydration is read-only with respect to the store.
The Repository does not use ReplayAll (it advances version through [Aggregate.SetVersion] so it can interleave codec lookups), but callers writing custom load paths can reuse it for convenience.
ReplayAll's apply callback does not return an error; events are facts that already happened. The returned error is only ever the terminal error yielded by the events iterator (typically an EventStore read failure).
func (*AggregateBase) SetVersion ¶
func (b *AggregateBase) SetVersion(v uint64)
SetVersion implements Aggregate.
func (*AggregateBase) StreamID ¶
func (b *AggregateBase) StreamID() StreamID
StreamID implements Aggregate.
func (*AggregateBase) Version ¶
func (b *AggregateBase) Version() uint64
Version implements Aggregate.
type CheckpointStore ¶
type CheckpointStore interface {
// Save persists the last successfully processed position for
// name. Implementations should treat repeated Save calls with
// the same (name, position) as idempotent.
Save(ctx context.Context, name string, position uint64) error
// Load returns the last saved position for name. The bool is
// false when no checkpoint has been saved for name; that is not
// an error.
Load(ctx context.Context, name string) (position uint64, found bool, err error)
// Reset removes the checkpoint for name. Subsequent Loads return
// (0, false, nil). Used for projection rebuilds.
Reset(ctx context.Context, name string) error
// Names enumerates every projection name with a saved position.
// The order of yielded names is implementation-defined. Backends
// that cannot list yield a single terminal (zero, [ErrUnsupported]).
//
// Implementations should be safe to call concurrently with the
// rest of the [CheckpointStore] surface.
Names(ctx context.Context) iter.Seq2[string, error]
}
CheckpointStore persists per-projection progress so consumers can resume across restarts.
The name parameter is a caller-chosen identifier — typically the projection's logical name. Names must be unique per concurrent consumer.
Implementations that cannot support Names yield a single terminal (zero, ErrUnsupported); see UnsupportedSeq.
type Clock ¶
Clock abstracts the wall-clock source used by the Repository to stamp RecordedAt on outgoing events. Tests can supply a virtual clock (for instance via testing/synctest) to make timestamps deterministic.
type CodecNotFoundError ¶
type CodecNotFoundError struct {
EventType string
}
CodecNotFoundError reports a missing EventCodec registration. It unwraps to ErrCodecNotFound.
func (*CodecNotFoundError) Error ¶
func (e *CodecNotFoundError) Error() string
func (*CodecNotFoundError) Unwrap ¶
func (*CodecNotFoundError) Unwrap() error
type ConflictError ¶
ConflictError reports an optimistic-concurrency violation. It unwraps to ErrConflict.
func (*ConflictError) Error ¶
func (e *ConflictError) Error() string
func (*ConflictError) Unwrap ¶
func (*ConflictError) Unwrap() error
type ContentType ¶
type ContentType string
ContentType describes the wire format of a serialized event payload. Values follow the IANA media type convention, e.g. "application/json" or "application/vnd.google.protobuf".
An EventCodec reports the ContentType it produces, and a RawEnvelope carries it alongside the payload so that consumers can decode an event without consulting external registries.
const ContentTypeShredded ContentType = "application/synapse.shredded+v1"
ContentTypeShredded is the sentinel ContentType a store wrapper substitutes onto an envelope whose Payload it cannot decode — typically because the subject's data-encryption key has been destroyed by crypto-shredding. Both Repository.Load and the projection.Runner's decode path recognize it and short-circuit codec lookup with ErrShreddedPayload.
type DeadLetterEntry ¶ added in v0.5.0
type DeadLetterEntry struct {
// ID identifies this entry. Empty on Append asks the store to
// assign one; non-empty on Append is idempotent (re-Appending a
// matching id is a no-op).
ID string
// Projection is the name passed to [projection.NewRunner] that
// failed to apply Envelope. Multiple projections can share a
// store; List queries are scoped by this field.
Projection string
// Envelope is the event that couldn't be processed, in storage
// form. The store does not decode it; operators inspecting the
// entry get the raw bytes and the metadata that identifies it
// (StreamID, Version, Type, etc.).
Envelope RawEnvelope
// Error is the failure message from the projection's Project
// call. The store carries the string; structured error
// information (sentinels, typed wrappers) is lost across the
// store boundary.
Error string
// OccurredAt is when the failure happened — assigned by the
// runner just before Append.
OccurredAt time.Time
}
DeadLetterEntry is one record in a DeadLetterStore: the envelope a projection failed to process, plus the failure context.
type DeadLetterStore ¶ added in v0.5.0
type DeadLetterStore interface {
// Append records entry. If entry.ID is empty the implementation
// assigns one and returns the populated entry. Re-Appending with
// a matching id is a no-op (the existing row stays).
Append(ctx context.Context, entry DeadLetterEntry) (DeadLetterEntry, error)
// List yields entries for projection in OccurredAt order
// (oldest first). Yields zero results when no entries exist.
List(ctx context.Context, projection string) iter.Seq2[DeadLetterEntry, error]
// Remove deletes the entry with the given id. Idempotent on
// unknown ids.
Remove(ctx context.Context, id string) error
// Projections enumerates every projection that currently has at
// least one DLQ entry. Operators use this as the entry point
// for "what needs attention right now."
Projections(ctx context.Context) iter.Seq2[string, error]
}
DeadLetterStore records envelopes that a [projection.Runner] failed to process via Project. Operators inspect entries to diagnose problems, fix the underlying projection, and (typically via [CheckpointStore.Reset] + replay) re-process the affected events.
Implementations must be safe for concurrent use. Backends that cannot enumerate (List, Projections) yield ErrUnsupported from the returned iterator.
See ADR-0041.
type DuplicateEventIDError ¶ added in v0.6.0
DuplicateEventIDError reports an [EventStore.Append] rejected because [RawEnvelope.EventID] already exists in the store. The rejected batch is not partially applied: either every envelope committed or none did. It unwraps to ErrDuplicateEventID. See ADR-0045.
func (*DuplicateEventIDError) Error ¶ added in v0.6.0
func (e *DuplicateEventIDError) Error() string
func (*DuplicateEventIDError) Unwrap ¶ added in v0.6.0
func (*DuplicateEventIDError) Unwrap() error
type Envelope ¶
type Envelope struct {
// EventID uniquely identifies this event. The default
// generator emits UUIDv7 values so IDs sort chronologically.
EventID string
// StreamID is the stream this event belongs to.
StreamID StreamID
// Version is the 1-based position of this event within its stream.
Version uint64
// GlobalPosition is the 1-based position of this event in the
// store's global append order, across all streams. It is assigned
// by the [EventStore] on Append and surfaces through Load and
// [SubscribableEventStore.Subscribe]. The Repository and the
// command-side code paths ignore it; it is meaningful primarily
// to subscription consumers.
GlobalPosition uint64
// RecordedAt is the wall-clock time at which the event was appended.
RecordedAt time.Time
// Type is the logical event name used to look up an [EventCodec]
// in the [Registry], e.g. "order.placed".
Type string
// ContentType is the wire format of Payload once serialized.
// It is populated by the Repository at save time from the codec
// chosen for this event Type.
ContentType ContentType
// Causation is the EventID of the event that directly caused this one.
// Empty when the event has no upstream cause.
Causation string
// Correlation groups events that share a causal chain — typically
// the EventID of the initiating command or external request.
Correlation string
// Subject is the opaque identifier of the data subject this event
// is "about" — the customer, account, or other party whose data
// the event contains. Empty for events without personal data.
//
// Synapse treats the value as opaque; applications choose the
// taxonomy. The expected convention is a pseudonymous identifier
// (a UUID, an internal customer id) — never an email address or
// other directly identifying value, because the subject identifier
// is retained in the immutable log even after crypto-shredding
// (ADR-0036) and intentionally outlives the rest of the event.
//
// Set the subject on individual envelopes when needed, or
// establish a baseline for the request via [WithSubject].
Subject string
// Metadata holds free-form annotations.
Metadata Metadata
// Payload is the typed domain event value.
Payload any
}
Envelope is the application-facing event record. The Payload field holds the user's domain event value; a Repository serializes it through an EventCodec before passing it to an EventStore.
Identity, time, and content-type fields are stamped by the Repository at save time, so domain code never needs a clock or an ID generator.
type EventCodec ¶
type EventCodec interface {
// ContentType returns the wire format produced by Marshal — for
// example "application/json" or "application/vnd.google.protobuf".
// The [Repository] copies this onto the [RawEnvelope] before
// handing it to the [EventStore].
ContentType() ContentType
// Marshal serializes a payload to bytes. Implementations should
// reject payloads whose dynamic type does not match what the
// codec was registered for, returning [*PayloadTypeError].
Marshal(payload any) ([]byte, error)
// Unmarshal decodes a payload from bytes into a freshly allocated
// value of the codec's registered type.
Unmarshal(data []byte) (any, error)
}
EventCodec is the per-event-type serialization interface. The Registry holds one EventCodec per registered event Type; concrete implementations (encoding/json, protobuf, etc.) live in subpackages so the root package stays free of third-party dependencies.
Implementations should be safe for concurrent use after construction.
type EventStore ¶
type EventStore interface {
// Append writes events to a stream atomically and returns the
// new head revision (always Exact(v)) on success.
Append(
ctx context.Context,
stream StreamID,
expected Revision,
events ...RawEnvelope,
) (Revision, error)
// Load returns events from a stream as an iterator of
// (envelope, error) pairs. The iterator emits at most one
// terminal error and then stops.
Load(
ctx context.Context,
stream StreamID,
opts ReadOptions,
) iter.Seq2[RawEnvelope, error]
// Subscribe yields events across all streams in global append
// order, starting at the position past opts.From. When opts.Live
// is true, the iterator blocks waiting for new events after
// exhausting the existing log; otherwise it terminates when
// caught up.
//
// Backends that do catch-up reads only yield a single terminal
// (zero, [ErrUnsupported]).
//
// Context cancellation terminates the iterator with a single
// terminal (zero, ctx.Err()) yield.
Subscribe(ctx context.Context, opts SubscriptionOptions) iter.Seq2[RawEnvelope, error]
// SubscribeStream is the per-stream variant of [Subscribe]. It
// yields only events for stream, ordered by [RawEnvelope.Version].
// opts.From is interpreted as a stream Version (the iterator
// yields events with Version > opts.From).
//
// Backends that do not support per-stream subscription yield
// (zero, [ErrUnsupported]).
SubscribeStream(ctx context.Context, stream StreamID, opts SubscriptionOptions) iter.Seq2[RawEnvelope, error]
// Streams enumerates every stream the event store knows about.
// The order of yielded stream ids is implementation-defined.
// Admin tooling consumes this; backends that cannot list
// efficiently yield (zero, [ErrUnsupported]).
//
// Implementations should be safe to call concurrently with
// Append and Load.
Streams(ctx context.Context) iter.Seq2[StreamID, error]
// StreamsBySubject enumerates every stream containing at least
// one event with the given [RawEnvelope.Subject]. Backends
// typically satisfy this with an index on the subject column.
// Admin tooling and crypto-shredding workflows use it to answer
// GDPR right-of-access and right-of-erasure requests (ADR-0035,
// ADR-0036). Backends without an index yield (zero,
// [ErrUnsupported]).
//
// An empty subject argument MUST yield zero results — callers
// asking "which streams have no subject?" want a full scan, not
// this method. The order of yielded stream ids is
// implementation-defined; duplicates must not be yielded.
//
// Implementations should be safe to call concurrently with
// Append and Load.
StreamsBySubject(ctx context.Context, subject string) iter.Seq2[StreamID, error]
}
EventStore is the persistence boundary for serialized events. It knows nothing about codecs or domain types; payloads flow through as opaque bytes inside RawEnvelope.
Implementations may back the store with an in-memory map, a SQL database, a log-oriented system, or a remote service. The contract for the mandatory methods is the same in every case:
- Append is atomic per call and per stream: either every envelope is persisted at consecutive versions, or none are.
- Append enforces the caller's Revision expectation and returns a *ConflictError (wrapping ErrConflict) on a mismatch.
- Load yields events in ascending version order. The iterator terminates cleanly when the stream is exhausted or the Limit is hit, and yields a single terminal (zero, err) on failure.
The remaining methods (Subscribe, SubscribeStream, Streams, StreamsBySubject) are part of the interface but a backend that cannot support one of them yields a single terminal ErrUnsupported from the returned iterator. See UnsupportedSeq for the helper.
type Handler ¶
Handler is a typed command handler. It receives a freshly loaded aggregate and is expected to call domain methods on it that internally enqueue events through (*AggregateBase).Record. The Repository takes care of persistence once the handler returns.
func PlaceOrder(ctx context.Context, cmd PlaceOrderCmd, o *Order) error {
return o.Place(cmd.Items, cmd.Total)
}
type Metadata ¶
Metadata is free-form key/value context attached to an event: correlation identifiers, actor information, trace IDs, and so on.
Values are strings by convention so they can be serialized by every codec without negotiation. Richer metadata that needs structured types belongs in the event payload itself.
func MetadataFrom ¶ added in v0.3.0
MetadataFrom returns the merged metadata installed by WithMetadata, or nil if none is set. The returned map is the live ctx-stored value; callers must not mutate it.
type Middleware ¶
Middleware wraps an Operation to add behavior before, after, or around the underlying load-handle-save pipeline. The returned Operation must call next inside its body to execute the command.
Middleware compose left-to-right: WithMiddleware(a, b, c) produces a chain where a wraps b wraps c wraps the underlying operation, so a's "before" code runs first and a's "after" code runs last.
Concrete middlewares (per-aggregate locking, retry on transient errors, etc.) live in the github.com/ianunruh/synapse/es/middleware subpackage.
type Operation ¶
Operation is the type-erased form of an Execute call. It captures the load-handle-save pipeline as a single function that runs against a stream.
Operations are produced by Execute internally and consumed by Middleware. User code rarely constructs Operation values directly; it provides Middleware that wrap them.
type PayloadTypeError ¶
PayloadTypeError reports a payload whose dynamic type did not match the type a codec was registered for. It unwraps to ErrPayloadType.
func (*PayloadTypeError) Error ¶
func (e *PayloadTypeError) Error() string
func (*PayloadTypeError) Unwrap ¶
func (*PayloadTypeError) Unwrap() error
type Projection ¶
Projection is the consumer side of an event-sourcing read model. It receives decoded events from a [SubscribableEventStore] — typically via a projection.Runner — and applies them to derived state: a SQL table, an in-memory map, a search index, an outbound integration, etc.
Implementations should be deterministic and idempotent. The Runner may present the same event twice on retry after a checkpoint-write failure, and Live subscribers reconnecting from a checkpoint may also re-encounter the boundary event.
type RawEnvelope ¶
type RawEnvelope struct {
EventID string
StreamID StreamID
Version uint64
GlobalPosition uint64
RecordedAt time.Time
Type string
ContentType ContentType
Causation string
Correlation string
Subject string
Metadata Metadata
Payload []byte
}
RawEnvelope is the storage-facing form of an event. Payload is opaque bytes; EventStore implementations never need to know about codecs or domain types.
The field set mirrors Envelope so backends can persist a single flat row without consulting a schema.
type RawSnapshot ¶
type RawSnapshot struct {
StreamID StreamID
Version uint64
Type string
ContentType ContentType
RecordedAt time.Time
Subject string
Metadata Metadata
Payload []byte
}
RawSnapshot is the storage-facing form of an aggregate state checkpoint, paralleling RawEnvelope for events. Payload is opaque bytes; SnapshotStore implementations do not know about codecs or domain types.
type ReadOptions ¶
type ReadOptions struct {
// From is the first version to return (1-based). A value of 0
// means "start at the beginning of the stream".
From uint64
// Limit caps the number of events returned. A value of 0 means
// "no limit".
Limit uint64
}
ReadOptions controls a [EventStore.Load] call. The zero value asks for the entire stream from the beginning.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maps event Type strings to EventCodec implementations and, optionally, to Upcaster functions for schema evolution. A single Registry may mix codecs that use different wire formats — for instance, JSON for legacy events and protobuf for new ones — and upcasters that turn old event versions into newer ones.
A Registry is safe for concurrent use. Registration is typically done at startup; Lookup and Upcast are called on every load.
func (*Registry) Lookup ¶
func (r *Registry) Lookup(eventType string) (EventCodec, bool)
Lookup returns the EventCodec registered for eventType. The boolean is false when no codec has been registered for that type.
func (*Registry) LookupUpcaster ¶
LookupUpcaster returns the Upcaster registered for fromType. The boolean is false when no upcaster has been registered for that type.
func (*Registry) Register ¶
func (r *Registry) Register(eventType string, c EventCodec)
Register associates an EventCodec with an event Type. Registering the same Type twice replaces the previous entry.
Most callers use the generic top-level Register function instead, which adapts a TypedCodec of the concrete event type.
func (*Registry) Types ¶
Types returns the event types currently registered, in unspecified order. The returned slice is a fresh copy and may be retained.
func (*Registry) Upcast ¶
Upcast applies the upcaster chain rooted at typeName to payload, returning the final payload and final type name. When no upcaster is registered for typeName, the inputs pass through unchanged with a nil error.
A cycle in the registered upcasters returns *UpcasterCycleError without further mutation. Exceeding [upcastMaxHops] returns the same error with the full chain attached.
Errors returned by the user-registered upcaster function are wrapped with the from-type for context. UpcasterTypeError is returned without wrapping so callers can use errors.As directly.
type Repository ¶
type Repository[A Aggregate] struct { // contains filtered or unexported fields }
Repository binds an EventStore, a codec Registry, a Clock, an idgen.Generator, an optional SnapshotStore/SnapshotPolicy, an optional chain of Middleware, and an slog.Logger together so application code can load and save aggregates of type A without thinking about serialization, ID generation, snapshotting, or optimistic concurrency.
A Repository is safe for concurrent use as long as its dependencies are. The newFn factory is invoked to construct an empty aggregate before rehydration.
func NewRepository ¶
func NewRepository[A Aggregate]( store EventStore, reg *Registry, newFn func(StreamID) A, opts ...RepositoryOption, ) *Repository[A]
NewRepository constructs a Repository over the given store and codec registry. newFn returns a zero-value aggregate bound to the requested stream id; it is invoked by Load before replaying history.
func (*Repository[A]) Load ¶
func (r *Repository[A]) Load(ctx context.Context, id StreamID) (A, error)
Load constructs a fresh aggregate via newFn. If a SnapshotStore is configured AND the aggregate implements Snapshotter, Load first tries to restore from the latest snapshot and then replays only events with Version > snapshot.Version; otherwise it replays the full event history.
If neither a snapshot nor any events exist for the stream, Load returns *StreamNotFoundError wrapping ErrStreamNotFound. Callers needing "load or create" semantics should construct an aggregate directly and call Save.
func (*Repository[A]) Save ¶
func (r *Repository[A]) Save(ctx context.Context, agg A) error
Save serializes the aggregate's Pending events via the codec registry, stamps EventID/RecordedAt/ContentType where missing, and appends them to the store under an expected revision matching the aggregate's loaded version. On success, Pending is cleared.
After a successful append, if a SnapshotStore and SnapshotPolicy are both configured and the aggregate implements Snapshotter, the policy is consulted with the version before and after the Save. If it returns true, a best-effort snapshot save is attempted; errors from that step are logged at Warn level via the Repository's slog.Logger and otherwise swallowed (events are already committed; the snapshot is an optimization).
Returns nil immediately if there are no pending events.
func (*Repository[A]) SaveSnapshot ¶
func (r *Repository[A]) SaveSnapshot(ctx context.Context, agg A) error
SaveSnapshot persists a snapshot of the aggregate's current state to the configured SnapshotStore. It is intended for explicit checkpoints — migration scripts, integration tests, or application-driven snapshotting outside the SnapshotPolicy.
Returns an error when no SnapshotStore is configured or when the aggregate does not implement Snapshotter.
type RepositoryOption ¶
type RepositoryOption func(*repositoryOptions)
RepositoryOption configures a Repository at construction time.
func WithClock ¶
func WithClock(c Clock) RepositoryOption
WithClock overrides the wall-clock used to stamp RecordedAt on saved events. The default is SystemClock.
func WithIDGenerator ¶
func WithIDGenerator(g idgen.Generator) RepositoryOption
WithIDGenerator overrides the idgen.Generator used to stamp EventID on saved events. The default is idgen.UUIDv7 backed by the Repository's Clock.
func WithLogger ¶
func WithLogger(l *slog.Logger) RepositoryOption
WithLogger overrides the slog.Logger used by the Repository to record best-effort failures — currently, automatic snapshot save errors that [Save] intentionally swallows. The default is slog.Default, so library warnings reach the program's default handler without explicit configuration. To silence, install a logger backed by a discard handler.
func WithMiddleware ¶
func WithMiddleware(mws ...Middleware) RepositoryOption
WithMiddleware appends Middleware to the Repository's command execution chain. Subsequent calls append rather than replace, so constructors that compose multiple option groups behave naturally.
Middleware run left-to-right as outer wrappers: the first middleware passed wraps the second, which wraps the third, and so on around the load-handle-save pipeline that Execute invokes.
func WithSnapshotPolicy ¶
func WithSnapshotPolicy(p SnapshotPolicy) RepositoryOption
WithSnapshotPolicy installs a SnapshotPolicy that the Repository consults after each successful Save to decide whether to write a new snapshot. Without a policy, automatic snapshots never fire even if a SnapshotStore is configured; Repository.SaveSnapshot remains usable for manual checkpoints.
func WithSnapshotStore ¶
func WithSnapshotStore(s SnapshotStore) RepositoryOption
WithSnapshotStore wires a SnapshotStore into the Repository, enabling the snapshot-aware Repository.Load path and unlocking Repository.SaveSnapshot. Without a store, automatic snapshots never fire and manual SaveSnapshot returns an error.
type Revision ¶
type Revision struct {
// contains filtered or unexported fields
}
Revision expresses a caller's expectation about a stream's state when appending events. It is a tagged value type — comparable, copyable, and free of allocations — so it can flow through hot paths without runtime cost.
The zero value is Any, meaning "no expectation"; callers can pass it without ceremony when concurrency control is not desired.
func Exact ¶
Exact requires the stream to be at exactly version v.
A successful append against Exact(v) advances the stream to v + N where N is the number of events appended.
type SnapshotPolicy ¶
SnapshotPolicy decides whether the Repository should take a new snapshot after a successful Save. It is invoked with the aggregate at its post-Save state, the version before the Save started, and the version after the Save's events were applied.
Returning true triggers an immediate, best-effort snapshot save; returning false skips it.
func EveryNVersions ¶
func EveryNVersions(n uint64) SnapshotPolicy
EveryNVersions returns a SnapshotPolicy that fires when a Save advances the aggregate past a multiple of n. With n=100, snapshots happen after the first save that reaches v=100, 200, 300, and so on. Returns a policy that always returns false when n == 0.
Because the policy compares versionBefore/n with versionAfter/n, it fires at most once per multiple of n regardless of batch size — a single Save that advances from v=49 to v=210 still triggers only one snapshot, not two.
type SnapshotStore ¶
type SnapshotStore interface {
// Save persists snap, replacing any earlier snapshot for the
// same stream.
Save(ctx context.Context, snap RawSnapshot) error
// Latest returns the most recent snapshot for stream. The bool
// is false when no snapshot has been saved for stream; that is
// not an error.
Latest(ctx context.Context, stream StreamID) (RawSnapshot, bool, error)
}
SnapshotStore persists and retrieves aggregate state checkpoints. Unlike EventStore, snapshots are not append-only: [SnapshotStore.Save] replaces any prior snapshot for the same stream.
type Snapshotter ¶
type Snapshotter interface {
// SnapshotType returns the type name registered with the codec
// [Registry] for the snapshot's state value. By convention this
// includes a version suffix (e.g. "counter.snapshot.v1") so
// schema evolution is explicit.
SnapshotType() string
// Snapshot returns a typed view of the aggregate's domain state.
// The returned value is passed through the codec registered for
// [SnapshotType].
Snapshot() (state any, err error)
// Restore populates the aggregate from a previously taken
// snapshot. The state argument is the value returned by the
// codec's Unmarshal. The [Repository] sets the aggregate's
// version (via [Aggregate.SetVersion]) after a successful
// Restore; Restore itself should not touch version state.
Restore(state any) error
}
Snapshotter is the optional capability an aggregate type implements to support snapshotting. When an aggregate satisfies Snapshotter, the Repository uses snapshots to skip event replay up to the snapshot's version on Load, and (when a SnapshotPolicy agrees) writes new snapshots after a successful Save.
Aggregates that do not implement Snapshotter still work with a Repository configured for snapshots; their Load path simply falls through to full event replay and their Save path skips the snapshot step.
type Source ¶ added in v0.6.0
type Source interface {
// Subscribe yields events across all streams in global append
// order, starting at the position past opts.From. See
// [EventStore.Subscribe] for the full contract.
Subscribe(ctx context.Context, opts SubscriptionOptions) iter.Seq2[RawEnvelope, error]
// SubscribeStream yields events for a single stream ordered by
// [RawEnvelope.Version]. See [EventStore.SubscribeStream] for the
// full contract.
SubscribeStream(ctx context.Context, stream StreamID, opts SubscriptionOptions) iter.Seq2[RawEnvelope, error]
}
Source is the minimum surface a [projection.Runner] needs from its input: a way to subscribe to events globally or per stream. Every EventStore satisfies Source by virtue of providing Subscribe and SubscribeStream; broker-backed adapters (NATS JetStream, Kafka, ...) implement Source directly without the rest of the store contract.
See ADR-0044 for the broker-delivered-projections design that motivates this narrowing.
type StreamID ¶
type StreamID string
StreamID is the storage-facing identity of an event stream. It is a plain string so that backends, indices, logs, and admin tools can move it around with zero cost.
Domain code is encouraged to keep typed identifiers in its own package (for example `type OrderID string`) and convert at the aggregate boundary:
type OrderID string
func (id OrderID) Stream() es.StreamID { return es.StreamID("order-" + string(id)) }
type StreamNotFoundError ¶
type StreamNotFoundError struct {
Stream StreamID
}
StreamNotFoundError reports a load against an empty stream. It unwraps to ErrStreamNotFound.
func (*StreamNotFoundError) Error ¶
func (e *StreamNotFoundError) Error() string
func (*StreamNotFoundError) Unwrap ¶
func (*StreamNotFoundError) Unwrap() error
type SubscriptionOptions ¶
type SubscriptionOptions struct {
// From is the position to start at. For [EventStore.Subscribe]
// this is interpreted as a [RawEnvelope.GlobalPosition]; for
// [EventStore.SubscribeStream] it is a [RawEnvelope.Version]
// within the targeted stream. Events with the relevant
// position > From are yielded; 0 means start from the beginning.
From uint64
// Live, when true, blocks waiting for new events after yielding
// all existing events past From. When false, the iterator
// terminates cleanly once caught up.
Live bool
// Types, when non-empty, restricts delivery to events whose
// [RawEnvelope.Type] exactly matches one of the listed type names.
// An empty or nil slice delivers every type (the default).
//
// Filtering is a delivery concern only: it does not change position
// semantics. Yielded events keep their true GlobalPosition (or
// per-stream Version), so the delivered sequence may have gaps where
// filtered-out events fall. Resuming from a yielded event's position
// remains correct.
Types []string
}
SubscriptionOptions controls a [EventStore.Subscribe] or [EventStore.SubscribeStream] call.
type SystemClock ¶
type SystemClock struct{}
SystemClock is a Clock backed by time.Now. It is the default for constructors that take an optional clock.
type TypedCodec ¶
type TypedCodec[E any] interface { ContentType() ContentType Marshal(E) ([]byte, error) Unmarshal([]byte) (E, error) }
TypedCodec[E] is the strongly typed counterpart to EventCodec. Codec subpackages typically expose constructors that return TypedCodec[E] so users can write:
es.Register(reg, "order.placed", json.For[OrderPlaced]())
Register adapts a TypedCodec[E] into an EventCodec without reflection on the hot path.
type Upcaster ¶
Upcaster transforms a decoded payload of one event (or snapshot) type into the next version of that type, returning the new payload and the new type name. Upcasters compose through the Registry: when a payload is decoded on Load, Registry.Upcast applies every upcaster whose from-type matches the current type, in sequence, until no upcaster matches. The aggregate's Apply (or, for snapshots, Restore) sees the final upcasted shape.
User code does not construct Upcaster values directly — register typed upcasters through RegisterUpcaster, which takes care of the type-erasure on the hot path.
type UpcasterCycleError ¶
type UpcasterCycleError struct {
Chain []string
}
UpcasterCycleError is returned when the registered upcasters form a cycle or when [upcastMaxHops] is exceeded. Chain is the sequence of type names traversed, with the repeating type appended for visibility.
func (*UpcasterCycleError) Error ¶
func (e *UpcasterCycleError) Error() string
func (*UpcasterCycleError) Unwrap ¶
func (*UpcasterCycleError) Unwrap() error
type UpcasterTypeError ¶
UpcasterTypeError is returned when a registered upcaster receives a payload whose dynamic type does not match the In type the upcaster was registered with — typically a sign that the codec for FromType was reconfigured but the upcaster was not.
func (*UpcasterTypeError) Error ¶
func (e *UpcasterTypeError) Error() string
func (*UpcasterTypeError) Unwrap ¶
func (*UpcasterTypeError) Unwrap() error
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package commandbus routes named, byte-encoded commands to the typed es.Handler registered for them, so HTTP and gRPC transports can dispatch commands without writing a per-route adapter by hand.
|
Package commandbus routes named, byte-encoded commands to the typed es.Handler registered for them, so HTTP and gRPC transports can dispatch commands without writing a per-route adapter by hand. |
|
Package middleware provides built-in es.Middleware implementations for common cross-cutting concerns around command execution: per-aggregate locking, retry on transient errors, and so on.
|
Package middleware provides built-in es.Middleware implementations for common cross-cutting concerns around command execution: per-aggregate locking, retry on transient errors, and so on. |
|
Package process provides a thin wrapper for the process-manager pattern: an aggregate that consumes events from one or more streams and emits commands to drive a multi-step workflow.
|
Package process provides a thin wrapper for the process-manager pattern: an aggregate that consumes events from one or more streams and emits commands to drive a multi-step workflow. |
|
Package projection drives consumers of the event log via a Runner that subscribes to an es.Source (typically an es.EventStore, optionally a broker-backed Consumer), decodes events through a codec es.Registry, invokes a es.Projection, and (optionally) checkpoints progress to a es.CheckpointStore so consumers resume across restarts.
|
Package projection drives consumers of the event log via a Runner that subscribes to an es.Source (typically an es.EventStore, optionally a broker-backed Consumer), decodes events through a codec es.Registry, invokes a es.Projection, and (optionally) checkpoints progress to a es.CheckpointStore so consumers resume across restarts. |