broker

Version: v1.0.2
Published: Jun 10, 2026 License: MIT Imports: 11 Imported by: 0

README

A lightweight, resilient Go message broker abstraction library with built-in auto-reconnect, queue auto-resubscription, and support for multiple message brokers.

Overview

broker provides a unified Connection interface that abstracts away the differences between message broker implementations (RabbitMQ, Kafka, etc.). It handles the complexities of connection resilience so your application logic stays clean and simple.

Features

  • Unified Interface: Single Connection interface for publishing and subscribing across different brokers.
  • Auto-Reconnect: Automatically reconnects when the connection is lost. RabbitMQ retries at a fixed 5-second interval; Kafka, Redis, and MQTT rely on the retry logic of their client libraries.
  • Auto-Resubscribe: After a reconnection, all previous subscriptions are restored automatically, either by this library (RabbitMQ, MQTT) or transparently by the underlying client library (Kafka, Redis).
  • Error Centralization: Centralized error handling via OnError callback.
  • Reconnect Callbacks: Hook into reconnection events via OnReconnected to trigger recovery logic (e.g., outbox pattern replay).
  • Context Support: Publish methods accept context.Context for timeout and cancellation control.
  • Graceful Shutdown: Clean Close method for proper resource cleanup.

Current Implementations

Broker    Status
RabbitMQ  ✅ Complete
Kafka     ✅ Complete
Redis     ✅ Complete
MQTT      ✅ Complete

Installation

go get github.com/morkid/broker

Quick Start

RabbitMQ
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/morkid/broker"
)

func main() {
    conn := broker.CreateConnection("rabbitmq")

    conn.OnError(func(err error) {
        log.Printf("Broker error: %v", err)
    })

    conn.OnReconnected(func() {
        log.Println("Reconnected — trigger recovery logic here")
    })

    // Connect — blocks until connected
    err := conn.Connect("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

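    // Subscribe returns an error, so check it instead of discarding it.
    if err := conn.Subscribe("my-queue", func(data []byte) {
        fmt.Printf("Received: %s\n", string(data))
    }); err != nil {
        log.Printf("Subscribe error: %v", err)
    }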

    err = conn.Publish(context.Background(), "my-queue", []byte("Hello, World!"))
    if err != nil {
        log.Printf("Publish error: %v", err)
    }

    select {}
}
Kafka
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/morkid/broker"
)

func main() {
    conn := broker.CreateConnection("kafka")

    conn.OnError(func(err error) {
        log.Printf("Broker error: %v", err)
    })

    conn.OnReconnected(func() {
        log.Println("Reconnected — trigger recovery logic here")
    })

    // Connect — supports kafka:// URL or legacy comma-separated format, returns immediately
    // (actual connection happens lazily on first produce/consume)
    err := conn.Connect("kafka://localhost:9092/?brokers=localhost:9092")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

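    // Subscribe returns an error, so check it instead of discarding it.
    if err := conn.Subscribe("my-topic", func(data []byte) {
        fmt.Printf("Received: %s\n", string(data))
    }); err != nil {
        log.Printf("Subscribe error: %v", err)
    }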

    err = conn.Publish(context.Background(), "my-topic", []byte("Hello, World!"))
    if err != nil {
        log.Printf("Publish error: %v", err)
    }

    select {}
}
Redis
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/morkid/broker"
)

func main() {
    // Redis Pub/Sub
    conn := broker.CreateConnection("redis")

    conn.OnError(func(err error) {
        log.Printf("Broker error: %v", err)
    })

    // Connect — format: redis://host:port/db?mode=pubsub
    err := conn.Connect("redis://localhost:6379/0")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

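    // Subscribe returns an error, so check it instead of discarding it.
    if err := conn.Subscribe("my-channel", func(data []byte) {
        fmt.Printf("Received: %s\n", string(data))
    }); err != nil {
        log.Printf("Subscribe error: %v", err)
    }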

    err = conn.Publish(context.Background(), "my-channel", []byte("Hello, World!"))
    if err != nil {
        log.Printf("Publish error: %v", err)
    }

    select {}
}
MQTT
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/morkid/broker"
)

func main() {
    conn := broker.CreateConnection("mqtt")

    conn.OnError(func(err error) {
        log.Printf("Broker error: %v", err)
    })

    // Connect — format: tcp://host:port?clientid=my-app&qos=1
    err := conn.Connect("tcp://localhost:1883?clientid=my-app")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

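    // Subscribe returns an error, so check it instead of discarding it.
    if err := conn.Subscribe("sensors/temperature", func(data []byte) {
        fmt.Printf("Received: %s\n", string(data))
    }); err != nil {
        log.Printf("Subscribe error: %v", err)
    }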

    err = conn.Publish(context.Background(), "sensors/temperature", []byte("23.5"))
    if err != nil {
        log.Printf("Publish error: %v", err)
    }

    select {}
}

API Reference

ConnectionCallback
type ConnectionCallback func(data []byte)

Callback function invoked when a message is received.
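The API table in this README states that Subscribe includes panic recovery for callbacks. The sketch below shows one way such a guard can be written; safeCallback is a hypothetical helper for illustration, not the library's own code, and the local ConnectionCallback type only mirrors the documented one so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// ConnectionCallback mirrors the callback type documented above.
type ConnectionCallback func(data []byte)

// safeCallback is a hypothetical helper (not part of the library). It wraps a
// callback so that a panic is recovered and reported to onError instead of
// crashing the consumer goroutine. If onError is nil the panic is swallowed.
func safeCallback(cb ConnectionCallback, onError func(error)) ConnectionCallback {
	return func(data []byte) {
		defer func() {
			if r := recover(); r != nil && onError != nil {
				onError(fmt.Errorf("callback panic: %v", r))
			}
		}()
		cb(data)
	}
}

func main() {
	wrapped := safeCallback(func(data []byte) {
		if len(data) == 0 {
			panic(errors.New("empty payload"))
		}
		fmt.Printf("Received: %s\n", data)
	}, func(err error) {
		fmt.Println("handled:", err)
	})

	wrapped([]byte("hello")) // normal delivery
	wrapped(nil)             // panics inside the callback, reported via onError
}
```

Routing the recovered panic through the same handler that OnError registers keeps all failure reporting in one place.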

Connection Interface
  • Connect(url string) error: Establishes a connection to the broker.
  • Publish(ctx context.Context, queue string, body []byte, optionalContentType ...string) error: Publishes a message to the specified queue/topic with context support for timeout/cancellation. Default content type is "text/plain". For RabbitMQ, queues are declared as durable automatically.
  • Subscribe(queue string, callback ConnectionCallback) error: Subscribes to a queue/topic and registers the callback for incoming messages. Acknowledgment behavior varies by broker: RabbitMQ uses auto-ack, Redis Streams uses explicit XAck, MQTT uses QoS levels, Kafka uses consumer group offsets. Includes panic recovery for callback safety.
  • OnError(callback func(err error)): Sets a centralized error handler. Called on connection failures and other errors.
  • OnReconnected(callback func()): Sets a callback that fires after a successful reconnection. Useful for replaying outbox data.
  • Close() error: Gracefully closes the connection and cleans up resources.
CreateConnection
func CreateConnection(connectionType string) Connection

Factory function that returns a Connection implementation based on the provided type:

Type                    Implementation
"rabbitmq" or "rabbit"  RabbitMQ
"kafka"                 Kafka
"redis"                 Redis (Pub/Sub or Streams)
"mqtt"                  MQTT (with QoS config)
(default)               RabbitMQ

Auto-Reconnect

RabbitMQ

The RabbitMQ implementation actively manages reconnection:

  1. Connection Watch: A goroutine monitors the underlying AMQP connection for NotifyClose events.
  2. Automatic Retry: When a disconnection is detected, the library enters a retry loop with a 5-second interval between attempts.
  3. Channel Recovery: Once the connection is re-established, a new AMQP channel is opened.
  4. Subscription Restoration: All previously registered subscriptions are automatically re-declared and re-consumed.
  5. Reconnect Callback: If OnReconnected was configured, the callback fires at the end of the recovery sequence.

Connect() blocks until a real TCP connection and channel are established.

Kafka

The Kafka implementation (built on franz-go) handles connection management internally:

  1. Automatic: franz-go transparently reconnects to brokers when connections are lost, with built-in exponential backoff and retry for transient errors.
  2. State Monitoring: A background watcher pings the cluster every 15 seconds to detect disconnection and reconnection events.
  3. Reconnect Callback: OnReconnected fires when the watcher detects the connection has been restored after a failure.

Connect() returns immediately; the client connects lazily. The actual connection is established by the first watcher Ping (up to 15 seconds later) or by the first produce or consume operation, whichever comes first.

Redis

The Redis implementation (built on go-redis) handles reconnection transparently via its connection pool:

  1. Automatic: go-redis automatically reconnects when connections are lost.
  2. PubSub: go-redis auto-resubscribes to previously subscribed channels on reconnect.
  3. Streams: Consumer groups resume from the last unacknowledged message automatically.
  4. State Monitoring: A background watcher pings Redis every 15 seconds to detect disconnection and reconnection events.
  5. Reconnect Callback: OnReconnected fires when the watcher detects the connection has been restored after a failure.

Connect() blocks until the initial Ping succeeds, verifying the connection.

MQTT

The MQTT implementation (built on Eclipse Paho) provides built-in reconnection:

  1. Automatic: Paho auto-reconnect is enabled by default with SetAutoReconnect(true) and a max reconnect interval of 30 seconds.
  2. Connection Loss: OnConnectionLost handler fires and reports the error via OnError.
  3. Reconnection: OnConnect handler fires on initial connect and each successful reconnect, triggering restoreSubscriptions() to re-subscribe to all registered topics.
  4. Reconnect Callback: OnReconnected fires after a successful reconnection.

Connect() blocks until the connection is established (with a 10-second timeout).
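Restoring subscriptions after a reconnect requires remembering what was subscribed. The following sketch shows one possible bookkeeping structure; registry, add, and restore are hypothetical names chosen for this example and are not the library's restoreSubscriptions() implementation.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// ConnectionCallback mirrors the callback type documented in this README.
type ConnectionCallback func(data []byte)

// registry remembers every topic passed to Subscribe so that the set can be
// replayed against a fresh connection.
type registry struct {
	mu   sync.Mutex
	subs map[string]ConnectionCallback
}

func newRegistry() *registry {
	return &registry{subs: make(map[string]ConnectionCallback)}
}

// add records (or replaces) the callback for a topic.
func (r *registry) add(topic string, cb ConnectionCallback) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs[topic] = cb
}

// restore calls subscribe once per remembered topic, in sorted order, and
// stops at the first error. It works on a snapshot so that subscribe may
// safely call add.
func (r *registry) restore(subscribe func(topic string, cb ConnectionCallback) error) error {
	r.mu.Lock()
	snapshot := make(map[string]ConnectionCallback, len(r.subs))
	topics := make([]string, 0, len(r.subs))
	for t, cb := range r.subs {
		snapshot[t] = cb
		topics = append(topics, t)
	}
	r.mu.Unlock()

	sort.Strings(topics)
	for _, t := range topics {
		if err := subscribe(t, snapshot[t]); err != nil {
			return fmt.Errorf("restore %q: %w", t, err)
		}
	}
	return nil
}

func main() {
	r := newRegistry()
	r.add("sensors/temperature", func([]byte) {})
	r.add("sensors/humidity", func([]byte) {})

	// Stand-in for the client's subscribe call on the new connection.
	err := r.restore(func(topic string, _ ConnectionCallback) error {
		fmt.Println("re-subscribed:", topic)
		return nil
	})
	fmt.Println("err:", err)
}
```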

Connection URL Formats
Broker    URL Format
RabbitMQ  amqp://user:pass@host:port/vhost
Kafka     kafka://host1:9092/?brokers=host1:9092,host2:9092 or legacy host1:9092,host2:9092
Redis     redis://host:port/db?mode=pubsub (or ?mode=stream&group=my-group&consumer=worker-1&start=>)
MQTT      tcp://host:port?clientid=my-app&qos=1&clean=true or ssl://host:port

Redis URL Parameters
Parameter  Default       Description
mode       pubsub        pubsub (non-persistent channels) or stream (persistent consumer groups)
group      broker-group  Consumer group name (Streams only)
consumer   hostname      Consumer name within the group (Streams only)
start      >             Start position: > (unread only), 0 (from beginning), $ (latest); Streams only
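To make the defaults in the table concrete, here is a minimal sketch of how such a URL could be parsed with the standard net/url package. The redisOptions struct and parseRedisURL function are hypothetical and exist only in this example; the library's own parser may differ in its validation and error messages.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strconv"
	"strings"
)

// redisOptions is a hypothetical holder for the documented URL parameters.
type redisOptions struct {
	Addr     string // host:port
	DB       int    // database index taken from the URL path
	Mode     string // "pubsub" or "stream"
	Group    string // consumer group (Streams only)
	Consumer string // consumer name (Streams only)
	Start    string // start position (Streams only)
}

// parseRedisURL applies the defaults from the table above and then overrides
// them with any query parameters that are present.
func parseRedisURL(raw string) (redisOptions, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return redisOptions{}, err
	}
	if u.Scheme != "redis" {
		return redisOptions{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	host, _ := os.Hostname() // empty string if the hostname cannot be read
	opts := redisOptions{Addr: u.Host, Mode: "pubsub", Group: "broker-group", Consumer: host, Start: ">"}

	if p := strings.Trim(u.Path, "/"); p != "" {
		db, err := strconv.Atoi(p)
		if err != nil {
			return redisOptions{}, fmt.Errorf("invalid db %q: %w", p, err)
		}
		opts.DB = db
	}

	q := u.Query()
	if v := q.Get("mode"); v != "" {
		if v != "pubsub" && v != "stream" {
			return redisOptions{}, fmt.Errorf("invalid mode %q", v)
		}
		opts.Mode = v
	}
	if v := q.Get("group"); v != "" {
		opts.Group = v
	}
	if v := q.Get("consumer"); v != "" {
		opts.Consumer = v
	}
	if v := q.Get("start"); v != "" {
		opts.Start = v
	}
	return opts, nil
}

func main() {
	opts, err := parseRedisURL("redis://localhost:6379/0?mode=stream&group=orders&consumer=worker-1&start=0")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", opts)
}
```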
MQTT URL Parameters
Parameter  Default             Description
clientid   hostname+timestamp  MQTT client identifier
qos        1                   Quality of Service level: 0 (at most once), 1 (at least once), 2 (exactly once)
clean      true                Clean session: true to start fresh, false to persist subscriptions
username   (none)              MQTT username (optional)
password   (none)              MQTT password (optional)
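The MQTT parameters carry typed values (an integer QoS in the range 0 to 2 and a boolean clean flag), so validating them early is useful. The sketch below is an illustration under assumptions: mqttOptions and parseMQTTURL are hypothetical names, and the generated default client ID is left to the caller because its exact format is not specified here.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// mqttOptions is a hypothetical holder for the documented URL parameters.
type mqttOptions struct {
	Broker   string // scheme://host:port, without the query string
	ClientID string // empty means "use the generated default"
	QoS      byte
	Clean    bool
	Username string
	Password string
}

// parseMQTTURL applies the documented defaults (qos=1, clean=true) and
// validates any overrides found in the query string.
func parseMQTTURL(raw string) (mqttOptions, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return mqttOptions{}, err
	}
	if u.Scheme != "tcp" && u.Scheme != "ssl" {
		return mqttOptions{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}

	q := u.Query()
	opts := mqttOptions{
		Broker:   u.Scheme + "://" + u.Host,
		ClientID: q.Get("clientid"),
		QoS:      1,
		Clean:    true,
		Username: q.Get("username"),
		Password: q.Get("password"),
	}
	if v := q.Get("qos"); v != "" {
		n, err := strconv.Atoi(v)
		if err != nil || n < 0 || n > 2 {
			return mqttOptions{}, fmt.Errorf("invalid qos %q: want 0, 1, or 2", v)
		}
		opts.QoS = byte(n)
	}
	if v := q.Get("clean"); v != "" {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return mqttOptions{}, fmt.Errorf("invalid clean %q: %w", v, err)
		}
		opts.Clean = b
	}
	return opts, nil
}

func main() {
	opts, err := parseMQTTURL("tcp://localhost:1883?clientid=my-app&qos=2&clean=false")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", opts)
}
```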

Use Case: Outbox Pattern

The OnReconnected callback is designed to complement the Transactional Outbox pattern. When your application reconnects after a broker outage, you can trigger a replay of the outbox to ensure no messages are lost:

conn.OnReconnected(func() {
    outbox.ReplayUnsentMessages()
})

Graceful Shutdown

Always call Close() to perform a graceful shutdown. This prevents message loss and ensures the broker connection is properly closed:

defer conn.Close()
  • RabbitMQ: Closes the channel and connection.
  • Kafka: Flushes any pending in-flight messages (with 10s timeout) before closing the client, then waits for the consumer loop to exit.
  • Redis: Closes the PubSub connection (if active), closes the client, and waits for all consumer goroutines to exit.
  • MQTT: Disconnects with a 250ms grace period to allow pending work to complete.

License

MIT

Documentation

Types

type Connection

type Connection interface {
	// Connect establishes the initial connection to the broker.
	Connect(url string) error

	// Publish sends a message to the specified queue/topic.
	// The context allows for timeout and cancellation control.
	Publish(ctx context.Context, queue string, body []byte, optionalContentType ...string) error

	// Subscribe registers a callback for the given queue/topic.
	// The implementation handles auto-resubscription behind the scenes.
	Subscribe(queue string, callback ConnectionCallback) error

	// OnError sets a centralized error handler.
	// Every connection error (e.g., broker disconnection) will be reported here,
	// which is useful for logging and implementing recovery logic.
	OnError(callback func(err error))

	// OnReconnected is called after the connection and channel have been successfully restored.
	// This is the ideal place to trigger outbox replay or other recovery logic.
	OnReconnected(callback func())

	// Close performs a graceful shutdown, cleaning up all resources.
	Close() error
}

Connection defines the interface for message broker connections.

func CreateConnection

func CreateConnection(connectionType string) Connection

CreateConnection returns a Connection implementation for the given broker type.

func NewKafkaConnection

func NewKafkaConnection() Connection

NewKafkaConnection creates a new Kafka-backed Connection.

func NewMQTTConnection

func NewMQTTConnection() Connection

NewMQTTConnection creates a new MQTT-backed Connection.

func NewRabbitConnection

func NewRabbitConnection() Connection

NewRabbitConnection creates a new RabbitMQ-backed Connection.

func NewRedisConnection

func NewRedisConnection() Connection

NewRedisConnection creates a new Redis-backed Connection. The mode (pubsub/stream) is determined by the URL passed to Connect().

type ConnectionCallback

type ConnectionCallback func(data []byte)

ConnectionCallback is the callback function invoked when a new message is received.
