amqp

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2024 License: MIT Imports: 8 Imported by: 0

README

go-amqp

Overview

go-amqp is an abstraction layer for the rabbitmq original library.

The rabbitmq original library is a very powerful and capable library. However, while using the library, we at delivery-much realized that it was a little too verbose to use.

The go-amqp library aims to facilitate the use of the rabbitmq library, implementing object-oriented interfaces for AMQP elements, making it possible to configure producers and consumers with much less and more readable code.

For instance, let's say you want to define an AMQP exchange and bind a queue to that exchange using the rabbitmq original library. You would probably do something like this:

conn, err := amqp.DialConfig("my-amqp-url", amqp.Config{})
if err != nil {
  return
}

ch, err := conn.Channel()
if err != nil {
  return
}

// define the exchange
err = ch.ExchangeDeclare(
  "my-exchange",
  "topic",
  true,
  false,
  false,
  true,
)
if err != nil {
  return
}

// define the queue
_, err = ch.QueueDeclare(
  "my-queue",
  true,
  false,
  true,
  true,
)
if err != nil {
  return
}

// bind the queue to the exchange
err = ch.QueueBind(
  "my-queue",
  "my-routing-key",
  "my-exchange",
  false,
)
if err != nil {
  return
}

This code is a little messy mainly because:

  • It is hard to understand directly the exchange and queue configuration since the configuration values are unnamed booleans ❌
  • There are way too many steps in the configuration, and those steps seem a little redundant ❌
  • Also, if you wanted to consume messages from that queue, you would need to implement your own message reading loop logic using channels ❌

Using the go-amqp library, you could do the same thing like this:

// start the client
client, err := goamqp.NewClient("my-amqp-url")
if err != nil {
  return
}

// start the exchange
e, err := client.StartExchange("my-exchange", amqp.ExchangeTypeTopic, amqp.ExchangeConfig{
  Durable: true,
  NoWait: true,
})
if err != nil {
  return
}

// define the queue and bind to the exchange using the routing-key
q, err := e.BindQueue("my-queue", "my-routing-key", amqp.QueueBindConfig{
  Durable: true,
  Exclusive: true,
  NoWait: true,
})
if err != nil {
  return
}

// define your consume logic
err := q.Consume(func(context.Context, Delivery) HandleResponse{ ... })
if err != nil {
  return
}

This code is a little better because:

  • The queue and exchange configurations are received in optional configuration objects, so the values are documented and named, making them easier to read ✅
  • The go-amqp library uses object orientation (exchanges and queues are defined as objects), so you can define your queues inside an exchange, which makes your code easier to reuse and understand ✅
  • The library mitigates many of the steps necessary to declare and consume your queues, and implements the message reading loop for you, so the whole process a lot more friendly ✅

This is just a sample of how the go-amqp library was made to make the developers' lives easier. The following topics in the documentation explain the library and its functionalities in much more detail.

Setup

To download go-amqp and add it to your project, just run:

$ go get github.com/delivery-much/go-amqp

And you're good to Go!

Starting your client

Starting your AMQP client using the go-amqp library is pretty easy. All you need to do is use the NewClient function passing your AMQP URL and a configuration.

Ex.:

import (
  goamqp "github.com/delivery-much/go-amqp"
)

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", amqp.Config{})
  if err != nil {
    return
  }
}

The NewClient function receives an AMQP URL and a configuration object. The expected configuration object is the same as the Config object for the rabbitmq original library, enabling the user to add extra configuration to the client connection.

The client represents the starting point for your AMQP configuration. It's the connection between your project and your AMQP server.

From the client you can:

  • Ping the server:
import (
  goamqp "github.com/delivery-much/go-amqp"
)

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  err = cl.Ping()
  if err != nil {
    fmt.Printf("AMQP client offline... %v\n", err)
    return
  }

  fmt.Println("AMQP client online!")
}
  • Close the connection:
import (
  goamqp "github.com/delivery-much/go-amqp"
)

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  err = cl.Close()
  if err != nil {
    fmt.Printf("Failed to close the AMQP client... %v\n", err)
    return
  }

  fmt.Println("AMQP client closed!")
}
  • And you can also create exchanges and queues to consume messages and producers to send messages, but more on that later.

Creating an exchange

After you have created your client, you can use the client's connection to create an exchange, using the StartExchange function.

Ex.:

import (
  goamqp "github.com/delivery-much/go-amqp"
)

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  e, err := client.StartExchange("my-exchange", goamqp.ExchangeTypeTopic, goamqp.ExchangeConfig{
    Durable: true,
    NoWait: true,
  })
  if err != nil {
    return
  }
}

The StartExchange function receives:

  • The exchange name.
  • The exchange type, which you can use one of the ExchangeType enums that go-amqp provides or use a string directly.
  • The exchange configuration object, which contains many available AMQP configuration values. (optional)

It returns the exchange object and an error if anything went wrong.

In the go-amqp library, exchanges are treated as objects. Each exchange instance has its own channel to publish messages. So, in a way, each exchange declaration is an instantiation of a communication channel between your project and the AMQP client.

If you desire multiple channels to handle concurrency, you can instantiate the same exchange more than once to create different channels.

Creating a queue

After you have started your exchange, you can use the exchange to define queues, using the BindQueue function.

Ex.:

import (
  goamqp "github.com/delivery-much/go-amqp"
)

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  e, err := client.StartExchange("my-exchange", goamqp.ExchangeTypeTopic, goamqp.ExchangeConfig{
    Durable: true,
    NoWait: true,
  })
  if err != nil {
    return
  }

  q, err := e.BindQueue("my-queue", "my-routing-key", goamqp.QueueBindConfig{
    Durable: true,
    Exclusive: true,
    NoWait: true,
  })
  if err != nil {
    return
  }
}

The BindQueue function receives:

  • The queue name.
  • The routing-key, which can be sent as an empty string ("") if you do not want to bind the queue using a routing-key.
  • The queue configuration object, which contains various available AMQP configuration values (optional).

go-amqp assumes that when you want to declare a queue, you intend to consume messages from that queue. Therefore, the queue needs to be bound to an exchange.

As a result, when you declare a queue using the go-amqp library, you declare and bind it directly inside an exchange, using the exchange's channel. This streamlined approach greatly simplifies the process required to declare and consume a queue.

As mentioned earlier, each exchange has its channel. If you desire multiple channels to handle concurrency, you can instantiate the same exchange and queues more than once to create different channels.

Consuming a queue

After you declared your queues, consuming messages becomes pretty easy.

All you need to do is use the Consume function to define the function that will be called when a message is received on that queue. But first, you need to define your handler function.

Handler function are functions that can be used to handle the amqp messages that are received on a queue. They have the following contract:

type HandlerFunc func(context.Context, Delivery) HandleResponse

They receive the message handling context and the amqp message Delivery, and return a HandleResponse.

The HandleResponse its an object that is used to tell the go-amqp library if the message handling was successfull or not, and if the message should be requeued or not.

// HandleResponse represents the response when handling a message
type HandleResponse struct {
	// Nack defines if the message should NOT be acknowledged, and should be requeued (default: false)
	Nack bool
	// Err is the error that could have occurred during the message handling (default: nil)
	Err error
}

Every time that a message is received on the specified queue, the go-amqp library will create an new empty context, and call the HandlerFunc provided using the context and the AMQP message. When the function finishes, the library will deal with the response accordingly.

Ex.:

import (
  goamqp "github.com/delivery-much/go-amqp"
)

func myHandlerFunction(ctx context.Context, d goamqp.Delivery) (res goamqp.HandleResponse) {
  fmt.Printf("Received Message: %s\n", d.MessageId)

  return
}

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  e, err := client.StartExchange("my-exchange", goamqp.ExchangeTypeTopic, goamqp.ExchangeConfig{
    Durable: true,
    NoWait: true,
  })
  if err != nil {
    return
  }

  q, err := e.BindQueue("my-queue", "my-routing-key", goamqp.QueueBindConfig{
    Durable: true,
    Exclusive: true,
    NoWait: true,
  })
  if err != nil {
    return
  }

  err = q.Consume(myHandlerFunction, goamqp.ConsumeConfig{
    ConsumerName: "my-consumer",
  })
  if err != nil {
    return
  }
}

The consume function receives:

  • The HandlerFunction.
  • The consume configuration object, which contains various available AMQP configuration values, including the consumer name (optional).

And returns an error if anything goes wrong.

Pre and post handle functions

The primary objective of the go-amqp library is to enhance the clarity and cleanliness of your AMQP code. To achieve this, several features have been integrated to empower developers in the effective handling of AMQP messages.

One of the core features of the library is the inclusion of Pre and Post handling functions.

Pre-handle functions are designed to execute before the main HandlerFunction is invoked. They have the following contract:

type PreHandleFunc func(*context.Context, *Delivery)

Similar to the HandlerFunction, PreHandleFuncs receive the message handling context and the message Delivery. However, they receive a pointer to those values, so the developer can alter the message handling context and even the message itself before handling it.

Post-handle functions are designed to execute after the main HandlerFunction is invoked. They have the following contract:

type PostHandleFunc func(context.Context, Delivery, HandleResponse)

Similar to the HandlerFunction, PostHandleFuncs receive the message handling context and the message Delivery. However, they also receive the HandleResponse that the main HandlerFunction returned.

Both the exchanges and queues have the Before and After functions, that can be used to define PostHandleFuncs and PreHandleFuncs to be called in the exchange or queue context.

When you define Pre or Post handle functions for an exchange, every queue associated with that exchange instance will invoke these functions.

On the other hand, when you define Pre or Post handle functions for a specific queue, only that particular queue will execute these designated functions.

Ex.:

import (
  "fmt"
  "context"
  goamqp "github.com/delivery-much/go-amqp"
)

func myHandlerFunction(ctx context.Context, d goamqp.Delivery) (res goamqp.HandleResponse) {
  fmt.Printf("Received Message: %s\n", d.MessageId)

  return
}

func preHandleFunction(ctx *context.Context, d *goamqp.Delivery) {
  *ctx = context.WithValue(*ctx, "id", "my handling id")
}

func postHandleFunction(ctx context.Context, d Delivery, res HandleResponse) {
  if res.Err != nil {
    fmt.Printf("Message with id %s failed to consume, %v\n", d.MessageId, res.Err)
  }
}

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  e, err := client.StartExchange("my-exchange", goamqp.ExchangeTypeTopic, goamqp.ExchangeConfig{
    Durable: true,
    NoWait: true,
  })
  if err != nil {
    return
  }

  e.Before(preHandleFunction) // preHandleFunction will be called before all the messages received on the exchange

  q, err := e.BindQueue("my-queue", "my-routing-key", goamqp.QueueBindConfig{
    Durable: true,
    Exclusive: true,
    NoWait: true,
  })
  if err != nil {
    return
  }

  q.After(postHandleFunction) // postHandleFunction will be called only after the messages received on this queue

  err = q.Consume(myHandlerFunction, goamqp.ConsumeConfig{
    ConsumerName: "my-consumer",
  })
  if err != nil {
    return
  }
}

Creating a message publisher

Creating a message publisher using the go-amqp library is really easy.

This library assumes that, when you want to publish a message, all you need is the exchange name (and a routing-key, if necessary). So using this line of thought, the process to create a publisher is really simplified.

After you have started your client, you can use the CreatePublisher function to start a new publisher.

Ex.:

import (
  goamqp "github.com/delivery-much/go-amqp"
)

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  pub, err := cl.CreatePublisher("my-exchange-name", false)
  if err != nil {
    return
  }
}

To create a publisher, all you need to do is specify the name of the exchange to which you want to publish messages. Every publisher instance will establish a new channel between your project and the AMQP server for message publication.

Additionally, there is an optional boolean value called NoWait that you can specify to manage message synchrony.

When you publish a message on the AMQP server, the client usually doesn't wait for a confirmation that the publishing was successful.

By setting the NoWait flag to false, the publishing channel will be created in confirmation mode, meaning the publisher will be able to wait for a confirmation from the server after a message is published.

On the other hand, when the NoWait flag is set to true, the publishing channel will not be able to wait for a confirmation from the server.

The default value for the NoWait flag is false.

Please note that not all AMQP servers support confirmation mode, so you may need to set the NoWait flag to true.

Publishing messages

Publish function

After you have created your publisher, you can use the Publish function to publish messages on the AMQP server.

Ex.:

import (
  goamqp "github.com/delivery-much/go-amqp"
)

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  pub, err := cl.CreatePublisher("my-exchange-name", false)
  if err != nil {
    return
  }

  body := []byte("my message!")

  err = pub.Publish(body, "my-routing-key", goamqp.PublishConfig{
    WaitConfirmation: true,
    MessageId: "my message's id"
  })
  if err != nil {
    return
  }
}

The publishing action receives the following parameters:

  • The message's body, wich must be a valid string in []bytesformat.
  • The routing key used to route the message. If no routing key is necessary, you can set this value as "".
  • An optional publish configuration, that has all the fields in the RabbitMQ original library Publishing struct. Also, in this configuration, you can use the WaitConfirmation flag to make the message publishing process wait for a confirmation from the server if your publisher was created in confirmation mode.

PublishJSON function

Same as the Publish function, but it encodes the received body as a JSON string before publishing. So the body needs to be a valid JSON representation (i.e.: A json string, a map[string]any value, or a struct with json tags)

Ex.:

import (
  goamqp "github.com/delivery-much/go-amqp"
)

type msgPayload struct {
  Field1 string `json:"field1"`
  Field2 string `json:"field2"`
}

func main() {
  cl, err := goamqp.NewClient("my-amqp-url", goamqp.Config{})
  if err != nil {
    return
  }

  pub, err := cl.CreatePublisher("my-exchange-name", false)
  if err != nil {
    return
  }

  body := msgPayload{
    Field1: "value1",
    Field2: "value2",
  }

  err = pub.Publish(body, "my-routing-key", goamqp.PublishConfig{
    WaitConfirmation: true,
    MessageId: "my message's id",
  })
  if err != nil {
    return
  }
}

Documentation

Index

Constants

View Source
const (
	// A direct exchange routes messages to queues based on a specified routing key.
	// When a message is published to a direct exchange with a particular routing key,
	// it will be delivered to the queue(s) that are bound to the same exchange with a matching routing key.
	ExchangeTypeDirect = ExchangeType("direct")

	// A fanout exchange broadcasts messages to all queues that are bound to it,
	// regardless of routing keys. It is a simple publish/subscribe mechanism where
	// all queues receive a copy of each message sent to the exchange.
	ExchangeTypeFanout = ExchangeType("fanout")

	// A topic exchange is more flexible than direct exchanges.
	// It routes messages to queues based on wildcard patterns in the routing keys.
	// Routing keys can include wildcards like '*' (matches one word) and '#' (matches zero or more words),
	// allowing for complex message routing based on patterns.
	ExchangeTypeTopic = ExchangeType("topic")

	// Headers exchanges use message header attributes to determine message routing, rather than routing keys.
	// The exchange will match headers against predefined criteria to determine which queues should receive the message.
	ExchangeTypeHeaders = ExchangeType("headers")
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Authentication added in v1.1.0

type Authentication interface {
	Mechanism() string
	Response() string
}

Authentication interface provides a means for different SASL authentication mechanisms to be used during connection tuning.

type Client added in v1.1.0

type Client interface {
	// Close closes the AMQP connection
	Close() error
	// Ping checks if the AMQP connection is active
	Ping() error

	// StartExchange starts a AMQP exchange with its own channel and returns the exchange as an entity
	StartExchange(exchangeName string, exchangeType ExchangeType, conf ...ExchangeConfig) (Exchange, error)

	// CreatePublisher creates a new publisher with its own channel to publish messages on an exchange, given the exchange name.
	//
	// When the optional NoWait flag is set to true, the publisher will not be created in confirmation mode.
	// This means that when a message is published using this publisher, the library will not wait for confirmation a from the server.
	//
	// The NoWait flag should be used when your server does not support publishers in confirmation mode, or when you specifically want the publisher to be asynchronous.
	CreatePublisher(exchangeName string, NoWait ...bool) (Publisher, error)
}

Client represents a Client connection, and contains functions to manage the AMQP client

func NewClient

func NewClient(URL string, conf ...Config) (c Client, err error)

NewClient connects to the AMQP server using the provided configuration, and returns the AMQP Client.

type Config

type Config struct {
	// The SASL mechanisms to try in the client request, and the successful
	// mechanism used on the Connection object.
	// If SASL is nil, PlainAuth from the URL is used.
	SASL []Authentication

	// Vhost specifies the namespace of permissions, exchanges, queues and
	// bindings on the server.  Dial sets this to the path parsed from the URL.
	Vhost string

	ChannelMax int           // 0 max channels means 2^16 - 1
	FrameSize  int           // 0 max bytes means unlimited
	Heartbeat  time.Duration // less than 1s uses the server's interval

	// TLSClientConfig specifies the client configuration of the TLS connection
	// when establishing a tls transport.
	// If the URL uses an amqps scheme, then an empty tls.Config with the
	// ServerName from the URL is used.
	TLSClientConfig *tls.Config

	// Properties is table of properties that the client advertises to the server.
	// This is an optional setting - if the application does not set this,
	// the underlying library will use a generic set of client properties.
	Properties Table

	// Connection locale that we expect to always be en_US
	// Even though servers must return it as per the AMQP 0-9-1 spec,
	// we are not aware of it being used other than to satisfy the spec requirements
	Locale string

	// Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig,
	// then an AMQP connection handshake.
	// If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
	// used during TLS and AMQP handshaking.
	Dial func(network, addr string) (net.Conn, error)
}

Config represents the optional configuration that the user can provide when connecting to the AMQP client

type ConnectedStruct added in v1.1.0

type ConnectedStruct interface {
	// OnClose defines the function to execute when the AMQP channel for this struct is closed in any way.
	//
	// OnClose will start a new goroutine that listens to this connection 'closed' events.
	OnClose(f func(err *amqp.Error))
}

ConnectedStruct represents a struct that's connected to an AMQP channel

type ConsumeConfig

type ConsumeConfig struct {
	// When AutoAck is set to true, it means that as soon as a message is delivered to the consumer,
	// RabbitMQ automatically considers the message as acknowledged (ack) without the consumer having to send an explicit acknowledgment.
	//
	// default: false
	AutoAck bool

	// When a consumer is declared as Exclusive,
	// it means that no other consumer can access the same queue on the same channel/connection.
	// Exclusive consumers are often used for scenarios where you want to ensure that
	// only one consumer processes messages from a specific queue.
	//
	// default: false
	Exclusive bool

	// The NoLocal parameter, when set to true,
	// prevents a consumer from receiving messages that it publishes to the same connection.
	// In other words, it prevents consumers from consuming their own messages.
	//
	// default: false
	NoLocal bool

	// When NoWait is set to True,
	// it means that the method will not wait for a response from the server to confirm the consumption.
	// In other words, it makes the declaration non-blocking.
	// If any error occurs during the declaration, it won't be reported immediately.
	//
	// default: false
	NoWait bool

	// The name for the queue consumer.
	// When a consumer name is not provided, the library will generate one based on the queue information.
	ConsumerName string

	// When declaring an consumer in AMQP, you can include a set of optional arguments to customize its behavior
	// These arguments are provided as a collection of key-value pairs, where the keys represent specific configuration options,
	// and the values determine the settings for those options.
	Args Table
}

ConsumeConfig represents the configuration that can be provided when consuming a queue

type Delivery

type Delivery amqp.Delivery

Delivery represents a AMQP message that was received

type Exchange

type Exchange interface {
	ConnectedStruct
	// BindQueue declares a new queue on the exchange given a queue config and binds it to the exchange
	BindQueue(queueName, routingKey string, conf ...QueueBindConfig) (Queue, error)

	// Before adds functions that will be called in the exchange before the message handling
	Before(funcs ...PreHandleFunc)

	// After adds functions that will be called in the exchange after the message handling
	After(funcs ...PostHandleFunc)

	// Name returns the exchange name
	Name() string
	// PreHandleFuncs returns the pre handle funcs for the exchange
	PreHandleFuncs() []PreHandleFunc
	// PostHandleFuncs returns the post handle funcs for the exchange
	PostHandleFuncs() []PostHandleFunc
}

Exchange represents a AMQP message exchange

type ExchangeConfig

type ExchangeConfig struct {
	// When a exchange is declared as durable,
	// it means that RabbitMQ will make efforts to ensure that the component survives server restarts or failures.
	// Messages and configuration related to durable components are stored on disk.
	// This is useful when you want to ensure that important data is not lost in case of server failures.
	//
	// default: false
	Durable bool

	// An auto-delete exchange is automatically deleted by RabbitMQ once there are no consumers or bindings left for it.
	// This is often used for temporary components that are only needed for a specific duration or purpose.
	// It's a way to clean up resources automatically when they are no longer needed.
	//
	// default: false
	AutoDelete bool

	// When an exchange is declared as internal,
	// it means that it can't be directly published to by clients.
	// Internal exchanges are used for internal RabbitMQ mechanisms and cannot be used for normal publishing of messages.
	// They are useful for building advanced routing topologies within RabbitMQ.
	//
	// default: false
	Internal bool

	// When you declare a exchange with the "NoWait" option,
	// it means that the method will not wait for a response from the server to confirm the declaration.
	// This can improve declaration speed but comes with the trade-off that you won't receive an immediate response indicating success or failure.
	// It's often used when you want to declare components quickly and are willing to skip the confirmation step.
	//
	// default: false
	NoWait bool

	// When declaring an exchange in AMQP, you can include a set of optional arguments to customize the behavior of the exchange
	// These arguments are provided as a collection of key-value pairs, where the keys represent specific configuration options,
	// and the values determine the settings for those options.
	Args Table
}

ExchangeConfig represents the configuration for a rabbitmq exchange

type ExchangeType

type ExchangeType string

ExchangeType represents a exchange type that defines its behaviour

func (ExchangeType) ToString

func (t ExchangeType) ToString() string

ToString returns the string notation of the exchange type

type HandleResponse

type HandleResponse struct {
	// Nack defines if the message should NOT be acknowledged, and should be requeued (default: false)
	Nack bool
	// Err is the error that could have occurred during the message handling (default: nil)
	Err error
}

HandleResponse represents the response when handling a message

type HandlerFunc

type HandlerFunc func(context.Context, Delivery) HandleResponse

HandlerFunc represents a funcion that handles amqp messages

type PostHandleFunc

type PostHandleFunc func(context.Context, Delivery, HandleResponse)

PostHandleFunc represents a middleware function to be called after handling amqp messages.

It has access to a copy of the message handling context, the message, and the handle response.

type PreHandleFunc

type PreHandleFunc func(*context.Context, *Delivery)

PreHandleFunc represents a middleware function to be called before handling amqp messages.

It can alter the messaging context and even the message itself before anything is done

type PublishConfig

type PublishConfig struct {
	// When you set the mandatory flag to true while publishing a message,
	// it indicates that the message must be routed to at least one queue.
	// If the message cannot be routed to any queue, RabbitMQ will return the message to the publisher.
	// This flag is typically used when you want to ensure that your message is not lost and must be delivered to at least one queue.
	//
	// default: false
	Mandatory bool

	// When you set the immediate flag to true while publishing a message,
	// it indicates that the message should be delivered to a consumer as soon as possible.
	// If there are no available consumers to immediately accept the message, RabbitMQ will return the message to the publisher.
	//
	// default: false
	Imediate bool

	// Message publishing is, by default, an asynchronous event.
	// However, when the WaitConfirmation flag is set to true, and the publisher was created in confirmation mode,
	// the Publish method will wait for the server to return a response confirming that the message was indeed published.
	// It is important to note that this could potentially increase the message publishing time, depending on your server's latency.
	//
	// default: false
	WaitConfirmation bool

	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers Table

	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
	Priority        uint8     // 0 to 9
	CorrelationId   string    // correlation identifier
	ReplyTo         string    // address to to reply to (ex: RPC)
	Expiration      string    // message expiration spec
	MessageId       string    // message identifier
	Timestamp       time.Time // message timestamp
	Type            string    // message type name
	UserId          string    // creating user id - ex: "guest"
	AppId           string    // creating application id
}

PublishConfig represents the configuration to publish a message

type Publisher

type Publisher interface {
	ConnectedStruct
	// Publish publishes a message payload, in bytes format, on the publisher exchange.
	// The user can also provide a routing-key to publish the message and some extra configuration for that message, if needed.
	//
	// It is important to note that the message publishing, by default, is asynchronous.
	// However, you can make it synchronous by setting the WaitConfirmation flag from the PublishConfig as true.
	Publish(payload []byte, key string, conf ...PublishConfig) error

	// PublishJSON encodes the 'payload' param into a json string, and publishes it on the publisher exchange.
	// The user can also provide a routing-key to publish the message and some extra configuration for that message, if needed.
	//
	// It is important to note that the message publishing, by default, is asynchronous.
	// However, you can make it synchronous by setting the WaitConfirmation flag from the PublishConfig as true.
	PublishJSON(payload any, key string, conf ...PublishConfig) error
}

Publisher represents a AMQP message publisher

type Queue

type Queue interface {
	// Consume subscribes a consumer in the routing key to handle the messages.
	//
	// Consume will start a new goroutine that listens to message publishings
	// and handles them with the provided handler function
	Consume(handlerFn HandlerFunc, conf ...ConsumeConfig) error

	// Before adds functions that will be called in the queue before the message handling
	Before(funcs ...PreHandleFunc)

	// After adds functions that will be called in the queue after the message handling
	After(funcs ...PostHandleFunc)

	// Name returns the queue name
	Name() string
	// Exchange returns the Exchange that the queue is on
	Exchange() Exchange
	// RoutingKey returns the queue routing-key that was used to bind to the exchange
	RoutingKey() string
	// PreHandleFuncs returns the pre handle funcs for the queue
	PreHandleFuncs() []PreHandleFunc
	// PostHandleFuncs returns the post handle funcs for the queue
	PostHandleFuncs() []PostHandleFunc
}

Queue represents a AMQP queue

type QueueBindConfig

type QueueBindConfig struct {
	// When a queue is declared as durable,
	// it means that RabbitMQ will make efforts to ensure that the component survives server restarts or failures.
	// Messages and configuration related to durable components are stored on disk.
	// This is useful when you want to ensure that important data is not lost in case of server failures.
	//
	// default: false
	Durable bool

	// An auto-delete queue is automatically deleted by RabbitMQ once there are no consumers or bindings left for it.
	// This is often used for temporary components that are only needed for a specific duration or purpose.
	// It's a way to clean up resources automatically when they are no longer needed.
	//
	// default: false
	AutoDelete bool

	// When you declare a queue as exclusive,
	// it means that the queue can only be accessed by the current connection.
	// The queue will be automatically deleted by RabbitMQ when the connection that declared it is closed.
	// Exclusive queues are often used in scenarios where you want to ensure that a queue is used only by a single consumer or where you want to create a temporary work queue for a particular client or session.
	//
	// When you declare a queue as not exclusive, multiple connections can access it concurrently.
	// The queue will not be automatically deleted when the connection that declared it is closed.
	// It will remain in RabbitMQ until it is explicitly deleted or until the server decides to remove it due to other factors, such as lack of use or expiration.
	// Non-exclusive queues are typically used for more persistent, shared message processing scenarios where multiple consumers may need to access the same queue.
	//
	// default: false
	Exclusive bool

	// When you declare a queue with the "NoWait" option,
	// it means that the method will not wait for a response from the server to confirm the declaration.
	// This can improve declaration speed but comes with the trade-off that you won't receive an immediate response indicating success or failure.
	// It's often used when you want to declare components quickly and are willing to skip the confirmation step.
	//
	// default: false
	NoWait bool

	// When declaring an queue in RabbitMQ, you can include a set of optional arguments to customize its behavior
	// These arguments are provided as a collection of key-value pairs, where the keys represent specific configuration options,
	// and the values determine the settings for those options.
	Args Table
}

QueueConfig represents the configuration for binding a queue to an exchange

type Table

type Table map[string]any

Table represents the amqp extra arguments when executing AMQP actions.

When you do something using AMQP, like declaring an exchanges and queues, or publishing messages, you can include a set of optional arguments to customize its behavior.

These arguments are provided as a collection of key-value pairs, where the keys represent specific configuration options, and the values determine the settings for those options.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL