task

package
v0.2.0 Latest
Published: Apr 25, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

const (
	ErrKindHandler        = "handler"
	ErrKindDecode         = "decode"
	ErrKindUnknownHandler = "unknown_handler"
	ErrKindPanic          = "panic"
	ErrKindDeadline       = "deadline"
	ErrKindArgCount       = "arg_count"
	ErrKindArgType        = "arg_type"
)

Variables

This section is empty.

Functions

func CanonicalName

func CanonicalName(fnVal reflect.Value) string

CanonicalName derives a stable handler name from a function value. Example: "github.com/you/app/handlers.Foo" → "handlers.Foo".

func MustRegister

func MustRegister(r *Registry, fn any, opts ...RegisterOpt)

func Register

func Register(r *Registry, fn any, opts ...RegisterOpt) error

Types

type Dispatcher

type Dispatcher struct {
	Name string
	// contains filtered or unexported fields
}

func Describe

func Describe(fn any) (*Dispatcher, error)

Describe returns signature info for a function (producer-side, no registration required). Used by Client.Enqueue to validate args and derive the handler name.

func (*Dispatcher) ArgTypes

func (d *Dispatcher) ArgTypes() []reflect.Type

ArgTypes returns the registered parameter types (excluding ctx). Used by Client.Enqueue.

func (*Dispatcher) Call

func (d *Dispatcher) Call(ctx context.Context, payload []byte) ([]byte, error)

Call invokes the registered function. payload is a JSON array of args. Returns the JSON-encoded result (or nil if the handler returns only an error).

func (*Dispatcher) HasResult

func (d *Dispatcher) HasResult() bool

HasResult reports whether the function returns (T, error) vs just error.

func (*Dispatcher) ResultType

func (d *Dispatcher) ResultType() reflect.Type

ResultType returns T if HasResult is true.

func (*Dispatcher) ValidateArgs

func (d *Dispatcher) ValidateArgs(args []any) error

ValidateArgs checks that args match the registered signature (producer-side, pre-network).
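
A producer-side check of this kind might look like the sketch below. It is illustrative only: validateArgs is a hypothetical helper that works on the function directly, the "arg_count" and "arg_type" prefixes simply echo the ErrKindArgCount and ErrKindArgType constants, and rejecting untyped nil arguments is an assumption made for brevity.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// validateArgs is a hypothetical stand-in for Dispatcher.ValidateArgs. It
// compares args against fn's parameters, skipping the leading context.Context.
func validateArgs(fn any, args []any) error {
	t := reflect.TypeOf(fn)
	want := t.NumIn() - 1
	if len(args) != want {
		return fmt.Errorf("arg_count: want %d args, got %d", want, len(args))
	}
	for i, a := range args {
		pt := t.In(i + 1)
		if a == nil {
			// reflect.TypeOf(nil) is nil, so handle untyped nil separately.
			return fmt.Errorf("arg_type: arg %d is nil, want %s", i, pt)
		}
		if at := reflect.TypeOf(a); !at.AssignableTo(pt) {
			return fmt.Errorf("arg_type: arg %d is %s, want %s", i, at, pt)
		}
	}
	return nil
}

func greet(ctx context.Context, name string, times int) error { return nil }

func main() {
	fmt.Println(validateArgs(greet, []any{"ada", 3}))   // prints "<nil>"
	fmt.Println(validateArgs(greet, []any{"ada"}))      // prints "arg_count: want 2 args, got 1"
	fmt.Println(validateArgs(greet, []any{"ada", "3"})) // prints "arg_type: arg 1 is string, want int"
}
```

Running the check before the task is serialized means a mismatched call fails in the producer's process, rather than after a network round trip and a failed decode on the worker.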

type RegisterOpt

type RegisterOpt func(*regOptions)

func Alias

func Alias(n string) RegisterOpt

func WithName

func WithName(n string) RegisterOpt

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry() *Registry

func (*Registry) Get

func (r *Registry) Get(name string) (*Dispatcher, bool)

func (*Registry) Names

func (r *Registry) Names() []string

type Response

type Response struct {
	TaskID      string          `json:"task_id"`
	Result      json.RawMessage `json:"result,omitempty"`
	Error       *TaskError      `json:"error,omitempty"`
	Attempts    int             `json:"attempts"`
	CompletedAt time.Time       `json:"completed_at"`
}

type RetryPolicy

type RetryPolicy struct {
	InitialInterval        time.Duration `json:"initial_interval"`
	BackoffCoefficient     float64       `json:"backoff_coefficient"`
	MaximumInterval        time.Duration `json:"maximum_interval"`
	MaximumAttempts        int           `json:"maximum_attempts"` // 0 = unlimited
	NonRetryableErrorKinds []string      `json:"non_retryable_error_kinds,omitempty"`
}

RetryPolicy configures exponential backoff, attempt limits, and non-retryable error kinds. All methods are pure and safe for concurrent use.

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

DefaultRetryPolicy returns a reasonable default matching common task-queue behavior: 5 attempts, with delays of 1s → 2s → 4s → 8s → 16s (capped at 60s).

func NoRetryPolicy

func NoRetryPolicy() RetryPolicy

NoRetryPolicy disables retries: any failure terminates the task immediately.

func (RetryPolicy) NextDelay

func (p RetryPolicy) NextDelay(attempt int) time.Duration

NextDelay returns the wait before the Nth retry delivery (1-based attempt count). Returns 0 for attempt <= 0. The result is capped by MaximumInterval and is deterministic (no jitter); callers that want jitter can wrap this.

func (RetryPolicy) ShouldRetry

func (p RetryPolicy) ShouldRetry(err *TaskError, attempt int) bool

ShouldRetry reports whether a failed attempt warrants another delivery. Rules (short-circuit in order):

  1. Unlimited attempts (MaximumAttempts == 0) with retryable err → true
  2. Attempt already reached MaximumAttempts → false
  3. err is marked non-retryable → false
  4. err.Kind appears in NonRetryableErrorKinds → false
  5. Otherwise → true
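
The rule order above can be sketched as follows. The types are local, hypothetical copies of the relevant fields, and two points are assumptions rather than documented behavior: "retryable err" in rule 1 is read as passing both rules 3 and 4, and a nil err is treated as not retryable.

```go
package main

import "fmt"

// taskError mirrors the fields of TaskError used by the retry decision.
type taskError struct {
	Kind      string
	Message   string
	Retryable bool
}

// retryPolicy mirrors the attempt-limit fields of RetryPolicy.
type retryPolicy struct {
	MaximumAttempts        int // 0 = unlimited
	NonRetryableErrorKinds []string
}

// retryable applies rules 3 and 4: the error must be marked retryable and its
// kind must not be listed in NonRetryableErrorKinds.
func (p retryPolicy) retryable(err *taskError) bool {
	if err == nil || !err.Retryable {
		return false
	}
	for _, k := range p.NonRetryableErrorKinds {
		if k == err.Kind {
			return false
		}
	}
	return true
}

// shouldRetry is a hypothetical stand-in for RetryPolicy.ShouldRetry.
func (p retryPolicy) shouldRetry(err *taskError, attempt int) bool {
	ok := p.retryable(err)
	if p.MaximumAttempts == 0 && ok {
		return true // rule 1: unlimited attempts, retryable error
	}
	if attempt >= p.MaximumAttempts {
		return false // rule 2: attempt budget exhausted
	}
	return ok // rules 3-5
}

func main() {
	p := retryPolicy{MaximumAttempts: 3, NonRetryableErrorKinds: []string{"decode"}}
	handlerErr := &taskError{Kind: "handler", Retryable: true}
	decodeErr := &taskError{Kind: "decode", Retryable: true}

	fmt.Println(p.shouldRetry(handlerErr, 1))               // prints "true"
	fmt.Println(p.shouldRetry(handlerErr, 3))               // prints "false"
	fmt.Println(p.shouldRetry(decodeErr, 1))                // prints "false"
	fmt.Println(retryPolicy{}.shouldRetry(handlerErr, 100)) // prints "true"
}
```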

type Task

type Task struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	Payload     json.RawMessage   `json:"payload"`
	ReplyTo     string            `json:"reply_to,omitempty"`
	Deadline    time.Time         `json:"deadline,omitempty"`
	EnqueuedAt  time.Time         `json:"enqueued_at"`
	Attempt     int               `json:"attempt"`
	TraceCtx    map[string]string `json:"trace_ctx,omitempty"`
	DAGID       string            `json:"dag_id,omitempty"`
	StepID      string            `json:"step_id,omitempty"`
	RetryPolicy *RetryPolicy      `json:"retry_policy,omitempty"`
	Target      string            `json:"target,omitempty"`
	WorkerID    string            `json:"worker_id,omitempty"`
}

type TaskError

type TaskError struct {
	Kind      string `json:"kind"`
	Message   string `json:"message"`
	Retryable bool   `json:"retryable"`
}

func (*TaskError) Error

func (e *TaskError) Error() string
