Documentation ¶
Overview ¶
Package rabbit is a simple streadway/amqp wrapper library that comes with:
* Auto-reconnect support
* Context support
* Helpers for consuming once or forever and publishing
The library is used internally at https://batch.sh where it powers most of the platform's backend services.
For an example, refer to the README.md.
Index ¶
- Variables
- func ValidateOptions(opts *Options) error
- type Binding
- type ConsumeError
- type IRabbit
- type Logger
- type Mode
- type NoOpLogger
- func (l *NoOpLogger) Debug(args ...interface{})
- func (l *NoOpLogger) Debugf(format string, args ...interface{})
- func (l *NoOpLogger) Error(args ...interface{})
- func (l *NoOpLogger) Errorf(format string, args ...interface{})
- func (l *NoOpLogger) Info(args ...interface{})
- func (l *NoOpLogger) Infof(format string, args ...interface{})
- func (l *NoOpLogger) Warn(args ...interface{})
- func (l *NoOpLogger) Warnf(format string, args ...interface{})
- type Options
- type Rabbit
- func (r *Rabbit) Close() error
- func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, ...)
- func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
- func (r *Rabbit) Publish(ctx context.Context, routingKey string, body []byte) error
- func (r *Rabbit) Stop() error
Constants ¶
This section is empty.
Variables ¶
var (
// ErrShutdown will be returned if the underlying connection has already
// been closed (ie. if you Close()'d and then tried to Publish())
ErrShutdown = errors.New("connection has been shutdown")
// DefaultConsumerTag is used for identifying consumer
DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8]
// DefaultAppID is used for identifying the producer
DefaultAppID = "p-rabbit-" + uuid.NewV4().String()[0:8]
)
Functions ¶
func ValidateOptions ¶
func ValidateOptions(opts *Options) error
ValidateOptions validates various combinations of options.
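The exact checks performed by ValidateOptions are not listed here. As a rough illustration only, the sketch below shows the kind of validation implied by the "Required" comments on `Options` and `Binding`. The `checkOptions` function and the trimmed `Options` and `Binding` structs are hypothetical local stand-ins, not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Binding and Options are trimmed local copies of the documented structs so
// that this sketch compiles on its own; they are not the library's types.
type Binding struct {
	ExchangeName    string
	BindingKeys     []string
	ExchangeDeclare bool
	ExchangeType    string
}

type Options struct {
	URLs     []string
	Bindings []Binding
}

// validExchangeTypes lists the exchange types named in the Binding comments.
var validExchangeTypes = map[string]bool{
	"direct": true, "fanout": true, "topic": true, "headers": true,
}

// checkOptions is a hypothetical stand-in for ValidateOptions. It only
// enforces rules that the field comments describe as required.
func checkOptions(opts *Options) error {
	if opts == nil {
		return errors.New("options cannot be nil")
	}
	if len(opts.URLs) == 0 {
		return errors.New("at least one URL must be provided")
	}
	for i, b := range opts.Bindings {
		if b.ExchangeName == "" {
			return fmt.Errorf("binding %d: ExchangeName is required", i)
		}
		// ExchangeType only matters when the exchange is being declared.
		if b.ExchangeDeclare && !validExchangeTypes[b.ExchangeType] {
			return fmt.Errorf("binding %d: invalid ExchangeType %q", i, b.ExchangeType)
		}
	}
	return nil
}

func main() {
	opts := &Options{
		URLs: []string{"amqp://guest:guest@localhost:5672"},
		Bindings: []Binding{
			{ExchangeName: "events", ExchangeDeclare: true, ExchangeType: "topic"},
		},
	}
	fmt.Println(checkOptions(opts))
}
```

Validating before connecting keeps configuration mistakes from surfacing later as broker errors.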
Types ¶
type Binding ¶ added in v0.1.13
type Binding struct {
// Required
ExchangeName string
// Bind a queue to one or more routing keys
BindingKeys []string
// Whether to declare/create exchange on connect
ExchangeDeclare bool
// Required if declaring exchange (valid: direct, fanout, topic, headers)
ExchangeType string
// Whether exchange should survive/persist server restarts
ExchangeDurable bool
// Whether to delete exchange when it's no longer used; used only if ExchangeDeclare set to true
ExchangeAutoDelete bool
}
Binding represents the information needed to bind a queue to an Exchange.
type ConsumeError ¶
ConsumeError will be passed down the error channel if/when `f()` func runs into an error during `Consume()`.
type IRabbit ¶
type IRabbit interface {
Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)
ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
Publish(ctx context.Context, routingKey string, payload []byte) error
Stop() error
Close() error
}
IRabbit is the interface that the `rabbit` library implements. It is provided as a convenience (for example, for mocking in tests).
type Logger ¶ added in v0.1.15
type Logger interface {
// Debug sends out a debug message with the given arguments to the logger.
Debug(args ...interface{})
// Debugf formats a debug message using the given arguments and sends it to the logger.
Debugf(format string, args ...interface{})
// Info sends out an informational message with the given arguments to the logger.
Info(args ...interface{})
// Infof formats an informational message using the given arguments and sends it to the logger.
Infof(format string, args ...interface{})
// Warn sends out a warning message with the given arguments to the logger.
Warn(args ...interface{})
// Warnf formats a warning message using the given arguments and sends it to the logger.
Warnf(format string, args ...interface{})
// Error sends out an error message with the given arguments to the logger.
Error(args ...interface{})
// Errorf formats an error message using the given arguments and sends it to the logger.
Errorf(format string, args ...interface{})
}
Logger is the common interface for user-provided loggers.
type Mode ¶ added in v0.1.3
type Mode int
Mode is the type used to represent whether the RabbitMQ client is acting as a consumer, a producer, or both.
const (
// DefaultRetryReconnectSec determines how long to wait before attempting
// to reconnect to a rabbit server
DefaultRetryReconnectSec = 60
// Both means that the client is acting as both a consumer and a producer.
Both Mode = 0
// Consumer means that the client is acting as a consumer.
Consumer Mode = 1
// Producer means that the client is acting as a producer.
Producer Mode = 2
)
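Because `Both` is declared as 0, a `Mode` left unset takes the value `Both`. The sketch below illustrates that consequence with a local copy of the type and constants. The `consumes` and `produces` helpers are hypothetical and are not part of the library.

```go
package main

import "fmt"

// Mode and its constants mirror the documented declarations so that the
// sketch compiles on its own.
type Mode int

const (
	Both     Mode = 0
	Consumer Mode = 1
	Producer Mode = 2
)

// consumes reports whether a client in mode m acts as a consumer.
func consumes(m Mode) bool { return m == Both || m == Consumer }

// produces reports whether a client in mode m acts as a producer.
func produces(m Mode) bool { return m == Both || m == Producer }

func main() {
	var m Mode // zero value, as in an Options literal that omits Mode
	fmt.Println(m == Both, consumes(m), produces(m))
	fmt.Println(consumes(Producer), produces(Producer))
}
```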
type NoOpLogger ¶ added in v0.1.15
type NoOpLogger struct {
}
NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Options.
func (*NoOpLogger) Debug ¶ added in v0.1.15
func (l *NoOpLogger) Debug(args ...interface{})
Debug is a no-op implementation of Logger's Debug.
func (*NoOpLogger) Debugf ¶ added in v0.1.15
func (l *NoOpLogger) Debugf(format string, args ...interface{})
Debugf is a no-op implementation of Logger's Debugf.
func (*NoOpLogger) Error ¶ added in v0.1.15
func (l *NoOpLogger) Error(args ...interface{})
Error is a no-op implementation of Logger's Error.
func (*NoOpLogger) Errorf ¶ added in v0.1.15
func (l *NoOpLogger) Errorf(format string, args ...interface{})
Errorf is a no-op implementation of Logger's Errorf.
func (*NoOpLogger) Info ¶ added in v0.1.15
func (l *NoOpLogger) Info(args ...interface{})
Info is a no-op implementation of Logger's Info.
func (*NoOpLogger) Infof ¶ added in v0.1.15
func (l *NoOpLogger) Infof(format string, args ...interface{})
Infof is a no-op implementation of Logger's Infof.
func (*NoOpLogger) Warn ¶ added in v0.1.15
func (l *NoOpLogger) Warn(args ...interface{})
Warn is a no-op implementation of Logger's Warn.
func (*NoOpLogger) Warnf ¶ added in v0.1.15
func (l *NoOpLogger) Warnf(format string, args ...interface{})
Warnf is a no-op implementation of Logger's Warnf.
type Options ¶
type Options struct {
// Required; format "amqp://user:pass@host:port"
URLs []string
// In what mode does the library operate (Both, Consumer, Producer)
Mode Mode
// If left empty, server will auto generate queue name
QueueName string
// Bindings is the set of information needed to bind a queue to one or
// more exchanges, specifying one or more binding (routing) keys.
Bindings []Binding
// https://godoc.org/github.com/streadway/amqp#Channel.Qos
// Leave unset if no QoS preferences
QosPrefetchCount int
QosPrefetchSize int
// How long to wait before we retry connecting to a server (after disconnect)
RetryReconnectSec int
// Whether queue should survive/persist server restarts (and there are no remaining bindings)
QueueDurable bool
// Whether consumer should be the sole consumer of the queue; used only if
// QueueDeclare set to true
QueueExclusive bool
// Whether to delete queue on consumer disconnect; used only if QueueDeclare set to true
QueueAutoDelete bool
// Whether to declare/create queue on connect
QueueDeclare bool
// Whether to automatically acknowledge consumed message(s)
AutoAck bool
// Used for identifying consumer
ConsumerTag string
// Used as a property to identify producer
AppID string
// Use TLS
UseTLS bool
// Skip cert verification (only applies if UseTLS is true)
SkipVerifyTLS bool
// Log is the (optional) logger to use for writing out log messages.
Log Logger
}
Options determines how the `rabbit` library will behave and should be passed in to rabbit via `New()`. Many of the options are optional (and will fall back to sane defaults).
type Rabbit ¶
type Rabbit struct {
Conn *amqp.Connection
ConsumerDeliveryChannel <-chan amqp.Delivery
ConsumerRWMutex *sync.RWMutex
NotifyCloseChan chan *amqp.Error
ProducerServerChannel *amqp.Channel
ProducerRWMutex *sync.RWMutex
ConsumeLooper director.Looper
Options *Options
// contains filtered or unexported fields
}
Rabbit is the struct that is instantiated via `New()`. You should not instantiate this struct by hand (unless you have a really good reason to do so).
func (*Rabbit) Close ¶ added in v0.1.5
func (r *Rabbit) Close() error
Close stops any active Consume and closes the amqp connection (and any channels using it).
You should re-instantiate the rabbit lib once this is called.
func (*Rabbit) Consume ¶
func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)
Consume consumes messages from the configured queue (`Options.QueueName`) and executes `f` for every received message.
`Consume()` will block until it is stopped, either via the passed-in `ctx` or by calling `Stop()`.
It is also possible to see the errors that `f()` runs into by passing in an error channel (`chan *ConsumeError`).
Both `ctx` and `errChan` can be `nil`.
If the server goes away, `Consume` will automatically attempt to reconnect, waiting `DefaultRetryReconnectSec` seconds between attempts.
func (*Rabbit) ConsumeOnce ¶
func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
ConsumeOnce will consume exactly one message from the configured queue, execute `runFunc()` on the message and return.
Same as with `Consume()`, you can pass in a context to cancel `ConsumeOnce()` or run `Stop()`.