queue

package
v0.0.0-...-41a30da Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseJob

type BaseJob struct{}

BaseJob is a helper that provides production-safe defaults.

func (*BaseJob) MaxRetries

func (j *BaseJob) MaxRetries() int

MaxRetries defaults to three attempts.

func (*BaseJob) OnFailure

func (j *BaseJob) OnFailure(ctx context.Context, err error)

OnFailure is a no-op by default.

func (*BaseJob) Queue

func (j *BaseJob) Queue() string

Queue defaults to the "default" queue.

func (*BaseJob) Timeout

func (j *BaseJob) Timeout() time.Duration

Timeout defaults to 30 seconds.

type DashboardTracer

type DashboardTracer interface {
	TrackJob(name, status string, data any, duration time.Duration)
}

DashboardTracer is the interface for tracking job progress in a telemetry dashboard.

type Dispatcher

type Dispatcher = RedisDispatcher

Dispatcher is the compatibility alias for the Redis-backed dispatcher.

func NewDispatcher

func NewDispatcher(client redis.UniversalClient, prefix string) *Dispatcher

NewDispatcher creates a Redis-backed job dispatcher.

type FailedJob

type FailedJob struct {
	ID                 string    `json:"id"`
	JobType            string    `json:"job_type"`
	Queue              string    `json:"queue"`
	Payload            string    `json:"payload"`
	Error              string    `json:"error"`
	StackTrace         string    `json:"stack_trace,omitempty"`
	FailedAt           time.Time `json:"failed_at"`
	Attempts           int       `json:"attempts"`
	MaxRetries         int       `json:"max_retries"`
	OriginalEnqueuedAt time.Time `json:"original_enqueued_at"`
}

FailedJob represents a job that exhausted all retries.

type FailedJobManager

type FailedJobManager struct {
	// contains filtered or unexported fields
}

FailedJobManager provides the legacy failed job API on top of the Redis store.

func NewFailedJobManager

func NewFailedJobManager(client redis.UniversalClient, prefix string) *FailedJobManager

NewFailedJobManager creates a new failed job manager.

func (*FailedJobManager) Flush

func (m *FailedJobManager) Flush(ctx context.Context) error

Flush deletes all failed jobs.

func (*FailedJobManager) Retry

func (m *FailedJobManager) Retry(ctx context.Context) error

Retry re-enqueues all failed jobs.

type GenericJob

type GenericJob[T any] struct {
	BaseJob
	Payload     T
	JobName     string
	JobQueue    string
	JobMaxRetry int
	JobTimeoutD time.Duration
}

GenericJob is a helper that wraps a payload and common job metadata.

func NewJob

func NewJob[T any](name string, payload T) *GenericJob[T]

NewJob creates a new generic job with the provided name and payload.

func (*GenericJob[T]) Data

func (j *GenericJob[T]) Data() T

PayloadData returns the typed payload.

func (*GenericJob[T]) JobType

func (j *GenericJob[T]) JobType() string

JobType returns the registered name of the job.

func (*GenericJob[T]) MaxRetries

func (j *GenericJob[T]) MaxRetries() int

MaxRetries returns the max retry attempts.

func (*GenericJob[T]) Queue

func (j *GenericJob[T]) Queue() string

Queue returns the queue name.

func (*GenericJob[T]) Timeout

func (j *GenericJob[T]) Timeout() time.Duration

Timeout returns the execution timeout.

func (*GenericJob[T]) WithMaxRetries

func (j *GenericJob[T]) WithMaxRetries(n int) *GenericJob[T]

WithMaxRetries sets the maximum retry attempts.

func (*GenericJob[T]) WithQueue

func (j *GenericJob[T]) WithQueue(name string) *GenericJob[T]

WithQueue sets the queue name for the job.

func (*GenericJob[T]) WithTimeout

func (j *GenericJob[T]) WithTimeout(d time.Duration) *GenericJob[T]

WithTimeout sets the execution timeout.

type Job

type Job interface {
	// Handle contains the actual job logic.
	Handle(ctx context.Context) error
	// OnFailure is invoked when the job permanently fails.
	OnFailure(ctx context.Context, err error)
	// MaxRetries returns the maximum number of attempts before failing.
	MaxRetries() int
	// Queue returns the queue name to push the job to.
	Queue() string
	// Timeout returns the maximum execution time for a single attempt.
	Timeout() time.Duration
}

Job is the interface that background jobs must implement.

type Queue

type Queue interface {
	// Enqueue stores a job for immediate execution.
	Enqueue(ctx context.Context, job Job) error
	// EnqueueIn stores a job for execution after the provided delay.
	EnqueueIn(ctx context.Context, job Job, delay time.Duration) error
	// EnqueueAt stores a job for execution at the provided time.
	EnqueueAt(ctx context.Context, job Job, at time.Time) error
	// Size reports the number of ready jobs for a queue.
	Size(ctx context.Context, queue string) (int64, error)
	// Purge removes all pending jobs for a queue.
	Purge(ctx context.Context, queue string) error
}

Queue stores background jobs for later execution.

type RedisDispatcher

type RedisDispatcher struct {
	// contains filtered or unexported fields
}

RedisDispatcher pushes jobs onto a Redis-backed queue.

func NewRedisDispatcher

func NewRedisDispatcher(client redis.UniversalClient, prefix string) *RedisDispatcher

NewRedisDispatcher creates a new Redis-backed dispatcher.

func (*RedisDispatcher) Dispatch

func (d *RedisDispatcher) Dispatch(ctx context.Context, job Job, name string) error

Dispatch pushes a job for immediate processing.

func (*RedisDispatcher) DispatchAt

func (d *RedisDispatcher) DispatchAt(ctx context.Context, job Job, name string, at time.Time) error

DispatchAt pushes a job to run at a specific time.

func (*RedisDispatcher) DispatchIn

func (d *RedisDispatcher) DispatchIn(ctx context.Context, job Job, name string, delay time.Duration) error

DispatchIn pushes a job to the delayed queue.

func (*RedisDispatcher) DispatchUnique

func (d *RedisDispatcher) DispatchUnique(ctx context.Context, job Job, name string, ttl time.Duration) error

DispatchUnique pushes a job only when the uniqueness lock is available.

type RedisFailedJobsStore

type RedisFailedJobsStore struct {
	// contains filtered or unexported fields
}

RedisFailedJobsStore persists failed jobs in Redis.

func NewRedisFailedJobsStore

func NewRedisFailedJobsStore(client redis.UniversalClient, prefix string, queue *RedisQueue) *RedisFailedJobsStore

NewRedisFailedJobsStore creates a failed job store.

func (*RedisFailedJobsStore) All

All returns all failed jobs.

func (*RedisFailedJobsStore) Delete

func (s *RedisFailedJobsStore) Delete(ctx context.Context, id string) error

Delete removes a failed job entry.

func (*RedisFailedJobsStore) Find

Find returns a failed job by ID.

func (*RedisFailedJobsStore) Purge

func (s *RedisFailedJobsStore) Purge(ctx context.Context) error

Purge removes all failed job entries.

func (*RedisFailedJobsStore) Retry

func (s *RedisFailedJobsStore) Retry(ctx context.Context, id string) error

Retry re-enqueues a failed job and removes it from the failed set.

func (*RedisFailedJobsStore) Store

func (s *RedisFailedJobsStore) Store(ctx context.Context, job FailedJob) error

Store persists a failed job entry.

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue is a Redis Streams-backed persistent queue.

func NewRedisQueue

func NewRedisQueue(client redis.UniversalClient, prefix string, locker cache.Locker) *RedisQueue

NewRedisQueue creates a new Redis-backed queue.

func (*RedisQueue) Enqueue

func (q *RedisQueue) Enqueue(ctx context.Context, job Job) error

Enqueue stores a job for immediate execution.

func (*RedisQueue) EnqueueAt

func (q *RedisQueue) EnqueueAt(ctx context.Context, job Job, at time.Time) error

EnqueueAt stores a job for execution at a specific time.

func (*RedisQueue) EnqueueIn

func (q *RedisQueue) EnqueueIn(ctx context.Context, job Job, delay time.Duration) error

EnqueueIn stores a job for later execution.

func (*RedisQueue) PromoteReady

func (q *RedisQueue) PromoteReady(ctx context.Context) error

PromoteReady moves jobs from the delayed set to the ready stream once their scheduled time arrives.

func (*RedisQueue) Purge

func (q *RedisQueue) Purge(ctx context.Context, queue string) error

Purge removes all pending jobs for the provided queue.

func (*RedisQueue) Size

func (q *RedisQueue) Size(ctx context.Context, queue string) (int64, error)

Size reports the number of ready jobs in a stream.

func (*RedisQueue) Start

func (q *RedisQueue) Start(ctx context.Context) error

Start starts the delayed job promoter.

func (*RedisQueue) Stop

func (q *RedisQueue) Stop(ctx context.Context) error

Stop stops the delayed job promoter.

func (*RedisQueue) WithLogger

func (q *RedisQueue) WithLogger(logger *slog.Logger) *RedisQueue

WithLogger sets the logger used by the queue.

type RedisWorker

type RedisWorker struct {
	// contains filtered or unexported fields
}

RedisWorker processes jobs from Redis Streams consumer groups.

func NewRedisWorker

func NewRedisWorker(client redis.UniversalClient, prefix string, queues []string, logger *slog.Logger) *RedisWorker

NewRedisWorker creates a new Redis worker.

func (*RedisWorker) Metrics

func (w *RedisWorker) Metrics() WorkerMetrics

Metrics returns the current worker counters.

func (*RedisWorker) Register

func (w *RedisWorker) Register(name string, factory func() Job)

Register registers a named job factory.

func (*RedisWorker) Start

func (w *RedisWorker) Start(ctx context.Context) error

Start begins polling Redis for new jobs.

func (*RedisWorker) Stop

func (w *RedisWorker) Stop(ctx context.Context) error

Stop stops fetching new jobs and waits for in-flight jobs to finish. It respects the provided context for timeout protection.

func (*RedisWorker) WithConcurrency

func (w *RedisWorker) WithConcurrency(n int) *RedisWorker

WithConcurrency sets the number of worker goroutines.

func (*RedisWorker) WithDashboard

func (w *RedisWorker) WithDashboard(dash DashboardTracer) *RedisWorker

WithDashboard sets the telemetry tracer for the worker.

func (*RedisWorker) WithEvents

func (w *RedisWorker) WithEvents(emitter *event.Emitter) *RedisWorker

WithEvents sets the event emitter for the worker.

type ScheduledJob

type ScheduledJob struct {
	ID      cron.EntryID
	Name    string
	Spec    string
	NextRun time.Time
	PrevRun time.Time
}

ScheduledJob holds metadata about a registered cron job.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler runs named cron jobs with optional Redis distributed locking to prevent duplicate runs across multiple application instances. It also handles moving delayed jobs to ready queues.

func NewScheduler

func NewScheduler(client redis.UniversalClient, prefix string, queue *RedisQueue) *Scheduler

NewScheduler creates a new scheduler.

func (*Scheduler) List

func (s *Scheduler) List() []ScheduledJob

List returns metadata for all registered named jobs including their next run time.

func (*Scheduler) Register

func (s *Scheduler) Register(name, spec string, fn func()) (cron.EntryID, error)

Register adds a named cron job. If a Redis client is configured, a distributed lock is acquired before each run to prevent concurrent execution across instances.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(spec string, cmd func()) (cron.EntryID, error)

Schedule adds an anonymous cron job (legacy API, no lock protection).

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start begins the cron scheduler and delayed job polling.

func (*Scheduler) Stop

func (s *Scheduler) Stop(ctx context.Context) error

Stop stops the scheduler gracefully.

type StreamsDriver

type StreamsDriver = RedisDispatcher

StreamsDriver is the compatibility alias for the Redis-backed dispatcher.

func NewStreamsDriver

func NewStreamsDriver(client redis.UniversalClient, prefix string) *StreamsDriver

NewStreamsDriver creates a Redis Streams dispatcher.

type TypedJob

type TypedJob[T any] interface {
	Job
	Data() T
}

TypedJob is a generic interface for background jobs with typed payloads.

type Worker

type Worker = RedisWorker

Worker is the compatibility alias for the Redis-backed worker.

func NewWorker

func NewWorker(client redis.UniversalClient, prefix string, queues []string, concurrency int, logger *slog.Logger) *Worker

NewWorker creates a Redis-backed worker.

type WorkerMetrics

type WorkerMetrics struct {
	JobsProcessed int64 `json:"jobs_processed"`
	JobsFailed    int64 `json:"jobs_failed"`
	JobsRetried   int64 `json:"jobs_retried"`
	InFlight      int64 `json:"in_flight"`
}

WorkerMetrics exposes queue worker counters.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL