v0.68.0

Published: Jun 13, 2024 License: MIT Imports: 25 Imported by: 1



Migrations are implemented with

Install the CLI

To add new migrations, install the CLI. The version of the CLI is not particularly important.

go install -tags 'postgres'

Adding migrations

Add new migrations with the migrate create command:

migrate create -dir backends/postgres/migrations -ext sql <descriptive_migration_name>

Running migrations

Migrations are run every time neoq initializes the postgres backend. There is no need to run migrations explicitly.




const (
	JobQuery = `` /* 222-byte string literal not displayed */

	PendingJobIDQuery = `` /* 157-byte string literal not displayed */

	FutureJobQuery = `` /* 282-byte string literal not displayed */
)



var (
	// DefaultConnectionTimeout defines the default amount of time that Neoq waits for connections to become available.
	DefaultConnectionTimeout = 30 * time.Second

	ErrCnxString                     = errors.New("invalid connecton string: see documentation for valid connection strings")
	ErrConnectionStringEmpty         = errors.New("connection string cannot be empty")
	ErrDuplicateJob                  = errors.New("duplicate job")
	ErrNoTransactionInContext        = errors.New("context does not have a Tx set")
	ErrExceededConnectionPoolTimeout = errors.New("exceeded timeout acquiring a connection from the pool")
	ErrUnsupportedURIScheme          = errors.New("only postgres:// and postgresql:// scheme URIs are supported, invalid connection string")
)


func Backend

func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err error)

Backend initializes a new postgres-backed neoq backend

If the database does not yet exist, Neoq will attempt to create the database and related tables by default.

Backend requires that WithConnectionString be among the supplied neoq.ConfigOption values.

Connection strings may be URLs or DSN-style strings. Both forms support the connection pool options listed below.


  • pool_max_conns: integer greater than 0
  • pool_min_conns: integer 0 or greater
  • pool_max_conn_lifetime: duration string
  • pool_max_conn_idle_time: duration string
  • pool_health_check_period: duration string
  • pool_max_conn_lifetime_jitter: duration string

Example DSN

user=worker password=secret port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10

Example URL
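
A URL-style connection string carrying the pool options listed above could be assembled with Go's net/url package. This is a minimal sketch: the host db.example.com, the credentials, and the helper buildConnURL are hypothetical and not part of neoq.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildConnURL is a hypothetical helper (not part of neoq) that assembles a
// postgres:// URL and appends options such as pool_max_conns as query
// parameters.
func buildConnURL(user, password, host, dbname string, params map[string]string) string {
	q := url.Values{}
	for k, v := range params {
		q.Set(k, v)
	}
	u := url.URL{
		Scheme:   "postgres",
		User:     url.UserPassword(user, password),
		Host:     host,
		Path:     "/" + dbname,
		RawQuery: q.Encode(), // Encode sorts parameters by key
	}
	return u.String()
}

func main() {
	fmt.Println(buildConnURL("worker", "secret", "db.example.com:5432", "mydb", map[string]string{
		"sslmode":        "verify-ca",
		"pool_max_conns": "10",
	}))
	// prints postgres://worker:secret@db.example.com:5432/mydb?pool_max_conns=10&sslmode=verify-ca
}
```

Building the URL through url.URL rather than string concatenation ensures that special characters in the password or option values are escaped correctly.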


func GetPQConnectionString added in v0.61.0

func GetPQConnectionString(connectionString string) (string, error)

func WithConnectionString added in v0.7.0

func WithConnectionString(connectionString string) neoq.ConfigOption

WithConnectionString configures neoq postgres backend to use the specified connection string when connecting to a backend

func WithConnectionTimeout added in v0.45.0

func WithConnectionTimeout(timeout time.Duration) neoq.ConfigOption

WithConnectionTimeout sets the duration that Neoq waits for connections to become available to process and enqueue jobs

Note: ConnectionTimeout does not affect how long neoq waits for connections when running schema migrations.

func WithSynchronousCommit added in v0.35.0

func WithSynchronousCommit(enabled bool) neoq.ConfigOption

WithSynchronousCommit enables postgres parameter `synchronous_commit`.

By default, neoq runs with synchronous_commit disabled.

Postgres incurs significant transactional overhead when it synchronously commits small transactions. Because neoq jobs must be enqueued individually and payloads are generally quite small, enabling synchronous_commit adds substantial overhead in exchange for increased data durability.

See for details on the implications that this has for neoq jobs.

Enabling synchronous commit results in an order of magnitude slowdown in enqueueing and processing jobs.

func WithTransactionTimeout

func WithTransactionTimeout(txTimeout int) neoq.ConfigOption

WithTransactionTimeout sets the time that PgBackend's transactions may be idle before the underlying connection is closed. The timeout is the number of milliseconds that a transaction may sit idle before postgres terminates the transaction's underlying connection. The timeout should be longer than your longest job takes to complete. If it is set too short, job state becomes unpredictable, e.g. retry counts may become incorrect.


type PgBackend

type PgBackend struct {
	// contains filtered or unexported fields
}

PgBackend is a Postgres-based Neoq backend

func (*PgBackend) Enqueue

func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error)

Enqueue adds jobs to the specified queue

func (*PgBackend) SetLogger

func (p *PgBackend) SetLogger(logger logging.Logger)

SetLogger sets this backend's logger

func (*PgBackend) Shutdown

func (p *PgBackend) Shutdown(ctx context.Context)

Shutdown shuts this backend down

func (*PgBackend) Start

func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error)

Start starts processing jobs with the specified queue and handler

func (*PgBackend) StartCron

func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error)

StartCron starts processing jobs with the specified cron schedule and handler

See: for details on the cron spec format
