workflow

package
v0.0.0-...-4f1971e
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2024 | License: BSD-3-Clause | Imports: 7 | Imported by: 0

README

golang.org/x/build/internal/workflow

Package workflow declaratively defines computation graphs that support automatic parallelization, persistence, and monitoring.

Documentation

Overview

Workflows are a set of tasks and actions that produce and consume Values. Tasks don't run until the workflow is started, so Values represent data that doesn't exist yet, and can't be used directly.

To wrap an existing Go object in a Value, use Const. To define a parameter that will be set when the workflow is started, use Param. To read a task's return value, register it as an Output, and it will be returned from Run. An arbitrary number of Values of the same type can be combined with Slice.

Each Task has a set of input Values and returns a single output Value. Calling one of the TaskN functions (Task0 through Task5) defines a task whose Go function is called when the task runs. That function must take a context.Context or *TaskContext, followed by arguments corresponding to the dynamic type of the Values passed to it. It must return a value of any type and an error. The TaskContext can be used as a normal Context, and also supports workflow features like unstructured logging. A task only runs once all of its inputs are ready. All task outputs must be used either as inputs to another task or as a workflow Output.

In addition to Tasks, a workflow can have Actions, which represent functions that don't produce an output. They are defined with the ActionN functions (Action0 through Action5). Their Go function must return only an error, and their definition results in a Dependency rather than a Value. Both Dependencies and Values can be passed to After, and the result passed to Task and Action definitions, to create an ordering dependency that doesn't correspond to a function argument.

Expansions are a third type of function that adds to a running workflow definition rather than producing an output. Unlike Actions and Tasks, they execute multiple times and must produce exactly the same workflow modifications each time. As such, they should be pure functions of their inputs. Producing different modifications, or running multiple expansions concurrently, is an error that will corrupt the workflow's state.

Once a Definition is complete, call Start to set its parameters and instantiate it into a Workflow. Call Run to execute the workflow until completion.

Constants

This section is empty.

Variables

var (
	// String parameter types.
	BasicString = ParamType[string]{
		HTMLElement: "input",
	}
	URL = ParamType[string]{
		HTMLElement:   "input",
		HTMLInputType: "url",
	}
	LongString = ParamType[string]{
		HTMLElement: "textarea",
	}

	// Slice of string parameter types.
	SliceShort = ParamType[[]string]{
		HTMLElement: "input",
	}
	SliceLong = ParamType[[]string]{
		HTMLElement: "textarea",
	}

	// Checkbox bool parameter
	Bool = ParamType[bool]{
		HTMLElement:   "input",
		HTMLInputType: "checkbox",
	}
)
var MaxRetries = 3

MaxRetries is the maximum number of retries for a task. It is currently a package-level variable; it could instead become a workflow property.

var WatchdogDelay = 11 * time.Minute // A little over go test -timeout's default value of 10 minutes.

Functions

func Output

func Output[T any](d *Definition, name string, v Value[T])

Output registers a Value as a workflow output which will be returned when the workflow finishes.

Types

type Definition

type Definition struct {
	// contains filtered or unexported fields
}

A Definition defines the structure of a workflow.

func New

func New() *Definition

New creates a new workflow definition.

func (*Definition) Parameters

func (d *Definition) Parameters() []MetaParameter

Parameters returns parameters associated with the Definition in the same order that they were registered.

func (*Definition) Sub

func (d *Definition) Sub(name string) *Definition

type Dependency

type Dependency interface {
	// contains filtered or unexported methods
}

A Dependency represents a dependency on a prior task.

func Action0

func Action0[C context.Context](d *Definition, name string, f func(C) error, opts ...TaskOption) Dependency

ActionN adds an Action to the workflow definition. Its behavior and requirements are the same as Task, except that f must only return an error, and the result of the definition is a Dependency.

func Action1

func Action1[C context.Context, I1 any](d *Definition, name string, f func(C, I1) error, i1 Value[I1], opts ...TaskOption) Dependency

func Action2

func Action2[C context.Context, I1, I2 any](d *Definition, name string, f func(C, I1, I2) error, i1 Value[I1], i2 Value[I2], opts ...TaskOption) Dependency

func Action3

func Action3[C context.Context, I1, I2, I3 any](d *Definition, name string, f func(C, I1, I2, I3) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) Dependency

func Action4

func Action4[C context.Context, I1, I2, I3, I4 any](d *Definition, name string, f func(C, I1, I2, I3, I4) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Dependency

func Action5

func Action5[C context.Context, I1, I2, I3, I4, I5 any](d *Definition, name string, f func(C, I1, I2, I3, I4, I5) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Dependency

type Listener

type Listener interface {
	// TaskStateChanged is called when the state of a task changes.
	// state is safe to store or modify.
	TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error
	// Logger is called to obtain a Logger for a particular task.
	Logger(workflowID uuid.UUID, taskID string) Logger
	// WorkflowStalled is called when there are no runnable tasks.
	WorkflowStalled(workflowID uuid.UUID) error
}

A Listener is used to notify the workflow host of state changes, for display and persistence.

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

A Logger is a debug logger passed to a task implementation.

type MetaParameter

type MetaParameter interface {
	// RequireNonZero reports whether parameter p is required to have a non-zero value.
	RequireNonZero() bool
	// Valid reports whether the given parameter value is valid.
	//
	// A value is considered to be valid if:
	//   - the type of v is the parameter type
	//   - if RequireNonZero is true, the value v is non-zero
	//   - if Check is set, it reports value v to be okay
	Valid(v any) error
	Name() string
	Type() reflect.Type
	HTMLElement() string
	HTMLInputType() string
	HTMLSelectOptions() []string
	Doc() string
	Example() string
}

type ParamDef

type ParamDef[T any] struct {
	Name         string // Name identifies the parameter within a workflow. Must be non-empty.
	ParamType[T]        // Parameter type. For strings, defaults to BasicString if not specified.
	Doc          string // Doc documents the parameter. Optional.
	Example      string // Example is an example value. Optional.

	// Check reports whether the given parameter value is okay. Optional.
	Check func(T) error
}

ParamDef describes a Value that is filled in at workflow creation time.

It can be registered to a workflow with the Param function.

type ParamType

type ParamType[T any] struct {
	// HTMLElement configures the HTML element for entering the parameter value.
	// Supported values are "input", "textarea" and "select".
	HTMLElement string
	// HTMLInputType optionally configures the <input> type attribute when HTMLElement is "input".
	// If this attribute is not specified, <input> elements default to type="text".
	// See https://developer.mozilla.org/en-US/docs/Web/HTML/Element/input#input_types.
	HTMLInputType string
	// HTMLSelectOptions configures the available options when HTMLElement is "select".
	// See https://developer.mozilla.org/en-US/docs/Web/HTML/Element/option.
	HTMLSelectOptions []string
}

ParamType defines the type of a workflow parameter.

Since parameters are entered via an HTML form, there are some HTML-related knobs available.
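
How a host turns these fields into a form control is not shown in this documentation. The following is a minimal sketch of one way it could be done, assuming a hypothetical renderField helper and a local paramType struct that copies the three field names above.

```go
package main

import (
	"fmt"
	"html"
	"strings"
)

// paramType mirrors the exported fields of ParamType. It is a local
// stand-in so that the sketch compiles on its own.
type paramType struct {
	HTMLElement       string
	HTMLInputType     string
	HTMLSelectOptions []string
}

// renderField is a hypothetical helper that renders one form control for a
// parameter. Unrecognized HTMLElement values fall back to <input>.
func renderField(name string, p paramType) string {
	n := html.EscapeString(name)
	switch p.HTMLElement {
	case "textarea":
		return fmt.Sprintf(`<textarea name="%s"></textarea>`, n)
	case "select":
		var b strings.Builder
		fmt.Fprintf(&b, `<select name="%s">`, n)
		for _, o := range p.HTMLSelectOptions {
			fmt.Fprintf(&b, `<option>%s</option>`, html.EscapeString(o))
		}
		b.WriteString(`</select>`)
		return b.String()
	default:
		t := p.HTMLInputType
		if t == "" {
			t = "text" // <input> elements default to type="text"
		}
		return fmt.Sprintf(`<input type="%s" name="%s">`, html.EscapeString(t), n)
	}
}

func main() {
	url := paramType{HTMLElement: "input", HTMLInputType: "url"}
	fmt.Println(renderField("Announcement URL", url))
}
```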

type TaskContext

type TaskContext struct {
	context.Context
	Logger     Logger
	TaskName   string
	WorkflowID uuid.UUID
	// contains filtered or unexported fields
}

A TaskContext is a context.Context, plus workflow-related features.

func (*TaskContext) DisableRetries

func (c *TaskContext) DisableRetries()

func (*TaskContext) DisableWatchdog

func (c *TaskContext) DisableWatchdog()

func (*TaskContext) Printf

func (c *TaskContext) Printf(format string, v ...interface{})

func (*TaskContext) ResetWatchdog

func (c *TaskContext) ResetWatchdog()

func (*TaskContext) SetWatchdogScale

func (c *TaskContext) SetWatchdogScale(v int)

SetWatchdogScale sets the watchdog delay scale factor to max(v, 1), and resets the watchdog with the new scale.

type TaskOption

type TaskOption interface {
	// contains filtered or unexported methods
}

A TaskOption affects the execution of a task but is not an argument to its function.

func After

func After(afters ...Dependency) TaskOption

After represents an ordering dependency on another Task or Action. It can be passed in addition to any arguments to the task's function.

type TaskState

type TaskState struct {
	Name             string
	Started          bool
	Finished         bool
	Result           interface{}
	SerializedResult []byte
	Error            string
	RetryCount       int
}

TaskState contains the state of a task in a running workflow. Once Finished is true, either Result or Error will be populated.

type Value

type Value[T any] interface {
	// contains filtered or unexported methods
}

A Value is a piece of data that will be produced or consumed when a task runs. It cannot be read directly.

func Const

func Const[T any](value T) Value[T]

Const creates a Value from an existing object.

func Expand0

func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), opts ...TaskOption) Value[O1]

ExpandN adds a workflow expansion task to the workflow definition. Expansion tasks run similarly to normal tasks, but instead of computing a result, they can add to the workflow definition.

Unlike normal tasks, expansions may run multiple times and must produce the exact same changes to the definition each time.

Running more than one expansion concurrently is an error and will corrupt the workflow.

func Expand1

func Expand1[I1, O1 any](d *Definition, name string, f func(*Definition, I1) (Value[O1], error), i1 Value[I1], opts ...TaskOption) Value[O1]

func Expand2

func Expand2[I1, I2, O1 any](d *Definition, name string, f func(*Definition, I1, I2) (Value[O1], error), i1 Value[I1], i2 Value[I2], opts ...TaskOption) Value[O1]

func Expand3

func Expand3[I1, I2, I3, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) Value[O1]

func Expand4

func Expand4[I1, I2, I3, I4, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Value[O1]

func Expand5

func Expand5[I1, I2, I3, I4, I5, O1 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4, I5) (Value[O1], error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Value[O1]

func Param

func Param[T any](d *Definition, p ParamDef[T]) Value[T]

Param registers a new parameter p that is filled in at workflow creation time and returns the corresponding Value. Param name must be non-empty and uniquely identify the parameter in the workflow definition.

func Slice

func Slice[T any](vs ...Value[T]) Value[[]T]

Slice combines multiple Values of the same type into a Value containing a slice of that type.

func Task0

func Task0[C context.Context, O1 any](d *Definition, name string, f func(C) (O1, error), opts ...TaskOption) Value[O1]

TaskN adds a task to the workflow definition. It takes N inputs, and returns one output. name must uniquely identify the task in the workflow. f must be a function that takes a context.Context or *TaskContext argument, followed by one argument for each Value in inputs, corresponding to the Value's dynamic type. It must return two values, the first of which will be returned as its Value, and an error that will be used by the workflow engine. See the package documentation for examples.

func Task1

func Task1[C context.Context, I1, O1 any](d *Definition, name string, f func(C, I1) (O1, error), i1 Value[I1], opts ...TaskOption) Value[O1]

func Task2

func Task2[C context.Context, I1, I2, O1 any](d *Definition, name string, f func(C, I1, I2) (O1, error), i1 Value[I1], i2 Value[I2], opts ...TaskOption) Value[O1]

func Task3

func Task3[C context.Context, I1, I2, I3, O1 any](d *Definition, name string, f func(C, I1, I2, I3) (O1, error), i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) Value[O1]

func Task4

func Task4[C context.Context, I1, I2, I3, I4, O1 any](d *Definition, name string, f func(C, I1, I2, I3, I4) (O1, error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Value[O1]

func Task5

func Task5[C context.Context, I1, I2, I3, I4, I5, O1 any](d *Definition, name string, f func(C, I1, I2, I3, I4, I5) (O1, error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Value[O1]

type Workflow

type Workflow struct {
	ID uuid.UUID
	// contains filtered or unexported fields
}

A Workflow is an instance of a workflow Definition, ready to run.

func Resume

func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error)

Resume restores a workflow from stored state. Tasks that had not finished will be restarted, but tasks that finished with errors will not be retried.

The host must create the WorkflowState. TaskStates should be saved from listener callbacks, but for ease of storage, their Result field does not need to be populated.
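
One way a host might save task states from listener callbacks and rebuild the map for Resume is sketched below. It is a stand-alone illustration under several assumptions: taskState is a local copy of the TaskState fields, memStore is a hypothetical in-memory store, JSON is an arbitrary choice of encoding, and keying the map by task ID is assumed rather than documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// taskState mirrors the exported fields of TaskState. Result is excluded
// from the encoding because it does not need to be populated for Resume.
type taskState struct {
	Name             string
	Started          bool
	Finished         bool
	Result           interface{} `json:"-"`
	SerializedResult []byte
	Error            string
	RetryCount       int
}

// memStore is a hypothetical store holding the latest encoded state per task.
type memStore struct {
	mu   sync.Mutex
	rows map[string][]byte
}

func newMemStore() *memStore { return &memStore{rows: map[string][]byte{}} }

// taskStateChanged is what a Listener's TaskStateChanged method could call.
func (s *memStore) taskStateChanged(taskID string, st *taskState) error {
	b, err := json.Marshal(st)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rows[taskID] = b
	return nil
}

// load rebuilds the per-task map that would be handed to Resume.
func (s *memStore) load() (map[string]*taskState, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]*taskState, len(s.rows))
	for id, b := range s.rows {
		st := new(taskState)
		if err := json.Unmarshal(b, st); err != nil {
			return nil, err
		}
		out[id] = st
	}
	return out, nil
}

func main() {
	store := newMemStore()
	done := &taskState{Name: "build", Started: true, Finished: true,
		Result: "ok", SerializedResult: []byte(`"ok"`)}
	if err := store.taskStateChanged("build", done); err != nil {
		fmt.Println("save failed:", err)
		return
	}
	states, err := store.load()
	fmt.Println(len(states), err)
}
```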

func Start

func Start(def *Definition, params map[string]interface{}) (*Workflow, error)

Start instantiates a workflow with the given parameters.

func (*Workflow) RetryTask

func (w *Workflow) RetryTask(ctx context.Context, name string) error

RetryTask retries the named task.

func (*Workflow) Run

func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error)

Run runs a workflow and returns its outputs. A workflow will either complete successfully, reach a blocking state waiting on a task to be approved or retried, or get stopped early via context cancellation.

listener.TaskStateChanged can be used for monitoring and persistence purposes: it is called once immediately, again when each task starts, and again when each task finishes.

Register Outputs to read task results.

type WorkflowState

type WorkflowState struct {
	ID     uuid.UUID
	Params map[string]interface{}
}

WorkflowState contains the shallow state of a running workflow.
