Documentation ¶
Overview ¶
Package pgxq provides a PostgreSQL-backed job queue for pgx/v5.
Jobs are enqueued with Insert and processed by a Client worker pool. Enqueueing works with any DBTX: pass a pgx.Tx for transactional guarantees or a pgxpool.Pool for standalone inserts.
Index ¶
- Constants
- Variables
- func Discard(err error) error
- func Migrate(ctx context.Context, db DBTX, table string) error
- func Schema(table string) (string, error)
- func UnmarshalArgs[T any](job *Job) (T, error)
- type BackoffFunc
- type Client
- type ClientConfig
- type DBTX
- type DiscardError
- type HandlerFunc
- type InsertOption
- type Job
- type JobState
Constants ¶
const DefaultTable = "pgxq_job"
DefaultTable is the default table name used when none is specified.
Variables ¶
var DefaultBackoff = ExponentialBackoff(time.Second, 2.0, time.Hour)
DefaultBackoff is an exponential backoff with a 1s base, a 2x factor, a 1h cap, and ±20% jitter.
Functions ¶
func Discard ¶
func Discard(err error) error
Discard wraps err so that the worker discards the job instead of retrying.
return pgxq.Discard(fmt.Errorf("invalid payload: %w", err))
func UnmarshalArgs ¶
func UnmarshalArgs[T any](job *Job) (T, error)
UnmarshalArgs decodes a job's JSON args into a value of type T.
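The decoding step can be illustrated without the package itself. The following is a minimal sketch, not the library's implementation: `job`, `emailArgs`, and `decodeArgs` are hypothetical stand-ins that mirror the documented shape of Job and UnmarshalArgs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job mirrors only the Job fields this sketch needs.
type job struct {
	Kind string
	Args json.RawMessage
}

// emailArgs is a hypothetical payload type used for illustration.
type emailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// decodeArgs has the same shape as UnmarshalArgs: it decodes the raw JSON
// args of a job into a value of type T.
func decodeArgs[T any](j *job) (T, error) {
	var v T
	if err := json.Unmarshal(j.Args, &v); err != nil {
		return v, fmt.Errorf("decode %s args: %w", j.Kind, err)
	}
	return v, nil
}

func main() {
	j := &job{
		Kind: "send_email",
		Args: json.RawMessage(`{"to":"a@example.com","subject":"hi"}`),
	}
	args, err := decodeArgs[emailArgs](j)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(args.To, args.Subject)
}
```

A decode failure is usually permanent for a given payload, so a handler might reasonably wrap that error with Discard rather than let the job retry.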
Types ¶
type BackoffFunc ¶
BackoffFunc returns the delay before the next retry based on the attempt number (1-based).
func ConstantBackoff ¶
func ConstantBackoff(d time.Duration) BackoffFunc
ConstantBackoff returns a BackoffFunc that always returns the same delay.
func ExponentialBackoff ¶
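ExponentialBackoff returns a BackoffFunc whose delay grows exponentially with the attempt number and is capped at maxDelay. Jitter of ±20% is applied after the cap, so the final delay can exceed maxDelay by up to 20%.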
delay = min(base * factor^(attempt-1), maxDelay) * (0.8 + rand*0.4)
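To make the formula concrete, here is a small sketch that evaluates it for the DefaultBackoff parameters (1s base, 2x factor, 1h cap). The helper names `baseDelay` and `withJitter` are hypothetical, and the sketch assumes `rand` in the formula is a uniform value in [0, 1); it is not the package's own code.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// baseDelay computes min(base * factor^(attempt-1), maxDelay), the
// deterministic part of the formula, for a 1-based attempt number.
func baseDelay(base time.Duration, factor float64, maxDelay time.Duration, attempt int) time.Duration {
	d := float64(base) * math.Pow(factor, float64(attempt-1))
	if d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

// withJitter scales d by (0.8 + r*0.4); r is expected to lie in [0, 1).
func withJitter(d time.Duration, r float64) time.Duration {
	return time.Duration(float64(d) * (0.8 + r*0.4))
}

func main() {
	for attempt := 1; attempt <= 14; attempt++ {
		d := baseDelay(time.Second, 2.0, time.Hour, attempt)
		fmt.Printf("attempt %2d: base %v, jittered %v\n",
			attempt, d, withJitter(d, rand.Float64()))
	}
}
```

With these parameters the uncapped delay first exceeds one hour at attempt 13 (2^12 s = 4096 s), so from that attempt on only the jitter varies.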
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client polls for jobs and dispatches them to registered handlers.
Handle must be called before Start. The client cannot be reused after Stop — create a new one if needed.
func NewClient ¶
func NewClient(cfg ClientConfig) (*Client, error)
NewClient creates a worker pool client.
func (*Client) Handle ¶
func (c *Client) Handle(kind string, fn HandlerFunc)
Handle registers a handler for the given job kind. Must be called before Client.Start. Panics if called after Start.
func (*Client) Start ¶
Start begins polling and processing jobs. It blocks until Client.Stop is called and all active jobs complete. Returns nil on normal shutdown. Returns an error if called more than once.
Handlers must respect ctx.Done() for graceful shutdown. If a handler ignores context cancellation and blocks indefinitely, Client.Stop returns a timeout error and the goroutine running Start leaks.
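What "respecting ctx.Done()" looks like inside a long-running handler body can be sketched with the standard library alone. `processItems` and `runFor` are hypothetical names; the real HandlerFunc signature is not reproduced here.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// processItems simulates a long-running handler body. It checks ctx between
// units of work and returns ctx.Err() as soon as cancellation is observed.
func processItems(ctx context.Context, n int, perItem time.Duration) (int, error) {
	done := 0
	for i := 0; i < n; i++ {
		select {
		case <-ctx.Done():
			return done, ctx.Err()
		case <-time.After(perItem): // stands in for one unit of real work
			done++
		}
	}
	return done, nil
}

// runFor is a hypothetical driver: it runs processItems under a context that
// is cancelled after timeout, much as a job context would be on shutdown.
func runFor(timeout time.Duration, n int, perItem time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return processItems(ctx, n, perItem)
}

func main() {
	done, err := runFor(50*time.Millisecond, 1000, 10*time.Millisecond)
	fmt.Printf("processed %d items before stopping: %v\n", done, err)
}
```

The key point is that every blocking step sits in a select alongside ctx.Done(), so cancellation is noticed within one unit of work.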
func (*Client) Stop ¶
Stop gracefully stops the client. It stops polling for new jobs and waits for active jobs to complete. If ctx expires before all jobs finish, active job contexts are cancelled and Stop returns ctx.Err().
Stop is safe to call before Start: it cancels the poll context, so a subsequent Start returns immediately.
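The wait-or-time-out behaviour described for Stop follows a common Go shutdown pattern. The sketch below shows that pattern in isolation under stated assumptions: `waitOrCancel` and `shutdown` are hypothetical helpers, and nothing here is taken from the package's internals.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrCancel waits for all workers tracked by wg. If ctx expires first, it
// calls cancelJobs (cancelling the active job contexts) and returns ctx.Err().
func waitOrCancel(ctx context.Context, wg *sync.WaitGroup, cancelJobs context.CancelFunc) error {
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()
	select {
	case <-finished:
		return nil
	case <-ctx.Done():
		cancelJobs()
		return ctx.Err()
	}
}

// shutdown starts one simulated job lasting jobDuration, then gives it
// stopTimeout to finish before cancelling it.
func shutdown(jobDuration, stopTimeout time.Duration) error {
	jobCtx, cancelJobs := context.WithCancel(context.Background())
	defer cancelJobs()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case <-time.After(jobDuration):
		case <-jobCtx.Done(): // a well-behaved job stops on cancellation
		}
	}()

	stopCtx, cancel := context.WithTimeout(context.Background(), stopTimeout)
	defer cancel()
	err := waitOrCancel(stopCtx, &wg, cancelJobs)
	wg.Wait() // returns promptly because the job honors jobCtx
	return err
}

func main() {
	fmt.Println("fast job:", shutdown(10*time.Millisecond, time.Second))
	fmt.Println("slow job:", shutdown(time.Second, 20*time.Millisecond))
}
```

If the simulated job ignored jobCtx, the final wg.Wait() would block for the full job duration, which is the leak scenario the Start documentation warns about.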
type ClientConfig ¶
type ClientConfig struct {
// Pool is the pgx connection pool. Required.
Pool *pgxpool.Pool
// Table is the job table name. Default: [DefaultTable].
Table string
// Queue is the queue name to poll. Default: "default".
Queue string
// PollInterval is the delay between polls when no jobs are found. Default: 1s.
PollInterval time.Duration
// BatchSize is the max number of jobs fetched per poll. Default: 10.
BatchSize int
// MaxWorkers is the max concurrent job goroutines. Default: 100.
MaxWorkers int
// RescueAfter is the duration after which a running job is considered orphaned. Default: 1h.
RescueAfter time.Duration
// Backoff returns the delay before the next retry. Default: [DefaultBackoff].
Backoff BackoffFunc
// Logger for operational messages. Default: slog.Default().
Logger *slog.Logger
}
ClientConfig configures a Client worker pool.
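The documented defaults can be summarised as a defaulting step over a zero-valued config. This is a sketch only: `config` mirrors the value-typed fields of ClientConfig, `withDefaults` is a hypothetical helper, and treating a zero value as "unset" is an assumption about how the package behaves.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors the value-typed ClientConfig fields. Pool, Backoff and
// Logger are omitted to keep the sketch dependency-free.
type config struct {
	Table        string
	Queue        string
	PollInterval time.Duration
	BatchSize    int
	MaxWorkers   int
	RescueAfter  time.Duration
}

// withDefaults returns a copy of c with zero-valued fields replaced by the
// documented defaults.
func withDefaults(c config) config {
	if c.Table == "" {
		c.Table = "pgxq_job"
	}
	if c.Queue == "" {
		c.Queue = "default"
	}
	if c.PollInterval == 0 {
		c.PollInterval = time.Second
	}
	if c.BatchSize == 0 {
		c.BatchSize = 10
	}
	if c.MaxWorkers == 0 {
		c.MaxWorkers = 100
	}
	if c.RescueAfter == 0 {
		c.RescueAfter = time.Hour
	}
	return c
}

func main() {
	c := withDefaults(config{Queue: "emails", BatchSize: 25})
	fmt.Printf("%+v\n", c)
}
```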
type DBTX ¶
type DBTX interface {
Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
DBTX is satisfied by *pgxpool.Pool, pgx.Tx, and *pgx.Conn.
type DiscardError ¶
type DiscardError struct {
Err error
}
DiscardError wraps an error to signal that the job should be discarded immediately without further retries.
func (*DiscardError) Error ¶
func (e *DiscardError) Error() string
func (*DiscardError) Unwrap ¶
func (e *DiscardError) Unwrap() error
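Because DiscardError exposes Unwrap, it composes with the standard errors package. The sketch below uses a local replica (`discardError`, `discard`, `isDiscard` are hypothetical names) to show detection through a wrapped chain; the exact text returned by the real Error method is not documented here, so the message format is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// discardError is a local replica of the documented DiscardError shape.
type discardError struct {
	Err error
}

// Error returns the wrapped error's message (format assumed for this sketch).
func (e *discardError) Error() string { return e.Err.Error() }

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (e *discardError) Unwrap() error { return e.Err }

// discard mirrors the documented Discard helper.
func discard(err error) error { return &discardError{Err: err} }

// isDiscard reports whether any error in err's chain is a *discardError.
func isDiscard(err error) bool {
	var de *discardError
	return errors.As(err, &de)
}

var errBadPayload = errors.New("invalid payload")

func main() {
	err := fmt.Errorf("handle job: %w", discard(errBadPayload))
	fmt.Println(isDiscard(err))                // true: found through the %w wrapper
	fmt.Println(errors.Is(err, errBadPayload)) // true: Unwrap exposes the cause
	fmt.Println(isDiscard(errBadPayload))      // false: never wrapped with discard
}
```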
type HandlerFunc ¶
HandlerFunc processes a single job within a transaction.
Handlers must be idempotent. If tx.Commit fails due to a network error, the commit may have already succeeded. The job will be rescued and retried, so the handler may execute more than once for the same job.
On success (nil return), the transaction is committed — any database work done through tx is atomic with the job completion.
On error, the transaction is rolled back and the job is either retried or, once its attempts are exhausted, marked as failed. Wrap the error with Discard to skip retries and mark the job as discarded.
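The outcomes described above can be read as a small decision function. This sketch rests on two assumptions that the documentation does not state outright: a job to be retried returns to the "available" state, and a job fails when its attempt number has reached MaxAttempts. `nextState`, `errDiscard`, and `errTransient` are hypothetical, with `errDiscard` standing in for an error wrapped by Discard.

```go
package main

import (
	"errors"
	"fmt"
)

type jobState string

const (
	stateAvailable jobState = "available"
	stateCompleted jobState = "completed"
	stateFailed    jobState = "failed"
	stateDiscarded jobState = "discarded"
)

// errDiscard is a sentinel standing in for an error wrapped with Discard.
var errDiscard = errors.New("discard")

// errTransient is a hypothetical retryable failure.
var errTransient = errors.New("upstream timeout")

// nextState maps a handler result to the state the job would move to.
func nextState(err error, attempt, maxAttempts int16) jobState {
	switch {
	case err == nil:
		return stateCompleted // transaction committed
	case errors.Is(err, errDiscard):
		return stateDiscarded // no further retries
	case attempt >= maxAttempts:
		return stateFailed // attempts exhausted (assumed threshold)
	default:
		return stateAvailable // retried later, after the backoff delay
	}
}

func main() {
	fmt.Println(nextState(nil, 1, 5))
	fmt.Println(nextState(errTransient, 2, 5))
	fmt.Println(nextState(errTransient, 5, 5))
	fmt.Println(nextState(fmt.Errorf("bad payload: %w", errDiscard), 1, 5))
}
```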
type InsertOption ¶
type InsertOption func(*insertOpts)
InsertOption configures a job at insert time.
func WithMaxAttempts ¶
func WithMaxAttempts(n int16) InsertOption
WithMaxAttempts overrides the maximum number of attempts. Default: 5.
func WithPriority ¶
func WithPriority(p int16) InsertOption
WithPriority sets the job priority. Lower values run first. Default: 0.
func WithQueue ¶
func WithQueue(q string) InsertOption
WithQueue sets the queue name. Default: "default".
func WithScheduledAt ¶
func WithScheduledAt(t time.Time) InsertOption
WithScheduledAt schedules the job for a future time. Default: now.
func WithTable ¶
func WithTable(t string) InsertOption
WithTable overrides the table name for this insert. Default: DefaultTable. Must match the table used by the Client that will process this job.
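InsertOption follows the functional options pattern. Since insertOpts is unexported, the sketch below uses a hypothetical struct with guessed field names to show how the documented defaults and overrides could combine; it covers three of the five options and is not the package's code.

```go
package main

import (
	"fmt"
	"time"
)

// insertOpts holds per-insert settings. The real struct is unexported, so
// these field names are hypothetical.
type insertOpts struct {
	table       string
	queue       string
	priority    int16
	maxAttempts int16
	scheduledAt time.Time
}

// insertOption mutates the settings, mirroring the InsertOption type.
type insertOption func(*insertOpts)

func withQueue(q string) insertOption      { return func(o *insertOpts) { o.queue = q } }
func withPriority(p int16) insertOption    { return func(o *insertOpts) { o.priority = p } }
func withMaxAttempts(n int16) insertOption { return func(o *insertOpts) { o.maxAttempts = n } }

// resolve starts from the documented defaults and applies opts in order,
// so a later option overrides an earlier one.
func resolve(opts ...insertOption) insertOpts {
	o := insertOpts{
		table:       "pgxq_job",
		queue:       "default",
		priority:    0,
		maxAttempts: 5,
		scheduledAt: time.Now(),
	}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolve(withQueue("emails"), withPriority(-1))
	fmt.Printf("table=%s queue=%s priority=%d maxAttempts=%d\n",
		o.table, o.queue, o.priority, o.maxAttempts)
}
```

Because lower priority values run first, a negative priority such as -1 would place a job ahead of jobs inserted with the default of 0.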
type Job ¶
type Job struct {
ID int64
Kind string
Args json.RawMessage
State JobState
Queue string
Priority int16
Attempt int16
MaxAttempts int16
ScheduledAt time.Time
CreatedAt time.Time
AttemptedAt *time.Time
CompletedAt *time.Time
Error *string
}
Job represents a row in the job table.
type JobState ¶
type JobState string
JobState represents the lifecycle state of a job.
const (
// JobStateAvailable means the job is ready to be picked up by a worker.
JobStateAvailable JobState = "available"
// JobStateRunning means a worker is currently processing the job.
JobStateRunning JobState = "running"
// JobStateCompleted means the job finished successfully.
JobStateCompleted JobState = "completed"
// JobStateFailed means the job exhausted all retry attempts.
JobStateFailed JobState = "failed"
// JobStateDiscarded means the job was explicitly discarded via [Discard].
JobStateDiscarded JobState = "discarded"
)