clarimq

Version: v1.3.1
Published: Feb 8, 2024 License: Apache-2.0 Imports: 12 Imported by: 1

README

INFO

This library is a wrapper around the Go AMQP Client Library.

This library includes support for:

  • structured logging to multiple writers
  • automatic recovery
  • retry functionality
  • publishing cache

Supported Go Versions

This library supports the most recent Go, currently 1.21.1

INSTALL

go get github.com/Clarilab/clarimq

USAGE

The Connection

First, a connection instance needs to be initialized. The connection can be configured by passing the required connection options. It can also be fully customized by passing a ConnectionOptions struct via the WithCustomConnectionOptions option. To ensure correct escaping of the URI, use the SettingsToURI function to convert a ConnectionSettings struct to a valid URI.

Note:

Although it is possible to publish and consume with one connection, it is best practice to use two separate connections for publisher and consumer activities.

Example Connection with some options:
conn, err := clarimq.NewConnection("amqp://user:password@localhost:5672/", 
	clarimq.WithConnectionOptionConnectionName("app-name-connection"),
	// more options can be passed
)
if err != nil {
	// handle error
}
Example Connection with custom options:
connectionSettings := &clarimq.ConnectionSettings{
	UserName: "username",
	Password: "password",
	Host:     "host",
	Port:     5672,
}

connectionOptions := &clarimq.ConnectionOptions{
	Config: &clarimq.Config{
		ChannelMax:      0,
		FrameSize:       0,
		Heartbeat:       0,
		TLSClientConfig: &tls.Config{},
		Properties:      map[string]interface{}{},
		Locale:          "",
	},
	PrefetchCount:    1,
	RecoveryInterval: time.Second, // a time.Duration; a bare 1 would mean 1 nanosecond
}

conn, err := clarimq.NewConnection(clarimq.SettingsToURI(connectionSettings), 
	clarimq.WithCustomConnectionOptions(connectionOptions),
)
if err != nil {
	// handle error
}

When the connection is no longer needed, it should be closed to conserve resources.

Example
if err := conn.Close(); err != nil {
	// handle error
}

Errors

The "NotifyErrors()" method provides a channel that returns any errors that may happen concurrently. Mainly custom errors of types clarimq.AMQPError and clarimq.RecoveryFailedError are returned.

Example
handleErrors := func(errChan <-chan error) {
	for err := range errChan {
		if err == nil {
			return
		}

		var amqpErr *clarimq.AMQPError
		var recoveryFailed *clarimq.RecoveryFailedError

		switch {
		case errors.As(err, &amqpErr):
			fmt.Println(amqpErr) // handle amqp error
 
		case errors.As(err, &recoveryFailed):
			fmt.Println(recoveryFailed) // handle recoveryFailed error

		default:
			panic(err) // handle all other errors
		}
	}
}

conn, err := clarimq.NewConnection(clarimq.SettingsToURI(settings))
if err != nil {
	// handle error
}

go handleErrors(conn.NotifyErrors())

Publish messages

To publish messages a publisher instance needs to be created. A previously created connection must be handed over to the publisher.

The publisher can be configured by passing the required publisher options. It can also be fully customized by passing a PublishOptions struct with the corresponding option.

Example
publisher, err := clarimq.NewPublisher(conn,
	clarimq.WithPublishOptionAppID("my-application"),
	clarimq.WithPublishOptionExchange("my-exchange"),
	// more options can be passed
)
if err != nil {
	// handle error
}

The publisher can then be used to publish messages. The target can be a queue name, or a topic if the publisher is configured to publish messages to an exchange.

Example Simple publish:
if err := publisher.Publish(context.Background(), "my-target", "my-message"); err != nil {
	// handle error
}

Optionally, the PublishWithOptions method can be used to configure the publish options for one specific publish only. The method also makes it possible to publish to multiple targets at once.

Example Publish with options:
if err := publisher.PublishWithOptions(context.Background(), []string{"my-target-1","my-target-2"}, "my-message",
	clarimq.WithPublishOptionMessageID("99819a3a-388f-4199-b7e6-cc580d85a2e5"),
	clarimq.WithPublishOptionTracing("7634e958-1509-479e-9246-5b80ad8fc64c"),
); err != nil {
	// handle error
}

Consume Messages

To consume messages a consumer instance needs to be created. A previously created connection must be handed over to the consumer.

The consumer can be configured by passing the required consume options. It can also be fully customized by passing a ConsumeOptions struct via the WithCustomConsumeOptions option.

Example
consumer, err := clarimq.NewConsumer(conn, "my-queue", handler(),
	clarimq.WithConsumerOptionConsumerName("my-consumer"),
	// more options can be passed
)
if err != nil {
	// handle error
}

The consumer can be used to declare exchanges, queues and queue-bindings:

Example
consumer, err := clarimq.NewConsumer(conn, "my-queue", handler(),
	clarimq.WithConsumerOptionConsumerName("my-consumer"),
	clarimq.WithExchangeOptionDeclare(true),
	clarimq.WithExchangeOptionKind(clarimq.ExchangeTopic),
	clarimq.WithExchangeOptionName("my-exchange"),
	clarimq.WithQueueOptionDeclare(false), // is enabled by default, can be used to disable the default behavior
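	clarimq.WithBindingOptionCustomBinding(
		clarimq.Binding{
			RoutingKey:     "my-routing-key",
			BindingOptions: &clarimq.BindingOptions{Declare: true}, // custom bindings default to Declare=false
		},
	),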
	// more options can be passed
)
if err != nil {
	// handle error
}

The consumer can be closed to stop consuming if needed. It does not need to be closed explicitly for a graceful shutdown if its connection is closed afterwards. However, when the retry functionality is used without providing a connection, the consumer must be closed so that the internal retry connection is shut down gracefully and its resources are released.

Example
if err := consumer.Close(); err != nil {
	// handle error
}

Logging:

Structured logging is supported via the Go "log/slog" package. Text or JSON loggers can be specified with the desired log level, and the logs are written to an io.Writer that can also be specified.

Note: Multiple loggers can be specified!

Example
jsonBuff := new(bytes.Buffer)
textBuff := new(bytes.Buffer)

conn, err := clarimq.NewConnection(clarimq.SettingsToURI(connectionSettings),
	clarimq.WithConnectionOptionLoggers(
		slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})),
		slog.New(slog.NewTextHandler(textBuff, &slog.HandlerOptions{Level: slog.LevelWarn})),
		slog.New(slog.NewJSONHandler(jsonBuff, &slog.HandlerOptions{Level: slog.LevelDebug})),
	),
)
if err != nil {
	// handle error
}

Return Handler:

When publishing mandatory messages, they will be returned if it is not possible to route the message to the given destination. A return handler can be specified to handle the return. The return contains the original message together with some information such as an error code and an error code description.

If no return handler is specified a log will be written to the logger at warn level.

Example
returnHandler := func(r clarimq.Return) {
	// handle the return
}

conn, err := clarimq.NewConnection(clarimq.SettingsToURI(connectionSettings),
	clarimq.WithConnectionOptionReturnHandler(
		clarimq.ReturnHandler(returnHandler),
	),
)
if err != nil {
	// handle error
}

Recovery:

This library provides automatic recovery with built-in exponential back-off. When the connection to the broker is lost, the recovery automatically tries to reconnect. The parameters of the back-off algorithm can be adjusted:

Example
conn, err := clarimq.NewConnection(clarimq.SettingsToURI(settings),
	clarimq.WithConnectionOptionRecoveryInterval(2*time.Second), // default is 1 second
	clarimq.WithConnectionOptionBackOffFactor(3),        // default is 2
	clarimq.WithConnectionOptionMaxRecoveryRetries(16), // default is 10
)
if err != nil {
	// handle error
}

If the maximum number of retries is reached, a custom error of type RecoveryFailedError is sent to the error channel.
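As a rough illustration of how the three parameters interact, the sketch below computes a back-off schedule. It assumes the wait starts at the recovery interval and is multiplied by the back-off factor after every attempt; the library's exact schedule may differ, and backOffDelays is a hypothetical helper that is not part of clarimq.

```go
package main

import (
	"fmt"
	"time"
)

// backOffDelays returns the wait before each reconnect attempt, assuming the
// delay starts at interval and is multiplied by factor after every attempt.
// This is an assumed schedule, not the library's implementation.
func backOffDelays(interval time.Duration, factor, maxRetries int) []time.Duration {
	delays := make([]time.Duration, 0, maxRetries)
	delay := interval
	for i := 0; i < maxRetries; i++ {
		delays = append(delays, delay)
		delay *= time.Duration(factor)
	}
	return delays
}

func main() {
	// Interval of 2 seconds, factor 3, at most 4 attempts.
	for attempt, d := range backOffDelays(2*time.Second, 3, 4) {
		fmt.Printf("attempt %d: wait %s\n", attempt+1, d)
	}
}
```

Under this assumption, a larger factor spreads the attempts out quickly, so the maximum number of retries effectively bounds the total time spent trying to reconnect.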

Publishing Cache:

To prevent messages from being lost while the broker is down or the client is recovering, the Publishing Cache can be used to cache the messages and publish them as soon as the client has fully recovered. The cache itself is an interface that can be implemented to suit the user's needs. For example, it could be implemented on top of a Redis store or any other storage of choice.

Note: This feature will only work effectively if durable queues/exchanges are used!

When the Publishing Cache is set and publishing fails because the channel is closed, the "Publish" and "PublishWithOptions" methods return a clarimq.ErrPublishFailedChannelClosedCached error, which can be checked and handled as needed.

When the Publishing Cache is not set and publishing fails because the channel is closed, the "Publish" and "PublishWithOptions" methods return a clarimq.ErrPublishFailedChannelClosed error, which can be checked and handled as needed.

To ensure a clean cache (when using an external cache such as Redis), the publisher should be closed when exiting. This will call the "Flush()" method of the Publishing Cache implementation. This step is optional and is up to the user to decide.

When implementing the publishing cache, it must be properly protected from concurrent access by multiple publisher instances to avoid race conditions.

Hint: The "cache" sub-package provides a simple "in-memory-cache" implementation, that can be used for testing, but could also be used in production.

Example
publisher, err := clarimq.NewPublisher(publishConn,
	clarimq.WithPublisherOptionPublishingCache(cache.NewBasicMemoryCache()),
)
if err != nil {
	// handle error
}

defer func() {
	if err := publisher.Close(); err != nil {
		// handle error
	}
}()

if err = publisher.PublishWithOptions(context.Background(), []string{"my-target"}, "my-message"); err != nil {
	switch {
		case errors.Is(err, clarimq.ErrPublishFailedChannelClosedCached):
			return nil // message has been cached
		case errors.Is(err, clarimq.ErrPublishFailedChannelClosed):
			return err
		default:
			panic(err)
	}
}
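The requirement that a cache implementation be safe for concurrent use can be sketched with a mutex-protected in-memory store. The type and method names below (pending, memoryCache, Put, PopAll, Len) are hypothetical and do not reproduce the library's publishing cache interface; the sketch only shows the locking pattern.

```go
package main

import (
	"fmt"
	"sync"
)

// pending is a hypothetical record of a publish that could not be sent.
type pending struct {
	target string
	body   []byte
}

// memoryCache is a mutex-protected in-memory store. Its method set is
// illustrative only and is not the library's cache interface.
type memoryCache struct {
	mu    sync.Mutex
	items []pending
}

// Put appends a publish to the cache.
func (c *memoryCache) Put(p pending) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = append(c.items, p)
}

// PopAll returns all cached items and empties the cache in one locked step,
// so two publishers cannot republish the same message.
func (c *memoryCache) PopAll() []pending {
	c.mu.Lock()
	defer c.mu.Unlock()
	items := c.items
	c.items = nil
	return items
}

// Len reports the number of cached items.
func (c *memoryCache) Len() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.items)
}

func main() {
	cache := &memoryCache{}

	// Simulate several publishers caching messages at the same time.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			cache.Put(pending{target: "my-target", body: []byte(fmt.Sprint(n))})
		}(i)
	}
	wg.Wait()

	fmt.Println("cached:", cache.Len())
	fmt.Println("republished:", len(cache.PopAll()), "remaining:", cache.Len())
}
```

Reading and clearing under the same lock is the important part: if the two steps were separate, a message cached between them could be dropped.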

Retry:

This library includes a retry functionality with a dead letter exchange and dead letter queues. To use the retry, some parameters have to be set:

Example
consumeConn, err := clarimq.NewConnection(clarimq.SettingsToURI(settings))
if err != nil {
	// handle error
}

retryOptions := &clarimq.RetryOptions{
	RetryConn: publishConn,
	Delays: []time.Duration{
		time.Second,
		time.Second * 2,
		time.Second * 3,
		time.Second * 4,
		time.Second * 5,
	},
	MaxRetries: 5,
	Cleanup:    true, // only set this to true if you want to remove all retry related queues and exchanges when closing the consumer
}

consumer, err := clarimq.NewConsumer(consumeConn, queueName, handler,
	clarimq.WithConsumerOptionDeadLetterRetry(retryOptions),
)
if err != nil {
	// handle error
}

It is recommended to provide a separate publish connection for the retry functionality. If no connection is specified, a separate connection is established internally.

For each given delay a separate dead letter queue is declared. When a delivery is nacked by the consumer, it is republished via the delay queues one after another until it is acknowledged or the specified maximum number of retry attempts is reached.
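A delay queue of this kind is commonly built in RabbitMQ from a per-queue message TTL plus dead-letter arguments: a message waits in the delay queue until it expires and is then dead-lettered back to the original destination. The sketch below builds such queue arguments using the same argument keys as the ArgDLX, ArgDLK and ArgTTL constants. The helper delayQueueArgs and the exchange and queue names are assumptions for illustration; the arguments the library actually declares are not shown in this document.

```go
package main

import (
	"fmt"
	"time"
)

// Argument keys, matching the values of ArgDLX, ArgDLK and ArgTTL.
const (
	argDLX = "x-dead-letter-exchange"
	argDLK = "x-dead-letter-routing-key"
	argTTL = "x-message-ttl"
)

// delayQueueArgs builds arguments for a delay queue: messages wait for delay,
// expire, and are dead-lettered to exchange with routingKey.
// Hypothetical helper, not part of clarimq.
func delayQueueArgs(exchange, routingKey string, delay time.Duration) map[string]any {
	return map[string]any{
		argDLX: exchange,
		argDLK: routingKey,
		argTTL: delay.Milliseconds(), // RabbitMQ expects the TTL in milliseconds
	}
}

func main() {
	delays := []time.Duration{time.Second, 2 * time.Second, 3 * time.Second}

	// One delay queue per configured delay.
	for i, d := range delays {
		args := delayQueueArgs("my-exchange", "my-queue", d)
		fmt.Printf("delay queue %d: ttl=%vms dlx=%v key=%v\n", i+1, args[argTTL], args[argDLX], args[argDLK])
	}
}
```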

External packages

Go AMQP Client Library

Documentation

Constants

const (
	// Constant for RabbitMQ's default exchange (direct exchange).
	ExchangeDefault string = amqp.DefaultExchange
	// Constant for standard AMQP 0-9-1 direct exchange type.
	ExchangeDirect string = amqp.ExchangeDirect
	// Constant for standard AMQP 0-9-1 fanout exchange type.
	ExchangeFanout string = amqp.ExchangeFanout
	// Constant for standard AMQP 0-9-1 topic exchange type.
	ExchangeTopic string = amqp.ExchangeTopic
	// Constant for standard AMQP 0-9-1 headers exchange type.
	ExchangeHeaders string = amqp.ExchangeHeaders
)
const (
	ArgDLX string = "x-dead-letter-exchange"
	ArgDLK string = "x-dead-letter-routing-key"
	ArgTTL string = "x-message-ttl"
)

Variables

var ErrCacheNotSet = fmt.Errorf("publishing cache is not set")
var ErrHealthyConnection = errors.New("connection is healthy, no need to recover")

ErrHealthyConnection occurs when a manual recovery is triggered but the connection persists.

var ErrInvalidConnection = errors.New("invalid connection")

ErrInvalidConnection occurs when an invalid connection is passed to a publisher or a consumer.

var ErrMaxRetriesExceeded = errors.New("max retries exceeded")

ErrMaxRetriesExceeded occurs when the maximum number of retries is exceeded.

var ErrNoActiveConnection = errors.New("no active connection to broker")

ErrNoActiveConnection occurs when there is no active connection while trying to get the failed recovery notification channel.

var ErrPublishFailedChannelClosed = errors.New("channel is closed")

ErrPublishFailedChannelClosed occurs when the channel is accessed while being closed.

var ErrPublishFailedChannelClosedCached = errors.New("channel is closed: publishing was cached")

ErrPublishFailedChannelClosedCached occurs when the channel is accessed while being closed but publishing was cached.

Functions

func SettingsToURI

func SettingsToURI(settings *ConnectionSettings) string

SettingsToURI can be used to convert a ConnectionSettings struct to a valid AMQP URI to ensure correct escaping.
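To see why escaping matters, the sketch below builds an AMQP URI with the standard library's net/url package, which percent-encodes reserved characters in the credentials. buildAMQPURI is a hypothetical helper written for this illustration and is not the implementation of SettingsToURI.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// buildAMQPURI assembles an AMQP URI and escapes the user info.
// Hypothetical helper, not the library's SettingsToURI implementation.
func buildAMQPURI(user, password, host string, port int) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, password),
		Host:   net.JoinHostPort(host, strconv.Itoa(port)),
		Path:   "/",
	}
	return u.String()
}

func main() {
	// The '@' and '/' in the password would break a hand-built URI.
	fmt.Println(buildAMQPURI("user", "p@ss/word", "localhost", 5672))
}
```

Concatenating the same values by hand would produce a URI in which the password's '@' is read as the end of the user info, which is the kind of error escaping avoids.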

Types

type AMQPError added in v0.3.0

type AMQPError amqp.Error

AMQPError is a custom error type that wraps amqp errors.

func (*AMQPError) Error added in v0.3.0

func (e *AMQPError) Error() string

type Action

type Action int

Action is the action taken after a delivery has been processed.

const (
	// Ack acknowledges the message after the delivery has been processed successfully (default).
	Ack Action = iota
	// NackDiscard drops the message or delivers it to a dead-letter queue configured on the broker.
	NackDiscard
	// NackRequeue requeues the message so that it can be delivered to a different consumer.
	NackRequeue
	// Manual leaves message acknowledgement to the user via the msg.Ack() method.
	Manual
)

type Binding

type Binding struct {
	*BindingOptions
	RoutingKey   string
	QueueName    string
	ExchangeName string
}

Binding describes the binding of a queue to a routing key to an exchange.

type BindingOptions

type BindingOptions struct {
	// Are used by plugins and broker-specific features such as message TTL, queue length limit, etc.
	Args Table
	// If true, the client does not wait for a reply method. If the broker could not complete the method it will raise a channel or connection exception.
	NoWait bool
	// If true, the binding will be declared if it does not already exist.
	Declare bool
}

BindingOptions describes the options a binding can have.

type Config

type Config amqp.Config

Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(uri string, options ...ConnectionOption) (*Connection, error)

NewConnection creates a new connection.

Must be closed with the Close() method to conserve resources!

func (*Connection) Close

func (c *Connection) Close() error

Close gracefully closes the connection to the broker.

func (*Connection) DecodeDeliveryBody

func (c *Connection) DecodeDeliveryBody(delivery Delivery, v any) error

DecodeDeliveryBody can be used to decode the body of a delivery into v.

func (*Connection) Name added in v1.2.0

func (c *Connection) Name() string

Name returns the name of the connection if specified, otherwise returns an empty string.

func (*Connection) NotifyErrors added in v0.3.0

func (c *Connection) NotifyErrors() <-chan error

NotifyErrors returns a channel that delivers any errors that happen concurrently.

func (*Connection) Recover added in v0.3.0

func (c *Connection) Recover() error

Recover can be used to manually start the recovery.

func (*Connection) RemoveBinding

func (c *Connection) RemoveBinding(queueName string, routingKey string, exchangeName string, args Table) error

RemoveBinding removes a binding between an exchange and queue matching the key and arguments.

It is possible to pass an empty string for the exchange name, which means the queue is unbound from the default exchange.

func (*Connection) RemoveExchange

func (c *Connection) RemoveExchange(name string, ifUnused bool, noWait bool) error

RemoveExchange removes the named exchange from the broker. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.

When ifUnused is true, the broker will only delete the exchange if it has no queue bindings. If the exchange has queue bindings, the broker does not delete it but closes the channel with an exception instead. Set this to true if you are not the sole owner of the exchange.

When noWait is true, do not wait for a broker confirmation that the exchange has been deleted. Failing to delete the exchange could close the channel. Add a NotifyClose listener to respond to these channel exceptions.

func (*Connection) RemoveQueue

func (c *Connection) RemoveQueue(name string, ifUnused bool, ifEmpty bool, noWait bool) (int, error)

RemoveQueue removes the queue from the broker including all bindings then purges the messages based on broker configuration, returning the number of messages purged.

When ifUnused is true, the queue will not be deleted if there are any consumers on the queue. If there are consumers, an error will be returned and the channel will be closed.

When ifEmpty is true, the queue will not be deleted if there are any messages remaining on the queue. If there are messages, an error will be returned and the channel will be closed.

func (*Connection) Renew added in v1.3.0

func (c *Connection) Renew(uri ...string) error

Renew can be used to establish a new connection. If a new URI is provided, it will be used to renew the connection instead of the current URI.

func (*Connection) SetBackOffFactor added in v1.1.0

func (c *Connection) SetBackOffFactor(factor int)

SetBackOffFactor sets the exponential back-off factor.

Default: 2.

func (*Connection) SetDecoder added in v1.1.0

func (c *Connection) SetDecoder(decoder JSONDecoder)

SetDecoder provides the possibility to set the JSON decoder.

func (*Connection) SetEncoder added in v1.1.0

func (c *Connection) SetEncoder(encoder JSONEncoder)

SetEncoder provides the possibility to set the JSON encoder.

func (*Connection) SetLoggers added in v1.1.0

func (c *Connection) SetLoggers(loggers ...Logger)

SetLoggers provides possibility to add loggers.

func (*Connection) SetMaxRecoveryRetries added in v1.1.0

func (c *Connection) SetMaxRecoveryRetries(maxRetries int)

SetMaxRecoveryRetries sets the limit for maximum retries.

Default: 10.

func (*Connection) SetRecoveryInterval added in v1.1.0

func (c *Connection) SetRecoveryInterval(interval time.Duration)

SetRecoveryInterval sets the recovery interval.

Default: 1s.

func (*Connection) SetReturnHandler added in v1.1.0

func (c *Connection) SetReturnHandler(returnHandler ReturnHandler)

SetReturnHandler provides possibility to add a return handler.

type ConnectionOption

type ConnectionOption func(*ConnectionOptions)

ConnectionOption is an option for a Connection.
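The option type follows the functional options pattern: each option is a function that mutates an options struct, and the constructor applies them in order on top of the defaults. The sketch below shows the pattern with stand-in types; options, option, withName, withBackOffFactor and newOptions are hypothetical names, and the defaults are taken from the values documented on this page.

```go
package main

import (
	"fmt"
	"time"
)

// options is a stand-in for a configuration struct such as ConnectionOptions.
type options struct {
	name             string
	recoveryInterval time.Duration
	backOffFactor    int
}

// option has the same shape as ConnectionOption: a function that mutates the options.
type option func(*options)

// withName sets the connection name.
func withName(name string) option {
	return func(o *options) { o.name = name }
}

// withBackOffFactor overrides the default back-off factor.
func withBackOffFactor(factor int) option {
	return func(o *options) { o.backOffFactor = factor }
}

// newOptions starts from the defaults and applies each option in order,
// so later options win over earlier ones.
func newOptions(opts ...option) *options {
	o := &options{recoveryInterval: time.Second, backOffFactor: 2}
	for _, apply := range opts {
		apply(o)
	}
	return o
}

func main() {
	o := newOptions(withName("app-name-connection"), withBackOffFactor(3))
	fmt.Println(o.name, o.recoveryInterval, o.backOffFactor)
}
```

Because options are applied in order, an option that replaces the whole struct at once would discard anything set by options passed before it.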

func WithConnectionOptionAMQPConfig

func WithConnectionOptionAMQPConfig(config *Config) ConnectionOption

WithConnectionOptionAMQPConfig sets the amqp.Config that will be used to create the connection.

Warning: this will override any values set in the connection config.

func WithConnectionOptionBackOffFactor

func WithConnectionOptionBackOffFactor(factor int) ConnectionOption

WithConnectionOptionBackOffFactor sets the exponential back-off factor.

Default: 2.

func WithConnectionOptionConnectionName

func WithConnectionOptionConnectionName(name string) ConnectionOption

WithConnectionOptionConnectionName sets the name of the connection.

func WithConnectionOptionDecoder

func WithConnectionOptionDecoder(decoder JSONDecoder) ConnectionOption

WithConnectionOptionDecoder sets the decoder that will be used to decode messages.

func WithConnectionOptionEncoder

func WithConnectionOptionEncoder(encoder JSONEncoder) ConnectionOption

WithConnectionOptionEncoder sets the encoder that will be used to encode messages.

func WithConnectionOptionLoggers added in v1.2.0

func WithConnectionOptionLoggers(loggers ...Logger) ConnectionOption

WithConnectionOptionLoggers adds multiple loggers.

func WithConnectionOptionMaxRecoveryRetries added in v0.3.0

func WithConnectionOptionMaxRecoveryRetries(maxRetries int) ConnectionOption

WithConnectionOptionMaxRecoveryRetries sets the limit for maximum retries.

Default: 10.

func WithConnectionOptionPrefetchCount

func WithConnectionOptionPrefetchCount(count int) ConnectionOption

WithConnectionOptionPrefetchCount sets the number of messages that will be prefetched.

func WithConnectionOptionRecoveryInterval added in v0.3.0

func WithConnectionOptionRecoveryInterval(interval time.Duration) ConnectionOption

WithConnectionOptionRecoveryInterval sets the initial recovery interval.

Default: 1s.

func WithConnectionOptionReturnHandler

func WithConnectionOptionReturnHandler(returnHandler ReturnHandler) ConnectionOption

WithConnectionOptionReturnHandler sets a handler that can be used to handle undeliverable publishes.

When a mandatory publish cannot be delivered, it is returned and can be handled by this return handler.

func WithCustomConnectionOptions

func WithCustomConnectionOptions(options *ConnectionOptions) ConnectionOption

WithCustomConnectionOptions sets the connection options.

It can be used to set all connection options at once.

type ConnectionOptions

type ConnectionOptions struct {
	ReturnHandler

	Config *Config

	PrefetchCount      int
	RecoveryInterval   time.Duration
	MaxRecoveryRetries int
	BackOffFactor      int
	// contains filtered or unexported fields
}

ConnectionOptions are used to describe how a new connection will be created.

type ConnectionSettings

type ConnectionSettings struct {
	// UserName contains the username of the broker user.
	UserName string
	// Password contains the password of the broker user.
	Password string
	// Host contains the hostname or ip of the broker.
	Host string
	// Port contains the port number the broker is listening on.
	Port int
}

ConnectionSettings holds settings for a broker connection.

func (*ConnectionSettings) ToURI added in v1.3.0

func (c *ConnectionSettings) ToURI() string

ToURI returns the URI representation of the ConnectionSettings. Includes url escaping for safe usage.

type ConsumeOption

type ConsumeOption func(*ConsumeOptions)

ConsumeOption is an option for a Consumer.

func WithBindingOptionCustomBinding added in v0.2.0

func WithBindingOptionCustomBinding(binding Binding) ConsumeOption

WithBindingOptionCustomBinding adds a new binding to the queue which allows you to set the binding options on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to the zero value. If you want to declare your bindings for example, be sure to set Declare=true.

func WithConsumerOptionConsumerAutoAck

func WithConsumerOptionConsumerAutoAck(autoAck bool) ConsumeOption

WithConsumerOptionConsumerAutoAck sets the auto acknowledge property of the consumer.

Default: false.

func WithConsumerOptionConsumerExclusive

func WithConsumerOptionConsumerExclusive(exclusive bool) ConsumeOption

WithConsumerOptionConsumerExclusive sets the exclusive property of this consumer, which means the broker will ensure that this is the only consumer from this queue. When exclusive is false, the broker will fairly distribute deliveries across multiple consumers.

Default: false.

func WithConsumerOptionConsumerName

func WithConsumerOptionConsumerName(consumerName string) ConsumeOption

WithConsumerOptionConsumerName sets the name of the consumer.

If unset a random name will be given.

func WithConsumerOptionDeadLetterRetry

func WithConsumerOptionDeadLetterRetry(options *RetryOptions) ConsumeOption

WithConsumerOptionDeadLetterRetry enables the dead letter retry.

For each `delay` a dead letter queue will be declared.

After exceeding `maxRetries` the delivery will be dropped.

func WithConsumerOptionHandlerQuantity

func WithConsumerOptionHandlerQuantity(concurrency int) ConsumeOption

WithConsumerOptionHandlerQuantity sets the number of message handlers, that will run concurrently.

func WithConsumerOptionNoWait

func WithConsumerOptionNoWait(noWait bool) ConsumeOption

WithConsumerOptionNoWait sets the no-wait property of this consumer, which means it does not wait for the broker to confirm the request and immediately begins deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.

Default: false.

func WithConsumerOptionRoutingKey

func WithConsumerOptionRoutingKey(routingKey string) ConsumeOption

WithConsumerOptionRoutingKey binds the queue to a routing key with the default binding options.

func WithCustomConsumeOptions

func WithCustomConsumeOptions(options *ConsumeOptions) ConsumeOption

WithCustomConsumeOptions sets the consumer options.

It can be used to set all consumer options at once.

func WithExchangeOptionArgs

func WithExchangeOptionArgs(args Table) ConsumeOption

WithExchangeOptionArgs adds optional args to the exchange.

func WithExchangeOptionAutoDelete

func WithExchangeOptionAutoDelete(autoDelete bool) ConsumeOption

WithExchangeOptionAutoDelete sets whether the exchange is an auto-delete exchange.

Default: false.

func WithExchangeOptionDeclare

func WithExchangeOptionDeclare(declare bool) ConsumeOption

WithExchangeOptionDeclare sets whether the exchange should be declared on startup if it doesn't already exist.

Default: false.

func WithExchangeOptionDurable

func WithExchangeOptionDurable(durable bool) ConsumeOption

WithExchangeOptionDurable sets whether the exchange is a durable exchange.

Default: false.

func WithExchangeOptionInternal

func WithExchangeOptionInternal(internal bool) ConsumeOption

WithExchangeOptionInternal sets whether the exchange is an internal exchange.

Default: false.

func WithExchangeOptionKind

func WithExchangeOptionKind(kind string) ConsumeOption

WithExchangeOptionKind sets the kind (type) of the exchange, for example ExchangeDirect, ExchangeFanout, ExchangeTopic or ExchangeHeaders.

func WithExchangeOptionName

func WithExchangeOptionName(name string) ConsumeOption

WithExchangeOptionName sets the exchange name.

func WithExchangeOptionNoWait

func WithExchangeOptionNoWait(noWait bool) ConsumeOption

WithExchangeOptionNoWait sets whether the exchange is a no-wait exchange.

Default: false.

func WithExchangeOptionPassive

func WithExchangeOptionPassive(passive bool) ConsumeOption

WithExchangeOptionPassive sets whether the exchange is a passive exchange.

Default: false.

func WithQueueOptionArgs

func WithQueueOptionArgs(args Table) ConsumeOption

WithQueueOptionArgs adds optional args to the queue.

func WithQueueOptionAutoDelete

func WithQueueOptionAutoDelete(autoDelete bool) ConsumeOption

WithQueueOptionAutoDelete sets whether the queue is an auto-delete queue.

Default: false.

func WithQueueOptionDeclare

func WithQueueOptionDeclare(declare bool) ConsumeOption

WithQueueOptionDeclare sets whether the queue should be declared upon startup if it doesn't already exist.

Default: true.

func WithQueueOptionDurable

func WithQueueOptionDurable(durable bool) ConsumeOption

WithQueueOptionDurable sets whether the queue is a durable queue.

Default: false.

func WithQueueOptionExclusive

func WithQueueOptionExclusive(exclusive bool) ConsumeOption

WithQueueOptionExclusive sets whether the queue is an exclusive queue.

Default: false.

func WithQueueOptionNoWait

func WithQueueOptionNoWait(noWait bool) ConsumeOption

WithQueueOptionNoWait sets whether the queue is a no-wait queue.

Default: false.

func WithQueueOptionPassive

func WithQueueOptionPassive(passive bool) ConsumeOption

WithQueueOptionPassive sets whether the queue is a passive queue.

Default: false.

func WithQueueOptionPriority

func WithQueueOptionPriority(maxPriority Priority) ConsumeOption

WithQueueOptionPriority declares the queue as a priority queue with the given maximum priority.

func WithQueueOptionQuorum added in v0.2.0

func WithQueueOptionQuorum() ConsumeOption

WithQueueOptionQuorum sets the queue quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability.

type ConsumeOptions

type ConsumeOptions struct {
	ConsumerOptions *ConsumerOptions
	QueueOptions    *QueueOptions
	ExchangeOptions *ExchangeOptions
	RetryOptions    *RetryOptions
	Bindings        []Binding
	// The number of message handlers, that will run concurrently.
	HandlerQuantity int
}

ConsumeOptions are used to describe how a new consumer will be configured.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a consumer for AMQP messages.

func NewConsumer

func NewConsumer(conn *Connection, queueName string, handler HandlerFunc, options ...ConsumeOption) (*Consumer, error)

NewConsumer creates a new Consumer instance. Options can be passed to customize the behavior of the Consumer.

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops consuming messages from the subscribed queue.

When using the dead letter retry with enabled cleanup, the consumer must be closed to perform the cleanup.

type ConsumerOptions

type ConsumerOptions struct {
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Args Table
	// The name of the consumer / consumer-tag.
	Name string
	// Auto client acknowledgment for each message.
	AutoAck bool
	// Ensures that this is the sole consumer from the queue.
	Exclusive bool
	// If true, the client does not wait for a reply method. If the broker could not complete the method it will raise a channel or connection exception.
	NoWait bool
}

ConsumerOptions are used to configure the consumer.

type Delivery

type Delivery struct {
	amqp.Delivery
}

Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the broker to a consumer from Channel.Consume or Channel.Get.

type DeliveryMode

type DeliveryMode uint8

The delivery mode of a message can be either transient or persistent.

const (
	// TransientDelivery indicates that the message should be published as transient message.
	TransientDelivery DeliveryMode = iota + 1
	// PersistentDelivery indicates that the message should be published as persistent message.
	PersistentDelivery
)

type ExchangeOptions

type ExchangeOptions struct {
	// Are used by plugins and broker-specific features such as message TTL, queue length limit, etc.
	Args Table
	// Exchange name.
	Name string
	// Exchange type. Possible values: empty string for default exchange or direct, topic, fanout
	Kind string
	// If true, the exchange survives broker restart.
	Durable bool
	// If true, the exchange is deleted when last queue is unbound from it.
	AutoDelete bool
	// If true, clients cannot publish to this exchange directly. It can only be used with exchange to exchange bindings.
	Internal bool
	// If true, the client does not wait for a reply method. If the broker could not complete the method it will raise a channel or connection exception.
	NoWait bool
	// If false, a missing exchange will be created on the broker.
	Passive bool
	// If true, the exchange will be created only if it does not already exist.
	Declare bool
}

ExchangeOptions are used to configure an exchange. If the Passive flag is set the client will only check if the exchange exists on the broker and that the settings match, no creation attempt will be made.

type HandlerFunc

type HandlerFunc func(d *Delivery) Action

HandlerFunc defines the handler that is called for each Delivery and returns an Action.
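A handler typically decodes the message body and decides how the delivery should be settled. The sketch below uses simplified local stand-ins for Action and Delivery so that it compiles on its own; the constant names Ack and NackDiscard, the Body field, and the order payload are assumptions made for illustration.

```go
package main

import "encoding/json"

// Action, its constants, and Delivery are simplified local stand-ins so the
// sketch compiles on its own; the names used here are assumptions.
type Action int

const (
	Ack Action = iota
	NackDiscard
)

type Delivery struct {
	Body []byte
}

type HandlerFunc func(d *Delivery) Action

// order is a hypothetical payload type.
type order struct {
	ID string `json:"id"`
}

// handleOrder acknowledges well-formed orders and discards everything else.
func handleOrder(d *Delivery) Action {
	var o order
	if err := json.Unmarshal(d.Body, &o); err != nil || o.ID == "" {
		return NackDiscard
	}
	return Ack
}

// Compile-time check that handleOrder has the HandlerFunc signature.
var _ HandlerFunc = handleOrder
```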

type JSONDecoder

type JSONDecoder func(data []byte, v any) error

JSONDecoder parses JSON-encoded data and stores the result in the value pointed to by v. If v is nil or not a pointer, Unmarshal returns an InvalidUnmarshalError.

type JSONEncoder

type JSONEncoder func(v any) ([]byte, error)

JSONEncoder returns the JSON encoding of v.
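Both function types match the signatures of json.Marshal and json.Unmarshal from the standard library, so those functions can be assigned directly; any other codec with the same shape should work as well. A minimal sketch with the two types re-declared locally (roundTrip is a hypothetical helper):

```go
package main

import "encoding/json"

// Local copies of the two function types declared above.
type JSONEncoder func(v any) ([]byte, error)
type JSONDecoder func(data []byte, v any) error

// The standard library functions already have matching signatures.
var (
	encode JSONEncoder = json.Marshal
	decode JSONDecoder = json.Unmarshal
)

// roundTrip encodes a value and decodes it back into a map.
func roundTrip(v any) (map[string]any, error) {
	raw, err := encode(v)
	if err != nil {
		return nil, err
	}
	out := map[string]any{}
	if err := decode(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}
```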

type Logger added in v1.2.0

type Logger interface {
	Debug(msg string, args ...any)
	Error(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
}

Logger is an interface that is used for log messages.
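The four methods have the same signatures as the corresponding methods on *slog.Logger (Go 1.21 and later), so a standard library logger should satisfy the interface without an adapter. The sketch re-declares the interface locally to show this; logTo is a hypothetical helper.

```go
package main

import (
	"bytes"
	"log/slog"
)

// Logger is a local copy of the interface declared above.
type Logger interface {
	Debug(msg string, args ...any)
	Error(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
}

// Compile-time check: *slog.Logger has all four methods.
var _ Logger = (*slog.Logger)(nil)

// logTo writes one Info record through the interface and returns the output.
func logTo(msg string) string {
	var buf bytes.Buffer
	var l Logger = slog.New(slog.NewTextHandler(&buf, nil))
	l.Info(msg, "component", "publisher")
	return buf.String()
}
```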

type Priority

type Priority uint8

Priority of a message can be either no priority, lowest, low, medium, high or highest.

const (
	// NoPriority indicates that the message should be published with no priority.
	NoPriority Priority = iota
	// LowestPriority indicates that the message should be published with lowest priority.
	LowestPriority
	// LowPriority indicates that the message should be published with low priority.
	LowPriority
	// MediumPriority indicates that the message should be published with medium priority.
	MediumPriority
	// HighPriority indicates that the message should be published with high priority.
	HighPriority
	// HighestPriority indicates that the message should be published with highest priority.
	HighestPriority
)

type PublishOptions

type PublishOptions struct {
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers Table
	// Message timestamp.
	Timestamp time.Time
	// Exchange name.
	Exchange string
	// MIME content type.
	ContentType string
	// Expiration time in ms that a message will expire from a queue.
	// See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers
	Expiration string
	// MIME content encoding.
	ContentEncoding string
	// Correlation identifier.
	CorrelationID string
	// Address to reply to (ex: RPC).
	ReplyTo string
	// Message identifier.
	MessageID string
	// Message type name.
	Type string
	// Creating user id - default: "guest".
	UserID string
	// Creating application id.
	AppID string
	// Mandatory fails to publish if there are no queues
	// bound to the routing key.
	Mandatory bool
	// Message priority level from 1 to 5 (0 == no priority).
	Priority Priority
	// Transient (0 or 1) or Persistent (2).
	DeliveryMode DeliveryMode
}

PublishOptions are used to control how data is published.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is a publisher for AMQP messages.

func NewPublisher

func NewPublisher(conn *Connection, options ...PublisherOption) (*Publisher, error)

Creates a new Publisher instance. Options can be passed to customize the behavior of the Publisher.

func (*Publisher) Close added in v0.3.0

func (publisher *Publisher) Close() error

Close closes the Publisher.

When using the publishing cache, the publisher must be closed to clear the cache.

func (*Publisher) Publish

func (publisher *Publisher) Publish(ctx context.Context, target string, data any) error

Publish publishes a message with the publish options configured in the Publisher.

target can be a queue name for direct publishing or a routing key.

func (*Publisher) PublishCachedMessages added in v0.3.0

func (publisher *Publisher) PublishCachedMessages(ctx context.Context, cacheLen int) error

func (*Publisher) PublishWithOptions

func (publisher *Publisher) PublishWithOptions(ctx context.Context, targets []string, data any, options ...PublisherOption) error

PublishWithOptions publishes a message to one or multiple targets.

Targets can be queue names for direct publishing or routing keys.

Options can be passed to override the default options just for this publish.

type PublisherOption added in v0.3.0

type PublisherOption func(*PublisherOptions)

PublisherOption is an option for a Publisher.
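A PublisherOption is a function that mutates a PublisherOptions value, which is the usual functional-options pattern. The sketch below shows how such options compose, using reduced local copies of the types. The constructors withExchange and withContentType, the apply function, and the default content type are assumptions for illustration and do not describe the library's internals.

```go
package main

// Simplified local copies of the option types; only two fields are kept.
type PublishOptions struct {
	Exchange    string
	ContentType string
}

type PublisherOptions struct {
	PublishingOptions *PublishOptions
}

type PublisherOption func(*PublisherOptions)

// withExchange and withContentType are hypothetical stand-ins for the
// WithPublishOption... constructors.
func withExchange(name string) PublisherOption {
	return func(o *PublisherOptions) { o.PublishingOptions.Exchange = name }
}

func withContentType(ct string) PublisherOption {
	return func(o *PublisherOptions) { o.PublishingOptions.ContentType = ct }
}

// apply starts from defaults and lets each option mutate the result in order,
// so later options win. The default used here is an assumption.
func apply(opts ...PublisherOption) *PublisherOptions {
	out := &PublisherOptions{
		PublishingOptions: &PublishOptions{ContentType: "application/json"},
	}
	for _, opt := range opts {
		opt(out)
	}
	return out
}
```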

func WithCustomPublishOptions

func WithCustomPublishOptions(options *PublisherOptions) PublisherOption

WithCustomPublishOptions sets the publish options.

It can be used to set all publisher options at once.

func WithPublishOptionAppID

func WithPublishOptionAppID(appID string) PublisherOption

WithPublishOptionAppID sets the application id.

func WithPublishOptionContentEncoding

func WithPublishOptionContentEncoding(contentEncoding string) PublisherOption

WithPublishOptionContentEncoding sets the content encoding, e.g. "utf-8".

func WithPublishOptionContentType

func WithPublishOptionContentType(contentType string) PublisherOption

WithPublishOptionContentType sets the content type, e.g. "application/json".

func WithPublishOptionDeliveryMode

func WithPublishOptionDeliveryMode(deliveryMode DeliveryMode) PublisherOption

WithPublishOptionDeliveryMode sets the message delivery mode. Transient messages will not be restored to durable queues; persistent messages will be restored to durable queues and lost on non-durable queues during broker restart. By default, publishings are transient.

func WithPublishOptionExchange

func WithPublishOptionExchange(exchange string) PublisherOption

WithPublishOptionExchange sets the exchange to publish to.

func WithPublishOptionExpiration

func WithPublishOptionExpiration(expiration string) PublisherOption

WithPublishOptionExpiration sets the expiry/TTL of a message. As per the RabbitMQ spec, it must be a string value in milliseconds.
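Since the expiration is a string of milliseconds rather than a time.Duration, a small conversion helper can avoid unit mistakes. expirationFromDuration below is a hypothetical helper, not part of the package:

```go
package main

import (
	"strconv"
	"time"
)

// expirationFromDuration is a hypothetical helper that renders a duration as
// the millisecond string expected by the expiration option.
func expirationFromDuration(d time.Duration) string {
	return strconv.FormatInt(d.Milliseconds(), 10)
}
```

The result of expirationFromDuration(30 * time.Second) is "30000", which could then be passed to WithPublishOptionExpiration.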

func WithPublishOptionHeaders

func WithPublishOptionHeaders(headers Table) PublisherOption

WithPublishOptionHeaders sets message header values, e.g. "msg-id".

func WithPublishOptionMandatory

func WithPublishOptionMandatory(mandatory bool) PublisherOption

WithPublishOptionMandatory sets whether the publishing is mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle.

Default: false.

func WithPublishOptionMessageID

func WithPublishOptionMessageID(messageID string) PublisherOption

WithPublishOptionMessageID sets the message identifier.

func WithPublishOptionPriority

func WithPublishOptionPriority(priority Priority) PublisherOption

WithPublishOptionPriority sets the content priority from 0 to 9.

func WithPublishOptionReplyTo

func WithPublishOptionReplyTo(replyTo string) PublisherOption

WithPublishOptionReplyTo sets the reply to field.

func WithPublishOptionTimestamp

func WithPublishOptionTimestamp(timestamp time.Time) PublisherOption

WithPublishOptionTimestamp sets the timestamp for the message.

func WithPublishOptionTracing

func WithPublishOptionTracing(correlationID string) PublisherOption

WithPublishOptionTracing sets the content correlation identifier.

func WithPublishOptionType

func WithPublishOptionType(messageType string) PublisherOption

WithPublishOptionType sets the message type name.

func WithPublishOptionUserID

func WithPublishOptionUserID(userID string) PublisherOption

WithPublishOptionUserID sets the user id e.g. "user".

func WithPublisherOptionPublishingCache added in v0.3.0

func WithPublisherOptionPublishingCache(cache PublishingCache) PublisherOption

WithPublisherOptionPublishingCache enables the publishing cache.

An implementation of the PublishingCache interface must be provided.

type PublisherOptions added in v0.3.0

type PublisherOptions struct {
	// PublishingCache is the publishing cache.
	PublishingCache PublishingCache
	// PublishingOptions are the options for publishing messages.
	PublishingOptions *PublishOptions
}

PublisherOptions are the options for a publisher.

type Publishing added in v0.3.0

type Publishing interface {
	ID() string
	GetTargets() []string
	GetData() any
	GetOptions() *PublishOptions
}

Publishing is an interface for messages that are published to a broker.

type PublishingCache added in v0.3.0

type PublishingCache interface {
	// Put adds a publishing to the cache.
	Put(p Publishing) error
	// PopAll gets all publishings from the cache and removes them.
	PopAll() ([]Publishing, error)
	// Len returns the number of publishings in the cache.
	Len() int
	// Flush removes all publishings from the cache.
	Flush() error
}

PublishingCache is an interface for a cache of messages that could not be published due to a missing broker connection.
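As an illustration of the interface, the following sketch implements a process-local, mutex-guarded cache. The interfaces are re-declared locally (with an empty PublishOptions) so that it compiles on its own; memoryCache and message are hypothetical names. An in-memory cache loses its contents when the process exits, so a persistent store may be preferable in production.

```go
package main

import "sync"

// Local copies of the declarations above; PublishOptions is left empty here.
type PublishOptions struct{}

type Publishing interface {
	ID() string
	GetTargets() []string
	GetData() any
	GetOptions() *PublishOptions
}

type PublishingCache interface {
	Put(p Publishing) error
	PopAll() ([]Publishing, error)
	Len() int
	Flush() error
}

// memoryCache is a hypothetical process-local cache guarded by a mutex.
type memoryCache struct {
	mu    sync.Mutex
	items []Publishing
}

// Compile-time check that memoryCache implements the interface.
var _ PublishingCache = (*memoryCache)(nil)

func (c *memoryCache) Put(p Publishing) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = append(c.items, p)
	return nil
}

// PopAll returns the cached publishings in insertion order and empties the cache.
func (c *memoryCache) PopAll() ([]Publishing, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := c.items
	c.items = nil
	return out, nil
}

func (c *memoryCache) Len() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.items)
}

func (c *memoryCache) Flush() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = nil
	return nil
}

// message is a minimal Publishing used to exercise the cache.
type message struct{ id string }

func (m message) ID() string                  { return m.id }
func (m message) GetTargets() []string        { return []string{"queue"} }
func (m message) GetData() any                { return nil }
func (m message) GetOptions() *PublishOptions { return nil }
```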

type QueueOptions

type QueueOptions struct {
	// Are used by plugins and broker-specific features such as message TTL, queue length limit, etc.
	Args Table

	// If true, the queue will survive a broker restart.
	Durable bool
	// If true, the queue is deleted when last consumer unsubscribes.
	AutoDelete bool
	// If true, the queue is used by only one connection and will be deleted when that connection closes.
	Exclusive bool
	// If true, the client does not wait for a reply method. If the broker could not complete the method it will raise a channel or connection exception.
	NoWait bool
	// If false, a missing queue will be created on the broker.
	Passive bool
	// If true, the queue will be declared if it does not already exist.
	Declare bool
	// contains filtered or unexported fields
}

QueueOptions are used to configure a queue. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception.

type RecoveryFailedError added in v0.3.0

type RecoveryFailedError struct {
	Err            error
	ConnectionName string
}

RecoveryFailedError occurs when the recovery failed after a connection loss.

func (*RecoveryFailedError) Error added in v0.3.0

func (e *RecoveryFailedError) Error() string

Error implements the Error method of the error interface.
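Because Error is defined on the pointer receiver, callers would normally detect this error with errors.As and a *RecoveryFailedError target. The sketch below re-declares the type locally; the message format produced by Error, and the helpers connectionNameOf and wrapped, are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// RecoveryFailedError is re-declared with the two exported fields shown
// above. The message format of Error is an assumption.
type RecoveryFailedError struct {
	Err            error
	ConnectionName string
}

func (e *RecoveryFailedError) Error() string {
	return fmt.Sprintf("recovery failed for connection %q: %v", e.ConnectionName, e.Err)
}

// connectionNameOf returns the connection name if err is, or wraps, a
// *RecoveryFailedError.
func connectionNameOf(err error) (string, bool) {
	var rec *RecoveryFailedError
	if errors.As(err, &rec) {
		return rec.ConnectionName, true
	}
	return "", false
}

// wrapped builds a sample error chain for demonstration.
func wrapped() error {
	inner := &RecoveryFailedError{
		Err:            errors.New("dial timeout"),
		ConnectionName: "publisher",
	}
	return fmt.Errorf("publish: %w", inner)
}
```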

type RetryOptions

type RetryOptions struct {
	// Is used to handle the retries on a separate connection.
	// If not specified, a connection will be created.
	RetryConn *Connection
	// The delays with which a message will be exponentially redelivered.
	Delays []time.Duration
	// The maximum number of times a message will be redelivered.
	MaxRetries int64
	// When enabled, all retry-related queues and exchanges associated with the consumer are removed when the consumer gets closed.
	//
	// Warning: Existing messages on the retry queues will be purged.
	Cleanup bool
	// contains filtered or unexported fields
}

RetryOptions are used to describe how the retry will be configured.
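The Delays field takes an explicit list of durations. One way to produce a doubling schedule is sketched below; exponentialDelays is a hypothetical helper, and how the library combines Delays with MaxRetries is not specified in this section.

```go
package main

import "time"

// exponentialDelays is a hypothetical helper that returns n delays, starting
// at base and doubling at each step, suitable for a Delays field.
func exponentialDelays(base time.Duration, n int) []time.Duration {
	if n <= 0 {
		return nil
	}
	delays := make([]time.Duration, 0, n)
	d := base
	for i := 0; i < n; i++ {
		delays = append(delays, d)
		d *= 2
	}
	return delays
}
```

For example, exponentialDelays(time.Second, 3) yields delays of 1s, 2s and 4s.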

type Return

type Return amqp.Return

Return captures a flattened struct of fields returned by the broker when a publishing is unable to be delivered due to the `mandatory` flag set and no route found.

type ReturnHandler

type ReturnHandler func(Return)
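A ReturnHandler receives each message that the broker returns, for example when the mandatory flag is set and no route is found. The sketch below uses a reduced stand-in for Return with a few fields that also exist on amqp.Return; describe and collecting are hypothetical helpers, and the returned handler is not safe for concurrent use.

```go
package main

import "fmt"

// Return is a reduced stand-in carrying a few fields of amqp.Return.
type Return struct {
	ReplyCode  uint16
	ReplyText  string
	Exchange   string
	RoutingKey string
}

type ReturnHandler func(Return)

// describe formats a returned message for logging.
func describe(r Return) string {
	return fmt.Sprintf("returned %d %s: exchange=%q key=%q",
		r.ReplyCode, r.ReplyText, r.Exchange, r.RoutingKey)
}

// collecting returns a handler that appends a description of each returned
// message to out.
func collecting(out *[]string) ReturnHandler {
	return func(r Return) {
		*out = append(*out, describe(r))
	}
}
```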

type Table

type Table map[string]any

Table stores user supplied fields of the following types:

bool
byte
float32
float64
int
int16
int32
int64
nil
string
time.Time
amqp.Decimal
amqp.Table
[]byte
[]interface{} - containing above types

Functions taking a table will immediately fail when the table contains a value of an unsupported type.

The caller must be specific in which precision of integer it wishes to encode.

Use a type assertion when reading values from a table for type conversion.

RabbitMQ expects int32 for integer values.
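The notes above can be illustrated with a short sketch: integers are written with an explicit width, and values are read back with a type switch. Table is re-declared locally; ttlMillis and queueArgs are hypothetical helpers, and "x-message-ttl" is used as an example RabbitMQ argument.

```go
package main

// Table is a local copy of the type declared above.
type Table map[string]any

// queueArgs builds a table with an explicitly sized integer value.
func queueArgs() Table {
	return Table{"x-message-ttl": int32(60000)}
}

// ttlMillis is a hypothetical helper that reads an integer entry and accepts
// the integer widths a table may carry. It reports false for a missing key
// or a value of any other type.
func ttlMillis(t Table, key string) (int64, bool) {
	switch v := t[key].(type) {
	case int16:
		return int64(v), true
	case int32:
		return int64(v), true
	case int64:
		return v, true
	case int:
		return int64(v), true
	default:
		return 0, false
	}
}
```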
