ebus

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2025 License: MIT Imports: 4 Imported by: 0

README

📦 ebus – A Generic, Lightweight Event Bus for Go

ebus is a highly extensible, type-safe event bus library for Go with full lifecycle support for events: validation, handling, rollback, commit, and optional transaction dispatch.

Designed to be clean, flexible, and generic using Go's type parameters.


✨ Features

  • ✅ Type-safe payloads using Go generics
  • 🔁 Transactional event dispatching with rollback support
  • 🧱 Middleware support (logging, tracing, metrics, etc.)
  • 🎯 Command pattern with result extraction
  • 🔔 Post-commit subscribers per payload type
  • 🧪 Fully unit tested

📦 Installation

go get github.com/malumar/ebus

Replace your-username with your actual GitHub handle or module path.


🚀 Quick Example

bus := ebus.NewDefault(ebus.NewID8ByteHandler[AppCtx]())

err := bus.Publish(ctx, []ebus.Payload[AppCtx]{
    &UserData{Login: "john", Age: 42},
})

For more exaples look into ebus_test.go

With result:
res, err := ebus.Run[AppCtx, ebus.ID8Byte, int](ctx, bus, []ebus.Payload[AppCtx]{
    &EventWithResult{I: 42},
})

🧩 Payload Lifecycle

Each payload must implement the following interface:

type Payload[T any] interface {
    Validate(ctx T) error
    Handle(ctx T) error
    Commit(ctx T)
    Rollback(ctx T)
    PayloadType() PayloadType
}

🧠 Command Support

Commands are payloads that produce results:

type CommandEvent[T, Res any] interface {
    Payload[T]
    Result() Res
}

Used with Run(...) to publish and return a result atomically.


🧵 Middleware

Middleware is applied per event dispatch:

bus := ebus.NewDefault(idGen,
    myCustomLogger,
    myTracingLayer,
)

🔔 Subscribers

You can register commit-time hooks:

subs := ebus.NewSubscribers[AppCtx, ebus.ID8Byte]()

subs.Subscribe(&UserData{}, func(ctx AppCtx, p ebus.Payload[AppCtx]) {
    log.Println("User created:", p.(*UserData).Login)
})

bus := ebus.NewDefault(idGen, subs.Notifier())

🧪 Testing

Run all tests:

go test ./...

Your suite covers error handling, transactions, and rollback behavior.


⚖️ License

This project is licensed under the terms of the MIT license. See LICENSE.


👤 About the Author

Created by Marcin Maluszczak (https://registro.pl).
Feel free to reach out or follow me on GitHub.

💬 Contributing

Contributions are welcome. Feel free to open issues or pull requests.

Documentation

Overview

Package ebus provides a generic event bus with support for transactional dispatching, middleware, and payload lifecycle (validate, handle, commit, rollback).

Index

Constants

This section is empty.

Variables

View Source
var ErrEmptyPayloads = errors.New("empty payloads")

Common errors

View Source
var ErrPayloadIsNotCommand = errors.New("payload is not command or not match type")

Functions

func Dispatcher

func Dispatcher[T, ID any](ctx T, event *Event[T, ID]) error

Dispatcher is the default non-transactional dispatcher.

func Run

func Run[T any, ID any, Res any](ctx T, bus *Bus[T, ID], payloads []Payload[T]) (Res, error)

Run publishes a command event and extracts its result.

Types

type Bus

type Bus[T, ID any] struct {
	// contains filtered or unexported fields
}

Bus represents the core event bus with middleware, payload dispatching, and handler registration.

func New

func New[T, ID any](idGenHandler IDGenHandler[T, ID], handler EventHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

New creates a new Bus with a given event handler and middleware chain.

func NewDefault

func NewDefault[T, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

NewDefault creates a Bus with the default dispatcher and optional middleware.

func NewWithTx

func NewWithTx[T any, ID any](idGenHandler IDGenHandler[T, ID], txHandler TxHandler[T], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]

NewWithTx creates a Bus that wraps dispatch logic in a transaction using the provided TxHandler.

func (*Bus[T, ID]) Apply

func (b *Bus[T, ID]) Apply(ctx T, event *Event[T, ID]) error

Apply applies an event using the configured middleware chain.

func (*Bus[T, ID]) Publish

func (b *Bus[T, ID]) Publish(ctx T, events []Payload[T]) error

Publish creates a new Event and applies it to the bus with Timestamp in UTC

type CommandEvent

type CommandEvent[T, Res any] interface {
	Payload[T]
	Result() Res
}

CommandEvent is a special Payload that produces a result value after execution.

type CommitListener

type CommitListener[T any] func(ctx T, payload Payload[T])

CommitListener is a function that is triggered after a payload has been committed.

type Event

type Event[T, ID any] struct {
	ID       ID           // Unique identifier of the event
	Created  time.Time    // Timestamp of event creation
	Payloads []Payload[T] // A list of payloads included in the event
}

Event represents a single event containing multiple payloads.

type EventHandler

type EventHandler[T, ID any] func(ctx T, event *Event[T, ID]) error

EventHandler is a generic function that handles an event with a given context.

func TxDispatcher

func TxDispatcher[T, ID any](inTx TxHandler[T]) EventHandler[T, ID]

TxDispatcher wraps the dispatch logic in a transactional context.

type EventMiddleware

type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]

EventMiddleware wraps an EventHandler to add cross-cutting behavior like logging or recovery.

type ID8Byte

type ID8Byte [8]byte

ID8Byte is a fixed-size 8-byte identifier.

type IDGenHandler

type IDGenHandler[T any, ID any] func(ctx T) (ID, error)

IDGenHandler is generic function that handles creation ID for new event

func NewID8ByteHandler

func NewID8ByteHandler[T any]() IDGenHandler[T, ID8Byte]

NewID8ByteHandler returns a thread-safe ID generator that produces sequential ID8Byte values. The counter is unique per handler instance.

type Payload

type Payload[T any] interface {
	Validate(ctx T) error
	Commit(ctx T)
	Handle(ctx T) error
	Rollback(ctx T)
	PayloadType() PayloadType
}

Payload defines a lifecycle for event actions including validation, handling, rollback, and commit.

type PayloadType

type PayloadType int

type Subscribers

type Subscribers[T, ID any] struct {
	// contains filtered or unexported fields
}

Subscribers manages commit listeners for specific payload types.

func NewSubscribers

func NewSubscribers[T, ID any]() *Subscribers[T, ID]

NewSubscribers creates an empty list of subscribers.

func (*Subscribers[T, ID]) Notifier

func (s *Subscribers[T, ID]) Notifier() EventMiddleware[T, ID]

Notifier returns an EventMiddleware that notifies all matching subscribers after commit.

func (*Subscribers[T, ID]) Subscribe

func (s *Subscribers[T, ID]) Subscribe(payload Payload[T], handler CommitListener[T])

Subscribe registers a commit listener for a specific payload.

type TxHandler

type TxHandler[T any] func(ctx T, handler func() error) error

TxHandler defines a function that executes a given handler within a transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL