go-sqs-worker

module
v1.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2025 License: MIT

README

go-sqs-worker

go-sqs-worker is a Go library designed to manage asynchronous jobs using AWS SQS. It provides a simple, scalable way to produce and consume tasks, making it ideal for background job processing.

Features

  • Producer and Consumer Support: Enables the creation and handling of jobs with distinct producer and consumer components.
  • Scheduled Job Processing: Supports the scheduling of jobs for processing at a later time.
  • Dead Letter Queue Integration: Provides built-in support for Dead Letter Queues (DLQs) to ensure reliable message processing.
  • Configurable Retry Mechanism: Allows customization of the number of retries and delay intervals for failed jobs.
  • Graceful Shutdown: Manages context cancellation and cleanup during termination.
  • LocalStack Compatibility: Fully compatible with LocalStack for local development and testing.

Installation

Add the library to your project using go get:

go get github.com/mickamy/go-sqs-worker

Usage

Producer Example

The producer is responsible for sending jobs to the SQS queue.

package main

import (
	"context"
	"fmt"

	"github.com/mickamy/go-sqs-worker/producer"
	"github.com/mickamy/go-sqs-worker/message"

	"github.com/mickamy/go-sqs-worker-example/internal/job"
	"github.com/mickamy/go-sqs-worker-example/internal/lib/aws"
)

func main() {
	ctx := context.Background()

	cfg := producer.Config{
		WorkerQueueURL: "http://localhost.localstack.cloud:4566/000000000000/worker-queue",
	}

	p, err := producer.New(cfg, aws.NewSQSClient(ctx))
	if err != nil {
		fmt.Println("failed to create producer", "error", err)
		return
	}

	msg, err := message.New(ctx, job.SuccessfulJobType.String(), job.SuccessfulJobPayload{
		Message: "hello",
	})
	if err != nil {
		fmt.Println("failed to create successful job message", "error", err)
		return
	}
	if err := p.Do(ctx, msg); err != nil {
		fmt.Println("failed to produce successful job", "error", err)
		return
	}
}

Consumer Example

The consumer retrieves and processes jobs from the SQS queue.

package main

import (
	"context"
	"fmt"

	"github.com/mickamy/go-sqs-worker-example/internal/lib/aws"
	"github.com/mickamy/go-sqs-worker/consumer"
)

func main() {
	ctx := context.Background()

	cfg := consumer.Config{
		WorkerQueueURL:     "http://localhost.localstack.cloud:4566/000000000000/worker-queue",
		DeadLetterQueueURL: "http://localhost.localstack.cloud:4566/000000000000/dead-letter-queue",
	}

	c, err := consumer.New(cfg, aws.NewSQSClient(ctx), job.GetJobHandler)

	if err != nil {
		fmt.Println("failed to create consumer", "error", err)
		return
	}

	c.Do(ctx)
}


Example Project

The repository includes an example project under the example/ directory. To run the example:

make up

Integration with go-sqs-worker-viewer

For monitoring and managing jobs in real-time, use go-sqs-worker-viewer. Configure RedisURL in the consumer/producer to enable integration:

consumer.Config{
    // ... other configurations
    RedisURL: "redis://localhost:6379",
}
producer.Config{
    // ... other configurations
    RedisURL: "redis://localhost:6379",
}

This setup allows go-sqs-worker-viewer to display job statuses and processing metrics.

Configuration

Producer Configuration
  • WorkerQueueURL: Required. The URL of the SQS queue where worker messages are stored.
  • WorkerQueueARN: Optional. The ARN of the SQS queue where worker messages are stored. This is required if you use scheduled jobs.
  • SchedulerRoleARN: Optional. The ARN of the IAM role used by EventBridge Scheduler. This is required if you use scheduled jobs.
  • SchedulerTimeZone: Optional. The time zone used by EventBridge Scheduler. This is required if you use scheduled jobs.
  • BeforeProduceFunc: Optional. A function to execute before a message is produced. The default function does nothing.
  • AfterProduceFunc: Optional. A function to execute after a message is produced. The default function does nothing.
  • RedisURL: Optional. The URL of the Redis server for storing job processing status. This is particularly useful for tracking job-related data during processing, especially when used with go-sqs-worker-viewer.
Consumer Configuration
  • WorkerQueueURL: Required. The URL of the SQS queue where worker messages are stored.
  • DeadLetterQueueURL: Optional. The URL of the Dead Letter Queue (DLQ) for messages that fail to process after the maximum number of retries. If not set, the DLQ is not used.
  • RedisURL: Optional. The URL of the Redis server for storing job processing status. This is particularly useful for tracking job-related data during processing, especially when used with go-sqs-worker-viewer.
  • MaxRetry: Optional. The maximum number of retry attempts for a failed job. The default value is 5 retries.
  • BaseDelay: Optional. The initial delay (in seconds) before retrying a failed job. This value is used as the base for calculating exponential backoff delays. The default value is 30 seconds.
  • MaxDelay: Optional. The maximum delay (in seconds) between retries, used to cap the exponential backoff delay. The default value is 900 seconds (which is the maximum visibility timeout for SQS).
  • WaitTimeSeconds: Optional. The maximum time (in seconds) to wait for a message to be received from the SQS queue. This value is used for long polling. The default value is 20 seconds. The maximum allowed value is 20 seconds.
  • TimeoutSeconds: Optional. The maximum time (in seconds) to wait for a message to be processed. This value is used to limit the time spent on processing a message. The default value is 300 seconds. Note that actual timeout seconds will be WaitTimeSeconds + TimeoutSeconds.
  • BeforeProcessFunc: Optional. A function to execute before processing a message. The default function does nothing.
  • AfterProcessFunc: Optional. A function to execute after processing a message. The default function does nothing.

Documentation

GoDoc documentation is available at pkg.go.dev.

License

This library is licensed under the MIT License. See the LICENSE file for details.

Directories

Path Synopsis
Package consumer provides structures and functions for consuming messages from an SQS queue and processing them.
Package consumer provides structures and functions for consuming messages from an SQS queue and processing them.
internal
scheduler/mock_scheduler
Package mock_scheduler is a generated GoMock package.
Package mock_scheduler is a generated GoMock package.
sqs
sqs/mock_sqs
Package mock_sqs is a generated GoMock package.
Package mock_sqs is a generated GoMock package.
Package job provides an interface for job execution and a function to get a job by job type
Package job provides an interface for job execution and a function to get a job by job type
Package message provides structures and functions for handling messages in the job processing system.
Package message provides structures and functions for handling messages in the job processing system.
Package producer provides structures and functions for producing messages to an SQS queue.
Package producer provides structures and functions for producing messages to an SQS queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL