winter

package module
v0.0.0-...-7480950 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2026 License: MIT Imports: 20 Imported by: 0

README

Winter

Go Redis

Winter is an open-source distributed task queue for Go, backed by Redis. With Winter, you get:

  • Generics-first type safety (your struct is the task, no []byte payloads)
  • Workflow primitives (Chain, Group, Chord)
  • Standalone gRPC server for language-agnostic workers
  • Priority queues with weighted and strict modes
  • Delayed, scheduled, and cron jobs
  • Retries with configurable backoff (exponential, linear, fixed)
  • Dead letter queue with inspection and retry
  • Unique job deduplication
  • Lease-based worker recovery
  • Rate limiting with Redis-backed token bucket
  • Result storage with typed retrieval
  • Lifecycle hooks (OnStart, OnComplete, OnError, OnDead)
  • CLI for queue management

Install

go get github.com/kylegrahammatzen/winter

Usage

Define a task
type SendEmail struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

func (SendEmail) Kind() string { return "email.send" }
Enqueue
client, _ := winter.NewClient(winter.RedisConfig{Addr: "localhost:6379"})

winter.Enqueue(client, ctx, SendEmail{
    To:      "user@example.com",
    Subject: "Welcome",
    Body:    "Thanks for signing up.",
})

// With options
winter.Enqueue(client, ctx, SendEmail{To: "user@example.com", Subject: "Reminder"},
    winter.In(5 * time.Minute),
    winter.Priority(0),
    winter.Unique(1 * time.Hour),
)
Process
server, _ := winter.NewServer(
    winter.RedisConfig{Addr: "localhost:6379"},
    winter.ServerConfig{
        Concurrency: 20,
        Queues:      winter.Queues("critical", 6, "default", 3, "low", 1),
    },
)

winter.HandleFunc(server, func(ctx context.Context, job *winter.Job[SendEmail]) error {
    return smtp.Send(job.Args.To, job.Args.Subject, job.Args.Body)
})

server.Use(winter.Recover())
server.Start()

By default, queues use weighted polling where higher-weight queues get proportionally more attention. Enable strict mode to always drain higher-priority queues first:

server, _ := winter.NewServer(redisCfg, winter.ServerConfig{
    Concurrency:    20,
    Queues:         winter.Queues("critical", 6, "default", 3, "low", 1),
    StrictPriority: true,
})

In strict mode, low jobs will only be processed when critical and default are both empty.

Workflows

Chain runs tasks sequentially. Each step only starts after the previous one completes.

winter.Chain(client, ctx, []winter.Task{
    ProcessOrder{OrderID: "abc"},
    GenerateInvoice{OrderID: "abc"},
    SendReceipt{OrderID: "abc"},
})

Group runs all tasks in parallel and tracks completion.

winter.Group(client, ctx, []winter.Task{
    SyncInventory{ProductIDs: []string{"sku-1"}},
    SyncInventory{ProductIDs: []string{"sku-2"}},
    SyncInventory{ProductIDs: []string{"sku-3"}},
})

Chord runs all header tasks in parallel, then fires a callback when all are done.

winter.Chord(client, ctx,
    []winter.Task{
        Build{OS: "linux", Arch: "amd64"},
        Build{OS: "darwin", Arch: "arm64"},
    },
    Deploy{Version: "1.4.0", Env: "staging"},
)
Cron Jobs

Register periodic jobs in the server config.

server, _ := winter.NewServer(redisCfg, winter.ServerConfig{
    Concurrency: 20,
    Queues:      winter.Queues("default", 1),
    Cron: []winter.CronEntry{
        {Name: "daily-cleanup", Schedule: "0 2 * * *", Kind: "cleanup.run", Queue: "maintenance"},
    },
})
Worker-side control flow
func (h *Handler) Work(ctx context.Context, job *winter.Job[SyncInventory]) error {
    resp, err := callExternalAPI()
    if isRateLimited(err) {
        return winter.Reschedule(30 * time.Second)
    }
    if isInvalidData(err) {
        return fmt.Errorf("bad data: %w", winter.SkipRetry)
    }
    if shouldAbandon(err) {
        return winter.Cancel("resource deleted")
    }

    client := winter.ClientFromContext(ctx)
    winter.Enqueue(client, ctx, NotifyUser{UserID: job.Args.UserID})

    return nil
}
Rate Limiting

Limit how fast tasks of a given kind are processed using a Redis-backed token bucket.

// On the task itself
func (SendEmail) Options() winter.TaskOptions {
    return winter.TaskOptions{
        RateLimit: &winter.RateLimit{Max: 10, Per: time.Second},
    }
}

// Or set it on the server directly
server.SetRateLimit("email.send", 10, time.Second)

When the rate limit is exceeded, the job is re-enqueued after the bucket refills.

Result Storage

Store results from completed jobs and retrieve them later.

// Inside a handler, set a result on the job
winter.HandleFunc(server, func(ctx context.Context, job *winter.Job[ProcessImage]) error {
    url := resize(job.Args.Path)
    job.SetResult(map[string]string{"url": url})
    return nil
})

// Later, poll for the result
var result map[string]string
err := winter.WaitForResult(client, ctx, jobID, &result)

Results are stored in Redis with a 7-day TTL.

Lifecycle Hooks

Register callbacks that fire at specific points in a job's lifecycle.

server.OnStart(func(ctx context.Context, ev winter.JobEvent) {
    log.Printf("starting %s (attempt %d)", ev.Kind, ev.Attempt)
})
server.OnComplete(func(ctx context.Context, ev winter.JobEvent) {
    metrics.Increment("jobs.completed")
})
server.OnError(func(ctx context.Context, ev winter.JobEvent) {
    alerting.Notify(ev.Err)
})
server.OnDead(func(ctx context.Context, ev winter.JobEvent) {
    log.Printf("job %s is dead: %v", ev.ID, ev.Err)
})
Task options

Tasks can declare their own defaults by implementing TaskWithOptions:

func (SyncInventory) Options() winter.TaskOptions {
    return winter.TaskOptions{
        Queue:      "low",
        MaxRetries: 10,
        Backoff:    winter.Exponential(2 * time.Second),
    }
}
Inspector

Inspect and manage queues and dead letter jobs without a running server.

inspector, _ := winter.NewInspector(winter.RedisConfig{Addr: "localhost:6379"})

info, _ := inspector.Queue(ctx, "default")
dead, _ := inspector.Dead(ctx, "default", 0, 50)
inspector.Retry(ctx, "default", "job-id")
inspector.PurgeDead(ctx, "default")
inspector.Pause(ctx, "default")
inspector.Resume(ctx, "default")

gRPC Server

Winter includes a standalone gRPC server that lets non-Go workers (Python, TypeScript, etc.) interact with the queue.

# Start with defaults
go run ./cmd/server

# Start with config
go run ./cmd/server --config winter.yaml

The server exposes Enqueue, Dequeue, Ack, Nack, Heartbeat, GetJob, and QueueStats RPCs. Health checking and reflection are enabled by default.

CLI

# Start the server
winter server --config winter.yaml

# Enqueue a job
winter enqueue --kind "order.process" --payload '{"order_id":"abc"}' --queue emails

# Check queue status
winter status --queue default

# Inspect a job
winter jobs <job-id>

# List dead letter jobs
winter dead list --queue default

# Retry a dead job
winter retry <job-id> --queue default

# Purge dead letter queue
winter dead purge --queue default

Configuration

Winter can be configured via YAML:

redis:
  addr: "localhost:6379"
  password: ""
  db: 0

server:
  grpc_port: "50051"
  log_level: "info"

queues:
  emails:
    max_retries: 5
    backoff: "exponential"
    backoff_base: "1s"
  payments:
    max_retries: 10
    backoff: "exponential"
    backoff_base: "500ms"

workers:
  concurrency: 20
  heartbeat_interval: "15s"
  recovery_interval: "30s"

cron:
  - name: "daily-cleanup"
    schedule: "0 2 * * *"
    queue: "maintenance"
    kind: "cleanup.run"
    payload: '{"task":"cleanup"}'

Testing

Winter includes the wintertest package for testing your workers without Redis:

func TestSignupEnqueuesWelcome(t *testing.T) {
    client := wintertest.NewClient(t)

    handleSignup(client, User{ID: 42})

    wintertest.RequireEnqueued(t, client, tasks.SendWelcome{UserID: 42})
    wintertest.RequireEnqueuedN(t, client, "notification.send", 2)
}
# Unit tests
go test ./...

# Integration tests (requires Docker)
go test -tags=integration ./test/integration/...

# Benchmarks
go test -bench=. -benchmem ./internal/queue/...

Contributing

Winter is open source and welcomes contributions, issues, and feedback.

License

MIT License (LICENSE)

Documentation

Overview

Package winter is a distributed task queue backed by Redis with generics-first type safety, workflow primitives, and a standalone gRPC server mode.

Index

Constants

This section is empty.

Variables

View Source
var ErrDuplicate = queue.ErrDuplicate

ErrDuplicate is returned when enqueuing a job that matches an existing unique constraint.

View Source
var ErrSkipRetry = errors.New("winter: skip retry")

ErrSkipRetry is a sentinel error that causes a failed job to go directly to the dead letter queue without retrying. Wrap it with fmt.Errorf to add context.

Functions

func Cancel

func Cancel(reason string) error

Cancel returns an error that instructs the server to permanently remove the job without retrying.

func Chain

func Chain(c *Client, ctx context.Context, tasks []Task, opts ...Option) (string, error)

Chain enqueues tasks sequentially. Each task is enqueued only after the previous one completes. If any task fails and exhausts retries, the chain stops.

func Chord

func Chord(c *Client, ctx context.Context, headers []Task, callback Task, opts ...Option) (string, error)

Chord enqueues all header tasks for parallel execution and fires the callback task only after every header completes. If any header fails, the callback does not fire.

func Group

func Group(c *Client, ctx context.Context, tasks []Task, opts ...Option) (string, error)

Group enqueues all tasks immediately for parallel execution and tracks completion. Returns the workflow ID for status checks.

func Handle

func Handle[T Task](s *Server, h Handler[T])

Handle registers a typed handler for its task kind on the server. If the task implements TaskWithOptions and specifies a RateLimit, the server enforces a per-kind token bucket before dispatching.

func HandleFunc

func HandleFunc[T Task](s *Server, fn func(ctx context.Context, job *Job[T]) error)

HandleFunc registers a function as a handler for its task kind.

func IsCancel

func IsCancel(err error) (string, bool)

IsCancel reports whether err is a cancel sentinel and returns its reason.

func IsReschedule

func IsReschedule(err error) (time.Duration, bool)

IsReschedule reports whether err is a reschedule sentinel and returns its delay.

func Reschedule

func Reschedule(d time.Duration) error

Reschedule returns an error that instructs the server to put the job back in the delayed set and try again after the given duration.

func ValidTransition

func ValidTransition(from, to JobState) bool

ValidTransition reports whether transitioning from one state to another is allowed.

func WaitForResult

func WaitForResult[R any](c *Client, ctx context.Context, jobID string) (*R, error)

WaitForResult polls Redis until a result is available for the given job ID, then deserializes it into type R. Returns context.DeadlineExceeded if the context expires before a result appears. Poll interval defaults to 250ms.

Types

type BackoffStrategy

type BackoffStrategy interface {
	Next(attempt int) time.Duration
}

BackoffStrategy computes the delay before the next retry attempt.

func Exponential

func Exponential(base time.Duration) BackoffStrategy

Exponential returns a backoff strategy that doubles the delay on each attempt.

func Fixed

func Fixed(d time.Duration) BackoffStrategy

Fixed returns a backoff strategy that always waits the same duration between retries.

func Linear

func Linear(step time.Duration) BackoffStrategy

Linear returns a backoff strategy that increases the delay by a fixed step each attempt.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps a Redis connection and provides the Enqueue API.

func ClientFromContext

func ClientFromContext(ctx context.Context) *Client

ClientFromContext extracts the Client from a worker's context, allowing handlers to enqueue downstream jobs.

func NewClient

func NewClient(cfg RedisConfig) (*Client, error)

NewClient connects to Redis and returns a client ready to enqueue jobs.

func NewClientFromRedis

func NewClientFromRedis(rdb redis.UniversalClient) *Client

NewClientFromRedis creates a client from an existing Redis connection.

func (*Client) Close

func (c *Client) Close() error

Close shuts down the underlying Redis connection.

func (*Client) Queue

func (c *Client) Queue() *queue.Queue

Queue returns the underlying queue implementation for internal use.

func (*Client) Redis

func (c *Client) Redis() redis.UniversalClient

Redis returns the underlying Redis client for advanced use cases.

type CronEntry

type CronEntry struct {
	Name     string
	Schedule string
	Queue    string
	Kind     string
	Payload  []byte
}

CronEntry defines a periodic job to be scheduled by the server.

type DeadJob

type DeadJob struct {
	ID          string
	Kind        string
	Queue       string
	Payload     []byte
	Attempt     int
	MaxRetries  int
	LastError   string
	CreatedAt   time.Time
	CompletedAt time.Time
}

DeadJob is a dead letter job with its metadata exposed for inspection.

type Handler

type Handler[T Task] interface {
	Work(ctx context.Context, job *Job[T]) error
}

Handler processes jobs of a specific task type.

type HandlerFn

type HandlerFn func(ctx context.Context, job *rawJob) error

HandlerFn is the untyped handler signature used internally by the middleware chain.

type HandlerFunc

type HandlerFunc[T Task] func(ctx context.Context, job *Job[T]) error

HandlerFunc is an adapter to allow ordinary functions to serve as handlers.

func (HandlerFunc[T]) Work

func (f HandlerFunc[T]) Work(ctx context.Context, job *Job[T]) error

type HookFunc

type HookFunc func(ctx context.Context, event JobEvent)

HookFunc is the signature for lifecycle hook callbacks.

type Inspector

type Inspector struct {
	// contains filtered or unexported fields
}

Inspector provides read and management operations for queues and dead letter jobs without requiring a running server.

func NewInspector

func NewInspector(cfg RedisConfig) (*Inspector, error)

NewInspector connects to Redis and returns an inspector.

func NewInspectorFromRedis

func NewInspectorFromRedis(rdb redis.UniversalClient) *Inspector

NewInspectorFromRedis creates an inspector from an existing Redis connection.

func (*Inspector) Close

func (i *Inspector) Close() error

Close shuts down the underlying Redis connection.

func (*Inspector) Dead

func (i *Inspector) Dead(ctx context.Context, queueName string, offset, limit int64) ([]*DeadJob, error)

Dead returns a paginated list of dead letter jobs for the named queue.

func (*Inspector) DeadCount

func (i *Inspector) DeadCount(ctx context.Context, queueName string) (int64, error)

DeadCount returns the number of jobs in the dead letter queue.

func (*Inspector) JobResult

func (i *Inspector) JobResult(ctx context.Context, jobID string) ([]byte, error)

JobResult returns the stored result bytes for a completed job, or nil if no result was stored.

func (*Inspector) Pause

func (i *Inspector) Pause(ctx context.Context, queueName string) error

Pause stops workers from dequeuing jobs from the named queue.

func (*Inspector) PeekDead

func (i *Inspector) PeekDead(ctx context.Context, queueName string) (*DeadJob, error)

PeekDead returns the first dead letter job without removing it.

func (*Inspector) PurgeDead

func (i *Inspector) PurgeDead(ctx context.Context, queueName string) (int64, error)

PurgeDead removes all jobs from the dead letter queue and deletes their data.

func (*Inspector) Queue

func (i *Inspector) Queue(ctx context.Context, name string) (*QueueInfo, error)

Queue returns a snapshot of depths and counters for the named queue.

func (*Inspector) Resume

func (i *Inspector) Resume(ctx context.Context, queueName string) error

Resume allows workers to dequeue jobs from a previously paused queue.

func (*Inspector) Retry

func (i *Inspector) Retry(ctx context.Context, queueName string, jobID string) error

Retry removes a job from the dead letter queue, resets its attempt counter, and re-enqueues it to the ready set.

type Job

type Job[T Task] struct {
	ID          string    `json:"id"`
	Args        T         `json:"args"`
	Kind        string    `json:"kind"`
	Queue       string    `json:"queue"`
	Priority    int       `json:"priority"`
	State       JobState  `json:"state"`
	Attempt     int       `json:"attempt"`
	MaxRetries  int       `json:"max_retries"`
	CreatedAt   time.Time `json:"created_at"`
	ScheduledAt time.Time `json:"scheduled_at,omitempty"`
	StartedAt   time.Time `json:"started_at,omitempty"`
	CompletedAt time.Time `json:"completed_at,omitempty"`
	LastError   string    `json:"last_error,omitempty"`
	// contains filtered or unexported fields
}

Job is a type-safe wrapper around a queued job and its deserialized payload.

func Enqueue

func Enqueue[T Task](c *Client, ctx context.Context, args T, opts ...Option) (*Job[T], error)

Enqueue serializes the task payload and atomically inserts it into the queue.

func EnqueueMany

func EnqueueMany[T Task](c *Client, ctx context.Context, tasks []T, opts ...Option) ([]*Job[T], error)

EnqueueMany inserts multiple tasks using Redis pipelining for high throughput. Options are applied uniformly to all jobs in the batch.

func (*Job[T]) SetResult

func (j *Job[T]) SetResult(v any) error

SetResult stores a result value to be persisted after the job completes. The value is JSON-serialized and stored in Redis with a 7-day TTL. Retrieve it later with inspector.JobResult or WaitForResult.

type JobEvent

type JobEvent struct {
	ID      string
	Kind    string
	Queue   string
	Attempt int
	Err     error
}

JobEvent is passed to lifecycle hooks with context about the job and any error.

type JobState

type JobState string

JobState represents the current lifecycle state of a job.

const (
	StatePending   JobState = "pending"
	StateActive    JobState = "active"
	StateCompleted JobState = "completed"
	StateFailed    JobState = "failed"
	StateRetry     JobState = "retry"
	StateDead      JobState = "dead"
	StateCancelled JobState = "cancelled"
)

type Middleware

type Middleware func(next HandlerFn) HandlerFn

Middleware wraps a handler to add cross-cutting behavior like panic recovery or logging.

func Recover

func Recover() Middleware

Recover returns a middleware that catches panics and converts them to errors.

type Option

type Option func(*insertOpts)

Option configures how a job is enqueued.

func At

func At(t time.Time) Option

At schedules the job to run at a specific time.

func In

func In(d time.Duration) Option

In schedules the job to run after the given delay.

func Priority

func Priority(p int) Option

Priority sets the job's priority where lower values are dequeued first.

func Queue

func Queue(name string) Option

Queue overrides the destination queue for a job.

func Unique

func Unique(period time.Duration) Option

Unique deduplicates jobs with the same kind and payload within the given period.

type QueueInfo

type QueueInfo struct {
	Name      string
	Ready     int64
	Active    int64
	Delayed   int64
	Dead      int64
	Completed int64
	Enqueued  int64
}

QueueInfo holds a snapshot of queue depths and counters.

type QueueWeight

type QueueWeight struct {
	Name   string
	Weight int
}

QueueWeight pairs a queue name with its relative polling weight.

func Queues

func Queues(args ...any) []QueueWeight

Queues builds a weighted queue list from alternating name/weight pairs.

type Rate

type Rate struct {
	Max int
	Per time.Duration
}

Rate configures per-task-kind rate limiting as a token bucket.

type RedisConfig

type RedisConfig struct {
	Addr     string
	Password string
	DB       int
}

RedisConfig holds connection parameters for the Redis server.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server polls queues, dispatches jobs to registered handlers, and manages the worker lifecycle including graceful shutdown.

func NewServer

func NewServer(redisCfg RedisConfig, cfg ServerConfig) (*Server, error)

NewServer connects to Redis and returns a server ready to process jobs.

func NewServerFromRedis

func NewServerFromRedis(rdb redis.UniversalClient, cfg ServerConfig) *Server

NewServerFromRedis creates a server from an existing Redis connection.

func (*Server) OnComplete

func (s *Server) OnComplete(fn HookFunc)

OnComplete registers a hook that fires after a job completes successfully.

func (*Server) OnDead

func (s *Server) OnDead(fn HookFunc)

OnDead registers a hook that fires when a job exhausts retries and enters the dead letter queue.

func (*Server) OnError

func (s *Server) OnError(fn HookFunc)

OnError registers a hook that fires when a job fails but will be retried.

func (*Server) OnStart

func (s *Server) OnStart(fn HookFunc)

OnStart registers a hook that fires when a job is picked up for processing, before the handler runs.

func (*Server) SetRateLimit

func (s *Server) SetRateLimit(kind string, max int, per time.Duration)

SetRateLimit configures a rate limit for the given task kind. This overrides any limit set via TaskWithOptions. Max is the number of tokens (jobs) allowed per interval.

func (*Server) Start

func (s *Server) Start() error

Start begins polling queues and blocks until a shutdown signal is received. On shutdown it stops fetching new jobs, waits for in-flight jobs to finish (up to ShutdownTimeout), deregisters the worker, and closes the connection.

func (*Server) Stop

func (s *Server) Stop()

Stop cancels the server context and begins graceful shutdown.

func (*Server) Use

func (s *Server) Use(mw ...Middleware)

Use appends middleware to the server's handler chain.

type ServerConfig

type ServerConfig struct {
	Concurrency     int
	Queues          []QueueWeight
	StrictPriority  bool
	Cron            []CronEntry
	PollInterval    time.Duration
	ShutdownTimeout time.Duration
	Logger          *slog.Logger
}

ServerConfig controls concurrency, queue weights, poll interval, and logging. When StrictPriority is true, queues are polled in descending weight order and lower-priority queues are only checked when all higher ones are empty.

type Task

type Task interface {
	Kind() string
}

Task is the interface that all job payloads must implement.

type TaskOptions

type TaskOptions struct {
	Queue      string
	MaxRetries int
	Timeout    time.Duration
	Backoff    BackoffStrategy
	RateLimit  Rate
}

TaskOptions holds per-task-kind defaults that override server-level configuration.

type TaskWithOptions

type TaskWithOptions interface {
	Task
	Options() TaskOptions
}

TaskWithOptions extends Task with default queue, retry, and rate limit configuration.

Directories

Path Synopsis
cmd
server command
Command server runs the standalone Winter gRPC server.
Command server runs the standalone Winter gRPC server.
winter command
Command winter is the CLI for managing Winter task queues.
Command winter is the CLI for managing Winter task queues.
examples
basic command
Basic example: define a task, enqueue it, and process it.
Basic example: define a task, enqueue it, and process it.
cron command
Cron example: schedule periodic jobs with cron expressions.
Cron example: schedule periodic jobs with cron expressions.
multi-queue/producer command
Producer example: enqueues tasks to different queues based on task options.
Producer example: enqueues tasks to different queues based on task options.
multi-queue/tasks
Package tasks defines shared task types used by both the producer and worker.
Package tasks defines shared task types used by both the producer and worker.
multi-queue/worker command
Worker example: processes tasks from multiple weighted queues.
Worker example: processes tasks from multiple weighted queues.
priority command
Priority example: demonstrates that lower priority values are dequeued first.
Priority example: demonstrates that lower priority values are dequeued first.
workflows command
Workflows example: demonstrates Chain, Group, and Chord patterns.
Workflows example: demonstrates Chain, Group, and Chord patterns.
internal
config
Package config parses Winter's YAML configuration file into strongly typed structs used by the server and CLI.
Package config parses Winter's YAML configuration file into strongly typed structs used by the server and CLI.
queue
Package queue implements the core Redis operations for Winter's job queue.
Package queue implements the core Redis operations for Winter's job queue.
ratelimit
Package ratelimit implements a Redis-backed token bucket rate limiter.
Package ratelimit implements a Redis-backed token bucket rate limiter.
scheduler
Package scheduler implements periodic job scheduling with cron expressions.
Package scheduler implements periodic job scheduling with cron expressions.
server
Package server implements the gRPC QueueService backed by the internal queue package.
Package server implements the gRPC QueueService backed by the internal queue package.
worker
Package worker implements background goroutines for lease recovery and worker lifecycle management.
Package worker implements background goroutines for lease recovery and worker lifecycle management.
workflow
Package workflow implements Chain, Group, and Chord primitives for composing multi-job pipelines.
Package workflow implements Chain, Group, and Chord primitives for composing multi-job pipelines.
proto
Package wintertest provides test helpers for asserting that jobs were enqueued correctly.
Package wintertest provides test helpers for asserting that jobs were enqueued correctly.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL