package module
v0.0.0-...-b92a6cf Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2021 License: MIT Imports: 11 Imported by: 0


pgq Build Status Coverage Status

pgq is a Go library for job queues that use Postgres for persistence. It builds on the SKIP LOCKED functionality added in Postgres 9.5, which provides safe locking of in-progress jobs while keeping queries very simple and readable.

Installation and Setup

You can install pgq with go get:

go get

Before you can enqueue or process jobs with pgq, your Postgres database will need the table and index defined in sql/create_table.sql. You can paste this right into psql command line while connected to your database, or paste it into the migration tool of your choice. (I'm partial to Pomegranate.)


There are a few steps to using pgq:

  1. Define job handler functions.
  2. Instantiate a new Worker.
  3. Register your handler functions with the Worker.
  4. Start the Worker.
  5. In another process, enqueue jobs with a Worker.

Each of these steps will be explained in more detail below.

Defining Job Handlers

While pgq will take care of pulling jobs off the queue, it's up to you to tell it exactly what each job does. This is done by defining and registering handler functions. A job handler func takes one []byte argument, and returns either an error or nil. Because job payloads are sent as bytes, you can put whatever data you need in there. It may be plain text, a URL, JSON, or a binary encoding like protobuf.

Here's a simple example:

func SayHello(data []byte) error {
  name := string(data)
  fmt.Println("Hello " + name)

That example might be too simple for a real app though. You'll probably want your jobs to have access to some resources, like config or a database. In that case you can use either struct methods or closures to inject those resources. Here's an example of a struct method job handler:

type MyApp struct {
  db *sql.DB

func (app *MyApp) SayGoodbye(data []byte) error {
  // this is a silly example of a DB query, but illustrates the point.
  var message string
  err := app.db.QueryRow(`SELECT 'Goodbye ' || $1`, string(data)).Scan(&message)
  if err != nil {
    return err

Here's the equivalent using a closure:

package main

func main() {
  db, _ := sql.Open("postgres", "postgres://postgres@/my_database?sslmode=disable")

  myJobHandler := buildGoodbyeHandler(db)

func buildGoodbyeHandler(db *sql.DB) func([]byte) error {
	return func(data []byte) error {
		// we have access to db here because this function is declared in the same scope that db lives in
		var message string
		err := db.QueryRow(`SELECT 'Goodbye ' || $1`, string(data)).Scan(&message)
		if err != nil {
			return err

(That example, like several below omits some error handling code. Don't do that in real life. It's only done here so you can see details about using pgq without getting lost in error handling noise.)

Processing Jobs with the Worker

Once you've defined a handler function, you need to register it with an instance of pgq.Worker. You can get one of those by passing your Postgres *sql.DB instance into pgq.NewWorker.

db, _ := sql.Open("postgres", "postgres://postgres@/my_database?sslmode=disable")
worker := pgq.NewWorker(db)

Now you can register your job handler functions with this worker by calling its RegisterQueue method and passing two arguments: the queue name (which can be any string you like), and your handler function.

err := worker.RegisterQueue("hello", SayHello)

If you're using struct methods as job handler functions, you need to instantiate your struct and then register its methods. This example uses the MyApp struct shown above.

db, _ := sql.Open("postgres", "postgres://postgres@/my_database?sslmode=disable")
myApp := &MyApp{db: db}
err := worker.RegisterQueue("goodbye", myApp.SayGoodbye)

If you attempt to register the same queue name more than once on the same worker, you'll get an error.

With your handler functions now registered, you start the job worker by calling its Run method. This call will query for uncompleted jobs and call the appropriate handler function for each of them.

err := worker.Run()

This will loop and perform jobs forever. It will only stop if it hits an unrecoverable error, or the program is terminated, or your program stops the loop by sending a value on the worker's stop channel, like this:

worker.StopChan <- true

The worker will only query for jobs that have a queue name that matches one of its registered handlers. If there's a job in the queue with an unregistered queue name, it will be ignored. You can use this feature to start separate processes for handling different job types. This is useful if some queues need to be scaled up to more worker processes than others.

Putting Jobs on the Queue

To enqueue jobs you need to have a Worker instance and then call its EnqueueJob method, which takes a queue name, a []byte payload, and some optional arguments we'll cover later. EnqueueJob will return an integer job ID (only useful for logging/debugging), and possibly an error.

db, _ := sql.Open("postgres", "postgres://postgres@/my_database?sslmode=disable")
worker := pgq.NewWorker(db)
jobID, err := worker.EnqueueJob("hello", []byte("Brent"))

If you're using the same Postgres database for both pgq and your own application tables, you might want to have the enqueueing of jobs happen in the same database transaction where you're doing other database calls. In that case, you can use the worker's EnqueueJobInTx method, which lets you pass in your own transaction object. It's up to you to ensure that the transaction is properly committed or rolled back. Example:

db, _ := sql.Open("postgres", "postgres://postgres@/my_database?sslmode=disable")
worker := pgq.NewWorker(db)

// later on in some other part of your app...
tx, _ := db.Begin()

// we use the same transaction for creating the user and enqueueing the welcome email
// so both will succeed or fail together, and we avoid sending a welcome email to a user 
// whose account never actually got created because the tx got rolled back.
createUser(tx, someUserData)
jobID, err := worker.EnqueueJobInTx(tx, "sendWelcomeEmail", someWelcomeEmailData)

if thereWasAnErrorSomewhereElse {
} else {

Additional Features

There are some additional things you can configure, both at the per-worker level and the per-job level.

Runner Options

These options are passed as additional arguments when calling pgq.NewWorker, using the functional options pattern.


After a worker completes a job, it immediately queries for another one. If there are no jobs waiting in a queue, it will sleep for a few seconds before querying again. By default that's 10 seconds. You can change that default by passing in the JobPollingInterval option with a time.Duration to NewWorker. In this example we increase it to 30 seconds.

worker, err := pgq.NewWorker(
  pgq.JobPollingInterval(time.Second * 30),

By default, jobs are deleted from the pgq_jobs table after being performed. If you would prefer to leave them in the table for analytics or debugging purposes, you can pass the PreserveCompletedJobs option to pgq.NewWorker:

worker, err := pgq.NewWorker(
Job Options

If you want a job to be processed at some time in the future instead of immediately, you can pass the pgq.After option to the worker's EnqueueJob method. This option takes a time.Time. This example sets a job to be run 24 hours in the future.

jobID, err := worker.EnqueueJob(
  pgq.After(time.Now().Add(time.Minute * 60 * 24)),

Sometimes jobs fail through no fault of their own. It could be because of downtime in a database, or a third party API, or a flaky network. It can be useful to retry jobs automatically when such things happen.

But you don't want to retry immediately. A problem that's happening now will probably still be happening one second from now.

For this reason pgq supports setting a per-job option for the number of retries to perform and how long to wait before each. If a job returns an error, and it's configured to allow one or more retries, then it will be run again.

By default pgq does three retries, with one minute, ten minute, and 30 minute waits. You can override this by passing the RetryWaits option to the worker's EnqueueJob method. This option takes in a slice of durations specifying how long each wait should be. In this example we set two retries, one after an hour and another after 6 hours:

jobID, err := worker.EnqueueJob(
    time.Minute * 60,
    time.Minute * 60 * 6,
Exponential Backoffs

While retries may help a single job be resilient to intermittent failures, they won't slow down a big queue of jobs that are all failing because some backend system is down. For that you need to tell the worker to slow down its processing of jobs on the whole queue. You can do that by having your job return an error that implements this interface:

// Backoffer is a type of error that can also indicate whether a queue should slow down.
type Backoffer interface {
	Backoff() bool

If the error returned by your job implements that interface, and its Backoff() method returns true, then the worker will pause for a short time (100 milliseconds) before pulling jobs off that queue again. The next time a job returns a backoff, that time will be doubled, and so on until reaching the max backoff time (60 seconds).



Package pgq provides an implementation of a Postgres-backed job queue. Safe concurrency is built on top of the SKIP LOCKED functionality introduced in Postgres 9.5. Retries and exponential backoff are supported.



This section is empty.


This section is empty.


func PreserveCompletedJobs

func PreserveCompletedJobs(worker *Worker)

PreserveCompletedJobs sets the runner option to leave job attempts in the pgq_jobs table instead of deleting them when complete.


type BackoffError

type BackoffError struct {
	// contains filtered or unexported fields

BackoffError is an implementation of the Backoff interface

func (BackoffError) Backoff

func (b BackoffError) Backoff() bool

Backoff implements the Backoffer interface.

func (BackoffError) Error

func (b BackoffError) Error() string

Error implements the error interface.

type Backoffer

type Backoffer interface {
	Backoff() bool

Backoffer is a type of error that can also indicate whether a queue should slow down.

func Backoff

func Backoff(msg string) Backoffer

Backoff returns an error that implements the Backoffer interface, telling the caller processing the error to slow the queue down.

type DB

type DB interface {
	Exec(string, ...interface{}) (sql.Result, error)
	QueryRow(string, ...interface{}) *sql.Row

DB is anything with the DB methods on it that we need. (like a DB or a Tx)

type Durations

type Durations []time.Duration

Durations is our own alias for time.Duration slices, so we can stick Scan and Value methods on them. End users shouldn't have to use this type at all.

func (*Durations) Scan

func (ds *Durations) Scan(src interface{}) error

Scan converts raw db bytes into Durations. Nulls will be ignored.

func (Durations) Value

func (ds Durations) Value() (driver.Value, error)

Value marshals the slice the Postgres syntax for a TEXT[] literal.

type Job

type Job struct {
	ID         int64       `db:"id"`
	CreatedAt  time.Time   `db:"created_at"`
	QueueName  string      `db:"queue_name"`
	Data       []byte      `db:"data"`
	RunAfter   time.Time   `db:"run_after"`
	RetryWaits Durations   `db:"retry_waits"`
	RanAt      null.Time   `db:"ran_at"`
	Error      null.String `db:"error"`

Job contains all the info needed to execute a single job attempt.

type JobOption

type JobOption func(*Job)

A JobOption sets an optional parameter on a Job that you're enqueueing.

func After

func After(t time.Time) JobOption

After sets the timestamp after which the Job may run.

func RetryWaits

func RetryWaits(ds []time.Duration) JobOption

RetryWaits explicitly sets the wait periods for the Job's retries.

type Worker

type Worker struct {
	StopChan chan bool
	// contains filtered or unexported fields

Worker provides methods for putting jobs on a Postgres-backed queue, and performing any jobs that are there.

func NewWorker

func NewWorker(db *sql.DB, options ...WorkerOption) *Worker

NewWorker takes a Postgres DB connection and returns a Worker instance.

func (*Worker) EnqueueJob

func (worker *Worker) EnqueueJob(queueName string, data []byte, options ...JobOption) (int, error)

EnqueueJob puts a job on the queue. If successful, it returns the Job ID.

func (*Worker) EnqueueJobInTx

func (worker *Worker) EnqueueJobInTx(tx DB, queueName string, data []byte, options ...JobOption) (int, error)

EnqueueJobInTx enqueues a Job, but lets you provide your own sql.Tx or other compatible object with an Exec method. This is useful if your application has other tables in the same database, and you want to only enqueue the job if all the DB operations in the same transaction are successful. All the handling of Begin, Commit, and Rollback calls is up to you.

func (*Worker) PerformNextJob

func (worker *Worker) PerformNextJob() (attempted bool, outErr error)

PerformNextJob performs the next job in the queue. It returns true if it attempted to run a job, or false if there was no job in the queue or some error prevented it from attempting to run the job. It only returns an error if there's some problem talking to Postgres. Errors inside jobs are not bubbled up.

func (*Worker) RegisterQueue

func (worker *Worker) RegisterQueue(queueName string, jobFunc func([]byte) error) error

RegisterQueue tells your Worker instance which function should be called for a given job type.

func (*Worker) Run

func (worker *Worker) Run() error

Run will query for the next job in the queue, then run it, then do another, forever.

type WorkerOption

type WorkerOption func(*Worker)

A WorkerOption sets an optional parameter on the Worker.

func JobPollingInterval

func JobPollingInterval(d time.Duration) WorkerOption

JobPollingInterval sets the amount of time that the runner will sleep if it has no jobs to do. Default is 10 seconds.

func OnStop

func OnStop(f func()) WorkerOption

OnStop sets an optional callback function that will be called when the runner exits its Run method.

func SetLogger

func SetLogger(l *log.Logger) WorkerOption

SetLogger allows you to set your own logrus logger object for use by the job worker.


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL