Documentation
¶
Index ¶
- Variables
- func IsRetryable(err error) bool
- type Balancer
- func (b *Balancer) GetActiveQueues() []string
- func (b *Balancer) Health() HealthStatus
- func (b *Balancer) Start(ctx context.Context, handler MessageHandler) error
- func (b *Balancer) StartConsumer(ctx context.Context, queueURL string) error
- func (b *Balancer) Stats() Stats
- func (b *Balancer) Stop(ctx context.Context) error
- func (b *Balancer) StopConsumer(queueURL string, timeout time.Duration) error
- type Config
- type HealthStatus
- type MessageHandler
- type SQSConfig
- type Stats
Constants ¶
This section is empty.
Variables ¶
var ( // ErrInvalidConfig is returned when the configuration is invalid ErrInvalidConfig = errors.New("invalid configuration") // ErrAlreadyStarted is returned when Start is called on an already started balancer ErrAlreadyStarted = errors.New("balancer already started") // ErrNotStarted is returned when Stop is called on a balancer that hasn't been started ErrNotStarted = errors.New("balancer not started") // ErrShutdownTimeout is returned when graceful shutdown exceeds the configured timeout ErrShutdownTimeout = errors.New("shutdown timeout exceeded") // ErrWorkerPoolFull is returned when the worker pool queue is full and cannot accept new work ErrWorkerPoolFull = errors.New("worker pool queue is full") // ErrInstanceNotFound is returned when an instance is not found in DynamoDB ErrInstanceNotFound = errors.New("instance not found in DynamoDB") // ErrQueueNotAssigned is returned when a queue is not assigned to this instance ErrQueueNotAssigned = errors.New("queue not assigned to this instance") // ErrCoordinationConflict is returned when there's a version conflict in DynamoDB coordination ErrCoordinationConflict = errors.New("coordination conflict, retry needed") )
Functions ¶
func IsRetryable ¶
IsRetryable determines if an error should be retried Transient AWS errors (throttling, network timeouts, service unavailable) should be retried
Types ¶
type Balancer ¶
type Balancer struct {
// contains filtered or unexported fields
}
Balancer manages distributed SQS queue consumption across multiple instances
func New ¶
New creates a new Balancer instance with the provided configuration. The configuration is validated and defaults are applied for any zero-value fields. Returns an error if the configuration is invalid.
func (*Balancer) GetActiveQueues ¶
GetActiveQueues returns a list of currently active queue URLs Implements ConsumerManager interface
func (*Balancer) Health ¶
func (b *Balancer) Health() HealthStatus
Health returns the current health status of the balancer. This can be used for external health checks (e.g., ECS health checks).
func (*Balancer) Start ¶
func (b *Balancer) Start(ctx context.Context, handler MessageHandler) error
Start begins queue discovery, coordination, and message consumption. It launches background goroutines for: - Heartbeat maintenance - Queue discovery - Instance topology monitoring - Rebalancing - Message consumption from assigned queues
Returns immediately after starting background processing. Returns an error if already started or if initialization fails.
func (*Balancer) StartConsumer ¶
StartConsumer starts a consumer for the given queue URL Implements ConsumerManager interface
func (*Balancer) Stats ¶
Stats returns runtime statistics of the balancer. This includes information about active queues, messages processed, worker pool status, etc.
func (*Balancer) Stop ¶
Stop initiates graceful shutdown of the balancer. The shutdown process: 1. Stops accepting new messages from SQS 2. Waits for in-flight messages to complete (up to ShutdownTimeout) 3. Deregisters instance from DynamoDB 4. Returns when shutdown is complete or context is cancelled
Returns an error if not started or if shutdown fails/times out.
type Config ¶
type Config struct {
// AWS Configuration
AWSConfig aws.Config
// Application Identification
// Name of the application using this balancer (required)
// Instances with different ApplicationName values will operate independently
ApplicationName string
// Instance Identification
// Unique ID for this instance (auto-generated if empty)
InstanceID string
// DynamoDB Coordination
DynamoDBTableName string // Table for coordination (will be created if not exists)
HeartbeatInterval time.Duration // How often to send heartbeats (default: 10s)
HeartbeatTimeout time.Duration // Instance considered dead after this duration (default: 30s)
// Queue Discovery
QueueDiscoveryInterval time.Duration // How often to refresh queue list (default: 5m)
QueueFilterPattern *regexp.Regexp // Regex to filter queue names (nil = all queues)
// Queue Distribution
RebalanceDelay time.Duration // Delay before rebalancing after topology change (default: 30s)
VirtualNodes int // Virtual nodes per instance for consistent hashing (default: 150)
// SQS Configuration (applies to ALL queues)
SQS SQSConfig
// Worker Pool
WorkerPoolSize int // Number of concurrent workers (default: 10)
WorkQueueSize int // Size of internal work queue (default: 100)
// Graceful Shutdown
ShutdownTimeout time.Duration // Max time to wait for in-flight messages (default: 30s)
// Observability
Logger zerolog.Logger // If not provided, creates default logger
MeterProvider metric.MeterProvider // OpenTelemetry meter provider
TracerProvider trace.TracerProvider // OpenTelemetry tracer provider
}
Config holds all configuration for the SQS Balancer
func (*Config) WithDefaults ¶
WithDefaults returns a new Config with default values applied to any zero-value fields
type HealthStatus ¶
HealthStatus represents the health status of the balancer
type MessageHandler ¶
MessageHandler is the function signature for processing SQS messages. The handler receives the queue URL/name and the message to process. The handler should return nil on success, or an error if the message processing failed. If an error is returned, the message will NOT be deleted from SQS and will become visible again after the visibility timeout expires, allowing for automatic retry.
type SQSConfig ¶
type SQSConfig struct {
VisibilityTimeout int32 // Visibility timeout in seconds (default: 30)
WaitTimeSeconds int32 // Long polling duration in seconds (default: 20)
MaxMessages int32 // Batch size for receiving messages (default: 10, max: 10)
MessageAttributeNames []string // Message attributes to receive (default: ["All"])
}
SQSConfig holds SQS-specific configuration that applies to all queues