Documentation
¶
Overview ¶
Package workflow provides generic orchestration primitives built on Temporal.
It defines the TaskInput and TaskOutput interfaces that all workflow task types must satisfy, and supplies reusable, fully-generic workflow patterns:
- ExecuteTaskWorkflow — run a single task as a Temporal activity.
- PipelineWorkflow — execute tasks sequentially with optional stop-on-error.
- ParallelWorkflow — execute tasks concurrently with a configurable concurrency limit.
- LoopWorkflow / ParameterizedLoopWorkflow — execute a task template for each item in a list, sequentially or in parallel.
The package also defines DAGInput, DAGNode, and DAGOutput types for directed acyclic graph execution with dependency edges. Concrete DAG workflow implementations live in the container/ and function/ packages.
Each pattern is parameterized on [I TaskInput, O TaskOutput] so concrete packages (container, function, datasync) can plug in their own payload types without losing type safety.
Index ¶
- Constants
- func DefaultActivityOptions() wf.ActivityOptions
- func ExecuteTaskWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input I) (*O, error)
- func ExecuteTaskWorkflowWithTimeout[I TaskInput, O TaskOutput](ctx wf.Context, input I, timeout time.Duration) (*O, error)
- func ExtractJSONPath(jsonStr, path string) (string, error)
- func ExtractRegex(text, pattern string) (string, error)
- func GenerateParameterCombinations(params map[string][]string) []map[string]string
- func ReadFile(path string) (string, error)
- func ShellEscape(s string) string
- func SubstituteTemplate(tmpl, item string, index int, params map[string]string) string
- type DAGInput
- type DAGNode
- type DAGOutput
- type LoopInput
- type LoopOutput
- func InstrumentedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input LoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)
- func InstrumentedParameterizedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParameterizedLoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)
- func LoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input LoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)
- func ParameterizedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParameterizedLoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)
- type NodeResult
- type ParallelInput
- type ParallelOutput
- type ParameterizedLoopInput
- type PipelineInput
- type PipelineOutput
- type Substitutor
- type TaskInput
- type TaskOutput
Constants ¶
const ( // FailureStrategyFailFast stops execution on the first failure. FailureStrategyFailFast = "fail_fast" // FailureStrategyContinue continues execution despite failures. FailureStrategyContinue = "continue" )
Variables ¶
This section is empty.
Functions ¶
func DefaultActivityOptions ¶
func DefaultActivityOptions() wf.ActivityOptions
DefaultActivityOptions returns the standard activity options used by workflow helpers.
func ExecuteTaskWorkflow ¶
func ExecuteTaskWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input I) (*O, error)
ExecuteTaskWorkflow runs a single task and returns results.
func ExecuteTaskWorkflowWithTimeout ¶
func ExecuteTaskWorkflowWithTimeout[I TaskInput, O TaskOutput](ctx wf.Context, input I, timeout time.Duration) (*O, error)
ExecuteTaskWorkflowWithTimeout runs a single task with a custom timeout.
func ExtractJSONPath ¶
ExtractJSONPath extracts a value from JSON using a simple JSONPath expression, supporting basic paths like "$.field", "$.field.nested", "$.array[0]".
func ExtractRegex ¶
ExtractRegex extracts a value from text using a regular expression. If the regex has a capturing group, returns the first group. Otherwise, returns the entire match.
func GenerateParameterCombinations ¶
GenerateParameterCombinations generates all combinations of parameter values (cartesian product).
func ShellEscape ¶ added in v1.12.3
ShellEscape wraps a string in single quotes for safe shell interpolation. Single quotes inside the string are escaped with the '\” idiom.
Types ¶
type DAGInput ¶ added in v1.16.0
type DAGInput[I TaskInput, O TaskOutput] struct { Nodes []DAGNode[I, O] `json:"nodes" validate:"required,min=1"` FailFast bool `json:"fail_fast"` MaxParallel int `json:"max_parallel,omitempty"` }
DAGInput defines a DAG workflow execution.
type DAGNode ¶ added in v1.16.0
type DAGNode[I TaskInput, O TaskOutput] struct { Name string `json:"name" validate:"required"` Input I `json:"input" validate:"required"` Dependencies []string `json:"dependencies,omitempty"` }
DAGNode defines a single node in a DAG workflow.
type DAGOutput ¶ added in v1.16.0
type DAGOutput[O TaskOutput] struct { Results map[string]*O `json:"results"` NodeResults []NodeResult[O] `json:"node_results"` TotalSuccess int `json:"total_success"` TotalFailed int `json:"total_failed"` TotalDuration time.Duration `json:"total_duration"` }
DAGOutput holds the results of a DAG workflow execution.
type LoopInput ¶
type LoopInput[I TaskInput, O TaskOutput] struct { Items []string `json:"items" validate:"required,min=1"` Template I `json:"template" validate:"required"` Parallel bool `json:"parallel"` // MaxConcurrency is not currently enforced. Use Temporal worker-level // concurrency settings (MaxConcurrentActivityExecutionSize) instead. MaxConcurrency int `json:"max_concurrency,omitempty"` FailureStrategy string `json:"failure_strategy" validate:"oneof='' continue fail_fast"` }
LoopInput defines loop iteration over items.
type LoopOutput ¶
type LoopOutput[O TaskOutput] struct { Results []O `json:"results"` TotalSuccess int `json:"total_success"` TotalFailed int `json:"total_failed"` TotalDuration time.Duration `json:"total_duration"` ItemCount int `json:"item_count"` }
LoopOutput defines loop execution results.
func InstrumentedLoopWorkflow ¶ added in v1.7.0
func InstrumentedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input LoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)
InstrumentedLoopWorkflow wraps LoopWorkflow with structured logging at boundaries.
func InstrumentedParameterizedLoopWorkflow ¶ added in v1.7.0
func InstrumentedParameterizedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParameterizedLoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)
InstrumentedParameterizedLoopWorkflow wraps ParameterizedLoopWorkflow with structured logging at boundaries.
func LoopWorkflow ¶
func LoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input LoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)
LoopWorkflow executes a task template for each item.
func ParameterizedLoopWorkflow ¶
func ParameterizedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParameterizedLoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)
ParameterizedLoopWorkflow executes a task template for each parameter combination.
type NodeResult ¶ added in v1.16.0
type NodeResult[O TaskOutput] struct { Name string `json:"name"` Result *O `json:"result,omitempty"` Error string `json:"error,omitempty"` Duration time.Duration `json:"duration"` Success bool `json:"success"` }
NodeResult holds the result of a single DAG node execution.
type ParallelInput ¶
type ParallelInput[I TaskInput, O TaskOutput] struct { Tasks []I `json:"tasks" validate:"required,min=1"` // MaxConcurrency is not currently enforced. Use Temporal worker-level // concurrency settings (MaxConcurrentActivityExecutionSize) instead. MaxConcurrency int `json:"max_concurrency,omitempty"` FailureStrategy string `json:"failure_strategy" validate:"oneof='' continue fail_fast"` }
ParallelInput defines parallel task execution.
func (*ParallelInput[I, O]) Validate ¶
func (i *ParallelInput[I, O]) Validate() error
Validate validates parallel input.
type ParallelOutput ¶
type ParallelOutput[O TaskOutput] struct { Results []O `json:"results"` TotalSuccess int `json:"total_success"` TotalFailed int `json:"total_failed"` TotalDuration time.Duration `json:"total_duration"` }
ParallelOutput defines parallel execution results.
func InstrumentedParallelWorkflow ¶ added in v1.7.0
func InstrumentedParallelWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParallelInput[I, O]) (*ParallelOutput[O], error)
InstrumentedParallelWorkflow wraps ParallelWorkflow with structured logging at boundaries.
func ParallelWorkflow ¶
func ParallelWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParallelInput[I, O]) (*ParallelOutput[O], error)
ParallelWorkflow executes tasks in parallel.
type ParameterizedLoopInput ¶
type ParameterizedLoopInput[I TaskInput, O TaskOutput] struct { Parameters map[string][]string `json:"parameters" validate:"required,min=1"` Template I `json:"template" validate:"required"` Parallel bool `json:"parallel"` // MaxConcurrency is not currently enforced. Use Temporal worker-level // concurrency settings (MaxConcurrentActivityExecutionSize) instead. MaxConcurrency int `json:"max_concurrency,omitempty"` FailureStrategy string `json:"failure_strategy" validate:"oneof='' continue fail_fast"` }
ParameterizedLoopInput defines loop with multiple parameters.
func (*ParameterizedLoopInput[I, O]) Validate ¶
func (i *ParameterizedLoopInput[I, O]) Validate() error
Validate validates parameterized loop input.
type PipelineInput ¶
type PipelineInput[I TaskInput, O TaskOutput] struct { Tasks []I `json:"tasks" validate:"required,min=1"` StopOnError bool `json:"stop_on_error"` Cleanup bool `json:"cleanup"` }
PipelineInput defines sequential task execution.
func (*PipelineInput[I, O]) Validate ¶
func (i *PipelineInput[I, O]) Validate() error
Validate validates pipeline input.
type PipelineOutput ¶
type PipelineOutput[O TaskOutput] struct { Results []O `json:"results"` TotalSuccess int `json:"total_success"` TotalFailed int `json:"total_failed"` TotalDuration time.Duration `json:"total_duration"` }
PipelineOutput defines pipeline execution results.
func InstrumentedPipelineWorkflow ¶ added in v1.7.0
func InstrumentedPipelineWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input PipelineInput[I, O]) (*PipelineOutput[O], error)
InstrumentedPipelineWorkflow wraps PipelineWorkflow with structured logging at boundaries.
func PipelineWorkflow ¶
func PipelineWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input PipelineInput[I, O]) (*PipelineOutput[O], error)
PipelineWorkflow executes tasks sequentially.
type Substitutor ¶
Substitutor is a function that creates a new task input with template variables substituted.
type TaskInput ¶
TaskInput is the interface constraint that every workflow task input must satisfy. Validate returns an error if the input is invalid, and ActivityName returns the Temporal activity name used to dispatch the task.
type TaskOutput ¶
TaskOutput is the interface constraint that every workflow task output must satisfy. IsSuccess reports whether the task completed successfully, and GetError returns a human-readable error description (empty on success).
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package store provides a generic, typed key-value storage layer used by workflow tasks to persist and retrieve intermediate data.
|
Package store provides a generic, typed key-value storage layer used by workflow tasks to persist and retrieve intermediate data. |