Documentation
¶
Overview ¶
Package queue 提供统一的消息队列抽象层。
queue 包定义了消息队列的标准接口,支持内存队列和分布式队列(RocketMQ、Redis、RabbitMQ、Kafka 等)。 通过统一的 API,你可以轻松地在不同的消息队列实现之间切换,而无需修改业务代码。
功能特性 ¶
- 统一的消息队列抽象接口
- 支持多种消息队列实现(内存、RocketMQ、Redis、RabbitMQ)
- 类型安全的消息处理
- 支持发布订阅模式
- 支持消息标签过滤(RocketMQ)
- 支持延迟消息(RocketMQ)
- 支持消息属性和元数据
- 线程安全
快速开始 ¶
基本使用:
package main
import (
"context"
"encoding/json"
"log"
"github.com/f2xme/gox/queue"
"github.com/f2xme/gox/queue/adapter/mem"
)
func main() {
ctx := context.Background()
// 创建内存队列
q := mem.New()
defer q.(queue.Closer).Close()
// 订阅主题
sub, err := q.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
var order Order
json.Unmarshal(msg.Body, &order)
// 处理订单
if err := processOrder(order); err != nil {
return err // 返回错误,消息会重新入队
}
return nil // 返回 nil,消息被确认
})
if err != nil {
log.Fatal(err)
}
defer sub.Unsubscribe()
// 发布消息
data, _ := json.Marshal(Order{ID: "123", Amount: 9900})
err = q.Publish(ctx, "orders", data)
if err != nil {
log.Fatal(err)
}
}
核心接口 ¶
Queue 接口结合了发布和订阅功能:
type Queue interface {
Publisher
Subscriber
}
Publisher 接口定义发布操作:
type Publisher interface {
Publish(ctx context.Context, topic string, body []byte) error
PublishWithOptions(ctx context.Context, topic string, body []byte, opts PublishOptions) error
}
Subscriber 接口定义订阅操作:
type Subscriber interface {
Subscribe(ctx context.Context, topic string, handler Handler) (Subscription, error)
SubscribeWithOptions(ctx context.Context, topic string, handler Handler, opts SubscribeOptions) (Subscription, error)
}
Handler 是消息处理函数:
type Handler func(ctx context.Context, msg *Message) error
返回 nil 表示消息处理成功(ACK),返回 error 表示处理失败(NACK)。
可用适配器 ¶
内存队列:
import "github.com/f2xme/gox/queue/adapter/mem" queue := mem.New()
特性:进程内通信、零外部依赖、高性能、适合单机应用
RocketMQ 队列:
import "github.com/f2xme/gox/queue/adapter/rocketmq"
queue, err := rocketmq.New(
rocketmq.WithNameServers([]string{"localhost:9876"}),
rocketmq.WithGroupName("my-producer-group"),
)
特性:分布式消息队列、支持消息标签过滤、支持延迟消息、支持集群和广播消费、高可用和高吞吐量
使用示例 ¶
发布消息:
// 发布简单消息
err := queue.Publish(ctx, "orders", []byte(`{"order_id":"123"}`))
// 发布结构化消息
data, _ := json.Marshal(order)
err := queue.Publish(ctx, "orders", data)
// 发布带选项的消息(RocketMQ)
err := queue.PublishWithOptions(ctx, "orders", data, queue.PublishOptions{
Tags: "urgent",
Keys: []string{"order-123"},
Properties: map[string]string{
"source": "web",
},
DelayLevel: 3, // 延迟 10 秒
})
订阅消息:
// 订阅主题
sub, err := queue.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
var order Order
json.Unmarshal(msg.Body, &order)
// 处理订单
if err := processOrder(order); err != nil {
return err // 返回错误,消息会重新入队
}
return nil // 返回 nil,消息被确认
})
// 取消订阅
defer sub.Unsubscribe()
// 带选项订阅(RocketMQ)
sub, err := queue.SubscribeWithOptions(ctx, "orders", handler, queue.SubscribeOptions{
ConsumerGroup: "order-processor",
Tags: "urgent||normal", // 过滤标签
MaxConcurrency: 10, // 限制并发
})
多个订阅者:
// 订阅者 1:发送邮件
queue.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
return sendEmail(msg.Body)
})
// 订阅者 2:更新库存
queue.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
return updateInventory(msg.Body)
})
优雅关闭:
queue := adapter.New()
defer func() {
if closer, ok := queue.(queue.Closer); ok {
closer.Close()
}
}()
消息处理模式 ¶
点对点(Point-to-Point):
一个消息只被一个消费者处理:
// 生产者 queue.Publish(ctx, "tasks", taskData) // 消费者 1 queue.Subscribe(ctx, "tasks", handler1) // 消费者 2 queue.Subscribe(ctx, "tasks", handler2) // 每个消息只会被 handler1 或 handler2 处理一次
发布订阅(Pub/Sub):
一个消息被所有订阅者处理:
// 发布者 queue.Publish(ctx, "events", eventData) // 订阅者 1 queue.Subscribe(ctx, "events", handler1) // 订阅者 2 queue.Subscribe(ctx, "events", handler2) // 每个消息会被 handler1 和 handler2 都处理
最佳实践 ¶
使用结构化消息:
// 推荐:使用 JSON
type OrderMessage struct {
OrderID string `json:"order_id"`
Amount int64 `json:"amount"`
}
data, _ := json.Marshal(OrderMessage{OrderID: "123", Amount: 9900})
queue.Publish(ctx, "orders", data)
实现幂等性:
func handleOrder(ctx context.Context, msg *queue.Message) error {
var order OrderMessage
json.Unmarshal(msg.Body, &order)
// 检查是否已处理
if isProcessed(order.OrderID) {
return nil // 已处理,直接返回
}
// 处理订单
processOrder(order)
// 标记为已处理
markAsProcessed(order.OrderID)
return nil
}
错误处理和重试:
func handleMessage(ctx context.Context, msg *queue.Message) error {
// 可重试的错误:返回 error
if err := processMessage(msg); err != nil {
if isRetryable(err) {
return err // 消息会重新入队
}
// 不可重试的错误:记录日志并返回 nil
log.Printf("permanent error: %v", err)
return nil // 消息被确认,不再重试
}
return nil
}
使用 context 控制超时:
func handleMessage(ctx context.Context, msg *queue.Message) error {
// 设置处理超时
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
return processWithContext(ctx, msg)
}
线程安全 ¶
所有队列实现都是线程安全的,可以在多个 goroutine 中并发使用。
Package queue 提供统一的消息队列抽象层。 支持内存队列和分布式队列的类型安全操作。
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("queue: closed")
ErrClosed is returned when operating on a closed queue.
var ErrFull = errors.New("queue: full")
ErrFull is returned when the queue is full and cannot accept more messages.
Functions ¶
This section is empty.
Types ¶
type AdvancedPublisher ¶ added in v0.1.2
type AdvancedPublisher interface {
Publisher
// PublishAndGetResult 发送消息并返回包含消息 ID 的结果
PublishAndGetResult(ctx context.Context, topic string, body []byte, opts PublishOptions) (*SendResult, error)
}
AdvancedPublisher 扩展发布接口,支持获取发送结果(如消息 ID) 并非所有实现都支持,可通过类型断言按需使用:
if ap, ok := q.(queue.AdvancedPublisher); ok {
result, err := ap.PublishAndGetResult(ctx, topic, body, opts)
}
type Message ¶
type Message struct {
// ID 消息的唯一标识符
ID string
// Topic 消息所属的主题/频道
Topic string
// Body 消息负载
Body []byte
// Tags 可选的消息标签,用于过滤(RocketMQ 等使用)
Tags string
// Keys 可选的消息键,用于索引和搜索
Keys []string
// Properties 可选的键值对,用于自定义元数据
Properties map[string]string
// DelayLevel 延迟消息的延迟级别(0 = 不延迟)
DelayLevel int
// BornTimestamp 消息创建时间
BornTimestamp time.Time
}
Message 表示队列中的一条消息
type PublishOptions ¶
type PublishOptions struct {
// Tags 消息标签,用于过滤(例如 "TagA||TagB")
Tags string
// Keys 消息键,用于索引
Keys []string
// Properties 自定义元数据
Properties map[string]string
// DelayLevel 延迟投递级别(0 = 立即投递)
DelayLevel int
}
PublishOptions 发布消息的可选参数
type Publisher ¶
type Publisher interface {
// Publish 发送消息到指定主题
Publish(ctx context.Context, topic string, body []byte) error
// PublishWithOptions 使用额外选项发送消息
PublishWithOptions(ctx context.Context, topic string, body []byte, opts PublishOptions) error
}
Publisher 定义基础发布操作接口
type SendResult ¶ added in v0.1.2
type SendResult struct {
// MessageID Broker 分配的消息唯一标识,可用于追踪和幂等去重
MessageID string
}
SendResult 发送消息的结果
type SubscribeOptions ¶
type SubscribeOptions struct {
// ConsumerGroup 消费者组名称(分布式队列必需)
ConsumerGroup string
// Tags 过滤消息的标签(例如 "TagA||TagB","*" 表示全部)
Tags string
// MaxConcurrency 限制并发消息处理数量(0 = 不限制)
MaxConcurrency int
// AutoCommit 启用自动消息确认(默认:true)
AutoCommit bool
}
SubscribeOptions 订阅主题的可选参数
type Subscriber ¶
type Subscriber interface {
// Subscribe 为指定主题注册处理函数
// 每当主题收到消息时,处理函数会被调用
// 返回一个 Subscription,可用于取消订阅
Subscribe(ctx context.Context, topic string, handler Handler) (Subscription, error)
// SubscribeWithOptions 使用额外选项注册处理函数
SubscribeWithOptions(ctx context.Context, topic string, handler Handler, opts SubscribeOptions) (Subscription, error)
}
Subscriber 定义订阅操作接口
type Subscription ¶
type Subscription interface {
// Unsubscribe 停止接收消息并释放资源
Unsubscribe() error
}
Subscription 表示一个可以关闭的活动订阅