README

msgr | AMQP Client

Summary

This Go module is a thin wrapper around streadway/amqp, the standard low-level Go client for RabbitMQ, the open source message broker.

msgr abstracts away a few of the complexities of RabbitMQ and exposes a slightly friendlier interface intended for simple applications.

Documentation

For details on all the functionality in this library, see the GoDoc documentation.

Installation

This project supports Go modules and Go 1.13+. Add msgr to your own project the usual way:

go get github.com/piquette/msgr

Usage example

Producer
// Instantiate and connect to the server.
conf := &msgr.Config{
    URI:     "amqp://localhost:5672",
    Channel: "queue_name",
}
producer := msgr.ConnectP(conf)
defer producer.Close()

// Send a message, retrying every 3 seconds until it is enqueued.
for !producer.Post([]byte("hi")) {
    // Retry for all eternity.
    log.Println("could not enqueue msg")
    time.Sleep(3 * time.Second)
}
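
The producer example above retries forever, which can hang a caller if the broker never comes back. As a rough sketch of one alternative, the helper below caps the number of attempts and doubles the wait between them. postWithRetry, errGaveUp, and flakyPoster are hypothetical names that are not part of msgr; the only thing assumed from the library is the documented Post([]byte) bool method.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// poster is the subset of msgr.Producer this sketch relies on.
type poster interface {
	Post([]byte) bool
}

// errGaveUp is a hypothetical sentinel error; it is not part of msgr.
var errGaveUp = errors.New("message not enqueued after all attempts")

// postWithRetry is a hypothetical helper. It calls Post up to attempts
// times, doubling the wait between tries, starting from base.
func postWithRetry(p poster, msg []byte, attempts int, base time.Duration) error {
	wait := base
	for i := 0; i < attempts; i++ {
		if p.Post(msg) {
			return nil
		}
		// Only sleep when another attempt will follow.
		if i < attempts-1 {
			time.Sleep(wait)
			wait *= 2
		}
	}
	return errGaveUp
}

// flakyPoster is a stand-in producer that fails a fixed number of times
// before succeeding, so the sketch runs without a broker.
type flakyPoster struct{ failures int }

func (f *flakyPoster) Post([]byte) bool {
	if f.failures > 0 {
		f.failures--
		return false
	}
	return true
}

func main() {
	p := &flakyPoster{failures: 2}
	err := postWithRetry(p, []byte("hi"), 5, 10*time.Millisecond)
	fmt.Println("enqueued:", err == nil)
}
```

Because a value returned by msgr.ConnectP has a Post method with this signature, it should be usable wherever the sketch takes a poster.
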
Consumer
// Instantiate and connect to the server.
conf := &msgr.Config{
    URI:     "amqp://localhost:5672",
    Channel: "queue_name",
}
consumer := msgr.ConnectC(conf)
defer consumer.Close()

// Receive messages.
open, messages := consumer.Accept()
if !open {
    return
}
// Range over the messages chan.
for recv := range messages {
    // Got one.
    fmt.Println(string(recv.Body)) // prints 'hi'.

    // Don't forget to acknowledge.
    recv.Ack(false)
}
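
The consumer example above acknowledges every message unconditionally. If processing can fail, one option is to acknowledge only on success and negatively acknowledge otherwise. The sketch below illustrates that decision with a small local interface that mirrors the Ack and Nack methods of amqp.Delivery. settle, errHandler, and fakeDelivery are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// acknowledger mirrors the Ack and Nack methods of amqp.Delivery, so a
// real delivery can be passed wherever this interface is expected.
type acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// errHandler is a hypothetical processing failure used by the demo.
var errHandler = errors.New("handler failed")

// settle is a hypothetical helper. It acknowledges the delivery when the
// handler succeeded and asks the broker to requeue it otherwise.
func settle(d acknowledger, handlerErr error) error {
	if handlerErr != nil {
		return d.Nack(false, true)
	}
	return d.Ack(false)
}

// fakeDelivery records which call was made, so the sketch runs without
// a broker.
type fakeDelivery struct{ outcome string }

func (f *fakeDelivery) Ack(multiple bool) error {
	f.outcome = "ack"
	return nil
}

func (f *fakeDelivery) Nack(multiple, requeue bool) error {
	f.outcome = "nack"
	if requeue {
		f.outcome = "nack+requeue"
	}
	return nil
}

func main() {
	ok, failed := &fakeDelivery{}, &fakeDelivery{}
	_ = settle(ok, nil)
	_ = settle(failed, errHandler)
	fmt.Println(ok.outcome, failed.outcome) // prints "ack nack+requeue"
}
```

Requeueing on every failure can redeliver a message that never succeeds indefinitely, so a real consumer would likely want a retry limit or a dead-letter setup on the broker side.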

Contributing

This module is a work in progress and needs a lot of refinement. Please submit an issue if you need help!

Documentation

Overview

Package msgr is the latest iteration of the message queueing package.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	Dial() error
	Close()
}

Client defines the base message queueing behavior.

type Config

type Config struct {
	URI     string
	Channel string
}

Config holds the queue configuration settings.
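
Since Config has only two string fields, it is easy to populate from the environment instead of hard-coding values. The following is a minimal sketch of that idea, not something msgr provides. The configFromEnv helper and the MSGR_URI and MSGR_CHANNEL variable names are made up for illustration, the defaults are taken from the README example, and Config is redeclared locally so the block compiles on its own.

```go
package main

import (
	"fmt"
	"os"
)

// Config mirrors the documented msgr.Config so this sketch compiles alone.
type Config struct {
	URI     string
	Channel string
}

// configFromEnv is a hypothetical helper. MSGR_URI and MSGR_CHANNEL are
// made-up variable names; lookup has the same shape as os.LookupEnv,
// which keeps the function easy to test.
func configFromEnv(lookup func(string) (string, bool)) *Config {
	// Defaults match the values used in the README example.
	conf := &Config{
		URI:     "amqp://localhost:5672",
		Channel: "queue_name",
	}
	if v, ok := lookup("MSGR_URI"); ok && v != "" {
		conf.URI = v
	}
	if v, ok := lookup("MSGR_CHANNEL"); ok && v != "" {
		conf.Channel = v
	}
	return conf
}

func main() {
	conf := configFromEnv(os.LookupEnv)
	fmt.Println(conf.URI, conf.Channel)
}
```

In a real program the result would be built as a *msgr.Config and passed to ConnectP or ConnectC.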

type Consumer

type Consumer interface {
	Accept() (bool, <-chan amqp.Delivery)
	Close()
}

Consumer defines the base message consuming behavior.

type Producer

type Producer interface {
	Post([]byte) bool
	Close()
}

Producer defines the base message producing behavior.
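
Because Producer is an interface, application code that depends on it can be exercised without a running broker. The sketch below shows one way that might look, using an in-memory stand-in. memProducer and notify are hypothetical names, and the interface is copied from the listing above so the block compiles on its own.

```go
package main

import "fmt"

// Producer is copied from the msgr documentation so the sketch compiles
// alone.
type Producer interface {
	Post([]byte) bool
	Close()
}

// memProducer is a hypothetical in-memory stand-in that records every
// message instead of sending it to a broker.
type memProducer struct {
	sent   [][]byte
	closed bool
}

// Compile-time check that memProducer satisfies Producer.
var _ Producer = (*memProducer)(nil)

func (m *memProducer) Post(msg []byte) bool {
	if m.closed {
		return false
	}
	// Copy the payload so later changes by the caller do not affect it.
	m.sent = append(m.sent, append([]byte(nil), msg...))
	return true
}

func (m *memProducer) Close() { m.closed = true }

// notify is a hypothetical piece of application code that only needs a
// Producer, not a concrete connection.
func notify(p Producer, user string) bool {
	return p.Post([]byte("welcome " + user))
}

func main() {
	p := &memProducer{}
	ok := notify(p, "ada")
	fmt.Println(ok, string(p.sent[0])) // prints "true welcome ada"
}
```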

type QueueClient

type QueueClient struct {
	// contains filtered or unexported fields
}

QueueClient implements the client behavior.

func (*QueueClient) Close

func (c *QueueClient) Close()

Close closes a connection.

func (*QueueClient) Dial

func (c *QueueClient) Dial()

Dial makes a connection.

type QueueConsumer

type QueueConsumer struct {
	*QueueClient
}

QueueConsumer implements Consumer.

func ConnectC

func ConnectC(conf *Config) *QueueConsumer

ConnectC returns a consumer.

func (*QueueConsumer) Accept

func (c *QueueConsumer) Accept() (bool, <-chan amqp.Delivery)

Accept delivers a stream of messages.

type QueueProducer

type QueueProducer struct {
	*QueueClient
}

QueueProducer implements Producer.

func ConnectP

func ConnectP(conf *Config) *QueueProducer

ConnectP returns a producer.

func (*QueueProducer) Post

func (p *QueueProducer) Post(msg []byte) bool

Post sends a message.
