Documentation
¶
Overview ¶
Package rocketmq 基于 Apache RocketMQ 5.x Go SDK 实现 `queue.Queue`, 提供面向 Proxy 的消息发布与订阅适配器。
功能特性 ¶
- 使用 `WithEndpoint` 指定 RocketMQ 5.x Proxy 地址
- 支持消息标签(Tags)过滤
- 支持消息键(Keys)和自定义属性
- 支持 RocketMQ 固定延迟等级消息(1-18 级)
- 支持通过 `queue.AdvancedPublisher` 获取发送后的消息 ID
- 支持 `NewWithConfig` / `MustNewWithConfig` 从配置中心创建实例
快速开始 ¶
import (
"context"
"github.com/f2xme/gox/queue"
"github.com/f2xme/gox/queue/adapter/rocketmq"
)
func example(ctx context.Context) error {
q, err := rocketmq.New(
rocketmq.WithEndpoint("localhost:8081"),
rocketmq.WithNamespace("dev"),
)
if err != nil {
return err
}
defer q.(queue.Closer).Close()
if err := q.Publish(ctx, "orders", []byte(`{"order_id":"123"}`)); err != nil {
return err
}
sub, err := q.SubscribeWithOptions(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
return nil
}, queue.SubscribeOptions{
ConsumerGroup: "order-consumer",
Tags: "*",
})
if err != nil {
return err
}
defer sub.Unsubscribe()
return nil
}
创建实例 ¶
使用 `New` 通过 Option 创建队列:
q, err := rocketmq.New(
rocketmq.WithEndpoint("localhost:8081"),
rocketmq.WithCredentials("accessKey", "secretKey"),
rocketmq.WithNamespace("production"),
rocketmq.WithSendTimeout(5*time.Second),
)
使用 `MustNew` 在初始化失败时直接终止程序:
q := rocketmq.MustNew(
rocketmq.WithEndpoint("localhost:8081"),
)
使用 `NewWithConfig` 从 `config.Config` 读取配置:
q, err := rocketmq.NewWithConfig(cfg)
支持的配置键:
- `queue.rocketmq.endpoint`
- `queue.rocketmq.accessKey`
- `queue.rocketmq.secretKey`
- `queue.rocketmq.namespace`
- `queue.rocketmq.retries`
- `queue.rocketmq.sendTimeout`
- `queue.rocketmq.consumerModel`
发布消息 ¶
简单发布:
err := q.Publish(ctx, "orders", []byte(`{"order_id":"123"}`))
带选项发布:
err := q.PublishWithOptions(ctx, "orders", []byte(`{"order_id":"123"}`), queue.PublishOptions{
Tags: "urgent",
Keys: []string{"order-123"},
Properties: map[string]string{
"source": "web",
},
DelayLevel: 3,
})
获取发送结果(消息 ID):
if ap, ok := q.(queue.AdvancedPublisher); ok {
result, err := ap.PublishAndGetResult(ctx, "orders", body, queue.PublishOptions{})
if err != nil {
return err
}
_ = result.MessageID
}
订阅消息 ¶
`Subscribe` 会使用默认消费者组 `DEFAULT_CONSUMER_GROUP`,更推荐在生产环境显式调用 `SubscribeWithOptions` 指定消费者组和过滤条件:
sub, err := q.SubscribeWithOptions(ctx, "orders", handler, queue.SubscribeOptions{
ConsumerGroup: "order-processor",
Tags: "urgent||normal",
})
标签规则:
- `"*"` 表示订阅全部消息
- `"tagA||tagB"` 表示订阅多个标签
- 空字符串会被自动视为 `"*"`
可用选项 ¶
## WithEndpoint
设置 RocketMQ 5.x Proxy 地址:
rocketmq.New(rocketmq.WithEndpoint("localhost:8081"))
## WithCredentials
设置访问凭证:
rocketmq.New(rocketmq.WithCredentials("accessKey", "secretKey"))
## WithNamespace
设置命名空间:
rocketmq.New(rocketmq.WithNamespace("dev"))
## WithRetries
设置发送失败重试次数。该字段会被记录到适配器配置中,但 RocketMQ 5.x SDK 当前由底层自行处理重试策略:
rocketmq.New(rocketmq.WithRetries(3))
## WithSendTimeout
设置发送超时:
rocketmq.New(rocketmq.WithSendTimeout(5 * time.Second))
## WithConsumerModel
设置消费模式标识:
rocketmq.New(rocketmq.WithConsumerModel(rocketmq.ConsumerModelClustering))
其中 `ConsumerModelBroadcasting` 目前仅作为兼容字段保留;RocketMQ 5.x Go SDK 及当前适配器实现并未真正切换到广播消费行为。
延迟消息 ¶
RocketMQ 支持固定延迟等级,适配器会将 `queue.PublishOptions.DelayLevel` 转换为绝对投递时间。支持的等级如下:
- 1: 1s
- 2: 5s
- 3: 10s
- 4: 30s
- 5: 1m
- 6: 2m
- 7: 3m
- 8: 4m
- 9: 5m
- 10: 6m
- 11: 7m
- 12: 8m
- 13: 9m
- 14: 10m
- 15: 20m
- 16: 30m
- 17: 1h
- 18: 2h
关闭资源 ¶
该适配器实现了 `queue.Closer`。调用 `Close` 会停止所有已注册订阅, 并优雅关闭底层 Producer:
if closer, ok := q.(queue.Closer); ok {
_ = closer.Close()
}
注意事项 ¶
- 当前接入点是 RocketMQ 5.x Proxy,不再使用旧版 NameServer 配置
- `SubscribeWithOptions` 必须提供 `ConsumerGroup`
- `Subscribe` 仅适合快速示例,生产环境应显式指定消费者组
- `MaxConcurrency` 和 `AutoCommit` 目前未映射到底层 SDK 行为
- 延迟消息仅支持固定的 1-18 级
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/f2xme/gox/queue/adapter/rocketmq"
)
func main() {
ctx := context.Background()
// 创建 RocketMQ 队列
q, err := rocketmq.New(
rocketmq.WithEndpoint("localhost:8081"),
)
if err != nil {
log.Fatalf("Failed to create queue: %v", err)
}
// 发布简单消息
err = q.Publish(ctx, "orders", []byte("hello"))
if err != nil {
log.Fatalf("Failed to publish: %v", err)
}
fmt.Println("Message published")
}
Output: Message published
Index ¶
- Constants
- func MustNew(opts ...Option) queue.Queue
- func MustNewWithConfig(cfg config.Config) queue.Queue
- func New(opts ...Option) (queue.Queue, error)
- func NewContext(ctx context.Context, opts ...Option) (queue.Queue, error)
- func NewWithConfig(cfg config.Config) (queue.Queue, error)
- type Option
- func WithConsumerModel(model string) Option
- func WithCredentials(accessKey, secretKey string) Option
- func WithEndpoint(endpoint string) Option
- func WithNamespace(namespace string) Option
- func WithRetries(retries int) Option
- func WithSendTimeout(timeout time.Duration) Option
- func WithTopics(topics ...string) Option
- type Options
Examples ¶
Constants ¶
const ( ConsumerModelClustering = "clustering" // 集群消费模式 ConsumerModelBroadcasting = "broadcasting" // 广播消费模式(RocketMQ 5.x 不支持,仅作保留) )
消费模式常量
Variables ¶
This section is empty.
Functions ¶
func MustNewWithConfig ¶ added in v0.2.0
MustNewWithConfig 使用配置创建一个新的 RocketMQ 队列,出错时终止程序。
func New ¶
New 使用给定选项创建新的 RocketMQ 队列
Example (WithOptions) ¶
package main
import (
"fmt"
"log"
"github.com/f2xme/gox/queue/adapter/rocketmq"
)
func main() {
// 创建带完整配置的 RocketMQ 队列
q, err := rocketmq.New(
rocketmq.WithEndpoint("localhost:8081"),
rocketmq.WithCredentials("access-key", "secret-key"),
rocketmq.WithNamespace("production"),
rocketmq.WithRetries(3),
)
if err != nil {
log.Fatalf("Failed to create queue: %v", err)
}
_ = q
fmt.Println("Queue created with options")
}
Output: Queue created with options
func NewContext ¶ added in v0.3.0
NewContext 使用给定选项和 context 创建新的 RocketMQ 队列,context 可用于控制启动超时
func NewWithConfig ¶ added in v0.2.0
NewWithConfig 使用 config.Config 中的配置创建一个新的 RocketMQ 队列。 配置键:
- queue.rocketmq.endpoint (string): Proxy 地址(默认:"127.0.0.1:8081")
- queue.rocketmq.accessKey (string): 认证访问密钥
- queue.rocketmq.secretKey (string): 认证密钥
- queue.rocketmq.namespace (string): 命名空间
- queue.rocketmq.retries (int): 发送失败重试次数(默认:2)
- queue.rocketmq.sendTimeout (duration): 发送消息超时时间(默认:3s)
- queue.rocketmq.consumerModel (string): 消费模式(默认:"clustering")
示例:
q, err := rocketmq.NewWithConfig(cfg)
Types ¶
type Option ¶
type Option func(*Options)
Option 配置选项函数
func WithCredentials ¶
WithCredentials 设置认证的访问密钥和密钥
示例:
rocketmq.New(rocketmq.WithCredentials("myAccessKey", "mySecretKey"))
func WithEndpoint ¶ added in v0.2.0
WithEndpoint 设置 RocketMQ 5.x Proxy 地址
示例:
rocketmq.New(rocketmq.WithEndpoint("localhost:8081"))
func WithSendTimeout ¶
WithSendTimeout 设置发送消息的超时时间
示例:
rocketmq.New(rocketmq.WithSendTimeout(5 * time.Second))
func WithTopics ¶ added in v0.3.1
WithTopics 设置启动时预热的 topic 列表,可加速首次发送
示例:
rocketmq.New(rocketmq.WithTopics("orders", "payments"))
type Options ¶
type Options struct {
// Endpoint RocketMQ 5.x Proxy 地址(格式:host:port)
Endpoint string
// AccessKey 认证访问密钥(可选)
AccessKey string
// SecretKey 认证密钥(可选)
SecretKey string
// Namespace 消息隔离命名空间(可选)
Namespace string
// Retries 发送失败重试次数(保留字段,新 SDK 内部处理重试)
Retries int
// SendTimeout 发送消息超时时间
SendTimeout time.Duration
// ConsumerModel 消费模式(集群或广播)
ConsumerModel string
// Topics 预热的 topic 列表,启动时会预先拉取路由信息。
// 注意:至少需要指定一个 topic,否则 SDK 无法完成初始化握手,导致 New/NewContext 永久阻塞。
Topics []string
}
Options 定义 RocketMQ 队列的配置选项