sqsbalancer

package module
v0.0.0-...-b9b2f3c Latest
Published: Jan 8, 2026 License: MIT Imports: 20 Imported by: 0

README

SQS Balancer

A production-ready Go library for distributed SQS queue consumption with automatic load balancing across multiple ECS service instances.

Features

  • Distributed Coordination - Multiple instances automatically coordinate via DynamoDB with heartbeat-based health checks
  • Consistent Hashing - Stable queue distribution using consistent hashing with virtual nodes, minimizing reassignments during scaling
  • Automatic Rebalancing - Queues automatically redistribute when instances join, leave, or new queues are discovered
  • Graceful Shutdown - In-flight messages complete processing before shutdown, with configurable timeout
  • Queue Discovery - Automatic discovery of SQS queues with regex-based filtering
  • Error Handling - Failed messages automatically retry via SQS visibility timeout mechanism
  • Observability - Built-in support for structured logging (zerolog) and OpenTelemetry metrics/tracing
  • Highly Configurable - All parameters tunable with sensible defaults

Installation

go get github.com/soaserele/sqsbalancer

Quick Start

package main

import (
    "context"
    "os"
    "regexp"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
    "github.com/rs/zerolog"
    "github.com/soaserele/sqsbalancer"
)

func main() {
    ctx := context.Background()

    // Setup logger
    logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

    // Load AWS configuration
    awsConfig, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        panic(err)
    }

    // Configure the balancer
    cfg := &sqsbalancer.Config{
        AWSConfig:          awsConfig,
        ApplicationName:    "myapp", // required; see the Config type
        DynamoDBTableName:  "sqsbalancer-coordination",
        QueueFilterPattern: regexp.MustCompile("^myapp-.*"),
        Logger:             logger,
    }

    // Create balancer
    balancer, err := sqsbalancer.New(ctx, cfg)
    if err != nil {
        panic(err)
    }

    // Define message handler
    handler := func(ctx context.Context, queueURL string, message *types.Message) error {
        logger.Info().
            Str("queue_url", queueURL).
            Str("message_body", *message.Body).
            Msg("Processing message")
        // Your message processing logic here
        return nil // Return error to retry via visibility timeout
    }

    // Start the balancer
    if err := balancer.Start(ctx, handler); err != nil {
        panic(err)
    }

    // Start returns immediately, so block here until a shutdown signal
    // arrives, then call balancer.Stop(ctx) (see Graceful Shutdown).
}

How It Works

Architecture Overview
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Instance 1 │     │  Instance 2 │     │  Instance 3 │
│             │     │             │     │             │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │Balancer │ │     │ │Balancer │ │     │ │Balancer │ │
│ └────┬────┘ │     │ └────┬────┘ │     │ └────┬────┘ │
└──────┼──────┘     └──────┼──────┘     └──────┼──────┘
       │                   │                   │
       └───────────────────┼───────────────────┘
                           │
                    ┌──────▼───────┐
                    │   DynamoDB   │
                    │ Coordination │
                    └──────────────┘

       ┌───────────────────┼───────────────────┐
       │                   │                   │
┌──────▼──────┐     ┌──────▼──────┐     ┌─────▼───────┐
│ SQS Queue 1 │     │ SQS Queue 2 │     │ SQS Queue 3 │
└─────────────┘     └─────────────┘     └─────────────┘
Distribution Algorithm
  1. Instance Registration: Each instance registers itself in DynamoDB with a unique ID and starts sending heartbeats
  2. Queue Discovery: Instances discover all SQS queues matching the configured regex pattern
  3. Consistent Hashing: Queues are distributed across active instances using consistent hashing with virtual nodes (default: 150 per instance)
  4. Consumer Management: Each instance starts consumers only for queues assigned to it
  5. Rebalancing: When topology changes (instances join/leave, queues added/removed), the library triggers rebalancing after a configurable delay
Message Processing Flow
SQS Queues → Pollers → Work Queue → Worker Pool → Your Handler
    (N)        (N)        (1)          (M)             (1)

where N = number of assigned queues, M = worker pool size
  1. Pollers: Each assigned queue has a dedicated poller using SQS long polling (20s default)
  2. Work Queue: Messages are submitted to a global buffered work queue (size: 100 default)
  3. Worker Pool: Fixed number of workers (default: 10) process messages concurrently
  4. Handler Execution: Your handler function is called for each message
  5. Message Deletion: Messages are deleted from SQS only on successful processing (nil error)
  6. Retry on Failure: Failed messages (handler returns error) are NOT deleted and retry after visibility timeout

Configuration

Complete Configuration Options
type Config struct {
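    // AWS Configuration (required)
    AWSConfig aws.Config

    // Application Identification (required)
    // Instances with different ApplicationName values operate independently
    ApplicationName string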

    // Instance Identification
    // Unique ID for this instance (auto-generated UUID if not provided)
    InstanceID string

    // DynamoDB Coordination (required)
    DynamoDBTableName string // Table for coordination

    // Heartbeat Configuration
    HeartbeatInterval time.Duration // Default: 10s
    HeartbeatTimeout  time.Duration // Default: 30s (instance considered dead)

    // Queue Discovery
    QueueDiscoveryInterval time.Duration  // Default: 5 minutes
    QueueFilterPattern     *regexp.Regexp // nil = all queues

    // Queue Distribution
    RebalanceDelay time.Duration // Default: 30s (delay before rebalancing)
    VirtualNodes   int           // Default: 150 (virtual nodes per instance)

    // SQS Configuration (applies to ALL queues)
    SQS SQSConfig

    // Worker Pool
    WorkerPoolSize int // Default: 10 concurrent workers
    WorkQueueSize  int // Default: 100 buffered messages

    // Graceful Shutdown
    ShutdownTimeout time.Duration // Default: 30s

    // Observability
    Logger         zerolog.Logger       // Default: console logger
    MeterProvider  metric.MeterProvider // Optional OpenTelemetry
    TracerProvider trace.TracerProvider // Optional OpenTelemetry
}

type SQSConfig struct {
    VisibilityTimeout     int32    // Default: 30 seconds
    WaitTimeSeconds       int32    // Default: 20 seconds (long polling)
    MaxMessages           int32    // Default: 10 (max batch size)
    MessageAttributeNames []string // Default: ["All"]
}
Configuration Best Practices
  • HeartbeatTimeout: keep it several times larger than HeartbeatInterval (the defaults are 30s and 10s) so that one delayed heartbeat does not mark a live instance as dead
  • RebalanceDelay: batches multiple topology changes (e.g., several instances starting at once) into a single rebalance
  • VirtualNodes: higher values give a more even distribution at the cost of more memory
  • WorkerPoolSize: match it to the concurrency your message processing needs
  • WorkQueueSize: buffers messages between pollers and workers
  • VisibilityTimeout: should exceed your expected message processing time

Advanced Usage

Custom Observability
import (
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Setup OpenTelemetry (attach readers/exporters as needed)
meterProvider := sdkmetric.NewMeterProvider()
tracerProvider := sdktrace.NewTracerProvider()

cfg := &sqsbalancer.Config{
    // ... other config ...
    MeterProvider:  meterProvider,
    TracerProvider: tracerProvider,
}
Runtime Statistics
// Get current statistics (assumes logger is available)
stats := balancer.Stats()

logger.Info().
    Str("instance_id", stats.InstanceID).
    Int("active_queues", stats.ActiveQueues).
    Int64("messages_processed", stats.MessagesProcessed).
    Int64("messages_succeeded", stats.MessagesSucceeded).
    Int64("messages_failed", stats.MessagesFailed).
    Int("workers_active", stats.WorkersActive).
    Int("work_queue_depth", stats.WorkQueueDepth).
    Msg("Balancer statistics")
Health Checks
// For ECS/Kubernetes health checks
health := balancer.Health()

if !health.Healthy {
    // Instance is unhealthy
}
Graceful Shutdown
// Setup signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

// Wait for signal
<-sigCh

// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

if err := balancer.Stop(ctx); err != nil {
    log.Fatal(err)
}

DynamoDB Table

The library automatically creates a DynamoDB table with the following schema:

Table: <your-table-name>
├── Primary Key
│   ├── pk (String, Partition Key)
│   └── sk (String, Sort Key)
├── Attributes
│   ├── instance_id
│   ├── last_heartbeat (timestamp)
│   ├── status (active/draining/stopped)
│   ├── version (for optimistic locking)
│   ├── queue_url
│   ├── assigned_instance
│   └── ttl (auto-cleanup)
└── GSI: InstanceAssignments
    └── assigned_instance → queue_url

Provisioned Capacity: 5 RCU / 5 WCU (adjust based on your instance count)

TTL: Enabled on ttl attribute for automatic cleanup of dead instances

Metrics

When OpenTelemetry MeterProvider is configured, the following metrics are emitted:

  • sqsbalancer.messages.received - Counter of messages received from SQS
  • sqsbalancer.messages.processed - Counter of messages processed
  • sqsbalancer.messages.succeeded - Counter of successful message processing
  • sqsbalancer.messages.failed - Counter of failed message processing

All metrics include the queue_url attribute.

Performance Considerations

Scaling Behavior
  • Instances: Supports hundreds of instances (limited by DynamoDB throughput)
  • Queues: Efficiently handles thousands of queues per instance
  • Messages: Throughput limited by worker pool size and message processing time
Resource Usage
  • Memory: Minimal (~10MB base + message buffers)
  • CPU: Low overhead, mainly from message processing
  • Network: DynamoDB heartbeats (every 10s) + SQS long polling
Optimization Tips
  1. Worker Pool Size: Set based on your message processing I/O wait time
  2. Work Queue Size: Increase if workers sit idle between batches while pollers still have messages to submit
  3. Batch Size: Keep MaxMessages at 10 (the default and the SQS per-request maximum) for the highest throughput; lower it only if you want smaller batches
  4. Visibility Timeout: Set higher than expected processing time to avoid duplicates
  5. Virtual Nodes: Default 150 is usually sufficient; increase for more uniform distribution

Limitations

  • At-Least-Once Delivery: Messages may be processed multiple times (handler should be idempotent)
  • No Message Ordering: Standard SQS queues don't guarantee order (use FIFO queues if needed)
  • DynamoDB Dependency: Requires DynamoDB table for coordination
  • Region-Specific: All instances must be in the same AWS region

Troubleshooting

Messages Not Being Processed
  1. Check queue filter pattern matches your queue names
  2. Verify AWS credentials have SQS permissions
  3. Check DynamoDB table exists and is accessible
  4. Review logs for errors
Frequent Rebalancing
  1. Increase RebalanceDelay to batch changes
  2. Ensure HeartbeatTimeout > HeartbeatInterval
  3. Check network stability between instances and DynamoDB
High Message Latency
  1. Increase WorkerPoolSize for more concurrent processing
  2. Optimize your message handler performance
  3. Increase WorkQueueSize if queue is frequently full

License

MIT License - see LICENSE file for details

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

Support

For issues, questions, or feature requests, please open an issue on GitHub.

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrInvalidConfig is returned when the configuration is invalid
	ErrInvalidConfig = errors.New("invalid configuration")

	// ErrAlreadyStarted is returned when Start is called on an already started balancer
	ErrAlreadyStarted = errors.New("balancer already started")

	// ErrNotStarted is returned when Stop is called on a balancer that hasn't been started
	ErrNotStarted = errors.New("balancer not started")

	// ErrShutdownTimeout is returned when graceful shutdown exceeds the configured timeout
	ErrShutdownTimeout = errors.New("shutdown timeout exceeded")

	// ErrWorkerPoolFull is returned when the worker pool queue is full and cannot accept new work
	ErrWorkerPoolFull = errors.New("worker pool queue is full")

	// ErrInstanceNotFound is returned when an instance is not found in DynamoDB
	ErrInstanceNotFound = errors.New("instance not found in DynamoDB")

	// ErrQueueNotAssigned is returned when a queue is not assigned to this instance
	ErrQueueNotAssigned = errors.New("queue not assigned to this instance")

	// ErrCoordinationConflict is returned when there's a version conflict in DynamoDB coordination
	ErrCoordinationConflict = errors.New("coordination conflict, retry needed")
)

Functions

func IsRetryable

func IsRetryable(err error) bool

IsRetryable determines if an error should be retried. Transient AWS errors (throttling, network timeouts, service unavailable) should be retried.

Types

type Balancer

type Balancer struct {
	// contains filtered or unexported fields
}

Balancer manages distributed SQS queue consumption across multiple instances

func New

func New(ctx context.Context, cfg *Config) (*Balancer, error)

New creates a new Balancer instance with the provided configuration. The configuration is validated and defaults are applied for any zero-value fields. Returns an error if the configuration is invalid.

func (*Balancer) GetActiveQueues

func (b *Balancer) GetActiveQueues() []string

GetActiveQueues returns a list of currently active queue URLs. Implements the ConsumerManager interface.

func (*Balancer) Health

func (b *Balancer) Health() HealthStatus

Health returns the current health status of the balancer. This can be used for external health checks (e.g., ECS health checks).

func (*Balancer) Start

func (b *Balancer) Start(ctx context.Context, handler MessageHandler) error

Start begins queue discovery, coordination, and message consumption. It launches background goroutines for:
  - Heartbeat maintenance
  - Queue discovery
  - Instance topology monitoring
  - Rebalancing
  - Message consumption from assigned queues

Returns immediately after starting background processing. Returns an error if already started or if initialization fails.

func (*Balancer) StartConsumer

func (b *Balancer) StartConsumer(ctx context.Context, queueURL string) error

StartConsumer starts a consumer for the given queue URL. Implements the ConsumerManager interface.

func (*Balancer) Stats

func (b *Balancer) Stats() Stats

Stats returns runtime statistics of the balancer. This includes information about active queues, messages processed, worker pool status, etc.

func (*Balancer) Stop

func (b *Balancer) Stop(ctx context.Context) error

Stop initiates graceful shutdown of the balancer. The shutdown process:
  1. Stops accepting new messages from SQS
  2. Waits for in-flight messages to complete (up to ShutdownTimeout)
  3. Deregisters instance from DynamoDB
  4. Returns when shutdown is complete or context is cancelled

Returns an error if not started or if shutdown fails/times out.

func (*Balancer) StopConsumer

func (b *Balancer) StopConsumer(queueURL string, timeout time.Duration) error

StopConsumer stops the consumer for the given queue URL. Implements the ConsumerManager interface.

type Config

type Config struct {
	// AWS Configuration
	AWSConfig aws.Config

	// Application Identification
	// Name of the application using this balancer (required)
	// Instances with different ApplicationName values will operate independently
	ApplicationName string

	// Instance Identification
	// Unique ID for this instance (auto-generated if empty)
	InstanceID string

	// DynamoDB Coordination
	DynamoDBTableName string        // Table for coordination (will be created if not exists)
	HeartbeatInterval time.Duration // How often to send heartbeats (default: 10s)
	HeartbeatTimeout  time.Duration // Instance considered dead after this duration (default: 30s)

	// Queue Discovery
	QueueDiscoveryInterval time.Duration  // How often to refresh queue list (default: 5m)
	QueueFilterPattern     *regexp.Regexp // Regex to filter queue names (nil = all queues)

	// Queue Distribution
	RebalanceDelay time.Duration // Delay before rebalancing after topology change (default: 30s)
	VirtualNodes   int           // Virtual nodes per instance for consistent hashing (default: 150)

	// SQS Configuration (applies to ALL queues)
	SQS SQSConfig

	// Worker Pool
	WorkerPoolSize int // Number of concurrent workers (default: 10)
	WorkQueueSize  int // Size of internal work queue (default: 100)

	// Graceful Shutdown
	ShutdownTimeout time.Duration // Max time to wait for in-flight messages (default: 30s)

	// Observability
	Logger         zerolog.Logger       // If not provided, creates default logger
	MeterProvider  metric.MeterProvider // OpenTelemetry meter provider
	TracerProvider trace.TracerProvider // OpenTelemetry tracer provider
}

Config holds all configuration for the SQS Balancer

func (*Config) Validate

func (c *Config) Validate() error

Validate ensures the configuration is valid and returns an error if not

func (*Config) WithDefaults

func (c *Config) WithDefaults() *Config

WithDefaults returns a new Config with default values applied to any zero-value fields

type HealthStatus

type HealthStatus struct {
	Healthy   bool
	Started   bool
	Instances int
	Queues    int
	Workers   int
}

HealthStatus represents the health status of the balancer

type MessageHandler

type MessageHandler func(ctx context.Context, queueURL string, message *types.Message) error

MessageHandler is the function signature for processing SQS messages. The handler receives the queue URL/name and the message to process. The handler should return nil on success, or an error if the message processing failed. If an error is returned, the message will NOT be deleted from SQS and will become visible again after the visibility timeout expires, allowing for automatic retry.

type SQSConfig

type SQSConfig struct {
	VisibilityTimeout     int32    // Visibility timeout in seconds (default: 30)
	WaitTimeSeconds       int32    // Long polling duration in seconds (default: 20)
	MaxMessages           int32    // Batch size for receiving messages (default: 10, max: 10)
	MessageAttributeNames []string // Message attributes to receive (default: ["All"])
}

SQSConfig holds SQS-specific configuration that applies to all queues

type Stats

type Stats struct {
	InstanceID        string
	ActiveQueues      int
	MessagesProcessed int64
	MessagesSucceeded int64
	MessagesFailed    int64
	WorkersActive     int
	WorkQueueDepth    int
	ActiveInstances   int
	AssignedQueueURLs []string
}

Stats represents runtime statistics of the balancer

Directories

Path              Synopsis
examples/basic    command
internal
