ebus

Version: v1.3.0
Published: Sep 26, 2025 License: MIT Imports: 12 Imported by: 0

README

📦 ebus – A Generic, Lightweight Event Bus for Go

ebus is a highly extensible, type-safe event bus library for Go with full lifecycle support for events: validation, handling, rollback, commit, and optional transaction dispatch.

Designed to be clean, flexible, and generic using Go's type parameters.


✨ Features

  • ✅ Type-safe payloads using Go generics
  • 🔁 Transactional event dispatching with rollback support
  • 🧱 Middleware support (logging, tracing, metrics, etc.)
  • 🎯 Command pattern with result extraction
  • 🔔 Post-commit subscribers per payload type
  • 🧪 Fully unit tested

📦 Installation

go get github.com/malumar/ebus

Dependencies

  • ebus uses only Go’s standard library (fmt, errors, time, encoding/json, etc.). Tests additionally use testify for assertions.

Why ebus

ebus was created to leverage Go’s strengths for write-heavy systems without always relying on SQL or pure key–value stores as the central point. In particular:

  • Move past SQL bottlenecks for workloads where large datasets fit in RAM and you need very low latency writes at scale.
  • Keep the working set in-process for fast startup, fast backups (memory snapshot + WAL), and fast recovery.
  • Make it easy to import a production dataset on a developer machine to reproduce and fix issues locally.
  • Sustain high write throughput with many concurrent clients.
  • Provide a clean, transactional event pipeline that keeps in-memory state (RAM) and persistence (e.g., WAL, files, any store) consistent.

ebus is storage-agnostic: it does not mandate SQL, KV, or a particular database. You can bring your own store and wire it through transactions and staging.

Because ebus ties together the in‑memory and durable transactions under one event flow, replication becomes a first‑class capability rather than an afterthought.

Production use and performance

ebus is used in production on datasets with tens of millions of records and many long-lived client connections issuing a large number of writes. In these workloads it has proven stable and fast, while maintaining consistency across in-memory state and the persistence layer.

Architecture at a glance

  • Event and payload lifecycle

    • Payloads implement Validate → Handle → (Commit | Rollback).
    • Multiple payloads can be grouped into one Event and processed atomically (all or nothing).
  • Two coordinated transactions (replication-friendly)

    • ebus is designed to orchestrate two transactional processes within a single flow:
      • TX RAM: your in-memory state and domain effects.
      • TX Persistence: your durable store (e.g., WAL, files, DB).
    • Each can operate independently, but ebus guarantees consistency by:
      • Validating all payloads before any Handle,
      • Staging RAW envelopes (if enabled) before Handle,
      • Rolling back in reverse order on error or panic, including the payload that failed,
      • Committing all payloads only after the outer transaction commits.
  • RAW and Staging (optional but recommended for durability)

    • RAW is the original envelope (e.g., JSON) with metadata (EventID, timestamp).
    • If your payload implements RawKeeper, ebus injects the RAW before Handle.
    • If your Tx implements Stager, ebus calls PutRaw before Handle, enabling outbox/WAL-style durability and audit.

Storage integration

ebus does not impose a storage engine. You can:

  • plug in any persistence layer by implementing Transactional and (optionally) Stager,
  • stage RAW envelopes (e.g., write-ahead log) before Handle,
  • commit the durable side and then finalize in-memory Commit.

In our deployments we use a WAL-based store (project “EDEN”), which complements ebus for persistence and recovery. Any similar WAL/outbox/journal fits the same pattern.

When ebus is a good fit

  • Write-heavy systems where the working set fits in RAM and latency matters.
  • You need atomic multi-step operations with precise rollback and panic safety.
  • You want to keep the original inbound payloads (RAW) for audit/outbox/WAL.
  • You want to replicate or coordinate two transactional layers (RAM + persistence) with a single, consistent event pipeline.

🚀 Quick Example

// Simple bus without transactions, with the Recovery middleware added
bus := ebus.NewDefault(
    // Start at 0 for a fresh system; if you already have events in the database,
    // pass the ID of the last stored event instead.
    ebus.NewID8ByteHandler[AppCtx](0),
    ebus.Recovery[AppCtx, ebus.ID8Byte](log.Printf), // recommended: panic -> error
)
// Single payload
if err := bus.Publish(ctx, &UserData{Login: "john", age: 42}); err != nil {
    // handle error
}

// Several payloads at once: either all are applied or none, like a transaction
if err := bus.PublishAll(ctx, []ebus.Payload[AppCtx]{
    &UserData{Login: "john", age: 42},
    &UserData{Login: "alice", age: 37},
}); err != nil {
    // handle error
}

For more examples, see ebus_test.go.

With result:
// Run publishes a single command payload and returns its result.
res, err := ebus.Run[AppCtx, ebus.ID8Byte, int](ctx, bus, &EventWithResult{I: 42})

// RunAll publishes several payloads in one event. The result comes from the
// first payload, so the CommandEvent (here EventWithResult) must come first.
res, err = ebus.RunAll[AppCtx, ebus.ID8Byte, int](ctx, bus, []ebus.Payload[AppCtx]{
    &EventWithResult{I: 42},            // must be the first
    &UserData{Login: "john", age: 42},  // additional payload
})

Dead-letter with RetryWithFallback

The following example shows how to write an event to "dead-letter" after unsuccessful attempts:

type DeadLetterSink interface {
    Put(evt any, err error)
}

sink := newMySink() // e.g. writing to a file, DB, queue, etc.

policy := &myPolicy{max: 3, delay: 200 * time.Millisecond}

bus := ebus.NewDefault(
    idGen,
    ebus.RetryWithFallback[AppState, ebus.ID8Byte](policy, func(evt *ebus.Event[AppState, ebus.ID8Byte], err error) {
        // Save a minimal snapshot, e.g. ID + payload types
        sink.Put(struct{
            ID       ebus.ID8Byte
            Payloads []ebus.PayloadType
            Err      string
        }{
            ID: evt.ID,
            Payloads: func() []ebus.PayloadType {
                out := make([]ebus.PayloadType, len(evt.Payloads))
                for i, p := range evt.Payloads { out[i] = p.PayloadType() }
                return out
            }(),
            Err: err.Error(),
        }, err)
    }),
)

🧹 Payload Lifecycle (Interface & Flow)

Each payload must implement the following interface:

type Payload[T any] interface {
    Validate(ctx T) error   // called before dispatching
    Handle(ctx T) error     // core processing logic
    Commit(ctx T)           // finalize if everything succeeded
    Rollback(ctx T)         // undo if Handle failed
    PayloadType() PayloadType // type identifier used for logging/subscribers
}

Lifecycle flow:

When you call Publish(...) or Run(...), each payload goes through the following steps:

  1. Validate – all payloads must pass before any handling starts
  2. Handle – called in order; if any fails, rollback begins
  3. Rollback – on failure, executed in reverse order for the handled payloads, including the one that failed
  4. Commit – only called if all payloads were successfully handled

This guarantees consistent event processing with rollback support in case of partial failures.
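
The flow above can be sketched without the library. The block below is a minimal, self-contained illustration and not ebus's implementation: `payload`, `step`, and `dispatch` are hypothetical local names, the environment is reduced to a call log, and rolling back the failing payload as well follows the description of the Tx path in the Recovery section.

```go
package main

import (
	"errors"
	"fmt"
)

// payload mirrors the four lifecycle methods of the Payload interface above
// (PayloadType is left out to keep the sketch short).
type payload interface {
	Validate(log *[]string) error
	Handle(log *[]string) error
	Commit(log *[]string)
	Rollback(log *[]string)
}

// step is a hypothetical payload that records every lifecycle call.
type step struct {
	name string
	fail bool // when true, Handle returns an error
}

func (s step) Validate(log *[]string) error {
	*log = append(*log, "validate "+s.name)
	return nil
}

func (s step) Handle(log *[]string) error {
	*log = append(*log, "handle "+s.name)
	if s.fail {
		return errors.New(s.name + " failed")
	}
	return nil
}

func (s step) Commit(log *[]string)   { *log = append(*log, "commit "+s.name) }
func (s step) Rollback(log *[]string) { *log = append(*log, "rollback "+s.name) }

// dispatch validates everything first, handles in order, and on failure rolls
// back in reverse order starting with the payload that failed.
func dispatch(log *[]string, ps []payload) error {
	for _, p := range ps {
		if err := p.Validate(log); err != nil {
			return err
		}
	}
	for i, p := range ps {
		if err := p.Handle(log); err != nil {
			for j := i; j >= 0; j-- {
				ps[j].Rollback(log)
			}
			return err
		}
	}
	for _, p := range ps {
		p.Commit(log)
	}
	return nil
}

func main() {
	var log []string
	err := dispatch(&log, []payload{step{name: "a"}, step{name: "b", fail: true}, step{name: "c"}})
	fmt.Println("error:", err)
	for _, line := range log {
		fmt.Println(line)
	}
}
```

In this run payload c is validated but never handled, and no Commit is called because b failed.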


🧠 Command Support

Commands are payloads that produce results:

type CommandEvent[T, Res any] interface {
    Payload[T]
    Result() Res
}

Used with Run(...) to publish and return a result atomically.
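
As a rough illustration of how a typed result can be pulled out of a payload, the sketch below uses a type assertion against a generic interface. Everything in it (`payload`, `commandEvent`, `run`, `double`, `errNotCommand`) is a local stand-in written for this example; whether ebus performs the assertion before or after dispatch is not stated here, so the ordering is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the shapes shown above; not imported from ebus.
type payload[T any] interface {
	Handle(env T) error
}

type commandEvent[T, Res any] interface {
	payload[T]
	Result() Res
}

var errNotCommand = errors.New("payload is not a command or result type mismatch")

// run checks that the payload can produce a Res, handles it, and returns the result.
func run[T, Res any](env T, p payload[T]) (Res, error) {
	var zero Res
	cmd, ok := p.(commandEvent[T, Res])
	if !ok {
		return zero, errNotCommand
	}
	if err := cmd.Handle(env); err != nil {
		return zero, err
	}
	return cmd.Result(), nil
}

// double is a hypothetical command whose result is twice its input.
type double struct {
	In  int
	out int
}

func (d *double) Handle(env struct{}) error {
	d.out = d.In * 2
	return nil
}

func (d *double) Result() int { return d.out }

func main() {
	res, err := run[struct{}, int](struct{}{}, &double{In: 21})
	fmt.Println(res, err)

	// Asking for a string result fails: double only offers Result() int.
	_, err = run[struct{}, string](struct{}{}, &double{In: 21})
	fmt.Println(err)
}
```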


🧵 Middleware

Middleware wraps the handler as layered functions: M1 → M2 → Handler → M2 → M1. Each middleware can run logic before and/or after calling next (e.g., logging, telemetry, retries, panic recovery).

  • LoggingByID / LoggingByTypeName
    Logs event and payload information using either numeric or named payload types.

    bus := ebus.NewDefault(idGen,
      ebus.LoggingByID[AppCtx, ebus.ID8Byte](log.Printf),
    )
    

    You can customize how payload types are named in logs:

      bus := ebus.NewDefault(idGen,
          ebus.LoggingByTypeName[AppCtx, ebus.ID8Byte](func(pt ebus.PayloadType) string {
              return fmt.Sprint(pt)
          }, log.Printf),
      )
    
  • Retry
    You can automatically retry failed events using Retry(...). The example below retries event handling up to 3 times with a 100ms delay between attempts.

    bus := ebus.NewDefault(idGen,
      ebus.Retry[AppCtx, ebus.ID8Byte](3, 100*time.Millisecond),
    )
    
  • RetryWithPolicy
    Retries event dispatch using a custom retry policy. Useful for transient failures.

    Examples in ebus_test.go

  • RetryWithFallback
    Similar to RetryWithPolicy, but also sends failed events to a fallback handler (e.g. dead-letter queue).

    Examples in ebus_test.go

  • WithEventHooks
    Allows you to execute hooks before and after handling an event (for logging, tracing, etc.).

    Examples in ebus_test.go
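
The M1 → M2 → Handler → M2 → M1 layering described at the top of this section can be reproduced with plain functions. The sketch below is an illustration only: `handler`, `middleware`, `tag`, and `chain` are hypothetical local names with the same shape as EventHandler and EventMiddleware, simplified to a string event.

```go
package main

import "fmt"

// Local stand-ins with the same shape as EventHandler and EventMiddleware,
// simplified to a string event so the sketch runs without the library.
type handler func(trace *[]string, event string) error
type middleware func(next handler) handler

// tag returns a middleware that records when it is entered and left.
func tag(name string) middleware {
	return func(next handler) handler {
		return func(trace *[]string, event string) error {
			*trace = append(*trace, name+" before")
			err := next(trace, event)
			*trace = append(*trace, name+" after")
			return err
		}
	}
}

// chain wraps root so that the first middleware in mws is the outermost layer.
func chain(root handler, mws ...middleware) handler {
	h := root
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

func main() {
	root := func(trace *[]string, event string) error {
		*trace = append(*trace, "handler "+event)
		return nil
	}
	var trace []string
	_ = chain(root, tag("M1"), tag("M2"))(&trace, "evt")
	for _, line := range trace {
		fmt.Println(line)
	}
}
```

Wrapping from the last middleware to the first is what makes the first one provided end up outermost.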


🔔 Subscribers

Note on subscribers

  • Subscribe is intended to be called during application startup (single-threaded). After startup the subscriber set is treated as immutable; Notifier can run concurrently without locks on the hot path.

You can register commit-time hooks:

subs := ebus.NewSubscribers[AppCtx, ebus.ID8Byte]()

subs.Subscribe(&UserData{}, func(ctx AppCtx, p ebus.Payload[AppCtx]) {
    log.Println("User created:", p.(*UserData).Login)
})

bus := ebus.NewDefault(idGen, subs.Notifier())
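
To make the idea of per-type commit listeners concrete, here is a small sketch of one way such a registry could work. It is not the library's code: `subscribers`, `subscribe`, `notify`, and `userData` are hypothetical, and the environment and ID type parameters are dropped for brevity. The map is written only during setup and read afterwards, which matches the startup-only rule above.

```go
package main

import "fmt"

// Local stand-ins; the real types are generic over the environment and ID.
type payloadType int

type payload interface {
	PayloadType() payloadType
}

type listener func(p payload)

// subscribers maps a payload type to its commit listeners. It is filled during
// startup and only read afterwards, which is why no mutex is used here.
type subscribers struct {
	byType map[payloadType][]listener
}

func newSubscribers() *subscribers {
	return &subscribers{byType: make(map[payloadType][]listener)}
}

// subscribe uses the sample payload only to learn its type.
func (s *subscribers) subscribe(sample payload, l listener) {
	pt := sample.PayloadType()
	s.byType[pt] = append(s.byType[pt], l)
}

// notify runs the wrapped processing step and, only if it succeeded, calls
// the listeners registered for each payload's type.
func (s *subscribers) notify(process func([]payload) error, payloads []payload) error {
	if err := process(payloads); err != nil {
		return err
	}
	for _, p := range payloads {
		for _, l := range s.byType[p.PayloadType()] {
			l(p)
		}
	}
	return nil
}

// userData is a hypothetical payload.
type userData struct{ Login string }

func (userData) PayloadType() payloadType { return 1 }

func main() {
	subs := newSubscribers()
	subs.subscribe(userData{}, func(p payload) {
		fmt.Println("User created:", p.(userData).Login)
	})
	ok := func([]payload) error { return nil }
	_ = subs.notify(ok, []payload{userData{Login: "john"}})
}
```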

Non-goals

ebus is not a query engine or ORM. It is a transactional event pipeline and coordinator for in‑memory state and persistence layers (WAL/outbox/journal/DB).


Advanced Usage

RAW payloads, RawKeeper, and Staging (what and why)

  • RAW is the original, serialized message envelope you received (e.g., JSON from a queue). The bus can attach this envelope to your payload before Handle and optionally “stage” it inside a transaction (e.g., write-ahead log, outbox, audit).
  • Use RawKeeper[ID] in your payload to access the incoming RAW. The bus fills Raw.Meta (EventID, TimestampUnix) and sets the Body pointer for you.
  • If your transaction type (Tx) implements Stager[ID], the bus calls PutRaw before Handle. This lets you persist the RAW envelope atomically with your domain changes (common in outbox/WAL patterns).
type UserCreate struct {
    ebus.RawPayload[ebus.ID8Byte] // implements RawKeeper
    Login string
    Age   int
}

func (p *UserCreate) Handle(env AppState) error {
    if r := p.Raw(); r != nil {
        env.Logf("RAW type=%v event=%v ts=%d len=%d",
            r.Type, r.Meta.EventID, r.Meta.TimestampUnix, len(r.Body))
    }
    // ... domain logic
    return nil
}

To feed RAWs, decode them and publish:

dec := ebus.NewJSONDecoder[AppState]()
dec.MustRegister(UserCreateType, func() ebus.Payload[AppState] { return &UserCreate{} })

raw := []byte(`{"Login":"marcin","Age":82}`)
_ = bus.PublishRaw(env, dec, UserCreateType, raw)

With Tx and Stager:

  • Start bus with NewWithTx; if your Tx implements Stager[ID], RAWs are staged before Handle.
  • On error/panic, TX and payloads are rolled back; staged RAWs are not finalized.

Recovery (panic safety)

To ensure that a panic during event processing does not crash the process and leaves no partial state:

  • The Tx path (TxDispatcher) has a built‑in recover that converts panic into error, rolls back the outer transaction, and rolls back payloads in reverse order (including the failing one).
  • For the non‑Tx path (Dispatcher) add the Recovery middleware to convert panic into error.

Example:

bus := ebus.NewDefault(idGen,
    ebus.Recovery[AppCtx, ebus.ID8Byte](log.Printf),
)

Full protection on the Tx path:

  • TxDispatcher has a built-in deferred recover (see the first bullet above), so the Tx path is protected without the Recovery middleware.

Working with context

T (Env/State) does not need to be context.Context. If you need deadlines/cancellation:

  • implement Context() context.Context on your Env (ContextCarrier),
  • use RetryWithContext (honors cancel/timeout),
  • or pass context through your own components.

Example:

type AppState struct { ctx context.Context }
func (s AppState) Context() context.Context { return s.ctx }

bus := ebus.NewWithTx(idGen,
    ebus.RetryWithContext[AppState, ebus.ID8Byte](5, 100*time.Millisecond),
)
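
The sketch below shows one way a retry loop can honor a context exposed by the Env. It is an assumption-laden illustration, not the code behind RetryWithContext: `contextCarrier`, `appState`, `retryWithContext`, and `demo` are local names, and returning `ctx.Err()` on cancellation is a choice made for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// contextCarrier mirrors the ContextCarrier idea: an Env that can expose a context.
type contextCarrier interface {
	Context() context.Context
}

// appState is a hypothetical Env.
type appState struct{ ctx context.Context }

func (s appState) Context() context.Context { return s.ctx }

// retryWithContext calls fn up to retries times. Between attempts it waits for
// delay, but gives up early if the context carried by env is cancelled.
func retryWithContext[T any](env T, retries int, delay time.Duration, fn func(T) error) error {
	ctx := context.Background()
	if c, ok := any(env).(contextCarrier); ok && c.Context() != nil {
		ctx = c.Context()
	}
	var err error
	for attempt := 0; attempt < retries; attempt++ {
		if err = fn(env); err == nil {
			return nil
		}
		if attempt == retries-1 {
			break // no point in waiting after the last attempt
		}
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return err
}

// demo runs the loop with an already-cancelled context and a very long delay,
// so it returns after the first attempt instead of sleeping.
func demo() (calls int, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err = retryWithContext(appState{ctx: ctx}, 5, time.Hour, func(appState) error {
		calls++
		return errors.New("transient")
	})
	return calls, err
}

func main() {
	fmt.Println(demo())
}
```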

NewWithTx requirements (Transactional + Stager)

NewWithTx works with an Env type T that implements:

  • Transactional (BeginTx(readonly bool) (Tx, error)),
  • optionally Stager[ID] on the Tx (to stage RAW envelopes before Handle).

In short:

  • T starts the transaction,
  • Tx may stage RAW (files, WAL, DB), detected via type assertion,
  • ebus coordinates validation, staging, Handle, rollback, and commit.
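
The division of labour summarized above can be sketched with local copies of the interfaces. This is a simplified illustration, not TxDispatcher: `store`, `walTx`, `process`, and `errBoom` are hypothetical, the ID type is fixed to uint64, and validation and per-payload Commit/Rollback are omitted so the optional Stager detection stays visible.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented shapes (ID fixed to uint64 for brevity).
type tx interface {
	Commit() error
	Rollback() error
}

type transactional interface {
	BeginTx(readonly bool) (tx, error)
}

type raw struct {
	EventID uint64
	Body    []byte
}

type stager interface {
	PutRaw(r *raw) error
}

// store is a hypothetical Env that can begin transactions.
type store struct{ wal [][]byte }

func (s *store) BeginTx(readonly bool) (tx, error) { return &walTx{store: s}, nil }

// walTx is a hypothetical transaction that buffers staged RAWs and only
// appends them to the shared log on Commit.
type walTx struct {
	store  *store
	staged [][]byte
}

func (t *walTx) PutRaw(r *raw) error {
	// Body is transient, so keep a copy rather than the caller's slice.
	t.staged = append(t.staged, append([]byte(nil), r.Body...))
	return nil
}

func (t *walTx) Commit() error {
	t.store.wal = append(t.store.wal, t.staged...)
	return nil
}

func (t *walTx) Rollback() error {
	t.staged = nil
	return nil
}

var errBoom = errors.New("boom")

// process begins a transaction, stages the RAW if the Tx supports it, runs
// handle, and then commits or rolls back depending on the outcome.
func process(env transactional, r *raw, handle func() error) error {
	t, err := env.BeginTx(false)
	if err != nil {
		return err
	}
	if s, ok := t.(stager); ok { // optional capability, detected dynamically
		if err := s.PutRaw(r); err != nil {
			_ = t.Rollback()
			return err
		}
	}
	if err := handle(); err != nil {
		_ = t.Rollback()
		return err
	}
	return t.Commit()
}

func main() {
	s := &store{}
	_ = process(s, &raw{EventID: 1, Body: []byte(`{"Login":"john"}`)}, func() error { return nil })
	_ = process(s, &raw{EventID: 2, Body: []byte(`{"Login":"bob"}`)}, func() error { return errBoom })
	fmt.Println("committed RAWs:", len(s.wal))
}
```

Only the first event reaches the log here; the second one is staged and then discarded by Rollback.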

Telemetry and metrics

You can collect metrics through the middleware:

  • event handling time,
  • status (success/failure),
  • number of attempts (when using retry),
  • number of rollbacks (see below).

Example of simple metrics:

type MetricsSink interface {
    ObserveDuration(eventID any, d time.Duration, success bool)
    ObserveRetry(eventID any, attempt int)
}

func Telemetry[T any, ID any](sink MetricsSink) ebus.EventMiddleware[T, ID] {
    return func(next ebus.EventHandler[T, ID]) ebus.EventHandler[T, ID] {
        return func(env T, evt *ebus.Event[T, ID]) error {
            start := time.Now()
            err := next(env, evt)
            sink.ObserveDuration(evt.ID, time.Since(start), err == nil)
            return err
        }
    }
}

You can measure the number of attempts by writing your own variant of Retry/RetryWithPolicy that calls sink.ObserveRetry(evt.ID, attempt).

The easiest way to count the number of rollbacks is in the hook where the rollback is called (e.g. by modifying RollbackRangeUnsafe or TxDispatcher – optional integration).

Telemetry – ready-made plugins

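  • Basic timing (the same MetricsSink and Telemetry as above, written as it would appear inside the ebus package, hence without the ebus. qualifier):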
type MetricsSink interface {
    ObserveDuration(eventID any, d time.Duration, success bool)
    ObserveRetry(eventID any, attempt int)
}

func Telemetry[T any, ID any](sink MetricsSink) EventMiddleware[T, ID] {
    return func(next EventHandler[T, ID]) EventHandler[T, ID] {
        return func(env T, evt *Event[T, ID]) error {
            start := time.Now()
            err := next(env, evt)
            sink.ObserveDuration(evt.ID, time.Since(start), err == nil)
            return err
        }
    }
}
  • Retries with metrics:
func RetryWithMetrics[T, ID any](retries int, delay time.Duration, sink MetricsSink) EventMiddleware[T, ID] {
    return func(next EventHandler[T, ID]) EventHandler[T, ID] {
        return func(env T, evt *Event[T, ID]) error {
            var err error
            for attempt := 0; attempt < retries; attempt++ {
                if sink != nil { sink.ObserveRetry(evt.ID, attempt) }
                if err = next(env, evt); err == nil {
                    return nil
                }
                if attempt < retries-1 {
                    time.Sleep(delay)
                }
            }
            return err
        }
    }
}
  • Rollback count: if you want the number of rollbacks as a metric, the easiest way is to add a hook in RollbackRangeUnsafe (e.g. call an optional global callback, or a sink taken from Env). A sketch of the idea (requires a small change in the library code):
var onRollbackCount func(n int)

func SetRollbackObserver(cb func(n int)) { onRollbackCount = cb }

func RollbackRangeUnsafe[T, ID any](env T, ev *Event[T, ID], from, to int) {
    if onRollbackCount != nil { onRollbackCount(to-from) }
    for i := to - 1; i >= from; i-- {
        ev.Payloads[i].Rollback(env)
    }
}

🧪 Testing

Run all tests:

go test ./...

The suite covers error handling, transactions, and rollback behavior.


⚖️ License

This project is licensed under the terms of the MIT license. See LICENSE.


👤 About the Author

Created by Marcin Maluszczak (https://registro.pl).
Feel free to reach out or follow me on GitHub.

💬 Contributing

Contributions are welcome. Feel free to open issues or pull requests.

Documentation

Overview

Package ebus provides a generic event bus with support for transactional dispatching, middleware, and payload lifecycle (validate, handle, commit, rollback).

Author: Marcin Maluszczak

Constants

This section is empty.

Variables

var ErrEmptyPayloads = errors.New("empty payloads")

ErrEmptyPayloads indicates that an event contained no payloads to process.

var ErrPayloadIsNotCommand = errors.New("payload is not a command or result type mismatch")

ErrPayloadIsNotCommand indicates the payload does not implement the CommandEvent interface or does not match the expected result type.

Functions

func ApplyAllUnsafe added in v1.3.0

func ApplyAllUnsafe[T, ID any](env T, event *Event[T, ID], tx Tx) (handled int, err error)

ApplyAllUnsafe handles each payload in order and returns the number of successfully handled payloads along with the first error encountered. If tx implements Stager[ID], RAW payloads kept by RawKeeper are staged before Handle. Panic-safe: converts panics from Handle into an error and returns the count of completed payloads. Unsafe: assumes ValidateAll was run.

func Clone added in v1.3.0

func Clone(b []byte) []byte

func CommitAllUnsafe added in v1.3.0

func CommitAllUnsafe[T, ID any](env T, event *Event[T, ID])

CommitAllUnsafe calls Commit on all payloads after successful processing. Unsafe: assumes validation and handling have succeeded.

func Dispatcher

func Dispatcher[T, ID any](env T, event *Event[T, ID]) error

Dispatcher is the default non-transactional handler that validates, handles all payloads, rolls back on failure, and commits on success.

func RollbackRangeUnsafe added in v1.3.0

func RollbackRangeUnsafe[T, ID any](env T, ev *Event[T, ID], from, to int)

RollbackRangeUnsafe calls Rollback on payloads in reverse order from index (to-1) down to from. Unsafe: assumes bounds are valid and validation has already occurred.

func Run

func Run[T any, ID any, Res any](env T, bus *Bus[T, ID], payload Payload[T]) (Res, error)

Run publishes a single command payload and returns its result. The payload must implement CommandEvent[T, Res].

func RunAll added in v1.3.0

func RunAll[T any, ID any, Res any](env T, bus *Bus[T, ID], payloads []Payload[T]) (Res, error)

RunAll publishes multiple payloads and returns the result from the first one, which must implement CommandEvent[T, Res]. All payloads are processed together.

func ValidateAll added in v1.3.0

func ValidateAll[T, ID any](env T, event *Event[T, ID]) error

ValidateAll verifies that the event and all payloads pass validation. Returns ErrEmptyPayloads when the event has no payloads.

Types

type Bus

type Bus[T, ID any] struct {
	// contains filtered or unexported fields
}

Bus is the core event bus. It holds the ID generator and the composed middleware chain that ultimately dispatches events to a handler.

func New

func New[T, ID any](idGenHandler IDGenHandler[T, ID], handler EventHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

New constructs a Bus with a custom root handler and a middleware chain. Middlewares are composed outermost-first in the order they are provided.

func NewDefault

func NewDefault[T, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

NewDefault constructs a Bus that uses the default non-transactional Dispatcher. Middlewares are applied around the handler in the provided order.

func NewWithTx

func NewWithTx[T Transactional, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

NewWithTx constructs a Bus that runs dispatch inside a transaction created by env. If the begun Tx implements Stager[ID], RAW payloads are staged before Handle. See Transactional and Stager for details. Requirements:

  • T must implement Transactional (i.e., provide BeginTx to start a transaction)
  • The returned Tx may optionally implement Stager[ID]; if it does, RAW payloads (via RawKeeper[ID]) will be staged before handling. Stager is detected dynamically using a type assertion.

In short: T starts the transaction; if Tx can stage, the Bus will stage RAWs first, then run the Validate/Handle/Commit/Rollback lifecycle under the transaction.

func (*Bus[T, ID]) Apply

func (b *Bus[T, ID]) Apply(env T, event *Event[T, ID]) error

Apply dispatches the given event through the composed middleware chain and handler.

func (*Bus[T, ID]) Publish

func (b *Bus[T, ID]) Publish(env T, payload Payload[T]) error

Publish creates an Event with a single payload, sets Created to UTC, and applies it.

func (*Bus[T, ID]) PublishAll added in v1.3.0

func (b *Bus[T, ID]) PublishAll(env T, events []Payload[T]) error

PublishAll creates an Event with the given payloads, timestamps it in UTC, and applies it through the bus. Either all payloads are processed or none.

func (*Bus[T, ID]) PublishRaw added in v1.3.0

func (b *Bus[T, ID]) PublishRaw(
	env T,
	dec Decoder[T],
	pt PayloadType,
	body []byte,
) error

PublishRaw decodes a single RAW payload (type + bytes) using the provided Decoder, then publishes it as an event. Useful for ingesting serialized messages.

func (*Bus[T, ID]) PublishRaws added in v1.3.0

func (b *Bus[T, ID]) PublishRaws(
	env T,
	dec Decoder[T],
	raws ...Raw[ID],
) error

PublishRaws decodes all provided RAW payloads first. If any decode fails, no changes are applied. With a transactional bus, all payloads are processed within a single transaction.

type CommandEvent

type CommandEvent[T, Res any] interface {
	Payload[T]
	Result() Res
}

CommandEvent is a special Payload that produces a result value after execution.

type CommitListener

type CommitListener[T any] func(env T, payload Payload[T])

CommitListener is invoked after a payload has been successfully committed. It is called once per committed payload of the subscribed type.

type ContextCarrier added in v1.3.0

type ContextCarrier interface {
	Context() context.Context
}

ContextCarrier can be implemented by environment types to expose a context.Context, enabling middleware to honor cancellation and deadlines. Context-respecting middleware should use ContextCarrier.

Example Env T:

type Env struct {
	ctx context.Context
	// ...other fields and implementations (Transactional, etc.)
}

func (e Env) Context() context.Context { return e.ctx }

Example of detection inside middleware:

ctx := context.Background()
if c, ok := any(env).(ContextCarrier); ok && c.Context() != nil {
	ctx = c.Context()
}

type Decoder added in v1.3.0

type Decoder[T any] interface {
	Decode(pt PayloadType, raw []byte) (Payload[T], error)
}

Decoder converts a (payload type, raw bytes) pair into a concrete Payload[T]. Implementations should return an error for unknown types or malformed data.

type Event

type Event[T, ID any] struct {
	ID       ID           // Unique identifier of the event
	Created  time.Time    // Timestamp of event creation
	Payloads []Payload[T] // A list of payloads included in the event
}

Event groups multiple payloads to be processed together (atomically when using a transactional dispatcher). Created is set to UTC time at publish.

type EventHandler

type EventHandler[T, ID any] func(env T, event *Event[T, ID]) error

EventHandler processes an Event in a given environment T. Returning a non-nil error fails the entire event (and triggers rollback where applicable).

func TxDispatcher

func TxDispatcher[T Transactional, ID any]() EventHandler[T, ID]

TxDispatcher returns an EventHandler that validates payloads, begins a transaction, optionally stages RAWs if the Tx implements Stager[ID], handles payloads, commits on success, or rolls back (with per-payload Rollback) on failure or panic.

type EventHooks added in v1.1.0

type EventHooks[T, ID any] struct {
	OnBeforeHandle func(env T, event *Event[T, ID])
	OnAfterCommit  func(env T, event *Event[T, ID])
}

EventHooks provides observational callbacks for event processing lifecycle: OnBeforeHandle is called before handling, OnAfterCommit after a successful commit.

type EventMiddleware

type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]

EventMiddleware decorates an EventHandler with cross-cutting behavior such as logging, metrics, retries or panic recovery.

func LoggingByID added in v1.3.0

func LoggingByID[T any, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]

LoggingByID returns logging middleware that identifies payload types by their numeric ID.

func LoggingByTypeName added in v1.1.0

func LoggingByTypeName[T any, ID any](payloadTypeNameHandler PayloadTypeNameHandler, logf func(format string, args ...any)) EventMiddleware[T, ID]

LoggingByTypeName returns logging middleware that uses the provided type-name resolver to print human-friendly payload type names.

func Recovery added in v1.3.0

func Recovery[T, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]

Recovery wraps the handler and converts panics into errors, optionally logging the panic using the provided formatter.
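
A panic-to-error wrapper of this kind is usually built from a deferred recover that assigns to a named return value. The sketch below shows that technique with simplified local types (`handler`, `middleware`, `recovery`); the error text and the tolerance for a nil logger are assumptions of this example, not documented behavior of ebus.

```go
package main

import "fmt"

// handler and middleware are simplified local stand-ins for EventHandler and
// EventMiddleware.
type handler func(event string) error
type middleware func(next handler) handler

// recovery converts a panic raised by next into an ordinary error.
// logf may be nil, in which case nothing is logged.
func recovery(logf func(format string, args ...any)) middleware {
	return func(next handler) handler {
		return func(event string) (err error) {
			defer func() {
				if r := recover(); r != nil {
					if logf != nil {
						logf("panic while handling %q: %v", event, r)
					}
					// err is a named result, so this assignment becomes the
					// value returned to the caller.
					err = fmt.Errorf("panic: %v", r)
				}
			}()
			return next(event)
		}
	}
}

func main() {
	risky := func(event string) error {
		var m map[string]int
		m[event] = 1 // writing to a nil map panics
		return nil
	}
	err := recovery(nil)(risky)("evt")
	fmt.Println("recovered:", err != nil)
}
```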

func Retry added in v1.1.0

func Retry[T, ID any](retries int, delay time.Duration) EventMiddleware[T, ID]

Retry retries dispatch up to retries times with a fixed delay. This variant does not honor context cancellation; prefer RetryWithContext when applicable.

func RetryWithContext added in v1.3.0

func RetryWithContext[T any, ID any](retries int, delay time.Duration) EventMiddleware[T, ID]

RetryWithContext retries dispatch up to retries times with a fixed delay. If env exposes a context (via a Context() context.Context method), cancellation and deadlines are honored between attempts. Context-respecting middleware should use ContextCarrier.

func RetryWithFallback added in v1.1.0

func RetryWithFallback[T, ID any](policy RetryPolicy, fallback RetryFallbackHandler[T, ID]) EventMiddleware[T, ID]

RetryWithFallback retries using the given policy and invokes fallback with the final error if all attempts fail.

func RetryWithPolicy added in v1.1.0

func RetryWithPolicy[T, ID any](policy RetryPolicy) EventMiddleware[T, ID]

RetryWithPolicy retries event dispatch based on a custom retry policy. Allows for conditional retry logic and custom backoff strategies.

func WithEventHooks added in v1.1.0

func WithEventHooks[T, ID any](hooks EventHooks[T, ID]) EventMiddleware[T, ID]

WithEventHooks injects observational callbacks that run before handling and after a successful commit. Hooks must be side-effect free.

type ExpoJitterPolicy added in v1.3.0

type ExpoJitterPolicy struct {
	Base   time.Duration // e.g. 10 * time.Millisecond
	Factor float64       // e.g. 2.0 (2^attempt)
	Max    time.Duration // maximum delay
	Rand   *rand.Rand    // optional; if nil, the global source is used
}

ExpoJitterPolicy is a retry policy with exponential backoff and jitter. Without jitter, multiple instances retry at the same intervals (thundering herd), which can overload the system in successive waves. Jitter randomizes the delay within a certain range, spreading the load.

Example:

policy := &ExpoJitterPolicy{Base: 10*time.Millisecond, Factor: 2, Max: 500*time.Millisecond}
mw := RetryWithPolicy[testCtx, int](policy)
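
The exact formula used by Backoff is not documented here, so the following is only a sketch of one common variant ("full jitter": a random delay between zero and a capped exponential ceiling). `expoJitter`, `ceiling`, `MaxAttempts`, and `newDemoPolicy` are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// expoJitter is a hypothetical policy with the same fields as ExpoJitterPolicy
// plus an attempt limit, which the documented struct does not have.
type expoJitter struct {
	Base        time.Duration
	Factor      float64
	Max         time.Duration
	MaxAttempts int
	Rand        *rand.Rand
}

// ceiling returns Base * Factor^attempt, capped at Max.
func (p *expoJitter) ceiling(attempt int) time.Duration {
	d := float64(p.Base) * math.Pow(p.Factor, float64(attempt))
	if d > float64(p.Max) {
		return p.Max
	}
	return time.Duration(d)
}

// Backoff picks a random delay in [0, ceiling(attempt)].
func (p *expoJitter) Backoff(attempt int) time.Duration {
	c := int64(p.ceiling(attempt))
	if c <= 0 {
		return 0
	}
	if p.Rand != nil {
		return time.Duration(p.Rand.Int63n(c + 1))
	}
	return time.Duration(rand.Int63n(c + 1))
}

// ShouldRetry allows another attempt while the limit has not been reached.
func (p *expoJitter) ShouldRetry(attempt int, err error) bool {
	return err != nil && attempt < p.MaxAttempts
}

// newDemoPolicy uses a fixed seed so repeated runs draw the same delays.
func newDemoPolicy() *expoJitter {
	return &expoJitter{
		Base:        10 * time.Millisecond,
		Factor:      2,
		Max:         500 * time.Millisecond,
		MaxAttempts: 8,
		Rand:        rand.New(rand.NewSource(1)),
	}
}

func main() {
	p := newDemoPolicy()
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Printf("attempt %d: ceiling %v, delay %v\n", attempt, p.ceiling(attempt), p.Backoff(attempt))
	}
}
```

With these settings the ceiling doubles from 10ms until it would exceed 500ms, after which it stays at the cap.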

func (*ExpoJitterPolicy) Backoff added in v1.3.0

func (p *ExpoJitterPolicy) Backoff(attempt int) time.Duration

func (*ExpoJitterPolicy) ShouldRetry added in v1.3.0

func (p *ExpoJitterPolicy) ShouldRetry(attempt int, err error) bool

type ID8Byte

type ID8Byte [8]byte

ID8Byte is a fixed-size, big-endian 8-byte identifier suitable for compact, sortable event IDs.

type IDGenHandler

type IDGenHandler[T any, ID any] func(env T) (ID, error)

IDGenHandler generates a unique identifier for a new event using the provided environment T. Errors prevent event creation.

func NewID8ByteHandler

func NewID8ByteHandler[T any](initialValue uint64) IDGenHandler[T, ID8Byte]

NewID8ByteHandler returns a thread-safe ID generator that yields sequential, big-endian 8-byte identifiers. The first call returns initialValue+1, and each subsequent call increments the counter. The counter is unique per handler instance.
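
A generator with these properties can be sketched with an atomic counter and big-endian encoding. The code below is an illustration under that assumption; `id8`, `newID8Gen`, and `less` are local names, and the library's internals may differ.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

// id8 is a local stand-in for ID8Byte.
type id8 [8]byte

// newID8Gen returns a generator whose first ID encodes initial+1. The counter
// lives in the closure, so every generator has its own sequence.
func newID8Gen(initial uint64) func() id8 {
	var counter atomic.Uint64
	counter.Store(initial)
	return func() id8 {
		var id id8
		binary.BigEndian.PutUint64(id[:], counter.Add(1))
		return id
	}
}

// less reports whether a sorts before b. With big-endian encoding a plain
// byte-wise comparison matches numeric order.
func less(a, b id8) bool {
	return string(a[:]) < string(b[:])
}

func main() {
	next := newID8Gen(255)
	a, b := next(), next()
	fmt.Println(a, b, less(a, b))
}
```

Big-endian layout is what makes the IDs sortable as raw bytes, which is convenient for keys in ordered stores.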

type JSONDecoder added in v1.3.0

type JSONDecoder[T any] struct {
	Registry[T]
}

JSONDecoder decodes payloads from JSON using a type Registry. Unknown fields are disallowed to surface schema mismatches early.

func NewJSONDecoder added in v1.3.0

func NewJSONDecoder[T any]() *JSONDecoder[T]

NewJSONDecoder returns a JSONDecoder with an empty Registry. Register concrete payload factories on the embedded Registry before decoding.

func (*JSONDecoder[T]) Decode added in v1.3.0

func (d *JSONDecoder[T]) Decode(pt PayloadType, raw []byte) (Payload[T], error)

Decode implements Decoder by constructing a payload from the registered factory, disallowing unknown JSON fields, and decoding the provided bytes into it. Returns an error for unknown types or malformed JSON.
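
The decode path described above (look up a factory, then decode strictly) can be sketched with the standard library alone. `registry`, `decode`, `userCreate`, and `newDemoRegistry` are hypothetical stand-ins; only `json.Decoder.DisallowUnknownFields` is the real encoding/json API.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type payloadType int

// registry maps a payload type to a factory that returns a fresh target value.
type registry map[payloadType]func() any

// decode builds a value from the registered factory and fills it from JSON,
// rejecting fields that the target struct does not declare.
func (r registry) decode(pt payloadType, raw []byte) (any, error) {
	factory, ok := r[pt]
	if !ok {
		return nil, fmt.Errorf("unknown payload type %d", pt)
	}
	target := factory()
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields()
	if err := dec.Decode(target); err != nil {
		return nil, err
	}
	return target, nil
}

// userCreate is a hypothetical payload body.
type userCreate struct {
	Login string
	Age   int
}

const userCreateType payloadType = 1

func newDemoRegistry() registry {
	return registry{userCreateType: func() any { return &userCreate{} }}
}

func main() {
	reg := newDemoRegistry()

	p, err := reg.decode(userCreateType, []byte(`{"Login":"marcin","Age":82}`))
	fmt.Println(p, err)

	// "Nick" is not a field of userCreate, so strict decoding fails.
	_, err = reg.decode(userCreateType, []byte(`{"Login":"marcin","Nick":"m"}`))
	fmt.Println(err)
}
```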

type Payload

type Payload[T any] interface {
	Validate(env T) error
	Commit(env T)
	Handle(env T) error
	Rollback(env T)
	PayloadType() PayloadType
}

Payload defines the lifecycle of a unit of work processed by the bus. Validate should check invariants, Handle applies the change, Commit finalizes successful changes and Rollback reverts partial work on failure.

type PayloadType

type PayloadType int

PayloadType identifies a concrete payload kind. It is used for decoding, routing and subscription to payload-specific listeners.

type PayloadTypeNameHandler added in v1.1.0

type PayloadTypeNameHandler func(pt PayloadType) string

PayloadTypeNameHandler returns a human-friendly name for a given PayloadType, typically used by logging middleware.

type Raw added in v1.3.0

type Raw[ID any] struct {
	Type PayloadType
	// Body will disappear after the process is over, so if you want to save it somewhere,
	// Clone it, e.g. through the Clone function
	Body []byte
	Meta struct {
		EventID       ID
		CorrelationID [16]byte
		SchemaVer     uint16
		TimestampUnix int64
	}
}

Raw carries the original serialized payload and transport metadata as received before decoding. Body contains the payload bytes and is transient; Clone it if you need to persist it beyond processing. Meta is populated by the bus when publishing: EventID and TimestampUnix are set from the event; CorrelationID and SchemaVer can be used for routing and schema/versioning.

When a payload implements RawKeeper[ID], the bus injects a pointer to its Raw before Handle is invoked. If the active transaction implements Stager[ID], PutRaw is called to stage the RAW envelope before handling.

type RawKeeper added in v1.3.0

type RawKeeper[ID any] interface {
	// SetRaw sets the underlying RAW pointer. The bus calls this when publishing RAWs
	// to pass the original envelope to the payload.
	SetRaw(raw *Raw[ID])
	// Raw returns the stored RAW pointer, which may be nil if not published from RAW input.
	Raw() *Raw[ID]
}

RawKeeper can be implemented by payloads that want access to the incoming RAW representation. When publishing RAWs, the bus populates RawKeeper automatically. If you want to receive the payload in the form in which it arrived with the event, implement RawKeeper or embed RawPayload in your struct.

type RawPayload added in v1.3.0

type RawPayload[ID any] struct {
	// contains filtered or unexported fields
}

RawPayload is a helper that implements RawKeeper. Embed it into payload structs to receive the original RAW value when publishing RAWs.

func (*RawPayload[ID]) Raw added in v1.3.0

func (p *RawPayload[ID]) Raw() *Raw[ID]

Raw returns the stored RAW pointer, which may be nil if not published from RAW input.

func (*RawPayload[ID]) SetRaw added in v1.3.0

func (p *RawPayload[ID]) SetRaw(r *Raw[ID])

SetRaw sets the underlying RAW pointer. The bus calls this when publishing RAWs to pass the original envelope to the payload.

type Registry added in v1.3.0

type Registry[T any] struct {
	// contains filtered or unexported fields
}

Registry maps PayloadType to factory functions that construct zero-value payload instances for decoding. It also tracks registration origins to aid debugging.

func NewRegistry added in v1.3.0

func NewRegistry[T any]() Registry[T]

NewRegistry returns a new, empty Registry for mapping PayloadType to factories.

func (*Registry[T]) MustRegister added in v1.3.0

func (r *Registry[T]) MustRegister(payloadType PayloadType, factory func() Payload[T])

MustRegister registers a factory for the given payload type. It panics if the type was already registered. Use Register to get an error instead.

func (*Registry[T]) Register added in v1.3.0

func (r *Registry[T]) Register(payloadType PayloadType, factory func() Payload[T]) error

Register associates a payload type with a factory function used during decoding. Returns an error if the payload type is already registered.

type RetryFallbackHandler added in v1.1.0

type RetryFallbackHandler[T any, ID any] func(evt *Event[T, ID], err error)

RetryFallbackHandler receives the final error after retries are exhausted together with the event that failed to be processed.

type RetryPolicy added in v1.1.0

type RetryPolicy interface {
	ShouldRetry(attempt int, err error) bool
	Backoff(attempt int) time.Duration
}

RetryPolicy decides whether and when a failed dispatch is retried. RetryWithPolicy retries dispatch according to the provided RetryPolicy, sleeping according to Backoff between attempts and stopping when ShouldRetry returns false.
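
The README's dead-letter example refers to a `myPolicy` value without showing it. A fixed-delay policy along those lines might look like the sketch below. `fixedPolicy`, `retry`, `flaky`, and `errTransient` are hypothetical, and counting attempts from 1 is a choice made for this example, since the numbering convention is not stated here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryPolicy is a local copy of the documented interface.
type retryPolicy interface {
	ShouldRetry(attempt int, err error) bool
	Backoff(attempt int) time.Duration
}

// fixedPolicy is a hypothetical policy: at most max attempts, constant delay.
type fixedPolicy struct {
	max   int
	delay time.Duration
}

func (p *fixedPolicy) ShouldRetry(attempt int, err error) bool { return attempt < p.max }
func (p *fixedPolicy) Backoff(attempt int) time.Duration       { return p.delay }

// retry calls fn until it succeeds or the policy refuses another attempt.
// Counting attempts from 1 is a choice made for this sketch.
func retry(policy retryPolicy, fn func() error) (attempts int, err error) {
	for attempt := 1; ; attempt++ {
		if err = fn(); err == nil {
			return attempt, nil
		}
		if !policy.ShouldRetry(attempt, err) {
			return attempt, err
		}
		time.Sleep(policy.Backoff(attempt))
	}
}

var errTransient = errors.New("transient")

// flaky returns a function that fails the first n calls and then succeeds.
func flaky(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errTransient
		}
		return nil
	}
}

func main() {
	policy := &fixedPolicy{max: 3, delay: time.Millisecond}
	fmt.Println(retry(policy, flaky(2)))  // succeeds on the third call
	fmt.Println(retry(policy, flaky(10))) // gives up after three calls
}
```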

type Stager added in v1.3.0

type Stager[ID any] interface {
	PutRaw(raw *Raw[ID]) error
}

Stager can be implemented by a transaction to receive the RAW form of payloads before handling (e.g., for persistence, audit, or outbox purposes). Implement Stager on your Tx; note that the payload must additionally implement RawKeeper for its RAW to be staged.

type Subscribers

type Subscribers[T, ID any] struct {
	// contains filtered or unexported fields
}

Subscribers holds CommitListeners keyed by payload type. Subscribe registers a CommitListener for the payload's type; the listener is invoked after a successful commit for matching payloads.

Concurrency model:

  • Subscribe is intended to be called only during application startup (single-threaded), before any events are processed.
  • After startup, the set of listeners is treated as immutable; Notifier may be used concurrently without additional synchronization.
  • Do NOT call Subscribe at runtime.

Rationale: avoiding locks in the hot path (e.g., inside Tx dispatch) keeps latency low.

func NewSubscribers

func NewSubscribers[T, ID any]() *Subscribers[T, ID]

NewSubscribers creates an empty list of subscribers.

func (*Subscribers[T, ID]) Notifier

func (s *Subscribers[T, ID]) Notifier() EventMiddleware[T, ID]

Notifier returns an EventMiddleware that notifies all matching subscribers after commit.

func (*Subscribers[T, ID]) Subscribe

func (s *Subscribers[T, ID]) Subscribe(payload Payload[T], handler CommitListener[T])

Subscribe registers a commit listener for a specific payload.

type Transactional added in v1.3.0

type Transactional interface {
	BeginTx(readonly bool) (Tx, error)
}

Transactional is implemented by environments that can begin a transaction. The readonly flag can be used by implementations to optimize or enforce mode.

type Tx added in v1.3.0

type Tx interface {
	Rollback() error
	Commit() error
}

Tx represents a transactional boundary used by the transactional dispatcher. Commit finalizes changes; Rollback reverts them. If Tx also implements Stager, it receives the RAW form of each payload once the transaction is open and before Handle; note that the payload must additionally implement RawKeeper.
