Documentation ¶
Overview ¶
Package commandbus routes named, byte-encoded commands to the typed es.Handler registered for them, so HTTP and gRPC transports can dispatch commands without writing a per-route adapter by hand.
The bus sits on top of es.Execute: registration captures the command and aggregate types at startup, and dispatch crosses the dynamic boundary exactly once — at the codec's Unmarshal — before calling es.Execute with statically typed arguments. Middleware configured on the repository (per-aggregate locking, retry, …) wraps each dispatched command identically to a direct es.Execute call; the bus adds no wrapping of its own. See ADR-0028.
Commands implement the one-method Command interface so they can carry their own target stream id:
type PlaceOrder struct {
	OrderID string `json:"order_id"`
	Items   []Item `json:"items"`
}

func (c PlaceOrder) AggregateID() es.StreamID {
	return es.StreamID(c.OrderID)
}
Wiring:
bus := commandbus.New()
commandbus.Register(bus, "order.place",
	orderRepo, PlaceHandler, jsoncodec.For[PlaceOrder]())

// In the transport handler:
if err := bus.Dispatch(ctx, name, body); err != nil {
	switch {
	case errors.Is(err, commandbus.ErrUnknownCommand):
		// 404: no such command
	case errors.Is(err, commandbus.ErrDecode):
		// 400: malformed body
	case errors.Is(err, es.ErrConflict):
		// 409: optimistic concurrency
	default:
		// 5xx: handler failed
	}
}
To propagate causation, correlation, and metadata into the events the command produces, wrap ctx with es.WithCausation, es.WithCorrelation, and es.WithMetadata before calling Dispatch. The Repository reads them off the context inside Save with no bus involvement.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrDecode = errors.New("synapse: command decode failed")
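ErrDecode is the sentinel returned (via DecodeError) when the command's codec fails to decode the payload. Use errors.Is to classify it; a transport typically maps it to "bad request" (HTTP 400). Use errors.As with *DecodeError to reach the underlying codec error, or match that error directly with errors.Is. Note that errors.Unwrap returns nil for a *DecodeError, because DecodeError implements Unwrap() []error rather than Unwrap() error.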
var ErrPanic = errors.New("synapse: command handler panicked")
ErrPanic is the sentinel returned (via PanicError) when the Recover middleware catches a panic from a handler or any layer below it. Use errors.Is to classify the failure; use errors.As with *PanicError to recover the recovered value and stack trace.
var ErrUnknownCommand = errors.New("synapse: command not registered")
ErrUnknownCommand is the sentinel returned (via UnknownCommandError) when Bus.Dispatch is called with a name that has not been registered. Use errors.Is to classify the failure — typically a transport maps it to "no such route" (HTTP 404).
Functions ¶
func Register ¶
func Register[C Command, A es.Aggregate](
	b *Bus,
	name string,
	repo *es.Repository[A],
	h es.Handler[C, A],
	codec es.TypedCodec[C],
)
Register binds name to a typed handler. The command type C must implement Command; the aggregate type A must implement es.Aggregate. codec decodes payload bytes into a C; the closure then reads the target stream from c.AggregateID() and calls es.Execute, which applies the repository's middleware chain.
Register panics if name is already registered, repo is nil, or codec is nil. These are programmer errors that would otherwise surface only at dispatch time, so Register fails at startup instead. See ADR-0028 for the rationale for panicking (as opposed to es.Registry's silent last-wins behavior).
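The following is a minimal sketch of the registration idea described above, not the package's implementation. It assumes a simplified routing table and uses encoding/json in place of es.TypedCodec. The names registry, handlerFunc, register, and placeDemo are hypothetical, and the es.Repository and es.Execute steps are left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// handlerFunc is a hypothetical type-erased entry: it accepts raw bytes and
// hides the concrete command type behind a closure.
type handlerFunc func(payload []byte) (string, error)

// registry is a stand-in for the bus's internal routing table.
type registry struct {
	routes map[string]handlerFunc
}

// register captures C at compile time. The only dynamic step is the decode;
// after it, handle receives a statically typed value.
func register[C any](r *registry, name string, handle func(C) (string, error)) {
	if _, dup := r.routes[name]; dup {
		// Duplicate names are treated as a programmer error.
		panic(fmt.Sprintf("command %q already registered", name))
	}
	r.routes[name] = func(payload []byte) (string, error) {
		var cmd C
		if err := json.Unmarshal(payload, &cmd); err != nil {
			return "", fmt.Errorf("decode %s: %w", name, err)
		}
		return handle(cmd)
	}
}

type placeOrder struct {
	OrderID string `json:"order_id"`
}

// placeDemo registers one route and dispatches a JSON payload through it.
func placeDemo() (string, error) {
	r := &registry{routes: map[string]handlerFunc{}}
	register(r, "order.place", func(c placeOrder) (string, error) {
		return "placed " + c.OrderID, nil
	})
	return r.routes["order.place"]([]byte(`{"order_id":"o-1"}`))
}

func main() {
	out, err := placeDemo()
	fmt.Println(out, err)
}
```

Because the closure is built once per name, dispatch needs only a map lookup and a single decode before it reaches typed code.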
Types ¶
type Bus ¶
type Bus struct {
	// contains filtered or unexported fields
}
Bus routes named, byte-encoded commands to typed es.Handler implementations through their es.Repository (and, transitively, through the repository's middleware chain).
A Bus is safe for concurrent use. Registration is expected at startup; Bus.Dispatch runs concurrently. See ADR-0028.
func New ¶
New returns an empty Bus ready to be populated with Register. Any middleware supplied via WithMiddleware is composed once here, so the per-Dispatch work is a single chain call.
func (*Bus) Dispatch ¶
Dispatch decodes payload into the command registered under name and executes it against the registered repository. The target stream is taken from the decoded command's Command.AggregateID method.
Dispatch returns:
- *UnknownCommandError (wrapping ErrUnknownCommand) if name has no registered handler.
- *DecodeError (wrapping ErrDecode and the codec's error) if the payload fails to decode.
- any error returned by the handler or by es.Execute, including *es.ConflictError and *es.StreamNotFoundError, propagated verbatim so transports can classify them with errors.Is / errors.As.
type Command ¶
Command is implemented by every command type registered with a Bus. Bus.Dispatch decodes the payload, then reads the target stream from the command itself — keeping commands self-contained and freeing transports from extracting the id separately. See ADR-0028.
type DecodeError ¶
DecodeError carries the offending command name and the underlying codec error. It unwraps to both ErrDecode and the wrapped error so callers can match either via errors.Is.
func (*DecodeError) Error ¶
func (e *DecodeError) Error() string
func (*DecodeError) Unwrap ¶
func (e *DecodeError) Unwrap() []error
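As an illustration of how an error with an Unwrap() []error method matches more than one target, here is a small sketch. The type decodeError, its field names, and the sentinels errDecode and errSyntax are stand-ins chosen for the example; the real DecodeError fields are not shown in this documentation. Multi-error unwrapping requires Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
)

// errDecode is a stand-in for the package's ErrDecode sentinel.
var errDecode = errors.New("command decode failed")

// errSyntax plays the role of an error produced by a codec.
var errSyntax = errors.New("unexpected end of input")

// decodeError is a hypothetical mirror of DecodeError; the field names are
// assumptions made for this sketch.
type decodeError struct {
	Name string
	Err  error
}

func (e *decodeError) Error() string {
	return fmt.Sprintf("decode %q: %v", e.Name, e.Err)
}

// Unwrap returns both branches, so errors.Is and errors.As search the
// sentinel and the codec error alike.
func (e *decodeError) Unwrap() []error {
	return []error{errDecode, e.Err}
}

// matches reports whether err matches the sentinel and the codec error.
func matches(err error) (sentinel, codec bool) {
	return errors.Is(err, errDecode), errors.Is(err, errSyntax)
}

func main() {
	err := fmt.Errorf("dispatch: %w", &decodeError{Name: "order.place", Err: errSyntax})

	s, c := matches(err)
	fmt.Println(s, c)

	// errors.As recovers the typed error and with it the command name.
	var de *decodeError
	if errors.As(err, &de) {
		fmt.Println(de.Name)
	}

	// errors.Unwrap only follows Unwrap() error, so it yields nil here.
	fmt.Println(errors.Unwrap(de) == nil)
}
```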
type Middleware ¶
Middleware wraps an Operation to add behavior before, after, or around dispatch — for example logging, panic recovery, or per-call deadlines. Middlewares compose left-to-right as outer wrappers: WithMiddleware(A, B, C) yields A wrapping B wrapping C wrapping the core lookup. See ADR-0029.
Bus middleware operates at the dispatch boundary (ctx, name, []byte) and is orthogonal to es.Middleware, which operates at the aggregate boundary (ctx, es.StreamID) and lives on the es.Repository (ADR-0012). The two compose without ever double-wrapping.
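The composition order can be sketched as follows. The types operation and middleware are assumptions modeled on the (ctx, name, []byte) boundary described above, and chain, tag, and traceOrder are hypothetical helpers; the package's actual definitions may differ.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// operation and middleware are hypothetical stand-ins for the package's
// Operation and Middleware types.
type operation func(ctx context.Context, name string, payload []byte) error
type middleware func(next operation) operation

// chain composes left-to-right as outer wrappers: chain(core, a, b, c)
// returns a(b(c(core))).
func chain(core operation, mws ...middleware) operation {
	for i := len(mws) - 1; i >= 0; i-- {
		core = mws[i](core)
	}
	return core
}

// tag returns a middleware that records when it is entered and left.
func tag(label string, trace *[]string) middleware {
	return func(next operation) operation {
		return func(ctx context.Context, name string, payload []byte) error {
			*trace = append(*trace, label+" in")
			err := next(ctx, name, payload)
			*trace = append(*trace, label+" out")
			return err
		}
	}
}

// traceOrder runs one dispatch through A, B, C and returns the visit order.
func traceOrder() string {
	var trace []string
	core := func(ctx context.Context, name string, payload []byte) error {
		trace = append(trace, "core")
		return nil
	}
	op := chain(core, tag("A", &trace), tag("B", &trace), tag("C", &trace))
	_ = op(context.Background(), "order.place", nil)
	return strings.Join(trace, ", ")
}

func main() {
	fmt.Println(traceOrder())
}
```

Running the sketch prints "A in, B in, C in, core, C out, B out, A out": the first middleware listed sees the call first and the result last.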
func Logging ¶
func Logging(logger *slog.Logger) Middleware
Logging returns a Middleware that records every dispatch on logger. Successful dispatches log at slog.LevelDebug with command (name) and duration attributes; failed dispatches log at slog.LevelWarn and include the err attribute. A nil logger falls back to slog.Default.
Debug is the default success level so production logs stay quiet by default — point a Debug-enabled handler at this if you want a full command audit log.
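A logging middleware with the behavior described above might look roughly like this. It is a sketch under assumptions: operation and middleware are stand-in types, the log messages "dispatch ok" and "dispatch failed" are invented for the example, and logLevels is a hypothetical helper that captures output through a Debug-enabled slog.TextHandler. Only the attribute names command, duration, and err come from the description.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"time"
)

// operation and middleware are hypothetical stand-ins for the package's
// Operation and Middleware types.
type operation func(ctx context.Context, name string, payload []byte) error
type middleware func(next operation) operation

// logging records every dispatch: Debug on success, Warn on failure.
func logging(logger *slog.Logger) middleware {
	if logger == nil {
		logger = slog.Default()
	}
	return func(next operation) operation {
		return func(ctx context.Context, name string, payload []byte) error {
			start := time.Now()
			err := next(ctx, name, payload)
			attrs := []slog.Attr{
				slog.String("command", name),
				slog.Duration("duration", time.Since(start)),
			}
			if err != nil {
				attrs = append(attrs, slog.Any("err", err))
				logger.LogAttrs(ctx, slog.LevelWarn, "dispatch failed", attrs...)
				return err
			}
			logger.LogAttrs(ctx, slog.LevelDebug, "dispatch ok", attrs...)
			return nil
		}
	}
}

// logLevels dispatches one success and one failure through a Debug-enabled
// text handler and counts the records written at each level.
func logLevels() (debug, warn int) {
	var buf bytes.Buffer
	h := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
	mw := logging(slog.New(h))

	ok := mw(func(context.Context, string, []byte) error { return nil })
	bad := mw(func(context.Context, string, []byte) error { return errors.New("boom") })
	_ = ok(context.Background(), "order.place", nil)
	_ = bad(context.Background(), "order.cancel", nil)

	out := buf.String()
	return strings.Count(out, "level=DEBUG"), strings.Count(out, "level=WARN")
}

func main() {
	d, w := logLevels()
	fmt.Println(d, w)
}
```

With a handler left at the default Info level, only the failed dispatch would be written, which is the quiet-by-default behavior the text describes.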
func Recover ¶
func Recover() Middleware
Recover returns a Middleware that recovers panics from any layer below it and returns them as *PanicError (wrapping ErrPanic). Compose it as an outer wrapper (e.g. earlier in WithMiddleware) so the panic is caught before any inner middleware sees it as a normal return.
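The recovery pattern can be sketched with a deferred recover that rewrites the named return value. Everything below is illustrative: panicError and its field names, errPanic, recoverMW, and dispatchPanicking are hypothetical stand-ins, and the stack trace is captured with runtime/debug.Stack as one plausible choice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"runtime/debug"
)

// operation and middleware are hypothetical stand-ins for the package's
// Operation and Middleware types.
type operation func(ctx context.Context, name string, payload []byte) error
type middleware func(next operation) operation

// errPanic is a stand-in for the package's ErrPanic sentinel.
var errPanic = errors.New("command handler panicked")

// panicError is a hypothetical mirror of PanicError; the field names are
// assumptions made for this sketch.
type panicError struct {
	Name  string
	Value any
	Stack []byte
}

func (e *panicError) Error() string {
	return fmt.Sprintf("command %q panicked: %v", e.Name, e.Value)
}

func (e *panicError) Unwrap() error { return errPanic }

// recoverMW converts a panic from any layer below it into an error.
func recoverMW() middleware {
	return func(next operation) operation {
		return func(ctx context.Context, name string, payload []byte) (err error) {
			defer func() {
				if v := recover(); v != nil {
					// Overwrite the named result so the caller sees an error.
					err = &panicError{Name: name, Value: v, Stack: debug.Stack()}
				}
			}()
			return next(ctx, name, payload)
		}
	}
}

// dispatchPanicking runs a handler that panics behind recoverMW.
func dispatchPanicking() error {
	op := recoverMW()(func(context.Context, string, []byte) error {
		panic("nil map write")
	})
	return op(context.Background(), "order.place", nil)
}

func main() {
	err := dispatchPanicking()
	var pe *panicError
	fmt.Println(errors.Is(err, errPanic), errors.As(err, &pe))
}
```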
func Timeout ¶
func Timeout(d time.Duration) Middleware
Timeout returns a Middleware that wraps the dispatch context with context.WithTimeout for d. Useful when the transport does not already impose a deadline. d <= 0 disables the timeout.
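A timeout middleware along these lines could be written as below. This is a sketch, assuming stand-in operation and middleware types; timeout, hasDeadline, and slowDispatch are hypothetical names. It treats d <= 0 as a pass-through, matching the description.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// operation and middleware are hypothetical stand-ins for the package's
// Operation and Middleware types.
type operation func(ctx context.Context, name string, payload []byte) error
type middleware func(next operation) operation

// timeout bounds each dispatch with context.WithTimeout. A non-positive d
// returns next unchanged, so no deadline is added.
func timeout(d time.Duration) middleware {
	return func(next operation) operation {
		if d <= 0 {
			return next
		}
		return func(ctx context.Context, name string, payload []byte) error {
			ctx, cancel := context.WithTimeout(ctx, d)
			defer cancel() // release the timer once the dispatch returns
			return next(ctx, name, payload)
		}
	}
}

// hasDeadline reports whether a handler behind timeout(d) sees a deadline.
func hasDeadline(d time.Duration) bool {
	var seen bool
	op := timeout(d)(func(ctx context.Context, _ string, _ []byte) error {
		_, seen = ctx.Deadline()
		return nil
	})
	_ = op(context.Background(), "order.place", nil)
	return seen
}

// slowDispatch runs a handler that outlives a 10ms timeout and returns the
// context error it observes.
func slowDispatch() error {
	op := timeout(10 * time.Millisecond)(func(ctx context.Context, _ string, _ []byte) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
			return nil
		}
	})
	return op(context.Background(), "order.place", nil)
}

func main() {
	fmt.Println(hasDeadline(time.Second), hasDeadline(0))
	fmt.Println(slowDispatch())
}
```

The deadline only takes effect if the layers below honor ctx; a handler that ignores the context keeps running past the timeout.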
type Operation ¶
Operation is the type-erased shape of Bus.Dispatch. It is what Middleware wraps. See WithMiddleware and ADR-0029.
type Option ¶
type Option func(*options)
Option configures a Bus at construction time.
func WithMiddleware ¶
func WithMiddleware(mws ...Middleware) Option
WithMiddleware installs bus-level middleware that wraps every Bus.Dispatch call. Successive calls accumulate; middlewares compose left-to-right as outer wrappers, matching es.WithMiddleware's convention (ADR-0012). See ADR-0029 for the built-ins and how bus middleware composes with repository middleware.
type PanicError ¶
PanicError carries the command name, the recovered panic value, and the stack trace captured at recovery time. It wraps ErrPanic.
func (*PanicError) Error ¶
func (e *PanicError) Error() string
func (*PanicError) Unwrap ¶
func (*PanicError) Unwrap() error
type UnknownCommandError ¶
type UnknownCommandError struct {
	Name string
}
UnknownCommandError carries the offending name. It wraps ErrUnknownCommand.
func (*UnknownCommandError) Error ¶
func (e *UnknownCommandError) Error() string
func (*UnknownCommandError) Unwrap ¶
func (*UnknownCommandError) Unwrap() error