Documentation ¶
Overview ¶
Package ebus provides a generic event bus with support for transactional dispatching, middleware, and a payload lifecycle (validate, handle, commit, rollback). Author: Marcin Maluszczak.
Index ¶
- Variables
- func ApplyAllUnsafe[T, ID any](env T, event *Event[T, ID], tx Tx) (handled int, err error)
- func Clone(b []byte) []byte
- func CommitAllUnsafe[T, ID any](env T, event *Event[T, ID])
- func Dispatcher[T, ID any](env T, event *Event[T, ID]) error
- func RollbackRangeUnsafe[T, ID any](env T, ev *Event[T, ID], from, to int)
- func Run[T any, ID any, Res any](env T, bus *Bus[T, ID], payload Payload[T]) (Res, error)
- func RunAll[T any, ID any, Res any](env T, bus *Bus[T, ID], payloads []Payload[T]) (Res, error)
- func ValidateAll[T, ID any](env T, event *Event[T, ID]) error
- type Bus
- func New[T, ID any](idGenHandler IDGenHandler[T, ID], handler EventHandler[T, ID], ...) *Bus[T, ID]
- func NewDefault[T, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
- func NewWithTx[T Transactional, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
- func (b *Bus[T, ID]) Apply(env T, event *Event[T, ID]) error
- func (b *Bus[T, ID]) Publish(env T, payload Payload[T]) error
- func (b *Bus[T, ID]) PublishAll(env T, events []Payload[T]) error
- func (b *Bus[T, ID]) PublishRaw(env T, dec Decoder[T], pt PayloadType, body []byte) error
- func (b *Bus[T, ID]) PublishRaws(env T, dec Decoder[T], raws ...Raw[ID]) error
- type CommandEvent
- type CommitListener
- type ContextCarrier
- type Decoder
- type Event
- type EventHandler
- type EventHooks
- type EventMiddleware
- func LoggingByID[T any, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]
- func LoggingByTypeName[T any, ID any](payloadTypeNameHandler PayloadTypeNameHandler, ...) EventMiddleware[T, ID]
- func Recovery[T, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]
- func Retry[T, ID any](retries int, delay time.Duration) EventMiddleware[T, ID]
- func RetryWithContext[T any, ID any](retries int, delay time.Duration) EventMiddleware[T, ID]
- func RetryWithFallback[T, ID any](policy RetryPolicy, fallback RetryFallbackHandler[T, ID]) EventMiddleware[T, ID]
- func RetryWithPolicy[T, ID any](policy RetryPolicy) EventMiddleware[T, ID]
- func WithEventHooks[T, ID any](hooks EventHooks[T, ID]) EventMiddleware[T, ID]
- type ExpoJitterPolicy
- type ID8Byte
- type IDGenHandler
- type JSONDecoder
- type Payload
- type PayloadType
- type PayloadTypeNameHandler
- type Raw
- type RawKeeper
- type RawPayload
- type Registry
- type RetryFallbackHandler
- type RetryPolicy
- type Stager
- type Subscribers
- type Transactional
- type Tx
Constants ¶
This section is empty.
Variables ¶
var ErrEmptyPayloads = errors.New("empty payloads")
ErrEmptyPayloads indicates that an event contained no payloads to process.
var ErrPayloadIsNotCommand = errors.New("payload is not a command or result type mismatch")
ErrPayloadIsNotCommand indicates the payload does not implement the CommandEvent interface or does not match the expected result type.
Functions ¶
func ApplyAllUnsafe ¶ added in v1.3.0
ApplyAllUnsafe handles each payload in order and returns the number of successfully handled payloads along with the first error encountered. If tx implements Stager[ID], RAW payloads kept by RawKeeper are staged before Handle. Panic-safe: converts panics from Handle into an error and returns the count of completed payloads. Unsafe: assumes ValidateAll was run.
func CommitAllUnsafe ¶ added in v1.3.0
CommitAllUnsafe calls Commit on all payloads after successful processing. Unsafe: assumes validation and handling have succeeded.
func Dispatcher ¶
Dispatcher is the default non-transactional handler that validates, handles all payloads, rolls back on failure, and commits on success.
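The flow described above can be sketched without importing the package. This is a minimal illustration, not the library's implementation: Payload is a local stand-in that omits PayloadType, and dispatch and step are hypothetical names. It also assumes that only payloads whose Handle succeeded are rolled back, newest first.

```go
package main

import (
	"errors"
	"fmt"
)

// Payload mirrors the lifecycle methods documented for Payload
// (PayloadType is omitted here for brevity).
type Payload[T any] interface {
	Validate(env T) error
	Handle(env T) error
	Commit(env T)
	Rollback(env T)
}

// dispatch is a hypothetical stand-in for the documented Dispatcher flow.
func dispatch[T any](env T, payloads []Payload[T]) error {
	if len(payloads) == 0 {
		return errors.New("empty payloads")
	}
	// Validate everything before any payload is handled.
	for _, p := range payloads {
		if err := p.Validate(env); err != nil {
			return err
		}
	}
	for i, p := range payloads {
		if err := p.Handle(env); err != nil {
			// Assumption: undo only the payloads handled so far, newest first.
			for j := i - 1; j >= 0; j-- {
				payloads[j].Rollback(env)
			}
			return err
		}
	}
	for _, p := range payloads {
		p.Commit(env)
	}
	return nil
}

// step records its lifecycle calls in a shared log, which plays the role of env.
type step struct {
	name string
	fail bool
}

func (s step) Validate(log *[]string) error {
	*log = append(*log, "validate "+s.name)
	return nil
}

func (s step) Handle(log *[]string) error {
	if s.fail {
		return errors.New(s.name + " failed")
	}
	*log = append(*log, "handle "+s.name)
	return nil
}

func (s step) Commit(log *[]string)   { *log = append(*log, "commit "+s.name) }
func (s step) Rollback(log *[]string) { *log = append(*log, "rollback "+s.name) }

func main() {
	var log []string
	err := dispatch(&log, []Payload[*[]string]{step{name: "a"}, step{name: "b", fail: true}})
	fmt.Println(err, log)
}
```

In this run payload "b" fails in Handle, so "a" is rolled back and nothing is committed.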
func RollbackRangeUnsafe ¶ added in v1.3.0
RollbackRangeUnsafe calls Rollback on payloads in reverse order from index (to-1) down to from. Unsafe: assumes bounds are valid and validation has already occurred.
func Run ¶
Run publishes a single command payload and returns its result. The payload must implement CommandEvent[T, Res].
func RunAll ¶ added in v1.3.0
RunAll publishes multiple payloads and returns the result from the first one, which must implement CommandEvent[T, Res]. All payloads are processed together.
func ValidateAll ¶ added in v1.3.0
ValidateAll verifies that the event and all payloads pass validation. Returns ErrEmptyPayloads when the event has no payloads.
Types ¶
type Bus ¶
type Bus[T, ID any] struct { // contains filtered or unexported fields }
Bus is the core event bus. It holds the ID generator and the composed middleware chain that ultimately dispatches events to a handler.
func New ¶
func New[T, ID any](idGenHandler IDGenHandler[T, ID], handler EventHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
New constructs a Bus with a custom root handler and a middleware chain. Middlewares are composed outermost-first in the order they are provided.
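A small sketch of what "outermost-first" composition might look like. The types below are local stand-ins, and EventHandler is assumed to have the same shape as Dispatcher (env plus *Event, returning error). The names chain and tag are hypothetical.

```go
package main

import "fmt"

// Local stand-ins shaped like the documented types.
type Event[T, ID any] struct {
	ID ID
}

// Assumption: EventHandler has the same signature as Dispatcher.
type EventHandler[T, ID any] func(env T, event *Event[T, ID]) error

type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]

// chain wraps handler so that the first middleware ends up as the outermost layer.
func chain[T, ID any](handler EventHandler[T, ID], mws ...EventMiddleware[T, ID]) EventHandler[T, ID] {
	for i := len(mws) - 1; i >= 0; i-- {
		handler = mws[i](handler)
	}
	return handler
}

// tag returns a middleware that records when it is entered and left.
func tag(name string, trace *[]string) EventMiddleware[struct{}, int] {
	return func(next EventHandler[struct{}, int]) EventHandler[struct{}, int] {
		return func(env struct{}, ev *Event[struct{}, int]) error {
			*trace = append(*trace, name+" in")
			err := next(env, ev)
			*trace = append(*trace, name+" out")
			return err
		}
	}
}

func main() {
	var trace []string
	root := func(env struct{}, ev *Event[struct{}, int]) error {
		trace = append(trace, "handler")
		return nil
	}
	h := chain[struct{}, int](root, tag("first", &trace), tag("second", &trace))
	_ = h(struct{}{}, &Event[struct{}, int]{ID: 1})
	fmt.Println(trace)
}
```

With this ordering the first middleware sees the event first and the result last, which is usually where recovery or logging belongs.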
func NewDefault ¶
func NewDefault[T, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
NewDefault constructs a Bus that uses the default non-transactional Dispatcher. Middlewares are applied around the handler in the provided order.
func NewWithTx ¶
func NewWithTx[T Transactional, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
NewWithTx constructs a Bus that runs dispatch inside a transaction created by env. If the begun Tx implements Stager[ID], RAW payloads are staged before Handle. See Transactional and Stager for details. Requirements:
- T must implement Transactional (i.e., provide BeginTx to start a transaction)
- The returned Tx may optionally implement Stager[ID]; if it does, RAW payloads (via RawKeeper[ID]) will be staged before handling. Stager is detected dynamically using a type assertion.
In short: T starts the transaction; if Tx can stage, the Bus will stage RAWs first, then run the Validate/Handle/Commit/Rollback lifecycle under the transaction.
func (*Bus[T, ID]) Apply ¶
Apply dispatches the given event through the composed middleware chain and handler.
func (*Bus[T, ID]) Publish ¶
Publish creates an Event with a single payload, sets Created to UTC, and applies it.
func (*Bus[T, ID]) PublishAll ¶ added in v1.3.0
PublishAll creates an Event with the given payloads, timestamps it in UTC, and applies it through the bus. Either all payloads are processed or none.
func (*Bus[T, ID]) PublishRaw ¶ added in v1.3.0
func (b *Bus[T, ID]) PublishRaw(env T, dec Decoder[T], pt PayloadType, body []byte) error
PublishRaw decodes a single RAW payload (type + bytes) using the provided Decoder, then publishes it as an event. Useful for ingesting serialized messages.
func (*Bus[T, ID]) PublishRaws ¶ added in v1.3.0
PublishRaws decodes all provided RAW payloads first. If any decode fails, no changes are applied. With a transactional bus, all payloads are processed within a single transaction.
type CommandEvent ¶
CommandEvent is a special Payload that produces a result value after execution.
type CommitListener ¶
CommitListener is invoked after a payload has been successfully committed. It is called once per committed payload of the subscribed type.
type ContextCarrier ¶ added in v1.3.0
ContextCarrier can be implemented by environment types to expose a context.Context, enabling middleware to honor cancellation and deadlines. Context-respecting middleware should use ContextCarrier. Example environment type:
type Env struct {
	ctx context.Context
	// ...other fields and implementations (Transactional, etc.)
}
func (e Env) Context() context.Context { return e.ctx }
Example use inside middleware:
ctx := context.Background()
if c, ok := any(env).(ContextCarrier); ok && c.Context() != nil {
	ctx = c.Context()
}
type Decoder ¶ added in v1.3.0
type Decoder[T any] interface { Decode(pt PayloadType, raw []byte) (Payload[T], error) }
Decoder converts a (payload type, raw bytes) pair into a concrete Payload[T]. Implementations should return an error for unknown types or malformed data.
type Event ¶
type Event[T, ID any] struct {
	ID       ID           // Unique identifier of the event
	Created  time.Time    // Timestamp of event creation
	Payloads []Payload[T] // A list of payloads included in the event
}
Event groups multiple payloads to be processed together (atomically when using a transactional dispatcher). Created is set to UTC time at publish.
type EventHandler ¶
EventHandler processes an Event in a given environment T. Returning a non-nil error fails the entire event (and triggers rollback where applicable).
func TxDispatcher ¶
func TxDispatcher[T Transactional, ID any]() EventHandler[T, ID]
TxDispatcher returns an EventHandler that validates payloads, begins a transaction, optionally stages RAWs if the Tx implements Stager[ID], handles payloads, commits on success, or rolls back (with per-payload Rollback) on failure or panic.
type EventHooks ¶ added in v1.1.0
type EventHooks[T, ID any] struct {
	OnBeforeHandle func(env T, event *Event[T, ID])
	OnAfterCommit  func(env T, event *Event[T, ID])
}
EventHooks provides observational callbacks for event processing lifecycle: OnBeforeHandle is called before handling, OnAfterCommit after a successful commit.
type EventMiddleware ¶
type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]
EventMiddleware decorates an EventHandler with cross-cutting behavior such as logging, metrics, retries or panic recovery.
func LoggingByID ¶ added in v1.3.0
func LoggingByID[T any, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]
LoggingByID returns logging middleware that identifies payload types by their numeric ID.
func LoggingByTypeName ¶ added in v1.1.0
func LoggingByTypeName[T any, ID any](payloadTypeNameHandler PayloadTypeNameHandler, logf func(format string, args ...any)) EventMiddleware[T, ID]
LoggingByTypeName returns logging middleware that uses the provided type-name resolver to print human-friendly payload type names.
func Recovery ¶ added in v1.3.0
func Recovery[T, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]
Recovery wraps the handler and converts panics into errors, optionally logging the panic using the provided formatter.
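Converting a panic into an error typically relies on a deferred recover and a named result. The following is a hypothetical re-creation of that idea with local stand-in types, not the package's own code. The error text and the EventHandler signature are assumptions.

```go
package main

import "fmt"

// Local stand-ins shaped like the documented types.
type Event[T, ID any] struct{ ID ID }

type EventHandler[T, ID any] func(env T, event *Event[T, ID]) error

type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]

// recovery is a hypothetical sketch of a panic-to-error middleware.
func recovery[T, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID] {
	return func(next EventHandler[T, ID]) EventHandler[T, ID] {
		return func(env T, ev *Event[T, ID]) (err error) {
			defer func() {
				if r := recover(); r != nil {
					if logf != nil {
						logf("recovered panic: %v", r)
					}
					// The named result lets the deferred function set the error.
					err = fmt.Errorf("panic: %v", r)
				}
			}()
			return next(env, ev)
		}
	}
}

func main() {
	boom := func(env string, ev *Event[string, int]) error { panic("boom") }
	h := recovery[string, int](nil)(boom)
	fmt.Println(h("env", &Event[string, int]{ID: 7}))
}
```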
func Retry ¶ added in v1.1.0
func Retry[T, ID any](retries int, delay time.Duration) EventMiddleware[T, ID]
Retry retries dispatch up to retries times with a fixed delay. This variant does not honor context cancellation; prefer RetryWithContext when applicable.
func RetryWithContext ¶ added in v1.3.0
RetryWithContext retries dispatch up to retries times with a fixed delay. If env exposes a context (via a Context() context.Context method), cancellation and deadlines are honored between attempts. Context-respecting middleware should use ContextCarrier.
func RetryWithFallback ¶ added in v1.1.0
func RetryWithFallback[T, ID any](policy RetryPolicy, fallback RetryFallbackHandler[T, ID]) EventMiddleware[T, ID]
RetryWithFallback retries using the given policy and invokes fallback with the final error if all attempts fail.
func RetryWithPolicy ¶ added in v1.1.0
func RetryWithPolicy[T, ID any](policy RetryPolicy) EventMiddleware[T, ID]
RetryWithPolicy retries event dispatch based on a custom retry policy. Allows for conditional retry logic and custom backoff strategies.
func WithEventHooks ¶ added in v1.1.0
func WithEventHooks[T, ID any](hooks EventHooks[T, ID]) EventMiddleware[T, ID]
WithEventHooks injects observational callbacks that run before handling and after a successful commit. Hooks must be side-effect free.
type ExpoJitterPolicy ¶ added in v1.3.0
type ExpoJitterPolicy struct {
	Base   time.Duration // e.g. 10 * time.Millisecond
	Factor float64       // e.g. 2.0 (2^attempt)
	Max    time.Duration // maximum delay
	Rand   *rand.Rand    // optional; if nil, the global source is used
}
ExpoJitterPolicy is a retry policy that applies exponential backoff with jitter. Without jitter, multiple instances retry at the same intervals (thundering herd), and each wave of retries can overload the system. Jitter randomizes the delay within a range, spreading the load. Example:
policy := &ExpoJitterPolicy{Base: 10*time.Millisecond, Factor: 2, Max: 500*time.Millisecond}
mw := RetryWithPolicy[testCtx, int](policy)
func (*ExpoJitterPolicy) Backoff ¶ added in v1.3.0
func (p *ExpoJitterPolicy) Backoff(attempt int) time.Duration
func (*ExpoJitterPolicy) ShouldRetry ¶ added in v1.3.0
func (p *ExpoJitterPolicy) ShouldRetry(attempt int, err error) bool
type ID8Byte ¶
type ID8Byte [8]byte
ID8Byte is a fixed-size, big-endian 8-byte identifier suitable for compact, sortable event IDs.
type IDGenHandler ¶
IDGenHandler generates a unique identifier for a new event using the provided environment T. Errors prevent event creation.
func NewID8ByteHandler ¶
func NewID8ByteHandler[T any](initialValue uint64) IDGenHandler[T, ID8Byte]
NewID8ByteHandler returns a thread-safe ID generator that yields sequential, big-endian 8-byte identifiers. The first generated value is initialValue+1, and each subsequent call increments the counter by one. Each handler instance has its own counter.
type JSONDecoder ¶ added in v1.3.0
JSONDecoder decodes payloads from JSON using a type Registry. Unknown fields are disallowed to surface schema mismatches early.
func NewJSONDecoder ¶ added in v1.3.0
func NewJSONDecoder[T any]() *JSONDecoder[T]
NewJSONDecoder returns a JSONDecoder with an empty Registry. Register concrete payload factories on the embedded Registry before decoding.
func (*JSONDecoder[T]) Decode ¶ added in v1.3.0
func (d *JSONDecoder[T]) Decode(pt PayloadType, raw []byte) (Payload[T], error)
Decode implements Decoder by constructing a payload from the registered factory, disallowing unknown JSON fields, and decoding the provided bytes into it. Returns an error for unknown types or malformed JSON.
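The combination of a factory lookup and strict JSON decoding can be sketched with the standard library alone. Here registry, decode and userCreated are hypothetical; only json.Decoder.DisallowUnknownFields is a real API, and the package's own Registry and error messages may look different.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type PayloadType int

// registry is a hypothetical stand-in for Registry: it maps a payload type
// to a factory returning a fresh pointer to decode into.
type registry map[PayloadType]func() any

// decode looks up the factory, then decodes raw into the new value while
// rejecting JSON fields that the target struct does not declare.
func decode(reg registry, pt PayloadType, raw []byte) (any, error) {
	factory, ok := reg[pt]
	if !ok {
		return nil, fmt.Errorf("unknown payload type %d", pt)
	}
	p := factory()
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields()
	if err := dec.Decode(p); err != nil {
		return nil, fmt.Errorf("decode payload type %d: %w", pt, err)
	}
	return p, nil
}

// userCreated is a hypothetical payload body.
type userCreated struct {
	Name string `json:"name"`
}

func main() {
	reg := registry{1: func() any { return &userCreated{} }}

	p, err := decode(reg, 1, []byte(`{"name":"ann"}`))
	fmt.Println(p, err)

	_, err = decode(reg, 1, []byte(`{"name":"ann","extra":true}`))
	fmt.Println(err)
}
```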
type Payload ¶
type Payload[T any] interface {
	Validate(env T) error
	Commit(env T)
	Handle(env T) error
	Rollback(env T)
	PayloadType() PayloadType
}
Payload defines the lifecycle of a unit of work processed by the bus. Validate should check invariants, Handle applies the change, Commit finalizes successful changes and Rollback reverts partial work on failure.
type PayloadType ¶
type PayloadType int
PayloadType identifies a concrete payload kind. It is used for decoding, routing and subscription to payload-specific listeners.
type PayloadTypeNameHandler ¶ added in v1.1.0
type PayloadTypeNameHandler func(pt PayloadType) string
PayloadTypeNameHandler returns a human-friendly name for a given PayloadType, typically used by logging middleware.
type Raw ¶ added in v1.3.0
type Raw[ID any] struct {
	Type PayloadType
	// Body is only valid while the event is being processed. To keep it
	// longer, copy it first, e.g. with the Clone function.
	Body []byte
	Meta struct {
		EventID       ID
		CorrelationID [16]byte
		SchemaVer     uint16
		TimestampUnix int64
	}
}
Raw carries the original serialized payload and transport metadata as received before decoding. Body contains the payload bytes and is transient; Clone it if you need to persist it beyond processing. Meta is populated by the bus when publishing: EventID and TimestampUnix are set from the event; CorrelationID and SchemaVer can be used for routing and schema/versioning.
When a payload implements RawKeeper[ID], the bus injects a pointer to its Raw before Handle is invoked. If the active transaction implements Stager[ID], PutRaw is called to stage the RAW envelope before handling.
type RawKeeper ¶ added in v1.3.0
type RawKeeper[ID any] interface {
	// SetRaw sets the underlying RAW pointer. The bus calls this when publishing RAWs
	// to pass the original envelope to the payload.
	SetRaw(raw *Raw[ID])
	// Raw returns the stored RAW pointer, which may be nil if not published from RAW input.
	Raw() *Raw[ID]
}
RawKeeper can be implemented by payloads that want access to the incoming RAW representation. When publishing RAWs, the bus populates RawKeeper automatically. To receive a payload in the form in which it arrived with the event, implement RawKeeper or embed RawPayload in the payload struct.
type RawPayload ¶ added in v1.3.0
type RawPayload[ID any] struct { // contains filtered or unexported fields }
RawPayload is a helper that implements RawKeeper. Embed it into payload structs to receive the original RAW value when publishing RAWs.
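Embedding works because Go promotes the embedded type's methods. The sketch below rebuilds reduced versions of Raw, RawKeeper and RawPayload locally. orderPlaced and bodyCopy are hypothetical, and bodyCopy stands in for what the package's Clone function is described as doing.

```go
package main

import "fmt"

// Reduced stand-ins for the documented Raw, RawKeeper and RawPayload.
type Raw[ID any] struct {
	Type int
	Body []byte
}

type RawKeeper[ID any] interface {
	SetRaw(raw *Raw[ID])
	Raw() *Raw[ID]
}

type RawPayload[ID any] struct{ raw *Raw[ID] }

func (p *RawPayload[ID]) SetRaw(r *Raw[ID]) { p.raw = r }
func (p *RawPayload[ID]) Raw() *Raw[ID]     { return p.raw }

// orderPlaced is a hypothetical payload. Embedding RawPayload promotes
// SetRaw and Raw, so *orderPlaced satisfies RawKeeper.
type orderPlaced struct {
	RawPayload[uint64]
	OrderID string
}

// bodyCopy copies the transient body so it can outlive processing.
func bodyCopy(p any) []byte {
	k, ok := p.(RawKeeper[uint64])
	if !ok || k.Raw() == nil {
		return nil // not published from RAW input
	}
	return append([]byte(nil), k.Raw().Body...)
}

func main() {
	p := &orderPlaced{OrderID: "o-1"}
	fmt.Println(bodyCopy(p) == nil) // no RAW attached yet

	p.SetRaw(&Raw[uint64]{Type: 1, Body: []byte(`{"order_id":"o-1"}`)})
	fmt.Printf("%s\n", bodyCopy(p))
}
```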
func (*RawPayload[ID]) Raw ¶ added in v1.3.0
func (p *RawPayload[ID]) Raw() *Raw[ID]
Raw returns the stored RAW pointer, which may be nil if not published from RAW input.
func (*RawPayload[ID]) SetRaw ¶ added in v1.3.0
func (p *RawPayload[ID]) SetRaw(r *Raw[ID])
SetRaw sets the underlying RAW pointer. The bus calls this when publishing RAWs to pass the original envelope to the payload.
type Registry ¶ added in v1.3.0
type Registry[T any] struct { // contains filtered or unexported fields }
Registry maps PayloadType to factory functions that construct zero-value payload instances for decoding. It also tracks registration origins to aid debugging.
func NewRegistry ¶ added in v1.3.0
NewRegistry returns a new, empty Registry for mapping PayloadType to factories.
func (*Registry[T]) MustRegister ¶ added in v1.3.0
func (r *Registry[T]) MustRegister(payloadType PayloadType, factory func() Payload[T])
MustRegister registers a factory for the given payload type. It panics if the type was already registered. Use Register to get an error instead.
type RetryFallbackHandler ¶ added in v1.1.0
RetryFallbackHandler receives the final error after retries are exhausted together with the event that failed to be processed.
type RetryPolicy ¶ added in v1.1.0
type RetryPolicy interface {
ShouldRetry(attempt int, err error) bool
Backoff(attempt int) time.Duration
}
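RetryPolicy decides whether a failed dispatch is retried (ShouldRetry) and how long to wait before the next attempt (Backoff). RetryWithPolicy retries dispatch according to the provided RetryPolicy, sleeping according to Backoff between attempts and stopping when ShouldRetry returns false.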
type Stager ¶ added in v1.3.0
Stager can be implemented by a transaction (Tx) to receive the RAW form of payloads before handling (e.g., for persistence, audit, or outbox purposes). Note that the payload must also implement RawKeeper for its RAW to be staged.
type Subscribers ¶
type Subscribers[T, ID any] struct { // contains filtered or unexported fields }
Subscribers holds the CommitListeners registered per payload type. Subscribe registers a CommitListener for the payload's type; the listener is invoked after a successful commit for matching payloads.
Concurrency model:
- Subscribe is intended to be called only during application startup (single-threaded), before any events are processed.
- After startup, the set of listeners is treated as immutable; Notifier may be used concurrently without additional synchronization.
- Do NOT call Subscribe at runtime.
Rationale: avoiding locks in the hot path (e.g., inside Tx dispatch) keeps latency low.
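The concurrency model above (write once at startup, read freely afterwards) can be illustrated with a plain map. The type listeners is a hypothetical stand-in and says nothing about how Subscribers is actually stored. The listener in main takes a lock only for its own shared counter.

```go
package main

import (
	"fmt"
	"sync"
)

type PayloadType int

// listeners is a hypothetical stand-in for Subscribers: a plain map that is
// written during startup only and read without locks afterwards.
type listeners map[PayloadType][]func(env string)

func (l listeners) subscribe(pt PayloadType, fn func(env string)) {
	l[pt] = append(l[pt], fn)
}

// notify only reads the map, so concurrent calls are safe as long as
// subscribe is never called again.
func (l listeners) notify(pt PayloadType, env string) {
	for _, fn := range l[pt] {
		fn(env)
	}
}

func main() {
	var mu sync.Mutex
	seen := 0

	l := listeners{}
	l.subscribe(1, func(env string) { // startup phase, single goroutine
		mu.Lock() // the listener guards its own shared state
		seen++
		mu.Unlock()
	})

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // runtime phase, concurrent readers
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.notify(1, "env")
		}()
	}
	wg.Wait()
	fmt.Println(seen)
}
```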
func NewSubscribers ¶
func NewSubscribers[T, ID any]() *Subscribers[T, ID]
NewSubscribers creates an empty list of subscribers.
func (*Subscribers[T, ID]) Notifier ¶
func (s *Subscribers[T, ID]) Notifier() EventMiddleware[T, ID]
Notifier returns an EventMiddleware that notifies all matching subscribers after commit.
func (*Subscribers[T, ID]) Subscribe ¶
func (s *Subscribers[T, ID]) Subscribe(payload Payload[T], handler CommitListener[T])
Subscribe registers a commit listener for the type of the given payload.
type Transactional ¶ added in v1.3.0
Transactional is implemented by environments that can begin a transaction. The readonly flag can be used by implementations to optimize or enforce mode.