Roger, Rabbit
amqp with automagic redials
Basic Overview
Roger, Rabbit is a wrapper and drop-in replacement for
streadway/amqp that offers the following
enhancements:
- Automatic Connection Recoveries: Both Connection and Channel objects will
automatically recover from unexpected broker disconnections, and restore their states
and settings so you don't miss a beat.
- Custom Middleware: For all methods and events.
- Testing utilities: Force reconnects, and Testify helpers for common setup
requirements.
- Convenience Types:
- Confirmation Producers: handles broker confirmation on publish before
returning.
- Consumer Framework: Register http-like handlers to consume from queues, with
nack-on-error, max concurrent worker, and other boilerplate.
Getting Started
This package assumes you are familiar with RabbitMQ and
the streadway/amqp package. It does not offer a
tutorial of basic functionality, instead highlighting what sets it apart from the
officially sanctioned driver.
The official tutorials use
streadway/amqp in their Go walkthroughs. Users
are encouraged to start there. You can still follow along using this package by
changing "github.com/streadway/amqp"
import statements to
"github.com/peake100/rogerRabbit-go/pkg/amqp"
.
If you find any examples that do NOT work after such a replacement, please open a PR!
For quickstart and guides:
read the docs.
For full API documentation:
pkg.go.dev.
For library development guide,
read the docs.
Demo
Survive a Disconnection Event Using the amqp Package:
// Get a new connection to our test broker.
//
// DialCtx is a new function that allows the Dial function to keep attempting
// re-dials to the broker until the passed context expires.
connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
if err != nil {
panic(err)
}
// Get a new channel from our robust connection.
channel, err := connection.Channel()
if err != nil {
panic(err)
}
// We can use the Test method to return a testing harness with some additional
// methods. ForceReconnect force-closes the underlying streadway Channel, causing
// the robust Channel to reconnect.
//
// We'll use a dummy *testing.T object here. These methods are designed for tests
// only and should not be used in production code.
channel.Test(new(testing.T)).ForceReconnect(context.Background())
// We can see here our channel is still open.
fmt.Println("IS CLOSED:", channel.IsClosed())
// We can even declare a queue on it
queue, err := channel.QueueDeclare(
"example_channel_reconnect", // name
false, // durable
true, // autoDelete
false, // exclusive
false, // noWait
nil, // args
)
if err != nil {
panic(err)
}
// Here is the result
fmt.Printf("QUEUE : %+v\n", queue)
// Explicitly close the connection. This will also close all child channels.
err = connection.Close()
if err != nil {
panic(err)
}
// Now that we have explicitly closed the connection, the channel will be closed.
fmt.Println("IS CLOSED:", channel.IsClosed())
// IS CLOSED: false
// QUEUE : {Name:example_channel_reconnect Messages:0 Consumers:0}
// IS CLOSED: true
Effortless Publishing with Confirmations using the roger Package:
// Get a new connection to our test broker.
connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
if err != nil {
panic(err)
}
defer connection.Close()
// Get a new channel from our robust connection for publishing. This channel will
// be put into confirmation mode by the producer.
channel, err := connection.Channel()
if err != nil {
panic(err)
}
// Declare a queue to produce to
queue, err := channel.QueueDeclare(
"example_confirmation_producer", // name
false, // durable
true, // autoDelete
false, // exclusive
false, // noWait
nil, // args
)
// Create a new producer using our channel. Passing nil to opts will result in
// default opts being used. By default, a Producer will put the passed channel in
// confirmation mode, and each time publish is called, will block until a
// confirmation from the server has been received.
producer := rproducer.New(channel, nil)
producerComplete := make(chan struct{})
// Run the producer in it's own goroutine.
go func() {
// Signal this routine has exited on exit.
defer close(producerComplete)
err = producer.Run()
if err != nil {
panic(err)
}
}()
messagesPublished := new(sync.WaitGroup)
for i := 0; i < 10; i++ {
messagesPublished.Add(1)
// Publish each message in it's own goroutine The producer handles the
// boilerplate of tracking publication tags and receiving broker confirmations.
go func() {
// Release our WaitGroup on exit.
defer messagesPublished.Done()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Publish a message, this method will block until we get a publication
// confirmation from the broker OR ctx expires.
err = producer.Publish(
ctx,
"", // exchange
queue.Name, // queue
true, // mandatory
false, // immediate
amqp.Publishing{
Body: []byte("test message"),
},
)
fmt.Println("Message Published and Confirmed!")
if err != nil {
panic(err)
}
}()
}
// Wait for all our messages to be published
messagesPublished.Wait()
// Start the shutdown of the producer
producer.StartShutdown()
// Wait for the producer to exit.
<-producerComplete
// exit.
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
// Message Published and Confirmed!
Motivations
AMQP is a messaging protocol most commonly used by
RabbitMQ.
streadway/amqp, the sanctioned RabbitMQ driver for
Go, is an excellent library with a great API but limited scope. It offers a full
implementation of the AMQP spec, but comes with very few additional quality-of-life
features.
From its documentation:
Goals
- Provide a functional interface that closely represents the AMQP 0.9.1 model
targeted to RabbitMQ as a server. This includes the minimum necessary to
interact the semantics of the protocol.
Things not intended to be supported:
Without a supplied way to handle reconnections,
bespoke
solutions
abound.
Most of these solutions are overly-fitted to a specific problem (consumer vs producer or
involve domain-specific logic), are prone to data races (can you spot them in the
first link?), are cumbersome to inject into a production code (do we abort the business
logic on an error or try to recover in-place?), and have bugs (each solution has its own
redial bugs rather than finding them in a single lib where fixes can benefit everyone
and community code coverage is high).
Nome of this is meant to disparage the above solutions -- they likely work great in the
code they were created for -- but they point to a need that is not being filled by the
sanctioned driver. The nature of the default *Channel
API encourages solutions that
are ill-suited to stateless handlers OR require you to handle retries every place you
must interact with the broker. Such implementation details can be annoying when writing
higher-level business logic and can lead to either unnecessary error returns, bespoke
solutions in every project, or messy calling code at sites which need to interact with
an AMQP broker.
Roger, Rabbit is inspired by aio-pika's
robust connections and channels
which abstract away connection management with an identical API to their non-robust
counterparts, allowing robust AMQP broker interactions with minimal fuss and very few
limitations.
NOTE:
Roger, Rabbit is not meant to supplant streadway/amqp (We build on top of it!), but an
extension with quality-of-life features. Roger, Rabbit would not be possible without the
amazing groundwork laid down by streadway/amqp.
Goals
The goals of the Roger, Rabbit package are as follows:
-
Offer a Drop-in Replacement for streadway/amqp: API's may be extended (adding
fields to amqp.Config
or additional methods to *amqp.Channel
, for instance) but
must not break existing code unless absolutely necessary.
-
Add as few Additional Error Paths as Possible: Errors may be extended with
additional information concerning disconnect scenarios, but new error type returns
from *Connection
or *amqp.Channel
should be an absolute last resort.
-
Be Highly Extensible: Roger, Rabbit seeks to offer a high degree of extensibility
via features like middleware, in an effort to reduce the balkanization of amqp client
solutions.
Current Limitations & Warnings
-
Performance: Roger, Rabbit has not been extensively benchmarked against
streadway/amqp
. To see preliminary benchmarks, take a look at the next section.
-
Transaction Support: Roger, Rabbit does not currently support amqp Transactions,
as the author does not use them. Draft PR's with possible implementations are welcome!
-
Reliability: While the author uses this library in production, it is still early
days, and more battle-testing will be needed before this library is promoted to
version 1.0. PR's are welcome for Bug Fixes, code coverage, or new features.
Benchmarks
Because of Roger, Rabbit's middleware-driven design, some overhead is expected vs
streadway proper. However, initial benchmarks are promising, and show only minimal
impact. For most applications, the overhead cost is likely worth the cost for ease of
development and flexibility.
Still, if absolute peak throughput is critical to an application, a less general and
more tailored approach may be warranted.
Benchmarks can be found in ./amqp/benchmark_test.go
.
Machine: Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
OPERATION |
LIB |
EXECUTIONS |
NS/OP |
COMPARISON |
QueueInspect |
sw |
2,838 |
812,594 |
-- |
|
rr |
2,470 |
813,269 |
+0.1% |
Publish |
sw |
74,559 |
28,882 |
-- |
|
rr |
70,665 |
30,031 |
+4.0% |
Publish & Confirm |
sw |
34,528 |
59,703 |
-- |
|
rr |
35,481 |
62,198 |
+4.2% |
Consume (QoS 100) |
sw |
75,433 |
27,206 |
-- |
|
rr |
73,957 |
29,846 |
+9.7% |
The above numbers were calculated by running each benchmark 4 times, then taking the
fastest result for each library.
The benchmarks were run with the following command:
go test -p 1 -count 4 -bench=Comparison -run=NoTests -benchtime=2s ./...
Acknowledgements
This library is built on top of streadway/amqp and
would not be possible without such an amazing foundation.
Prerequisites
Golang 1.6+
Attributions