workers

package module
v0.0.4
Published: Apr 5, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

README

Workers


A worker lifecycle library for Go — manage background goroutines with panic recovery, configurable restart, tracing, and structured shutdown.

Built on suture for Erlang-style supervisor trees. Part of the ColdBrew framework.


API Reference

workers

import "github.com/go-coldbrew/workers"

Package workers provides a worker lifecycle library for Go, built on thejerf/suture. It manages background goroutines with automatic panic recovery, configurable restart with backoff, tracing, and structured shutdown.

Architecture

Every worker runs inside its own supervisor subtree. This means:

  • Each worker gets panic recovery and restart independently
  • Workers can dynamically spawn child workers via WorkerContext
  • When a parent worker stops, all its children stop (scoped lifecycle)
  • The supervisor tree prevents cascading failures and CPU-burn restart storms
Quick Start

Create workers with NewWorker and run them with Run:

workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("kafka", consume),
    workers.NewWorker("cleanup", cleanup).Every(5 * time.Minute).WithRestart(true),
})
Helpers

Common patterns are provided as helpers: ChannelWorker consumes a channel item by item, BatchChannelWorker batches channel items by size or delay, and EveryInterval runs a function on a ticker.

Dynamic Workers

Manager workers can spawn and remove child workers at runtime using the Add, Remove, and Children methods on WorkerContext. Children join the parent's supervisor subtree and get full framework guarantees (tracing, panic recovery, restart). See the Dynamic Worker Pool example below.

Example (Dynamic Worker Pool)

Simulates a config-driven worker pool manager that reconciles desired workers against running workers on each tick. This demonstrates the pattern used by services like route-store where worker configs are loaded from a database periodically.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	// Simulate config that changes over 3 ticks.
	// Tick 1: start worker-a
	// Tick 2: add worker-b
	// Tick 3: remove worker-a
	configs := [][]string{
		{"worker-a"},
		{"worker-a", "worker-b"},
		{"worker-b"},
	}

	tick := 0
	manager := workers.NewWorker("pool-manager", func(ctx workers.WorkerContext) error {
		ticker := time.NewTicker(40 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				if tick >= len(configs) {
					continue
				}
				desired := map[string]bool{}
				for _, name := range configs[tick] {
					desired[name] = true
				}
				tick++

				// Remove workers no longer desired.
				for _, name := range ctx.Children() {
					if !desired[name] {
						ctx.Remove(name)
					}
				}
				// Add new workers (Add replaces an existing worker with the same name).
				for name := range desired {
					name := name
					ctx.Add(workers.NewWorker(name, func(ctx workers.WorkerContext) error {
						<-ctx.Done()
						return ctx.Err()
					}))
				}
				time.Sleep(10 * time.Millisecond) // let children start
				fmt.Printf("tick %d: children=%v\n", tick, ctx.Children())
			}
		}
	})

	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
	fmt.Println("pool shut down")
}
Output
tick 1: children=[worker-a]
tick 2: children=[worker-a worker-b]
tick 3: children=[worker-b]
pool shut down

Example (Standalone)

Standalone usage with signal handling — no ColdBrew required.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	// In production you'd use signal.NotifyContext(ctx, os.Interrupt).
	// For the example, use a short timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{
		workers.NewWorker("kafka", func(ctx workers.WorkerContext) error {
			fmt.Println("consuming messages")
			<-ctx.Done()
			return ctx.Err()
		}),
	})
	fmt.Println("shutdown complete")
}
Output
consuming messages
shutdown complete

Index

func BatchChannelWorker

func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(WorkerContext, []T) error) func(WorkerContext) error

BatchChannelWorker collects items from ch into batches and calls fn when either the batch reaches maxSize or maxDelay elapses since the first item in the current batch — whichever comes first. Flushes any partial batch on context cancellation or channel close before returning.

Example

BatchChannelWorker collects items into batches and flushes on maxSize or maxDelay — whichever comes first.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	ch := make(chan int, 10)
	for i := 1; i <= 6; i++ {
		ch <- i
	}
	close(ch)

	fn := workers.BatchChannelWorker(ch, 3, time.Hour, func(ctx workers.WorkerContext, batch []int) error {
		fmt.Println(batch)
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	w := workers.NewWorker("batcher", fn)
	workers.Run(ctx, []*workers.Worker{w})
}
Output
[1 2 3]
[4 5 6]

func ChannelWorker

func ChannelWorker[T any](ch <-chan T, fn func(WorkerContext, T) error) func(WorkerContext) error

ChannelWorker consumes items from ch one at a time, calling fn for each. Returns when ctx is cancelled or ch is closed.

Example

ChannelWorker consumes items from a channel one at a time.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	ch := make(chan string, 3)
	ch <- "hello"
	ch <- "world"
	ch <- "!"
	close(ch)

	fn := workers.ChannelWorker(ch, func(ctx workers.WorkerContext, item string) error {
		fmt.Println(item)
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	w := workers.NewWorker("consumer", fn)
	workers.Run(ctx, []*workers.Worker{w})
}
Output
hello
world
!

func EveryInterval

func EveryInterval(d time.Duration, fn func(WorkerContext) error) func(WorkerContext) error

EveryInterval wraps fn in a ticker loop that calls fn at the given interval. Returns when ctx is cancelled. If fn returns an error, EveryInterval returns that error (the supervisor decides whether to restart based on WithRestart).

Example

EveryInterval wraps a function in a ticker loop.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	count := 0
	fn := workers.EveryInterval(20*time.Millisecond, func(ctx workers.WorkerContext) error {
		count++
		fmt.Printf("tick %d\n", count)
		return nil
	})

	w := workers.NewWorker("periodic", fn)

	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output
tick 1
tick 2

func Run

func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error

Run starts all workers under a suture supervisor and blocks until ctx is cancelled and all workers have exited. Each worker gets its own child supervisor — when a worker stops, its children stop too. A worker exiting early (without restart) does not stop other workers. Returns nil on clean shutdown.

Example

Run multiple workers concurrently. All workers start together and stop when the context is cancelled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w1 := workers.NewWorker("api-poller", func(ctx workers.WorkerContext) error {
		fmt.Println("api-poller started")
		<-ctx.Done()
		return ctx.Err()
	})
	w2 := workers.NewWorker("cache-warmer", func(ctx workers.WorkerContext) error {
		fmt.Println("cache-warmer started")
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w1, w2})
	fmt.Println("all workers stopped")
}
Output
api-poller started
cache-warmer started
all workers stopped

func RunWorker

func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)

RunWorker runs a single worker with panic recovery and optional restart. It blocks until ctx is cancelled or the worker exits with restart disabled (see WithRestart).

Example

RunWorker runs a single worker — useful for dynamic managers that spawn child workers in their own goroutines.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w := workers.NewWorker("single", func(ctx workers.WorkerContext) error {
		fmt.Println("running")
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.RunWorker(ctx, w)
	fmt.Println("done")
}
Output
running
done

type BaseMetrics

BaseMetrics provides no-op implementations of all Metrics methods. Embed it in custom Metrics implementations so that new methods added to the Metrics interface in future versions get safe no-op defaults instead of breaking your build:

type myMetrics struct {
    workers.BaseMetrics // forward-compatible
    client *statsd.Client
}

func (m *myMetrics) WorkerStarted(name string) {
    m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}
type BaseMetrics struct{}

func (BaseMetrics) ObserveRunDuration
func (BaseMetrics) ObserveRunDuration(string, time.Duration)

func (BaseMetrics) SetActiveWorkers
func (BaseMetrics) SetActiveWorkers(int)

func (BaseMetrics) WorkerFailed
func (BaseMetrics) WorkerFailed(string, error)

func (BaseMetrics) WorkerPanicked
func (BaseMetrics) WorkerPanicked(string)

func (BaseMetrics) WorkerRestarted
func (BaseMetrics) WorkerRestarted(string, int)

func (BaseMetrics) WorkerStarted
func (BaseMetrics) WorkerStarted(string)

func (BaseMetrics) WorkerStopped
func (BaseMetrics) WorkerStopped(string)

type Metrics

Metrics collects worker lifecycle metrics. Implement this interface to provide custom metrics (e.g., Datadog, StatsD). Use BaseMetrics{} to disable metrics, or NewPrometheusMetrics for the built-in Prometheus implementation.

type Metrics interface {
    WorkerStarted(name string)
    WorkerStopped(name string)
    WorkerPanicked(name string)
    WorkerFailed(name string, err error)
    WorkerRestarted(name string, attempt int)
    ObserveRunDuration(name string, duration time.Duration)
    SetActiveWorkers(count int)
}

func NewPrometheusMetrics
func NewPrometheusMetrics(namespace string) Metrics

NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names (e.g., "myapp" → "myapp_worker_started_total"). Metrics are auto-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process-global; use a small number of static namespaces (not per-request/tenant values).

type RunOption

RunOption configures the behavior of Run.

type RunOption func(*runConfig)

func WithMetrics
func WithMetrics(m Metrics) RunOption

WithMetrics sets the metrics implementation for all workers started by Run. Workers inherit this unless they override via Worker.WithMetrics. If not set, BaseMetrics{} is used.

type Worker

Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.

type Worker struct {
    // contains filtered or unexported fields
}

func NewWorker
func NewWorker(name string, run func(WorkerContext) error) *Worker

NewWorker creates a Worker with the given name and run function. The run function should block until ctx is cancelled or an error occurs.

Example

A simple worker that runs until cancelled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w := workers.NewWorker("greeter", func(ctx workers.WorkerContext) error {
		fmt.Printf("worker %q started (attempt %d)\n", ctx.Name(), ctx.Attempt())
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output
worker "greeter" started (attempt 0)

func (*Worker) Every
func (w *Worker) Every(d time.Duration) *Worker

Every wraps the run function in a ticker loop that calls it at the given interval. The original run function is called once per tick. If it returns an error, the behavior depends on WithRestart: if true, the ticker worker restarts; if false, it exits.

Example

A periodic worker that runs a function on a fixed interval.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	count := 0
	w := workers.NewWorker("ticker", func(ctx workers.WorkerContext) error {
		count++
		fmt.Printf("tick %d\n", count)
		return nil
	}).Every(20 * time.Millisecond)

	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output
tick 1
tick 2

func (*Worker) WithBackoffJitter
func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker

WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts.

func (*Worker) WithFailureBackoff
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker

WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.

func (*Worker) WithFailureDecay
func (w *Worker) WithFailureDecay(decay float64) *Worker

WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.

func (*Worker) WithFailureThreshold
func (w *Worker) WithFailureThreshold(threshold float64) *Worker

WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.

func (*Worker) WithMetrics
func (w *Worker) WithMetrics(m Metrics) *Worker

WithMetrics sets a per-worker metrics implementation, overriding the metrics inherited from the parent WorkerContext or Run options.

func (*Worker) WithRestart
func (w *Worker) WithRestart(restart bool) *Worker

WithRestart configures whether the worker should be restarted on failure. When true, the supervisor restarts the worker with backoff on non-context errors.

Example

A worker with automatic restart on failure. The supervisor logs restart events; the worker succeeds on the third attempt.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	attempt := 0
	w := workers.NewWorker("resilient", func(ctx workers.WorkerContext) error {
		attempt++
		if attempt <= 2 {
			return fmt.Errorf("transient error")
		}
		fmt.Printf("succeeded on attempt %d\n", attempt)
		<-ctx.Done()
		return ctx.Err()
	}).WithRestart(true)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
	// This example demonstrates restart behavior. Log output from the
	// supervisor is expected between restarts. The worker prints on success.
}

func (*Worker) WithTimeout
func (w *Worker) WithTimeout(d time.Duration) *Worker

WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.

type WorkerContext

WorkerContext extends context.Context with worker metadata and dynamic child worker management. The framework creates these — users never need to implement this interface.

type WorkerContext interface {
    context.Context
    // Name returns the worker's name.
    Name() string
    // Attempt returns the restart attempt number (0 on first run).
    Attempt() int
    // Add adds or replaces a child worker by name under the same supervisor.
    // If a worker with the same name already exists, it is removed first.
    // Children get full framework guarantees (tracing, panic recovery, restart).
    Add(w *Worker)
    // Remove stops a child worker by name.
    Remove(name string)
    // Children returns the names of currently running child workers.
    Children() []string
}
Example (Dynamic Children)

A manager worker that dynamically spawns and removes child workers using WorkerContext.Add, Remove, and Children.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	manager := workers.NewWorker("manager", func(ctx workers.WorkerContext) error {
		// Spawn two child workers dynamically.
		ctx.Add(workers.NewWorker("child-a", func(ctx workers.WorkerContext) error {
			fmt.Printf("%s started\n", ctx.Name())
			<-ctx.Done()
			return ctx.Err()
		}))
		ctx.Add(workers.NewWorker("child-b", func(ctx workers.WorkerContext) error {
			fmt.Printf("%s started\n", ctx.Name())
			<-ctx.Done()
			return ctx.Err()
		}))

		// Give children time to start.
		time.Sleep(30 * time.Millisecond)
		fmt.Printf("children: %v\n", ctx.Children())

		// Remove one child.
		ctx.Remove("child-a")
		time.Sleep(30 * time.Millisecond)
		fmt.Printf("after remove: %v\n", ctx.Children())

		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
}
Output
child-a started
child-b started
children: [child-a child-b]
after remove: [child-b]

Example (Replace Child)

Replace a child worker by adding one with the same name. The old worker is stopped and the new one takes its place.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	manager := workers.NewWorker("manager", func(ctx workers.WorkerContext) error {
		ctx.Add(workers.NewWorker("processor", func(ctx workers.WorkerContext) error {
			fmt.Println("processor v1")
			<-ctx.Done()
			return ctx.Err()
		}))
		time.Sleep(30 * time.Millisecond)

		// Replace with a new version — old one is stopped automatically.
		ctx.Add(workers.NewWorker("processor", func(ctx workers.WorkerContext) error {
			fmt.Println("processor v2")
			<-ctx.Done()
			return ctx.Err()
		}))
		time.Sleep(30 * time.Millisecond)

		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
}
Output
processor v1
processor v2

Generated by gomarkdoc
