v1.2.0

Published: Apr 18, 2021 License: Apache-2.0 Imports: 11 Imported by: 3




This package lets you encapsulate the creation and configuration of RabbitMQ (AMQP) entities such as queues, exchanges, producers and consumers declaratively, with a single config.

Exchanges, queues and producers are initialized in the background.

go-mq supports both sync and async producers.

go-mq reconnects automatically when the connection is closed or a network error occurs. You can configure the delay between connection attempts with the reconnect_delay option.
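The idea of retrying with a fixed delay can be sketched as follows. This is only an illustration of the pattern, not go-mq's implementation: dialFunc, connectWithRetry, errRefused and the attempt limit are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRefused is a sample error used by the simulated dialer below.
var errRefused = errors.New("connection refused")

// dialFunc is a hypothetical stand-in for whatever opens the broker connection.
type dialFunc func() error

// connectWithRetry calls dial until it succeeds or maxAttempts is reached,
// sleeping for delay between failed attempts. It returns the number of
// attempts made. Values of maxAttempts below 1 are treated as 1.
func connectWithRetry(dial dialFunc, delay time.Duration, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	// flakyDial fails twice and then succeeds, simulating a broker that comes back.
	flakyDial := func() error {
		calls++
		if calls < 3 {
			return errRefused
		}
		return nil
	}

	attempts, err := connectWithRetry(flakyDial, 10*time.Millisecond, 5)
	fmt.Println(attempts, err)
}
```

Here the delay argument plays the role that reconnect_delay plays in the configuration: it is the pause between one failed attempt and the next.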


go get -u


Visit godoc to get information about library API.

If you prefer to learn by practice, there are working examples in the example directory.


You can configure mq by filling the mq.Config struct directly or by loading it from a config file.

Supported configuration tags:

  • yaml
  • json
  • mapstructure

Available options:

dsn: "amqp://login:password@host:port/virtual_host" # Use a comma-separated list for a cluster connection.
reconnect_delay: 5s                     # Interval between connection attempts.
test_mode: false                        # Switches the library to a mocked broker. Defaults to false.
exchanges:
  - name: "exchange_name"
    type: "direct"
    options:
      # Available options with default values:
      auto_delete: false
      durable: false
      internal: false
      no_wait: false
queues:
  - name: "queue_name"
    exchange: "exchange_name"
    routing_key: "route"
    # A set of arguments for the binding.
    # The syntax and semantics of these arguments depend on the exchange class.
    binding_options:
      no_wait: false
    # Available options with default values:
    options:
      auto_delete: false
      durable: false
      exclusive: false
      no_wait: false
producers:
  - name: "producer_name"
    buffer_size: 10                      # How many messages can be buffered while publishing.
    exchange: "exchange_name"
    routing_key: "route"
    sync: false                          # Whether the producer works in sync or async mode.
    # Available options with default values:
    options:
      content_type:  "application/json"
      delivery_mode: 2                   # 1 - non persistent, 2 - persistent.
consumers:
  - name: "consumer_name"
    queue: "queue_name"
    workers: 1                           # Workers count. Defaults to 1.
    prefetch_count: 0                    # Prefetch message count per worker.
    prefetch_size: 0                     # Prefetch message size per worker.
    # Available options with default values:
    options:
      no_ack: false
      no_local: false
      no_wait: false
      exclusive: false
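As a rough illustration of how the json tags map a config file onto a struct, the sketch below decodes a small JSON document with the standard library. The config, producerConfig and options types are local stand-ins that mirror a subset of the documented fields so the example compiles on its own; real code would decode into mq.Config. reconnect_delay is left out on purpose, because encoding/json decodes a time.Duration from an integer number of nanoseconds and not from a string like "5s".

```go
package main

import (
	"encoding/json"
	"fmt"
)

// options is a local stand-in for the documented Options map type.
type options map[string]interface{}

// producerConfig mirrors the documented producer fields and json tags.
type producerConfig struct {
	Name       string  `json:"name"`
	Exchange   string  `json:"exchange"`
	RoutingKey string  `json:"routing_key"`
	Sync       bool    `json:"sync"`
	BufferSize int     `json:"buffer_size"`
	Options    options `json:"options"`
}

// config mirrors a subset of the documented top-level fields.
type config struct {
	DSN       string           `json:"dsn"`
	TestMode  bool             `json:"test_mode"`
	Producers []producerConfig `json:"producers"`
}

// raw is a sample config document using the option names listed above.
const raw = `{
  "dsn": "amqp://login:password@host:5672/virtual_host",
  "test_mode": true,
  "producers": [
    {
      "name": "producer_name",
      "exchange": "exchange_name",
      "routing_key": "route",
      "buffer_size": 10,
      "options": {"content_type": "application/json", "delivery_mode": 2}
    }
  ]
}`

// parseConfig decodes a JSON document into the stand-in config struct.
func parseConfig(data []byte) (config, error) {
	var cfg config
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseConfig([]byte(raw))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	p := cfg.Producers[0]
	// Numbers inside a map[string]interface{} are decoded as float64.
	fmt.Println(p.Name, p.BufferSize, p.Options["delivery_mode"])
}
```

Fields missing from the document, such as sync here, keep their zero values after decoding.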

Error handling

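All errors are accessible via an exported channel:

package main

import (
	"log"

	"github.com/cheshir/go-mq" // Import path assumed; it is not shown on this page.
)

func main() {
	config := mq.Config{} // Set your configuration.
	queue, err := mq.New(config)
	if err != nil {
		log.Fatal(err)
	}
	// ...

	go handleMQErrors(queue.Error())
	// Other logic.
}

// handleMQErrors logs every error until the channel is closed.
func handleMQErrors(errors <-chan error) {
	for err := range errors {
		log.Println(err) // Replace with your own error handling.
	}
}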

If the channel is full, new errors are dropped.

Errors from a sync producer are not sent to the error channel because they are returned directly to the caller.
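The "drop when full" behaviour corresponds to a non-blocking send on a buffered channel. The sketch below shows that general Go pattern; report and errSample are hypothetical names for this example, and the buffer size of 2 is arbitrary, not the library's value.

```go
package main

import (
	"errors"
	"fmt"
)

// errSample is a placeholder error for the demonstration.
var errSample = errors.New("sample error")

// report tries to send err without blocking. It returns false when the
// channel buffer is full and the error was dropped.
func report(errs chan<- error, err error) bool {
	select {
	case errs <- err:
		return true
	default:
		return false
	}
}

func main() {
	errs := make(chan error, 2) // Room for two pending errors.

	// Nobody is reading from errs, so the third send finds the buffer full.
	for i := 1; i <= 3; i++ {
		fmt.Println(i, report(errs, errSample))
	}
}
```

In practice this means the goroutine reading the error channel should start early and keep up; otherwise errors may be lost silently.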


Some cases can only be tested with a real broker, and some can only be tested with a mocked broker.

If you are able to run tests with a real broker, run them with:

go test -mock-broker=0

Otherwise mock will be used.


Check releases page.

How to upgrade

From version 0.x to 1.x
  • GetConsumer() method was renamed to Consumer(). This follows Go naming guidelines.

  • GetProducer() method was removed. Use AsyncProducer() instead, or SyncProducer() if you want to handle network errors yourself.


Feel free to create issues with bug reports or your wishes.



Package mq provides an ability to integrate with message broker via AMQP in a declarative way.





type AsyncProducer

type AsyncProducer interface {
	// Produce sends message to broker. Returns immediately.
	Produce(data []byte)
}

AsyncProducer describes the methods available on a producer. This kind of producer is asynchronous. Any errors that occur are accessible via MQ.Error().

type Config

type Config struct {
	DSN            string        `mapstructure:"dsn" json:"dsn" yaml:"dsn"`
	ReconnectDelay time.Duration `mapstructure:"reconnect_delay" json:"reconnect_delay" yaml:"reconnect_delay"`
	TestMode       bool          `mapstructure:"test_mode" json:"test_mode" yaml:"test_mode"`
	Exchanges      Exchanges     `mapstructure:"exchanges" json:"exchanges" yaml:"exchanges"`
	Queues         Queues        `mapstructure:"queues" json:"queues" yaml:"queues"`
	Producers      Producers     `mapstructure:"producers" json:"producers" yaml:"producers"`
	Consumers      Consumers     `mapstructure:"consumers" json:"consumers" yaml:"consumers"`
	// contains filtered or unexported fields
}

Config describes all available options for amqp connection creation.

type ConnectionState added in v1.2.0

type ConnectionState uint8
const (
	ConnectionStateDisconnected ConnectionState = 1
	ConnectionStateConnected    ConnectionState = 2
	ConnectionStateConnecting   ConnectionState = 3
)

type Consumer

type Consumer interface {
	// Consume runs consumer's workers with specified handler.
	Consume(handler ConsumerHandler)
}

Consumer describes available methods for consumer.

type ConsumerConfig

type ConsumerConfig struct {
	Name          string  `mapstructure:"name" json:"name" yaml:"name"`
	Queue         string  `mapstructure:"queue" json:"queue" yaml:"queue"`
	Workers       int     `mapstructure:"workers" json:"workers" yaml:"workers"`
	Options       Options `mapstructure:"options" json:"options" yaml:"options"`
	PrefetchCount int     `mapstructure:"prefetch_count" json:"prefetch_count" yaml:"prefetch_count"`
	PrefetchSize  int     `mapstructure:"prefetch_size" json:"prefetch_size" yaml:"prefetch_size"`
}

ConsumerConfig describes consumer's configuration.

type ConsumerHandler

type ConsumerHandler func(message Message)

ConsumerHandler describes handler function signature. It will be called for each obtained message.

type Consumers

type Consumers []ConsumerConfig

Consumers describes configuration list for consumers.

type DeliveryMode

type DeliveryMode int

DeliveryMode describes an AMQP message delivery mode.

const (
	NonPersistent DeliveryMode = 1
	Persistent                 = 2
)

List of available values for `delivery_mode` producer option.

type ExchangeConfig

type ExchangeConfig struct {
	Name    string  `mapstructure:"name" json:"name" yaml:"name"`
	Type    string  `mapstructure:"type" json:"type" yaml:"type"`
	Options Options `mapstructure:"options" json:"options" yaml:"options"`
}

ExchangeConfig describes exchange's configuration.

type Exchanges

type Exchanges []ExchangeConfig

Exchanges describes configuration list for exchanges.

type MQ

type MQ interface {
	// Consumer returns consumer object by its name.
	Consumer(name string) (Consumer, error)
	// SetConsumerHandler allows you to set handler callback without getting consumer.
	SetConsumerHandler(name string, handler ConsumerHandler) error
	// AsyncProducer returns async producer. Should be used in most cases.
	AsyncProducer(name string) (AsyncProducer, error)
	// SyncProducer returns sync producer.
	SyncProducer(name string) (SyncProducer, error)
	// Error returns channel with all occurred errors.
	// Errors from sync producer won't be accessible.
	Error() <-chan error
	// Close stops all consumers and producers and closes the connection to the broker.
	Close()
	// ConnectionState shows the connection state.
	ConnectionState() ConnectionState
}

MQ describes methods provided by message broker adapter.

func New

func New(config Config) (MQ, error)

New initializes AMQP connection to the message broker and returns adapter that provides an ability to get configured consumers and producers, read occurred errors and shutdown all workers.

type Message

type Message interface {
	Ack(multiple bool) error
	Nack(multiple, request bool) error
	Reject(requeue bool) error
	Body() []byte
}

Message describes available methods of the message obtained from queue.
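A handler built on these methods might look like the sketch below. It assumes the consumer runs with no_ack: false, so the handler acknowledges each message explicitly. The message interface is a local copy of the documented method set, and fakeMessage and handle are hypothetical names that let the example run without a broker.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a local copy of the documented Message method set.
type message interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
	Reject(requeue bool) error
	Body() []byte
}

// fakeMessage records which acknowledgement method was called on it.
type fakeMessage struct {
	body   []byte
	result string
}

func (m *fakeMessage) Ack(multiple bool) error           { m.result = "ack"; return nil }
func (m *fakeMessage) Nack(multiple, requeue bool) error { m.result = "nack"; return nil }
func (m *fakeMessage) Reject(requeue bool) error         { m.result = "reject"; return nil }
func (m *fakeMessage) Body() []byte                      { return m.body }

// handle acknowledges messages whose body is a valid JSON object and
// rejects everything else without requeueing it.
func handle(msg message) {
	var payload map[string]interface{}
	if err := json.Unmarshal(msg.Body(), &payload); err != nil {
		_ = msg.Reject(false)
		return
	}
	// Process payload here, then confirm the delivery.
	_ = msg.Ack(false)
}

func main() {
	good := &fakeMessage{body: []byte(`{"id": 1}`)}
	bad := &fakeMessage{body: []byte(`not json`)}

	handle(good)
	handle(bad)

	fmt.Println(good.result, bad.result)
}
```

Rejecting without requeue avoids redelivering a message that can never be parsed; whether that is the right policy depends on the application.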

type Options

type Options map[string]interface{}

Options describes optional configuration.

type ProducerConfig

type ProducerConfig struct {
	Sync       bool    `mapstructure:"sync" json:"sync" yaml:"sync"`
	BufferSize int     `mapstructure:"buffer_size" json:"buffer_size" yaml:"buffer_size"`
	Exchange   string  `mapstructure:"exchange" json:"exchange" yaml:"exchange"`
	Name       string  `mapstructure:"name" json:"name" yaml:"name"`
	RoutingKey string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
	Options    Options `mapstructure:"options" json:"options" yaml:"options"`
}

ProducerConfig describes producer's configuration.

type Producers

type Producers []ProducerConfig

Producers describes configuration list for producers.

type QueueConfig

type QueueConfig struct {
	Exchange       string  `mapstructure:"exchange" json:"exchange" yaml:"exchange"`
	Name           string  `mapstructure:"name" json:"name" yaml:"name"`
	RoutingKey     string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
	BindingOptions Options `mapstructure:"binding_options" json:"binding_options" yaml:"binding_options"`
	Options        Options `mapstructure:"options" json:"options" yaml:"options"`
}

QueueConfig describes queue's configuration.

type Queues

type Queues []QueueConfig

Queues describes configuration list for queues.

type SyncProducer

type SyncProducer interface {
	// Produce sends message to broker. Waits for result (ok, error).
	Produce(data []byte) error
}

SyncProducer describes available methods for synchronous producer.
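The practical difference between the two producer kinds is where a publish error ends up. The sketch below contrasts them using local stand-ins: syncProducer and asyncProducer copy the documented method sets, while failingSync, failingAsync and errPublish are hypothetical types and values that simulate a failed publish.

```go
package main

import (
	"errors"
	"fmt"
)

// errPublish simulates a failure while publishing.
var errPublish = errors.New("publish failed")

// syncProducer and asyncProducer copy the documented method sets.
type syncProducer interface {
	Produce(data []byte) error
}

type asyncProducer interface {
	Produce(data []byte)
}

// failingSync returns the publish error directly to the caller.
type failingSync struct{}

func (failingSync) Produce(data []byte) error { return errPublish }

// failingAsync returns nothing; the error goes to a shared error channel.
type failingAsync struct {
	errs chan<- error
}

func (p failingAsync) Produce(data []byte) {
	select {
	case p.errs <- errPublish:
	default: // Channel full: the error is dropped.
	}
}

func main() {
	errs := make(chan error, 1)

	var sp syncProducer = failingSync{}
	var ap asyncProducer = failingAsync{errs: errs}

	// Sync: the caller sees the error as a return value.
	fmt.Println("sync:", sp.Produce([]byte("a")))

	// Async: the call returns immediately; the error arrives on the channel.
	ap.Produce([]byte("b"))
	fmt.Println("async:", <-errs)
}
```

A sync producer suits callers that must know whether each message was published; an async producer suits high-throughput paths where errors are handled centrally.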

