amqp

package
v1.4.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2020 License: MIT Imports: 6 Imported by: 0

README

Rabbitmq 封装

封装 amqp 协议的基本使用方法,让amqp用起来更简单

使用

配置项
// Config 配置项
// ExchangeName 和 Exchange 二选一,用于指定发布和订阅时使用的交换机
type Config struct {
	Addr string // rabbitmq 地址
	ExchangeName string    // 使用该值创建一个直连的交换机
	Exchange     *Exchange // 自定义默认交换机
	Consumer *Consumer // 在定于队列时,作为消费者使用的参数
}
交换机
const (
    ExchangeDirect  = "direct"
    ExchangeFanout  = "fanout"
    ExchangeTopic   = "topic"
    ExchangeHeaders = "headers"
)

type Exchange struct {
	Name       string // 名称
	Kind       string // 交换机类型,4 种类型之一
	Durable    bool   // 是否持久化
	AutoDelete bool   // 是否自动删除
	Internal   bool   // 是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
	NoWait     bool   // 是否等待通知定义交换机结果
	Args       amqp.Table
}
队列
type Queue struct {
	Name       string     // 必须包含前缀标识使用类型 msg. | rpc. | reply. | notify.
	Key        string     // 和交换机绑定时用的Key, 如果不设置,默认和 Name 一样
	Durable    bool       // 消息代理重启后,队列依旧存在
	AutoDelete bool       // 当最后一个消费者退订后即被删除
	Exclusive  bool       // 只被一个连接(connection)使用,而且当连接关闭后队列即被删除
	NoWait     bool       // 不需要服务器返回
	ReplyTo    *Queue     // rpc 的消息回应道哪个队列
	Args       amqp.Table // 一些消息代理用他来完成类似与TTL的某些额外功能
}
消息结构
// Message 消息体
type Message struct {
	ContentType string // 消息类型
	Queue       *Queue // 来自于哪个队列
	Data        []byte // 消息数据
}
示例
import (
	"github.com/go-eyas/toolkit/amqp"
)

func main() {
	mq := amqp.New(*amqp.Config{
    	Addr: "amqp://guest:guest@127.0.0.1:5672",
    	ExchangeName: "toolkit.exchange.test",
    })
    queue := &amqp.Queue{Name: "toolkit.queue.test"}
    err := mq.Pub(queue, &amqp.Message{Data: []byte("{\"hello\":\"world\"}")})
    
    msgch, err := mq.Sub(queue)
    for msg := range msgch {
    	fmt.Printf("%s", string(msg.Data))
    }
}

godoc

API 文档

Documentation

Index

Constants

View Source
const (
	ExchangeDirect  = "direct"  // 直连交换机
	ExchangeFanout  = "fanout"  // 扇形交换机
	ExchangeTopic   = "topic"   // 主题交换机
	ExchangeHeaders = "headers" // 头交换机
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Addr         string    // rabbitmq 地址
	ExchangeName string    // 使用该值创建一个直连的交换机
	Exchange     *Exchange // 自定义默认交换机
	Consumer     *Consumer // 在定于队列时,作为消费者使用的参数
}

Config 配置项 ExchangeName 和 Exchange 二选一,用于指定发布和订阅时使用的交换机

type Consumer

type Consumer struct {
	Name      string
	AutoAck   bool // 自动确认
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
}

Consumer 定义消费者选项

type Exchange

type Exchange struct {
	Name       string // 名称
	Kind       string // 交换机类型,4 种类型之一
	Durable    bool   // 是否持久化
	AutoDelete bool   // 是否自动删除
	Internal   bool   // 是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
	NoWait     bool   // 是否等待通知定义交换机结果
	Args       amqp.Table
	IsDeclare  bool // 是否已定义
}

Exchange 定义交换机

type MQ

type MQ struct {
	Addr     string
	Client   *amqp.Connection
	Channel  *amqp.Channel
	Exchange *Exchange
	Consumer *Consumer
	// contains filtered or unexported fields
}

func New

func New(conf *Config) (*MQ, error)

Init 初始化

func (*MQ) ExchangeDeclare

func (mq *MQ) ExchangeDeclare(e *Exchange) error

func (*MQ) Init

func (mq *MQ) Init() error

Init 初始化 1. 初始化交换机

func (*MQ) Pub

func (mq *MQ) Pub(q *Queue, msg *Message, exchanges ...*Exchange) error

Pub 给队列发送消息, q 队列, msg 消息, exchanges 交换机,可以用多个交换机多次发送,默认使用初始化时指定的交换机

func (*MQ) QueueBind

func (mq *MQ) QueueBind(q *Queue, e *Exchange) error

func (*MQ) QueueDeclare

func (mq *MQ) QueueDeclare(q *Queue) error

func (*MQ) Sub

func (mq *MQ) Sub(q *Queue) (<-chan *Message, error)

Sub 定于队列消息 q 队列 return 接收消息的通道 , 错误对象

type MQApp

type MQApp struct {
	Client *MQ
	// contains filtered or unexported fields
}

func NewApp

func NewApp(config *Config) (*MQApp, error)

func (*MQApp) On

func (mq *MQApp) On(queue *Queue, handler ...MQHandler)

监听队列触发函数

func (*MQApp) Pub

func (mq *MQApp) Pub(q *Queue, msg *Message) error

func (*MQApp) Route

func (mq *MQApp) Route(routes map[*Queue]MQHandler)

type MQContext

type MQContext struct {
	Request *Message
	Client  *MQ
	App     *MQApp
}

func (*MQContext) BindJSON

func (c *MQContext) BindJSON(v interface{}) error

func (*MQContext) Pub

func (c *MQContext) Pub(q *Queue, msg *Message) error

type MQHandler

type MQHandler func(*MQContext)

type Message

type Message struct {
	ContentType string // 消息类型
	Queue       *Queue // 来自于哪个队列
	Data        []byte // 消息数据
	// contains filtered or unexported fields
}

Message 消息体

func (*Message) JSON

func (m *Message) JSON(v interface{}) error

JSON 以 json 解析消息体的数据为指定结构体

func (*Message) ReplyTo

func (m *Message) ReplyTo(msg *Message) error

ReplyTo 给回复的队列发送消息

type Queue

type Queue struct {
	Name       string     // 必须包含前缀标识使用类型 msg. | rpc. | reply. | notify.
	Key        string     // 和交换机绑定时用的Key
	Durable    bool       // 消息代理重启后,队列依旧存在
	AutoDelete bool       // 当最后一个消费者退订后即被删除
	Exclusive  bool       // 只被一个连接(connection)使用,而且当连接关闭后队列即被删除
	NoWait     bool       // 不需要服务器返回
	ReplyTo    *Queue     // rpc 的消息回应道哪个队列
	Args       amqp.Table // 一些消息代理用他来完成类似与TTL的某些额外功能
	IsDeclare  bool       // 是否已定义
	// contains filtered or unexported fields
}

Queue 队列

func (*Queue) GetKey

func (q *Queue) GetKey() string

func (*Queue) ReplyQueue

func (q *Queue) ReplyQueue() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL