queue

package
v0.8.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

README

Queue Extension

Production-ready message queue extension for the Forge framework, providing a unified interface across multiple backend implementations (in-memory, Redis, RabbitMQ, NATS).

Features

  • Multiple Backends: InMemory, Redis, RabbitMQ, NATS
  • Rich Queue Operations: Declare, delete, purge, list queues
  • Flexible Publishing: Single, batch, and delayed message publishing
  • Robust Consuming: Worker pools with configurable concurrency
  • Reliability: Automatic retries with exponential backoff
  • Priority Queues: Message prioritization (0-9 scale)
  • Dead Letter Queues: Automatic DLQ for failed messages
  • Message TTL: Expiration and time-to-live support
  • Observability: Built-in metrics and tracing
  • Health Checks: Connection monitoring and stats
  • Thread-Safe: Concurrent operations across all backends

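Table of Contents

  • Installation
  • Quick Start
  • Configuration
  • API Reference
  • Usage Examples
  • Backend Comparison
  • Testing
  • Best Practices
  • Performance Tuning
  • Troubleshooting
  • Architecture
  • License
  • Contributing
  • Support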

Installation

import "github.com/xraph/forge/extensions/queue"

Quick Start

Basic Usage
package main

import (
    "context"
    "log"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/queue"
)

func main() {
    app := forge.New(
        forge.WithAppName("queue-demo"),
        forge.WithExtensions(
            queue.NewExtension(
                queue.WithDriver("inmemory"),
            ),
        ),
    )
    
    ctx := context.Background()
    if err := app.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer app.Stop(ctx)
    
    // Get queue service from container (using helper function)
    q, err := queue.Get(app.Container())
    if err != nil {
        log.Fatal(err)
    }
    
    // Declare a queue
    err = q.DeclareQueue(ctx, "tasks", queue.DefaultQueueOptions())
    if err != nil {
        log.Fatal(err)
    }
    
    // Publish a message
    err = q.Publish(ctx, "tasks", queue.Message{
        Body: []byte(`{"action":"send_email","to":"user@example.com"}`),
    })
    if err != nil {
        log.Fatal(err)
    }
    
    // Consume messages
    err = q.Consume(ctx, "tasks", func(ctx context.Context, msg queue.Message) error {
        log.Printf("Processing message: %s", msg.Body)
        return nil // Auto-ack on success
    }, queue.DefaultConsumeOptions())
    if err != nil {
        log.Fatal(err)
    }
    
    // Keep running
    select {}
}

Configuration

YAML Configuration
queue:
  # Backend driver: inmemory, redis, rabbitmq, nats
  driver: redis
  
  # Connection (choose url OR hosts)
  url: redis://localhost:6379
  # hosts: ["localhost:6379"]
  
  username: ""
  password: ""
  vhost: "/"  # RabbitMQ only
  
  # Connection pool
  max_connections: 10
  max_idle_connections: 5
  connect_timeout: 10s
  read_timeout: 30s
  write_timeout: 30s
  keep_alive: 60s
  
  # Retry policy
  max_retries: 3
  retry_backoff: 100ms
  retry_multiplier: 2.0
  max_retry_backoff: 30s
  
  # Default queue settings
  default_prefetch: 10
  default_concurrency: 1
  default_timeout: 30s
  enable_dead_letter: true
  dead_letter_suffix: ".dlq"
  
  # Performance
  enable_persistence: true
  enable_priority: false
  enable_delayed: false
  max_message_size: 1048576  # 1MB
  
  # Security
  enable_tls: false
  tls_cert_file: ""
  tls_key_file: ""
  tls_ca_file: ""
  insecure_skip_verify: false
  
  # Monitoring
  enable_metrics: true
  enable_tracing: true
Programmatic Configuration
ext := queue.NewExtension(
    queue.WithDriver("rabbitmq"),
    queue.WithURL("amqp://localhost:5672"),
    queue.WithAuth("guest", "guest"),
    queue.WithVHost("/"),
    queue.WithMaxConnections(20),
    queue.WithPrefetch(50),
    queue.WithConcurrency(10),
    queue.WithTimeout(60 * time.Second),
    queue.WithDeadLetter(true),
    queue.WithPersistence(true),
    queue.WithPriority(true),
    queue.WithDelayed(true),
    queue.WithMetrics(true),
    queue.WithTracing(true),
)
Backend-Specific URLs
// In-Memory (no URL needed)
queue.WithDriver("inmemory")

// Redis
queue.WithURL("redis://localhost:6379")
queue.WithURL("redis://:password@localhost:6379/0")
queue.WithURL("redis://localhost:6379?db=0&max_retries=3")

// RabbitMQ
queue.WithURL("amqp://guest:guest@localhost:5672/")
queue.WithURL("amqps://user:pass@rabbitmq.example.com:5671/vhost")

// NATS
queue.WithURL("nats://localhost:4222")
queue.WithURL("nats://user:pass@nats.example.com:4222")
queue.WithHosts("nats://server1:4222", "nats://server2:4222")  // Cluster
Using Database Extension's Redis Connection

If you're already using the database extension with a Redis connection, you can reuse it for the queue instead of creating a separate connection:

YAML Configuration
database:
  databases:
    - name: redis-cache
      type: redis
      dsn: redis://localhost:6379/0

queue:
  driver: redis
  database_redis_connection: redis-cache  # Reuse database connection
Programmatic Configuration
app := forge.New(
    forge.WithExtensions(
        database.NewExtension(
            database.WithDatabase(database.DatabaseConfig{
                Name: "redis-cache",
                Type: database.TypeRedis,
                DSN:  "redis://localhost:6379/0",
            }),
        ),
        queue.NewExtension(
            queue.WithDriver("redis"),
            queue.WithDatabaseRedisConnection("redis-cache"),
        ),
    ),
)
Benefits
  • Single Redis connection shared between database and queue
  • Simplified configuration
  • Reduced connection overhead
  • Consistent connection pooling and settings
DI Helper Functions

Easy ways to resolve queue services from the DI container:

// Package-level helpers
q, err := queue.Get(app.Container())
q := queue.MustGet(app.Container())  // Panics on error

// App-level helpers
q, err := queue.GetFromApp(app)
q := queue.MustGetFromApp(app)  // Panics on error

// Extension method (if you have the extension instance)
ext := queue.NewExtension(...)
app.RegisterExtension(ext)
q := ext.(*queue.Extension).Queue()

API Reference

Queue Interface
type Queue interface {
    // Connection management
    Connect(ctx context.Context) error
    Disconnect(ctx context.Context) error
    Ping(ctx context.Context) error

    // Queue management
    DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
    DeleteQueue(ctx context.Context, name string) error
    ListQueues(ctx context.Context) ([]string, error)
    GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
    PurgeQueue(ctx context.Context, name string) error

    // Publishing
    Publish(ctx context.Context, queue string, message Message) error
    PublishBatch(ctx context.Context, queue string, messages []Message) error
    PublishDelayed(ctx context.Context, queue string, message Message, delay time.Duration) error

    // Consuming
    Consume(ctx context.Context, queue string, handler MessageHandler, opts ConsumeOptions) error
    StopConsuming(ctx context.Context, queue string) error

    // Message operations
    Ack(ctx context.Context, messageID string) error
    Nack(ctx context.Context, messageID string, requeue bool) error
    Reject(ctx context.Context, messageID string) error

    // Dead letter queue
    GetDeadLetterQueue(ctx context.Context, queue string) ([]Message, error)
    RequeueDeadLetter(ctx context.Context, queue string, messageID string) error

    // Stats
    Stats(ctx context.Context) (*QueueStats, error)
}
Message
type Message struct {
    ID          string                 // Unique message identifier
    Queue       string                 // Queue name
    Body        []byte                 // Message payload
    Headers     map[string]string      // Custom headers
    Priority    int                    // 0-9, higher = more priority
    Delay       time.Duration          // Delay before processing
    Expiration  time.Duration          // Message TTL
    Retries     int                    // Current retry count
    MaxRetries  int                    // Maximum retry attempts
    PublishedAt time.Time              // Publication timestamp
    Metadata    map[string]interface{} // Additional metadata
}
QueueOptions
type QueueOptions struct {
    Durable         bool          // Survives broker restart
    AutoDelete      bool          // Deleted when no consumers
    Exclusive       bool          // Used by only one connection
    MaxLength       int64         // Maximum number of messages
    MaxLengthBytes  int64         // Maximum total message bytes
    MessageTTL      time.Duration // Message time-to-live
    MaxPriority     int           // Maximum priority (0-255)
    DeadLetterQueue string        // DLQ for failed messages
    Arguments       map[string]interface{}
}
ConsumeOptions
type ConsumeOptions struct {
    ConsumerTag   string        // Consumer identifier
    AutoAck       bool          // Auto-acknowledge messages
    Exclusive     bool          // Exclusive consumer
    PrefetchCount int           // Number of messages to prefetch
    Priority      int           // Consumer priority
    Concurrency   int           // Number of concurrent workers
    RetryStrategy RetryStrategy // Retry configuration
    Timeout       time.Duration // Message processing timeout
    Arguments     map[string]interface{}
}

type RetryStrategy struct {
    MaxRetries      int           // Maximum retry attempts
    InitialInterval time.Duration // Initial backoff interval
    MaxInterval     time.Duration // Maximum backoff interval
    Multiplier      float64       // Exponential backoff multiplier
}
QueueInfo
type QueueInfo struct {
    Name          string
    Messages      int64     // Total messages
    Consumers     int       // Active consumers
    MessageBytes  int64     // Total message size
    PublishRate   float64   // Messages/sec
    DeliverRate   float64   // Messages/sec
    AckRate       float64   // Messages/sec
    Durable       bool
    AutoDelete    bool
    CreatedAt     time.Time
    LastMessageAt time.Time
}
QueueStats
type QueueStats struct {
    QueueCount      int64
    TotalMessages   int64
    TotalConsumers  int
    PublishRate     float64
    DeliverRate     float64
    AckRate         float64
    UnackedMessages int64
    ReadyMessages   int64
    Uptime          time.Duration
    MemoryUsed      int64
    ConnectionCount int
    Version         string
    Extra           map[string]interface{}
}

Usage Examples

Publishing Messages
q, _ := queue.Get(app.Container())

// Simple publish
err := q.Publish(ctx, "orders", queue.Message{
    Body: []byte(`{"order_id":"12345","amount":99.99}`),
})

// With headers and priority
err = q.Publish(ctx, "orders", queue.Message{
    Body: []byte(`{"order_id":"12345"}`),
    Headers: map[string]string{
        "content-type": "application/json",
        "trace-id":     "abc123",
    },
    Priority: 8, // High priority
})

// With expiration
err = q.Publish(ctx, "orders", queue.Message{
    Body:       []byte(`{"order_id":"12345"}`),
    Expiration: 5 * time.Minute, // Message expires in 5 minutes
})
Batch Publishing
messages := []queue.Message{
    {Body: []byte(`{"id":1}`)},
    {Body: []byte(`{"id":2}`)},
    {Body: []byte(`{"id":3}`)},
}

err := q.PublishBatch(ctx, "bulk-orders", messages)
Delayed Messages
// Execute in 1 hour
err := q.PublishDelayed(ctx, "reminders", queue.Message{
    Body: []byte(`{"reminder":"meeting at 3pm"}`),
}, 1*time.Hour)

// Execute in 24 hours (scheduled jobs)
err = q.PublishDelayed(ctx, "daily-reports", queue.Message{
    Body: []byte(`{"type":"daily_summary"}`),
}, 24*time.Hour)
Consuming Messages
// Basic consumer
err := q.Consume(ctx, "orders", func(ctx context.Context, msg queue.Message) error {
    log.Printf("Processing order: %s", msg.Body)
    // Process message...
    return nil // Auto-ack on success
}, queue.DefaultConsumeOptions())

// Custom consumer with manual ack
opts := queue.ConsumeOptions{
    AutoAck:       false, // Manual acknowledgment
    PrefetchCount: 50,
    Concurrency:   10, // 10 concurrent workers
    Timeout:       60 * time.Second,
}

err = q.Consume(ctx, "orders", func(ctx context.Context, msg queue.Message) error {
    // Process message...
    if err := processOrder(msg.Body); err != nil {
        // Nack and requeue
        q.Nack(ctx, msg.ID, true)
        return err
    }
    
    // Acknowledge success
    return q.Ack(ctx, msg.ID)
}, opts)
Worker Pool with Concurrency
opts := queue.ConsumeOptions{
    AutoAck:       false,
    PrefetchCount: 100,
    Concurrency:   20, // 20 concurrent workers
    Timeout:       30 * time.Second,
    RetryStrategy: queue.RetryStrategy{
        MaxRetries:      5,
        InitialInterval: 1 * time.Second,
        MaxInterval:     60 * time.Second,
        Multiplier:      2.0,
    },
}

err := q.Consume(ctx, "heavy-tasks", func(ctx context.Context, msg queue.Message) error {
    // CPU-intensive work
    result, err := processHeavyTask(ctx, msg.Body)
    if err != nil {
        return err // Will retry based on RetryStrategy
    }
    
    log.Printf("Task completed: %v", result)
    return nil
}, opts)
Priority Queues
// Declare queue with priority support
opts := queue.QueueOptions{
    Durable:     true,
    MaxPriority: 10, // Enable priorities 0-10
}
q.DeclareQueue(ctx, "priority-tasks", opts)

// Publish high-priority message
q.Publish(ctx, "priority-tasks", queue.Message{
    Body:     []byte(`{"task":"critical"}`),
    Priority: 10, // Highest priority
})

// Publish normal-priority message
q.Publish(ctx, "priority-tasks", queue.Message{
    Body:     []byte(`{"task":"normal"}`),
    Priority: 5,
})

// High-priority messages are consumed first
Dead Letter Queues
// Declare queue with DLQ
opts := queue.QueueOptions{
    Durable:         true,
    DeadLetterQueue: "failed-orders", // DLQ name
}
q.DeclareQueue(ctx, "orders", opts)

// Consumer with retry strategy
consumeOpts := queue.ConsumeOptions{
    AutoAck:       false,
    PrefetchCount: 10,
    RetryStrategy: queue.RetryStrategy{
        MaxRetries:      3,
        InitialInterval: 1 * time.Second,
        MaxInterval:     10 * time.Second,
        Multiplier:      2.0,
    },
}

err := q.Consume(ctx, "orders", func(ctx context.Context, msg queue.Message) error {
    if err := processOrder(msg.Body); err != nil {
        // After max retries, message goes to DLQ
        return err
    }
    return nil
}, consumeOpts)

// Check DLQ
deadLetters, err := q.GetDeadLetterQueue(ctx, "orders")
for _, msg := range deadLetters {
    log.Printf("Failed message: %s", msg.Body)
    
    // Optionally requeue after fixing issue
    if shouldRetry(msg) {
        q.RequeueDeadLetter(ctx, "orders", msg.ID)
    }
}
Queue Management
// List all queues
queues, err := q.ListQueues(ctx)
for _, name := range queues {
    log.Printf("Queue: %s", name)
}

// Get queue info
info, err := q.GetQueueInfo(ctx, "orders")
log.Printf("Queue: %s, Messages: %d, Consumers: %d", 
    info.Name, info.Messages, info.Consumers)

// Purge queue (delete all messages)
err = q.PurgeQueue(ctx, "orders")

// Delete queue
err = q.DeleteQueue(ctx, "old-queue")
Health Checks and Stats
// Check connection health
err := q.Ping(ctx)
if err != nil {
    log.Printf("Queue unhealthy: %v", err)
}

// Get system stats
stats, err := q.Stats(ctx)
log.Printf("Queues: %d, Messages: %d, Consumers: %d",
    stats.QueueCount, stats.TotalMessages, stats.TotalConsumers)
log.Printf("Publish rate: %.2f msg/s, Deliver rate: %.2f msg/s",
    stats.PublishRate, stats.DeliverRate)
Graceful Shutdown
// Stop specific consumer
err := q.StopConsuming(ctx, "orders")

// Stop all consumers and disconnect
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err = q.Disconnect(ctx)

Backend Comparison

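| Feature           | InMemory  | Redis           | RabbitMQ        | NATS            |
|-------------------|-----------|-----------------|-----------------|-----------------|
| Persistence       |           |                 |                 |                 |
| Distributed       |           |                 |                 |                 |
| Priority Queues   |           |                 |                 |                 |
| Delayed Messages  |           |                 | ✅ (plugin)     |                 |
| Dead Letter Queue |           |                 |                 |                 |
| Max Throughput    | ~1M msg/s | ~100K msg/s     | ~50K msg/s      | ~500K msg/s     |
| Latency           | <1μs      | <1ms            | ~2ms            | <1ms            |
| Clustering        |           |                 |                 |                 |
| Best For          | Testing   | General purpose | Complex routing | High throughput |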
When to Use Each Backend

InMemory

  • Local development and testing
  • Proof of concepts
  • Single-node deployments
  • No persistence needed

Redis

  • Simple pub/sub patterns
  • Already using Redis for caching
  • Moderate throughput requirements
  • Good operational tooling

RabbitMQ

  • Complex routing requirements
  • Need for strong message guarantees
  • Enterprise messaging patterns
  • Existing AMQP infrastructure

NATS

  • Very high throughput needs
  • Microservices communication
  • Cloud-native architectures
  • Low-latency requirements

Testing

Unit Testing
func TestQueueOperations(t *testing.T) {
    // Use in-memory backend for tests
    config := queue.Config{
        Driver: "inmemory",
    }
    
    logger := logger.NewNoopLogger()
    metrics := metrics.NewNoopMetrics()
    
    q := queue.NewInMemoryQueue(config, logger, metrics)
    
    ctx := context.Background()
    err := q.Connect(ctx)
    if err != nil {
        t.Fatal(err)
    }
    defer q.Disconnect(ctx)
    
    // Test publish
    err = q.Publish(ctx, "test", queue.Message{
        Body: []byte("test"),
    })
    if err != nil {
        t.Errorf("Publish failed: %v", err)
    }
    
    // Test consume
    received := make(chan queue.Message, 1)
    err = q.Consume(ctx, "test", func(ctx context.Context, msg queue.Message) error {
        received <- msg
        return nil
    }, queue.DefaultConsumeOptions())
    if err != nil {
        t.Errorf("Consume failed: %v", err)
    }
    
    select {
    case msg := <-received:
        if string(msg.Body) != "test" {
            t.Errorf("Expected 'test', got '%s'", msg.Body)
        }
    case <-time.After(1 * time.Second):
        t.Error("Timeout waiting for message")
    }
}
Integration Testing
func TestRabbitMQIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test")
    }
    
    config := queue.Config{
        Driver: "rabbitmq",
        URL:    "amqp://guest:guest@localhost:5672/",
    }
    
    logger := logger.NewTestLogger(t)
    metrics := metrics.NewNoopMetrics()
    
    q, err := queue.NewRabbitMQQueue(config, logger, metrics)
    if err != nil {
        t.Fatal(err)
    }
    
    // Run tests...
}

Best Practices

Message Design
// ✅ DO: Use structured messages with metadata
type OrderMessage struct {
    OrderID   string    `json:"order_id"`
    UserID    string    `json:"user_id"`
    Amount    float64   `json:"amount"`
    CreatedAt time.Time `json:"created_at"`
}

msg := queue.Message{
    Body: json.Marshal(OrderMessage{...}),
    Headers: map[string]string{
        "content-type": "application/json",
        "version":      "v1",
        "trace-id":     ctx.Value("trace-id"),
    },
}

// ❌ DON'T: Send unstructured or overly large messages
msg := queue.Message{
    Body: []byte("process order 12345"), // Too vague
}
Error Handling
// ✅ DO: Implement proper retry logic
err := q.Consume(ctx, "orders", func(ctx context.Context, msg queue.Message) error {
    if err := processOrder(msg.Body); err != nil {
        if isRetryable(err) {
            return err // Will retry with backoff
        }
        // Permanent error - log and ack
        log.Printf("Permanent error: %v", err)
        return nil // Ack to prevent retry
    }
    return nil
}, queue.ConsumeOptions{
    RetryStrategy: queue.RetryStrategy{
        MaxRetries:      5,
        InitialInterval: 1 * time.Second,
        MaxInterval:     60 * time.Second,
        Multiplier:      2.0,
    },
})

// ❌ DON'T: Swallow errors
err := q.Consume(ctx, "orders", func(ctx context.Context, msg queue.Message) error {
    processOrder(msg.Body) // Ignoring error
    return nil
}, opts)
Context Propagation
// ✅ DO: Pass context with timeouts
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err := q.Publish(ctx, "orders", msg)

// ✅ DO: Propagate trace IDs
err = q.Consume(ctx, "orders", func(ctx context.Context, msg queue.Message) error {
    traceID := msg.Headers["trace-id"]
    ctx = context.WithValue(ctx, "trace-id", traceID)
    
    return processOrder(ctx, msg.Body)
}, opts)
Resource Management
// ✅ DO: Use connection pooling
config := queue.Config{
    MaxConnections:     20,
    MaxIdleConnections: 10,
    ConnectTimeout:     10 * time.Second,
}

// ✅ DO: Implement graceful shutdown
func shutdown(q queue.Queue) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    // Stop consumers first
    q.StopConsuming(ctx, "orders")
    
    // Then disconnect
    q.Disconnect(ctx)
}

// ❌ DON'T: Create new connections for each operation
for i := 0; i < 1000; i++ {
    q, _ := queue.NewRedisQueue(config, logger, metrics)
    q.Connect(ctx)
    q.Publish(ctx, "orders", msg)
    q.Disconnect(ctx)
}
Monitoring
// ✅ DO: Monitor queue depth
ticker := time.NewTicker(30 * time.Second)
go func() {
    for range ticker.C {
        info, _ := q.GetQueueInfo(ctx, "orders")
        if info.Messages > 10000 {
            log.Printf("WARNING: Queue depth high: %d", info.Messages)
        }
    }
}()

// ✅ DO: Track processing metrics
err := q.Consume(ctx, "orders", func(ctx context.Context, msg queue.Message) error {
    start := time.Now()
    defer func() {
        duration := time.Since(start)
        metrics.Histogram("queue.process.duration", duration.Seconds())
    }()
    
    return processOrder(msg.Body)
}, opts)

Performance Tuning

Throughput Optimization
// High throughput configuration
config := queue.Config{
    MaxConnections:     50,
    MaxIdleConnections: 25,
    DefaultPrefetch:    100,
    DefaultConcurrency: 20,
    EnablePersistence:  false, // Trade durability for speed (if acceptable)
}

opts := queue.ConsumeOptions{
    AutoAck:       true,  // Skip ack overhead if messages can be lost
    PrefetchCount: 100,   // Fetch many messages at once
    Concurrency:   20,    // Process many messages concurrently
}
Latency Optimization
// Low latency configuration
config := queue.Config{
    MaxConnections:     10,
    DefaultPrefetch:    1,
    DefaultConcurrency: 1,
    ConnectTimeout:     5 * time.Second,
    ReadTimeout:        5 * time.Second,
}

opts := queue.ConsumeOptions{
    PrefetchCount: 1,  // Minimize prefetch for lowest latency
    Concurrency:   1,  // Sequential processing
}
Memory Optimization
// Memory-constrained configuration
config := queue.Config{
    MaxConnections:     5,
    MaxIdleConnections: 2,
    DefaultPrefetch:    10,
    MaxMessageSize:     65536, // 64KB max
}

opts := queue.QueueOptions{
    MaxLength:      1000,  // Limit queue size
    MaxLengthBytes: 10485760, // 10MB max
    MessageTTL:     5 * time.Minute,
}

Troubleshooting

Connection Issues
// Check connectivity
err := q.Ping(ctx)
if err != nil {
    log.Printf("Connection failed: %v", err)
    
    // Try reconnecting
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    if err := q.Connect(ctx); err != nil {
        log.Printf("Reconnection failed: %v", err)
    }
}
Message Not Consumed
  1. Check queue exists: ListQueues(ctx)
  2. Verify consumer is running: GetQueueInfo(ctx, "queue-name")
  3. Check prefetch settings: Increase PrefetchCount
  4. Monitor consumer errors: Add logging to MessageHandler
High Latency
  1. Reduce prefetch: Lower PrefetchCount to 1-10
  2. Increase concurrency: Raise Concurrency to match workload
  3. Check network: Use same datacenter/region for queue backend
  4. Profile handler: Identify slow operations in MessageHandler
Memory Usage
  1. Limit queue size: Set MaxLength or MaxLengthBytes
  2. Add message TTL: Set MessageTTL to expire old messages
  3. Reduce prefetch: Lower PrefetchCount
  4. Monitor stats: Use Stats() to track memory usage
Common Errors
// ErrNotConnected
// Solution: Call Connect() before operations

// ErrQueueNotFound
// Solution: Call DeclareQueue() first

// ErrTimeout
// Solution: Increase timeout or check backend health

// ErrMessageTooLarge
// Solution: Reduce message size or increase MaxMessageSize

// ErrQueueFull
// Solution: Increase MaxLength or add consumers

Architecture

Extension Lifecycle
Register → Start → [Running] → Stop
    ↓        ↓                    ↓
  Config  Connect            Disconnect
  Create Queue            Stop Consumers
  DI Register
Message Flow
Publisher → Publish() → [Backend Queue] → Consume() → Handler
                             ↓                           ↓
                        Dead Letter ←────────────── Error + Max Retries
Concurrency Model

Each consumer spawns a worker pool:

Consumer → Worker Pool (n goroutines) → Handler
              ↓
        Prefetch Buffer (m messages)
              ↓
        Rate Limiting & Backpressure

License

Apache-2.0 License - Part of the Forge Framework

Contributing

Contributions welcome! Please ensure:

  • Tests pass: go test ./...
  • Coverage maintained: go test -cover
  • Linting clean: golangci-lint run
  • Examples work: Test with real backends

Support

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotConnected       = errors.New("queue: not connected")
	ErrAlreadyConnected   = errors.New("queue: already connected")
	ErrConnectionFailed   = errors.New("queue: connection failed")
	ErrQueueNotFound      = errors.New("queue: queue not found")
	ErrQueueAlreadyExists = errors.New("queue: queue already exists")
	ErrMessageNotFound    = errors.New("queue: message not found")
	ErrInvalidMessage     = errors.New("queue: invalid message")
	ErrConsumerNotFound   = errors.New("queue: consumer not found")
	ErrPublishFailed      = errors.New("queue: publish failed")
	ErrConsumeFailed      = errors.New("queue: consume failed")
	ErrAckFailed          = errors.New("queue: acknowledgment failed")
	ErrNackFailed         = errors.New("queue: negative acknowledgment failed")
	ErrTimeout            = errors.New("queue: operation timeout")
	ErrInvalidConfig      = errors.New("queue: invalid configuration")
	ErrUnsupportedDriver  = errors.New("queue: unsupported driver")
	ErrMessageTooLarge    = errors.New("queue: message too large")
	ErrQueueFull          = errors.New("queue: queue is full")
)

Common queue errors.

Functions

func NewExtension

func NewExtension(opts ...ConfigOption) forge.Extension

NewExtension creates a new queue extension with functional options.

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new queue extension with a complete config.

Types

type Config

type Config struct {
	// Driver specifies the queue backend: "inmemory", "redis", "rabbitmq", "nats"
	Driver string `json:"driver" mapstructure:"driver" yaml:"driver"`

	// Connection settings
	URL      string   `json:"url,omitempty"      mapstructure:"url"      yaml:"url,omitempty"`
	Hosts    []string `json:"hosts,omitempty"    mapstructure:"hosts"    yaml:"hosts,omitempty"`
	Username string   `json:"username,omitempty" mapstructure:"username" yaml:"username,omitempty"`
	Password string   `json:"password,omitempty" mapstructure:"password" yaml:"password,omitempty"`
	VHost    string   `json:"vhost,omitempty"    mapstructure:"vhost"    yaml:"vhost,omitempty"` // RabbitMQ only

	// Connection pool
	MaxConnections     int           `json:"max_connections"      mapstructure:"max_connections"      yaml:"max_connections"`
	MaxIdleConnections int           `json:"max_idle_connections" mapstructure:"max_idle_connections" yaml:"max_idle_connections"`
	ConnectTimeout     time.Duration `json:"connect_timeout"      mapstructure:"connect_timeout"      yaml:"connect_timeout"`
	ReadTimeout        time.Duration `json:"read_timeout"         mapstructure:"read_timeout"         yaml:"read_timeout"`
	WriteTimeout       time.Duration `json:"write_timeout"        mapstructure:"write_timeout"        yaml:"write_timeout"`
	KeepAlive          time.Duration `json:"keep_alive"           mapstructure:"keep_alive"           yaml:"keep_alive"`

	// Retry policy
	MaxRetries      int           `json:"max_retries"       mapstructure:"max_retries"       yaml:"max_retries"`
	RetryBackoff    time.Duration `json:"retry_backoff"     mapstructure:"retry_backoff"     yaml:"retry_backoff"`
	RetryMultiplier float64       `json:"retry_multiplier"  mapstructure:"retry_multiplier"  yaml:"retry_multiplier"`
	MaxRetryBackoff time.Duration `json:"max_retry_backoff" mapstructure:"max_retry_backoff" yaml:"max_retry_backoff"`

	// Default queue settings
	DefaultPrefetch    int           `json:"default_prefetch"    mapstructure:"default_prefetch"    yaml:"default_prefetch"`
	DefaultConcurrency int           `json:"default_concurrency" mapstructure:"default_concurrency" yaml:"default_concurrency"`
	DefaultTimeout     time.Duration `json:"default_timeout"     mapstructure:"default_timeout"     yaml:"default_timeout"`
	EnableDeadLetter   bool          `json:"enable_dead_letter"  mapstructure:"enable_dead_letter"  yaml:"enable_dead_letter"`
	DeadLetterSuffix   string        `json:"dead_letter_suffix"  mapstructure:"dead_letter_suffix"  yaml:"dead_letter_suffix"`

	// Performance
	EnablePersistence bool  `json:"enable_persistence" mapstructure:"enable_persistence" yaml:"enable_persistence"`
	EnablePriority    bool  `json:"enable_priority"    mapstructure:"enable_priority"    yaml:"enable_priority"`
	EnableDelayed     bool  `json:"enable_delayed"     mapstructure:"enable_delayed"     yaml:"enable_delayed"`
	MaxMessageSize    int64 `json:"max_message_size"   mapstructure:"max_message_size"   yaml:"max_message_size"`

	// Security
	EnableTLS          bool   `json:"enable_tls"              mapstructure:"enable_tls"           yaml:"enable_tls"`
	TLSCertFile        string `json:"tls_cert_file,omitempty" mapstructure:"tls_cert_file"        yaml:"tls_cert_file,omitempty"`
	TLSKeyFile         string `json:"tls_key_file,omitempty"  mapstructure:"tls_key_file"         yaml:"tls_key_file,omitempty"`
	TLSCAFile          string `json:"tls_ca_file,omitempty"   mapstructure:"tls_ca_file"          yaml:"tls_ca_file,omitempty"`
	InsecureSkipVerify bool   `json:"insecure_skip_verify"    mapstructure:"insecure_skip_verify" yaml:"insecure_skip_verify"`

	// Monitoring
	EnableMetrics bool `json:"enable_metrics" mapstructure:"enable_metrics" yaml:"enable_metrics"`
	EnableTracing bool `json:"enable_tracing" mapstructure:"enable_tracing" yaml:"enable_tracing"`

	// Database integration
	// DatabaseRedisConnection specifies the name of a Redis connection
	// from the database extension to reuse instead of creating a new one.
	// Only valid when Driver is "redis".
	// If specified, the queue will use this connection and ignore
	// URL, Hosts, Username, Password, and other connection settings.
	DatabaseRedisConnection string `json:"database_redis_connection,omitempty" mapstructure:"database_redis_connection" yaml:"database_redis_connection,omitempty"`

	// Config loading flags (not serialized)
	RequireConfig bool `json:"-" mapstructure:"-" yaml:"-"`
}

Config contains configuration for the queue extension.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default queue configuration.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type ConfigOption

type ConfigOption func(*Config)

ConfigOption is a functional option for Config.

func WithAuth

func WithAuth(username, password string) ConfigOption

WithAuth sets authentication credentials.

func WithConcurrency

func WithConcurrency(concurrency int) ConfigOption

WithConcurrency sets default concurrency.

func WithConfig

func WithConfig(config Config) ConfigOption

WithConfig sets the complete config.

func WithDatabaseRedisConnection added in v0.7.4

func WithDatabaseRedisConnection(name string) ConfigOption

WithDatabaseRedisConnection configures queue to use an existing Redis connection from the database extension.

func WithDeadLetter

func WithDeadLetter(enable bool) ConfigOption

WithDeadLetter enables/disables dead letter queue.

func WithDelayed

func WithDelayed(enable bool) ConfigOption

WithDelayed enables/disables delayed messages.

func WithDriver

func WithDriver(driver string) ConfigOption

WithDriver sets the queue backend driver (for example "inmemory" or "redis").

func WithHosts

func WithHosts(hosts ...string) ConfigOption

WithHosts sets the hosts.

func WithMaxConnections

func WithMaxConnections(max int) ConfigOption

WithMaxConnections sets max connections.

func WithMetrics

func WithMetrics(enable bool) ConfigOption

WithMetrics enables/disables metrics.

func WithPersistence

func WithPersistence(enable bool) ConfigOption

WithPersistence enables/disables message persistence.

func WithPrefetch

func WithPrefetch(prefetch int) ConfigOption

WithPrefetch sets the default prefetch count.

func WithPriority

func WithPriority(enable bool) ConfigOption

WithPriority enables/disables priority queues.

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

WithRequireConfig requires config from ConfigManager.

func WithTLS

func WithTLS(certFile, keyFile, caFile string) ConfigOption

WithTLS enables TLS using the given certificate, key, and CA files.

func WithTimeout

func WithTimeout(timeout time.Duration) ConfigOption

WithTimeout sets the default timeout.

func WithTracing

func WithTracing(enable bool) ConfigOption

WithTracing enables/disables tracing.

func WithURL

func WithURL(url string) ConfigOption

WithURL sets the URL.

func WithVHost

func WithVHost(vhost string) ConfigOption

WithVHost sets the virtual host (RabbitMQ only).

type ConsumeOptions

type ConsumeOptions struct {
	ConsumerTag   string                 `json:"consumer_tag,omitempty"`
	AutoAck       bool                   `json:"auto_ack"`           // Auto-acknowledge messages
	Exclusive     bool                   `json:"exclusive"`          // Exclusive consumer
	PrefetchCount int                    `json:"prefetch_count"`     // Number of messages to prefetch
	Priority      int                    `json:"priority,omitempty"` // Consumer priority
	Concurrency   int                    `json:"concurrency"`        // Number of concurrent workers
	RetryStrategy RetryStrategy          `json:"retry_strategy"`     // Retry configuration
	Timeout       time.Duration          `json:"timeout,omitempty"`  // Message processing timeout
	Arguments     map[string]interface{} `json:"arguments,omitempty"`
}

ConsumeOptions contains consumer configuration.

func DefaultConsumeOptions

func DefaultConsumeOptions() ConsumeOptions

DefaultConsumeOptions returns default consume options.

type Extension

type Extension struct {
	*forge.BaseExtension
	// contains filtered or unexported fields
}

Extension implements forge.Extension for queue functionality.

func (*Extension) Dependencies added in v0.8.0

func (e *Extension) Dependencies() []string

Dependencies returns the names of extensions this extension depends on. When using a database Redis connection, the queue depends on the database extension.

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks if the queue is healthy.

func (*Extension) Queue

func (e *Extension) Queue() Queue

Queue returns the queue instance.

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the queue extension with the app.

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start starts the queue extension.

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop stops the queue extension.

type InMemoryQueue

type InMemoryQueue struct {
	// contains filtered or unexported fields
}

InMemoryQueue implements the Queue interface with an in-memory store.

func NewInMemoryQueue

func NewInMemoryQueue(config Config, logger forge.Logger, metrics forge.Metrics) *InMemoryQueue

NewInMemoryQueue creates a new in-memory queue instance.

func (*InMemoryQueue) Ack

func (q *InMemoryQueue) Ack(ctx context.Context, messageID string) error

func (*InMemoryQueue) Connect

func (q *InMemoryQueue) Connect(ctx context.Context) error

func (*InMemoryQueue) Consume

func (q *InMemoryQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error

func (*InMemoryQueue) DeclareQueue

func (q *InMemoryQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error

func (*InMemoryQueue) DeleteQueue

func (q *InMemoryQueue) DeleteQueue(ctx context.Context, name string) error

func (*InMemoryQueue) Disconnect

func (q *InMemoryQueue) Disconnect(ctx context.Context) error

func (*InMemoryQueue) GetDeadLetterQueue

func (q *InMemoryQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)

func (*InMemoryQueue) GetQueueInfo

func (q *InMemoryQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)

func (*InMemoryQueue) ListQueues

func (q *InMemoryQueue) ListQueues(ctx context.Context) ([]string, error)

func (*InMemoryQueue) Nack

func (q *InMemoryQueue) Nack(ctx context.Context, messageID string, requeue bool) error

func (*InMemoryQueue) Ping

func (q *InMemoryQueue) Ping(ctx context.Context) error

func (*InMemoryQueue) Publish

func (q *InMemoryQueue) Publish(ctx context.Context, queueName string, message Message) error

func (*InMemoryQueue) PublishBatch

func (q *InMemoryQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error

func (*InMemoryQueue) PublishDelayed

func (q *InMemoryQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error

func (*InMemoryQueue) PurgeQueue

func (q *InMemoryQueue) PurgeQueue(ctx context.Context, name string) error

func (*InMemoryQueue) Reject

func (q *InMemoryQueue) Reject(ctx context.Context, messageID string) error

func (*InMemoryQueue) RequeueDeadLetter

func (q *InMemoryQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error

func (*InMemoryQueue) Stats

func (q *InMemoryQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*InMemoryQueue) StopConsuming

func (q *InMemoryQueue) StopConsuming(ctx context.Context, queueName string) error

type Message

type Message struct {
	ID          string                 `json:"id"`
	Queue       string                 `json:"queue"`
	Body        []byte                 `json:"body"`
	Headers     map[string]string      `json:"headers,omitempty"`
	Priority    int                    `json:"priority,omitempty"`    // 0-9, higher = more priority
	Delay       time.Duration          `json:"delay,omitempty"`       // Delay before processing
	Expiration  time.Duration          `json:"expiration,omitempty"`  // Message TTL
	Retries     int                    `json:"retries,omitempty"`     // Current retry count
	MaxRetries  int                    `json:"max_retries,omitempty"` // Maximum retry attempts
	PublishedAt time.Time              `json:"published_at"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}

Message represents a queue message.

type MessageHandler

type MessageHandler func(ctx context.Context, msg Message) error

MessageHandler is called for each received message.

type NATSQueue

type NATSQueue struct {
	// contains filtered or unexported fields
}

NATSQueue implements the Queue interface using NATS JetStream.

func NewNATSQueue

func NewNATSQueue(config Config, logger forge.Logger, metrics forge.Metrics) (*NATSQueue, error)

NewNATSQueue creates a new NATS-backed queue instance.

func (*NATSQueue) Ack

func (q *NATSQueue) Ack(ctx context.Context, messageID string) error

func (*NATSQueue) Connect

func (q *NATSQueue) Connect(ctx context.Context) error

func (*NATSQueue) Consume

func (q *NATSQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error

func (*NATSQueue) DeclareQueue

func (q *NATSQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error

func (*NATSQueue) DeleteQueue

func (q *NATSQueue) DeleteQueue(ctx context.Context, name string) error

func (*NATSQueue) Disconnect

func (q *NATSQueue) Disconnect(ctx context.Context) error

func (*NATSQueue) GetDeadLetterQueue

func (q *NATSQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)

func (*NATSQueue) GetQueueInfo

func (q *NATSQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)

func (*NATSQueue) ListQueues

func (q *NATSQueue) ListQueues(ctx context.Context) ([]string, error)

func (*NATSQueue) Nack

func (q *NATSQueue) Nack(ctx context.Context, messageID string, requeue bool) error

func (*NATSQueue) Ping

func (q *NATSQueue) Ping(ctx context.Context) error

func (*NATSQueue) Publish

func (q *NATSQueue) Publish(ctx context.Context, queueName string, message Message) error

func (*NATSQueue) PublishBatch

func (q *NATSQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error

func (*NATSQueue) PublishDelayed

func (q *NATSQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error

func (*NATSQueue) PurgeQueue

func (q *NATSQueue) PurgeQueue(ctx context.Context, name string) error

func (*NATSQueue) Reject

func (q *NATSQueue) Reject(ctx context.Context, messageID string) error

func (*NATSQueue) RequeueDeadLetter

func (q *NATSQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error

func (*NATSQueue) Stats

func (q *NATSQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*NATSQueue) StopConsuming

func (q *NATSQueue) StopConsuming(ctx context.Context, queueName string) error

type Queue

type Queue interface {
	// Connection management
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error

	// Queue management
	DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
	DeleteQueue(ctx context.Context, name string) error
	ListQueues(ctx context.Context) ([]string, error)
	GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
	PurgeQueue(ctx context.Context, name string) error

	// Publishing
	Publish(ctx context.Context, queue string, message Message) error
	PublishBatch(ctx context.Context, queue string, messages []Message) error
	PublishDelayed(ctx context.Context, queue string, message Message, delay time.Duration) error

	// Consuming
	Consume(ctx context.Context, queue string, handler MessageHandler, opts ConsumeOptions) error
	StopConsuming(ctx context.Context, queue string) error

	// Message operations
	Ack(ctx context.Context, messageID string) error
	Nack(ctx context.Context, messageID string, requeue bool) error
	Reject(ctx context.Context, messageID string) error

	// Dead letter queue
	GetDeadLetterQueue(ctx context.Context, queue string) ([]Message, error)
	RequeueDeadLetter(ctx context.Context, queue string, messageID string) error

	// Stats
	Stats(ctx context.Context) (*QueueStats, error)
}

Queue represents a unified message queue interface supporting multiple backends.

func Get added in v0.7.4

func Get(container forge.Container) (Queue, error)

Get retrieves the queue service from the container. Returns an error if the queue is not registered.

func GetFromApp added in v0.7.4

func GetFromApp(app forge.App) (Queue, error)

GetFromApp is a convenience helper to get the queue from an App.

func MustGet added in v0.7.4

func MustGet(container forge.Container) Queue

MustGet retrieves the queue service from the container. Panics if the queue is not registered.

func MustGetFromApp added in v0.7.4

func MustGetFromApp(app forge.App) Queue

MustGetFromApp is a convenience helper to get the queue from an App. Panics if the queue is not registered.

type QueueInfo

type QueueInfo struct {
	Name          string    `json:"name"`
	Messages      int64     `json:"messages"`
	Consumers     int       `json:"consumers"`
	MessageBytes  int64     `json:"message_bytes"`
	PublishRate   float64   `json:"publish_rate"`
	DeliverRate   float64   `json:"deliver_rate"`
	AckRate       float64   `json:"ack_rate"`
	Durable       bool      `json:"durable"`
	AutoDelete    bool      `json:"auto_delete"`
	CreatedAt     time.Time `json:"created_at"`
	LastMessageAt time.Time `json:"last_message_at,omitempty"`
}

QueueInfo contains queue metadata.

type QueueOptions

type QueueOptions struct {
	Durable         bool                   `json:"durable"`           // Survives broker restart
	AutoDelete      bool                   `json:"auto_delete"`       // Deleted when no consumers
	Exclusive       bool                   `json:"exclusive"`         // Used by only one connection
	MaxLength       int64                  `json:"max_length"`        // Maximum number of messages
	MaxLengthBytes  int64                  `json:"max_length_bytes"`  // Maximum total message bytes
	MessageTTL      time.Duration          `json:"message_ttl"`       // Message time-to-live
	MaxPriority     int                    `json:"max_priority"`      // Maximum priority (0-255)
	DeadLetterQueue string                 `json:"dead_letter_queue"` // DLQ for failed messages
	Arguments       map[string]interface{} `json:"arguments,omitempty"`
}

QueueOptions contains queue configuration.

func DefaultQueueOptions

func DefaultQueueOptions() QueueOptions

DefaultQueueOptions returns default queue options.

type QueueStats

type QueueStats struct {
	QueueCount      int64                  `json:"queue_count"`
	TotalMessages   int64                  `json:"total_messages"`
	TotalConsumers  int                    `json:"total_consumers"`
	PublishRate     float64                `json:"publish_rate"`
	DeliverRate     float64                `json:"deliver_rate"`
	AckRate         float64                `json:"ack_rate"`
	UnackedMessages int64                  `json:"unacked_messages"`
	ReadyMessages   int64                  `json:"ready_messages"`
	Uptime          time.Duration          `json:"uptime"`
	MemoryUsed      int64                  `json:"memory_used"`
	ConnectionCount int                    `json:"connection_count"`
	Version         string                 `json:"version"`
	Extra           map[string]interface{} `json:"extra,omitempty"`
}

QueueStats contains queue system statistics.

type RabbitMQQueue

type RabbitMQQueue struct {
	// contains filtered or unexported fields
}

RabbitMQQueue implements the Queue interface using RabbitMQ.

func NewRabbitMQQueue

func NewRabbitMQQueue(config Config, logger forge.Logger, metrics forge.Metrics) (*RabbitMQQueue, error)

NewRabbitMQQueue creates a new RabbitMQ-backed queue instance.

func (*RabbitMQQueue) Ack

func (q *RabbitMQQueue) Ack(ctx context.Context, messageID string) error

func (*RabbitMQQueue) Connect

func (q *RabbitMQQueue) Connect(ctx context.Context) error

func (*RabbitMQQueue) Consume

func (q *RabbitMQQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error

func (*RabbitMQQueue) DeclareQueue

func (q *RabbitMQQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error

func (*RabbitMQQueue) DeleteQueue

func (q *RabbitMQQueue) DeleteQueue(ctx context.Context, name string) error

func (*RabbitMQQueue) Disconnect

func (q *RabbitMQQueue) Disconnect(ctx context.Context) error

func (*RabbitMQQueue) GetDeadLetterQueue

func (q *RabbitMQQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)

func (*RabbitMQQueue) GetQueueInfo

func (q *RabbitMQQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)

func (*RabbitMQQueue) ListQueues

func (q *RabbitMQQueue) ListQueues(ctx context.Context) ([]string, error)

func (*RabbitMQQueue) Nack

func (q *RabbitMQQueue) Nack(ctx context.Context, messageID string, requeue bool) error

func (*RabbitMQQueue) Ping

func (q *RabbitMQQueue) Ping(ctx context.Context) error

func (*RabbitMQQueue) Publish

func (q *RabbitMQQueue) Publish(ctx context.Context, queueName string, message Message) error

func (*RabbitMQQueue) PublishBatch

func (q *RabbitMQQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error

func (*RabbitMQQueue) PublishDelayed

func (q *RabbitMQQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error

func (*RabbitMQQueue) PurgeQueue

func (q *RabbitMQQueue) PurgeQueue(ctx context.Context, name string) error

func (*RabbitMQQueue) Reject

func (q *RabbitMQQueue) Reject(ctx context.Context, messageID string) error

func (*RabbitMQQueue) RequeueDeadLetter

func (q *RabbitMQQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error

func (*RabbitMQQueue) Stats

func (q *RabbitMQQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*RabbitMQQueue) StopConsuming

func (q *RabbitMQQueue) StopConsuming(ctx context.Context, queueName string) error

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue implements the Queue interface using Redis Streams.

func NewRedisQueue

func NewRedisQueue(config Config, logger forge.Logger, metrics forge.Metrics) (*RedisQueue, error)

NewRedisQueue creates a new Redis-backed queue instance.

func NewRedisQueueWithClient added in v0.7.4

func NewRedisQueueWithClient(config Config, logger forge.Logger, metrics forge.Metrics, client redis.UniversalClient) (*RedisQueue, error)

NewRedisQueueWithClient creates a new Redis-backed queue instance with an external client.

func (*RedisQueue) Ack

func (q *RedisQueue) Ack(ctx context.Context, messageID string) error

func (*RedisQueue) Connect

func (q *RedisQueue) Connect(ctx context.Context) error

func (*RedisQueue) Consume

func (q *RedisQueue) Consume(ctx context.Context, queueName string, handler MessageHandler, opts ConsumeOptions) error

func (*RedisQueue) DeclareQueue

func (q *RedisQueue) DeclareQueue(ctx context.Context, name string, opts QueueOptions) error

func (*RedisQueue) DeleteQueue

func (q *RedisQueue) DeleteQueue(ctx context.Context, name string) error

func (*RedisQueue) Disconnect

func (q *RedisQueue) Disconnect(ctx context.Context) error

func (*RedisQueue) GetDeadLetterQueue

func (q *RedisQueue) GetDeadLetterQueue(ctx context.Context, queueName string) ([]Message, error)

func (*RedisQueue) GetQueueInfo

func (q *RedisQueue) GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)

func (*RedisQueue) ListQueues

func (q *RedisQueue) ListQueues(ctx context.Context) ([]string, error)

func (*RedisQueue) Nack

func (q *RedisQueue) Nack(ctx context.Context, messageID string, requeue bool) error

func (*RedisQueue) Ping

func (q *RedisQueue) Ping(ctx context.Context) error

func (*RedisQueue) Publish

func (q *RedisQueue) Publish(ctx context.Context, queueName string, message Message) error

func (*RedisQueue) PublishBatch

func (q *RedisQueue) PublishBatch(ctx context.Context, queueName string, messages []Message) error

func (*RedisQueue) PublishDelayed

func (q *RedisQueue) PublishDelayed(ctx context.Context, queueName string, message Message, delay time.Duration) error

func (*RedisQueue) PurgeQueue

func (q *RedisQueue) PurgeQueue(ctx context.Context, name string) error

func (*RedisQueue) Reject

func (q *RedisQueue) Reject(ctx context.Context, messageID string) error

func (*RedisQueue) RequeueDeadLetter

func (q *RedisQueue) RequeueDeadLetter(ctx context.Context, queueName string, messageID string) error

func (*RedisQueue) Stats

func (q *RedisQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*RedisQueue) StopConsuming

func (q *RedisQueue) StopConsuming(ctx context.Context, queueName string) error

type RetryStrategy

type RetryStrategy struct {
	MaxRetries      int           `json:"max_retries"`
	InitialInterval time.Duration `json:"initial_interval"`
	MaxInterval     time.Duration `json:"max_interval"`
	Multiplier      float64       `json:"multiplier"` // Exponential backoff multiplier
}

RetryStrategy defines retry behavior.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL