Documentation
¶
Overview ¶
Библиотека rabbitmq для работы с сервером RabbitMQ является вспомогательной и построена поверх github.com/rabbitmq/amqp091-go. В неё добавлено понятие инициализаторов каналов соединения — на их основе осуществляется автоматическое подключение к серверу и восстановление состояния.
Для установки соединения с сервером и инициализации обработчиков используется метод Run (синхронный) или Init (асинхронный), которые позволяют задать несколько Initializer. Эти обработчики будут вызываться при каждой установке соединения, чтобы восстановить топологию и заново проинициализировать работу сервиса.
В библиотеки представлены два генератора таких инициализаторов: Consume для обработки входящих сообщений и Publish для публикации. Для инициализации одновременной обработки входящих событий и публикации новых можно воспользоваться вспомогательной функцией Work.
Example ¶
// инициализируем контекст для выполнения ctx, cancel := context.WithTimeout(ctx, time.Minute/10) defer cancel() const queueName = "test.queue" // название очереди с сообщениями queue := rabbitmq.NewQueue(queueName) // создаём описание очереди handler := func(msg amqp091.Delivery) { // обработчик входящих сообщений fmt.Println("->", msg.MessageId) } // подключаемся к серверу и запускаем автоматическую обработку входящих сообщений pubFunc, err := rabbitmq.Work(ctx, addr, queue, handler) if err != nil { panic(err) } // публикуем сообщения for i := 1; i <= 3; i++ { // формируем сообщение msg := amqp091.Publishing{ MessageId: fmt.Sprintf("msg.%02d", i), ContentType: "text/plain", Body: []byte("data"), } // вызываем функцию публикации err := pubFunc(ctx, "", queueName, msg) if err != nil { panic(err) } fmt.Println("<-", msg.MessageId) } <-ctx.Done()
Output: <- msg.01 <- msg.02 <- msg.03 -> msg.01 -> msg.02 -> msg.03
Index ¶
- Variables
- func Connect(addr string) (conn *amqp091.Connection, err error)
- func Init(ctx context.Context, addr string, workers ...Initializer) error
- func Publish(opts ...PublishOption) (Publisher, Initializer)
- func Run(ctx context.Context, addr string, initializers ...Initializer) error
- func SetLogger(l zerolog.Logger)
- type ConsumeOption
- type Handler
- type Initializer
- type PublishOption
- func WithAppID(v string) PublishOption
- func WithImmediate() PublishOption
- func WithInit(v Initializer) PublishOption
- func WithMandatory() PublishOption
- func WithReplyTo(v string) PublishOption
- func WithReplyToQueue(v *Queue) PublishOption
- func WithTTL(v time.Duration) PublishOption
- func WithTimestamp() PublishOption
- type Publisher
- type Queue
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( ReconnectDelay = time.Second * 2 // задержка перед повторным соединением MaxIteration = 5 // максимальное количество попыток )
Параметры для переподключения к серверу RabbitMQ в случае ошибки.
var ErrNoChannel = errors.New("channel is not initialized")
ErrNoChannel описывает ошибку не инициализированного канала.
Functions ¶
func Connect ¶
Connect возвращает инициализированное подключение к серверу RabbitMQ.
В случае ошибки подключения попытка повторяется несколько раз с небольшой задержкой (смотри MaxIteration и ReconnectTime).
func Init ¶
func Init(ctx context.Context, addr string, workers ...Initializer) error
Init запускает асинхронное выполнение Run и ожидает завершения самого первого процесса инициализации, после чего возвращает управление. Возвращает ошибку, если при первой инициализации обработчиков или установке соединения произошла ошибка.
func Publish ¶
func Publish(opts ...PublishOption) (Publisher, Initializer)
Publish возвращает функцию и обработчик для публикации сообщений.
Если перед публикацией необходимо произвести некоторые настройки канала, то можно задать свою функцию инициализации с помощью опции WithInit(ChannelHandler).
Example ¶
// инициализируем функцию публикации новых сообщений и соответствующий ей обработчик pubFunc, pubWorker := rabbitmq.Publish( rabbitmq.WithTTL(time.Minute), rabbitmq.WithTimestamp()) // подключаемся к серверу и запускаем работу наших обработчиков err := rabbitmq.Init(ctx, addr, pubWorker) if err != nil { panic(err) } // публикуем новые сообщения в очередь с небольшой задержкой for i := 1; i <= 3; i++ { msg := amqp091.Publishing{ MessageId: fmt.Sprintf("msg.%02d", i), ContentType: "text/plain", Body: []byte("data"), } // отправляем сообщения в нашу очередь err := pubFunc(ctx, "", "test.queue", msg) if err != nil { panic(err) } }
func Run ¶
func Run(ctx context.Context, addr string, initializers ...Initializer) error
Run осуществляет подключение к серверу RabbitMQ и инициализирует обработчики с этим соединением. Для каждого обработчика создаётся отдельный канал, а в случае ошибки инициализации всё повторяется.
Возвращает ошибку, если превышено количество попыток установки соединений. Плановое завершение осуществляется через контекст.
Types ¶
type ConsumeOption ¶
type ConsumeOption interface {
// contains filtered or unexported methods
}
ConsumeOption изменяет настройки получения сообщений.
func WithArgs ¶
func WithArgs(v amqp091.Table) ConsumeOption
WithArgs задает дополнительные параметры обработчика сообщений.
func WithExclusive ¶
func WithExclusive() ConsumeOption
WithExclusive взводит флаг эксклюзивного доступа к очереди.
func WithNoAutoAck ¶
func WithNoAutoAck() ConsumeOption
WithNoAutoAck запрещает автоматическое подтверждение приёма сообщений.
func WithNoLocal ¶
func WithNoLocal() ConsumeOption
func WithNoWait ¶
func WithNoWait() ConsumeOption
type Handler ¶
type Handler = func(amqp091.Delivery)
Handler является синонимом для функции обработки входящих сообщений.
type Initializer ¶
type Initializer = func(*amqp091.Channel) error
Initializer является синонимом функции для инициализации канала соединения RabbitMQ.
func Consume ¶
func Consume(queue *Queue, handler Handler, opts ...ConsumeOption) Initializer
Consume возвращает инициализированный обработчик входящих сообщений для указанной очереди.
По умолчанию включено автоматическое подтверждение приёма сообщения. Для его отключения используйте опцию WithNoAutoAck().
Example ¶
// функция для обработки входящих сообщений handler := func(msg amqp091.Delivery) { fmt.Println("->", msg.MessageId) msg.Ack(false) // подтверждаем обработку сообщения } // создаём описание очереди queue := rabbitmq.NewQueue("test.queue") // инициализируем для этой очереди обработчик входящих сообщений и получаем worker consumerWorker := queue.Consume(handler, rabbitmq.WithNoAutoAck()) // подключаемся к серверу и запускаем работу наших обработчиков err := rabbitmq.Init(ctx, addr, consumerWorker) if err != nil { panic(err) }
type PublishOption ¶
type PublishOption interface {
// contains filtered or unexported methods
}
PublishOption изменяет настройки публикации сообщений.
func WithAppID ¶
func WithAppID(v string) PublishOption
WithAppID задаёт идентификатор приложения, добавляемый во все отправляемые сообщения, перезаписывая любые ранее заданные в сообщении значения.
func WithImmediate ¶
func WithImmediate() PublishOption
func WithInit ¶
func WithInit(v Initializer) PublishOption
WithInit задаёт функцию для инициализации канала при подключении.
func WithMandatory ¶
func WithMandatory() PublishOption
func WithReplyTo ¶
func WithReplyTo(v string) PublishOption
WithReplyTo автоматически заполняет во всех отправляемых сообщениях поле ReplyTo заданным значением, если оно не заполнено в сообщении.
func WithReplyToQueue ¶
func WithReplyToQueue(v *Queue) PublishOption
WithReplyToQueue заполняет поле ReplyTo во всех сообщениях именем указанной очереди. Если имя очереди меняется, то для всех новых сообщений так же будет использовано новое имя.
При одновременном использовании с WithReplyTo, очередь имеет больший приоритет и будет использоваться именно она.
func WithTTL ¶
func WithTTL(v time.Duration) PublishOption
WithTTL задаёт ограничение по времени жизни сообщения.
func WithTimestamp ¶
func WithTimestamp() PublishOption
WithTimestamp добавляет временную метку перед отправкой сообщения, если она не задана.
type Publisher ¶
Publisher описывает функцию для публикации сообщений на сервер RabbitMQ.
func Work ¶
func Work(ctx context.Context, addr string, queue *Queue, handler Handler, opts ...PublishOption) (Publisher, error)
Work является вспомогательной функцией быстрой инициализации одновременной обработки входящих сообщений и публикации новых. В качестве параметров передаётся контекст для остановки сервиса, адрес для подключения к серверу RabbitMQ, очередь с входящими сообщениями и их обработчик. Кроме этого можно указать необязательные параметры для публикации. Возвращает функцию для публикации новых сообщений.
По умолчанию автоматически отсылается подтверждение о приёме входящих сообщений, а для исходящих заполняется поле ReplyTo указанием на очередь входящих сообщений.
Example ¶
const queueName = "test.queue" // название очереди с сообщениями queue := rabbitmq.NewQueue(queueName) // создаём описание очереди handler := func(msg amqp091.Delivery) { // обработчик входящих сообщений fmt.Println("->", msg.MessageId) } // подключаемся к серверу и запускаем автоматическую обработку входящих сообщений pubFunc, err := rabbitmq.Work(ctx, addr, queue, handler) if err != nil { panic(err) } // формируем сообщение для отправки msg := amqp091.Publishing{ MessageId: "msg.test", ContentType: "text/plain", Body: []byte("data"), } // вызываем функцию публикации err = pubFunc(ctx, "", queueName, msg) if err != nil { panic(err) }
type Queue ¶
type Queue struct { Name string // название очереди (пустое для приватной) Durable bool // сохранять сообщения при перезагрузке AutoDelete bool // автоматическое удаление очереди при отключении Exclusive bool // эксклюзивный доступ для текущего соединения NoWait bool // не ждать подтверждения декларирования от сервера Args amqp091.Table // дополнительные параметры // contains filtered or unexported fields }
Queue описывает очередь сообщений.
func (*Queue) Consume ¶
func (q *Queue) Consume(handler func(amqp091.Delivery), opts ...ConsumeOption) Initializer
Consume возвращает инициализированный обработчик входящих сообщений данной очереди.