rabbit

package module
v0.1.7
Published: Oct 27, 2020 License: MIT Imports: 9 Imported by: 0

README

rabbit

A RabbitMQ wrapper lib around streadway/amqp with some bells and whistles.

  • Support for auto-reconnect
  • Support for context (i.e. cancel/timeout)
  • ???

Motivation

We (Batch) make heavy use of RabbitMQ; it is our primary method for inter-service communication. As a result, all of our services use RabbitMQ and act as both publishers and consumers.

We wrote this lib to ensure that all of our services make use of Rabbit in a consistent, predictable way AND are able to survive network blips.

NOTE: This library works only with non-default exchanges. If you need support for the default exchange, open a PR!

Usage

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/batchcorp/rabbit"
    "github.com/streadway/amqp"
)

func main() {
    r, err := rabbit.New(&rabbit.Options{
        URL:          "amqp://localhost",
        QueueName:    "my-queue",
        ExchangeName: "messages",
        RoutingKey:   "messages",
    })
    if err != nil {
        log.Fatalf("unable to instantiate rabbit: %s", err)
    }

    // Context used for publishing and for stopping Consume()
    ctx, cancel := context.WithCancel(context.Background())

    routingKey := "messages"
    data := []byte("pumpkins")

    // Publish something
    if err := r.Publish(ctx, routingKey, data); err != nil {
        log.Fatalf("unable to publish message: %s", err)
    }

    // Consume once
    if err := r.ConsumeOnce(nil, func(msg amqp.Delivery) error {
        fmt.Printf("Received new message: %+v\n", msg)
        return nil
    }); err != nil {
        log.Fatalf("unable to consume once: %s", err)
    }

    var numReceived int

    // Consume forever (blocks until Stop() is called or ctx is cancelled)
    r.Consume(ctx, nil, func(msg amqp.Delivery) error {
        fmt.Printf("Received new message: %+v\n", msg)

        numReceived++

        if numReceived > 1 {
            r.Stop()
        }

        return nil
    })

    // Or stop via ctx: when Consume(ctx, ...) is running in another
    // goroutine, cancelling ctx makes it return
    cancel()
}

Documentation

Overview

Package rabbit is a simple streadway/amqp wrapper library that comes with:

* Auto-reconnect support

* Context support

* Helpers for consuming once or forever and publishing

The library is used internally at https://batch.sh where it powers most of the platform's backend services.

For an example, refer to the README.md.

Constants

This section is empty.

Variables

var (
	// Used for identifying consumer
	DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8]

	// Used for identifying producer
	DefaultAppID = "p-rabbit-" + uuid.NewV4().String()[0:8]
)

Functions

func ValidateOptions

func ValidateOptions(opts *Options) error

ValidateOptions validates various combinations of options.

Types

type ConsumeError

type ConsumeError struct {
	Message *amqp.Delivery
	Error   error
}

ConsumeError will be passed down the error channel if/when `f()` func runs into an error during `Consume()`.
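A minimal sketch of how a caller might drain such an error channel. The `delivery` and `consumeError` types are local stand-ins for `amqp.Delivery` and `ConsumeError` so that the example compiles on its own; `drainErrors` and `errBoom` are hypothetical names. The documentation does not say whether the library ever closes the channel, so the sketch closes it itself.

```go
package main

import (
	"errors"
	"fmt"
)

// delivery and consumeError are local stand-ins (assumptions) for
// amqp.Delivery and rabbit.ConsumeError.
type delivery struct{ Body []byte }

type consumeError struct {
	Message *delivery
	Error   error
}

// errBoom simulates an error returned by the handler func.
var errBoom = errors.New("boom")

// drainErrors reads from errChan until it is closed and returns one
// summary line per received error.
func drainErrors(errChan <-chan *consumeError) []string {
	var out []string
	for ce := range errChan {
		out = append(out, fmt.Sprintf("handler failed for %q: %v", ce.Message.Body, ce.Error))
	}
	return out
}

func main() {
	errChan := make(chan *consumeError, 1)

	// In real use the library would send on errChan; here one failure is simulated.
	errChan <- &consumeError{Message: &delivery{Body: []byte("pumpkins")}, Error: errBoom}
	close(errChan)

	for _, line := range drainErrors(errChan) {
		fmt.Println(line)
	}
}
```

In a real service the draining loop would typically run in its own goroutine, started before the blocking `Consume()` call.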

type IRabbit

type IRabbit interface {
	Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)
	ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
	Publish(ctx context.Context, routingKey string, payload []byte) error
	Stop() error
	Close() error
}

IRabbit is the interface that the `rabbit` library implements. It is provided as a convenience.
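One way an interface like this can help is in tests. The sketch below assumes application code that only publishes, so it declares a narrowed, hypothetical `publisher` interface containing just the `Publish` method with the signature documented above; `fakePublisher` and `announce` are illustrative names, not part of the library.

```go
package main

import (
	"context"
	"fmt"
)

// publisher is a hypothetical, narrowed interface: only the Publish
// method from IRabbit, with the same signature.
type publisher interface {
	Publish(ctx context.Context, routingKey string, payload []byte) error
}

// fakePublisher records what was published instead of talking to a broker.
type fakePublisher struct {
	keys     []string
	payloads [][]byte
}

func (f *fakePublisher) Publish(_ context.Context, routingKey string, payload []byte) error {
	f.keys = append(f.keys, routingKey)
	f.payloads = append(f.payloads, payload)
	return nil
}

// announce is hypothetical application code that depends on the
// interface rather than on a concrete client.
func announce(ctx context.Context, p publisher, name string) error {
	return p.Publish(ctx, "messages", []byte("hello "+name))
}

func main() {
	fake := &fakePublisher{}
	if err := announce(context.Background(), fake, "pumpkins"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Printf("%s -> %s\n", fake.keys[0], fake.payloads[0])
}
```

Because `announce` accepts the interface, production code could pass the value returned by `New()` while tests pass the fake.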

type Mode added in v0.1.3

type Mode int
const (
	// How long to wait before attempting to reconnect to a rabbit server
	DefaultRetryReconnectSec = 60

	Both     Mode = 0
	Consumer Mode = 1
	Producer Mode = 2
)

type Options

type Options struct {
	// Required; format "amqp://user:pass@host:port"
	URL string

	// In what mode does the library operate (Both, Consumer, Producer)
	Mode Mode

	// If left empty, server will auto generate queue name
	QueueName string

	// Required
	ExchangeName string

	// Used as either routing (publish) or binding key (consume)
	RoutingKey string

	// Whether to declare/create exchange on connect
	ExchangeDeclare bool

	// Required if declaring exchange (valid: direct, fanout, topic, headers)
	ExchangeType string

	// Whether exchange should survive/persist server restarts
	ExchangeDurable bool

	// Whether to delete exchange when it's no longer used; used only if ExchangeDeclare set to true
	ExchangeAutoDelete bool

	// https://godoc.org/github.com/streadway/amqp#Channel.Qos
	// Leave unset if no QoS preferences
	QosPrefetchCount int
	QosPrefetchSize  int

	// How long to wait before we retry connecting to a server (after disconnect)
	RetryReconnectSec int

	// Whether queue should survive/persist server restarts (and there are no remaining bindings)
	QueueDurable bool

	// Whether consumer should be the sole consumer of the queue; used only if
	// QueueDeclare set to true
	QueueExclusive bool

	// Whether to delete queue on consumer disconnect; used only if QueueDeclare set to true
	QueueAutoDelete bool

	// Whether to declare/create queue on connect
	QueueDeclare bool

	// Whether to automatically acknowledge consumed message(s)
	AutoAck bool

	// Used for identifying consumer
	ConsumerTag string

	// Used as a property to identify producer
	AppID string
}

Options determines how the `rabbit` library will behave and should be passed in to rabbit via `New()`. Many of the options are optional (and will fall back to sane defaults).
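To illustrate the "Required" notes in the field comments, here is a rough sketch of the kind of check they imply. It uses a trimmed-down local copy of a few `Options` fields and a hypothetical `checkRequired` function; it is not the library's `ValidateOptions`, which may check more or behave differently. It also assumes that `ExchangeType` matters only when `ExchangeDeclare` is set.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a trimmed-down local copy (an assumption) of a few
// rabbit.Options fields.
type options struct {
	URL             string
	ExchangeName    string
	ExchangeDeclare bool
	ExchangeType    string
}

// checkRequired is a hypothetical check derived only from the field
// comments; it does not reproduce the library's ValidateOptions.
func checkRequired(o *options) error {
	if o == nil {
		return errors.New("options cannot be nil")
	}
	if o.URL == "" {
		return errors.New("URL is required")
	}
	if o.ExchangeName == "" {
		return errors.New("ExchangeName is required")
	}
	if o.ExchangeDeclare {
		switch o.ExchangeType {
		case "direct", "fanout", "topic", "headers":
			// valid exchange type
		default:
			return errors.New("ExchangeType must be direct, fanout, topic or headers")
		}
	}
	return nil
}

func main() {
	ok := &options{URL: "amqp://localhost", ExchangeName: "messages"}
	bad := &options{URL: "amqp://localhost"}

	fmt.Println("complete options:", checkRequired(ok))
	fmt.Println("missing exchange:", checkRequired(bad))
}
```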

type Rabbit

type Rabbit struct {
	Conn                    *amqp.Connection
	ConsumerDeliveryChannel <-chan amqp.Delivery
	ConsumerRWMutex         *sync.RWMutex
	NotifyCloseChan         chan *amqp.Error
	ProducerServerChannel   *amqp.Channel
	ProducerRWMutex         *sync.RWMutex
	ConsumeLooper           director.Looper
	Options                 *Options
	// contains filtered or unexported fields
}

Rabbit is the struct that is instantiated via `New()`. You should not instantiate this struct by hand (unless you have a really good reason to do so).

func New

func New(opts *Options) (*Rabbit, error)

New is used for instantiating the library.

func (*Rabbit) Close added in v0.1.5

func (r *Rabbit) Close() error

Close stops any active `Consume()` and closes the amqp connection (and any channels using that connection).

You should re-instantiate the rabbit lib once this is called.

func (*Rabbit) Consume

func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)

Consume consumes messages from the configured queue (`Options.QueueName`) and executes `f` for every received message.

`Consume()` will block until it is stopped, either via the passed in `ctx` OR by calling `Stop()`.

It is also possible to see the errors that `f()` runs into by passing in an error channel (`chan *ConsumeError`).

Both `ctx` and `errChan` can be `nil`.

If the server goes away, `Consume` will automatically attempt to reconnect. Subsequent reconnect attempts will sleep/wait for `DefaultRetryReconnectSec` between attempts.
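The reconnect behaviour described above follows a common retry-with-wait pattern. The sketch below is a generic, standard-library-only illustration of that pattern, not the library's actual implementation: `retryConnect`, `flakyDial` and `runDemo` are hypothetical names, and the short wait stands in for the much longer `DefaultRetryReconnectSec`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryConnect calls dial until it succeeds or ctx is done, waiting
// `wait` between attempts. It returns the number of attempts made.
func retryConnect(ctx context.Context, wait time.Duration, dial func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		if err := dial(); err == nil {
			return attempts, nil
		}

		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(wait):
			// wait elapsed; try again
		}
	}
}

// flakyDial returns a dial func that fails its first n calls,
// simulating a server that is temporarily unreachable.
func flakyDial(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("connection refused")
		}
		return nil
	}
}

// runDemo retries against a dialer that fails twice before succeeding.
func runDemo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	return retryConnect(ctx, 10*time.Millisecond, flakyDial(2))
}

func main() {
	attempts, err := runDemo()
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Selecting on `ctx.Done()` while waiting means a cancelled context ends the loop promptly instead of sleeping through the full interval.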

func (*Rabbit) ConsumeOnce

func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error

ConsumeOnce will consume exactly one message from the configured queue, execute `runFunc()` on the message and return.

Same as with `Consume()`, you can pass in a context to cancel `ConsumeOnce()` or run `Stop()`.
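As a rough illustration of consume-once semantics combined with a context, the sketch below waits for a single message on a plain Go channel or for the context to finish, whichever happens first. `consumeOne` and `tryConsume` are hypothetical helpers and the `[]byte` channel stands in for a delivery channel; returning `ctx.Err()` on cancellation is an assumption of this sketch, not documented behaviour of `ConsumeOnce()`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consumeOne waits for a single message or for ctx to finish,
// whichever comes first, and runs `run` on the message.
func consumeOne(ctx context.Context, msgs <-chan []byte, run func([]byte) error) error {
	select {
	case m := <-msgs:
		return run(m)
	case <-ctx.Done():
		return ctx.Err()
	}
}

// tryConsume applies a timeout to consumeOne and returns the message
// body as a string.
func tryConsume(msgs <-chan []byte, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var got string
	err := consumeOne(ctx, msgs, func(b []byte) error {
		got = string(b)
		return nil
	})

	return got, err
}

func main() {
	msgs := make(chan []byte, 1)
	msgs <- []byte("pumpkins")

	// First call finds a message waiting.
	body, err := tryConsume(msgs, 50*time.Millisecond)
	fmt.Println("first:", body, err)

	// Second call finds nothing and gives up when the timeout expires.
	body, err = tryConsume(msgs, 50*time.Millisecond)
	fmt.Println("second:", body, err)
}
```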

func (*Rabbit) Publish

func (r *Rabbit) Publish(ctx context.Context, routingKey string, body []byte) error

Publish publishes one message to the configured exchange, using the specified routing key.

NOTE: Context semantics are not implemented.

TODO: Implement ctx usage

func (*Rabbit) Stop

func (r *Rabbit) Stop() error

Stop stops an in-progress `Consume()` or `ConsumeOnce()`.
