Documentation ¶
Index ¶
- Constants
- func DefaultPublishNotifier(p *SafeProducer, message amqp.Publishing, confirm amqp.Confirmation)
- func EnableDebug(isEnable bool)
- type BindOptions
- type Channel
- func (ch *Channel) Close() error
- func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, ...) (<-chan amqp.Delivery, error)
- func (ch *Channel) DoMethod(channel *amqp.Channel, methodName string) []reflect.Value
- func (ch *Channel) IsClosed() bool
- func (ch *Channel) RegisterMethod(methodName string, params ...interface{})
- func (d Channel) SetDelay(seconds int)
- type Config
- type Connection
- type ConsumeOptions
- type Consumer
- func (e Consumer) Close() error
- func (c *Consumer) Consume(handler MessageHandler) error
- func (e Consumer) DeclareAndBind() error
- func (c *Consumer) Deliveries() <-chan amqp.Delivery
- func (c *Consumer) DoMethod(methodName string) []reflect.Value
- func (e Consumer) ExchangeDeclare() error
- func (c *Consumer) Get(handler MessageHandler) error
- func (c *Consumer) Handler() MessageHandler
- func (c *Consumer) Qos(prefetchCount int) error
- func (e Consumer) QueueBind() error
- func (e Consumer) QueueDeclare() error
- func (c *Consumer) RegisterMethod(methodName string, params ...interface{})
- func (e Consumer) SetDelay(seconds int)
- type DeliveryCacher
- type DeliveryMapCache
- type Exchange
- type MessageHandler
- type Producer
- func (e Producer) Close() error
- func (e Producer) DeclareAndBind() error
- func (e Producer) ExchangeDeclare() error
- func (p *Producer) NotifyReturn(notifier ReturnNotifier)
- func (p *Producer) Publish(publishing amqp.Publishing) error
- func (e Producer) QueueBind() error
- func (e Producer) QueueDeclare() error
- func (p *Producer) RetryTimes() int
- func (p *Producer) ReturnNotifier() ReturnNotifier
- func (e Producer) SetDelay(seconds int)
- func (p *Producer) SetRetryTimes(times int)
- type PublishNotifier
- type PublishOptions
- type Queue
- type RabbitMQ
- func (r *RabbitMQ) Close() error
- func (r *RabbitMQ) Connect() error
- func (r *RabbitMQ) Connection() *Connection
- func (r *RabbitMQ) Dial() error
- func (r *RabbitMQ) NewConsumer(session Session) (*Consumer, error)
- func (r *RabbitMQ) NewProducer(session Session) (*Producer, error)
- func (r *RabbitMQ) NewSafeProducer(session Session, deliveryCache DeliveryCacher, notifier ...PublishNotifier) (*SafeProducer, error)
- func (r *RabbitMQ) SetDelay(seconds int)
- type ReturnNotifier
- type SafeProducer
- func (e SafeProducer) Close() error
- func (e SafeProducer) DeclareAndBind() error
- func (e SafeProducer) ExchangeDeclare() error
- func (p *SafeProducer) IsConfirmed() bool
- func (p *SafeProducer) Publish(publishing amqp.Publishing) error
- func (p *SafeProducer) PublishNotifier() PublishNotifier
- func (e SafeProducer) QueueBind() error
- func (e SafeProducer) QueueDeclare() error
- func (e SafeProducer) SetDelay(seconds int)
- type Session
Constants ¶
const (
	// DefaultRetryTimes represents default retry times of safe publish
	DefaultRetryTimes = 3
	// DefaultRepublishRoutine represents default number of quick republish goroutine
	DefaultRepublishRoutine = 10
)
const (
	// DefaultDelaySeconds represents default delay retry seconds of reconnect or recreate
	DefaultDelaySeconds = 3
)
const (
	// DefaultMapGC represents default number of keys to start map garbage collection
	DefaultMapGC = 200
)
Variables ¶
This section is empty.
Functions ¶
func DefaultPublishNotifier ¶
func DefaultPublishNotifier(p *SafeProducer, message amqp.Publishing, confirm amqp.Confirmation)
DefaultPublishNotifier is the default AMQP message confirmation handler. If a nack confirmation is received, the message is sent again.
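The resend-on-nack behaviour described above can be sketched without the library. This is only an illustration: publishing, confirmation, republisher, recorder and resendOnNack are hypothetical stand-ins, not this package's types, and the real handler receives a *SafeProducer, an amqp.Publishing and an amqp.Confirmation.

```go
package main

import "fmt"

// publishing and confirmation are simplified stand-ins for amqp.Publishing
// and amqp.Confirmation; only the fields used here are modelled.
type publishing struct {
	Body string
}

type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// republisher is a hypothetical interface for anything that can resend.
type republisher interface {
	Publish(p publishing) error
}

// resendOnNack republishes msg when the broker reports a negative
// acknowledgement and does nothing on a positive one.
func resendOnNack(p republisher, msg publishing, c confirmation) {
	if c.Ack {
		return
	}
	if err := p.Publish(msg); err != nil {
		fmt.Println("republish failed:", err)
	}
}

// recorder is a test double that remembers what was published.
type recorder struct {
	sent []string
}

func (r *recorder) Publish(p publishing) error {
	r.sent = append(r.sent, p.Body)
	return nil
}

func main() {
	r := &recorder{}
	resendOnNack(r, publishing{Body: "order-1"}, confirmation{DeliveryTag: 1, Ack: true})
	resendOnNack(r, publishing{Body: "order-2"}, confirmation{DeliveryTag: 2, Ack: false})
	fmt.Println(r.sent) // only the nacked message is resent
}
```

A custom PublishNotifier could follow the same shape and add logging or a dead-letter step before resending.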
Types ¶
type BindOptions ¶
BindOptions represents queue bind options.
type Channel ¶
Channel is amqp.Channel wrapper.
func (*Channel) Consume ¶
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
Consume wraps amqp.Channel.Consume. The returned delivery chan is closed only when the channel is closed by the developer.
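One way to keep a single delivery chan open across channel recreation is to relay from each underlying chan in turn. The sketch below shows that general pattern with plain strings. It is an assumption about the approach, not the package's actual code, and forward and sources are hypothetical names.

```go
package main

import "fmt"

// forward relays values from each source chan in turn to a single output
// chan. out is closed only when open reports ok == false, which stands in
// for the developer closing the channel.
func forward(open func() (<-chan string, bool)) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			src, ok := open()
			if !ok {
				return
			}
			for d := range src {
				out <- d
			}
			// src was closed (for example after a connection loss);
			// loop around and open the next one.
		}
	}()
	return out
}

// sources returns an opener that hands out one closed, pre-filled chan per
// batch and reports ok == false once the batches run out. Each batch stands
// in for the deliveries received on one underlying channel.
func sources(batches ...[]string) func() (<-chan string, bool) {
	i := 0
	return func() (<-chan string, bool) {
		if i >= len(batches) {
			return nil, false
		}
		src := make(chan string, len(batches[i]))
		for _, d := range batches[i] {
			src <- d
		}
		close(src)
		i++
		return src, true
	}
}

func main() {
	out := forward(sources([]string{"a", "b"}, []string{"c"}))
	for d := range out {
		fmt.Println(d)
	}
}
```

The caller ranges over one chan and never sees the hand-over between underlying sources.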
func (*Channel) DoMethod ¶
DoMethod executes the channel method registered under methodName, using its registered params.
func (*Channel) RegisterMethod ¶
RegisterMethod registers a channel method and its params so that the method can be executed again when the channel is recreated.
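The register-then-replay idea can be illustrated with the standard reflect package. This is a minimal sketch under stated assumptions: fakeChannel and registry are hypothetical types standing in for *amqp.Channel and the wrapper's internal bookkeeping, and the real implementation may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// fakeChannel stands in for *amqp.Channel; it is hypothetical and only
// records the prefetch count it was given.
type fakeChannel struct {
	prefetch int
}

// Qos mimics the shape of a channel setup call.
func (c *fakeChannel) Qos(prefetchCount int) error {
	c.prefetch = prefetchCount
	return nil
}

// registry remembers method names and their arguments so they can be
// replayed on a freshly created channel.
type registry struct {
	methods map[string][]interface{}
}

func newRegistry() *registry {
	return &registry{methods: make(map[string][]interface{})}
}

// RegisterMethod stores the arguments for a later replay.
func (r *registry) RegisterMethod(name string, params ...interface{}) {
	r.methods[name] = params
}

// DoMethod looks up the method on target by name and calls it with the
// stored arguments. It returns nil if nothing was registered under name
// or target has no such method.
func (r *registry) DoMethod(target interface{}, name string) []reflect.Value {
	params, ok := r.methods[name]
	if !ok {
		return nil
	}
	m := reflect.ValueOf(target).MethodByName(name)
	if !m.IsValid() {
		return nil
	}
	args := make([]reflect.Value, len(params))
	for i, p := range params {
		args[i] = reflect.ValueOf(p)
	}
	return m.Call(args)
}

func main() {
	r := newRegistry()
	r.RegisterMethod("Qos", 5)

	// Simulate a channel being recreated after a failure.
	recreated := &fakeChannel{}
	out := r.DoMethod(recreated, "Qos")

	fmt.Println(recreated.prefetch, out[0].IsNil())
}
```

Because the call goes through reflection, argument types are checked only at run time, so a mismatched parameter panics in Call rather than failing to compile.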
type Connection ¶
type Connection struct {
	*amqp.Connection
	// contains filtered or unexported fields
}
Connection is amqp.Connection wrapper.
func Dial ¶
func Dial(url string) (*Connection, error)
Dial wraps amqp.Dial and returns a connection that reconnects automatically.
func (*Connection) Channel ¶
func (c *Connection) Channel() (*Channel, error)
Channel wraps amqp.Connection.Channel and returns a channel that is recreated automatically.
type ConsumeOptions ¶
type ConsumeOptions struct {
	Tag       string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
}
ConsumeOptions represents consumer consume options.
type Consumer ¶
type Consumer struct {
	// contains filtered or unexported fields
}
Consumer represents an AMQP consumer.
func (*Consumer) Consume ¶
func (c *Consumer) Consume(handler MessageHandler) error
Consume starts consuming AMQP deliveries with handler until the consumer channel is closed by the developer.
func (Consumer) DeclareAndBind ¶
func (e Consumer) DeclareAndBind() error
DeclareAndBind declares an exchange and a queue, then binds the exchange to the queue.
func (*Consumer) Deliveries ¶
Deliveries returns the consumer's AMQP delivery chan.
func (*Consumer) DoMethod ¶
DoMethod executes the method registered on the consumer channel under methodName, using its registered params.
func (Consumer) ExchangeDeclare ¶
func (e Consumer) ExchangeDeclare() error
ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.
func (*Consumer) Get ¶
func (c *Consumer) Get(handler MessageHandler) error
Get consumes a single AMQP delivery from the head of a queue and passes it to handler.
func (*Consumer) Handler ¶
func (c *Consumer) Handler() MessageHandler
Handler returns the consumer's message handler.
func (*Consumer) Qos ¶
Qos controls how many messages the server will try to keep on the network for consumers before receiving delivery acks.
func (Consumer) QueueBind ¶
func (e Consumer) QueueBind() error
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
func (Consumer) QueueDeclare ¶
func (e Consumer) QueueDeclare() error
QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.
func (*Consumer) RegisterMethod ¶
RegisterMethod registers a channel method and its params on the consumer channel so that the method can be executed again when the channel is recreated.
type DeliveryCacher ¶
type DeliveryCacher interface {
	Store(deliveryTag uint64, publishing amqp.Publishing)
	Load(deliveryTag uint64) amqp.Publishing
	Republish() <-chan amqp.Publishing
}
DeliveryCacher represents an object that can store and load AMQP publishings by deliveryTag. When the channel is recreated, it can also return, over an AMQP publishing chan, the cached publishings that did not receive an ack confirmation and need to be republished.
type DeliveryMapCache ¶
DeliveryMapCache implements the DeliveryCacher interface. It stores, loads and republishes AMQP publishings using the thread-safe deliveryMap.
func NewDeliveryMapCache ¶
func NewDeliveryMapCache() *DeliveryMapCache
NewDeliveryMapCache creates a new *DeliveryMapCache.
func (*DeliveryMapCache) Load ¶
func (c *DeliveryMapCache) Load(deliverTag uint64) amqp.Publishing
Load loads the AMQP publishing by deliveryTag and deletes the deliveryTag key in deliveryMap.
func (*DeliveryMapCache) Republish ¶
func (c *DeliveryMapCache) Republish() <-chan amqp.Publishing
Republish returns an AMQP publishing chan and sends on it the publishings in deliveryMap that did not receive an ack confirmation and need to be republished.
func (*DeliveryMapCache) Store ¶
func (c *DeliveryMapCache) Store(deliveryTag uint64, publishing amqp.Publishing)
Store stores the AMQP publishing in deliveryMap by deliveryTag. If the number of deliveryTag keys reaches DefaultMapGC, deliveryMap is garbage-collected and recreated.
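A cache with the same three operations can be sketched with a mutex-guarded map. Everything here is an assumption made for illustration: publishing stands in for amqp.Publishing, mapCache and mapGCThreshold are hypothetical names, and the rebuild trigger (a count of Store calls) is one possible reading of the documented threshold rather than the package's exact rule.

```go
package main

import (
	"fmt"
	"sync"
)

// publishing is a stand-in for amqp.Publishing (hypothetical, body only).
type publishing struct {
	Body []byte
}

// mapGCThreshold mirrors the documented DefaultMapGC value.
const mapGCThreshold = 200

// mapCache is a mutex-guarded map keyed by delivery tag.
type mapCache struct {
	mu      sync.Mutex
	entries map[uint64]publishing
	stored  int // Store calls since the map was last rebuilt
}

func newMapCache() *mapCache {
	return &mapCache{entries: make(map[uint64]publishing)}
}

// Store records p under tag. Deleting keys does not shrink a Go map, so
// after mapGCThreshold stores the live entries are copied into a fresh map
// and the old one is left for the garbage collector.
func (c *mapCache) Store(tag uint64, p publishing) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[tag] = p
	c.stored++
	if c.stored >= mapGCThreshold {
		fresh := make(map[uint64]publishing, len(c.entries))
		for k, v := range c.entries {
			fresh[k] = v
		}
		c.entries = fresh
		c.stored = 0
	}
}

// Load returns the publishing stored under tag and removes it.
func (c *mapCache) Load(tag uint64) publishing {
	c.mu.Lock()
	defer c.mu.Unlock()
	p := c.entries[tag]
	delete(c.entries, tag)
	return p
}

// Republish drains every remaining entry into a buffered chan and closes it.
func (c *mapCache) Republish() <-chan publishing {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(chan publishing, len(c.entries))
	for tag, p := range c.entries {
		out <- p
		delete(c.entries, tag)
	}
	close(out)
	return out
}

func main() {
	c := newMapCache()
	c.Store(1, publishing{Body: []byte("a")})
	c.Store(2, publishing{Body: []byte("b")})
	c.Store(3, publishing{Body: []byte("c")})

	confirmed := c.Load(2) // tag 2 was acked, so it leaves the cache
	pending := 0
	for range c.Republish() {
		pending++
	}
	fmt.Println(string(confirmed.Body), pending)
}
```

Any type with these three methods could be passed where a DeliveryCacher is expected, for example one backed by an external store.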
type Exchange ¶
type Exchange struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}
Exchange represents the exchange declaration config.
type MessageHandler ¶
MessageHandler represents an AMQP delivery message handler function.
type Producer ¶
type Producer struct {
	// contains filtered or unexported fields
}
Producer represents an AMQP producer.
func (Producer) DeclareAndBind ¶
func (e Producer) DeclareAndBind() error
DeclareAndBind declares an exchange and a queue, then binds the exchange to the queue.
func (Producer) ExchangeDeclare ¶
func (e Producer) ExchangeDeclare() error
ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.
func (*Producer) NotifyReturn ¶
func (p *Producer) NotifyReturn(notifier ReturnNotifier)
NotifyReturn registers notifier as a listener for AMQP return messages. The server can send these when a publishing is undeliverable and was published with either the mandatory or immediate flag.
func (*Producer) Publish ¶
func (p *Producer) Publish(publishing amqp.Publishing) error
Publish sends an amqp.Publishing from the producer to an exchange on the server. If an error occurs, the producer retries the publish according to retryTimes and delaySeconds. When retryTimes < 0, it retries until the publish succeeds.
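The retry rule can be sketched as a small loop. This is an illustration only: publishWithRetry and errNotReady are hypothetical names, and the sketch assumes the first attempt does not count toward retryTimes, which the documentation above does not state.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotReady is a hypothetical failure used to drive the example.
var errNotReady = errors.New("channel not ready")

// publishWithRetry calls publish until it succeeds. After the first attempt
// it retries at most retryTimes times, sleeping delay before each retry.
// A negative retryTimes retries without limit. The last error is returned
// if every attempt fails.
func publishWithRetry(publish func() error, retryTimes int, delay time.Duration) error {
	var err error
	for attempt := 0; retryTimes < 0 || attempt <= retryTimes; attempt++ {
		if attempt > 0 {
			time.Sleep(delay)
		}
		if err = publish(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errNotReady
		}
		return nil
	}
	err := publishWithRetry(flaky, 3, time.Millisecond)
	fmt.Println(calls, err) // succeeds on the third call
}
```

With a negative retryTimes the loop never gives up, so a caller that needs a deadline has to enforce it separately.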
func (Producer) QueueBind ¶
func (e Producer) QueueBind() error
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
func (Producer) QueueDeclare ¶
func (e Producer) QueueDeclare() error
QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.
func (*Producer) RetryTimes ¶
RetryTimes returns the producer's publish retry times.
func (*Producer) ReturnNotifier ¶
func (p *Producer) ReturnNotifier() ReturnNotifier
ReturnNotifier returns the producer's AMQP return message handler.
func (Producer) SetDelay ¶
func (e Producer) SetDelay(seconds int)
SetDelay sets delay retry seconds for the AMQP channel.
func (*Producer) SetRetryTimes ¶
SetRetryTimes sets publish retry times for the producer.
type PublishNotifier ¶
type PublishNotifier func(*SafeProducer, amqp.Publishing, amqp.Confirmation)
PublishNotifier represents an AMQP message confirmation handler function.
type PublishOptions ¶
PublishOptions represents producer publish options.
type Queue ¶
type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}
Queue represents the queue declaration config.
type RabbitMQ ¶
type RabbitMQ struct {
	// contains filtered or unexported fields
}
RabbitMQ represents an AMQP connection with its URI config.
func (*RabbitMQ) Connection ¶
func (r *RabbitMQ) Connection() *Connection
Connection returns the AMQP connection.
func (*RabbitMQ) NewConsumer ¶
NewConsumer returns a new *Consumer which contains a new AMQP channel by session.
func (*RabbitMQ) NewProducer ¶
NewProducer returns a new *Producer which contains a new AMQP channel by session.
func (*RabbitMQ) NewSafeProducer ¶
func (r *RabbitMQ) NewSafeProducer(session Session, deliveryCache DeliveryCacher, notifier ...PublishNotifier) (*SafeProducer, error)
NewSafeProducer returns a new *SafeProducer which contains a new AMQP channel by session. The *SafeProducer sets retryTimes to DefaultRetryTimes and runs a publish listener with notifier for reliable publishing. If notifier is nil, DefaultPublishNotifier is used.
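The optional trailing notifier follows a common Go pattern: a variadic parameter with a default fallback. The sketch below shows that pattern in isolation. The notifier type here is a hypothetical simplification of PublishNotifier, and chooseNotifier is not a function of this package.

```go
package main

import "fmt"

// notifier is a stand-in for the documented PublishNotifier type, reduced
// to a function that names itself so the choice is observable.
type notifier func() string

func defaultNotifier() string { return "default" }

// chooseNotifier returns the first non-nil notifier passed in and falls
// back to defaultNotifier when none is given.
func chooseNotifier(custom ...notifier) notifier {
	for _, n := range custom {
		if n != nil {
			return n
		}
	}
	return defaultNotifier
}

func main() {
	mine := func() string { return "custom" }
	fmt.Println(chooseNotifier()(), chooseNotifier(nil)(), chooseNotifier(mine)())
}
```

Callers that are happy with the default simply omit the argument, which keeps the common call site short.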
type ReturnNotifier ¶
ReturnNotifier represents an AMQP return message handler function.
type SafeProducer ¶
type SafeProducer struct {
	*Producer
	// contains filtered or unexported fields
}
SafeProducer represents an AMQP safe producer.
func (SafeProducer) DeclareAndBind ¶
func (e SafeProducer) DeclareAndBind() error
DeclareAndBind declares an exchange and a queue, then binds the exchange to the queue.
func (SafeProducer) ExchangeDeclare ¶
func (e SafeProducer) ExchangeDeclare() error
ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.
func (*SafeProducer) IsConfirmed ¶
func (p *SafeProducer) IsConfirmed() bool
IsConfirmed reports whether the channel is in confirm mode.
func (*SafeProducer) Publish ¶
func (p *SafeProducer) Publish(publishing amqp.Publishing) error
Publish sends an amqp.Publishing from the safe producer to an exchange on the server. If an error occurs, the safe producer retries the publish according to retryTimes and delaySeconds. When retryTimes < 0, it retries until the publish succeeds. The safe producer records each message sent to the server in deliveryCache, and the deliveryTag increases each time a message is sent.
func (*SafeProducer) PublishNotifier ¶
func (p *SafeProducer) PublishNotifier() PublishNotifier
PublishNotifier returns the producer's AMQP message confirmation handler.
func (SafeProducer) QueueBind ¶
func (e SafeProducer) QueueBind() error
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
func (SafeProducer) QueueDeclare ¶
func (e SafeProducer) QueueDeclare() error
QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.
type Session ¶
type Session struct {
	Exchange       Exchange
	Queue          Queue
	BindOptions    BindOptions
	ConsumeOptions ConsumeOptions
	PublishOptions PublishOptions
}
Session represents executer configs and options.