ebus

package module
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 1, 2025 License: MIT Imports: 5 Imported by: 0

README

📦 ebus – A Generic, Lightweight Event Bus for Go

ebus is a highly extensible, type-safe event bus library for Go with full lifecycle support for events: validation, handling, rollback, commit, and optional transaction dispatch.

Designed to be clean, flexible, and generic using Go's type parameters.


✨ Features

  • ✅ Type-safe payloads using Go generics
  • 🔁 Transactional event dispatching with rollback support
  • 🧱 Middleware support (logging, tracing, metrics, etc.)
  • 🎯 Command pattern with result extraction
  • 🔔 Post-commit subscribers per payload type
  • 🧪 Fully unit tested

📦 Installation

go get github.com/malumar/ebus

🚀 Quick Example

bus := ebus.NewDefault(ebus.NewID8ByteHandler[AppCtx]())

err := bus.Publish(ctx, []ebus.Payload[AppCtx]{
    &UserData{Login: "john", Age: 42},
})

For more examples, see ebus_test.go.

With result:
res, err := ebus.Run[AppCtx, ebus.ID8Byte, int](ctx, bus, []ebus.Payload[AppCtx]{
    &EventWithResult{I: 42},
})

🧹 Payload Lifecycle (Interface & Flow)

Each payload must implement the following interface:

type Payload[T any] interface {
    Validate(ctx T) error   // called before dispatching
    Handle(ctx T) error     // core processing logic
    Commit(ctx T)           // finalize if everything succeeded
    Rollback(ctx T)         // undo if Handle failed
    PayloadType() PayloadType // type identifier used for logging/subscribers
}
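
The Quick Example uses AppCtx and UserData without showing their definitions. As a minimal sketch of what an implementation might look like, the block below fills them in; the context fields, the validation rules, the UserCreated identifier, and the inserted flag are all assumptions made for illustration. PayloadType and Payload are local copies of the documented declarations so the snippet compiles without the library; real code would use ebus.PayloadType and ebus.Payload.

```go
package main

import "errors"

// PayloadType and Payload mirror the declarations documented for ebus.
// They are repeated here only so the sketch compiles on its own.
type PayloadType int

type Payload[T any] interface {
	Validate(ctx T) error
	Handle(ctx T) error
	Commit(ctx T)
	Rollback(ctx T)
	PayloadType() PayloadType
}

// AppCtx is a hypothetical application context; its fields are assumptions.
type AppCtx struct {
	Users map[string]int // login -> age, stands in for real storage
}

// UserCreated is a hypothetical type identifier for UserData payloads.
const UserCreated PayloadType = 1

// UserData is a hypothetical payload that registers a user.
type UserData struct {
	Login    string
	Age      int
	inserted bool // set by Handle so Rollback only undoes its own write
}

// Compile-time check that *UserData satisfies the interface.
var _ Payload[AppCtx] = (*UserData)(nil)

// Validate rejects payloads that must never reach Handle.
func (u *UserData) Validate(ctx AppCtx) error {
	if u.Login == "" {
		return errors.New("login is required")
	}
	if u.Age < 0 {
		return errors.New("age must not be negative")
	}
	return nil
}

// Handle performs the actual change and remembers that it did so.
func (u *UserData) Handle(ctx AppCtx) error {
	if _, exists := ctx.Users[u.Login]; exists {
		return errors.New("user already exists")
	}
	ctx.Users[u.Login] = u.Age
	u.inserted = true
	return nil
}

// Commit has nothing left to finalize in this in-memory sketch.
func (u *UserData) Commit(ctx AppCtx) {}

// Rollback removes the user only if this payload inserted it.
func (u *UserData) Rollback(ctx AppCtx) {
	if u.inserted {
		delete(ctx.Users, u.Login)
		u.inserted = false
	}
}

func (u *UserData) PayloadType() PayloadType { return UserCreated }
```

Tracking inserted keeps Rollback safe to call even when Handle never changed anything, which matters if a dispatcher also rolls back the payload that failed.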

Lifecycle flow:

When you call Publish(...) or Run(...), each payload goes through the following steps:

  1. Validate – all payloads must pass before any handling starts
  2. Handle – called in order; if any fails, rollback begins
  3. Rollback – executed in reverse order for handled payloads
  4. Commit – only called if all payloads were successfully handled

This guarantees consistent event processing with rollback support in case of partial failures.
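
The four steps can be sketched as a plain function. This illustrates the documented flow and is not the library's dispatcher: dispatch and step are hypothetical names, and whether the payload whose Handle failed is itself rolled back is an assumption (this sketch rolls back only the payloads handled before it).

```go
package main

import "errors"

// Local copies of the documented declarations, so the sketch is standalone.
type PayloadType int

type Payload[T any] interface {
	Validate(ctx T) error
	Handle(ctx T) error
	Commit(ctx T)
	Rollback(ctx T)
	PayloadType() PayloadType
}

// dispatch is a hypothetical stand-in that follows the documented lifecycle.
func dispatch[T any](ctx T, payloads []Payload[T]) error {
	if len(payloads) == 0 {
		return errors.New("empty payloads")
	}
	// 1. Validate everything before any Handle runs.
	for _, p := range payloads {
		if err := p.Validate(ctx); err != nil {
			return err
		}
	}
	// 2. Handle in order.
	for i, p := range payloads {
		if err := p.Handle(ctx); err != nil {
			// 3. Roll back the already handled payloads in reverse order.
			for j := i - 1; j >= 0; j-- {
				payloads[j].Rollback(ctx)
			}
			return err
		}
	}
	// 4. Commit only after every Handle succeeded.
	for _, p := range payloads {
		p.Commit(ctx)
	}
	return nil
}

// step is a hypothetical payload that appends each lifecycle call to a log.
type step struct {
	name string
	fail bool // when true, Handle returns an error
}

func (s *step) Validate(log *[]string) error {
	*log = append(*log, "validate "+s.name)
	return nil
}

func (s *step) Handle(log *[]string) error {
	*log = append(*log, "handle "+s.name)
	if s.fail {
		return errors.New(s.name + " failed")
	}
	return nil
}

func (s *step) Commit(log *[]string)     { *log = append(*log, "commit "+s.name) }
func (s *step) Rollback(log *[]string)   { *log = append(*log, "rollback "+s.name) }
func (s *step) PayloadType() PayloadType { return 0 }
```

Dispatching steps a, b, and a failing c through this sketch records all three validations, then the three Handle calls, then rollback b followed by rollback a, and no commits.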


🧠 Command Support

Commands are payloads that produce results:

type CommandEvent[T, Res any] interface {
    Payload[T]
    Result() Res
}

Pass a command to Run(...) to publish it and get its result back in a single call.
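
The README refers to EventWithResult{I: 42} with an int result but does not show the type. The sketch below is one possible shape: the doubling logic, the AppCtx definition, and the resultOf helper are assumptions, and the interfaces are local copies of the documented ones. resultOf only illustrates the kind of type assertion needed to get from a Payload to its result; it is not taken from the library.

```go
package main

// Local copies of the documented declarations, so the sketch is standalone.
type PayloadType int

type Payload[T any] interface {
	Validate(ctx T) error
	Handle(ctx T) error
	Commit(ctx T)
	Rollback(ctx T)
	PayloadType() PayloadType
}

type CommandEvent[T, Res any] interface {
	Payload[T]
	Result() Res
}

// AppCtx is a hypothetical, empty application context.
type AppCtx struct{}

// EventWithResult is a hypothetical command; doubling I is an arbitrary
// computation chosen only to have something to return.
type EventWithResult struct {
	I      int
	result int
}

// Compile-time check that the command satisfies CommandEvent.
var _ CommandEvent[AppCtx, int] = (*EventWithResult)(nil)

func (e *EventWithResult) Validate(AppCtx) error    { return nil }
func (e *EventWithResult) Handle(AppCtx) error      { e.result = e.I * 2; return nil }
func (e *EventWithResult) Commit(AppCtx)            {}
func (e *EventWithResult) Rollback(AppCtx)          { e.result = 0 }
func (e *EventWithResult) PayloadType() PayloadType { return 2 }
func (e *EventWithResult) Result() int              { return e.result }

// resultOf is a hypothetical helper: it reports false when the payload is
// not a command or its result type does not match Res.
func resultOf[T, Res any](p Payload[T]) (Res, bool) {
	cmd, ok := p.(CommandEvent[T, Res])
	if !ok {
		var zero Res
		return zero, false
	}
	return cmd.Result(), true
}
```

Storing the result on the payload in Handle and clearing it in Rollback keeps Result() from exposing a value that belongs to a rolled-back event.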


🧵 Middleware

Middleware is applied per event dispatch:

  • LoggingById / LoggingByTypeName
    Logs event and payload information using either numeric or named payload types.

    bus := ebus.NewDefault(idGen,
      ebus.LoggingById[AppCtx, ebus.ID8Byte](log.Printf),
    )
    

    You can customize how payload types are named in logs:

    bus := ebus.NewDefault(idGen,
      ebus.LoggingByTypeName[AppCtx, ebus.ID8Byte](func(pt ebus.PayloadType) string {
          return fmt.Sprintf("payload-%d", pt) // or a custom mapping such as map[ebus.PayloadType]string
      }, log.Printf),
    )
    
  • Retry
    Automatically retries failed events. The example below retries event handling up to 3 times with a 100ms delay between attempts.

    bus := ebus.NewDefault(idGen,
      ebus.Retry[AppCtx, ebus.ID8Byte](3, 100*time.Millisecond),
    )
    
  • RetryWithPolicy
    Retries event dispatch using a custom retry policy. Useful for transient failures.

    Examples in ebus_test.go

  • RetryWithFallback
    Similar to RetryWithPolicy, but also sends failed events to a fallback handler (e.g. dead-letter queue).

    Examples in ebus_test.go

  • WithEventHooks
    Allows you to execute hooks before and after handling an event (for logging, tracing, etc.).

    Examples in ebus_test.go


🔔 Subscribers

You can register commit-time hooks:

subs := ebus.NewSubscribers[AppCtx, ebus.ID8Byte]()

subs.Subscribe(&UserData{}, func(ctx AppCtx, p ebus.Payload[AppCtx]) {
    log.Println("User created:", p.(*UserData).Login)
})

bus := ebus.NewDefault(idGen, subs.Notifier())

🧪 Testing

Run all tests:

go test ./...

The test suite covers error handling, transactions, and rollback behavior.


⚖️ License

This project is licensed under the terms of the MIT license. See LICENSE.


👤 About the Author

Created by Marcin Maluszczak (https://registro.pl).
Feel free to reach out or follow me on GitHub.

💬 Contributing

Contributions are welcome. Feel free to open issues or pull requests.

Documentation

Overview

Package ebus provides a generic event bus with support for transactional dispatching, middleware, and payload lifecycle (validate, handle, commit, rollback). Author: Marcin Maluszczak.

Variables

var ErrEmptyPayloads = errors.New("empty payloads")

Common errors

var ErrPayloadIsNotCommand = errors.New("payload is not command or not match type")

Functions

func Dispatcher

func Dispatcher[T, ID any](ctx T, event *Event[T, ID]) error

Dispatcher is the default non-transactional dispatcher.

func Run

func Run[T any, ID any, Res any](ctx T, bus *Bus[T, ID], payloads []Payload[T]) (Res, error)

Run publishes a command event and extracts its result.

Types

type Bus

type Bus[T, ID any] struct {
	// contains filtered or unexported fields
}

Bus represents the core event bus with middleware, payload dispatching, and handler registration.

func New

func New[T, ID any](idGenHandler IDGenHandler[T, ID], handler EventHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

New creates a new Bus with a given event handler and middleware chain.

func NewDefault

func NewDefault[T, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

NewDefault creates a Bus with the default dispatcher and optional middleware.

func NewWithTx

func NewWithTx[T any, ID any](idGenHandler IDGenHandler[T, ID], txHandler TxHandler[T], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

NewWithTx creates a Bus that wraps dispatch logic in a transaction using the provided TxHandler.

func (*Bus[T, ID]) Apply

func (b *Bus[T, ID]) Apply(ctx T, event *Event[T, ID]) error

Apply applies an event using the configured middleware chain.

func (*Bus[T, ID]) Publish

func (b *Bus[T, ID]) Publish(ctx T, events []Payload[T]) error

Publish creates a new Event, timestamped in UTC, and applies it to the bus.

type CommandEvent

type CommandEvent[T, Res any] interface {
	Payload[T]
	Result() Res
}

CommandEvent is a special Payload that produces a result value after execution.

type CommitListener

type CommitListener[T any] func(ctx T, payload Payload[T])

CommitListener is a function that is triggered after a payload has been committed.

type Event

type Event[T, ID any] struct {
	ID       ID           // Unique identifier of the event
	Created  time.Time    // Timestamp of event creation
	Payloads []Payload[T] // A list of payloads included in the event
}

Event represents a single event containing multiple payloads.

type EventHandler

type EventHandler[T, ID any] func(ctx T, event *Event[T, ID]) error

EventHandler is a generic function that handles an event with a given context.

func TxDispatcher

func TxDispatcher[T, ID any](inTx TxHandler[T]) EventHandler[T, ID]

TxDispatcher wraps the dispatch logic in a transactional context.

type EventHooks added in v1.1.0

type EventHooks[T, ID any] struct {
	OnBeforeHandle func(ctx T, event *Event[T, ID])
	OnAfterCommit  func(ctx T, event *Event[T, ID])
}

type EventMiddleware

type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]

EventMiddleware wraps an EventHandler to add cross-cutting behavior like logging or recovery.
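
Because a middleware is just a function from handler to handler, writing your own is straightforward. The sketch below shows a hypothetical recoverPanics middleware that is not part of ebus; the error format is an assumption. Event, EventHandler, and EventMiddleware are local copies of the documented declarations so the snippet compiles without the library.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented declarations, so the sketch is standalone.
type PayloadType int

type Payload[T any] interface {
	Validate(ctx T) error
	Handle(ctx T) error
	Commit(ctx T)
	Rollback(ctx T)
	PayloadType() PayloadType
}

type Event[T, ID any] struct {
	ID       ID
	Created  time.Time
	Payloads []Payload[T]
}

type EventHandler[T, ID any] func(ctx T, event *Event[T, ID]) error

type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]

// recoverPanics is a hypothetical middleware that converts a panic raised
// further down the chain into an ordinary error.
func recoverPanics[T, ID any]() EventMiddleware[T, ID] {
	return func(next EventHandler[T, ID]) EventHandler[T, ID] {
		return func(ctx T, event *Event[T, ID]) (err error) {
			defer func() {
				if r := recover(); r != nil {
					// The named return value lets the deferred func set the error.
					err = fmt.Errorf("event %v panicked: %v", event.ID, r)
				}
			}()
			return next(ctx, event)
		}
	}
}
```

With the real library the same function shape would presumably be passed as one of the middlewares arguments of New, NewDefault, or NewWithTx, with the type arguments written out explicitly.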

func LoggingById added in v1.1.0

func LoggingById[T any, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]

func LoggingByTypeName added in v1.1.0

func LoggingByTypeName[T any, ID any](payloadTypeNameHandler PayloadTypeNameHandler, logf func(format string, args ...any)) EventMiddleware[T, ID]

func Retry added in v1.1.0

func Retry[T, ID any](retries int, delay time.Duration) EventMiddleware[T, ID]

Retry is middleware that dispatches the event again when handling fails.

func RetryWithFallback added in v1.1.0

func RetryWithFallback[T, ID any](policy RetryPolicy, fallback RetryFallbackHandler[T, ID]) EventMiddleware[T, ID]

RetryWithFallback retries event dispatch using a policy and calls fallback on failure.

func RetryWithPolicy added in v1.1.0

func RetryWithPolicy[T, ID any](policy RetryPolicy) EventMiddleware[T, ID]

RetryWithPolicy retries event dispatch based on a custom retry policy. Allows for conditional retry logic and custom backoff strategies.

func WithEventHooks added in v1.1.0

func WithEventHooks[T, ID any](hooks EventHooks[T, ID]) EventMiddleware[T, ID]

WithEventHooks allows injecting callbacks before handling and after committing an event. Useful for logging, metrics, or tracing lifecycle stages.

type ID8Byte

type ID8Byte [8]byte

ID8Byte is a fixed-size 8-byte identifier.

type IDGenHandler

type IDGenHandler[T any, ID any] func(ctx T) (ID, error)

IDGenHandler is a generic function that creates the ID for a new event.

func NewID8ByteHandler

func NewID8ByteHandler[T any]() IDGenHandler[T, ID8Byte]

NewID8ByteHandler returns a thread-safe ID generator that produces sequential ID8Byte values. The counter is unique per handler instance.
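
To show what a generator with these properties could look like, here is a minimal sketch of a custom IDGenHandler. It is not the library's implementation: newSequentialIDs is a hypothetical name, and the big-endian encoding and the starting value of 1 are assumptions. ID8Byte and IDGenHandler are local copies of the documented declarations.

```go
package main

import (
	"encoding/binary"
	"sync/atomic"
)

// Local copies of the documented declarations, so the sketch is standalone.
type ID8Byte [8]byte

type IDGenHandler[T any, ID any] func(ctx T) (ID, error)

// newSequentialIDs is a hypothetical generator. Each returned handler owns
// its own counter, and the atomic increment makes concurrent calls safe.
func newSequentialIDs[T any]() IDGenHandler[T, ID8Byte] {
	var counter atomic.Uint64
	return func(ctx T) (ID8Byte, error) {
		var id ID8Byte
		// Big-endian keeps the byte-wise order of IDs equal to their numeric order.
		binary.BigEndian.PutUint64(id[:], counter.Add(1))
		return id, nil
	}
}
```

A handler like this can be supplied wherever an IDGenHandler[T, ID8Byte] is expected; a generator backed by a database sequence or a UUID library would follow the same signature with a different ID type.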

type Payload

type Payload[T any] interface {
	Validate(ctx T) error
	Commit(ctx T)
	Handle(ctx T) error
	Rollback(ctx T)
	PayloadType() PayloadType
}

Payload defines a lifecycle for event actions including validation, handling, rollback, and commit.

type PayloadType

type PayloadType int

type PayloadTypeNameHandler added in v1.1.0

type PayloadTypeNameHandler func(pt PayloadType) string

type RetryFallbackHandler added in v1.1.0

type RetryFallbackHandler[T any, ID any] func(evt *Event[T, ID], err error)

RetryFallbackHandler defines a function that receives failed events.

type RetryPolicy added in v1.1.0

type RetryPolicy interface {
	ShouldRetry(attempt int, err error) bool
	Backoff(attempt int) time.Duration
}

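RetryPolicy decides whether a failed dispatch is retried (ShouldRetry) and how long to wait before the next attempt (Backoff).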

type Subscribers

type Subscribers[T, ID any] struct {
	// contains filtered or unexported fields
}

Subscribers manages commit listeners for specific payload types.

func NewSubscribers

func NewSubscribers[T, ID any]() *Subscribers[T, ID]

NewSubscribers creates an empty list of subscribers.

func (*Subscribers[T, ID]) Notifier

func (s *Subscribers[T, ID]) Notifier() EventMiddleware[T, ID]

Notifier returns an EventMiddleware that notifies all matching subscribers after commit.

func (*Subscribers[T, ID]) Subscribe

func (s *Subscribers[T, ID]) Subscribe(payload Payload[T], handler CommitListener[T])

Subscribe registers a commit listener for a specific payload.

type TxHandler

type TxHandler[T any] func(ctx T, handler func() error) error

TxHandler defines a function that executes a given handler within a transaction.
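
The shape of a TxHandler is easiest to see with a toy store. The sketch below is a hypothetical in-memory example (Store, inMemoryTx, and errWriteFailed are not part of ebus): writes are buffered while the handler runs and become visible only if it returns nil. TxHandler is a local copy of the documented type.

```go
package main

import "errors"

// Local copy of the documented type, so the sketch is standalone.
type TxHandler[T any] func(ctx T, handler func() error) error

// errWriteFailed is a hypothetical error used to trigger a rollback.
var errWriteFailed = errors.New("write failed")

// Store is a hypothetical in-memory stand-in for a database connection.
type Store struct {
	Committed []string
	pending   []string
}

// Write buffers a value until the surrounding transaction commits.
func (s *Store) Write(v string) { s.pending = append(s.pending, v) }

// inMemoryTx is a hypothetical TxHandler: begin, run the handler, then
// commit on success or discard the buffered writes on error.
func inMemoryTx(s *Store, handler func() error) error {
	s.pending = nil // begin
	if err := handler(); err != nil {
		s.pending = nil // roll back
		return err
	}
	s.Committed = append(s.Committed, s.pending...) // commit
	s.pending = nil
	return nil
}

// Compile-time check that inMemoryTx has the TxHandler shape.
var _ TxHandler[*Store] = inMemoryTx
```

With database/sql the same shape would wrap BeginTx, Commit, and Rollback around the handler call.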
