models

package
v0.6.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2026 License: MIT Imports: 3 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the package. Every message carries the "asynx: "
// prefix; compare against these values with errors.Is rather than by string.
var (
	// Lookup and validation.

	// ErrNotFound reports that an aggregate was not found.
	ErrNotFound = errors.New("asynx: aggregate not found")

	// ErrValidation reports that validation failed.
	ErrValidation = errors.New("asynx: validation failed")

	// ErrForgetFailed reports that a forget operation failed.
	ErrForgetFailed = errors.New("asynx: forget failed")

	// Pipeline and capacity.

	// ErrPipelineFailed reports that a pipeline failed.
	ErrPipelineFailed = errors.New("asynx: pipeline failed")

	// ErrQueueFull reports that a queue is full.
	ErrQueueFull = errors.New("asynx: queue full")

	// Lifecycle.

	// ErrShuttingDown reports that a shutdown is in progress.
	ErrShuttingDown = errors.New("asynx: shutting down")

	// ErrAlreadyShuttingDown reports that a shutdown was already in progress.
	ErrAlreadyShuttingDown = errors.New("asynx: already shutting down")

	// ErrContextCancelled reports that the context was cancelled.
	ErrContextCancelled = errors.New("asynx: context cancelled")

	// ErrBusClosed reports that the bus is closed.
	ErrBusClosed = errors.New("asynx: bus closed")

	// ErrDispatcherClosed reports that the dispatcher is closed.
	ErrDispatcherClosed = errors.New("asynx: dispatcher closed")

	// Argument checks.

	// ErrNilHandler reports that a nil handler was supplied.
	ErrNilHandler = errors.New("asynx: handler is nil")

	// ErrEmptyPattern reports that an empty pattern was supplied.
	ErrEmptyPattern = errors.New("asynx: pattern is empty")

	// ErrMissingEventStore reports that a required event store was not supplied.
	ErrMissingEventStore = errors.New("asynx: event store is required")
)

Functions

This section is empty.

Types

type Bus

// Bus delivers events carrying aggregates of type T to the projection
// handlers subscribed to it.
type Bus[T any] interface {
	// Publish dispatches event to subscribers. It is invoked only once the
	// event has been durably written to the store, so a non-nil error reports
	// a dispatch failure — never a lost event.
	Publish(ctx context.Context, event Event[T]) error

	// PublishSync runs every matching handler synchronously: it returns only
	// after all handler goroutines started for this particular event have
	// finished. Handlers started by other, concurrent events are NOT awaited.
	PublishSync(ctx context.Context, event Event[T]) error

	// Subscribe registers handler under pattern, adjusted by opts.
	// NOTE(review): the returned string is presumably the subscription id
	// that Unsubscribe accepts — confirm against the implementation.
	Subscribe(pattern string, handler ProjectionHandler[T], opts ...SubscriptionOpt[T]) (string, error)

	// Unsubscribe removes the subscription identified by id.
	Unsubscribe(id string) error

	// Close closes the bus.
	// NOTE(review): ctx presumably bounds how long Close may wait — confirm.
	Close(ctx context.Context) error

	// WaitForHandlers blocks until every in-flight handler execution has
	// completed. Test-only helper; do not call it from production code.
	WaitForHandlers()
}

type Command

// Command is the contract for a mutation of an aggregate of type T.
// Implementations must be pure — no IO, side effects, or randomness.
type Command[T any] interface {
	// AggregateID returns the id of the aggregate the command applies to.
	AggregateID() string

	// EventName returns the event name associated with the command.
	EventName() string

	// ShouldSnapshot reports whether the command asks for a snapshot.
	// NOTE(review): when and by whom this is consulted is not visible here.
	ShouldSnapshot() bool

	// Validate checks the command against current, which is nil when the
	// aggregate has never existed.
	Validate(current *T) error

	// EmitEvent produces a T from current, which is nil when the aggregate
	// has never existed.
	// NOTE(review): the result is presumably the aggregate's next state —
	// confirm against the caller.
	EmitEvent(current *T) T
}

Command defines the contract for aggregate mutations. Implementations must be pure: no I/O, no other side effects, and no randomness.

type Event

// Event is the envelope handed to bus handlers for an aggregate of type T.
type Event[T any] struct {
	// ID identifies the event, AggregateID the aggregate it belongs to, and
	// EventName names the kind of event.
	ID, AggregateID, EventName string

	// Version is the event's version number.
	// NOTE(review): presumably the per-aggregate version that Store.Append
	// keys on — confirm.
	Version int64

	// SchemaVersion is the schema version of the event payload.
	SchemaVersion int

	// OccurredAt is the time recorded for the event.
	OccurredAt time.Time

	// Aggregate and PreviousAggregate carry aggregate state.
	// NOTE(review): presumably the state after and before the event
	// respectively — confirm against the code that builds events.
	Aggregate, PreviousAggregate T
}

type ForgetHandler added in v0.4.0

// ForgetHandler is invoked when an aggregate is forgotten. tombstone is the
// tombstone event; its Aggregate field holds the aggregate's last known state.
type ForgetHandler[T any] func(
	ctx context.Context,
	tombstone Event[T],
)

ForgetHandler is called when an aggregate is forgotten. It receives the tombstone event; Event.Aggregate holds the aggregate's last known state.

type PanicHandler

// PanicHandler is invoked when a ProjectionHandler panics. ctx is the same
// context that was passed to Publish and event is the event being handled.
// NOTE(review): recovered is presumably the value obtained from recover() —
// confirm against the bus implementation.
type PanicHandler[T any] func(ctx context.Context, event Event[T], recovered any)

PanicHandler is called when a ProjectionHandler panics. It receives the same context that was passed to Publish.

type ProjectionHandler

// ProjectionHandler is the callback type accepted by Bus.Subscribe. It is
// given the publishing context and the event that triggered it.
type ProjectionHandler[T any] func(ctx context.Context, event Event[T])

ProjectionHandler is a subscription callback that receives the publishing context and the event that triggered it.

type PublishErrorHandler added in v0.3.1

// PublishErrorHandler is invoked when Bus.Publish returns a non-nil error
// inside an async publish goroutine. By then the event is already durably
// stored, so the callback exists for observability only. A nil
// PublishErrorHandler means publish errors are silently dropped.
// NOTE(review): err is presumably the error Publish returned — confirm.
type PublishErrorHandler[T any] func(ctx context.Context, event Event[T], err error)

PublishErrorHandler is called when Bus.Publish returns a non-nil error inside an async publish goroutine. The event has already been durably written to the event store; this callback is for observability only. When nil, publish errors are silently dropped.

type Store

// Store is the persistence contract for versioned, per-aggregate byte records.
type Store interface {
	// Append writes data for aggregateID at version. Implementations must
	// enforce uniqueness of (aggregateID, version): that constraint is the
	// sole coordination mechanism for correctness across concurrent writers.
	Append(ctx context.Context, aggregateID string, version int64, data []byte) error

	// ReadFrom reads the records of aggregateID starting at fromVersion.
	// NOTE(review): presumably inclusive of fromVersion, matching Count —
	// confirm against an implementation.
	ReadFrom(ctx context.Context, aggregateID string, fromVersion int64) ([][]byte, error)

	// ReadRange reads records of aggregateID starting at fromVersion,
	// limited by count.
	// NOTE(review): presumably count is a maximum number of records — confirm.
	ReadRange(ctx context.Context, aggregateID string, fromVersion int64, count int64) ([][]byte, error)

	// Count reports how many entries have version >= fromVersion.
	Count(ctx context.Context, aggregateID string, fromVersion int64) (int64, error)

	// Delete removes every record stored for aggregateID. It is idempotent:
	// deleting an aggregateID that does not exist is not an error.
	Delete(ctx context.Context, aggregateID string) error
}

type SubscriptionConfig

// SubscriptionConfig holds the per-subscription settings that a
// SubscriptionOpt modifies.
type SubscriptionConfig[T any] struct {
	// Fallback is an optional secondary handler.
	// NOTE(review): presumably set by WithFallback and invoked when the
	// primary handler panics — confirm against the implementation.
	Fallback ProjectionHandler[T]
	// Timeout is a duration limit for the subscription.
	// NOTE(review): presumably set by WithHandlerTimeout as the maximum time
	// a handler may run; zero-value semantics are not visible here — confirm.
	Timeout  time.Duration
}

type SubscriptionOpt

// SubscriptionOpt is a functional option that adjusts a SubscriptionConfig.
// Options are passed to Bus.Subscribe through its variadic opts parameter.
type SubscriptionOpt[T any] func(*SubscriptionConfig[T])

func WithFallback

// WithFallback returns a SubscriptionOpt that registers handler as a
// secondary handler, invoked when the primary handler panics.
// NOTE(review): only the signature is shown here; presumably the option sets
// SubscriptionConfig.Fallback — confirm against the implementation.
func WithFallback[T any](
	handler ProjectionHandler[T],
) SubscriptionOpt[T]

WithFallback registers a secondary handler that is invoked when the primary handler panics.

func WithHandlerTimeout

// WithHandlerTimeout returns a SubscriptionOpt that sets d as the maximum
// duration a subscription handler may run.
// NOTE(review): only the signature is shown here; presumably the option sets
// SubscriptionConfig.Timeout — confirm against the implementation.
func WithHandlerTimeout[T any](
	d time.Duration,
) SubscriptionOpt[T]

WithHandlerTimeout sets the maximum duration a subscription handler may run.

type TimeoutHandler

// TimeoutHandler is invoked when a ProjectionHandler exceeds its timeout.
// ctx is the same context that was passed to Publish and event is the event
// being handled.
// NOTE(review): d is presumably the timeout that was exceeded (it could also
// be the elapsed time) — confirm against the bus implementation.
type TimeoutHandler[T any] func(ctx context.Context, event Event[T], d time.Duration)

TimeoutHandler is called when a ProjectionHandler exceeds its timeout. It receives the same context that was passed to Publish.

type Upcaster

// Upcaster migrates raw patch bytes across exactly one schema version step.
// patches is the JSON-encoded RFC 6902 patch array of the event named
// eventName at SchemaVersion v; the result must be JSON-encoded patches
// compatible with SchemaVersion v+1.
//
// Upcasters must be stateless and safe for concurrent use.
type Upcaster func(ctx context.Context, eventName string, patches []byte) ([]byte, error)

Upcaster transforms raw patch bytes for a single schema version step. It receives the publishing context, the event name, and the JSON-encoded RFC 6902 patch array for the event at SchemaVersion v. It must return JSON-encoded patches compatible with SchemaVersion v+1.

Upcasters must be stateless and safe for concurrent use.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL