Documentation
Constants
This section is empty.
Variables
This section is empty.
Functions
func OnAggregate
func OnAggregate[C any, S any](svc *AggregateService[S], h AggregateHandler[C, S])
OnAggregate registers h on svc as the aggregate handler for commands of type C.
Types
type AggregateHandler
type AggregateHandler[C any, S any] struct {
	// Expected state: IsNew, IsExisting, or IsAny.
	Expected eventuous.ExpectedState
	// ID returns the entity identifier from the command.
	ID func(C) string
	// Act applies domain logic to the aggregate. Events are recorded via agg.Apply().
	Act func(ctx context.Context, agg *aggregate.Aggregate[S], cmd C) error
}
AggregateHandler defines how to process a command using an aggregate.
type AggregateService
type AggregateService[S any] struct {
	// contains filtered or unexported fields
}
AggregateService handles commands using aggregates. S is the state type held by the aggregate.
func NewAggregateService
func NewAggregateService[S any](
	reader store.EventReader,
	writer store.EventWriter,
	fold func(S, any) S,
	zero S,
) *AggregateService[S]
NewAggregateService creates an aggregate-based command service.
func (*AggregateService[S]) Handle
Handle dispatches a command to its registered handler. The pipeline:
- Look up handler by reflect.TypeOf(command)
- Get entity ID from handler.id(command)
- Build stream name: "{StateTypeName}-{id}"
- Load aggregate from store using store.LoadAggregate
- For IsNew: call agg.EnsureNew(). For IsExisting: call agg.EnsureExists().
- Call handler.act(ctx, agg, command)
- If no changes on aggregate, return current state (no-op)
- Store aggregate using store.StoreAggregate
- Return Result[S] with updated state
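The steps above can be sketched with a toy in-memory store for a handler whose expected state is IsNew. Every type and function here (agg, memStore, handleNew, the Booking types) is a hypothetical stand-in, and the "BookingState-" prefix is only an assumption about how the state type name is rendered; the real service uses store.LoadAggregate and store.StoreAggregate.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical event and state types.
type RoomBooked struct{ Room string }
type BookingState struct{ Room string }

func fold(s BookingState, e any) BookingState {
	if ev, ok := e.(RoomBooked); ok {
		s.Room = ev.Room
	}
	return s
}

// agg is a toy aggregate: folded state, stored version, uncommitted changes.
type agg struct {
	state   BookingState
	version int // -1 means the stream does not exist yet
	changes []any
}

// Apply folds the event into the state and records it as a pending change.
func (a *agg) Apply(e any) {
	a.state = fold(a.state, e)
	a.changes = append(a.changes, e)
}

func (a *agg) EnsureNew() error {
	if a.version >= 0 {
		return errors.New("aggregate already exists")
	}
	return nil
}

// memStore maps stream names to their stored events.
type memStore map[string][]any

func (m memStore) load(stream string) *agg {
	a := &agg{version: -1}
	for _, e := range m[stream] {
		a.state = fold(a.state, e)
		a.version++
	}
	return a
}

// handleNew walks the pipeline for an IsNew handler.
func handleNew(m memStore, id string, act func(*agg) error) (BookingState, error) {
	stream := "BookingState-" + id // "{StateTypeName}-{id}", prefix assumed
	a := m.load(stream)
	if err := a.EnsureNew(); err != nil {
		return a.state, err
	}
	if err := act(a); err != nil {
		return a.state, err
	}
	if len(a.changes) == 0 {
		return a.state, nil // no changes: no-op, nothing is stored
	}
	m[stream] = append(m[stream], a.changes...)
	return a.state, nil
}

func main() {
	m := memStore{}
	book := func(a *agg) error { a.Apply(RoomBooked{Room: "101"}); return nil }

	s, err := handleNew(m, "b-1", book)
	fmt.Println(s.Room, err)

	_, err = handleNew(m, "b-1", book) // second attempt fails EnsureNew
	fmt.Println(err)
}
```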
type CommandHandler
type CommandHandler[S any] interface {
	Handle(ctx context.Context, command any) (*Result[S], error)
}
CommandHandler is the interface for command handling, enabling decorators (e.g., OTel).
type Handler
type Handler[C any, S any] struct {
	// Expected state: IsNew, IsExisting, or IsAny.
	Expected eventuous.ExpectedState
	// Stream returns the stream name for this command.
	Stream func(C) eventuous.StreamName
	// Act is a pure function: given current state and the command, return new events.
	Act func(ctx context.Context, state S, cmd C) ([]any, error)
}
Handler defines how to process a specific command type.
type Service
type Service[S any] struct {
	// contains filtered or unexported fields
}
Service handles commands by loading state, executing a handler, and storing new events.
func New
func New[S any](
	reader store.EventReader,
	writer store.EventWriter,
	fold func(S, any) S,
	zero S,
) *Service[S]
New creates a functional command service.
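The fold and zero parameters together describe how state is rebuilt from events. The sketch below shows one plausible fold and a replay loop that starts from the zero value. The event and state types and the replay helper are hypothetical; how the service itself reads events is not shown on this page.

```go
package main

import "fmt"

// Hypothetical event and state types.
type Deposited struct{ Amount int }
type Withdrawn struct{ Amount int }
type AccountState struct{ Balance int }

// fold matches the fold func(S, any) S parameter with S = AccountState.
func fold(s AccountState, e any) AccountState {
	switch ev := e.(type) {
	case Deposited:
		s.Balance += ev.Amount
	case Withdrawn:
		s.Balance -= ev.Amount
	}
	return s // events of unknown type leave the state unchanged
}

// replay rebuilds state by folding every event over the zero value.
func replay(zero AccountState, events []any) AccountState {
	s := zero
	for _, e := range events {
		s = fold(s, e)
	}
	return s
}

func main() {
	history := []any{Deposited{Amount: 100}, Withdrawn{Amount: 30}, "unrelated"}
	s := replay(AccountState{}, history)
	fmt.Println(s.Balance) // prints 70
}
```

Taking and returning S by value keeps fold free of shared mutable state, which is what makes replaying a stream repeatable.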
func (*Service[S]) Handle
Handle dispatches a command to its registered handler. The pipeline:
- Look up handler by command type
- Get stream name from handler
- LoadState from store (using handler's Expected state)
- Call handler.Act(ctx, state, command) → new events
- If no new events, return current state (no-op)
- Append new events to store (each as NewStreamEvent with uuid.New())
- Fold new events into state for the result
- Return Result[S]
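The functional pipeline above can be approximated with an in-memory map in place of the event store. This is a sketch under stated assumptions: memStore, handle, and the Counter types are hypothetical, the mustExist flag stands in for an IsExisting expectation, and the wrapping of each event as a NewStreamEvent with a fresh UUID is left out to keep the example dependency-free.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical state, event, and command types.
type CounterState struct{ N int }
type Incremented struct{ By int }
type Increment struct{ By int }

func fold(s CounterState, e any) CounterState {
	if ev, ok := e.(Incremented); ok {
		s.N += ev.By
	}
	return s
}

// memStore maps stream names to their stored events.
type memStore map[string][]any

// handle loads state, runs the pure act function, appends new events, and
// folds them into the returned state.
func handle(
	m memStore, stream string, mustExist bool, cmd Increment,
	act func(CounterState, Increment) ([]any, error),
) (CounterState, error) {
	stored, ok := m[stream]
	if mustExist && !ok {
		return CounterState{}, errors.New("stream not found")
	}
	var state CounterState // the zero state
	for _, e := range stored {
		state = fold(state, e)
	}
	newEvents, err := act(state, cmd)
	if err != nil {
		return state, err
	}
	if len(newEvents) == 0 {
		return state, nil // no new events: no-op
	}
	m[stream] = append(stored, newEvents...)
	for _, e := range newEvents {
		state = fold(state, e)
	}
	return state, nil
}

func main() {
	m := memStore{}
	act := func(_ CounterState, c Increment) ([]any, error) {
		if c.By == 0 {
			return nil, nil
		}
		return []any{Incremented{By: c.By}}, nil
	}

	s, _ := handle(m, "Counter-1", false, Increment{By: 2}, act)
	s, _ = handle(m, "Counter-1", true, Increment{By: 3}, act)
	fmt.Println(s.N, len(m["Counter-1"])) // prints 5 2
}
```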