rmq

package module
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: MIT Imports: 9 Imported by: 0

README

pkg/rmq

RabbitMQ клиент и consumer с retry через delay-очередь и DLQ для Go-микросервисов SSO Easy.

Установка

go get github.com/ssoeasy-dev/pkg/rmq@latest

Клиент

import "github.com/ssoeasy-dev/pkg/rmq"

client, err := rmq.NewClient(log, &rmq.Config{
    Host:     "localhost",
    Port:     5672,
    User:     "guest",
    Password: "guest",
    VHost:    "/",
})

defer client.Close()

Consumer

consumer, err := rmq.NewConsumer(log, client, &rmq.ConsumerConfig{
    Main: rmq.QueueConfig{
        Exchange:       "notifications",
        BindingPattern: "notification.email.*",
        Queue:          "notification.email.queue",
        TTL:            15 * 60 * 1000, // мс, 15m
    },
    Dead: &rmq.QueueConfig{
        Exchange:       "dlx",
        BindingPattern: "notification.email.*",
        Queue:          "notification.email-dlx.queue",
    },
    Delay: &rmq.DelayQueueConfig{
        QueueConfig: rmq.QueueConfig{
            Exchange:       "delay",
            BindingPattern: "notification.email.*",
            Queue:          "notification.email.delay.queue",
            TTL:            5 * 1000, // мс, 5s между попытками
        },
        MaxRetry: 3,
    },
    Handler: func(ctx context.Context, body []byte, routingKey string) error {
        // обработка сообщения
        // возврат error → retry; nil → ack
        return nil
    },
})
if err != nil {
    panic(err)
}

// Запуск (неблокирующий)
if err := consumer.Start(ctx); err != nil {
    panic(err)
}

// Graceful shutdown
consumer.Stop()

Типы

type QueueConfig struct {
    Queue          string
    BindingPattern string
    Exchange       string
    TTL            int
}

// DelayQueueConfig встраивает QueueConfig — Exchange, BindingPattern, Queue, TTL обязательны
type DelayQueueConfig struct {
    QueueConfig
    MaxRetry int
}

Топология очередей

При каждом Start() consumer объявляет всю топологию через отдельный init-канал. Если очередь уже существует с теми же параметрами — объявление идемпотентно. При изменении параметров (TTL, exchange) нужно удалить очередь вручную — RabbitMQ вернёт PRECONDITION_FAILED.

publish: notification.email.verification
  → notifications (topic exchange)
  → notification.email.queue  [binding: notification.email.*]
      ↓ ошибка
  → delay (topic exchange) с ключом notification.email.verification
  → notification.email.delay.queue  [binding: notification.email.*]
      ↓ TTL истёк → notifications с оригинальным ключом (следующая попытка)
      ↓ retry >= MaxRetry
  → dlx (topic exchange) с ключом notification.email.verification
  → notification.email-dlx.queue  [binding: notification.email.*]

Оригинальный routing key сохраняется на всём пути — x-dead-letter-routing-key не задаётся нигде, RabbitMQ сохраняет ключ автоматически.

При масштабировании новый consumer добавляет свои очереди к тем же delay и dlx exchange через свой паттерн — без конфликтов:

delay (topic exchange)
  ├── notification.email.*  → notification.email.delay.queue
  └── notification.sms.*    → notification.sms.delay.queue

Логика обработки ошибок

Событие Действие
Handler вернул nil Ack
Handler вернул error, retry < MaxRetry Публикация в delay exchange с инкрементом x-retry-count, Ack
retry >= MaxRetry Публикация в DLQ, Ack
Delay-очередь не настроена При любой ошибке сразу DLQ
Публикация в delay упала Fallback в DLQ
Публикация в DLQ упала Nack(false, false) — RabbitMQ применяет dead-letter политику очереди
Канал закрылся (обрыв соединения) Автоматический reconnect и рестарт

Каналы

Consumer использует три отдельных AMQP-канала:

Канал Назначение
initChannel Объявление топологии при старте, закрывается сразу после
consumeChannel channel.Consume с Qos(1, 0, false)
publishChannel Публикация в delay и DLQ

Разделение гарантирует что ошибка при объявлении очереди не закроет consume-канал, а Qos(1) обеспечивает корректный round-robin при нескольких consumer-ах.

Трассировка

x-trace-id и x-request-id из headers сообщения автоматически записываются в контекст через logger.TraceIDKey / logger.RequestIDKey. Это обеспечивает сквозную трассировку: auth.api → RabbitMQ → notificator.svc.

Десериализация сообщений

var msg MyMessage
if err := rmq.UnmarshalMessage(body, &msg); err != nil {
    return fmt.Errorf("failed to unmarshal: %w", err)
}

API

// Клиент
func NewClient(log *logger.Logger, cfg *Config) (*Client, error)
func (c *Client) Close() error

// Consumer — NewConsumer валидирует конфиг и возвращает ошибку при неполных данных
func NewConsumer(log *logger.Logger, client *Client, cfg *ConsumerConfig) (*Consumer, error)
func (c *Consumer) Start(ctx context.Context) error
func (c *Consumer) Stop()

// Хелпер
func UnmarshalMessage(data []byte, v any) error

Лицензия

MIT — см. LICENSE.

Контакты

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func UnmarshalMessage

func UnmarshalMessage(data []byte, v any) error

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(log *logger.Logger, cfg *Config) (*Client, error)

func (*Client) Channel

func (c *Client) Channel() *amqp091.Channel

func (*Client) Close

func (c *Client) Close() error

func (*Client) Reconnect

func (c *Client) Reconnect() error

type Config

type Config struct {
	Host     string
	Port     string
	User     string
	Password string
	VHost    string
}

func (*Config) URL

func (c *Config) URL() string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(log *logger.Logger, client *Client, cfg *ConsumerConfig) (*Consumer, error)

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

func (*Consumer) Stop

func (c *Consumer) Stop()

type ConsumerConfig

type ConsumerConfig struct {
	Main    QueueConfig
	Delay   *DelayQueueConfig
	Dead    *QueueConfig
	Handler MessageHandler
}

type DelayQueueConfig

type DelayQueueConfig struct {
	QueueConfig
	MaxRetry int
}

type MessageHandler

type MessageHandler func(ctx context.Context, message []byte, routingKey string) error

type QueueConfig

type QueueConfig struct {
	Queue          string
	BindingPattern string
	Exchange       string
	TTL            int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL