sqlqueue

package
v0.17.0
Warning

This package is not in the latest version of its module.

Published: Jun 3, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package sqlqueue implements a database-backed Queue driver that survives process restarts and works across replicas. It plugs into the parent queue package via the Queue interface.

The driver uses a single `jobs` table. The schema is dialect-aware, and the driver ships its own migration via Setup(ctx). Pop selects the next available row with `... ORDER BY available_at LIMIT 1` and sets `reserved_at = now()` on it in the same transaction, so concurrent workers don't claim the same job.
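The select-then-reserve step described above can be sketched with database/sql. This is a minimal illustration under stated assumptions, not the package's actual implementation: the `id` column, the `errNoJob` sentinel, the `reserveNext` helper, and the `?` placeholder style are all hypothetical, and the extra `reserved_at IS NULL` guard on the UPDATE is an addition that makes a lost race visible as zero affected rows. Re-claiming expired reservations is left out for brevity.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"time"
)

// Hypothetical statements. Only available_at and reserved_at come from the
// package docs; the id column is assumed. The "?" placeholders suit SQLite
// or MySQL (PostgreSQL would use $1, $2).
const (
	selectNextSQL = `SELECT id FROM jobs
WHERE reserved_at IS NULL AND available_at <= ?
ORDER BY available_at LIMIT 1`
	reserveSQL = `UPDATE jobs SET reserved_at = ?
WHERE id = ? AND reserved_at IS NULL`
)

// errNoJob is a hypothetical sentinel meaning "nothing to claim right now".
var errNoJob = errors.New("no job available")

// reserveNext claims the oldest available row inside one transaction and
// returns its id.
func reserveNext(ctx context.Context, db *sql.DB, now time.Time) (string, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return "", err
	}
	defer tx.Rollback() // returns sql.ErrTxDone (ignored) after a successful Commit

	var id string
	err = tx.QueryRowContext(ctx, selectNextSQL, now).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return "", errNoJob
	}
	if err != nil {
		return "", err
	}

	res, err := tx.ExecContext(ctx, reserveSQL, now, id)
	if err != nil {
		return "", err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return "", err
	}
	if n == 0 {
		// Another worker reserved the row between our SELECT and UPDATE.
		return "", errNoJob
	}
	return id, tx.Commit()
}
```

A caller would typically treat `errNoJob` as a signal to sleep briefly and retry rather than as a failure.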

Usage:

q, err := sqlqueue.New(conn) // conn is the project's *database.Connection
if err != nil {
	return err
}
if err := q.Setup(ctx); err != nil { // creates the table
	return err
}
queue.Dispatch(ctx, q, SendEmail{...})
worker := queue.NewWorker(q)
go worker.Run(ctx)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*Queue)

Option customises a Queue.

func WithTable

func WithTable(name string) Option

WithTable overrides the table name (default "jobs").

func WithVisibilityTimeout

func WithVisibilityTimeout(d time.Duration) Option

WithVisibilityTimeout sets how long a reserved-but-not-acked job stays invisible to other workers. The default is 5 minutes; pick a value larger than the longest handler runtime.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a database-backed queue. Safe for concurrent use across goroutines and processes.

func New

func New(conn *database.Connection, opts ...Option) (*Queue, error)

New constructs a Queue.

func (*Queue) Ack

func (q *Queue) Ack(ctx context.Context, jobID string) error

Ack deletes the job.

func (*Queue) Len

func (q *Queue) Len() int

Len returns the row count of the jobs table. The count is cheap on SQLite and acceptable on other dialects at modest table sizes; for production metrics, prefer a time-bucketed query.

func (*Queue) Nack

func (q *Queue) Nack(ctx context.Context, jobID string, retryAfter time.Duration) error

Nack returns the job to the queue and increments its attempts counter. If retryAfter > 0, the job stays invisible until that duration has elapsed.

func (*Queue) Pop

func (q *Queue) Pop(ctx context.Context, wait time.Duration) (queue.Job, error)

Pop returns the next available job, waiting up to wait for one to become available. It reserves the job with a transactional update so that concurrent workers cannot claim it as well.

func (*Queue) Push

func (q *Queue) Push(ctx context.Context, j queue.Job) error

Push inserts j into the queue.

func (*Queue) Setup

func (q *Queue) Setup(ctx context.Context) error

Setup creates the jobs table if it doesn't already exist. Safe to call repeatedly (CREATE TABLE IF NOT EXISTS).
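To make the idempotency concrete, here is a rough sketch of how such a statement might be built. The column list is an assumption pieced together from names mentioned elsewhere on this page (attempts, available_at, reserved_at) plus a guessed id and payload; the real schema is dialect-aware and may differ. The `createTableSQL` and `validIdent` helpers are hypothetical. Because a table name cannot be passed as a bound parameter, the sketch validates it before interpolating.

```go
package main

import "fmt"

// validIdent accepts only ASCII letters, digits, and underscores, and
// rejects a leading digit, so the name is safe to interpolate into SQL.
func validIdent(s string) bool {
	if s == "" {
		return false
	}
	for i, r := range s {
		letter := r == '_' || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z')
		digit := r >= '0' && r <= '9'
		if !letter && !(digit && i > 0) {
			return false
		}
	}
	return true
}

// createTableSQL returns an idempotent DDL statement for the given table.
// The columns are hypothetical and written in a SQLite-flavoured dialect.
func createTableSQL(table string) (string, error) {
	if !validIdent(table) {
		return "", fmt.Errorf("invalid table name %q", table)
	}
	return fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
	id TEXT PRIMARY KEY,
	payload BLOB NOT NULL,
	attempts INTEGER NOT NULL DEFAULT 0,
	available_at TIMESTAMP NOT NULL,
	reserved_at TIMESTAMP NULL
)`, table), nil
}
```

Because of `IF NOT EXISTS`, running the statement a second time leaves an existing table untouched, which is what makes calling Setup on every process start safe.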
