Documentation ¶
Overview ¶
Package ebus provides a generic event bus with support for transactional dispatching, middleware, and a payload lifecycle (validate, handle, commit, rollback).
Author: Marcin Maluszczak
Index ¶
- Variables
- func Dispatcher[T, ID any](ctx T, event *Event[T, ID]) error
- func Run[T any, ID any, Res any](ctx T, bus *Bus[T, ID], payloads []Payload[T]) (Res, error)
- type Bus
- func New[T, ID any](idGenHandler IDGenHandler[T, ID], handler EventHandler[T, ID], ...) *Bus[T, ID]
- func NewDefault[T, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
- func NewWithTx[T any, ID any](idGenHandler IDGenHandler[T, ID], txHandler TxHandler[T], ...) *Bus[T, ID]
- type CommandEvent
- type CommitListener
- type Event
- type EventHandler
- type EventHooks
- type EventMiddleware
- func LoggingById[T any, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]
- func LoggingByTypeName[T any, ID any](payloadTypeNameHandler PayloadTypeNameHandler, ...) EventMiddleware[T, ID]
- func Retry[T, ID any](retries int, delay time.Duration) EventMiddleware[T, ID]
- func RetryWithFallback[T, ID any](policy RetryPolicy, fallback RetryFallbackHandler[T, ID]) EventMiddleware[T, ID]
- func RetryWithPolicy[T, ID any](policy RetryPolicy) EventMiddleware[T, ID]
- func WithEventHooks[T, ID any](hooks EventHooks[T, ID]) EventMiddleware[T, ID]
- type ID8Byte
- type IDGenHandler
- type Payload
- type PayloadType
- type PayloadTypeNameHandler
- type RetryFallbackHandler
- type RetryPolicy
- type Subscribers
- type TxHandler
Constants ¶
This section is empty.
Variables ¶
var ErrEmptyPayloads = errors.New("empty payloads")
Common errors
var ErrPayloadIsNotCommand = errors.New("payload is not command or not match type")
Functions ¶
func Dispatcher ¶
func Dispatcher[T, ID any](ctx T, event *Event[T, ID]) error
Dispatcher is the default non-transactional dispatcher.
Types ¶
type Bus ¶
type Bus[T, ID any] struct {
	// contains filtered or unexported fields
}
Bus represents the core event bus with middleware, payload dispatching, and handler registration.
func New ¶
func New[T, ID any](idGenHandler IDGenHandler[T, ID], handler EventHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
New creates a new Bus with a given event handler and middleware chain.
func NewDefault ¶
func NewDefault[T, ID any](idGenHandler IDGenHandler[T, ID], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
NewDefault creates a Bus with the default dispatcher and optional middleware.
func NewWithTx ¶
func NewWithTx[T any, ID any](idGenHandler IDGenHandler[T, ID], txHandler TxHandler[T], middlewares ...EventMiddleware[T, ID]) *Bus[T, ID]
NewWithTx creates a Bus that wraps dispatch logic in a transaction using the provided TxHandler.
type CommandEvent ¶
CommandEvent is a special Payload that produces a result value after execution.
type CommitListener ¶
CommitListener is a function that is triggered after a payload has been committed.
type Event ¶
type Event[T, ID any] struct {
	ID       ID           // Unique identifier of the event
	Created  time.Time    // Timestamp of event creation
	Payloads []Payload[T] // A list of payloads included in the event
}
Event represents a single event containing multiple payloads.
type EventHandler ¶
EventHandler is a generic function that handles an event with a given context.
func TxDispatcher ¶
func TxDispatcher[T, ID any](inTx TxHandler[T]) EventHandler[T, ID]
TxDispatcher wraps the dispatch logic in a transactional context.
type EventHooks ¶ added in v1.1.0
type EventMiddleware ¶
type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]
EventMiddleware wraps an EventHandler to add cross-cutting behavior like logging or recovery.
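The wrapping pattern can be sketched without importing the package. The types below are local stand-ins that mirror the documented shapes. The EventHandler signature is an assumption inferred from Dispatcher's signature, and Tracing and Chain are hypothetical helpers, not part of ebus. The order in which ebus applies its own middleware list is not stated here, so the outermost-first ordering is also an assumption.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented shapes; the real ebus types may differ.
type Event[T, ID any] struct {
	ID ID
}

// EventHandler's signature is assumed from Dispatcher's documented signature.
type EventHandler[T, ID any] func(ctx T, event *Event[T, ID]) error

// EventMiddleware matches the definition shown in the documentation.
type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]

// Tracing is a hypothetical middleware that records entry and exit around next.
func Tracing[T, ID any](name string, log *[]string) EventMiddleware[T, ID] {
	return func(next EventHandler[T, ID]) EventHandler[T, ID] {
		return func(ctx T, event *Event[T, ID]) error {
			*log = append(*log, name+":before")
			err := next(ctx, event)
			*log = append(*log, name+":after")
			return err
		}
	}
}

// Chain is a hypothetical helper: the first middleware listed becomes the
// outermost wrapper around h.
func Chain[T, ID any](h EventHandler[T, ID], mws ...EventMiddleware[T, ID]) EventHandler[T, ID] {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// traceOrder runs a two-middleware chain and returns the recorded call order.
func traceOrder() []string {
	var log []string
	core := func(ctx struct{}, e *Event[struct{}, int]) error {
		log = append(log, "handler")
		return nil
	}
	h := Chain[struct{}, int](core,
		Tracing[struct{}, int]("outer", &log),
		Tracing[struct{}, int]("inner", &log))
	_ = h(struct{}{}, &Event[struct{}, int]{ID: 1})
	return log
}

func main() {
	fmt.Println(traceOrder())
}
```

Each middleware sees the event before and after the handler it wraps, which is what makes the pattern suitable for logging or recovery.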
func LoggingById ¶ added in v1.1.0
func LoggingById[T any, ID any](logf func(format string, args ...any)) EventMiddleware[T, ID]
func LoggingByTypeName ¶ added in v1.1.0
func LoggingByTypeName[T any, ID any](payloadTypeNameHandler PayloadTypeNameHandler, logf func(format string, args ...any)) EventMiddleware[T, ID]
func Retry ¶ added in v1.1.0
func Retry[T, ID any](retries int, delay time.Duration) EventMiddleware[T, ID]
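Retry is a middleware that re-dispatches the event when handling fails. The retries argument bounds the number of retry attempts and delay is the wait between them.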
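A fixed-delay retry middleware of this shape might look like the sketch below. It is not the ebus implementation: the types are local stand-ins, the EventHandler signature is assumed from Dispatcher, and retrySketch treats retries as the number of additional attempts after the first call, which the documentation does not confirm.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins; the EventHandler signature is an assumption.
type Event[T, ID any] struct{ ID ID }
type EventHandler[T, ID any] func(ctx T, event *Event[T, ID]) error
type EventMiddleware[T, ID any] func(next EventHandler[T, ID]) EventHandler[T, ID]

// retrySketch calls next once and then up to retries more times while it
// keeps failing, sleeping delay between attempts. It returns the last error.
func retrySketch[T, ID any](retries int, delay time.Duration) EventMiddleware[T, ID] {
	return func(next EventHandler[T, ID]) EventHandler[T, ID] {
		return func(ctx T, event *Event[T, ID]) error {
			var err error
			for attempt := 0; attempt <= retries; attempt++ {
				if err = next(ctx, event); err == nil {
					return nil
				}
				if attempt < retries {
					time.Sleep(delay)
				}
			}
			return err
		}
	}
}

// countCalls wraps a handler that fails its first `failures` calls and
// reports how many times it was invoked and the final error.
func countCalls(failures, retries int) (int, error) {
	calls := 0
	flaky := func(ctx struct{}, e *Event[struct{}, int]) error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	}
	h := retrySketch[struct{}, int](retries, time.Millisecond)(flaky)
	err := h(struct{}{}, &Event[struct{}, int]{ID: 1})
	return calls, err
}

func main() {
	calls, err := countCalls(2, 3)
	fmt.Println(calls, err)
}
```

Because the whole event is dispatched again on each attempt, payload handlers wrapped this way should tolerate being run more than once.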
func RetryWithFallback ¶ added in v1.1.0
func RetryWithFallback[T, ID any](policy RetryPolicy, fallback RetryFallbackHandler[T, ID]) EventMiddleware[T, ID]
RetryWithFallback retries event dispatch using a policy and calls fallback on failure.
func RetryWithPolicy ¶ added in v1.1.0
func RetryWithPolicy[T, ID any](policy RetryPolicy) EventMiddleware[T, ID]
RetryWithPolicy retries event dispatch based on a custom retry policy. Allows for conditional retry logic and custom backoff strategies.
func WithEventHooks ¶ added in v1.1.0
func WithEventHooks[T, ID any](hooks EventHooks[T, ID]) EventMiddleware[T, ID]
WithEventHooks allows injecting callbacks before handling and after committing an event. Useful for logging, metrics, or tracing lifecycle stages.
type IDGenHandler ¶
IDGenHandler is a generic function that creates the ID for a new event.
func NewID8ByteHandler ¶
func NewID8ByteHandler[T any]() IDGenHandler[T, ID8Byte]
NewID8ByteHandler returns a thread-safe ID generator that produces sequential ID8Byte values. The counter is unique per handler instance.
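One way to get a thread-safe sequential generator with a per-instance counter is an atomic counter captured in a closure. The sketch below is illustrative only: id8 is a stand-in for ID8Byte (whose definition is not shown here), the big-endian encoding is an assumption, and the real IDGenHandler also takes type parameters that are omitted for brevity.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sync"
	"sync/atomic"
)

// id8 is a hypothetical stand-in for ID8Byte.
type id8 [8]byte

// newSeqGen returns a generator with its own counter, so two generators
// created separately do not share state.
func newSeqGen() func() id8 {
	var counter atomic.Uint64
	return func() id8 {
		var id id8
		// Add is atomic, so concurrent callers never receive the same value.
		binary.BigEndian.PutUint64(id[:], counter.Add(1))
		return id
	}
}

// uniqueCount calls one generator from several goroutines and returns how
// many distinct IDs were produced.
func uniqueCount(workers, perWorker int) int {
	gen := newSeqGen()
	var mu sync.Mutex
	seen := make(map[id8]struct{})
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				id := gen()
				mu.Lock()
				seen[id] = struct{}{}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return len(seen)
}

func main() {
	fmt.Println(uniqueCount(8, 1000))
}
```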
type Payload ¶
type Payload[T any] interface {
	Validate(ctx T) error
	Commit(ctx T)
	Handle(ctx T) error
	Rollback(ctx T)
	PayloadType() PayloadType
}
Payload defines a lifecycle for event actions including validation, handling, rollback, and commit.
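To make the lifecycle concrete, here is a minimal sketch of a payload and of one plausible way to drive it. Ledger, Withdraw, WithdrawType, runAll, and balanceAfter are hypothetical names. The ordering in runAll (validate everything, handle in order, roll back already-handled payloads on the first failure, otherwise commit everything) is an assumption about how such a lifecycle could be run, not a description of the ebus dispatcher.

```go
package main

import (
	"errors"
	"fmt"
)

type PayloadType int

// Payload mirrors the documented interface.
type Payload[T any] interface {
	Validate(ctx T) error
	Commit(ctx T)
	Handle(ctx T) error
	Rollback(ctx T)
	PayloadType() PayloadType
}

// Ledger is a hypothetical context type.
type Ledger struct{ Balance int }

const WithdrawType PayloadType = 1

// Withdraw is a hypothetical payload that removes Amount from the ledger.
type Withdraw struct {
	Amount  int
	applied bool
}

func (w *Withdraw) Validate(l *Ledger) error {
	if w.Amount <= 0 {
		return errors.New("amount must be positive")
	}
	return nil
}

func (w *Withdraw) Handle(l *Ledger) error {
	if l.Balance < w.Amount {
		return errors.New("insufficient funds")
	}
	l.Balance -= w.Amount
	w.applied = true
	return nil
}

func (w *Withdraw) Commit(l *Ledger) {}

// Rollback undoes Handle only if it actually changed the ledger.
func (w *Withdraw) Rollback(l *Ledger) {
	if w.applied {
		l.Balance += w.Amount
		w.applied = false
	}
}

func (w *Withdraw) PayloadType() PayloadType { return WithdrawType }

// runAll drives the lifecycle under the assumed ordering described above.
func runAll[T any](ctx T, payloads []Payload[T]) error {
	for _, p := range payloads {
		if err := p.Validate(ctx); err != nil {
			return err
		}
	}
	for i, p := range payloads {
		if err := p.Handle(ctx); err != nil {
			for j := i - 1; j >= 0; j-- {
				payloads[j].Rollback(ctx)
			}
			return err
		}
	}
	for _, p := range payloads {
		p.Commit(ctx)
	}
	return nil
}

// balanceAfter applies the given withdrawals to a fresh ledger.
func balanceAfter(start int, amounts ...int) (int, error) {
	l := &Ledger{Balance: start}
	var ps []Payload[*Ledger]
	for _, a := range amounts {
		ps = append(ps, &Withdraw{Amount: a})
	}
	err := runAll[*Ledger](l, ps)
	return l.Balance, err
}

func main() {
	fmt.Println(balanceAfter(100, 30, 50))
	fmt.Println(balanceAfter(100, 30, 90))
}
```

In the second call the first withdrawal is handled and then rolled back when the second one fails, so the ledger ends at its starting balance.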
type PayloadType ¶
type PayloadType int
type PayloadTypeNameHandler ¶ added in v1.1.0
type PayloadTypeNameHandler func(pt PayloadType) string
type RetryFallbackHandler ¶ added in v1.1.0
RetryFallbackHandler defines a function that receives failed events.
type RetryPolicy ¶ added in v1.1.0
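type RetryPolicy interface {
	ShouldRetry(attempt int, err error) bool
	Backoff(attempt int) time.Duration
}
RetryPolicy decides whether a failed dispatch should be retried (ShouldRetry) and how long to wait before the next attempt (Backoff).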
type Subscribers ¶
type Subscribers[T, ID any] struct {
	// contains filtered or unexported fields
}
Subscribers manages commit listeners for specific payload types.
func NewSubscribers ¶
func NewSubscribers[T, ID any]() *Subscribers[T, ID]
NewSubscribers creates an empty list of subscribers.
func (*Subscribers[T, ID]) Notifier ¶
func (s *Subscribers[T, ID]) Notifier() EventMiddleware[T, ID]
Notifier returns an EventMiddleware that notifies all matching subscribers after commit.
func (*Subscribers[T, ID]) Subscribe ¶
func (s *Subscribers[T, ID]) Subscribe(payload Payload[T], handler CommitListener[T])
Subscribe registers a commit listener for the payload type of the given payload.
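The idea of matching listeners to payload types can be sketched with a small registry keyed by PayloadType. This is an illustration under assumptions, not the Subscribers implementation: typed is a reduced stand-in for Payload[T], the listener signature is a guess at what CommitListener[T] might look like, and registry, userCreated, and orderPlaced are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type PayloadType int

// typed is a reduced stand-in for Payload[T]; only PayloadType is needed here.
type typed interface {
	PayloadType() PayloadType
}

// listener is an assumed shape for CommitListener[T].
type listener[T any] func(ctx T, p typed)

// registry maps a payload type to the listeners registered for it.
type registry[T any] struct {
	mu        sync.RWMutex
	listeners map[PayloadType][]listener[T]
}

func newRegistry[T any]() *registry[T] {
	return &registry[T]{listeners: make(map[PayloadType][]listener[T])}
}

// subscribe uses the sample payload only to look up its type.
func (r *registry[T]) subscribe(sample typed, l listener[T]) {
	r.mu.Lock()
	defer r.mu.Unlock()
	pt := sample.PayloadType()
	r.listeners[pt] = append(r.listeners[pt], l)
}

// notify calls the listeners that match each committed payload's type.
// Listeners run under the read lock, so they must not call subscribe.
func (r *registry[T]) notify(ctx T, committed []typed) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, p := range committed {
		for _, l := range r.listeners[p.PayloadType()] {
			l(ctx, p)
		}
	}
}

type userCreated struct{ Name string }

func (userCreated) PayloadType() PayloadType { return 1 }

type orderPlaced struct{ Total int }

func (orderPlaced) PayloadType() PayloadType { return 2 }

// notifiedNames subscribes to userCreated only and returns the names seen.
func notifiedNames() []string {
	reg := newRegistry[struct{}]()
	var names []string
	reg.subscribe(userCreated{}, func(_ struct{}, p typed) {
		names = append(names, p.(userCreated).Name)
	})
	reg.notify(struct{}{}, []typed{
		userCreated{Name: "ann"},
		orderPlaced{Total: 5},
		userCreated{Name: "bob"},
	})
	return names
}

func main() {
	fmt.Println(notifiedNames())
}
```

In ebus itself the notification step is exposed as middleware through Notifier, so it would sit in the chain passed to New, NewDefault, or NewWithTx.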