Documentation
¶
Overview ¶
Package queue provides a PostgreSQL-backed task queue with support for delayed tasks, retries with exponential backoff, and periodic tickers.
Manager ¶
Create a manager with namespace isolation:
mgr, err := queue.New(ctx, pool, "myapp")
if err != nil {
panic(err)
}
Queues ¶
Register queues that define retry behavior:
ttl := 24 * time.Hour
retries := uint64(3)
retryDelay := time.Minute
queue, err := mgr.RegisterQueue(ctx, schema.QueueMeta{
Queue: "emails",
TTL: &ttl,
Retries: &retries,
RetryDelay: &retryDelay,
})
Tasks ¶
Create and process tasks:
// Create a task
task, err := mgr.CreateTask(ctx, "emails", schema.TaskMeta{
Payload: map[string]any{"to": "user@example.com"},
})
// Retain next task from a specific queue
task, err := mgr.NextTask(ctx, "worker-1", "emails")
// Retain next task from any queue
task, err := mgr.NextTask(ctx, "worker-1")
// Retain next task from multiple queues
task, err := mgr.NextTask(ctx, "worker-1", "emails", "notifications")
// Release task (success)
mgr.ReleaseTask(ctx, task.Id, true, nil, &status)
// Release task (failure - will retry with backoff)
mgr.ReleaseTask(ctx, task.Id, false, errPayload, &status)
WorkerPool ¶
Use WorkerPool for concurrent task processing:
pool, err := queue.NewWorkerPool(mgr,
queue.WithWorkers(4),
queue.WithWorkerName("worker-1"),
)
// Register queue handlers
pool.RegisterQueue(ctx, schema.QueueMeta{Queue: "emails"}, func(ctx context.Context, task *schema.Task) error {
// Process task
return nil
})
// Run blocks until context is cancelled
err = pool.Run(ctx)
Tickers ¶
Register periodic tickers:
interval := time.Hour
ticker, err := mgr.RegisterTicker(ctx, schema.TickerMeta{
Ticker: "hourly-report",
Interval: &interval,
})
Subpackages ¶
- schema: Data types, request/response structures, and SQL generation
- httphandler: REST API handlers for all queue operations
- httpclient: Typed Go client for the REST API
Index ¶
- Variables
- type Manager
- func (manager *Manager) CleanQueue(ctx context.Context, name string) ([]schema.Task, error)
- func (manager *Manager) Conn() pg.PoolConn
- func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (*schema.Task, error)
- func (manager *Manager) DeleteQueue(ctx context.Context, name string) (*schema.Queue, error)
- func (manager *Manager) DeleteTicker(ctx context.Context, name string) (*schema.Ticker, error)
- func (manager *Manager) GetQueue(ctx context.Context, name string) (*schema.Queue, error)
- func (manager *Manager) GetTicker(ctx context.Context, name string) (*schema.Ticker, error)
- func (manager *Manager) ListNamespaces(ctx context.Context, req schema.NamespaceListRequest) (*schema.NamespaceList, error)
- func (manager *Manager) ListQueueStatuses(ctx context.Context) ([]schema.QueueStatus, error)
- func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (*schema.QueueList, error)
- func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (*schema.TaskList, error)
- func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (*schema.TickerList, error)
- func (manager *Manager) Namespace() string
- func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (*schema.Task, error)
- func (manager *Manager) NextTicker(ctx context.Context) (*schema.Ticker, error)
- func (manager *Manager) NextTickerNs(ctx context.Context, namespace string) (*schema.Ticker, error)
- func (manager *Manager) RegisterQueue(ctx context.Context, meta schema.QueueMeta) (*schema.Queue, error)
- func (manager *Manager) RegisterTicker(ctx context.Context, meta schema.TickerMeta) (*schema.Ticker, error)
- func (manager *Manager) RegisterTickerNs(ctx context.Context, namespace string, meta schema.TickerMeta) (*schema.Ticker, error)
- func (manager *Manager) ReleaseTask(ctx context.Context, task uint64, success bool, result any, status *string) (*schema.Task, error)
- func (manager *Manager) Run(ctx context.Context) error
- func (manager *Manager) RunTaskLoop(ctx context.Context, ch chan<- *schema.Task, worker string, queues ...string) error
- func (manager *Manager) RunTickerLoop(ctx context.Context, ch chan<- *schema.Ticker, period time.Duration) error
- func (manager *Manager) RunTickerLoopNs(ctx context.Context, namespace string, ch chan<- *schema.Ticker, ...) error
- func (manager *Manager) UpdateQueue(ctx context.Context, name string, meta schema.QueueMeta) (*schema.Queue, error)
- func (manager *Manager) UpdateTicker(ctx context.Context, name string, meta schema.TickerMeta) (*schema.Ticker, error)
- type Opt
- type TaskHandler
- type TickerHandler
- type WorkerPool
- func (wp *WorkerPool) RegisterQueue(ctx context.Context, meta schema.QueueMeta, handler TaskHandler) (*schema.Queue, error)
- func (wp *WorkerPool) RegisterTicker(ctx context.Context, meta schema.TickerMeta, handler TickerHandler) (*schema.Ticker, error)
- func (wp *WorkerPool) Run(ctx context.Context) error
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidWorkers = errors.New("workers must be >= 1") ErrInvalidPeriod = errors.New("period must be >= 1ms") )
Functions ¶
This section is empty.
Types ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func New ¶
New creates a new queue manager. The namespace parameter is used to scope all queue operations.
func (*Manager) CleanQueue ¶
CleanQueue removes stale tasks from a queue, and returns the tasks removed
func (*Manager) CreateTask ¶
func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (*schema.Task, error)
CreateTask creates a new task, and returns it.
func (*Manager) DeleteQueue ¶
DeleteQueue deletes an existing queue, and returns it
func (*Manager) DeleteTicker ¶
DeleteTicker deletes an existing ticker, and returns the deleted ticker.
func (*Manager) ListNamespaces ¶
func (manager *Manager) ListNamespaces(ctx context.Context, req schema.NamespaceListRequest) (*schema.NamespaceList, error)
ListNamespaces returns all distinct namespaces from the queue table
func (*Manager) ListQueueStatuses ¶
ListQueueStatuses returns the status breakdown for all queues in the namespace
func (*Manager) ListQueues ¶
func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (*schema.QueueList, error)
ListQueues returns all queues in a namespace as a list
func (*Manager) ListTasks ¶
func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (*schema.TaskList, error)
ListTasks returns all tasks in a namespace as a list, with optional filtering
func (*Manager) ListTickers ¶
func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (*schema.TickerList, error)
ListTickers returns all tickers in a namespace as a list
func (*Manager) NextTask ¶
func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (*schema.Task, error)
NextTask retains a task from any of the specified queues, and returns it. If queues is empty, tasks from any queue are considered. Returns nil if there is no task to retain.
func (*Manager) NextTicker ¶
NextTicker returns the next matured ticker, or nil
func (*Manager) NextTickerNs ¶
NextTickerNs returns the next matured ticker in a namespace, or nil
func (*Manager) RegisterQueue ¶
func (manager *Manager) RegisterQueue(ctx context.Context, meta schema.QueueMeta) (*schema.Queue, error)
RegisterQueue creates a new queue, or updates an existing queue, and returns it.
func (*Manager) RegisterTicker ¶
func (manager *Manager) RegisterTicker(ctx context.Context, meta schema.TickerMeta) (*schema.Ticker, error)
RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.
func (*Manager) RegisterTickerNs ¶
func (manager *Manager) RegisterTickerNs(ctx context.Context, namespace string, meta schema.TickerMeta) (*schema.Ticker, error)
RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.
func (*Manager) ReleaseTask ¶
func (manager *Manager) ReleaseTask(ctx context.Context, task uint64, success bool, result any, status *string) (*schema.Task, error)
ReleaseTask releases a task from a queue, and returns it. Can optionally set the status
func (*Manager) Run ¶
Run starts the background ticker loop for cleanup tasks in the system namespace. It runs until the context is cancelled. This should be called as a goroutine.
func (*Manager) RunTaskLoop ¶
func (manager *Manager) RunTaskLoop(ctx context.Context, ch chan<- *schema.Task, worker string, queues ...string) error
RunTaskLoop runs a loop to process tasks, until the context is cancelled or an error occurs. It uses both polling and LISTEN/NOTIFY to pick up tasks immediately when they're created. If queues is empty, tasks from any queue are considered.
func (*Manager) RunTickerLoop ¶
func (manager *Manager) RunTickerLoop(ctx context.Context, ch chan<- *schema.Ticker, period time.Duration) error
RunTickerLoop runs a loop to process matured tickers, until the context is cancelled, or an error occurs. The period parameter controls the sleep duration between checks when no ticker is found. When a ticker is found, it immediately polls again to drain all matured tickers.
func (*Manager) RunTickerLoopNs ¶
func (manager *Manager) RunTickerLoopNs(ctx context.Context, namespace string, ch chan<- *schema.Ticker, period time.Duration) error
RunTickerLoopNs runs a loop to process matured tickers in a namespace, until the context is cancelled, or an error occurs. The period parameter controls the sleep duration between checks when no ticker is found. When a ticker is found, it immediately polls again to drain all matured tickers.
type Opt ¶
type Opt func(*opts) error
Opt is a functional option for worker pool, queue, and ticker configuration.
func WithPeriod ¶
WithPeriod sets the polling period for a ticker. Returns ErrInvalidPeriod if d < 1ms.
func WithWorkerName ¶
WithWorkerName sets the worker name used to identify this worker instance. Defaults to the hostname if not specified.
func WithWorkers ¶
WithWorkers sets the number of concurrent workers for the worker pool. The worker pool uses a shared pool of workers to process tasks from any registered queue. Returns ErrInvalidWorkers if n < 1.
type TaskHandler ¶
TaskHandler processes a task. Return nil on success, or an error to fail the task.
type TickerHandler ¶
TickerHandler processes a matured ticker. Return nil on success, or an error to stop.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages workers for processing tasks and tickers.
func NewWorkerPool ¶
func NewWorkerPool(manager *Manager, opt ...Opt) (*WorkerPool, error)
NewWorkerPool creates a new worker pool for the given manager.
func (*WorkerPool) RegisterQueue ¶
func (wp *WorkerPool) RegisterQueue(ctx context.Context, meta schema.QueueMeta, handler TaskHandler) (*schema.Queue, error)
RegisterQueue registers a queue with its handler and creates/updates it in the database.
func (*WorkerPool) RegisterTicker ¶
func (wp *WorkerPool) RegisterTicker(ctx context.Context, meta schema.TickerMeta, handler TickerHandler) (*schema.Ticker, error)
RegisterTicker registers a handler for the named ticker and creates/updates it in the database.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package httpclient provides a typed Go client for consuming the PostgreSQL queue management REST API.
|
Package httpclient provides a typed Go client for consuming the PostgreSQL queue management REST API. |
|
Package httphandler provides HTTP handlers for the queue package.
|
Package httphandler provides HTTP handlers for the queue package. |