Documentation ¶
Overview ¶
Package queue provides background job processing with pluggable drivers. Jobs are dispatched to named queues and processed by a worker pool. Drivers include in-memory (channels), Redis Streams, and PostgreSQL (SKIP LOCKED).
Index ¶
- func BackoffDelay(strategy BackoffStrategy, attempt int) time.Duration
- type BackoffStrategy
- type Batch
- type Chain
- type Configurable
- type DeadJob
- type Dispatcher
- func (d *Dispatcher) Batch(jobs ...Job) *Batch
- func (d *Dispatcher) Chain(jobs ...Job) *Chain
- func (d *Dispatcher) Dispatch(ctx context.Context, job Job) error
- func (d *Dispatcher) DispatchAfter(ctx context.Context, delay time.Duration, job Job) error
- func (d *Dispatcher) Register(jobType string, factory func() Job)
- func (d *Dispatcher) Resolve(payload *Payload) (Job, error)
- type Driver
- type Job
- type JobConfig
- type MemoryDriver
- func (m *MemoryDriver) Ack(_ context.Context, _ *Payload) error
- func (m *MemoryDriver) Close() error
- func (m *MemoryDriver) Dead(_ context.Context, payload *Payload, err error) error
- func (m *MemoryDriver) DeadJobs() []*DeadJob
- func (m *MemoryDriver) Fail(ctx context.Context, payload *Payload, _ error) error
- func (m *MemoryDriver) Pop(ctx context.Context, queue string) (*Payload, error)
- func (m *MemoryDriver) Push(ctx context.Context, payload *Payload) error
- type MiddlewareFunc
- type Payload
- type PostgresConfig
- type PostgresDriver
- func (d *PostgresDriver) Ack(ctx context.Context, payload *Payload) error
- func (d *PostgresDriver) Close() error
- func (d *PostgresDriver) Dead(ctx context.Context, payload *Payload, err error) error
- func (d *PostgresDriver) Fail(ctx context.Context, payload *Payload, _ error) error
- func (d *PostgresDriver) Pop(ctx context.Context, queue string) (*Payload, error)
- func (d *PostgresDriver) Push(ctx context.Context, payload *Payload) error
- type RedisConfig
- type RedisDriver
- func (r *RedisDriver) Ack(ctx context.Context, payload *Payload) error
- func (r *RedisDriver) Close() error
- func (r *RedisDriver) Dead(ctx context.Context, payload *Payload, err error) error
- func (r *RedisDriver) Fail(ctx context.Context, payload *Payload, _ error) error
- func (r *RedisDriver) Pop(ctx context.Context, queue string) (*Payload, error)
- func (r *RedisDriver) Push(ctx context.Context, payload *Payload) error
- type Schedulable
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BackoffDelay ¶
func BackoffDelay(strategy BackoffStrategy, attempt int) time.Duration
BackoffDelay calculates the delay for a retry attempt based on the strategy.
Types ¶
type BackoffStrategy ¶
type BackoffStrategy int
BackoffStrategy defines how retry delays are calculated.
const (
	BackoffExponential BackoffStrategy = iota
	BackoffLinear
	BackoffFixed
)
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch dispatches multiple jobs and provides callbacks for when all succeed or any fail. The dispatcher tracks pending job counts per batch. Each time the worker completes a batch job, or fails it permanently, the tracker is notified. Once all jobs finish, the Then callback is invoked (all succeeded) or Catch (any failed permanently).
type Chain ¶
type Chain struct {
// contains filtered or unexported fields
}
Chain dispatches jobs sequentially — each job runs only after the previous one completes. The first job is dispatched immediately with remaining jobs encoded in ChainNext. When the worker successfully processes a chain job, it dispatches the next step.
type Configurable ¶
type Configurable interface {
Config() JobConfig
}
Configurable is implemented by jobs that override default configuration.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher dispatches jobs to the queue.
func NewDispatcher ¶
func NewDispatcher(driver Driver) *Dispatcher
NewDispatcher creates a job dispatcher with the given driver.
func (*Dispatcher) Batch ¶
func (d *Dispatcher) Batch(jobs ...Job) *Batch
Batch creates a batch of jobs for group dispatch with completion callbacks.
func (*Dispatcher) Chain ¶
func (d *Dispatcher) Chain(jobs ...Job) *Chain
Chain creates a sequential job chain.
func (*Dispatcher) Dispatch ¶
func (d *Dispatcher) Dispatch(ctx context.Context, job Job) error
Dispatch sends a job to the queue for immediate processing.
func (*Dispatcher) DispatchAfter ¶
func (d *Dispatcher) DispatchAfter(ctx context.Context, delay time.Duration, job Job) error
DispatchAfter sends a job to the queue with a delay before processing.
func (*Dispatcher) Register ¶
func (d *Dispatcher) Register(jobType string, factory func() Job)
Register maps a job type string to a factory function for deserialization.
type Driver ¶
type Driver interface {
// Push adds a job payload to the queue.
Push(ctx context.Context, payload *Payload) error
// Pop retrieves the next available job from the queue. Blocks until a job is available or ctx is done.
Pop(ctx context.Context, queue string) (*Payload, error)
// Ack acknowledges successful processing of a job.
Ack(ctx context.Context, payload *Payload) error
// Fail marks a job as failed. If retries remain, the job is re-queued with backoff.
Fail(ctx context.Context, payload *Payload, err error) error
// Dead moves a job to the dead letter queue after exhausting retries.
Dead(ctx context.Context, payload *Payload, err error) error
// Close shuts down the driver.
Close() error
}
Driver is the interface for queue backends.
type Job ¶
type Job interface {
// Type returns a unique identifier for this job type, used for deserialization.
Type() string
// Handle executes the job logic.
Handle(ctx context.Context) error
}
Job defines a unit of work that can be dispatched to a queue.
type JobConfig ¶
type JobConfig struct {
Queue string
MaxRetries int
Backoff BackoffStrategy
Timeout time.Duration
Delay time.Duration
}
JobConfig configures per-job behavior.
func DefaultJobConfig ¶
func DefaultJobConfig() JobConfig
DefaultJobConfig returns sensible defaults for job configuration.
type MemoryDriver ¶
type MemoryDriver struct {
// contains filtered or unexported fields
}
MemoryDriver is an in-process queue using buffered channels. Suitable for development and simple single-instance deployments.
func NewMemoryDriver ¶
func NewMemoryDriver(bufSize int) *MemoryDriver
NewMemoryDriver creates an in-memory queue driver.
func (*MemoryDriver) Ack ¶
func (m *MemoryDriver) Ack(_ context.Context, _ *Payload) error
Ack acknowledges a job (no-op for memory driver).
func (*MemoryDriver) Close ¶
func (m *MemoryDriver) Close() error
Close signals all delayed job goroutines to stop and waits for them to exit.
func (*MemoryDriver) DeadJobs ¶
func (m *MemoryDriver) DeadJobs() []*DeadJob
DeadJobs returns all jobs in the dead letter queue.
type MiddlewareFunc ¶
type MiddlewareFunc func(ctx context.Context, payload *Payload, next func(context.Context) error) error
MiddlewareFunc wraps job execution for cross-cutting concerns.
func CircuitBreaker ¶
func CircuitBreaker(consecutiveThreshold int, resetTimeout time.Duration) MiddlewareFunc
CircuitBreaker creates middleware that stops processing jobs after consecutiveThreshold consecutive failures. The circuit stays open for the given resetTimeout, after which a single job is allowed through (half-open state). If that job succeeds, the circuit closes; if it fails, the circuit reopens.
func Deduplication ¶
func Deduplication(ttl time.Duration) MiddlewareFunc
Deduplication creates middleware that skips jobs whose ID has been seen within the given TTL window. Prevents duplicate processing when the same job is enqueued multiple times. Expired entries are pruned on every call to prevent unbounded memory growth.
func RateLimiter ¶
func RateLimiter(maxPerSecond int) MiddlewareFunc
RateLimiter creates middleware that throttles job processing per job type. At most maxPerSecond jobs of each type are processed within a one-second window. Jobs that exceed the limit return an error (causing a retry with backoff).
type Payload ¶
type Payload struct {
ID string `json:"id"`
Type string `json:"type"`
Data json.RawMessage `json:"data"`
Queue string `json:"queue"`
Attempts int `json:"attempts"`
MaxRetry int `json:"max_retry"`
Backoff BackoffStrategy `json:"backoff"`
Timeout time.Duration `json:"timeout"`
RunAt time.Time `json:"run_at"`
BatchID string `json:"batch_id,omitempty"`
ChainNext json.RawMessage `json:"chain_next,omitempty"`
// Meta holds driver-specific metadata (e.g. Redis stream message IDs).
// Not serialized as part of the job payload.
Meta map[string]string `json:"-"`
}
Payload is the serialized representation of a job on the queue.
type PostgresConfig ¶
PostgresConfig configures the PostgreSQL SKIP LOCKED queue driver.
type PostgresDriver ¶
type PostgresDriver struct {
// contains filtered or unexported fields
}
PostgresDriver implements Driver using PostgreSQL with FOR UPDATE SKIP LOCKED. Provides transactional job processing with row-level locking for safe concurrent consumption across multiple workers.
func NewPostgresDriver ¶
func NewPostgresDriver(ctx context.Context, cfg PostgresConfig) (*PostgresDriver, error)
NewPostgresDriver creates a PostgreSQL queue driver, connects to the database, and creates the jobs table if it does not exist.
func (*PostgresDriver) Ack ¶
func (d *PostgresDriver) Ack(ctx context.Context, payload *Payload) error
Ack acknowledges successful processing by deleting the job row.
func (*PostgresDriver) Close ¶
func (d *PostgresDriver) Close() error
Close shuts down the PostgreSQL connection pool.
func (*PostgresDriver) Dead ¶
func (d *PostgresDriver) Dead(ctx context.Context, payload *Payload, err error) error
Dead marks a job as dead after exhausting retries and stores the error message.
func (*PostgresDriver) Fail ¶
func (d *PostgresDriver) Fail(ctx context.Context, payload *Payload, _ error) error
Fail re-queues a failed job by resetting its status to pending with backoff delay.
type RedisConfig ¶
type RedisConfig struct {
Addr string
Password string
DB int
Prefix string
Group string
Consumer string
}
RedisConfig configures the Redis Streams queue driver.
type RedisDriver ¶
type RedisDriver struct {
// contains filtered or unexported fields
}
RedisDriver implements Driver using Redis Streams. Each queue maps to a stream named {prefix}queue:{queueName}. Consumer groups provide at-least-once delivery semantics.
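Under the {prefix}queue:{queueName} convention, a queue named "emails" with prefix "app:" lives in the stream app:queue:emails. A one-line sketch of the derivation:

```go
package main

import "fmt"

// streamKey derives the stream name from prefix and queue name,
// per the {prefix}queue:{queueName} convention documented above.
func streamKey(prefix, queue string) string {
	return prefix + "queue:" + queue
}

func main() {
	fmt.Println(streamKey("app:", "emails")) // app:queue:emails
}
```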
func NewRedisDriver ¶
func NewRedisDriver(cfg RedisConfig) *RedisDriver
NewRedisDriver creates a Redis Streams queue driver.
func (*RedisDriver) Ack ¶
func (r *RedisDriver) Ack(ctx context.Context, payload *Payload) error
Ack acknowledges successful processing of a job via XACK and deletes the message.
func (*RedisDriver) Close ¶
func (r *RedisDriver) Close() error
Close shuts down the Redis client connection.
func (*RedisDriver) Dead ¶
func (r *RedisDriver) Dead(ctx context.Context, payload *Payload, err error) error
Dead moves a job to the dead letter stream after acknowledging the original.
func (*RedisDriver) Fail ¶
func (r *RedisDriver) Fail(ctx context.Context, payload *Payload, _ error) error
Fail re-queues a job with backoff delay after acknowledging the original message.
type Schedulable ¶
type Schedulable interface {
Job
// Schedule returns a cron expression for when the job should run.
Schedule() string
}
Schedulable is implemented by jobs that should run on a schedule.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool processes jobs from queues with configurable concurrency.
func NewWorkerPool ¶
func NewWorkerPool(dispatcher *Dispatcher, logger *slog.Logger) *WorkerPool
NewWorkerPool creates a worker pool with the given dispatcher and logger.
func (*WorkerPool) Drain ¶
func (wp *WorkerPool) Drain()
Drain waits for all in-progress jobs to complete.
func (*WorkerPool) Queue ¶
func (wp *WorkerPool) Queue(name string, concurrency int) *WorkerPool
Queue configures the number of workers for a named queue.
func (*WorkerPool) Start ¶
func (wp *WorkerPool) Start(ctx context.Context) error
Start launches workers for all configured queues. Blocks until ctx is cancelled.
func (*WorkerPool) Use ¶
func (wp *WorkerPool) Use(mw ...MiddlewareFunc)
Use adds middleware to the worker pool's processing chain.