queue

package
v0.8.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 31, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package queue provides background job processing with pluggable drivers. Jobs are dispatched to named queues and processed by a worker pool. Drivers include in-memory (channels), Redis Streams, and PostgreSQL (SKIP LOCKED).
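The channel-driver-plus-worker-pool idea can be sketched in miniature. This is a self-contained illustration, not the package's actual implementation: a buffered channel stands in for the in-memory driver and a fixed set of goroutines plays the worker pool.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// job is a trivial stand-in for the package's Job interface: a named
// unit of work with a handler.
type job struct {
	typ    string
	handle func() error
}

// runPool mimics the in-memory driver plus worker pool: a buffered
// channel holds dispatched jobs and a fixed set of workers drains it.
// It returns the number of jobs that completed successfully.
func runPool(jobs []job, workers int) int {
	q := make(chan job, len(jobs))
	for _, j := range jobs {
		q <- j // "dispatch"
	}
	close(q)

	var done int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range q {
				if j.handle() == nil {
					atomic.AddInt64(&done, 1)
				}
			}
		}()
	}
	wg.Wait()
	return int(done)
}

func main() {
	jobs := make([]job, 3)
	for i := range jobs {
		jobs[i] = job{typ: "noop", handle: func() error { return nil }}
	}
	fmt.Println(runPool(jobs, 2)) // prints 3
}
```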

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BackoffDelay

func BackoffDelay(strategy BackoffStrategy, attempt int) time.Duration

BackoffDelay calculates the delay for a retry attempt based on the strategy.

Types

type BackoffStrategy

type BackoffStrategy int

BackoffStrategy defines how retry delays are calculated.

const (
	BackoffExponential BackoffStrategy = iota
	BackoffLinear
	BackoffFixed
)

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch dispatches multiple jobs as a group and provides completion callbacks. The dispatcher tracks a pending-job count per batch; the worker notifies the tracker each time a batch job completes or fails permanently. Once all jobs finish, the Then callback is invoked if every job succeeded, or Catch if any job failed permanently.

func (*Batch) Catch

func (b *Batch) Catch(fn func(ctx context.Context, err error)) *Batch

Catch sets a callback for when any job in the batch fails permanently.

func (*Batch) Dispatch

func (b *Batch) Dispatch(ctx context.Context) error

Dispatch sends all jobs in the batch to the queue and registers completion tracking. On partial failure, the batch tracking is cleaned up to prevent memory leaks.

func (*Batch) Then

func (b *Batch) Then(fn func(ctx context.Context)) *Batch

Then sets a callback for when all jobs in the batch succeed.

type Chain

type Chain struct {
	// contains filtered or unexported fields
}

Chain dispatches jobs sequentially — each job runs only after the previous one completes. The first job is dispatched immediately with remaining jobs encoded in ChainNext. When the worker successfully processes a chain job, it dispatches the next step.

func (*Chain) Dispatch

func (c *Chain) Dispatch(ctx context.Context) error

Dispatch sends the chain to the queue. The first job is dispatched immediately; subsequent jobs are embedded as ChainNext data on the payload.

type Configurable

type Configurable interface {
	Config() JobConfig
}

Configurable is implemented by jobs that override default configuration.

type DeadJob

type DeadJob struct {
	Payload  *Payload
	Error    string
	FailedAt time.Time
}

DeadJob holds a job that exhausted its retries.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher dispatches jobs to the queue.

func NewDispatcher

func NewDispatcher(driver Driver) *Dispatcher

NewDispatcher creates a job dispatcher with the given driver.

func (*Dispatcher) Batch

func (d *Dispatcher) Batch(jobs ...Job) *Batch

Batch creates a batch of jobs for group dispatch with completion callbacks.

func (*Dispatcher) Chain

func (d *Dispatcher) Chain(jobs ...Job) *Chain

Chain creates a sequential job chain.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(ctx context.Context, job Job) error

Dispatch sends a job to the queue for immediate processing.

func (*Dispatcher) DispatchAfter

func (d *Dispatcher) DispatchAfter(ctx context.Context, delay time.Duration, job Job) error

DispatchAfter sends a job to the queue with a delay before processing.

func (*Dispatcher) Register

func (d *Dispatcher) Register(jobType string, factory func() Job)

Register maps a job type string to a factory function for deserialization.

func (*Dispatcher) Resolve

func (d *Dispatcher) Resolve(payload *Payload) (Job, error)

Resolve creates a Job instance from a payload using the registered factories.

type Driver

type Driver interface {
	// Push adds a job payload to the queue.
	Push(ctx context.Context, payload *Payload) error
	// Pop retrieves the next available job from the queue. Blocks until a job is available or ctx is done.
	Pop(ctx context.Context, queue string) (*Payload, error)
	// Ack acknowledges successful processing of a job.
	Ack(ctx context.Context, payload *Payload) error
	// Fail marks a job as failed. If retries remain, the job is re-queued with backoff.
	Fail(ctx context.Context, payload *Payload, err error) error
	// Dead moves a job to the dead letter queue after exhausting retries.
	Dead(ctx context.Context, payload *Payload, err error) error
	// Close shuts down the driver.
	Close() error
}

Driver is the interface for queue backends.

type Job

type Job interface {
	// Type returns a unique identifier for this job type, used for deserialization.
	Type() string
	// Handle executes the job logic.
	Handle(ctx context.Context) error
}

Job defines a unit of work that can be dispatched to a queue.

type JobConfig

type JobConfig struct {
	Queue      string
	MaxRetries int
	Backoff    BackoffStrategy
	Timeout    time.Duration
	Delay      time.Duration
}

JobConfig configures per-job behavior.

func DefaultJobConfig

func DefaultJobConfig() JobConfig

DefaultJobConfig returns sensible defaults for job configuration.

type MemoryDriver

type MemoryDriver struct {
	// contains filtered or unexported fields
}

MemoryDriver is an in-process queue using buffered channels. Suitable for development and simple single-instance deployments.

func NewMemoryDriver

func NewMemoryDriver(bufSize int) *MemoryDriver

NewMemoryDriver creates an in-memory queue driver.

func (*MemoryDriver) Ack

func (m *MemoryDriver) Ack(_ context.Context, _ *Payload) error

Ack acknowledges a job (no-op for memory driver).

func (*MemoryDriver) Close

func (m *MemoryDriver) Close() error

Close signals all delayed job goroutines to stop and waits for them to exit.

func (*MemoryDriver) Dead

func (m *MemoryDriver) Dead(_ context.Context, payload *Payload, err error) error

Dead moves a job to the dead letter queue.

func (*MemoryDriver) DeadJobs

func (m *MemoryDriver) DeadJobs() []*DeadJob

DeadJobs returns all jobs in the dead letter queue.

func (*MemoryDriver) Fail

func (m *MemoryDriver) Fail(ctx context.Context, payload *Payload, _ error) error

Fail re-queues a job with backoff delay.

func (*MemoryDriver) Pop

func (m *MemoryDriver) Pop(ctx context.Context, queue string) (*Payload, error)

Pop retrieves the next job from the queue. Blocks until available or ctx cancelled.

func (*MemoryDriver) Push

func (m *MemoryDriver) Push(ctx context.Context, payload *Payload) error

Push adds a job to the queue. Delayed jobs use a timer with context cancellation to avoid goroutine leaks on shutdown.

type MiddlewareFunc

type MiddlewareFunc func(ctx context.Context, payload *Payload, next func(context.Context) error) error

MiddlewareFunc wraps job execution for cross-cutting concerns.

func CircuitBreaker

func CircuitBreaker(consecutiveThreshold int, resetTimeout time.Duration) MiddlewareFunc

CircuitBreaker creates middleware that stops processing jobs after consecutiveThreshold consecutive failures. The circuit stays open for the given resetTimeout, after which a single job is allowed through (half-open state). If that job succeeds, the circuit closes; if it fails, the circuit reopens.

func Deduplication

func Deduplication(ttl time.Duration) MiddlewareFunc

Deduplication creates middleware that skips jobs whose ID has been seen within the given TTL window. Prevents duplicate processing when the same job is enqueued multiple times. Expired entries are pruned on every call to prevent unbounded memory growth.

func RateLimiter

func RateLimiter(maxPerSecond int) MiddlewareFunc

RateLimiter creates middleware that throttles job processing per job type. At most maxPerSecond jobs of each type are processed within a one-second window. Jobs that exceed the limit return an error (causing a retry with backoff).

type Payload

type Payload struct {
	ID        string          `json:"id"`
	Type      string          `json:"type"`
	Data      json.RawMessage `json:"data"`
	Queue     string          `json:"queue"`
	Attempts  int             `json:"attempts"`
	MaxRetry  int             `json:"max_retry"`
	Backoff   BackoffStrategy `json:"backoff"`
	Timeout   time.Duration   `json:"timeout"`
	RunAt     time.Time       `json:"run_at"`
	BatchID   string          `json:"batch_id,omitempty"`
	ChainNext json.RawMessage `json:"chain_next,omitempty"`

	// Meta holds driver-specific metadata (e.g. Redis stream message IDs).
	// Not serialized as part of the job payload.
	Meta map[string]string `json:"-"`
}

Payload is the serialized representation of a job on the queue.

type PostgresConfig

type PostgresConfig struct {
	DSN       string
	TableName string
}

PostgresConfig configures the PostgreSQL SKIP LOCKED queue driver.

type PostgresDriver

type PostgresDriver struct {
	// contains filtered or unexported fields
}

PostgresDriver implements Driver using PostgreSQL with FOR UPDATE SKIP LOCKED. Provides transactional job processing with row-level locking for safe concurrent consumption across multiple workers.

func NewPostgresDriver

func NewPostgresDriver(ctx context.Context, cfg PostgresConfig) (*PostgresDriver, error)

NewPostgresDriver creates a PostgreSQL queue driver, connects to the database, and creates the jobs table if it does not exist.

func (*PostgresDriver) Ack

func (d *PostgresDriver) Ack(ctx context.Context, payload *Payload) error

Ack acknowledges successful processing by deleting the job row.

func (*PostgresDriver) Close

func (d *PostgresDriver) Close() error

Close shuts down the PostgreSQL connection pool.

func (*PostgresDriver) Dead

func (d *PostgresDriver) Dead(ctx context.Context, payload *Payload, err error) error

Dead marks a job as dead after exhausting retries and stores the error message.

func (*PostgresDriver) Fail

func (d *PostgresDriver) Fail(ctx context.Context, payload *Payload, _ error) error

Fail re-queues a failed job by resetting its status to pending with backoff delay.

func (*PostgresDriver) Pop

func (d *PostgresDriver) Pop(ctx context.Context, queue string) (*Payload, error)

Pop retrieves the next available job from the queue using SELECT FOR UPDATE SKIP LOCKED. Polls until a job is available or ctx is cancelled.

func (*PostgresDriver) Push

func (d *PostgresDriver) Push(ctx context.Context, payload *Payload) error

Push inserts a job into the PostgreSQL jobs table.

type RedisConfig

type RedisConfig struct {
	Addr     string
	Password string
	DB       int
	Prefix   string
	Group    string
	Consumer string
}

RedisConfig configures the Redis Streams queue driver.

type RedisDriver

type RedisDriver struct {
	// contains filtered or unexported fields
}

RedisDriver implements Driver using Redis Streams. Each queue maps to a stream named {prefix}queue:{queueName}. Consumer groups provide at-least-once delivery semantics.

func NewRedisDriver

func NewRedisDriver(cfg RedisConfig) *RedisDriver

NewRedisDriver creates a Redis Streams queue driver.

func (*RedisDriver) Ack

func (r *RedisDriver) Ack(ctx context.Context, payload *Payload) error

Ack acknowledges successful processing of a job via XACK and deletes the message.

func (*RedisDriver) Close

func (r *RedisDriver) Close() error

Close shuts down the Redis client connection.

func (*RedisDriver) Dead

func (r *RedisDriver) Dead(ctx context.Context, payload *Payload, err error) error

Dead moves a job to the dead letter stream after acknowledging the original.

func (*RedisDriver) Fail

func (r *RedisDriver) Fail(ctx context.Context, payload *Payload, _ error) error

Fail re-queues a job with backoff delay after acknowledging the original message.

func (*RedisDriver) Pop

func (r *RedisDriver) Pop(ctx context.Context, queue string) (*Payload, error)

Pop retrieves the next available job from the Redis stream using XREADGROUP. Blocks until a job is available or ctx is cancelled. Auto-creates the consumer group on first call if it does not exist.

func (*RedisDriver) Push

func (r *RedisDriver) Push(ctx context.Context, payload *Payload) error

Push adds a job payload to the Redis stream via XADD.

type Schedulable

type Schedulable interface {
	Job
	// Schedule returns a cron expression for when the job should run.
	Schedule() string
}

Schedulable is implemented by jobs that should run on a schedule.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool processes jobs from queues with configurable concurrency.

func NewWorkerPool

func NewWorkerPool(dispatcher *Dispatcher, logger *slog.Logger) *WorkerPool

NewWorkerPool creates a worker pool with the given dispatcher.

func (*WorkerPool) Drain

func (wp *WorkerPool) Drain()

Drain waits for all in-progress jobs to complete.

func (*WorkerPool) Queue

func (wp *WorkerPool) Queue(name string, concurrency int) *WorkerPool

Queue configures the number of workers for a named queue.

func (*WorkerPool) Start

func (wp *WorkerPool) Start(ctx context.Context) error

Start launches workers for all configured queues. Blocks until ctx is cancelled.

func (*WorkerPool) Use

func (wp *WorkerPool) Use(mw ...MiddlewareFunc)

Use adds middleware to the worker pool's processing chain.
