Documentation
¶
Overview ¶
Package rabbit is a simple streadway/amqp wrapper library that comes with:
* Auto-reconnect support
* Context support
* Helpers for consuming once or forever and publishing
The library is used internally at https://batch.sh where it powers most of the platform's backend services.
For an example, refer to the README.md.
Index ¶
- Variables
- func ValidateOptions(opts *Options) error
- type ConsumeError
- type IRabbit
- type Mode
- type Options
- type Rabbit
- func (r *Rabbit) Close() error
- func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, ...)
- func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
- func (r *Rabbit) Publish(ctx context.Context, routingKey string, body []byte) error
- func (r *Rabbit) Stop() error
Constants ¶
This section is empty.
Variables ¶
var ( // Used for identifying consumer DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8] // Used for identifying producer DefaultAppID = "p-rabbit-" + uuid.NewV4().String()[0:8] )
Functions ¶
func ValidateOptions ¶
ValidateOptions validates various combinations of options.
Types ¶
type ConsumeError ¶
ConsumeError will be passed down the error channel if/when `f()` func runs into an error during `Consume()`.
type IRabbit ¶
type IRabbit interface {
Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)
ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
Publish(ctx context.Context, routingKey string, payload []byte) error
Stop() error
Close() error
}
IRabbit is the interface that the `rabbit` library implements. It's here as convenience.
type Options ¶
type Options struct {
// Required; format "amqp://user:pass@host:port"
URL string
// In what mode does the library operate (Both, Consumer, Producer)
Mode Mode
// If left empty, server will auto generate queue name
QueueName string
// Required
ExchangeName string
// Used as either routing (publish) or binding key (consume)
RoutingKey string
// Whether to declare/create exchange on connect
ExchangeDeclare bool
// Required if declaring queue (valid: direct, fanout, topic, headers)
ExchangeType string
// Whether exchange should survive/persist server restarts
ExchangeDurable bool
// Whether to delete exchange when its no longer used; used only if ExchangeDeclare set to true
ExchangeAutoDelete bool
// https://godoc.org/github.com/streadway/amqp#Channel.Qos
// Leave unset if no QoS preferences
QosPrefetchCount int
QosPrefetchSize int
// How long to wait before we retry connecting to a server (after disconnect)
RetryReconnectSec int
// Whether queue should survive/persist server restarts (and there are no remaining bindings)
QueueDurable bool
// Whether consumer should be the sole consumer of the queue; used only if
// QueueDeclare set to true
QueueExclusive bool
// Whether to delete queue on consumer disconnect; used only if QueueDeclare set to true
QueueAutoDelete bool
// Whether to declare/create queue on connect; used only if QueueDeclare set to true
QueueDeclare bool
// Whether to automatically acknowledge consumed message(s)
AutoAck bool
// Used for identifying consumer
ConsumerTag string
// Used as a property to identify producer
AppID string
}
Options determines how the `rabbit` library will behave and should be passed in to rabbit via `New()`. Many of the options are optional (and will fall back to sane defaults).
type Rabbit ¶
type Rabbit struct {
Conn *amqp.Connection
ConsumerDeliveryChannel <-chan amqp.Delivery
ConsumerRWMutex *sync.RWMutex
NotifyCloseChan chan *amqp.Error
ProducerServerChannel *amqp.Channel
ProducerRWMutex *sync.RWMutex
ConsumeLooper director.Looper
Options *Options
// contains filtered or unexported fields
}
Rabbit struct that is instantiated via `New()`. You should not instantiate this struct by hand (unless you have a really good reason to do so).
func (*Rabbit) Close ¶ added in v0.1.5
Close stops any active Consume and closes the amqp connection (and channels using the conn)
You should re-instantiate the rabbit lib once this is called.
func (*Rabbit) Consume ¶
func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)
Consume consumes messages from the configured queue (`Options.QueueName`) and executes `f` for every received message.
`Consume()` will block until it is stopped either via the passed in `ctx` OR by calling `Stop()`
It is also possible to see the errors that `f()` runs into by passing in an error channel (`chan *ConsumeError`).
Both `ctx` and `errChan` can be `nil`.
If the server goes away, `Consume` will automatically attempt to reconnect. Subsequent reconnect attempts will sleep/wait for `DefaultRetryReconnectSec` between attempts.
func (*Rabbit) ConsumeOnce ¶
ConsumeOnce will consume exactly one message from the configured queue, execute `runFunc()` on the message and return.
Same as with `Consume()`, you can pass in a context to cancel `ConsumeOnce()` or run `Stop()`.