types

package
v0.1.11 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the package. Each one is created with errors.New
// and its message starts with the "dureq: " prefix.
var (
	ErrJobNotFound            = errors.New("dureq: job not found")
	ErrScheduleNotFound       = errors.New("dureq: schedule not found")
	ErrRunNotFound            = errors.New("dureq: run not found")
	ErrDuplicateJob           = errors.New("dureq: duplicate unique key")
	ErrHandlerNotFound        = errors.New("dureq: handler not registered for task type")
	ErrNotLeader              = errors.New("dureq: not the leader node")
	ErrLockFailed             = errors.New("dureq: failed to acquire lock")
	ErrLockNotHeld            = errors.New("dureq: lock not held")
	ErrServerStopped          = errors.New("dureq: server stopped")
	ErrInvalidSchedule        = errors.New("dureq: invalid schedule")
	ErrInvalidJobStatus       = errors.New("dureq: invalid job status transition")
	ErrRedisNotConnected      = errors.New("dureq: Redis not connected")
	ErrWorkflowNotFound       = errors.New("dureq: workflow not found")
	ErrCyclicDependency       = errors.New("dureq: workflow has cyclic dependencies")
	ErrBatchNotFound          = errors.New("dureq: batch not found")
	ErrExecutionTimedOut      = errors.New("dureq: execution timed out")
	ErrScheduleToStartTimeout = errors.New("dureq: schedule-to-start timeout exceeded")
	ErrNoProgressReporter     = errors.New("dureq: no progress reporter in context (called outside handler?)")
)

Sentinel errors.

Functions

func GetAttempt

func GetAttempt(ctx context.Context) int

GetAttempt extracts the attempt number from the context.

func GetErrorClassString

func GetErrorClassString(errClass ErrorClassification) string

GetErrorClassString extracts the error string from an ErrorClassification.

func GetHeaders

func GetHeaders(ctx context.Context) map[string]string

GetHeaders extracts the headers map from the context.

func GetJobID

func GetJobID(ctx context.Context) string

GetJobID extracts the job ID from the context.

func GetMaxRetry

func GetMaxRetry(ctx context.Context) int

GetMaxRetry extracts the max retry count from the context.

func GetNodeID

func GetNodeID(ctx context.Context) string

GetNodeID extracts the node ID from the context.

func GetRetryAfter

func GetRetryAfter(err error) time.Duration

GetRetryAfter extracts the RetryAfter duration from a RateLimitedError.

func GetRunID

func GetRunID(ctx context.Context) string

GetRunID extracts the run ID from the context.

func IsPanicError

func IsPanicError(err error) bool

IsPanicError checks whether err (or any wrapped error) is a PanicError.

func ReportProgress

func ReportProgress(ctx context.Context, data any) error

ReportProgress reports user-defined progress data for the current run. The data is stored alongside the run's heartbeat and is accessible via the monitoring API and client SDK.

func ValidJobTransition added in v0.1.10

func ValidJobTransition(from, to JobStatus) bool

ValidJobTransition returns true if transitioning from → to is allowed.

func ValidRunTransition added in v0.1.10

func ValidRunTransition(from, to RunStatus) bool

ValidRunTransition returns true if transitioning from → to is allowed.

func WithAttempt

func WithAttempt(ctx context.Context, attempt int) context.Context

WithAttempt stores the attempt number in the context.

func WithHeaders

func WithHeaders(ctx context.Context, headers map[string]string) context.Context

WithHeaders stores the headers map in the context.

func WithJobID

func WithJobID(ctx context.Context, id string) context.Context

WithJobID stores the job ID in the context.

func WithMaxRetry

func WithMaxRetry(ctx context.Context, max int) context.Context

WithMaxRetry stores the max retry count in the context.

func WithNodeID

func WithNodeID(ctx context.Context, id string) context.Context

WithNodeID stores the node ID in the context.

func WithPriority

func WithPriority(ctx context.Context, p Priority) context.Context

WithPriority stores the priority in the context.

func WithProgressReporter

func WithProgressReporter(ctx context.Context, fn ProgressReporterFunc) context.Context

WithProgressReporter stores the progress reporter function in the context.

func WithRunID

func WithRunID(ctx context.Context, id string) context.Context

WithRunID stores the run ID in the context.

func WithTaskType

func WithTaskType(ctx context.Context, tt TaskType) context.Context

WithTaskType stores the task type in the context.

Types

type AggregateResult

type AggregateResult struct {
	Group             string          `json:"group"`
	Count             int             `json:"count"`
	AggregatedPayload json.RawMessage `json:"aggregated_payload"`
}

AggregateResult is the outcome of flushing a group.

type AtTime

type AtTime struct {
	Hour   uint `json:"hour"`
	Minute uint `json:"minute"`
	Second uint `json:"second"`
}

AtTime represents a time of day as hour, minute, second.

type BatchDefinition

// BatchDefinition describes a batch processing job to submit.
//
// Every field tagged omitempty is left out of the JSON encoding when it holds
// its zero value (nil for the pointer fields).
type BatchDefinition struct {
	Name string `json:"name"`

	// OnetimeTaskType is the handler for shared preprocessing (runs once before items).
	OnetimeTaskType *TaskType       `json:"onetime_task_type,omitempty"`
	OnetimePayload  json.RawMessage `json:"onetime_payload,omitempty"`

	// ItemTaskType is the handler that processes each batch item.
	ItemTaskType TaskType    `json:"item_task_type"`
	Items        []BatchItem `json:"items"`

	// FailurePolicy controls partial failure behavior. Default: continue_on_error.
	FailurePolicy BatchFailurePolicy `json:"failure_policy,omitempty"`

	// ChunkSize controls how many items are dispatched concurrently. Default: 100.
	ChunkSize int `json:"chunk_size,omitempty"`

	// ItemRetryPolicy is the retry policy for individual item failures.
	ItemRetryPolicy *RetryPolicy `json:"item_retry_policy,omitempty"`

	// ExecutionTimeout is the execution timeout for the entire batch.
	ExecutionTimeout *Duration `json:"execution_timeout,omitempty"`

	// RetryPolicy is the policy for retrying the entire batch on failure.
	RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`

	// DefaultPriority is the priority for all jobs created by this batch.
	DefaultPriority *Priority `json:"default_priority,omitempty"`
}

BatchDefinition describes a batch processing job to submit.

type BatchFailurePolicy

type BatchFailurePolicy string

BatchFailurePolicy controls behavior when a batch item fails.

// Values accepted for BatchDefinition.FailurePolicy.
const (
	// BatchFailFast stops the batch immediately on first item failure.
	BatchFailFast BatchFailurePolicy = "fail_fast"
	// BatchContinueOnError continues processing remaining items.
	// Its value, "continue_on_error", is the default named in the
	// BatchDefinition.FailurePolicy field comment.
	BatchContinueOnError BatchFailurePolicy = "continue_on_error"
)

type BatchInstance

// BatchInstance is the runtime state of a batch being processed.
//
// Status is typed as WorkflowStatus; this declaration does not use a
// batch-specific status type. Definition holds the submitted BatchDefinition by
// value. ItemStates is keyed by string, presumably the BatchItem ID (confirm
// against the code that populates it).
type BatchInstance struct {
	ID         string          `json:"id"`
	Name       string          `json:"name"`
	Status     WorkflowStatus  `json:"status"`
	Definition BatchDefinition `json:"definition"`

	// Onetime preprocessing state.
	OnetimeState *BatchOnetimeState `json:"onetime_state,omitempty"`

	// Per-item tracking.
	ItemStates map[string]BatchItemState `json:"item_states"`

	// Progress counters.
	TotalItems     int `json:"total_items"`
	CompletedItems int `json:"completed_items"`
	FailedItems    int `json:"failed_items"`
	RunningItems   int `json:"running_items"`
	PendingItems   int `json:"pending_items"`

	// Chunk tracking — index into Definition.Items for the next chunk.
	NextChunkIndex int `json:"next_chunk_index"`

	// Timeout and retry tracking.
	Deadline    *time.Time `json:"deadline,omitempty"`
	Attempt     int        `json:"attempt,omitempty"`
	MaxAttempts int        `json:"max_attempts,omitempty"`

	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}

BatchInstance is the runtime state of a batch being processed.

type BatchItem

type BatchItem struct {
	ID      string          `json:"id"`
	Payload json.RawMessage `json:"payload"`
}

BatchItem is a single item within a batch.

type BatchItemResult

type BatchItemResult struct {
	BatchID string          `json:"batch_id"`
	ItemID  string          `json:"item_id"`
	Success bool            `json:"success"`
	Output  json.RawMessage `json:"output,omitempty"`
	Error   *string         `json:"error,omitempty"`
}

BatchItemResult stores the output of a single batch item execution.

type BatchItemState

type BatchItemState struct {
	ItemID     string     `json:"item_id"`
	JobID      string     `json:"job_id,omitempty"`
	Status     JobStatus  `json:"status"`
	Error      *string    `json:"error,omitempty"`
	StartedAt  *time.Time `json:"started_at,omitempty"`
	FinishedAt *time.Time `json:"finished_at,omitempty"`
}

BatchItemState tracks an individual item within a batch.

type BatchOnetimeState

type BatchOnetimeState struct {
	JobID      string          `json:"job_id,omitempty"`
	Status     JobStatus       `json:"status"`
	ResultData json.RawMessage `json:"result_data,omitempty"`
	Error      *string         `json:"error,omitempty"`
	StartedAt  *time.Time      `json:"started_at,omitempty"`
	FinishedAt *time.Time      `json:"finished_at,omitempty"`
}

BatchOnetimeState tracks the shared preprocessing step.

type BatchProgress

type BatchProgress struct {
	BatchID string `json:"batch_id"`
	Done    int    `json:"done"`
	Total   int    `json:"total"`
	Failed  int    `json:"failed"`
	Step    string `json:"step,omitempty"`
}

BatchProgress is published as an event for progress tracking.

type ConditionResult added in v0.1.10

type ConditionResult struct {
	Route uint `json:"route"`
}

ConditionResult wraps the route index chosen by a condition handler. The orchestrator reads this from the job result to determine branching.

type Duration

type Duration time.Duration

Duration is a JSON-friendly time.Duration.

func (Duration) MarshalJSON

func (d Duration) MarshalJSON() ([]byte, error)

func (Duration) Std

func (d Duration) Std() time.Duration

func (*Duration) UnmarshalJSON

func (d *Duration) UnmarshalJSON(b []byte) error

type EnqueueGroupOption

type EnqueueGroupOption struct {
	Group    string // the group name (required)
	TaskType TaskType
	Payload  json.RawMessage
}

EnqueueGroupOption configures a grouped task enqueue.

type ErrorClassification

type ErrorClassification int

ErrorClassification categorizes errors for retry decisions.

// Retry classifications. The values start at iota, so ErrorClassRetryable is 0
// (which is also the zero value of ErrorClassification), ErrorClassNonRetryable
// is 1 and ErrorClassRateLimited is 2.
const (
	ErrorClassRetryable    ErrorClassification = iota // transient, should retry
	ErrorClassNonRetryable                            // permanent, do not retry
	ErrorClassRateLimited                             // retry after specific delay
)
// Control-flow classifications. iota restarts at 0 in this second const block
// and is offset by 10, so ErrorClassSkip is 10, ErrorClassRepeat is 11 and
// ErrorClassPause is 12; they do not collide with the retry classifications.
const (
	ErrorClassSkip   ErrorClassification = iota + 10 // skip execution, no state change
	ErrorClassRepeat                                 // save and re-enqueue immediately
	ErrorClassPause                                  // pause the job
)

func ClassifyControlFlow added in v0.1.10

func ClassifyControlFlow(err error) ErrorClassification

ClassifyControlFlow returns the extended classification including control flow errors. Returns ErrorClassRetryable for standard errors.

func ClassifyError

func ClassifyError(err error) ErrorClassification

ClassifyError determines the retry classification of an error.

type EventType

type EventType string

EventType identifies the kind of event published on the DUREQ_EVENTS stream.

const (
	EventJobEnqueued   EventType = "job.enqueued"
	EventJobScheduled  EventType = "job.scheduled"
	EventJobDispatched EventType = "job.dispatched"
	EventJobStarted    EventType = "job.started"
	EventJobCompleted  EventType = "job.completed"
	EventJobFailed     EventType = "job.failed"
	EventJobRetrying   EventType = "job.retrying"
	EventJobDead       EventType = "job.dead"
	EventJobCancelled  EventType = "job.cancelled"
	EventJobPaused     EventType = "job.paused"
	EventJobResumed    EventType = "job.resumed"

	EventNodeJoined    EventType = "node.joined"
	EventNodeLeft      EventType = "node.left"
	EventLeaderElected EventType = "leader.elected"
	EventLeaderLost    EventType = "leader.lost"

	EventScheduleCreated EventType = "schedule.created"
	EventScheduleRemoved EventType = "schedule.removed"

	EventWorkflowStarted        EventType = "workflow.started"
	EventWorkflowCompleted      EventType = "workflow.completed"
	EventWorkflowFailed         EventType = "workflow.failed"
	EventWorkflowCancelled      EventType = "workflow.cancelled"
	EventWorkflowTaskDispatched EventType = "workflow.task.dispatched"
	EventWorkflowTaskCompleted  EventType = "workflow.task.completed"
	EventWorkflowTaskFailed     EventType = "workflow.task.failed"

	EventBatchStarted          EventType = "batch.started"
	EventBatchCompleted        EventType = "batch.completed"
	EventBatchFailed           EventType = "batch.failed"
	EventBatchCancelled        EventType = "batch.cancelled"
	EventBatchOnetimeCompleted EventType = "batch.onetime.completed"
	EventBatchOnetimeFailed    EventType = "batch.onetime.failed"
	EventBatchItemCompleted    EventType = "batch.item.completed"
	EventBatchItemFailed       EventType = "batch.item.failed"
	EventBatchProgress         EventType = "batch.progress"

	EventWorkflowTimedOut          EventType = "workflow.timed_out"
	EventWorkflowRetrying          EventType = "workflow.retrying"
	EventBatchTimedOut             EventType = "batch.timed_out"
	EventBatchRetrying             EventType = "batch.retrying"
	EventJobScheduleToStartTimeout EventType = "job.schedule_to_start_timeout"
	EventNodeCrashDetected         EventType = "node.crash_detected"
	EventJobAutoRecovered          EventType = "job.auto_recovered"

	EventWorkflowSignalReceived EventType = "workflow.signal.received"
	EventWorkflowTaskPanicked   EventType = "workflow.task.panicked"

	EventWorkflowSuspended EventType = "workflow.suspended"
	EventWorkflowResumed   EventType = "workflow.resumed"

	EventWorkflowHookDispatched EventType = "workflow.hook.dispatched"
	EventWorkflowHookCompleted  EventType = "workflow.hook.completed"
	EventWorkflowHookFailed     EventType = "workflow.hook.failed"
)

type GroupAggregator

type GroupAggregator interface {
	// Aggregate combines multiple task payloads into a single payload.
	// The group name and list of payloads are provided.
	Aggregate(group string, payloads []json.RawMessage) (json.RawMessage, error)
}

GroupAggregator defines how a set of grouped tasks should be combined into a single aggregated task for processing.

type GroupAggregatorFunc

type GroupAggregatorFunc func(group string, payloads []json.RawMessage) (json.RawMessage, error)

GroupAggregatorFunc is a convenience adapter for GroupAggregator.

func (GroupAggregatorFunc) Aggregate

func (f GroupAggregatorFunc) Aggregate(group string, payloads []json.RawMessage) (json.RawMessage, error)

type GroupConfig

type GroupConfig struct {
	// Aggregator combines grouped tasks into one before processing.
	Aggregator GroupAggregator

	// GracePeriod is how long to wait for additional tasks after the
	// first task arrives in a group. Default: 5s.
	GracePeriod time.Duration

	// MaxDelay is the maximum time a group can wait before being
	// flushed, regardless of GracePeriod. Default: 30s.
	MaxDelay time.Duration

	// MaxSize is the maximum number of tasks in a group before
	// it is flushed immediately. Default: 100.
	MaxSize int
}

GroupConfig configures task aggregation for a server.

type GroupHandler

type GroupHandler func(ctx context.Context, group string, payload json.RawMessage) error

GroupHandler processes an aggregated group of tasks.

type GroupMessage

type GroupMessage struct {
	JobID    string          `json:"job_id"`
	TaskType TaskType        `json:"task_type"`
	Payload  json.RawMessage `json:"payload"`
	AddedAt  time.Time       `json:"added_at"`
}

GroupMessage is stored in the group set pending aggregation.

type HandlerDefinition

type HandlerDefinition struct {
	TaskType          TaskType
	Handler           HandlerFunc           // original handler (no result)
	HandlerWithResult HandlerFuncWithResult // handler that returns result data (optional)
	Concurrency       int                   // max concurrent per node (0 = pool default)
	Timeout           time.Duration         // per-execution timeout (0 = no timeout)
	RetryPolicy       *RetryPolicy          // default retry (overridable per-job)
	Middlewares       []MiddlewareFunc      // per-handler middleware (applied after global)
	// Version identifies the handler build for safe deployments.
	// When set, the worker will skip messages dispatched with a different version
	// and re-enqueue them for a matching worker to pick up.
	Version string
}

HandlerDefinition describes a handler registered on the server.

type HandlerFunc

type HandlerFunc func(ctx context.Context, payload json.RawMessage) error

HandlerFunc is the function signature server handlers must implement.

func ChainMiddleware

func ChainMiddleware(handler HandlerFunc, middlewares ...MiddlewareFunc) HandlerFunc

ChainMiddleware applies middlewares to a HandlerFunc in order. The first middleware in the list is the outermost (executed first).

func TypedHandler added in v0.1.10

func TypedHandler[T any](fn func(ctx context.Context, payload T) error) HandlerFunc

TypedHandler wraps a type-safe handler function into a HandlerFunc. The payload is automatically unmarshalled from JSON into T.

srv.RegisterHandler(types.HandlerDefinition{
    TaskType: "order.process",
    Handler:  types.TypedHandler(func(ctx context.Context, order OrderPayload) error {
        // order is already unmarshalled
        return nil
    }),
})

func TypedHandlerWithUpstream added in v0.1.10

func TypedHandlerWithUpstream[U any, T any](fn func(ctx context.Context, upstream U, original T) error) HandlerFunc

TypedHandlerWithUpstream wraps a handler that receives both the upstream task's result and this task's original payload. Use with WorkflowTask.ResultFrom to enable automatic data flow between workflow tasks.

srv.RegisterHandler(types.HandlerDefinition{
    TaskType: "process.order",
    Handler:  types.TypedHandlerWithUpstream(func(ctx context.Context, upstream ValidateResult, original OrderPayload) error {
        // upstream is the result from the task specified in ResultFrom
        // original is this task's own payload
        return nil
    }),
})

type HandlerFuncWithResult

type HandlerFuncWithResult func(ctx context.Context, payload json.RawMessage) (json.RawMessage, error)

HandlerFuncWithResult is an extended handler that returns output data.

func ChainMiddlewareWithResult

func ChainMiddlewareWithResult(handler HandlerFuncWithResult, middlewares ...MiddlewareWithResultFunc) HandlerFuncWithResult

ChainMiddlewareWithResult applies middlewares to a HandlerFuncWithResult.

func TypedConditionHandler added in v0.1.10

func TypedConditionHandler[T any](fn func(ctx context.Context, payload T) (uint, error)) HandlerFuncWithResult

TypedConditionHandler wraps a type-safe condition handler that returns a route index. The route index maps to ConditionRoutes in WorkflowTask to determine the next branch.

srv.RegisterHandler(types.HandlerDefinition{
    TaskType:          "check.threshold",
    HandlerWithResult: types.TypedConditionHandler(func(ctx context.Context, p ThresholdPayload) (uint, error) {
        if p.Value > 100 { return 0, nil } // route 0: high
        return 1, nil                       // route 1: low
    }),
})

func TypedHandlerWithResult added in v0.1.10

func TypedHandlerWithResult[T any, R any](fn func(ctx context.Context, payload T) (R, error)) HandlerFuncWithResult

TypedHandlerWithResult wraps a type-safe handler that returns output data. The payload is unmarshalled from JSON into T, and the output R is marshalled back.

srv.RegisterHandler(types.HandlerDefinition{
    TaskType:          "order.process",
    HandlerWithResult: types.TypedHandlerWithResult(func(ctx context.Context, order OrderPayload) (OrderResult, error) {
        return OrderResult{Status: "ok"}, nil
    }),
})

func TypedHandlerWithUpstreamResult added in v0.1.10

func TypedHandlerWithUpstreamResult[U any, T any, R any](fn func(ctx context.Context, upstream U, original T) (R, error)) HandlerFuncWithResult

TypedHandlerWithUpstreamResult wraps a handler that receives upstream result + original payload and returns output data. Combines ResultFrom piping with result production.

func TypedSubflowHandler added in v0.1.10

func TypedSubflowHandler[T any](fn func(ctx context.Context, payload T) ([]WorkflowTask, error)) HandlerFuncWithResult

TypedSubflowHandler wraps a handler that dynamically generates workflow tasks at runtime. The returned tasks are injected into the workflow instance and dispatched by the orchestrator.

srv.RegisterHandler(types.HandlerDefinition{
    TaskType:          "generate.subtasks",
    HandlerWithResult: types.TypedSubflowHandler(func(ctx context.Context, p Input) ([]types.WorkflowTask, error) {
        return []types.WorkflowTask{...}, nil
    }),
})

type HookFunc added in v0.1.10

type HookFunc func(ctx context.Context, event JobEvent)

HookFunc is a callback invoked when a lifecycle event occurs. The context carries the same metadata available in handler contexts (job ID, run ID, and so on), which can be read with GetJobID, GetRunID, and the other Get* accessors.

type Hooks added in v0.1.10

type Hooks struct {
	OnJobCompleted []HookFunc
	OnJobFailed    []HookFunc
	OnJobDead      []HookFunc
	OnJobPaused    []HookFunc
	OnJobResumed   []HookFunc
	OnJobCancelled []HookFunc

	OnWorkflowCompleted []HookFunc
	OnWorkflowFailed    []HookFunc

	OnBatchCompleted []HookFunc
	OnBatchFailed    []HookFunc
}

Hooks holds registered lifecycle callbacks. All hooks are optional and invoked asynchronously (non-blocking).

func (*Hooks) Fire added in v0.1.10

func (h *Hooks) Fire(ctx context.Context, event JobEvent)

Fire invokes all hooks registered for the given event type. Hooks are called asynchronously in separate goroutines.

type Job

type Job struct {
	ID          string          `json:"id"`
	TaskType    TaskType        `json:"task_type"`
	Payload     json.RawMessage `json:"payload"`
	Schedule    Schedule        `json:"schedule"`
	RetryPolicy *RetryPolicy    `json:"retry_policy,omitempty"`

	// State
	Status            JobStatus  `json:"status"`
	Attempt           int        `json:"attempt"`
	ConsecutiveErrors int        `json:"consecutive_errors,omitempty"`
	LastError         *string    `json:"last_error,omitempty"`
	LastRunAt         *time.Time `json:"last_run_at,omitempty"`
	NextRunAt         *time.Time `json:"next_run_at,omitempty"`
	CompletedAt       *time.Time `json:"completed_at,omitempty"`
	PausedAt          *time.Time `json:"paused_at,omitempty"`
	ResumeAt          *time.Time `json:"resume_at,omitempty"` // auto-resume time (nil = manual only)

	// Metadata
	Tags      []string          `json:"tags,omitempty"`
	Headers   map[string]string `json:"headers,omitempty"` // key-value metadata
	UniqueKey *string           `json:"unique_key,omitempty"`
	DLQAfter  *int              `json:"dlq_after,omitempty"` // move to DLQ after N failures

	// Workflow association (set when this job is part of a workflow).
	WorkflowID   *string `json:"workflow_id,omitempty"`
	WorkflowTask *string `json:"workflow_task,omitempty"`

	// Batch association (set when this job is part of a batch).
	BatchID   *string `json:"batch_id,omitempty"`
	BatchItem *string `json:"batch_item,omitempty"` // item ID within batch
	BatchRole *string `json:"batch_role,omitempty"` // "onetime" or "item"

	// Priority and queue policies.
	Priority               *Priority `json:"priority,omitempty"`
	ScheduleToStartTimeout *Duration `json:"schedule_to_start_timeout,omitempty"`

	// HeartbeatTimeout overrides the default 30s stale age for orphan detection.
	// If set, a run is considered orphaned if no heartbeat is received within
	// this duration.
	HeartbeatTimeout *Duration `json:"heartbeat_timeout,omitempty"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

Job is the persisted entity representing a registered job with its schedule.

type JobEvent

type JobEvent struct {
	Type           EventType      `json:"type"`
	JobID          string         `json:"job_id,omitempty"`
	RunID          string         `json:"run_id,omitempty"`
	NodeID         string         `json:"node_id,omitempty"`
	TaskType       TaskType       `json:"task_type,omitempty"`
	Error          *string        `json:"error,omitempty"`
	Attempt        int            `json:"attempt,omitempty"`
	Timestamp      time.Time      `json:"timestamp"`
	BatchProgress  *BatchProgress `json:"batch_progress,omitempty"`
	AffectedRunIDs []string       `json:"affected_run_ids,omitempty"`
	// WorkflowID is set for task-level workflow events so they can be indexed
	// under the parent workflow rather than the task's job ID.
	WorkflowID string `json:"workflow_id,omitempty"`
}

JobEvent is the envelope published to the DUREQ_EVENTS stream.

type JobRun

// JobRun represents an individual execution of a job.
//
// Duration is a plain time.Duration rather than this package's Duration type,
// so encoding/json writes it as an integer number of nanoseconds and, because
// of omitempty, drops it when it is zero. Error, FinishedAt and LastHeartbeatAt
// are pointers tagged omitempty and are left out of the JSON when nil.
type JobRun struct {
	ID              string          `json:"id"`
	JobID           string          `json:"job_id"`
	NodeID          string          `json:"node_id"`
	Status          RunStatus       `json:"status"`
	Attempt         int             `json:"attempt"`
	Error           *string         `json:"error,omitempty"`
	StartedAt       time.Time       `json:"started_at"`
	FinishedAt      *time.Time      `json:"finished_at,omitempty"`
	Duration        time.Duration   `json:"duration,omitempty"`
	LastHeartbeatAt *time.Time      `json:"last_heartbeat_at,omitempty"`
	Progress        json.RawMessage `json:"progress,omitempty"` // user-reported progress data
}

JobRun represents an individual execution of a job.

type JobStatus

type JobStatus string

JobStatus represents the lifecycle state of a job definition.

const (
	JobStatusPending   JobStatus = "pending"
	JobStatusScheduled JobStatus = "scheduled"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
	JobStatusRetrying  JobStatus = "retrying"
	JobStatusPaused    JobStatus = "paused"
	JobStatusDead      JobStatus = "dead"
	JobStatusCancelled JobStatus = "cancelled"
)

func (JobStatus) IsPaused added in v0.1.10

func (s JobStatus) IsPaused() bool

IsPaused returns true if the job is in the paused state.

func (JobStatus) IsTerminal

func (s JobStatus) IsTerminal() bool

func (JobStatus) String

func (s JobStatus) String() string

type MiddlewareFunc

type MiddlewareFunc func(HandlerFunc) HandlerFunc

MiddlewareFunc wraps a HandlerFunc, allowing pre/post processing.

type MiddlewareWithResultFunc

type MiddlewareWithResultFunc func(HandlerFuncWithResult) HandlerFuncWithResult

MiddlewareWithResultFunc wraps a HandlerFuncWithResult.

func AdaptMiddleware

func AdaptMiddleware(mw MiddlewareFunc) MiddlewareWithResultFunc

AdaptMiddleware converts a MiddlewareFunc for use with HandlerFuncWithResult. The adapted middleware wraps the result handler by converting it to/from a plain handler.

type NodeInfo

type NodeInfo struct {
	NodeID        string     `json:"node_id"`
	Address       string     `json:"address,omitempty"`
	TaskTypes     []string   `json:"task_types"`
	StartedAt     time.Time  `json:"started_at"`
	LastHeartbeat time.Time  `json:"last_heartbeat"`
	PoolStats     *PoolStats `json:"pool_stats,omitempty"`
	ActiveRunIDs  []string   `json:"active_run_ids,omitempty"`
	// HandlerVersions maps task type → handler version for safe deployment routing.
	HandlerVersions map[string]string `json:"handler_versions,omitempty"`
}

NodeInfo is registered by each node in the dureq_nodes KV bucket.

type NonRetryableError

type NonRetryableError struct{ Err error }

NonRetryableError wraps an error as non-retryable.

func (*NonRetryableError) Error

func (e *NonRetryableError) Error() string

func (*NonRetryableError) Unwrap

func (e *NonRetryableError) Unwrap() error

type OverlapPolicy

type OverlapPolicy string

OverlapPolicy controls behavior when a scheduled job fires while a previous execution is still running.

const (
	// OverlapAllowAll dispatches regardless of active runs (default).
	OverlapAllowAll OverlapPolicy = "ALLOW_ALL"
	// OverlapSkip skips the dispatch if any run for this job is active.
	OverlapSkip OverlapPolicy = "SKIP"
	// OverlapBufferOne buffers at most one pending dispatch while a run is active.
	OverlapBufferOne OverlapPolicy = "BUFFER_ONE"
	// OverlapBufferAll buffers all pending dispatches while a run is active.
	OverlapBufferAll OverlapPolicy = "BUFFER_ALL"
	// OverlapReplace cancels the current active run and starts a new one.
	OverlapReplace OverlapPolicy = "REPLACE"
)

type PanicError

// PanicError wraps a recovered panic value as an error.
type PanicError struct {
	// Value is the recovered panic value; it can hold a value of any type.
	Value any
	// Stacktrace is the stack trace text recorded for the panic
	// (presumably captured where the panic was recovered; confirm).
	Stacktrace string
}

PanicError wraps a recovered panic value as an error.

func GetPanicError

func GetPanicError(err error) *PanicError

GetPanicError extracts the PanicError from an error chain, if present.

func (*PanicError) Error

func (e *PanicError) Error() string

type PauseError added in v0.1.10

type PauseError struct {
	Reason     string
	RetryAfter time.Duration // 0 = manual resume only
}

PauseError signals that the handler wants to pause the job. The job transitions to paused status and can be resumed manually or auto-resumed after RetryAfter duration.

func (*PauseError) Error added in v0.1.10

func (e *PauseError) Error() string

type PeriodicTaskConfig

type PeriodicTaskConfig struct {
	// CronExpr is the cron expression (e.g., "*/5 * * * *").
	CronExpr string `json:"cron_expr"`

	// TaskType is the handler to invoke.
	TaskType TaskType `json:"task_type"`

	// Payload is the task payload.
	Payload json.RawMessage `json:"payload,omitempty"`

	// UniqueKey prevents duplicate schedules for the same logical task.
	UniqueKey string `json:"unique_key,omitempty"`
}

PeriodicTaskConfig describes a single periodic task to be managed.

type PeriodicTaskConfigProvider

type PeriodicTaskConfigProvider interface {
	GetConfigs() ([]*PeriodicTaskConfig, error)
}

PeriodicTaskConfigProvider is an interface that returns the current set of periodic tasks. The PeriodicTaskManager calls GetConfigs periodically and syncs the returned tasks with the scheduler — adding new ones, removing stale ones, and updating changed ones.

type PeriodicTaskConfigProviderFunc

type PeriodicTaskConfigProviderFunc func() ([]*PeriodicTaskConfig, error)

PeriodicTaskConfigProviderFunc is a convenience adapter.

func (PeriodicTaskConfigProviderFunc) GetConfigs

func (PeriodicTaskConfigProviderFunc) GetConfigs() ([]*PeriodicTaskConfig, error)

type PoolStats

type PoolStats struct {
	RunningWorkers int    `json:"running_workers"`
	IdleWorkers    int    `json:"idle_workers"`
	MaxConcurrency int    `json:"max_concurrency"`
	TotalSubmitted uint64 `json:"total_submitted"`
	TotalCompleted uint64 `json:"total_completed"`
	TotalFailed    uint64 `json:"total_failed"`
	QueueLength    int    `json:"queue_length"`
}

PoolStats captures worker pool metrics for a node.

type Precondition added in v0.1.10

type Precondition struct {
	Type     string `json:"type"`           // "upstream_output"
	Task     string `json:"task,omitempty"` // upstream task name
	Path     string `json:"path"`           // JSONPath in upstream output
	Expected string `json:"expected"`       // expected value (string comparison)
}

Precondition defines a condition that must be true before a task is dispatched.

type Priority

type Priority int

Priority represents the execution priority of a job (1-10).

// Named priority levels. Priority is documented as ranging from 1 to 10, so
// PriorityLow (1) and PriorityCritical (10) sit at the two ends of that range.
const (
	PriorityLow      Priority = 1
	PriorityNormal   Priority = 5
	PriorityHigh     Priority = 8
	PriorityCritical Priority = 10
)

func GetPriority

func GetPriority(ctx context.Context) Priority

GetPriority extracts the priority from the context.

type PriorityTier

type PriorityTier string

PriorityTier maps a numeric priority to a Redis Stream tier.

const (
	TierHigh   PriorityTier = "high"
	TierNormal PriorityTier = "normal"
	TierLow    PriorityTier = "low"
)

func TierForPriority

func TierForPriority(p Priority) PriorityTier

TierForPriority returns the PriorityTier (the Redis Stream tier) for a given priority value.

type ProgressReporterFunc

// ProgressReporterFunc is the callback stored in context for reporting
// progress. It receives the progress data as raw JSON and returns an error
// if reporting fails.
type ProgressReporterFunc func(ctx context.Context, data json.RawMessage) error

ProgressReporterFunc is the callback stored in context for reporting progress.

type RateLimitedError

// RateLimitedError wraps an error with a retry-after duration.
type RateLimitedError struct {
	// Err is the wrapped error (presumably what Unwrap returns — confirm).
	Err error
	// RetryAfter is the retry-after duration attached to Err.
	RetryAfter time.Duration
}

RateLimitedError wraps an error with a retry-after duration.

func (*RateLimitedError) Error

func (e *RateLimitedError) Error() string

func (*RateLimitedError) Unwrap

func (e *RateLimitedError) Unwrap() error

type RedisOptions

// RedisOptions holds connection parameters for the Redis client.
//
// Three connection modes are described by the fields below: a single server
// addressed by URL, Sentinel failover (SentinelConfig), and Cluster
// (ClusterAddrs). Both of the latter state that URL is ignored when they
// are set.
//
// NOTE(review): which mode wins when SentinelConfig and ClusterAddrs are
// both set is not documented here — confirm against the client constructor.
type RedisOptions struct {
	// URL is the Redis server URL (e.g., "redis://localhost:6379", "rediss://host:6380/0").
	URL string

	// Username for Redis ACL authentication (Redis 6+). Leave empty for legacy auth.
	Username string

	// Password for Redis authentication. Overridden if set in URL.
	Password string

	// DB is the Redis database number. Default: 0.
	DB int

	// PoolSize is the maximum number of connections in the pool. Default: 10.
	PoolSize int

	// TLSConfig enables TLS for the connection. Set to non-nil to enable.
	TLSConfig *tls.Config

	// SentinelConfig enables Redis Sentinel failover mode.
	// When set, URL is ignored and Sentinel is used instead.
	SentinelConfig *RedisSentinelConfig

	// ClusterAddrs enables Redis Cluster mode.
	// When set, URL is ignored and a cluster client is created.
	ClusterAddrs []string
}

RedisOptions holds connection parameters for the Redis client.

type RedisSentinelConfig

// RedisSentinelConfig holds configuration for Redis Sentinel failover.
// It is referenced from RedisOptions.SentinelConfig.
type RedisSentinelConfig struct {
	// MasterName is the name of the master as configured in Sentinel.
	MasterName string

	// SentinelAddrs is the list of Sentinel node addresses (host:port).
	SentinelAddrs []string

	// SentinelPassword is the password for Sentinel nodes (if different from master password).
	SentinelPassword string
}

RedisSentinelConfig holds configuration for Redis Sentinel failover.

type RepeatError added in v0.1.10

// RepeatError signals that the handler wants to save current progress and
// have the job re-enqueued for another execution, e.g. a long-running task
// that checkpoints periodically.
type RepeatError struct {
	// Err is the wrapped error (presumably what Unwrap returns — confirm).
	Err   error
	Delay time.Duration // optional delay before re-execution
}

RepeatError signals that the handler wants to save current progress and re-enqueue the job for another execution, either immediately or after the optional Delay. This is useful for long-running tasks that want to checkpoint periodically.

func (*RepeatError) Error added in v0.1.10

func (e *RepeatError) Error() string

func (*RepeatError) Unwrap added in v0.1.10

func (e *RepeatError) Unwrap() error

type RetryPolicy

// RetryPolicy describes retry behavior.
//
// NOTE(review): InitialDelay, MaxDelay and PauseRetryDelay are plain
// time.Duration values. With encoding/json and no custom marshaler these
// are written as integer nanoseconds, whereas Schedule and
// WorkflowDefinition use the package's Duration type — confirm the
// difference in wire format is intended.
type RetryPolicy struct {
	// MaxAttempts, InitialDelay, MaxDelay and Multiplier presumably describe
	// a capped, multiplicative backoff — confirm against the retry code.
	MaxAttempts  int           `json:"max_attempts"`
	InitialDelay time.Duration `json:"initial_delay"`
	MaxDelay     time.Duration `json:"max_delay"`
	Multiplier   float64       `json:"multiplier"`
	Jitter       float64       `json:"jitter"` // 0.0 - 1.0
	// NonRetryableErrors is a list of error message substrings that should not be retried.
	// If a task fails with an error containing any of these strings, it is immediately
	// moved to dead status without retrying.
	NonRetryableErrors []string `json:"non_retryable_errors,omitempty"`
	// PauseAfterErrCount pauses the job instead of moving to dead when
	// N consecutive errors are reached. 0 = disabled (go directly to dead).
	// When set, the job transitions to "paused" instead of "dead" after
	// this many consecutive failures, giving operators a chance to investigate.
	PauseAfterErrCount int `json:"pause_after_err_count,omitempty"`
	// PauseRetryDelay is the duration after which a paused job is automatically
	// resumed. 0 = manual resume only.
	PauseRetryDelay time.Duration `json:"pause_retry_delay,omitempty"`
}

RetryPolicy describes retry behavior.

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy.

type RetryableError

// RetryableError wraps an error as retryable. Err holds the wrapped error.
type RetryableError struct{ Err error }

RetryableError wraps an error as retryable.

func (*RetryableError) Error

func (e *RetryableError) Error() string

func (*RetryableError) Unwrap

func (e *RetryableError) Unwrap() error

type RunStatus

// RunStatus represents the state of an individual job execution.
type RunStatus string

RunStatus represents the state of an individual job execution.

// The RunStatus values. The string on the right is the serialized form.
//
// NOTE(review): which of these RunStatus.IsTerminal reports as terminal is
// not visible here; presumably succeeded, failed, timed_out and cancelled —
// confirm against the method body.
const (
	RunStatusClaimed   RunStatus = "claimed"
	RunStatusRunning   RunStatus = "running"
	RunStatusSucceeded RunStatus = "succeeded"
	RunStatusFailed    RunStatus = "failed"
	RunStatusTimedOut  RunStatus = "timed_out"
	RunStatusCancelled RunStatus = "cancelled"
)

func (RunStatus) IsTerminal

func (s RunStatus) IsTerminal() bool

func (RunStatus) String

func (s RunStatus) String() string

type Schedule

// Schedule defines "when" a job should run.
//
// Type selects the scheduling mode; the field groups below are labelled with
// the ScheduleType value they belong to. All mode-specific fields are
// optional in JSON (omitempty). Validate checks the schedule for
// consistency.
type Schedule struct {
	Type ScheduleType `json:"type"`

	// ONE_TIME: run once at this time
	RunAt *time.Time `json:"run_at,omitempty"`

	// DURATION: repeat at fixed interval
	Interval *Duration `json:"interval,omitempty"`

	// CRON: cron expression
	CronExpr *string `json:"cron_expr,omitempty"`

	// DAILY/WEEKLY/MONTHLY
	RegularInterval *uint    `json:"regular_interval,omitempty"` // every N days/weeks/months
	AtTimes         []AtTime `json:"at_times,omitempty"`         // times of day
	IncludedDays    []int    `json:"included_days,omitempty"`    // weekdays (0-6) or month days (1-31)

	// Common boundaries
	// NOTE(review): the expected Timezone format (presumably an IANA zone
	// name) is not shown here — confirm.
	StartsAt *time.Time `json:"starts_at,omitempty"`
	EndsAt   *time.Time `json:"ends_at,omitempty"`
	Timezone string     `json:"timezone,omitempty"`

	// OverlapPolicy controls what happens when a new occurrence fires
	// while a previous run is still active. Only for recurring schedules.
	OverlapPolicy OverlapPolicy `json:"overlap_policy,omitempty"`

	// CatchupWindow defines the maximum age of a missed firing that should
	// be backfilled. Zero means no backfill (only the latest due firing).
	CatchupWindow *Duration `json:"catchup_window,omitempty"`

	// MaxBackfillPerTick limits how many missed firings are enqueued in a
	// single scheduler tick to prevent flooding. Default: 10 if CatchupWindow is set.
	MaxBackfillPerTick *int `json:"max_backfill_per_tick,omitempty"`

	// Jitter adds a random offset to the computed next run time.
	// This prevents thundering herd when many jobs share the same schedule.
	// E.g., Jitter of 30s means the firing time is offset by rand(0, 30s).
	Jitter *Duration `json:"jitter,omitempty"`
}

Schedule defines "when" a job should run.

func (*Schedule) Validate

func (s *Schedule) Validate() error

Validate checks the schedule for consistency.

type ScheduleEntry

// ScheduleEntry is the KV entry the leader's scheduler scans for due jobs.
// It pairs a job ID with its Schedule and the next time it is due.
type ScheduleEntry struct {
	JobID           string     `json:"job_id"`
	NextRunAt       time.Time  `json:"next_run_at"`
	Schedule        Schedule   `json:"schedule"`
	LastProcessedAt *time.Time `json:"last_processed_at,omitempty"` // last successful processing time
}

ScheduleEntry is the KV entry the leader's scheduler scans for due jobs.

type ScheduleType

// ScheduleType defines how a job is scheduled. It is the type of
// Schedule.Type.
type ScheduleType string

ScheduleType defines how a job is scheduled.

// The ScheduleType values. The per-constant notes restate the field groups
// documented on Schedule.
const (
	// ScheduleImmediate presumably runs the job as soon as it is enqueued;
	// no Schedule field is tied to it — confirm.
	ScheduleImmediate ScheduleType = "IMMEDIATE"
	// ScheduleOneTime runs once at Schedule.RunAt.
	ScheduleOneTime   ScheduleType = "ONE_TIME"
	// ScheduleDuration repeats at the fixed Schedule.Interval.
	ScheduleDuration  ScheduleType = "DURATION"
	// ScheduleCron uses the cron expression in Schedule.CronExpr.
	ScheduleCron      ScheduleType = "CRON"
	// ScheduleDaily, ScheduleWeekly and ScheduleMonthly use
	// Schedule.RegularInterval, AtTimes and IncludedDays.
	ScheduleDaily     ScheduleType = "DAILY"
	ScheduleWeekly    ScheduleType = "WEEKLY"
	ScheduleMonthly   ScheduleType = "MONTHLY"
)

type SkipError added in v0.1.10

// SkipError signals that the handler wants to skip this execution without
// changing the job status, e.g. when it determines that preconditions are
// not met.
type SkipError struct {
	// Reason presumably explains why the execution was skipped — confirm
	// how Error() uses it.
	Reason string
}

SkipError signals that the handler wants to skip this execution without changing the job status. Useful when the handler determines that preconditions are not met and the job should simply be ignored for this run.

func (*SkipError) Error added in v0.1.10

func (e *SkipError) Error() string

type SubflowResult added in v0.1.10

// SubflowResult wraps dynamically generated tasks from a subflow handler.
type SubflowResult struct {
	// Tasks are the generated workflow tasks; always present in JSON
	// (no omitempty).
	Tasks []WorkflowTask `json:"tasks"`
}

SubflowResult wraps dynamically generated tasks from a subflow handler.

type TaskType

// TaskType identifies a registered handler. This is the "how" — what
// function to call when a job of this type fires.
type TaskType string

TaskType identifies a registered handler. This is the "how" — what function to call when a job of this type fires.

func GetTaskType

func GetTaskType(ctx context.Context) TaskType

GetTaskType extracts the task type from the context.

func (TaskType) String

func (t TaskType) String() string

type UpstreamPayload added in v0.1.10

// UpstreamPayload wraps an upstream task's result alongside the original
// task payload. It is used with ResultFrom to pipe upstream outputs to
// downstream tasks.
//
// T is the caller-chosen type of the original payload.
type UpstreamPayload[T any] struct {
	// Upstream is the upstream task's result, kept as undecoded JSON.
	Upstream json.RawMessage `json:"upstream"`
	// Original is the task's own payload.
	Original T               `json:"original"`
}

UpstreamPayload wraps an upstream task's result alongside the original task payload. Used with ResultFrom to automatically pipe upstream outputs to downstream tasks.

type WorkMessage

// WorkMessage is the message published to the DUREQ_WORK stream.
//
// NOTE(review): DispatchedAt is a time.Time value tagged omitempty. With
// encoding/json, omitempty never omits a struct value, so a zero
// DispatchedAt is still written (as "0001-01-01T00:00:00Z"). If omission is
// intended, a *time.Time or the omitzero option (Go 1.24+) would be needed —
// confirm which serializer is in use.
type WorkMessage struct {
	RunID        string            `json:"run_id"`
	JobID        string            `json:"job_id"`
	TaskType     TaskType          `json:"task_type"`
	Payload      json.RawMessage   `json:"payload"`
	Attempt      int               `json:"attempt"`
	Deadline     time.Time         `json:"deadline"`
	Metadata     map[string]string `json:"metadata,omitempty"`
	Headers      map[string]string `json:"headers,omitempty"`
	Priority     Priority          `json:"priority,omitempty"`
	DispatchedAt time.Time         `json:"dispatched_at,omitempty"`
	// Version is the handler version this message was dispatched with.
	// Workers with a mismatched version will skip and re-enqueue the message.
	Version string `json:"version,omitempty"`
}

WorkMessage is the message published to the DUREQ_WORK stream.

type WorkResult

// WorkResult is the outcome of executing a work message.
// RunID and JobID identify the run and job; Error and Output are optional
// in JSON.
type WorkResult struct {
	RunID   string          `json:"run_id"`
	JobID   string          `json:"job_id"`
	Success bool            `json:"success"`
	Error   *string         `json:"error,omitempty"`
	Output  json.RawMessage `json:"output,omitempty"` // handler output data
}

WorkResult is the outcome of executing a work message.

type WorkflowDefinition

// WorkflowDefinition describes a DAG of tasks to execute.
// Name and Tasks are always serialized; the remaining fields are optional
// pointers that are omitted from JSON when nil.
type WorkflowDefinition struct {
	Name             string         `json:"name"`
	Tasks            []WorkflowTask `json:"tasks"`
	ExecutionTimeout *Duration      `json:"execution_timeout,omitempty"`
	RetryPolicy      *RetryPolicy   `json:"retry_policy,omitempty"`
	DefaultPriority  *Priority      `json:"default_priority,omitempty"`
	Hooks            *WorkflowHooks `json:"hooks,omitempty"`
}

WorkflowDefinition describes a DAG of tasks to execute.

type WorkflowHookDef added in v0.1.10

// WorkflowHookDef defines a single lifecycle hook job: the task type to
// run, an optional raw JSON payload, and a timeout.
type WorkflowHookDef struct {
	TaskType TaskType        `json:"task_type"`
	Payload  json.RawMessage `json:"payload,omitempty"`
	Timeout  Duration        `json:"timeout,omitempty"`
}

WorkflowHookDef defines a single lifecycle hook job.

type WorkflowHooks added in v0.1.10

// WorkflowHooks defines lifecycle hooks that are automatically dispatched
// at specific workflow lifecycle events. Each hook is optional; a nil entry
// is omitted from JSON.
//
// NOTE(review): the exact firing point and ordering of each hook (e.g.
// whether OnExit runs after OnSuccess/OnFailure) is not visible here —
// confirm against the orchestrator.
type WorkflowHooks struct {
	OnInit    *WorkflowHookDef `json:"on_init,omitempty"`
	OnSuccess *WorkflowHookDef `json:"on_success,omitempty"`
	OnFailure *WorkflowHookDef `json:"on_failure,omitempty"`
	OnExit    *WorkflowHookDef `json:"on_exit,omitempty"`
}

WorkflowHooks defines lifecycle hooks that are automatically dispatched at specific workflow lifecycle events.

type WorkflowInstance

// WorkflowInstance is a running instance of a workflow definition.
//
// It embeds a copy of the WorkflowDefinition it was created from together
// with per-task state, dependency counters and timestamps.
type WorkflowInstance struct {
	ID           string                       `json:"id"`
	WorkflowName string                       `json:"workflow_name"`
	Status       WorkflowStatus               `json:"status"`
	// Tasks holds per-task state; presumably keyed by task name — confirm.
	Tasks        map[string]WorkflowTaskState `json:"tasks"`
	Definition   WorkflowDefinition           `json:"definition"`
	Input        json.RawMessage              `json:"input,omitempty"`
	Deadline     *time.Time                   `json:"deadline,omitempty"`
	Attempt      int                          `json:"attempt,omitempty"`
	MaxAttempts  int                          `json:"max_attempts,omitempty"`

	// PendingDeps tracks the number of unresolved dependencies per task.
	// Initialized at workflow start, decremented on dependency completion.
	// A task with PendingDeps[name] == 0 and status == pending is ready.
	PendingDeps map[string]int `json:"pending_deps,omitempty"`

	// ConditionIterations tracks how many times each condition node has been
	// evaluated, to enforce MaxIterations limits.
	ConditionIterations map[string]int `json:"condition_iterations,omitempty"`

	// ParentWorkflowID links this workflow to a parent workflow that spawned it
	// as a child workflow task.
	ParentWorkflowID *string `json:"parent_workflow_id,omitempty"`
	// ParentTaskName is the task name in the parent workflow that this child represents.
	ParentTaskName *string `json:"parent_task_name,omitempty"`

	// HookStates tracks the state of lifecycle hooks dispatched for this workflow.
	HookStates map[string]WorkflowTaskState `json:"hook_states,omitempty"`

	// Output is the result of the final (leaf) task, copied here when the
	// workflow completes so callers can retrieve the workflow-level result.
	Output json.RawMessage `json:"output,omitempty"`

	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}

WorkflowInstance is a running instance of a workflow definition.

type WorkflowSignal added in v0.1.3

// WorkflowSignal is an external signal sent to a running workflow.
// WorkflowID identifies the target workflow; Payload is optional raw JSON.
type WorkflowSignal struct {
	Name       string          `json:"name"`
	Payload    json.RawMessage `json:"payload,omitempty"`
	SentAt     time.Time       `json:"sent_at"`
	WorkflowID string          `json:"workflow_id"`
}

WorkflowSignal is an external signal sent to a running workflow.

type WorkflowStatus

// WorkflowStatus represents the lifecycle state of a workflow instance.
type WorkflowStatus string

WorkflowStatus represents the lifecycle state of a workflow instance.

// The WorkflowStatus values. The string on the right is the serialized
// form.
//
// NOTE(review): which states WorkflowStatus.IsTerminal treats as terminal
// (presumably completed, failed and cancelled) is not visible here —
// confirm against the method body.
const (
	WorkflowStatusPending   WorkflowStatus = "pending"
	WorkflowStatusRunning   WorkflowStatus = "running"
	WorkflowStatusCompleted WorkflowStatus = "completed"
	WorkflowStatusFailed    WorkflowStatus = "failed"
	WorkflowStatusCancelled WorkflowStatus = "cancelled"
	WorkflowStatusSuspended WorkflowStatus = "suspended"
)

func (WorkflowStatus) IsTerminal

func (s WorkflowStatus) IsTerminal() bool

func (WorkflowStatus) String

func (s WorkflowStatus) String() string

type WorkflowTask

// WorkflowTask is one step in a workflow DAG. A task is either a regular
// task (TaskType set), a child workflow (ChildWorkflowDef set), a condition
// node, or a dynamic subflow generator.
type WorkflowTask struct {
	Name      string          `json:"name"`
	TaskType  TaskType        `json:"task_type"`
	Payload   json.RawMessage `json:"payload,omitempty"`
	// DependsOn presumably lists upstream tasks by Name — confirm.
	DependsOn []string        `json:"depends_on,omitempty"`

	// Priority controls dispatch order among concurrently ready tasks.
	// Higher values are dispatched first. Default is 0.
	// NOTE(review): this is a plain int defaulting to 0, not the package's
	// Priority type (documented as 1-10) — confirm the two scales are
	// intentionally separate.
	Priority int `json:"priority,omitempty"`

	// AllowFailure makes this task's failure non-fatal for the workflow.
	// When true, the task's failure is logged but does not block downstream
	// tasks or cause the workflow to fail. Downstream dependencies are
	// considered "satisfied" even if this task fails.
	AllowFailure bool `json:"allow_failure,omitempty"`

	// ResultFrom specifies which upstream task's output should be injected
	// as this task's payload when dispatching. The referenced task must be
	// in DependsOn. If set, the original Payload field is ignored and
	// replaced with the upstream task's result at dispatch time.
	// NOTE(review): UpstreamPayload is documented as carrying the upstream
	// result alongside the original payload when used with ResultFrom, which
	// reads differently from "ignored and replaced" — confirm which holds.
	ResultFrom string `json:"result_from,omitempty"`

	// Type distinguishes static tasks, condition nodes, and subflow generators.
	// Empty string (default) means a regular static task.
	Type WorkflowTaskType `json:"type,omitempty"`

	// ConditionRoutes maps condition handler return values (0, 1, 2, ...)
	// to successor task names. Only used when Type == WorkflowTaskCondition.
	// The condition handler returns a uint; the orchestrator looks up the
	// corresponding task name and marks only that branch as "satisfied".
	// With encoding/json the uint keys are written as JSON strings
	// ("0", "1", ...).
	ConditionRoutes map[uint]string `json:"condition_routes,omitempty"`

	// MaxIterations limits how many times a condition node can be re-evaluated
	// (to prevent infinite loops). Default 0 means use system default (100).
	MaxIterations int `json:"max_iterations,omitempty"`

	// ChildWorkflowDef, when set, makes this task spawn a child workflow
	// instead of dispatching a regular job. The child runs independently
	// and the parent task completes when the child workflow completes.
	ChildWorkflowDef *WorkflowDefinition `json:"child_workflow_def,omitempty"`

	// Preconditions define conditions that must be true before this task is dispatched.
	// If any precondition fails, the task is skipped.
	Preconditions []Precondition `json:"preconditions,omitempty"`
}

WorkflowTask is one step in a workflow DAG. A task is either a regular task (TaskType set), a child workflow (ChildWorkflowDef set), a condition node, or a dynamic subflow generator.

type WorkflowTaskState

// WorkflowTaskState tracks the state of one task within a workflow
// instance. Only Name and Status are always serialized; every other field
// is omitted from JSON when empty.
type WorkflowTaskState struct {
	Name       string     `json:"name"`
	JobID      string     `json:"job_id,omitempty"`
	Status     JobStatus  `json:"status"`
	Error      *string    `json:"error,omitempty"`
	StartedAt  *time.Time `json:"started_at,omitempty"`
	FinishedAt *time.Time `json:"finished_at,omitempty"`
	// ChildWorkflowID is set when this task spawned a child workflow.
	ChildWorkflowID string `json:"child_workflow_id,omitempty"`
	// ConditionRoute is set when a condition node completes, indicating
	// which route index was selected by the handler.
	ConditionRoute *uint `json:"condition_route,omitempty"`
	// SubflowTasks lists the task names that were dynamically injected
	// by a subflow generator node.
	SubflowTasks []string `json:"subflow_tasks,omitempty"`
	// Skipped indicates the task was skipped due to a failed precondition.
	// SkipReason presumably carries the explanation — confirm.
	Skipped    bool   `json:"skipped,omitempty"`
	SkipReason string `json:"skip_reason,omitempty"`
}

WorkflowTaskState tracks the state of one task within a workflow instance.

type WorkflowTaskType added in v0.1.10

// WorkflowTaskType distinguishes static tasks, condition nodes, and subflow
// generators. It is the type of WorkflowTask.Type.
type WorkflowTaskType string

WorkflowTaskType distinguishes static tasks, condition nodes, and subflow generators.

// The WorkflowTaskType values. The empty string is the default, so a task
// with no explicit type is a static task.
const (
	// WorkflowTaskStatic is a regular task dispatched as a job (default).
	WorkflowTaskStatic WorkflowTaskType = ""
	// WorkflowTaskCondition is a condition node whose handler returns a route index.
	// The orchestrator evaluates the condition and branches to the corresponding successor.
	WorkflowTaskCondition WorkflowTaskType = "condition"
	// WorkflowTaskSubflow is a dynamic subflow node whose handler returns []WorkflowTask
	// at runtime. The orchestrator injects these tasks into the workflow instance.
	// NOTE(review): presumably the tasks are returned wrapped in a
	// SubflowResult — confirm.
	WorkflowTaskSubflow WorkflowTaskType = "subflow"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL