queue

v0.1.6 · Published: Mar 16, 2026 · License: MIT · Imports: 16 · Imported by: 1


queue is a queue and workflow library with pluggable backends and runtime extensions.


Installation

go get github.com/goforj/queue

Quick Start

package main

import (
	"context"
	"fmt"

	"github.com/goforj/queue"
)

func main() {
	q, _ := queue.NewWorkerpool(
		queue.WithWorkers(2), // optional; default: runtime.NumCPU() (min 1)
	)
	type EmailPayload struct {
		To string `json:"to"`
	}

	q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
		var payload EmailPayload
		_ = m.Bind(&payload)
		fmt.Println("send to", payload.To)
		return nil
	})

	_ = q.StartWorkers(context.Background())
	defer q.Shutdown(context.Background())

	_, _ = q.Dispatch(
		queue.NewJob("emails:send").
			Payload(EmailPayload{To: "user@example.com"}),
	)
}

Drivers

Driver / Backend Mode Notes Durable Async Delay Unique Backoff Timeout Native Stats Queue Admin
Null Drop-only Discards dispatched jobs; useful for disabled queue modes and smoke tests. - - - - - - - -
Sync Inline (caller) Deterministic local execution with no external infra. - - - - - -
Workerpool In-process pool Local async behavior without external broker/database. - - -
MySQL SQL durable queue MySQL driver module (driver/mysqlqueue) built on shared SQL queue core. -
Postgres SQL durable queue Postgres driver module (driver/postgresqueue) built on shared SQL queue core. -
SQLite SQL durable queue SQLite driver module (driver/sqlitequeue) built on shared SQL queue core. -
Redis Redis/Asynq Production Redis backend (Asynq semantics). -
NATS Broker target NATS transport with queue-subject routing. - - -
SQS Broker target AWS SQS transport with endpoint overrides for localstack/testing. - - -
RabbitMQ Broker target RabbitMQ transport and worker consumption. - - -

SQL-backed queues (sqlite, mysql, postgres) are durable and convenient, but they trade throughput for operational simplicity. They default to 1 worker, and increasing concurrency may require DB tuning (indexes, connection pool, lock contention). Prefer broker-backed drivers for higher-throughput workloads.

Queue Admin status: the cross-driver admin contract is defined in core (ListJobs, RetryJob, CancelJob, DeleteJob, ClearQueue, QueueHistory), but full queue admin operations are currently implemented only for Redis. Other drivers return ErrQueueAdminUnsupported for unsupported admin actions.

Driver constructor quick examples

Use root constructors for in-process backends, and driver-module constructors for external backends. See the Driver Constructors API section below for full constructor shapes (New(...) and NewWithConfig(...)). Driver backends live in separate packages so applications only import/link the optional backend dependencies they actually use (smaller builds, less dependency overhead, cleaner deploys).

package main

import (
	"github.com/goforj/queue"
	"github.com/goforj/queue/driver/mysqlqueue"
	"github.com/goforj/queue/driver/natsqueue"
	"github.com/goforj/queue/driver/postgresqueue"
	"github.com/goforj/queue/driver/rabbitmqqueue"
	"github.com/goforj/queue/driver/redisqueue"
	"github.com/goforj/queue/driver/sqlitequeue"
	"github.com/goforj/queue/driver/sqsqueue"
)

func main() {
	queue.NewSync()       // in-process sync
	queue.NewWorkerpool() // in-process worker pool
	queue.NewNull()       // drop-only / disabled mode

	sqlitequeue.New("file:queue.db?_busy_timeout=5000") // SQL durable queue (SQLite)
	mysqlqueue.New("user:pass@tcp(127.0.0.1:3306)/app") // SQL durable queue (MySQL)
	postgresqueue.New("postgres://user:pass@127.0.0.1:5432/app?sslmode=disable") // SQL durable queue (Postgres)

	redisqueue.New("127.0.0.1:6379") // Redis/Asynq
	natsqueue.New("nats://127.0.0.1:4222") // NATS
	sqsqueue.New("us-east-1") // SQS
	rabbitmqqueue.New("amqp://guest:guest@127.0.0.1:5672/") // RabbitMQ
}

Quick Start (Advanced: Workflows)

package main

import (
	"context"

	"github.com/goforj/queue"
)

type EmailPayload struct {
	ID int `json:"id"`
}

func main() {
	q, _ := queue.NewWorkerpool()

	q.Register("reports:generate", func(ctx context.Context, m queue.Message) error {
		return nil
	})
	q.Register("reports:upload", func(ctx context.Context, m queue.Message) error {
		var payload EmailPayload
		if err := m.Bind(&payload); err != nil {
			return err
		}
		return nil
	})
	q.Register("users:notify_report_ready", func(ctx context.Context, m queue.Message) error {
		return nil
	})

	_ = q.StartWorkers(context.Background())
	defer q.Shutdown(context.Background())

	chainID, _ := q.Chain(
		// 1) generate report data
		queue.NewJob("reports:generate").Payload(map[string]any{"report_id": "rpt_123"}),
		// 2) upload report artifact after generate succeeds
		queue.NewJob("reports:upload").Payload(EmailPayload{ID: 123}),
		// 3) notify user only after upload succeeds
		queue.NewJob("users:notify_report_ready").Payload(map[string]any{"user_id": 123}),
	).OnQueue("critical").Dispatch(context.Background())
	_ = chainID
}

Run as a Worker Service

Use Run(ctx) for long-lived workers: it starts processing, blocks until the context is canceled (for example, by a shutdown signal), and then performs graceful termination.

package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"

	"github.com/goforj/queue"
)

func main() {
	q, _ := queue.NewWorkerpool()

	// Register handlers before starting workers.
	q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
		return nil
	})

	// Create a context that is canceled on SIGINT/SIGTERM (Ctrl+C, container stop).
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Run starts workers, blocks until ctx is canceled, then gracefully shuts down.
	if err := q.Run(ctx); err != nil {
		log.Fatal(err)
	}
}

Core Concepts

Job: Typed work unit for app handlers.

_, _ = q.Dispatch(
	queue.NewJob("emails:send").Payload(EmailPayload{To: "user@example.com"}),
)

Chain: Ordered workflow (A then B then C).

_, _ = q.Chain(
	queue.NewJob("reports:generate"),
	queue.NewJob("reports:upload"),
	queue.NewJob("users:notify_report_ready"),
).Dispatch(context.Background())

Batch: Parallel workflow with callbacks.

_, _ = q.Batch(
	queue.NewJob("emails:send"),
	queue.NewJob("sms:send"),
).Then(queue.NewJob("notifications:done")).Dispatch(context.Background())

Middleware: Cross-cutting execution policy.

q, _ := queue.New(
	queue.Config{Driver: queue.DriverWorkerpool},
	queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)

Events: Lifecycle hooks and observability.

q, _ := queue.New(
	queue.Config{Driver: queue.DriverWorkerpool, Observer: queue.NewStatsCollector()},
)

Backends: Driver/runtime transport selection.

q, _ := queue.NewWorkerpool()
rq, _ := redisqueue.New("127.0.0.1:6379")
_, _ = q, rq

Job builder options

// Define a struct for your job payload.
type EmailPayload struct {
	ID int `json:"id"`
	To string `json:"to"`
}

// Fluent builder pattern for job options.
job := queue.NewJob("emails:send").
	// Payload can be bytes, structs, maps, or JSON-marshalable values.
	// Default payload is empty.
	Payload(EmailPayload{ID: 123, To: "user@example.com"}).
	// OnQueue sets the queue name.
	// Default is empty; broker-style drivers expect an explicit queue.
	OnQueue("default").
	// Timeout sets per-job execution timeout.
	// Default is unset; some drivers may apply driver/runtime defaults.
	Timeout(20 * time.Second).
	// Retry sets max retries.
	// Default is 0, which means one total attempt.
	Retry(3).
	// Backoff sets retry delay.
	// Default is unset; Redis dispatch returns ErrBackoffUnsupported.
	Backoff(500 * time.Millisecond).
	// Delay schedules first execution in the future.
	// Default is 0 (run immediately).
	Delay(2 * time.Second).
	// UniqueFor deduplicates Type+Payload for a TTL window.
	// Default is 0 (no dedupe).
	UniqueFor(45 * time.Second)

// Dispatch the job to the queue.
_, _ = q.Dispatch(job)

// In handlers, use Bind to decode payload into a struct.
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
	var payload EmailPayload
	if err := m.Bind(&payload); err != nil {
		return err
	}
	return nil
})

Benchmarks

Run local + integration-backed benchmarks (requires Docker/testcontainers):

cd docs && GOWORK=off INTEGRATION_BACKEND=all GOCACHE=/tmp/queue-gocache go test -tags=benchrender ./bench -run '^TestRenderBenchmarks$'

Latency (ns/op)

Queue benchmark latency chart

Throughput (ops/s)

Queue benchmark throughput chart

Allocated Bytes (B/op)

Queue benchmark bytes chart

Allocations (allocs/op)

Queue benchmark allocations chart

Tables

| Class    | Driver     |   ns/op |    ops/s |  B/op | allocs/op |
|----------|------------|--------:|---------:|------:|----------:|
| External | nats       |     774 |  1291823 |  1258 |        13 |
| External | redis      |   95295 |    10494 |  2113 |        33 |
| External | rabbitmq   |  165780 |     6032 |  1882 |        57 |
| External | sqlite     |  202380 |     4941 |  1931 |        47 |
| External | postgres   | 1056731 |      946 |  3809 |        78 |
| External | sqs        | 1873911 |      534 | 94784 |      1082 |
| External | mysql      | 2286406 |      437 |  3303 |        62 |
| Local    | null       |      37 | 26673780 |   128 |         1 |
| Local    | sync       |     282 |  3539823 |   408 |         6 |
| Local    | workerpool |     650 |  1538462 |   456 |         7 |

Middleware

Use queue.WithMiddleware(...) to apply cross-cutting behavior to job execution (logging, filtering, and error policy).

Common patterns:

  • wrap handler execution (before/after logging, timing, tracing)
  • skip jobs conditionally (maintenance mode, feature flags)
  • convert matched errors into terminal failures (no retry)

var errValidation = errors.New("validation failed")
maintenanceMode := false

audit := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
	log.Printf("start job=%s", m.JobType)
	err := next(ctx, m)
	log.Printf("done job=%s err=%v", m.JobType, err)
	return err
})

skipMaintenance := queue.SkipWhen{
	Predicate: func(context.Context, queue.Message) bool {
		return maintenanceMode
	},
}

fatalValidation := queue.FailOnError{
	When: func(err error) bool {
		return errors.Is(err, errValidation)
	},
}

q, _ := queue.New(
	queue.Config{Driver: queue.DriverWorkerpool},
	queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)
_ = q

Observability

Use queue.Observer implementations to capture normalized runtime events across drivers.

collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
    collector,
    queue.ObserverFunc(func(event queue.Event) {
        _ = event.Kind
    }),
)

q, _ := queue.New(queue.Config{
    Driver:   queue.DriverWorkerpool,
    Observer: observer,
})
_ = q

Distributed counters and source of truth

  • StatsCollector counters are process-local and event-driven.
  • In multi-process deployments, aggregate metrics externally (OTel/Prometheus/etc.).
  • Prefer backend-native stats when available.
  • queue.SupportsNativeStats(q) indicates native driver snapshot support.
  • queue.Snapshot(ctx, q, collector) merges native + collector where possible.
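The merge order can be sketched without the library. The NativeStats interface and Snapshot type below are stand-ins; in the library the capability check is queue.SupportsNativeStats(q) and the merge is queue.Snapshot(ctx, q, collector).

```go
package main

import "fmt"

// Snapshot is a stand-in for queue.StatsSnapshot.
type Snapshot struct{ Processed int }

// NativeStats is a stand-in for the optional capability that
// queue.SupportsNativeStats detects on a driver.
type NativeStats interface {
	NativeSnapshot() (Snapshot, bool)
}

type redisLike struct{}

func (redisLike) NativeSnapshot() (Snapshot, bool) { return Snapshot{Processed: 42}, true }

type localLike struct{}

// snapshot prefers a driver-native snapshot and falls back to the
// process-local collector counters, mirroring the documented merge order.
func snapshot(q any, collector Snapshot) Snapshot {
	if n, ok := q.(NativeStats); ok {
		if s, ok := n.NativeSnapshot(); ok {
			return s
		}
	}
	return collector
}

func main() {
	fmt.Println(snapshot(redisLike{}, Snapshot{Processed: 7}).Processed) // native snapshot wins
	fmt.Println(snapshot(localLike{}, Snapshot{Processed: 7}).Processed) // collector fallback
}
```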

Compose observers

events := make(chan queue.Event, 100)
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
    collector,
    queue.ChannelObserver{
        Events:     events,
        DropIfFull: true,
    },
    queue.ObserverFunc(func(e queue.Event) {
        _ = e
    }),
)

q, _ := queue.New(queue.Config{
    Driver:   queue.DriverWorkerpool,
    Observer: observer,
})
_ = q

Kitchen sink event logging (runtime + workflow)

Runnable example: examples/observeall/main.go

logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
runtimeObserver := queue.ObserverFunc(func(event queue.Event) {
	attemptInfo := fmt.Sprintf("attempt=%d/%d", event.Attempt, event.MaxRetry+1)
	jobInfo := fmt.Sprintf("job=%s key=%s queue=%s driver=%s", event.JobType, event.JobKey, event.Queue, event.Driver)

	switch event.Kind {
	case queue.EventEnqueueAccepted:
		logger.Info("Accepted dispatch", "msg", fmt.Sprintf("Accepted %s", jobInfo), "scheduled", event.Scheduled, "at", event.Time.Format(time.RFC3339Nano))
	case queue.EventEnqueueRejected:
		logger.Error("Dispatch failed", "msg", fmt.Sprintf("Rejected %s", jobInfo), "error", event.Err)
	case queue.EventEnqueueDuplicate:
		logger.Warn("Skipped duplicate job", "msg", fmt.Sprintf("Duplicate %s", jobInfo))
	case queue.EventEnqueueCanceled:
		logger.Warn("Canceled dispatch", "msg", fmt.Sprintf("Canceled %s", jobInfo), "error", event.Err)
	case queue.EventProcessStarted:
		logger.Info("Started processing job", "msg", fmt.Sprintf("Started %s (%s)", jobInfo, attemptInfo), "at", event.Time.Format(time.RFC3339Nano))
	case queue.EventProcessSucceeded:
		logger.Info("Processed job", "msg", fmt.Sprintf("Processed %s in %s (%s)", jobInfo, event.Duration, attemptInfo))
	case queue.EventProcessFailed:
		logger.Error("Processing failed", "msg", fmt.Sprintf("Failed %s after %s (%s)", jobInfo, event.Duration, attemptInfo), "error", event.Err)
	case queue.EventProcessRetried:
		logger.Warn("Retrying job", "msg", fmt.Sprintf("Retry scheduled for %s (%s)", jobInfo, attemptInfo), "error", event.Err)
	case queue.EventProcessArchived:
		logger.Error("Archived failed job", "msg", fmt.Sprintf("Archived %s after final failure (%s)", jobInfo, attemptInfo), "error", event.Err)
	case queue.EventQueuePaused:
		logger.Info("Paused queue", "msg", fmt.Sprintf("Paused queue=%s driver=%s", event.Queue, event.Driver))
	case queue.EventQueueResumed:
		logger.Info("Resumed queue", "msg", fmt.Sprintf("Resumed queue=%s driver=%s", event.Queue, event.Driver))
	default:
		logger.Info("Queue event", "msg", fmt.Sprintf("kind=%s %s", event.Kind, jobInfo))
	}
})
workflowObserver := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
	logger.Info("workflow event",
		"kind", event.Kind,
		"dispatch_id", event.DispatchID,
		"job_id", event.JobID,
		"chain_id", event.ChainID,
		"batch_id", event.BatchID,
		"job_type", event.JobType,
		"queue", event.Queue,
		"attempt", event.Attempt,
		"duration", event.Duration,
		"err", event.Err,
	)
})

q, _ := queue.New(
	queue.Config{
		Driver:   queue.DriverSync,
		Observer: runtimeObserver,
	},
	queue.WithObserver(workflowObserver),
)
_ = q

Events reference

| Type     | EventKind           | Meaning |
|----------|---------------------|---------|
| queue    | enqueue_accepted    | Job accepted by driver for enqueue. |
| queue    | enqueue_rejected    | Job enqueue failed. |
| queue    | enqueue_duplicate   | Duplicate job rejected due to uniqueness key. |
| queue    | enqueue_canceled    | Context cancellation prevented enqueue. |
| queue    | process_started     | Worker began processing job. |
| queue    | process_succeeded   | Handler returned success. |
| queue    | process_failed      | Handler returned error. |
| queue    | process_retried     | Driver scheduled retry attempt. |
| queue    | process_archived    | Job moved to terminal failure state. |
| queue    | queue_paused        | Queue was paused (driver supports pause). |
| queue    | queue_resumed       | Queue was resumed. |
| workflow | dispatch_started    | Workflow runtime accepted a dispatch request and created a dispatch record. |
| workflow | dispatch_succeeded  | Dispatch was successfully enqueued to the underlying queue runtime. |
| workflow | dispatch_failed     | Dispatch failed before job execution could start. |
| workflow | job_started         | A workflow job handler started execution. |
| workflow | job_succeeded       | A workflow job handler completed successfully. |
| workflow | job_failed          | A workflow job handler returned an error. |
| workflow | chain_started       | A chain workflow was created and started. |
| workflow | chain_advanced      | Chain progressed from one node to the next node. |
| workflow | chain_completed     | Chain reached terminal success. |
| workflow | chain_failed        | Chain reached terminal failure. |
| workflow | batch_started       | A batch workflow was created and started. |
| workflow | batch_progressed    | Batch state changed as jobs completed/failed. |
| workflow | batch_completed     | Batch reached terminal success (or allowed-failure completion). |
| workflow | batch_failed        | Batch reached terminal failure. |
| workflow | batch_cancelled     | Batch was cancelled before normal completion. |
| workflow | callback_started    | Chain/batch callback execution started. |
| workflow | callback_succeeded  | Chain/batch callback completed successfully. |
| workflow | callback_failed     | Chain/batch callback returned an error. |

Examples

Runnable examples live in the separate examples module (./examples). They are not included when applications import github.com/goforj/queue, which keeps dependency graphs and build/link overhead smaller.

Admin Support

Queue admin APIs are part of the core contract so additional drivers can implement them over time. At this time, full admin operations (ListJobs, RetryJob, CancelJob, DeleteJob, ClearQueue) are Redis-only. Use queue.SupportsQueueAdmin(q) (or handle queue.ErrQueueAdminUnsupported) to gate admin workflows per runtime.

API reference

The API section below is autogenerated; do not edit between the markers.

API Index

Admin: CancelJob, Queue.CancelJob, ClearQueue, Queue.ClearQueue, DeleteJob, Queue.DeleteJob, History, ListJobs, Queue.ListJobs, Normalize, QueueHistory, RetryJob, Queue.RetryJob, SinglePointHistory, SupportsQueueAdmin, TimelineHistoryFromSnapshot

Constructors: New, NewNull, NewStatsCollector, NewSync, NewWorkerpool

Job: Backoff, Bind, Delay, NewJob, OnQueue, Payload, PayloadBytes, PayloadJSON, Retry, Timeout, UniqueFor

Observability: Active, Archived, Failed, MultiObserver, ChannelObserver.Observe, Observer.Observe, ObserverFunc.Observe, StatsCollector.Observe, Pause, Paused, Pending, Processed, Queue, Queues, Ready, Resume, RetryCount, SafeObserve, Scheduled, Snapshot, StatsCollector.Snapshot, SupportsNativeStats, SupportsPause, Throughput

Queue: Batch, Chain, Dispatch, DispatchCtx, Driver, FindBatch, FindChain, Pause, Prune, Ready, Register, Resume, Run, Shutdown, StartWorkers, Stats, WithClock, WithMiddleware, WithObserver, WithStore, WithWorkers, Queue.WithWorkers

Driver Constructors: mysqlqueue.New, mysqlqueue.NewWithConfig, natsqueue.New, natsqueue.NewWithConfig, postgresqueue.New, postgresqueue.NewWithConfig, rabbitmqqueue.New, rabbitmqqueue.NewWithConfig, redisqueue.New, redisqueue.NewWithConfig, sqlitequeue.New, sqlitequeue.NewWithConfig, sqsqueue.New, sqsqueue.NewWithConfig

Testing: AssertBatchCount, AssertBatched, AssertChained, AssertCount, AssertDispatched, AssertDispatchedOn, AssertDispatchedTimes, AssertNotDispatched, AssertNothingBatched, AssertNothingDispatched, AssertNothingWorkflowDispatched, AssertWorkflowDispatched, AssertWorkflowDispatchedOn, AssertWorkflowDispatchedTimes, AssertWorkflowNotDispatched, Count, CountJob, CountOn, New, Queue, Records, Reset, Workflow

API

Admin
CancelJob

CancelJob cancels a job when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
err = queue.CancelJob(context.Background(), q, "job-id")
_ = err
Queue.CancelJob

CancelJob cancels a job via queue admin capability when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
if !queue.SupportsQueueAdmin(q) {
	return
}
err = q.CancelJob(context.Background(), "job-id")
_ = err
ClearQueue

ClearQueue clears queue jobs when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
err = queue.ClearQueue(context.Background(), q, "default")
_ = err
Queue.ClearQueue

ClearQueue clears queue jobs via queue admin capability when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
if !queue.SupportsQueueAdmin(q) {
	return
}
err = q.ClearQueue(context.Background(), "default")
_ = err
DeleteJob

DeleteJob deletes a job when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
err = queue.DeleteJob(context.Background(), q, "default", "job-id")
_ = err
Queue.DeleteJob

DeleteJob deletes a job via queue admin capability when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
if !queue.SupportsQueueAdmin(q) {
	return
}
err = q.DeleteJob(context.Background(), "default", "job-id")
_ = err
History

History returns queue history points via queue admin capability when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
points, err := q.History(context.Background(), "default", queue.QueueHistoryHour)
_, _ = points, err
ListJobs

ListJobs lists jobs for a queue and state when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
_, err = queue.ListJobs(context.Background(), q, queue.ListJobsOptions{
	Queue: "default",
	State: queue.JobStatePending,
})
_ = err
Queue.ListJobs

ListJobs lists jobs via queue admin capability when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
_, err = q.ListJobs(context.Background(), queue.ListJobsOptions{
	Queue: "default",
	State: queue.JobStatePending,
})
_ = err
Normalize

Normalize returns a safe options payload with defaults applied.

opts := queue.ListJobsOptions{Queue: "", State: "", Page: 0, PageSize: 1000}
normalized := opts.Normalize()
fmt.Println(normalized.Queue, normalized.State, normalized.Page, normalized.PageSize)
// Output: default pending 1 500
QueueHistory

QueueHistory returns queue history points when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
_, err = queue.QueueHistory(context.Background(), q, "default", queue.QueueHistoryHour)
_ = err
RetryJob

RetryJob retries (runs now) a job when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
err = queue.RetryJob(context.Background(), q, "default", "job-id")
_ = err
Queue.RetryJob

RetryJob retries (runs now) a job via queue admin capability when supported.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
if !queue.SupportsQueueAdmin(q) {
	return
}
err = q.RetryJob(context.Background(), "default", "job-id")
_ = err
SinglePointHistory

SinglePointHistory converts a snapshot into a single current-history point. This helper is intended for driver modules that do not expose historical buckets.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Processed: 12, Failed: 1},
	},
}
points := queue.SinglePointHistory(snapshot, "default")
fmt.Println(len(points), points[0].Processed, points[0].Failed)
// Output: 1 12 1
SupportsQueueAdmin

SupportsQueueAdmin reports whether queue admin operations are available.

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
fmt.Println(queue.SupportsQueueAdmin(q))
// Output: true
TimelineHistoryFromSnapshot

TimelineHistoryFromSnapshot records queue counters and returns windowed points. This is intended for drivers that don't expose native multi-point history.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Processed: 5, Failed: 1},
	},
}
points := queue.TimelineHistoryFromSnapshot(snapshot, "default", queue.QueueHistoryHour)
fmt.Println(len(points) >= 1)
// Output: true
Constructors
queue.New

New creates the high-level Queue API based on Config.Driver.

q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
	var payload EmailPayload
	if err := m.Bind(&payload); err != nil {
		return err
	}
	_ = payload
	return nil
})
_ = q.WithWorkers(1).StartWorkers(context.Background()) // optional; default: runtime.NumCPU() (min 1)
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
	queue.NewJob("emails:send").
		Payload(EmailPayload{ID: 1}).
		OnQueue("default"),
)
NewNull

NewNull creates a Queue on the null backend.

q, err := queue.NewNull()
if err != nil {
	return
}
NewStatsCollector

NewStatsCollector creates an event collector for queue counters.

collector := queue.NewStatsCollector()
NewSync

NewSync creates a Queue on the synchronous in-process backend.

q, err := queue.NewSync()
if err != nil {
	return
}
NewWorkerpool

NewWorkerpool creates a Queue on the in-process workerpool backend.

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
Job
Backoff

Backoff sets delay between retries.

job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)
Bind

Bind unmarshals job payload JSON into dst.

type EmailPayload struct {
	ID int    `json:"id"`
	To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
	ID: 1,
	To: "user@example.com",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
	return
}
_ = payload.To
Delay

Delay defers execution by duration.

job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)
NewJob

NewJob creates a job value with a required job type.

job := queue.NewJob("emails:send")
OnQueue

OnQueue sets the target queue name.

job := queue.NewJob("emails:send").OnQueue("critical")
Payload

Payload sets job payload from common value types.

Example: payload bytes

jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))

Example: payload struct

type Meta struct {
	Nested bool `json:"nested"`
}
type EmailPayload struct {
	ID   int    `json:"id"`
	To   string `json:"to"`
	Meta Meta   `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
	ID:   1,
	To:   "user@example.com",
	Meta: Meta{Nested: true},
})

Example: payload map

jobMap := queue.NewJob("emails:send").Payload(map[string]any{
	"id":  1,
	"to":  "user@example.com",
	"meta": map[string]any{"nested": true},
})
PayloadBytes

PayloadBytes returns a copy of job payload bytes.

job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()
PayloadJSON

PayloadJSON marshals payload as JSON.

job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})
Retry

Retry sets max retry attempts.

job := queue.NewJob("emails:send").Retry(4)
Timeout

Timeout sets per-job execution timeout.

job := queue.NewJob("emails:send").Timeout(10 * time.Second)
UniqueFor

UniqueFor enables uniqueness dedupe within the given TTL.

job := queue.NewJob("emails:send").UniqueFor(45 * time.Second)
Observability
Active

Active returns active count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Active: 2},
	},
}
fmt.Println(snapshot.Active("default"))
// Output: 2
Archived

Archived returns archived count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Archived: 7},
	},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7
Failed

Failed returns failed count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Failed: 2},
	},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2
MultiObserver

MultiObserver fans out events to multiple observers.

events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
	queue.ChannelObserver{Events: events},
	queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1
ChannelObserver.Observe

Observe forwards an event to the configured channel.

ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-ch
_ = event
Observer.Observe

Observe handles a queue runtime event.

var observer queue.Observer
observer.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
})
ObserverFunc.Observe

Observe calls the wrapped function.

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
	logger.Info("queue event",
		"kind", event.Kind,
		"driver", event.Driver,
		"queue", event.Queue,
		"job_type", event.JobType,
		"attempt", event.Attempt,
		"max_retry", event.MaxRetry,
		"duration", event.Duration,
		"err", event.Err,
	)
})
observer.Observe(queue.Event{
	Kind:     queue.EventProcessSucceeded,
	Driver:   queue.DriverSync,
	Queue:    "default",
	JobType: "emails:send",
})
StatsCollector.Observe

Observe records an event and updates normalized counters.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
Pause

Pause pauses queue consumption for drivers that support it.

q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1
Paused

Paused returns paused count for a queue.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventQueuePaused,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1
Pending

Pending returns pending count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Pending: 3},
	},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3
Processed

Processed returns processed count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Processed: 11},
	},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11
StatsSnapshot.Queue

Queue returns queue counters for a queue name.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1
Queues

Queues returns sorted queue names present in the snapshot.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "critical",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 critical
Ready

Ready validates backend readiness for the provided queue runtime.

q, _ := queue.NewSync()
fmt.Println(queue.Ready(context.Background(), q) == nil)
// Output: true
Resume

Resume resumes queue consumption for drivers that support it.

q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
_ = queue.Resume(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0
RetryCount

RetryCount returns retry count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Retry: 1},
	},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1
SafeObserve

SafeObserve delivers an event to an observer and recovers observer panics.

This is an advanced helper intended for driver-module implementations.
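A minimal sketch of the recover-wrapping pattern SafeObserve describes. The Event, Observer, and ObserverFunc types below are self-contained stand-ins for the queue package's types, and safeObserve is a hypothetical local helper, not the library function itself.

```go
package main

import "fmt"

// Event and Observer are stand-ins for queue.Event and queue.Observer.
type Event struct{ Kind string }

type Observer interface{ Observe(Event) }

type ObserverFunc func(Event)

func (f ObserverFunc) Observe(e Event) { f(e) }

// safeObserve delivers the event and swallows observer panics, so a
// misbehaving observer cannot crash the worker loop that emits events.
func safeObserve(o Observer, e Event) {
	defer func() { _ = recover() }()
	o.Observe(e)
}

func main() {
	panicky := ObserverFunc(func(Event) { panic("boom") })
	safeObserve(panicky, Event{Kind: "process_started"})
	fmt.Println("worker loop survived")
}
```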

Scheduled

Scheduled returns scheduled count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Scheduled: 4},
	},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4
Snapshot

Snapshot returns driver-native stats, falling back to collector data.

q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: true
StatsCollector.Snapshot

Snapshot returns a copy of collected counters.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
collector.Observe(queue.Event{
	Kind:   queue.EventProcessStarted,
	Driver: queue.DriverSync,
	Queue:  "default",
	JobKey: "job-1",
	Time:   time.Now(),
})
collector.Observe(queue.Event{
	Kind:     queue.EventProcessSucceeded,
	Driver:   queue.DriverSync,
	Queue:    "default",
	JobKey:  "job-1",
	Duration: 12 * time.Millisecond,
	Time:     time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}
SupportsNativeStats

SupportsNativeStats reports whether a queue runtime exposes native stats snapshots.

q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: true
SupportsPause

SupportsPause reports whether a queue runtime supports Pause/Resume.

q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: true
Throughput

Throughput returns rolling throughput windows for a queue name.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventProcessSucceeded,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}
Queue
Batch

Batch creates a batch builder for fan-out workflow execution.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Batch(
	queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
	queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())
Chain

Chain creates a chain builder for sequential workflow execution.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
q.Register("second", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Chain(
	queue.NewJob("first"),
	queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())
Queue.Dispatch

Dispatch enqueues a high-level job using context.Background.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(job)
Queue.DispatchCtx

DispatchCtx enqueues a high-level job using the provided context.

Queue.Driver

Driver reports the configured backend driver for the underlying queue runtime.

q, err := queue.NewSync()
if err != nil {
	return
}
fmt.Println(q.Driver())
// Output: sync
FindBatch

FindBatch returns the current batch state by ID.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
	return
}
_, _ = q.FindBatch(context.Background(), batchID)
FindChain

FindChain returns the current chain state by ID.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
	return
}
_, _ = q.FindChain(context.Background(), chainID)
Queue.Pause

Pause pauses consumption for a queue when supported by the underlying driver. See the README "Queue Backends" table for Pause/Resume support and docs/backend-guarantees.md (Capability Matrix) for broader backend differences.

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsPause(q) {
	_ = q.Pause(context.Background(), "default")
}
Prune

Prune deletes old workflow state records.

q, err := queue.NewSync()
if err != nil {
	return
}
_ = q.Prune(context.Background(), time.Now().Add(-24*time.Hour))
Queue.Ready

Ready validates queue backend readiness for dispatch/worker operation.

q, err := queue.NewSync()
if err != nil {
	return
}
fmt.Println(q.Ready(context.Background()) == nil)
// Output: true
Queue.Register

Register binds a handler for a high-level job type.

q, err := queue.NewSync()
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
	var payload EmailPayload
	if err := m.Bind(&payload); err != nil {
		return err
	}
	_ = payload
	return nil
})
Queue.Resume

Resume resumes consumption for a queue when supported by the underlying driver.

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsPause(q) {
	_ = q.Resume(context.Background(), "default")
}
Run

Run starts worker processing, blocks until ctx is canceled, then gracefully shuts down.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := queue.NewWorkerpool()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
go func() {
	time.Sleep(100 * time.Millisecond)
	cancel()
}()
_ = q.Run(ctx)
Queue.Shutdown

Shutdown drains workers and closes underlying resources.

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
_ = q.StartWorkers(context.Background())
_ = q.Shutdown(context.Background())
Queue.StartWorkers

StartWorkers starts worker processing.

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
_ = q.StartWorkers(context.Background())
Stats

Stats returns a normalized snapshot when supported by the underlying driver.

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsNativeStats(q) {
	_, _ = q.Stats(context.Background())
}
WithClock

WithClock overrides the workflow runtime clock.

q, err := queue.New(
	queue.Config{Driver: queue.DriverSync},
	queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
	return
}
WithMiddleware

WithMiddleware appends queue workflow middleware.

mw := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
	return next(ctx, m)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
	return
}
WithObserver

WithObserver installs a workflow lifecycle observer.

observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
	_ = event.Kind
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
	return
}
WithStore

WithStore overrides the workflow orchestration store.

var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
	return
}
WithWorkers

WithWorkers sets desired worker concurrency before StartWorkers. It applies to high-level queue constructors (for example NewWorkerpool/New/NewSync).

q, err := queue.NewWorkerpool(
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}
Queue.WithWorkers

WithWorkers sets desired worker concurrency before StartWorkers.

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
q.WithWorkers(4) // optional; default: runtime.NumCPU() (min 1)
Testing

Examples in this section assume they are used inside tests and t is a *testing.T (or testing.TB).
FakeQueue.AssertCount

AssertCount fails when the dispatch count does not match the expected value.

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertCount(t, 1)
FakeQueue.AssertDispatched

AssertDispatched fails when jobType was not dispatched.

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatched(t, "emails:send")
FakeQueue.AssertDispatchedOn

AssertDispatchedOn fails when jobType was not dispatched on queueName.

fake := queue.NewFake()
_ = fake.Dispatch(
	queue.NewJob("emails:send").
		OnQueue("critical"),
)
fake.AssertDispatchedOn(t, "critical", "emails:send")
FakeQueue.AssertDispatchedTimes

AssertDispatchedTimes fails when the dispatch count for jobType does not match the expected count.

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatchedTimes(t, "emails:send", 2)
FakeQueue.AssertNotDispatched

AssertNotDispatched fails when jobType was dispatched.

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertNotDispatched(t, "emails:cancel")
FakeQueue.AssertNothingDispatched

AssertNothingDispatched fails when any dispatch was recorded.

fake := queue.NewFake()
fake.AssertNothingDispatched(t)
FakeQueue.Dispatch

Dispatch records a typed job payload in-memory using the fake default queue.

fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
FakeQueue.DispatchCtx

DispatchCtx submits a typed job payload using the provided context.

fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: true
FakeQueue.Driver

Driver returns the active queue driver.

fake := queue.NewFake()
driver := fake.Driver()
NewFake

NewFake creates a queue fake that records dispatches and provides assertions.

fake := queue.NewFake()
_ = fake.Dispatch(
	queue.NewJob("emails:send").
		Payload(map[string]any{"id": 1}).
		OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:send
FakeQueue.Ready

Ready validates fake queue readiness.

fake := queue.NewFake()
fmt.Println(fake.Ready(context.Background()) == nil)
// Output: true
FakeQueue.Records

Records returns a copy of all dispatch records.

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:send
FakeQueue.Register

Register associates a handler with a job type.

fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })
FakeQueue.Reset

Reset clears all recorded dispatches.

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0
FakeQueue.Shutdown

Shutdown drains running work and releases resources.

fake := queue.NewFake()
err := fake.Shutdown(context.Background())
FakeQueue.StartWorkers

StartWorkers starts worker execution.

fake := queue.NewFake()
err := fake.StartWorkers(context.Background())
FakeQueue.Workers

Workers sets desired worker concurrency before StartWorkers.

fake := queue.NewFake()
q := fake.Workers(4)
fmt.Println(q != nil)
// Output: true

Driver Constructors

mysqlqueue

mysqlqueue.New

New creates a high-level Queue using the MySQL SQL backend.

q, err := mysqlqueue.New(
	"user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true",
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}
mysqlqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit MySQL SQL driver config.

q, err := mysqlqueue.NewWithConfig(
	mysqlqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		DB: nil, // optional; provide *sql.DB instead of DSN
		DSN: "user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true", // optional if DB is set
		ProcessingRecoveryGrace:  2 * time.Second, // default if <=0: 2s
		ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
	},
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

natsqueue

natsqueue.New

New creates a high-level Queue using the NATS backend.

q, err := natsqueue.New(
	"nats://127.0.0.1:4222",
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}
natsqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit NATS driver config.

q, err := natsqueue.NewWithConfig(
	natsqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		URL: "nats://127.0.0.1:4222", // required
	},
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

postgresqueue

postgresqueue.New

New creates a high-level Queue using the Postgres SQL backend.

q, err := postgresqueue.New(
	"postgres://user:pass@127.0.0.1:5432/queue?sslmode=disable",
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}
postgresqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit Postgres SQL driver config.

q, err := postgresqueue.NewWithConfig(
	postgresqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		DB: nil, // optional; provide *sql.DB instead of DSN
		DSN: "postgres://user:pass@127.0.0.1:5432/queue?sslmode=disable", // optional if DB is set
		ProcessingRecoveryGrace:  2 * time.Second, // default if <=0: 2s
		ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
	},
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

rabbitmqqueue

rabbitmqqueue.New

New creates a high-level Queue using the RabbitMQ backend.

q, err := rabbitmqqueue.New(
	"amqp://guest:guest@127.0.0.1:5672/",
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}
rabbitmqqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit RabbitMQ driver config.

q, err := rabbitmqqueue.NewWithConfig(
	rabbitmqqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		URL: "amqp://guest:guest@127.0.0.1:5672/", // required
	},
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

redisqueue

redisqueue.New

New creates a high-level Queue using the Redis backend.

q, err := redisqueue.New(
	"127.0.0.1:6379",
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}
redisqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit Redis driver config.

q, err := redisqueue.NewWithConfig(
	redisqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		Addr: "127.0.0.1:6379", // required
		Password: "",           // optional; default empty
		DB: 0,                  // optional; default 0
		ServerLogger: nil,      // optional; default backend logger
		ServerLogLevel: redisqueue.ServerLogLevelDefault, // optional
	},
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

sqlitequeue

sqlitequeue.New

New creates a high-level Queue using the SQLite SQL backend.

q, err := sqlitequeue.New(
	"file:queue.db?_busy_timeout=5000",
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}
sqlitequeue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit SQLite SQL driver config.

q, err := sqlitequeue.NewWithConfig(
	sqlitequeue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		DB: nil, // optional; provide *sql.DB instead of DSN
		DSN: "file:queue.db?_busy_timeout=5000", // optional if DB is set
		ProcessingRecoveryGrace:  2 * time.Second, // default if <=0: 2s
		ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
	},
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

sqsqueue

sqsqueue.New

New creates a high-level Queue using the SQS backend.

q, err := sqsqueue.New(
	"us-east-1",
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}
sqsqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit SQS driver config.

q, err := sqsqueue.NewWithConfig(
	sqsqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		Region: "us-east-1", // default if empty: "us-east-1"
		Endpoint: "",        // optional; set for LocalStack/custom endpoint
		AccessKey: "",       // optional; static credentials
		SecretKey: "",       // optional; static credentials
	},
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

Testing API

Examples in this section assume they are used inside tests and t is a *testing.T (or testing.TB).

Fake.AssertBatchCount

AssertBatchCount fails if total recorded workflow batch count does not match n.

f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertBatchCount(t, 1)
Fake.AssertBatched

AssertBatched fails unless at least one recorded workflow batch matches predicate.

f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
f.AssertBatched(t, func(spec bus.BatchSpec) bool { return len(spec.JobTypes) == 2 })
Fake.AssertChained

AssertChained fails if no recorded workflow chain matches expected job type order.

f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
f.AssertChained(t, []string{"a", "b"})
Fake.AssertCount

AssertCount fails when the total dispatch count does not match the expected value.

f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
f.AssertCount(t, 2)
Fake.AssertDispatched

AssertDispatched fails when jobType was not dispatched.

f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
f.AssertDispatched(t, "emails:send")
Fake.AssertDispatchedOn

AssertDispatchedOn fails when jobType was not dispatched on queueName.

f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
f.AssertDispatchedOn(t, "critical", "emails:send")
Fake.AssertDispatchedTimes

AssertDispatchedTimes fails when the dispatch count for jobType does not match the expected count.

f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
f.AssertDispatchedTimes(t, "emails:send", 2)
Fake.AssertNotDispatched

AssertNotDispatched fails when jobType was dispatched.

f := queuefake.New()
f.AssertNotDispatched(t, "emails:send")
Fake.AssertNothingBatched

AssertNothingBatched fails if any workflow batch was recorded.

f := queuefake.New()
f.AssertNothingBatched(t)
Fake.AssertNothingDispatched

AssertNothingDispatched fails when any dispatch was recorded.

f := queuefake.New()
f.AssertNothingDispatched(t)
Fake.AssertNothingWorkflowDispatched

AssertNothingWorkflowDispatched fails when any workflow dispatch was recorded.

f := queuefake.New()
f.AssertNothingWorkflowDispatched(t)
Fake.AssertWorkflowDispatched

AssertWorkflowDispatched fails when jobType was not workflow-dispatched.

f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertWorkflowDispatched(t, "a")
Fake.AssertWorkflowDispatchedOn

AssertWorkflowDispatchedOn fails when jobType was not workflow-dispatched on queueName.

f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).OnQueue("critical").Dispatch(context.Background())
f.AssertWorkflowDispatchedOn(t, "critical", "a")
Fake.AssertWorkflowDispatchedTimes

AssertWorkflowDispatchedTimes fails when workflow dispatch count for jobType does not match expected.

f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertWorkflowDispatchedTimes(t, "a", 2)
Fake.AssertWorkflowNotDispatched

AssertWorkflowNotDispatched fails when jobType was workflow-dispatched.

f := queuefake.New()
f.AssertWorkflowNotDispatched(t, "emails:send")
Fake.Count

Count returns the total number of recorded dispatches.

f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
_ = f.Count()
Fake.CountJob

CountJob returns how many times a job type was dispatched.

f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = f.CountJob("emails:send")
Fake.CountOn

CountOn returns how many times a job type was dispatched on a queue.

f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
_ = f.CountOn("critical", "emails:send")
queuefake.New

New creates a fake queue harness backed by queue.NewFake().

f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
f.AssertDispatched(t, "emails:send")
f.AssertCount(t, 1)
Fake.Queue

Queue returns the queue fake to inject into code under test.

f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
Fake.Records

Records returns a copy of recorded dispatches.

f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
records := f.Records()
Fake.Reset

Reset clears recorded dispatches.

f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
f.Reset()
f.AssertNothingDispatched(t)
Fake.Workflow

Workflow returns the workflow/orchestration fake for chain/batch assertions.

f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(
	bus.NewJob("a", nil),
	bus.NewJob("b", nil),
).Dispatch(context.Background())
f.AssertChained(t, []string{"a", "b"})

Contributing

Testing

Unit tests (root module):

go test ./...

Integration tests (separate integration module):

go test -tags=integration ./integration/...

Select specific backends with INTEGRATION_BACKEND (comma-separated), for example:

INTEGRATION_BACKEND=sqlite go test -tags=integration ./integration/...
INTEGRATION_BACKEND=redis,rabbitmq go test -tags=integration ./integration/... -count=1
INTEGRATION_BACKEND=all go test -tags=integration ./integration/... -count=1

Matrix status and backend integration notes are tracked in docs/integration-scenarios.md.

Documentation

Index

Constants

This section is empty.

Variables

var ErrBackoffUnsupported = errors.New("backoff option is not supported by this driver")

ErrBackoffUnsupported indicates requested backoff is unsupported by a driver.

var ErrDuplicate = errors.New("duplicate job")

ErrDuplicate indicates a duplicate unique job enqueue.

var ErrPauseUnsupported = errors.New("pause/resume is not supported by this driver")

ErrPauseUnsupported indicates queue pause/resume is unsupported by a driver.

var ErrQueueAdminUnsupported = errors.New("queue admin is not supported by this driver")

ErrQueueAdminUnsupported indicates queue admin operations are unsupported by a driver. @group Admin

var ErrQueuePaused = errors.New("queue is paused")

ErrQueuePaused indicates enqueue was rejected because queue is paused.

var ErrQueuerShuttingDown = errors.New("queue is shutting down")

ErrQueuerShuttingDown indicates enqueue was rejected during shutdown.

var ErrWorkerpoolQueueNotInitialized = errors.New("workerpool queue not initialized")

ErrWorkerpoolQueueNotInitialized indicates workerpool queue is unavailable.

var ErrWorkflowNotFound = bus.ErrNotFound

ErrWorkflowNotFound indicates a workflow state record is not present. @group Queue

Functions

func CancelJob

func CancelJob(ctx context.Context, q any, jobID string) error

CancelJob cancels a job when supported. @group Admin

Example: cancel a job via helper

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
err = queue.CancelJob(context.Background(), q, "job-id")
_ = err

func ClearQueue

func ClearQueue(ctx context.Context, q any, queueName string) error

ClearQueue clears queue jobs when supported. @group Admin

Example: clear queue via helper

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
err = queue.ClearQueue(context.Background(), q, "default")
_ = err

func DeleteJob

func DeleteJob(ctx context.Context, q any, queueName, jobID string) error

DeleteJob deletes a job when supported. @group Admin

Example: delete a job via helper

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
err = queue.DeleteJob(context.Background(), q, "default", "job-id")
_ = err

func Pause

func Pause(ctx context.Context, q any, queueName string) error

Pause pauses queue consumption for drivers that support it. @group Observability

Example: pause queue

q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1

func Ready

func Ready(ctx context.Context, q any) error

Ready validates backend readiness for the provided queue runtime. @group Observability

Example: ready via package helper

q, _ := queue.NewSync()
fmt.Println(queue.Ready(context.Background(), q) == nil)
// Output: true

func Resume

func Resume(ctx context.Context, q any, queueName string) error

Resume resumes queue consumption for drivers that support it. @group Observability

Example: resume queue

q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
_ = queue.Resume(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0

func RetryJob

func RetryJob(ctx context.Context, q any, queueName, jobID string) error

RetryJob retries (runs now) a job when supported. @group Admin

Example: retry a job via helper

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
err = queue.RetryJob(context.Background(), q, "default", "job-id")
_ = err

func SafeObserve

func SafeObserve(observer Observer, event Event)

SafeObserve delivers an event to an observer and recovers observer panics.

This is an advanced helper intended for driver-module implementations. @group Observability

func SupportsNativeStats

func SupportsNativeStats(q any) bool

SupportsNativeStats reports whether a queue runtime exposes native stats snapshots. @group Observability

Example: check native stats support

q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: true

func SupportsPause

func SupportsPause(q any) bool

SupportsPause reports whether a queue runtime supports Pause/Resume. @group Observability

Example: check pause support

q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: true

func SupportsQueueAdmin

func SupportsQueueAdmin(q any) bool

SupportsQueueAdmin reports whether queue admin operations are available. @group Admin

Example: detect admin support

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
fmt.Println(queue.SupportsQueueAdmin(q))
// Output: true

func ValidateDriverJob

func ValidateDriverJob(job Job) error

ValidateDriverJob validates a job value for backend dispatch.

This is an advanced helper intended for driver-module implementations. @group Driver Integration

Types

type BatchBuilder

type BatchBuilder interface {
	Name(name string) BatchBuilder
	OnQueue(queue string) BatchBuilder
	AllowFailures() BatchBuilder
	Progress(fn func(ctx context.Context, st BatchState) error) BatchBuilder
	Then(fn func(ctx context.Context, st BatchState) error) BatchBuilder
	Catch(fn func(ctx context.Context, st BatchState, err error) error) BatchBuilder
	Finally(fn func(ctx context.Context, st BatchState) error) BatchBuilder
	Dispatch(ctx context.Context) (string, error)
}

BatchBuilder is the high-level batch workflow builder. @group Queue

type BatchState

type BatchState = bus.BatchState

BatchState is the persisted view of a batch workflow. @group Queue

type ChainBuilder

type ChainBuilder interface {
	OnQueue(queue string) ChainBuilder
	Catch(fn func(ctx context.Context, st ChainState, err error) error) ChainBuilder
	Finally(fn func(ctx context.Context, st ChainState) error) ChainBuilder
	Dispatch(ctx context.Context) (string, error)
}

ChainBuilder is the high-level chain workflow builder. @group Queue

type ChainState

type ChainState = bus.ChainState

ChainState is the persisted view of a chain workflow. @group Queue

type ChannelObserver

type ChannelObserver struct {
	Events     chan<- Event
	DropIfFull bool
}

ChannelObserver forwards events to a channel. @group Observability

func (ChannelObserver) Observe

func (c ChannelObserver) Observe(event Event)

Observe forwards an event to the configured channel. @group Observability

Example: channel observer

ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-ch
_ = event

type Config

type Config struct {
	Driver   Driver
	Observer Observer
	Logger   Logger

	DefaultQueue string
}

Config configures queue creation for New (and advanced driver/runtime interop). @group Config

type DatabaseConfig

type DatabaseConfig struct {
	DB                       *sql.DB
	DriverName               string
	DSN                      string
	Workers                  int
	PollInterval             time.Duration
	DefaultQueue             string
	AutoMigrate              bool
	ProcessingRecoveryGrace  time.Duration
	ProcessingLeaseNoTimeout time.Duration
	Observer                 Observer
	Logger                   Logger
}

DatabaseConfig configures the SQL-backed database queue. @group Config

type DispatchRecord

type DispatchRecord struct {
	Job   Job
	Queue string
}

DispatchRecord captures one dispatch observed by FakeQueue. @group Testing

type DispatchResult

type DispatchResult = bus.DispatchResult

DispatchResult describes a high-level dispatch operation. @group Queue

type Driver

type Driver string

Driver identifies the queue backend. @group Driver

Example: driver values

fmt.Println(queue.DriverNull, queue.DriverSync, queue.DriverWorkerpool, queue.DriverDatabase, queue.DriverRedis, queue.DriverNATS, queue.DriverSQS, queue.DriverRabbitMQ)
const (
	// DriverNull drops dispatched jobs and performs no execution.
	DriverNull Driver = "null"
	// DriverSync runs handlers inline in the caller goroutine.
	DriverSync Driver = "sync"
	// DriverWorkerpool runs handlers on an in-memory workerpool.
	DriverWorkerpool Driver = "workerpool"
	// DriverDatabase selects the SQL-backed queue backend.
	DriverDatabase Driver = "database"
	// DriverRedis selects the Redis (asynq) backend.
	DriverRedis Driver = "redis"
	// DriverNATS selects the NATS backend.
	DriverNATS Driver = "nats"
	// DriverSQS selects the AWS SQS backend.
	DriverSQS Driver = "sqs"
	// DriverRabbitMQ selects the RabbitMQ backend.
	DriverRabbitMQ Driver = "rabbitmq"
)

type DriverJobOptions

type DriverJobOptions struct {
	QueueName string
	Timeout   *time.Duration
	MaxRetry  *int
	Attempt   int
	Backoff   *time.Duration
	Delay     time.Duration
	UniqueTTL time.Duration
}

DriverJobOptions exposes parsed job enqueue metadata for driver-module implementations.

This is an advanced type intended for optional driver integrations. @group Driver Integration

func DriverOptions

func DriverOptions(job Job) DriverJobOptions

DriverOptions returns parsed enqueue metadata for backend dispatch.

This is an advanced helper intended for driver-module implementations. @group Driver Integration

type Event

type Event struct {
	Kind      EventKind
	Driver    Driver
	Queue     string
	JobType   string
	JobKey    string
	Attempt   int
	MaxRetry  int
	Scheduled bool
	Duration  time.Duration
	Err       error
	Time      time.Time
}

Event is emitted through Observer hooks for queue/worker activity. @group Driver Integration

type EventKind

type EventKind string

EventKind identifies a queue runtime event. @group Driver Integration

const (
	// EventEnqueueAccepted indicates a job was accepted for enqueue.
	EventEnqueueAccepted EventKind = "enqueue_accepted"
	// EventEnqueueRejected indicates enqueue failed.
	EventEnqueueRejected EventKind = "enqueue_rejected"
	// EventEnqueueDuplicate indicates enqueue was rejected as duplicate.
	EventEnqueueDuplicate EventKind = "enqueue_duplicate"
	// EventEnqueueCanceled indicates enqueue was canceled by context.
	EventEnqueueCanceled EventKind = "enqueue_canceled"
	// EventProcessStarted indicates a handler started processing.
	EventProcessStarted EventKind = "process_started"
	// EventProcessSucceeded indicates a handler completed successfully.
	EventProcessSucceeded EventKind = "process_succeeded"
	// EventProcessFailed indicates a handler returned an error.
	EventProcessFailed EventKind = "process_failed"
	// EventProcessRetried indicates a failed attempt was requeued for retry.
	EventProcessRetried EventKind = "process_retried"
	// EventProcessArchived indicates a failed attempt reached terminal state.
	EventProcessArchived EventKind = "process_archived"
	// EventQueuePaused indicates queue consumption was paused.
	EventQueuePaused EventKind = "queue_paused"
	// EventQueueResumed indicates queue consumption was resumed.
	EventQueueResumed EventKind = "queue_resumed"
	// EventProcessRecovered indicates a stale in-flight job was requeued for recovery.
	EventProcessRecovered EventKind = "process_recovered"
	// EventRepublishFailed indicates an internal delay/retry republish attempt failed.
	EventRepublishFailed EventKind = "republish_failed"
)

type FailOnError

type FailOnError = bus.FailOnError

FailOnError converts matched errors into fatal (non-retryable) failures. @group Queue

type FakeQueue

type FakeQueue struct {
	// contains filtered or unexported fields
}

FakeQueue is an in-memory queue fake for tests. @group Testing

func NewFake

func NewFake() *FakeQueue

NewFake creates a queue fake that records dispatches and provides assertions. @group Testing

Example: fake queue assertions

fake := queue.NewFake()
_ = fake.Dispatch(
	queue.NewJob("emails:send").
		Payload(map[string]any{"id": 1}).
		OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:send

func (*FakeQueue) AssertCount

func (f *FakeQueue) AssertCount(t testing.TB, expected int)

AssertCount fails when the number of recorded dispatches does not match expected. @group Testing

Example: assert dispatch count

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertCount(t, 1)

func (*FakeQueue) AssertDispatched

func (f *FakeQueue) AssertDispatched(t testing.TB, jobType string)

AssertDispatched fails when jobType was not dispatched. @group Testing

Example: assert job type dispatched

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatched(t, "emails:send")

func (*FakeQueue) AssertDispatchedOn

func (f *FakeQueue) AssertDispatchedOn(t testing.TB, queueName, jobType string)

AssertDispatchedOn fails when jobType was not dispatched on queueName. @group Testing

Example: assert job type dispatched on queue

fake := queue.NewFake()
_ = fake.Dispatch(
	queue.NewJob("emails:send").
		OnQueue("critical"),
)
fake.AssertDispatchedOn(t, "critical", "emails:send")

func (*FakeQueue) AssertDispatchedTimes

func (f *FakeQueue) AssertDispatchedTimes(t testing.TB, jobType string, expected int)

AssertDispatchedTimes fails when jobType dispatch count does not match expected. @group Testing

Example: assert job type dispatched times

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatchedTimes(t, "emails:send", 2)

func (*FakeQueue) AssertNotDispatched

func (f *FakeQueue) AssertNotDispatched(t testing.TB, jobType string)

AssertNotDispatched fails when jobType was dispatched. @group Testing

Example: assert job type not dispatched

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertNotDispatched(t, "emails:cancel")

func (*FakeQueue) AssertNothingDispatched

func (f *FakeQueue) AssertNothingDispatched(t testing.TB)

AssertNothingDispatched fails when any dispatch was recorded. @group Testing

Example: assert nothing dispatched

fake := queue.NewFake()
fake.AssertNothingDispatched(t)

func (*FakeQueue) BusDispatch

func (f *FakeQueue) BusDispatch(ctx context.Context, jobType string, payload []byte, opts busruntime.JobOptions) error

BusDispatch satisfies the internal orchestration runtime adapter. @group Testing

func (*FakeQueue) BusRegister

func (f *FakeQueue) BusRegister(string, busruntime.Handler)

BusRegister satisfies the internal orchestration runtime adapter. @group Testing

func (*FakeQueue) Dispatch

func (f *FakeQueue) Dispatch(job any) error

Dispatch records a typed job payload in memory on the fake's default queue. @group Testing

Example: dispatch to fake queue

fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
_ = err

func (*FakeQueue) DispatchCtx

func (f *FakeQueue) DispatchCtx(ctx context.Context, job any) error

DispatchCtx submits a typed job payload using the provided context. @group Testing

Example: dispatch with context

fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: true

func (*FakeQueue) Driver

func (f *FakeQueue) Driver() Driver

Driver returns the active queue driver. @group Testing

Example: fake driver

fake := queue.NewFake()
driver := fake.Driver()
_ = driver

func (*FakeQueue) Ready

func (f *FakeQueue) Ready(ctx context.Context) error

Ready validates fake queue readiness. @group Testing

Example: fake ready

fake := queue.NewFake()
fmt.Println(fake.Ready(context.Background()) == nil)
// Output: true

func (*FakeQueue) Records

func (f *FakeQueue) Records() []DispatchRecord

Records returns a copy of all dispatch records. @group Testing

Example: read records

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:send

func (*FakeQueue) Register

func (f *FakeQueue) Register(string, Handler)

Register associates a handler with a job type. @group Testing

Example: register no-op on fake

fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })

func (*FakeQueue) Reset

func (f *FakeQueue) Reset()

Reset clears all recorded dispatches. @group Testing

Example: reset records

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0

func (*FakeQueue) Shutdown

func (f *FakeQueue) Shutdown(context.Context) error

Shutdown drains running work and releases resources. @group Testing

Example: shutdown fake queue

fake := queue.NewFake()
err := fake.Shutdown(context.Background())
_ = err

func (*FakeQueue) StartWorkers

func (f *FakeQueue) StartWorkers(context.Context) error

StartWorkers starts worker execution. @group Testing

Example: start fake workers

fake := queue.NewFake()
err := fake.StartWorkers(context.Background())
_ = err

func (*FakeQueue) Workers

func (f *FakeQueue) Workers(int) queueRuntime

Workers sets desired worker concurrency before StartWorkers. @group Testing

Example: set worker count

fake := queue.NewFake()
q := fake.Workers(4)
fmt.Println(q != nil)
// Output: true

type Handler

type Handler func(ctx context.Context, job Job) error

Handler processes a job. @group Job

Example: handler

handler := func(ctx context.Context, job queue.Job) error { return nil }
_ = handler

type Job

type Job struct {
	Type string
	// contains filtered or unexported fields
}

Job is a pure queue payload value plus enqueue metadata. @group Job

Example: job

job := queue.NewJob("emails:send").
	PayloadJSON(map[string]string{"to": "user@example.com"}).
	OnQueue("critical")
_ = job

func DriverWithAttempt

func DriverWithAttempt(job Job, attempt int) Job

DriverWithAttempt returns a copy of the job with the attempt number set.

This is an advanced helper intended for driver-module implementations. @group Driver Integration

func NewJob

func NewJob(jobType string) Job

NewJob creates a job value with a required job type. @group Job

Example: new job

job := queue.NewJob("emails:send")
_ = job

func (Job) Backoff

func (t Job) Backoff(backoff time.Duration) Job

Backoff sets delay between retries. @group Job

Example: backoff

job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)
_ = job

func (Job) Bind

func (t Job) Bind(dst any) error

Bind unmarshals job payload JSON into dst. @group Job

Example: bind payload

type EmailPayload struct {
	ID int    `json:"id"`
	To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
	ID: 1,
	To: "user@example.com",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
	return
}
_ = payload.To

func (Job) Delay

func (t Job) Delay(delay time.Duration) Job

Delay defers execution by duration. @group Job

Example: delay

job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)
_ = job

func (Job) OnQueue

func (t Job) OnQueue(name string) Job

OnQueue sets the target queue name. @group Job

Example: on queue

job := queue.NewJob("emails:send").OnQueue("critical")
_ = job

func (Job) Payload

func (t Job) Payload(payload any) Job

Payload sets job payload from common value types. @group Job

Example: payload bytes

jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
_ = jobBytes

Example: payload struct

type Meta struct {
	Nested bool `json:"nested"`
}
type EmailPayload struct {
	ID   int    `json:"id"`
	To   string `json:"to"`
	Meta Meta   `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
	ID:   1,
	To:   "user@example.com",
	Meta: Meta{Nested: true},
})
_ = jobStruct

Example: payload map

jobMap := queue.NewJob("emails:send").Payload(map[string]any{
	"id":   1,
	"to":   "user@example.com",
	"meta": map[string]any{"nested": true},
})
_ = jobMap

func (Job) PayloadBytes

func (t Job) PayloadBytes() []byte

PayloadBytes returns a copy of job payload bytes. @group Job

Example: payload bytes read

job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()
_ = payload

func (Job) PayloadJSON

func (t Job) PayloadJSON(v any) Job

PayloadJSON marshals payload as JSON. @group Job

Example: payload json

job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})
_ = job

func (Job) Retry

func (t Job) Retry(maxRetry int) Job

Retry sets max retry attempts. @group Job

Example: retry

job := queue.NewJob("emails:send").Retry(4)
_ = job

func (Job) Timeout

func (t Job) Timeout(timeout time.Duration) Job

Timeout sets per-job execution timeout. @group Job

Example: timeout

job := queue.NewJob("emails:send").Timeout(10 * time.Second)
_ = job

func (Job) UniqueFor

func (t Job) UniqueFor(ttl time.Duration) Job

UniqueFor enables uniqueness dedupe within the given TTL. @group Job

Example: unique for

job := queue.NewJob("emails:send").UniqueFor(45 * time.Second)
_ = job

type JobSnapshot

type JobSnapshot struct {
	ID            string
	Queue         string
	State         JobState
	Type          string
	Payload       string
	Attempt       int
	MaxRetry      int
	LastError     string
	NextProcessAt *time.Time
	CompletedAt   *time.Time
}

JobSnapshot describes an admin-facing queue job record. @group Admin

type JobState

type JobState string

JobState identifies queue job state used by queue admin APIs. @group Admin

const (
	JobStatePending   JobState = "pending"
	JobStateActive    JobState = "active"
	JobStateScheduled JobState = "scheduled"
	JobStateRetry     JobState = "retry"
	JobStateArchived  JobState = "archived"
	JobStateCompleted JobState = "completed"
)

type ListJobsOptions

type ListJobsOptions struct {
	Queue    string
	State    JobState
	Page     int
	PageSize int
}

ListJobsOptions configures queue admin list jobs calls. @group Admin

func (ListJobsOptions) Normalize

func (o ListJobsOptions) Normalize() ListJobsOptions

Normalize returns a safe options payload with defaults applied. @group Admin

Example: normalize list options

opts := queue.ListJobsOptions{Queue: "", State: "", Page: 0, PageSize: 1000}
normalized := opts.Normalize()
fmt.Println(normalized.Queue, normalized.State, normalized.Page, normalized.PageSize)
// Output: default pending 1 500

type ListJobsResult

type ListJobsResult struct {
	Jobs  []JobSnapshot
	Total int64
}

ListJobsResult contains queue admin job listing output. @group Admin

func ListJobs

func ListJobs(ctx context.Context, q any, opts ListJobsOptions) (ListJobsResult, error)

ListJobs lists jobs for a queue and state when supported. @group Admin

Example: list jobs via helper

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
_, err = queue.ListJobs(context.Background(), q, queue.ListJobsOptions{
	Queue: "default",
	State: queue.JobStatePending,
})
_ = err

type Lock

type Lock = bus.Lock

Lock is used by overlap prevention middleware. @group Queue

type Locker

type Locker = bus.Locker

Locker acquires locks for overlap prevention middleware. @group Queue

type Logger

type Logger interface {
	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
	Fatal(args ...interface{})
}

Logger is a generic runtime logger contract that drivers may use to surface their internal worker/server lifecycle output.

type Message

type Message = bus.Context

Message is the handler message passed to the high-level queue runtime. It exposes workflow/job metadata and payload binding helpers. @group Queue

type Middleware

type Middleware = bus.Middleware

Middleware applies behavior around high-level workflow/job execution. @group Queue

type MiddlewareFunc

type MiddlewareFunc = bus.MiddlewareFunc

MiddlewareFunc adapts a function to queue middleware. @group Queue

type Next

type Next = bus.Next

Next invokes the next middleware/handler in the queue middleware chain. @group Queue

type NoopLogger

type NoopLogger struct{}

NoopLogger disables driver-managed internal logs when passed through config.

func (NoopLogger) Debug

func (NoopLogger) Debug(...interface{})

func (NoopLogger) Error

func (NoopLogger) Error(...interface{})

func (NoopLogger) Fatal

func (NoopLogger) Fatal(...interface{})

func (NoopLogger) Info

func (NoopLogger) Info(...interface{})

func (NoopLogger) Warn

func (NoopLogger) Warn(...interface{})

type Observer

type Observer interface {
	// Observe handles a queue runtime event.
	// @group Observability
	//
	// Example: observe runtime event
	//
	//	var observer queue.Observer
	//	observer.Observe(queue.Event{
	//		Kind:   queue.EventEnqueueAccepted,
	//		Driver: queue.DriverSync,
	//		Queue:  "default",
	//	})
	Observe(event Event)
}

Observer receives queue runtime events. @group Observability

func MultiObserver

func MultiObserver(observers ...Observer) Observer

MultiObserver fans out events to multiple observers. @group Observability

Example: fan out to multiple observers

events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
	queue.ChannelObserver{Events: events},
	queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1

type ObserverFunc

type ObserverFunc func(event Event)

ObserverFunc adapts a function to an Observer. @group Observability

func (ObserverFunc) Observe

func (f ObserverFunc) Observe(event Event)

Observe calls the wrapped function. @group Observability

Example: observer func logging hook

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
	logger.Info("queue event",
		"kind", event.Kind,
		"driver", event.Driver,
		"queue", event.Queue,
		"job_type", event.JobType,
		"attempt", event.Attempt,
		"max_retry", event.MaxRetry,
		"duration", event.Duration,
		"err", event.Err,
	)
})
observer.Observe(queue.Event{
	Kind:     queue.EventProcessSucceeded,
	Driver:   queue.DriverSync,
	Queue:    "default",
	JobType:  "emails:send",
})

type Option

type Option func(*runtimeOptions)

Option configures the high-level workflow runtime. @group Queue

func WithClock

func WithClock(clock func() time.Time) Option

WithClock overrides the workflow runtime clock. @group Queue

Example: workflow clock

q, err := queue.New(
	queue.Config{Driver: queue.DriverSync},
	queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
	return
}
_ = q

func WithMiddleware

func WithMiddleware(middlewares ...Middleware) Option

WithMiddleware appends queue workflow middleware. @group Queue

Example: middleware

mw := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
	return next(ctx, m)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
	return
}
_ = q

func WithObserver

func WithObserver(observer WorkflowObserver) Option

WithObserver installs a workflow lifecycle observer. @group Queue

Example: workflow observer

observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
	_ = event.Kind
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
	return
}
_ = q

func WithStore

func WithStore(store WorkflowStore) Option

WithStore overrides the workflow orchestration store. @group Queue

Example: workflow store

var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
	return
}
_ = q

func WithWorkers

func WithWorkers(count int) Option

WithWorkers sets desired worker concurrency before StartWorkers. It applies to high-level queue constructors (for example NewWorkerpool/New/NewSync). @group Queue

Example: constructor workers option

q, err := queue.NewWorkerpool(
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}
_ = q

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the high-level user-facing queue API. It composes the queue runtime with the internal orchestration engine. @group Queue

func New

func New(cfg Config, opts ...Option) (*Queue, error)

New creates the high-level Queue API based on Config.Driver. @group Constructors

Example: create a queue and dispatch a workflow-capable job

q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
	var payload EmailPayload
	if err := m.Bind(&payload); err != nil {
		return err
	}
	_ = payload
	return nil
})
_ = q.WithWorkers(1).StartWorkers(context.Background()) // optional; default: runtime.NumCPU() (min 1)
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
	queue.NewJob("emails:send").
		Payload(EmailPayload{ID: 1}).
		OnQueue("default"),
)

func NewNull

func NewNull(opts ...Option) (*Queue, error)

NewNull creates a Queue on the null backend. @group Constructors

Example: null backend

q, err := queue.NewNull()
if err != nil {
	return
}
_ = q

func NewSync

func NewSync(opts ...Option) (*Queue, error)

NewSync creates a Queue on the synchronous in-process backend. @group Constructors

Example: sync backend

q, err := queue.NewSync()
if err != nil {
	return
}
_ = q

func NewWorkerpool

func NewWorkerpool(opts ...Option) (*Queue, error)

NewWorkerpool creates a Queue on the in-process workerpool backend. @group Constructors

Example: workerpool backend

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
_ = q

func (*Queue) Batch

func (r *Queue) Batch(jobs ...Job) BatchBuilder

Batch creates a batch builder for fan-out workflow execution. @group Queue

Example: batch

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Batch(
	queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
	queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())

func (*Queue) CancelJob

func (r *Queue) CancelJob(ctx context.Context, jobID string) error

CancelJob cancels a job via queue admin capability when supported. @group Admin

Example: queue method cancel job

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
if !queue.SupportsQueueAdmin(q) {
	return
}
err = q.CancelJob(context.Background(), "job-id")
_ = err

func (*Queue) Chain

func (r *Queue) Chain(jobs ...Job) ChainBuilder

Chain creates a chain builder for sequential workflow execution. @group Queue

Example: chain

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
q.Register("second", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Chain(
	queue.NewJob("first"),
	queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())

func (*Queue) ClearQueue

func (r *Queue) ClearQueue(ctx context.Context, queueName string) error

ClearQueue clears queue jobs via queue admin capability when supported. @group Admin

Example: queue method clear queue

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
if !queue.SupportsQueueAdmin(q) {
	return
}
err = q.ClearQueue(context.Background(), "default")
_ = err

func (*Queue) DeleteJob

func (r *Queue) DeleteJob(ctx context.Context, queueName, jobID string) error

DeleteJob deletes a job via queue admin capability when supported. @group Admin

Example: queue method delete job

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
if !queue.SupportsQueueAdmin(q) {
	return
}
err = q.DeleteJob(context.Background(), "default", "job-id")
_ = err

func (*Queue) Dispatch

func (r *Queue) Dispatch(job Job) (DispatchResult, error)

Dispatch enqueues a high-level job using context.Background. @group Queue

Example: dispatch

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(job)

func (*Queue) DispatchCtx

func (r *Queue) DispatchCtx(ctx context.Context, job Job) (DispatchResult, error)

DispatchCtx enqueues a high-level job using the provided context. @group Queue

func (*Queue) Driver

func (r *Queue) Driver() Driver

Driver reports the configured backend driver for the underlying queue runtime. @group Queue

Example: driver

q, err := queue.NewSync()
if err != nil {
	return
}
fmt.Println(q.Driver())
// Output: sync

func (*Queue) FindBatch

func (r *Queue) FindBatch(ctx context.Context, batchID string) (BatchState, error)

FindBatch returns current batch state by ID. @group Queue

Example: find batch

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
	return
}
_, _ = q.FindBatch(context.Background(), batchID)

func (*Queue) FindChain

func (r *Queue) FindChain(ctx context.Context, chainID string) (ChainState, error)

FindChain returns current chain state by ID. @group Queue

Example: find chain

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
	return
}
_, _ = q.FindChain(context.Background(), chainID)

func (*Queue) History

func (r *Queue) History(ctx context.Context, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)

History returns queue history points via queue admin capability when supported. @group Admin

Example: queue method history

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
points, err := q.History(context.Background(), "default", queue.QueueHistoryHour)
_ = points
_ = err

func (*Queue) ListJobs

func (r *Queue) ListJobs(ctx context.Context, opts ListJobsOptions) (ListJobsResult, error)

ListJobs lists jobs via queue admin capability when supported. @group Admin

Example: queue method list jobs

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
_, err = q.ListJobs(context.Background(), queue.ListJobsOptions{
	Queue: "default",
	State: queue.JobStatePending,
})
_ = err

func (*Queue) Pause

func (r *Queue) Pause(ctx context.Context, queueName string) error

Pause pauses consumption for a queue when supported by the underlying driver. See the README "Queue Backends" table for Pause/Resume support and docs/backend-guarantees.md (Capability Matrix) for broader backend differences. @group Queue

Example: pause queue

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsPause(q) {
	_ = q.Pause(context.Background(), "default")
}

func (*Queue) Prune

func (r *Queue) Prune(ctx context.Context, before time.Time) error

Prune deletes old workflow state records. @group Queue

Example: prune workflow state

q, err := queue.NewSync()
if err != nil {
	return
}
_ = q.Prune(context.Background(), time.Now().Add(-24*time.Hour))

func (*Queue) Ready

func (r *Queue) Ready(ctx context.Context) error

Ready validates queue backend readiness for dispatch/worker operation. @group Queue

Example: queue ready

q, err := queue.NewSync()
if err != nil {
	return
}
fmt.Println(q.Ready(context.Background()) == nil)
// Output: true

func (*Queue) Register

func (r *Queue) Register(jobType string, handler func(context.Context, Message) error)

Register binds a handler for a high-level job type. @group Queue

Example: register

q, err := queue.NewSync()
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
	var payload EmailPayload
	if err := m.Bind(&payload); err != nil {
		return err
	}
	_ = payload
	return nil
})

func (*Queue) Resume

func (r *Queue) Resume(ctx context.Context, queueName string) error

Resume resumes consumption for a queue when supported by the underlying driver. @group Queue

Example: resume queue

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsPause(q) {
	_ = q.Resume(context.Background(), "default")
}

func (*Queue) RetryJob

func (r *Queue) RetryJob(ctx context.Context, queueName, jobID string) error

RetryJob retries (runs now) a job via queue admin capability when supported. @group Admin

Example: queue method retry job

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
if !queue.SupportsQueueAdmin(q) {
	return
}
err = q.RetryJob(context.Background(), "default", "job-id")
_ = err

func (*Queue) Run

func (r *Queue) Run(ctx context.Context) error

Run starts worker processing, blocks until ctx is canceled, then gracefully shuts down. @group Queue

Example: run until canceled

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := queue.NewWorkerpool()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
go func() {
	time.Sleep(100 * time.Millisecond)
	cancel()
}()
_ = q.Run(ctx)

func (*Queue) Shutdown

func (r *Queue) Shutdown(ctx context.Context) error

Shutdown drains workers and closes underlying resources. @group Queue

Example: shutdown

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
_ = q.StartWorkers(context.Background())
_ = q.Shutdown(context.Background())

func (*Queue) StartWorkers

func (r *Queue) StartWorkers(ctx context.Context) error

StartWorkers starts worker processing. @group Queue

Example: start workers

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
_ = q.StartWorkers(context.Background())

func (*Queue) Stats

func (r *Queue) Stats(ctx context.Context) (StatsSnapshot, error)

Stats returns a normalized snapshot when supported by the underlying driver. @group Queue

Example: stats

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsNativeStats(q) {
	_, _ = q.Stats(context.Background())
}

func (*Queue) WithWorkers

func (r *Queue) WithWorkers(count int) *Queue

WithWorkers sets desired worker concurrency before StartWorkers. @group Queue

Example: workers

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
q.WithWorkers(4) // optional; default: runtime.NumCPU() (min 1)

type QueueAdmin

type QueueAdmin interface {
	ListJobs(ctx context.Context, opts ListJobsOptions) (ListJobsResult, error)
	RetryJob(ctx context.Context, queueName, jobID string) error
	CancelJob(ctx context.Context, jobID string) error
	DeleteJob(ctx context.Context, queueName, jobID string) error
	ClearQueue(ctx context.Context, queueName string) error
	History(ctx context.Context, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)
}

QueueAdmin exposes optional queue admin capabilities. @group Admin

type QueueController

type QueueController interface {
	Pause(ctx context.Context, queueName string) error
	Resume(ctx context.Context, queueName string) error
}

QueueController exposes queue pause/resume controls. @group Observability

type QueueCounters

type QueueCounters struct {
	Pending   int64
	Active    int64
	Scheduled int64
	Retry     int64
	Archived  int64
	Processed int64
	Failed    int64
	Paused    int64
	AvgWait   time.Duration
	AvgRun    time.Duration
}

QueueCounters exposes normalized queue counters collected from events. @group Observability

type QueueHistoryPoint

type QueueHistoryPoint struct {
	At        time.Time
	Processed int64
	Failed    int64
}

QueueHistoryPoint represents processed/failed totals at a point in time. @group Admin

func QueueHistory

func QueueHistory(ctx context.Context, q any, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)

QueueHistory returns queue history points when supported. @group Admin

Example: history via helper

q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
	return
}
_, err = queue.QueueHistory(context.Background(), q, "default", queue.QueueHistoryHour)
_ = err

func SinglePointHistory

func SinglePointHistory(snapshot StatsSnapshot, queueName string) []QueueHistoryPoint

SinglePointHistory converts a snapshot into a single current-history point. This helper is intended for driver modules that do not expose historical buckets. @group Admin

Example: single-point history

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Processed: 12, Failed: 1},
	},
}
points := queue.SinglePointHistory(snapshot, "default")
fmt.Println(len(points), points[0].Processed, points[0].Failed)
// Output: 1 12 1

func TimelineHistoryFromSnapshot

func TimelineHistoryFromSnapshot(snapshot StatsSnapshot, queueName string, window QueueHistoryWindow) []QueueHistoryPoint

TimelineHistoryFromSnapshot records queue counters and returns windowed points. This is intended for drivers that do not expose native multi-point history. @group Admin

Example: timeline history from snapshots

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Processed: 5, Failed: 1},
	},
}
points := queue.TimelineHistoryFromSnapshot(snapshot, "default", queue.QueueHistoryHour)
fmt.Println(len(points) >= 1)
// Output: true

type QueueHistoryProvider

type QueueHistoryProvider interface {
	History(ctx context.Context, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)
}

QueueHistoryProvider exposes queue history capability independently from full queue admin support. Drivers like sync/workerpool can provide history without supporting per-job admin operations. @group Admin

type QueueHistoryWindow

type QueueHistoryWindow string

QueueHistoryWindow identifies queue history horizon. @group Admin

const (
	QueueHistoryHour QueueHistoryWindow = "hour"
	QueueHistoryDay  QueueHistoryWindow = "day"
	QueueHistoryWeek QueueHistoryWindow = "week"
)

type QueueThroughput

type QueueThroughput struct {
	Hour ThroughputWindow
	Day  ThroughputWindow
	Week ThroughputWindow
}

QueueThroughput contains rolling throughput windows for a queue. @group Observability

type RateLimit

type RateLimit = bus.RateLimit

RateLimit applies rate limiting before job execution. @group Queue

type RateLimiter

type RateLimiter = bus.RateLimiter

RateLimiter is used by RateLimit middleware. @group Queue

type RetryPolicy

type RetryPolicy = bus.RetryPolicy

RetryPolicy is a pass-through middleware policy helper. @group Queue

type SkipWhen

type SkipWhen = bus.SkipWhen

SkipWhen skips execution when the predicate matches. @group Queue

type StatsCollector

type StatsCollector struct {
	// contains filtered or unexported fields
}

StatsCollector aggregates normalized counters from Observer events. @group Observability

func NewStatsCollector

func NewStatsCollector() *StatsCollector

NewStatsCollector creates an event collector for queue counters. @group Constructors

Example: new stats collector

collector := queue.NewStatsCollector()
_ = collector

func (*StatsCollector) Observe

func (c *StatsCollector) Observe(event Event)

Observe records an event and updates normalized counters. @group Observability

Example: observe event

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})

func (*StatsCollector) Snapshot

func (c *StatsCollector) Snapshot() StatsSnapshot

Snapshot returns a copy of collected counters. @group Observability

Example: snapshot print

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
collector.Observe(queue.Event{
	Kind:   queue.EventProcessStarted,
	Driver: queue.DriverSync,
	Queue:  "default",
	JobKey: "job-1",
	Time:   time.Now(),
})
collector.Observe(queue.Event{
	Kind:     queue.EventProcessSucceeded,
	Driver:   queue.DriverSync,
	Queue:    "default",
	JobKey:   "job-1",
	Duration: 12 * time.Millisecond,
	Time:     time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}

type StatsProvider

type StatsProvider interface {
	Stats(ctx context.Context) (StatsSnapshot, error)
}

StatsProvider exposes driver-native queue snapshots. @group Observability

type StatsSnapshot

type StatsSnapshot struct {
	ByQueue           map[string]QueueCounters
	ThroughputByQueue map[string]QueueThroughput
}

StatsSnapshot is a point-in-time view of counters by queue. @group Observability

func Snapshot

func Snapshot(ctx context.Context, q any, collector *StatsCollector) (StatsSnapshot, error)

Snapshot returns driver-native stats, falling back to collector data. @group Observability

Example: snapshot from queue runtime

q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: true

func (StatsSnapshot) Active

func (s StatsSnapshot) Active(name string) int64

Active returns active count for a queue. @group Observability

Example: active count getter

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Active: 2},
	},
}
fmt.Println(snapshot.Active("default"))
// Output: 2

func (StatsSnapshot) Archived

func (s StatsSnapshot) Archived(name string) int64

Archived returns archived count for a queue. @group Observability

Example: archived count getter

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Archived: 7},
	},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7

func (StatsSnapshot) Failed

func (s StatsSnapshot) Failed(name string) int64

Failed returns failed count for a queue. @group Observability

Example: failed count getter

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Failed: 2},
	},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2

func (StatsSnapshot) Paused

func (s StatsSnapshot) Paused(name string) int64

Paused returns paused count for a queue. @group Observability

Example: paused count getter

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventQueuePaused,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1

func (StatsSnapshot) Pending

func (s StatsSnapshot) Pending(name string) int64

Pending returns pending count for a queue. @group Observability

Example: pending count getter

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Pending: 3},
	},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3

func (StatsSnapshot) Processed

func (s StatsSnapshot) Processed(name string) int64

Processed returns processed count for a queue. @group Observability

Example: processed count getter

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Processed: 11},
	},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11

func (StatsSnapshot) Queue

func (s StatsSnapshot) Queue(name string) (QueueCounters, bool)

Queue returns queue counters for a queue name. @group Observability

Example: queue counters getter

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1

func (StatsSnapshot) Queues

func (s StatsSnapshot) Queues() []string

Queues returns sorted queue names present in the snapshot. @group Observability

Example: list queues

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "critical",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 critical

func (StatsSnapshot) RetryCount

func (s StatsSnapshot) RetryCount(name string) int64

RetryCount returns retry count for a queue. @group Observability

Example: retry count getter

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Retry: 1},
	},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1

func (StatsSnapshot) Scheduled

func (s StatsSnapshot) Scheduled(name string) int64

Scheduled returns scheduled count for a queue. @group Observability

Example: scheduled count getter

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Scheduled: 4},
	},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4

func (StatsSnapshot) Throughput

func (s StatsSnapshot) Throughput(name string) (QueueThroughput, bool)

Throughput returns rolling throughput windows for a queue name. @group Observability

Example: throughput getter

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventProcessSucceeded,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}

type ThroughputWindow

type ThroughputWindow struct {
	Processed int64
	Failed    int64
}

ThroughputWindow captures processed vs failed counts in a fixed window. @group Observability

type WithoutOverlapping

type WithoutOverlapping = bus.WithoutOverlapping

WithoutOverlapping prevents concurrent execution for the same key. @group Queue

type WorkerpoolConfig

type WorkerpoolConfig struct {
	Workers           int
	QueueCapacity     int
	DefaultJobTimeout time.Duration
}

WorkerpoolConfig configures the in-memory workerpool queue. @group Config

type WorkflowEvent

type WorkflowEvent = bus.Event

WorkflowEvent is emitted by the high-level workflow runtime observer hooks. @group Queue

type WorkflowEventKind

type WorkflowEventKind = bus.EventKind

WorkflowEventKind identifies high-level workflow runtime lifecycle events. @group Queue

type WorkflowObserver

type WorkflowObserver = bus.Observer

WorkflowObserver receives high-level workflow runtime events. @group Queue

type WorkflowObserverFunc

type WorkflowObserverFunc = bus.ObserverFunc

WorkflowObserverFunc adapts a function to a workflow observer. @group Queue

type WorkflowStore

type WorkflowStore = bus.Store

WorkflowStore is the orchestration state store used for chains/batches/callbacks. @group Queue

Directories

Path Synopsis
bus
Package bus provides the workflow orchestration engine used by queue.
driver
mysqlqueue module
natsqueue module
postgresqueue module
rabbitmqqueue module
redisqueue module
sqlitequeue module
sqlqueuecore module
sqsqueue module
internal
readmecheck
Package readmecheck contains CI-only compile checks for curated manual README snippets.
runtimehook
Package runtimehook provides a tiny internal hook registry used to avoid import cycles between the public queue package and internal bridge/test helpers introduced during API flattening.
Package queueconfig contains shared configuration structs used by optional driver modules.
Package queuecore contains shared helper logic used by optional driver modules.
Package queuefake provides a queue-first test harness for queue and workflow assertions.
