broker

package module
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2026 License: MIT Imports: 10 Imported by: 0

README

Unified MQ Broker for Go

Unified MQ Broker for Go 是一个通用的消息中间件适配包,旨在提供统一、简洁的 API 接口,消除底层 MQ 服务实现(如 RocketMQ, Kafka, RabbitMQ 等)与业务逻辑之间的耦合。

核心特性

  • 接口驱动: 统一的 Broker, Publisher, Subscriber 接口。
  • 多驱动支持:
    驱动 (Driver) 状态 (Status) 测试覆盖率 (Coverage) 说明 (Description)
    Core Framework ✅ 生产就绪 89.4% 库核心逻辑与通用 Options
    AWS SQS ✅ 生产就绪 94.6% 亚马逊云队列服务
    NATS ✅ 生产就绪 91.7% 高性能消息系统
    Redis ✅ 生产就绪 91.6% 基于 Streams (Consumer Group)
    RocketMQ ✅ 生产就绪 83.6% 阿里云/原生 RocketMQ
    RabbitMQ ✅ 生产就绪 82.7% 标准 AMQP 协议
    Kafka ✅ 生产就绪 82.4% 基于 sarama 的高并发实现
    GCP Pub/Sub ✅ 已支持 22.3% 谷歌云发布订阅
  • 可扩展性: 插件化架构,轻松接入新的 MQ 实现。
  • 统一模型: 厂商无关的消息模型。

项目结构

.
├── broker.go          // 核心接口定义
├── message.go         // 统一消息结构
├── options.go         // 统一配置项
├── json.go            // 默认 JSON 编解码器
├── noop_broker.go     // 空实现(用于测试)
├── middleware/        // 中间件(如 OTEL)
├── brokers/           // 各 MQ 适配器实现
│   ├── rocketmq/      // RocketMQ
│   ├── kafka/         // Kafka
│   ├── rabbitmq/      // RabbitMQ
│   ├── nats/          // NATS
│   ├── redis/         // Redis Streams
│   ├── sqs/           // AWS SQS
│   └── pubsub/        // GCP Pub/Sub
└── examples/          // 使用示例

快速开始

1. 使用 No-op Broker (用于本地开发/测试)
import "github.com/qvcloud/broker"

// 初始化
b := broker.NewNoopBroker()
b.Connect()

// 订阅
b.Subscribe("topic", func(ctx context.Context, event broker.Event) error {
    fmt.Println("Received:", string(event.Message().Body))
    return nil
})

// 发布
b.Publish(context.Background(), "topic", &broker.Message{Body: []byte("hello")})
2. 使用 RocketMQ
import (
    "github.com/qvcloud/broker"
    "github.com/qvcloud/broker/brokers/rocketmq"
)

b := rocketmq.NewBroker(
    broker.Addrs("127.0.0.1:9876"),
)
b.Connect()
3. 使用 Kafka
import (
    "github.com/qvcloud/broker"
    "github.com/qvcloud/broker/brokers/kafka"
)

b := kafka.NewBroker(
    broker.Addrs("127.0.0.1:9092"),
)
b.Connect()
4. 使用 RabbitMQ
import (
    "github.com/qvcloud/broker"
    "github.com/qvcloud/broker/brokers/rabbitmq"
)

b := rabbitmq.NewBroker(
    broker.Addrs("amqp://guest:guest@localhost:5672/"),
)
b.Connect()
5. 使用 NATS
import (
    "github.com/qvcloud/broker"
    "github.com/qvcloud/broker/brokers/nats"
)

b := nats.NewBroker(
    broker.Addrs("nats://localhost:4222"),
)
b.Connect()
6. 使用 AWS SQS
import (
    "github.com/qvcloud/broker"
    "github.com/qvcloud/broker/brokers/sqs"
)

// SQS 使用 AWS 默认配置加载凭证和区域
b := sqs.NewBroker()
b.Connect()

// 发布到指定 Queue URL
b.Publish(ctx, "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue", msg)
7. 使用 GCP Pub/Sub
import (
    "github.com/qvcloud/broker"
    "github.com/qvcloud/broker/brokers/pubsub"
)

// Addrs 传入 GCP Project ID
b := pubsub.NewBroker(
    broker.Addrs("my-gcp-project-id"),
)
b.Connect()

// 订阅时需通过 WithQueue 指定 Subscription ID
b.Subscribe("my-topic", handler, broker.WithQueue("my-subscription"))
8. 使用 Redis Streams
import (
    "github.com/qvcloud/broker"
    "github.com/qvcloud/broker/brokers/redis"
)

b := redis.NewBroker(
    broker.Addrs("127.0.0.1:6379"),
    redis.WithDB(0),
    redis.WithPassword("your-password"),
)
b.Connect()

// 订阅 (使用 Consumer Group)
b.Subscribe("topic", handler, broker.Queue("my-group"))
9. 集成 OpenTelemetry
import (
    "github.com/qvcloud/broker/middleware"
)

b.Subscribe("topic", middleware.OtelHandler(func(ctx context.Context, event broker.Event) error {
    // 处理逻辑...
    return nil
}))
10. 安全连接 (TLS/SSL)

本库在 Kafka, RabbitMQ 和 NATS 适配器中支持标准 TLS 配置。

import (
    "crypto/tls"
    "github.com/qvcloud/broker"
)

// 加载双向 TLS 配置 (可选)
tlsConfig := &tls.Config{
    InsecureSkipVerify: false,
    // 其他字段...
}

b := kafka.NewBroker(
    broker.Addrs("kafka:9093"),
    broker.Secure(true),
    broker.TLSConfig(tlsConfig),
)
b.Connect()
💡 消息处理回调 (Handler) 的返回值说明

b.Subscribe 中定义的 Handler 函数返回的 error 直接影响消息的确认机制:

  • 返回 nil:表示消息处理成功。Broker 适配器会自动确认消息(Ack),消息将不会再次派发。
  • 返回 error:表示处理失败。消息将不会被确认。根据底层 MQ 的实现,该消息通常会:
    • 重新入队 (Requeue):如 RabbitMQ,消息会回到队列等待下次消费。
    • 等待超时重发:如 SQS 或 GCP Pub/Sub,消息在可见性超时后会重新派发。
    • 暂停提交位点:如 Kafka,可能会导致该分区消息堆积。

新手建议:对于程序逻辑错误或无法通过重试解决的错误,建议捕获异常、记录日志并返回 nil,或者手动将其投递至死信队列(DLQ),以避免队列因无限重试而阻塞。

核心设计原则

  1. 接口驱动: 保证业务逻辑与具体的 MQ 实现解耦。
  2. 高性能: 适配层保持极简,最小化性能开销。
  3. 可观测性: 原生支持 OpenTelemetry。

性能表现

我们对 broker 框架的基础损耗进行了评估,测试环境为 Apple M2 Pro (Go 1.21)。

1. 序列化优化 (Smart Serialization)

通过对原始数据(Bytes/String)的智能路径优化,序列化性能提升了约 5 倍

测试场景 耗时 (ns/op) 内存分配 (B/op) 分配次数 (allocs/op) 结论
标准 json.Marshal (Bytes) 83.52 88 2 基准
智能序列化 (Bytes) 15.91 24 1 提速 ~5.2x
标准 json.Marshal (String) 86.18 64 2 基准
智能序列化 (String) 16.20 24 1 提速 ~5.3x
2. 框架基础开销
测试项目 耗时 (ns/op) 内存分配 (B/op) 分配次数 (allocs/op)
NoopBroker 发布 37.94 80 2
WithTrackedValue (首次) 154.9 504 6
GetTrackedValue (读取) 29.17 0 0

: 选项追踪 (Option Tracking) 引入的额外开销在网络 IO 面前(秒级/毫秒级)几乎可以忽略不计,但它能显著提升开发效率,防止因配置拼写错误或跨平台参数误用导致的“静默失败”。

开源协议

本项目采用 MIT License 协议。你可以自由地使用、修改和分发本项目,只需保留原始版权声明。

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetTrackedValue

func GetTrackedValue(ctx context.Context, key any) any

GetTrackedValue retrieves a value from the context and marks it as consumed.

func TrackOptions added in v0.3.4

func TrackOptions(ctx context.Context) context.Context

TrackOptions initializes an OptionTracker in the context.

func WarnUnconsumed

func WarnUnconsumed(ctx context.Context, logger Logger)

WarnUnconsumed logs a warning if any registered options were not consumed.

func WithTrackedValue

func WithTrackedValue(ctx context.Context, key, val any, name string) context.Context

WithTrackedValue adds a value to the context and registers it for tracking.

Types

type Broker

type Broker interface {
	// Init initializes the broker with options.
	// It should only validate configuration and not establish network connections.
	Init(...Option) error
	// Options returns the broker options.
	Options() Options
	// Address returns the broker address.
	Address() string
	// Connect connects the broker to the message service.
	// All network initialization and client creation should happen here.
	Connect() error
	// Disconnect disconnects the broker from the message service.
	Disconnect() error
	// Publish publishes a message to a topic.
	Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
	// Subscribe subscribes to a topic with a handler.
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	// String returns the broker implementation name.
	String() string
}

Broker is an interface used for asynchronous messaging. It provides a unified API to interact with different message brokers.

func NewNoopBroker

func NewNoopBroker(opts ...Option) Broker

type Event

type Event interface {
	// Topic returns the topic name.
	Topic() string
	// Message returns the received message.
	Message() *Message
	// Ack acknowledges the message.
	Ack() error
	// Nack negatively acknowledges the message.
	// If requeue is true, the message will be returned to the queue if supported.
	Nack(requeue bool) error
	// Error returns any error occurred during processing.
	Error() error
}

Event is given to a subscription handler for processing. It contains the message and metadata about the topic.

type Handler

type Handler func(context.Context, Event) error

Handler is used to process messages via a subscription of a topic.

type JsonMarshaler

type JsonMarshaler struct{}

func (JsonMarshaler) Marshal

func (j JsonMarshaler) Marshal(v any) ([]byte, error)

func (JsonMarshaler) String

func (j JsonMarshaler) String() string

func (JsonMarshaler) Unmarshal

func (j JsonMarshaler) Unmarshal(d []byte, v any) error

type Logger

type Logger interface {
	Log(v ...any)
	Logf(format string, v ...any)
}

Logger is a simple logging interface.

type Marshaler

type Marshaler interface {
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
	String() string
}

Marshaler is a simple encoding interface.

type Message

type Message struct {
	// Header contains message metadata.
	Header map[string]string
	// Body contains the message payload.
	Body []byte
	// Partition is the partition ID for brokers that support it (like Kafka).
	Partition int32
}

Message is a message send/received from the broker.

type Option

type Option func(*Options)

func Addrs

func Addrs(addrs ...string) Option

Addrs sets the host addresses to be used by the broker.

func ClientID

func ClientID(id string) Option

ClientID sets the client identifier.

func Codec

func Codec(c Marshaler) Option

Codec sets the codec used for encoding/decoding used where a broker does not support headers.

func ErrorHandler

func ErrorHandler(h Handler) Option

ErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors.

func Meter

func Meter(m metric.Meter) Option

Meter sets the meter used for observability.

func Secure

func Secure(b bool) Option

Secure communication with the broker.

func TLSConfig

func TLSConfig(t *tls.Config) Option

Specify TLS Config.

func Tracer

func Tracer(t trace.Tracer) Option

Tracer sets the tracer used for observability.

func WithClientID added in v0.3.4

func WithClientID(id string) Option

WithClientID sets the client identifier.

func WithContext added in v0.3.4

func WithContext(ctx context.Context) Option

WithContext sets the context for the broker.

func WithLogger

func WithLogger(l Logger) Option

WithLogger sets the logger for the broker.

type OptionTracker

type OptionTracker struct {
	// contains filtered or unexported fields
}

OptionTracker tracks which options have been registered and consumed.

type Options

type Options struct {
	// Addrs is a list of broker addresses.
	Addrs []string
	// Secure specifies whether to use a secure connection.
	Secure bool
	// Codec is the marshaler used for encoding/decoding messages.
	Codec Marshaler

	// ErrorHandler is called when an error occurs during message handling.
	ErrorHandler Handler

	// TLSConfig is the TLS configuration for secure connections.
	TLSConfig *tls.Config

	// Tracer is the OpenTelemetry tracer for observability.
	Tracer trace.Tracer
	// Meter is the OpenTelemetry meter for observability.
	Meter metric.Meter

	// Context is the underlying context for custom options.
	Context context.Context

	// Logger for debug/info logging.
	Logger Logger

	// ClientID is a unique identifier for the client.
	ClientID string
}

Options contains the broker configuration.

func NewOptions

func NewOptions(opts ...Option) *Options

type PublishOption

type PublishOption func(*PublishOptions)

func PublishContext

func PublishContext(ctx context.Context) PublishOption

PublishContext set context.

func WithDelay

func WithDelay(d time.Duration) PublishOption

WithDelay sets the delay duration for a publish operation.

func WithShardingKey

func WithShardingKey(v string) PublishOption

func WithTags

func WithTags(tags ...string) PublishOption

type PublishOptions

type PublishOptions struct {
	// Context is the context for the publish operation.
	Context context.Context
	// ShardingKey is the key used for sharding/partitioning.
	ShardingKey string
	// Delay is the delay duration for the message.
	Delay time.Duration
	// Tags are labels for the message (e.g. for filtering).
	Tags []string
}

PublishOptions contains options for publishing a message.

func NewPublishOptions added in v0.3.4

func NewPublishOptions(opts ...PublishOption) PublishOptions

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck will disable auto acking of messages after they have been handled.

func Queue

func Queue(name string) SubscribeOption

Queue sets the name of the queue to share messages on.

func SubscribeContext

func SubscribeContext(ctx context.Context) SubscribeOption

SubscribeContext set context.

func WithAutoAck added in v0.3.4

func WithAutoAck(ack bool) SubscribeOption

WithAutoAck sets the auto acknowledgement for the subscription.

func WithDeadLetterQueue

func WithDeadLetterQueue(v string) SubscribeOption

func WithQueue

func WithQueue(name string) SubscribeOption

WithQueue sets the name of the queue or consumer group.

type SubscribeOptions

type SubscribeOptions struct {
	// AutoAck specifies whether to automatically acknowledge messages.
	AutoAck bool
	// Queue is the consumer group name or queue name.
	Queue string
	// DeadLetterQueue is the name of the dead letter queue.
	DeadLetterQueue string

	// Context is the context for the subscribe operation.
	Context context.Context
}

SubscribeOptions contains options for subscribing to a topic.

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

type Subscriber

type Subscriber interface {
	// Options returns the subscription options.
	Options() SubscribeOptions
	// Topic returns the subscribed topic.
	Topic() string
	// Unsubscribe stops the subscription.
	Unsubscribe() error
}

Subscriber is a convenience return type for the Subscribe method. It allows managing the subscription lifecycle.

Directories

Path Synopsis
brokers
sqs
examples
basic command
nats command
pubsub command
rabbitmq command
redis command
rocketmq command
sqs command
specs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL