goqueue

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2025 License: MIT Imports: 7 Imported by: 0

README

GoQueue

GoQueue Logo

Reliable Job Processing for Go Applications

Go Reference Go Version Version

GoQueue is a flexible background job processing library for Go applications, designed to handle workloads of any scale with multiple backend options.

Features

  • Multiple Backends: In-memory (development), Redis (production), PostgreSQL/MySQL , AWS SQS (cloud)
  • Job Management: automatic retries with backoff, Dead Letter Queue
  • Concurrency Control: Configurable worker pools, graceful shutdown
  • Observability: Metrics collection, logging support
  • Extensibility: Middleware pipeline for job customization
  • Code Quality: Configured with golangci-lint for high code quality standards

For a deeper understanding of the system design, check out the Architecture Documentation.

Why GoQueue?

GoQueue provides the most comprehensive feature set with excellent developer experience!

Alternatives

Here are some popular alternatives and related frameworks for job queues and background processing in Go:

  • Asynq: Redis-based job queue with a polished UI and CLI. Good developer experience, but recent commit activity is low.
  • River: Transactional queue built on Postgres, offering strong consistency and transactional guarantees.
  • Temporal: Workflow engine for complex orchestration and stateful jobs. Powerful, but has a steeper learning curve and heavier setup.
  • NATS: High-performance messaging system. Lacks built-in job scheduling, but is widely used for event-driven architectures.
  • Machinery: Older Go task queue library supporting multiple backends. Less active development and limited middleware support.
  • RabbitMQ / Kafka: General-purpose message brokers. Robust and scalable, but require more infrastructure and lack job-specific features out of the box.
Comparison Table
Feature GoQueue Asynq River Temporal NATS Machinery RabbitMQ/Kafka
Multiple Backends
Clean API ⚠️ ⚠️ ⚠️
Middleware Support ⚠️
Test Coverage ⚠️ ⚠️ ⚠️ ⚠️
AWS SQS Support
Non-blocking Retries ⚠️ ⚠️ ⚠️ ⚠️
UI/CLI Tools ⚠️ ⚠️
Scheduling ⚠️ ⚠️ ⚠️
SQL ⚠️ ⚠️

Installation

go get github.com/saravanasai/goqueue

Quick Start

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/saravanasai/goqueue"
    "github.com/saravanasai/goqueue/config"
)

// Define your job
type EmailJob struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}

// Implement the Job interface
func (e EmailJob) Process(ctx context.Context) error {
    fmt.Printf("Sending email to %s: %s\n", e.To, e.Subject)
    return nil
}

// Register job type for serialization
func init() {
    goqueue.RegisterJob("EmailJob", func() goqueue.Job {
        return &EmailJob{}
    })
}

func main() {
    // Create a queue with in-memory backend
    cfg := config.NewInMemoryConfig()
    q, err := goqueue.NewQueueWithDefaults("email-queue", cfg)
    if err != nil {
        log.Fatal(err)
    }

    // Start worker pool with 2 concurrent workers
    ctx := context.Background()
    q.StartWorkers(ctx, 2)

    // Dispatch a job
    job := EmailJob{
        To:      "user@example.com",
        Subject: "Welcome to GoQueue!",
    }
    if err := q.Dispatch(job); err != nil {
        log.Printf("Failed to dispatch job: %v", err)
    }

    // Wait to see results (in production, workers would run continuously)
    time.Sleep(2 * time.Second)

    // Graceful shutdown
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    q.Shutdown(shutdownCtx)
}

Database Driver Setup

GoQueue supports database backends for teams that prefer SQL-based persistence and transactional guarantees. Both PostgreSQL and MySQL are supported.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/saravanasai/goqueue"
    "github.com/saravanasai/goqueue/config"
)

func main() {
    // Create a queue with PostgreSQL backend
    cfg := config.NewPostgresConfig("postgresql://user:password@localhost:5432/dbname")

    // Configure metrics callback (optional)
    cfg = cfg.WithMetricsCallback(func(metrics config.JobMetrics) {
        fmt.Printf("Job: %s, Queue: %s, Duration: %v, Error: %v\n",
            metrics.JobID, metrics.QueueName, metrics.Duration, metrics.Error)
    })

    // Create the queue
    q, err := goqueue.NewQueueWithDefaults("emails", cfg)
    if err != nil {
        log.Fatalf("Failed to create queue: %v", err)
    }

    // Dispatch jobs and start workers as normal...
}

The database driver automatically handles creating tables and managing failed jobs through a Dead Letter Queue (DLQ).

Backend Options

In-Memory (Development)
cfg := config.NewInMemoryConfig()
queue, err := goqueue.NewQueueWithDefaults("emails", cfg)
PostgreSQL/MySQL (Transactional)
// PostgreSQL
cfg := config.NewPostgresConfig("postgresql://user:password@localhost:5432/dbname")
queue, err := goqueue.NewQueueWithDefaults("emails", cfg)

// MySQL
cfg := config.NewMySQLConfig("root:password@tcp(localhost:3306)/dbname")
queue, err := goqueue.NewQueueWithDefaults("emails", cfg)
Redis (Production)
cfg := config.NewRedisConfig(
    "localhost:6379", // Redis server address
    "",               // Password (if any)
    0,                // Database number
)
queue, err := goqueue.NewQueueWithDefaults("emails", cfg)
AWS SQS (Cloud)
cfg := config.NewSQSConfig(
    "https://sqs.us-west-2.amazonaws.com/123456789012/my-queue", // Queue URL
    "us-west-2",                                                 // AWS Region
    "xxx",                                                       // Access key
    "xxxx",                                                      // Secret key
)
queue, err := goqueue.NewQueueWithDefaults("notifications", cfg)

Configuration Options

// Basic configuration
cfg := config.NewRedisConfig("localhost:6379", "", 0)

// Worker options
cfg = cfg.WithMaxWorkers(5)               // Number of worker goroutines
cfg = cfg.WithConcurrencyLimit(10)        // Maximum concurrent jobs

// Retry behavior
cfg = cfg.WithMaxRetryAttempts(3)         // Max retry attempts for failed jobs
cfg = cfg.WithRetryDelay(5 * time.Second) // Base delay between retries
cfg = cfg.WithExponentialBackoff(true)    // Use increasing delays between retries

// Metrics collection
cfg = cfg.WithMetricsCallback(func(metrics config.JobMetrics) {
    log.Printf("Job %s completed in %v with error: %v",
        metrics.JobID, metrics.Duration, metrics.Error)
})

Advanced Features

Delayed Jobs

You can schedule jobs to run in the future using DispatchWithDelay:

// Schedule a job to run 5 minutes from now
delay := 5 * time.Minute
if err := q.DispatchWithDelay(job, delay); err != nil {
    log.Printf("Failed to schedule job: %v", err)
}

Note: When using the Amazon SQS driver, the maximum supported delay is 15 minutes.

Job Middleware

Middleware allows you to add cross-cutting functionality to job processing. GoQueue uses a pipeline pattern where each middleware wraps the next one.

// Available built-in middleware
loggingMiddleware := middleware.LoggingMiddleware(logger) // Structured logging
skipCondition := middleware.ConditionalSkipMiddleware(func(jobCtx *job.JobContext) bool {
    // Skip processing based on job attributes
    return jobCtx.EnqueuedAt.Before(time.Now().Add(-24 * time.Hour))
})

// Custom middleware
rateLimiter := func(next middleware.HandlerFunc) middleware.HandlerFunc {
    return func(ctx context.Context, jobCtx *job.JobContext) error {
        // Implement rate limiting logic here
        fmt.Println("Rate limiting job execution")
        return next(ctx, jobCtx)
    }
}

// Apply middleware through configuration (order matters - first added, first executed)
cfg = cfg.WithMiddleware(loggingMiddleware)
cfg = cfg.WithMiddleware(skipCondition)
cfg = cfg.WithMiddleware(rateLimiter)

// Or add multiple middlewares at once
cfg = cfg.WithMiddlewares(loggingMiddleware, skipCondition, rateLimiter)
Dead Letter Queue (DLQ)

When jobs fail repeatedly after exhausting retry attempts, they can be sent to a Dead Letter Queue for analysis and handling.

// Custom handler for failed jobs
type CustomDLQ struct{}

func (d *CustomDLQ) Push(ctx context.Context, jobCtx *job.JobContext, err error) error {
    // Store job data and error in database
    fmt.Printf("Job %s failed after %d attempts: %v\n",
        jobCtx.JobID, jobCtx.Attempts, err)

    // Send alerts or notifications
    // alertService.Send(fmt.Sprintf("Critical job failure: %s", jobCtx.JobID))

    return nil
}

// Register custom DLQ
cfg = cfg.WithDLQAdapter(&CustomDLQ{})
Metrics and Observability

GoQueue provides built-in support for collecting metrics and monitoring job processing:

// Register metrics callback
cfg = cfg.WithMetricsCallback(func(metrics config.JobMetrics) {
    // Record metrics in your monitoring system (Prometheus, etc.)
    fmt.Printf("Job: %s, Queue: %s, Duration: %v, Error: %v\n",
        metrics.JobID, metrics.QueueName, metrics.Duration, metrics.Error)

    // Track success/failure rates
    if metrics.Error != nil {
        failureCounter.Inc()
    } else {
        successCounter.Inc()
    }
})

License

MIT

Roadmap

Upcoming features planned for GoQueue:

  • MySQL Driver: Additional SQL backend option. .

Documentation

Index

Constants

View Source
const DefaultShutdownTimeout = 5 * time.Second
View Source
const Version = "0.2.0"

Version represents the current version of GoQueue

Variables

This section is empty.

Functions

func Dispatch

func Dispatch(q *queue.Queue, payload job.Job) error

Dispatch adds a single job to the queue for processing.

The job will be stored in the configured backend and processed by available workers.

Parameters:

  • q: The queue to dispatch the job to
  • payload: The job to be processed

Returns:

  • nil on successful dispatch
  • an error if dispatch fails

Dispatch adds a single job to the queue for processing (immediate execution).

func DispatchBatch

func DispatchBatch(q *queue.Queue, jobs []job.Job) error

DispatchBatch adds multiple jobs to the queue for processing.

All jobs will be stored in the configured backend and processed by available workers. This is more efficient than calling Dispatch multiple times for individual jobs.

Parameters:

  • q: The queue to dispatch the jobs to
  • jobs: Slice of jobs to be processed

Returns:

  • nil on successful batch dispatch
  • an error if batch dispatch fails

DispatchBatch adds multiple jobs to the queue for processing (immediate execution).

func DispatchBatchWithDelay

func DispatchBatchWithDelay(q *queue.Queue, jobs []job.Job, delay time.Duration) error

DispatchBatchWithDelay adds multiple jobs to the queue for processing after a delay. Delay can be specified in seconds, minutes, or hours using time.Duration.

func DispatchWithDelay

func DispatchWithDelay(q *queue.Queue, payload job.Job, delay time.Duration) error

DispatchWithDelay adds a single job to the queue for processing after a delay. Delay can be specified in seconds, minutes, or hours using time.Duration.

func GetVersion

func GetVersion() string

GetVersion returns the current version of GoQueue

func IsQueueOverloaded

func IsQueueOverloaded(q *queue.Queue) bool

IsQueueOverloaded checks if the queue is currently experiencing high load.

The determination is based on the configured thresholds for job count and processing ratios. This can be used to implement backpressure mechanisms.

Parameters:

  • q: The queue to check load status for

Returns:

  • true if the queue is overloaded
  • false if the queue is operating normally or if statistics collection is disabled

func RegisterJob

func RegisterJob(name string, constructor func() Job)

RegisterJob registers a job type with the queue system to enable serialization/deserialization.

This must be called for each job type before using the queue. It associates a string name with a function that creates new instances of your job type.

Parameters:

  • name: A unique string identifier for the job type
  • constructor: A function that returns a new instance of the job type

func Shutdown

func Shutdown(q *queue.Queue, ctx context.Context) error

Shutdown gracefully stops the queue, waiting for in-progress jobs to complete.

It will wait up to the timeout duration configured during queue creation for jobs to finish.

Parameters:

  • q: The queue to shut down
  • ctx: Context used for cancellation

Returns:

  • nil if shutdown completes successfully
  • an error if shutdown fails or times out

func StartWorker

func StartWorker(q *queue.Queue, ctx context.Context, count int) error

StartWorker launches worker goroutines to process jobs from the queue.

Workers will continue running until the provided context is cancelled.

Parameters:

  • q: The queue to start workers for
  • ctx: Context used for cancellation and shutdown
  • count: Number of worker goroutines to start

Returns:

  • Error if workers cannot be started

Types

type Job

type Job = job.Job

Job is the interface clients must implement for their jobs.

type JobMetrics

type JobMetrics = config.JobMetrics

type Queue

type Queue = queue.Queue

func NewQueue

func NewQueue(queueName string, cfg config.Config, shutdownTimeout time.Duration) (*Queue, error)

NewQueue creates a new queue with the specified name, configuration, and shutdown timeout.

Parameters:

  • queueName: A unique identifier for the queue
  • cfg: The queue configuration specifying backend, workers, retry policy, etc.
  • shutdownTimeout: Maximum duration to wait for jobs to complete during shutdown

Returns:

  • A new Queue instance and nil error on success
  • nil and an error if queue creation fails

func NewQueueWithDefaults

func NewQueueWithDefaults(queueName string, cfg config.Config) (*Queue, error)

NewQueueWithDefaults creates a new queue with the specified name and configuration, using the default shutdown timeout.

This is a convenience function for when you don't need to specify a custom shutdown timeout.

Parameters:

  • queueName: A unique identifier for the queue
  • cfg: The queue configuration specifying backend, workers, retry policy, etc.

Returns:

  • A new Queue instance and nil error on success
  • nil and an error if queue creation fails

type QueueStats

type QueueStats = stats.QueueStats

func GetQueueStats

func GetQueueStats(q *queue.Queue) QueueStats

GetQueueStats returns current queue statistics and health metrics.

This includes job counts, processing rates, and health indicators. If statistics collection is disabled in the queue configuration, only basic health information is returned.

Parameters:

  • q: The queue to get statistics for

Returns:

  • QueueStats containing the current queue metrics

Directories

Path Synopsis
dlq
Package dlq provides the Dead Letter Queue (DLQ) interface for handling failed jobs.
Package dlq provides the Dead Letter Queue (DLQ) interface for handling failed jobs.
sqs
internal
Package job provides the core job processing interfaces and types for the goqueue package.
Package job provides the core job processing interfaces and types for the goqueue package.
Package middleware provides a flexible middleware system for customizing job processing behavior.
Package middleware provides a flexible middleware system for customizing job processing behavior.
Package queue provides the core job queue functionality, including job dispatching, worker management, and queue lifecycle control.
Package queue provides the core job queue functionality, including job dispatching, worker management, and queue lifecycle control.
Package worker provides job processing functionality through concurrent worker goroutines.
Package worker provides job processing functionality through concurrent worker goroutines.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL