commandbus

package
v0.6.0
Warning

This package is not in the latest version of its module.

Published: Jun 2, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package commandbus routes named, byte-encoded commands to the typed es.Handler registered for them, so HTTP and gRPC transports can dispatch commands without writing a per-route adapter by hand.

The bus sits on top of es.Execute: registration captures the command and aggregate types at startup, and dispatch crosses the dynamic boundary exactly once — at the codec's Unmarshal — before calling es.Execute with statically typed arguments. Middleware configured on the repository (per-aggregate locking, retry, …) wraps each dispatched command identically to a direct es.Execute call; the bus adds no wrapping of its own. See ADR-0028.

Commands implement the one-method Command interface so they can carry their own target stream id:

type PlaceOrder struct {
    OrderID string `json:"order_id"`
    Items   []Item `json:"items"`
}

func (c PlaceOrder) AggregateID() es.StreamID {
    return es.StreamID(c.OrderID)
}

Wiring:

bus := commandbus.New()
commandbus.Register(bus, "order.place",
    orderRepo, PlaceHandler, jsoncodec.For[PlaceOrder]())

// In the transport handler:
if err := bus.Dispatch(ctx, name, body); err != nil {
    switch {
    case errors.Is(err, commandbus.ErrUnknownCommand):
        // 404 — no such command
    case errors.Is(err, commandbus.ErrDecode):
        // 400 — malformed body
    case errors.Is(err, es.ErrConflict):
        // 409 — optimistic concurrency
    default:
        // 5xx — handler failed
    }
}
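
The classification above can be folded into a single helper that returns an HTTP status code. The following is a minimal sketch, not part of the package: statusFor is a hypothetical name, and the three sentinel variables are local stand-ins for commandbus.ErrUnknownCommand, commandbus.ErrDecode, and es.ErrConflict so that the block compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local stand-ins so the sketch compiles on its own. In real code, match
// against commandbus.ErrUnknownCommand, commandbus.ErrDecode, and
// es.ErrConflict instead.
var (
	errUnknownCommand = errors.New("command not registered")
	errDecode         = errors.New("command decode failed")
	errConflict       = errors.New("conflict")
)

// statusFor is a hypothetical helper that maps a Dispatch error to an HTTP
// status code, mirroring the switch in the transport handler.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errUnknownCommand):
		return http.StatusNotFound
	case errors.Is(err, errDecode):
		return http.StatusBadRequest
	case errors.Is(err, errConflict):
		return http.StatusConflict
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	// errors.Is sees through wrapping, so extra context does not change
	// the classification.
	wrapped := fmt.Errorf("dispatch order.place: %w", errDecode)
	fmt.Println(statusFor(wrapped))
}
```

The choice of 200 for success is arbitrary here; a transport might equally return 202 or 204.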

To propagate causation, correlation, and metadata into the events the command produces, wrap ctx with es.WithCausation, es.WithCorrelation, and es.WithMetadata before calling Dispatch. The Repository reads them off the context inside Save with no bus involvement.


Variables

var ErrDecode = errors.New("synapse: command decode failed")

ErrDecode is the sentinel returned (via DecodeError) when the command's codec fails to decode the payload. Use errors.Is to classify it; a transport typically maps it to "bad request" (HTTP 400). To recover the underlying codec error, use errors.As with *DecodeError and read its Err field. errors.Unwrap does not help here: DecodeError's Unwrap method returns []error, and errors.Unwrap returns nil for such errors.

var ErrPanic = errors.New("synapse: command handler panicked")

ErrPanic is the sentinel returned (via PanicError) when the Recover middleware catches a panic from a handler or any layer below it. Use errors.Is to classify the failure; use errors.As with *PanicError to recover the recovered value and stack trace.

var ErrUnknownCommand = errors.New("synapse: command not registered")

ErrUnknownCommand is the sentinel returned (via UnknownCommandError) when Bus.Dispatch is called with a name that has not been registered. Use errors.Is to classify the failure — typically a transport maps it to "no such route" (HTTP 404).

Functions

func Register

func Register[C Command, A es.Aggregate](
	b *Bus,
	name string,
	repo *es.Repository[A],
	h es.Handler[C, A],
	codec es.TypedCodec[C],
)

Register binds name to a typed handler. The command type C must implement Command; the aggregate type A must implement es.Aggregate. codec decodes payload bytes into a C; the closure then reads the target stream from c.AggregateID() and calls es.Execute, which applies the repository's middleware chain.

Register panics at startup if name is already registered, repo is nil, or codec is nil. These are programmer errors that would otherwise surface only at dispatch time. See ADR-0028 for the rationale on panicking (vs. es.Registry's silent last-wins).
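
The idea of capturing the command type in a closure at registration time can be illustrated with a small stand-alone sketch. Everything below is an assumption made for illustration and is not the package's implementation: miniBus, register, typedCodec (a stand-in for es.TypedCodec, whose real method set may differ), and jsonCodec are hypothetical, a plain string replaces es.StreamID, and a callback replaces es.Execute and the repository.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// command mirrors the documented Command interface, with a plain string
// standing in for es.StreamID.
type command interface{ AggregateID() string }

// typedCodec is a hypothetical stand-in for es.TypedCodec[C]; the real
// interface may have a different method set.
type typedCodec[C any] interface {
	Unmarshal(data []byte) (C, error)
}

// jsonCodec decodes a C from JSON.
type jsonCodec[C any] struct{}

func (jsonCodec[C]) Unmarshal(data []byte) (C, error) {
	var c C
	err := json.Unmarshal(data, &c)
	return c, err
}

var errNotRegistered = errors.New("command not registered")

// miniBus maps a command name to a type-erased dispatch closure.
type miniBus struct {
	routes map[string]func(payload []byte) error
}

// register captures C in a closure. Decoding is the only dynamically typed
// step; handle then receives a statically typed C.
func register[C command](b *miniBus, name string, codec typedCodec[C], handle func(id string, c C) error) {
	if codec == nil {
		panic("nil codec for " + name)
	}
	if _, dup := b.routes[name]; dup {
		panic("duplicate command name: " + name)
	}
	b.routes[name] = func(payload []byte) error {
		c, err := codec.Unmarshal(payload)
		if err != nil {
			return fmt.Errorf("decode %q: %w", name, err)
		}
		return handle(c.AggregateID(), c)
	}
}

// dispatch looks up the closure registered under name and runs it.
func (b *miniBus) dispatch(name string, payload []byte) error {
	route, ok := b.routes[name]
	if !ok {
		return errNotRegistered
	}
	return route(payload)
}

type placeOrder struct {
	OrderID string `json:"order_id"`
}

func (c placeOrder) AggregateID() string { return c.OrderID }

// run registers one command, dispatches payload under name, and reports
// the stream id the handler saw.
func run(name string, payload []byte) (string, error) {
	bus := &miniBus{routes: map[string]func([]byte) error{}}
	var seen string
	register[placeOrder](bus, "order.place", jsonCodec[placeOrder]{},
		func(id string, c placeOrder) error {
			seen = id
			return nil
		})
	err := bus.dispatch(name, payload)
	return seen, err
}

func main() {
	id, err := run("order.place", []byte(`{"order_id":"o-1"}`))
	fmt.Println(id, err)
}
```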

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus routes named, byte-encoded commands to typed es.Handler implementations through their es.Repository (and, transitively, through the repository's middleware chain).

A Bus is safe for concurrent use. Registration is expected at startup; Bus.Dispatch runs concurrently. See ADR-0028.

func New

func New(opts ...Option) *Bus

New returns an empty Bus ready to be populated with Register. Any middleware supplied via WithMiddleware is composed once here; per-Dispatch work is one chain call.

func (*Bus) Dispatch

func (b *Bus) Dispatch(ctx context.Context, name string, payload []byte) error

Dispatch decodes payload into the command registered under name and executes it against the registered repository. The target stream is taken from the decoded command's Command.AggregateID method.

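Dispatch returns an *UnknownCommandError (wrapping ErrUnknownCommand) if name is not registered, a *DecodeError (wrapping ErrDecode) if the codec fails to decode payload, and otherwise whatever error execution produces (for example es.ErrConflict), or nil on success.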

func (*Bus) Names

func (b *Bus) Names() []string

Names returns the command names currently registered, in unspecified order. The returned slice is a fresh copy and may be retained.

type Command

type Command interface {
	AggregateID() es.StreamID
}

Command is implemented by every command type registered with a Bus. Bus.Dispatch decodes the payload, then reads the target stream from the command itself — keeping commands self-contained and freeing transports from extracting the id separately. See ADR-0028.

type DecodeError

type DecodeError struct {
	Name string
	Err  error
}

DecodeError carries the offending command name and the underlying codec error. It unwraps to both ErrDecode and the wrapped error so callers can match either via errors.Is.

func (*DecodeError) Error

func (e *DecodeError) Error() string

func (*DecodeError) Unwrap

func (e *DecodeError) Unwrap() []error
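
Because Unwrap returns a slice, errors.Is and errors.As (Go 1.20 and later) search both branches, while errors.Unwrap returns nil. The sketch below shows that standard-library behavior on a local type that mirrors the documented shape of DecodeError. The names decodeError, errDecode, and errCodec are stand-ins defined here for illustration, and the Error message format is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errDecode stands in for commandbus.ErrDecode; errCodec stands in for
// whatever error a codec might return.
var (
	errDecode = errors.New("command decode failed")
	errCodec  = errors.New("unexpected end of input")
)

// decodeError mirrors the documented fields of DecodeError.
type decodeError struct {
	Name string
	Err  error
}

func (e *decodeError) Error() string {
	return fmt.Sprintf("decode %q: %v", e.Name, e.Err)
}

// Unwrap returns both the sentinel and the codec error, so errors.Is and
// errors.As can match either one.
func (e *decodeError) Unwrap() []error { return []error{errDecode, e.Err} }

// sample builds an error shaped like the one a failed decode would yield.
func sample() error {
	return &decodeError{Name: "order.place", Err: errCodec}
}

// matchesSentinel reports whether err classifies as a decode failure.
func matchesSentinel(err error) bool { return errors.Is(err, errDecode) }

// matchesCodec reports whether the codec error is reachable from err.
func matchesCodec(err error) bool { return errors.Is(err, errCodec) }

// codecErr extracts the underlying codec error via errors.As.
func codecErr(err error) error {
	var de *decodeError
	if errors.As(err, &de) {
		return de.Err
	}
	return nil
}

// plainUnwrap shows that errors.Unwrap ignores Unwrap() []error.
func plainUnwrap(err error) error { return errors.Unwrap(err) }

func main() {
	err := sample()
	fmt.Println(matchesSentinel(err), matchesCodec(err))
	fmt.Println(codecErr(err) == errCodec, plainUnwrap(err) == nil)
}
```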

type Middleware

type Middleware func(next Operation) Operation

Middleware wraps an Operation to add behavior before, after, or around dispatch — for example logging, panic recovery, or per-call deadlines. Middlewares compose left-to-right as outer wrappers: WithMiddleware(A, B, C) yields A wrapping B wrapping C wrapping the core lookup. See ADR-0029.

Bus middleware operates at the dispatch boundary (ctx, name, []byte) and is orthogonal to es.Middleware, which operates at the aggregate boundary (ctx, es.StreamID) and lives on the es.Repository (ADR-0012). The two compose without ever double-wrapping.
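
The left-to-right, outermost-first ordering can be made concrete with a small trace. In this sketch the Operation and Middleware types copy the documented signatures; chain and tag are hypothetical helpers written for illustration, and chain is only one plausible way to compose the list, not necessarily how the package does it.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Operation and Middleware copy the documented signatures.
type Operation func(ctx context.Context, name string, payload []byte) error
type Middleware func(next Operation) Operation

// chain is a hypothetical helper that wraps core so that the first
// middleware in mws ends up outermost.
func chain(core Operation, mws ...Middleware) Operation {
	op := core
	for i := len(mws) - 1; i >= 0; i-- {
		op = mws[i](op)
	}
	return op
}

// tag returns a middleware that records when it is entered and left.
func tag(label string, trace *[]string) Middleware {
	return func(next Operation) Operation {
		return func(ctx context.Context, name string, payload []byte) error {
			*trace = append(*trace, label+" in")
			err := next(ctx, name, payload)
			*trace = append(*trace, label+" out")
			return err
		}
	}
}

// order runs chain(core, A, B, C) once and returns the recorded trace.
func order() string {
	var trace []string
	core := func(ctx context.Context, name string, payload []byte) error {
		trace = append(trace, "core")
		return nil
	}
	op := chain(core, tag("A", &trace), tag("B", &trace), tag("C", &trace))
	_ = op(context.Background(), "order.place", nil)
	return strings.Join(trace, " ")
}

func main() {
	fmt.Println(order()) // prints "A in B in C in core C out B out A out"
}
```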

func Logging

func Logging(logger *slog.Logger) Middleware

Logging returns a Middleware that records every dispatch on logger. Successful dispatches log at slog.LevelDebug with command (name) and duration attributes; failed dispatches log at slog.LevelWarn and include the err attribute. A nil logger falls back to slog.Default.

Successful dispatches log at Debug so production logs stay quiet by default; point a Debug-enabled handler at the logger if you want a full command audit log.
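
A middleware with the behavior described above could look roughly like the following. This is a sketch under assumptions, not the package's source: logging, capture, and logContains are hypothetical names, the log messages are invented, and only the attribute names (command, duration, err) and levels come from the description. It needs Go 1.21 or later for log/slog.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"time"
)

// Operation and Middleware copy the documented signatures.
type Operation func(ctx context.Context, name string, payload []byte) error
type Middleware func(next Operation) Operation

// logging is a hypothetical re-creation of the described behavior:
// Debug on success, Warn with an err attribute on failure.
func logging(logger *slog.Logger) Middleware {
	if logger == nil {
		logger = slog.Default()
	}
	return func(next Operation) Operation {
		return func(ctx context.Context, name string, payload []byte) error {
			start := time.Now()
			err := next(ctx, name, payload)
			attrs := []slog.Attr{
				slog.String("command", name),
				slog.Duration("duration", time.Since(start)),
			}
			if err != nil {
				attrs = append(attrs, slog.Any("err", err))
				logger.LogAttrs(ctx, slog.LevelWarn, "command failed", attrs...)
				return err
			}
			logger.LogAttrs(ctx, slog.LevelDebug, "command dispatched", attrs...)
			return nil
		}
	}
}

// capture dispatches once through logging, using a Debug-enabled text
// handler, and returns what was logged.
func capture(fail bool) string {
	var buf bytes.Buffer
	h := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
	op := logging(slog.New(h))(func(ctx context.Context, name string, payload []byte) error {
		if fail {
			return errors.New("boom")
		}
		return nil
	})
	_ = op(context.Background(), "order.place", nil)
	return buf.String()
}

// logContains reports whether the captured log line contains want.
func logContains(fail bool, want string) bool {
	return strings.Contains(capture(fail), want)
}

func main() {
	fmt.Print(capture(false))
	fmt.Print(capture(true))
}
```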

func Recover

func Recover() Middleware

Recover returns a Middleware that recovers panics from any layer below it and returns them as *PanicError (wrapping ErrPanic). Compose it as an outer wrapper (e.g. earlier in WithMiddleware) so the panic is caught before any inner middleware sees it as a normal return.
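
The recover-and-convert pattern described above can be sketched with a deferred recover and a named result. The code below is an illustration only: recoverMW, panicError, and errPanic are local stand-ins that mirror the documented shapes of Recover, PanicError, and ErrPanic, and the Error message format is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"runtime/debug"
)

// Operation and Middleware copy the documented signatures.
type Operation func(ctx context.Context, name string, payload []byte) error
type Middleware func(next Operation) Operation

// errPanic stands in for commandbus.ErrPanic.
var errPanic = errors.New("command handler panicked")

// panicError mirrors the documented fields of PanicError.
type panicError struct {
	Name  string
	Value any
	Stack []byte
}

func (e *panicError) Error() string {
	return fmt.Sprintf("command %q panicked: %v", e.Name, e.Value)
}

func (*panicError) Unwrap() error { return errPanic }

// recoverMW is a hypothetical re-creation of the described behavior: a
// panic from any inner layer becomes a *panicError return value.
func recoverMW() Middleware {
	return func(next Operation) Operation {
		return func(ctx context.Context, name string, payload []byte) (err error) {
			defer func() {
				if v := recover(); v != nil {
					err = &panicError{Name: name, Value: v, Stack: debug.Stack()}
				}
			}()
			return next(ctx, name, payload)
		}
	}
}

// dispatchPanicking runs a panicking core operation through recoverMW.
func dispatchPanicking() error {
	op := recoverMW()(func(ctx context.Context, name string, payload []byte) error {
		panic("nil map write")
	})
	return op(context.Background(), "order.place", nil)
}

// isPanic classifies err with errors.Is.
func isPanic(err error) bool { return errors.Is(err, errPanic) }

// panicValue extracts the recovered value with errors.As.
func panicValue(err error) any {
	var pe *panicError
	if errors.As(err, &pe) {
		return pe.Value
	}
	return nil
}

func main() {
	err := dispatchPanicking()
	fmt.Println(isPanic(err), panicValue(err))
}
```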

func Timeout

func Timeout(d time.Duration) Middleware

Timeout returns a Middleware that wraps the dispatch context with context.WithTimeout for d. Useful when the transport does not already impose a deadline. d <= 0 disables the timeout.
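
A middleware with this behavior is a thin wrapper around context.WithTimeout. The sketch below is an illustration rather than the package's source: timeout and hasDeadline are hypothetical names, and returning next unchanged when d <= 0 is one possible way to implement "disables the timeout".

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Operation and Middleware copy the documented signatures.
type Operation func(ctx context.Context, name string, payload []byte) error
type Middleware func(next Operation) Operation

// timeout is a hypothetical re-creation of the described behavior.
func timeout(d time.Duration) Middleware {
	return func(next Operation) Operation {
		if d <= 0 {
			// No wrapping at all: the caller's context passes through.
			return next
		}
		return func(ctx context.Context, name string, payload []byte) error {
			ctx, cancel := context.WithTimeout(ctx, d)
			defer cancel() // release the timer once dispatch returns
			return next(ctx, name, payload)
		}
	}
}

// hasDeadline dispatches through timeout(ms milliseconds) and reports
// whether the inner operation saw a deadline on its context.
func hasDeadline(ms int) bool {
	var ok bool
	op := timeout(time.Duration(ms) * time.Millisecond)(
		func(ctx context.Context, name string, payload []byte) error {
			_, ok = ctx.Deadline()
			return nil
		})
	_ = op(context.Background(), "order.place", nil)
	return ok
}

func main() {
	fmt.Println(hasDeadline(50), hasDeadline(0))
}
```

If the incoming context already carries an earlier deadline, context.WithTimeout keeps the earlier one, so this wrapper can only shorten the time available, never extend it.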

type Operation

type Operation func(ctx context.Context, name string, payload []byte) error

Operation is the type-erased shape of Bus.Dispatch. It is what Middleware wraps. See WithMiddleware and ADR-0029.

type Option

type Option func(*options)

Option configures a Bus at construction time.

func WithMiddleware

func WithMiddleware(mws ...Middleware) Option

WithMiddleware installs bus-level middleware that wraps every Bus.Dispatch call. Successive calls accumulate; middlewares compose left-to-right as outer wrappers, matching es.WithMiddleware's convention (ADR-0012). See ADR-0029 for the built-ins and how bus middleware composes with repository middleware.

type PanicError

type PanicError struct {
	Name  string
	Value any
	Stack []byte
}

PanicError carries the command name, the recovered panic value, and the stack trace captured at recovery time. It wraps ErrPanic.

func (*PanicError) Error

func (e *PanicError) Error() string

func (*PanicError) Unwrap

func (*PanicError) Unwrap() error

type UnknownCommandError

type UnknownCommandError struct {
	Name string
}

UnknownCommandError carries the offending name. It wraps ErrUnknownCommand.

func (*UnknownCommandError) Error

func (e *UnknownCommandError) Error() string

func (*UnknownCommandError) Unwrap

func (*UnknownCommandError) Unwrap() error
