worker

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: ISC Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoTask = errors.New("worker: no task available")

ErrNoTask is returned by Handler.Claim when there are no tasks available for processing. The worker uses this sentinel to distinguish "no work" from real errors.

Functions

This section is empty.

Types

type Handler

type Handler[T any] interface {
	// Claim acquires the next available task.
	// Implementations must return ErrNoTask when no work is
	// available.
	Claim(ctx context.Context) (T, error)

	// Process performs the actual work on a claimed task.
	// Implementations are responsible for handling their own
	// failures (e.g. updating status, retrying, logging).
	Process(ctx context.Context, task T) error
}

Handler defines the operations a worker needs to claim, process, and manage tasks of type T.

type Option

type Option func(*options)

Option configures a Worker.

func WithInterval

func WithInterval(d time.Duration) Option

WithInterval sets the polling interval between work cycles. Default is 10 seconds.

func WithMaxConcurrency

func WithMaxConcurrency(n int) Option

WithMaxConcurrency sets the maximum number of tasks processed concurrently. Values less than 1 are ignored. Default is 5.

func WithRegisterer

func WithRegisterer(r prometheus.Registerer) Option

WithRegisterer sets a custom Prometheus registerer for metrics.

func WithTracerProvider

func WithTracerProvider(tp trace.TracerProvider) Option

WithTracerProvider configures OpenTelemetry tracing with the provided tracer provider.

type StaleRecoverer

type StaleRecoverer interface {
	RecoverStale(ctx context.Context) error
}

StaleRecoverer is an optional interface that a Handler can implement to recover tasks stuck in a processing state. When the handler implements this interface, RecoverStale is called at the beginning of each polling cycle.

type Worker

type Worker[T any] struct {
	// contains filtered or unexported fields
}

Worker polls for tasks using a Handler and processes them concurrently up to a configurable limit.

func New

func New[T any](name string, handler Handler[T], logger *log.Logger, opts ...Option) *Worker[T]

New creates a Worker named name that uses handler to claim and process tasks. The name identifies this worker in metrics, logs, and traces.

func (*Worker[T]) Run

func (w *Worker[T]) Run(ctx context.Context) error

Run starts the worker loop. It blocks until ctx is cancelled, then waits for all in-flight tasks to complete before returning.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL