rabbitmq

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2022 License: MIT Imports: 8 Imported by: 0

README

RabbitMQ lib

Библиотека rabbitmq для работы с сервером RabbitMQ является вспомогательной и построена поверх github.com/rabbitmq/amqp091-go. В неё добавлено понятие инициализаторов каналов соединения: на их основе осуществляется автоматическое восстановление подключения к серверу и восстановление состояния.

Для установки соединения с сервером и запуска обработчиков используется метод Run (синхронный) или Init (асинхронный), которые позволяют задать несколько инициализаторов Initializer. Эти обработчики будут вызываться при каждой установке соединения, чтобы восстановить топологию и заново проинициализировать работу своих сервисов.

В библиотеки представлены два генератора таких инициализаторов: Consume для обработки входящих сообщений и Publish для публикации. Для инициализации одновременной обработки входящих событий и публикации новых можно воспользоваться вспомогательной функцией Work.

const queueName = "test.queue"          // название очереди с сообщениями
queue := rabbitmq.NewQueue(queueName)   // создаём описание очереди
handler := func(msg amqp091.Delivery) { // обработчик входящих сообщений
    fmt.Println("->", msg.MessageId)
}

// подключаемся к серверу и запускаем автоматическую обработку входящих сообщений
pubFunc, err := rabbitmq.Work(ctx, addr, queue, handler)
if err != nil {
    panic(err)
}

// формируем сообщение для отправки
msg := amqp091.Publishing{
    MessageId:   "msg.test",
    ContentType: "text/plain",
    Body:        []byte("data"),
}
// вызываем функцию публикации
err = pubFunc(ctx, "", queueName, msg)
if err != nil {
    panic(err)
}

Documentation

Overview

Библиотека rabbitmq для работы с сервером RabbitMQ является вспомогательной и построена поверх github.com/rabbitmq/amqp091-go. В неё добавлено понятие инициализаторов каналов соединения — на их основе осуществляется автоматическое подключение к серверу и восстановление состояния.

Для установки соединения с сервером и инициализации обработчиков используется метод Run (синхронный) или Init (асинхронный), которые позволяют задать несколько Initializer. Эти обработчики будут вызываться при каждой установке соединения, чтобы восстановить топологию и заново проинициализировать работу сервиса.

В библиотеки представлены два генератора таких инициализаторов: Consume для обработки входящих сообщений и Publish для публикации. Для инициализации одновременной обработки входящих событий и публикации новых можно воспользоваться вспомогательной функцией Work.

Example
// инициализируем контекст для выполнения
ctx, cancel := context.WithTimeout(ctx, time.Minute/10)
defer cancel()

const queueName = "test.queue"          // название очереди с сообщениями
queue := rabbitmq.NewQueue(queueName)   // создаём описание очереди
handler := func(msg amqp091.Delivery) { // обработчик входящих сообщений
	fmt.Println("->", msg.MessageId)
}

// подключаемся к серверу и запускаем автоматическую обработку входящих сообщений
pubFunc, err := rabbitmq.Work(ctx, addr, queue, handler)
if err != nil {
	panic(err)
}

// публикуем сообщения
for i := 1; i <= 3; i++ {
	// формируем сообщение
	msg := amqp091.Publishing{
		MessageId:   fmt.Sprintf("msg.%02d", i),
		ContentType: "text/plain",
		Body:        []byte("data"),
	}
	// вызываем функцию публикации
	err := pubFunc(ctx, "", queueName, msg)
	if err != nil {
		panic(err)
	}
	fmt.Println("<-", msg.MessageId)
}

<-ctx.Done()
Output:

<- msg.01
<- msg.02
<- msg.03
-> msg.01
-> msg.02
-> msg.03

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ReconnectDelay = time.Second * 2 // задержка перед повторным соединением
	MaxIteration   = 5               // максимальное количество попыток
)

Параметры для переподключения к серверу RabbitMQ в случае ошибки.

View Source
var ErrNoChannel = errors.New("channel is not initialized")

ErrNoChannel описывает ошибку не инициализированного канала.

Functions

func Connect

func Connect(addr string) (conn *amqp091.Connection, err error)

Connect возвращает инициализированное подключение к серверу RabbitMQ.

В случае ошибки подключения попытка повторяется несколько раз с небольшой задержкой (смотри MaxIteration и ReconnectTime).

func Init

func Init(ctx context.Context, addr string, workers ...Initializer) error

Init запускает асинхронное выполнение Run и ожидает завершения самого первого процесса инициализации, после чего возвращает управление. Возвращает ошибку, если при первой инициализации обработчиков или установке соединения произошла ошибка.

func Publish

func Publish(opts ...PublishOption) (Publisher, Initializer)

Publish возвращает функцию и обработчик для публикации сообщений.

Если перед публикацией необходимо произвести некоторые настройки канала, то можно задать свою функцию инициализации с помощью опции WithInit(ChannelHandler).

Example
// инициализируем функцию публикации новых сообщений и соответствующий ей обработчик
pubFunc, pubWorker := rabbitmq.Publish(
	rabbitmq.WithTTL(time.Minute),
	rabbitmq.WithTimestamp())

// подключаемся к серверу и запускаем работу наших обработчиков
err := rabbitmq.Init(ctx, addr, pubWorker)
if err != nil {
	panic(err)
}

// публикуем новые сообщения в очередь с небольшой задержкой
for i := 1; i <= 3; i++ {
	msg := amqp091.Publishing{
		MessageId:   fmt.Sprintf("msg.%02d", i),
		ContentType: "text/plain",
		Body:        []byte("data"),
	}
	// отправляем сообщения в нашу очередь
	err := pubFunc(ctx, "", "test.queue", msg)
	if err != nil {
		panic(err)
	}
}

func Run

func Run(ctx context.Context, addr string, initializers ...Initializer) error

Run осуществляет подключение к серверу RabbitMQ и инициализирует обработчики с этим соединением. Для каждого обработчика создаётся отдельный канал, а в случае ошибки инициализации всё повторяется.

Возвращает ошибку, если превышено количество попыток установки соединений. Плановое завершение осуществляется через контекст.

func SetLogger

func SetLogger(l zerolog.Logger)

SetLogger настраивает публикацию логов работы. Не является потокобезопасным методом и рекомендуется переопределять перед началом работы с библиотекой.

Types

type ConsumeOption

type ConsumeOption interface {
	// contains filtered or unexported methods
}

ConsumeOption изменяет настройки получения сообщений.

func WithArgs

func WithArgs(v amqp091.Table) ConsumeOption

WithArgs задает дополнительные параметры обработчика сообщений.

func WithExclusive

func WithExclusive() ConsumeOption

WithExclusive взводит флаг эксклюзивного доступа к очереди.

func WithName

func WithName(v string) ConsumeOption

WithName задаёт имя обработчика сообщений.

func WithNoAutoAck

func WithNoAutoAck() ConsumeOption

WithNoAutoAck запрещает автоматическое подтверждение приёма сообщений.

func WithNoLocal

func WithNoLocal() ConsumeOption

func WithNoWait

func WithNoWait() ConsumeOption

type Handler

type Handler = func(amqp091.Delivery)

Handler является синонимом для функции обработки входящих сообщений.

type Initializer

type Initializer = func(*amqp091.Channel) error

Initializer является синонимом функции для инициализации канала соединения RabbitMQ.

func Consume

func Consume(queue *Queue, handler Handler, opts ...ConsumeOption) Initializer

Consume возвращает инициализированный обработчик входящих сообщений для указанной очереди.

По умолчанию включено автоматическое подтверждение приёма сообщения. Для его отключения используйте опцию WithNoAutoAck().

Example
// функция для обработки входящих сообщений
handler := func(msg amqp091.Delivery) {
	fmt.Println("->", msg.MessageId)
	msg.Ack(false) // подтверждаем обработку сообщения
}
// создаём описание очереди
queue := rabbitmq.NewQueue("test.queue")
// инициализируем для этой очереди обработчик входящих сообщений и получаем worker
consumerWorker := queue.Consume(handler, rabbitmq.WithNoAutoAck())
// подключаемся к серверу и запускаем работу наших обработчиков
err := rabbitmq.Init(ctx, addr, consumerWorker)
if err != nil {
	panic(err)
}

type PublishOption

type PublishOption interface {
	// contains filtered or unexported methods
}

PublishOption изменяет настройки публикации сообщений.

func WithAppID

func WithAppID(v string) PublishOption

WithAppID задаёт идентификатор приложения, добавляемый во все отправляемые сообщения, перезаписывая любые ранее заданные в сообщении значения.

func WithImmediate

func WithImmediate() PublishOption

func WithInit

func WithInit(v Initializer) PublishOption

WithInit задаёт функцию для инициализации канала при подключении.

func WithMandatory

func WithMandatory() PublishOption

func WithReplyTo

func WithReplyTo(v string) PublishOption

WithReplyTo автоматически заполняет во всех отправляемых сообщениях поле ReplyTo заданным значением, если оно не заполнено в сообщении.

func WithReplyToQueue

func WithReplyToQueue(v *Queue) PublishOption

WithReplyToQueue заполняет поле ReplyTo во всех сообщениях именем указанной очереди. Если имя очереди меняется, то для всех новых сообщений так же будет использовано новое имя.

При одновременном использовании с WithReplyTo, очередь имеет больший приоритет и будет использоваться именно она.

func WithTTL

func WithTTL(v time.Duration) PublishOption

WithTTL задаёт ограничение по времени жизни сообщения.

func WithTimestamp

func WithTimestamp() PublishOption

WithTimestamp добавляет временную метку перед отправкой сообщения, если она не задана.

type Publisher

type Publisher = func(ctx context.Context, exchange, key string, msg amqp091.Publishing) error

Publisher описывает функцию для публикации сообщений на сервер RabbitMQ.

func Work

func Work(ctx context.Context, addr string, queue *Queue, handler Handler, opts ...PublishOption) (Publisher, error)

Work является вспомогательной функцией быстрой инициализации одновременной обработки входящих сообщений и публикации новых. В качестве параметров передаётся контекст для остановки сервиса, адрес для подключения к серверу RabbitMQ, очередь с входящими сообщениями и их обработчик. Кроме этого можно указать необязательные параметры для публикации. Возвращает функцию для публикации новых сообщений.

По умолчанию автоматически отсылается подтверждение о приёме входящих сообщений, а для исходящих заполняется поле ReplyTo указанием на очередь входящих сообщений.

Example
const queueName = "test.queue"          // название очереди с сообщениями
queue := rabbitmq.NewQueue(queueName)   // создаём описание очереди
handler := func(msg amqp091.Delivery) { // обработчик входящих сообщений
	fmt.Println("->", msg.MessageId)
}

// подключаемся к серверу и запускаем автоматическую обработку входящих сообщений
pubFunc, err := rabbitmq.Work(ctx, addr, queue, handler)
if err != nil {
	panic(err)
}

// формируем сообщение для отправки
msg := amqp091.Publishing{
	MessageId:   "msg.test",
	ContentType: "text/plain",
	Body:        []byte("data"),
}
// вызываем функцию публикации
err = pubFunc(ctx, "", queueName, msg)
if err != nil {
	panic(err)
}

type Queue

type Queue struct {
	Name       string        // название очереди (пустое для приватной)
	Durable    bool          // сохранять сообщения при перезагрузке
	AutoDelete bool          // автоматическое удаление очереди при отключении
	Exclusive  bool          // эксклюзивный доступ для текущего соединения
	NoWait     bool          // не ждать подтверждения декларирования от сервера
	Args       amqp091.Table // дополнительные параметры
	// contains filtered or unexported fields
}

Queue описывает очередь сообщений.

func NewQueue

func NewQueue(name string) *Queue

NewQueue возвращает новое описание очереди с заданным именем.

func (*Queue) Consume

func (q *Queue) Consume(handler func(amqp091.Delivery), opts ...ConsumeOption) Initializer

Consume возвращает инициализированный обработчик входящих сообщений данной очереди.

func (*Queue) String

func (q *Queue) String() string

String возвращает имя очереди. Возвращаемое значение может отличаться от Name. Если очередь была с пустым именем и прошла декларацию, то возвращаемое название очереди сгенерировано сервером.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL