gobroker

package module
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2025 License: MIT Imports: 11 Imported by: 0

README

GoBroker v2

GoBroker is a unified messaging broker library for Go that provides a consistent API for working with multiple message broker systems, including RabbitMQ, Redis pub/sub, and Amazon MQ.

Go Reference

Features

  • Unified API: Consistent interface for publishing and subscribing across different broker implementations
  • Multiple Backends: Support for RabbitMQ, Redis pub/sub, and Amazon MQ
  • Connection Management: Automatic connection establishment, error handling, and reconnection
  • Channel Pooling: Efficient channel management with pooling and reuse
  • Error Handling: Comprehensive error handling and recovery
  • Auto-Reconnect: Automatic reconnection on connection failures

Installation

go install github.com/defensestation/gobroker/v2

Dependencies

Quick Start

Creating a Broker
// RabbitMQ broker
rabbitBroker := gobroker.NewBroker("localhost", gobroker.BrokerTypeRabbitMQ, &gobroker.EndpointOptions{
    Username: "guest",
    Password: "guest",
    Port:     "5672",
})
defer rabbitBroker.Close()

// Redis broker
redisBroker := gobroker.NewBroker("localhost", gobroker.BrokerTypeRedis, &gobroker.EndpointOptions{
    Password: "",
    Port:     "6379",
    DB:       0,
})
defer redisBroker.Close()

// Amazon MQ broker
amazonMQBroker := gobroker.NewBroker("admin:password@mq-broker.example.com:61613", gobroker.BrokerTypeAmazonMQ)
defer amazonMQBroker.Close()
Publishing Messages
// Create a message
message := map[string]interface{}{
    "id":        123,
    "content":   "Hello world!",
    "timestamp": time.Now().Unix(),
}

// Publish using the unified API
err := rabbitBroker.Publish("my-exchange.user.created", message)
err = redisBroker.Publish("user:created", message)
err = amazonMQBroker.Publish("/queue/user-events", message)
Subscribing to Messages
// Subscribe using the unified API
err := rabbitBroker.Subscribe("my-exchange.user.created", func(data []byte) {
    fmt.Printf("Received RabbitMQ message: %s\n", string(data))
}, "my-queue")

err = redisBroker.Subscribe("user:created", func(data []byte) {
    fmt.Printf("Received Redis message: %s\n", string(data))
})

err = amazonMQBroker.Subscribe("/queue/user-events", func(data []byte) {
    fmt.Printf("Received Amazon MQ message: %s\n", string(data))
})

Broker-Specific Usage

RabbitMQ

RabbitMQ implementation supports topics in the format exchange.routekey.

// Creating an exchange
exchange, err := rabbitBroker.BuildExchange("user-events", &gobroker.ExchangeOptions{
    Type:       "topic",
    Durable:    true,
    AutoDelete: false,
})

// Publishing directly to an exchange
err = exchange.Publish("user.created", message)

// Queue declare and bind
queueName, err := rabbitBroker.QueueDeclareAndBind("user-events", "user.created", "user-created-queue")
Redis

Redis implementation uses channel names as topics.

// Multiple channel subscription
err = redisBroker.RunRedisConsumer([]string{"user:created", "user:updated"}, func(data []byte) {
    fmt.Printf("Received Redis message: %s\n", string(data))
})
Amazon MQ

Amazon MQ implementation (using STOMP) uses destinations as topics.

// Using queues
err = amazonMQBroker.PublishToAmazonMQQueue("/queue/user-events", message)

// Using topics
err = amazonMQBroker.PublishToAmazonMQQueue("/topic/user-events", message)

Advanced Usage

Connection Management
// Get specific connection
conn, err := rabbitBroker.GetConnection(gobroker.PublishConnection)

// Get specific channel
ch, err := conn.GetChannel()

// Get specific channel by ID
ch, err := conn.GetChannel(5)
Error Handling
// All operations return errors that should be checked
if err := broker.Publish("topic", message); err != nil {
    log.Printf("Failed to publish message: %v", err)
}
Connection Options
// RabbitMQ with TLS
rabbitBroker := gobroker.NewBroker("secure-rabbit.example.com", gobroker.BrokerTypeRabbitMQ, &gobroker.EndpointOptions{
    Protocol: "amqps", // Use AMQPS protocol for TLS
    Username: "user",
    Password: "pass",
    Port:     "5671",  // Secure port
})

// Redis with authentication and database selection
redisBroker := gobroker.NewBroker("redis.example.com", gobroker.BrokerTypeRedis, &gobroker.EndpointOptions{
    Password: "secret",
    Port:     "6379",
    DB:       3, // Use database 3
})

Design Philosophy

GoBroker was designed with the following principles in mind:

  1. Unified Interface: Consistent API regardless of the underlying broker implementation
  2. Simplicity: Easy to use but powerful enough for complex scenarios
  3. Resilience: Robust error handling and automatic reconnection
  4. Performance: Efficient connection and channel management
  5. Extensibility: Easy to add support for additional broker types

WARNING

AmazonMQ has not been tested yet.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

GoBroker is licensed under the MIT License.

Documentation

Overview

version: 0.0.1 file to manage channel

version: 0.0.1 file to manage connection

version: 0.0.1 file to build exchange

version: 0.0.1 broker wrapper is lib to manage creation of exchanges and consumers only type supported is rabbitmq

version: 0.0.1 file publish messages

redis.go Implementation of Redis pub/sub for gobroker

Index

Constants

View Source
const (
	BrokerTypeRabbitMQ = "rabbitmq"
	BrokerTypeRedis    = "redis"
	BrokerTypeAmazonMQ = "amazonmq"
)

Updated broker type constants

Variables

View Source
var (
	AmazonMQPublishConnection  = "amazonmq_publish"
	AmazonMQConsumerConnection = "amazonmq_consume"
)
View Source
var (
	PublishConnection  = "publish"
	ConsumerConnection = "consume"
)

connection types

View Source
var (
	RedisPublishConnection  = "redis_publish"
	RedisConsumerConnection = "redis_consume"
)

Redis connection types

Functions

This section is empty.

Types

type AmazonMQChannel

type AmazonMQChannel struct {
	Conn   *stomp.Conn
	Status string
	Id     int

	// Subscription reference, if this channel is used by a consumer
	Sub *stomp.Subscription
}

AmazonMQChannel struct

func (*AmazonMQChannel) Close

func (ch *AmazonMQChannel) Close() error

Close an AmazonMQChannel

type AmazonMQConnection

type AmazonMQConnection struct {
	// The underlying STOMP connection
	Conn        *stomp.Conn
	Status      string
	Type        string
	ChannelPool map[int]*AmazonMQChannel

	// We'll just store the address or any needed info here
	Address string
	// If you want to store the original stomp options:
	Options []func(*stomp.Conn) error
	// contains filtered or unexported fields
}

AmazonMQConnection struct

func (*AmazonMQConnection) AddAmazonMQChannel

func (c *AmazonMQConnection) AddAmazonMQChannel() (*AmazonMQChannel, error)

AddAmazonMQChannel

func (*AmazonMQConnection) GetAmazonMQChannel

func (c *AmazonMQConnection) GetAmazonMQChannel(id ...int) (*AmazonMQChannel, error)

GetAmazonMQChannel

type Broker

type Broker struct {
	Endpoint string
	Type     string
	// contains filtered or unexported fields
}

Broker represents a message broker connection

func NewBroker

func NewBroker(endpoint string, brokerType string, opts ...*EndpointOptions) *Broker

NewBroker creates a new broker with unified API

func (*Broker) AddAmazonMQConnection

func (e *Broker) AddAmazonMQConnection(ctype string) (*AmazonMQConnection, error)

AddAmazonMQConnection: create a new STOMP connection and attach it to Broker

func (*Broker) AddConnection

func (e *Broker) AddConnection(ctype string) (*Connection, error)

create tls connection to borker

func (*Broker) AddRedisConnection

func (e *Broker) AddRedisConnection(ctype string) (*RedisConnection, error)

Add Redis connection to broker

func (*Broker) BuildExchange

func (b *Broker) BuildExchange(name string, opts ...*ExchangeOptions) (*Exchange, error)

build exchange

func (*Broker) Close

func (b *Broker) Close()

Close connections for all broker types

func (*Broker) GetAmazonMQConnection

func (e *Broker) GetAmazonMQConnection(ctype string) (*AmazonMQConnection, error)

GetAmazonMQConnection: retrieves an existing connection or creates one if needed

func (*Broker) GetConnection

func (e *Broker) GetConnection(ctype string) (*Connection, error)

GetConnection returns a live *Connection or tries to create one

func (*Broker) GetRedisConnection

func (e *Broker) GetRedisConnection(ctype string) (*RedisConnection, error)

Get Redis connection

func (*Broker) Publish

func (b *Broker) Publish(ctx context.Context, topic string, body interface{}) error

Extended unified publish method

func (*Broker) PublishToAmazonMQQueue

func (b *Broker) PublishToAmazonMQQueue(destination string, body interface{}) error

PublishToAmazonMQQueue publishes a message (JSON-encoded) to a STOMP destination

func (*Broker) PublishToExchange

func (b *Broker) PublishToExchange(ctx context.Context, exchangeName, routekey string, body interface{}, opts ...*PublishOptions) error

expose method to publish messages to exchange

func (*Broker) PublishToRedisChannel

func (b *Broker) PublishToRedisChannel(channel string, body interface{}) error

Publish message to Redis

func (*Broker) QueueDeclareAndBind

func (b *Broker) QueueDeclareAndBind(exchange, routeKey, queueName string) (string, error)

only declare and bind

func (*Broker) RunAmazonMQConsumer

func (b *Broker) RunAmazonMQConsumer(destination string, handler func([]byte)) error

RunAmazonMQConsumer subscribes to a STOMP destination and processes messages

func (*Broker) RunConsumer

func (b *Broker) RunConsumer(exchange, routeKey string, functions func([]byte), queueName string) error

only one channel is used per go cosumer

func (*Broker) RunRedisConsumer

func (b *Broker) RunRedisConsumer(channels []string, handler func([]byte)) error

Run Redis consumer

func (*Broker) Subscribe

func (b *Broker) Subscribe(topic string, handler func([]byte), queueName ...string) error

Extended unified subscribe method

type Channel

type Channel struct {
	*amqp.Channel
	Status string
	Id     int
}

type Connection

type Connection struct {
	*amqp.Connection
	Status      string
	Type        string
	ChannelPool map[int]*Channel
	// contains filtered or unexported fields
}

func (*Connection) AddChannel

func (c *Connection) AddChannel() (*Channel, error)

create channel for rabbitmq

func (*Connection) GetChannel

func (c *Connection) GetChannel(id ...int) (*Channel, error)

get channel can take id to get specific channel

type EndpointOptions

type EndpointOptions struct {
	Protocol string
	Username string
	Password string
	Port     string
	DB       int // For Redis
}

EndpointOptions defines connection parameters for different broker types

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

exchane struct

func (*Exchange) Publish

func (e *Exchange) Publish(ctx context.Context, routekey string, body interface{}, opts ...*PublishOptions) error

expose method to publish messages to exchange

func (*Exchange) QueueDeclareAndBind

func (e *Exchange) QueueDeclareAndBind(exchange, routeKey, queueName string, ch *Channel) (string, error)

only declare and bind

func (*Exchange) RunConsumer

func (e *Exchange) RunConsumer(exchange, routeKey string, functions func([]byte), queueName string) error

only one channel is used per go cosumer

type ExchangeOptions

type ExchangeOptions struct {
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

exchange options

type PublishOptions

type PublishOptions struct {
	Mandatory bool
	Immediate bool
}

type RedisChannel

type RedisChannel struct {
	Client *redis.Client
	Status string
	Id     int
	// contains filtered or unexported fields
}

RedisChannel struct to maintain compatibility with Channel interface

func (*RedisChannel) Close

func (ch *RedisChannel) Close() error

Close Redis channel

type RedisConnection

type RedisConnection struct {
	*redis.Client
	Status      string
	Type        string
	ChannelPool map[int]*RedisChannel
	// contains filtered or unexported fields
}

RedisConnection struct to maintain compatibility with Connection interface

func (*RedisConnection) AddRedisChannel

func (c *RedisConnection) AddRedisChannel() (*RedisChannel, error)

Add Redis channel

func (*RedisConnection) GetRedisChannel

func (c *RedisConnection) GetRedisChannel(id ...int) (*RedisChannel, error)

Get Redis channel

type RedisOptions

type RedisOptions struct {
	DB       int
	Password string
}

Redis-specific options

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL