grmq

v1.5.2

This package is not in the latest version of its module.
Published: Feb 13, 2024 License: MIT Imports: 10 Imported by: 0

README

GRMQ

Go Rabbit MQ

What are the typical use cases for a RabbitMQ broker?

  • We create a durable topology (exchanges, queues, bindings).
  • We start consuming queues (commonly in several goroutines with a prefetch count) and use a DLQ to avoid poison messages.
  • If we can't handle a message right now (for instance, some external service is unavailable), we retry it a bit later.
  • If something happens to the connection, we expect to re-establish it and continue working transparently.
  • We shut down gracefully to reduce the probability of message duplication.

All of these common cases are implemented in the package.

A high-level wrapper for amqp091-go, inspired by the http package and cony.

Features

  • re-connection support
  • graceful shutdown support
  • flexible context.Context based api
  • middlewares for publishers and consumers
  • DLQ declaration out of the box
  • flexible retries

Complete Example

type LogObserver struct {
	grmq.NoopObserver
}

func (o LogObserver) ClientError(err error) {
	log.Printf("rmq client error: %v", err)
}

func (o LogObserver) ConsumerError(consumer consumer.Consumer, err error) {
	log.Printf("unexpected consumer error (queue=%s): %v", consumer.Queue, err)
}

func main() {
	url := amqpUrl()

	pub := publisher.New(
		"exchange",
		"test",
		publisher.WithMiddlewares(publisher.PersistentMode()),
	)

	simpleHandler := consumer.HandlerFunc(func(ctx context.Context, delivery *consumer.Delivery) {
		log.Printf("message body: %s, routing key: %s", delivery.Source().Body, delivery.Source().RoutingKey)
		err := delivery.Ack()
		if err != nil {
			panic(err)
		}
	})
	simpleConsumer := consumer.New(
		simpleHandler,
		"queue",
		consumer.WithConcurrency(32),   //default 1
		consumer.WithPrefetchCount(32), //default 1
	)

	retryPolicy := retry.NewPolicy(
		true, //move to dlq after last failed try
		retry.WithDelay(500*time.Millisecond, 1),
		retry.WithDelay(1*time.Second, 1),
		retry.WithDelay(2*time.Second, 1),
	)
	retryHandler := consumer.HandlerFunc(func(ctx context.Context, delivery *consumer.Delivery) {
		log.Printf("message body: %s, queue: %s", delivery.Source().Body, delivery.Source().RoutingKey)
		err := delivery.Retry()
		if err != nil {
			panic(err)
		}
	})
	retryConsumer := consumer.New(
		retryHandler,
		"retryQueue",
		consumer.WithRetryPolicy(retryPolicy),
	)

	cli := grmq.New(
		url,
		grmq.WithPublishers(pub),
		grmq.WithConsumers(simpleConsumer, retryConsumer),
		grmq.WithTopologyBuilding(
			topology.WithQueue("queue", topology.WithDLQ(true)),
			//you MUST declare queue with the same retry policy
			topology.WithQueue("retryQueue", topology.WithRetryPolicy(retryPolicy)),
			topology.WithDirectExchange("exchange"),
			topology.WithBinding("exchange", "queue", "test"),
		),
		grmq.WithReconnectTimeout(3*time.Second), //default 1s
		grmq.WithObserver(LogObserver{}),
	)
	//Run tries to connect,
	//declares the topology,
	//initializes publishers and consumers,
	//and returns the first error that occurred, or nil.
	//Alternatively, use cli.Serve(context.Background()), which is completely non-blocking.
	err := cli.Run(context.Background())
	if err != nil {
		panic(err)
	}

	err = pub.Publish(context.Background(), &amqp091.Publishing{Body: []byte("hello world")})
	if err != nil {
		panic(err)
	}

	//you may use any publisher to send message to any exchange
	err = pub.PublishTo(context.Background(), "", "retryQueue", &amqp091.Publishing{Body: []byte("retry me")})
	if err != nil {
		panic(err)
	}

	time.Sleep(10 * time.Second)

	cli.Shutdown()
}

Retries

This is a fairly new feature, implemented in 1.4.0. Before using it, you should know how it works under the hood. It combines two mechanisms: DLQ + TTL.

Let's say we use the policy below for the queue test:

retryPolicy := retry.NewPolicy(
	true,
	retry.WithDelay(500*time.Millisecond, 1),
	retry.WithDelay(1*time.Second, 1),
	retry.WithDelay(2*time.Second, 1), 
)

This configuration will create

  • exchange with name default-dead-letter
  • 4 extra queues
    • test.DLQ
    • test.retry.500
    • test.retry.1000
    • test.retry.2000
  • each retry queue will have x-message-ttl property equal to its delay
  • each retry queue will have DLX routing to the original queue test
  • consumer.Delivery.Retry() finds a suitable retry queue using the x-death header, publishes the message directly to that queue with confirmation, and then manually acknowledges the delivery
  • if there is no suitable retry option and moveToDlq is true, it moves the message to test.DLQ
  • otherwise, it acks the message

Recommendation: before changing the retry policy for a queue, make sure there are no messages left in its retry queues.

Don't forget to delete the old retry queues afterwards.

State and road map

  • the package is used in production (reconnection works perfectly)
  • more tests need to be written
  • add Go doc comments
  • add support for publishing confirmations to make publishing more reliable

Documentation

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(url string, options ...ClientOption) *Client

func (*Client) Run

func (s *Client) Run(ctx context.Context) error

Run blocks until the first session is successfully established. That means all declarations were applied, all publishers were initialized, and all consumers were started. It returns the first error that occurred while opening the first session, or nil.

func (*Client) Serve added in v1.5.0

func (s *Client) Serve(ctx context.Context)

Serve is similar to Run, but it doesn't wait for the first successful session. It just passes any errors to the Observer and retries.

func (*Client) Shutdown

func (s *Client) Shutdown()

Shutdown performs a graceful shutdown.

type ClientOption

type ClientOption func(c *Client)

func WithConsumers

func WithConsumers(consumers ...consumer.Consumer) ClientOption

func WithDeclarations

func WithDeclarations(declarations topology.Declarations) ClientOption

func WithDialConfig added in v1.5.0

func WithDialConfig(config DialConfig) ClientOption

func WithObserver

func WithObserver(observer Observer) ClientOption

func WithPublishers

func WithPublishers(publishers ...*publisher.Publisher) ClientOption

func WithReconnectTimeout

func WithReconnectTimeout(timeout time.Duration) ClientOption

func WithTopologyBuilding

func WithTopologyBuilding(options ...topology.DeclarationsOption) ClientOption

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cfg consumer.Consumer, ch *amqp.Channel, retryPub *Publisher, observer Observer) *Consumer

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Run

func (c *Consumer) Run() error

type Declarator

type Declarator struct {
	// contains filtered or unexported fields
}

func NewDeclarator

func NewDeclarator(cfg topology.Declarations, ch *amqp.Channel) *Declarator

func (*Declarator) Close

func (c *Declarator) Close() error

func (*Declarator) Run

func (c *Declarator) Run() error

type DialConfig added in v1.5.0

type DialConfig struct {
	amqp.Config
	DialTimeout time.Duration
}

type NoopObserver

type NoopObserver struct {
}

func (NoopObserver) ClientError

func (n NoopObserver) ClientError(err error)

func (NoopObserver) ClientReady

func (n NoopObserver) ClientReady()

func (NoopObserver) ConsumerError

func (n NoopObserver) ConsumerError(consumer consumer.Consumer, err error)

func (NoopObserver) PublisherError added in v1.2.0

func (n NoopObserver) PublisherError(publisher *publisher.Publisher, err error)

func (NoopObserver) PublishingFlow added in v1.2.0

func (n NoopObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)

func (NoopObserver) ShutdownDone

func (n NoopObserver) ShutdownDone()

func (NoopObserver) ShutdownStarted

func (n NoopObserver) ShutdownStarted()

type Observer

type Observer interface {
	ClientReady()
	ClientError(err error)
	ConsumerError(consumer consumer.Consumer, err error)
	PublisherError(publisher *publisher.Publisher, err error)
	PublishingFlow(publisher *publisher.Publisher, flow bool)
	ShutdownStarted()
	ShutdownDone()
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(publisher *publisher2.Publisher, ch *amqp.Channel, observer Observer) *Publisher

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey string, msg *amqp.Publishing) error

func (*Publisher) PublishWithConfirmation added in v1.4.0

func (p *Publisher) PublishWithConfirmation(ctx context.Context, exchange string, routingKey string, msg *amqp.Publishing) error

func (*Publisher) Run

func (p *Publisher) Run() error
