Documentation ¶
Overview ¶
Package pgqueue provides a lightweight, PostgreSQL-backed job queue for Go.
It enables asynchronous background processing using PostgreSQL while offering safe concurrency, retries with backoff, delayed jobs, and cron scheduling.
Index ¶
- Constants
- Variables
- type CleanupStrategy
- type Client
- func (c *Client) Close() error
- func (c *Client) Enqueue(ctx context.Context, task TaskType, payload any, opts ...EnqueueOption) error
- func (c *Client) ListCronJobs() ([]CronJobInfo, error)
- func (c *Client) RemoveCron(id CronID) error
- func (c *Client) ScheduleCron(spec string, jobName string, task TaskType, payload any) (CronID, error)
- func (c *Client) Stats(ctx context.Context) (QueueStats, error)
- type CronID
- type CronJobInfo
- type EnqueueOption
- type HandlerFunc
- type Metrics
- type Middleware
- type Priority
- type PriorityMetrics
- type Queue
- type QueueOption
- type QueueStats
- type ServeMux
- type Server
- type Task
- type TaskType
- type WorkerHandler
Constants ¶
const (
	TaskDone       = "done"
	TaskPending    = "pending"
	TaskProcessing = "processing"
	TaskFailed     = "failed"
)
Variables ¶
var ErrHandlerNotFound = errors.New("handler not found for task")
Functions ¶
This section is empty.
Types ¶
type CleanupStrategy ¶
type CleanupStrategy int
const (
	// DeleteStrategy hard deletes old tasks.
	DeleteStrategy CleanupStrategy = iota
	// ArchiveStrategy moves old tasks to the tasks_archive table.
	ArchiveStrategy
)
func (CleanupStrategy) String ¶
func (c CleanupStrategy) String() string
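The following is a minimal, self-contained sketch of how an iota-based type such as CleanupStrategy can implement fmt.Stringer. The returned labels ("delete", "archive") and the fallback format are assumptions made for illustration; the text the package actually returns is not documented here.

```go
package main

import "fmt"

// CleanupStrategy mirrors the declaration shown above.
type CleanupStrategy int

const (
	DeleteStrategy  CleanupStrategy = iota // 0
	ArchiveStrategy                        // 1
)

// String returns a human-readable label. The labels are assumed,
// not taken from the package source.
func (c CleanupStrategy) String() string {
	switch c {
	case DeleteStrategy:
		return "delete"
	case ArchiveStrategy:
		return "archive"
	default:
		// Convert to int first so Sprintf does not call String again.
		return fmt.Sprintf("CleanupStrategy(%d)", int(c))
	}
}

func main() {
	fmt.Println(DeleteStrategy, ArchiveStrategy, CleanupStrategy(7))
}
```

Implementing String lets the value appear readably in log lines and error messages without extra formatting code.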
type Client ¶
type Client struct {
Metrics *Metrics
Logger *slog.Logger
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(db *sql.DB, opts ...QueueOption) (client *Client, err error)
NewClient returns a Client for the queue, configured by the given options.
func (*Client) Close ¶
Close shuts down the Client's background maintenance routines and Cron scheduler.
func (*Client) Enqueue ¶
func (c *Client) Enqueue(ctx context.Context, task TaskType, payload any, opts ...EnqueueOption) error
Enqueue adds a task to the queue.
func (*Client) ListCronJobs ¶
func (c *Client) ListCronJobs() ([]CronJobInfo, error)
ListCronJobs returns the list of scheduled cron jobs.
func (*Client) RemoveCron ¶
RemoveCron removes a scheduled task from the cron scheduler.
type EnqueueOption ¶
type EnqueueOption func(*enqueueConfig)
EnqueueOption configures a single Enqueue call, for example with a delay or a deduplication key.
func WithDedup ¶
func WithDedup(key string) EnqueueOption
WithDedup ensures that a task with the given key is enqueued only once.
func WithDelay ¶
func WithDelay(d time.Duration) EnqueueOption
WithDelay schedules the task to run after the given delay.
func WithMaxRetries ¶
func WithMaxRetries(n int) EnqueueOption
WithMaxRetries overrides the default retry count, which is 5.
type HandlerFunc ¶
The HandlerFunc type is an adapter that allows ordinary functions to be used as a WorkerHandler. If f is a function with the appropriate signature, HandlerFunc(f) is a WorkerHandler that calls f.
func (HandlerFunc) ProcessTask ¶
func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error
ProcessTask calls fn(ctx, task).
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func NewMetrics() *Metrics
func (*Metrics) RecordFailure ¶
func (*Metrics) RecordStart ¶
type Middleware ¶
type Middleware func(WorkerHandler) WorkerHandler
Middleware wraps a WorkerHandler with extra behavior.
func SlogMiddleware ¶
func SlogMiddleware(logger *slog.Logger, metrics *Metrics) Middleware
type PriorityMetrics ¶
type QueueOption ¶
type QueueOption func(*queueConfig)
QueueOption is a function that modifies the queue configuration.
func WithCleanupConfig ¶
func WithCleanupConfig(interval, retention time.Duration, strategy CleanupStrategy) QueueOption
WithCleanupConfig configures automatic removal of old data.
params:
- interval: how often to run the cleanup job.
- retention: how old a 'done'/'failed' task must be to be removed.
- strategy: either pgqueue.DeleteStrategy or pgqueue.ArchiveStrategy.
func WithCronEnabled ¶
func WithCronEnabled() QueueOption
WithCronEnabled enables cron jobs functionality.
Cron jobs are disabled by default.
func WithRescueConfig ¶
func WithRescueConfig(interval, visibilityTimeout time.Duration) QueueOption
WithRescueConfig configures the automatic stuck task rescue.
params:
- interval: how often to check for stuck tasks.
- visibilityTimeout: how long a task can stay 'processing' before being reset.
type QueueStats ¶
type ServeMux ¶
type ServeMux struct {
// contains filtered or unexported fields
}
ServeMux is a multiplexer for tasks. It matches the type of each task against a list of registered patterns and calls the WorkerHandler for the pattern that most closely matches the task's type.
func (*ServeMux) HandleFunc ¶
HandleFunc registers the handler function for the given pattern.
func (*ServeMux) ProcessTask ¶
ProcessTask dispatches the task to the handler whose pattern most closely matches the task type.
func (*ServeMux) Use ¶
func (mux *ServeMux) Use(mw ...Middleware)
Use appends middleware to the mux. Middleware runs in the order it is added.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
type WorkerHandler ¶
WorkerHandler processes tasks.
ProcessTask should return nil if the processing of a task is successful.
func NotFoundHandler ¶
func NotFoundHandler() WorkerHandler
NotFoundHandler returns a simple task handler that returns a “not found” error.