rabbit

package module
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: MIT Imports: 16 Imported by: 0

README

rabbit

Тонкая обёртка над rabbitmq/amqp091-go с lifecycle-интерфейсом timmbarton/layout.

Фокус:

  • автоматический reconnect с exponential backoff + jitter;
  • декларирование exchange/queue/bind на старте;
  • QoS prefetch, DLX, publisher confirms;
  • типизированные handler'ы через generics;
  • без внешних зависимостей помимо amqp091-go, layout и zap.

Установка

go get github.com/timmbarton/rabbit/v2

Требования: Go 1.24+, RabbitMQ 3.11+ или LavinMQ.

Quick start

package main

import (
    "context"
    "log"

    "github.com/timmbarton/rabbit/v2"
)

type OrderCreated struct {
    OrderID int64  `json:"order_id"`
    Email   string `json:"email"`
}

func main() {
    cfg := rabbit.ConnectorConfig{
        Name:                 "orders-service",
        ReconnectionAttempts: 10,
        URI: rabbit.URIConfig{
            Host:     "localhost",
            Port:     5672,
            Username: "guest",
            Password: "guest",
            Vhost:    "/",
        },
        UseDurableQueues: true,
    }

    ctx := context.Background()

    // Publisher
    pub := rabbit.NewPublisher(cfg)
    if err := pub.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer pub.Stop(ctx)

    _ = pub.Publish(ctx, "orders.created", OrderCreated{OrderID: 1, Email: "a@b.c"})

    // Consumer
    cons := rabbit.NewConsumer(cfg)
    _ = cons.AddSubscriber(
        "orders.created", // queue name
        "",               // routing key (пусто — default exchange)
        rabbit.NewHandler(func(ctx context.Context, o *OrderCreated) error {
            log.Printf("got order %d for %s", o.OrderID, o.Email)
            return nil
        }),
    )
    if err := cons.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer cons.Stop(ctx)

    select {} // block
}

Конфигурация

type ConnectorConfig struct {
    Name                 string // человекочитаемое имя (для layout/логов)
    ReconnectionAttempts int    // сколько раз пытаться подключиться
    InitialReconnectWait int    // начальный backoff, мс (default 500)
    MaxReconnectWait     int    // максимальный backoff, мс (default 30000)

    UseTLS bool                 // форсировать amqps://
    URI    URIConfig

    Exchange     string         // если пусто — default exchange
    ExchangeType ExchangeType   // direct / topic / fanout / headers; default direct

    UseDurableQueues bool       // durable-флаг для exchange и очередей

    Prefetch       int          // QoS prefetch; default 100
    QueueArguments amqp.Table   // доп. аргументы для QueueDeclare

    DeadLetterExchange   string // x-dead-letter-exchange
    DeadLetterRoutingKey string // x-dead-letter-routing-key
    MaxDeliveryAttempts  int    // 0 = disabled; иначе reject(false) после N попыток

    PublisherConfirms bool          // включить publisher confirms
    PublishAckTimeout time.Duration // default 5s

    StopTimeout time.Duration       // default 10s
}

type URIConfig struct {
    Host, Username, Password, Vhost string
    Port                             int

    // TLS (опционально)
    CertFile, CACertFile, KeyFile, ServerName string
}
TLS

TLS-схема (amqps://) включается автоматически при любом из условий:

  • UseTLS: true
  • URI.Port == 5671
  • задан URI.CertFile или URI.CACertFile
cfg := rabbit.ConnectorConfig{
    UseTLS: true,
    URI: rabbit.URIConfig{
        Host:       "broker.example.com",
        Port:       5671,
        Username:   "svc",
        Password:   "***",
        Vhost:      "/",
        CACertFile: "/etc/ssl/certs/lavinmq-ca.pem",
        ServerName: "broker.example.com",
    },
}
Reconnect
  • При разрыве соединения библиотека сама переподключается экспоненциальным backoff'ом с jitter.
  • Backoff стартует с InitialReconnectWait, удваивается до MaxReconnectWait.
  • После реконнекта exchange и все подписки восстанавливаются автоматически.

Exchange и routing key

Default exchange (простой вариант)

Если cfg.Exchange == "", используется default exchange — routing key равен имени очереди. Publisher автоматически декларирует очередь при первой публикации.

cfg.Exchange = "" // default exchange

_ = pub.Publish(ctx, "tasks", payload) // routing key = имя очереди "tasks"

_ = cons.AddSubscriber("tasks", "", handler)
Topic exchange

Для pub/sub с pattern matching:

cfg := rabbit.ConnectorConfig{
    // ...
    Exchange:     "events",
    ExchangeType: rabbit.ExchangeTypeTopic,
}

// publisher публикует в exchange с routing key
_ = pub.Publish(ctx, "user.signup.email", payload)
_ = pub.Publish(ctx, "user.signup.sms", payload)

// consumer подписывается на pattern
_ = cons.AddSubscriber("q.user-signups", "user.signup.*", handler)

Exchange декларируется автоматически на Connector.Start. Очереди декларируются и биндятся на Consumer.Start.

Publisher

Базовая публикация
_ = pub.Publish(ctx, "orders.created", order)

Publish сериализует order в JSON и отправляет с:

  • ContentType: application/json
  • DeliveryMode: persistent (2)
  • MessageId — случайный 16-hex ID
  • Timestamp — текущее UTC-время
Кастомные headers
import amqp "github.com/rabbitmq/amqp091-go"

_ = pub.PublishWithHeaders(ctx, "orders.created", order, amqp.Table{
    "correlation_id": corrID,
    "trace_id":       traceID,
})
Publisher confirms

Гарантированная доставка до брокера:

cfg := rabbit.ConnectorConfig{
    // ...
    PublisherConfirms: true,
    PublishAckTimeout: 3 * time.Second,
}

pub := rabbit.NewPublisher(cfg)
_ = pub.Start(ctx)

if err := pub.Publish(ctx, "orders.created", order); err != nil {
    if errors.Is(err, rabbit.ErrBrokerNacked) {
        // брокер отклонил сообщение
    }
    // либо таймаут ожидания ack, либо сетевая ошибка
}

Consumer

Регистрация подписчиков
cons := rabbit.NewConsumer(cfg)

// Обязательно ДО Start:
err := cons.AddSubscriber("q.orders", "orders.*",
    rabbit.NewHandler(func(ctx context.Context, o *Order) error {
        return processOrder(ctx, o)
    }),
)

// AddSubscriber после Start вернёт ErrConsumerAlreadyStarted:
_ = cons.Start(ctx)
if err := cons.AddSubscriber("q.late", "late", h); err == rabbit.ErrConsumerAlreadyStarted {
    // ...
}
NewHandler[T]

Оборачивает типизированный обработчик:

  • JSON-decode msg.Body*T
  • вызов handler(ctx, *T)
  • Ack при успехе
  • Reject(requeue=true) при ошибке handler'а (настраивается)
  • Reject(requeue=false) при ошибке unmarshal (битый JSON не имеет смысла повторять)
handler := rabbit.NewHandler(
    func(ctx context.Context, o *Order) error {
        return processOrder(ctx, o)
    },
    rabbit.WithLogBody(false),     // не логировать body (default)
    rabbit.WithRequeueOnError(false), // при ошибке reject(false) — в DLX
)
Опции
Опция Default Назначение
WithLogBody(bool) false Логировать ли body в debug/error (полезно в dev, выключать в prod).
WithRequeueOnError(bool) true Reject(requeue) при ошибке handler'а. false → сообщение уйдёт в DLX.

Dead Letter Exchange

Защита от бесконечного requeue проблемного сообщения:

cfg := rabbit.ConnectorConfig{
    // ...
    DeadLetterExchange:   "events.dlx",
    DeadLetterRoutingKey: "dlq",
    MaxDeliveryAttempts:  5, // после 5 неудачных попыток → в DLX
}

Как это работает:

  1. Библиотека декларирует очередь с x-dead-letter-exchange (и опционально x-dead-letter-routing-key).
  2. Перед каждым вызовом handler'а библиотека читает заголовок x-death (его автоматически добавляет брокер при Reject(false)) и считает суммарное число попыток.
  3. Если попыток >= MaxDeliveryAttemptsReject(false) → сообщение уходит в DLX без вызова handler'а.

Важно: для работы счётчика сообщение должно хотя бы раз побывать в DLX-цикле. То есть стратегия — WithRequeueOnError(false) или ручной Reject(false) внутри handler'а, чтобы брокер добавлял x-death counter.

Пример DLX-setup:

// Основная очередь — с ссылкой на DLX
cfg := rabbit.ConnectorConfig{
    // ...
    Exchange:             "events",
    ExchangeType:         rabbit.ExchangeTypeTopic,
    DeadLetterExchange:   "events.dlx",
    MaxDeliveryAttempts:  3,
}

// DLX-exchange и dlq-очередь должны быть задекларированы отдельно —
// например, через второй Connector/Consumer с Exchange = "events.dlx".

QoS / Prefetch

По умолчанию — 100. Применяется на каждый channel после реконнекта:

cfg.Prefetch = 50 // в любой момент времени consumer обрабатывает не более 50 сообщений

QueueArguments

Передайте любые дополнительные arguments для QueueDeclare:

import amqp "github.com/rabbitmq/amqp091-go"

cfg.QueueArguments = amqp.Table{
    "x-max-length":      int64(10000),
    "x-message-ttl":     int64(60000), // 1 минута
    "x-max-priority":    byte(10),
}

Пользовательские аргументы имеют приоритет над DLX-аргументами при конфликте ключей.

Shared connection

По умолчанию NewPublisher(cfg) и NewConsumer(cfg) создают два независимых TCP-подключения. Для сервисов, которые и публикуют, и подписываются, RabbitMQ рекомендует одно подключение на процесс. Используйте:

conn := rabbit.NewConnector(cfg)
pub  := rabbit.NewPublisherWithConnector(conn)
cons := rabbit.NewConsumerWithConnector(conn)

// Жизненный цикл управляется снаружи:
_ = conn.Start(ctx)
_ = cons.Start(ctx) // pub.Start / cons.Start на shared conn — no-op

// ...

_ = cons.Stop(ctx)
_ = pub.Stop(ctx)
_ = conn.Stop(ctx)

Интеграция с timmbarton/layout

Все три компонента реализуют интерфейс Start(ctx) error / Stop(ctx) error / GetName() string:

import "github.com/timmbarton/layout"

app := layout.New().
    Add(conn).
    Add(pub).
    Add(cons)

_ = app.Run(ctx)

Ошибки

Ошибка Когда
ErrConnectorAlreadyStarted Повторный Connector.Start.
ErrConsumerAlreadyStarted Повторный Consumer.Start или AddSubscriber после Start.
ErrBrokerNacked Publisher confirms: брокер отклонил сообщение.

Тесты

go test ./...
go test -race ./...
go test -cover ./...

Юнит-тесты покрывают:

  • URI scheme switching (amqp / amqps)
  • reconnect backoff с jitter, respect ctx cancellation
  • DLX counter (x-death header) для разных форматов
  • QueueArguments merge с DLX
  • NewHandler[T]: ack/reject/unmarshal error, опции
  • Consumer lifecycle: AddSubscriber до/после Start
  • Publisher: channel caching per routing key, MessageID uniqueness

Без внешних зависимостей — только stdlib + amqp091-go для типов.

Breaking changes с v1

Если мигрируете с предыдущей версии:

Было Стало
Publish(ctx, queueName, data) Publish(ctx, routingKey, data) (семантика)
AddSubscriber(queueName, handler) AddSubscriber(queueName, routingKey, handler) error
ConnectorConfig.URI — inline struct URIConfig — именованный тип
NewHandler[T](fn) NewHandler[T](fn, ...HandlerOption)
Start возвращает только dial-ошибку Может вернуть ErrConnectorAlreadyStarted

Все остальные поля ConnectorConfig добавлены с дефолтами, старый код продолжит работать при минимальных правках.

Лицензия

MIT — см. LICENSE.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrBrokerNacked = errors.New("broker nacked message")

ErrBrokerNacked — брокер отклонил публикацию (publisher confirms mode).

View Source
var ErrConnectorAlreadyStarted = errors.New("connector already started")

ErrConnectorAlreadyStarted возвращается при повторном вызове Start().

View Source
var ErrConnectorNotStarted = errors.New("connector not started")

ErrConnectorNotStarted возвращается при попытке использования до Start().

View Source
var ErrConsumerAlreadyStarted = errors.New("consumer already started")

ErrConsumerAlreadyStarted возвращается при повторном Start() либо при AddSubscriber после Start.

Functions

This section is empty.

Types

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector управляет TCP-подключением к AMQP-брокеру, авто-реконнектом и декларацией exchange.

func NewConnector

func NewConnector(cfg ConnectorConfig) *Connector

NewConnector создаёт новый Connector по заданной конфигурации.

func (*Connector) GetName

func (c *Connector) GetName() string

GetName возвращает человекочитаемое имя для layout.

func (*Connector) Start

func (c *Connector) Start(ctx context.Context) error

Start устанавливает начальное подключение и запускает фоновый reconnect-loop. Повторный вызов возвращает ErrConnectorAlreadyStarted.

func (*Connector) Stop

func (c *Connector) Stop(ctx context.Context) error

Stop останавливает reconnect-loop и закрывает подключение. Поддерживает повторный Start после Stop.

type ConnectorConfig

type ConnectorConfig struct {
	Name                 string `validate:"required"`
	ReconnectionAttempts int    `validate:"required,min=1"`
	InitialReconnectWait int    `validate:"min=0"` // в миллисекундах
	MaxReconnectWait     int    `validate:"min=0"` // в миллисекундах

	// UseTLS — явный флаг принудительного использования amqps://.
	// Если true, либо если задан CertFile/CACertFile, либо Port == 5671 —
	// будет использована TLS-схема.
	UseTLS bool

	URI URIConfig `validate:"required"`

	// Exchange — имя exchange для публикации и bind'ов.
	// Если пусто, используется default exchange (routing key = queue name).
	Exchange string

	// ExchangeType — direct / topic / fanout / headers.
	// Применяется только если Exchange != "". Default — "direct".
	ExchangeType ExchangeType `validate:"omitempty,oneof=direct topic fanout headers"`

	// UseDurableQueues — декларировать очереди и exchange как durable.
	UseDurableQueues bool

	// Prefetch — QoS prefetch count для consumer'а.
	// Если 0, используется defaultPrefetch (100).
	Prefetch int `validate:"min=0"`

	// QueueArguments — аргументы, применяемые ко всем декларируемым очередям.
	// Мёржатся с аргументами DLX (DLX имеет приоритет только если пользователь
	// не задал соответствующие ключи).
	QueueArguments amqp.Table

	// DeadLetterExchange — exchange, куда отправляются сообщения при Reject(false).
	DeadLetterExchange string

	// DeadLetterRoutingKey — опциональный routing key для DLX.
	DeadLetterRoutingKey string

	// MaxDeliveryAttempts — после скольких неудачных обработок сообщение
	// уходит в DLX (через Reject(false)). 0 = disabled (бесконечный requeue,
	// как в старой версии). Счётчик берётся из header "x-death".
	MaxDeliveryAttempts int `validate:"min=0"`

	// PublisherConfirms — включить publisher confirms (подтверждение брокером).
	PublisherConfirms bool

	// PublishAckTimeout — таймаут ожидания подтверждения публикации от брокера.
	// Применяется только если PublisherConfirms == true. Default — 5s.
	PublishAckTimeout time.Duration

	// StopTimeout — таймаут graceful shutdown. Default — 10s.
	StopTimeout time.Duration
}

ConnectorConfig — конфигурация подключения к AMQP-брокеру и поведения библиотеки.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer подписывает обработчики на очереди AMQP.

func NewConsumer

func NewConsumer(cfg ConnectorConfig) *Consumer

NewConsumer создаёт Consumer с собственным Connector'ом.

func NewConsumerWithConnector

func NewConsumerWithConnector(conn *Connector) *Consumer

NewConsumerWithConnector создаёт Consumer, использующий переданный Connector.

func (*Consumer) AddSubscriber

func (c *Consumer) AddSubscriber(queueName, routingKey string, handler NotificationHandler) error

AddSubscriber регистрирует подписчика на очередь. queueName — имя очереди, routingKey — ключ для bind в exchange (если exchange задан). Должен быть вызван ДО Start. После Start возвращает ErrConsumerAlreadyStarted.

func (*Consumer) GetName

func (c *Consumer) GetName() string

GetName возвращает человекочитаемое имя для layout.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start декларирует все очереди/bind'ы и запускает потребителей в goroutines.

func (*Consumer) Stop

func (c *Consumer) Stop(ctx context.Context) error

Stop останавливает всех потребителей, дожидается их завершения и закрывает connection (если он собственный).

type ExchangeType

type ExchangeType = string

ExchangeType — тип exchange в AMQP.

const (
	ExchangeTypeDirect  ExchangeType = "direct"
	ExchangeTypeTopic   ExchangeType = "topic"
	ExchangeTypeFanout  ExchangeType = "fanout"
	ExchangeTypeHeaders ExchangeType = "headers"
)

type HandlerOption

type HandlerOption func(*HandlerOptions)

HandlerOption — функциональная опция для NewHandler.

func WithLogBody

func WithLogBody(v bool) HandlerOption

WithLogBody включает логирование body сообщения.

func WithRequeueOnError

func WithRequeueOnError(v bool) HandlerOption

WithRequeueOnError устанавливает стратегию Reject при ошибке handler'а.

type HandlerOptions

type HandlerOptions struct {
	// LogBody — логировать ли body сообщения на debug/error уровне.
	// По умолчанию false — логируются только метаданные (MessageId, RoutingKey, Timestamp).
	LogBody bool

	// RequeueOnError — делать ли Reject(requeue=true) при ошибке handler'а.
	// Если false — Reject(requeue=false), сообщение уходит в DLX
	// (если настроен). По умолчанию true (прежнее поведение).
	RequeueOnError bool
}

HandlerOptions — опции для NewHandler.

type NotificationHandler

type NotificationHandler func(context.Context, amqp.Delivery)

NotificationHandler — низкоуровневый обработчик AMQP-сообщения.

func NewHandler

func NewHandler[T any](
	handler func(context.Context, *T) error,
	opts ...HandlerOption,
) NotificationHandler

NewHandler оборачивает типизированный handler в NotificationHandler:

  • JSON-decode body в *T
  • вызов handler(ctx, *T)
  • Ack при успехе, Reject при ошибке

При ошибке unmarshal — всегда Reject(requeue=false) (битое сообщение, повторять бессмысленно).

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher публикует сообщения в AMQP-брокер. Кеширует канал на каждый routing key.

func NewPublisher

func NewPublisher(cfg ConnectorConfig) *Publisher

NewPublisher создаёт Publisher с собственным Connector'ом. Для shared-connection между Publisher и Consumer см. NewPublisherWithConnector.

func NewPublisherWithConnector

func NewPublisherWithConnector(conn *Connector) *Publisher

NewPublisherWithConnector создаёт Publisher, использующий переданный Connector. Connector должен быть запущен отдельно (через Start).

func (*Publisher) GetName

func (p *Publisher) GetName() string

GetName возвращает человекочитаемое имя для layout.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, routingKey string, data any) error

Publish публикует data (JSON-сериализованный) в exchange с указанным routing key. Если cfg.Exchange == "" — публикация идёт в default exchange, и routingKey используется как имя очереди.

func (*Publisher) PublishWithHeaders

func (p *Publisher) PublishWithHeaders(ctx context.Context, routingKey string, data any, headers amqp.Table) error

PublishWithHeaders — то же, что Publish, но с кастомными headers.

func (*Publisher) Start

func (p *Publisher) Start(ctx context.Context) error

Start запускает вложенный Connector, если он был создан Publisher'ом. Если Publisher использует переданный Connector — ничего не делает.

func (*Publisher) Stop

func (p *Publisher) Stop(ctx context.Context) error

Stop останавливает вложенный Connector, если он был создан Publisher'ом. Перед остановкой закрывает все публикационные каналы.

type URIConfig

type URIConfig struct {
	Host     string `validate:"required"`
	Port     int    `validate:"required,min=0"`
	Username string `validate:"required"`
	Password string `validate:"required"`
	Vhost    string `validate:"required"`

	CertFile   string // client TLS auth - path to certificate (PEM)
	CACertFile string // client TLS auth - path to CA certificate (PEM)
	KeyFile    string // client TLS auth - path to private key (PEM)
	ServerName string // client TLS auth - server name
}

URIConfig — параметры подключения к брокеру.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL