asynx

package module
v0.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2026 License: MIT Imports: 12 Imported by: 0

README

asynx

Validate early, commit confidently. Event sourcing + CQRS for Go with command-first validation and automatic event diffs.

Install

go get github.com/char2cs/asynx

Quick Start

Define your aggregate:

type Order struct {
	ID     string
	Status string
	Total  float64
}

Implement a command with validation:

type CreateOrderCmd struct {
	ID    string
	Total float64
}

func (c CreateOrderCmd) AggregateID() string { return c.ID }
func (c CreateOrderCmd) EventName() string   { return "OrderCreated" }
func (c CreateOrderCmd) ShouldSnapshot() bool { return false }

// Validation runs before any events are written
func (c CreateOrderCmd) Validate(current *Order) error {
	if c.Total <= 0 {
		return models.ErrValidation // Fails fast
	}
	if current != nil {
		return models.ErrValidation // Already exists
	}
	return nil
}

// Pure function: transform old state → new state
func (c CreateOrderCmd) EmitEvent(current *Order) Order {
	return Order{ID: c.ID, Status: "Pending", Total: c.Total}
}

Initialize and send commands:

ax, _ := asynx.New[Order]().
	WithEventStore(memStore).
	Build()

err := ax.Send(ctx, CreateOrderCmd{ID: "ORD-001", Total: 99.99})
if err == models.ErrValidation {
	// Invalid command rejected before event creation
}

// Read current state (replayed from events)
order, _ := ax.Get(ctx, "ORD-001")

Key Concepts

Commands

Immutable mutation requests. Implement Command[T] with Validate() (fails fast) and EmitEvent() (pure).

// Invalid command rejected before event creation
type UpdateOrderCmd struct {
	OrderID   string
	NewStatus string
}

func (c UpdateOrderCmd) Validate(current *Order) error {
	if current == nil {
		return models.ErrNotFound
	}
	if current.Status == "Shipped" {
		return models.ErrValidation // Can't update shipped orders
	}
	return nil
}

func (c UpdateOrderCmd) EmitEvent(current *Order) Order {
	order := *current
	order.Status = c.NewStatus
	return order
}
Events

Automatic RFC 6902 JSON patches computed from state changes. Durably stored before publication.

// Event contains both states
event.PreviousAggregate // Order{Status: "Pending"}
event.Aggregate         // Order{Status: "Confirmed"}
event.Version           // int64 (total changes for this aggregate)
event.EventName         // "OrderConfirmed"
event.OccurredAt        // timestamp
Aggregates

Your domain model — any Go struct. Asynx replays events on read; you define initial state in EmitEvent() when current is nil.

// Asynx replays all events and reconstructs state
order, err := ax.Get(ctx, "ORD-001")
// Returns latest state from all OrderCreated/OrderConfirmed/OrderShipped events
Projections

Decoupled event subscribers. Can fail independently without affecting the event store.

// Projection fails: event still safely stored
ax.Subscribe("Order.*", func(ctx context.Context, event models.Event[Order]) {
	// Can panic, timeout, or fail — events remain durable
	saveToDatabase(event) // Even if this fails
})
Store

Persistent event backend. Bring your own (PostgreSQL, DynamoDB) or use in-memory for testing.

Bus

Event publication after durably writing to store. In-process by default; can be replaced.

Projections

Subscribe to events with regex patterns. Subscriptions are decoupled from command handling.

Basic subscription:

// Match events by name (regex)
id, err := ax.Subscribe("Order.*", func(ctx context.Context, event models.Event[Order]) {
	// Called after event is durably stored
	fmt.Printf("%s: %s\n", event.EventName, event.Aggregate.ID)
})

Specific event patterns:

// Only OrderShipped events
ax.Subscribe("OrderShipped", handler)

// OrderCreated or OrderConfirmed
ax.Subscribe("Order(Created|Confirmed)", handler)

Fallback handler (failure resilience):

// If primary panics, fallback runs instead
ax.Subscribe("Order.*",
	primaryHandler,
	models.WithFallback[Order](fallbackHandler),
)

Timeouts:

// Handler must complete within 5 seconds
ax.Subscribe("Order.*",
	handler,
	models.WithHandlerTimeout[Order](5 * time.Second),
)

Access event details:

handler := func(ctx context.Context, event models.Event[Order]) {
	event.AggregateID         // "ORD-001"
	event.EventName           // "OrderConfirmed"
	event.Version             // Total changes for this aggregate
	event.Aggregate           // New state
	event.PreviousAggregate   // State before this event
	event.OccurredAt          // Timestamp
}

Unsubscribe:

subID, _ := ax.Subscribe("Order.*", handler)
ax.Unsubscribe(subID) // Stop receiving events

Schema Evolution

Handle breaking changes without data migration.

Scenario: Adding a field to your aggregate:

// v1: type Order struct { ID, Total }
// v2: type Order struct { ID, Total, Status }
// Want: OrderCreated events from v1 get Status="Pending" automatically

Register upcasters for version transitions:

ax := asynx.New[Order]().
	WithEventStore(store).
	WithSchemaVersion(2).                    // Current schema version
	WithUpcaster(1, upcastOrderV1toV2).      // v1→v2 transformation
	WithUpcaster(2, upcastOrderV2toV3).      // v2→v3 transformation
	Build()

Implement an upcaster (transforms RFC 6902 patches):

func upcastOrderV1toV2(ctx context.Context, eventName string, patches []byte) ([]byte, error) {
	// patches is JSON RFC 6902 patch array
	// Add Status="Pending" to OrderCreated events
	if eventName == "OrderCreated" {
		// Parse, add Status op, return updated patches
		return addStatusPatch(patches)
	}
	return patches, nil
}

How it works:

  • Old events stored with v1 patches → upcasted to v2 on replay
  • New commands use v2 EmitEvent() → generate v2 patches
  • Applied during both event reads and projection subscription
  • No data migration needed

Builder Configuration

Method Purpose Default Required
WithEventStore(Store) Persistent event backend Yes
WithSnapshotStore(Store) Dedicated snapshot backend Use event store No
WithBus(Bus[T]) Custom event publisher In-process channel bus No
WithShardingOpts(opts) Worker concurrency 8 shards, unbounded queue No
WithSchemaVersion(int) Current aggregate schema 1 No
WithUpcaster(from, fn) Schema version migration No
WithPanicHandler(fn) Projection panic handler Log and continue No
WithCorruptionHook(fn) Snapshot corruption hook Log and replay No

Configure concurrency:

opts := asynx.ShardingOpts{
	Shards:     16,   // More goroutines for high throughput
	QueueDepth: 1000, // Bounded queue (0 = unbounded)
}

ax, _ := asynx.New[Order]().
	WithEventStore(store).
	WithShardingOpts(opts).
	Build()

Handle projection panics:

ax, _ := asynx.New[Order]().
	WithEventStore(store).
	WithPanicHandler(func(ctx context.Context, event models.Event[Order], p any) {
		// Custom panic handling: alert, log, etc.
		log.Printf("Projection panicked on %s: %v", event.EventName, p)
	}).
	Build()

Handle snapshot corruption:

ax, _ := asynx.New[Order]().
	WithEventStore(store).
	WithSnapshotStore(snapshotStore).
	WithCorruptionHook(func(err error) {
		// Called when snapshot can't deserialize
		// Falls back to replaying events
		metrics.SnapshotCorruptions.Inc()
	}).
	Build()

Bring Your Own Store

Implement the Store interface for any persistent backend (PostgreSQL, DynamoDB, SQLite, etc.):

type Store interface {
	// Append must enforce (aggregateID, version) uniqueness
	// This is the only synchronization needed for correctness
	Append(ctx context.Context, aggregateID string, version int64, data []byte) error

	// ReadFrom returns all events starting from version
	ReadFrom(ctx context.Context, aggregateID string, fromVersion int64) ([][]byte, error)

	// ReadRange returns up to count events (for snapshots)
	ReadRange(ctx context.Context, aggregateID string, fromVersion int64, count int64) ([][]byte, error)

	// Count returns number of events since version
	Count(ctx context.Context, aggregateID string, fromVersion int64) (int64, error)
}

Implement it for PostgreSQL:

type PostgresStore struct {
	db *sql.DB
}

func (s *PostgresStore) Append(ctx context.Context, aggID string, v int64, data []byte) error {
	_, err := s.db.ExecContext(ctx,
		"INSERT INTO events (aggregate_id, version, data) VALUES ($1, $2, $3)",
		aggID, v, data,
	)
	// Postgres UNIQUE(aggregate_id, version) enforces atomically
	return err
}

func (s *PostgresStore) ReadFrom(ctx context.Context, aggID string, fromV int64) ([][]byte, error) {
	rows, _ := s.db.QueryContext(ctx,
		"SELECT data FROM events WHERE aggregate_id=$1 AND version>=$2 ORDER BY version",
		aggID, fromV,
	)
	// ... scan and return patches
}

Use your store:

pgStore := &PostgresStore{db: pgConn}

ax, _ := asynx.New[Order]().
	WithEventStore(pgStore).
	WithSnapshotStore(pgStore).  // Optional: separate snapshot backend
	Build()

For testing, use in-memory store:

import "github.com/char2cs/asynx/internal/store"

ax, _ := asynx.New[Order]().
	WithEventStore(store.New()).  // Fast, no persistence
	Build()

Error Handling

Common error types returned by Asynx:

Error When Action
ErrValidation Validate() returns an error Command rejected; state unchanged
ErrNotFound Get() or Replay() on missing aggregate Check aggregate ID
ErrQueueFull Send exceeds buffer (with QueueDepth) Retry or increase capacity
ErrShuttingDown Send after Shutdown() called Wait for shutdown to complete
ErrContextCancelled Context cancelled during operation Check caller context
ErrMissingEventStore Build() without WithEventStore() Provide an event store

See github.com/char2cs/asynx/models/errors.go for the full error list.

Asynx Interface

type Asynx[T any] interface {
	// Send executes a command
	Send(ctx context.Context, cmd models.Command[T]) error

	// Get reconstructs current aggregate state (replayed from events)
	Get(ctx context.Context, aggregateID string) (T, error)

	// Exists checks if an aggregate exists without full replay
	Exists(ctx context.Context, aggregateID string) (bool, error)

	// Preload caches an aggregate in memory for fast reads
	Preload(ctx context.Context, aggregateID string) error

	// Subscribe registers an event handler (async, non-blocking)
	Subscribe(pattern string, handler models.ProjectionHandler[T],
		opts ...models.SubscriptionOpt[T]) (string, error)

	// Unsubscribe stops an event handler
	Unsubscribe(id string) error

	// Replay runs a handler over a version range (for resync/migration)
	Replay(ctx context.Context, aggregateID string, fromVersion, toVersion int64,
		fn models.ProjectionHandler[T]) error

	// Shutdown gracefully stops command processing and event publication
	Shutdown(ctx context.Context) error

	// WaitPublish blocks until all async publishes complete (testing only)
	WaitPublish()
}

Typical usage patterns:

Send command and check for validation errors:

err := ax.Send(ctx, cmd)
if err == models.ErrValidation {
	// Invalid command — event not created
}

Read current state:

order, err := ax.Get(ctx, "ORD-001")
if err == models.ErrNotFound {
	// Aggregate doesn't exist
}

Preload for high-volume reads:

ax.Preload(ctx, "ORD-001") // Cache in memory
order, _ := ax.Get(ctx, "ORD-001") // Fast: from cache

Replay events for reporting:

ax.Replay(ctx, "ORD-001", 1, 10, func(ctx context.Context, event models.Event[Order]) {
	// Process events 1-10: can rebuild custom projections
})

Test synchronization:

ax.Send(ctx, cmd1)
ax.Send(ctx, cmd2)
ax.WaitPublish() // Block until all events published to projections
// Now safe to check projection side effects

Full Specification

For detailed design rationale, consistency guarantees, and advanced patterns, see the docs/.

License

See LICENSE.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Topic added in v0.6.0

func Topic(pattern string) string

Topic converts a human-readable event pattern into an anchored Go regex string.

Pattern format: {{aggregate}}.{{action}}.{{id}}

Segments are split with strings.SplitN(pattern, ".", 3) — the third segment (id) may contain dots and other regex metacharacters.

Rules:

  • Trailing * → .* (greedy — id segment can contain dots)
  • Middle * → [^.]+ (segment-bounded — aggregate/action have no dots)
  • Literal segments → regexp.QuoteMeta
  • Result is anchored: ^...$

Types

type Asynx

type Asynx[T any] interface {
	// Send validates and persists the command, then returns the resulting Event[T]
	// once it is durably written. Projection handlers are fired asynchronously —
	// when Send returns, handlers may not have run yet.
	// Use SendWait when the caller needs projections to be up to date before proceeding.
	Send(
		ctx context.Context,
		cmd models.Command[T],
	) (models.Event[T], error)

	// SendWait behaves like Send but additionally blocks until all matching
	// projection handlers have completed. When SendWait returns without error,
	// the event is persisted and every projection subscribed to it has finished.
	// Use Send when projection consistency is not required at the call site.
	SendWait(
		ctx context.Context,
		cmd models.Command[T],
	) (models.Event[T], error)

	// Forget writes a tombstone event for the aggregate, notifies all ForgetHandlers
	// synchronously, then erases all events, snapshots, and cached state.
	// Returns ErrValidation if the aggregate does not exist.
	Forget(
		ctx context.Context,
		aggregateID string,
	) error

	// OnForget registers a handler invoked when any aggregate is forgotten.
	// The handler receives the tombstone event; Event.Aggregate holds the last known state.
	// Returns a subscription ID that can be passed to Unsubscribe.
	OnForget(
		fn models.ForgetHandler[T],
	) (string, error)

	Shutdown(
		ctx context.Context,
	) error

	Get(
		ctx context.Context,
		aggregateID string,
	) (T, error)
	Exists(
		ctx context.Context,
		aggregateID string,
	) (bool, error)
	Preload(
		ctx context.Context,
		aggregateID string,
	) error

	Subscribe(
		pattern string,
		handler models.ProjectionHandler[T],
		opts ...models.SubscriptionOpt[T],
	) (string, error)
	Unsubscribe(
		id string,
	) error

	// Listen opens a channel-based subscription for events matching the given pattern.
	// The pattern is converted via Topic() internally.
	//
	// count > 0: the channel has capacity equal to count, auto-closes and
	//            auto-unsubscribes after count events are received.
	// count == 0: unbounded — the channel has a default capacity of 16; it never
	//             auto-closes. The caller must call the returned unsubscribe func
	//             to clean up. Note: unsub() does NOT close the channel in
	//             unbounded mode; do not range over the channel after calling unsub.
	// count < 0: treated the same as count == 0 (unbounded, capacity 16).
	//
	// The returned unsubscribe func is idempotent.
	// Returns ErrEmptyPattern if pattern is empty.
	Listen(
		pattern string,
		count int,
	) (<-chan models.Event[T], func(), error)

	// SubscribeWait blocks until the first event matching pattern arrives or ctx
	// is cancelled. The pattern is converted via Topic() internally (through Listen).
	//
	// Returns the matching event, or ctx.Err() if the context is cancelled or its
	// deadline is exceeded. Auto-unsubscribes in all cases.
	//
	// To bound the wait time, pass a context with a deadline (e.g.,
	// context.WithTimeout(parent, 5*time.Second)); pass context.Background() to
	// wait indefinitely.
	//
	// For subscribe-before-act ordering (subscribe → check state → send command →
	// then read), use Listen directly.
	SubscribeWait(
		ctx context.Context,
		pattern string,
	) (models.Event[T], error)

	Replay(
		ctx context.Context,
		aggregateID string,
		fromVersion int64,
		toVersion int64,
		fn models.ProjectionHandler[T],
	) error

	WaitPublish()
}

type Builder

type Builder[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any]() *Builder[T]

func (*Builder[T]) Build

func (b *Builder[T]) Build() (Asynx[T], error)

Build requires WithEventStore; all other options have defaults.

func (*Builder[T]) WithBus

func (b *Builder[T]) WithBus(
	bus models.Bus[T],
) *Builder[T]

func (*Builder[T]) WithCorruptionHook

func (b *Builder[T]) WithCorruptionHook(fn func(error)) *Builder[T]

WithCorruptionHook registers a callback invoked when a snapshot cannot be deserialized. The hook receives the deserialization error and is called before falling back to the cold replay path.

func (*Builder[T]) WithEventStore

func (b *Builder[T]) WithEventStore(
	s models.Store,
) *Builder[T]

func (*Builder[T]) WithForgetHandler added in v0.4.0

func (b *Builder[T]) WithForgetHandler(fn models.ForgetHandler[T]) *Builder[T]

WithForgetHandler registers a ForgetHandler at build time. Equivalent to calling OnForget after Build. Multiple calls register multiple handlers.

func (*Builder[T]) WithPanicHandler

func (b *Builder[T]) WithPanicHandler(
	fn models.PanicHandler[T],
) *Builder[T]

func (*Builder[T]) WithPublishErrorHandler added in v0.3.1

func (b *Builder[T]) WithPublishErrorHandler(fn models.PublishErrorHandler[T]) *Builder[T]

WithPublishErrorHandler sets a callback invoked when Bus.Publish returns a non-nil error inside an async publish goroutine. The event has already been durably written to the event store; this callback is for observability only. When not set, publish errors are silently dropped.

func (*Builder[T]) WithSchemaVersion

func (b *Builder[T]) WithSchemaVersion(
	v int,
) *Builder[T]

func (*Builder[T]) WithShardingOpts

func (b *Builder[T]) WithShardingOpts(
	opts ShardingOpts,
) *Builder[T]

func (*Builder[T]) WithSnapshotStore

func (b *Builder[T]) WithSnapshotStore(
	s models.Store,
) *Builder[T]

WithSnapshotStore sets a dedicated snapshot store. Defaults to the event store when not provided.

func (*Builder[T]) WithUpcaster

func (b *Builder[T]) WithUpcaster(
	fromVersion int,
	fn models.Upcaster,
) *Builder[T]

type ShardingOpts

type ShardingOpts struct {
	Shards     int
	QueueDepth int
}

ShardingOpts configures the processor's worker pool. Shards defaults to 8; QueueDepth defaults to 0 (unbounded).

Directories

Path Synopsis
internal
bus
ChannelBus is an in-process event bus that dispatches events to subscribers via goroutines.
ChannelBus is an in-process event bus that dispatches events to subscribers via goroutines.
commands
Package commands holds internal Asynx commands that are dispatched by the library itself rather than by application code.
Package commands holds internal Asynx commands that are dispatched by the library itself rather than by application code.
eventstore
Package eventstore provides the persistence layer for Asynx aggregates.
Package eventstore provides the persistence layer for Asynx aggregates.
processor
Package processor coordinates command routing and execution lifecycle for Asynx.
Package processor coordinates command routing and execution lifecycle for Asynx.
processor/exec
Package exec implements the command execution pipeline for Asynx.
Package exec implements the command execution pipeline for Asynx.
processor/models
Package models defines data structures shared across processor sub-packages.
Package models defines data structures shared across processor sub-packages.
processor/pool
Package pool implements shard-based concurrent command execution.
Package pool implements shard-based concurrent command execution.
processor/queue
Package queue implements consistent shard routing for Asynx commands.
Package queue implements consistent shard routing for Asynx commands.
store
Package store provides in-memory implementations of models.Store for use in tests.
Package store provides in-memory implementations of models.Store for use in tests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL