rabbit

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2025 License: MIT Imports: 5 Imported by: 0

README

# go_rabbit

Lightweight fault‑tolerant RabbitMQ adapter in Go – producers, consumers, automatic reconnects, graceful‑shutdown, and built‑in metrics. Suitable for microservices as well as CLI utilities.

Version: 0.1.0
License: MIT


Features

  • Reliable connection — automatic re-dial with back-off and publisher confirms.
  • Simple API — unified wrapper over amqp091-go for producers and consumers.
  • Graceful shutdown — orderly stop on SIGINT/SIGTERM.
  • Observability — built-in Prometheus metrics and flexible structured logging: by default standard log,
    but optionally any compatible logger (e.g. zap) via SetLogger.
  • Flexible topology description — exchanges, queues, bindings, DLQ, TTL, prefetch described in YAML/ENV.
  • Utilities — UUID generation, MIME detection of attachments, and other useful functions.

Tech stack

Category Package / tool
AMQP driver github.com/rabbitmq/amqp091-go
Utilities github.com/google/uuid, mimetype

Full list of dependencies is in go.mod.


Installation

go get github.com/GwynCerbin/go_rabbit@latest

Quick start

Configuration can be loaded from YAML or directly from environment variables.
For ENV, just set prefixes (see envPrefix tags) and use the package
github.com/caarlos0/env/v10.

YAML configuration:

client:
  host: rabbitmq:5672
  vhost: /
  heartbeat: 10s           # TCP heartbeat
  reconnect: 30s           # max reconnect interval
  logging: true

exchange_binding:
  - name: exchange1_example
    type: topic
    durable: true
  - name: exchange2_example
    type: fanout
    durable: false
  - name: dlx
    type: direct
    durable: true

queue_binding:
  - name: queue1_example
    durable: true
    exchange_name: exchange1_example
    routing_key: routing_key1_example
  - name: queue2_example
    durable: true
    exchange_name: exchange1_example
    routing_key: routing_key2_example
  - name: queue3_example
    durable: true
    exchange_name: exchange2_example
    routing_key: ""
    args:
      x-dead-letter-exchange: dlx
      x-dead-letter-routing-key: jobs.dlq
  - name: jobs.dlq
    durable: true
    exchange_name: dlx
    routing_key: jobs.dlq


example_consumer:
  queue: queue1_example
  args:
    x-prefetch-count: 20

example1_publisher:
  exchange: exchange1_example
  routing_key: routing_key1_example
  app_id: access-manager
  persistent: true

example2_publisher:
  exchange: exchange2_example
  routing_key: ""
  app_id: audit-service
  persistent: false

ENV configuration:

CLIENT_HOST=rabbitmq:5672
CLIENT_VHOST=/
CLIENT_HEARTBEAT=10s
CLIENT_RECONNECT=30s
CLIENT_LOGGING=true
CLIENT_USERNAME=guest
CLIENT_PASSWORD=guest
PUBLISHER1_EXCHANGE=exchange1_example
PUBLISHER1_ROUTING_KEY=routing_key1_example
PUBLISHER1_APP_ID=access-manager
PUBLISHER1_PERSISTENT=true
PUBLISHER2_EXCHANGE=exchange2_example
PUBLISHER2_ROUTING_KEY=
PUBLISHER2_APP_ID=audit-service
PUBLISHER2_PERSISTENT=false
QUEUE_NAME=queue1_example
QUEUE_DURABLE=true
QUEUE_EXCHANGE_NAME=exchange1_example
QUEUE_ROUTING_KEY=routing_key1_example
EXAMPLE_CONSUMER_QUEUE=queue1_example
EXAMPLE_CONSUMER_ARGS_X_PREFETCH_COUNT=20

Go struct

  • Client — connection parameters (see the client section in YAML / prefix CLIENT_ in ENV).
  • ExchangeBinding — exchange topology; corresponds to exchange_binding.
  • QueueBinding — queue declarations and their bindings.
  • ExampleConsumer — consumer configuration.
  • Example1Publisher, Example2Publisher — settings of two different publishers.

If you prefer ENV over YAML, set variables with the shown prefixes
(CLIENT_HOST, PUBLISHER1_EXCHANGE, etc.) and use the package
github.com/caarlos0/env/v10.

// Rabbit groups every RabbitMQ-related section of the configuration.
// Each field carries a yaml key and an envPrefix, so the same struct can be
// populated either from the YAML file or from environment variables.
type Rabbit struct {
	// Client holds the connection parameters (yaml: client, env: CLIENT_*).
	Client Client `envPrefix:"CLIENT_"            yaml:"client"`
	// ExchangeBinding lists the exchanges to declare (yaml: exchange_binding).
	ExchangeBinding []ExchangeDeclare `envPrefix:"EXAMPLE_EXCHANGES_" yaml:"exchange_binding"`
	// QueueBinding lists the queues to declare together with their bindings
	// (yaml: queue_binding, env: QUEUE_*).
	QueueBinding []QueueDeclareAndBind `envPrefix:"QUEUE_"             yaml:"queue_binding"`
	// ExampleConsumer configures the consumer (yaml: example_consumer).
	ExampleConsumer ConsumerConfig `envPrefix:"EXAMPLE_CONSUMER_"  yaml:"example_consumer"`
	// Example1Publisher configures the first publisher (env: PUBLISHER1_*).
	Example1Publisher PublisherConfig `envPrefix:"PUBLISHER1_"        yaml:"example1_publisher"`
	// Example2Publisher configures the second publisher (env: PUBLISHER2_*).
	Example2Publisher PublisherConfig `envPrefix:"PUBLISHER2_"        yaml:"example2_publisher"`
}

Infrastructure initialization:

// Open the broker connection described by the client section of the config.
rabCon, err := rabbit.Dial(cfg.Rabbit.Client)
if err != nil {
	zap.L().Error("rabbit init", zap.Error(err))
	return
}

// Close the connection when the surrounding function returns; a failure to
// close is only logged.
defer func() {
	if err = rabCon.Close(); err != nil {
		zap.L().Error("rabbit close connection gracefully", zap.Error(err))
	}
}()

// Declare exchanges first: the queue bindings below refer to them by name,
// so they must exist before any queue is bound.
// NOTE(review): the original snippet passed the ExchangeDeclare entries to
// QueueDeclareAndBind, which looks like a copy-paste slip. ExchangeDeclare is
// assumed to be the matching method name — confirm against the package API.
for i := range cfg.Rabbit.ExchangeBinding {
	if err = rabCon.ExchangeDeclare(&cfg.Rabbit.ExchangeBinding[i]); err != nil {
		zap.L().Error("rabbit declare exchange", zap.Error(err))

		return
	}
}

// Declare queues and bind them to the exchanges declared above.
for i := range cfg.Rabbit.QueueBinding {
	if err = rabCon.QueueDeclareAndBind(&cfg.Rabbit.QueueBinding[i]); err != nil {
		zap.L().Error("rabbit declare queue and bind", zap.Error(err))

		return
	}
}

pub, err := rabCon.CreatePublisher(cfg.Rabbit.Example1Publisher)
if err != nil {
    zap.L().Error("rabbit create publisher", zap.Error(err))
    return
}

pubWithConf, err := rabCon.CreatePublisherWithConfirmation(cfg.Rabbit.Example2Publisher)
if err != nil {
    zap.L().Error("rabbit create publisher", zap.Error(err))
    return
}

consumer, err := con.CreateConsumer(cfg.Rabbit.ExampleConsumer)
if err != nil {
    zap.L().Error("create consumer", zap.Any("error", err))

    return
}

collector, err := infra.Init(infra.Connectors{
    Consumer: consumer,
    Publisher: pub,
    PublisherWithConf: pubWithConf,
})
if err != nil {
    zap.L().Error("init infra", zap.Any("error", err))

    return
}

var ctx, cancel = context.WithCancel(context.Background())

go func() {
    if err = broker.ListenAndServe(cfg.Broker ,collector.BrokerInfra()); err != nil {
        zap.L().Error("broker server closed", zap.Any("error", err))
    }

    cancel()
}()

if err = rest.ListenAndServe(cfg.RestServer, collector.RestInfra()); err != nil {
    zap.L().Error("rest server closed", zap.Any("error", err))
}

<-ctx.Done()

Architecture

Element Role in message processing
Producer Creates and publishes messages to an Exchange.
Exchange Receives messages from producers and, according to binding rules, routes them to the corresponding queues.
Queue Buffer from which consumers pull messages. Supports FIFO behavior and holds messages until Ack / Expire.
Binding Link Exchange → Queue with a routing_key defining which message goes to which queue.
Consumer Receives messages, executes business logic, and reports the result to the broker (Ack, Nack, Reject).
Ack / Nack
  • Ack — confirms successful processing, message is removed.
  • Nack (requeue=true) — message is returned to the queue.
  • Reject (requeue=false) — final rejection (or sent to DLQ if x-dead-letter-exchange is set).
┌────────┐   publish   ┌────────────┐
│Producer│ ───────────▶│  Exchange  │
└────────┘             └────┬───────┘
                             │ bind
                       ┌─────▼──────┐  pull
                       │   Queue    │◀───┐
                       └────┬───────┘    │ ack / nack
                            │ deliver    │ (requeue if nack)
                       ┌────▼─────┐      │
                       │ Consumer │──────┘
                       └──────────┘

Error‑handling guide

Every handler MUST end with exactly one of three actions:
Ack() — acknowledgment; Nack() — negative acknowledgment (the message is requeued to the same queue);
Reject() — final rejection without redelivery.
If none of these methods is called, the message stays in the unacked state and is redelivered only after the consumer restarts.

Handling scenario Action Broker behavior
Successful business logic Ack() Message is removed from the queue.
Error but want to retry later (temporary DB or external-service failure) Nack() Negative acknowledgment with requeue=true → message is returned to the queue.
Format/business validation error (retry useless) Reject() Message is rejected (or goes to DLQ if configured).

Handler example

// TagDeleted handles a message whose body is a JSON array of tag names and
// removes the corresponding tag powers through hs.manager.
//
// Every path settles the message exactly once:
//   - body cannot be decoded   → Reject (a retry would fail the same way)
//   - DeleteTagPowers fails    → Nack   (requeued for a later attempt)
//   - everything succeeded     → Ack
//
// A failure of the settle call itself cannot be returned to anyone, so it is
// logged instead of being silently dropped.
func (hs *HandlerSet) TagDeleted(msg broker.Message) {
	var tags []string
	if err := json.Unmarshal(msg.Body(), &tags); err != nil {
		// Invalid format → Reject
		hs.log.Error("invalid body", zap.Error(err))
		if rejectErr := msg.Reject(); rejectErr != nil {
			hs.log.Error("reject message", zap.Error(rejectErr))
		}
		return
	}

	// Bound the business call so a stuck dependency cannot hold the worker.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := hs.manager.DeleteTagPowers(ctx, tags); err != nil {
		// Temporary error → Nack (requeue)
		hs.log.Error("delete tag powers", zap.Error(err))
		if nackErr := msg.Nack(); nackErr != nil { // Nack requeues automatically
			hs.log.Error("nack message", zap.Error(nackErr))
		}
		return
	}

	// All good → Ack
	if ackErr := msg.Ack(); ackErr != nil {
		hs.log.Error("ack message", zap.Error(ackErr))
	}
}

Starting listener with graceful shutdown

// ListenAndServe runs a broker listener for consumer, routing messages with
// router(infra), and blocks until the listener is asked to stop.
//
// It returns the result of sv.Shutdown when SIGINT or SIGTERM is received,
// and nil when the listener itself stopped with an error (that error is
// logged, not returned).
func ListenAndServe(consumer broker.Consumer, infra manager.BrokerManager) error {
	// Upper bound for the graceful shutdown triggered by SIGINT/SIGTERM.
	const timeOut = 5 * time.Second

	quit := make(chan os.Signal, 1)
	sv := broker.NewListener(consumer).Init(router(infra))

	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		if err := sv.ListenAndServe(); err != nil {
			// logrus does not understand zap fields; attach the error the
			// logrus way so it is rendered as a proper "error" field.
			logrus.WithError(err).Error("listener stopped")
			// SIGCHLD is used purely as an in-process marker on quit; it is
			// not registered with signal.Notify above.
			quit <- syscall.SIGCHLD
		}
	}()
	logrus.Info("Broker listener started")

	switch <-quit {
	case syscall.SIGINT, syscall.SIGTERM:
		logrus.Info("Shutdown ...")
		ctx, cancel := context.WithTimeout(context.Background(), timeOut)
		defer cancel()
		return sv.Shutdown(ctx)
	case syscall.SIGCHLD:
		// The listener died on its own: send SIGTERM to this process.
		// NOTE(review): presumably this lets other SIGTERM subscribers in
		// the process begin their own shutdown — confirm.
		proc, _ := os.FindProcess(os.Getpid())
		_ = proc.Signal(syscall.SIGTERM)
	}
	return nil
}
  • SIGINT/SIGTERM — initiate smooth shutdown.
  • SIGCHLD — internal marker used when ListenAndServe stops with an error: the process then sends itself SIGTERM.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer interface {
	// Consume returns the next available message, or an error if the
	// consumer is closed. The caller must settle the returned Message by
	// acknowledging or rejecting it.
	Consume() (Message, error)

	// Close stops message consumption and releases any resources.
	// After Close, subsequent calls to Consume should return an error.
	Close() error
}

Consumer defines the interface for consuming messages from a broker. Each implementation should manage its own connection and message stream.

type ConsumerCloseError

type ConsumerCloseError struct {
}

func (ConsumerCloseError) Error

func (ConsumerCloseError) Error() string

type EmptyRoutError

type EmptyRoutError struct {
}

func (EmptyRoutError) Error

func (EmptyRoutError) Error() string

type Instance

type Instance struct {
	// contains filtered or unexported fields
}

Instance is a running listener created from Listener.

  • workChan: buffered channel through which the dispatcher feeds anonymous handler functions to the workers.
  • wg: WaitGroup for graceful shutdown synchronization.
  • gos: fixed worker pool size determined at Init() time.
  • router: map routingKey → handler function.
  • consumer: same consumer object shared with the parent Listener.

func (*Instance) ListenAndServe

func (l *Instance) ListenAndServe() error

ListenAndServe starts the worker pool and enters an infinite loop that consumes messages from the broker. If consumer.Consume() returns an error, it is propagated to the caller, enabling graceful shutdown at a higher level.

func (*Instance) Shutdown

func (l *Instance) Shutdown(ctx context.Context) error

Shutdown initiates a graceful shutdown. It waits either for stop() to finish or for the context to be canceled/expired.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener encapsulates common parameters of a message‑queue subscriber.

  • consumer: an object that implements the broker.Consumer interface.
  • gos: desired number of concurrent goroutines used by an Instance.

Listener itself does not process messages; it acts as a factory that creates an Instance where the real work happens.

func NewListener

func NewListener(consumer Consumer) *Listener

NewListener constructs a Listener with a default parallelism level of 1.

func (*Listener) Init

func (l *Listener) Init(router Router) *Instance

Init takes a Router snapshot and returns a ready‑to‑run Instance. The workChan capacity is set to 1; workers drain it quickly, so a larger buffer is rarely needed. To start with another router, create a new Instance instead of mutating the old one.

func (*Listener) SetConcurrency

func (l *Listener) SetConcurrency(n int) error

SetConcurrency sets the number of goroutines that will be spawned later inside an Instance. It validates the input (n >= 1) and clamps the value by runtime.GOMAXPROCS(0).

func (*Listener) SetLogger

func (l *Listener) SetLogger(logger LoggerFunc)

SetLogger overrides the default stdlib logger. Pass nil to restore logging to the standard library’s log.Print.

type LoggerFunc

type LoggerFunc func(error)

LoggerFunc is a pluggable callback for error reporting. Users can inject any logger (zap, logrus, zerolog, etc.) by calling listener.SetLogger(customLogger).

type Message

type Message interface {
	// Headers returns the message metadata headers.
	Headers() map[string]interface{}

	// ContentType returns the MIME type of the message payload.
	ContentType() string

	// IsRedelivered reports whether this delivery is a redelivery of a
	// previously delivered message.
	IsRedelivered() bool

	// Body returns the raw payload bytes.
	Body() []byte

	// RoutingKey returns the message routing key set on the AMQP delivery.
	RoutingKey() string

	// Ack acknowledges successful processing of the message.
	// It signals the broker to remove the message from the queue.
	Ack() error

	// Nack negatively acknowledges the message, requeuing it.
	// It signals a processing failure.
	Nack() error

	// Reject rejects this message only (no multiple-nack support).
	// The message is not requeued.
	Reject() error
}

Message represents a single broker-delivered message, allowing inspection and acknowledgment. Implementations wrap the broker-specific delivery type.

type Publisher

type Publisher interface {
	// Publish sends the given payload as a message under the supplied context.
	// It returns an error if the message could not be delivered.
	Publish(context.Context, []byte) error

	// Close releases any resources held by the publisher, such as channels or connections.
	// After Close, further calls to Publish should return an error.
	Close() error
}

Publisher defines the interface for publishing messages to a broker. Implementations should send the payload and handle any connection lifecycle.

type Router

// Router maps a routing key to the handler function for messages with that key.
type Router map[string]func(message Message)

func NewRouter

func NewRouter() Router

func (Router) Add

func (r Router) Add(key string, f func(message Message))

type UnroutedMessage

type UnroutedMessage struct {
}

func (UnroutedMessage) Error

func (UnroutedMessage) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL