broker

A lightweight, resilient Go message broker abstraction library with built-in auto-reconnect, queue auto-resubscription, and support for multiple message brokers.
Overview
broker provides a unified Connection interface that abstracts away the differences between message broker implementations (RabbitMQ, Kafka, etc.). It handles the complexities of connection resilience so your application logic stays clean and simple.
Features
- Unified Interface: Single Connection interface for publishing and subscribing across different brokers.
- Auto-Reconnect: Automatically reconnects when the connection is lost, with built-in retry. The retry and backoff strategy depends on the broker (see Auto-Reconnect).
- Auto-Resubscribe: After a reconnection, subscriptions are restored by the library (RabbitMQ, MQTT) or handled transparently by the underlying client library (Kafka, Redis).
- Error Centralization: Centralized error handling via the OnError callback.
- Reconnect Callbacks: Hook into reconnection events via OnReconnected to trigger recovery logic (e.g., outbox pattern replay).
- Context Support: Publish accepts a context.Context for timeout and cancellation control.
- Graceful Shutdown: A Close method for proper resource cleanup.
Current Implementations
| Broker | Status |
| --- | --- |
| RabbitMQ | ✅ Complete |
| Kafka | ✅ Complete |
| Redis | ✅ Complete |
| MQTT | ✅ Complete |
Installation
go get github.com/morkid/broker
Quick Start
RabbitMQ
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/morkid/broker"
)

func main() {
	conn := broker.CreateConnection("rabbitmq")

	conn.OnError(func(err error) {
		log.Printf("Broker error: %v", err)
	})
	conn.OnReconnected(func() {
		log.Println("Reconnected: trigger recovery logic here")
	})

	// Connect blocks until connected.
	err := conn.Connect("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Subscribe returns an error, so check it before publishing.
	err = conn.Subscribe("my-queue", func(data []byte) {
		fmt.Printf("Received: %s\n", string(data))
	})
	if err != nil {
		log.Printf("Subscribe error: %v", err)
		return
	}

	err = conn.Publish(context.Background(), "my-queue", []byte("Hello, World!"))
	if err != nil {
		log.Printf("Publish error: %v", err)
	}

	// Block forever so the subscription keeps receiving messages.
	select {}
}
Kafka
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/morkid/broker"
)

func main() {
	conn := broker.CreateConnection("kafka")

	conn.OnError(func(err error) {
		log.Printf("Broker error: %v", err)
	})
	conn.OnReconnected(func() {
		log.Println("Reconnected: trigger recovery logic here")
	})

	// Connect accepts a kafka:// URL or the legacy comma-separated format.
	// It returns immediately; the actual connection happens lazily on the
	// first produce/consume.
	err := conn.Connect("kafka://localhost:9092/?brokers=localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Subscribe returns an error, so check it before publishing.
	err = conn.Subscribe("my-topic", func(data []byte) {
		fmt.Printf("Received: %s\n", string(data))
	})
	if err != nil {
		log.Printf("Subscribe error: %v", err)
		return
	}

	err = conn.Publish(context.Background(), "my-topic", []byte("Hello, World!"))
	if err != nil {
		log.Printf("Publish error: %v", err)
	}

	// Block forever so the subscription keeps receiving messages.
	select {}
}
Redis
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/morkid/broker"
)

func main() {
	// Redis Pub/Sub
	conn := broker.CreateConnection("redis")

	conn.OnError(func(err error) {
		log.Printf("Broker error: %v", err)
	})

	// Connect URL format: redis://host:port/db?mode=pubsub
	err := conn.Connect("redis://localhost:6379/0")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Subscribe returns an error, so check it before publishing.
	err = conn.Subscribe("my-channel", func(data []byte) {
		fmt.Printf("Received: %s\n", string(data))
	})
	if err != nil {
		log.Printf("Subscribe error: %v", err)
		return
	}

	err = conn.Publish(context.Background(), "my-channel", []byte("Hello, World!"))
	if err != nil {
		log.Printf("Publish error: %v", err)
	}

	// Block forever so the subscription keeps receiving messages.
	select {}
}
MQTT
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/morkid/broker"
)

func main() {
	conn := broker.CreateConnection("mqtt")

	conn.OnError(func(err error) {
		log.Printf("Broker error: %v", err)
	})

	// Connect URL format: tcp://host:port?clientid=my-app&qos=1
	err := conn.Connect("tcp://localhost:1883?clientid=my-app")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Subscribe returns an error, so check it before publishing.
	err = conn.Subscribe("sensors/temperature", func(data []byte) {
		fmt.Printf("Received: %s\n", string(data))
	})
	if err != nil {
		log.Printf("Subscribe error: %v", err)
		return
	}

	err = conn.Publish(context.Background(), "sensors/temperature", []byte("23.5"))
	if err != nil {
		log.Printf("Publish error: %v", err)
	}

	// Block forever so the subscription keeps receiving messages.
	select {}
}
API Reference
ConnectionCallback
type ConnectionCallback func(data []byte)
Callback function invoked when a message is received.
Connection Interface
| Method | Description |
| --- | --- |
| Connect(url string) error | Establishes a connection to the broker. |
| Publish(ctx context.Context, queue string, body []byte, optionalContentType ...string) error | Publishes a message to the specified queue/topic with context support for timeout/cancellation. Default content type is "text/plain". For RabbitMQ, queues are declared as durable automatically. |
| Subscribe(queue string, callback ConnectionCallback) error | Subscribes to a queue/topic and registers the callback for incoming messages. Acknowledgment behavior varies by broker: RabbitMQ uses auto-ack, Redis Streams uses explicit XAck, MQTT uses QoS levels, Kafka uses consumer group offsets. Includes panic recovery for callback safety. |
| OnError(callback func(err error)) | Sets a centralized error handler. Called on connection failures and other errors. |
| OnReconnected(callback func()) | Sets a callback that fires after a successful reconnection. Useful for replaying outbox data. |
| Close() error | Gracefully closes the connection and cleans up resources. |
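For unit tests it can help to code against the interface rather than a live broker. The sketch below restates the interface from the table above as a local type (a reconstruction for illustration, not the library's source) and adds a hypothetical in-memory implementation, memoryConn, with callback panic recovery. The names memoryConn, newMemoryConn, deliver, publishNow, and roundTrip are assumptions made for this example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ConnectionCallback mirrors the callback type documented above.
type ConnectionCallback func(data []byte)

// Connection is a local copy of the documented method set.
type Connection interface {
	Connect(url string) error
	Publish(ctx context.Context, queue string, body []byte, optionalContentType ...string) error
	Subscribe(queue string, callback ConnectionCallback) error
	OnError(callback func(err error))
	OnReconnected(callback func())
	Close() error
}

// memoryConn is a hypothetical in-process fake: Publish delivers
// synchronously to every callback subscribed to the same queue.
type memoryConn struct {
	mu      sync.Mutex
	subs    map[string][]ConnectionCallback
	onError func(err error)
}

func newMemoryConn() *memoryConn {
	return &memoryConn{subs: make(map[string][]ConnectionCallback)}
}

func (m *memoryConn) Connect(url string) error { return nil }
func (m *memoryConn) OnReconnected(cb func())  {} // the fake never reconnects
func (m *memoryConn) Close() error             { return nil }

func (m *memoryConn) OnError(cb func(err error)) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.onError = cb
}

func (m *memoryConn) Subscribe(queue string, cb ConnectionCallback) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subs[queue] = append(m.subs[queue], cb)
	return nil
}

func (m *memoryConn) Publish(ctx context.Context, queue string, body []byte, _ ...string) error {
	if err := ctx.Err(); err != nil {
		return err // honor an already cancelled or expired context
	}
	m.mu.Lock()
	cbs := append([]ConnectionCallback(nil), m.subs[queue]...)
	onError := m.onError
	m.mu.Unlock()
	for _, cb := range cbs {
		deliver(cb, body, onError)
	}
	return nil
}

// deliver runs cb and converts a panic into an error report, so one
// misbehaving handler cannot take down the publisher.
func deliver(cb ConnectionCallback, body []byte, onError func(error)) {
	defer func() {
		if r := recover(); r != nil && onError != nil {
			onError(fmt.Errorf("callback panic: %v", r))
		}
	}()
	cb(body)
}

// publishNow publishes with a background context.
func publishNow(c Connection, queue, msg string) error {
	return c.Publish(context.Background(), queue, []byte(msg))
}

// roundTrip subscribes, publishes one message, and returns what arrived.
func roundTrip(c Connection, queue, msg string) (string, error) {
	var got string
	if err := c.Subscribe(queue, func(data []byte) { got = string(data) }); err != nil {
		return "", err
	}
	err := publishNow(c, queue, msg)
	return got, err
}

func main() {
	got, err := roundTrip(newMemoryConn(), "my-queue", "Hello, World!")
	fmt.Println(got, err)
}
```

Because application code only depends on the Connection method set, the same handler logic can be exercised against this fake in tests and against a real broker in production.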
CreateConnection
func CreateConnection(connectionType string) Connection
Factory function that returns a Connection implementation based on the provided type:
| Type | Implementation |
| --- | --- |
| "rabbitmq" or "rabbit" | RabbitMQ |
| "kafka" | Kafka |
| "redis" | Redis (Pub/Sub or Streams) |
| "mqtt" | MQTT (with QoS config) |
| (default) | RabbitMQ |
Auto-Reconnect
RabbitMQ
The RabbitMQ implementation actively manages reconnection:
- Connection Watch: A goroutine monitors the underlying AMQP connection for NotifyClose events.
- Automatic Retry: When a disconnection is detected, the library enters a retry loop with a 5-second interval between attempts.
- Channel Recovery: Once the connection is re-established, a new AMQP channel is opened.
- Subscription Restoration: All previously registered subscriptions are automatically re-declared and re-consumed.
- Reconnect Callback: If OnReconnected was configured, the callback fires at the end of the recovery sequence.
Connect() blocks until a real TCP connection and channel are established.
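The recovery sequence described above can be sketched without any AMQP dependency. This is a minimal illustration, not the library's implementation: dial, restore, and the reconnect and simulateOutage functions are hypothetical names, and the interval is a parameter so the demo can run quickly (the text above states 5 seconds for RabbitMQ).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// reconnect retries dial at a fixed interval until it succeeds, then
// restores subscriptions and finally fires onReconnected. It returns
// the number of dial attempts made.
func reconnect(dial, restore func() error, onError func(error), onReconnected func(), interval time.Duration) int {
	attempts := 0
	for {
		attempts++
		if err := dial(); err != nil {
			onError(err)
			time.Sleep(interval)
			continue
		}
		if err := restore(); err != nil {
			// A failed restore is treated like a failed connection.
			onError(err)
			time.Sleep(interval)
			continue
		}
		onReconnected()
		return attempts
	}
}

// simulateOutage fails the first `failures` dial attempts and records
// the order in which the recovery steps run.
func simulateOutage(failures int) (int, []string) {
	var events []string
	remaining := failures
	dial := func() error {
		if remaining > 0 {
			remaining--
			return errors.New("connection refused")
		}
		events = append(events, "connected")
		return nil
	}
	restore := func() error {
		events = append(events, "resubscribed")
		return nil
	}
	onError := func(error) { events = append(events, "error") }
	onReconnected := func() { events = append(events, "reconnected") }
	attempts := reconnect(dial, restore, onError, onReconnected, time.Millisecond)
	return attempts, events
}

func main() {
	attempts, events := simulateOutage(2)
	fmt.Println(attempts, events)
}
```

Firing the reconnect callback last means any recovery logic it triggers runs against a connection whose subscriptions are already back in place.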
Kafka
The Kafka implementation (built on franz-go) handles connection management internally:
- Automatic: franz-go transparently reconnects to brokers when connections are lost, with built-in exponential backoff and retry for transient errors.
- State Monitoring: A background watcher pings the cluster every 15 seconds to detect disconnection and reconnection events.
- Reconnect Callback: OnReconnected fires when the watcher detects the connection has been restored after a failure.
Connect() returns immediately — the client connects lazily on first produce or consume. The first Ping (up to 15s later) or any produce/consume operation will establish the actual connection.
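A ping-based watcher of the kind described for Kafka (and for Redis) boils down to detecting the transition from failing pings back to a successful one. The sketch below is an assumption about how such a watcher could be structured; monitor, observe, watch, and errPing are hypothetical names, and the demo uses a 10 ms interval instead of 15 seconds.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errPing = errors.New("ping failed")

// monitor tracks whether the last ping failed.
type monitor struct{ down bool }

// observe records one ping result and reports whether it marks a
// recovery, i.e. the first success after at least one failure.
func (m *monitor) observe(err error) bool {
	if err != nil {
		m.down = true
		return false
	}
	recovered := m.down
	m.down = false
	return recovered
}

// watch pings on every tick until ctx is cancelled, reporting failures
// via onError and recoveries via onReconnected.
func watch(ctx context.Context, interval time.Duration, ping func(context.Context) error, onError func(error), onReconnected func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var m monitor
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			err := ping(ctx)
			if err != nil {
				onError(err)
			}
			if m.observe(err) {
				onReconnected()
			}
		}
	}
}

func main() {
	// Scripted ping results: healthy, two failures, then healthy again.
	results := []error{nil, errPing, errPing, nil, nil}
	i := 0
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ping := func(context.Context) error {
		if i >= len(results) {
			cancel() // script exhausted: stop the watcher
			return nil
		}
		err := results[i]
		i++
		return err
	}
	watch(ctx, 10*time.Millisecond, ping,
		func(err error) { fmt.Println("error:", err) },
		func() { fmt.Println("reconnected") })
}
```

One consequence of polling is latency: with a 15-second interval, a reconnect callback driven this way can fire up to 15 seconds after the connection is actually restored.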
Redis
The Redis implementation (built on go-redis) handles reconnection transparently via its connection pool:
- Automatic: go-redis automatically reconnects when connections are lost.
- PubSub: go-redis auto-resubscribes to previously subscribed channels on reconnect.
- Streams: Consumer groups resume from the last unacknowledged message automatically.
- State Monitoring: A background watcher pings Redis every 15 seconds to detect disconnection and reconnection events.
- Reconnect Callback: OnReconnected fires when the watcher detects the connection has been restored after a failure.
Connect() blocks until the initial Ping succeeds, verifying the connection.
MQTT
The MQTT implementation (built on Eclipse Paho) provides built-in reconnection:
- Automatic: Paho auto-reconnect is enabled by default with SetAutoReconnect(true) and a max reconnect interval of 30 seconds.
- Connection Loss: OnConnectionLost handler fires and reports the error via OnError.
- Reconnection: OnConnect handler fires on initial connect and each successful reconnect, triggering restoreSubscriptions() to re-subscribe to all registered topics.
- Reconnect Callback: OnReconnected fires after a successful reconnection.
Connect() blocks until the connection is established (with a 10-second timeout).
Connection URL Formats
| Broker | URL Format |
| --- | --- |
| RabbitMQ | amqp://user:pass@host:port/vhost |
| Kafka | kafka://host1:9092/?brokers=host1:9092,host2:9092 or legacy host1:9092,host2:9092 |
| Redis | redis://host:port/db?mode=pubsub (or ?mode=stream&group=my-group&consumer=worker-1&start=>) |
| MQTT | tcp://host:port?clientid=my-app&qos=1&clean=true or ssl://host:port |
Redis URL Parameters
| Parameter | Default | Description |
| --- | --- | --- |
| mode | pubsub | pubsub (non-persistent channels) or stream (persistent consumer groups) |
| group | broker-group | Consumer group name (Streams only) |
| consumer | hostname | Consumer name within the group (Streams only) |
| start | > | Start position: > unread only, 0 from beginning, $ latest (Streams only) |
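To make the defaults in the table concrete, here is a sketch of how such a URL could be parsed with the standard net/url package. It illustrates the documented parameters only and is not the library's parser; redisConfig, parseRedisURL, and orDefault are hypothetical names.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strconv"
	"strings"
)

// redisConfig holds the values described in the parameter table.
type redisConfig struct {
	Addr     string
	DB       int
	Mode     string
	Group    string
	Consumer string
	Start    string
}

// orDefault returns v, or def when v is empty.
func orDefault(v, def string) string {
	if v == "" {
		return def
	}
	return v
}

// parseRedisURL applies the documented defaults to missing parameters.
func parseRedisURL(raw string) (redisConfig, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return redisConfig{}, err
	}
	if u.Scheme != "redis" {
		return redisConfig{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	cfg := redisConfig{Addr: u.Host}
	if db := strings.Trim(u.Path, "/"); db != "" {
		if cfg.DB, err = strconv.Atoi(db); err != nil {
			return redisConfig{}, fmt.Errorf("invalid db %q: %w", db, err)
		}
	}
	q := u.Query()
	cfg.Mode = orDefault(q.Get("mode"), "pubsub")
	cfg.Group = orDefault(q.Get("group"), "broker-group")
	cfg.Start = orDefault(q.Get("start"), ">")
	cfg.Consumer = q.Get("consumer")
	if cfg.Consumer == "" {
		// Default consumer name is the hostname; it stays empty if
		// the hostname cannot be determined.
		cfg.Consumer, _ = os.Hostname()
	}
	if cfg.Mode != "pubsub" && cfg.Mode != "stream" {
		return redisConfig{}, fmt.Errorf("unknown mode %q", cfg.Mode)
	}
	return cfg, nil
}

func main() {
	cfg, err := parseRedisURL("redis://localhost:6379/0?mode=stream&group=orders")
	fmt.Printf("%+v %v\n", cfg, err)
}
```

Rejecting an unknown mode at parse time is a design choice of this sketch; it surfaces a typo in the URL at Connect rather than as a silent fallback to Pub/Sub.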
MQTT URL Parameters
| Parameter | Default | Description |
| --- | --- | --- |
| clientid | hostname+timestamp | MQTT client identifier |
| qos | 1 | Quality of Service level: 0 (at most once), 1 (at least once), 2 (exactly once) |
| clean | true | Clean session: true to start fresh, false to persist subscriptions |
| username | — | MQTT username (optional) |
| password | — | MQTT password (optional) |
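The MQTT parameters can be validated in the same spirit. The sketch below is illustrative only: mqttConfig and parseMQTTURL are hypothetical names, and the exact shape of the default client ID (hostname, a dash, and a nanosecond timestamp) is an assumption, since the table only says "hostname+timestamp".

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strconv"
	"time"
)

// mqttConfig holds the values described in the parameter table.
type mqttConfig struct {
	Broker   string // scheme://host:port
	ClientID string
	QoS      byte
	Clean    bool
	Username string
	Password string
}

// parseMQTTURL applies the documented defaults and rejects QoS values
// outside 0..2.
func parseMQTTURL(raw string) (mqttConfig, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return mqttConfig{}, err
	}
	if u.Scheme != "tcp" && u.Scheme != "ssl" {
		return mqttConfig{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	q := u.Query()
	cfg := mqttConfig{
		Broker:   u.Scheme + "://" + u.Host,
		ClientID: q.Get("clientid"),
		QoS:      1,
		Clean:    true,
		Username: q.Get("username"),
		Password: q.Get("password"),
	}
	if cfg.ClientID == "" {
		host, _ := os.Hostname()
		// Assumed format for the hostname+timestamp default.
		cfg.ClientID = fmt.Sprintf("%s-%d", host, time.Now().UnixNano())
	}
	if v := q.Get("qos"); v != "" {
		n, err := strconv.Atoi(v)
		if err != nil || n < 0 || n > 2 {
			return mqttConfig{}, fmt.Errorf("qos must be 0, 1, or 2, got %q", v)
		}
		cfg.QoS = byte(n)
	}
	if v := q.Get("clean"); v != "" {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return mqttConfig{}, fmt.Errorf("invalid clean value %q", v)
		}
		cfg.Clean = b
	}
	return cfg, nil
}

func main() {
	cfg, err := parseMQTTURL("tcp://localhost:1883?clientid=my-app&qos=1")
	fmt.Printf("%+v %v\n", cfg, err)
}
```

Note that with clean=false a broker keeps session state per client ID, so a stable clientid matters more than it does with the generated default.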
Use Case: Outbox Pattern
The OnReconnected callback is designed to complement the Transactional Outbox pattern. When your application reconnects after a broker outage, you can trigger a replay of the outbox to ensure no messages are lost:
conn.OnReconnected(func() {
	outbox.ReplayUnsentMessages()
})
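The outbox value in the snippet above is not part of this library. As a rough illustration of what it might look like, the sketch below keeps unsent messages in memory and replays them in order. Outbox, Send, message, and simulate are hypothetical names, and a production outbox would normally be a database table written in the same transaction as the business data rather than a slice.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type message struct {
	Queue string
	Body  []byte
}

// Outbox is a hypothetical in-memory stand-in for a transactional outbox.
type Outbox struct {
	mu      sync.Mutex
	pending []message
	publish func(queue string, body []byte) error
}

// Send tries to publish immediately and keeps the message on failure.
func (o *Outbox) Send(queue string, body []byte) {
	if err := o.publish(queue, body); err != nil {
		o.mu.Lock()
		o.pending = append(o.pending, message{Queue: queue, Body: body})
		o.mu.Unlock()
	}
}

// ReplayUnsentMessages republishes pending messages in order, stops at
// the first failure, and returns how many were delivered.
func (o *Outbox) ReplayUnsentMessages() int {
	o.mu.Lock()
	defer o.mu.Unlock()
	sent := 0
	for len(o.pending) > 0 {
		m := o.pending[0]
		if err := o.publish(m.Queue, m.Body); err != nil {
			break // still failing: keep the rest for the next replay
		}
		o.pending = o.pending[1:]
		sent++
	}
	return sent
}

// simulate queues two messages during an outage and replays them once
// the broker is reachable again.
func simulate() (queued, replayed int, delivered []string) {
	online := false
	o := &Outbox{publish: func(queue string, body []byte) error {
		if !online {
			return errors.New("broker unavailable")
		}
		delivered = append(delivered, string(body))
		return nil
	}}
	o.Send("orders", []byte("a"))
	o.Send("orders", []byte("b"))
	queued = len(o.pending)
	online = true // in a real program, OnReconnected would fire here
	replayed = o.ReplayUnsentMessages()
	return queued, replayed, delivered
}

func main() {
	fmt.Println(simulate())
}
```

In a real application the publish field would wrap conn.Publish with a context, and ReplayUnsentMessages would be registered through conn.OnReconnected as shown above.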
Graceful Shutdown
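Always call Close() to perform a graceful shutdown. A deferred Close runs only when the enclosing function returns, so a program that blocks forever (as the Quick Start examples do with select {}) has to return from main on shutdown for the call to take effect. Closing helps prevent message loss and ensures the broker connection is properly closed: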
defer conn.Close()
- RabbitMQ: Closes the channel and connection.
- Kafka: Flushes any pending in-flight messages (with 10s timeout) before closing the client, then waits for the consumer loop to exit.
- Redis: Closes the PubSub connection (if active), closes the client, and waits for all consumer goroutines to exit.
- MQTT: Disconnects with a 250ms grace period to allow pending work to complete.
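One way to make sure Close actually runs is to wait on a signal-aware context instead of blocking forever. The sketch below uses signal.NotifyContext from the standard library; runUntilDone, fakeConn, and demo are hypothetical names, and the short timeout exists only so the example terminates on its own.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runUntilDone blocks until ctx is cancelled, then closes c and returns
// the error from Close.
func runUntilDone(ctx context.Context, c io.Closer) error {
	<-ctx.Done()
	return c.Close()
}

// fakeConn stands in for a broker connection; only Close is needed here.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error {
	f.closed = true
	return nil
}

// demo wires the pieces together and reports whether Close was called.
func demo() (bool, error) {
	// Cancel the context on Ctrl+C or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Demo only: also stop after a short timeout so the example exits
	// without needing a signal. Drop this in a real service.
	ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
	defer cancel()

	conn := &fakeConn{}
	err := runUntilDone(ctx, conn)
	return conn.closed, err
}

func main() {
	closed, err := demo()
	fmt.Println("closed:", closed, "err:", err)
}
```

Since the documented Connection interface has a Close() error method, a real connection satisfies io.Closer and could be passed to runUntilDone in place of fakeConn.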
License
MIT