amqp

package module
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2023 License: MIT Imports: 13 Imported by: 0

README

amqp Go Reference

The AMQP (Advanced Message Queuing Protocol) package in Go is a wrapper for amqp091-go, offering a specific focus on stable and secure connection management. This package provides a high-level interface for interacting with RabbitMQ, emphasizing reliability and safety in connection handling. Key features include automatic reconnection strategies, a simplified API for creating consumers and publishers, and graceful connection closure. By wrapping amqp091-go with stability and safety in mind, this package facilitates robust and secure messaging in Go applications. Explore the documentation to leverage the power of AMQP with confidence in your projects.

Goals

  • Reliable Reconnection Management: Develop a robust reconnection mechanism with a specific connection and channel manager running in the background. Ensure seamless and automatic re-establishment of connections in case of interruptions or failures.

  • Concurrent Safe Message Consumption: Implement concurrent-safe methods for consuming messages, allowing multiple consumers to handle messages concurrently without conflicts. Ensure thread-safety in message consumption to support high-concurrency scenarios.

  • Custom Publish Method with Retry Support: Design a custom publish method to provide flexibility in message publishing. Incorporate retry support in the publish mechanism to automatically retry failed message deliveries, enhancing message reliability.

  • Consumer with Fault Tolerance: Develop a fault-tolerant consumer that can gracefully handle errors and exceptions during message processing. Implement mechanisms to handle and recover from faults, ensuring continuous and reliable message consumption.

  • Auto Encoder (JSON, GOB, ProtoBuf): Automatically encode a message body of any type upon publishing. Then, upon consumption, automatically decode the message body into your variable.

Install

go get -u github.com/Ja7ad/amqp

Example Consumer

package main

import (
	"fmt"
	"github.com/Ja7ad/amqp"
	"github.com/Ja7ad/amqp/types"
	"log"
)

// Person is the example payload that handler decodes for deliveries whose
// routing key is routingKeyPerson. Its fields carry JSON tags ("name", "age").
type Person struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

// Greeting is the example payload that handler decodes for deliveries whose
// routing key is routingKeyGreeting. Its single field carries the JSON tag "msg".
type Greeting struct {
	Msg string `json:"msg"`
}

// Routing keys that handler switches on to choose which payload type to decode.
const (
	routingKeyPerson   = "person"
	routingKeyGreeting = "greeting"
)

// main connects to the broker, builds a consumer bound to the "test" topic
// exchange and "test" queue for the person and greeting routing keys, starts
// it with handler as the message callback, and then blocks forever.
func main() {
	// done is never closed or sent on; receiving from it at the end of main
	// simply keeps the process alive after the consumer has been started.
	done := make(chan struct{})

	b, err := amqp.New("uri")
	if err != nil {
		log.Fatal(err)
	}

	con, err := b.Consumer(
		&types.Exchange{
			Name:       "test",
			Kind:       types.Topic,
			Declare:    true,
			Passive:    false,
			Durable:    true,
			AutoDelete: false,
			Internal:   false,
			NoWait:     false,
			Arguments:  nil,
		},
		&types.Queue{
			Name:       "test",
			Declare:    true,
			Durable:    true,
			Passive:    false,
			Exclusive:  false,
			AutoDelete: false,
			NoWait:     false,
			Arguments:  nil,
		},
		&types.Consumer{
			Name:      "test1",
			AutoAck:   false,
			Exclusive: false,
			NoLocal:   false,
			NoWait:    false,
			Arguments: nil,
		},
		// Bind with the same constants handler switches on, so the bindings
		// and the dispatch cannot drift apart.
		[]*types.RoutingKey{
			{
				Key:     routingKeyGreeting,
				Declare: true,
			},
			{
				Key:     routingKeyPerson,
				Declare: true,
			},
		},
		handler,
		amqp.WithConcurrentConsumer(10),
	)
	// The consumer must not be used if its construction failed; without this
	// check a setup error would be silently dropped before con.Start().
	if err != nil {
		log.Fatal(err)
	}

	if err := con.Start(); err != nil {
		log.Fatal(err)
	}

	<-done
}

// handler is the consumer callback. It selects a payload type from the routing
// key, hands a pointer to that payload to msgFunc to be filled in, prints the
// result, and reports the outcome as a types.Action:
//
//   - types.Reject when the routing key is neither routingKeyPerson nor
//     routingKeyGreeting,
//   - types.NackDiscard when msgFunc returns an error (the error is printed),
//   - types.Ack otherwise.
func handler(routingKey string, msgFunc func(vPtr any) (types.Delivery, error)) types.Action {
	// Unknown routing keys are rejected before msgFunc is ever called.
	if routingKey != routingKeyPerson && routingKey != routingKeyGreeting {
		return types.Reject
	}

	// Choose the decode target that matches the routing key.
	var payload any
	if routingKey == routingKeyPerson {
		payload = new(Person)
	} else {
		payload = new(Greeting)
	}

	delivery, decodeErr := msgFunc(payload)
	if decodeErr != nil {
		fmt.Println(decodeErr)
		return types.NackDiscard
	}

	fmt.Printf("routingKey: %s, msg: %v\n", delivery.RoutingKey, payload)
	return types.Ack
}

Example Publisher

package main

import (
	"fmt"
	"github.com/Ja7ad/amqp"
	"github.com/Ja7ad/amqp/types"
	"log"
	"time"
)

// Person is the example payload that main publishes under routingKeyPerson.
// Its fields carry JSON tags ("name", "age").
type Person struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

// Greeting is the example payload that main publishes under routingKeyGreeting.
// Its single field carries the JSON tag "msg".
type Greeting struct {
	Msg string `json:"msg"`
}

// Routing keys that main passes to Publish for the two example payloads.
const (
	routingKeyPerson   = "person"
	routingKeyGreeting = "greeting"
)

// main connects to the broker, creates a publisher on the "test" topic
// exchange (confirm mode off), publishes one Person and one Greeting as
// persistent messages under their routing keys, and then closes the publisher
// and the connection before exiting.
func main() {
	rb, err := amqp.New("uri")
	if err != nil {
		log.Fatal(err)
	}

	pub, err := rb.Publisher(&types.Exchange{
		Name:       "test",
		Kind:       types.Topic,
		Declare:    true,
		Passive:    false,
		Durable:    true,
		AutoDelete: false,
		Internal:   false,
		NoWait:     false,
		Arguments:  nil,
	}, false)
	if err != nil {
		log.Fatal(err)
	}

	person := &Person{
		Name: "javad",
		Age:  30,
	}

	if err := pub.Publish(false, false, types.Publishing{
		DeliveryMode: types.Persistent,
		Body:         person,
	}, routingKeyPerson); err != nil {
		log.Fatal(err)
	}

	greeting := &Greeting{
		Msg: "foobar",
	}

	if err := pub.Publish(false, false, types.Publishing{
		DeliveryMode: types.Persistent,
		Body:         greeting,
	}, routingKeyGreeting); err != nil {
		log.Fatal(err)
	}

	// The Publish documentation recommends closing the connection before the
	// publishing application terminates, so that pending publishings are not
	// lost when the socket is torn down. Explicit calls are used instead of
	// defer because log.Fatal exits without running deferred functions.
	pub.Close()
	if err := rb.Close(); err != nil {
		log.Fatal(err)
	}
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQP

type AMQP struct {
	// contains filtered or unexported fields
}

func (*AMQP) Close

func (r *AMQP) Close() error

func (*AMQP) Consumer

func (r *AMQP) Consumer(
	exchange *types.Exchange,
	queue *types.Queue,
	consume *types.Consumer,
	routingKeys []*types.RoutingKey,
	messageHandler types.ConsumerHandler,
	options ...ConsumerOptions,
) (Consumer, error)

func (*AMQP) Publisher

func (r *AMQP) Publisher(exchange *types.Exchange, confirmMode bool, options ...PublisherOption) (Publisher, error)

Publisher creates a Publisher for publishing messages to the given exchange.

type Broker

// Broker is the interface returned by New. It creates consumers and
// publishers and closes the underlying connection.
type Broker interface {
	// Consumer creates a new consumer instance for the given exchange, queue,
	// consumer settings and routing keys; messageHandler is the callback that
	// receives messages.
	Consumer(exchange *types.Exchange,
		queue *types.Queue,
		consumer *types.Consumer,
		routingKeys []*types.RoutingKey,
		messageHandler types.ConsumerHandler,
		options ...ConsumerOptions) (Consumer, error)

	// Publisher creates a publisher instance for the given exchange.
	Publisher(exchange *types.Exchange, confirmMode bool, options ...PublisherOption) (Publisher, error)

	// Close closes the RabbitMQ connection.
	Close() error
}

func New

func New(url string, options ...RabbitMQOptions) (Broker, error)

New creates an AMQP broker object for consuming and publishing.

Example
rb, err := New("uri")
if err != nil {
	log.Fatal(err)
}

fmt.Println(rb)
Output:

type Consumer

// Consumer is the interface returned by Broker.Consumer.
type Consumer interface {
	// Start starts the consumer so that it begins consuming messages.
	Start() error
	// Close closes the consumer.
	Close()
}

type ConsumerOptions

type ConsumerOptions func(options *consumerOptions)

func EnableQOSGlobal

func EnableQOSGlobal(enable bool) ConsumerOptions

EnableQOSGlobal sets whether the QOS settings apply to ALL existing and future consumers on all channels on the same connection.

func WithConcurrentConsumer

func WithConcurrentConsumer(concurrent int) ConsumerOptions

WithConcurrentConsumer sets how many goroutines are spawned to run the provided handler on messages.

func WithCustomQOSPrefetch

func WithCustomQOSPrefetch(prefetch int) ConsumerOptions

WithCustomQOSPrefetch sets how many messages are fetched from the server in advance to help with throughput. This doesn't affect the handler; messages are still processed one at a time.

type Publisher

// Publisher is the interface returned by Broker.Publisher for publishing
// messages to an exchange.
type Publisher interface {
	/*
		Publish sends a Publishing from the client to an exchange on the server.

		When you want a single message to be delivered to a single queue, you can
		publish to the default exchange with the routingKey of the queue name.  This is
		because every declared queue gets an implicit route to the default exchange.

		Since publishings are asynchronous, any undeliverable message will get returned
		by the server.  Add a listener with Channel.NotifyReturn to handle any
		undeliverable message when calling publish with either the mandatory or
		immediate parameters as true.

		Publishings can be undeliverable when the mandatory flag is true and no queue is
		bound that matches the routing key, or when the immediate flag is true and no
		consumer on the matched queue is ready to accept the delivery.

		This can return an error when the channel, connection or socket is closed.  The
		error or lack of an error does not indicate whether the server has received this
		publishing.

		It is possible for publishing to not reach the broker if the underlying socket
		is shut down without pending publishing packets being flushed from the kernel
		buffers.  The easy way of making it probable that all publishings reach the
		server is to always call Connection.Close before terminating your publishing
		application.  The way to ensure that all publishings reach the server is to add
		a listener to Channel.NotifyPublish and put the channel in confirm mode with
		Channel.Confirm.  Publishing delivery tags and their corresponding
		confirmations start at 1.  Exit when all publishings are confirmed.

		When Publish does not return an error and the channel is in confirm mode, the
		internal counter for DeliveryTags with the first confirmation starts at 1.

		Note: routingKey represents specific keys in the queue, such as subject or topic.
	*/
	Publish(
		mandatory bool,
		immediate bool,
		msg types.Publishing,
		routingKeys ...string,
	) error

	/*
		PublishWithContext sends a Publishing from the client to an exchange on the
		server, controlled by the parent context.

		When you want a single message to be delivered to a single queue, you can
		publish to the default exchange with the routingKey of the queue name.  This is
		because every declared queue gets an implicit route to the default exchange.

		Since publishings are asynchronous, any undeliverable message will get returned
		by the server.  Add a listener with Channel.NotifyReturn to handle any
		undeliverable message when calling publish with either the mandatory or
		immediate parameters as true.

		Publishings can be undeliverable when the mandatory flag is true and no queue is
		bound that matches the routing key, or when the immediate flag is true and no
		consumer on the matched queue is ready to accept the delivery.

		This can return an error when the channel, connection or socket is closed.  The
		error or lack of an error does not indicate whether the server has received this
		publishing.

		It is possible for publishing to not reach the broker if the underlying socket
		is shut down without pending publishing packets being flushed from the kernel
		buffers.  The easy way of making it probable that all publishings reach the
		server is to always call Connection.Close before terminating your publishing
		application.  The way to ensure that all publishings reach the server is to add
		a listener to Channel.NotifyPublish and put the channel in confirm mode with
		Channel.Confirm.  Publishing delivery tags and their corresponding
		confirmations start at 1.  Exit when all publishings are confirmed.

		When PublishWithContext does not return an error and the channel is in confirm
		mode, the internal counter for DeliveryTags with the first confirmation starts
		at 1.

		Note: routingKey represents specific keys in the queue, such as subject or topic.
	*/
	PublishWithContext(
		ctx context.Context,
		mandatory bool,
		immediate bool,
		msg types.Publishing,
		routingKeys ...string,
	) error

	// PublishWithDeferredConfirmWithContext publishes the provided data to the given routing keys over the connection.
	// If the publisher is in confirm mode (which can be either done by calling `NotifyPublish` with a custom handler
	// or by using `WithPublisherOptionsConfirm`) a publisher confirmation is returned.
	// This confirmation can be used to check if the message was actually published or wait for this to happen.
	// If the publisher is not in confirm mode, the returned confirmation will always be nil.
	//
	// NOTE(review): `WithPublisherOptionsConfirm` is not among the options listed for this package;
	// confirm mode is presumably selected through the confirmMode argument of Broker.Publisher — confirm.
	PublishWithDeferredConfirmWithContext(
		ctx context.Context,
		mandatory bool,
		immediate bool,
		msg types.Publishing,
		routingKeys ...string,
	) (types.PublisherConfirmation, error)

	// PublishWithRetry sends a Publishing from the client to an exchange on the server,
	// controlled by the provided context. It incorporates a retry mechanism, attempting
	// to publish the message multiple times with a configurable delay and maximum number
	// of retries.
	//
	// When you want a single message to be delivered to a specific queue, you can publish
	// to the default exchange with the routingKey set to the queue name. This is because
	// every declared queue gets an implicit route to the default exchange.
	//
	// Since publishings are asynchronous, any undeliverable message will be returned by
	// the server. Add a listener with Channel.NotifyReturn to handle undeliverable
	// messages when calling publish with either the mandatory or immediate parameters as true.
	//
	// Publishings can be undeliverable when the mandatory flag is true and no queue is
	// bound that matches the routing key, or when the immediate flag is true and no
	// consumer on the matched queue is ready to accept the delivery.
	//
	// This function may return an error when the channel, connection, or socket is closed.
	// The error, or lack of an error, does not indicate whether the server has received this
	// publishing.
	//
	// It is possible for publishing to not reach the broker if the underlying socket
	// is shut down without pending publishing packets being flushed from the kernel
	// buffers. To increase the likelihood that all publishings reach the server, it is
	// recommended to always call Connection.Close before terminating your publishing
	// application. Alternatively, add a listener to Channel.NotifyPublish and put the channel
	// in confirm mode with Channel.Confirm. Publishing delivery tags and their corresponding
	// confirmations start at 1. Exit when all publishings are confirmed.
	//
	// When PublishWithRetry does not return an error and the channel is in confirm mode,
	// the internal counter for DeliveryTags with the first confirmation starts at 1.
	//
	// Note: routingKey represents specific keys in the queue, such as subject or topic.
	PublishWithRetry(
		ctx context.Context,
		mandatory bool,
		immediate bool,
		msg types.Publishing,
		config types.PublisherConfig,
		routingKeys ...string,
	) error

	// NotifyReturn registers a listener for basic.return methods.
	// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
	// These notifications are shared across an entire connection, so if you're creating multiple
	// publishers on the same connection keep that in mind.
	NotifyReturn(handler func(r types.Return))

	// NotifyPublish registers a listener for publish confirmations; the ConfirmPublishings option must be set.
	// These notifications are shared across an entire connection, so if you're creating multiple
	// publishers on the same connection keep that in mind.
	//
	// NOTE(review): no option named ConfirmPublishings is listed for this package; this presumably
	// refers to the confirmMode argument of Broker.Publisher — confirm.
	NotifyPublish(handler func(p types.Confirmation))

	// Close closes the publisher.
	// NOTE(review): which resources are released is not visible from this interface — confirm
	// against the implementation.
	Close()
}

type PublisherOption added in v1.2.3

type PublisherOption func(options *publisherOptions)

func WithAutoMessageID added in v1.2.3

func WithAutoMessageID() PublisherOption

WithAutoMessageID sets a UUID on the message when publishing.

func WithAutoTimestamp added in v1.2.3

func WithAutoTimestamp() PublisherOption

WithAutoTimestamp sets the timestamp on the message when publishing.

type RabbitMQOptions

type RabbitMQOptions func(*rabbitMQOptions)

func ReconnectDelay

func ReconnectDelay(delay time.Duration) RabbitMQOptions

func WithCustomAMQPConfig

func WithCustomAMQPConfig(config amqp.Config) RabbitMQOptions

func WithCustomEncoder added in v1.2.1

func WithCustomEncoder(encType types.EncodeType) RabbitMQOptions

WithCustomEncoder change default encoder to another encoder (JSON, GOB, ProtoBuf)

Note: if you change the default encoder for the publisher or the consumer, both must use the same encoder (for encoding and decoding).

func WithCustomLogger added in v1.1.0

func WithCustomLogger(logger logger.Logger) RabbitMQOptions

Directories

Path Synopsis
_example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL