v0.9.1

Published: Jun 12, 2024 License: Apache-2.0 Imports: 15 Imported by: 27



Package flow provides a low-level workflow manager based on a CUE Instance.

A Task defines an operational unit in a Workflow and corresponds to a struct in a CUE instance. This package does not define what a Task looks like in a CUE Instance. Instead, the user of this package must supply a TaskFunc that creates a Runner for cue.Values that are deemed to be a Task.

Tasks may depend on other tasks, but cyclic dependencies are not allowed. A Task A depends on another Task B if A, directly or indirectly, has a reference to any field of Task B, including its root.

package main

import (
	"context"
	"fmt"
	"log"

	"cuelang.org/go/cue"
	"cuelang.org/go/cue/cuecontext"
	"cuelang.org/go/tools/flow"
)

func main() {
	ctx := cuecontext.New()
	v := ctx.CompileString(`
	a: {
		input: "world"
		output: string
	}
	b: {
		input: a.output
		output: string
	}
	`)
	if err := v.Err(); err != nil {
		log.Fatal(err)
	}
	controller := flow.New(nil, v, ioTaskFunc)
	if err := controller.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

// ioTaskFunc treats every struct that has an "input" field as a task.
func ioTaskFunc(v cue.Value) (flow.Runner, error) {
	inputPath := cue.ParsePath("input")

	input := v.LookupPath(inputPath)
	if !input.Exists() {
		// Not a task: keep searching in nested values.
		return nil, nil
	}

	return flow.RunnerFunc(func(t *flow.Task) error {
		inputVal, err := t.Value().LookupPath(inputPath).String()
		if err != nil {
			return fmt.Errorf("input not of type string")
		}

		outputVal := fmt.Sprintf("hello %s", inputVal)
		fmt.Printf("setting %s.output to %q\n", t.Path(), outputVal)

		// Fill publishes the result so that dependent tasks can read it.
		return t.Fill(map[string]string{
			"output": outputVal,
		})
	}), nil
}

Output:

setting a.output to "hello world"
setting b.output to "hello hello world"






var (
	// ErrAbort may be returned by a task to avoid processing downstream tasks.
	// This can be used by control nodes to influence execution.
	ErrAbort = errors.New("abort dependant tasks without failure")
)




type Config

type Config struct {
	// Root limits the search for tasks to be within the path indicated to root.
	// For the cue command, this is set to ["command"]. The default value is
	// for all tasks to be root.
	Root cue.Path

	// InferTasks allows tasks to be defined outside of the Root. Such tasks
	// will only be included in the workflow if any of its fields is referenced
	// by any of the tasks defined within Root.
	// CAVEAT EMPTOR: this feature is mostly provided for backwards
	// compatibility with v0.2. A problem with this approach is that it will
	// look for task structs within arbitrary data. So if not careful, there may
	// be spurious matches.
	InferTasks bool

	// IgnoreConcrete ignores references for which the values are already
	// concrete and cannot change.
	IgnoreConcrete bool

	// FindHiddenTasks allows tasks to be defined in hidden fields.
	FindHiddenTasks bool

	// UpdateFunc is called whenever the information in the controller is
	// updated. This includes directly after initialization. The task may be
	// nil if this call is not the result of a task completing.
	UpdateFunc func(c *Controller, t *Task) error
}

A Config defines options for interpreting an Instance as a Workflow.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

A Controller defines a set of Tasks to be executed.

func New

func New(cfg *Config, inst cue.InstanceOrValue, f TaskFunc) *Controller

New creates a Controller for a given Instance and TaskFunc.

The instance value can either be a *cue.Instance or a cue.Value.

func (*Controller) Run

func (c *Controller) Run(ctx context.Context) error

Run runs the tasks of a workflow until completion.

func (*Controller) Stats added in v0.5.0

func (c *Controller) Stats() (counts stats.Counts)

Stats reports statistics on the total number of CUE operations used.

This is an experimental method and the API is likely to change. The Counts.String method will likely stay and is the safest way to use this API.

This currently should only be called after completion or within a call to UpdateFunc.

func (*Controller) Tasks

func (c *Controller) Tasks() []*Task

Tasks reports the tasks that are currently registered with the controller.

This may currently only be called before Run is called or from within a call to UpdateFunc. Task pointers returned by this call are not guaranteed to be the same between successive calls to this method.

func (*Controller) Value added in v0.4.3

func (c *Controller) Value() cue.Value

Value returns the value managed by the controller.

It is safe to use the value only after Run() has returned. It panics if the flow is running.

type Runner

type Runner interface {
	// Run runs a Task. If any of the tasks it depends on returned an error it
	// is passed to this task. It reports an error upon failure.
	// Any results to be returned can be set by calling Fill on the passed task.
	// TODO: what is a good contract for receiving and passing errors and abort.
	// If for a returned error x errors.Is(x, ErrAbort), all dependant tasks
	// will not be run, without this being an error.
	Run(t *Task, err error) error
}

A Runner executes a Task.

type RunnerFunc

type RunnerFunc func(t *Task) error

A RunnerFunc runs a Task.

func (RunnerFunc) Run

func (f RunnerFunc) Run(t *Task, err error) error

type State

type State int

A State indicates the state of a Task.

The following state diagram indicates the possible state transitions:

       Ready
    ↗︎        ↘︎
Waiting  ←  Running
    ↘︎        ↙︎
    Terminated

A Task may move from Waiting to Terminated if one of the tasks on which it depends fails.

NOTE: transitions from Running to Waiting are currently not supported. In the future this may be possible if a task depends on continuously running tasks that send updates.
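The transitions described above can be modelled as a small lookup table. This is a stand-alone sketch using local types, not the package's implementation: the state type and the canMove helper are hypothetical, and the Running to Waiting edge is left out because it is not supported yet.

```go
package main

import "fmt"

// state mirrors the four states of flow.State. It is a local stand-in for
// illustration only.
type state int

const (
	waiting state = iota
	ready
	running
	terminated
)

// String returns the name of a valid state.
func (s state) String() string {
	return [...]string{"Waiting", "Ready", "Running", "Terminated"}[s]
}

// allowed lists the currently supported transitions.
var allowed = map[state][]state{
	waiting: {ready, terminated},
	ready:   {running},
	running: {terminated},
}

// canMove reports whether a task in state from may move to state to.
func canMove(from, to state) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canMove(waiting, ready))      // true
	fmt.Println(canMove(waiting, terminated)) // true: a dependency failed
	fmt.Println(canMove(running, waiting))    // false: not supported yet
}
```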

const (
	// Waiting indicates a task is blocked on input from another task.
	// NOTE: although this is currently not implemented, a task could
	// theoretically move from the Running to Waiting state.
	Waiting State = iota

	// Ready means a task is ready to run, but currently not running.
	Ready

	// Running indicates a goroutine is currently active for a task and that
	// it is not Waiting.
	Running

	// Terminated means a task has stopped running either because it terminated
	// while Running or was aborted by a task on which it depends. The error
	// value of a Task indicates the reason for the termination.
	Terminated
)
func (State) String

func (i State) String() string

type Task

type Task struct {
	// contains filtered or unexported fields
}

A Task contains the context for a single task execution. Tasks may be run concurrently.

func (*Task) Context

func (t *Task) Context() context.Context

Context reports the Controller's Context.

func (*Task) Dependencies

func (t *Task) Dependencies() []*Task

Dependencies reports the Tasks t depends on.

This method may currently only be called before Run is called, after a Task has completed, or from within a call to UpdateFunc.

func (*Task) Err

func (t *Task) Err() error

Err returns the error of a completed Task.

This method may currently only be called before Run is called, after a Task has completed, or from within a call to UpdateFunc.

func (*Task) Fill

func (t *Task) Fill(x interface{}) error

Fill fills in values of the Controller's configuration for the current task. The changes take effect after the task completes.

This method may currently only be called by the runner.

func (*Task) Index

func (t *Task) Index() int

Index reports the sequence number of the Task. This will not change over time.

func (*Task) Path

func (t *Task) Path() cue.Path

Path reports the path of the Task within the Instance in which it is defined. The Path is always valid.

func (*Task) PathDependencies

func (t *Task) PathDependencies(p cue.Path) []*Task

PathDependencies reports the dependencies found for a value at the given path.

This may currently only be called before Run is called or from within a call to UpdateFunc.

func (*Task) State

func (t *Task) State() State

State is the current state of the Task.

This method may currently only be called before Run is called, after a Task has completed, or from within a call to UpdateFunc.

func (*Task) Stats added in v0.5.0

func (t *Task) Stats() stats.Counts

Stats reports statistics on the number of CUE operations used to complete this task.

This is an experimental method and the API is likely to change.

It only shows numbers upon completion. This may change in the future.

func (*Task) Value

func (t *Task) Value() cue.Value

Value reports the latest value of this task.

This method may currently only be called before Run is called, after a Task has completed, or from within a call to UpdateFunc.

type TaskFunc

type TaskFunc func(v cue.Value) (Runner, error)

A TaskFunc creates a Runner for v if v defines a task, or reports nil otherwise. It reports an error for ill-formed tasks.

If TaskFunc returns a non-nil Runner, the search for tasks within v stops. That is, subtasks are not supported.
