pgxq

package module
v0.2.0
Published: Mar 8, 2026 License: MIT Imports: 16 Imported by: 0

README

pgxq

PostgreSQL job queue for pgx/v5. One table, one dependency, no magic.

  • Transactional enqueue — insert jobs in the same transaction as your business data
  • SELECT FOR UPDATE SKIP LOCKED — efficient concurrent polling without advisory locks
  • Retry with backoff — exponential, constant, or custom
  • Rescue orphaned jobs — automatic recovery after worker crashes
  • Graceful shutdown — waits for active jobs to complete

Install

go get github.com/errisnotnil/go-pgxq

Requires Go 1.24+ and PostgreSQL 10+.

Schema

Create the job table (or call pgxq.Migrate):

CREATE TABLE IF NOT EXISTS pgxq_job (
    id            BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    kind          TEXT        NOT NULL,
    args          JSONB       NOT NULL DEFAULT '{}',
    state         TEXT        NOT NULL DEFAULT 'available',
    queue         TEXT        NOT NULL DEFAULT 'default',
    priority      SMALLINT    NOT NULL DEFAULT 0,
    attempt       SMALLINT    NOT NULL DEFAULT 0,
    max_attempts  SMALLINT    NOT NULL DEFAULT 5,
    scheduled_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    attempted_at  TIMESTAMPTZ,
    completed_at  TIMESTAMPTZ,
    error         TEXT,
    CHECK (state IN ('available', 'running', 'completed', 'failed', 'discarded'))
);

CREATE INDEX IF NOT EXISTS idx_pgxq_job_fetch
    ON pgxq_job (queue, priority, scheduled_at)
    WHERE state = 'available';

CREATE INDEX IF NOT EXISTS idx_pgxq_job_rescue
    ON pgxq_job (attempted_at)
    WHERE state = 'running';

Or programmatically:

pgxq.Migrate(ctx, pool, pgxq.DefaultTable)

Custom table name (including schema.table):

pgxq.Migrate(ctx, pool, "myapp.jobs")
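
The feature list calls out SELECT FOR UPDATE SKIP LOCKED, and the idx_pgxq_job_fetch partial index above is shaped for exactly that fetch. As a rough illustration only (not necessarily the library's actual SQL), a batch fetch over this schema might look like:

```sql
-- Illustrative fetch sketch: claim up to 10 available jobs without
-- blocking on rows other workers have already locked.
UPDATE pgxq_job
SET state = 'running', attempt = attempt + 1, attempted_at = now()
WHERE id IN (
    SELECT id FROM pgxq_job
    WHERE state = 'available' AND queue = 'default' AND scheduled_at <= now()
    ORDER BY priority, scheduled_at
    LIMIT 10
    FOR UPDATE SKIP LOCKED
)
RETURNING id;
```

SKIP LOCKED lets concurrent workers poll the same queue without advisory locks: each worker simply skips rows another transaction holds.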

Usage

Enqueue

// Standalone
pgxq.Insert(ctx, pool, "send_email", SendEmail{To: "user@example.com", Subject: "Hello", Body: "..."})

// In a transaction — job commits or rolls back with your data
tx, _ := pool.Begin(ctx)
queries.CreateAccount(ctx, tx, ...)
pgxq.Insert(ctx, tx, "send_email", SendEmail{To: "user@example.com", Subject: "Welcome", Body: "..."})
tx.Commit(ctx)

Insert accepts any pgxq.DBTX — works with pgxpool.Pool, pgx.Tx, and pgx.Conn. Args can be any JSON-marshalable value: struct, map, string, or nil.

Options:

pgxq.Insert(ctx, tx, "send_email", args,
    pgxq.WithQueue("emails"),       // default: "default"
    pgxq.WithPriority(-1),          // lower = runs first, default: 0
    pgxq.WithScheduledAt(tomorrow), // default: now
    pgxq.WithMaxAttempts(10),       // default: 5
)

Process

Each handler receives a pgx.Tx. On success, the transaction is committed — any database work done through tx is atomic with the job completion. On error, the transaction is rolled back and the job is retried.

client, err := pgxq.NewClient(pgxq.ClientConfig{
    Pool: pool,
})
if err != nil {
    log.Fatal(err)
}

client.Handle("send_email", func(ctx context.Context, _ pgx.Tx, job *pgxq.Job) error {
    args, err := pgxq.UnmarshalArgs[SendEmail](job)
    if err != nil {
        return pgxq.Discard(err) // bad payload, don't retry
    }
    return emailService.Send(ctx, args.To, args.Subject, args.Body)
})

go client.Start()
// on shutdown:
client.Stop(shutdownCtx)

Use tx when you need atomicity with the job completion:

client.Handle("process_payment", func(ctx context.Context, tx pgx.Tx, job *pgxq.Job) error {
    args, err := pgxq.UnmarshalArgs[ProcessPayment](job)
    if err != nil {
        return pgxq.Discard(err)
    }
    // Business logic and job completion commit together.
    // If commit fails, everything rolls back and the job is retried.
    _, err = tx.Exec(ctx, "UPDATE account SET balance = balance - $1 WHERE id = $2", args.Amount, args.AccountID)
    return err
})

Retry and discard

  • Return nil — transaction committed, job completed
  • Return error — transaction rolled back, job retried with backoff (up to max_attempts)
  • Return pgxq.Discard(err) — transaction rolled back, job discarded immediately

Configuration

pgxq.ClientConfig{
    Pool:         pool,
    Table:        pgxq.DefaultTable, // custom table name, e.g. "myapp.jobs"
    Queue:        "default",         // queue to poll
    PollInterval: time.Second,       // delay between polls when idle
    BatchSize:    10,                // jobs fetched per poll
    MaxWorkers:   100,               // max concurrent handlers
    RescueAfter:  time.Hour,         // recover orphaned running jobs after this
    Backoff:      pgxq.DefaultBackoff,
    Logger:       slog.Default(),
}

Pool accepts *pgxpool.Pool specifically — the client needs connection pooling for concurrent workers.

Logger accepts *slog.Logger. Use slog.New(handler) to integrate with any logging library (zap, zerolog, etc.) via its slog.Handler implementation.

Custom backoff

// Exponential: base * factor^(attempt-1), capped, with 20% jitter
pgxq.ExponentialBackoff(time.Second, 2.0, time.Hour)

// Constant
pgxq.ConstantBackoff(5 * time.Second)

// Custom
func myBackoff(attempt int) time.Duration {
    return time.Duration(attempt) * 10 * time.Second
}

Job lifecycle

available ──→ running ──→ completed
                │
                ├─ (error) ───────→ available (retry)
                ├─ (max attempts) → failed
                └─ (Discard) ─────→ discarded

Orphaned jobs (worker crashed while processing) are automatically returned to available after RescueAfter.

Notes

  • Handler context: handlers must respect ctx.Done() for graceful shutdown. If a handler blocks indefinitely, ignoring context cancellation, Stop will return a timeout error and the Start goroutine will leak.
  • Cleanup: completed, failed, and discarded jobs stay in the table. Periodically run DELETE FROM pgxq_job WHERE state IN ('completed', 'failed', 'discarded') AND completed_at < now() - interval '7 days' (or similar) to keep the table small.
  • Client reuse: a Client cannot be reused after Stop — create a new one if needed.

License

MIT

Documentation

Overview

Package pgxq provides a PostgreSQL-backed job queue for pgx/v5.

Jobs are enqueued with Insert and processed by a Client worker pool. Enqueue works with any DBTX — pass a pgx.Tx for transactional guarantees or a pgxpool.Pool for standalone inserts.

Constants

const DefaultTable = "pgxq_job"

DefaultTable is the default table name used when none is specified.

Variables

var DefaultBackoff = ExponentialBackoff(time.Second, 2.0, time.Hour)

DefaultBackoff is exponential backoff: 1s base, 2x factor, 1h max, 20% jitter.

Functions

func Discard

func Discard(err error) error

Discard wraps err so that the worker discards the job instead of retrying.

return pgxq.Discard(fmt.Errorf("invalid payload: %w", err))

func Migrate

func Migrate(ctx context.Context, db DBTX, table string) error

Migrate creates the job table and indexes if they don't exist.

func Schema

func Schema(table string) (string, error)

Schema returns SQL statements to create the job table and indexes.

func UnmarshalArgs

func UnmarshalArgs[T any](job *Job) (T, error)

UnmarshalArgs decodes a job's JSON args into a value of type T.

Types

type BackoffFunc

type BackoffFunc func(attempt int) time.Duration

BackoffFunc returns the delay before the next retry based on the attempt number (1-based).

func ConstantBackoff

func ConstantBackoff(d time.Duration) BackoffFunc

ConstantBackoff returns a BackoffFunc that always returns the same delay.

func ExponentialBackoff

func ExponentialBackoff(base time.Duration, factor float64, maxDelay time.Duration) BackoffFunc

ExponentialBackoff returns a BackoffFunc with exponential delay, capped at maxDelay, with 20% jitter.

delay = min(base * factor^(attempt-1), maxDelay) * (0.8 + rand*0.4)

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client polls for jobs and dispatches them to registered handlers.

Handle must be called before Start. The client cannot be reused after Stop — create a new one if needed.

func NewClient

func NewClient(cfg ClientConfig) (*Client, error)

NewClient creates a worker pool client.

func (*Client) Handle

func (c *Client) Handle(kind string, fn HandlerFunc)

Handle registers a handler for the given job kind. Must be called before Client.Start. Panics if called after Start.

func (*Client) Start

func (c *Client) Start() error

Start begins polling and processing jobs. It blocks until Client.Stop is called and all active jobs complete. Returns nil on normal shutdown. Returns an error if called more than once.

Handlers must respect ctx.Done() for graceful shutdown. If a handler blocks indefinitely ignoring context cancellation, Client.Stop will return a timeout error and the Start goroutine will leak.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context) error

Stop gracefully stops the client. It stops polling for new jobs and waits for active jobs to complete. If ctx expires before all jobs finish, active job contexts are cancelled and Stop returns ctx.Err().

Safe to call before Start — cancels the poll context so a subsequent Start returns immediately.

type ClientConfig

type ClientConfig struct {
	// Pool is the pgx connection pool. Required.
	Pool *pgxpool.Pool
	// Table is the job table name. Default: [DefaultTable].
	Table string
	// Queue is the queue name to poll. Default: "default".
	Queue string
	// PollInterval is the delay between polls when no jobs are found. Default: 1s.
	PollInterval time.Duration
	// BatchSize is the max number of jobs fetched per poll. Default: 10.
	BatchSize int
	// MaxWorkers is the max concurrent job goroutines. Default: 100.
	MaxWorkers int
	// RescueAfter is the duration after which a running job is considered orphaned. Default: 1h.
	RescueAfter time.Duration
	// Backoff returns the delay before the next retry. Default: [DefaultBackoff].
	Backoff BackoffFunc
	// Logger for operational messages. Default: slog.Default().
	Logger *slog.Logger
}

ClientConfig configures a Client worker pool.

type DBTX

type DBTX interface {
	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

DBTX accepts pgxpool.Pool, pgx.Tx, and pgx.Conn.

type DiscardError

type DiscardError struct {
	Err error
}

DiscardError wraps an error to signal that the job should be discarded immediately without further retries.

func (*DiscardError) Error

func (e *DiscardError) Error() string

func (*DiscardError) Unwrap

func (e *DiscardError) Unwrap() error

type HandlerFunc

type HandlerFunc func(ctx context.Context, tx pgx.Tx, job *Job) error

HandlerFunc processes a single job within a transaction.

Handlers must be idempotent. If tx.Commit fails due to a network error, the commit may have already succeeded. The job will be rescued and retried, so the handler may execute more than once for the same job.

On success (nil return), the transaction is committed — any database work done through tx is atomic with the job completion.

On error, the transaction is rolled back and the job is retried or failed. Wrap with Discard to skip retries and mark the job as discarded.

type InsertOption

type InsertOption func(*insertOpts)

InsertOption configures a job at insert time.

func WithMaxAttempts

func WithMaxAttempts(n int16) InsertOption

WithMaxAttempts overrides the maximum number of attempts. Default: 5.

func WithPriority

func WithPriority(p int16) InsertOption

WithPriority sets the job priority. Lower values run first. Default: 0.

func WithQueue

func WithQueue(q string) InsertOption

WithQueue sets the queue name. Default: "default".

func WithScheduledAt

func WithScheduledAt(t time.Time) InsertOption

WithScheduledAt schedules the job for a future time. Default: now.

func WithTable

func WithTable(t string) InsertOption

WithTable overrides the table name for this insert. Default: DefaultTable. Must match the table used by the Client that will process this job.

type Job

type Job struct {
	ID          int64
	Kind        string
	Args        json.RawMessage
	State       JobState
	Queue       string
	Priority    int16
	Attempt     int16
	MaxAttempts int16
	ScheduledAt time.Time
	CreatedAt   time.Time
	AttemptedAt *time.Time
	CompletedAt *time.Time
	Error       *string
}

Job represents a row in the job table.

func Insert

func Insert(ctx context.Context, db DBTX, kind string, args any, opts ...InsertOption) (*Job, error)

Insert enqueues a job. Pass a pgx.Tx for transactional insert or a pgxpool.Pool for standalone. Args can be any JSON-marshalable value (struct, map, string, nil).

type JobState

type JobState string

JobState represents the lifecycle state of a job.

const (
	// JobStateAvailable means the job is ready to be picked up by a worker.
	JobStateAvailable JobState = "available"
	// JobStateRunning means a worker is currently processing the job.
	JobStateRunning JobState = "running"
	// JobStateCompleted means the job finished successfully.
	JobStateCompleted JobState = "completed"
	// JobStateFailed means the job exhausted all retry attempts.
	JobStateFailed JobState = "failed"
	// JobStateDiscarded means the job was explicitly discarded via [Discard].
	JobStateDiscarded JobState = "discarded"
)

Directories

Path Synopsis
A minimal worker that enqueues and processes a job.
