chamqp

package module
v0.0.0-...-c8817f1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: MIT Imports: 9 Imported by: 0

README

Chamqp

Features

Chamqp is a small layer above rabbitmq/amqp091-go featuring auto-reconnect using exponential back-off with an upper bound. This is especially useful when running multiple services and network disconnect will happen sooner or later in production.

Getting started

Simply run in your project

$ go get github.com/Contargo/chamqp

Usage - classic way

Chamqp is built with the intention to be compatible to the underlying rabbitmq/amqp091-go package. This means instead of amqp.Dial() you have to use chamqp.Dial(). On the connection itself you then can use .Channel(). Declaring queues or exchanges is done in the same fashion as with the amqp091-go package.

See the following example to illustrate it:

conn, err := chamqp.Dial(applicatonConfig.AMQPUrl)
channel := conn.Channel()
channel.ExchangeDeclare("exchangeName", "topic", false, false, false, false, nil, errChan)
...

channel.Publish(
    "exchangeName,
    "routing.key",
    false,
    false,
    amqp.Publishing{
        ContentType: "contentType",
        Body: "payload",
    },
)

Usage with builder

Experimental - use at your own risk.

It's cumbersome to keep track about all the parameters especially when multiple boolean flags are used. Therefor we added a small implementation using the builder pattern.

BindQueue("testqueue").
    WithRoutinghKey("routingKey").
    WithExchangeDecl("exchange").
    WithNoWaitDecl(false).
    WithArgs(nil).
    WithErrorChannel(nil).
    Build(channel)

You can also use default values, so you don't have to type everything:

BindQueue("test").
    WithRoutinghKey("routing").
    WithExchangeDecl("exchangeName").
    Defaults().
    BuildSpec()

For further samples have a look at the _test.go files

Defaults are held in a public accessible variable:

  • queue_bind.Defaults
  • exchange-declare.Defaults
  • consume.Defaults

Getting started for development

Simply clone this repository

$ git clone https://github.com/Contargo/chamqp

Documentation

Index

Constants

View Source
const (
	AllowSelfTermination = true
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel represents an AMQP channel. Used as a context for valid message Exchange. Errors on methods with this Channel will be detected and the channel will recreate itself.

func (*Channel) Consume

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table, deliveryChan chan<- amqp.Delivery, errorChan chan<- error)

Consume immediately starts delivering queued messages.

func (*Channel) ConsumeWithSpec

func (ch *Channel) ConsumeWithSpec(spec ConsumeSpec)

func (*Channel) ExchangeDeclare

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table, errorChan chan<- error)

ExchangeDeclare declares an Exchange on the server. If the Exchange does not already exist, the server will create it. If the Exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.

func (*Channel) ExchangeDeclareWithSpec

func (ch *Channel) ExchangeDeclareWithSpec(spec ExchangeDeclareSpec)

func (*Channel) NotifyPublish

func (ch *Channel) NotifyPublish() chan amqp.Confirmation

func (*Channel) Publish

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

Publish sends a Publishing from the client to an Exchange on the server.

func (*Channel) PublishJSON

func (ch *Channel) PublishJSON(exchange, key string, mandatory, immediate bool, objectToBeSent interface{}) error

func (*Channel) PublishJSONWithProperties

func (ch *Channel) PublishJSONWithProperties(exchange, key string, mandatory, immediate bool, objectToBeSent interface{}, properties Properties) error

func (*Channel) PublishJsonAndWaitForResponse

func (ch *Channel) PublishJsonAndWaitForResponse(replyQueueName, correlationId string, response, request interface{}, exchange, key string, mandatory, immediate bool, responseTimeout time.Duration) error

func (*Channel) QueueBind

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table, errorChan chan<- error)

QueueBind binds an Exchange to a Queue so that publishings to the Exchange will be routed to the Queue when the publishing routing Key matches the binding routing Key.

func (*Channel) QueueBindWithSpec

func (ch *Channel) QueueBindWithSpec(q QueueBindSpec)

func (*Channel) QueueDeclare

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table, queueChan chan<- amqp.Queue, errorChan chan<- error)

QueueDeclare declares a Queue to hold messages and deliver to consumers. Declaring creates a Queue if it doesn't already exist, or ensures that an existing Queue matches the same parameters.

func (*Channel) QueueDeclareWithSpec

func (ch *Channel) QueueDeclareWithSpec(q QueueDeclareSpec)

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection manages the serialization and deserialization of frames from IO and dispatches the frames to the appropriate channel. All RPC methods and asynchronous Publishing, Delivery, Ack, Nack and Return messages are multiplexed on this channel. There must always be active receivers for every asynchronous message on this connection.

func Dial

func Dial(url string) *Connection

Dial accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 seconds and sets the handshake deadline to 30 seconds. After handshake, deadlines are cleared.

Use `NotifyError` to register a receiver for errors on the connection.

func DialBlocked

func DialBlocked(url string) (*Connection, error)

func DialTLS

func DialTLS(url string, config *tls.Config) *Connection

func DialTLSBlocked

func DialTLSBlocked(url string, config *tls.Config) (*Connection, error)

func (*Connection) Channel

func (c *Connection) Channel() *Channel

Channel opens a unique, concurrent server channel to process the bulk of AMQP messages. Any error from methods on this receiver will cause the Channel to recreate itself.

func (*Connection) ChannelWithConfirm

func (c *Connection) ChannelWithConfirm(noWait bool) *Channel

func (*Connection) Close

func (c *Connection) Close() error

Close requests and waits for the response to close the AMQP connection.

func (*Connection) ConnectionState

func (c *Connection) ConnectionState() tls.ConnectionState

func (*Connection) NotifyError

func (c *Connection) NotifyError(receiver chan error) chan error

NotifyError registers a listener for error events either initiated by an connect or close.

type ConsumeSpec

type ConsumeSpec struct {
	Queue        string
	Consumer     string
	DeliveryChan chan<- amqp.Delivery

	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
	ErrorChan chan<- error
}

type ExchangeDeclareSpec

type ExchangeDeclareSpec struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table

	ErrorChan chan<- error
}

type NotifyPublishSpec

type NotifyPublishSpec struct {
	// contains filtered or unexported fields
}

type Properties

type Properties struct {
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
	Priority        uint8     // 0 to 9
	CorrelationId   string    // correlation identifier
	ReplyTo         string    // address to reply to (ex: RPC)
	Expiration      string    // message expiration spec
	MessageId       string    // message identifier
	Timestamp       time.Time // message timestamp
	Type            string    // message type name
	UserId          string    // creating user id - ex: "guest"
	AppId           string    // creating application id
}

type QueueBindSpec

type QueueBindSpec struct {
	Name     string
	Key      string
	Exchange string
	NoWait   bool
	Args     amqp.Table

	ErrorChan chan<- error
}

type QueueDeclareSpec

type QueueDeclareSpec struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table

	QueueChan chan<- amqp.Queue
	ErrorChan chan<- error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL