Documentation
¶
Overview ¶
Package busen provides a small typed-first in-process event bus for Go.
Busen is designed around a few explicit goals:
- event payloads are plain Go values
- type-safe subscriptions are the primary API
- topics are optional local routing metadata
- context propagation is built into publish and handler execution
- asynchronous delivery uses bounded queues with explicit backpressure
- hooks expose runtime events without introducing a heavy framework layer
- middleware wraps local dispatch without turning the package into a framework
- optional metadata and observers support bridge/audit scenarios
- the package stays focused on simple in-process application use
Type-based subscriptions use exact Go types. A subscription registered for one type does not receive values of another type, even if they satisfy the same interface.
Topic subscriptions use dot-separated topics. Wildcards are intentionally small in scope:
- "*" matches exactly one segment
- ">" matches one or more remaining segments and must be the last segment
Ordering is never global. Busen only preserves FIFO delivery for a single asynchronous subscriber with one worker, or within the same non-empty ordering key for async subscribers with multiple workers.
Most applications start with New, register handlers with Subscribe, SubscribeTopic, or SubscribeTopics, and publish values with Publish. Use Async, Sequential, WithParallelism, and WithOverflow when you need bounded asynchronous delivery, and WithHooks when you want to observe runtime errors, panics, dropped/rejected events, [UseObserver] for cross-cutting bridge observation, and [Shutdown] when you need explicit shutdown modes.
Example ¶
type UserCreated struct {
Email string
}
b := busen.New()
unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[UserCreated]) error {
fmt.Println("welcome", event.Value.Email)
return nil
})
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(context.Background(), b, UserCreated{Email: "hello@example.com"}); err != nil {
log.Fatal(err)
}
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: welcome hello@example.com
Index ¶
- Variables
- func Publish[T any](ctx context.Context, b *Bus, value T, opts ...PublishOption) error
- func Subscribe[T any](b *Bus, handler Handler[T], opts ...SubscribeOption) (func(), error)
- func SubscribeMatch[T any](b *Bus, match func(Event[T]) bool, handler Handler[T], opts ...SubscribeOption) (func(), error)
- func SubscribeTopic[T any](b *Bus, pattern string, handler Handler[T], opts ...SubscribeOption) (func(), error)
- func SubscribeTopics[T any](b *Bus, patterns []string, handler Handler[T], opts ...SubscribeOption) (func(), error)
- type Bus
- type Dispatch
- type DroppedEvent
- type Event
- type Handler
- type HandlerError
- type HandlerPanic
- type HandlerPanicError
- type HookPanic
- type Hooks
- type MetadataBuilder
- type Middleware
- type Next
- type Observation
- type Observer
- type ObserverOption
- type Option
- type OverflowPolicy
- type PublishDone
- type PublishMetadataInput
- type PublishOption
- type PublishStart
- type RejectedEvent
- type ShutdownMode
- type ShutdownResult
- type SubscribeOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrClosed indicates that the bus no longer accepts new publishes or subscriptions. ErrClosed = errors.New("busen: bus closed") // ErrHandlerNil indicates that a nil handler was passed to Subscribe. ErrHandlerNil = errors.New("busen: handler is nil") // ErrBufferFull indicates that an asynchronous subscriber queue is full. ErrBufferFull = errors.New("busen: subscriber buffer full") // ErrDropped indicates that at least one event was dropped due to backpressure. ErrDropped = errors.New("busen: event dropped") // ErrInvalidPattern indicates that a topic pattern is malformed. ErrInvalidPattern = errors.New("busen: invalid topic pattern") // ErrInvalidOption indicates that an option value is not valid. ErrInvalidOption = errors.New("busen: invalid option") // ErrHandlerPanic indicates that a handler panicked while processing an event. ErrHandlerPanic = errors.New("busen: handler panic") // ErrCloseIncomplete indicates that Close stopped new work but did not finish // draining all in-flight work before the provided context ended. ErrCloseIncomplete = errors.New("busen: close incomplete") )
Functions ¶
func Subscribe ¶
func Subscribe[T any](b *Bus, handler Handler[T], opts ...SubscribeOption) (func(), error)
Subscribe registers a type-based subscription.
func SubscribeMatch ¶
func SubscribeMatch[T any](b *Bus, match func(Event[T]) bool, handler Handler[T], opts ...SubscribeOption) (func(), error)
SubscribeMatch registers a type-based subscription constrained by a predicate filter.
func SubscribeTopic ¶
func SubscribeTopic[T any](b *Bus, pattern string, handler Handler[T], opts ...SubscribeOption) (func(), error)
SubscribeTopic registers a type-based subscription constrained by a topic pattern.
Example ¶
b := busen.New()
unsubscribe, err := busen.SubscribeTopic(b, "orders.>", func(_ context.Context, event busen.Event[string]) error {
fmt.Printf("%s=%s\n", event.Topic, event.Value)
return nil
})
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(context.Background(), b, "created", busen.WithTopic("orders.eu.created")); err != nil {
log.Fatal(err)
}
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: orders.eu.created=created
func SubscribeTopics ¶ added in v0.2.0
func SubscribeTopics[T any](b *Bus, patterns []string, handler Handler[T], opts ...SubscribeOption) (func(), error)
SubscribeTopics registers a type-based subscription constrained by multiple topic patterns.
Example ¶
b := busen.New()
unsubscribe, err := busen.SubscribeTopics(b, []string{"orders.created", "orders.updated"}, func(_ context.Context, event busen.Event[string]) error {
fmt.Printf("%s=%s\n", event.Topic, event.Value)
return nil
})
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(context.Background(), b, "created", busen.WithTopic("orders.created")); err != nil {
log.Fatal(err)
}
if err := busen.Publish(context.Background(), b, "updated", busen.WithTopic("orders.updated")); err != nil {
log.Fatal(err)
}
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: orders.created=created orders.updated=updated
Types ¶
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus is a typed-first in-process event bus.
func (*Bus) Close ¶
Close stops accepting new publishes and drains async subscribers. If the provided context ends first, Close returns an error wrapping both ErrCloseIncomplete and the context error. In that case, user handlers are not forcefully canceled.
func (*Bus) Shutdown ¶ added in v0.3.0
func (b *Bus) Shutdown(ctx context.Context, mode ShutdownMode) (ShutdownResult, error)
Shutdown stops accepting new publishes and subscriptions according to mode.
Example ¶
type Job struct {
ID int
}
b := busen.New()
unsubscribe, err := busen.Subscribe(b, func(_ context.Context, _ busen.Event[Job]) error {
return nil
}, busen.Async(), busen.WithBuffer(8))
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
_ = busen.Publish(context.Background(), b, Job{ID: 1})
result, err := b.Shutdown(context.Background(), busen.ShutdownDrain)
if err != nil {
log.Fatal(err)
}
fmt.Println(result.Mode == busen.ShutdownDrain, result.Completed)
Output: true true
func (*Bus) Use ¶
func (b *Bus) Use(middlewares ...Middleware) error
Use registers global dispatch middleware.
Middleware is applied to both sync and async handler execution. It does not replace hooks, and it does not manage bus lifecycle or routing.
Example ¶
type AuditEvent struct {
Action string
}
b := busen.New()
if err := b.Use(func(next busen.Next) busen.Next {
return func(ctx context.Context, dispatch busen.Dispatch) error {
if dispatch.Headers == nil {
dispatch.Headers = make(map[string]string, 1)
}
dispatch.Headers["source"] = "middleware"
return next(ctx, dispatch)
}
}); err != nil {
log.Fatal(err)
}
unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[AuditEvent]) error {
fmt.Printf("%s from %s\n", event.Value.Action, event.Headers["source"])
return nil
})
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(
context.Background(),
b,
AuditEvent{Action: "saved"},
busen.WithHeaders(map[string]string{"request-id": "req-1"}),
); err != nil {
log.Fatal(err)
}
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: saved from middleware
func (*Bus) UseObserver ¶ added in v0.3.0
func (b *Bus) UseObserver(observer Observer, opts ...ObserverOption) error
UseObserver registers an optional bridge/audit observer.
Example ¶
type OrderCreated struct {
ID string
}
b := busen.New()
if err := b.UseObserver(
func(_ context.Context, obs busen.Observation) {
fmt.Printf("observe %s %v\n", obs.Topic, obs.EventType)
},
busen.ObserveTopic("orders.>"),
); err != nil {
log.Fatal(err)
}
unsubscribe, err := busen.SubscribeTopic(b, "orders.>", func(_ context.Context, event busen.Event[OrderCreated]) error {
return nil
})
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(context.Background(), b, OrderCreated{ID: "o-1"}, busen.WithTopic("orders.created")); err != nil {
log.Fatal(err)
}
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: observe orders.created busen_test.OrderCreated
type Dispatch ¶
type Dispatch struct {
// EventType is the exact Go type being dispatched.
EventType reflect.Type
// Topic is the publish topic after publish options have been applied.
Topic string
// Key is the publish ordering key after publish options have been applied.
Key string
// Headers is a mutable copy of the publish headers for this handler call.
Headers map[string]string
// Meta is mutable structured metadata for this handler call.
Meta map[string]string
// Value is the event payload that will be passed to the typed handler.
Value any
// Async reports whether the target subscription is asynchronous.
Async bool
}
Dispatch carries untyped event metadata through middleware.
Middleware is intentionally thin and local to in-process dispatch. It may inspect or transform the event metadata before the typed handler runs.
Dispatch mutation rules are intentionally narrow:
- changes are visible to later middleware and the final handler
- changes do not rewrite hook payloads
- changes do not affect subscriber matching, publish-level hooks, or async queue selection, all of which happen before middleware runs
type DroppedEvent ¶
type DroppedEvent struct {
// EventType is the exact Go type that could not be queued.
EventType reflect.Type
// Topic is the event topic that was being delivered.
Topic string
// Key is the event ordering key that was being delivered.
Key string
// Meta is structured envelope metadata for the dropped event.
Meta map[string]string
// Async is always true for dropped events.
Async bool
// Policy is the overflow policy that decided the drop behavior.
Policy OverflowPolicy
// SubscriberID is the internal subscription identifier.
SubscriberID uint64
// QueueLen is the queue length at observation time.
QueueLen int
// QueueCap is the queue capacity.
QueueCap int
// MailboxIndex is the selected worker mailbox index.
MailboxIndex int
// Reason reports why the event was dropped.
Reason error
}
DroppedEvent describes a dropped event caused by backpressure.
type Event ¶
type Event[T any] struct { // Topic carries optional routing metadata supplied at publish time. Topic string // Key carries the optional ordering key supplied at publish time. Key string // Value is the typed event payload. Value T // Headers contains a shallow copy of publish headers visible to handlers. Headers map[string]string // Meta contains structured envelope metadata visible to handlers. Meta map[string]string }
Event is the typed value delivered to handlers.
type HandlerError ¶
type HandlerError struct {
// EventType is the exact Go type handled by the subscriber.
EventType reflect.Type
// Topic is the event topic seen by the handler.
Topic string
// Key is the event ordering key seen by the handler.
Key string
// Meta is structured envelope metadata seen by the handler.
Meta map[string]string
// Async reports whether the handler ran in async mode.
Async bool
// Err is the error returned by the handler.
Err error
}
HandlerError describes a handler error.
type HandlerPanic ¶
type HandlerPanic struct {
// EventType is the exact Go type handled by the subscriber.
EventType reflect.Type
// Topic is the event topic seen by the handler.
Topic string
// Key is the event ordering key seen by the handler.
Key string
// Meta is structured envelope metadata seen by the handler.
Meta map[string]string
// Async reports whether the handler ran in async mode.
Async bool
// Value is the recovered panic value.
Value any
}
HandlerPanic describes a recovered handler panic.
type HandlerPanicError ¶
type HandlerPanicError struct {
Value any
}
HandlerPanicError wraps a recovered handler panic as an error value.
func (*HandlerPanicError) Error ¶
func (e *HandlerPanicError) Error() string
func (*HandlerPanicError) Unwrap ¶
func (e *HandlerPanicError) Unwrap() error
type HookPanic ¶
type HookPanic struct {
// Hook is the callback name that panicked, such as "OnPublishDone".
Hook string
// Value is the recovered panic value.
Value any
}
HookPanic describes a recovered panic raised by another hook callback.
type Hooks ¶
type Hooks struct {
// OnPublishStart runs before matching subscribers are evaluated.
OnPublishStart func(PublishStart)
// OnPublishDone runs after all matching deliveries have been attempted.
OnPublishDone func(PublishDone)
// OnHandlerError runs when a handler returns a non-nil error.
OnHandlerError func(HandlerError)
// OnHandlerPanic runs when a handler panic is recovered.
OnHandlerPanic func(HandlerPanic)
// OnEventDropped runs when async backpressure drops an event.
OnEventDropped func(DroppedEvent)
// OnEventRejected runs when async backpressure rejects an event.
OnEventRejected func(RejectedEvent)
// OnHookPanic runs when another hook panics and the panic is recovered.
OnHookPanic func(HookPanic)
}
Hooks observes publish and handler lifecycle events.
Hooks are intentionally thin. They are not a full middleware pipeline and do not change delivery semantics. They exist to observe important runtime events such as async failures, panics, and dropped events.
type MetadataBuilder ¶ added in v0.3.0
type MetadataBuilder func(PublishMetadataInput) map[string]string
MetadataBuilder builds optional structured metadata for publish envelopes.
type Middleware ¶
Middleware wraps local handler dispatch in the same spirit as HTTP middleware.
type Observation ¶ added in v0.3.0
type Observation struct {
EventType reflect.Type
Topic string
Key string
Headers map[string]string
Meta map[string]string
Value any
Async bool
SubscriberID uint64
}
Observation represents an accepted delivery for bridge/audit observers.
type Observer ¶ added in v0.3.0
type Observer func(context.Context, Observation)
Observer receives accepted observations.
type ObserverOption ¶ added in v0.3.0
type ObserverOption interface {
// contains filtered or unexported methods
}
ObserverOption configures an observer filter.
func ObserveMatch ¶ added in v0.3.0
func ObserveMatch(fn func(Observation) bool) ObserverOption
ObserveMatch applies a custom observation predicate.
func ObserveMetadata ¶ added in v0.3.0
func ObserveMetadata(meta map[string]string) ObserverOption
ObserveMetadata filters observations by metadata subset.
func ObserveTopic ¶ added in v0.3.0
func ObserveTopic(pattern string) ObserverOption
ObserveTopic filters observations by topic pattern.
func ObserveType ¶ added in v0.3.0
func ObserveType[T any]() ObserverOption
ObserveType filters observations by exact event type.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option configures a Bus.
Callers typically obtain Option values from helpers such as WithDefaultBuffer, WithDefaultOverflow, WithHooks, and WithMiddleware rather than implementing this interface directly.
func WithDefaultBuffer ¶
WithDefaultBuffer sets the default queue size for async subscribers.
func WithDefaultOverflow ¶
func WithDefaultOverflow(policy OverflowPolicy) Option
WithDefaultOverflow sets the default overflow policy for async subscribers.
func WithHooks ¶
WithHooks registers runtime hooks for publish and handler lifecycle events.
Example ¶
type UserCreated struct {
ID string
}
b := busen.New(busen.WithHooks(busen.Hooks{
OnPublishDone: func(info busen.PublishDone) {
fmt.Printf("matched=%d delivered=%d\n", info.MatchedSubscribers, info.DeliveredSubscribers)
},
}))
unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[UserCreated]) error {
fmt.Println("handled", event.Value.ID)
return nil
})
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(context.Background(), b, UserCreated{ID: "u-1"}); err != nil {
log.Fatal(err)
}
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: handled u-1 matched=1 delivered=1
func WithMetadataBuilder ¶ added in v0.3.0
func WithMetadataBuilder(builder MetadataBuilder) Option
WithMetadataBuilder registers a global metadata builder for publish envelopes.
Example ¶
type OrderCreated struct {
ID string
}
b := busen.New(
busen.WithMetadataBuilder(func(input busen.PublishMetadataInput) map[string]string {
return map[string]string{
"source": "billing",
}
}),
)
unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[OrderCreated]) error {
fmt.Printf("%s from %s\n", event.Value.ID, event.Meta["source"])
return nil
})
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(
context.Background(),
b,
OrderCreated{ID: "o-1"},
busen.WithMetadata(map[string]string{"trace_id": "tr-1"}),
); err != nil {
log.Fatal(err)
}
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: o-1 from billing
func WithMiddleware ¶
func WithMiddleware(middlewares ...Middleware) Option
WithMiddleware registers global dispatch middleware at bus construction time.
type OverflowPolicy ¶
type OverflowPolicy int
OverflowPolicy controls what happens when an async subscriber queue is full.
const ( // OverflowBlock blocks the publisher until queue space is available. OverflowBlock OverflowPolicy = iota // OverflowFailFast returns an error instead of waiting for queue space. OverflowFailFast // OverflowDropNewest drops the incoming event when the queue is full. OverflowDropNewest // OverflowDropOldest evicts the oldest queued event to admit the new event. OverflowDropOldest )
type PublishDone ¶
type PublishDone struct {
// EventType is the exact Go type that was published.
EventType reflect.Type
// Topic is the publish topic after options have been applied.
Topic string
// Key is the publish ordering key after options have been applied.
Key string
// Headers is a copy of the publish headers.
Headers map[string]string
// Meta is structured envelope metadata.
Meta map[string]string
// MatchedSubscribers is the number of subscriptions whose routing constraints
// matched the published event.
MatchedSubscribers int
// DeliveredSubscribers is the number of subscriptions that accepted the event
// for handler execution or async enqueue after lifecycle checks.
DeliveredSubscribers int
// Err joins delivery errors returned during publish, if any.
Err error
}
PublishDone describes the end of a publish operation.
type PublishMetadataInput ¶ added in v0.3.0
type PublishMetadataInput struct {
Context context.Context
EventType reflect.Type
Topic string
Key string
Headers map[string]string
Value any
}
PublishMetadataInput is passed to MetadataBuilder.
type PublishOption ¶
type PublishOption interface {
// contains filtered or unexported methods
}
PublishOption configures a publish call.
Callers typically obtain PublishOption values from helpers such as WithTopic, WithKey, and WithHeaders rather than implementing this interface directly.
func WithHeaders ¶
func WithHeaders(headers map[string]string) PublishOption
WithHeaders sets headers for a published event.
func WithKey ¶
func WithKey(key string) PublishOption
WithKey sets an optional ordering key for a published event.
In async mode, subscribers with multiple workers preserve ordering for events that share the same non-empty key within that subscriber. Empty keys fall back to the regular non-keyed path.
Example ¶
type UserCreated struct {
ID string
}
b := busen.New()
done := make(chan struct{}, 2)
unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[UserCreated]) error {
fmt.Printf("%s:%s\n", event.Key, event.Value.ID)
done <- struct{}{}
return nil
}, busen.Async(), busen.WithParallelism(2), busen.WithBuffer(4))
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(context.Background(), b, UserCreated{ID: "1"}, busen.WithKey("tenant-a")); err != nil {
log.Fatal(err)
}
if err := busen.Publish(context.Background(), b, UserCreated{ID: "2"}, busen.WithKey("tenant-a")); err != nil {
log.Fatal(err)
}
<-done
<-done
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: tenant-a:1 tenant-a:2
func WithMetadata ¶ added in v0.3.0
func WithMetadata(meta map[string]string) PublishOption
WithMetadata sets structured envelope metadata for a published event.
func WithTopic ¶
func WithTopic(topic string) PublishOption
WithTopic sets the routing topic for a published event.
type PublishStart ¶
type PublishStart struct {
// EventType is the exact Go type being published.
EventType reflect.Type
// Topic is the publish topic after options have been applied.
Topic string
// Key is the publish ordering key after options have been applied.
Key string
// Headers is a copy of the publish headers.
Headers map[string]string
// Meta is structured envelope metadata.
Meta map[string]string
}
PublishStart describes the beginning of a publish operation.
type RejectedEvent ¶ added in v0.3.0
type RejectedEvent struct {
// EventType is the exact Go type that could not be queued.
EventType reflect.Type
// Topic is the event topic that was being delivered.
Topic string
// Key is the event ordering key that was being delivered.
Key string
// Meta is structured envelope metadata for the rejected event.
Meta map[string]string
// Async is always true for rejected events.
Async bool
// Policy is the overflow policy that rejected the event.
Policy OverflowPolicy
// SubscriberID is the internal subscription identifier.
SubscriberID uint64
// QueueLen is the queue length at observation time.
QueueLen int
// QueueCap is the queue capacity.
QueueCap int
// MailboxIndex is the selected worker mailbox index.
MailboxIndex int
// Reason reports why the event was rejected.
Reason error
}
RejectedEvent describes an event rejected by backpressure policy.
type ShutdownMode ¶ added in v0.3.0
type ShutdownMode int
ShutdownMode controls how bus shutdown handles queued async events.
const ( // ShutdownDrain waits for async queues to drain. ShutdownDrain ShutdownMode = iota // ShutdownBestEffort stops accepting work and waits until context ends. ShutdownBestEffort // ShutdownAbort stops accepting work and drops queued async events. ShutdownAbort )
type ShutdownResult ¶ added in v0.3.0
type ShutdownResult struct {
Mode ShutdownMode
// Processed is the number of handler executions observed during shutdown.
Processed int64
// Dropped is the number of dropped events observed during shutdown.
// It includes backpressure drops and abort-mode queue drops.
Dropped int64
// Rejected is the number of rejected events observed during shutdown.
Rejected int64
// TimedOutSubscribers contains subscriber IDs that did not stop before ctx ended.
TimedOutSubscribers []uint64
// Completed reports whether shutdown fully completed before context cancellation.
Completed bool
}
ShutdownResult reports structured shutdown outcomes.
type SubscribeOption ¶
type SubscribeOption interface {
// contains filtered or unexported methods
}
SubscribeOption configures a subscription.
Callers typically obtain SubscribeOption values from helpers such as Async, Sequential, WithParallelism, WithBuffer, WithOverflow, and WithFilter rather than implementing this interface directly.
func Async ¶
func Async() SubscribeOption
Async delivers events through a bounded queue and worker goroutines.
Example ¶
type JobQueued struct {
ID string
}
b := busen.New()
done := make(chan struct{})
unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[JobQueued]) error {
fmt.Println("processed", event.Value.ID)
close(done)
return nil
}, busen.Async(), busen.WithBuffer(1))
if err != nil {
log.Fatal(err)
}
defer unsubscribe()
if err := busen.Publish(context.Background(), b, JobQueued{ID: "job-42"}); err != nil {
log.Fatal(err)
}
<-done
if err := b.Close(context.Background()); err != nil {
log.Fatal(err)
}
Output: processed job-42
func Sequential ¶
func Sequential() SubscribeOption
Sequential is shorthand for single-worker async FIFO delivery.
It enables async delivery and forces the subscriber to run with one worker.
func WithBuffer ¶
func WithBuffer(size int) SubscribeOption
WithBuffer sets the queue size for an async subscriber.
func WithFilter ¶
func WithFilter[T any](fn func(Event[T]) bool) SubscribeOption
WithFilter applies a predicate filter before the handler runs.
func WithOverflow ¶
func WithOverflow(policy OverflowPolicy) SubscribeOption
WithOverflow sets the queue overflow policy for an async subscriber.
func WithParallelism ¶
func WithParallelism(n int) SubscribeOption
WithParallelism sets the number of workers for an async subscriber.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
dispatch
Package dispatch provides small primitives for coordinating in-process delivery.
|
Package dispatch provides small primitives for coordinating in-process delivery. |
|
router
Package router compiles and evaluates topic matchers for Busen routing.
|
Package router compiles and evaluates topic matchers for Busen routing. |