queue

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2026 License: MIT Imports: 3 Imported by: 0

README

Queue - 消息队列抽象层

统一的消息队列接口,支持内存队列和分布式队列(RocketMQ、Redis、RabbitMQ 等)。

特性

  • 统一的 API 接口
  • 支持多种消息队列实现
  • 类型安全的消息处理
  • 支持消息标签和属性
  • 支持延迟消息
  • 优雅的错误处理

安装

go get github.com/f2xme/gox/queue

快速开始

内存队列
import "github.com/f2xme/gox/queue/adapter/mem"

queue := mem.New()
defer queue.(queue.Closer).Close()

// 发布消息
err := queue.Publish(ctx, "orders", []byte(`{"order_id":"123"}`))

// 订阅消息
sub, err := queue.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
    log.Printf("Received: %s", msg.Body)
    return nil
})
defer sub.Unsubscribe()
RocketMQ 队列
import "github.com/f2xme/gox/queue/adapter/rocketmq"

queue, err := rocketmq.New(
    rocketmq.WithNameServers([]string{"localhost:9876"}),
    rocketmq.WithGroupName("my-producer-group"),
)
if err != nil {
    log.Fatal(err)
}
defer queue.(queue.Closer).Close()

// 发布带标签的消息
err = queue.PublishWithOptions(ctx, "orders", data, queue.PublishOptions{
    Tags: "urgent",
    Keys: []string{"order-123"},
})

// 订阅特定标签
sub, err := queue.SubscribeWithOptions(ctx, "orders", handler, queue.SubscribeOptions{
    ConsumerGroup: "order-processor",
    Tags:          "urgent||normal",
})

可用适配器

适配器 包路径 适用场景
内存队列 github.com/f2xme/gox/queue/adapter/mem 单机应用、测试
RocketMQ github.com/f2xme/gox/queue/adapter/rocketmq 大规模分布式系统

核心接口

Publisher
type Publisher interface {
    Publish(ctx context.Context, topic string, body []byte) error
    PublishWithOptions(ctx context.Context, topic string, body []byte, opts PublishOptions) error
}
Subscriber
type Subscriber interface {
    Subscribe(ctx context.Context, topic string, handler Handler) (Subscription, error)
    SubscribeWithOptions(ctx context.Context, topic string, handler Handler, opts SubscribeOptions) (Subscription, error)
}
Handler
type Handler func(ctx context.Context, msg *Message) error

返回 nil 表示消息处理成功(ACK),返回 error 表示处理失败(NACK)。

高级特性

消息标签过滤(RocketMQ)
// 发布带标签的消息
queue.PublishWithOptions(ctx, "orders", data, queue.PublishOptions{
    Tags: "urgent",
})

// 订阅特定标签
queue.SubscribeWithOptions(ctx, "orders", handler, queue.SubscribeOptions{
    ConsumerGroup: "processor",
    Tags:          "urgent||normal", // 订阅 urgent 或 normal
})
延迟消息(RocketMQ)
queue.PublishWithOptions(ctx, "tasks", data, queue.PublishOptions{
    DelayLevel: 3, // 延迟 10 秒
})
消息属性
queue.PublishWithOptions(ctx, "orders", data, queue.PublishOptions{
    Properties: map[string]string{
        "source": "web",
        "version": "v1",
    },
})

最佳实践

1. 实现幂等性
func handleOrder(ctx context.Context, msg *queue.Message) error {
    var order Order
    json.Unmarshal(msg.Body, &order)

    // 检查是否已处理
    if isProcessed(order.ID) {
        return nil
    }

    // 处理订单
    if err := processOrder(order); err != nil {
        return err
    }

    // 标记为已处理
    markAsProcessed(order.ID)
    return nil
}
2. 错误处理和重试
func handleMessage(ctx context.Context, msg *queue.Message) error {
    if err := processMessage(msg); err != nil {
        if isRetryable(err) {
            return err // 消息会重新入队
        }
        // 不可重试的错误:记录日志并返回 nil
        log.Printf("permanent error: %v", err)
        return nil
    }
    return nil
}
3. 使用 context 控制超时
func handleMessage(ctx context.Context, msg *queue.Message) error {
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    return processWithContext(ctx, msg)
}
4. 优雅关闭
queue := adapter.New()
defer func() {
    if closer, ok := queue.(queue.Closer); ok {
        closer.Close()
    }
}()

文档

详细文档请参考:

许可证

MIT License

Documentation

Overview

Package queue 提供统一的消息队列抽象层。

queue 包定义了消息队列的标准接口,支持内存队列和分布式队列(RocketMQ、Redis、RabbitMQ、Kafka 等)。 通过统一的 API,你可以轻松地在不同的消息队列实现之间切换,而无需修改业务代码。

功能特性

  • 统一的消息队列抽象接口
  • 支持多种消息队列实现(内存、RocketMQ、Redis、RabbitMQ)
  • 类型安全的消息处理
  • 支持发布订阅模式
  • 支持消息标签过滤(RocketMQ)
  • 支持延迟消息(RocketMQ)
  • 支持消息属性和元数据
  • 线程安全

快速开始

基本使用:

package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/f2xme/gox/queue"
	"github.com/f2xme/gox/queue/adapter/mem"
)

func main() {
	ctx := context.Background()

	// 创建内存队列
	q := mem.New()
	defer q.(queue.Closer).Close()

	// 订阅主题
	sub, err := q.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
		var order Order
		json.Unmarshal(msg.Body, &order)

		// 处理订单
		if err := processOrder(order); err != nil {
			return err // 返回错误,消息会重新入队
		}

		return nil // 返回 nil,消息被确认
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	// 发布消息
	data, _ := json.Marshal(Order{ID: "123", Amount: 9900})
	err = q.Publish(ctx, "orders", data)
	if err != nil {
		log.Fatal(err)
	}
}

核心接口

Queue 接口结合了发布和订阅功能:

type Queue interface {
	Publisher
	Subscriber
}

Publisher 接口定义发布操作:

type Publisher interface {
	Publish(ctx context.Context, topic string, body []byte) error
	PublishWithOptions(ctx context.Context, topic string, body []byte, opts PublishOptions) error
}

Subscriber 接口定义订阅操作:

type Subscriber interface {
	Subscribe(ctx context.Context, topic string, handler Handler) (Subscription, error)
	SubscribeWithOptions(ctx context.Context, topic string, handler Handler, opts SubscribeOptions) (Subscription, error)
}

Handler 是消息处理函数:

type Handler func(ctx context.Context, msg *Message) error

返回 nil 表示消息处理成功(ACK),返回 error 表示处理失败(NACK)。

可用适配器

内存队列:

import "github.com/f2xme/gox/queue/adapter/mem"

queue := mem.New()

特性:进程内通信、零外部依赖、高性能、适合单机应用

RocketMQ 队列:

import "github.com/f2xme/gox/queue/adapter/rocketmq"

queue, err := rocketmq.New(
	rocketmq.WithNameServers([]string{"localhost:9876"}),
	rocketmq.WithGroupName("my-producer-group"),
)

特性:分布式消息队列、支持消息标签过滤、支持延迟消息、支持集群和广播消费、高可用和高吞吐量

使用示例

发布消息:

// 发布简单消息
err := queue.Publish(ctx, "orders", []byte(`{"order_id":"123"}`))

// 发布结构化消息
data, _ := json.Marshal(order)
err := queue.Publish(ctx, "orders", data)

// 发布带选项的消息(RocketMQ)
err := queue.PublishWithOptions(ctx, "orders", data, queue.PublishOptions{
	Tags: "urgent",
	Keys: []string{"order-123"},
	Properties: map[string]string{
		"source": "web",
	},
	DelayLevel: 3, // 延迟 10 秒
})

订阅消息:

// 订阅主题
sub, err := queue.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
	var order Order
	json.Unmarshal(msg.Body, &order)

	// 处理订单
	if err := processOrder(order); err != nil {
		return err // 返回错误,消息会重新入队
	}

	return nil // 返回 nil,消息被确认
})

// 取消订阅
defer sub.Unsubscribe()

// 带选项订阅(RocketMQ)
sub, err := queue.SubscribeWithOptions(ctx, "orders", handler, queue.SubscribeOptions{
	ConsumerGroup:  "order-processor",
	Tags:           "urgent||normal", // 过滤标签
	MaxConcurrency: 10,               // 限制并发
})

多个订阅者:

// 订阅者 1:发送邮件
queue.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
	return sendEmail(msg.Body)
})

// 订阅者 2:更新库存
queue.Subscribe(ctx, "orders", func(ctx context.Context, msg *queue.Message) error {
	return updateInventory(msg.Body)
})

优雅关闭:

queue := adapter.New()
defer func() {
	if closer, ok := queue.(queue.Closer); ok {
		closer.Close()
	}
}()

消息处理模式

点对点(Point-to-Point):

一个消息只被一个消费者处理:

// 生产者
queue.Publish(ctx, "tasks", taskData)

// 消费者 1
queue.Subscribe(ctx, "tasks", handler1)

// 消费者 2
queue.Subscribe(ctx, "tasks", handler2)

// 每个消息只会被 handler1 或 handler2 处理一次

发布订阅(Pub/Sub):

一个消息被所有订阅者处理:

// 发布者
queue.Publish(ctx, "events", eventData)

// 订阅者 1
queue.Subscribe(ctx, "events", handler1)

// 订阅者 2
queue.Subscribe(ctx, "events", handler2)

// 每个消息会被 handler1 和 handler2 都处理

最佳实践

使用结构化消息:

// 推荐:使用 JSON
type OrderMessage struct {
	OrderID string `json:"order_id"`
	Amount  int64  `json:"amount"`
}

data, _ := json.Marshal(OrderMessage{OrderID: "123", Amount: 9900})
queue.Publish(ctx, "orders", data)

实现幂等性:

func handleOrder(ctx context.Context, msg *queue.Message) error {
	var order OrderMessage
	json.Unmarshal(msg.Body, &order)

	// 检查是否已处理
	if isProcessed(order.OrderID) {
		return nil // 已处理,直接返回
	}

	// 处理订单
	processOrder(order)

	// 标记为已处理
	markAsProcessed(order.OrderID)

	return nil
}

错误处理和重试:

func handleMessage(ctx context.Context, msg *queue.Message) error {
	// 可重试的错误:返回 error
	if err := processMessage(msg); err != nil {
		if isRetryable(err) {
			return err // 消息会重新入队
		}
		// 不可重试的错误:记录日志并返回 nil
		log.Printf("permanent error: %v", err)
		return nil // 消息被确认,不再重试
	}
	return nil
}

使用 context 控制超时:

func handleMessage(ctx context.Context, msg *queue.Message) error {
	// 设置处理超时
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	return processWithContext(ctx, msg)
}

线程安全

所有队列实现都是线程安全的,可以在多个 goroutine 中并发使用。

Package queue 提供统一的消息队列抽象层。 支持内存队列和分布式队列的类型安全操作。

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("queue: closed")

ErrClosed is returned when operating on a closed queue.

View Source
var ErrFull = errors.New("queue: full")

ErrFull is returned when the queue is full and cannot accept more messages.

Functions

This section is empty.

Types

type AdvancedPublisher added in v0.1.2

type AdvancedPublisher interface {
	Publisher
	// PublishAndGetResult 发送消息并返回包含消息 ID 的结果
	PublishAndGetResult(ctx context.Context, topic string, body []byte, opts PublishOptions) (*SendResult, error)
}

AdvancedPublisher 扩展发布接口,支持获取发送结果(如消息 ID) 并非所有实现都支持,可通过类型断言按需使用:

if ap, ok := q.(queue.AdvancedPublisher); ok {
    result, err := ap.PublishAndGetResult(ctx, topic, body, opts)
}

type Closer

type Closer interface {
	// Close 释放队列持有的所有资源
	Close() error
}

Closer 为队列实现提供资源清理

type Handler

type Handler func(ctx context.Context, msg *Message) error

Handler 消息处理回调函数 返回 nil 表示确认消息,返回 error 表示拒绝消息

type Message

type Message struct {
	// ID 消息的唯一标识符
	ID string
	// Topic 消息所属的主题/频道
	Topic string
	// Body 消息负载
	Body []byte
	// Tags 可选的消息标签,用于过滤(RocketMQ 等使用)
	Tags string
	// Keys 可选的消息键,用于索引和搜索
	Keys []string
	// Properties 可选的键值对,用于自定义元数据
	Properties map[string]string
	// DelayLevel 延迟消息的延迟级别(0 = 不延迟)
	DelayLevel int
	// BornTimestamp 消息创建时间
	BornTimestamp time.Time
}

Message 表示队列中的一条消息

type PublishOptions

type PublishOptions struct {
	// Tags 消息标签,用于过滤(例如 "TagA||TagB")
	Tags string
	// Keys 消息键,用于索引
	Keys []string
	// Properties 自定义元数据
	Properties map[string]string
	// DelayLevel 延迟投递级别(0 = 立即投递)
	DelayLevel int
}

PublishOptions 发布消息的可选参数

type Publisher

type Publisher interface {
	// Publish 发送消息到指定主题
	Publish(ctx context.Context, topic string, body []byte) error
	// PublishWithOptions 使用额外选项发送消息
	PublishWithOptions(ctx context.Context, topic string, body []byte, opts PublishOptions) error
}

Publisher 定义基础发布操作接口

type Queue

type Queue interface {
	Publisher
	Subscriber
}

Queue 组合了 Publisher 和 Subscriber 接口

type SendResult added in v0.1.2

type SendResult struct {
	// MessageID Broker 分配的消息唯一标识,可用于追踪和幂等去重
	MessageID string
}

SendResult 发送消息的结果

type SubscribeOptions

type SubscribeOptions struct {
	// ConsumerGroup 消费者组名称(分布式队列必需)
	ConsumerGroup string
	// Tags 过滤消息的标签(例如 "TagA||TagB","*" 表示全部)
	Tags string
	// MaxConcurrency 限制并发消息处理数量(0 = 不限制)
	MaxConcurrency int
	// AutoCommit 启用自动消息确认(默认:true)
	AutoCommit bool
}

SubscribeOptions 订阅主题的可选参数

type Subscriber

type Subscriber interface {
	// Subscribe 为指定主题注册处理函数
	// 每当主题收到消息时,处理函数会被调用
	// 返回一个 Subscription,可用于取消订阅
	Subscribe(ctx context.Context, topic string, handler Handler) (Subscription, error)
	// SubscribeWithOptions 使用额外选项注册处理函数
	SubscribeWithOptions(ctx context.Context, topic string, handler Handler, opts SubscribeOptions) (Subscription, error)
}

Subscriber 定义订阅操作接口

type Subscription

type Subscription interface {
	// Unsubscribe 停止接收消息并释放资源
	Unsubscribe() error
}

Subscription 表示一个可以关闭的活动订阅

Directories

Path Synopsis
adapter
mem module
memadapter module
rocketmq module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL