Documentation
¶
Index ¶
- Variables
- type Connector
- type ConnectorConfig
- type Consumer
- type ExchangeType
- type HandlerOption
- type HandlerOptions
- type NotificationHandler
- type Publisher
- func (p *Publisher) GetName() string
- func (p *Publisher) Publish(ctx context.Context, routingKey string, data any) error
- func (p *Publisher) PublishWithHeaders(ctx context.Context, routingKey string, data any, headers amqp.Table) error
- func (p *Publisher) Start(ctx context.Context) error
- func (p *Publisher) Stop(ctx context.Context) error
- type URIConfig
Constants ¶
This section is empty.
Variables ¶
var ErrBrokerNacked = errors.New("broker nacked message")
ErrBrokerNacked — брокер отклонил публикацию (publisher confirms mode).
var ErrConnectorAlreadyStarted = errors.New("connector already started")
ErrConnectorAlreadyStarted возвращается при повторном вызове Start().
var ErrConnectorNotStarted = errors.New("connector not started")
ErrConnectorNotStarted возвращается при попытке использования до Start().
var ErrConsumerAlreadyStarted = errors.New("consumer already started")
ErrConsumerAlreadyStarted возвращается при повторном Start() либо при AddSubscriber после Start.
Functions ¶
This section is empty.
Types ¶
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector управляет TCP-подключением к AMQP-брокеру, авто-реконнектом и декларацией exchange.
func NewConnector ¶
func NewConnector(cfg ConnectorConfig) *Connector
NewConnector создаёт новый Connector по заданной конфигурации.
type ConnectorConfig ¶
type ConnectorConfig struct {
Name string `validate:"required"`
ReconnectionAttempts int `validate:"required,min=1"`
InitialReconnectWait int `validate:"min=0"` // в миллисекундах
MaxReconnectWait int `validate:"min=0"` // в миллисекундах
// UseTLS — явный флаг принудительного использования amqps://.
// Если true, либо если задан CertFile/CACertFile, либо Port == 5671 —
// будет использована TLS-схема.
UseTLS bool
URI URIConfig `validate:"required"`
// Exchange — имя exchange для публикации и bind'ов.
// Если пусто, используется default exchange (routing key = queue name).
Exchange string
// ExchangeType — direct / topic / fanout / headers.
// Применяется только если Exchange != "". Default — "direct".
ExchangeType ExchangeType `validate:"omitempty,oneof=direct topic fanout headers"`
// UseDurableQueues — декларировать очереди и exchange как durable.
UseDurableQueues bool
// Prefetch — QoS prefetch count для consumer'а.
// Если 0, используется defaultPrefetch (100).
Prefetch int `validate:"min=0"`
// QueueArguments — аргументы, применяемые ко всем декларируемым очередям.
// Мёржатся с аргументами DLX (DLX имеет приоритет только если пользователь
// не задал соответствующие ключи).
QueueArguments amqp.Table
// DeadLetterExchange — exchange, куда отправляются сообщения при Reject(false).
DeadLetterExchange string
// DeadLetterRoutingKey — опциональный routing key для DLX.
DeadLetterRoutingKey string
// MaxDeliveryAttempts — после скольких неудачных обработок сообщение
// уходит в DLX (через Reject(false)). 0 = disabled (бесконечный requeue,
// как в старой версии). Счётчик берётся из header "x-death".
MaxDeliveryAttempts int `validate:"min=0"`
// PublisherConfirms — включить publisher confirms (подтверждение брокером).
PublisherConfirms bool
// PublishAckTimeout — таймаут ожидания подтверждения публикации от брокера.
// Применяется только если PublisherConfirms == true. Default — 5s.
PublishAckTimeout time.Duration
// StopTimeout — таймаут graceful shutdown. Default — 10s.
StopTimeout time.Duration
}
ConnectorConfig — конфигурация подключения к AMQP-брокеру и поведения библиотеки.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer подписывает обработчики на очереди AMQP.
func NewConsumer ¶
func NewConsumer(cfg ConnectorConfig) *Consumer
NewConsumer создаёт Consumer с собственным Connector'ом.
func NewConsumerWithConnector ¶
NewConsumerWithConnector создаёт Consumer, использующий переданный Connector.
func (*Consumer) AddSubscriber ¶
func (c *Consumer) AddSubscriber(queueName, routingKey string, handler NotificationHandler) error
AddSubscriber регистрирует подписчика на очередь. queueName — имя очереди, routingKey — ключ для bind в exchange (если exchange задан). Должен быть вызван ДО Start. После Start возвращает ErrConsumerAlreadyStarted.
type ExchangeType ¶
type ExchangeType = string
ExchangeType — тип exchange в AMQP.
const ( ExchangeTypeDirect ExchangeType = "direct" ExchangeTypeTopic ExchangeType = "topic" ExchangeTypeFanout ExchangeType = "fanout" ExchangeTypeHeaders ExchangeType = "headers" )
type HandlerOption ¶
type HandlerOption func(*HandlerOptions)
HandlerOption — функциональная опция для NewHandler.
func WithLogBody ¶
func WithLogBody(v bool) HandlerOption
WithLogBody включает логирование body сообщения.
func WithRequeueOnError ¶
func WithRequeueOnError(v bool) HandlerOption
WithRequeueOnError устанавливает стратегию Reject при ошибке handler'а.
type HandlerOptions ¶
type HandlerOptions struct {
// LogBody — логировать ли body сообщения на debug/error уровне.
// По умолчанию false — логируются только метаданные (MessageId, RoutingKey, Timestamp).
LogBody bool
// RequeueOnError — делать ли Reject(requeue=true) при ошибке handler'а.
// Если false — Reject(requeue=false), сообщение уходит в DLX
// (если настроен). По умолчанию true (прежнее поведение).
RequeueOnError bool
}
HandlerOptions — опции для NewHandler.
type NotificationHandler ¶
NotificationHandler — низкоуровневый обработчик AMQP-сообщения.
func NewHandler ¶
func NewHandler[T any]( handler func(context.Context, *T) error, opts ...HandlerOption, ) NotificationHandler
NewHandler оборачивает типизированный handler в NotificationHandler:
- JSON-decode body в *T
- вызов handler(ctx, *T)
- Ack при успехе, Reject при ошибке
При ошибке unmarshal — всегда Reject(requeue=false) (битое сообщение, повторять бессмысленно).
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher публикует сообщения в AMQP-брокер. Кеширует канал на каждый routing key.
func NewPublisher ¶
func NewPublisher(cfg ConnectorConfig) *Publisher
NewPublisher создаёт Publisher с собственным Connector'ом. Для shared-connection между Publisher и Consumer см. NewPublisherWithConnector.
func NewPublisherWithConnector ¶
NewPublisherWithConnector создаёт Publisher, использующий переданный Connector. Connector должен быть запущен отдельно (через Start).
func (*Publisher) Publish ¶
Publish публикует data (JSON-сериализованный) в exchange с указанным routing key. Если cfg.Exchange == "" — публикация идёт в default exchange, и routingKey используется как имя очереди.
func (*Publisher) PublishWithHeaders ¶
func (p *Publisher) PublishWithHeaders(ctx context.Context, routingKey string, data any, headers amqp.Table) error
PublishWithHeaders — то же, что Publish, но с кастомными headers.
type URIConfig ¶
type URIConfig struct {
Host string `validate:"required"`
Port int `validate:"required,min=0"`
Username string `validate:"required"`
Password string `validate:"required"`
Vhost string `validate:"required"`
CertFile string // client TLS auth - path to certificate (PEM)
CACertFile string // client TLS auth - path to CA certificate (PEM)
KeyFile string // client TLS auth - path to private key (PEM)
ServerName string // client TLS auth - server name
}
URIConfig — параметры подключения к брокеру.