worker

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package worker consumes tasks from the TASKS stream, dispatches via the registry, and publishes responses to each task's ReplyTo subject.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConcreteTarget

func ConcreteTarget(workerID string) string

ConcreteTarget names the stable, concrete placement claim for a specific worker instance. Colocated workflow steps resolve to this claim.

Types

type ClaimProvider

type ClaimProvider interface {
	Claims(ctx context.Context) ([]string, error)
}

ClaimProvider returns the logical targets currently owned by a worker.

The returned claims may change over time. Workers refresh their claim set periodically and also re-check ownership before invoking a targeted task.

Examples of claims include role-like values such as "primary" or application-scoped claims like "cluster/a/primary".

type ClaimsFunc

type ClaimsFunc func(ctx context.Context) ([]string, error)

ClaimsFunc adapts a function to ClaimProvider.

func (ClaimsFunc) Claims

func (f ClaimsFunc) Claims(ctx context.Context) ([]string, error)

type Handler

type Handler func(ctx context.Context, t *task.Task) ([]byte, error)

Handler is the terminal operation in the middleware chain: decode payload, run the registered function, return the JSON-encoded result (or nil for error-only handlers).

type Metrics

type Metrics interface {
	Observe(name string, attempt int, duration time.Duration, err error)
}

Metrics is a minimal hook interface; wire it to Prometheus, OpenTelemetry, etc. Observe is called exactly once per task dispatch with success or error outcome.

type Middleware

type Middleware func(Handler) Handler

Middleware wraps a Handler to add cross-cutting behavior.

func Chain

func Chain(mws ...Middleware) Middleware

Chain composes middlewares left-to-right (first in slice runs outermost).

func Log

func Log(log *slog.Logger) Middleware

Log emits a structured log line at start, end, and on error.

func Recover

func Recover() Middleware

Recover converts panics in downstream handlers into TaskError{Kind:"panic"}. Always register this as the outermost middleware.

func WithMetrics

func WithMetrics(m Metrics) Middleware

WithMetrics returns a middleware that calls m.Observe after each dispatch.

type Options

type Options struct {
	Durable     string        // consumer durable name; default "ebind-worker"
	Concurrency int           // max in-flight handlers; default 16
	AckWait     time.Duration // default 30s
	// MaxDeliver bounds how many times NATS will redeliver a message before
	// giving up. Default is -1 (unlimited at the NATS layer) so that the
	// worker's RetryPolicy is authoritative: the handler decides when to
	// terminate (Term + DLQ). Set a positive value only if you want an
	// infrastructure-level cap that can override an over-generous RetryPolicy.
	MaxDeliver    int
	Backoff       []time.Duration
	ShutdownGrace time.Duration // default 30s
	Middleware    []Middleware
	StepHook      StepHook // optional; called on terminal success/failure (workflow integration)
	// DefaultRetryPolicy is applied to tasks that carry no per-task RetryPolicy.
	// Used by the worker (not NATS) to decide when to stop retrying. Default is
	// task.DefaultRetryPolicy().
	DefaultRetryPolicy   *task.RetryPolicy
	WorkerID             string
	Claims               ClaimProvider
	ClaimRefreshInterval time.Duration // background refresh of the claim cache; default 2s
	ClaimRetryDelay      time.Duration
}

type StaticClaims

type StaticClaims []string

StaticClaims is a fixed claim set useful in tests and simple deployments.

func (StaticClaims) Claims

func (s StaticClaims) Claims(context.Context) ([]string, error)

type StepHook

type StepHook interface {
	OnStepDone(ctx context.Context, t *task.Task, result []byte) error
	OnStepFailed(ctx context.Context, t *task.Task, err *task.TaskError) error
}

StepHook lets callers observe final task outcomes. Used by the workflow package to persist DAG step results/failures without coupling the worker to workflow. Called exactly once per terminal outcome (success or final failure, not per retry). Never called during intermediate retries.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func New

func New(nc *nats.Conn, reg *task.Registry, opts Options) (*Worker, error)

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

Run blocks until ctx is canceled, then drains in-flight handlers up to ShutdownGrace.

func (*Worker) Use

func (w *Worker) Use(mws ...Middleware)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL