Documentation
¶
Index ¶
- Variables
- type Binding
- type Client
- func (c *Client) Channel() (*amqp.Channel, error)
- func (c *Client) Close()
- func (c *Client) CloseConsumer(name string) error
- func (c *Client) CloseProducer(name string) error
- func (c *Client) Consumer(name string) (*Consumer, error)
- func (c *Client) Open() (mq *Client, err error)
- func (c *Client) Producer(name string) (*Producer, error)
- func (c *Client) State() uint8
- type ConsumeOption
- type Consumer
- func (c *Consumer) Close()
- func (c *Consumer) CloseChan()
- func (c *Consumer) Name() string
- func (c *Consumer) Open() error
- func (c *Consumer) SetExchangeBinds(eb []*ExchangeBinds) *Consumer
- func (c *Consumer) SetMsgCallback(cb chan<- Delivery) *Consumer
- func (c *Consumer) SetQos(prefetch int) *Consumer
- func (c *Consumer) State() uint8
- type Delivery
- type Exchange
- type ExchangeBinds
- type Producer
- func (p *Producer) Close()
- func (p *Producer) Confirm(enable bool) *Producer
- func (p *Producer) ForDirect(exchange, route, queue, data string) error
- func (p *Producer) ForFanout(exchange, data string) error
- func (p *Producer) ForQueueArgs(exchange, route, kind string, bindings []*Binding, data []byte) error
- func (p *Producer) ForTopic(exchange, route, queue, data string) error
- func (p *Producer) IsOpen() bool
- func (p Producer) Name() string
- func (p *Producer) Open() error
- func (p *Producer) Publish(exchange, route string, msg *PublishMsg) error
- func (p *Producer) SetExchangeBinds(eb []*ExchangeBinds) *Producer
- func (p *Producer) State() uint8
- type PublishMsg
- type Queue
Constants ¶
This section is empty.
Variables ¶
View Source
var ( StateClosed = uint8(0) StateOpened = uint8(1) StateReopening = uint8(2) )
*
- 基于RabbitMQ的AMQ提供器,其初始化参数格式如下:
- <pre>
- {
- "brokerURL" : "amqp://guest:guest@localhost:5672/", // RabbitMQ server的连接地址
- "username" : "admin", // 登录RabbitMQ的账号
- "password" : "admin", // 登录RabbitMQ的密码
- }
- </pre> *
- @author Chirs Chou
Functions ¶
This section is empty.
Types ¶
type Binding ¶
type Binding struct { RouteKey string Queues []*Queue NoWait bool // default is false Args amqp.Table // default is nil }
Biding routeKey ==> queues
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func Engine ¶
func Engine(conf configuration.Configuration, systemId string) *Client
func (*Client) CloseConsumer ¶
func (*Client) CloseProducer ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
基于RabbitMQ消息中间件的客户端实现。
func (*Consumer) SetExchangeBinds ¶
func (c *Consumer) SetExchangeBinds(eb []*ExchangeBinds) *Consumer
func (*Consumer) SetMsgCallback ¶
type Exchange ¶
type Exchange struct { Name string Kind string Durable bool AutoDelete bool Internal bool NoWait bool Args amqp.Table // default is nil }
Exchange 基于amqp的Exchange配置
type ExchangeBinds ¶
ExchangeBinds exchange ==> routeKey ==> queues
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
基于RabbitMQ的生产者封装。
func (*Producer) Confirm ¶
Confirm 是否开启生产者confirm功能, 默认为false, 该选项在Open()前设置. 说明: 目前仅实现串行化的confirm, 每次的等待confirm额外需要约50ms,建议上层并发调用Publish
func (*Producer) ForQueueArgs ¶
func (*Producer) Publish ¶
func (p *Producer) Publish(exchange, route string, msg *PublishMsg) error
在同步Publish Confirm模式下, 每次Publish将额外有约50ms的等待时间.如果采用这种模式,建议上层并发publish
func (*Producer) SetExchangeBinds ¶
func (p *Producer) SetExchangeBinds(eb []*ExchangeBinds) *Producer
type PublishMsg ¶
type PublishMsg struct { ContentType string // MIME content type ContentEncoding string // MIME content type DeliveryMode uint8 // Transient or Persistent Priority uint8 // 0 to 9 Timestamp time.Time Body []byte }
生产者生产的数据格式
func NewPublishMsg ¶
func NewPublishMsg(body []byte) *PublishMsg
Click to show internal directories.
Click to hide internal directories.