Grabbit

package module
v0.0.0-...-3964641 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2026 License: MIT Imports: 16 Imported by: 0

README

Grabbit — RabbitMQ 简易管理与监控库(中文说明)

这是一个用于简化 RabbitMQ 操作和监控的 Go 库(模块名:github.com/Xiao-yi123/Grabbit)。它封装了常用的 RabbitMQ 操作(连接、通道、交换机/队列声明、绑定、发布、消费)并提供了对 RabbitMQ HTTP API 的简单访问以及按任务配置的监控器支持。


功能概览

  • 管理与维护 RabbitMQ 连接(自动创建 connection、channel 管理)
  • 声明或检查交换机、声明队列并绑定到交换机
  • 发布消息(支持持久化选项)
  • 消费消息(支持自动/手动确认、调用自定义处理函数)
  • 并发消费者(StartMonitor 支持同时启动多个消费者)
  • 通过 RabbitMQ HTTP API 获取交换机、队列与绑定信息(需启用 management 插件)
  • ENV 分组格式的配置解析与默认值支持
  • 简单的监控配置结构(MonitorQueue、MonitorConfig)

先决条件

  • Go 1.20+(项目 go.mod 指定 go 1.24.7,但兼容较新的 Go 版本)
  • 可访问的 RabbitMQ 服务(AMQP 协议端口,默认 5672)
  • 如果要使用 HTTP API(获取 exchanges/queues/bindings),需启用 RabbitMQ Management Plugin(默认 API 端口 15672)

安装

在你的项目中引入本模块(示例):

go get github.com/Xiao-yi123/Grabbit

或者在你的 go.mod 中添加依赖并执行 go mod tidy


配置(环境或文件)

库支持从分组格式的 env 文件读取配置(示例文件 .example.env)。默认会读取 RABBITMQ 分组内的以下键:

  • RABBITMQ_HOST(默认 127.0.0.1
  • RABBITMQ_PORT(默认 5672
  • RABBITMQ_USER(默认 guest
  • RABBITMQ_PASSWORD(默认 guest
  • RABBITMQ_API_PORT(默认 15672
  • RABBITMQ_VIRTUAL_HOST(默认 /
  • RABBITMQ_HEARTBEAT(默认 0,代码中以 0 表示使用默认心跳)
  • RABBITMQ_SOCKET_TIMEOUT
  • RABBITMQ_PREFETCH_COUNT
  • RABBITMQ_MAX_CONSUMER
  • RABBITMQ_BLOCKED_CONNECTION_TIMEOUT
  • RABBITMQ_CONNECTION_ATTEMPTS
  • RABBITMQ_RETRY_DELAY

示例 .example.env(分组格式):

[RABBITMQ]
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_API_PORT=15672
RABBITMQ_VIRTUAL_HOST=/
RABBITMQ_HEARTBEAT=0
RABBITMQ_SOCKET_TIMEOUT=12
RABBITMQ_PREFETCH_COUNT=1
RABBITMQ_MAX_CONSUMER=5
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT=10

代码会尝试用 NewManager(fileName string, settings *Settings) 加载该文件(传空字符串表示使用默认值 / 不提供文件时回退到内置默认)。


快速开始示例

下面是常见用法的简单示例,演示如何初始化管理器、声明交换机/队列、发布和消费消息。

初始化 Manager:

package main

import (
    "log"
    "github.com/Xiao-yi123/Grabbit"
)

func main() {
    // 传入分组 env 文件路径;传空字符串表示使用默认配置
    mgr, err := Grabbit.NewManager(".example.env",nil)
    if err != nil {
        log.Fatalf("NewManager error: %v", err)
    }
    defer mgr.Close()

    // 之后可以使用 mgr 调用各种方法
}

声明交换机与队列并绑定:

// 声明交换机
if err := mgr.CreateExchange("my-exchange", Grabbit.GetExchangerType("DIRECT")); err != nil {
    log.Fatalf("创建交换机失败: %v", err)
}

// 创建队列并绑定(queueName、routingKey)
queue, err := mgr.CreateQueue("my-exchange", "my-queue", "my-key", nil, nil)
if err != nil {
    log.Fatalf("创建队列失败: %v", err)
}
log.Printf("创建队列成功: %s", queue.Name)

发布消息:

body := []byte("hello world")
if err := mgr.Publish("my-exchange", "my-key", body, true); err != nil {
    log.Fatalf("消息发布失败: %v", err)
}

消费消息(同步、一次):

// 通过 ConsumerFuncMap 先注册处理函数,例如在 init 中:
rabbit.ConsumerFuncMap["Print"] = func(msg []byte) error {
    fmt.Println("收到消息:", string(msg))
    return nil
}

// 调用消费(autoAck=false 手动 ack,isContinue=false 处理一条就返回)
if err := mgr.Consume("my-queue", false, false, "Print"); err != nil {
    log.Fatalf("消费失败: %v", err)
}

并发消费者(监控器):

// StartMonitor 会为每个 consumer 启动一个独立 goroutine(并为每个 consumer 创建独立 channel)
if err := mgr.StartMonitor("my-exchange", "my-queue", "Print", 5); err != nil {
    log.Fatalf("启动监控失败: %v", err)
}

通过 HTTP API 获取信息(需开启 management plugin):

exchanges, err := mgr.GetExchanges()
queues, err := mgr.GetQueues()
bindings, err := mgr.GetBindings()

API 说明(主要方法)

  • NewManager(fileName string) (*Manager, error)
    • 创建并返回 Manager 实例,支持通过 env 文件加载配置(传空字符串使用默认值)
  • (*Manager) Connect() error
    • 建立 AMQP 连接(内部管理重连逻辑请参阅源码)
  • (*Manager) NewChannel() (*amqp.Channel, error)
    • 返回新的 channel(会检查连接状态)
  • (*Manager) CreateExchange(exchange, exchangeType string) error
    • 声明交换机(exchangeType 支持:DIRECT、TOPIC、FANOUT、HEADERS;内部使用 pEnum)
  • (*Manager) CreateQueue(exchangeName, queueName, routingKey string, queueArgs, bindArgs amqp.Table) (*amqp.Queue, error)
    • 声明队列并绑定到交换机
  • (*Manager) Publish(exchange, routingKey string, msg []byte, persistent bool) error
    • 发布消息(persistent 控制消息是否持久化)
  • (*Manager) Consume(queue string, autoAck bool, isContinue bool, funcName string) error
    • 消费消息;若传入 funcName(并在 ConsumerFuncMap 注册了对应函数),会调用该函数处理消息
  • (*Manager) StartMonitor(exchangeName, queueName, funcName string, maxConsumer int) error
    • 并发启动多个消费者
  • (*Manager) GetExchanges() ([]Exchange, error),GetQueues(), GetBindings()
    • 通过 HTTP API 获取 RabbitMQ 信息(使用 resty 客户端)

辅助结构:

  • MonitorConfig / MonitorQueue / QueueInfo:用于预先配置多个监控任务与队列信息
  • Settings:封装配置读取与默认值获取(提供通用的 Get(fieldName string) interface{} 方法)
  • Utils:包括解析分组 env 文件、JSON 打印等辅助函数

运行测试

项目包含若干测试文件(例如 manager_test.goapi_test.gomonitor_config_test.go 等)。运行:

go test ./...

注意:

  • 有些测试会尝试连接本地/远程 RabbitMQ 服务,请确保 RabbitMQ 服务可用并且管理插件启用(用于 API 测试)。
  • 测试可能依赖于 .example.env 或你自己的配置文件。

注意事项与实现细节

  • 每个是否并发消费的 goroutine 都会使用独立的 AMQP channel(源码在 Consume 中创建独立 channel),这是良好实践,避免 channel 竞争。
  • Settings.Get 返回的是字段的副本(字符串/整数或解引用的指针副本),以防调用方修改返回值影响原始配置(请注意副本为新创建的指针)。
  • HTTP API 调用使用 resty 客户端,GetBindings 使用 fxjson 做灵活解析。
  • ExchangeType 使用了 pEnum 包(仓库依赖)做枚举处理。

常见问题(FAQ)

Q: 如何指定自定义配置文件? A: 调用 NewManager("path/to/your.env"),文件需为分组格式(见示例)。

Q: 管理插件未启用时会怎样? A: GetExchanges / GetQueues / GetBindings 会请求管理 API(默认 15672),若不可达则返回错误。确保管理插件启用并可以通过 http://host:15672 访问。

Q: 消费者函数如何注册? A: 在程序初始化阶段将函数赋到 rabbit.ConsumerFuncMap,例如:

rabbit.ConsumerFuncMap["Print"] = Print

Print 的签名应为 func([]byte) error


贡献

欢迎提交 issue 或 PR。贡献者请遵循标准 Go 代码风格,并添加测试(如适用)。


Documentation

Index

Constants

This section is empty.

Variables

View Source
var ConsumerFuncMap = map[string]ConsumerFunc{

	"Print": Print,
}

定义一个映射,将字符串映射到ConsumerFunc函数类型

View Source
var ExchangeTypeEnum = pEnum.NewEnum("ExchangeType", map[string]interface{}{
	"DIRECT":  "direct",
	"TOPIC":   "topic",
	"FANOUT":  "fanout",
	"HEADERS": "headers",
})
View Source
var MonitorQueue = map[string]monitor{
	"exchange_name": {
		// contains filtered or unexported fields
	},
	"exchange_name2": {
		// contains filtered or unexported fields
	},
}

Functions

func GetExchangerType

func GetExchangerType(exchangeType string) string

GetExchangerType 根据传入的交换器类型获取对应的字符串值 参数:

exchangeType: 需要获取的交换器类型

返回值:

string: 交换器类型对应的字符串值,如果传入的类型不存在则返回默认的DIRECT类型

func GetIntPtrWithDefault

func GetIntPtrWithDefault(group map[string]string, key string, defaultValue *int) *int

GetIntPtrWithDefault 获取整数指针值,如果键不存在则返回默认值

func GetIntWithDefault

func GetIntWithDefault(group map[string]string, key string, defaultValue int) int

GetIntWithDefault 获取整数值,如果键不存在则返回默认值

func GetStringWithDefault

func GetStringWithDefault(group map[string]string, key, defaultValue string) string

GetStringWithDefault 获取字符串值,如果键不存在则返回默认值

func InterfaceToMap

func InterfaceToMap[T any](i interface{}) (map[string]T, error)

InterfaceToMap =========================== interface 转换成Map ============================ InterfaceToMap[T] 将 interface{} 转换为指定类型的 map(自动匹配字段类型)

func Print

func Print(msg []byte) error

func PrintAsJSON

func PrintAsJSON(data interface{}, indent bool) error

PrintAsJSON 将任意数据结构转换为JSON并打印 data: 要转换的任意数据结构 indent: 是否使用缩进格式(true为带缩进,false为紧凑格式)

Types

type ConsumerFunc

type ConsumerFunc func(msg []byte) error

ConsumerFunc 定义消息消费的回调函数类型

type EnvConfig

type EnvConfig struct {
	Groups map[string]map[string]string // 外层key是分组名,内层是键值对
}

存储分组配置的结构

type Exchange

type Exchange struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	Durable    bool   `json:"durable"`
	AutoDelete bool   `json:"auto_delete"`
	Internal   bool   `json:"internal"`
}

定义结构体对应RabbitMQ API返回的JSON结构

type FetchResult

type FetchResult[T any] struct {
	Resp   *resty.Response
	Data   T
	Raw    []byte
	Status int
	Header map[string][]string
}

func FetchDataGenericity

func FetchDataGenericity[T any](
	client *resty.Client,
	method, url string,
	options ...map[string]interface{},
) (*FetchResult[T], error)

type Manager

type Manager struct {
	ReceivedBody interface{} // 接收到的消息体
	// contains filtered or unexported fields
}

func NewManager

func NewManager(fileName string, settings *Settings) (*Manager, error)

NewManager 创建一个新的管理器实例 fileName: 配置文件路径 默认会读取配置文件 .env 如不需读取 传入 "" settings: 配置设置,如果为nil则使用默认配置

可传入 isConnect 是否自动连接 默认为True

返回值: 指向Manager结构体的指针

func (*Manager) CheckExchangeExists

func (m *Manager) CheckExchangeExists(exchangeName, exchangeType string) (bool, error)

CheckExchangeExists 检查RabbitMQ中是否存在指定的交换机

func (*Manager) Close

func (m *Manager) Close() error

Close 关闭客户端连接

func (*Manager) Connect

func (m *Manager) Connect() error

Connect 建立与RabbitMQ服务器的连接

func (*Manager) Consume

func (m *Manager) Consume(queue string, autoAck bool, isContinue bool, funcName string) error

Consume 注册并启动一个消费者来消费指定队列的消息 queue: 要消费的队列名称 autoAck: 是否自动确认消息,true表示自动确认,false表示手动确认 isContinue: 是否持续消费,true表示持续消费消息,false表示处理一条消息后停止 funcName: 方法名,如果为空,则不处理消息,只消费消息 返回值: 消费过程中出现的错误

func (*Manager) ConsumeDelayMessage

func (m *Manager) ConsumeDelayMessage(queueName string, autoAck bool, isContinue bool, funcName string) error

ConsumeDelayMessage 消费延时消息(从死信队列消费)

func (*Manager) CreateDelayQueue

func (m *Manager) CreateDelayQueue(exchangeName, queueName, routingKey string, ttl int) (*amqp.Queue, error)

func (*Manager) CreateExchange

func (m *Manager) CreateExchange(exchange, exchangeType string) error

CreateExchange 创建交换机 参数:

exchange: 交换机名称
exchangeType: 交换机类型

返回值:

error: 创建交换机过程中发生的错误

func (*Manager) CreateQueue

func (m *Manager) CreateQueue(
	exchangeName, queueName, routingKey string,
	queueArgs amqp.Table,
	bindArgs amqp.Table) (*amqp.Queue, error)

CreateQueue 创建一个新的队列并将其绑定到指定的交换机 参数:

exchangeName: 交换机名称
QueueName: 队列名称
routingKey: 路由键
queueArgs: 队列声明的额外参数
bindArgs: 队列绑定的额外参数

返回值:

*amqp.Queue: 创建成功的队列对象
error: 创建过程中发生的错误

func (*Manager) CreateVhost

func (m *Manager) CreateVhost(vhostName string) error

CreateVhost 创建新的虚拟主机 该方法需要管理员权限才能执行 参数:

vhostName: 要创建的虚拟主机名称

返回值:

error: 操作过程中发生的错误

func (*Manager) DeleteVhost

func (m *Manager) DeleteVhost(vhostName string) error

DeleteVhost 删除指定的虚拟主机 该方法需要管理员权限才能执行 注意:删除虚拟主机将同时删除其中的所有队列、交换机和绑定关系 参数:

vhostName: 要删除的虚拟主机名称

返回值:

error: 操作过程中发生的错误

func (*Manager) GetBindings

func (m *Manager) GetBindings() (result map[string][]map[string]string, err error)

获取所有绑定关系

func (*Manager) GetConnection

func (m *Manager) GetConnection() *amqp.Connection

GetConnection 获取底层的AMQP连接对象 该函数提供对原始AMQP连接对象的访问,用于需要直接操作连接的高级操作

返回值:

*amqp.connection - 当前的AMQP连接实例,如果未连接则返回nil

func (*Manager) GetExchanges

func (m *Manager) GetExchanges() ([]Exchange, error)

获取所有交换机

func (*Manager) GetQueues

func (m *Manager) GetQueues() ([]Queue, error)

获取所有队列

func (*Manager) IsConnected

func (m *Manager) IsConnected() bool

IsConnected 检查是否已连接

func (*Manager) NewChannel

func (m *Manager) NewChannel() (*amqp.Channel, error)

NewChannel 创建一个新的AMQP通道 该方法会检查RabbitMQ连接状态,如果连接不存在或已关闭则返回错误 返回新创建的通道实例和可能发生的错误

参数:

返回值:

*amqp.Channel: 新创建的AMQP通道实例
error: 连接问题或其他错误时返回非nil值

func (*Manager) Publish

func (m *Manager) Publish(exchange, routingKey string, msg []byte, persistent bool) error

Publish 向指定的交换机和路由键发布消息 exchange: 交换机名称 routingKey: 路由键 QueueName: 队列名称 msg: 要发送的消息内容 persistent: 是否持久化消息 返回值: 发布消息过程中发生的错误

func (*Manager) PublishDelayMessage

func (m *Manager) PublishDelayMessage(exchange, routingKey, queueName string, msg []byte, ttl int) error

PublishDelayMessage 发布延时消息到指定的队列

参数:

exchange: 交换机名称
routingKey: 路由键
queueName: 队列名称
msg: 要发送的消息内容
ttl: 消息的存活时间(毫秒)

返回值:

error: 发布过程中可能出现的错误

func (*Manager) StartMonitor

func (m *Manager) StartMonitor(exchangeName, queueName, funcName string, maxConsumer int) error

StartMonitor 启动消息队列监控器,创建多个goroutine并发消费消息 exchangeName: 交换机名称,用于指定消息来源的交换机 QueueName: 队列名称,用于指定要消费的队列 funcName: 函数名称,用于标识消费消息的处理函数 MaxConsumer: 最大消费者数量,指定并发消费的goroutine数量 返回值: 如果启动过程中出现错误则返回error,否则返回nil

type MonitorConfig

type MonitorConfig struct {
	CurrentMonitor   *monitor
	CurrentQueueInfo *QueueInfo
	// contains filtered or unexported fields
}

func NewMonitorConfig

func NewMonitorConfig(monitorQueueV map[string]monitor) *MonitorConfig

NewMonitorConfig 创建一个新的Config实例 参数:

monitorQueueV: 监控队列映射,如果为nil则使用默认的MonitorQueue

返回值:

*MonitorConfig: 返回Config实例的指针

func (*MonitorConfig) Consume

func (c *MonitorConfig) Consume(manager *Manager) (body interface{}, err error)

Consume 从队列中消费消息 返回值:

body: 接收到的消息体
err: 执行过程中可能发生的错误

func (*MonitorConfig) GetExchangeName

func (c *MonitorConfig) GetExchangeName() string

GetExchangeName 获取当前监控器的交换机名称 该函数首先检查监控器状态,然后返回交换机名称

参数:

  • c: 指向Config实例的指针

返回值:

  • string: 当前监控器的交换机名称

func (*MonitorConfig) GetQueueInfoByExchangeAndQueueKey

func (c *MonitorConfig) GetQueueInfoByExchangeAndQueueKey(exchangeName string, queueKey string) (QueueInfo, bool)

GetQueueInfoByExchangeAndQueueKey 根据交换机名称和队列键值获取队列信息 参数:

exchangeName: 交换机名称
queueKey: 队列在queueInfo映射中的键值

返回值:

QueueInfo: 对应的队列信息
bool: 是否找到对应的队列信息

func (*MonitorConfig) Publish

func (c *MonitorConfig) Publish(manager *Manager, msg []byte, persistent bool) (err error)

Publish 发布消息到监控队列 msg: 要发布的消息内容 persistent: 是否持久化消息 返回值: 发布操作的错误信息

func (*MonitorConfig) UseKey

func (c *MonitorConfig) UseKey(key string) *MonitorConfig

UseKey 根据给定的键设置当前监控配置 参数:

key: 要使用的监控键值

返回值:

*MonitorConfig: 返回配置对象自身的指针,用于链式调用

func (*MonitorConfig) UseQueue

func (c *MonitorConfig) UseQueue(queueName string) *MonitorConfig

UseQueue 选择并设置当前要使用的队列信息 参数:

queueName - 要使用的队列名称

返回值:

*MonitorConfig - 返回配置对象自身,支持链式调用

type Queue

type Queue struct {
	Name      string `json:"name"`
	Messages  int    `json:"messages"`
	Consumers int    `json:"consumers"`
	Durable   bool   `json:"durable"`
}

type QueueInfo

type QueueInfo struct {
	Title        string // 监控任务的标题
	QueueName    string // 要监控的队列的名称
	QueueFun     string // 与队列相关的函数名称
	MaxConsumer  int    //队列的最大消费者数量
	IsCreateTask bool   //是否创建任务
	RoutingKey   string // 添加路由键字段 可以不传 默认跟队列名一致

}

type Settings

type Settings struct {
	// contains filtered or unexported fields
}

Settings 私有结构体,定义RabbitMQ的配置设置

func NewDefaultSettings

func NewDefaultSettings(fileName *string) *Settings

NewDefaultSettings 创建并返回一个带有默认配置的 Settings 实例。 该函数会根据传入的文件名加载环境变量配置,若未提供有效文件名则默认使用 ".env" 文件。 配置中与 RabbitMQ 相关的部分将被解析,并用于初始化 Settings 结构体中的各个字段。

参数:

  • fileName: 指向配置文件路径的指针,可以为 nil。如果为 nil 或空字符串,则使用默认文件名 ".env"

返回值:

  • *Settings: 包含从环境变量或默认值中读取的 RabbitMQ 配置信息的结构体指针

func (*Settings) Get

func (c *Settings) Get(fieldName string) interface{}

Get 通用获取配置值的方法,通过字段名获取对应的值 确保返回的是字段值的副本,无法修改原始字段

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL