grabbit

package module
v0.1.1-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2024 License: MIT Imports: 6 Imported by: 0

README

Grabbit

Go Reference Go Report Card

Grabbit is a simplified and idiomatic wrapper around the RabbitMQ Go client, making it easier to consume messages using common AMQP patterns. It abstracts the boilerplate code involved in setting up consumers, exchanges, and queues, allowing developers to focus on writing business logic.

Table of Contents

Features

  • Easy Configuration: Simplifies the setup of exchanges, queues, and bindings with functional options.
  • Middleware Support: Allows adding middleware for reusable message processing logic.
  • Context Integration: Uses Go's context.Context for cancellation and timeouts, facilitating graceful shutdowns.
  • Customizable Error Handling: Provides hooks to handle errors according to your application's needs.
  • Advanced Connection Settings: Supports custom AMQP configurations, including TLS settings.
  • Broker State Management: Manages connection status and provides channels for monitoring.
  • Automatic Reconnection: Handles reconnections seamlessly in case of connection loss.
  • Flexible Acknowledgments: Supports both automatic and manual message acknowledgments.

Installation

To install Grabbit, use go get:

go get github.com/hey-xico/grabbit

Ensure your project is using Go modules (a go.mod file is present).

Getting Started

Creating a Broker

The Broker is responsible for managing connections and consumers. You can create a new broker with the application's context:

ctx := context.Background()
broker := grabbit.NewBroker(ctx)

Customize the broker by setting an error handler or adjusting the backoff configuration:

broker.SetErrorHandler(func(err error) {
    log.Printf("Broker error: %v", err)
})

broker.SetBackoffConfig(grabbit.BackoffConfig{
    InitialInterval: 2 * time.Second,
    MaxInterval:     1 * time.Minute,
    Multiplier:      1.5,
})
Setting Up a Consumer

Create a consumer by specifying a name and a handler function:

consumer := broker.Consumer("my_consumer", func(ctx *grabbit.Context) error {
    // Process the message
    fmt.Printf("Received message: %s\n", string(ctx.Body()))
    return nil
})

Configure the consumer's exchange, queue, and binding:

consumer.
    Exchange("my_exchange", grabbit.DirectExchange, grabbit.WithExchangeDurable(true)).
    Queue("my_queue", grabbit.WithQueueDurable(true)).
    Binding("routing_key")

Set consumer options and QoS settings:

consumer.
    ConsumerOptions(grabbit.WithConsumerAutoAck(false)).
    QoS(10) // Prefetch 10 messages
Middleware Support

Grabbit supports middleware functions that can be applied to consumers or the broker:

// Define a middleware
func loggingMiddleware(next grabbit.HandlerFunc) grabbit.HandlerFunc {
    return func(ctx *grabbit.Context) error {
        log.Printf("Processing message: %s", string(ctx.Body()))
        return next(ctx)
    }
}

// Apply middleware to the consumer
consumer.Use(loggingMiddleware)

// Apply middleware to the broker (applies to all consumers)
broker.Use(loggingMiddleware)
Graceful Shutdown

To gracefully shut down the broker and all consumers, call Shutdown:

// Start the broker in a separate goroutine
go func() {
    if err := broker.Start("amqp://guest:guest@localhost:5672/"); err != nil {
        log.Fatalf("Broker stopped: %v", err)
    }
}()

// Listen for OS signals for graceful shutdown
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

<-signalChan
log.Println("Shutting down broker...")
if err := broker.Shutdown(); err != nil {
    log.Fatalf("Failed to shut down broker: %v", err)
}

Examples

Here's a complete example of setting up a broker and a consumer:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/hey-xico/grabbit"
)

func main() {
    ctx := context.Background()
    broker := grabbit.NewBroker(ctx)

    broker.SetErrorHandler(func(err error) {
        log.Printf("Broker error: %v", err)
    })

    // Define the message handler
    handler := func(ctx *grabbit.Context) error {
        log.Printf("Received message: %s", string(ctx.Body()))
        return nil
    }

    // Create a consumer
    broker.Consumer("my_consumer", handler).
        Exchange("my_exchange", grabbit.DirectExchange, grabbit.WithExchangeDurable(true)).
        Queue("my_queue", grabbit.WithQueueDurable(true)).
        Binding("routing_key").
        ConsumerOptions(grabbit.WithConsumerAutoAck(false)).
        QoS(10)

    // Start the broker
    go func() {
        if err := broker.Start("amqp://guest:guest@localhost:5672/"); err != nil {
            log.Fatalf("Broker stopped: %v", err)
        }
    }()

    // Wait for termination signal
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
    <-signalChan

    log.Println("Shutting down broker...")
    if err := broker.Shutdown(); err != nil {
        log.Fatalf("Failed to shut down broker: %v", err)
    }

    log.Println("Broker shut down gracefully.")
}

To run this example:

  1. Ensure RabbitMQ is running on your local machine at localhost:5672.

  2. Save the code to main.go.

  3. Run the program:

    go run main.go
    

Advanced Usage

Custom AMQP Configuration

Set custom AMQP configurations, such as TLS settings:

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"

    "github.com/rabbitmq/amqp091-go"
)

// Load client certificate and CA
cert, err := tls.LoadX509KeyPair("client_cert.pem", "client_key.pem")
if err != nil {
    log.Fatalf("Failed to load certificates: %v", err)
}

caCert, err := ioutil.ReadFile("ca_cert.pem")
if err != nil {
    log.Fatalf("Failed to read CA certificate: %v", err)
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

tlsConfig := &tls.Config{
    Certificates: []tls.Certificate{cert},
    RootCAs:      caCertPool,
}

amqpConfig := amqp.Config{
    TLSClientConfig: tlsConfig,
}

broker.SetConfig(amqpConfig)
Monitoring Broker Status

Monitor the broker's connection status using the StatusChan channel:

go func() {
    for status := range broker.StatusChan {
        switch status {
        case grabbit.StatusConnected:
            log.Println("Broker connected")
        case grabbit.StatusDisconnected:
            log.Println("Broker disconnected")
        case grabbit.StatusConnecting:
            log.Println("Broker connecting")
        }
    }
}()
Handling Errors in Handlers

Message handlers can return errors to trigger retries or log issues:

handler := func(ctx *grabbit.Context) error {
    // Process the message
    if err := processMessage(ctx.Body()); err != nil {
        // Optionally, Nack the message to requeue it
        ctx.Nack(false, true)
        return fmt.Errorf("failed to process message: %w", err)
    }
    return nil
}

Documentation

Detailed documentation is available on pkg.go.dev.

Key types and functions:

  • Broker: Manages connections and consumers.
  • Consumer: Represents a message consumer.
  • Context: Provides methods to interact with the incoming message.
  • Functional options for configuring exchanges, queues, bindings, consumers, and QoS.

Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository on GitHub.

  2. Create a new feature branch:

    git checkout -b feature/my-feature
    
  3. Commit your changes with clear messages:

    git commit -am 'Add new feature'
    
  4. Push to the branch:

    git push origin feature/my-feature
    
  5. Create a new Pull Request on GitHub.

Please ensure that your code adheres to Go conventions and includes appropriate tests.

Coding Guidelines
  • Go Version: Target the latest stable Go release.
  • Code Style: Use go fmt for formatting.
  • Documentation: Update or add comments for any new public APIs.
  • Testing: Write unit tests for new features and ensure existing tests pass (go test ./...).
  • Linting: Use golint and go vet to check for issues.
  • Dependencies: Keep dependencies to a minimum and use Go modules.

License

Grabbit is released under the MIT License.


Need Help?

If you encounter any issues or have questions, feel free to:

  • Open an issue on the GitHub repository.
  • Submit a pull request with improvements or bug fixes.

Happy Coding!

Grabbit aims to simplify your experience with RabbitMQ in Go. By abstracting the complexities of AMQP setup, you can focus on building robust and efficient applications.

Documentation

Overview

Package grabbit provides a simplified and idiomatic wrapper around the RabbitMQ Go client, making it easier to consume messages using common AMQP patterns.

Key features include: - Easy configuration of exchanges, queues, and bindings. - Middleware support for reusable message processing logic. - Context integration for graceful shutdowns. - Customizable error handling. - Support for advanced connection settings, including TLS. - Broker state management and metrics tracking.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithBindingArgs

func WithBindingArgs(args amqp.Table) func(*BindingOptions)

WithBindingArgs sets the Args option for BindingOptions.

func WithBindingHeaders

func WithBindingHeaders(headers amqp.Table) func(*BindingOptions)

WithBindingHeaders sets the Headers option for BindingOptions.

func WithBindingNoWait

func WithBindingNoWait(noWait bool) func(*BindingOptions)

WithBindingNoWait sets the NoWait option for BindingOptions.

func WithConsumerArgs

func WithConsumerArgs(args amqp.Table) func(*ConsumerOptions)

WithConsumerArgs sets the Args option for ConsumerOptions.

func WithConsumerAutoAck

func WithConsumerAutoAck(autoAck bool) func(*ConsumerOptions)

WithConsumerAutoAck sets the AutoAck option for ConsumerOptions.

func WithConsumerExclusive

func WithConsumerExclusive(exclusive bool) func(*ConsumerOptions)

WithConsumerExclusive sets the Exclusive option for ConsumerOptions.

func WithConsumerNoLocal

func WithConsumerNoLocal(noLocal bool) func(*ConsumerOptions)

WithConsumerNoLocal sets the NoLocal option for ConsumerOptions.

func WithConsumerNoWait

func WithConsumerNoWait(noWait bool) func(*ConsumerOptions)

WithConsumerNoWait sets the NoWait option for ConsumerOptions.

func WithConsumerTag

func WithConsumerTag(tag string) func(*ConsumerOptions)

WithConsumerTag sets the ConsumerTag option for ConsumerOptions.

func WithExchangeArgs

func WithExchangeArgs(args amqp.Table) func(*ExchangeOptions)

WithExchangeArgs sets the Args option for ExchangeOptions.

func WithExchangeAutoDelete

func WithExchangeAutoDelete(autoDelete bool) func(*ExchangeOptions)

WithExchangeAutoDelete sets the AutoDelete option for ExchangeOptions.

func WithExchangeDurable

func WithExchangeDurable(durable bool) func(*ExchangeOptions)

WithExchangeDurable sets the Durable option for ExchangeOptions.

func WithExchangeInternal

func WithExchangeInternal(internal bool) func(*ExchangeOptions)

WithExchangeInternal sets the Internal option for ExchangeOptions.

func WithExchangeNoWait

func WithExchangeNoWait(noWait bool) func(*ExchangeOptions)

WithExchangeNoWait sets the NoWait option for ExchangeOptions.

func WithQoSPrefetchSize

func WithQoSPrefetchSize(prefetchSize int) func(*QoSOptions)

WithQoSPrefetchSize sets the PrefetchSize option for QoSOptions.

func WithQueueArgs

func WithQueueArgs(args amqp.Table) func(*QueueOptions)

WithQueueArgs sets the Args option for QueueOptions.

func WithQueueAutoDelete

func WithQueueAutoDelete(autoDelete bool) func(*QueueOptions)

WithQueueAutoDelete sets the AutoDelete option for QueueOptions.

func WithQueueDeadLetterExchange

func WithQueueDeadLetterExchange(exchange string) func(*QueueOptions)

WithQueueDeadLetterExchange sets the dead-letter exchange for the queue.

func WithQueueDurable

func WithQueueDurable(durable bool) func(*QueueOptions)

WithQueueDurable sets the Durable option for QueueOptions.

func WithQueueExclusive

func WithQueueExclusive(exclusive bool) func(*QueueOptions)

WithQueueExclusive sets the Exclusive option for QueueOptions.

func WithQueueMaxRetries

func WithQueueMaxRetries(max int) func(*QueueOptions)

WithQueueMaxRetries sets the maximum number of retries for the queue.

func WithQueueNoWait

func WithQueueNoWait(noWait bool) func(*QueueOptions)

WithQueueNoWait sets the NoWait option for QueueOptions.

Types

type BackoffConfig

type BackoffConfig struct {
	// InitialInterval is the initial wait time before reconnecting.
	InitialInterval time.Duration
	// MaxInterval is the maximum wait time between reconnections.
	MaxInterval time.Duration
	// Multiplier is the multiplier for exponential backoff.
	Multiplier float64
}

BackoffConfig defines the configuration for exponential backoff.

type BindingOptions

type BindingOptions struct {
	// RoutingKey is the routing key for binding.
	RoutingKey string
	// Headers are the headers for a headers exchange.
	Headers amqp.Table
	// NoWait indicates that the server should not wait for the binding.
	NoWait bool
	// Args are additional arguments for the binding.
	Args amqp.Table
}

BindingOptions defines the options for binding a queue to an exchange.

type Broker

type Broker struct {
	StatusChan chan BrokerStatus
	// contains filtered or unexported fields
}

Broker manages connections and consumers for RabbitMQ.

func NewBroker

func NewBroker(ctx context.Context) *Broker

NewBroker creates a new Broker instance with the provided application context. The application context is used to listen for cancellation signals and access global configurations.

func (*Broker) Consumer

func (b *Broker) Consumer(name string, handler HandlerFunc) *Consumer

Consumer creates a new Consumer with the specified name and handler, and adds it to the Broker.

func (*Broker) GetStatus

func (b *Broker) GetStatus() BrokerStatus

GetStatus returns the current status of the broker.

func (*Broker) SetBackoffConfig

func (b *Broker) SetBackoffConfig(config BackoffConfig)

SetBackoffConfig sets the backoff configuration for reconnections.

func (*Broker) SetConfig

func (b *Broker) SetConfig(config amqp.Config)

SetConfig sets the AMQP configuration for the broker. This allows users to customize connection settings, including TLS.

func (*Broker) SetErrorHandler

func (b *Broker) SetErrorHandler(handler ErrorHandler)

SetErrorHandler sets the error handler for the broker. Users can provide their own error handler to process errors from the broker and consumers.

func (*Broker) Shutdown

func (b *Broker) Shutdown() error

Shutdown gracefully shuts down the broker and all its consumers. It cancels the broker's context, triggering cancellation signals.

func (*Broker) Start

func (b *Broker) Start(url string) error

Start establishes the connection to the RabbitMQ server and starts all consumers. It will attempt to reconnect and restart consumers upon connection loss. It listens for cancellation signals from the application's context.

func (*Broker) Use

func (b *Broker) Use(middleware ...MiddlewareFunc)

Use adds middleware(s) to the Broker. Middleware functions will be applied to all consumers managed by the broker.

type BrokerStatus

type BrokerStatus int

BrokerStatus represents the connection status of the broker.

const (
	// StatusDisconnected indicates that the broker is disconnected.
	StatusDisconnected BrokerStatus = iota
	// StatusConnecting indicates that the broker is attempting to connect.
	StatusConnecting
	// StatusConnected indicates that the broker is connected.
	StatusConnected
)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a message consumer with its configurations.

func (*Consumer) Binding

func (c *Consumer) Binding(routingKey string, opts ...func(*BindingOptions)) *Consumer

Binding configures the binding options for the consumer.

func (*Consumer) ConsumerOptions

func (c *Consumer) ConsumerOptions(opts ...func(*ConsumerOptions)) *Consumer

ConsumerOptions sets the consumer options.

func (*Consumer) Exchange

func (c *Consumer) Exchange(name string, exchangeType ExchangeType, opts ...func(*ExchangeOptions)) *Consumer

Exchange configures the exchange for the consumer.

func (*Consumer) QoS

func (c *Consumer) QoS(prefetchCount int, opts ...func(*QoSOptions)) *Consumer

QoS sets the QoS options for the consumer.

func (*Consumer) Queue

func (c *Consumer) Queue(name string, opts ...func(*QueueOptions)) *Consumer

Queue configures the queue for the consumer.

func (*Consumer) Use

func (c *Consumer) Use(middleware ...MiddlewareFunc) *Consumer

Use adds middleware(s) to the Consumer. Middleware functions will be applied to the consumer's handler.

type ConsumerOptions

type ConsumerOptions struct {
	// ConsumerTag is the identifier for the consumer.
	ConsumerTag string
	// AutoAck indicates whether messages are automatically acknowledged.
	AutoAck bool
	// Exclusive indicates whether the consumer has exclusive access to the queue.
	Exclusive bool
	// NoLocal indicates that the server should not deliver messages published on the same channel.
	NoLocal bool
	// NoWait indicates that the server should not wait for the consumer to be registered.
	NoWait bool
	// Args are additional arguments for the consumer.
	Args amqp.Table
}

ConsumerOptions defines the options for a consumer.

type Context

type Context struct {
	context.Context
	Delivery amqp.Delivery
	// contains filtered or unexported fields
}

Context provides methods to interact with the incoming message.

func (*Context) Ack

func (c *Context) Ack(multiple bool) error

Ack acknowledges the message, indicating successful processing. If multiple is true, all messages up to this delivery tag are acknowledged.

func (*Context) Body

func (c *Context) Body() []byte

Body returns the message body.

func (*Context) Get

func (c *Context) Get(key string) (interface{}, bool)

Get retrieves a value from the context.

func (*Context) Header

func (c *Context) Header(key string) interface{}

Header returns the value of the specified header from the message.

func (*Context) Nack

func (c *Context) Nack(multiple, requeue bool) error

Nack negatively acknowledges the message, indicating unsuccessful processing. If requeue is true, the message will be requeued. If multiple is true, multiple messages are negatively acknowledged.

func (*Context) Reject

func (c *Context) Reject(requeue bool) error

Reject negatively acknowledges the message without the possibility of requeueing multiple messages. If requeue is true, the server will attempt to requeue the message.

func (*Context) Set

func (c *Context) Set(key string, value interface{})

Set sets a key-value pair in the context.

type ErrorHandler

type ErrorHandler func(error)

ErrorHandler is a function type for handling errors.

type ExchangeOptions

type ExchangeOptions struct {
	// Name is the name of the exchange.
	Name string
	// Type is the type of the exchange (e.g., direct, fanout, topic, headers).
	Type ExchangeType
	// Durable indicates whether the exchange survives broker restarts.
	Durable bool
	// AutoDelete indicates whether the exchange is deleted when unused.
	AutoDelete bool
	// Internal indicates whether the exchange is internal (used by the broker).
	Internal bool
	// NoWait indicates that the server should not wait for the exchange declaration.
	NoWait bool
	// Args are additional arguments for the exchange declaration.
	Args amqp.Table
}

ExchangeOptions defines the configuration options for an exchange.

type ExchangeType

type ExchangeType string

ExchangeType represents the type of an AMQP exchange.

const (
	// DirectExchange routes messages to queues based on the routing key.
	DirectExchange ExchangeType = "direct"
	// FanoutExchange routes messages to all bound queues, ignoring routing keys.
	FanoutExchange ExchangeType = "fanout"
	// TopicExchange routes messages to queues based on pattern matching.
	TopicExchange ExchangeType = "topic"
	// HeadersExchange routes messages based on matching message headers.
	HeadersExchange ExchangeType = "headers"
)

type HandlerFunc

type HandlerFunc func(*Context) error

HandlerFunc defines the handler function type for processing messages.

type MiddlewareFunc

type MiddlewareFunc func(HandlerFunc) HandlerFunc

MiddlewareFunc defines the middleware function type.

type QoSOptions

type QoSOptions struct {
	// PrefetchCount is the number of messages to prefetch.
	PrefetchCount int
	// PrefetchSize is the number of bytes to prefetch.
	PrefetchSize int
}

QoSOptions defines the Quality of Service options for a consumer.

type QueueOptions

type QueueOptions struct {
	// Name is the name of the queue.
	Name string
	// Durable indicates whether the queue survives broker restarts.
	Durable bool
	// AutoDelete indicates whether the queue is deleted when unused.
	AutoDelete bool
	// Exclusive indicates whether the queue is exclusive to the connection.
	Exclusive bool
	// NoWait indicates that the server should not wait for the queue declaration.
	NoWait bool
	// Args are additional arguments for the queue declaration.
	Args amqp.Table
}

QueueOptions defines the configuration options for a queue.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL