workflow

package
v1.18.0 Latest

This package is not in the latest version of its module.
Published: May 11, 2026 | License: MIT | Imports: 11 | Imported by: 0

README

Workflow Package

Generic orchestration primitives built on Temporal. Defines the TaskInput and TaskOutput interfaces that all workflow task types must satisfy, and provides reusable, fully-generic workflow patterns.

Key Features

  • TaskInput / TaskOutput interfaces: common contract for all task payloads
  • ExecuteTaskWorkflow: run a single task as a Temporal activity
  • PipelineWorkflow: sequential execution with optional stop-on-error
  • ParallelWorkflow: concurrent execution (the MaxConcurrency field is accepted but not currently enforced; use Temporal worker-level concurrency settings instead)
  • LoopWorkflow: execute a task template for each item in a list
  • DAG types: DAGInput, DAGNode, DAGOutput for graph execution with dependency edges
  • Fully generic: parameterized on [I TaskInput, O TaskOutput] so concrete packages plug in their own types

Quick Example

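// Build a pipeline of tasks. MyInput and MyOutput are your own types
// satisfying TaskInput and TaskOutput.
input := workflow.PipelineInput[MyInput, MyOutput]{
    Tasks:       []MyInput{task1, task2, task3},
    StopOnError: true,
}

// Start the pipeline through a Temporal client (c); ctx and opts are
// assumed to be defined elsewhere. Check the error instead of discarding it.
we, err := c.ExecuteWorkflow(ctx, opts,
    workflow.PipelineWorkflow[MyInput, MyOutput], input)
if err != nil {
    log.Fatalf("start pipeline: %v", err)
}
log.Printf("started workflow %s", we.GetID())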

Documentation

Overview

Package workflow provides generic orchestration primitives built on Temporal.

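It defines the TaskInput and TaskOutput interfaces that all workflow task types must satisfy, and supplies reusable, fully-generic workflow patterns:

  • ExecuteTaskWorkflow: run a single task as a Temporal activity
  • PipelineWorkflow: sequential execution with optional stop-on-error
  • ParallelWorkflow: concurrent execution
  • LoopWorkflow: execute a task template for each item in a list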

The package also defines DAGInput, DAGNode, and DAGOutput types for directed acyclic graph execution with dependency edges. Concrete DAG workflow implementations live in the container/ and function/ packages.

Each pattern is parameterized on [I TaskInput, O TaskOutput] so concrete packages (container, function, datasync) can plug in their own payload types without losing type safety.

Index

Constants

const (
	// FailureStrategyFailFast stops execution on the first failure.
	FailureStrategyFailFast = "fail_fast"
	// FailureStrategyContinue continues execution despite failures.
	FailureStrategyContinue = "continue"
)
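The two strategy values are the strings accepted by the FailureStrategy fields of the loop and parallel inputs. As a rough illustration of the documented semantics only, here is a minimal sketch of a sequential runner that honors them. The runAll helper and the okTask/badTask functions are hypothetical and are not part of this package; the real workflows run Temporal activities rather than plain closures.

```go
package main

import (
	"errors"
	"fmt"
)

// These mirror the documented constant values.
const (
	FailureStrategyFailFast = "fail_fast"
	FailureStrategyContinue = "continue"
)

// okTask and badTask are hypothetical stand-ins for real tasks.
func okTask() error  { return nil }
func badTask() error { return errors.New("boom") }

// runAll is a hypothetical helper: it runs tasks in order, applies the
// given failure strategy, and returns the success and failure counts.
func runAll(tasks []func() error, strategy string) (succeeded, failed int) {
	for _, task := range tasks {
		if err := task(); err != nil {
			failed++
			if strategy == FailureStrategyFailFast {
				break // stop on the first failure
			}
			continue // keep going despite the failure
		}
		succeeded++
	}
	return succeeded, failed
}

func main() {
	tasks := []func() error{okTask, badTask, okTask}

	s, f := runAll(tasks, FailureStrategyFailFast)
	fmt.Println(s, f) // 1 1

	s, f = runAll(tasks, FailureStrategyContinue)
	fmt.Println(s, f) // 2 1
}
```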

Variables

This section is empty.

Functions

func DefaultActivityOptions

func DefaultActivityOptions() wf.ActivityOptions

DefaultActivityOptions returns the standard activity options used by workflow helpers.

func ExecuteTaskWorkflow

func ExecuteTaskWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input I) (*O, error)

ExecuteTaskWorkflow runs a single task as a Temporal activity and returns its output.

func ExecuteTaskWorkflowWithTimeout

func ExecuteTaskWorkflowWithTimeout[I TaskInput, O TaskOutput](ctx wf.Context, input I, timeout time.Duration) (*O, error)

ExecuteTaskWorkflowWithTimeout runs a single task with a custom timeout.

func ExtractJSONPath

func ExtractJSONPath(jsonStr, path string) (string, error)

ExtractJSONPath extracts a value from JSON using a simple JSONPath expression, supporting basic paths like "$.field", "$.field.nested", "$.array[0]".
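To make the supported path shapes concrete, here is a minimal stand-alone sketch of this kind of extraction using only the standard library. The extractPath function is a hypothetical stand-in, not the package's implementation. How non-string values are rendered (re-encoded as JSON here) and which errors are returned are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// segmentRe matches one path segment such as "field" or "array[0]".
var segmentRe = regexp.MustCompile(`^([^\[\]]+)(?:\[(\d+)\])?$`)

// extractPath is a hypothetical stand-in for ExtractJSONPath. It walks
// "$.a.b[0]"-style paths through decoded JSON.
func extractPath(jsonStr, path string) (string, error) {
	if !strings.HasPrefix(path, "$.") {
		return "", fmt.Errorf("path %q does not start with the $ root", path)
	}
	var cur any
	if err := json.Unmarshal([]byte(jsonStr), &cur); err != nil {
		return "", fmt.Errorf("invalid JSON: %w", err)
	}
	for _, seg := range strings.Split(strings.TrimPrefix(path, "$."), ".") {
		m := segmentRe.FindStringSubmatch(seg)
		if m == nil {
			return "", fmt.Errorf("unsupported segment %q", seg)
		}
		obj, ok := cur.(map[string]any)
		if !ok {
			return "", fmt.Errorf("cannot look up %q in a non-object", m[1])
		}
		if cur, ok = obj[m[1]]; !ok {
			return "", fmt.Errorf("field %q not found", m[1])
		}
		if m[2] != "" { // optional array index
			arr, isArr := cur.([]any)
			idx, err := strconv.Atoi(m[2])
			if err != nil || !isArr || idx >= len(arr) {
				return "", fmt.Errorf("index %s is not valid for %q", m[2], m[1])
			}
			cur = arr[idx]
		}
	}
	// Assumption: strings are returned unquoted, everything else as JSON.
	if s, ok := cur.(string); ok {
		return s, nil
	}
	out, err := json.Marshal(cur)
	return string(out), err
}

func main() {
	doc := `{"user":{"name":"ada","tags":["x","y"]},"n":42}`
	for _, p := range []string{"$.user.name", "$.user.tags[1]", "$.n"} {
		v, err := extractPath(doc, p)
		fmt.Println(p, v, err)
	}
}
```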

func ExtractRegex

func ExtractRegex(text, pattern string) (string, error)

ExtractRegex extracts a value from text using a regular expression. If the regex has a capturing group, returns the first group. Otherwise, returns the entire match.
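The group-versus-whole-match rule described above can be sketched with the standard regexp package. The extractRegex function below is a hypothetical re-creation for illustration; in particular, returning an error when nothing matches is an assumption, since the documentation does not say what happens in that case.

```go
package main

import (
	"fmt"
	"regexp"
)

// extractRegex is a hypothetical stand-in for ExtractRegex.
func extractRegex(text, pattern string) (string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return "", fmt.Errorf("invalid pattern: %w", err)
	}
	m := re.FindStringSubmatch(text)
	if m == nil {
		// Assumption: no match is reported as an error.
		return "", fmt.Errorf("no match for %q", pattern)
	}
	if len(m) > 1 {
		return m[1], nil // the pattern has a capturing group: return the first
	}
	return m[0], nil // no groups: return the entire match
}

func main() {
	v, _ := extractRegex("build id=42 ok", `id=(\d+)`)
	fmt.Println(v) // 42

	v, _ = extractRegex("build id=42 ok", `id=\d+`)
	fmt.Println(v) // id=42
}
```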

func GenerateParameterCombinations

func GenerateParameterCombinations(params map[string][]string) []map[string]string

GenerateParameterCombinations generates all combinations of parameter values (cartesian product).
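As an illustration of the cartesian product, the sketch below expands a parameter map one key at a time. The combinations function is a hypothetical re-creation, not the package's code. Sorting the keys is an assumption made here so the output order is reproducible, because Go map iteration order is randomized; the documentation does not state what order the real function returns.

```go
package main

import (
	"fmt"
	"sort"
)

// combinations is a hypothetical stand-in for GenerateParameterCombinations.
func combinations(params map[string][]string) []map[string]string {
	keys := make([]string, 0, len(params))
	for k := range params {
		keys = append(keys, k)
	}
	sort.Strings(keys) // assumption: sorted keys give a deterministic order

	// Start with one empty combination and extend it key by key.
	result := []map[string]string{{}}
	for _, k := range keys {
		var next []map[string]string
		for _, partial := range result {
			for _, v := range params[k] {
				combo := make(map[string]string, len(partial)+1)
				for pk, pv := range partial {
					combo[pk] = pv
				}
				combo[k] = v
				next = append(next, combo)
			}
		}
		result = next
	}
	return result
}

func main() {
	combos := combinations(map[string][]string{
		"env":    {"dev", "prod"},
		"region": {"eu", "us"},
	})
	fmt.Println(len(combos)) // 4
	for _, c := range combos {
		fmt.Println(c["env"], c["region"])
	}
}
```

With two values for each of two parameters the result has 2 × 2 = 4 entries; the count grows multiplicatively, so large parameter lists can produce many loop iterations.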

func ReadFile

func ReadFile(path string) (string, error)

ReadFile reads a file and returns its contents as a string.

func ShellEscape added in v1.12.3

func ShellEscape(s string) string

ShellEscape wraps a string in single quotes for safe shell interpolation. Single quotes inside the string are escaped with the '\'' idiom.
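The quoting technique works by closing the single-quoted string, emitting a backslash-escaped quote, and reopening the quoted string. A minimal sketch of it follows; shellEscape is a hypothetical re-creation, and the real function may differ in edge cases such as the empty string.

```go
package main

import (
	"fmt"
	"strings"
)

// shellEscape is a hypothetical stand-in for ShellEscape. Each embedded
// single quote becomes: close quote, escaped quote, reopen quote.
func shellEscape(s string) string {
	return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'"
}

func main() {
	fmt.Println(shellEscape("it's here"))   // 'it'\''s here'
	fmt.Println(shellEscape("a; rm -rf /")) // 'a; rm -rf /'
}
```

Inside single quotes a POSIX shell treats every character literally, which is why metacharacters such as `;` or `$` in the second call lose their special meaning.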

func SubstituteTemplate

func SubstituteTemplate(tmpl, item string, index int, params map[string]string) string

SubstituteTemplate replaces template variables in a string. Supports: {{item}}, {{index}}, and {{.paramName}}/{{paramName}} syntax.
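The placeholder forms listed above can be illustrated with strings.NewReplacer. The substitute function is a hypothetical re-creation rather than the package's implementation; how the real function treats unknown placeholders or a parameter named "item" is not documented, so the choices below (leave unknown placeholders untouched, let the built-in names win) are assumptions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// substitute is a hypothetical stand-in for SubstituteTemplate.
func substitute(tmpl, item string, index int, params map[string]string) string {
	// Built-in placeholders come first so they take priority over any
	// parameter that happens to share their name.
	pairs := []string{
		"{{item}}", item,
		"{{index}}", strconv.Itoa(index),
	}
	// Each parameter is reachable as {{.name}} and as {{name}}.
	for name, value := range params {
		pairs = append(pairs, "{{."+name+"}}", value, "{{"+name+"}}", value)
	}
	// Replacer makes a single pass, so substituted values are not rescanned.
	return strings.NewReplacer(pairs...).Replace(tmpl)
}

func main() {
	out := substitute(
		"echo {{item}} #{{index}} in {{.env}}/{{region}}",
		"file.txt", 2,
		map[string]string{"env": "prod", "region": "eu"},
	)
	fmt.Println(out) // echo file.txt #2 in prod/eu
}
```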

Types

type DAGInput added in v1.16.0

type DAGInput[I TaskInput, O TaskOutput] struct {
	Nodes       []DAGNode[I, O] `json:"nodes" validate:"required,min=1"`
	FailFast    bool            `json:"fail_fast"`
	MaxParallel int             `json:"max_parallel,omitempty"`
}

DAGInput defines a DAG workflow execution.

func (*DAGInput[I, O]) Validate added in v1.16.0

func (d *DAGInput[I, O]) Validate() error

Validate validates DAG input including cycle detection.
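Cycle detection over dependency edges is commonly done with a depth-first search that marks nodes as unvisited, in progress, or done; meeting an in-progress node again means the graph has a cycle. The sketch below shows that approach on a simplified, non-generic node type. It is not the package's Validate implementation: the node struct, validateDAG, and the extra checks for duplicate names and unknown dependencies are assumptions.

```go
package main

import "fmt"

// node is a simplified, hypothetical version of DAGNode without the payload.
type node struct {
	Name         string
	Dependencies []string
}

// validateDAG reports duplicate names, unknown dependencies, and cycles.
func validateDAG(nodes []node) error {
	deps := make(map[string][]string, len(nodes))
	for _, n := range nodes {
		if _, dup := deps[n.Name]; dup {
			return fmt.Errorf("duplicate node %q", n.Name)
		}
		deps[n.Name] = n.Dependencies
	}

	const (
		unvisited = iota
		inProgress
		done
	)
	state := make(map[string]int, len(nodes))

	var visit func(name string) error
	visit = func(name string) error {
		switch state[name] {
		case inProgress:
			return fmt.Errorf("cycle detected at node %q", name)
		case done:
			return nil
		}
		state[name] = inProgress
		for _, d := range deps[name] {
			if _, ok := deps[d]; !ok {
				return fmt.Errorf("node %q depends on unknown node %q", name, d)
			}
			if err := visit(d); err != nil {
				return err
			}
		}
		state[name] = done
		return nil
	}

	for _, n := range nodes {
		if err := visit(n.Name); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ok := []node{{Name: "build"}, {Name: "test", Dependencies: []string{"build"}}}
	fmt.Println(validateDAG(ok)) // <nil>

	cyclic := []node{
		{Name: "a", Dependencies: []string{"b"}},
		{Name: "b", Dependencies: []string{"a"}},
	}
	fmt.Println(validateDAG(cyclic) != nil) // true
}
```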

type DAGNode added in v1.16.0

type DAGNode[I TaskInput, O TaskOutput] struct {
	Name         string   `json:"name" validate:"required"`
	Input        I        `json:"input" validate:"required"`
	Dependencies []string `json:"dependencies,omitempty"`
}

DAGNode defines a single node in a DAG workflow.

type DAGOutput added in v1.16.0

type DAGOutput[O TaskOutput] struct {
	Results       map[string]*O   `json:"results"`
	NodeResults   []NodeResult[O] `json:"node_results"`
	TotalSuccess  int             `json:"total_success"`
	TotalFailed   int             `json:"total_failed"`
	TotalDuration time.Duration   `json:"total_duration"`
}

DAGOutput holds the results of a DAG workflow execution.

type LoopInput

type LoopInput[I TaskInput, O TaskOutput] struct {
	Items    []string `json:"items" validate:"required,min=1"`
	Template I        `json:"template" validate:"required"`
	Parallel bool     `json:"parallel"`
	// MaxConcurrency is not currently enforced. Use Temporal worker-level
	// concurrency settings (MaxConcurrentActivityExecutionSize) instead.
	MaxConcurrency  int    `json:"max_concurrency,omitempty"`
	FailureStrategy string `json:"failure_strategy" validate:"oneof='' continue fail_fast"`
}

LoopInput defines loop iteration over items.

func (*LoopInput[I, O]) Validate

func (i *LoopInput[I, O]) Validate() error

Validate validates loop input.

type LoopOutput

type LoopOutput[O TaskOutput] struct {
	Results       []O           `json:"results"`
	TotalSuccess  int           `json:"total_success"`
	TotalFailed   int           `json:"total_failed"`
	TotalDuration time.Duration `json:"total_duration"`
	ItemCount     int           `json:"item_count"`
}

LoopOutput defines loop execution results.

func InstrumentedLoopWorkflow added in v1.7.0

func InstrumentedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input LoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)

InstrumentedLoopWorkflow wraps LoopWorkflow with structured logging at boundaries.

func InstrumentedParameterizedLoopWorkflow added in v1.7.0

func InstrumentedParameterizedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParameterizedLoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)

InstrumentedParameterizedLoopWorkflow wraps ParameterizedLoopWorkflow with structured logging at boundaries.

func LoopWorkflow

func LoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input LoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)

LoopWorkflow executes a task template for each item.

func ParameterizedLoopWorkflow

func ParameterizedLoopWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParameterizedLoopInput[I, O], substitutor Substitutor[I]) (*LoopOutput[O], error)

ParameterizedLoopWorkflow executes a task template for each parameter combination.

type NodeResult added in v1.16.0

type NodeResult[O TaskOutput] struct {
	Name     string        `json:"name"`
	Result   *O            `json:"result,omitempty"`
	Error    string        `json:"error,omitempty"`
	Duration time.Duration `json:"duration"`
	Success  bool          `json:"success"`
}

NodeResult holds the result of a single DAG node execution.

type ParallelInput

type ParallelInput[I TaskInput, O TaskOutput] struct {
	Tasks []I `json:"tasks" validate:"required,min=1"`
	// MaxConcurrency is not currently enforced. Use Temporal worker-level
	// concurrency settings (MaxConcurrentActivityExecutionSize) instead.
	MaxConcurrency  int    `json:"max_concurrency,omitempty"`
	FailureStrategy string `json:"failure_strategy" validate:"oneof='' continue fail_fast"`
}

ParallelInput defines parallel task execution.

func (*ParallelInput[I, O]) Validate

func (i *ParallelInput[I, O]) Validate() error

Validate validates parallel input.

type ParallelOutput

type ParallelOutput[O TaskOutput] struct {
	Results       []O           `json:"results"`
	TotalSuccess  int           `json:"total_success"`
	TotalFailed   int           `json:"total_failed"`
	TotalDuration time.Duration `json:"total_duration"`
}

ParallelOutput defines parallel execution results.

func InstrumentedParallelWorkflow added in v1.7.0

func InstrumentedParallelWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParallelInput[I, O]) (*ParallelOutput[O], error)

InstrumentedParallelWorkflow wraps ParallelWorkflow with structured logging at boundaries.

func ParallelWorkflow

func ParallelWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input ParallelInput[I, O]) (*ParallelOutput[O], error)

ParallelWorkflow executes tasks in parallel.

type ParameterizedLoopInput

type ParameterizedLoopInput[I TaskInput, O TaskOutput] struct {
	Parameters map[string][]string `json:"parameters" validate:"required,min=1"`
	Template   I                   `json:"template" validate:"required"`
	Parallel   bool                `json:"parallel"`
	// MaxConcurrency is not currently enforced. Use Temporal worker-level
	// concurrency settings (MaxConcurrentActivityExecutionSize) instead.
	MaxConcurrency  int    `json:"max_concurrency,omitempty"`
	FailureStrategy string `json:"failure_strategy" validate:"oneof='' continue fail_fast"`
}

ParameterizedLoopInput defines a loop over combinations of multiple parameters.

func (*ParameterizedLoopInput[I, O]) Validate

func (i *ParameterizedLoopInput[I, O]) Validate() error

Validate validates parameterized loop input.

type PipelineInput

type PipelineInput[I TaskInput, O TaskOutput] struct {
	Tasks       []I  `json:"tasks" validate:"required,min=1"`
	StopOnError bool `json:"stop_on_error"`
	Cleanup     bool `json:"cleanup"`
}

PipelineInput defines sequential task execution.

func (*PipelineInput[I, O]) Validate

func (i *PipelineInput[I, O]) Validate() error

Validate validates pipeline input.

type PipelineOutput

type PipelineOutput[O TaskOutput] struct {
	Results       []O           `json:"results"`
	TotalSuccess  int           `json:"total_success"`
	TotalFailed   int           `json:"total_failed"`
	TotalDuration time.Duration `json:"total_duration"`
}

PipelineOutput defines pipeline execution results.

func InstrumentedPipelineWorkflow added in v1.7.0

func InstrumentedPipelineWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input PipelineInput[I, O]) (*PipelineOutput[O], error)

InstrumentedPipelineWorkflow wraps PipelineWorkflow with structured logging at boundaries.

func PipelineWorkflow

func PipelineWorkflow[I TaskInput, O TaskOutput](ctx wf.Context, input PipelineInput[I, O]) (*PipelineOutput[O], error)

PipelineWorkflow executes tasks sequentially.

type Substitutor

type Substitutor[I TaskInput] func(template I, item string, index int, params map[string]string) I

Substitutor is a function that creates a new task input with template variables substituted.
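To show what a Substitutor might look like in a concrete package, here is a self-contained sketch. The TaskInput and Substitutor declarations are copied locally so the example compiles on its own; echoInput, its "EchoActivity" name, and echoSubstitutor are hypothetical. A real substitutor would more likely delegate to SubstituteTemplate for each string field.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// Local copies of the package declarations, so the sketch stands alone.
type TaskInput interface {
	Validate() error
	ActivityName() string
}

type Substitutor[I TaskInput] func(template I, item string, index int, params map[string]string) I

// echoInput is a hypothetical task payload with one templated field.
type echoInput struct {
	Command string
}

func (e echoInput) Validate() error {
	if e.Command == "" {
		return errors.New("command is required")
	}
	return nil
}

func (e echoInput) ActivityName() string { return "EchoActivity" }

// echoSubstitutor fills in {{item}} and {{index}}. The template is passed
// by value, so the caller's copy is never modified.
var echoSubstitutor Substitutor[echoInput] = func(t echoInput, item string, index int, _ map[string]string) echoInput {
	t.Command = strings.NewReplacer(
		"{{item}}", item,
		"{{index}}", strconv.Itoa(index),
	).Replace(t.Command)
	return t
}

func main() {
	tmpl := echoInput{Command: "process {{item}} ({{index}})"}
	for i, item := range []string{"a.csv", "b.csv"} {
		fmt.Println(echoSubstitutor(tmpl, item, i, nil).Command)
	}
	// process a.csv (0)
	// process b.csv (1)
}
```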

type TaskInput

type TaskInput interface {
	Validate() error
	ActivityName() string
}

TaskInput is the interface constraint that every workflow task input must satisfy. Validate returns an error if the input is invalid, and ActivityName returns the Temporal activity name used to dispatch the task.

type TaskOutput

type TaskOutput interface {
	IsSuccess() bool
	GetError() string
}

TaskOutput is the interface constraint that every workflow task output must satisfy. IsSuccess reports whether the task completed successfully, and GetError returns a human-readable error description (empty on success).
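A concrete package plugs into the generic workflows by providing a pair of types that satisfy these two constraints. The sketch below shows one such pair and a small generic helper that relies only on the TaskOutput methods. The interfaces are redeclared locally so the example is self-contained; copyInput, copyOutput, the "CopyFileActivity" name, and summarize are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package interfaces, so the sketch stands alone.
type TaskInput interface {
	Validate() error
	ActivityName() string
}

type TaskOutput interface {
	IsSuccess() bool
	GetError() string
}

// copyInput is a hypothetical task payload.
type copyInput struct {
	Src, Dst string
}

func (c copyInput) Validate() error {
	if c.Src == "" || c.Dst == "" {
		return errors.New("src and dst are required")
	}
	return nil
}

func (c copyInput) ActivityName() string { return "CopyFileActivity" }

// copyOutput is the matching hypothetical result type.
type copyOutput struct {
	BytesCopied int64
	Err         string
}

func (o copyOutput) IsSuccess() bool  { return o.Err == "" }
func (o copyOutput) GetError() string { return o.Err }

// Compile-time checks that both types satisfy the constraints.
var (
	_ TaskInput  = copyInput{}
	_ TaskOutput = copyOutput{}
)

// summarize is a hypothetical generic helper that counts outcomes using
// only the TaskOutput methods.
func summarize[O TaskOutput](results []O) (succeeded, failed int) {
	for _, r := range results {
		if r.IsSuccess() {
			succeeded++
		} else {
			failed++
		}
	}
	return succeeded, failed
}

func main() {
	results := []copyOutput{{BytesCopied: 10}, {Err: "disk full"}}
	s, f := summarize(results)
	fmt.Println(s, f) // 1 1
}
```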

Directories

Path Synopsis
Package store provides a generic, typed key-value storage layer used by workflow tasks to persist and retrieve intermediate data.
