busen

package module
v0.3.0
Published: Mar 8, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

README

Busen

Busen is a small, clear, typed-first, in-process event bus for Go.

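Quick Overview

  • Positioning: a small, clear, typed-first in-process event bus
  • Scope: dispatch within a single process only; no persistence, replay, or cross-process delivery
  • API style: Subscribe[T] / Publish[T]; synchronous by default, with straightforward semantics
  • Routing: exact topics plus lightweight wildcards (* and a trailing >)
  • Concurrency control: explicit backpressure via Async() + WithBuffer(...) + WithOverflow(...)
  • Ordering: single-worker FIFO, plus local ordering per subscriber and per key
  • Observability: Hooks observe publish/error/panic/drop/reject events
  • Extension points: Use(...) middleware, WithMetadata(...) metadata, UseObserver(...) bridge observers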

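Core Strengths and Capabilities

Strength | Value
typed-first API | Subscribe[T] / Publish[T] work directly with business types, reducing boilerplate and type-assertion errors
Explicit concurrency semantics | sync/async, buffer, overflow, and keyed ordering are all configurable and predictable
Lightweight but extensible | topics, middleware, hooks, metadata, and observers are opt-in; no framework is imposed
Friendly to observation and troubleshooting | lifecycle callbacks for publish/error/panic/drop/reject carry structured context
Clear engineering boundaries | focused on in-process use, with no distributed messaging responsibilities mixed in, which eases long-term maintenance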

Installation

Requires Go 1.26.0 or later (kept consistent with go.mod).

go get github.com/lin-snow/Busen

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/lin-snow/Busen"
)

type UserCreated struct {
	ID    string
	Email string
}

func main() {
	bus := busen.New()

	unsubscribe, err := busen.Subscribe(bus, func(ctx context.Context, event busen.Event[UserCreated]) error {
		fmt.Printf("welcome %s\n", event.Value.Email)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	defer unsubscribe()

	err = busen.Publish(context.Background(), bus, UserCreated{
		ID:    "u_123",
		Email: "hello@example.com",
	})
	if err != nil {
		log.Fatal(err)
	}

	_ = bus.Close(context.Background())
}

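Choosing an API

In most scenarios you can pick an API as follows:

Scenario | Suggested API
Receive messages by type only | Subscribe[T]
Also constrain by topic | SubscribeTopic[T]
One handler subscribes to several topics | SubscribeTopics[T]
Filter by event content | SubscribeMatch[T] or WithFilter(...)
Caller should receive the handler error synchronously | default synchronous subscription
Asynchronous delivery with explicit backpressure | Async() + WithBuffer(...) + WithOverflow(...)
FIFO for a single subscriber | Sequential()
Local ordering for the same key | Async() + WithParallelism(...) + WithKey(...) at publish time
Observe publish / panic / drop / reject | WithHooks(...)
Wrap only the local handler invocation | Use(...) or WithMiddleware(...)
Bridge observation for webhooks, auditing, or persistence | UseObserver(...)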

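When to Use It

A good fit | Not a good fit
You want typed event decoupling inside a single Go process | You need persistence, replay, or cross-process delivery
You need lightweight topic routing and bounded asynchronous delivery | You need built-in tracing, metrics, retry, or rate limiting
You want a concise API with explicit control over concurrency semantics | You need a heavyweight messaging platform or distributed capabilities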

Topic Routing

Busen supports lightweight, dot-separated topic routing.

  • *: matches exactly one segment
  • >: matches one or more remaining segments and must be the last segment

sub, err := busen.SubscribeTopic(bus, "orders.>", func(ctx context.Context, event busen.Event[string]) error {
	fmt.Println(event.Topic, event.Value)
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer sub()

_ = busen.Publish(context.Background(), bus, "created", busen.WithTopic("orders.eu.created"))

If the same handler needs to subscribe to several topics, you can also use SubscribeTopics[T]:

sub, err := busen.SubscribeTopics(bus, []string{"orders.created", "orders.updated"}, func(ctx context.Context, event busen.Event[string]) error {
	fmt.Println(event.Topic, event.Value)
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer sub()

_ = busen.Publish(context.Background(), bus, "created", busen.WithTopic("orders.created"))
_ = busen.Publish(context.Background(), bus, "updated", busen.WithTopic("orders.updated"))

Asynchronous Dispatch and Ordering

Asynchronous subscriptions use bounded queues, and the backpressure policy is explicit:

  • OverflowBlock
  • OverflowFailFast
  • OverflowDropNewest
  • OverflowDropOldest

_, err = busen.Subscribe(bus, func(ctx context.Context, event busen.Event[UserCreated]) error {
	return nil
},
	busen.Async(),
	busen.Sequential(),
	busen.WithBuffer(128),
	busen.WithOverflow(busen.OverflowBlock),
)

If a publish call carries WithKey(...), events with the same non-empty ordering key keep their local order within the same async subscriber:

_, err = busen.Subscribe(bus, func(ctx context.Context, event busen.Event[UserCreated]) error {
	return nil
}, busen.Async(), busen.WithParallelism(4), busen.WithBuffer(256))

_ = busen.Publish(context.Background(), bus, UserCreated{ID: "1"}, busen.WithKey("tenant-a"))
_ = busen.Publish(context.Background(), bus, UserCreated{ID: "2"}, busen.WithKey("tenant-a"))

Boundaries:

  • ordering keys only take effect for async subscribers
  • an empty key falls back to regular non-keyed scheduling
  • the ordering guarantee is per subscriber / per key, not global
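
One common way to obtain per-key ordering with several workers is to hash the key to a fixed worker mailbox, so that all events sharing a key are handled by the same goroutine in arrival order. The sketch below illustrates that idea only. Whether Busen selects mailboxes this way is an assumption, and `mailboxFor` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// mailboxFor maps an ordering key to one of n worker mailboxes.
// The same key always maps to the same index, which is what makes
// per-key FIFO possible when each mailbox has a single consumer.
// (Hypothetical helper; Busen's real selection logic may differ.)
func mailboxFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

func main() {
	const workers = 4
	for _, k := range []string{"tenant-a", "tenant-b", "tenant-a"} {
		fmt.Printf("%s -> mailbox %d\n", k, mailboxFor(k, workers))
	}
}
```

Two different keys may still land in the same mailbox; that does not break per-key ordering, it only reduces parallelism between those keys.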

Middleware and Hooks

Middleware

Busen provides a thin dispatch middleware layer intended for local composition, not a heavyweight pipeline framework.

err = bus.Use(func(next busen.Next) busen.Next {
	return func(ctx context.Context, dispatch busen.Dispatch) error {
		return next(ctx, dispatch)
	}
})
if err != nil {
	log.Fatal(err)
}

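Middleware boundaries:

  • wraps handler invocation only
  • does not replace hooks
  • does not handle retries, metrics, tracing, or distributed concerns
  • does not affect subscriber matching, topic routing, or async queue selection
  • changes to Dispatch affect only later middleware and the final handler
  • hooks still observe the original publish metadata

If you prefer to register middleware at construction time, you can use WithMiddleware(...) instead: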

bus := busen.New(
	busen.WithMiddleware(func(next busen.Next) busen.Next {
		return func(ctx context.Context, dispatch busen.Dispatch) error {
			return next(ctx, dispatch)
		}
	}),
)

Hooks

Hooks observe runtime events; they do not take part in controlling the handler call chain.

bus := busen.New(
	busen.WithHooks(busen.Hooks{
		OnPublishDone: func(info busen.PublishDone) {
			log.Printf("matched=%d delivered=%d err=%v", info.MatchedSubscribers, info.DeliveredSubscribers, info.Err)
		},
		OnHandlerError: func(info busen.HandlerError) {
			log.Printf("async=%v topic=%q err=%v", info.Async, info.Topic, info.Err)
		},
		OnHandlerPanic: func(info busen.HandlerPanic) {
			log.Printf("panic in %v: %v", info.EventType, info.Value)
		},
		OnEventDropped: func(info busen.DroppedEvent) {
			log.Printf("dropped event for topic %q with policy %v", info.Topic, info.Policy)
		},
		OnEventRejected: func(info busen.RejectedEvent) {
			log.Printf("rejected event for topic %q with policy %v", info.Topic, info.Policy)
		},
	}),
)

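The current hook trigger points are:

  • OnPublishStart
  • OnPublishDone
  • OnHandlerError
  • OnHandlerPanic
  • OnEventDropped
  • OnEventRejected
  • OnHookPanic

Optional Unified Metadata

Busen stays typed-first and does not force an event envelope. If you need uniform structured metadata, you can enable the metadata layer as needed.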

bus := busen.New(
	busen.WithMetadataBuilder(func(input busen.PublishMetadataInput) map[string]string {
		return map[string]string{
			"source": "billing-service",
		}
	}),
)

_ = busen.Publish(
	context.Background(),
	bus,
	OrderCreated{ID: "o_1"},
	busen.WithMetadata(map[string]string{
		"trace_id": "tr_123",
	}),
)

Rules:

  • the builder supplies global default metadata
  • same-named keys passed to WithMetadata(...) override the builder's return values
  • metadata is passed through to middleware, handlers, and hooks
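
The override rule can be pictured as a two-step map merge: copy the builder defaults, then overlay the per-publish values. This is a minimal sketch of that rule under the assumption that the merge is a plain key-wise overlay; `mergeMeta` is a hypothetical helper and not Busen's implementation.

```go
package main

import "fmt"

// mergeMeta copies defaults, then overlays perPublish so that
// same-named keys from the publish call win. Neither input is mutated.
// (Hypothetical helper illustrating the documented override rule.)
func mergeMeta(defaults, perPublish map[string]string) map[string]string {
	out := make(map[string]string, len(defaults)+len(perPublish))
	for k, v := range defaults {
		out[k] = v
	}
	for k, v := range perPublish {
		out[k] = v
	}
	return out
}

func main() {
	m := mergeMeta(
		map[string]string{"source": "billing-service", "trace_id": "none"},
		map[string]string{"trace_id": "tr_123"},
	)
	fmt.Println(m["source"], m["trace_id"]) // billing-service tr_123
}
```

Copying into a fresh map on every publish is consistent with the benchmark note that metadata cost comes mostly from map copying and merging.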
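
Observer (Bridge Observer)

UseObserver(...) is meant for cross-cutting observation (webhooks, auditing, persisting events, and so on). Its semantics are "observe accepted deliveries"; it does not take part in matching business subscriptions.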

_ = bus.UseObserver(
	func(ctx context.Context, obs busen.Observation) {
		log.Printf("observe type=%v topic=%q sub=%d", obs.EventType, obs.Topic, obs.SubscriberID)
	},
	busen.ObserveTopic("orders.>"),
	busen.ObserveMetadata(map[string]string{"trace_id": "tr_123"}),
)

Optional filters:

  • ObserveType[T]()
  • ObserveTopic(pattern)
  • ObserveMetadata(map[string]string)
  • ObserveMatch(func(Observation) bool)

Shutdown Modes

Close(ctx) remains available for compatibility and is equivalent to Shutdown(ctx, ShutdownDrain). If you need a finer-grained policy, use Shutdown(...):

result, err := bus.Shutdown(context.Background(), busen.ShutdownBestEffort)
if err != nil {
	log.Fatal(err)
}
log.Printf("completed=%v processed=%d dropped=%d rejected=%d timeout_subs=%v",
	result.Completed, result.Processed, result.Dropped, result.Rejected, result.TimedOutSubscribers)

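Modes:

  • ShutdownDrain: stop accepting work and drain as completely as possible (the default semantics of Close)
  • ShutdownBestEffort: stop accepting work, then wait on a best-effort basis until ctx ends
  • ShutdownAbort: stop accepting work and immediately discard events still waiting in queues (running handlers are not forcibly terminated)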
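
To make the difference between draining and giving up concrete, here is a small stdlib-only sketch of a drain loop bounded by a context. It is not Busen's shutdown code; `drain`, `drainAll`, and `drainCancelled` are hypothetical names, and the queue is modeled as a closed channel of jobs.

```go
package main

import (
	"context"
	"fmt"
)

// drain runs queued jobs until the queue is empty (closed) or ctx ends.
// It returns how many jobs ran and whether the queue was fully drained.
func drain(ctx context.Context, queue <-chan func()) (processed int, completed bool) {
	for {
		// Check cancellation first so an already-ended ctx processes nothing.
		select {
		case <-ctx.Done():
			return processed, false
		default:
		}
		select {
		case job, ok := <-queue:
			if !ok {
				return processed, true
			}
			job()
			processed++
		case <-ctx.Done():
			return processed, false
		}
	}
}

// drainAll drains three queued jobs with no deadline.
func drainAll() (int, bool) {
	q := make(chan func(), 3)
	for i := 0; i < 3; i++ {
		q <- func() {}
	}
	close(q)
	return drain(context.Background(), q)
}

// drainCancelled starts with a cancelled context, so queued work is left behind.
func drainCancelled() (int, bool) {
	q := make(chan func(), 1)
	q <- func() {}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return drain(ctx, q)
}

func main() {
	fmt.Println(drainAll())       // 3 true
	fmt.Println(drainCancelled()) // 0 false
}
```

In this model, a generous context behaves like a full drain, a short deadline behaves like a best-effort wait, and an already-cancelled context resembles an abort in that queued work is never started.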

Benchmarks

Busen ships with repeatable benchmarks covering hot paths such as Publish[T], sync/async delivery, topic routing, middleware, and hooks.

Main coverage:

  • cost of Publish[T] with 1 / 10 / 100 subscribers
  • difference between sync and async sequential delivery
  • async keyed delivery
  • middleware enabled/disabled
  • middleware + hooks enabled together
  • async keyed + topic routing
  • exact / wildcard routing
  • cost of the direct router matcher

How to run:

go test ./... -run '^$' -bench . -benchmem

These numbers represent the hot-path overhead of an in-process event bus, not a throughput guarantee for a messaging system.

Reference results from one run on an Apple M4 machine using Go 1.26.0 were roughly:

Scenario | Reference time
sync publish (1 subscriber) | 147 ns/op
sync publish (10 subscribers) | 659 ns/op
async sequential publish | 238 ns/op
async keyed publish | 285 ns/op
middleware-enabled publish | 129 ns/op
middleware + hooks publish | 147 ns/op
async keyed + topic publish | 299 ns/op
exact topic publish | 158 ns/op
wildcard topic publish | 151 ns/op

In this run, the router matcher stayed at 0 allocs/op:

Matcher | Reference time | Allocations
exact matcher | 1.5 ns/op | 0 allocs/op
wildcard matcher | 6.3 ns/op | 0 allocs/op

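Reference results from one run for the newer capabilities (metadata / observer):

Scenario | Reference time | Allocations
publish with metadata (disabled) | 126 ns/op | 288 B/op, 4 allocs/op
publish with metadata (enabled) | 780 ns/op | 2640 B/op, 18 allocs/op
publish with observer (disabled) | 149 ns/op | 312 B/op, 5 allocs/op
publish with observer (enabled) | 187 ns/op | 376 B/op, 6 allocs/op

Notes:

  • the tables above come from a single-run sample of go test ./... -run '^$' -bench . -benchmem and are mainly useful for a sense of magnitude
  • metadata overhead comes mostly from map copying/merging and from passing metadata through to hooks and handlers
  • the observer adds little overhead when it only observes with light filtering; complex filter functions raise the cost
  • re-run the same command on your target hardware before doing capacity planning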

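Design Boundaries

  • type matching is exact; there is no automatic dispatch across interface hierarchies
  • global ordering is not guaranteed; only local ordering semantics are
  • sync handler errors are returned directly to Publish
  • async handler errors and panics are not returned to Publish and should be observed through Hooks
  • a Close(ctx) timeout means draining did not finish within the deadline; user handlers are not forcibly terminated
  • this is an in-process event bus, not a distributed event platform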

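Related Documents

  • Changelog: CHANGELOG.md
  • Contributing guide: CONTRIBUTING.md
  • Support and questions: SUPPORT.md
  • Security policy: SECURITY.md
  • Release process: RELEASING.md
  • Project governance: GOVERNANCE.md
  • Code of conduct: CODE_OF_CONDUCT.md
  • More usage examples: example_test.go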

Documentation

Overview

Package busen provides a small typed-first in-process event bus for Go.

Busen is designed around a few explicit goals:

  • event payloads are plain Go values
  • type-safe subscriptions are the primary API
  • topics are optional local routing metadata
  • context propagation is built into publish and handler execution
  • asynchronous delivery uses bounded queues with explicit backpressure
  • hooks expose runtime events without introducing a heavy framework layer
  • middleware wraps local dispatch without turning the package into a framework
  • optional metadata and observers support bridge/audit scenarios
  • the package stays focused on simple in-process application use

Type-based subscriptions use exact Go types. A subscription registered for one type does not receive values of another type, even if they satisfy the same interface.
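
The following sketch shows what "exact Go types" means in terms of the standard reflect package: two types that satisfy the same interface still have distinct reflect.Type values, so a registry keyed by type keeps them apart. The types and the `sameEventType` helper are hypothetical, and how Busen keys its subscriptions internally is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

// Notifier is satisfied by both event types below.
type Notifier interface{ Notify() string }

type EmailSent struct{}
type SMSSent struct{}

func (EmailSent) Notify() string { return "email" }
func (SMSSent) Notify() string   { return "sms" }

// Compile-time checks that both types implement the interface.
var (
	_ Notifier = EmailSent{}
	_ Notifier = SMSSent{}
)

// sameEventType reports whether two values have the same exact Go type.
func sameEventType(a, b any) bool {
	return reflect.TypeOf(a) == reflect.TypeOf(b)
}

func main() {
	fmt.Println(sameEventType(EmailSent{}, EmailSent{})) // true
	fmt.Println(sameEventType(EmailSent{}, SMSSent{}))   // false
}
```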

Topic subscriptions use dot-separated topics. Wildcards are intentionally small in scope:

  • "*" matches exactly one segment
  • ">" matches one or more remaining segments and must be the last segment
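
The two wildcard rules above can be sketched as a segment-by-segment comparison. This is an illustrative matcher written against the documented rules only; it is not the package's internal router, it does no pattern validation, and `matchTopic` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern:
// "*" matches exactly one segment, and ">" matches one or more
// remaining segments but only when it is the last pattern segment.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	for i, seg := range p {
		if seg == ">" {
			// Must be last, and at least one topic segment must remain.
			return i == len(p)-1 && len(t) > i
		}
		if i >= len(t) {
			return false
		}
		if seg != "*" && seg != t[i] {
			return false
		}
	}
	return len(p) == len(t)
}

func main() {
	fmt.Println(matchTopic("orders.>", "orders.eu.created")) // true
	fmt.Println(matchTopic("orders.>", "orders"))            // false
	fmt.Println(matchTopic("orders.*", "orders.created"))    // true
	fmt.Println(matchTopic("orders.*", "orders.eu.created")) // false
}
```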

Ordering is never global. Busen only preserves FIFO delivery for a single asynchronous subscriber with one worker, or within the same non-empty ordering key for async subscribers with multiple workers.

Most applications start with New, register handlers with Subscribe, SubscribeTopic, or SubscribeTopics, and publish values with Publish. Use Async, Sequential, WithParallelism, and WithOverflow when you need bounded asynchronous delivery; WithHooks when you want to observe runtime errors, panics, and dropped or rejected events; UseObserver for cross-cutting bridge observation; and Shutdown when you need explicit shutdown modes.

Example
type UserCreated struct {
	Email string
}

b := busen.New()

unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[UserCreated]) error {
	fmt.Println("welcome", event.Value.Email)
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(context.Background(), b, UserCreated{Email: "hello@example.com"}); err != nil {
	log.Fatal(err)
}

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
welcome hello@example.com

Constants

This section is empty.

Variables

var (
	// ErrClosed indicates that the bus no longer accepts new publishes or subscriptions.
	ErrClosed = errors.New("busen: bus closed")

	// ErrHandlerNil indicates that a nil handler was passed to Subscribe.
	ErrHandlerNil = errors.New("busen: handler is nil")

	// ErrBufferFull indicates that an asynchronous subscriber queue is full.
	ErrBufferFull = errors.New("busen: subscriber buffer full")

	// ErrDropped indicates that at least one event was dropped due to backpressure.
	ErrDropped = errors.New("busen: event dropped")

	// ErrInvalidPattern indicates that a topic pattern is malformed.
	ErrInvalidPattern = errors.New("busen: invalid topic pattern")

	// ErrInvalidOption indicates that an option value is not valid.
	ErrInvalidOption = errors.New("busen: invalid option")

	// ErrHandlerPanic indicates that a handler panicked while processing an event.
	ErrHandlerPanic = errors.New("busen: handler panic")

	// ErrCloseIncomplete indicates that Close stopped new work but did not finish
	// draining all in-flight work before the provided context ended.
	ErrCloseIncomplete = errors.New("busen: close incomplete")
)

Functions

func Publish

func Publish[T any](ctx context.Context, b *Bus, value T, opts ...PublishOption) error

Publish delivers a typed event to matching subscribers.

func Subscribe

func Subscribe[T any](b *Bus, handler Handler[T], opts ...SubscribeOption) (func(), error)

Subscribe registers a type-based subscription.

func SubscribeMatch

func SubscribeMatch[T any](b *Bus, match func(Event[T]) bool, handler Handler[T], opts ...SubscribeOption) (func(), error)

SubscribeMatch registers a type-based subscription constrained by a predicate filter.

func SubscribeTopic

func SubscribeTopic[T any](b *Bus, pattern string, handler Handler[T], opts ...SubscribeOption) (func(), error)

SubscribeTopic registers a type-based subscription constrained by a topic pattern.

Example
b := busen.New()

unsubscribe, err := busen.SubscribeTopic(b, "orders.>", func(_ context.Context, event busen.Event[string]) error {
	fmt.Printf("%s=%s\n", event.Topic, event.Value)
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(context.Background(), b, "created", busen.WithTopic("orders.eu.created")); err != nil {
	log.Fatal(err)
}

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
orders.eu.created=created

func SubscribeTopics added in v0.2.0

func SubscribeTopics[T any](b *Bus, patterns []string, handler Handler[T], opts ...SubscribeOption) (func(), error)

SubscribeTopics registers a type-based subscription constrained by multiple topic patterns.

Example
b := busen.New()

unsubscribe, err := busen.SubscribeTopics(b, []string{"orders.created", "orders.updated"}, func(_ context.Context, event busen.Event[string]) error {
	fmt.Printf("%s=%s\n", event.Topic, event.Value)
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(context.Background(), b, "created", busen.WithTopic("orders.created")); err != nil {
	log.Fatal(err)
}
if err := busen.Publish(context.Background(), b, "updated", busen.WithTopic("orders.updated")); err != nil {
	log.Fatal(err)
}

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
orders.created=created
orders.updated=updated

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is a typed-first in-process event bus.

func New

func New(opts ...Option) *Bus

New creates a new Bus.

func (*Bus) Close

func (b *Bus) Close(ctx context.Context) error

Close stops accepting new publishes and drains async subscribers. If the provided context ends first, Close returns an error wrapping both ErrCloseIncomplete and the context error. In that case, user handlers are not forcefully canceled.
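
Because the returned error wraps both causes, callers can test for either one with errors.Is. The sketch below reproduces that shape with the standard library only: `errCloseIncomplete` stands in for busen.ErrCloseIncomplete, and `closeWithin` is a hypothetical function, not the package's Close. Wrapping two errors with two %w verbs requires Go 1.20 or later; the exact wrapping mechanism Busen uses is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errCloseIncomplete is a local stand-in for busen.ErrCloseIncomplete.
var errCloseIncomplete = errors.New("close incomplete")

// closeWithin waits for done or ctx. On timeout it returns an error
// that wraps both the sentinel and the context error.
func closeWithin(ctx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("%w: %w", errCloseIncomplete, ctx.Err())
	}
}

// timedOutClose exercises the timeout path with work that never finishes.
func timedOutClose() (incomplete, deadline bool) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	err := closeWithin(ctx, make(chan struct{}))
	return errors.Is(err, errCloseIncomplete), errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOutClose()) // true true
}
```

With the real package, the equivalent check would be errors.Is(err, busen.ErrCloseIncomplete) on the error returned by Close.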

func (*Bus) Shutdown added in v0.3.0

func (b *Bus) Shutdown(ctx context.Context, mode ShutdownMode) (ShutdownResult, error)

Shutdown stops accepting new publishes and subscriptions according to mode.

Example
type Job struct {
	ID int
}

b := busen.New()
unsubscribe, err := busen.Subscribe(b, func(_ context.Context, _ busen.Event[Job]) error {
	return nil
}, busen.Async(), busen.WithBuffer(8))
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

_ = busen.Publish(context.Background(), b, Job{ID: 1})

result, err := b.Shutdown(context.Background(), busen.ShutdownDrain)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result.Mode == busen.ShutdownDrain, result.Completed)
Output:
true true

func (*Bus) Use

func (b *Bus) Use(middlewares ...Middleware) error

Use registers global dispatch middleware.

Middleware is applied to both sync and async handler execution. It does not replace hooks, and it does not manage bus lifecycle or routing.

Example
type AuditEvent struct {
	Action string
}

b := busen.New()
if err := b.Use(func(next busen.Next) busen.Next {
	return func(ctx context.Context, dispatch busen.Dispatch) error {
		if dispatch.Headers == nil {
			dispatch.Headers = make(map[string]string, 1)
		}
		dispatch.Headers["source"] = "middleware"
		return next(ctx, dispatch)
	}
}); err != nil {
	log.Fatal(err)
}

unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[AuditEvent]) error {
	fmt.Printf("%s from %s\n", event.Value.Action, event.Headers["source"])
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(
	context.Background(),
	b,
	AuditEvent{Action: "saved"},
	busen.WithHeaders(map[string]string{"request-id": "req-1"}),
); err != nil {
	log.Fatal(err)
}

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
saved from middleware

func (*Bus) UseObserver added in v0.3.0

func (b *Bus) UseObserver(observer Observer, opts ...ObserverOption) error

UseObserver registers an optional bridge/audit observer.

Example
type OrderCreated struct {
	ID string
}

b := busen.New()
if err := b.UseObserver(
	func(_ context.Context, obs busen.Observation) {
		fmt.Printf("observe %s %v\n", obs.Topic, obs.EventType)
	},
	busen.ObserveTopic("orders.>"),
); err != nil {
	log.Fatal(err)
}

unsubscribe, err := busen.SubscribeTopic(b, "orders.>", func(_ context.Context, event busen.Event[OrderCreated]) error {
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(context.Background(), b, OrderCreated{ID: "o-1"}, busen.WithTopic("orders.created")); err != nil {
	log.Fatal(err)
}

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
observe orders.created busen_test.OrderCreated

type Dispatch

type Dispatch struct {
	// EventType is the exact Go type being dispatched.
	EventType reflect.Type
	// Topic is the publish topic after publish options have been applied.
	Topic string
	// Key is the publish ordering key after publish options have been applied.
	Key string
	// Headers is a mutable copy of the publish headers for this handler call.
	Headers map[string]string
	// Meta is mutable structured metadata for this handler call.
	Meta map[string]string
	// Value is the event payload that will be passed to the typed handler.
	Value any
	// Async reports whether the target subscription is asynchronous.
	Async bool
}

Dispatch carries untyped event metadata through middleware.

Middleware is intentionally thin and local to in-process dispatch. It may inspect or transform the event metadata before the typed handler runs.

Dispatch mutation rules are intentionally narrow:

  • changes are visible to later middleware and the final handler
  • changes do not rewrite hook payloads
  • changes do not affect subscriber matching, publish-level hooks, or async queue selection, all of which happen before middleware runs

type DroppedEvent

type DroppedEvent struct {
	// EventType is the exact Go type that could not be queued.
	EventType reflect.Type
	// Topic is the event topic that was being delivered.
	Topic string
	// Key is the event ordering key that was being delivered.
	Key string
	// Meta is structured envelope metadata for the dropped event.
	Meta map[string]string
	// Async is always true for dropped events.
	Async bool
	// Policy is the overflow policy that decided the drop behavior.
	Policy OverflowPolicy
	// SubscriberID is the internal subscription identifier.
	SubscriberID uint64
	// QueueLen is the queue length at observation time.
	QueueLen int
	// QueueCap is the queue capacity.
	QueueCap int
	// MailboxIndex is the selected worker mailbox index.
	MailboxIndex int
	// Reason reports why the event was dropped.
	Reason error
}

DroppedEvent describes a dropped event caused by backpressure.

type Event

type Event[T any] struct {
	// Topic carries optional routing metadata supplied at publish time.
	Topic string
	// Key carries the optional ordering key supplied at publish time.
	Key string
	// Value is the typed event payload.
	Value T
	// Headers contains a shallow copy of publish headers visible to handlers.
	Headers map[string]string
	// Meta contains structured envelope metadata visible to handlers.
	Meta map[string]string
}

Event is the typed value delivered to handlers.

type Handler

type Handler[T any] func(ctx context.Context, event Event[T]) error

Handler handles a typed event.

type HandlerError

type HandlerError struct {
	// EventType is the exact Go type handled by the subscriber.
	EventType reflect.Type
	// Topic is the event topic seen by the handler.
	Topic string
	// Key is the event ordering key seen by the handler.
	Key string
	// Meta is structured envelope metadata seen by the handler.
	Meta map[string]string
	// Async reports whether the handler ran in async mode.
	Async bool
	// Err is the error returned by the handler.
	Err error
}

HandlerError describes a handler error.

type HandlerPanic

type HandlerPanic struct {
	// EventType is the exact Go type handled by the subscriber.
	EventType reflect.Type
	// Topic is the event topic seen by the handler.
	Topic string
	// Key is the event ordering key seen by the handler.
	Key string
	// Meta is structured envelope metadata seen by the handler.
	Meta map[string]string
	// Async reports whether the handler ran in async mode.
	Async bool
	// Value is the recovered panic value.
	Value any
}

HandlerPanic describes a recovered handler panic.

type HandlerPanicError

type HandlerPanicError struct {
	Value any
}

HandlerPanicError wraps a recovered handler panic as an error value.

func (*HandlerPanicError) Error

func (e *HandlerPanicError) Error() string

func (*HandlerPanicError) Unwrap

func (e *HandlerPanicError) Unwrap() error

type HookPanic

type HookPanic struct {
	// Hook is the callback name that panicked, such as "OnPublishDone".
	Hook string
	// Value is the recovered panic value.
	Value any
}

HookPanic describes a recovered panic raised by another hook callback.
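
A recovered hook panic is typically produced by wrapping each callback in a deferred recover. The following is a generic sketch of that pattern, assuming nothing about Busen's internals; `safeHook` and its callback signature are hypothetical.

```go
package main

import "fmt"

// safeHook runs fn and, if it panics, reports the recovered value through
// onPanic instead of letting the panic unwind into the caller.
func safeHook(name string, fn func(), onPanic func(hook string, value any)) {
	defer func() {
		if v := recover(); v != nil && onPanic != nil {
			onPanic(name, v)
		}
	}()
	fn()
}

func main() {
	safeHook("OnPublishDone", func() { panic("boom") }, func(hook string, v any) {
		fmt.Printf("hook %s panicked: %v\n", hook, v)
	})
	fmt.Println("publish path continues")
}
```

The hook name and recovered value passed to `onPanic` correspond to the Hook and Value fields of HookPanic.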

type Hooks

type Hooks struct {
	// OnPublishStart runs before matching subscribers are evaluated.
	OnPublishStart func(PublishStart)
	// OnPublishDone runs after all matching deliveries have been attempted.
	OnPublishDone func(PublishDone)
	// OnHandlerError runs when a handler returns a non-nil error.
	OnHandlerError func(HandlerError)
	// OnHandlerPanic runs when a handler panic is recovered.
	OnHandlerPanic func(HandlerPanic)
	// OnEventDropped runs when async backpressure drops an event.
	OnEventDropped func(DroppedEvent)
	// OnEventRejected runs when async backpressure rejects an event.
	OnEventRejected func(RejectedEvent)
	// OnHookPanic runs when another hook panics and the panic is recovered.
	OnHookPanic func(HookPanic)
}

Hooks observes publish and handler lifecycle events.

Hooks are intentionally thin. They are not a full middleware pipeline and do not change delivery semantics. They exist to observe important runtime events such as async failures, panics, and dropped events.

type MetadataBuilder added in v0.3.0

type MetadataBuilder func(PublishMetadataInput) map[string]string

MetadataBuilder builds optional structured metadata for publish envelopes.

type Middleware

type Middleware func(Next) Next

Middleware wraps local handler dispatch in the same spirit as HTTP middleware.
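
For readers unfamiliar with the func(Next) Next shape, the sketch below composes two middlewares around a final handler using local stand-in types (`dispatch`, `next`, `middleware`). It assumes the first middleware in the list runs outermost; the order in which Busen applies registered middleware is not stated here and should be checked against the package.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for busen.Dispatch, busen.Next, and busen.Middleware.
type dispatch struct{ Topic string }
type next func(context.Context, dispatch) error
type middleware func(next) next

// chain wraps final so that mws[0] runs outermost (an assumption).
func chain(final next, mws ...middleware) next {
	for i := len(mws) - 1; i >= 0; i-- {
		final = mws[i](final)
	}
	return final
}

// tag records when control enters and leaves a middleware.
func tag(name string, trace *[]string) middleware {
	return func(n next) next {
		return func(ctx context.Context, d dispatch) error {
			*trace = append(*trace, name+":before")
			err := n(ctx, d)
			*trace = append(*trace, name+":after")
			return err
		}
	}
}

// runTrace builds a two-middleware chain and returns the call order.
func runTrace() []string {
	var trace []string
	handler := func(_ context.Context, _ dispatch) error {
		trace = append(trace, "handler")
		return nil
	}
	run := chain(handler, tag("a", &trace), tag("b", &trace))
	_ = run(context.Background(), dispatch{Topic: "orders.created"})
	return trace
}

func main() {
	fmt.Println(runTrace()) // [a:before b:before handler b:after a:after]
}
```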

type Next

type Next func(context.Context, Dispatch) error

Next is the continuation function used by Middleware.

type Observation added in v0.3.0

type Observation struct {
	EventType reflect.Type
	Topic     string
	Key       string
	Headers   map[string]string
	Meta      map[string]string
	Value     any
	Async     bool

	SubscriberID uint64
}

Observation represents an accepted delivery for bridge/audit observers.

type Observer added in v0.3.0

type Observer func(context.Context, Observation)

Observer receives accepted observations.

type ObserverOption added in v0.3.0

type ObserverOption interface {
	// contains filtered or unexported methods
}

ObserverOption configures an observer filter.

func ObserveMatch added in v0.3.0

func ObserveMatch(fn func(Observation) bool) ObserverOption

ObserveMatch applies a custom observation predicate.

func ObserveMetadata added in v0.3.0

func ObserveMetadata(meta map[string]string) ObserverOption

ObserveMetadata filters observations by metadata subset.
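
A plausible reading of "metadata subset" is that every key/value pair in the filter must appear in the observation's metadata, while extra keys are ignored. The sketch below implements that reading; it is an assumption about the semantics, and `metaSubset` is a hypothetical helper.

```go
package main

import "fmt"

// metaSubset reports whether every key/value pair in want is present in got.
// Extra keys in got are ignored; an empty want matches everything.
func metaSubset(want, got map[string]string) bool {
	for k, v := range want {
		if gv, ok := got[k]; !ok || gv != v {
			return false
		}
	}
	return true
}

func main() {
	obs := map[string]string{"trace_id": "tr_123", "source": "billing-service"}
	fmt.Println(metaSubset(map[string]string{"trace_id": "tr_123"}, obs)) // true
	fmt.Println(metaSubset(map[string]string{"trace_id": "other"}, obs))  // false
}
```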

func ObserveTopic added in v0.3.0

func ObserveTopic(pattern string) ObserverOption

ObserveTopic filters observations by topic pattern.

func ObserveType added in v0.3.0

func ObserveType[T any]() ObserverOption

ObserveType filters observations by exact event type.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option configures a Bus.

Callers typically obtain Option values from helpers such as WithDefaultBuffer, WithDefaultOverflow, WithHooks, and WithMiddleware rather than implementing this interface directly.

func WithDefaultBuffer

func WithDefaultBuffer(size int) Option

WithDefaultBuffer sets the default queue size for async subscribers.

func WithDefaultOverflow

func WithDefaultOverflow(policy OverflowPolicy) Option

WithDefaultOverflow sets the default overflow policy for async subscribers.

func WithHooks

func WithHooks(hooks Hooks) Option

WithHooks registers runtime hooks for publish and handler lifecycle events.

Example
type UserCreated struct {
	ID string
}

b := busen.New(busen.WithHooks(busen.Hooks{
	OnPublishDone: func(info busen.PublishDone) {
		fmt.Printf("matched=%d delivered=%d\n", info.MatchedSubscribers, info.DeliveredSubscribers)
	},
}))

unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[UserCreated]) error {
	fmt.Println("handled", event.Value.ID)
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(context.Background(), b, UserCreated{ID: "u-1"}); err != nil {
	log.Fatal(err)
}

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
handled u-1
matched=1 delivered=1

func WithMetadataBuilder added in v0.3.0

func WithMetadataBuilder(builder MetadataBuilder) Option

WithMetadataBuilder registers a global metadata builder for publish envelopes.

Example
type OrderCreated struct {
	ID string
}

b := busen.New(
	busen.WithMetadataBuilder(func(input busen.PublishMetadataInput) map[string]string {
		return map[string]string{
			"source": "billing",
		}
	}),
)

unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[OrderCreated]) error {
	fmt.Printf("%s from %s\n", event.Value.ID, event.Meta["source"])
	return nil
})
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(
	context.Background(),
	b,
	OrderCreated{ID: "o-1"},
	busen.WithMetadata(map[string]string{"trace_id": "tr-1"}),
); err != nil {
	log.Fatal(err)
}

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
o-1 from billing

func WithMiddleware

func WithMiddleware(middlewares ...Middleware) Option

WithMiddleware registers global dispatch middleware at bus construction time.

type OverflowPolicy

type OverflowPolicy int

OverflowPolicy controls what happens when an async subscriber queue is full.

const (
	// OverflowBlock blocks the publisher until queue space is available.
	OverflowBlock OverflowPolicy = iota
	// OverflowFailFast returns an error instead of waiting for queue space.
	OverflowFailFast
	// OverflowDropNewest drops the incoming event when the queue is full.
	OverflowDropNewest
	// OverflowDropOldest evicts the oldest queued event to admit the new event.
	OverflowDropOldest
)
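
The four policies can be modeled on a buffered channel, which may help when choosing between them. This is a single-producer sketch under stated assumptions: the local `policy` constants mirror the names above, `errFull` stands in for an error such as ErrBufferFull, and the return values for the drop policies are illustrative rather than a description of what Publish returns.

```go
package main

import (
	"errors"
	"fmt"
)

type policy int

const (
	block policy = iota
	failFast
	dropNewest
	dropOldest
)

var errFull = errors.New("queue full")

// enqueue applies one overflow policy to a bounded queue and reports
// whether v was accepted. Not hardened for concurrent producers.
func enqueue(q chan int, v int, p policy) (bool, error) {
	switch p {
	case block:
		q <- v // waits until space is available
		return true, nil
	case failFast:
		select {
		case q <- v:
			return true, nil
		default:
			return false, errFull
		}
	case dropNewest:
		select {
		case q <- v:
			return true, nil
		default:
			return false, nil // the incoming event is discarded
		}
	case dropOldest:
		for {
			select {
			case q <- v:
				return true, nil
			default:
				// Evict the oldest queued item, then retry the send.
				select {
				case <-q:
				default:
				}
			}
		}
	}
	return false, fmt.Errorf("unknown policy %d", p)
}

func main() {
	q := make(chan int, 2)
	enqueue(q, 1, block)
	enqueue(q, 2, block)
	fmt.Println(enqueue(q, 3, failFast))   // false queue full
	fmt.Println(enqueue(q, 4, dropNewest)) // false <nil>
	fmt.Println(enqueue(q, 5, dropOldest)) // true <nil>
	fmt.Println(<-q, <-q)                  // 2 5
}
```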

type PublishDone

type PublishDone struct {
	// EventType is the exact Go type that was published.
	EventType reflect.Type
	// Topic is the publish topic after options have been applied.
	Topic string
	// Key is the publish ordering key after options have been applied.
	Key string
	// Headers is a copy of the publish headers.
	Headers map[string]string
	// Meta is structured envelope metadata.
	Meta map[string]string
	// MatchedSubscribers is the number of subscriptions whose routing constraints
	// matched the published event.
	MatchedSubscribers int
	// DeliveredSubscribers is the number of subscriptions that accepted the event
	// for handler execution or async enqueue after lifecycle checks.
	DeliveredSubscribers int
	// Err joins delivery errors returned during publish, if any.
	Err error
}

PublishDone describes the end of a publish operation.

type PublishMetadataInput added in v0.3.0

type PublishMetadataInput struct {
	Context   context.Context
	EventType reflect.Type
	Topic     string
	Key       string
	Headers   map[string]string
	Value     any
}

PublishMetadataInput is passed to MetadataBuilder.

type PublishOption

type PublishOption interface {
	// contains filtered or unexported methods
}

PublishOption configures a publish call.

Callers typically obtain PublishOption values from helpers such as WithTopic, WithKey, and WithHeaders rather than implementing this interface directly.

func WithHeaders

func WithHeaders(headers map[string]string) PublishOption

WithHeaders sets headers for a published event.

func WithKey

func WithKey(key string) PublishOption

WithKey sets an optional ordering key for a published event.

In async mode, subscribers with multiple workers preserve ordering for events that share the same non-empty key within that subscriber. Empty keys fall back to the regular non-keyed path.

Example
type UserCreated struct {
	ID string
}

b := busen.New()
done := make(chan struct{}, 2)

unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[UserCreated]) error {
	fmt.Printf("%s:%s\n", event.Key, event.Value.ID)
	done <- struct{}{}
	return nil
}, busen.Async(), busen.WithParallelism(2), busen.WithBuffer(4))
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(context.Background(), b, UserCreated{ID: "1"}, busen.WithKey("tenant-a")); err != nil {
	log.Fatal(err)
}
if err := busen.Publish(context.Background(), b, UserCreated{ID: "2"}, busen.WithKey("tenant-a")); err != nil {
	log.Fatal(err)
}

<-done
<-done

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
tenant-a:1
tenant-a:2

func WithMetadata added in v0.3.0

func WithMetadata(meta map[string]string) PublishOption

WithMetadata sets structured envelope metadata for a published event.

func WithTopic

func WithTopic(topic string) PublishOption

WithTopic sets the routing topic for a published event.

type PublishStart

type PublishStart struct {
	// EventType is the exact Go type being published.
	EventType reflect.Type
	// Topic is the publish topic after options have been applied.
	Topic string
	// Key is the publish ordering key after options have been applied.
	Key string
	// Headers is a copy of the publish headers.
	Headers map[string]string
	// Meta is structured envelope metadata.
	Meta map[string]string
}

PublishStart describes the beginning of a publish operation.

type RejectedEvent added in v0.3.0

type RejectedEvent struct {
	// EventType is the exact Go type that could not be queued.
	EventType reflect.Type
	// Topic is the event topic that was being delivered.
	Topic string
	// Key is the event ordering key that was being delivered.
	Key string
	// Meta is structured envelope metadata for the rejected event.
	Meta map[string]string
	// Async is always true for rejected events.
	Async bool
	// Policy is the overflow policy that rejected the event.
	Policy OverflowPolicy
	// SubscriberID is the internal subscription identifier.
	SubscriberID uint64
	// QueueLen is the queue length at observation time.
	QueueLen int
	// QueueCap is the queue capacity.
	QueueCap int
	// MailboxIndex is the selected worker mailbox index.
	MailboxIndex int
	// Reason reports why the event was rejected.
	Reason error
}

RejectedEvent describes an event rejected by backpressure policy.

type ShutdownMode added in v0.3.0

type ShutdownMode int

ShutdownMode controls how bus shutdown handles queued async events.

const (
	// ShutdownDrain waits for async queues to drain.
	ShutdownDrain ShutdownMode = iota
	// ShutdownBestEffort stops accepting work and waits until context ends.
	ShutdownBestEffort
	// ShutdownAbort stops accepting work and drops queued async events.
	ShutdownAbort
)

type ShutdownResult added in v0.3.0

type ShutdownResult struct {
	Mode ShutdownMode
	// Processed is the number of handler executions observed during shutdown.
	Processed int64
	// Dropped is the number of dropped events observed during shutdown.
	// It includes backpressure drops and abort-mode queue drops.
	Dropped int64
	// Rejected is the number of rejected events observed during shutdown.
	Rejected int64
	// TimedOutSubscribers contains subscriber IDs that did not stop before ctx ended.
	TimedOutSubscribers []uint64
	// Completed reports whether shutdown fully completed before context cancellation.
	Completed bool
}

ShutdownResult reports structured shutdown outcomes.

type SubscribeOption

type SubscribeOption interface {
	// contains filtered or unexported methods
}

SubscribeOption configures a subscription.

Callers typically obtain SubscribeOption values from helpers such as Async, Sequential, WithParallelism, WithBuffer, WithOverflow, and WithFilter rather than implementing this interface directly.

func Async

func Async() SubscribeOption

Async delivers events through a bounded queue and worker goroutines.

Example
type JobQueued struct {
	ID string
}

b := busen.New()
done := make(chan struct{})

unsubscribe, err := busen.Subscribe(b, func(_ context.Context, event busen.Event[JobQueued]) error {
	fmt.Println("processed", event.Value.ID)
	close(done)
	return nil
}, busen.Async(), busen.WithBuffer(1))
if err != nil {
	log.Fatal(err)
}
defer unsubscribe()

if err := busen.Publish(context.Background(), b, JobQueued{ID: "job-42"}); err != nil {
	log.Fatal(err)
}

<-done

if err := b.Close(context.Background()); err != nil {
	log.Fatal(err)
}
Output:
processed job-42

func Sequential

func Sequential() SubscribeOption

Sequential is shorthand for single-worker async FIFO delivery.

It enables async delivery and forces the subscriber to run with one worker.

func WithBuffer

func WithBuffer(size int) SubscribeOption

WithBuffer sets the queue size for an async subscriber.

func WithFilter

func WithFilter[T any](fn func(Event[T]) bool) SubscribeOption

WithFilter applies a predicate filter before the handler runs.
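
Conceptually, a predicate filter is a wrapper that calls the handler only when the predicate accepts the event. The generic sketch below uses a local `event` type and a hypothetical `withFilter` function to show the idea; treating a filtered-out event as a nil-error no-op is an assumption, not documented Busen behavior.

```go
package main

import "fmt"

// event is a local stand-in for busen.Event[T].
type event[T any] struct {
	Topic string
	Value T
}

// withFilter returns a handler that invokes h only when pred accepts the event.
func withFilter[T any](pred func(event[T]) bool, h func(event[T]) error) func(event[T]) error {
	return func(e event[T]) error {
		if !pred(e) {
			return nil // filtered out: treated as a no-op in this sketch
		}
		return h(e)
	}
}

func main() {
	var seen []int
	h := withFilter(
		func(e event[int]) bool { return e.Value%2 == 0 },
		func(e event[int]) error { seen = append(seen, e.Value); return nil },
	)
	for i := 1; i <= 4; i++ {
		_ = h(event[int]{Value: i})
	}
	fmt.Println(seen) // [2 4]
}
```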

func WithOverflow

func WithOverflow(policy OverflowPolicy) SubscribeOption

WithOverflow sets the queue overflow policy for an async subscriber.

func WithParallelism

func WithParallelism(n int) SubscribeOption

WithParallelism sets the number of workers for an async subscriber.

Directories

Path | Synopsis
internal/dispatch | Package dispatch provides small primitives for coordinating in-process delivery.
internal/router | Package router compiles and evaluates topic matchers for Busen routing.
