Documentation ¶
Overview ¶
Package workflow declaratively defines computation graphs that support automatic parallelization, persistence, and monitoring.
Workflows are a set of tasks and actions that produce and consume Values. Tasks don't run until the workflow is started, so Values represent data that doesn't exist yet, and can't be used directly.
To wrap an existing Go object in a Value, use Const. To define a parameter that will be set when the workflow is started, use Param. To read a task's return value, register it as an Output, and it will be returned from Run. An arbitrary number of Values of the same type can be combined with Slice.
Each Task has a set of input Values, and returns a single output Value. Calling Task defines a task that will run a Go function when it runs. That function must take a context.Context or *TaskContext, followed by arguments corresponding to the dynamic type of the Values passed to it. It must return a value of any type and an error. The TaskContext can be used as a normal Context, and also supports workflow features like unstructured logging. A task only runs once all of its inputs are ready. All task outputs must be used either as inputs to another task or as a workflow Output.
In addition to Tasks, a workflow can have Actions, which represent functions that don't produce an output. Their Go function must only return an error, and their definition results in a Dependency rather than a Value. Both Dependencies and Values can be passed to After and then to Task and Action definitions to create an ordering dependency that doesn't correspond to a function argument.
Expansions are a third type of function that adds to a running workflow definition rather than producing an output. Unlike Actions and Tasks, they execute multiple times and must produce exactly the same workflow modifications each time. As such, they should be pure functions of their inputs. Producing different modifications, or running multiple expansions concurrently, is an error that will corrupt the workflow's state.
Once a Definition is complete, call Start to set its parameters and instantiate it into a Workflow. Call Run to execute the workflow until completion.
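The idea that a Value stands for data that does not exist yet can be illustrated with a toy model. The sketch below is not this package's implementation and does not import it: the names deferred, constant, task1, and slice are hypothetical stand-ins that only mimic the shape of Value, Const, Task1, and Slice. It leaves out contexts, parallel scheduling, run-once semantics, persistence, and listeners.

```go
package main

import (
	"fmt"
	"strings"
)

// deferred is a hypothetical stand-in for a workflow Value: a handle to a
// result that is only computed when run is called.
type deferred[T any] struct {
	run func() (T, error)
}

// constant wraps an existing Go value, in the spirit of Const.
func constant[T any](v T) deferred[T] {
	return deferred[T]{run: func() (T, error) { return v, nil }}
}

// task1 defines a one-input step, in the spirit of Task1. Nothing executes
// until the returned handle is run.
func task1[I, O any](f func(I) (O, error), in deferred[I]) deferred[O] {
	return deferred[O]{run: func() (O, error) {
		arg, err := in.run()
		if err != nil {
			var zero O
			return zero, err
		}
		return f(arg)
	}}
}

// slice combines handles of the same type, in the spirit of Slice.
func slice[T any](vs ...deferred[T]) deferred[[]T] {
	return deferred[[]T]{run: func() ([]T, error) {
		out := make([]T, 0, len(vs))
		for _, v := range vs {
			x, err := v.run()
			if err != nil {
				return nil, err
			}
			out = append(out, x)
		}
		return out, nil
	}}
}

func greet(name string) (string, error) { return "hello, " + name, nil }

func joinAll(parts []string) (string, error) { return strings.Join(parts, "; "), nil }

// buildGreeting assembles the graph. No step has run when it returns.
func buildGreeting() deferred[string] {
	a := task1(greet, constant("gopher"))
	b := task1(greet, constant("world"))
	return task1(joinAll, slice(a, b))
}

func main() {
	out, err := buildGreeting().run()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

In this toy, defining the graph and executing it are separate steps, which is the same split the package draws between building a Definition and calling Start and Run.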
Index ¶
- Variables
- func Output[T any](d *Definition, name string, v Value[T])
- type ACL
- type Definition
- type Dependency
- func Action0[C context.Context](d *Definition, name string, f func(C) error, opts ...TaskOption) Dependency
- func Action1[C context.Context, I1 any](d *Definition, name string, f func(C, I1) error, i1 Value[I1], ...) Dependency
- func Action2[C context.Context, I1, I2 any](d *Definition, name string, f func(C, I1, I2) error, i1 Value[I1], ...) Dependency
- func Action3[C context.Context, I1, I2, I3 any](d *Definition, name string, f func(C, I1, I2, I3) error, i1 Value[I1], ...) Dependency
- func Action4[C context.Context, I1, I2, I3, I4 any](d *Definition, name string, f func(C, I1, I2, I3, I4) error, i1 Value[I1], ...) Dependency
- func Action5[C context.Context, I1, I2, I3, I4, I5 any](d *Definition, name string, f func(C, I1, I2, I3, I4, I5) error, i1 Value[I1], ...) Dependency
- type Listener
- type Logger
- type MetaParameter
- type ParamDef
- type ParamType
- type TaskContext
- type TaskOption
- type TaskState
- type Value
- func Const[T any](value T) Value[T]
- func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), ...) Value[O1]
- func Expand1[I1, O1 any](d *Definition, name string, f func(*Definition, I1) (Value[O1], error), ...) Value[O1]
- func Expand2[I1, I2, O1 any](d *Definition, name string, f func(*Definition, I1, I2) (Value[O1], error), ...) Value[O1]
- func Expand3[I1, I2, I3, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3) (Value[O1], error), ...) Value[O1]
- func Expand4[I1, I2, I3, I4, O1 any](d *Definition, name string, ...) Value[O1]
- func Expand5[I1, I2, I3, I4, I5, O1 any](d *Definition, name string, ...) Value[O1]
- func Param[T any](d *Definition, p ParamDef[T]) Value[T]
- func Slice[T any](vs ...Value[T]) Value[[]T]
- func Task0[C context.Context, O1 any](d *Definition, name string, f func(C) (O1, error), opts ...TaskOption) Value[O1]
- func Task1[C context.Context, I1, O1 any](d *Definition, name string, f func(C, I1) (O1, error), i1 Value[I1], ...) Value[O1]
- func Task2[C context.Context, I1, I2, O1 any](d *Definition, name string, f func(C, I1, I2) (O1, error), i1 Value[I1], ...) Value[O1]
- func Task3[C context.Context, I1, I2, I3, O1 any](d *Definition, name string, f func(C, I1, I2, I3) (O1, error), i1 Value[I1], ...) Value[O1]
- func Task4[C context.Context, I1, I2, I3, I4, O1 any](d *Definition, name string, f func(C, I1, I2, I3, I4) (O1, error), ...) Value[O1]
- func Task5[C context.Context, I1, I2, I3, I4, I5, O1 any](d *Definition, name string, f func(C, I1, I2, I3, I4, I5) (O1, error), ...) Value[O1]
- type Workflow
- type WorkflowState
Constants ¶
This section is empty.
Variables ¶
var (
	// String parameter types.
	BasicString = ParamType[string]{
		HTMLElement: "input",
	}
	URL = ParamType[string]{
		HTMLElement:   "input",
		HTMLInputType: "url",
	}
	LongString = ParamType[string]{
		HTMLElement: "textarea",
	}

	// Slice of string parameter types.
	SliceShort = ParamType[[]string]{
		HTMLElement: "input",
	}
	SliceLong = ParamType[[]string]{
		HTMLElement: "textarea",
	}

	// Checkbox bool parameter
	Bool = ParamType[bool]{
		HTMLElement:   "input",
		HTMLInputType: "checkbox",
	}
)
var MaxRetries = 3
Maximum number of retries. This could be a workflow property.
var WatchdogDelay = 11 * time.Minute // A little over go test -timeout's default value of 10 minutes.
Functions ¶
Types ¶
type ACL ¶
type ACL struct {
	// In order to interact with a workflow, the user must be a member
	// of one of any of the following groups.
	Groups []string
}
type Definition ¶
type Definition struct {
// contains filtered or unexported fields
}
A Definition defines the structure of a workflow.
func (*Definition) AuthorizedGroups ¶
func (d *Definition) AuthorizedGroups() []string
AuthorizedGroups returns the list of groups which are authorized to create, approve, stop, and retry this workflow. If any user can perform these actions, a nil slice is returned.
func (*Definition) Parameters ¶
func (d *Definition) Parameters() []MetaParameter
Parameters returns parameters associated with the Definition in the same order that they were registered.
func (*Definition) Sub ¶
func (d *Definition) Sub(name string) *Definition
type Dependency ¶
type Dependency interface {
// contains filtered or unexported methods
}
A Dependency represents a dependency on a prior task.
func Action0 ¶
func Action0[C context.Context](d *Definition, name string, f func(C) error, opts ...TaskOption) Dependency
ActionN adds an Action to the workflow definition. Its behavior and requirements are the same as Task, except that f must only return an error, and the result of the definition is a Dependency.
func Action1 ¶
func Action1[C context.Context, I1 any](d *Definition, name string, f func(C, I1) error, i1 Value[I1], opts ...TaskOption) Dependency
func Action2 ¶
func Action2[C context.Context, I1, I2 any](d *Definition, name string, f func(C, I1, I2) error, i1 Value[I1], i2 Value[I2], opts ...TaskOption) Dependency
func Action3 ¶
func Action3[C context.Context, I1, I2, I3 any](d *Definition, name string, f func(C, I1, I2, I3) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) Dependency
func Action4 ¶
func Action4[C context.Context, I1, I2, I3, I4 any](d *Definition, name string, f func(C, I1, I2, I3, I4) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Dependency
func Action5 ¶
func Action5[C context.Context, I1, I2, I3, I4, I5 any](d *Definition, name string, f func(C, I1, I2, I3, I4, I5) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Dependency
type Listener ¶
type Listener interface {
	// TaskStateChanged is called when the state of a task changes.
	// state is safe to store or modify.
	TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error
	// Logger is called to obtain a Logger for a particular task.
	Logger(workflowID uuid.UUID, taskID string) Logger
	// WorkflowStalled is called when there are no runnable tasks.
	WorkflowStalled(workflowID uuid.UUID) error
}
A Listener is used to notify the workflow host of state changes, for display and persistence.
type Logger ¶
type Logger interface {
Printf(format string, v ...interface{})
}
A Logger is a debug logger passed to a task implementation.
type MetaParameter ¶
type MetaParameter interface {
	// RequireNonZero reports whether parameter p is required to have a non-zero value.
	RequireNonZero() bool
	// Valid reports whether the given parameter value is valid.
	//
	// A value is considered to be valid if:
	//   - the type of v is the parameter type
	//   - if RequireNonZero is true, the value v is non-zero
	//   - if Check is set, it reports value v to be okay
	Valid(v any) error
	Name() string
	Type() reflect.Type
	HTMLElement() string
	HTMLInputType() string
	HTMLSelectOptions() []string
	Doc() string
	Example() string
}
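The three validity rules listed in the Valid comment can be sketched as a standalone function. This is an illustration of the rules as documented, not the package's code: validate, its requireNonZero argument, and the httpsOnly check are hypothetical names, and how the real package decides RequireNonZero is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
	"strings"
)

// validate applies the documented rules in order: v must have the parameter
// type T, must be non-zero when requireNonZero is set, and must pass check
// when check is non-nil.
func validate[T any](v any, requireNonZero bool, check func(T) error) error {
	tv, ok := v.(T)
	if !ok {
		var want T
		return fmt.Errorf("value has type %T, want %T", v, want)
	}
	// Taking the address first keeps the reflect.Value valid for any T.
	if requireNonZero && reflect.ValueOf(&tv).Elem().IsZero() {
		return errors.New("value must be non-zero")
	}
	if check != nil {
		return check(tv)
	}
	return nil
}

// httpsOnly is a hypothetical Check function for a URL-like parameter.
func httpsOnly(s string) error {
	if !strings.HasPrefix(s, "https://") {
		return errors.New("want an https:// URL")
	}
	return nil
}

func main() {
	fmt.Println(validate[string]("https://go.dev", true, httpsOnly))
	fmt.Println(validate[string](42, true, nil))
	fmt.Println(validate[string]("", true, nil))
	fmt.Println(validate[string]("http://go.dev", true, httpsOnly))
}
```

Checking the type first means the later rules can work with a typed value, so a Check function never sees a value of the wrong type.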
type ParamDef ¶
type ParamDef[T any] struct {
	Name         string // Name identifies the parameter within a workflow. Must be non-empty.
	ParamType[T]        // Parameter type. For strings, defaults to BasicString if not specified.
	Doc          string // Doc documents the parameter. Optional.
	Example      string // Example is an example value. Optional.

	// Check reports whether the given parameter value is okay. Optional.
	Check func(T) error
}
ParamDef describes a Value that is filled in at workflow creation time.
It can be registered to a workflow with the Param function.
type ParamType ¶
type ParamType[T any] struct {
	// HTMLElement configures the HTML element for entering the parameter value.
	// Supported values are "input", "textarea" and "select".
	HTMLElement string
	// HTMLInputType optionally configures the <input> type attribute when HTMLElement is "input".
	// If this attribute is not specified, <input> elements default to type="text".
	// See https://developer.mozilla.org/en-US/docs/Web/HTML/Element/input#input_types.
	HTMLInputType string
	// HTMLSelectOptions configures the available options when HTMLElement is "select".
	// See https://developer.mozilla.org/en-US/docs/Web/HTML/Element/option.
	HTMLSelectOptions []string
}
ParamType defines the type of a workflow parameter.
Since parameters are entered via an HTML form, there are some HTML-related knobs available.
type TaskContext ¶
type TaskContext struct {
	context.Context
	Logger     Logger
	TaskName   string
	WorkflowID uuid.UUID
	// contains filtered or unexported fields
}
A TaskContext is a context.Context, plus workflow-related features.
func (*TaskContext) DisableRetries ¶
func (c *TaskContext) DisableRetries()
func (*TaskContext) DisableWatchdog ¶
func (c *TaskContext) DisableWatchdog()
func (*TaskContext) Printf ¶
func (c *TaskContext) Printf(format string, v ...interface{})
func (*TaskContext) ResetWatchdog ¶
func (c *TaskContext) ResetWatchdog()
func (*TaskContext) SetWatchdogScale ¶
func (c *TaskContext) SetWatchdogScale(v int)
SetWatchdogScale sets the watchdog delay scale factor to max(v, 1), and resets the watchdog with the new scale.
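The clamping rule can be sketched in isolation. The snippet below assumes that the scale factor multiplies WatchdogDelay to give the effective timeout. That multiplication is an assumption made for illustration, since this document only states that the factor is max(v, 1). scaledWatchdogDelay is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// WatchdogDelay mirrors the package-level default listed under Variables.
var WatchdogDelay = 11 * time.Minute

// scaledWatchdogDelay clamps v to at least 1, as SetWatchdogScale is
// documented to do, and then (ASSUMPTION) multiplies the base delay by it.
func scaledWatchdogDelay(v int) time.Duration {
	if v < 1 {
		v = 1
	}
	return time.Duration(v) * WatchdogDelay
}

func main() {
	for _, v := range []int{-2, 0, 1, 3} {
		fmt.Printf("scale %d -> %v\n", v, scaledWatchdogDelay(v))
	}
}
```

Under this assumption, a zero or negative scale never shortens the timeout below the default.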
type TaskOption ¶
type TaskOption interface {
// contains filtered or unexported methods
}
A TaskOption affects the execution of a task but is not an argument to its function.
func After ¶
func After(afters ...Dependency) TaskOption
After represents an ordering dependency on another Task or Action. It can be passed in addition to any arguments to the task's function.
type TaskState ¶
type TaskState struct {
	Name             string
	Started          bool
	Finished         bool
	Result           interface{}
	SerializedResult []byte
	Error            string
	RetryCount       int
}
TaskState contains the state of a task in a running workflow. Once Finished is true, either Result or Error will be populated.
type Value ¶
type Value[T any] interface {
	// contains filtered or unexported methods
}
A Value is a piece of data that will be produced or consumed when a task runs. It cannot be read directly.
func Expand0 ¶
func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), opts ...TaskOption) Value[O1]
ExpandN adds a workflow expansion task to the workflow definition. Expansion tasks run similarly to normal tasks, but instead of computing a result, they can add to the workflow definition.
Unlike normal tasks, expansions may run multiple times and must produce the exact same changes to the definition each time.
Running more than one expansion concurrently is an error and will corrupt the workflow.
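One common way an expansion function can become non-deterministic in Go is by ranging over a map, because map iteration order is randomized. The sketch below shows the usual remedy of sorting the keys before deriving anything from them. It does not call this package: plannedTaskNames and the "build " naming scheme are hypothetical and stand in for whatever tasks an expansion would add.

```go
package main

import (
	"fmt"
	"sort"
)

// plannedTaskNames returns the task names a hypothetical expansion would add
// for the given targets. The keys are sorted first so that every run yields
// the same names in the same order.
func plannedTaskNames(targets map[string]string) []string {
	keys := make([]string, 0, len(targets))
	for k := range targets {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	names := make([]string, 0, len(keys))
	for _, k := range keys {
		names = append(names, "build "+k)
	}
	return names
}

func main() {
	targets := map[string]string{"linux": "amd64", "darwin": "arm64", "windows": "386"}
	fmt.Println(plannedTaskNames(targets))
}
```

The same care applies to any other source of variation, such as the current time or random numbers: values like these are safer passed in as inputs than read inside the expansion.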
func Expand1 ¶
func Expand1[I1, O1 any](d *Definition, name string, f func(*Definition, I1) (Value[O1], error), i1 Value[I1], opts ...TaskOption) Value[O1]
func Expand2 ¶
func Expand2[I1, I2, O1 any](d *Definition, name string, f func(*Definition, I1, I2) (Value[O1], error), i1 Value[I1], i2 Value[I2], opts ...TaskOption) Value[O1]
func Expand3 ¶
func Expand3[I1, I2, I3, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) Value[O1]
func Expand4 ¶
func Expand4[I1, I2, I3, I4, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Value[O1]
func Expand5 ¶
func Expand5[I1, I2, I3, I4, I5, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4, I5) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Value[O1]
func Param ¶
func Param[T any](d *Definition, p ParamDef[T]) Value[T]
Param registers a new parameter p that is filled in at workflow creation time and returns the corresponding Value. Param name must be non-empty and uniquely identify the parameter in the workflow definition.
func Slice ¶
func Slice[T any](vs ...Value[T]) Value[[]T]
Slice combines multiple Values of the same type into a Value containing a slice of that type.
func Task0 ¶
func Task0[C context.Context, O1 any](d *Definition, name string, f func(C) (O1, error), opts ...TaskOption) Value[O1]
TaskN adds a task to the workflow definition. It takes N inputs, and returns one output. name must uniquely identify the task in the workflow. f must be a function that takes a context.Context or *TaskContext argument, followed by one argument for each Value in inputs, corresponding to the Value's dynamic type. It must return two values, the first of which will be returned as its Value, and an error that will be used by the workflow engine. See the package documentation for examples.
func Task1 ¶
func Task1[C context.Context, I1, O1 any](d *Definition, name string, f func(C, I1) (O1, error), i1 Value[I1], opts ...TaskOption) Value[O1]
func Task2 ¶
func Task2[C context.Context, I1, I2, O1 any](d *Definition, name string, f func(C, I1, I2) (O1, error), i1 Value[I1], i2 Value[I2], opts ...TaskOption) Value[O1]
type Workflow ¶
A Workflow is an instantiated workflow instance, ready to run.
func Resume ¶
func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error)
Resume restores a workflow from stored state. Tasks that had not finished will be restarted, but tasks that finished in errors will not be retried.
The host must create the WorkflowState. TaskStates should be saved from listener callbacks, but for ease of storage, their Result field does not need to be populated.
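A host-side store for task states might look like the sketch below. It is a minimal in-memory illustration under stated assumptions: taskState is a local copy of the TaskState fields listed in this document, stateStore stands in for a real database, and keying the map by task name is a guess at what Resume expects in taskStates. A real host would call something like save from its Listener's TaskStateChanged method.

```go
package main

import (
	"fmt"
	"sync"
)

// taskState is a local stand-in mirroring the TaskState fields in this document.
type taskState struct {
	Name             string
	Started          bool
	Finished         bool
	Result           interface{}
	SerializedResult []byte
	Error            string
	RetryCount       int
}

// stateStore is a hypothetical in-memory replacement for a host's database.
type stateStore struct {
	mu    sync.Mutex
	tasks map[string]taskState
}

func newStateStore() *stateStore {
	return &stateStore{tasks: map[string]taskState{}}
}

// save records a copy of st keyed by task name. The in-memory Result is
// dropped and only SerializedResult is kept, since the documentation says
// Result does not need to be populated when resuming.
func (s *stateStore) save(st *taskState) {
	cp := *st
	cp.Result = nil
	cp.SerializedResult = append([]byte(nil), st.SerializedResult...)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tasks[cp.Name] = cp
}

// snapshot returns the stored states as a map of pointers, the shape of
// Resume's taskStates parameter (using the local stand-in type).
func (s *stateStore) snapshot() map[string]*taskState {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]*taskState, len(s.tasks))
	for name, st := range s.tasks {
		cp := st
		out[name] = &cp
	}
	return out
}

func main() {
	store := newStateStore()
	store.save(&taskState{Name: "build", Started: true, Finished: true,
		Result: 42, SerializedResult: []byte("42")})
	for name, st := range store.snapshot() {
		fmt.Printf("%s: finished=%v serialized=%s\n", name, st.Finished, st.SerializedResult)
	}
}
```

Copying the state before storing it keeps the store independent of later changes to the value passed in.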
func Start ¶
func Start(def *Definition, params map[string]interface{}) (*Workflow, error)
Start instantiates a workflow with the given parameters.
func (*Workflow) Run ¶
Run runs a workflow and returns its outputs. A workflow will either complete successfully, reach a blocking state waiting on a task to be approved or retried, or get stopped early via context cancellation.
listener.TaskStateChanged can be used for monitoring and persistence: it is called immediately, when each task starts, and when each task finishes.
Register Outputs to read task results.
type WorkflowState ¶
WorkflowState contains the shallow state of a running workflow.