goqueue

package module
v1.0.0-beta.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2025 License: MIT Imports: 15 Imported by: 0

README ยถ

๐Ÿš€ GoQueue - Universal Go Message Queue Library

Go Reference Go Report Card GitHub stars

One library to rule them all - A powerful, extensible, and developer-friendly Go wrapper that simplifies message queue operations across multiple platforms. Build robust, scalable applications with consistent queue operations, regardless of your underlying message broker.

โœจ Why GoQueue?

Core Concept

๐ŸŽฏ Universal Interface - Write once, run anywhere. Switch between queue providers without changing your code
โšก Production Ready - Built-in retry mechanisms, dead letter queues, and error handling
๐Ÿ›ก๏ธ Type Safe - Strongly typed interfaces with comprehensive error handling
๐Ÿ”ง Extensible - Plugin architecture for custom middleware and queue providers
๐Ÿ“Š Observable - Built-in logging and middleware support for monitoring
๐Ÿš€ Developer Experience - Intuitive API design with sensible defaults


๐Ÿ“‹ Table of Contents


๐Ÿš€ Quick Start

Get up and running in less than 5 minutes:

go get -u github.com/bxcodec/goqueue
package main

import (
	"context"
    "log"

	"github.com/bxcodec/goqueue"
	"github.com/bxcodec/goqueue/consumer"
    "github.com/bxcodec/goqueue/publisher"
	"github.com/bxcodec/goqueue/interfaces"
)

func main() {

	// Create queue service
    queueSvc := goqueue.NewQueueService(
        options.WithConsumer(myConsumer),
        options.WithPublisher(myPublisher),
        options.WithMessageHandler(handleMessage),
    )

    // Publish a message
    queueSvc.Publish(context.Background(), interfaces.Message{
        Data:   map[string]interface{}{"hello": "world"},
        Action: "user.created",
        Topic:  "users",
    })

    // Start consuming
    queueSvc.Start(context.Background())
}

func handleMessage(ctx context.Context, m interfaces.InboundMessage) error {
    log.Printf("Received: %v", m.Data)
    return m.Ack(ctx) // Acknowledge successful processing
}

๐Ÿ’ซ Features

๐ŸŽฏ Core Features
  • Multi-Provider Support: Currently supports RabbitMQ (more coming soon!)
  • Unified API: Consistent interface across all queue providers
  • Type Safety: Strongly typed message structures
  • Context Support: Full Go context integration for cancellation and timeouts
๐Ÿ›ก๏ธ Reliability & Resilience
  • Automatic Retries: Configurable retry mechanisms with exponential backoff
  • Dead Letter Queues: Handle failed messages gracefully
  • Circuit Breaker: Built-in protection against cascading failures
  • Graceful Shutdown: Clean resource cleanup on application termination
๐Ÿ”ง Advanced Capabilities
  • Middleware System: Extensible pipeline for message processing
  • Custom Serialization: Support for JSON, Protocol Buffers, and custom formats
  • Message Routing: Flexible topic and routing key patterns
  • Batching: Efficient batch message processing
  • Connection Pooling: Optimized connection management
๐Ÿ“Š Observability
  • Structured Logging: Built-in zerolog integration
  • Metrics Ready: Hooks for Prometheus, StatsD, and custom metrics
  • Tracing Support: OpenTelemetry compatible
  • Health Checks: Built-in health check endpoints

๐Ÿ› ๏ธ Installation

# Install the core library
go get -u github.com/bxcodec/goqueue
Requirements
  • Go 1.21 or higher
  • Message broker (RabbitMQ supported, more coming soon)

๐Ÿ“– Basic Usage

๐Ÿš€ Publisher Example
package main

import (
    "context"
    "github.com/bxcodec/goqueue/publisher"
    publisherOpts "github.com/bxcodec/goqueue/options/publisher"
    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    // Connect to RabbitMQ
    conn, _ := amqp.Dial("amqp://localhost:5672/")

    // Create publisher
    pub := publisher.NewPublisher(
		publisherOpts.PublisherPlatformRabbitMQ,
		publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
            Conn:                     conn,
			PublisherChannelPoolSize: 5,
		}),
        publisherOpts.WithPublisherID("my-service"),
    )

    // Publish message
    err := pub.Publish(context.Background(), interfaces.Message{
        Data:   map[string]interface{}{"user_id": 123, "action": "signup"},
        Action: "user.created",
        Topic:  "users",
    })
	if err != nil {
        log.Fatal(err)
    }
}
๐Ÿ“จ Consumer Example
package main

import (
    "context"
    "github.com/bxcodec/goqueue/consumer"
    consumerOpts "github.com/bxcodec/goqueue/options/consumer"
)

func main() {
    // Create consumer
    cons := consumer.NewConsumer(
		consumerOpts.ConsumerPlatformRabbitMQ,
        consumerOpts.WithQueueName("user-events"),
		consumerOpts.WithMaxRetryFailedMessage(3),
        consumerOpts.WithBatchMessageSize(10),
    )

    // Start consuming
    cons.Consume(context.Background(), messageHandler, metadata)
}

func messageHandler(ctx context.Context, msg interfaces.InboundMessage) error {
    // Process your message
    userData := msg.Data.(map[string]interface{})

    // Business logic here
    if err := processUser(userData); err != nil {
        // Retry with exponential backoff
        return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)
    }

    // Acknowledge successful processing
    return msg.Ack(ctx)
}

๐Ÿ”ง Advanced Features

๐Ÿ”„ Retry Mechanisms

GoQueue provides sophisticated retry mechanisms with multiple strategies:

// Exponential backoff retry
return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)

// Custom retry logic
return msg.RetryWithDelayFn(ctx, func(retryCount int64) int64 {
    return retryCount * 2 // Custom delay calculation
})

// Move to dead letter queue after max retries
return msg.MoveToDeadLetterQueue(ctx)
๐Ÿ”Œ Middleware System

Extend functionality with custom middleware:

// Custom logging middleware
func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc {
    return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
        return func(ctx context.Context, m interfaces.InboundMessage) error {
            start := time.Now()
            err := next(ctx, m)
            log.Printf("Message processed in %v", time.Since(start))
            return err
        }
    }
}

// Apply middleware
cons := consumer.NewConsumer(
    consumerOpts.ConsumerPlatformRabbitMQ,
    consumerOpts.WithMiddlewares(
        LoggingMiddleware(),
        MetricsMiddleware(),
        AuthMiddleware(),
    ),
)
๐ŸŽ›๏ธ Configuration Options

Fine-tune your queue behavior:

cons := consumer.NewConsumer(
    consumerOpts.ConsumerPlatformRabbitMQ,
    consumerOpts.WithQueueName("high-priority-queue"),
    consumerOpts.WithMaxRetryFailedMessage(5),
    consumerOpts.WithBatchMessageSize(50),
    consumerOpts.WithConsumerID("worker-01"),
    consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
        ConsumerChannel: channel,
        ReQueueChannel:  requeueChannel,
        QueueDeclareConfig: &consumerOpts.RabbitMQQueueDeclareConfig{
            Durable:    true,
            AutoDelete: false,
            Exclusive:  false,
        },
    }),
)

๐ŸŽฎ Examples

๐Ÿ“ Complete Examples

Explore our comprehensive examples:

๐Ÿฐ RabbitMQ Quick Setup

Start RabbitMQ with Docker:

# Clone the repository
git clone https://github.com/bxcodec/goqueue.git
cd goqueue/examples/rabbitmq/basic

# Start RabbitMQ
docker-compose up -d

# Run the example
go run main.go
๐Ÿ”„ Retry Architecture

GoQueue Retry Architecture

Automatic retry mechanism with exponential backoff and dead letter queue


๐Ÿ—๏ธ Architecture

๐ŸŽฏ Design Principles
  • Interface Segregation: Clean, focused interfaces for different responsibilities
  • Dependency Injection: Easy testing and swappable implementations
  • Error Handling: Comprehensive error types and recovery mechanisms
  • Performance: Optimized for high-throughput scenarios
  • Extensibility: Plugin architecture for custom providers and middleware
๐Ÿงฉ Core Components

Core Concept

๐Ÿ“ฆ Provider Support
Provider Status Features
RabbitMQ ๐Ÿ”„ Beta Version Full feature support
Google Pub/Sub ๐Ÿ“‹ Planned Coming soon
AWS SQS + SNS ๐Ÿ“‹ Planned Coming soon

๐Ÿ”ง Configuration

๐Ÿ“ Logging Setup

GoQueue uses structured logging with zerolog:

import "github.com/bxcodec/goqueue"

// Setup basic logging (automatic when importing consumer/publisher)
// OR setup with custom configuration:
goqueue.SetupLoggingWithDefaults() // Pretty console output for development

๐Ÿงช Testing

Run the test suite:

# Unit tests
make test

# Integration tests with RabbitMQ
make integration-test


๐Ÿ“š Documentation

๐Ÿ“– Component Documentation

Explore our comprehensive guides for each system component:

Component Description Documentation
๐Ÿ”Œ Middleware Extend functionality with custom logic ๐Ÿ“– Middleware Guide
๐Ÿ“จ Consumer Reliable message consumption and processing ๐Ÿ“– Consumer Guide
๐Ÿ“ค Publisher High-performance message publishing ๐Ÿ“– Publisher Guide
๐Ÿ”„ RabbitMQ Retry Advanced retry mechanisms for RabbitMQ ๐Ÿ“– Retry Architecture

๐Ÿค Contributing

We welcome contributions! Here's how to get started:

๐Ÿš€ Quick Contribution Guide
  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request
๐Ÿ“‹ Development Setup
# Clone your fork
git clone https://github.com/yourusername/goqueue.git
cd goqueue

# Install dependencies
go mod download

# Run tests
make test

# Run linting
make lint

๐ŸŽฏ Contribution Areas
  • ๐Ÿ”Œ New Queue Providers (Google Pub/Sub, SQS+SNS)
  • ๐Ÿ› ๏ธ Middleware Components (Metrics, Tracing, Auth)
  • ๐Ÿ“š Documentation & Examples
  • ๐Ÿงช Testing & Benchmarks
  • ๐Ÿ› Bug Fixes & Improvements

๐Ÿ“ž Support & Community


๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.


๐Ÿ™ Acknowledgments

  • Thanks to all contributors
  • Inspired by the Go community's best practices
  • Built with โค๏ธ for the Go ecosystem

Documentation ยถ

Index ยถ

Constants ยถ

This section is empty.

Variables ยถ

View Source
var (
	// JSONEncoder is an implementation of the EncoderFn interface
	// that encodes a Message into JSON format.
	JSONEncoder EncoderFn = func(_ context.Context, m interfaces.Message) (data []byte, err error) {
		return json.Marshal(m)
	}
	// JSONDecoder is a DecoderFn implementation that decodes JSON data into a Message.
	JSONDecoder DecoderFn = func(_ context.Context, data []byte) (m interfaces.Message, err error) {
		err = json.Unmarshal(data, &m)
		return
	}

	DefaultEncoder EncoderFn = JSONEncoder
	DefaultDecoder DecoderFn = JSONDecoder
)
View Source
var (
	// JSONEncoding represents the encoding configuration for JSON.
	JSONEncoding = &Encoding{
		ContentType: headerVal.ContentTypeJSON,
		Encode:      JSONEncoder,
		Decode:      JSONDecoder,
	}
	DefaultEncoding = JSONEncoding
)

Functions ยถ

func AddGoQueueEncoding ยถ

func AddGoQueueEncoding(contentType headerVal.ContentType, encoding *Encoding)

AddGoQueueEncoding stores the given encoding for the specified content type in the goQueueEncodingMap. The goQueueEncodingMap is a concurrent-safe map that maps content types to encodings. The content type is specified by the `contentType` parameter, and the encoding is specified by the `encoding` parameter. This function is typically used to register custom encodings for specific content types in the GoQueue library.

func SetupLogging ยถ

func SetupLogging()

SetupLogging configures zerolog with sensible defaults for goqueue. This is automatically called when importing consumer or publisher packages, but can be called explicitly for custom configuration.

Note: This function is safe to call multiple times.

func SetupLoggingWithDefaults ยถ

func SetupLoggingWithDefaults()

SetupLoggingWithDefaults configures zerolog and sets up a default global logger with console output and reasonable formatting for development. This is useful for development environments or when you want pretty-printed logs.

Types ยถ

type DecoderFn ยถ

type DecoderFn func(ctx context.Context, data []byte) (m interfaces.Message, err error)

DecoderFn is a function type that decodes a byte slice into a Message. It takes a context and a byte slice as input and returns a Message and an error.

type EncoderFn ยถ

type EncoderFn func(ctx context.Context, m interfaces.Message) (data []byte, err error)

EncoderFn is a function type that encodes a message into a byte slice. It takes a context and a message as input and returns the encoded data and an error (if any).

type Encoding ยถ

type Encoding struct {
	ContentType headerVal.ContentType // The content type associated with this encoding.
	Encode      EncoderFn             // The encoding function used to encode data.
	Decode      DecoderFn             // The decoding function used to decode data.
}

Encoding represents an encoding configuration for a specific content type.

func GetGoQueueEncoding ยถ

func GetGoQueueEncoding(contentType headerVal.ContentType) (res *Encoding, ok bool)

GetGoQueueEncoding returns the encoding associated with the given content type. It looks up the encoding in the goQueueEncodingMap and returns it along with a boolean value indicating if the encoding was found. If the encoding is not found, it returns nil and false.

type QueueService ยถ

type QueueService struct {
	NumberOfConsumer int // The number of consumers to process messages concurrently.
	// contains filtered or unexported fields
}

QueueService represents a service that handles message queuing operations.

func NewQueueService ยถ

func NewQueueService(opts ...options.GoQueueOptionFunc) *QueueService

NewQueueService creates a new instance of QueueService with the provided options. It accepts a variadic parameter `opts` which allows configuring the QueueService. The options are applied in the order they are provided. Returns a pointer to the created QueueService.

func (*QueueService) Publish ยถ

func (qs *QueueService) Publish(ctx context.Context, m interfaces.Message) (err error)

Publish publishes a message to the queue. It returns an error if the publisher is not defined or if there was an error while publishing the message.

func (*QueueService) Start ยถ

func (qs *QueueService) Start(ctx context.Context) (err error)

Start starts the queue service by spawning multiple consumers to process messages. It returns an error if the consumer or handler is not defined. The method uses the provided context to manage the lifecycle of the consumers.

func (*QueueService) Stop ยถ

func (qs *QueueService) Stop(ctx context.Context) error

Stop stops the queue service by stopping the consumer and closing the publisher. It returns an error if there was an issue stopping the consumer or closing the publisher.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL