Documentation ¶
Overview ¶
Package workers provides a worker lifecycle library for Go, built on top of thejerf/suture. It manages background goroutines with automatic panic recovery, configurable restart with backoff, tracing, and structured shutdown.
Architecture ¶
Every worker runs inside its own supervisor subtree. This means:
- Each worker gets panic recovery and restart independently
- Workers can dynamically spawn child workers via WorkerContext
- When a parent worker stops, all its children stop (scoped lifecycle)
- The supervisor tree prevents cascading failures and CPU-burn restart storms
Quick Start ¶
Create workers with NewWorker and run them with Run:
workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("kafka", consume),
    workers.NewWorker("cleanup", cleanup).Every(5 * time.Minute).WithRestart(true),
})
Helpers ¶
Common patterns are provided as helpers:
- EveryInterval — periodic execution on a fixed interval
- ChannelWorker — consume items from a channel one at a time
- BatchChannelWorker — collect items into batches, flush on size or timer
Dynamic Workers ¶
Manager workers can spawn and remove child workers at runtime using the Add, Remove, and Children methods on WorkerContext. Children join the parent's supervisor subtree and get full framework guarantees (tracing, panic recovery, restart). See [Example_dynamicWorkerPool].
Example (DynamicWorkerPool) ¶
Simulates a config-driven worker pool manager that reconciles desired workers against running workers on each tick. This demonstrates the pattern used by services like route-store where worker configs are loaded from a database periodically.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    // Simulate config that changes over 3 ticks.
    // Tick 1: start worker-a
    // Tick 2: add worker-b
    // Tick 3: remove worker-a
    configs := [][]string{
        {"worker-a"},
        {"worker-a", "worker-b"},
        {"worker-b"},
    }
    tick := 0
    manager := workers.NewWorker("pool-manager", func(ctx workers.WorkerContext) error {
        ticker := time.NewTicker(40 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-ticker.C:
                if tick >= len(configs) {
                    continue
                }
                desired := map[string]bool{}
                for _, name := range configs[tick] {
                    desired[name] = true
                }
                tick++
                // Remove workers no longer desired.
                for _, name := range ctx.Children() {
                    if !desired[name] {
                        ctx.Remove(name)
                    }
                }
                // Add desired workers (Add replaces a running child with the same name).
                for name := range desired {
                    name := name // capture loop variable for the closure
                    ctx.Add(workers.NewWorker(name, func(ctx workers.WorkerContext) error {
                        <-ctx.Done()
                        return ctx.Err()
                    }))
                }
                time.Sleep(10 * time.Millisecond) // let children start
                fmt.Printf("tick %d: children=%v\n", tick, ctx.Children())
            }
        }
    })
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{manager})
    fmt.Println("pool shut down")
}
Output:

tick 1: children=[worker-a]
tick 2: children=[worker-a worker-b]
tick 3: children=[worker-b]
pool shut down
Example (Standalone) ¶
Standalone usage with signal handling — no ColdBrew required.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    // In production you'd use signal.NotifyContext(ctx, os.Interrupt).
    // For the example, use a short timeout.
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{
        workers.NewWorker("kafka", func(ctx workers.WorkerContext) error {
            fmt.Println("consuming messages")
            <-ctx.Done()
            return ctx.Err()
        }),
    })
    fmt.Println("shutdown complete")
}
Output:

consuming messages
shutdown complete
Index ¶
- func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, ...) func(WorkerContext) error
- func ChannelWorker[T any](ch <-chan T, fn func(WorkerContext, T) error) func(WorkerContext) error
- func EveryInterval(d time.Duration, fn func(WorkerContext) error) func(WorkerContext) error
- func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
- func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
- type BaseMetrics
- func (BaseMetrics) ObserveRunDuration(string, time.Duration)
- func (BaseMetrics) SetActiveWorkers(int)
- func (BaseMetrics) WorkerFailed(string, error)
- func (BaseMetrics) WorkerPanicked(string)
- func (BaseMetrics) WorkerRestarted(string, int)
- func (BaseMetrics) WorkerStarted(string)
- func (BaseMetrics) WorkerStopped(string)
- type Metrics
- type RunOption
- type Worker
- func (w *Worker) Every(d time.Duration) *Worker
- func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker
- func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
- func (w *Worker) WithFailureDecay(decay float64) *Worker
- func (w *Worker) WithFailureThreshold(threshold float64) *Worker
- func (w *Worker) WithMetrics(m Metrics) *Worker
- func (w *Worker) WithRestart(restart bool) *Worker
- func (w *Worker) WithTimeout(d time.Duration) *Worker
- type WorkerContext
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BatchChannelWorker ¶
func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(WorkerContext, []T) error) func(WorkerContext) error
BatchChannelWorker collects items from ch into batches and calls fn when either the batch reaches maxSize or maxDelay elapses since the first item in the current batch — whichever comes first. Flushes any partial batch on context cancellation or channel close before returning.
Example ¶
BatchChannelWorker collects items into batches and flushes on maxSize or maxDelay — whichever comes first.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    ch := make(chan int, 10)
    for i := 1; i <= 6; i++ {
        ch <- i
    }
    close(ch)
    fn := workers.BatchChannelWorker(ch, 3, time.Hour, func(ctx workers.WorkerContext, batch []int) error {
        fmt.Println(batch)
        return nil
    })
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    w := workers.NewWorker("batcher", fn)
    workers.Run(ctx, []*workers.Worker{w})
}
Output:

[1 2 3]
[4 5 6]
func ChannelWorker ¶
func ChannelWorker[T any](ch <-chan T, fn func(WorkerContext, T) error) func(WorkerContext) error
ChannelWorker consumes items from ch one at a time, calling fn for each. Returns when ctx is cancelled or ch is closed.
Example ¶
ChannelWorker consumes items from a channel one at a time.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    ch := make(chan string, 3)
    ch <- "hello"
    ch <- "world"
    ch <- "!"
    close(ch)
    fn := workers.ChannelWorker(ch, func(ctx workers.WorkerContext, item string) error {
        fmt.Println(item)
        return nil
    })
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    w := workers.NewWorker("consumer", fn)
    workers.Run(ctx, []*workers.Worker{w})
}
Output:

hello
world
!
func EveryInterval ¶
func EveryInterval(d time.Duration, fn func(WorkerContext) error) func(WorkerContext) error
EveryInterval wraps fn in a ticker loop that calls fn at the given interval. Returns when ctx is cancelled. If fn returns an error, EveryInterval returns that error (the supervisor decides whether to restart based on WithRestart).
Example ¶
EveryInterval wraps a function in a ticker loop.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    count := 0
    fn := workers.EveryInterval(20*time.Millisecond, func(ctx workers.WorkerContext) error {
        count++
        fmt.Printf("tick %d\n", count)
        return nil
    })
    w := workers.NewWorker("periodic", fn)
    ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
}
Output:

tick 1
tick 2
func Run ¶
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
Run starts all workers under a suture supervisor and blocks until ctx is cancelled and all workers have exited. Each worker gets its own child supervisor — when a worker stops, its children stop too. A worker exiting early (without restart) does not stop other workers. Returns nil on clean shutdown.
Example ¶
Run multiple workers concurrently. All workers start together and stop when the context is cancelled.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    w1 := workers.NewWorker("api-poller", func(ctx workers.WorkerContext) error {
        fmt.Println("api-poller started")
        <-ctx.Done()
        return ctx.Err()
    })
    w2 := workers.NewWorker("cache-warmer", func(ctx workers.WorkerContext) error {
        fmt.Println("cache-warmer started")
        <-ctx.Done()
        return ctx.Err()
    })
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w1, w2})
    fmt.Println("all workers stopped")
}
Output:

api-poller started
cache-warmer started
all workers stopped
func RunWorker ¶
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
RunWorker runs a single worker with panic recovery and optional restart. Blocks until ctx is cancelled or the worker exits without RestartOnFail.
Example ¶
RunWorker runs a single worker — useful for dynamic managers that spawn child workers in their own goroutines.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    w := workers.NewWorker("single", func(ctx workers.WorkerContext) error {
        fmt.Println("running")
        <-ctx.Done()
        return ctx.Err()
    })
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    workers.RunWorker(ctx, w)
    fmt.Println("done")
}
Output:

running
done
Types ¶
type BaseMetrics ¶ added in v0.0.4
type BaseMetrics struct{}
BaseMetrics provides no-op implementations of all Metrics methods. Embed it in custom Metrics implementations so that new methods added to the Metrics interface in future versions get safe no-op defaults instead of breaking your build:
type myMetrics struct {
    workers.BaseMetrics // forward-compatible
    client *statsd.Client
}

func (m *myMetrics) WorkerStarted(name string) {
    m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}
func (BaseMetrics) ObserveRunDuration ¶ added in v0.0.4
func (BaseMetrics) ObserveRunDuration(string, time.Duration)
func (BaseMetrics) SetActiveWorkers ¶ added in v0.0.4
func (BaseMetrics) SetActiveWorkers(int)
func (BaseMetrics) WorkerFailed ¶ added in v0.0.4
func (BaseMetrics) WorkerFailed(string, error)
func (BaseMetrics) WorkerPanicked ¶ added in v0.0.4
func (BaseMetrics) WorkerPanicked(string)
func (BaseMetrics) WorkerRestarted ¶ added in v0.0.4
func (BaseMetrics) WorkerRestarted(string, int)
func (BaseMetrics) WorkerStarted ¶ added in v0.0.4
func (BaseMetrics) WorkerStarted(string)
func (BaseMetrics) WorkerStopped ¶ added in v0.0.4
func (BaseMetrics) WorkerStopped(string)
type Metrics ¶ added in v0.0.3
type Metrics interface {
    WorkerStarted(name string)
    WorkerStopped(name string)
    WorkerPanicked(name string)
    WorkerFailed(name string, err error)
    WorkerRestarted(name string, attempt int)
    ObserveRunDuration(name string, duration time.Duration)
    SetActiveWorkers(count int)
}
Metrics collects worker lifecycle metrics. Implement this interface to provide custom metrics (e.g., Datadog, StatsD). Use BaseMetrics{} to disable metrics, or NewPrometheusMetrics for the built-in Prometheus implementation.
func NewPrometheusMetrics ¶ added in v0.0.3
NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names (e.g., "myapp" → "myapp_worker_started_total"). Metrics are auto-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process-global; use a small number of static namespaces (not per-request/tenant values).
type RunOption ¶ added in v0.0.3
type RunOption func(*runConfig)
RunOption configures the behavior of Run.
func WithMetrics ¶ added in v0.0.3
WithMetrics sets the metrics implementation for all workers started by Run. Workers inherit it unless they override via Worker.WithMetrics. If not set, BaseMetrics{} is used.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.
func NewWorker ¶
func NewWorker(name string, run func(WorkerContext) error) *Worker
NewWorker creates a Worker with the given name and run function. The run function should block until ctx is cancelled or an error occurs.
Example ¶
A simple worker that runs until cancelled.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    w := workers.NewWorker("greeter", func(ctx workers.WorkerContext) error {
        fmt.Printf("worker %q started (attempt %d)\n", ctx.Name(), ctx.Attempt())
        <-ctx.Done()
        return ctx.Err()
    })
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
}
Output: worker "greeter" started (attempt 0)
func (*Worker) Every ¶
func (w *Worker) Every(d time.Duration) *Worker
Every wraps the run function in a ticker loop that calls it at the given interval. The original run function is called once per tick. If it returns an error, the behavior depends on WithRestart: if true, the ticker worker restarts; if false, it exits.
Example ¶
A periodic worker that runs a function on a fixed interval.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    count := 0
    w := workers.NewWorker("ticker", func(ctx workers.WorkerContext) error {
        count++
        fmt.Printf("tick %d\n", count)
        return nil
    }).Every(20 * time.Millisecond)
    ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
}
Output:

tick 1
tick 2
func (*Worker) WithBackoffJitter ¶
func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker
WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts.
func (*Worker) WithFailureBackoff ¶
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.
func (*Worker) WithFailureDecay ¶
func (w *Worker) WithFailureDecay(decay float64) *Worker
WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.
func (*Worker) WithFailureThreshold ¶
func (w *Worker) WithFailureThreshold(threshold float64) *Worker
WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.
func (*Worker) WithMetrics ¶ added in v0.0.3
func (w *Worker) WithMetrics(m Metrics) *Worker
WithMetrics sets a per-worker metrics implementation, overriding the metrics inherited from the parent WorkerContext or Run options.
func (*Worker) WithRestart ¶
func (w *Worker) WithRestart(restart bool) *Worker
WithRestart configures whether the worker should be restarted on failure. When true, the supervisor restarts the worker with backoff on non-context errors.
Example ¶
A worker with automatic restart on failure. The supervisor logs restart events; the worker succeeds on the third attempt.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    attempt := 0
    w := workers.NewWorker("resilient", func(ctx workers.WorkerContext) error {
        attempt++
        if attempt <= 2 {
            return fmt.Errorf("transient error")
        }
        fmt.Printf("succeeded on attempt %d\n", attempt)
        <-ctx.Done()
        return ctx.Err()
    }).WithRestart(true)
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
    // This example demonstrates restart behavior. Log output from the
    // supervisor is expected between restarts. The worker prints on success.
}
Output:
type WorkerContext ¶
type WorkerContext interface {
    context.Context

    // Name returns the worker's name.
    Name() string

    // Attempt returns the restart attempt number (0 on first run).
    Attempt() int

    // Add adds or replaces a child worker by name under the same supervisor.
    // If a worker with the same name already exists, it is removed first.
    // Children get full framework guarantees (tracing, panic recovery, restart).
    Add(w *Worker)

    // Remove stops a child worker by name.
    Remove(name string)

    // Children returns the names of currently running child workers.
    Children() []string
}
WorkerContext extends context.Context with worker metadata and dynamic child worker management. The framework creates these — users never need to implement this interface.