briar

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2024 License: MIT Imports: 7 Imported by: 0

README

Briar - Go RabbitMQ Client Library

Note: This library is primarily designed for internal use.

Briar is a Go library designed to simplify the creation and management of RabbitMQ consumers. It provides a robust, fault-tolerant way to handle message consumption, including automatic reconnection, error handling, and dead-lettering.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetRoutingKey

func GetRoutingKey(routingKey string, headers amqp.Table, queueName string) (string, error)

GetRoutingKey searches for the original routing key in headers or returns the passed routingKey if it's recognized as original.

func HaltErrorf

func HaltErrorf(format string, a ...any) error

func NewDeadletterError

func NewDeadletterError() error

Types

type Action

// Action describes how a delivery should be acknowledged. It is the value a
// handler wrapped with AckHandler returns alongside its error.
type Action int
const (
	// Ack acknowledges the message after the delivery has been successfully
	// processed. It is the zero value of Action and therefore the default.
	Ack Action = iota
	// NackDiscard negatively acknowledges the message without requeueing:
	// the message is dropped, or delivered to a dead-letter queue if one is
	// configured on the server.
	NackDiscard
	// NackRequeue negatively acknowledges the message so that it is
	// delivered to a different consumer.
	NackRequeue
	// ManualAck leaves acknowledgement to the user, who must call the
	// msg.Ack() method themselves.
	ManualAck
)

func (Action) String

func (a Action) String() string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"context"
	"fmt"
	"log"
	"log/slog"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/ttab/briar"
)

// main demonstrates a minimal briar consumer: it logs every message received
// on the "briar" queue of the "test" exchange, acknowledges it manually, and
// stops when the 20 second context deadline expires or Run returns an error.
func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	slog.SetDefault(logger)

	// Bound the lifetime of the example; Run consumes until the context is
	// cancelled.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	opts := briar.ConsumerOptions{
		ExchangeName: "test",
		QueueName:    "briar",
	}

	// handle logs the payload of each delivery and then acknowledges it.
	handle := func(_ context.Context, delivery amqp.Delivery) error {
		logger.Info("got message", "payload", string(delivery.Body))

		if ackErr := delivery.Ack(false); ackErr != nil {
			return fmt.Errorf("ack message: %w", ackErr)
		}

		return nil
	}

	consumer := briar.NewConsumer(
		logger, "amqp://guest:guest@localhost", opts, handle,
	)

	if err := consumer.Run(ctx); err != nil {
		log.Printf("failed to run: %v", err)
	}
}

func NewConsumer

func NewConsumer(
	logger *slog.Logger, uri string,
	opts ConsumerOptions,
	handler HandlerFunc,
) *Consumer

NewConsumer creates a RabbitMQ consumer that is started by calling Run().

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context) error

Run starts the consumer process. This will set up the topology and consume messages until:

* the context is cancelled
* the HaltFunction returns true in response to an error
* ...or the handler returns a HaltError.

type ConsumerOptions

// ConsumerOptions holds the settings passed to NewConsumer when creating a
// Consumer.
//
// HaltFunction is consulted when an error occurs: Run stops consuming when it
// returns true.
//
// NOTE(review): the remaining undocumented fields (connection, exchange and
// queue names, routing keys, the Exclusive flag, the various retry waits and
// OnError) are presumably what their names suggest — confirm against the
// implementation before relying on them.
type ConsumerOptions struct {
	ConnectionName string
	ExchangeName   string
	QueueName      string
	// DeliveryLimit is the number of times we'll try to handle a message
	// before dead-lettering it.
	DeliveryLimit int64
	// RetryHandlingWait is the time we wait before retrying a failed message.
	RetryHandlingWait time.Duration
	RoutingKeys       []string
	Exclusive         bool
	RetryWait         time.Duration
	ConnectRetryWait  time.Duration
	ChannelRetryWait  time.Duration
	HaltFunction      HaltFunc
	OnError           ErrorFunc
}

type DeadletterError

type DeadletterError struct{}

func (*DeadletterError) Error

func (he *DeadletterError) Error() string

type ErrorFunc

type ErrorFunc func(code TransitionCode, count int, err error)

type HaltError

type HaltError struct {
	// contains filtered or unexported fields
}

func (*HaltError) Error

func (he *HaltError) Error() string

func (*HaltError) Unwrap

func (he *HaltError) Unwrap() error

type HaltFunc

type HaltFunc func(code TransitionCode, count int, err error) bool

func HaltOnNthFailure

func HaltOnNthFailure(code TransitionCode, n int) HaltFunc

HaltOnNthFailure is a convenience function for stopping retries after a number of consecutive failures.

type HandlerFunc

type HandlerFunc func(ctx context.Context, delivery amqp.Delivery) error

func AckHandler

func AckHandler(
	fn func(ctx context.Context, delivery amqp.Delivery) (Action, error),
) HandlerFunc

AckHandler is a convenience wrapper that allows the handler to return the Ack/Nack action as a value. Returning an error will cause the channel to be closed and re-opened. Return a HaltError to stop the consumer.

type TransitionCode

type TransitionCode string

A TransitionCode describes which step of the consumer process failed.

// Transition codes identifying the step of the consumer process that failed.
// The code argument of HaltFunc and ErrorFunc callbacks has this type.
//
// NOTE(review): the exact condition that produces each code is not visible
// here; the names suggest connect/channel/declare/consume setup steps,
// connection or channel loss, handler failure and cancellation — confirm
// against the implementation.
const (
	CodeConnect      TransitionCode = "connect"
	CodeOpenChan     TransitionCode = "channel"
	CodeDeclare      TransitionCode = "declare"
	CodeConsume      TransitionCode = "consume"
	CodeDisconnected TransitionCode = "disconnected"
	CodeChanClosed   TransitionCode = "chan_closed"
	CodeHandlerFail  TransitionCode = "handler_fail"
	CodeCancelled    TransitionCode = "cancelled"
)

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL