queue

package
v0.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package queue provides a job queue abstraction modelled on Laravel's Queue facade.

Three pieces:

  • Job — a payload that describes the work to do, carried as a name + bytes (typed via Register/Marshal).
  • Queue — the transport (push/pop/ack/nack). Memory driver ships in-box; DB/Redis live in sub-packages.
  • Worker — long-running consumer that pulls Jobs off a Queue and dispatches them to registered Handler funcs.

Usage:

type SendWelcomeEmail struct{ UserID uint64 }

q := queue.NewMemoryQueue()
w := queue.NewWorker(q)
queue.Handle[SendWelcomeEmail](w, func(ctx context.Context, j SendWelcomeEmail) error {
    return mailer.Send(ctx, buildEmail(j.UserID))
})
go w.Run(ctx)

_ = queue.Dispatch(ctx, q, SendWelcomeEmail{UserID: 42})

Index

Constants

This section is empty.

Variables

View Source
var ErrEmpty = errors.New("queue: empty")

ErrEmpty is returned by Queue.Pop when no job is available within the configured wait window.

Functions

func Dispatch

func Dispatch[T any](ctx context.Context, q Queue, payload T) error

Dispatch JSON-encodes payload and pushes it onto q. The job name is the Go type name of payload (e.g. "SendWelcomeEmail"). The Worker looks up the handler by the same key.

func DispatchAfter

func DispatchAfter[T any](ctx context.Context, q Queue, payload T, delay time.Duration) error

DispatchAfter is Dispatch with a delivery delay.

func Handle

func Handle[T any](w *Worker, fn func(ctx context.Context, payload T) error)

Handle registers a typed handler for jobs of T. T must JSON-decode from the payload bytes.

Types

type Handler

type Handler func(ctx context.Context, raw []byte) error

Handler runs a typed job. The Worker decodes Job.Payload into T before invoking the handler.

type Job

type Job struct {
	ID          string
	Name        string
	Payload     []byte
	Attempts    int
	AvailableAt time.Time
}

Job is the wire-format payload exchanged between producers and the queue transport. Name identifies the handler; Payload carries the JSON-encoded user-defined struct.

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

MemoryQueue is an in-process FIFO queue with delayed delivery and at-least-once semantics. Suitable for single-process apps and tests.

func NewMemoryQueue

func NewMemoryQueue() *MemoryQueue

NewMemoryQueue returns an empty in-memory Queue.

func (*MemoryQueue) Ack

func (q *MemoryQueue) Ack(_ context.Context, id string) error

func (*MemoryQueue) Len

func (q *MemoryQueue) Len() int

func (*MemoryQueue) Nack

func (q *MemoryQueue) Nack(_ context.Context, id string, retryAfter time.Duration) error

func (*MemoryQueue) Pop

func (q *MemoryQueue) Pop(ctx context.Context, wait time.Duration) (Job, error)

func (*MemoryQueue) Push

func (q *MemoryQueue) Push(_ context.Context, j Job) error

type Queue

type Queue interface {
	// Push enqueues a job. Implementations should respect job.AvailableAt
	// (deferred delivery) if non-zero.
	Push(ctx context.Context, j Job) error
	// Pop blocks until a job becomes available or wait elapses, in
	// which case ErrEmpty is returned. The implementation must
	// guarantee at-least-once semantics — Ack/Nack closes the loop.
	Pop(ctx context.Context, wait time.Duration) (Job, error)
	// Ack signals successful handling so the job is removed.
	Ack(ctx context.Context, jobID string) error
	// Nack returns the job to the queue (or to a delayed retry slot
	// when retryAfter > 0).
	Nack(ctx context.Context, jobID string, retryAfter time.Duration) error
	// Len returns the approximate number of pending jobs (driver-
	// specific; primarily for tests/metrics).
	Len() int
}

Queue is the transport interface implemented by drivers.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker pulls jobs off a Queue and dispatches them to registered handlers.

func NewWorker

func NewWorker(q Queue) *Worker

NewWorker returns a Worker bound to q. Defaults: 3 max attempts, 5s backoff, 1s poll interval. Tune via setters.

func (*Worker) Backoff

func (w *Worker) Backoff(d time.Duration) *Worker

Backoff sets the initial delay before retrying a failed job. Each retry doubles the delay.

func (*Worker) Logger

func (w *Worker) Logger(fn func(string, ...any)) *Worker

Logger installs a structured logger callback.

func (*Worker) MaxRetry

func (w *Worker) MaxRetry(n int) *Worker

MaxRetry sets how many times a failing job is retried before being dropped. The original attempt counts as 1.

func (*Worker) Poll

func (w *Worker) Poll(d time.Duration) *Worker

Poll sets how long Pop blocks waiting for the next job before looping. Lower values cut latency at the cost of CPU.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

Run blocks the calling goroutine, pulling jobs until ctx is cancelled or Stop is called.

func (*Worker) Stop

func (w *Worker) Stop()

Stop signals Run to exit. Blocks until Run returns.

Directories

Path Synopsis
Package sqlqueue implements a database-backed Queue driver that survives process restarts and works across replicas.
Package sqlqueue implements a database-backed Queue driver that survives process restarts and works across replicas.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL