Documentation ¶
Overview ¶
Package queue provides a job queue abstraction modelled on Laravel's Queue facade.
Three pieces:
- Job — a payload that describes the work to do, carried as a name + bytes (typed via Register/Marshal).
- Queue — the transport (push/pop/ack/nack). Memory driver ships in-box; DB/Redis live in sub-packages.
- Worker — long-running consumer that pulls Jobs off a Queue and dispatches them to registered Handler funcs.
Usage:
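// SendWelcomeEmail is the job payload; its type name doubles as the job name.
type SendWelcomeEmail struct{ UserID uint64 }
q := queue.NewMemoryQueue()
w := queue.NewWorker(q)
// Register a typed handler. mailer and buildEmail belong to the application, not to this package.
queue.Handle[SendWelcomeEmail](w, func(ctx context.Context, j SendWelcomeEmail) error {
	return mailer.Send(ctx, buildEmail(j.UserID))
})
go w.Run(ctx) // consume jobs in the background
_ = queue.Dispatch(ctx, q, SendWelcomeEmail{UserID: 42})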
Index ¶
- Variables
- func Dispatch[T any](ctx context.Context, q Queue, payload T) error
- func DispatchAfter[T any](ctx context.Context, q Queue, payload T, delay time.Duration) error
- func Handle[T any](w *Worker, fn func(ctx context.Context, payload T) error)
- type Handler
- type Job
- type MemoryQueue
- func (q *MemoryQueue) Ack(_ context.Context, id string) error
- func (q *MemoryQueue) Len() int
- func (q *MemoryQueue) Nack(_ context.Context, id string, retryAfter time.Duration) error
- func (q *MemoryQueue) Pop(ctx context.Context, wait time.Duration) (Job, error)
- func (q *MemoryQueue) Push(_ context.Context, j Job) error
- type Queue
- type Worker
Variables ¶
var ErrEmpty = errors.New("queue: empty")
ErrEmpty is returned by Queue.Pop when no job is available within the configured wait window.
Functions ¶
func Dispatch ¶
Dispatch JSON-encodes payload and pushes it onto q. The job name is the Go type name of payload (e.g. "SendWelcomeEmail"). The Worker looks up the handler by the same key.
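The naming and encoding step described above can be sketched with the standard reflect and encoding/json packages. This is an illustration under assumptions, not the package's implementation: encode is a hypothetical helper, and the real Dispatch may derive the name differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// SendWelcomeEmail mirrors the payload type from the usage example.
type SendWelcomeEmail struct{ UserID uint64 }

// encode is a hypothetical helper: it returns the job name and the JSON body
// that a Dispatch-like function could hand to Queue.Push.
func encode[T any](payload T) (name string, body []byte, err error) {
	// Name returns the type name without its package path. It is empty
	// for unnamed types such as pointers or slices.
	name = reflect.TypeOf(payload).Name()
	body, err = json.Marshal(payload)
	return name, body, err
}

func main() {
	name, body, err := encode(SendWelcomeEmail{UserID: 42})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Printf("%s %s\n", name, body) // prints: SendWelcomeEmail {"UserID":42}
}
```

Because the name carries no package path in this sketch, two payload types with the same name in different packages would map to the same handler key.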
func DispatchAfter ¶
DispatchAfter works like Dispatch, but the job does not become available for delivery until delay has elapsed.
Types ¶
type Handler ¶
Handler runs a typed job. The Worker decodes Job.Payload into T before invoking the handler.
type Job ¶
Job is the wire-format payload exchanged between producers and the queue transport. Name identifies the handler; Payload carries the JSON-encoded user-defined struct.
type MemoryQueue ¶
type MemoryQueue struct {
// contains filtered or unexported fields
}
MemoryQueue is an in-process FIFO queue with delayed delivery and at-least-once semantics. Suitable for single-process apps and tests.
func NewMemoryQueue ¶
func NewMemoryQueue() *MemoryQueue
NewMemoryQueue returns an empty in-memory Queue.
func (*MemoryQueue) Len ¶
func (q *MemoryQueue) Len() int
type Queue ¶
type Queue interface {
	// Push enqueues a job. Implementations should respect job.AvailableAt
	// (deferred delivery) if non-zero.
	Push(ctx context.Context, j Job) error
	// Pop blocks until a job becomes available or wait elapses, in
	// which case ErrEmpty is returned. The implementation must
	// guarantee at-least-once semantics; Ack/Nack closes the loop.
	Pop(ctx context.Context, wait time.Duration) (Job, error)
	// Ack signals successful handling so the job is removed.
	Ack(ctx context.Context, jobID string) error
	// Nack returns the job to the queue (or to a delayed retry slot
	// when retryAfter > 0).
	Nack(ctx context.Context, jobID string, retryAfter time.Duration) error
	// Len returns the approximate number of pending jobs (driver-
	// specific; primarily for tests/metrics).
	Len() int
}
Queue is the transport interface implemented by drivers.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker pulls jobs off a Queue and dispatches them to registered handlers.
func NewWorker ¶
NewWorker returns a Worker bound to q. Defaults: 3 max attempts, 5s backoff, 1s poll interval. Tune these with the MaxRetry, Backoff, and Poll setters.
func (*Worker) Backoff ¶
Backoff sets the initial delay before retrying a failed job. Each retry doubles the delay.
func (*Worker) MaxRetry ¶
MaxRetry sets the maximum number of attempts for a failing job before it is dropped. The original attempt counts as 1, so the default of 3 allows the first run plus two retries.
func (*Worker) Poll ¶
Poll sets how long Pop blocks waiting for the next job before looping. Lower values cut latency at the cost of CPU.