Documentation
¶
Index ¶
Constants ¶
const ( ErrKindHandler = "handler" ErrKindDecode = "decode" ErrKindUnknownHandler = "unknown_handler" ErrKindPanic = "panic" ErrKindDeadline = "deadline" ErrKindArgCount = "arg_count" ErrKindArgType = "arg_type" )
Variables ¶
This section is empty.
Functions ¶
func CanonicalName ¶
CanonicalName derives a stable handler name from a function value. Example: "github.com/you/app/handlers.Foo" → "handlers.Foo".
func MustRegister ¶
func MustRegister(r *Registry, fn any, opts ...RegisterOpt)
Types ¶
type Dispatcher ¶
type Dispatcher struct {
Name string
// contains filtered or unexported fields
}
func Describe ¶
func Describe(fn any) (*Dispatcher, error)
Describe returns signature info for a function (producer-side, no registration required). Used by Client.Enqueue to validate args and derive the handler name.
func (*Dispatcher) ArgTypes ¶
func (d *Dispatcher) ArgTypes() []reflect.Type
DescribeArgs returns the registered param types (excluding ctx). Used by Client.Enqueue.
func (*Dispatcher) Call ¶
Call invokes the registered function. payload is a JSON array of args. Returns the JSON-encoded result (or nil if the handler returns only an error).
func (*Dispatcher) HasResult ¶
func (d *Dispatcher) HasResult() bool
HasResult reports whether the function returns (T, error) vs just error.
func (*Dispatcher) ResultType ¶
func (d *Dispatcher) ResultType() reflect.Type
ResultType returns T if HasResult is true.
func (*Dispatcher) ValidateArgs ¶
func (d *Dispatcher) ValidateArgs(args []any) error
ValidateArgs checks that args match the registered signature (producer-side, pre-network).
type RegisterOpt ¶
type RegisterOpt func(*regOptions)
func Alias ¶
func Alias(n string) RegisterOpt
func WithName ¶
func WithName(n string) RegisterOpt
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
func NewRegistry ¶
func NewRegistry() *Registry
type RetryPolicy ¶
type RetryPolicy struct {
InitialInterval time.Duration `json:"initial_interval"`
BackoffCoefficient float64 `json:"backoff_coefficient"`
MaximumInterval time.Duration `json:"maximum_interval"`
MaximumAttempts int `json:"maximum_attempts"` // 0 = unlimited
NonRetryableErrorKinds []string `json:"non_retryable_error_kinds,omitempty"`
}
RetryPolicy configures exponential backoff + attempt limits + non-retryable error kinds. All methods are pure and safe for concurrent use.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() RetryPolicy
DefaultRetryPolicy is a reasonable default matching common task-queue behavior. 5 attempts, 1s → 2s → 4s → 8s → 16s (capped at 60s).
func NoRetryPolicy ¶
func NoRetryPolicy() RetryPolicy
NoRetryPolicy disables retries — any failure terminates the task immediately.
func (RetryPolicy) NextDelay ¶
func (p RetryPolicy) NextDelay(attempt int) time.Duration
NextDelay returns the wait before the Nth retry delivery (1-based attempt count). Returns 0 for attempt <= 0. Capped by MaximumInterval. Deterministic — no jitter; callers that want jitter can wrap this.
func (RetryPolicy) ShouldRetry ¶
func (p RetryPolicy) ShouldRetry(err *TaskError, attempt int) bool
ShouldRetry reports whether a failed attempt warrants another delivery. Rules (short-circuit in order):
- Unlimited attempts (MaximumAttempts == 0) with retryable err → true
- Attempt already reached MaximumAttempts → false
- err is marked non-retryable → false
- err.Kind appears in NonRetryableErrorKinds → false
- Otherwise → true
type Task ¶
type Task struct {
ID string `json:"id"`
Name string `json:"name"`
Payload json.RawMessage `json:"payload"`
ReplyTo string `json:"reply_to,omitempty"`
Deadline time.Time `json:"deadline,omitempty"`
EnqueuedAt time.Time `json:"enqueued_at"`
Attempt int `json:"attempt"`
TraceCtx map[string]string `json:"trace_ctx,omitempty"`
DAGID string `json:"dag_id,omitempty"`
StepID string `json:"step_id,omitempty"`
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
Target string `json:"target,omitempty"`
WorkerID string `json:"worker_id,omitempty"`
}