rabbitmqutils

package module
v1.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2021 License: MIT Imports: 11 Imported by: 2

README

go-rabbitmq-utils

GoDoc Go Report Card Build Status codecov

The library that provides utility entities for working with RabbitMQ.

Features

  • client:
    • options (will be applied on connecting):
      • maximal queue size (optionally);
      • queues for declaring:
        • will survive server restarts and remain without consumers;
      • message ID generator for them automatic generating;
    • operations:
      • with a connection:
        • opening;
        • closing;
      • with messages:
        • message publishing:
          • check the specified queue name based on the declared queues;
          • automatic marshalling of a message data to JSON;
          • setting of auxiliary message fields:
            • setting of a message ID:
              • receiving of a custom message ID (optionally);
              • automatic generating of a message ID (optionally);
            • setting of a message timestamp;
        • getting of a single message:
          • check the specified queue name based on the declared queues;
          • block the execution flow until the message is received or an error occurs;
        • starting of message consuming:
          • check the specified queue name based on the declared queues;
          • automatic generating of a consumer name;
        • cancelling of message consuming:
          • check the specified queue name based on the declared queues;
          • automatic generating of a consumer name;
  • message consumer:
    • arguments:
      • client;
      • queue name;
      • outer message handler;
    • operations:
      • message consuming:
        • starting;
        • cancelling;
      • message handling:
        • support of concurrent handling;
  • wrappers for an outer message handler:
    • acknowledger:
      • processing on success:
        • acknowledging of the message;
        • logging of the success fact;
      • processing on failure:
        • rejecting of the message:
          • with once message handling;
          • with twice message handling (i.e. once requeue);
        • logging of the error;
    • JSON message handler:
      • automatical creating of a receiver for a message data by its specified type;
      • unmarshalling of a message data from JSON to the created receiver.

Installation

Prepare the directory:

$ mkdir --parents "$(go env GOPATH)/src/github.com/thewizardplusplus/"
$ cd "$(go env GOPATH)/src/github.com/thewizardplusplus/"

Clone this repository:

$ git clone https://github.com/thewizardplusplus/go-rabbitmq-utils.git
$ cd go-rabbitmq-utils

Install dependencies with the dep tool:

$ dep ensure -vendor-only

Examples

package main

import (
	"fmt"
	stdlog "log"
	"os"
	"reflect"
	"runtime"
	"sync"

	"github.com/go-log/log/print"
	rabbitmqutils "github.com/thewizardplusplus/go-rabbitmq-utils"
)

type exampleMessage struct {
	FieldOne int
	FieldTwo string
}

type messageHandler struct {
	locker   sync.Mutex
	messages []exampleMessage
}

func (messageHandler *messageHandler) MessageType() reflect.Type {
	return reflect.TypeOf(exampleMessage{})
}

func (messageHandler *messageHandler) HandleMessage(message interface{}) error {
	messageHandler.locker.Lock()
	defer messageHandler.locker.Unlock()

	messageHandler.messages =
		append(messageHandler.messages, message.(exampleMessage))
	return nil
}

func main() {
	dsn, ok := os.LookupEnv("MESSAGE_BROKER_ADDRESS")
	if !ok {
		dsn = "amqp://rabbitmq:rabbitmq@localhost:5672"
	}

	// prepare the client
	logger := stdlog.New(os.Stderr, "", stdlog.LstdFlags|stdlog.Lmicroseconds)
	client, err :=
		rabbitmqutils.NewClient(dsn, rabbitmqutils.WithQueues([]string{"example"}))
	if err != nil {
		logger.Fatal(err)
	}
	defer client.Close()

	// start the message consuming
	var messageHandler messageHandler
	messageConsumer, err := rabbitmqutils.NewMessageConsumer(
		client,
		"example",
		rabbitmqutils.Acknowledger{
			MessageHandling: rabbitmqutils.OnceMessageHandling,
			MessageHandler: rabbitmqutils.JSONMessageHandler{
				MessageHandler: &messageHandler,
			},
			// wrap the standard logger via the github.com/go-log/log package
			Logger: print.New(logger),
		},
	)
	if err != nil {
		logger.Fatal(err)
	}
	go messageConsumer.StartConcurrently(runtime.NumCPU())

	// publish the messages
	for i := 0; i < 10; i++ {
		err = client.PublishMessage("example", "", exampleMessage{
			FieldOne: 10 + i,
			FieldTwo: fmt.Sprintf("message data #%d", i),
		})
		if err != nil {
			logger.Fatal(err)
		}
	}
	messageConsumer.Stop()

	// print the results
	for _, message := range messageHandler.messages {
		fmt.Printf("%+v\n", message)
	}

	// Unordered output:
	// {FieldOne:10 FieldTwo:message data #0}
	// {FieldOne:11 FieldTwo:message data #1}
	// {FieldOne:12 FieldTwo:message data #2}
	// {FieldOne:13 FieldTwo:message data #3}
	// {FieldOne:14 FieldTwo:message data #4}
	// {FieldOne:15 FieldTwo:message data #5}
	// {FieldOne:16 FieldTwo:message data #6}
	// {FieldOne:17 FieldTwo:message data #7}
	// {FieldOne:18 FieldTwo:message data #8}
	// {FieldOne:19 FieldTwo:message data #9}
}

License

The MIT License (MIT)

Copyright © 2021 thewizardplusplus

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQPAcknowledger

type AMQPAcknowledger interface {
	amqp.Acknowledger
}

AMQPAcknowledger ...

It is used only for mock generating.

type Acknowledger

type Acknowledger struct {
	MessageHandling MessageHandling
	MessageHandler  FailingMessageHandler
	Logger          log.Logger
}

Acknowledger ...

func (Acknowledger) HandleMessage

func (acknowledger Acknowledger) HandleMessage(message amqp.Delivery)

HandleMessage ...

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client ...

func NewClient

func NewClient(dsn string, options ...ClientOption) (Client, error)

NewClient ...

func (Client) CancelConsuming

func (client Client) CancelConsuming(queue string) error

CancelConsuming ...

func (Client) Close

func (client Client) Close() error

Close ...

func (Client) ConsumeMessages

func (client Client) ConsumeMessages(queue string) (
	<-chan amqp.Delivery,
	error,
)

ConsumeMessages ...

func (Client) GetMessage added in v1.1.3

func (client Client) GetMessage(queue string) (amqp.Delivery, error)

GetMessage ...

func (Client) PublishMessage

func (client Client) PublishMessage(
	queue string,
	messageID string,
	messageData interface{},
) error

PublishMessage ...

type ClientConfig

type ClientConfig struct {
	// contains filtered or unexported fields
}

ClientConfig ...

type ClientOption

type ClientOption func(clientConfig *ClientConfig)

ClientOption ...

func WithClock

func WithClock(clock Clock) ClientOption

WithClock ...

func WithDialer

func WithDialer(dialer Dialer) ClientOption

WithDialer ...

func WithIDGenerator

func WithIDGenerator(idGenerator IDGenerator) ClientOption

WithIDGenerator ...

func WithMaximalQueueSize

func WithMaximalQueueSize(maximalQueueSize int) ClientOption

WithMaximalQueueSize ...

func WithQueues

func WithQueues(queues []string) ClientOption

WithQueues ...

type Clock

type Clock func() time.Time

Clock ...

type ClockInterface

type ClockInterface interface {
	Time() time.Time
}

ClockInterface ...

It is used only for mock generating.

type ConnectionWrapper

type ConnectionWrapper struct {
	*amqp.Connection
}

ConnectionWrapper ...

func (ConnectionWrapper) Channel

func (wrapper ConnectionWrapper) Channel() (MessageBrokerChannel, error)

Channel ...

type ContextCancellerInterface

type ContextCancellerInterface interface {
	CancelContext()
}

ContextCancellerInterface ...

It is used only for mock generating.

type Dialer

type Dialer func(dsn string) (MessageBrokerConnection, error)

Dialer ...

type DialerInterface

type DialerInterface interface {
	Dial(dsn string) (MessageBrokerConnection, error)
}

DialerInterface ...

It is used only for mock generating.

type FailingMessageHandler

type FailingMessageHandler interface {
	HandleMessage(message amqp.Delivery) error
}

FailingMessageHandler ...

type IDGenerator

type IDGenerator func() (string, error)

IDGenerator ...

type IDGeneratorInterface

type IDGeneratorInterface interface {
	GenerateID() (string, error)
}

IDGeneratorInterface ...

It is used only for mock generating.

type JSONMessageHandler

type JSONMessageHandler struct {
	MessageHandler SpecificMessageHandler
}

JSONMessageHandler ...

func (JSONMessageHandler) HandleMessage

func (handler JSONMessageHandler) HandleMessage(message amqp.Delivery) error

HandleMessage ...

type Logger

type Logger interface {
	log.Logger
}

Logger ...

It is used only for mock generating.

type MessageBrokerChannel

type MessageBrokerChannel interface {
	Qos(prefetchCount int, prefetchSize int, global bool) error
	QueueDeclare(
		queueName string,
		durable bool,
		autoDelete bool,
		exclusive bool,
		noWait bool,
		arguments amqp.Table,
	) (amqp.Queue, error)
	Publish(
		exchange string,
		queueName string,
		mandatory bool,
		immediate bool,
		message amqp.Publishing,
	) error
	Get(
		queueName string,
		autoAcknowledge bool,
	) (message amqp.Delivery, ok bool, err error)
	Consume(
		queueName string,
		consumerName string,
		autoAcknowledge bool,
		exclusive bool,
		noLocal bool,
		noWait bool,
		arguments amqp.Table,
	) (<-chan amqp.Delivery, error)
	Cancel(consumerName string, noWait bool) error
	Close() error
}

MessageBrokerChannel ...

type MessageBrokerConnection

type MessageBrokerConnection interface {
	Channel() (MessageBrokerChannel, error)
	Close() error
}

MessageBrokerConnection ...

type MessageConsumer

type MessageConsumer struct {
	// contains filtered or unexported fields
}

MessageConsumer ...

func NewMessageConsumer

func NewMessageConsumer(
	client MessageConsumerClient,
	queue string,
	messageHandler MessageHandler,
) (MessageConsumer, error)

NewMessageConsumer ...

func (MessageConsumer) Start

func (consumer MessageConsumer) Start()

Start ...

func (MessageConsumer) StartConcurrently

func (consumer MessageConsumer) StartConcurrently(concurrencyFactor int)

StartConcurrently ...

func (MessageConsumer) Stop

func (consumer MessageConsumer) Stop() error

Stop ...

type MessageConsumerClient

type MessageConsumerClient interface {
	ConsumeMessages(queue string) (<-chan amqp.Delivery, error)
	CancelConsuming(queue string) error
}

MessageConsumerClient ...

type MessageHandler

type MessageHandler interface {
	HandleMessage(message amqp.Delivery)
}

MessageHandler ...

type MessageHandling

type MessageHandling int

MessageHandling ...

const (
	OnceMessageHandling MessageHandling = iota
	TwiceMessageHandling
)

...

type SpecificMessageHandler

type SpecificMessageHandler interface {
	MessageType() reflect.Type
	HandleMessage(message interface{}) error
}

SpecificMessageHandler ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL