messaging

package module
v1.0.1
Published: Jan 26, 2026 License: MIT Imports: 8 Imported by: 0

README

RabbitMQ Messenger for Go

This package provides a Go messaging layer that uses RabbitMQ as the underlying message broker. It consists of a generic interface, IMessenger, and a concrete implementation, RabbitMqMessenger, which handles publishing to, subscribing to, and unsubscribing from message topics, with built-in error handling and reconnection logic.

IMessenger Interface

The IMessenger interface defines the contract for messaging operations with generic types for the topic key and message type:

package messaging

import (
	"github.com/uoul/go-async"
)

type IMessenger[K, M any] interface {
	Publish(topic K, msg any) error
	Subscribe(topic K) async.Sequence[M]
	Unsubscribe(subscription async.Sequence[M])
}
  • Publish(topic K, msg any) error: Sends a message to the specified topic. Returns an error if publishing fails.
  • Subscribe(topic K) async.Sequence[M]: Creates a subscription to the specified topic and returns an asynchronous sequence for receiving messages.
  • Unsubscribe(subscription async.Sequence[M]): Removes a subscription and cleans up associated resources.
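
To make the contract concrete, here is a minimal in-memory sketch of an interface with the same shape. It is not part of this package: Messenger, memoryMessenger, and newMemoryMessenger are hypothetical names, a plain receive-only channel stands in for async.Sequence[M] (whose API is not shown here), and Publish takes a typed msg M rather than any to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// Messenger mirrors the shape of IMessenger. A receive-only channel stands in
// for async.Sequence[M]; that substitution is an assumption of this sketch.
type Messenger[K comparable, M any] interface {
	Publish(topic K, msg M) error
	Subscribe(topic K) <-chan M
	Unsubscribe(subscription <-chan M)
}

// memoryMessenger is a hypothetical in-process implementation with no broker.
type memoryMessenger[K comparable, M any] struct {
	mu   sync.Mutex
	subs map[K][]chan M
}

// Compile-time check that memoryMessenger satisfies Messenger.
var _ Messenger[string, int] = (*memoryMessenger[string, int])(nil)

func newMemoryMessenger[K comparable, M any]() *memoryMessenger[K, M] {
	return &memoryMessenger[K, M]{subs: make(map[K][]chan M)}
}

// Publish delivers msg to every subscriber of topic without blocking.
func (m *memoryMessenger[K, M]) Publish(topic K, msg M) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, ch := range m.subs[topic] {
		select {
		case ch <- msg:
		default:
			return fmt.Errorf("subscriber buffer full for topic %v", topic)
		}
	}
	return nil
}

// Subscribe registers a new buffered channel for topic and returns it.
func (m *memoryMessenger[K, M]) Subscribe(topic K) <-chan M {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch := make(chan M, 8)
	m.subs[topic] = append(m.subs[topic], ch)
	return ch
}

// Unsubscribe removes the subscription and closes its channel.
func (m *memoryMessenger[K, M]) Unsubscribe(subscription <-chan M) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for topic, chans := range m.subs {
		for i, ch := range chans {
			if (<-chan M)(ch) == subscription {
				close(ch)
				m.subs[topic] = append(chans[:i], chans[i+1:]...)
				return
			}
		}
	}
}

func main() {
	var m Messenger[string, string] = newMemoryMessenger[string, string]()
	sub := m.Subscribe("greetings")
	if err := m.Publish("greetings", "hello"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(<-sub) // prints "hello"
	m.Unsubscribe(sub)
}
```

The subscription value itself is the handle passed to Unsubscribe, which matches the way the interface above identifies subscriptions by the sequence returned from Subscribe.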

RabbitMqMessenger Implementation

RabbitMqMessenger is the concrete implementation of IMessenger for RabbitMQ. It handles all communication with the broker.

Core Components
  • Connection Management: Automatically establishes and maintains the connection to RabbitMQ, reconnecting when the connection is lost.
  • Message Publishing: Serializes messages and publishes them to exchanges with configurable retry logic.
  • Subscription Handling: Manages multiple subscriptions, automatically declaring exchanges and queues, and binding them with routing keys.
  • Asynchronous Processing: Uses Go channels and the reflect.Select mechanism to handle multiple concurrent operations.
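
For readers unfamiliar with reflect.Select, the following sketch shows the general technique of receiving from a set of channels whose size is only known at runtime. It uses only the standard library and does not reproduce this package's internals; fanIn is a hypothetical helper written for illustration.

```go
package main

import (
	"fmt"
	"reflect"
)

// fanIn receives from a dynamic set of channels using reflect.Select and
// returns every value received, stopping once all channels are closed.
func fanIn(chans []<-chan string) []string {
	cases := make([]reflect.SelectCase, len(chans))
	for i, ch := range chans {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		}
	}

	var out []string
	for len(cases) > 0 {
		i, v, ok := reflect.Select(cases)
		if !ok {
			// The chosen channel is closed and drained: drop its case.
			cases = append(cases[:i], cases[i+1:]...)
			continue
		}
		out = append(out, v.String())
	}
	return out
}

func main() {
	a := make(chan string, 2)
	b := make(chan string, 1)
	a <- "a1"
	a <- "a2"
	b <- "b1"
	close(a)
	close(b)

	got := fanIn([]<-chan string{a, b})
	// The order across channels is not deterministic, so only the count is shown.
	fmt.Println(len(got)) // prints "3"
}
```

A plain select statement needs its cases fixed at compile time, which is why a component that adds and removes subscriptions at runtime would reach for reflect.Select instead.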

Configuration Options

The messenger can be customized using functional options:

  • WithRabbitMqSerializer(serializer ISerializer): Sets the serialization format for messages (defaults to JSON).
  • WithRabbitMqRetryInterval(interval time.Duration): Sets the delay between reconnection attempts (defaults to 10 seconds).
  • WithRabbitMqMaxRetries(retries uint): Sets the maximum number of retry attempts for failed message publishing (defaults to 10).
  • WithRabbitMqStreamBufferSize(size uint): Sets the buffer size for message streams (defaults to 50).
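
The functional options pattern behind these settings can be sketched as follows. The config type and the lowercase option names are hypothetical stand-ins, since RabbitMqMessenger's fields are unexported; the defaults are the ones listed above, and the serializer option is left out to keep the example within the standard library.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the messenger's unexported settings.
type config struct {
	retryInterval    time.Duration
	maxRetries       uint
	streamBufferSize uint
}

// option mutates a config, mirroring the func(*RabbitMqMessenger) options.
type option func(*config)

func withRetryInterval(d time.Duration) option {
	return func(c *config) { c.retryInterval = d }
}

func withMaxRetries(n uint) option {
	return func(c *config) { c.maxRetries = n }
}

func withStreamBufferSize(n uint) option {
	return func(c *config) { c.streamBufferSize = n }
}

// newConfig starts from the documented defaults and applies opts in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) *config {
	c := &config{
		retryInterval:    10 * time.Second,
		maxRetries:       10,
		streamBufferSize: 50,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withMaxRetries(3), withRetryInterval(2*time.Second))
	fmt.Println(c.retryInterval, c.maxRetries, c.streamBufferSize) // prints "2s 3 50"
}
```

Options that are not passed keep their defaults, so callers only name the settings they want to change.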

Key Features

  • Automatic Reconnection: The messenger continuously attempts to reconnect to RabbitMQ if the connection is lost.
  • Message Retry: Failed message publications are retried up to the configured maximum number of times.
  • Dynamic Subscription Management: Subscriptions can be added and removed at runtime without restarting the messenger.
  • Flexible Serialization: Supports different serialization formats through the ISerializer interface.
  • Resource Cleanup: Automatically cleans up queues and resources when subscriptions are removed.
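
A bounded retry loop of the kind described under Message Retry might look like the sketch below. It is an illustration, not the package's actual code: publishWithRetry and failNTimes are hypothetical helpers, and the sketch assumes the first attempt does not count towards the retry limit.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// publishWithRetry calls publish once and then retries it up to maxRetries
// times, sleeping for interval between attempts. It returns nil on the first
// success, or the last error wrapped with the retry count.
func publishWithRetry(maxRetries uint, interval time.Duration, publish func() error) error {
	var err error
	for attempt := uint(0); ; attempt++ {
		if err = publish(); err == nil {
			return nil
		}
		if attempt == maxRetries {
			return fmt.Errorf("publish failed after %d retries: %w", maxRetries, err)
		}
		time.Sleep(interval)
	}
}

// failNTimes returns a publish function that fails its first n calls, plus a
// pointer to its call counter. It simulates a temporarily unavailable broker.
func failNTimes(n int) (func() error, *int) {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("broker unavailable")
		}
		return nil
	}, &calls
}

func main() {
	publish, calls := failNTimes(2)
	err := publishWithRetry(10, time.Millisecond, publish)
	fmt.Println(err, *calls) // prints "<nil> 3"
}
```

Wrapping the last error with %w keeps the underlying cause available to callers through errors.Is and errors.As.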

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithRabbitMqMaxRetries

func WithRabbitMqMaxRetries(retries uint) func(*RabbitMqMessenger)

func WithRabbitMqRetryInterval

func WithRabbitMqRetryInterval(interval time.Duration) func(*RabbitMqMessenger)

func WithRabbitMqSerializer

func WithRabbitMqSerializer(serializer serialization.ISerializer) func(*RabbitMqMessenger)

func WithRabbitMqStreamBufferSize

func WithRabbitMqStreamBufferSize(size uint) func(*RabbitMqMessenger)

Types

type IMessenger

type IMessenger[K, M any] interface {
	Publish(topic K, msg any) error
	Subscribe(topic K) async.Sequence[M]
	Unsubscribe(subsciption async.Sequence[M])
}

func NewRabbitMqMessenger

func NewRabbitMqMessenger(ctx context.Context, host string, port uint16, user string, password string, opts ...func(*RabbitMqMessenger)) IMessenger[RabbitMqExchange, amqp.Delivery]

type RabbitMqExchange

type RabbitMqExchange struct {
	Type       string
	Exchange   string
	RoutingKey string
}

type RabbitMqMessenger

type RabbitMqMessenger struct {
	// contains filtered or unexported fields
}

func (*RabbitMqMessenger) Publish

func (r *RabbitMqMessenger) Publish(topic RabbitMqExchange, msg any) error

Publish implements IMessenger.

func (*RabbitMqMessenger) Subscribe

func (r *RabbitMqMessenger) Subscribe(topic RabbitMqExchange) async.Sequence[amqp.Delivery]

Subscribe implements IMessenger.

func (*RabbitMqMessenger) Unsubscribe

func (r *RabbitMqMessenger) Unsubscribe(subsciption async.Sequence[amqp.Delivery])

Unsubscribe implements IMessenger.
