simpleamqp

v0.0.0-...-19899e4 Latest
Published: Dec 13, 2023 License: MIT Imports: 8 Imported by: 5

README

simpleamqp

simpleamqp is a very opinionated minimal wrapper around the AMQP Go library with the following features:

  • Publish events to an exchange
  • Consume events from an exchange
  • Try to stay running at all times
  • Best effort when sending or receiving messages

Build

You need a Go runtime installed in your system which supports modules. A nice way to have multiple Go versions and switch easily between them is the g application.

A Makefile is available, so you only need to run:

make build

Two binaries will be built, a publisher and a consumer.

Running tests

Load the development environment file to set the BROKER_URI environment variable.

source dev/env_develop

Start a RabbitMQ service with the default configuration (specified in dev/env_develop).

make start_dependencies

Run tests. They will only work if the RabbitMQ container is up.

make test

Execution example

Make sure the environment variables are loaded before executing each command, so that the binaries can read BROKER_URI. Also make sure that a RabbitMQ container is up and running with those parameters.

In one terminal, execute:

./consumer

In another one, execute:

./publisher

You should now see the publisher send an event periodically and the consumer read it.

Additional info

Publish to an exchange
Features
  • Reconnect when a message can't be delivered
  • Message buffer to avoid blocking the publisher
  • Discard messages when the message buffer is full due to a connection problem
  • Compress messages based on "compress" header (gzip)
Known Issues
  • When there is a connection problem, some messages can be lost
Unimplemented features
  • Exchange options not configurable
  • Message headers and characteristics (delivery mode, ttl, ...) not configurable
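
The buffering behavior listed above (a message buffer that never blocks the caller and drops messages once it is full) can be illustrated with a buffered channel and a non-blocking send. This is a minimal sketch of the general technique, not the package's actual implementation; `boundedBuffer`, `newBoundedBuffer`, and `offer` are hypothetical names introduced only for this example.

```go
package main

import "fmt"

// boundedBuffer is a hypothetical stand-in for a publisher-side message buffer.
type boundedBuffer struct {
	ch      chan []byte
	dropped int // number of messages discarded because the buffer was full
}

func newBoundedBuffer(size int) *boundedBuffer {
	return &boundedBuffer{ch: make(chan []byte, size)}
}

// offer enqueues msg without ever blocking the caller. It returns false and
// counts the message as dropped when the buffer is already full.
func (b *boundedBuffer) offer(msg []byte) bool {
	select {
	case b.ch <- msg:
		return true
	default:
		b.dropped++
		return false
	}
}

func main() {
	// Nothing drains the buffer here, which imitates a broken connection.
	buf := newBoundedBuffer(2)
	for i := 0; i < 5; i++ {
		ok := buf.offer([]byte(fmt.Sprintf("event-%d", i)))
		fmt.Printf("event-%d accepted=%v\n", i, ok)
	}
	fmt.Println("dropped:", buf.dropped)
}
```

The `dropped` counter is only safe here because a single goroutine calls `offer`; a concurrent version would need an atomic counter or a mutex.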
Receive messages from an exchange
Features
  • Receive using various routing keys
  • Main queue options are configurable
  • Reconnect in case of connection error
  • Reconnect when no messages are received in a configurable amount of time (to detect connection problems on NATed networks, where the NAT mapping for the connection can expire)
  • Decompress messages based on "compress" header (gzip)
Known Issues
  • When there is a connection problem, some messages can be lost
Unimplemented features
  • Exchange options not configurable
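
Both feature lists mention gzip handling keyed on the "compress" header. The sketch below shows one way such a round trip could look using only the standard library. How the package interprets the header value is not documented here, so treating a boolean `true` as "compress this message" is an assumption, and `wantsCompression`, `encode`, and `decode` are hypothetical helpers.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compressHeader mirrors the documented COMPRESS_HEADER value.
const compressHeader = "compress"

// wantsCompression reports whether the headers ask for gzip.
// Assumption: the header carries a boolean; the real package may differ.
func wantsCompression(headers map[string]interface{}) bool {
	v, ok := headers[compressHeader].(bool)
	return ok && v
}

// encode gzips body when the header is set, otherwise returns it unchanged.
func encode(body []byte, headers map[string]interface{}) ([]byte, error) {
	if !wantsCompression(headers) {
		return body, nil
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(body); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // Close flushes the gzip trailer
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode reverses encode based on the same header.
func decode(body []byte, headers map[string]interface{}) ([]byte, error) {
	if !wantsCompression(headers) {
		return body, nil
	}
	zr, err := gzip.NewReader(bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	headers := map[string]interface{}{compressHeader: true}
	wire, err := encode([]byte("hello, exchange"), headers)
	if err != nil {
		panic(err)
	}
	plain, err := decode(wire, headers)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plain))
}
```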

Documentation

Index

Constants

const (
	COMPRESS_HEADER = "compress"
)

Variables

var DefaultQueueOptions = QueueOptions{
	Durable:   true,
	Delete:    false,
	Exclusive: false,
}

DefaultQueueOptions defines the default options used when creating queues: durable, not auto-deleted, and not exclusive.
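
Because QueueOptions is a plain struct, a caller can presumably start from the defaults and change only the flags that differ. The sketch below re-declares `QueueOptions` and `DefaultQueueOptions` locally (copied from this page) so that it compiles on its own; `temporaryQueueOptions` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// QueueOptions and DefaultQueueOptions are local copies of the documented
// declarations, repeated here only to keep the sketch self-contained.
type QueueOptions struct {
	Durable   bool
	Delete    bool
	Exclusive bool
}

var DefaultQueueOptions = QueueOptions{
	Durable:   true,
	Delete:    false,
	Exclusive: false,
}

// temporaryQueueOptions (hypothetical) derives options for a throwaway queue:
// not durable, auto-deleted, and exclusive to one consumer.
func temporaryQueueOptions() QueueOptions {
	opts := DefaultQueueOptions // struct assignment copies; the default is untouched
	opts.Durable = false
	opts.Delete = true
	opts.Exclusive = true
	return opts
}

func main() {
	fmt.Printf("default:   %+v\n", DefaultQueueOptions)
	fmt.Printf("temporary: %+v\n", temporaryQueueOptions())
}
```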

Functions

This section is empty.

Types

type AMQPConsumer

type AMQPConsumer interface {
	Receive(exchange string, routingKeys []string, queue string, queueOptions QueueOptions, queueTimeout time.Duration) chan AmqpMessage
	ReceiveWithoutTimeout(exchange string, routingKeys []string, queue string, queueOptions QueueOptions) chan AmqpMessage
}

AMQPConsumer represents an AMQP consumer. It receives messages with or without a timeout.
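
Application code that depends on the AMQPConsumer interface rather than on the concrete type can be exercised without a broker. The sketch below re-declares the documented types locally so that it compiles alone; `fakeConsumer` and `collectBodies` are hypothetical, and the fake closes its channel so the loop terminates, which the real consumer is not documented to do.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented declarations.
type AmqpMessage struct {
	Exchange   string
	RoutingKey string
	Body       string
}

type QueueOptions struct {
	Durable   bool
	Delete    bool
	Exclusive bool
}

type AMQPConsumer interface {
	Receive(exchange string, routingKeys []string, queue string, queueOptions QueueOptions, queueTimeout time.Duration) chan AmqpMessage
	ReceiveWithoutTimeout(exchange string, routingKeys []string, queue string, queueOptions QueueOptions) chan AmqpMessage
}

// fakeConsumer (hypothetical) replays a fixed list of messages.
type fakeConsumer struct {
	messages []AmqpMessage
}

func (f *fakeConsumer) Receive(exchange string, routingKeys []string, queue string, queueOptions QueueOptions, queueTimeout time.Duration) chan AmqpMessage {
	out := make(chan AmqpMessage, len(f.messages))
	for _, m := range f.messages {
		out <- m
	}
	close(out) // test-only convenience so range loops finish
	return out
}

func (f *fakeConsumer) ReceiveWithoutTimeout(exchange string, routingKeys []string, queue string, queueOptions QueueOptions) chan AmqpMessage {
	return f.Receive(exchange, routingKeys, queue, queueOptions, 0)
}

// collectBodies (hypothetical) is application code that only knows the interface.
// The exchange, routing key, and queue names are placeholders.
func collectBodies(c AMQPConsumer) []string {
	var bodies []string
	for m := range c.ReceiveWithoutTimeout("events", []string{"#"}, "demo-queue", QueueOptions{Durable: true}) {
		bodies = append(bodies, m.Body)
	}
	return bodies
}

func main() {
	c := &fakeConsumer{messages: []AmqpMessage{
		{Exchange: "events", RoutingKey: "user.created", Body: "first"},
		{Exchange: "events", RoutingKey: "user.deleted", Body: "second"},
	}}
	fmt.Println(collectBodies(c))
}
```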

type AMQPPublisher

type AMQPPublisher interface {
	Publish(string, []byte, ...map[string]interface{})
	PublishWithTTL(string, []byte, int, ...map[string]interface{})
}

AMQPPublisher represents an AMQP Publisher that can publish messages with or without TTL
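
The same idea applies on the publishing side: code written against AMQPPublisher can be checked with a recording fake. The interface below is a local copy of the documented one; `recordingPublisher`, `published`, and `notifyUserCreated` are hypothetical names, and the routing key is a placeholder.

```go
package main

import "fmt"

// AMQPPublisher is a local copy of the documented interface.
type AMQPPublisher interface {
	Publish(string, []byte, ...map[string]interface{})
	PublishWithTTL(string, []byte, int, ...map[string]interface{})
}

// published records one call made through the fake.
type published struct {
	routingKey string
	body       string
	ttl        int // 0 when Publish was used
}

// recordingPublisher (hypothetical) stores calls instead of talking to a broker.
type recordingPublisher struct {
	sent []published
}

func (r *recordingPublisher) Publish(routingKey string, message []byte, headers ...map[string]interface{}) {
	r.sent = append(r.sent, published{routingKey: routingKey, body: string(message)})
}

func (r *recordingPublisher) PublishWithTTL(routingKey string, message []byte, ttl int, headers ...map[string]interface{}) {
	r.sent = append(r.sent, published{routingKey: routingKey, body: string(message), ttl: ttl})
}

// notifyUserCreated (hypothetical) is application code that only knows the interface.
func notifyUserCreated(p AMQPPublisher, userID string) {
	p.Publish("user.created", []byte(userID))
}

func main() {
	rec := &recordingPublisher{}
	notifyUserCreated(rec, "42")
	fmt.Printf("%+v\n", rec.sent)
}
```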

type AmqpConsumer

type AmqpConsumer struct {
	// contains filtered or unexported fields
}

AmqpConsumer holds the brokerURI

func NewAmqpConsumer

func NewAmqpConsumer(brokerURI string) *AmqpConsumer

NewAmqpConsumer returns an AMQP Consumer

func (*AmqpConsumer) Receive

func (client *AmqpConsumer) Receive(exchange string, routingKeys []string, queue string, queueOptions QueueOptions, queueTimeout time.Duration) chan AmqpMessage

Receive returns an AmqpMessage channel that delivers messages from the given queue, which is bound to the exchange with one or more routing keys. It reconnects automatically on error or when no message arrives before queueTimeout expires; use 0 when no timeout is required. The function declares the queue.
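
The idle-timeout behavior described for Receive can be pictured as a select between the message source and a timer. The sketch below shows that general pattern with plain channels; it is not the package's code, and `nextOrTimeout` and `runDemo` are hypothetical names. A real consumer would re-dial the broker where the sketch only increments a counter.

```go
package main

import (
	"fmt"
	"time"
)

// nextOrTimeout waits for one message from in. It returns ok=false when idle
// elapses first. An idle value of 0 disables the timeout and blocks until a
// message arrives.
func nextOrTimeout(in <-chan string, idle time.Duration) (msg string, ok bool) {
	if idle <= 0 {
		return <-in, true
	}
	timer := time.NewTimer(idle)
	defer timer.Stop()
	select {
	case msg = <-in:
		return msg, true
	case <-timer.C:
		return "", false
	}
}

// runDemo reads twice from a source that holds a single message, so the
// second read hits the idle timeout and counts as a reconnect.
func runDemo() (received []string, reconnects int) {
	in := make(chan string, 1)
	in <- "hello"
	for i := 0; i < 2; i++ {
		if msg, ok := nextOrTimeout(in, 50*time.Millisecond); ok {
			received = append(received, msg)
		} else {
			reconnects++ // placeholder for tearing down and re-establishing the connection
		}
	}
	return received, reconnects
}

func main() {
	received, reconnects := runDemo()
	fmt.Println("received:", received)
	fmt.Println("reconnects:", reconnects)
}
```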

func (*AmqpConsumer) ReceiveWithoutTimeout

func (client *AmqpConsumer) ReceiveWithoutTimeout(exchange string, routingKeys []string, queue string, queueOptions QueueOptions) chan AmqpMessage

ReceiveWithoutTimeout behaves like the Receive method, but without a timeout when receiving from the queue.

type AmqpMessage

type AmqpMessage struct {
	Exchange   string
	RoutingKey string
	Body       string
}

AmqpMessage holds a received message: the exchange, the routing key, and the body.

type AmqpPublisher

type AmqpPublisher struct {
	// contains filtered or unexported fields
}

AmqpPublisher holds the brokerURI, the exchange name, and the channel to which messages are submitted for publishing to RabbitMQ.

func NewAmqpPublisher

func NewAmqpPublisher(brokerURI, exchange string) *AmqpPublisher

NewAmqpPublisher returns an AmqpPublisher

func (*AmqpPublisher) Publish

func (publisher *AmqpPublisher) Publish(routingKey string, message []byte, headers ...map[string]interface{})

Publish publishes a message using the given routing key.

func (*AmqpPublisher) PublishWithTTL

func (publisher *AmqpPublisher) PublishWithTTL(routingKey string, message []byte, ttl int, headers ...map[string]interface{})

PublishWithTTL publishes a message with the given TTL.

type QueueOptions

type QueueOptions struct {
	Durable   bool
	Delete    bool
	Exclusive bool
}

QueueOptions holds the flags when declaring a queue: durable, delete, exclusive

Directories

Path Synopsis
examples
Package mocks represent the AMQPConsumer mocks Code generated by mockery v1.0.0
