package command
v0.1.4
Published: Mar 25, 2026 License: Apache-2.0 Imports: 6 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func On

func On[C any, S any](svc *Service[S], h Handler[C, S])

On registers a command handler on the service.
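On is a package-level function rather than a method because Go methods cannot declare their own type parameters. The sketch below shows one way such a function could store a typed handler under the command's reflect.Type. The names registry, on, and dispatch are hypothetical stand-ins; the package's actual internals are unexported and may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// registry is a hypothetical stand-in for Service[S]; the real fields are unexported.
type registry[S any] struct {
	handlers map[reflect.Type]func(state S, cmd any) ([]any, error)
}

func newRegistry[S any]() *registry[S] {
	return &registry[S]{handlers: make(map[reflect.Type]func(S, any) ([]any, error))}
}

// on mirrors the shape of On[C, S]: the command type C is a type parameter of
// the function, and the typed handler is stored behind a type-erased wrapper.
func on[C any, S any](r *registry[S], act func(state S, cmd C) ([]any, error)) {
	key := reflect.TypeOf((*C)(nil)).Elem()
	r.handlers[key] = func(state S, cmd any) ([]any, error) {
		return act(state, cmd.(C)) // safe: dispatch only routes values of type C here
	}
}

// dispatch finds the handler by the command's dynamic type.
func (r *registry[S]) dispatch(state S, cmd any) ([]any, error) {
	h, ok := r.handlers[reflect.TypeOf(cmd)]
	if !ok {
		return nil, fmt.Errorf("no handler registered for %T", cmd)
	}
	return h(state, cmd)
}

// Hypothetical state, command, and event types.
type booking struct{ RoomID string }
type bookRoom struct{ RoomID string }
type roomBooked struct{ RoomID string }

func main() {
	r := newRegistry[booking]()
	on(r, func(_ booking, cmd bookRoom) ([]any, error) {
		return []any{roomBooked{RoomID: cmd.RoomID}}, nil
	})

	events, err := r.dispatch(booking{}, bookRoom{RoomID: "42"})
	fmt.Println(events, err)

	_, err = r.dispatch(booking{}, "not a registered command")
	fmt.Println(err)
}
```

Both type parameters are inferred at the call site, so registration reads as on(r, handler) with no explicit type arguments.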

func OnAggregate

func OnAggregate[C any, S any](svc *AggregateService[S], h AggregateHandler[C, S])

OnAggregate registers an aggregate handler on the service.

Types

type AggregateHandler

type AggregateHandler[C any, S any] struct {
	// Expected state: IsNew, IsExisting, or IsAny.
	Expected eventuous.ExpectedState

	// ID returns the entity identifier from the command.
	ID func(C) string

	// Act applies domain logic to the aggregate. Events are recorded via agg.Apply().
	Act func(ctx context.Context, agg *aggregate.Aggregate[S], cmd C) error
}

AggregateHandler defines how to process a command using an aggregate.

type AggregateService

type AggregateService[S any] struct {
	// contains filtered or unexported fields
}

AggregateService handles commands using aggregates. S is the state type held by the aggregate.

func NewAggregateService

func NewAggregateService[S any](
	reader store.EventReader,
	writer store.EventWriter,
	fold func(S, any) S,
	zero S,
) *AggregateService[S]

NewAggregateService creates an aggregate-based command service.
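The fold parameter has the signature func(S, any) S: it takes the current state and one event and returns the next state, and zero is the state before any event has been applied. A minimal sketch of such a function, using a type switch over hypothetical booking events (none of these types come from the package):

```go
package main

import "fmt"

// Hypothetical domain events for a booking.
type roomBooked struct {
	RoomID string
	Price  int
}
type paymentRecorded struct{ Amount int }
type bookingCancelled struct{}

// bookingState is the S in fold func(S, any) S.
type bookingState struct {
	RoomID      string
	Outstanding int
	Cancelled   bool
}

// foldBooking returns the state after applying one event. It takes and returns
// the state by value, so the caller's copy is never mutated.
func foldBooking(s bookingState, event any) bookingState {
	switch e := event.(type) {
	case roomBooked:
		s.RoomID = e.RoomID
		s.Outstanding = e.Price
	case paymentRecorded:
		s.Outstanding -= e.Amount
	case bookingCancelled:
		s.Cancelled = true
	}
	return s // unknown event types leave the state unchanged
}

// replay folds a stream of events over the zero state.
func replay(events []any) bookingState {
	state := bookingState{}
	for _, e := range events {
		state = foldBooking(state, e)
	}
	return state
}

func main() {
	state := replay([]any{
		roomBooked{RoomID: "42", Price: 100},
		paymentRecorded{Amount: 40},
	})
	fmt.Printf("%+v\n", state) // {RoomID:42 Outstanding:60 Cancelled:false}
}
```

With these types, foldBooking and bookingState{} would be the values passed as fold and zero.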

func (*AggregateService[S]) Handle

func (svc *AggregateService[S]) Handle(ctx context.Context, command any) (*Result[S], error)

Handle dispatches a command to its registered handler. The pipeline:

  1. Look up handler by reflect.TypeOf(command)
  2. Get entity ID from handler.ID(command)
  3. Build stream name: "{StateTypeName}-{id}"
  4. Load aggregate from store using store.LoadAggregate
  5. Check expected state: for IsNew, call agg.EnsureNew(); for IsExisting, call agg.EnsureExists()
  6. Call handler.Act(ctx, agg, command)
  7. If no changes on aggregate, return current state (no-op)
  8. Store aggregate using store.StoreAggregate
  9. Return Result[S] with updated state
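The steps above can be sketched against an in-memory store. Everything here is a stand-in written for illustration: agg imitates aggregate.Aggregate[S], the map replaces store.LoadAggregate and store.StoreAggregate, and the error messages are invented. Step 1 is skipped because the handler parts are passed in directly, and the sketch assumes that IsAny performs no check.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

type expected int

const (
	isAny expected = iota
	isNew
	isExisting
)

// agg is a hypothetical stand-in for aggregate.Aggregate[S].
type agg[S any] struct {
	state   S
	fold    func(S, any) S
	exists  bool  // true when the stream already holds events
	changes []any // events recorded by apply and not yet stored
}

func (a *agg[S]) apply(e any) {
	a.state = a.fold(a.state, e)
	a.changes = append(a.changes, e)
}

func (a *agg[S]) ensureNew() error {
	if a.exists {
		return errors.New("aggregate already exists")
	}
	return nil
}

func (a *agg[S]) ensureExists() error {
	if !a.exists {
		return errors.New("aggregate not found")
	}
	return nil
}

// handle walks steps 2-9 for a single command type.
func handle[C any, S any](store map[string][]any, fold func(S, any) S, zero S,
	exp expected, id func(C) string, act func(*agg[S], C) error, cmd C) (S, error) {
	stream := reflect.TypeOf(zero).Name() + "-" + id(cmd) // steps 2-3
	a := &agg[S]{state: zero, fold: fold, exists: len(store[stream]) > 0}
	for _, e := range store[stream] { // step 4: rebuild state from stored events
		a.state = fold(a.state, e)
	}
	var err error
	switch exp { // step 5: isAny performs no check in this sketch
	case isNew:
		err = a.ensureNew()
	case isExisting:
		err = a.ensureExists()
	}
	if err != nil {
		return zero, err
	}
	if err := act(a, cmd); err != nil { // step 6
		return zero, err
	}
	if len(a.changes) == 0 { // step 7: nothing recorded, nothing to store
		return a.state, nil
	}
	store[stream] = append(store[stream], a.changes...) // step 8
	return a.state, nil                                 // step 9
}

// Hypothetical account domain.
type account struct{ Owner string }
type openAccount struct{ ID, Owner string }
type accountOpened struct{ Owner string }

func foldAccount(s account, e any) account {
	if o, ok := e.(accountOpened); ok {
		s.Owner = o.Owner
	}
	return s
}

func open(store map[string][]any, cmd openAccount) (account, error) {
	return handle(store, foldAccount, account{}, isNew,
		func(c openAccount) string { return c.ID },
		func(a *agg[account], c openAccount) error {
			a.apply(accountOpened{Owner: c.Owner})
			return nil
		}, cmd)
}

func main() {
	store := map[string][]any{}
	fmt.Println(open(store, openAccount{ID: "1", Owner: "ann"}))
	fmt.Println(open(store, openAccount{ID: "1", Owner: "bob"}))
}
```

The second call fails the isNew check because the stream "account-1" already holds an event, so nothing further is written.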

type CommandHandler

type CommandHandler[S any] interface {
	Handle(ctx context.Context, command any) (*Result[S], error)
}

CommandHandler is the interface for command handling, enabling decorators (e.g., OTel).
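Because both services expose the same Handle method, a decorator can wrap either one and still be used as a CommandHandler. The sketch below shows the pattern with a timing wrapper in place of OpenTelemetry. Result and CommandHandler are local copies of the documented shapes so the example compiles on its own; timed, handlerFunc, and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local copies of the documented shapes so the sketch compiles on its own.
type Result[S any] struct {
	State          S
	NewEvents      []any
	GlobalPosition uint64
	StreamVersion  int64
}

type CommandHandler[S any] interface {
	Handle(ctx context.Context, command any) (*Result[S], error)
}

// timed is a hypothetical decorator: it satisfies CommandHandler[S] itself and
// forwards every call to the handler it wraps.
type timed[S any] struct {
	next CommandHandler[S]
	logf func(format string, args ...any)
}

func (t timed[S]) Handle(ctx context.Context, command any) (*Result[S], error) {
	start := time.Now()
	res, err := t.next.Handle(ctx, command)
	t.logf("handled %T in %s, err=%v", command, time.Since(start), err)
	return res, err
}

// handlerFunc adapts a plain function to CommandHandler[S], like http.HandlerFunc.
type handlerFunc[S any] func(ctx context.Context, command any) (*Result[S], error)

func (f handlerFunc[S]) Handle(ctx context.Context, command any) (*Result[S], error) {
	return f(ctx, command)
}

// demo wraps a trivial handler and reports the state it returned plus the
// number of log lines the decorator wrote.
func demo() (state int, logged int, err error) {
	inner := handlerFunc[int](func(context.Context, any) (*Result[int], error) {
		return &Result[int]{State: 7}, nil
	})
	var h CommandHandler[int] = timed[int]{next: inner, logf: func(string, ...any) { logged++ }}
	res, err := h.Handle(context.Background(), "ping")
	if err != nil {
		return 0, logged, err
	}
	return res.State, logged, nil
}

func main() {
	fmt.Println(demo())
}
```

Callers that depend on CommandHandler[S] rather than on a concrete service type can receive the wrapped or the unwrapped handler without any change.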

type Handler

type Handler[C any, S any] struct {
	// Expected state: IsNew, IsExisting, or IsAny.
	Expected eventuous.ExpectedState

	// Stream returns the stream name for this command.
	Stream func(C) eventuous.StreamName

	// Act is a pure function: given current state and the command, return new events.
	Act func(ctx context.Context, state S, cmd C) ([]any, error)
}

Handler defines how to process a specific command type.
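Since Act receives the current state and the command and returns events without touching the store, the domain rule can be exercised on its own. The following sketch illustrates functions with the shapes of Stream and Act. The booking types, the "Booking-" stream prefix, and the local streamName type (standing in for eventuous.StreamName) are assumptions made for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type streamName string // stand-in for eventuous.StreamName

type bookingState struct{ Paid bool }

type recordPayment struct {
	BookingID string
	Amount    int
}

type paymentRecorded struct {
	BookingID string
	Amount    int
}

// stream has the shape of Handler.Stream: command in, stream name out.
// The "Booking-" prefix is an arbitrary choice for this sketch.
func stream(cmd recordPayment) streamName {
	return streamName("Booking-" + cmd.BookingID)
}

// decide holds the domain rule. It reads only its arguments and performs no
// I/O, which is what makes it easy to test in isolation.
func decide(state bookingState, cmd recordPayment) ([]any, error) {
	if cmd.Amount <= 0 {
		return nil, errors.New("amount must be positive")
	}
	if state.Paid {
		return nil, nil // already paid: no new events
	}
	return []any{paymentRecorded{BookingID: cmd.BookingID, Amount: cmd.Amount}}, nil
}

// act has the shape of Handler.Act and simply delegates to decide.
func act(_ context.Context, state bookingState, cmd recordPayment) ([]any, error) {
	return decide(state, cmd)
}

func main() {
	cmd := recordPayment{BookingID: "b1", Amount: 50}
	events, err := act(context.Background(), bookingState{}, cmd)
	fmt.Println(stream(cmd), events, err)
}
```

Returning no events and no error is how this handler signals that there is nothing to do for an already paid booking.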

type Result

type Result[S any] struct {
	State          S
	NewEvents      []any
	GlobalPosition uint64
	StreamVersion  int64
}

Result holds the outcome of a handled command.

type Service

type Service[S any] struct {
	// contains filtered or unexported fields
}

Service handles commands by loading state, executing a handler, and storing new events.

func New

func New[S any](
	reader store.EventReader,
	writer store.EventWriter,
	fold func(S, any) S,
	zero S,
) *Service[S]

New creates a functional command service, in which each handler is a function from the current state and a command to new events.

func (*Service[S]) Handle

func (svc *Service[S]) Handle(ctx context.Context, command any) (*Result[S], error)

Handle dispatches a command to its registered handler. The pipeline:

  1. Look up handler by command type
  2. Get stream name from handler
  3. LoadState from store (using handler's Expected state)
  4. Call handler.Act(ctx, state, command) → new events
  5. If no new events, return current state (no-op)
  6. Append new events to store (each as NewStreamEvent with uuid.New())
  7. Fold new events into state for the result
  8. Return Result[S]
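The pipeline above can be sketched with a map standing in for the event store. This is an illustration under simplifying assumptions, not the package's implementation: step 1 and the Expected check in step 3 are left out, events are appended as raw values instead of being wrapped as NewStreamEvent with a UUID, and result, handle, and the counter types are hypothetical.

```go
package main

import "fmt"

// result is a trimmed stand-in for Result[S].
type result[S any] struct {
	State     S
	NewEvents []any
}

// handle follows steps 2-8 for one command type.
func handle[C any, S any](
	store map[string][]any, fold func(S, any) S, zero S,
	stream func(C) string, act func(S, C) ([]any, error), cmd C,
) (*result[S], error) {
	name := stream(cmd) // step 2
	state := zero
	for _, e := range store[name] { // step 3: load state by folding stored events
		state = fold(state, e)
	}
	events, err := act(state, cmd) // step 4
	if err != nil {
		return nil, err
	}
	if len(events) == 0 { // step 5: no-op, return the current state
		return &result[S]{State: state}, nil
	}
	store[name] = append(store[name], events...) // step 6
	for _, e := range events {                   // step 7
		state = fold(state, e)
	}
	return &result[S]{State: state, NewEvents: events}, nil // step 8
}

// Hypothetical counter domain.
type add struct {
	ID string
	N  int
}
type added struct{ N int }

func foldCounter(total int, e any) int {
	if a, ok := e.(added); ok {
		total += a.N
	}
	return total
}

// addTo runs the add command against an in-memory store.
func addTo(store map[string][]any, cmd add) (*result[int], error) {
	return handle(store, foldCounter, 0,
		func(c add) string { return "Counter-" + c.ID },
		func(_ int, c add) ([]any, error) {
			if c.N == 0 {
				return nil, nil // nothing to add: no events
			}
			return []any{added{N: c.N}}, nil
		}, cmd)
}

func main() {
	store := map[string][]any{}
	for _, n := range []int{2, 3, 0} {
		res, _ := addTo(store, add{ID: "c1", N: n})
		fmt.Println(res.State, len(res.NewEvents)) // 2 1, then 5 1, then 5 0
	}
}
```

The third command adds zero, so the handler returns no events and the store is left untouched while the current state is still reported.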
