Documentation ¶
Index ¶
- func Topic(pattern string) string
- type Asynx
- type Builder
- func (b *Builder[T]) Build() (Asynx[T], error)
- func (b *Builder[T]) WithBus(bus models.Bus[T]) *Builder[T]
- func (b *Builder[T]) WithCorruptionHook(fn func(error)) *Builder[T]
- func (b *Builder[T]) WithEventStore(s models.Store) *Builder[T]
- func (b *Builder[T]) WithForgetHandler(fn models.ForgetHandler[T]) *Builder[T]
- func (b *Builder[T]) WithPanicHandler(fn models.PanicHandler[T]) *Builder[T]
- func (b *Builder[T]) WithPublishErrorHandler(fn models.PublishErrorHandler[T]) *Builder[T]
- func (b *Builder[T]) WithSchemaVersion(v int) *Builder[T]
- func (b *Builder[T]) WithShardingOpts(opts ShardingOpts) *Builder[T]
- func (b *Builder[T]) WithSnapshotStore(s models.Store) *Builder[T]
- func (b *Builder[T]) WithUpcaster(fromVersion int, fn models.Upcaster) *Builder[T]
- type ShardingOpts
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Topic ¶ added in v0.6.0
func Topic(pattern string) string
Topic converts a human-readable event pattern into an anchored Go regex string.
Pattern format: {{aggregate}}.{{action}}.{{id}}
Segments are split with strings.SplitN(pattern, ".", 3), so the third segment (id) may contain dots and other regex metacharacters.
Rules:
- Trailing * → .* (greedy; the id segment can contain dots)
- Middle * → [^.]+ (segment-bounded; the aggregate and action segments contain no dots)
- Literal segments → regexp.QuoteMeta
- Result is anchored: ^...$
Types ¶
type Asynx ¶
type Asynx[T any] interface {
	// Send validates and persists the command, then returns the resulting Event[T]
	// once it is durably written. Projection handlers are fired asynchronously:
	// when Send returns, handlers may not have run yet.
	// Use SendWait when the caller needs projections to be up to date before proceeding.
	Send(ctx context.Context, cmd models.Command[T]) (models.Event[T], error)

	// SendWait behaves like Send but additionally blocks until all matching
	// projection handlers have completed. When SendWait returns without error,
	// the event is persisted and every projection subscribed to it has finished.
	// Use Send when projection consistency is not required at the call site.
	SendWait(ctx context.Context, cmd models.Command[T]) (models.Event[T], error)

	// Forget writes a tombstone event for the aggregate, notifies all ForgetHandlers
	// synchronously, then erases all events, snapshots, and cached state.
	// Returns ErrValidation if the aggregate does not exist.
	Forget(ctx context.Context, aggregateID string) error

	// OnForget registers a handler invoked when any aggregate is forgotten.
	// The handler receives the tombstone event; Event.Aggregate holds the last known state.
	// Returns a subscription ID that can be passed to Unsubscribe.
	OnForget(fn models.ForgetHandler[T]) (string, error)

	Shutdown(ctx context.Context) error

	Get(ctx context.Context, aggregateID string) (T, error)

	Exists(ctx context.Context, aggregateID string) (bool, error)

	Preload(ctx context.Context, aggregateID string) error

	Subscribe(
		pattern string,
		handler models.ProjectionHandler[T],
		opts ...models.SubscriptionOpt[T],
	) (string, error)

	Unsubscribe(id string) error

	// Listen opens a channel-based subscription for events matching the given pattern.
	// The pattern is converted via Topic() internally.
	//
	// count > 0:  the channel has capacity equal to count, auto-closes and
	//             auto-unsubscribes after count events are received.
	// count == 0: unbounded; the channel has a default capacity of 16 and never
	//             auto-closes. The caller must call the returned unsubscribe func
	//             to clean up. Note: unsub() does NOT close the channel in
	//             unbounded mode; do not range over the channel after calling unsub.
	// count < 0:  treated the same as count == 0 (unbounded, capacity 16).
	//
	// The returned unsubscribe func is idempotent.
	// Returns ErrEmptyPattern if pattern is empty.
	Listen(pattern string, count int) (<-chan models.Event[T], func(), error)

	// SubscribeWait blocks until the first event matching pattern arrives or ctx
	// is cancelled. The pattern is converted via Topic() internally (through Listen).
	//
	// Returns the matching event, or ctx.Err() if the context is cancelled or its
	// deadline is exceeded. Auto-unsubscribes in all cases.
	//
	// To bound the wait time, pass a context with a deadline (e.g.,
	// context.WithTimeout(parent, 5*time.Second)); pass context.Background() to
	// wait indefinitely.
	//
	// For subscribe-before-act ordering (subscribe → check state → send command →
	// then read), use Listen directly.
	SubscribeWait(ctx context.Context, pattern string) (models.Event[T], error)

	Replay(
		ctx context.Context,
		aggregateID string,
		fromVersion int64,
		toVersion int64,
		fn models.ProjectionHandler[T],
	) error

	WaitPublish()
}
type Builder ¶
type Builder[T any] struct { // contains filtered or unexported fields }
func (*Builder[T]) WithCorruptionHook ¶
func (b *Builder[T]) WithCorruptionHook(fn func(error)) *Builder[T]
WithCorruptionHook registers a callback invoked when a snapshot cannot be deserialized. The hook receives the deserialization error and is called before the library falls back to the cold replay path.
func (*Builder[T]) WithEventStore ¶
func (b *Builder[T]) WithEventStore(s models.Store) *Builder[T]
func (*Builder[T]) WithForgetHandler ¶ added in v0.4.0
func (b *Builder[T]) WithForgetHandler(fn models.ForgetHandler[T]) *Builder[T]
WithForgetHandler registers a ForgetHandler at build time. Equivalent to calling OnForget after Build. Multiple calls register multiple handlers.
func (*Builder[T]) WithPanicHandler ¶
func (b *Builder[T]) WithPanicHandler(fn models.PanicHandler[T]) *Builder[T]
func (*Builder[T]) WithPublishErrorHandler ¶ added in v0.3.1
func (b *Builder[T]) WithPublishErrorHandler(fn models.PublishErrorHandler[T]) *Builder[T]
WithPublishErrorHandler sets a callback invoked when Bus.Publish returns a non-nil error inside an async publish goroutine. The event has already been durably written to the event store; this callback is for observability only. When not set, publish errors are silently dropped.
func (*Builder[T]) WithSchemaVersion ¶
func (b *Builder[T]) WithSchemaVersion(v int) *Builder[T]
func (*Builder[T]) WithShardingOpts ¶
func (b *Builder[T]) WithShardingOpts(opts ShardingOpts) *Builder[T]
func (*Builder[T]) WithSnapshotStore ¶
func (b *Builder[T]) WithSnapshotStore(s models.Store) *Builder[T]
WithSnapshotStore sets a dedicated snapshot store. When no snapshot store is provided, the event store is used.
type ShardingOpts ¶
ShardingOpts configures the processor's worker pool. Shards defaults to 8; QueueDepth defaults to 0 (unbounded).
Directories ¶
| Path | Synopsis |
|---|---|
| internal | |
| internal/bus | ChannelBus is an in-process event bus that dispatches events to subscribers via goroutines. |
| internal/commands | Package commands holds internal Asynx commands that are dispatched by the library itself rather than by application code. |
| internal/eventstore | Package eventstore provides the persistence layer for Asynx aggregates. |
| internal/processor | Package processor coordinates command routing and execution lifecycle for Asynx. |
| internal/processor/exec | Package exec implements the command execution pipeline for Asynx. |
| internal/processor/models | Package models defines data structures shared across processor sub-packages. |
| internal/processor/pool | Package pool implements shard-based concurrent command execution. |
| internal/processor/queue | Package queue implements consistent shard routing for Asynx commands. |
| internal/store | Package store provides in-memory implementations of models.Store for use in tests. |