Documentation ¶
Overview ¶
Package worker consumes tasks from the TASKS stream, dispatches them via the registry, and publishes each response to the task's ReplyTo subject.
Functions ¶
func ConcreteTarget ¶
ConcreteTarget names the stable, concrete placement claim for a specific worker instance. Colocated workflow steps resolve to this claim.
Types ¶
type ClaimProvider ¶
ClaimProvider returns the logical targets currently owned by a worker.
The returned claims may change over time. Workers refresh their claim set periodically and also re-check ownership before invoking a targeted task.
Examples of claims include role-like values such as "primary" or application-scoped claims like "cluster/a/primary".
type ClaimsFunc ¶
ClaimsFunc adapts a function to ClaimProvider.
type Handler ¶
Handler is the terminal operation in the middleware chain: it decodes the payload, runs the registered function, and returns the JSON-encoded result (or nil for error-only handlers).
type Metrics ¶
Metrics is a minimal hook interface; wire it to Prometheus, OpenTelemetry, etc. Observe is called exactly once per task dispatch, with either a success or an error outcome.
type Middleware ¶
Middleware wraps a Handler to add cross-cutting behavior.
func Chain ¶
func Chain(mws ...Middleware) Middleware
Chain composes middlewares left-to-right (first in slice runs outermost).
func Log ¶
func Log(log *slog.Logger) Middleware
Log emits a structured log line at start, end, and on error.
func Recover ¶
func Recover() Middleware
Recover converts panics in downstream handlers into TaskError{Kind:"panic"}. Always register this as the outermost middleware (first in the slice passed to Chain) so that panics raised by other middleware are caught as well.
func WithMetrics ¶
func WithMetrics(m Metrics) Middleware
WithMetrics returns a middleware that calls m.Observe after each dispatch.
type Options ¶
type Options struct {
	Durable     string        // consumer durable name; default "ebind-worker"
	Concurrency int           // max in-flight handlers; default 16
	AckWait     time.Duration // default 30s
	// MaxDeliver bounds how many times NATS will redeliver a message before
	// giving up. Default is -1 (unlimited at the NATS layer) so that the
	// worker's RetryPolicy is authoritative: the handler decides when to
	// terminate (Term + DLQ). Set a positive value only if you want an
	// infrastructure-level cap that can override an over-generous RetryPolicy.
	MaxDeliver    int
	Backoff       []time.Duration
	ShutdownGrace time.Duration // default 30s
	Middleware    []Middleware
	StepHook      StepHook // optional; called on terminal success/failure (workflow integration)
	// DefaultRetryPolicy is applied to tasks that carry no per-task RetryPolicy.
	// Used by the worker (not NATS) to decide when to stop retrying. Default is
	// task.DefaultRetryPolicy().
	DefaultRetryPolicy   *task.RetryPolicy
	WorkerID             string
	Claims               ClaimProvider
	ClaimRefreshInterval time.Duration // background refresh of the claim cache; default 2s
	ClaimRetryDelay      time.Duration
}
type StaticClaims ¶
type StaticClaims []string
StaticClaims is a fixed claim set useful in tests and simple deployments.
type StepHook ¶
type StepHook interface {
	OnStepDone(ctx context.Context, t *task.Task, result []byte) error
	OnStepFailed(ctx context.Context, t *task.Task, err *task.TaskError) error
}
StepHook lets callers observe final task outcomes. The workflow package uses it to persist DAG step results and failures without coupling the worker to the workflow package. Each method is called exactly once per terminal outcome (success or final failure) and never during intermediate retries.