Documentation
¶
Index ¶
- type BaseJob
- type DashboardTracer
- type Dispatcher
- type FailedJob
- type FailedJobManager
- type GenericJob
- func (j *GenericJob[T]) Data() T
- func (j *GenericJob[T]) JobType() string
- func (j *GenericJob[T]) MaxRetries() int
- func (j *GenericJob[T]) Queue() string
- func (j *GenericJob[T]) Timeout() time.Duration
- func (j *GenericJob[T]) WithMaxRetries(n int) *GenericJob[T]
- func (j *GenericJob[T]) WithQueue(name string) *GenericJob[T]
- func (j *GenericJob[T]) WithTimeout(d time.Duration) *GenericJob[T]
- type Job
- type Queue
- type RedisDispatcher
- func (d *RedisDispatcher) Dispatch(ctx context.Context, job Job, name string) error
- func (d *RedisDispatcher) DispatchAt(ctx context.Context, job Job, name string, at time.Time) error
- func (d *RedisDispatcher) DispatchIn(ctx context.Context, job Job, name string, delay time.Duration) error
- func (d *RedisDispatcher) DispatchUnique(ctx context.Context, job Job, name string, ttl time.Duration) error
- type RedisFailedJobsStore
- func (s *RedisFailedJobsStore) All(ctx context.Context) ([]FailedJob, error)
- func (s *RedisFailedJobsStore) Delete(ctx context.Context, id string) error
- func (s *RedisFailedJobsStore) Find(ctx context.Context, id string) (FailedJob, error)
- func (s *RedisFailedJobsStore) Purge(ctx context.Context) error
- func (s *RedisFailedJobsStore) Retry(ctx context.Context, id string) error
- func (s *RedisFailedJobsStore) Store(ctx context.Context, job FailedJob) error
- type RedisQueue
- func (q *RedisQueue) Enqueue(ctx context.Context, job Job) error
- func (q *RedisQueue) EnqueueAt(ctx context.Context, job Job, at time.Time) error
- func (q *RedisQueue) EnqueueIn(ctx context.Context, job Job, delay time.Duration) error
- func (q *RedisQueue) PromoteReady(ctx context.Context) error
- func (q *RedisQueue) Purge(ctx context.Context, queue string) error
- func (q *RedisQueue) Size(ctx context.Context, queue string) (int64, error)
- func (q *RedisQueue) Start(ctx context.Context) error
- func (q *RedisQueue) Stop(ctx context.Context) error
- func (q *RedisQueue) WithLogger(logger *slog.Logger) *RedisQueue
- type RedisWorker
- func (w *RedisWorker) Metrics() WorkerMetrics
- func (w *RedisWorker) Register(name string, factory func() Job)
- func (w *RedisWorker) Start(ctx context.Context) error
- func (w *RedisWorker) Stop(ctx context.Context) error
- func (w *RedisWorker) WithConcurrency(n int) *RedisWorker
- func (w *RedisWorker) WithDashboard(dash DashboardTracer) *RedisWorker
- func (w *RedisWorker) WithEvents(emitter *event.Emitter) *RedisWorker
- type ScheduledJob
- type Scheduler
- func (s *Scheduler) List() []ScheduledJob
- func (s *Scheduler) Register(name, spec string, fn func()) (cron.EntryID, error)
- func (s *Scheduler) Schedule(spec string, cmd func()) (cron.EntryID, error)
- func (s *Scheduler) Start(ctx context.Context) error
- func (s *Scheduler) Stop(ctx context.Context) error
- type StreamsDriver
- type TypedJob
- type Worker
- type WorkerMetrics
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseJob ¶
type BaseJob struct{}
BaseJob is a helper that provides production-safe defaults.
func (*BaseJob) MaxRetries ¶
MaxRetries defaults to three attempts.
type DashboardTracer ¶
DashboardTracer is the interface for tracking job progress in a telemetry dashboard.
type Dispatcher ¶
type Dispatcher = RedisDispatcher
Dispatcher is the compatibility alias for the Redis-backed dispatcher.
func NewDispatcher ¶
func NewDispatcher(client redis.UniversalClient, prefix string) *Dispatcher
NewDispatcher creates a Redis-backed job dispatcher.
type FailedJob ¶
type FailedJob struct {
ID string `json:"id"`
JobType string `json:"job_type"`
Queue string `json:"queue"`
Payload string `json:"payload"`
Error string `json:"error"`
StackTrace string `json:"stack_trace,omitempty"`
FailedAt time.Time `json:"failed_at"`
Attempts int `json:"attempts"`
MaxRetries int `json:"max_retries"`
OriginalEnqueuedAt time.Time `json:"original_enqueued_at"`
}
FailedJob represents a job that exhausted all retries.
type FailedJobManager ¶
type FailedJobManager struct {
// contains filtered or unexported fields
}
FailedJobManager provides the legacy failed job API on top of the Redis store.
func NewFailedJobManager ¶
func NewFailedJobManager(client redis.UniversalClient, prefix string) *FailedJobManager
NewFailedJobManager creates a new failed job manager.
type GenericJob ¶
type GenericJob[T any] struct {
BaseJob
Payload T
JobName string
JobQueue string
JobMaxRetry int
JobTimeoutD time.Duration
}
GenericJob is a helper that wraps a payload and common job metadata.
func NewJob ¶
func NewJob[T any](name string, payload T) *GenericJob[T]
NewJob creates a new generic job with the provided name and payload.
func (*GenericJob[T]) Data ¶
func (j *GenericJob[T]) Data() T
Data returns the typed payload.
func (*GenericJob[T]) JobType ¶
func (j *GenericJob[T]) JobType() string
JobType returns the registered name of the job.
func (*GenericJob[T]) MaxRetries ¶
func (j *GenericJob[T]) MaxRetries() int
MaxRetries returns the maximum number of retry attempts.
func (*GenericJob[T]) Timeout ¶
func (j *GenericJob[T]) Timeout() time.Duration
Timeout returns the execution timeout.
func (*GenericJob[T]) WithMaxRetries ¶
func (j *GenericJob[T]) WithMaxRetries(n int) *GenericJob[T]
WithMaxRetries sets the maximum retry attempts.
func (*GenericJob[T]) WithQueue ¶
func (j *GenericJob[T]) WithQueue(name string) *GenericJob[T]
WithQueue sets the queue name for the job.
func (*GenericJob[T]) WithTimeout ¶
func (j *GenericJob[T]) WithTimeout(d time.Duration) *GenericJob[T]
WithTimeout sets the execution timeout.
type Job ¶
type Job interface {
// Handle contains the actual job logic.
Handle(ctx context.Context) error
// OnFailure is invoked when the job permanently fails.
OnFailure(ctx context.Context, err error)
// MaxRetries returns the maximum number of attempts before failing.
MaxRetries() int
// Queue returns the queue name to push the job to.
Queue() string
// Timeout returns the maximum execution time for a single attempt.
Timeout() time.Duration
}
Job is the interface that background jobs must implement.
type Queue ¶
type Queue interface {
// Enqueue stores a job for immediate execution.
Enqueue(ctx context.Context, job Job) error
// EnqueueIn stores a job for execution after the provided delay.
EnqueueIn(ctx context.Context, job Job, delay time.Duration) error
// EnqueueAt stores a job for execution at the provided time.
EnqueueAt(ctx context.Context, job Job, at time.Time) error
// Size reports the number of ready jobs for a queue.
Size(ctx context.Context, queue string) (int64, error)
// Purge removes all pending jobs for a queue.
Purge(ctx context.Context, queue string) error
}
Queue stores background jobs for later execution.
type RedisDispatcher ¶
type RedisDispatcher struct {
// contains filtered or unexported fields
}
RedisDispatcher pushes jobs onto a Redis-backed queue.
func NewRedisDispatcher ¶
func NewRedisDispatcher(client redis.UniversalClient, prefix string) *RedisDispatcher
NewRedisDispatcher creates a new Redis-backed dispatcher.
func (*RedisDispatcher) DispatchAt ¶
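func (d *RedisDispatcher) DispatchAt(ctx context.Context, job Job, name string, at time.Time) error
DispatchAt pushes a job to run at a specific time.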
func (*RedisDispatcher) DispatchIn ¶
func (d *RedisDispatcher) DispatchIn(ctx context.Context, job Job, name string, delay time.Duration) error
DispatchIn pushes a job to the delayed queue.
func (*RedisDispatcher) DispatchUnique ¶
func (d *RedisDispatcher) DispatchUnique(ctx context.Context, job Job, name string, ttl time.Duration) error
DispatchUnique pushes a job only when the uniqueness lock is available.
type RedisFailedJobsStore ¶
type RedisFailedJobsStore struct {
// contains filtered or unexported fields
}
RedisFailedJobsStore persists failed jobs in Redis.
func NewRedisFailedJobsStore ¶
func NewRedisFailedJobsStore(client redis.UniversalClient, prefix string, queue *RedisQueue) *RedisFailedJobsStore
NewRedisFailedJobsStore creates a failed job store.
func (*RedisFailedJobsStore) All ¶
func (s *RedisFailedJobsStore) All(ctx context.Context) ([]FailedJob, error)
All returns all failed jobs.
func (*RedisFailedJobsStore) Delete ¶
func (s *RedisFailedJobsStore) Delete(ctx context.Context, id string) error
Delete removes a failed job entry.
func (*RedisFailedJobsStore) Purge ¶
func (s *RedisFailedJobsStore) Purge(ctx context.Context) error
Purge removes all failed job entries.
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue is a Redis Streams-backed persistent queue.
func NewRedisQueue ¶
func NewRedisQueue(client redis.UniversalClient, prefix string, locker cache.Locker) *RedisQueue
NewRedisQueue creates a new Redis-backed queue.
func (*RedisQueue) Enqueue ¶
func (q *RedisQueue) Enqueue(ctx context.Context, job Job) error
Enqueue stores a job for immediate execution.
func (*RedisQueue) PromoteReady ¶
func (q *RedisQueue) PromoteReady(ctx context.Context) error
PromoteReady moves jobs from the delayed set to the ready stream once their scheduled time arrives.
func (*RedisQueue) Purge ¶
func (q *RedisQueue) Purge(ctx context.Context, queue string) error
Purge removes all pending jobs for the provided queue.
func (*RedisQueue) Start ¶
func (q *RedisQueue) Start(ctx context.Context) error
Start starts the delayed job promoter.
func (*RedisQueue) Stop ¶
func (q *RedisQueue) Stop(ctx context.Context) error
Stop stops the delayed job promoter.
func (*RedisQueue) WithLogger ¶
func (q *RedisQueue) WithLogger(logger *slog.Logger) *RedisQueue
WithLogger sets the logger used by the queue.
type RedisWorker ¶
type RedisWorker struct {
// contains filtered or unexported fields
}
RedisWorker processes jobs from Redis Streams consumer groups.
func NewRedisWorker ¶
func NewRedisWorker(client redis.UniversalClient, prefix string, queues []string, logger *slog.Logger) *RedisWorker
NewRedisWorker creates a new Redis worker.
func (*RedisWorker) Metrics ¶
func (w *RedisWorker) Metrics() WorkerMetrics
Metrics returns the current worker counters.
func (*RedisWorker) Register ¶
func (w *RedisWorker) Register(name string, factory func() Job)
Register registers a named job factory.
func (*RedisWorker) Start ¶
func (w *RedisWorker) Start(ctx context.Context) error
Start begins polling Redis for new jobs.
func (*RedisWorker) Stop ¶
func (w *RedisWorker) Stop(ctx context.Context) error
Stop stops fetching new jobs and waits for in-flight jobs to finish. It respects the provided context for timeout protection.
func (*RedisWorker) WithConcurrency ¶
func (w *RedisWorker) WithConcurrency(n int) *RedisWorker
WithConcurrency sets the number of worker goroutines.
func (*RedisWorker) WithDashboard ¶
func (w *RedisWorker) WithDashboard(dash DashboardTracer) *RedisWorker
WithDashboard sets the telemetry tracer for the worker.
func (*RedisWorker) WithEvents ¶
func (w *RedisWorker) WithEvents(emitter *event.Emitter) *RedisWorker
WithEvents sets the event emitter for the worker.
type ScheduledJob ¶
type ScheduledJob struct {
ID cron.EntryID
Name string
Spec string
NextRun time.Time
PrevRun time.Time
}
ScheduledJob holds metadata about a registered cron job.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler runs named cron jobs with optional Redis distributed locking to prevent duplicate runs across multiple application instances. It also handles moving delayed jobs to ready queues.
func NewScheduler ¶
func NewScheduler(client redis.UniversalClient, prefix string, queue *RedisQueue) *Scheduler
NewScheduler creates a new scheduler.
func (*Scheduler) List ¶
func (s *Scheduler) List() []ScheduledJob
List returns metadata for all registered named jobs including their next run time.
func (*Scheduler) Register ¶
func (s *Scheduler) Register(name, spec string, fn func()) (cron.EntryID, error)
Register adds a named cron job. If a Redis client is configured, a distributed lock is acquired before each run to prevent concurrent execution across instances.
type StreamsDriver ¶
type StreamsDriver = RedisDispatcher
StreamsDriver is the compatibility alias for the Redis-backed dispatcher.
func NewStreamsDriver ¶
func NewStreamsDriver(client redis.UniversalClient, prefix string) *StreamsDriver
NewStreamsDriver creates a Redis Streams dispatcher.
type Worker ¶
type Worker = RedisWorker
Worker is the compatibility alias for the Redis-backed worker.