rabbit

Version: v0.1.26
Published: Dec 2, 2024 License: MIT Imports: 8 Imported by: 3

README

rabbit

A RabbitMQ wrapper lib around rabbitmq/amqp091-go (previously streadway/amqp) with some bells and whistles.

NOTE: streadway/amqp is no longer maintained; the RabbitMQ team forked it and created rabbitmq/amqp091-go. You can read about this change here. This library uses rabbitmq/amqp091-go.

  • Support for auto-reconnect
  • Support for context (i.e. cancel/timeout)
  • Support for using multiple binding keys
  • Support for Producer, Consumer, or Both modes

Motivation

We (Streamdal, formerly Batch.sh) make heavy use of RabbitMQ: it is our primary method for inter-service communication. Because of this, all of our services use RabbitMQ and act as both publishers and consumers.

We wrote this lib to ensure that all of our services make use of Rabbit in a consistent, predictable way AND are able to survive network blips.

NOTE: This library works only with non-default exchanges. If you need support for the default exchange, open a PR!

Usage

package main

import (
    "context"
    "fmt"
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
    "github.com/streamdal/rabbit"
)

func main() {
    // Options takes a list of URLs and a list of bindings (see type Options)
    r, err := rabbit.New(&rabbit.Options{
        URLs:      []string{"amqp://localhost"},
        QueueName: "my-queue",
        Bindings: []rabbit.Binding{
            {
                ExchangeName: "messages",
                BindingKeys:  []string{"messages"},
            },
        },
    })
    if err != nil {
        log.Fatalf("unable to instantiate rabbit: %s", err)
    }

    routingKey := "messages"
    data := []byte("pumpkins")

    // Publish something
    if err := r.Publish(context.Background(), routingKey, data); err != nil {
        log.Fatalf("unable to publish message: %s", err)
    }

    // Consume once
    if err := r.ConsumeOnce(context.Background(), func(msg amqp.Delivery) error {
        fmt.Printf("Received new message: %+v\n", msg)
        return nil
    }); err != nil {
        log.Fatalf("unable to consume once: %s", err)
    }

    var numReceived int

    // Consume forever (blocks until r.Stop() is called or ctx is cancelled)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    r.Consume(ctx, nil, func(msg amqp.Delivery) error {
        fmt.Printf("Received new message: %+v\n", msg)

        numReceived++

        if numReceived > 1 {
            r.Stop()
        }

        return nil
    })

    // Or stop via ctx: calling cancel() from another goroutine also makes
    // a blocked Consume() return.
}

Retry Policies

You can specify a retry policy for the consumer. The library ships a pre-made ACK retry policy, rabbit.DefaultAckPolicy(), which retries acknowledgement an unlimited number of times.

You can also create a new policy using the rabbit.NewRetryPolicy(maxAttempts, time.Millisecond * 200, time.Second, ...) function. The first argument is the maximum number of attempts; the remaining arguments are the delays to use between attempts.

The retry policy can then be passed to consume functions as an argument:

consumeFunc := func(msg amqp.Delivery) error {
    fmt.Printf("Received new message: %+v\n", msg)

    numReceived++

    if numReceived > 1 {
        r.Stop()
    }

    return nil
}

rp := rabbit.DefaultAckPolicy()

r.Consume(ctx, nil, consumeFunc, rp)

Documentation

Overview

Package rabbit is a simple rabbitmq/amqp091-go (formerly streadway/amqp) wrapper library that comes with:

* Auto-reconnect support

* Context support

* Helpers for consuming once or forever and publishing

The library is used internally at https://batch.sh where it powers most of the platform's backend services.

For an example, refer to the README.md.

Constants

const (
	// DefaultRetryReconnectSec determines how long to wait before attempting
	// to reconnect to a rabbit server
	DefaultRetryReconnectSec = 60

	// DefaultStopTimeout is the default amount of time Stop() will wait for
	// consume function(s) to exit.
	DefaultStopTimeout = 5 * time.Second

	// Both means that the client is acting as both a consumer and a producer.
	Both Mode = 0
	// Consumer means that the client is acting as a consumer.
	Consumer Mode = 1
	// Producer means that the client is acting as a producer.
	Producer Mode = 2

	ForceReconnectHeader = "rabbit-force-reconnect"
)
const RetryUnlimited = -1

Variables

var (
	// ErrShutdown will be returned if the client is shutdown via Stop() or Close()
	ErrShutdown = errors.New("client is shutdown")

	// DefaultConsumerTag is used for identifying consumer
	DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8]

	// DefaultAppID is used for identifying the producer
	DefaultAppID = "p-rabbit-" + uuid.NewV4().String()[0:8]
)

Functions

func ValidateOptions

func ValidateOptions(opts *Options) error

ValidateOptions validates various combinations of options.

Types

type Binding

type Binding struct {
	// Required
	ExchangeName string

	// Bind a queue to one or more routing keys
	BindingKeys []string

	// Whether to declare/create exchange on connect
	ExchangeDeclare bool

	// Required if declaring exchange (valid: direct, fanout, topic, headers)
	ExchangeType string

	// Whether exchange should survive/persist server restarts
	ExchangeDurable bool

	// Whether to delete exchange when it's no longer used; used only if ExchangeDeclare set to true
	ExchangeAutoDelete bool
}

Binding represents the information needed to bind a queue to an Exchange.

type ConsumeError

type ConsumeError struct {
	Message *amqp.Delivery
	Error   error
}

ConsumeError will be passed down the error channel if/when `f()` func runs into an error during `Consume()`.

type IRabbit

type IRabbit interface {
	Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error, rp ...*RetryPolicy)
	ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error, rp ...*RetryPolicy) error
	Publish(ctx context.Context, routingKey string, payload []byte, headers ...amqp.Table) error
	Stop(timeout ...time.Duration) error
	Close() error
}

IRabbit is the interface that the `rabbit` library implements. It's here as a convenience.

type Logger

type Logger interface {
	// Debug sends out a debug message with the given arguments to the logger.
	Debug(args ...interface{})
	// Debugf formats a debug message using the given arguments and sends it to the logger.
	Debugf(format string, args ...interface{})
	// Info sends out an informational message with the given arguments to the logger.
	Info(args ...interface{})
	// Infof formats an informational message using the given arguments and sends it to the logger.
	Infof(format string, args ...interface{})
	// Warn sends out a warning message with the given arguments to the logger.
	Warn(args ...interface{})
	// Warnf formats a warning message using the given arguments and sends it to the logger.
	Warnf(format string, args ...interface{})
	// Error sends out an error message with the given arguments to the logger.
	Error(args ...interface{})
	// Errorf formats an error message using the given arguments and sends it to the logger.
	Errorf(format string, args ...interface{})
}

Logger is the common interface for user-provided loggers.
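
As an illustration of how a user-provided logger might satisfy this interface, here is a minimal sketch that adapts the standard library's `log.Logger`. The `Logger` interface is copied locally so the example compiles without the library; `stdLogger`, its `print`/`printf` helpers, `demo`, and the `[LEVEL]` prefixes are hypothetical names chosen for this example and are not part of the package.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface documented above (copied here so the
// sketch is self-contained).
type Logger interface {
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Warn(args ...interface{})
	Warnf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}

// stdLogger is a hypothetical adapter from *log.Logger to Logger.
type stdLogger struct{ l *log.Logger }

// print and printf prefix every line with a level tag.
func (s *stdLogger) print(level string, args ...interface{}) {
	s.l.Print(level + " " + fmt.Sprint(args...))
}

func (s *stdLogger) printf(level, format string, args ...interface{}) {
	s.l.Print(level + " " + fmt.Sprintf(format, args...))
}

func (s *stdLogger) Debug(args ...interface{})            { s.print("[DEBUG]", args...) }
func (s *stdLogger) Debugf(f string, args ...interface{}) { s.printf("[DEBUG]", f, args...) }
func (s *stdLogger) Info(args ...interface{})             { s.print("[INFO]", args...) }
func (s *stdLogger) Infof(f string, args ...interface{})  { s.printf("[INFO]", f, args...) }
func (s *stdLogger) Warn(args ...interface{})             { s.print("[WARN]", args...) }
func (s *stdLogger) Warnf(f string, args ...interface{})  { s.printf("[WARN]", f, args...) }
func (s *stdLogger) Error(args ...interface{})            { s.print("[ERROR]", args...) }
func (s *stdLogger) Errorf(f string, args ...interface{}) { s.printf("[ERROR]", f, args...) }

// Compile-time check that stdLogger implements Logger.
var _ Logger = (*stdLogger)(nil)

// demo logs two lines into a buffer and returns what was written.
func demo() string {
	var buf bytes.Buffer
	var lg Logger = &stdLogger{l: log.New(&buf, "", 0)}
	lg.Debugf("connecting to %s", "amqp://localhost")
	lg.Warn("connection lost")
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```

With the real library, a value like `&stdLogger{...}` would presumably be assigned to the `Log` field of `Options`; when `Log` is left unset, the library falls back to `NoOpLogger`.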

type Mode

type Mode int

Mode is the type used to represent whether the RabbitMQ client is acting as a consumer, a producer, or both.

type NoOpLogger

type NoOpLogger struct {
}

NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Options.

func (*NoOpLogger) Debug

func (l *NoOpLogger) Debug(args ...interface{})

Debug is a no-op implementation of Logger's Debug.

func (*NoOpLogger) Debugf

func (l *NoOpLogger) Debugf(format string, args ...interface{})

Debugf is a no-op implementation of Logger's Debugf.

func (*NoOpLogger) Error

func (l *NoOpLogger) Error(args ...interface{})

Error is a no-op implementation of Logger's Error.

func (*NoOpLogger) Errorf

func (l *NoOpLogger) Errorf(format string, args ...interface{})

Errorf is a no-op implementation of Logger's Errorf.

func (*NoOpLogger) Info

func (l *NoOpLogger) Info(args ...interface{})

Info is a no-op implementation of Logger's Info.

func (*NoOpLogger) Infof

func (l *NoOpLogger) Infof(format string, args ...interface{})

Infof is a no-op implementation of Logger's Infof.

func (*NoOpLogger) Warn

func (l *NoOpLogger) Warn(args ...interface{})

Warn is a no-op implementation of Logger's Warn.

func (*NoOpLogger) Warnf

func (l *NoOpLogger) Warnf(format string, args ...interface{})

Warnf is a no-op implementation of Logger's Warnf.

type Options

type Options struct {
	// Required; format "amqp://user:pass@host:port"
	URLs []string

	// In what mode does the library operate (Both, Consumer, Producer)
	Mode Mode

	// If left empty, server will auto generate queue name
	QueueName string

	// Bindings is the set of information needed to bind a queue to one or
	// more exchanges, specifying one or more binding (routing) keys.
	Bindings []Binding

	// https://godoc.org/github.com/streadway/amqp#Channel.Qos
	// Leave unset if no QoS preferences
	QosPrefetchCount int
	QosPrefetchSize  int

	// How long to wait before we retry connecting to a server (after disconnect)
	RetryReconnectSec int

	// Whether queue should survive/persist server restarts (and there are no remaining bindings)
	QueueDurable bool

	// Whether consumer should be the sole consumer of the queue; used only if
	// QueueDeclare set to true
	QueueExclusive bool

	// Whether to delete queue on consumer disconnect; used only if QueueDeclare set to true
	QueueAutoDelete bool

	// Whether to declare/create queue on connect
	QueueDeclare bool

	// Additional arguments to pass to the queue declaration or binding
	// https://github.com/streamdal/plumber/issues/210
	QueueArgs map[string]interface{}

	// Whether to automatically acknowledge consumed message(s)
	AutoAck bool

	// Used for identifying consumer
	ConsumerTag string

	// Used as a property to identify producer
	AppID string

	// Use TLS
	UseTLS bool

	// Skip cert verification (only applies if UseTLS is true)
	SkipVerifyTLS bool

	// Log is the (optional) logger to use for writing out log messages.
	Log Logger
}

Options determines how the `rabbit` library will behave and should be passed in to rabbit via `New()`. Many of the options are optional (and will fall back to sane defaults).

type Rabbit

type Rabbit struct {
	Conn                    *amqp.Connection
	ConsumerDeliveryChannel <-chan amqp.Delivery
	ConsumerRWMutex         *sync.RWMutex
	ConsumerWG              *sync.WaitGroup
	NotifyCloseChan         chan *amqp.Error
	ReconnectChan           chan struct{}
	ReconnectInProgress     bool
	ReconnectInProgressMtx  *sync.RWMutex
	ProducerServerChannel   *amqp.Channel
	ProducerRWMutex         *sync.RWMutex
	Options                 *Options
	// contains filtered or unexported fields
}

Rabbit is the struct that is instantiated via `New()`. You should not instantiate this struct by hand (unless you have a really good reason to do so).

func New

func New(opts *Options) (*Rabbit, error)

New is used for instantiating the library.

func (*Rabbit) Close

func (r *Rabbit) Close() error

Close stops any active Consume and closes the amqp connection (and any channels using the connection).

You should re-instantiate the rabbit lib once this is called.

func (*Rabbit) Consume

func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error, rp ...*RetryPolicy)

Consume consumes messages from the configured queue (`Options.QueueName`) and executes `f` for every received message.

`Consume()` will block until it is stopped, either via the passed-in `ctx` or by calling `Stop()`.

It is also possible to see the errors that `f()` runs into by passing in an error channel (`chan *ConsumeError`).

Both `ctx` and `errChan` can be `nil`.

If the server goes away, `Consume` will automatically attempt to reconnect. Subsequent reconnect attempts will sleep/wait for `DefaultRetryReconnectSec` between attempts.

func (*Rabbit) ConsumeOnce

func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error, rp ...*RetryPolicy) error

ConsumeOnce will consume exactly one message from the configured queue, execute `runFunc()` on the message and return.

Same as with `Consume()`, you can pass in a context to cancel `ConsumeOnce()` or run `Stop()`.

func (*Rabbit) Publish

func (r *Rabbit) Publish(ctx context.Context, routingKey string, body []byte, headers ...amqp.Table) error

Publish publishes one message to the configured exchange, using the specified routing key.

func (*Rabbit) Stop

func (r *Rabbit) Stop(timeout ...time.Duration) error

Stop stops an in-progress `Consume()` or `ConsumeOnce()`.

type RetryPolicy added in v0.1.26

type RetryPolicy struct {
	DelayMS     []time.Duration
	MaxAttempts int
	RetryCount  int // Default: unlimited (-1)
}

func DefaultAckPolicy added in v0.1.26

func DefaultAckPolicy() *RetryPolicy

DefaultAckPolicy is the default backoff policy for acknowledging messages.

func NewRetryPolicy added in v0.1.26

func NewRetryPolicy(maxAttempts int, t ...time.Duration) *RetryPolicy

NewRetryPolicy returns a new backoff policy with the given delays.

func (*RetryPolicy) AttemptCount added in v0.1.26

func (b *RetryPolicy) AttemptCount() string

AttemptCount returns the current attempt count as a string, for use with log messages.

func (*RetryPolicy) Duration added in v0.1.26

func (b *RetryPolicy) Duration(n int) time.Duration

Duration returns the duration for the given attempt number. If the attempt number exceeds the number of delays, the last delay is returned.

func (*RetryPolicy) Reset added in v0.1.26

func (b *RetryPolicy) Reset()

Reset resets the current retry count to 0.

func (*RetryPolicy) ShouldRetry added in v0.1.26

func (b *RetryPolicy) ShouldRetry() bool

ShouldRetry returns true if the current retry count is less than the max attempts.
