queue

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

README

PostgreSQL Task Queue

The queue package provides a PostgreSQL-backed task queue with support for delayed tasks, retries with exponential backoff, and periodic tickers. It enables reliable background job processing through both a direct Go API and HTTP interfaces.

To test the package:

git clone github.com/mutablelogic/go-pg
make tests

You'll need to have docker installed in order to run the integration tests, which will create a PostgreSQL server in a container. There is a command line client included for testing:

git clone github.com/mutablelogic/go-pg
make cmd/pgqueue

This places a pgqueue binary in the build folder which you can use as a server or client. To run the server on localhost, port 8080:

build/pgqueue run postgres://localhost:5432/postgres

To use the client:

build/pgqueue queues
build/pgqueue tasks --status=new

Run build/pgqueue --help for more information and to understand the available commands and settings.

Architecture

The package is organized into four main components:

Manager (this package folder)

The core component that provides direct access to queue operations. It wraps a connection pool and exposes methods for managing queues, tasks, and tickers.

import "github.com/mutablelogic/go-pg/pkg/queue"

// Create a manager with namespace isolation
mgr, err := queue.New(ctx, pool, "myapp")
Schema (schema/)

Defines all data types, request/response structures, and SQL queries. Each resource type has its own file containing:

  • Structs - Go types representing queue objects (Queue, Task, Ticker)
  • Meta types - Parameters for creating/updating resources
  • SQL generation - Methods that produce parameterized SQL queries
HTTP Handler (httphandler/)

Provides REST API endpoints for all queue operations. Register handlers with an http.ServeMux:

import "github.com/mutablelogic/go-pg/pkg/queue/httphandler"

httphandler.RegisterBackendHandlers(mux, "/api", mgr)
HTTP Client (httpclient/)

A typed client for consuming the REST API from Go applications:

import "github.com/mutablelogic/go-pg/pkg/queue/httpclient"

client, err := httpclient.New("http://localhost:8080/api")
queues, err := client.ListQueues(ctx)

Core Concepts

Queues

Queues hold tasks and define retry behavior:

ttl := 24 * time.Hour
retries := uint64(3)
retryDelay := time.Minute

queue, err := mgr.RegisterQueue(ctx, schema.QueueMeta{
    Queue:      "emails",
    TTL:        &ttl,        // Task expiration
    Retries:    &retries,    // Max retry attempts
    RetryDelay: &retryDelay, // Base delay (exponential backoff)
})
Tasks

Tasks progress through a lifecycle:

Status Description
new Newly created, waiting to be processed
retry Failed but has retries remaining, waiting for backoff delay
retained Locked by a worker for processing
released Finished successfully
failed Exhausted all retries
expired TTL exceeded before completion

Tasks with status released, failed, or expired are automatically cleaned up when the queue's TTL expires.

Create and process tasks:

// Create a task
task, err := mgr.CreateTask(ctx, "emails", schema.TaskMeta{
    Payload: map[string]any{"to": "user@example.com"},
})

// Create a delayed task
delayedAt := time.Now().Add(time.Hour)
task, err := mgr.CreateTask(ctx, "emails", schema.TaskMeta{
    Payload:   map[string]any{"to": "user@example.com"},
    DelayedAt: &delayedAt,
})

// Worker: get next available task from specific queue
task, err := mgr.NextTask(ctx, "worker-1", "emails")

// Worker: get next available task from any queue
task, err := mgr.NextTask(ctx, "worker-1")

// Worker: get next available task from multiple queues
task, err := mgr.NextTask(ctx, "worker-1", "emails", "notifications")

// Release task (success)
mgr.ReleaseTask(ctx, task.Id, true, nil, &status)

// Release task (failure - will retry with backoff)
mgr.ReleaseTask(ctx, task.Id, false, errPayload, &status)
Tickers

Tickers generate tasks on a schedule:

period := time.Hour
ticker, err := mgr.RegisterTicker(ctx, schema.TickerMeta{
    Ticker:  "hourly-report",
    Queue:   "reports",
    Period:  &period,
    Payload: map[string]any{"type": "hourly"},
})

// Process matured tickers
mgr.RunTickerLoop(ctx, func(ticker *schema.Ticker) error {
    _, err := mgr.CreateTask(ctx, ticker.Queue, schema.TaskMeta{
        Payload: ticker.Payload,
    })
    return err
})
Namespaces

Each manager operates within a namespace for multi-tenant isolation:

appMgr, _ := queue.New(ctx, pool, "app")
adminMgr, _ := queue.New(ctx, pool, "admin")
// Queues and tasks are completely independent

A special pgqueue system namespace is automatically created and used for internal operations like expired task cleanup. This namespace should not be used by applications.

REST API Endpoints

All endpoints are prefixed with the configured path (e.g., /api):

Namespace
Method Path Description
GET {prefix}/namespace List all namespaces
Queue
Method Path Description
GET {prefix}/queue List queues
POST {prefix}/queue Create queue
GET {prefix}/queue/{name} Get queue by name
PATCH {prefix}/queue/{name} Update queue
DELETE {prefix}/queue/{name} Delete queue
Task
Method Path Description
GET {prefix}/task List tasks (filter: ?queue=, ?status=, ?offset=, ?limit=)
POST {prefix}/task Create task
PUT {prefix}/task Retain next task from any queue (requires ?worker=)
PUT {prefix}/task/{queue} Retain next task from specific queue (requires ?worker=)
PATCH {prefix}/task/{id} Release task with result ({"fail": bool, "result": any})
Ticker
Method Path Description
GET {prefix}/ticker List tickers
POST {prefix}/ticker Create ticker
GET {prefix}/ticker/{name} Get ticker by name
PATCH {prefix}/ticker/{name} Update ticker
DELETE {prefix}/ticker/{name} Delete ticker
GET {prefix}/ticker/next SSE stream of matured tickers
Metrics
Method Path Description
GET {prefix}/metrics Prometheus metrics

Exposes the queue_tasks gauge metric with labels for namespace, queue, and status.

CLI Commands

# Namespace operations
pgqueue namespaces                 # List namespaces

# Queue operations
pgqueue queues                     # List queues
pgqueue queue myqueue              # Get queue details
pgqueue create-queue myqueue       # Create queue
pgqueue update-queue myqueue       # Update queue
pgqueue delete-queue myqueue       # Delete queue

# Task operations
pgqueue tasks                      # List all tasks
pgqueue tasks --queue=myqueue      # Filter by queue
pgqueue tasks --status=new         # Filter by status
pgqueue create-task myqueue        # Create task
pgqueue retain-task --worker=worker1 myqueue  # Retain next task from specific queue
pgqueue retain-task --worker=worker1          # Retain next task from any queue
pgqueue retain-task --worker=worker1 queue1 queue2  # Retain from multiple queues
pgqueue complete-task 123          # Release task (success)
pgqueue complete-task 123 --error '{"msg":"failed"}'  # Release task (failure)

# Ticker operations
pgqueue tickers                    # List tickers
pgqueue ticker myticker            # Get ticker details
pgqueue create-ticker myticker     # Create ticker
pgqueue update-ticker myticker     # Update ticker
pgqueue delete-ticker myticker     # Delete ticker
pgqueue next-ticker                # Stream matured tickers (SSE)

# Server
pgqueue run postgres://...         # Run HTTP server

Dependencies

  • github.com/mutablelogic/go-pg - PostgreSQL connection pool
  • github.com/mutablelogic/go-server - HTTP utilities
  • github.com/mutablelogic/go-client - HTTP client

Documentation

Overview

Package queue provides a PostgreSQL-backed task queue with support for delayed tasks, retries with exponential backoff, and periodic tickers.

Manager

Create a manager with namespace isolation:

mgr, err := queue.New(ctx, pool, "myapp")
if err != nil {
	panic(err)
}

Queues

Register queues that define retry behavior:

ttl := 24 * time.Hour
retries := uint64(3)
retryDelay := time.Minute

queue, err := mgr.RegisterQueue(ctx, schema.QueueMeta{
	Queue:      "emails",
	TTL:        &ttl,
	Retries:    &retries,
	RetryDelay: &retryDelay,
})

Tasks

Create and process tasks:

// Create a task
task, err := mgr.CreateTask(ctx, "emails", schema.TaskMeta{
	Payload: map[string]any{"to": "user@example.com"},
})

// Retain next task from a specific queue
task, err := mgr.NextTask(ctx, "worker-1", "emails")

// Retain next task from any queue
task, err := mgr.NextTask(ctx, "worker-1")

// Retain next task from multiple queues
task, err := mgr.NextTask(ctx, "worker-1", "emails", "notifications")

// Release task (success)
mgr.ReleaseTask(ctx, task.Id, true, nil, &status)

// Release task (failure - will retry with backoff)
mgr.ReleaseTask(ctx, task.Id, false, errPayload, &status)

WorkerPool

Use WorkerPool for concurrent task processing:

pool, err := queue.NewWorkerPool(mgr,
	queue.WithWorkers(4),
	queue.WithWorkerName("worker-1"),
)

// Register queue handlers
pool.RegisterQueue(ctx, schema.QueueMeta{Queue: "emails"}, func(ctx context.Context, task *schema.Task) error {
	// Process task
	return nil
})

// Run blocks until context is cancelled
err = pool.Run(ctx)

Tickers

Register periodic tickers:

interval := time.Hour
ticker, err := mgr.RegisterTicker(ctx, schema.TickerMeta{
	Ticker:   "hourly-report",
	Interval: &interval,
})

Subpackages

  • schema: Data types, request/response structures, and SQL generation
  • httphandler: REST API handlers for all queue operations
  • httpclient: Typed Go client for the REST API

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidWorkers = errors.New("workers must be >= 1")
	ErrInvalidPeriod  = errors.New("period must be >= 1ms")
)

Functions

This section is empty.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, conn pg.PoolConn, namespace string) (*Manager, error)

New creates a new queue manager. The namespace parameter is used to scope all queue operations.

func (*Manager) CleanQueue

func (manager *Manager) CleanQueue(ctx context.Context, name string) ([]schema.Task, error)

CleanQueue removes stale tasks from a queue, and returns the tasks removed

func (*Manager) Conn

func (manager *Manager) Conn() pg.PoolConn

func (*Manager) CreateTask

func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (*schema.Task, error)

CreateTask creates a new task, and returns it.

func (*Manager) DeleteQueue

func (manager *Manager) DeleteQueue(ctx context.Context, name string) (*schema.Queue, error)

DeleteQueue deletes an existing queue, and returns it

func (*Manager) DeleteTicker

func (manager *Manager) DeleteTicker(ctx context.Context, name string) (*schema.Ticker, error)

DeleteTicker deletes an existing ticker, and returns the deleted ticker.

func (*Manager) GetQueue

func (manager *Manager) GetQueue(ctx context.Context, name string) (*schema.Queue, error)

GetQueue returns a queue by name

func (*Manager) GetTicker

func (manager *Manager) GetTicker(ctx context.Context, name string) (*schema.Ticker, error)

GetTicker returns a ticker by name

func (*Manager) ListNamespaces

func (manager *Manager) ListNamespaces(ctx context.Context, req schema.NamespaceListRequest) (*schema.NamespaceList, error)

ListNamespaces returns all distinct namespaces from the queue table

func (*Manager) ListQueueStatuses

func (manager *Manager) ListQueueStatuses(ctx context.Context) ([]schema.QueueStatus, error)

ListQueueStatuses returns the status breakdown for all queues in the namespace

func (*Manager) ListQueues

func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (*schema.QueueList, error)

ListQueues returns all queues in a namespace as a list

func (*Manager) ListTasks

func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (*schema.TaskList, error)

ListTasks returns all tasks in a namespace as a list, with optional filtering

func (*Manager) ListTickers

func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (*schema.TickerList, error)

ListTickers returns all tickers in a namespace as a list

func (*Manager) Namespace

func (manager *Manager) Namespace() string

func (*Manager) NextTask

func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (*schema.Task, error)

NextTask retains a task from any of the specified queues, and returns it. If queues is empty, tasks from any queue are considered. Returns nil if there is no task to retain.

func (*Manager) NextTicker

func (manager *Manager) NextTicker(ctx context.Context) (*schema.Ticker, error)

NextTicker returns the next matured ticker, or nil

func (*Manager) NextTickerNs

func (manager *Manager) NextTickerNs(ctx context.Context, namespace string) (*schema.Ticker, error)

NextTickerNs returns the next matured ticker in a namespace, or nil

func (*Manager) RegisterQueue

func (manager *Manager) RegisterQueue(ctx context.Context, meta schema.QueueMeta) (*schema.Queue, error)

RegisterQueue creates a new queue, or updates an existing queue, and returns it.

func (*Manager) RegisterTicker

func (manager *Manager) RegisterTicker(ctx context.Context, meta schema.TickerMeta) (*schema.Ticker, error)

RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.

func (*Manager) RegisterTickerNs

func (manager *Manager) RegisterTickerNs(ctx context.Context, namespace string, meta schema.TickerMeta) (*schema.Ticker, error)

RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.

func (*Manager) ReleaseTask

func (manager *Manager) ReleaseTask(ctx context.Context, task uint64, success bool, result any, status *string) (*schema.Task, error)

ReleaseTask releases a task from a queue, and returns it. Can optionally set the status

func (*Manager) Run

func (manager *Manager) Run(ctx context.Context) error

Run starts the background ticker loop for cleanup tasks in the system namespace. It runs until the context is cancelled. This should be called as a goroutine.

func (*Manager) RunTaskLoop

func (manager *Manager) RunTaskLoop(ctx context.Context, ch chan<- *schema.Task, worker string, queues ...string) error

RunTaskLoop runs a loop to process tasks, until the context is cancelled or an error occurs. It uses both polling and LISTEN/NOTIFY to pick up tasks immediately when they're created. If queues is empty, tasks from any queue are considered.

func (*Manager) RunTickerLoop

func (manager *Manager) RunTickerLoop(ctx context.Context, ch chan<- *schema.Ticker, period time.Duration) error

RunTickerLoop runs a loop to process matured tickers, until the context is cancelled, or an error occurs. The period parameter controls the sleep duration between checks when no ticker is found. When a ticker is found, it immediately polls again to drain all matured tickers.

func (*Manager) RunTickerLoopNs

func (manager *Manager) RunTickerLoopNs(ctx context.Context, namespace string, ch chan<- *schema.Ticker, period time.Duration) error

RunTickerLoopNs runs a loop to process matured tickers in a namespace, until the context is cancelled, or an error occurs. The period parameter controls the sleep duration between checks when no ticker is found. When a ticker is found, it immediately polls again to drain all matured tickers.

func (*Manager) UpdateQueue

func (manager *Manager) UpdateQueue(ctx context.Context, name string, meta schema.QueueMeta) (*schema.Queue, error)

UpdateQueue updates an existing queue, and returns it.

func (*Manager) UpdateTicker

func (manager *Manager) UpdateTicker(ctx context.Context, name string, meta schema.TickerMeta) (*schema.Ticker, error)

UpdateTicker updates an existing ticker, and returns it.

type Opt

type Opt func(*opts) error

Opt is a functional option for worker pool, queue, and ticker configuration.

func WithPeriod

func WithPeriod(d time.Duration) Opt

WithPeriod sets the polling period for a ticker. Returns ErrInvalidPeriod if d < 1ms.

func WithWorkerName

func WithWorkerName(name string) Opt

WithWorkerName sets the worker name used to identify this worker instance. Defaults to the hostname if not specified.

func WithWorkers

func WithWorkers(n int) Opt

WithWorkers sets the number of concurrent workers for the worker pool. The worker pool uses a shared pool of workers to process tasks from any registered queue. Returns ErrInvalidWorkers if n < 1.

type TaskHandler

type TaskHandler func(context.Context, *schema.Task) error

TaskHandler processes a task. Return nil on success, or an error to fail the task.

type TickerHandler

type TickerHandler func(context.Context, *schema.Ticker) error

TickerHandler processes a matured ticker. Return nil on success, or an error to stop.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages workers for processing tasks and tickers.

func NewWorkerPool

func NewWorkerPool(manager *Manager, opt ...Opt) (*WorkerPool, error)

NewWorkerPool creates a new worker pool for the given manager.

func (*WorkerPool) RegisterQueue

func (wp *WorkerPool) RegisterQueue(ctx context.Context, meta schema.QueueMeta, handler TaskHandler) (*schema.Queue, error)

RegisterQueue registers a queue with its handler and creates/updates it in the database.

func (*WorkerPool) RegisterTicker

func (wp *WorkerPool) RegisterTicker(ctx context.Context, meta schema.TickerMeta, handler TickerHandler) (*schema.Ticker, error)

RegisterTicker registers a handler for the named ticker and creates/updates it in the database.

func (*WorkerPool) Run

func (wp *WorkerPool) Run(ctx context.Context) error

Run starts all workers and blocks until context is cancelled or an error occurs.

Directories

Path Synopsis
Package httpclient provides a typed Go client for consuming the PostgreSQL queue management REST API.
Package httpclient provides a typed Go client for consuming the PostgreSQL queue management REST API.
Package httphandler provides HTTP handlers for the queue package.
Package httphandler provides HTTP handlers for the queue package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL