Documentation ¶
Overview ¶
Package winter is a distributed task queue backed by Redis with generics-first type safety, workflow primitives, and a standalone gRPC server mode.
Index ¶
- Variables
- func Cancel(reason string) error
- func Chain(c *Client, ctx context.Context, tasks []Task, opts ...Option) (string, error)
- func Chord(c *Client, ctx context.Context, headers []Task, callback Task, opts ...Option) (string, error)
- func Group(c *Client, ctx context.Context, tasks []Task, opts ...Option) (string, error)
- func Handle[T Task](s *Server, h Handler[T])
- func HandleFunc[T Task](s *Server, fn func(ctx context.Context, job *Job[T]) error)
- func IsCancel(err error) (string, bool)
- func IsReschedule(err error) (time.Duration, bool)
- func Reschedule(d time.Duration) error
- func ValidTransition(from, to JobState) bool
- func WaitForResult[R any](c *Client, ctx context.Context, jobID string) (*R, error)
- type BackoffStrategy
- type Client
- type CronEntry
- type DeadJob
- type Handler
- type HandlerFn
- type HandlerFunc
- type HookFunc
- type Inspector
- func (i *Inspector) Close() error
- func (i *Inspector) Dead(ctx context.Context, queueName string, offset, limit int64) ([]*DeadJob, error)
- func (i *Inspector) DeadCount(ctx context.Context, queueName string) (int64, error)
- func (i *Inspector) JobResult(ctx context.Context, jobID string) ([]byte, error)
- func (i *Inspector) Pause(ctx context.Context, queueName string) error
- func (i *Inspector) PeekDead(ctx context.Context, queueName string) (*DeadJob, error)
- func (i *Inspector) PurgeDead(ctx context.Context, queueName string) (int64, error)
- func (i *Inspector) Queue(ctx context.Context, name string) (*QueueInfo, error)
- func (i *Inspector) Resume(ctx context.Context, queueName string) error
- func (i *Inspector) Retry(ctx context.Context, queueName string, jobID string) error
- type Job
- type JobEvent
- type JobState
- type Middleware
- type Option
- type QueueInfo
- type QueueWeight
- type Rate
- type RedisConfig
- type Server
- func (s *Server) OnComplete(fn HookFunc)
- func (s *Server) OnDead(fn HookFunc)
- func (s *Server) OnError(fn HookFunc)
- func (s *Server) OnStart(fn HookFunc)
- func (s *Server) SetRateLimit(kind string, max int, per time.Duration)
- func (s *Server) Start() error
- func (s *Server) Stop()
- func (s *Server) Use(mw ...Middleware)
- type ServerConfig
- type Task
- type TaskOptions
- type TaskWithOptions
Constants ¶
This section is empty.
Variables ¶
var ErrDuplicate = queue.ErrDuplicate
ErrDuplicate is returned when enqueuing a job that matches an existing unique constraint.
var ErrSkipRetry = errors.New("winter: skip retry")
ErrSkipRetry is a sentinel error that causes a failed job to go directly to the dead letter queue without retrying. Wrap it with fmt.Errorf to add context.
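The wrapping advice above can be sketched as follows. This is a minimal illustration that uses a local stand-in, errSkipRetry, so it compiles without the package; it assumes the server detects the sentinel with errors.Is, which this page does not state. validate and isSkipRetry are hypothetical helper names.

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipRetry is a local stand-in for winter.ErrSkipRetry.
var errSkipRetry = errors.New("winter: skip retry")

// validate wraps the sentinel with %w so that context is added while
// errors.Is can still find the sentinel in the error chain.
func validate(userID int) error {
	if userID <= 0 {
		return fmt.Errorf("invalid user id %d: %w", userID, errSkipRetry)
	}
	return nil
}

// isSkipRetry shows the check a server could perform (an assumption).
func isSkipRetry(err error) bool {
	return errors.Is(err, errSkipRetry)
}

func main() {
	err := validate(-1)
	fmt.Println(err)
	fmt.Println(isSkipRetry(err))
}
```

Wrapping with %v instead of %w would flatten the sentinel into a string, and errors.Is would no longer match it.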
Functions ¶
func Cancel ¶
func Cancel(reason string) error
Cancel returns an error that instructs the server to permanently remove the job without retrying.
func Chain ¶
func Chain(c *Client, ctx context.Context, tasks []Task, opts ...Option) (string, error)
Chain enqueues tasks sequentially. Each task is enqueued only after the previous one completes. If any task fails and exhausts retries, the chain stops.
func Chord ¶
func Chord(c *Client, ctx context.Context, headers []Task, callback Task, opts ...Option) (string, error)
Chord enqueues all header tasks for parallel execution and fires the callback task only after every header completes. If any header fails, the callback does not fire.
func Group ¶
func Group(c *Client, ctx context.Context, tasks []Task, opts ...Option) (string, error)
Group enqueues all tasks immediately for parallel execution and tracks completion. Returns the workflow ID for status checks.
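Chain, Chord, and Group differ mainly in ordering and failure handling. The sketch below is an in-process analogy only: the real primitives enqueue Task values through Redis, whereas here a hypothetical step function stands in for a task and goroutines stand in for workers. All names in the block are illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// step is a hypothetical stand-in for a task.
type step func() error

var errBoom = errors.New("boom")

// chain runs steps one after another and stops at the first failure.
func chain(steps []step) error {
	for i, s := range steps {
		if err := s(); err != nil {
			return fmt.Errorf("chain stopped at step %d: %w", i, err)
		}
	}
	return nil
}

// group starts every step concurrently, waits for all of them, and
// returns one error slot per step.
func group(steps []step) []error {
	errs := make([]error, len(steps))
	var wg sync.WaitGroup
	for i, s := range steps {
		wg.Add(1)
		go func(i int, s step) {
			defer wg.Done()
			errs[i] = s()
		}(i, s)
	}
	wg.Wait()
	return errs
}

// chord runs the headers as a group and fires the callback only if
// every header succeeded.
func chord(headers []step, callback step) error {
	for i, err := range group(headers) {
		if err != nil {
			return fmt.Errorf("header %d failed, callback skipped: %w", i, err)
		}
	}
	return callback()
}

// runChord reports whether the callback ran when one header may fail.
func runChord(failHeader bool) (ran bool, err error) {
	ok := func() error { return nil }
	maybe := func() error {
		if failHeader {
			return errBoom
		}
		return nil
	}
	err = chord([]step{ok, maybe, ok}, func() error { ran = true; return nil })
	return ran, err
}

func main() {
	fmt.Println(runChord(false)) // all headers succeed, callback runs
	fmt.Println(runChord(true))  // one header fails, callback is skipped
}
```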
func Handle ¶
func Handle[T Task](s *Server, h Handler[T])
Handle registers a typed handler for its task kind on the server. If the task implements TaskWithOptions and specifies a RateLimit, the server enforces a per-kind token bucket before dispatching.
func HandleFunc ¶
func HandleFunc[T Task](s *Server, fn func(ctx context.Context, job *Job[T]) error)
HandleFunc registers a function as a handler for its task kind.
func IsReschedule ¶
func IsReschedule(err error) (time.Duration, bool)
IsReschedule reports whether err is a reschedule sentinel and returns its delay.
func Reschedule ¶
func Reschedule(d time.Duration) error
Reschedule returns an error that instructs the server to put the job back in the delayed set and try again after the given duration.
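One plausible way to build a constructor and detector pair like Reschedule and IsReschedule is an error type that carries the delay and is recovered with errors.As. The sketch below is an assumption about the technique, not the package's implementation; rescheduleError, reschedule, and isReschedule are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// rescheduleError is a hypothetical error type carrying the requested delay.
type rescheduleError struct {
	delay time.Duration
}

func (e *rescheduleError) Error() string {
	return fmt.Sprintf("reschedule in %s", e.delay)
}

// reschedule mirrors the shape of Reschedule(d time.Duration) error.
func reschedule(d time.Duration) error {
	return &rescheduleError{delay: d}
}

// isReschedule mirrors the shape of IsReschedule: it walks the error chain
// and, if a reschedule error is found, returns its delay.
func isReschedule(err error) (time.Duration, bool) {
	var re *rescheduleError
	if errors.As(err, &re) {
		return re.delay, true
	}
	return 0, false
}

func main() {
	// A handler can add context and still be recognized.
	err := fmt.Errorf("upstream not ready: %w", reschedule(30*time.Second))
	d, ok := isReschedule(err)
	fmt.Println(d, ok)
}
```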
func ValidTransition ¶
func ValidTransition(from, to JobState) bool
ValidTransition reports whether transitioning from one state to another is allowed.
func WaitForResult ¶
func WaitForResult[R any](c *Client, ctx context.Context, jobID string) (*R, error)
WaitForResult polls Redis until a result is available for the given job ID, then deserializes it into type R. Returns context.DeadlineExceeded if the context expires before a result appears. Poll interval defaults to 250ms.
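The poll-until-ready behavior described above can be sketched with a ticker and a context. This is an illustration under assumptions: an in-memory resultStore stands in for Redis, results are assumed to be JSON-encoded, and waitFor, demo, and timedOut are hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"
)

// resultStore is an in-memory stand-in for Redis.
type resultStore struct {
	mu sync.Mutex
	m  map[string][]byte
}

func (s *resultStore) set(id string, raw []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[id] = raw
}

func (s *resultStore) get(id string) ([]byte, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	raw, ok := s.m[id]
	return raw, ok
}

// waitFor checks the store, then re-checks on every tick until a result
// appears or ctx ends. JSON decoding is an assumption about the format.
func waitFor[R any](ctx context.Context, s *resultStore, id string, interval time.Duration) (*R, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if raw, ok := s.get(id); ok {
			var out R
			if err := json.Unmarshal(raw, &out); err != nil {
				return nil, err
			}
			return &out, nil
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-ticker.C:
		}
	}
}

type total struct {
	Sum int `json:"sum"`
}

// demo stores a result after storeAfterMs and waits at most timeoutMs.
func demo(storeAfterMs, timeoutMs int) (*total, error) {
	s := &resultStore{m: map[string][]byte{}}
	go func() {
		time.Sleep(time.Duration(storeAfterMs) * time.Millisecond)
		s.set("job-1", []byte(`{"sum":42}`))
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return waitFor[total](ctx, s, "job-1", 5*time.Millisecond)
}

// timedOut reports whether err is the context deadline error.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	res, err := demo(20, 1000)
	fmt.Println(res, err)
	_, err = demo(1000, 30)
	fmt.Println(timedOut(err))
}
```

Callers of the real function would normally bound the wait with context.WithTimeout in the same way, since polling otherwise continues until a result appears.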
Types ¶
type BackoffStrategy ¶
BackoffStrategy computes the delay before the next retry attempt.
func Exponential ¶
func Exponential(base time.Duration) BackoffStrategy
Exponential returns a backoff strategy that doubles the delay on each attempt.
func Fixed ¶
func Fixed(d time.Duration) BackoffStrategy
Fixed returns a backoff strategy that always waits the same duration between retries.
func Linear ¶
func Linear(step time.Duration) BackoffStrategy
Linear returns a backoff strategy that increases the delay by a fixed step each attempt.
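The three strategies can be compared side by side. This page does not show the definition of BackoffStrategy, so the sketch assumes a function from a 1-based attempt number to a delay; the real type and its attempt numbering may differ. The lowercase names are local stand-ins.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is an assumed shape for BackoffStrategy.
type backoff func(attempt int) time.Duration

// fixed always waits the same duration.
func fixed(d time.Duration) backoff {
	return func(int) time.Duration { return d }
}

// linear grows the delay by one step per attempt: step, 2*step, 3*step, ...
func linear(step time.Duration) backoff {
	return func(attempt int) time.Duration {
		return time.Duration(attempt) * step
	}
}

// exponential doubles the delay per attempt: base, 2*base, 4*base, ...
// A production version would also cap the result to avoid overflow.
func exponential(base time.Duration) backoff {
	return func(attempt int) time.Duration {
		if attempt < 1 {
			attempt = 1
		}
		return base << (attempt - 1)
	}
}

func main() {
	strategies := map[string]backoff{
		"fixed":       fixed(2 * time.Second),
		"linear":      linear(2 * time.Second),
		"exponential": exponential(2 * time.Second),
	}
	for _, name := range []string{"fixed", "linear", "exponential"} {
		for attempt := 1; attempt <= 4; attempt++ {
			fmt.Printf("%s attempt %d: %s\n", name, attempt, strategies[name](attempt))
		}
	}
}
```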
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps a Redis connection and provides the Enqueue API.
func ClientFromContext ¶
ClientFromContext extracts the Client from a worker's context, allowing handlers to enqueue downstream jobs.
func NewClient ¶
func NewClient(cfg RedisConfig) (*Client, error)
NewClient connects to Redis and returns a client ready to enqueue jobs.
func NewClientFromRedis ¶
func NewClientFromRedis(rdb redis.UniversalClient) *Client
NewClientFromRedis creates a client from an existing Redis connection.
func (*Client) Redis ¶
func (c *Client) Redis() redis.UniversalClient
Redis returns the underlying Redis client for advanced use cases.
type DeadJob ¶
type DeadJob struct {
ID string
Kind string
Queue string
Payload []byte
Attempt int
MaxRetries int
LastError string
CreatedAt time.Time
CompletedAt time.Time
}
DeadJob is a dead letter job with its metadata exposed for inspection.
type HandlerFn ¶
HandlerFn is the untyped handler signature used internally by the middleware chain.
type HandlerFunc ¶
HandlerFunc is an adapter to allow ordinary functions to serve as handlers.
type Inspector ¶
type Inspector struct {
// contains filtered or unexported fields
}
Inspector provides read and management operations for queues and dead letter jobs without requiring a running server.
func NewInspector ¶
func NewInspector(cfg RedisConfig) (*Inspector, error)
NewInspector connects to Redis and returns an inspector.
func NewInspectorFromRedis ¶
func NewInspectorFromRedis(rdb redis.UniversalClient) *Inspector
NewInspectorFromRedis creates an inspector from an existing Redis connection.
func (*Inspector) Dead ¶
func (i *Inspector) Dead(ctx context.Context, queueName string, offset, limit int64) ([]*DeadJob, error)
Dead returns a paginated list of dead letter jobs for the named queue.
func (*Inspector) JobResult ¶
func (i *Inspector) JobResult(ctx context.Context, jobID string) ([]byte, error)
JobResult returns the stored result bytes for a completed job, or nil if no result was stored.
func (*Inspector) PurgeDead ¶
func (i *Inspector) PurgeDead(ctx context.Context, queueName string) (int64, error)
PurgeDead removes all jobs from the dead letter queue and deletes their data.
type Job ¶
type Job[T Task] struct {
ID string `json:"id"`
Args T `json:"args"`
Kind string `json:"kind"`
Queue string `json:"queue"`
Priority int `json:"priority"`
State JobState `json:"state"`
Attempt int `json:"attempt"`
MaxRetries int `json:"max_retries"`
CreatedAt time.Time `json:"created_at"`
ScheduledAt time.Time `json:"scheduled_at,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
CompletedAt time.Time `json:"completed_at,omitempty"`
LastError string `json:"last_error,omitempty"`
// contains filtered or unexported fields
}
Job is a type-safe wrapper around a queued job and its deserialized payload.
type Middleware ¶
Middleware wraps a handler to add cross-cutting behavior like panic recovery or logging.
func Recover ¶
func Recover() Middleware
Recover returns a middleware that catches panics and converts them to errors.
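Panic recovery in a middleware usually relies on a deferred recover that assigns to a named return value. The sketch below illustrates that technique with simplified stand-in types; the real HandlerFn and Middleware signatures are not shown on this page, so handlerFn, middleware, and recoverPanics are assumptions.

```go
package main

import (
	"context"
	"fmt"
)

// handlerFn and middleware are simplified stand-ins for the package types.
type handlerFn func(ctx context.Context) error
type middleware func(next handlerFn) handlerFn

// recoverPanics converts a panic in the wrapped handler into an error.
// The named result err lets the deferred function overwrite the return value.
func recoverPanics() middleware {
	return func(next handlerFn) handlerFn {
		return func(ctx context.Context) (err error) {
			defer func() {
				if r := recover(); r != nil {
					err = fmt.Errorf("panic recovered: %v", r)
				}
			}()
			return next(ctx)
		}
	}
}

// runPanicking wraps a handler that panics and returns what the caller sees.
func runPanicking() error {
	h := recoverPanics()(func(context.Context) error {
		panic("boom")
	})
	return h(context.Background())
}

func main() {
	fmt.Println(runPanicking())
}
```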
type Option ¶
type Option func(*insertOpts)
Option configures how a job is enqueued.
type QueueInfo ¶
type QueueInfo struct {
Name string
Ready int64
Active int64
Delayed int64
Dead int64
Completed int64
Enqueued int64
}
QueueInfo holds a snapshot of queue depths and counters.
type QueueWeight ¶
QueueWeight pairs a queue name with its relative polling weight.
func Queues ¶
func Queues(args ...any) []QueueWeight
Queues builds a weighted queue list from alternating name/weight pairs.
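Because Queues takes ...any, the name/weight pairing is only checked at run time. The sketch below shows one way such a variadic list could be parsed. It is not the package's implementation: the fields of queueWeight are assumed, and unlike Queues this hypothetical parseQueues returns an error so the failure cases are visible.

```go
package main

import (
	"fmt"
)

// queueWeight is a stand-in for QueueWeight; its field names are assumed.
type queueWeight struct {
	Name   string
	Weight int
}

// parseQueues reads alternating (string name, int weight) arguments.
func parseQueues(args ...any) ([]queueWeight, error) {
	if len(args)%2 != 0 {
		return nil, fmt.Errorf("expected name/weight pairs, got %d arguments", len(args))
	}
	out := make([]queueWeight, 0, len(args)/2)
	for i := 0; i < len(args); i += 2 {
		name, ok := args[i].(string)
		if !ok {
			return nil, fmt.Errorf("argument %d: want string name, got %T", i, args[i])
		}
		weight, ok := args[i+1].(int)
		if !ok {
			return nil, fmt.Errorf("argument %d: want int weight, got %T", i+1, args[i+1])
		}
		out = append(out, queueWeight{Name: name, Weight: weight})
	}
	return out, nil
}

func main() {
	qs, err := parseQueues("critical", 6, "default", 3, "low", 1)
	fmt.Println(qs, err)
}
```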
type RedisConfig ¶
RedisConfig holds connection parameters for the Redis server.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server polls queues, dispatches jobs to registered handlers, and manages the worker lifecycle including graceful shutdown.
func NewServer ¶
func NewServer(redisCfg RedisConfig, cfg ServerConfig) (*Server, error)
NewServer connects to Redis and returns a server ready to process jobs.
func NewServerFromRedis ¶
func NewServerFromRedis(rdb redis.UniversalClient, cfg ServerConfig) *Server
NewServerFromRedis creates a server from an existing Redis connection.
func (*Server) OnComplete ¶
func (s *Server) OnComplete(fn HookFunc)
OnComplete registers a hook that fires after a job completes successfully.
func (*Server) OnDead ¶
func (s *Server) OnDead(fn HookFunc)
OnDead registers a hook that fires when a job exhausts retries and enters the dead letter queue.
func (*Server) OnStart ¶
func (s *Server) OnStart(fn HookFunc)
OnStart registers a hook that fires when a job is picked up for processing, before the handler runs.
func (*Server) SetRateLimit ¶
func (s *Server) SetRateLimit(kind string, max int, per time.Duration)
SetRateLimit configures a rate limit for the given task kind, overriding any limit set via TaskWithOptions. max is the number of tokens (jobs) allowed in each interval of length per.
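To make the max and per parameters concrete, here is a minimal in-memory token bucket. It is a sketch under assumptions: the package's limiter is Redis-backed and its refill policy is not documented on this page, so the continuous refill used here is illustrative. The clock is passed in explicitly to keep the example deterministic; bucket, newBucket, and simulate are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// bucket holds up to max tokens and refills at max tokens per interval.
type bucket struct {
	max    float64
	per    time.Duration
	tokens float64
	last   time.Time
}

func newBucket(max int, per time.Duration, now time.Time) *bucket {
	return &bucket{max: float64(max), per: per, tokens: float64(max), last: now}
}

// allow refills according to elapsed time, then spends one token if available.
func (b *bucket) allow(now time.Time) bool {
	elapsed := now.Sub(b.last)
	b.tokens += float64(elapsed) / float64(b.per) * b.max
	if b.tokens > b.max {
		b.tokens = b.max
	}
	b.last = now
	if b.tokens >= 1 {
		b.tokens--
		return true
	}
	return false
}

// simulate issues three requests at t0 and two more half a second later
// against a limit of 2 jobs per second.
func simulate() []bool {
	t0 := time.Unix(0, 0)
	b := newBucket(2, time.Second, t0)
	later := t0.Add(500 * time.Millisecond)
	var got []bool
	for _, at := range []time.Time{t0, t0, t0, later, later} {
		got = append(got, b.allow(at))
	}
	return got
}

func main() {
	fmt.Println(simulate()) // [true true false true false]
}
```

The third request is rejected because the bucket is empty, and half an interval later exactly one token has been refilled.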
func (*Server) Start ¶
func (s *Server) Start() error
Start begins polling queues and blocks until a shutdown signal is received. On shutdown it stops fetching new jobs, waits for in-flight jobs to finish (up to ShutdownTimeout), deregisters the worker, and closes the connection.
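The "wait for in-flight jobs up to ShutdownTimeout" step of that sequence can be sketched with a sync.WaitGroup and a timer. This covers only the bounded wait, not signal handling or worker deregistration, and drain and simulate are hypothetical names rather than package API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// drain waits for the in-flight jobs tracked by wg, giving up after timeout.
// It reports whether every job finished in time.
func drain(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// simulate starts one job lasting jobMs and drains with timeoutMs.
func simulate(jobMs, timeoutMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(jobMs) * time.Millisecond)
	}()
	return drain(&wg, time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println(simulate(10, 1000)) // job finishes inside the timeout
	fmt.Println(simulate(500, 20))  // timeout expires first
}
```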
func (*Server) Stop ¶
func (s *Server) Stop()
Stop cancels the server context and begins graceful shutdown.
func (*Server) Use ¶
func (s *Server) Use(mw ...Middleware)
Use appends middleware to the server's handler chain.
type ServerConfig ¶
type ServerConfig struct {
Concurrency int
Queues []QueueWeight
StrictPriority bool
Cron []CronEntry
PollInterval time.Duration
ShutdownTimeout time.Duration
Logger *slog.Logger
}
ServerConfig controls concurrency, queue weights, poll interval, and logging. When StrictPriority is true, queues are polled in descending weight order and lower-priority queues are only checked when all higher ones are empty.
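The StrictPriority rule can be sketched as a selection function: sort by descending weight and take the first queue that has ready jobs. This is an illustration of the documented rule only; queueWeight mirrors QueueWeight with assumed field names, and pickStrict and the ready map are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// queueWeight is a stand-in for QueueWeight; its field names are assumed.
type queueWeight struct {
	Name   string
	Weight int
}

// pickStrict returns the highest-weight queue with at least one ready job.
// Lower-weight queues are considered only when all heavier ones are empty.
func pickStrict(queues []queueWeight, ready map[string]int) (string, bool) {
	sorted := append([]queueWeight(nil), queues...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Weight > sorted[j].Weight
	})
	for _, q := range sorted {
		if ready[q.Name] > 0 {
			return q.Name, true
		}
	}
	return "", false
}

func main() {
	queues := []queueWeight{{"low", 1}, {"critical", 6}, {"default", 3}}
	fmt.Println(pickStrict(queues, map[string]int{"critical": 1, "low": 9}))
	fmt.Println(pickStrict(queues, map[string]int{"default": 2, "low": 9}))
	fmt.Println(pickStrict(queues, map[string]int{}))
}
```

Under this rule a steady stream of high-weight jobs can starve lower queues, which is the usual trade-off against weighted polling.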
type Task ¶
type Task interface {
Kind() string
}
Task is the interface that all job payloads must implement.
type TaskOptions ¶
type TaskOptions struct {
Queue string
MaxRetries int
Timeout time.Duration
Backoff BackoffStrategy
RateLimit Rate
}
TaskOptions holds per-task-kind defaults that override server-level configuration.
type TaskWithOptions ¶
type TaskWithOptions interface {
Task
Options() TaskOptions
}
TaskWithOptions extends Task with default queue, retry, and rate limit configuration.
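A task type opts into per-kind defaults by implementing both methods. The sketch below redeclares the interfaces locally so it compiles without the package; taskOptions is trimmed to three fields because the definitions of BackoffStrategy and Rate are not shown on this page. sendEmail, ping, the kind strings, and the queue name are all hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the interfaces documented above.
type task interface {
	Kind() string
}

type taskOptions struct {
	Queue      string
	MaxRetries int
	Timeout    time.Duration
}

type taskWithOptions interface {
	task
	Options() taskOptions
}

// sendEmail carries its own defaults.
type sendEmail struct {
	To      string
	Subject string
}

func (sendEmail) Kind() string { return "email:send" }

func (sendEmail) Options() taskOptions {
	return taskOptions{Queue: "mail", MaxRetries: 5, Timeout: 30 * time.Second}
}

// ping implements only Kind and so falls back to server-level settings.
type ping struct{}

func (ping) Kind() string { return "ping" }

// Compile-time check that sendEmail satisfies the extended interface.
var _ taskWithOptions = sendEmail{}

// describe shows how a caller can detect the optional interface.
func describe(t task) string {
	if two, ok := t.(taskWithOptions); ok {
		o := two.Options()
		return fmt.Sprintf("%s queue=%s retries=%d", t.Kind(), o.Queue, o.MaxRetries)
	}
	return t.Kind() + " (server defaults)"
}

func main() {
	fmt.Println(describe(sendEmail{To: "a@example.com", Subject: "hi"}))
	fmt.Println(describe(ping{}))
}
```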
Directories ¶
| Path | Synopsis |
|---|---|
| cmd | |
| cmd/server (command) | Command server runs the standalone Winter gRPC server. |
| cmd/winter (command) | Command winter is the CLI for managing Winter task queues. |
| examples | |
| examples/basic (command) | Basic example: define a task, enqueue it, and process it. |
| examples/cron (command) | Cron example: schedule periodic jobs with cron expressions. |
| examples/multi-queue/producer (command) | Producer example: enqueues tasks to different queues based on task options. |
| examples/multi-queue/tasks | Package tasks defines shared task types used by both the producer and worker. |
| examples/multi-queue/worker (command) | Worker example: processes tasks from multiple weighted queues. |
| examples/priority (command) | Priority example: demonstrates that lower priority values are dequeued first. |
| examples/workflows (command) | Workflows example: demonstrates Chain, Group, and Chord patterns. |
| internal | |
| internal/config | Package config parses Winter's YAML configuration file into strongly typed structs used by the server and CLI. |
| internal/queue | Package queue implements the core Redis operations for Winter's job queue. |
| internal/ratelimit | Package ratelimit implements a Redis-backed token bucket rate limiter. |
| internal/scheduler | Package scheduler implements periodic job scheduling with cron expressions. |
| internal/server | Package server implements the gRPC QueueService backed by the internal queue package. |
| internal/worker | Package worker implements background goroutines for lease recovery and worker lifecycle management. |
| internal/workflow | Package workflow implements Chain, Group, and Chord primitives for composing multi-job pipelines. |
| proto | |
| | Package wintertest provides test helpers for asserting that jobs were enqueued correctly. |