pgqueue


pgqueue is a lightweight, asynchronous, durable, PostgreSQL-backed job queue for Go.

It is designed to be simple, safe, and easy to reason about, using only PostgreSQL and standard SQL.

⚠️ Project status: This is primarily a learning project, created to explore how background job queues work internally. That said, pgqueue aims to follow solid, production-style patterns and is suitable for real-world experimentation and small-to-medium workloads.


Features

  • ✅ Distributed-safe workers
  • ⏱ Delayed execution
  • 🔁 Automatic retries with exponential backoff + jitter
  • 🚦 Job priorities
  • 🧠 Deduplication support
  • ⏰ Cron jobs (run once across many servers)
  • 📊 Queue metrics & stats
  • 🪵 Structured logging (slog middleware)
  • 💥 Crash-resilient, at-least-once delivery

Why pgqueue?

If you already use PostgreSQL, you don’t need Redis, SQS, or Kafka just to run background jobs.

PostgreSQL is already:

  • Durable
  • Transactional
  • Highly available
  • Operationally familiar

pgqueue builds a background job queue using:

  • SELECT … FOR UPDATE SKIP LOCKED
  • Advisory locking semantics
  • Transactions for correctness
  • LISTEN / NOTIFY for fast wake-ups
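
In practice, the heart of this design is a claim query that locks one ready row and skips rows other workers already hold. The sketch below is illustrative only, not pgqueue's internal code, and the tasks table shape it assumes (status, run_at, priority columns) is a guess:

func claimOne(ctx context.Context, db *sql.DB) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    var id string
    err = tx.QueryRowContext(ctx, `
        SELECT id FROM tasks
        WHERE status = 'pending' AND run_at <= now()
        ORDER BY priority DESC, created_at
        LIMIT 1
        FOR UPDATE SKIP LOCKED`).Scan(&id)
    if err != nil {
        return err // sql.ErrNoRows means nothing is ready to claim
    }

    // Concurrent workers skip the locked row, so no task is processed twice.
    if _, err := tx.ExecContext(ctx,
        `UPDATE tasks SET status = 'processing' WHERE id = $1`, id); err != nil {
        return err
    }
    return tx.Commit()
}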

Architecture Overview

This diagram shows how producers, PostgreSQL, workers, and cron jobs interact inside pgqueue.

flowchart LR
    %% Nodes
    P["Producers<br/>queue.Enqueue()"]
    C["Cron Scheduler<br/>ScheduleCron()"]

    T["PostgreSQL<br/>tasks table"]
    A["tasks_archive"]
    N["LISTEN / NOTIFY"]

    W["Worker Pool<br/>StartConsumer(n)"]
    M["ServeMux"]
    H["Task Handlers"]
    R["Retry & Rescue"]
    S["Metrics / Stats"]

    %% Flows
    P --> T
    C --> T
    T --> N
    N --> W
    W --> M
    M --> H
    H -->|success| T
    H -->|failure| R
    R --> T
    T --> A
    W --> S

    %% Styles
    classDef producer fill:#E3F2FD,stroke:#1565C0,stroke-width:2px;
    classDef postgres fill:#E8F5E9,stroke:#2E7D32,stroke-width:2px;
    classDef worker fill:#FFF8E1,stroke:#EF6C00,stroke-width:2px;
    classDef handler fill:#F3E5F5,stroke:#6A1B9A,stroke-width:2px;
    classDef metrics fill:#ECEFF1,stroke:#455A64,stroke-width:2px;

    class P,C producer;
    class T,A,N postgres;
    class W,M,R worker;
    class H handler;
    class S metrics;  

Installation

go get github.com/i-christian/pgqueue

Initialize the queue client with options

client, err := pgqueue.NewClient(
    db,
    pgqueue.WithRescueConfig(5*time.Minute, 30*time.Minute),
    pgqueue.WithCleanupConfig(1*time.Hour, 24*time.Hour, pgqueue.ArchiveStrategy),
    // Enables cron job scheduling, which is disabled by default
    pgqueue.WithCronEnabled(),
)
if err != nil {
    log.Fatalf("Failed to init queue: %v", err)
}
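
When the application shuts down, close the client to stop its background maintenance routines and the cron scheduler:

defer client.Close()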

Enqueue a Job

type EmailPayload struct {
    Subject string `json:"subject"`
}

client.Enqueue(
    ctx,
    "task:send:email",
    EmailPayload{Subject: "Welcome!"},
)

Enqueue with Options

client.Enqueue(
    ctx,
    "task:send:email",
    payload,
    pgqueue.WithPriority(pgqueue.HighPriority),
    pgqueue.WithDelay(5*time.Minute),
    pgqueue.WithMaxRetries(10),
    pgqueue.WithDedup("email:user:123"),
)

Supported options include:

  • Priority
  • Delayed execution
  • Retry limits
  • Deduplication keys

Start Workers (ServeMux)

pgqueue uses a ServeMux to route tasks by type, similar to http.ServeMux.

mux := pgqueue.NewServeMux()

// Middleware runs for every task
mux.Use(pgqueue.SlogMiddleware(client.Logger, client.Metrics))

// Exact match
mux.HandleFunc("task:send:email", sendEmailHandler)

// Prefix match
mux.HandleFunc("task:cleanup:", cleanupHandler)
mux.HandleFunc("task:report:", reportHandler)

// Start worker pool
server := pgqueue.NewServer(db, connStr, 3, mux)
if err := server.Start(); err != nil {
    log.Fatal(err)
}
log.Println("Worker server started...")


⚠️ Bounded Task Types (Important)

Task types must be bounded.

✅ Good (bounded)
task:send:email
task:cleanup:expired-sessions
task:report:daily

❌ Bad (unbounded)
task:report:user:123
task:email:user:UUID

Why this matters

  • Routing is based on task type or prefix
  • Metrics are keyed by task type
  • Unbounded types can cause unbounded memory growth

Rule of thumb: Use task categories, not per-entity identifiers.
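
In practice this means keeping entity identifiers in the payload rather than in the task type. A quick sketch (userID and the map payload are illustrative):

// ❌ Unbounded: creates a new routing/metrics key per user.
client.Enqueue(ctx, pgqueue.TaskType("task:email:user:"+userID), payload)

// ✅ Bounded: one task type; the user ID travels inside the payload.
client.Enqueue(ctx, "task:send:email", map[string]string{"user_id": userID, "subject": "Welcome!"})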


Cron Jobs

Run scheduled jobs once, even when multiple workers or servers are running.

cronID, err := client.ScheduleCron(
	"0 * * * *",
	"hourly-report",
	TaskReportBase+"hourly",
	ReportPayload{ReportName: "Hourly"},
)
if err != nil {
	log.Fatal(err)
}

jobs, _ := client.ListCronJobs()
for _, job := range jobs {
	fmt.Printf(
		"Cron %d → next: %s\n",
		job.ID,
		job.NextRun.Format(time.DateTime),
	)
}

// Optional cleanup
client.RemoveCron(cronID)

Retries & Backoff

  • At-least-once execution
  • Automatic retries on failure
  • Exponential backoff: 2^attempts
  • Jitter added to prevent thundering-herd effects
  • Max retries configurable per job
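
For illustration, a delay with the shape described above could be computed like this (a sketch, not pgqueue's internal implementation; the one-second jitter bound is an assumption):

func backoffDelay(attempts int) time.Duration {
    // Exponential backoff: 2^attempts seconds, plus random jitter.
    base := time.Duration(math.Pow(2, float64(attempts))) * time.Second
    jitter := time.Duration(rand.Int63n(int64(time.Second)))
    return base + jitter
}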

Queue Stats

stats, _ := client.Stats(ctx)

fmt.Printf(
    "Pending: %d | Processing: %d | Failed: %d | Done: %d\n",
    stats.Pending,
    stats.Processing,
    stats.Failed,
    stats.Done,
)

Examples

A complete, runnable example demonstrating:

  • Worker pools
  • ServeMux routing
  • slog logging
  • Priorities
  • Retries
  • Cron jobs

➡️ See the full example here: 👉 Examples


Guarantees

pgqueue provides the following guarantees:

  • At-least-once execution
  • No concurrent double-processing of the same task
  • Safe concurrency across multiple workers and processes
  • Crash resilience (workers can die at any point)
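
Because delivery is at-least-once, a handler may occasionally receive the same task twice (for example, after a worker crashes mid-task), so side effects should be idempotent. A sketch, where alreadySent, sendEmail, and markEmailSent are hypothetical application helpers:

func sendEmailHandler(ctx context.Context, t *pgqueue.Task) error {
    var p EmailPayload
    if err := json.Unmarshal(t.Payload, &p); err != nil {
        return err
    }
    if alreadySent(ctx, t.ID) { // e.g. recorded in your own table, keyed by task ID
        return nil
    }
    if err := sendEmail(ctx, p); err != nil {
        return err
    }
    return markEmailSent(ctx, t.ID)
}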


When Not to Use pgqueue

pgqueue is not a replacement for high-throughput message brokers.

Avoid pgqueue if you need:

  • Ultra-low latency (<1ms)
  • Massive fan-out (millions of jobs per second)
  • Cross-region replication
  • Exactly-once semantics

Documentation

Overview

Package pgqueue provides a lightweight, PostgreSQL-backed job queue for Go.

It enables asynchronous background processing using PostgreSQL while offering safe concurrency, retries with backoff, delayed jobs, and cron scheduling.


Constants

const (
	TaskDone       = "done"
	TaskPending    = "pending"
	TaskProcessing = "processing"
	TaskFailed     = "failed"
)

Variables

var ErrHandlerNotFound = errors.New("handler not found for task")

Functions

This section is empty.

Types

type CleanupStrategy

type CleanupStrategy int
const (
	// DeleteStrategy hard deletes old tasks.
	DeleteStrategy CleanupStrategy = iota
	// ArchiveStrategy moves old tasks to the tasks_archive table.
	ArchiveStrategy
)

func (CleanupStrategy) String

func (c CleanupStrategy) String() string

type Client

type Client struct {
	Metrics *Metrics
	Logger  *slog.Logger
	// contains filtered or unexported fields
}

func NewClient

func NewClient(db *sql.DB, opts ...QueueOption) (client *Client, err error)

NewClient returns a new queue Client.

func (*Client) Close

func (c *Client) Close() error

Close shuts down the Client's background maintenance routines and Cron scheduler.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, task TaskType, payload any, opts ...EnqueueOption) error

Enqueue adds a task to the queue

func (*Client) ListCronJobs

func (c *Client) ListCronJobs() ([]CronJobInfo, error)

ListCronJobs returns a list of scheduled tasks

func (*Client) RemoveCron

func (c *Client) RemoveCron(id CronID) error

RemoveCron removes a scheduled task from cron

func (*Client) ScheduleCron

func (c *Client) ScheduleCron(
	spec string,
	jobName string,
	task TaskType,
	payload any,
) (CronID, error)

ScheduleCron registers a recurring job.

func (*Client) Stats

func (c *Client) Stats(ctx context.Context) (QueueStats, error)

type CronID

type CronID int

type CronJobInfo

type CronJobInfo struct {
	ID      CronID
	NextRun time.Time
	PrevRun time.Time
}

type EnqueueOption

type EnqueueOption func(*enqueueConfig)

EnqueueOption allows configuring options like delays or deduplication

func WithDedup

func WithDedup(key string) EnqueueOption

WithDedup ensures a task with this key is only enqueued once

func WithDelay

func WithDelay(d time.Duration) EnqueueOption

WithDelay schedules the task to run in the future

func WithMaxRetries

func WithMaxRetries(n int) EnqueueOption

WithMaxRetries overrides the default retry count (default is 5)

func WithPriority

func WithPriority(p Priority) EnqueueOption

WithPriority sets the priority

type HandlerFunc

type HandlerFunc func(context.Context, *Task) error

The HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.
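
For example, an anonymous function with the right signature can be used directly as a WorkerHandler (a minimal sketch):

var handler pgqueue.WorkerHandler = pgqueue.HandlerFunc(func(ctx context.Context, t *pgqueue.Task) error {
    // process t.Payload here
    return nil
})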

func (HandlerFunc) ProcessTask

func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error

ProcessTask calls fn(ctx, task)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics() *Metrics

func (*Metrics) RecordFailure

func (m *Metrics) RecordFailure(p Priority, t TaskType, d time.Duration)

func (*Metrics) RecordStart

func (m *Metrics) RecordStart(p Priority, t TaskType)

func (*Metrics) RecordSuccess

func (m *Metrics) RecordSuccess(p Priority, t TaskType, d time.Duration)

type Middleware

type Middleware func(WorkerHandler) WorkerHandler

Middleware wraps a WorkerHandler with extra behavior.
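
For example, a custom middleware can wrap the next handler and be registered with ServeMux.Use (an illustrative sketch; timingMiddleware is not part of pgqueue):

func timingMiddleware(next pgqueue.WorkerHandler) pgqueue.WorkerHandler {
    return pgqueue.HandlerFunc(func(ctx context.Context, t *pgqueue.Task) error {
        start := time.Now()
        err := next.ProcessTask(ctx, t)
        log.Printf("task %s finished in %s (err=%v)", t.Type, time.Since(start), err)
        return err
    })
}

// mux.Use(timingMiddleware)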

func SlogMiddleware

func SlogMiddleware(logger *slog.Logger, metrics *Metrics) Middleware

type Priority

type Priority int
const (
	HighPriority    Priority = 6
	DefaultPriority Priority = 3
	LowPriority     Priority = 1
)

func (Priority) String

func (p Priority) String() string

type PriorityMetrics

type PriorityMetrics struct {
	Started   int64
	Succeeded int64
	Failed    int64
	Duration  time.Duration
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

type QueueOption

type QueueOption func(*queueConfig)

QueueOption is a function that modifies the queue configuration.

func WithCleanupConfig

func WithCleanupConfig(interval, retention time.Duration, strategy CleanupStrategy) QueueOption

WithCleanupConfig configures automatic removal of old data.

params:

  • interval: how often to run the cleanup job.
  • retention: how old a 'done'/'failed' task must be to be removed.
  • strategy: either pgqueue.DeleteStrategy or pgqueue.ArchiveStrategy.

func WithCronEnabled

func WithCronEnabled() QueueOption

WithCronEnabled enables cron jobs functionality.

Cron jobs are disabled by default.

func WithRescueConfig

func WithRescueConfig(interval, visibilityTimeout time.Duration) QueueOption

WithRescueConfig configures the automatic stuck task rescue.

params:

  • interval: how often to check for stuck tasks.
  • visibilityTimeout: how long a task can stay 'processing' before being reset.

type QueueStats

type QueueStats struct {
	Pending    int
	Processing int
	Failed     int
	Done       int
	Total      int
}

type ServeMux

type ServeMux struct {
	// contains filtered or unexported fields
}

ServeMux is a multiplexer for tasks. It matches the type of each task against a list of registered patterns and calls the WorkerHandler for the pattern that most closely matches the task's type.

func NewServeMux

func NewServeMux() *ServeMux

NewServeMux allocates and returns a new ServeMux.

func (*ServeMux) HandleFunc

func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error)

HandleFunc registers the handler function for the given pattern.

func (*ServeMux) ProcessTask

func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error

ProcessTask dispatches the task to the handler whose pattern most closely matches the task type.

func (*ServeMux) Use

func (mux *ServeMux) Use(mw ...Middleware)

Use appends middleware to the mux. Middleware runs in the order it is added.

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(db *sql.DB, connString string, concurrency int, handler WorkerHandler) *Server

NewServer initializes the worker pool settings.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully stops the server. It cancels the internal context and waits for all workers to finish.

func (*Server) Start

func (s *Server) Start() error

Start launches the background workers and listener in a separate goroutine.

Strictly non-blocking: It returns nil immediately if startup is successful. You must call Shutdown(ctx) to stop the server and wait for workers to finish.
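
A typical pattern is to start the server, block until a termination signal arrives, and then call Shutdown with a deadline (a sketch; the 30-second timeout is arbitrary):

if err := server.Start(); err != nil {
    log.Fatal(err)
}

// Block until Ctrl+C or SIGTERM.
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
<-stop

// Give in-flight tasks a bounded amount of time to finish.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
    log.Printf("shutdown: %v", err)
}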

type Task

type Task struct {
	ID         uuid.UUID
	Type       TaskType
	Payload    json.RawMessage
	Attempts   int
	MaxRetries int
	Priority   Priority
	CreatedAt  time.Time
}

type TaskType

type TaskType string

type WorkerHandler

type WorkerHandler interface {
	ProcessTask(context.Context, *Task) error
}

WorkerHandler processes tasks.

ProcessTask should return nil if the processing of a task is successful.

func NotFoundHandler

func NotFoundHandler() WorkerHandler

NotFoundHandler returns a simple task handler that returns a “not found” error.
