rabbitmq

package module
v1.0.0
Published: Nov 2, 2022 License: MIT Imports: 6 Imported by: 0

README

go-rabbitmq

Wrapper of rabbitmq/amqp091-go that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐

Supported by Boot.dev

Motivation

Streadway's AMQP library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option, and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol; as such, it provides no reconnection logic and few ease-of-use abstractions.

Goal

The goal with go-rabbitmq is to still provide nearly all of the nitty-gritty functionality of AMQP, but to make it easier to work with via a higher-level API. In particular:

  • Automatic reconnection
  • Multithreaded consumers via a handler function
  • Reasonable defaults
  • Flow control handling
  • TCP block handling

⚙️ Installation

Inside a Go module:

go get github.com/prolicht-dev/go-rabbitmq

🚀 Quick Start Consumer

Default options
consumer, err := rabbitmq.NewConsumer(
    "amqp://guest:guest@localhost", rabbitmq.Config{},
    rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) rabbitmq.Action {
        log.Printf("consumed: %v", string(d.Body))
        // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
        return rabbitmq.Ack
    },
    "my_queue",
)
if err != nil {
    log.Fatal(err)
}
With options
consumer, err := rabbitmq.NewConsumer(
    "amqp://user:pass@localhost",
    rabbitmq.Config{},
    rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) rabbitmq.Action {
        log.Printf("consumed: %v", string(d.Body))
        // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
        return rabbitmq.Ack
    },
    "my_queue",
    rabbitmq.WithConsumeOptionsConcurrency(10),
    // consumerName is a string variable defined by the caller
    rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
if err != nil {
    log.Fatal(err)
}

🚀 Quick Start Publisher

Default options
publisher, err := rabbitmq.NewPublisher("amqp://user:pass@localhost", rabbitmq.Config{})
if err != nil {
    log.Fatal(err)
}
defer publisher.Close()
err = publisher.Publish([]byte("hello, world"), []string{"routing_key"})
if err != nil {
    log.Fatal(err)
}
With options
publisher, err := rabbitmq.NewPublisher(
    "amqp://user:pass@localhost",
    rabbitmq.Config{},
    // can pass nothing for no logging
    rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {
    log.Fatal(err)
}
// defer Close only after the error check, once publisher is known to be valid
defer publisher.Close()
err = publisher.Publish(
    []byte("hello, world"),
    []string{"routing_key"},
    rabbitmq.WithPublishOptionsContentType("application/json"),
    rabbitmq.WithPublishOptionsMandatory,
    rabbitmq.WithPublishOptionsPersistentDelivery,
    rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
    log.Fatal(err)
}

returns := publisher.NotifyReturn()
go func() {
    for r := range returns {
        log.Printf("message returned from server: %s", string(r.Body))
    }
}()

🚀 Quick Start Queue, Exchange and Binding Declaration

Consumer
consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost", rabbitmq.Config{})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) rabbitmq.Action {
        log.Printf("consumed: %v", string(d.Body))
        // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
        return rabbitmq.Ack
    },
    "my_queue",
    rabbitmq.WithConsumeDeclareOptions(
        rabbitmq.WithDeclareQueueDurable,
        rabbitmq.WithDeclareQueueQuorum,
        rabbitmq.WithDeclareExchangeName("events"),
        rabbitmq.WithDeclareExchangeKind("topic"),
        rabbitmq.WithDeclareExchangeDurable,
        rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}),
    ),
)
if err != nil {
    log.Fatal(err)
}
Publisher
publisher, err := rabbitmq.NewPublisher(
    "amqp://guest:guest@localhost", rabbitmq.Config{},
    rabbitmq.WithPublisherOptionsLogging,
    rabbitmq.WithPublisherDeclareOptions(
        rabbitmq.WithDeclareQueueName("my_queue"),
        rabbitmq.WithDeclareQueueDurable,
        rabbitmq.WithDeclareQueueQuorum,
        rabbitmq.WithDeclareExchangeName("events"),
        rabbitmq.WithDeclareExchangeKind("topic"),
        rabbitmq.WithDeclareExchangeDurable,
        rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}),
    ),
)
if err != nil {
    log.Fatal(err)
}
defer publisher.Close()

Other usage examples

See the examples directory for more ideas.

Stability

Note that the API is currently in v0. I don't plan on any huge changes, but there may be some small breaking changes before we hit v1.

💬 Contact

Submit an issue (above in the issues tab)

Transitive Dependencies

My goal is to keep dependencies limited to 1, github.com/rabbitmq/amqp091-go.

👏 Contributing

I love help! Contribute by forking the repo and opening pull requests. Please ensure that your code passes the existing tests and linting, and write tests to test your changes if applicable.

All pull requests should be submitted to the main branch.

Documentation

Index

Constants

const (
	Transient  uint8 = amqp.Transient
	Persistent uint8 = amqp.Persistent
)

DeliveryMode. Transient means higher throughput, but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues; persistent messages will be restored to durable queues and lost on non-durable queues during server restart.

This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.
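The interaction between delivery mode and queue durability can be sketched as a small truth function. This is only an illustration of the rule stated above, not code from the package. The constant values 1 and 2 mirror the "Transient (0 or 1) or Persistent (2)" comment on PublishOptions, and survivesRestart is a hypothetical helper name.

```go
package main

import "fmt"

// Stand-ins for the package constants; values follow the PublishOptions comment.
const (
	Transient  uint8 = 1
	Persistent uint8 = 2
)

// survivesRestart (hypothetical helper) reports whether a message is expected
// to be restored after a broker restart: only persistent messages that sit on
// durable queues are.
func survivesRestart(deliveryMode uint8, queueDurable bool) bool {
	return deliveryMode == Persistent && queueDurable
}

func main() {
	fmt.Println(survivesRestart(Persistent, true))  // true
	fmt.Println(survivesRestart(Transient, true))   // false
	fmt.Println(survivesRestart(Persistent, false)) // false
}
```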

Functions

func WithConsumeDeclareOptions

func WithConsumeDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*ConsumeOptions)

WithConsumeDeclareOptions allows you to set declare options that can be used to set up the queue, exchange or bindings before the consumer process starts.

func WithConsumeOptionsConcurrency

func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions)

WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that many goroutines will be spawned to run the provided handler on messages.
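To make "that many goroutines" concrete, here is a minimal standard-library sketch of the same idea: a fixed number of workers reading from one shared channel. It does not use the package itself. The names consume and countHandled, and the use of plain strings as messages, are assumptions made for the illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// consume starts `concurrency` goroutines that all read from the same
// deliveries channel and run handler on each message. It returns once the
// channel is closed and every worker has finished.
func consume(deliveries <-chan string, concurrency int, handler func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range deliveries {
				handler(msg)
			}
		}()
	}
	wg.Wait()
}

// countHandled pushes n messages through consume and returns how many
// times the handler ran.
func countHandled(n, concurrency int) int64 {
	deliveries := make(chan string)
	go func() {
		defer close(deliveries)
		for i := 0; i < n; i++ {
			deliveries <- fmt.Sprintf("msg-%d", i)
		}
	}()
	var handled int64
	// The handler may run on several goroutines at once, so count atomically.
	consume(deliveries, concurrency, func(string) { atomic.AddInt64(&handled, 1) })
	return handled
}

func main() {
	fmt.Println(countHandled(100, 10)) // 100
}
```

Because the handler runs on several goroutines at once, any state it shares needs synchronization, as the atomic counter shows.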

func WithConsumeOptionsConsumerAutoAck

func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions)

WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property of this consumer on the server. If unset, the default (false) will be used.

func WithConsumeOptionsConsumerExclusive

func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions)

WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.

func WithConsumeOptionsConsumerName

func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions)

WithConsumeOptionsConsumerName returns a function that sets the name of this consumer on the server. If unset, a random name will be given.

func WithConsumeOptionsConsumerNoWait

func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions)

WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means it does not wait for the server to confirm the request and immediately begins deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.

func WithConsumeOptionsQOSGlobal

func WithConsumeOptionsQOSGlobal(options *ConsumeOptions)

WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means these QOS settings apply to ALL existing and future consumers on all channels on the same connection

func WithConsumeOptionsQOSPrefetch

func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions)

WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that many messages will be fetched from the server in advance to help with throughput. This doesn't affect the handler, messages are still processed one at a time.

func WithConsumerOptionsLogger

func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions)

WithConsumerOptionsLogger sets logging to a custom interface. Use WithConsumerOptionsLogging to just log to stdout.

func WithConsumerOptionsLogging

func WithConsumerOptionsLogging(options *ConsumerOptions)

WithConsumerOptionsLogging uses a default logger that writes to stdout.

func WithConsumerOptionsReconnectInterval

func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions)

WithConsumerOptionsReconnectInterval sets the interval at which the consumer will attempt to reconnect to the RabbitMQ server.
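The effect of a reconnect interval can be pictured as a fixed-delay retry loop. The sketch below uses only the standard library and is not the package's internal reconnection code. connectWithRetry, flakyDial and the maxAttempts cap are hypothetical names and choices made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connectWithRetry calls dial until it succeeds, sleeping for interval
// between attempts. It gives up after maxAttempts and returns the last error
// together with the number of attempts made.
func connectWithRetry(dial func() error, interval time.Duration, maxAttempts int) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(interval)
		}
	}
	return maxAttempts, err
}

// flakyDial returns a dial function that fails its first `failures` calls,
// standing in for a broker that is temporarily unreachable.
func flakyDial(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	attempts, err := connectWithRetry(flakyDial(2), 10*time.Millisecond, 5)
	fmt.Println(attempts, err) // 3 <nil>
}
```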

func WithDeclareBindingArgs

func WithDeclareBindingArgs(args Table) func(*DeclareOptions)

WithDeclareBindingArgs sets the arguments of the bindings to args. This function must be called after bindings have been defined, otherwise it has no effect.

func WithDeclareBindingNoWait

func WithDeclareBindingNoWait(options *DeclareOptions)

WithDeclareBindingNoWait sets the bindings to nowait, which means if the queue can not be bound the channel will not be closed with an error. This function must be called after bindings have been defined, otherwise it has no effect.

func WithDeclareBindings

func WithDeclareBindings(bindings []Binding) func(*DeclareOptions)

WithDeclareBindings sets the bindings that should be declared before other RabbitMQ actions are executed. If one of the bindings already exists on the server, only its settings will be validated. Matching settings will result in no action; different settings will result in an error. If the 'Passive' property is set to false, missing bindings will be created on the server.

func WithDeclareBindingsForRoutingKeys

func WithDeclareBindingsForRoutingKeys(routingKeys []string) func(*DeclareOptions)

WithDeclareBindingsForRoutingKeys sets the bindings that should be declared before other RabbitMQ actions are executed. This function must be called after the queue and exchange declaration settings have been set, otherwise it has no effect.

func WithDeclareExchange

func WithDeclareExchange(settings *ExchangeOptions) func(*DeclareOptions)

WithDeclareExchange sets the exchange that should be declared before other RabbitMQ actions are executed. If the exchange already exists on the server, only its settings will be validated. Matching settings will result in no action; different settings will result in an error. If the 'Passive' property is set to false, a missing exchange will be created on the server.

func WithDeclareExchangeArgs

func WithDeclareExchangeArgs(args Table) func(*DeclareOptions)

WithDeclareExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange.

func WithDeclareExchangeAutoDelete

func WithDeclareExchangeAutoDelete(options *DeclareOptions)

WithDeclareExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag.

func WithDeclareExchangeDurable

func WithDeclareExchangeDurable(options *DeclareOptions)

WithDeclareExchangeDurable returns a function that sets the binding exchange durable flag.

func WithDeclareExchangeInternal

func WithDeclareExchangeInternal(options *DeclareOptions)

WithDeclareExchangeInternal returns a function that sets the binding exchange internal flag.

func WithDeclareExchangeKind

func WithDeclareExchangeKind(kind string) func(*DeclareOptions)

WithDeclareExchangeKind returns a function that sets the binding exchange kind/type.

func WithDeclareExchangeName

func WithDeclareExchangeName(name string) func(*DeclareOptions)

WithDeclareExchangeName returns a function that sets the exchange name.

func WithDeclareExchangeNoDeclare

func WithDeclareExchangeNoDeclare(options *DeclareOptions)

WithDeclareExchangeNoDeclare returns a function that skips the declaration of the binding exchange. Use this setting if the exchange already exists and you don't need to declare it on consumer start.

func WithDeclareExchangeNoWait

func WithDeclareExchangeNoWait(options *DeclareOptions)

WithDeclareExchangeNoWait returns a function that sets the binding exchange noWait flag.

func WithDeclareQueue

func WithDeclareQueue(settings *QueueOptions) func(*DeclareOptions)

WithDeclareQueue sets the queue that should be declared before other RabbitMQ actions are executed. If the queue already exists on the server, only its settings will be validated. Matching settings will result in no action; different settings will result in an error. If the 'Passive' property is set to false, a missing queue will be created on the server.

func WithDeclareQueueArgs

func WithDeclareQueueArgs(args Table) func(*DeclareOptions)

WithDeclareQueueArgs returns a function that sets the queue arguments.

func WithDeclareQueueAutoDelete

func WithDeclareQueueAutoDelete(options *DeclareOptions)

WithDeclareQueueAutoDelete sets the queue to auto delete, which means it will be deleted when there are no more consumers on it.

func WithDeclareQueueDurable

func WithDeclareQueueDurable(options *DeclareOptions)

WithDeclareQueueDurable sets the queue to durable, which means it won't be destroyed when the server restarts. It must only be bound to durable exchanges.

func WithDeclareQueueExclusive

func WithDeclareQueueExclusive(options *DeclareOptions)

WithDeclareQueueExclusive sets the queue to exclusive, which means it is only accessible by the connection that declares it and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.

func WithDeclareQueueName

func WithDeclareQueueName(name string) func(*DeclareOptions)

WithDeclareQueueName returns a function that sets the queue name.

func WithDeclareQueueNoDeclare

func WithDeclareQueueNoDeclare(options *DeclareOptions)

WithDeclareQueueNoDeclare sets the queue to no declare, which means the queue will be assumed to be declared on the server, and thus only will be validated.

func WithDeclareQueueNoWait

func WithDeclareQueueNoWait(options *DeclareOptions)

WithDeclareQueueNoWait sets the queue to nowait, which means the queue will be assumed to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or attempting to modify an existing queue from a different connection.

func WithDeclareQueueQuorum

func WithDeclareQueueQuorum(options *DeclareOptions)

WithDeclareQueueQuorum sets the queue to the quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability.

func WithPublishOptionsAppID

func WithPublishOptionsAppID(appID string) func(*PublishOptions)

WithPublishOptionsAppID returns a function that sets the application id

func WithPublishOptionsContentEncoding

func WithPublishOptionsContentEncoding(contentEncoding string) func(*PublishOptions)

WithPublishOptionsContentEncoding returns a function that sets the content encoding, e.g. "utf-8".

func WithPublishOptionsContentType

func WithPublishOptionsContentType(contentType string) func(*PublishOptions)

WithPublishOptionsContentType returns a function that sets the content type, e.g. "application/json".

func WithPublishOptionsCorrelationID

func WithPublishOptionsCorrelationID(correlationID string) func(*PublishOptions)

WithPublishOptionsCorrelationID returns a function that sets the content correlation identifier

func WithPublishOptionsExchange

func WithPublishOptionsExchange(exchange string) func(*PublishOptions)

WithPublishOptionsExchange returns a function that sets the exchange to publish to

func WithPublishOptionsExpiration

func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions)

WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per the RabbitMQ spec, it must be a string value in milliseconds.
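Since the expiration has to be a string of milliseconds, a small conversion from time.Duration avoids unit mistakes. expirationFromDuration is a hypothetical helper, not part of the package. Its result is the kind of value you would pass to WithPublishOptionsExpiration.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// expirationFromDuration (hypothetical helper) renders a TTL as the
// base-10 millisecond string expected by the Expiration field.
func expirationFromDuration(ttl time.Duration) string {
	return strconv.FormatInt(ttl.Milliseconds(), 10)
}

func main() {
	fmt.Println(expirationFromDuration(30 * time.Second)) // 30000
}
```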

func WithPublishOptionsHeaders

func WithPublishOptionsHeaders(headers Table) func(*PublishOptions)

WithPublishOptionsHeaders returns a function that sets message header values, e.g. "msg-id".

func WithPublishOptionsImmediate

func WithPublishOptionsImmediate(options *PublishOptions)

WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the returns channel for you to handle

func WithPublishOptionsMandatory

func WithPublishOptionsMandatory(options *PublishOptions)

WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle

func WithPublishOptionsMessageID

func WithPublishOptionsMessageID(messageID string) func(*PublishOptions)

WithPublishOptionsMessageID returns a function that sets the message identifier

func WithPublishOptionsPersistentDelivery

func WithPublishOptionsPersistentDelivery(options *PublishOptions)

WithPublishOptionsPersistentDelivery sets the message to persist. Transient messages will not be restored to durable queues; persistent messages will be restored to durable queues and lost on non-durable queues during server restart. By default, publishings are transient.

func WithPublishOptionsPriority

func WithPublishOptionsPriority(priority uint8) func(*PublishOptions)

WithPublishOptionsPriority returns a function that sets the content priority from 0 to 9

func WithPublishOptionsReplyTo

func WithPublishOptionsReplyTo(replyTo string) func(*PublishOptions)

WithPublishOptionsReplyTo returns a function that sets the reply to field

func WithPublishOptionsTimestamp

func WithPublishOptionsTimestamp(timestamp time.Time) func(*PublishOptions)

WithPublishOptionsTimestamp returns a function that sets the timestamp for the message

func WithPublishOptionsType

func WithPublishOptionsType(messageType string) func(*PublishOptions)

WithPublishOptionsType returns a function that sets the message type name

func WithPublishOptionsUserID

func WithPublishOptionsUserID(userID string) func(*PublishOptions)

WithPublishOptionsUserID returns a function that sets the user id, e.g. "user".

func WithPublisherDeclareOptions

func WithPublisherDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*PublisherOptions)

WithPublisherDeclareOptions allows you to set declare options that can be used to set up the queue, exchange or bindings before the publisher process starts.

func WithPublisherOptionsLogger

func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions)

WithPublisherOptionsLogger sets logging to a custom interface. Use WithPublisherOptionsLogging to just log to stdout.

func WithPublisherOptionsLogging

func WithPublisherOptionsLogging(options *PublisherOptions)

WithPublisherOptionsLogging enables logging on the publisher options, using a default logger that writes to stdout.

func WithPublisherOptionsReconnectInterval

func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func(options *PublisherOptions)

WithPublisherOptionsReconnectInterval sets the interval at which the publisher will attempt to reconnect to the rabbit server

Types

type Action

type Action int

Action is the action taken after a delivery has been processed.

const (
	// Ack default ack this msg after you have successfully processed this delivery.
	Ack Action = iota
	// NackDiscard the message will be dropped or delivered to a server configured dead-letter queue.
	NackDiscard
	// NackRequeue deliver this message to a different consumer.
	NackRequeue
)
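One way to choose between the three actions is to map the outcome of processing onto them. The sketch below redeclares Action locally so it runs without the package. errTemporary, errMalformed and actionFor are hypothetical names, and the retry policy is only an example.

```go
package main

import (
	"errors"
	"fmt"
)

// Action mirrors the package's Action type for this sketch.
type Action int

const (
	Ack Action = iota
	NackDiscard
	NackRequeue
)

// Hypothetical sentinel errors used only for this illustration.
var (
	errTemporary = errors.New("temporary failure")
	errMalformed = errors.New("malformed payload")
)

// actionFor maps the result of processing a message to an Action:
// success is acked, retryable failures are requeued, everything else
// is discarded (or dead-lettered, if the server is configured for it).
func actionFor(err error) Action {
	switch {
	case err == nil:
		return Ack
	case errors.Is(err, errTemporary):
		return NackRequeue
	default:
		return NackDiscard
	}
}

func main() {
	fmt.Println(actionFor(nil), actionFor(errTemporary), actionFor(errMalformed)) // 0 2 1
}
```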

type Binding

type Binding struct {
	BindingOption
	QueueName    string
	ExchangeName string
	RoutingKey   string
}

Binding describes a queue binding to a specific exchange.

type BindingOption

type BindingOption struct {
	NoWait bool
	Args   Table
}

BindingOption is used to configure queue bindings.

type Config

type Config amqp.Config

Config wraps amqp.Config. Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.

type Confirmation

type Confirmation struct {
	amqp.Confirmation
	ReconnectionCount int
}

Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. Use NotifyPublish to consume these events. ReconnectionCount is useful in that each time it increments, the DeliveryTag is reset to 0, meaning you can use the combination of ReconnectionCount and DeliveryTag to ensure uniqueness.
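A composite map key is one straightforward way to use that pair for uniqueness. The sketch below is independent of the package: confirmationKey and tracker are hypothetical types, and the two fields simply mirror ReconnectionCount and the embedded DeliveryTag.

```go
package main

import "fmt"

// confirmationKey identifies a publish confirmation across reconnects.
// The delivery tag restarts after each reconnect, so both fields are needed.
type confirmationKey struct {
	ReconnectionCount int
	DeliveryTag       uint64
}

// tracker records which confirmations have already been seen.
type tracker struct {
	seen map[confirmationKey]bool
}

func newTracker() *tracker {
	return &tracker{seen: make(map[confirmationKey]bool)}
}

// markSeen returns true the first time a (reconnection, tag) pair is observed
// and false for any repeat of the same pair.
func (t *tracker) markSeen(reconnectionCount int, deliveryTag uint64) bool {
	key := confirmationKey{ReconnectionCount: reconnectionCount, DeliveryTag: deliveryTag}
	if t.seen[key] {
		return false
	}
	t.seen[key] = true
	return true
}

func main() {
	t := newTracker()
	fmt.Println(t.markSeen(0, 1)) // true
	fmt.Println(t.markSeen(0, 1)) // false: same pair seen twice
	fmt.Println(t.markSeen(1, 1)) // true: same tag, but after a reconnect
}
```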

type ConsumeOptions

type ConsumeOptions struct {
	DeclareOptions
	QueueName         string
	Concurrency       int
	QOSPrefetch       int
	QOSGlobal         bool
	ConsumerName      string
	ConsumerAutoAck   bool
	ConsumerExclusive bool
	ConsumerNoWait    bool
	ConsumerNoLocal   bool
	ConsumerArgs      Table
}

ConsumeOptions are used to describe how a new consumer will be created.
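The With... functions in this package all follow the functional options pattern: each one returns (or is) a function that mutates an options struct. The sketch below shows the mechanism with a trimmed-down stand-in struct. The lower-case names and the default values are assumptions for illustration, not the package's actual defaults.

```go
package main

import "fmt"

// consumeOptions is a trimmed-down stand-in for ConsumeOptions.
type consumeOptions struct {
	Concurrency  int
	QOSPrefetch  int
	ConsumerName string
}

// withConcurrency returns an option function that sets Concurrency.
func withConcurrency(n int) func(*consumeOptions) {
	return func(o *consumeOptions) { o.Concurrency = n }
}

// withConsumerName returns an option function that sets ConsumerName.
func withConsumerName(name string) func(*consumeOptions) {
	return func(o *consumeOptions) { o.ConsumerName = name }
}

// buildOptions starts from defaults (assumed here) and applies each option
// function in order, so later options override earlier ones.
func buildOptions(optionFuncs ...func(*consumeOptions)) consumeOptions {
	opts := consumeOptions{Concurrency: 1}
	for _, apply := range optionFuncs {
		apply(&opts)
	}
	return opts
}

func main() {
	opts := buildOptions(withConcurrency(10), withConsumerName("worker"))
	fmt.Printf("%+v\n", opts)
}
```

Options that take no argument, such as WithConsumeOptionsQOSGlobal, already have the func(*ConsumeOptions) signature, which is why they are passed without parentheses.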

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer allows you to create and connect to queues for data consumption.

func NewConsumer

func NewConsumer(url string, config Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error)

NewConsumer returns a new Consumer connected to the given rabbitmq server

func (Consumer) Close

func (consumer Consumer) Close() error

Close cleans up resources and closes the consumer. The consumer is not safe for reuse

func (Consumer) StartConsuming

func (consumer Consumer) StartConsuming(
	handler Handler,
	queue string,
	optionFuncs ...func(*ConsumeOptions),
) error

StartConsuming starts n goroutines, where n is ConsumeOptions.Concurrency. Each goroutine spawns a handler that consumes off of the given queue, which binds to the routing key(s). The provided handler is called once for each message. If the provided queue doesn't exist, it will be created on the cluster.

type ConsumerOptions

type ConsumerOptions struct {
	Logger            Logger
	ReconnectInterval time.Duration
}

ConsumerOptions are used to describe a consumer's configuration. Logger specifies a custom Logger interface implementation.

type DeclareOptions

type DeclareOptions struct {
	Queue    *QueueOptions
	Exchange *ExchangeOptions
	Bindings []Binding
}

DeclareOptions are used to describe how new queues, exchanges and the routing setup should look.

func (*DeclareOptions) SetBindings

func (o *DeclareOptions) SetBindings(routingKeys []string, opt BindingOption)

SetBindings tries to generate bindings for the given routing keys and the queue and exchange options. If either Queue or Exchange properties are empty or no queue name is specified, no bindings will be set.
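The described behaviour explains why binding options must come after the queue and exchange options. A rough sketch of it, using local stand-in types rather than the package's own, might look like this. bindingsFor and the lower-case types are hypothetical and only model the documented rule.

```go
package main

import "fmt"

// Local stand-ins for QueueOptions, ExchangeOptions and Binding.
type queueOptions struct{ Name string }
type exchangeOptions struct{ Name string }

type binding struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
}

// bindingsFor builds one binding per routing key. It returns nil when the
// queue or exchange is missing or the queue has no name, following the
// rule described for SetBindings.
func bindingsFor(q *queueOptions, e *exchangeOptions, routingKeys []string) []binding {
	if q == nil || e == nil || q.Name == "" {
		return nil
	}
	bindings := make([]binding, 0, len(routingKeys))
	for _, key := range routingKeys {
		bindings = append(bindings, binding{
			QueueName:    q.Name,
			ExchangeName: e.Name,
			RoutingKey:   key,
		})
	}
	return bindings
}

func main() {
	q := &queueOptions{Name: "my_queue"}
	e := &exchangeOptions{Name: "events"}
	for _, b := range bindingsFor(q, e, []string{"routing_key", "routing_key_2"}) {
		fmt.Printf("%s -> %s (%s)\n", b.ExchangeName, b.QueueName, b.RoutingKey)
	}
	// Without a queue there is nothing to bind.
	fmt.Println(len(bindingsFor(nil, e, []string{"routing_key"}))) // 0
}
```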

type Delivery

type Delivery struct {
	amqp.Delivery
}

Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Channel.Consume or Channel.Get.

type ExchangeOptions

type ExchangeOptions struct {
	Name       string
	Kind       string // possible values: empty string for default exchange or direct, topic, fanout
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Passive    bool // if false, a missing exchange will be created on the server
	Args       Table
}

ExchangeOptions are used to configure an exchange. If the Passive flag is set the client will only check if the exchange exists on the server and that the settings match, no creation attempt will be made.

type Handler

type Handler func(d Delivery) (action Action)

Handler defines the handler for each Delivery and returns an Action.

type Logger

type Logger interface {
	Fatalf(string, ...interface{})
	Errorf(string, ...interface{})
	Warnf(string, ...interface{})
	Infof(string, ...interface{})
	Debugf(string, ...interface{})
	Tracef(string, ...interface{})
}

Logger is the interface to send logs to. It can be set using WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
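A custom Logger only needs the six methods listed above. As a minimal sketch, the adapter below wraps the standard library's *log.Logger and prefixes each line with its level. The interface is redeclared locally so the example runs on its own. stdLogger and captureWarn are hypothetical names, and Fatalf deliberately does not exit here, which a real implementation may want to change.

```go
package main

import (
	"bytes"
	"log"
	"os"
)

// Logger is copied from the package documentation so the sketch is self-contained.
type Logger interface {
	Fatalf(string, ...interface{})
	Errorf(string, ...interface{})
	Warnf(string, ...interface{})
	Infof(string, ...interface{})
	Debugf(string, ...interface{})
	Tracef(string, ...interface{})
}

// stdLogger adapts a *log.Logger to Logger by prefixing the level name.
type stdLogger struct{ l *log.Logger }

func (s stdLogger) Fatalf(format string, v ...interface{}) { s.l.Printf("FATAL: "+format, v...) }
func (s stdLogger) Errorf(format string, v ...interface{}) { s.l.Printf("ERROR: "+format, v...) }
func (s stdLogger) Warnf(format string, v ...interface{})  { s.l.Printf("WARN: "+format, v...) }
func (s stdLogger) Infof(format string, v ...interface{})  { s.l.Printf("INFO: "+format, v...) }
func (s stdLogger) Debugf(format string, v ...interface{}) { s.l.Printf("DEBUG: "+format, v...) }
func (s stdLogger) Tracef(format string, v ...interface{}) { s.l.Printf("TRACE: "+format, v...) }

// captureWarn logs one warning into a buffer and returns the rendered line.
func captureWarn(format string, v ...interface{}) string {
	var buf bytes.Buffer
	var logger Logger = stdLogger{l: log.New(&buf, "", 0)}
	logger.Warnf(format, v...)
	return buf.String()
}

func main() {
	var logger Logger = stdLogger{l: log.New(os.Stdout, "", log.LstdFlags)}
	logger.Infof("connected to %s", "amqp://localhost")
}
```

With the real package, a value like this would be passed through WithConsumerOptionsLogger or WithPublisherOptionsLogger.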

type PublishOptions

type PublishOptions struct {
	Exchange string
	// Mandatory fails to publish if there are no queues
	// bound to the routing key
	Mandatory bool
	// Immediate fails to publish if there are no consumers
	// that can ack bound to the queue on the routing key
	Immediate bool
	// MIME content type
	ContentType string
	// Transient (0 or 1) or Persistent (2)
	DeliveryMode uint8
	// Expiration time in ms that a message will expire from a queue.
	// See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers
	Expiration string
	// MIME content encoding
	ContentEncoding string
	// 0 to 9
	Priority uint8
	// correlation identifier
	CorrelationID string
	// address to reply to (ex: RPC)
	ReplyTo string
	// message identifier
	MessageID string
	// message timestamp
	Timestamp time.Time
	// message type name
	Type string
	// creating user id - ex: "guest"
	UserID string
	// creating application id
	AppID string
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers Table
}

PublishOptions are used to control how data is published

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher allows you to publish messages safely across an open connection

func NewPublisher

func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error)

NewPublisher returns a new publisher with an open channel to the cluster. If you plan to enforce mandatory or immediate publishing, those failures will be reported on the channel of Returns that you should set up a listener on. Flow controls are automatically handled as they are sent from the server, and publishing will fail with an error when the server is requesting a slowdown.

func (Publisher) Close

func (publisher Publisher) Close() error

Close closes the publisher and releases resources. The publisher should be discarded as it's not safe for re-use.

func (*Publisher) NotifyPublish

func (publisher *Publisher) NotifyPublish() <-chan Confirmation

NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option

func (*Publisher) NotifyReturn

func (publisher *Publisher) NotifyReturn() <-chan Return

NotifyReturn registers a listener for basic.return methods. These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.

func (*Publisher) Publish

func (publisher *Publisher) Publish(
	data []byte,
	routingKeys []string,
	optionFuncs ...func(*PublishOptions),
) error

Publish publishes the provided data to the given routing keys over the connection

type PublisherOptions

type PublisherOptions struct {
	DeclareOptions
	Logger            Logger
	ReconnectInterval time.Duration
}

PublisherOptions are used to describe a publisher's configuration. Logger is a custom logging interface.

type QueueOptions

type QueueOptions struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Passive    bool // if false, a missing queue will be created on the server
	Args       Table
}

QueueOptions are used to configure a queue. If the Passive flag is set the client will only check if the queue exists on the server and that the settings match, no creation attempt will be made.

type Return

type Return struct {
	amqp.Return
}

Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.

type Table

type Table map[string]interface{}

Table stores user supplied fields of the following types:

bool
byte
float32
float64
int
int16
int32
int64
nil
string
time.Time
amqp.Decimal
amqp.Table
[]byte
[]interface{} - containing above types

Functions taking a table will immediately fail when the table contains a value of an unsupported type.

The caller must be specific in which precision of integer it wishes to encode.

Use a type assertion when reading values from a table for type conversion.

RabbitMQ expects int32 for integer values.
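The type list above can be checked up front with a type switch, which is also a convenient place to see why integers should be written as int32. This is a sketch under assumptions: Table is redeclared locally, amqp.Decimal is left out to avoid importing the AMQP client, and validate/validateValue are hypothetical helpers, not the library's own validation.

```go
package main

import (
	"fmt"
	"time"
)

// Table is redeclared here so the sketch is self-contained.
type Table map[string]interface{}

// validate reports the first value in t whose type is not in the supported list.
func validate(t Table) error {
	for key, value := range t {
		if err := validateValue(value); err != nil {
			return fmt.Errorf("field %q: %w", key, err)
		}
	}
	return nil
}

// validateValue checks a single value, recursing into nested tables and slices.
func validateValue(v interface{}) error {
	switch x := v.(type) {
	case nil, bool, byte, float32, float64, int, int16, int32, int64, string, time.Time, []byte:
		return nil
	case Table:
		return validate(x)
	case []interface{}:
		for _, item := range x {
			if err := validateValue(item); err != nil {
				return err
			}
		}
		return nil
	default:
		return fmt.Errorf("unsupported type %T", v)
	}
}

func main() {
	// Integer arguments are given as int32, as the note above recommends.
	args := Table{"x-max-length": int32(1000), "x-queue-mode": "lazy"}
	fmt.Println(validate(args)) // <nil>

	// A struct value is not a supported field type.
	fmt.Println(validate(Table{"bad": struct{}{}}) != nil) // true
}
```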

Directories

Path Synopsis
examples
