Documentation
¶
Index ¶
- Variables
- func GetExchangerType(exchangeType string) string
- func GetIntPtrWithDefault(group map[string]string, key string, defaultValue *int) *int
- func GetIntWithDefault(group map[string]string, key string, defaultValue int) int
- func GetStringWithDefault(group map[string]string, key, defaultValue string) string
- func InterfaceToMap[T any](i interface{}) (map[string]T, error)
- func Print(msg []byte) error
- func PrintAsJSON(data interface{}, indent bool) error
- type ConsumerFunc
- type EnvConfig
- type Exchange
- type FetchResult
- type Manager
- func (m *Manager) CheckExchangeExists(exchangeName, exchangeType string) (bool, error)
- func (m *Manager) Close() error
- func (m *Manager) Connect() error
- func (m *Manager) Consume(queue string, autoAck bool, isContinue bool, funcName string) error
- func (m *Manager) ConsumeDelayMessage(queueName string, autoAck bool, isContinue bool, funcName string) error
- func (m *Manager) CreateDelayQueue(exchangeName, queueName, routingKey string, ttl int) (*amqp.Queue, error)
- func (m *Manager) CreateExchange(exchange, exchangeType string) error
- func (m *Manager) CreateQueue(exchangeName, queueName, routingKey string, queueArgs amqp.Table, ...) (*amqp.Queue, error)
- func (m *Manager) CreateVhost(vhostName string) error
- func (m *Manager) DeleteVhost(vhostName string) error
- func (m *Manager) GetBindings() (result map[string][]map[string]string, err error)
- func (m *Manager) GetConnection() *amqp.Connection
- func (m *Manager) GetExchanges() ([]Exchange, error)
- func (m *Manager) GetQueues() ([]Queue, error)
- func (m *Manager) IsConnected() bool
- func (m *Manager) NewChannel() (*amqp.Channel, error)
- func (m *Manager) Publish(exchange, routingKey string, msg []byte, persistent bool) error
- func (m *Manager) PublishDelayMessage(exchange, routingKey, queueName string, msg []byte, ttl int) error
- func (m *Manager) StartMonitor(exchangeName, queueName, funcName string, maxConsumer int) error
- type MonitorConfig
- func (c *MonitorConfig) Consume(manager *Manager) (body interface{}, err error)
- func (c *MonitorConfig) GetExchangeName() string
- func (c *MonitorConfig) GetQueueInfoByExchangeAndQueueKey(exchangeName string, queueKey string) (QueueInfo, bool)
- func (c *MonitorConfig) Publish(manager *Manager, msg []byte, persistent bool) (err error)
- func (c *MonitorConfig) UseKey(key string) *MonitorConfig
- func (c *MonitorConfig) UseQueue(queueName string) *MonitorConfig
- type Queue
- type QueueInfo
- type Settings
Constants ¶
This section is empty.
Variables ¶
var ConsumerFuncMap = map[string]ConsumerFunc{ "Print": Print, }
定义一个映射,将字符串映射到ConsumerFunc函数类型
var ExchangeTypeEnum = pEnum.NewEnum("ExchangeType", map[string]interface{}{
"DIRECT": "direct",
"TOPIC": "topic",
"FANOUT": "fanout",
"HEADERS": "headers",
})
var MonitorQueue = map[string]monitor{ "exchange_name": { // contains filtered or unexported fields }, "exchange_name2": { // contains filtered or unexported fields }, }
Functions ¶
func GetExchangerType ¶
GetExchangerType 根据传入的交换器类型获取对应的字符串值 参数:
exchangeType: 需要获取的交换器类型
返回值:
string: 交换器类型对应的字符串值,如果传入的类型不存在则返回默认的DIRECT类型
func GetIntPtrWithDefault ¶
GetIntPtrWithDefault 获取整数指针值,如果键不存在则返回默认值
func GetIntWithDefault ¶
GetIntWithDefault 获取整数值,如果键不存在则返回默认值
func GetStringWithDefault ¶
GetStringWithDefault 获取字符串值,如果键不存在则返回默认值
func InterfaceToMap ¶
InterfaceToMap =========================== interface 转换成Map ============================ InterfaceToMap[T] 将 interface{} 转换为指定类型的 map(自动匹配字段类型)
func PrintAsJSON ¶
PrintAsJSON 将任意数据结构转换为JSON并打印 data: 要转换的任意数据结构 indent: 是否使用缩进格式(true为带缩进,false为紧凑格式)
Types ¶
type Exchange ¶
type Exchange struct {
Name string `json:"name"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
}
定义结构体对应RabbitMQ API返回的JSON结构
type FetchResult ¶
type FetchResult[T any] struct { Resp *resty.Response Data T Raw []byte Status int Header map[string][]string }
func FetchDataGenericity ¶
type Manager ¶
type Manager struct {
ReceivedBody interface{} // 接收到的消息体
// contains filtered or unexported fields
}
func NewManager ¶
NewManager 创建一个新的管理器实例 fileName: 配置文件路径 默认会读取配置文件 .env 如不需读取 传入 "" settings: 配置设置,如果为nil则使用默认配置
可传入 isConnect 是否自动连接 默认为True
返回值: 指向Manager结构体的指针
func (*Manager) CheckExchangeExists ¶
CheckExchangeExists 检查RabbitMQ中是否存在指定的交换机
func (*Manager) Consume ¶
Consume 注册并启动一个消费者来消费指定队列的消息 queue: 要消费的队列名称 autoAck: 是否自动确认消息,true表示自动确认,false表示手动确认 isContinue: 是否持续消费,true表示持续消费消息,false表示处理一条消息后停止 funcName: 方法名,如果为空,则不处理消息,只消费消息 返回值: 消费过程中出现的错误
func (*Manager) ConsumeDelayMessage ¶
func (m *Manager) ConsumeDelayMessage(queueName string, autoAck bool, isContinue bool, funcName string) error
ConsumeDelayMessage 消费延时消息(从死信队列消费)
func (*Manager) CreateDelayQueue ¶
func (*Manager) CreateExchange ¶
CreateExchange 创建交换机 参数:
exchange: 交换机名称 exchangeType: 交换机类型
返回值:
error: 创建交换机过程中发生的错误
func (*Manager) CreateQueue ¶
func (m *Manager) CreateQueue( exchangeName, queueName, routingKey string, queueArgs amqp.Table, bindArgs amqp.Table) (*amqp.Queue, error)
CreateQueue 创建一个新的队列并将其绑定到指定的交换机 参数:
exchangeName: 交换机名称 QueueName: 队列名称 routingKey: 路由键 queueArgs: 队列声明的额外参数 bindArgs: 队列绑定的额外参数
返回值:
*amqp.Queue: 创建成功的队列对象 error: 创建过程中发生的错误
func (*Manager) CreateVhost ¶
CreateVhost 创建新的虚拟主机 该方法需要管理员权限才能执行 参数:
vhostName: 要创建的虚拟主机名称
返回值:
error: 操作过程中发生的错误
func (*Manager) DeleteVhost ¶
DeleteVhost 删除指定的虚拟主机 该方法需要管理员权限才能执行 注意:删除虚拟主机将同时删除其中的所有队列、交换机和绑定关系 参数:
vhostName: 要删除的虚拟主机名称
返回值:
error: 操作过程中发生的错误
func (*Manager) GetBindings ¶
获取所有绑定关系
func (*Manager) GetConnection ¶
func (m *Manager) GetConnection() *amqp.Connection
GetConnection 获取底层的AMQP连接对象 该函数提供对原始AMQP连接对象的访问,用于需要直接操作连接的高级操作
返回值:
*amqp.connection - 当前的AMQP连接实例,如果未连接则返回nil
func (*Manager) NewChannel ¶
NewChannel 创建一个新的AMQP通道 该方法会检查RabbitMQ连接状态,如果连接不存在或已关闭则返回错误 返回新创建的通道实例和可能发生的错误
参数:
无
返回值:
*amqp.Channel: 新创建的AMQP通道实例 error: 连接问题或其他错误时返回非nil值
func (*Manager) Publish ¶
Publish 向指定的交换机和路由键发布消息 exchange: 交换机名称 routingKey: 路由键 QueueName: 队列名称 msg: 要发送的消息内容 persistent: 是否持久化消息 返回值: 发布消息过程中发生的错误
func (*Manager) PublishDelayMessage ¶
func (m *Manager) PublishDelayMessage(exchange, routingKey, queueName string, msg []byte, ttl int) error
PublishDelayMessage 发布延时消息到指定的队列
参数:
exchange: 交换机名称 routingKey: 路由键 queueName: 队列名称 msg: 要发送的消息内容 ttl: 消息的存活时间(毫秒)
返回值:
error: 发布过程中可能出现的错误
func (*Manager) StartMonitor ¶
StartMonitor 启动消息队列监控器,创建多个goroutine并发消费消息 exchangeName: 交换机名称,用于指定消息来源的交换机 QueueName: 队列名称,用于指定要消费的队列 funcName: 函数名称,用于标识消费消息的处理函数 MaxConsumer: 最大消费者数量,指定并发消费的goroutine数量 返回值: 如果启动过程中出现错误则返回error,否则返回nil
type MonitorConfig ¶
type MonitorConfig struct {
CurrentMonitor *monitor
CurrentQueueInfo *QueueInfo
// contains filtered or unexported fields
}
func NewMonitorConfig ¶
func NewMonitorConfig(monitorQueueV map[string]monitor) *MonitorConfig
NewMonitorConfig 创建一个新的Config实例 参数:
monitorQueueV: 监控队列映射,如果为nil则使用默认的MonitorQueue
返回值:
*MonitorConfig: 返回Config实例的指针
func (*MonitorConfig) Consume ¶
func (c *MonitorConfig) Consume(manager *Manager) (body interface{}, err error)
Consume 从队列中消费消息 返回值:
body: 接收到的消息体 err: 执行过程中可能发生的错误
func (*MonitorConfig) GetExchangeName ¶
func (c *MonitorConfig) GetExchangeName() string
GetExchangeName 获取当前监控器的交换机名称 该函数首先检查监控器状态,然后返回交换机名称
参数:
- c: 指向Config实例的指针
返回值:
- string: 当前监控器的交换机名称
func (*MonitorConfig) GetQueueInfoByExchangeAndQueueKey ¶
func (c *MonitorConfig) GetQueueInfoByExchangeAndQueueKey(exchangeName string, queueKey string) (QueueInfo, bool)
GetQueueInfoByExchangeAndQueueKey 根据交换机名称和队列键值获取队列信息 参数:
exchangeName: 交换机名称 queueKey: 队列在queueInfo映射中的键值
返回值:
QueueInfo: 对应的队列信息 bool: 是否找到对应的队列信息
func (*MonitorConfig) Publish ¶
func (c *MonitorConfig) Publish(manager *Manager, msg []byte, persistent bool) (err error)
Publish 发布消息到监控队列 msg: 要发布的消息内容 persistent: 是否持久化消息 返回值: 发布操作的错误信息
func (*MonitorConfig) UseKey ¶
func (c *MonitorConfig) UseKey(key string) *MonitorConfig
UseKey 根据给定的键设置当前监控配置 参数:
key: 要使用的监控键值
返回值:
*MonitorConfig: 返回配置对象自身的指针,用于链式调用
func (*MonitorConfig) UseQueue ¶
func (c *MonitorConfig) UseQueue(queueName string) *MonitorConfig
UseQueue 选择并设置当前要使用的队列信息 参数:
queueName - 要使用的队列名称
返回值:
*MonitorConfig - 返回配置对象自身,支持链式调用
type Settings ¶
type Settings struct {
// contains filtered or unexported fields
}
Settings 私有结构体,定义RabbitMQ的配置设置
func NewDefaultSettings ¶
NewDefaultSettings 创建并返回一个带有默认配置的 Settings 实例。 该函数会根据传入的文件名加载环境变量配置,若未提供有效文件名则默认使用 ".env" 文件。 配置中与 RabbitMQ 相关的部分将被解析,并用于初始化 Settings 结构体中的各个字段。
参数:
- fileName: 指向配置文件路径的指针,可以为 nil。如果为 nil 或空字符串,则使用默认文件名 ".env"
返回值:
- *Settings: 包含从环境变量或默认值中读取的 RabbitMQ 配置信息的结构体指针