Version: v0.4.19 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2021 License: MIT Imports: 10 Imported by: 0



Package rconsumer provides an easy-to-use consumer abstraction for setting up delivery consumers with minimal boilerplate.




This section is empty.


This section is empty.


This section is empty.


type AmqpArgs

type AmqpArgs struct {
	// Queue is the name of the Queue to consume from
	Queue string
	// ConsumerName identifies this consumer with the broker.
	ConsumerName string
	// AutoAck is whether the broker should ack messages automatically as it sends them.
	// Otherwise the consumer will handle acking messages.
	AutoAck bool
	// Exclusive is whether this consumer should be the exclusive consumer for this
	// Queue.
	Exclusive bool
	// Args are additional args to pass to the amqp.Channel.Consume() method.
	Args amqp.Table

AmqpArgs are the args the consumer will be created with when calling amqp.Channel.Consume.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields

Consumer is a service helper for consuming messages from one or more queues.

package main

import (

type BasicProcessor struct {

// AmqpArgs returns the args to be passed to the consumer's internal
// Channel.Consume() method.
func (processor *BasicProcessor) AmqpArgs() rconsumer.AmqpArgs {
	return rconsumer.AmqpArgs{
		ConsumerName: "example_consumer_queue",
		AutoAck:      false,
		Exclusive:    false,
		Args:         nil,

// SetupChannel is called before consuming begins, and allows the handler to declare
// any routes, bindings, etc, necessary to handle its route.
func (processor *BasicProcessor) SetupChannel(
	ctx context.Context, amqpChannel middleware.AmqpRouteManager,
) error {
	_, err := amqpChannel.QueueDeclare(
	if err != nil {
		return fmt.Errorf("error declaring Queue: %w", err)

	return nil

// HandleDelivery is the business logic invoked for each delivery.
func (processor *BasicProcessor) HandleDelivery(
	ctx context.Context, delivery amqp.Delivery,
) (requeue bool, err error) {
	// Print the message
	fmt.Println("BODY:", delivery.Body)

	// Returning no error will result in an ACK of the message.
	return false, nil

// CleanupChannel allows the route handler to remove any resources necessary on close.
func (processor *BasicProcessor) CleanupChannel(
	ctx context.Context, amqpChannel middleware.AmqpRouteManager,
) error {
	_, err := amqpChannel.QueueDelete(
		"example_consumer_queue", false, false, false,
	if err != nil {
		return fmt.Errorf("error deleting Queue: %w", err)

	return nil

func main() {
	// Get a new connection to our test broker.
	connection, err := amqp.Dial(amqptest.TestDialAddress)
	if err != nil {
	defer connection.Close()

	// Get a new channel from our robust connection.
	channel, err := connection.Channel()
	if err != nil {

	// Create a new consumer that uses our robust channel.
	consumer := rconsumer.New(channel, rconsumer.DefaultOpts())
	defer consumer.StartShutdown()

	// Create a new delivery processor and register it.
	processor := new(BasicProcessor)
	err = consumer.RegisterProcessor(processor)
	if err != nil {

	// This method will block forever as the consumer runs.
	err = consumer.Run()
	if err != nil {

func New

func New(channel *amqp.Channel, opts Opts) *Consumer

New returns a new consumer which will pull deliveries from the passed amqp.Channel.

func (*Consumer) RegisterProcessor

func (consumer *Consumer) RegisterProcessor(
	processor DeliveryProcessor,
) error

RegisterProcessor registers a DeliveryProcessor implementation value. Will panic if called after consumer start.

func (*Consumer) Run

func (consumer *Consumer) Run() error

Run the consumer. This method blocks until the consumer has completed shutdown.

func (*Consumer) StartShutdown

func (consumer *Consumer) StartShutdown()

StartShutdown begins shutdown of the Consumer. This method returns immediately; it does not block until shutdown is complete.

type DeliveryProcessor added in v0.4.10

type DeliveryProcessor interface {
	// AmqpArgs returns the args that amqp.Channel.Consume should be called with.
	AmqpArgs() AmqpArgs

	// SetupChannel is called before the consumer is created, and is designed to let
	// this handler declare any exchanges or queues necessary to handle deliveries.
	SetupChannel(ctx context.Context, amqpChannel middleware.AmqpRouteManager) error

	// HandleDelivery will be called once per delivery. Returning a non-nil err will
	// result in it being logged and the delivery being nacked. If requeue is true, the
	// nacked delivery will be requeued. If err is nil, requeue is ignored.
	// NOTE: if this method panics, the delivery will be nacked regardless of requeue's
	// value
	HandleDelivery(ctx context.Context, delivery amqp.Delivery) (requeue bool, err error)

	// CleanupChannel is called at shutdown to allow the route handler to clean up any
	// necessary resources.
	CleanupChannel(ctx context.Context, amqpChannel middleware.AmqpRouteManager) error

DeliveryProcessor is an interface for handling consuming from a route. Implementors of this interface will be registered with a consumer.

type Middleware

type Middleware struct {
	// contains filtered or unexported fields

Middleware holds the middleware to register on a consumer.

func (*Middleware) AddCleanupChannel

func (config *Middleware) AddCleanupChannel(processorMiddleware middleware.CleanupChannel)

AddCleanupChannel adds a middleware.CleanupChannel to be added to each DeliveryProcessor.CleanupChannel passed to a Consumer.

func (*Middleware) AddDelivery

func (config *Middleware) AddDelivery(processorMiddleware middleware.Delivery)

AddDelivery adds a middleware.Delivery to be added to each DeliveryProcessor.HandleDelivery passed to a Consumer.

func (*Middleware) AddProvider

func (config *Middleware) AddProvider(provider middleware.ProvidesMiddleware) error

AddProvider adds consume middleware provided by methods of provider.

func (*Middleware) AddSetupChannel

func (config *Middleware) AddSetupChannel(processorMiddleware middleware.SetupChannel)

AddSetupChannel adds a middleware.SetupChannel to be added to each DeliveryProcessor.SetupChannel passed to a Consumer.

type Opts

type Opts struct {
	// contains filtered or unexported fields

Opts holds options for running a consumer.

func DefaultOpts

func DefaultOpts() Opts

DefaultOpts returns new Opts object with default settings.

func (Opts) WithDefaultLogging

func (opts Opts) WithDefaultLogging(log bool) Opts

WithDefaultLogging enables the default zerolog.Logger logging middleware. If false, all other logging settings have no effect.

Default: true

func (Opts) WithLogDeliveryLevel

func (opts Opts) WithLogDeliveryLevel(level zerolog.Level) Opts

WithLogDeliveryLevel is the minimum logging level to log the full delivery object at.

Default: zerolog.DebugLevel.

func (Opts) WithLogSuccessLevel

func (opts Opts) WithLogSuccessLevel(level zerolog.Level) Opts

WithLogSuccessLevel is the minimum logging level to log a successful delivery at.

Default: zerolog.DebugLevel.

func (Opts) WithLogger

func (opts Opts) WithLogger(logger zerolog.Logger) Opts

WithLogger sets the zerolog.Logger for the default logging middleware to use. If WithDefaultLogging is false, this setting has no effect.

Default: lockless, pretty-printed logger set to Info level.

func (Opts) WithLoggingLevel

func (opts Opts) WithLoggingLevel(level zerolog.Level) Opts

WithLoggingLevel sets the level of the logger passed to WithLogger.

func (Opts) WithMaxWorkers

func (opts Opts) WithMaxWorkers(max int) Opts

WithMaxWorkers sets the maximum number of workers that can be running at the same time. If 0 or less, no limit will be used.

Default: 0.

func (Opts) WithMiddleware

func (opts Opts) WithMiddleware(processorMiddleware Middleware) Opts

WithMiddleware sets the Middleware to use. Default: includes all default middleware in the consumer/middleware package.

Default: Middleware{}


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL