runtime

package
v1.30.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2026 License: GPL-3.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
const ErrMsgOtherConditionNotMet = "other condition was not met"

Error message for the case not all condition was not met

Variables

View Source
var (
	// NewContext creates a new context with DAG execution metadata.
	NewContext = execution.NewContext
	// WithDatabase sets the database interface.
	WithDatabase = execution.WithDatabase
	// WithRootDAGRun sets the root DAG run reference for sub-DAG execution.
	WithRootDAGRun = execution.WithRootDAGRun
	// WithParams sets runtime parameters.
	WithParams = execution.WithParams
	// WithCoordinator sets the coordinator dispatcher for distributed execution.
	WithCoordinator = execution.WithCoordinator
	// WithSecrets sets secret environment variables.
	WithSecrets = execution.WithSecrets
	// WithLogEncoding sets the log file character encoding.
	WithLogEncoding = execution.WithLogEncoding
)

Re-export execution package functions for convenience.

View Source
var (
	ErrCyclicPlan  = errors.New("cyclic plan detected")
	ErrMissingNode = errors.New("missing node in execution plan")
)
View Source
var (
	ErrUpstreamFailed   = fmt.Errorf("upstream failed")
	ErrUpstreamSkipped  = fmt.Errorf("upstream skipped")
	ErrUpstreamRejected = fmt.Errorf("upstream rejected")
	ErrDeadlockDetected = errors.New("deadlock detected: no runnable nodes but DAG not finished")
)
View Source
var (
	ErrConditionNotMet = fmt.Errorf("condition was not met")
)

Errors for condition evaluation

Functions

func AllEnvs added in v1.24.11

func AllEnvs(ctx context.Context) []string

AllEnvs returns all environment variables that needs to be passed to the command. Each element is in the form of "key=value".

func AllEnvsMap added in v1.26.0

func AllEnvsMap(ctx context.Context) map[string]string

AllEnvsMap builds a map of environment variables from the current Env. It splits each "key=value" entry produced by AllEnvs and maps keys to values; entries that do not contain an "=" separator are ignored.

func EvalBool

func EvalBool(ctx context.Context, value any) (bool, error)

EvalBool evaluates the given value with the variables within the execution context and parses it as a boolean.

func EvalCondition

func EvalCondition(ctx context.Context, shell []string, c *core.Condition) error

EvalCondition evaluates the condition and returns the actual value. It returns an error if the evaluation failed or the condition is invalid. If c.Negate is true, the result is inverted: the condition passes when it would normally fail, and vice versa.

func EvalConditions

func EvalConditions(ctx context.Context, shell []string, cond []*core.Condition) error

EvalConditions evaluates a list of conditions and checks the results. It returns an error if any of the conditions were not met.

func EvalObject

func EvalObject[T any](ctx context.Context, obj T) (T, error)

EvalObject recursively evaluates the string fields of the given object with the variables within the execution context.

func EvalString

func EvalString(ctx context.Context, s string, opts ...cmdutil.EvalOption) (string, error)

EvalString evaluates the given string with the variables within the execution context.

func GenerateSubDAGRunID added in v1.24.0

func GenerateSubDAGRunID(ctx context.Context, params string, repeated bool) string

GenerateSubDAGRunID generates a unique run ID based on the current DAG run ID, step name, and parameters.

func NewContextForTest added in v1.26.0

func NewContextForTest(ctx context.Context, dag *core.DAG, dagRunID, logFile string) context.Context

NewContextForTest creates a minimal context for testing purposes. This is useful when you need a context with just basic DAG metadata.

func NewDAGRunRef added in v1.26.0

func NewDAGRunRef(name, runID string) execution.DAGRunRef

NewDAGRunRef is a convenience wrapper for execution.NewDAGRunRef.

func Run

func Run(ctx context.Context, spec CmdSpec) error

Run executes the command and waits for it to complete.

func Start

func Start(ctx context.Context, spec CmdSpec) error

Start executes the command without waiting for it to complete.

func WithDAGContext added in v1.26.0

func WithDAGContext(ctx context.Context, rCtx Context) context.Context

WithDAGContext returns a new context with the given DAGContext. This is a convenience wrapper for execution.WithContext.

func WithEnv added in v1.24.11

func WithEnv(ctx context.Context, e Env) context.Context

WithEnv returns a new context with the given execution context.

Types

type ChatMessagesHandler added in v1.30.0

type ChatMessagesHandler interface {
	// WriteStepMessages writes messages for a single step.
	WriteStepMessages(ctx context.Context, stepName string, messages []execution.LLMMessage) error
	// ReadStepMessages reads messages for a single step.
	ReadStepMessages(ctx context.Context, stepName string) ([]execution.LLMMessage, error)
}

ChatMessagesHandler handles chat conversation messages for persistence.

type CmdSpec

type CmdSpec struct {
	Executable string
	Args       []string
	Env        []string
	Stdout     *os.File
	Stderr     *os.File
}

CmdSpec describes a command to be executed with all its configuration.

type Config

type Config struct {
	LogDir          string
	MaxActiveSteps  int
	Timeout         time.Duration
	Delay           time.Duration
	Dry             bool
	OnInit          *core.Step
	OnExit          *core.Step
	OnSuccess       *core.Step
	OnFailure       *core.Step
	OnCancel        *core.Step
	DAGRunID        string
	MessagesHandler ChatMessagesHandler
	OnWait          *core.Step
}

type Context added in v1.26.0

type Context = execution.Context

Context is an alias for execution.Context

func GetDAGContext added in v1.26.0

func GetDAGContext(ctx context.Context) Context

GetDAGContext retrieves the DAGContext from the context. This is a convenience wrapper for execution.GetContext.

type ContextOption added in v1.26.0

type ContextOption = execution.ContextOption

ContextOption is an alias for execution.ContextOption

type Data

type Data struct {
	// contains filtered or unexported fields
}

Data is a thread-safe wrapper around NodeData.

func (*Data) AddSubRunsRepeated added in v1.24.0

func (d *Data) AddSubRunsRepeated(subRun ...SubDAGRun)

AddSubRunsRepeated appends repeated sub DAG runs to the node.

func (*Data) Args

func (d *Data) Args() []string

func (*Data) ClearState

func (d *Data) ClearState(s core.Step)

func (*Data) ClearVariable

func (d *Data) ClearVariable(key string)

func (*Data) ContinueOn

func (d *Data) ContinueOn() core.ContinueOn

func (*Data) Data

func (s *Data) Data() NodeData

func (*Data) Error

func (d *Data) Error() error

func (*Data) Finish

func (d *Data) Finish()

func (*Data) GetApprovalInputs added in v1.30.0

func (d *Data) GetApprovalInputs() map[string]string

GetApprovalInputs returns a copy of the approval inputs map.

func (*Data) GetChatMessages added in v1.30.0

func (d *Data) GetChatMessages() []execution.LLMMessage

GetChatMessages returns the chat conversation messages for the node.

func (*Data) GetDoneCount

func (d *Data) GetDoneCount() int

func (*Data) GetExitCode

func (d *Data) GetExitCode() int

func (*Data) GetRetryCount

func (d *Data) GetRetryCount() int

func (*Data) GetStderr

func (d *Data) GetStderr() string

func (*Data) GetStdout

func (d *Data) GetStdout() string

func (*Data) IncDoneCount

func (d *Data) IncDoneCount()

func (*Data) IncRetryCount

func (d *Data) IncRetryCount()

func (*Data) IsRepeated

func (d *Data) IsRepeated() bool

func (*Data) MarkError

func (d *Data) MarkError(err error)

func (*Data) MatchExitCode

func (d *Data) MatchExitCode(exitCodes []int) bool

func (*Data) Name

func (d *Data) Name() string

func (*Data) ResetError

func (d *Data) ResetError()

func (*Data) SetApprovalInputs added in v1.30.0

func (d *Data) SetApprovalInputs(inputs map[string]string)

SetApprovalInputs sets the approval inputs map.

func (*Data) SetArgs

func (d *Data) SetArgs(args []string)

func (*Data) SetChatMessages added in v1.30.0

func (d *Data) SetChatMessages(messages []execution.LLMMessage)

SetChatMessages sets the chat conversation messages for the node.

func (*Data) SetError

func (d *Data) SetError(err error)

func (*Data) SetExecutorConfig

func (d *Data) SetExecutorConfig(cfg core.ExecutorConfig)

func (*Data) SetExitCode

func (d *Data) SetExitCode(exitCode int)

func (*Data) SetRepeated

func (d *Data) SetRepeated(repeated bool)

func (*Data) SetRetriedAt

func (d *Data) SetRetriedAt(retriedAt time.Time)

func (*Data) SetScript

func (d *Data) SetScript(script string)

func (*Data) SetStatus

func (d *Data) SetStatus(s core.NodeStatus)

func (*Data) SetStep

func (s *Data) SetStep(step core.Step)

func (*Data) SetSubDAG added in v1.24.0

func (d *Data) SetSubDAG(subDAG core.SubDAG)

func (*Data) SetSubRuns added in v1.24.0

func (d *Data) SetSubRuns(subRuns []SubDAGRun)

SetSubRuns replaces the sub DAG runs associated with the node.

func (*Data) Setup

func (d *Data) Setup(ctx context.Context, logFile string, startedAt time.Time) error

func (*Data) SignalOnStop

func (d *Data) SignalOnStop() string

func (*Data) State

func (d *Data) State() NodeState

func (*Data) Status

func (d *Data) Status() core.NodeStatus

func (*Data) Step

func (d *Data) Step() core.Step

func (*Data) StepInfo added in v1.24.8

func (d *Data) StepInfo() cmdutil.StepInfo

type Database added in v1.26.0

type Database = execution.Database

Database is an alias for execution.Database

type Dispatcher added in v1.26.0

type Dispatcher = execution.Dispatcher

Dispatcher is an alias for execution.Dispatcher

type EnqueueOptions

type EnqueueOptions struct {
	Params       string // Parameters to pass to the DAG
	Quiet        bool   // Whether to run in quiet mode
	DAGRunID     string // ID for the dag-run
	Queue        string // Queue name to enqueue to
	NameOverride string // Optional DAG name override
}

EnqueueOptions contains options for enqueuing a dag-run.

type Env added in v1.24.11

type Env struct {
	// Embedded execution metadata from parent DAG run containing DAGRunID,
	// RootDAGRun reference, DAG configuration, database interface,
	// DAG-level environment variables, and coordinator dispatcher
	Context

	// Thread-safe map storing output variables from previously executed steps
	// in the format "key=value". These variables are populated when a step
	// completes and has an Output field defined, making the step's stdout
	// available to subsequent steps via variable substitution
	Variables *collections.SyncMap

	// The current step being executed within this environment context
	Step core.Step

	// Additional environment variables specific to this step execution,
	// including DAG_RUN_STEP_NAME and PWD. These take precedence over
	// Variables and DAG-level Envs during variable evaluation
	Envs map[string]string

	// Maps step IDs to their execution information (stdout, stderr, exitCode)
	// allowing steps to reference outputs from other steps using expressions
	// like ${stepID.stdout} or ${stepID.exitCode} in their configurations
	StepMap map[string]cmdutil.StepInfo

	// Resolved absolute path for the step's working directory, determined by:
	// 1. Step's Dir field if specified (resolved to absolute path)
	// 2. Current working directory if Dir is not specified
	// This path is also set as the PWD environment variable
	WorkingDir string
}

Env holds information about the DAG and the current step to execute including the variables (environment variables and DAG variables) that are available to the step.

func GetEnv added in v1.24.11

func GetEnv(ctx context.Context) Env

GetEnv returns the execution context from the given context.

func NewEnv added in v1.24.8

func NewEnv(ctx context.Context, step core.Step) Env

NewEnv creates a new Env configured for executing the provided step. It resolves the step's working directory and sets initial per-step environment variables: PWD to the resolved working directory and the DAG run step name. The returned Env embeds the DAG context from ctx, stores the provided step, initializes an empty StepMap, and populates Variables from DAG.Params: for each param containing "=", the text before the first "=" is used as the key and the entire param string is stored as the value.

func NewPlanEnv added in v1.24.11

func NewPlanEnv(ctx context.Context, step core.Step, plan *Plan) Env

func (Env) AllEnvs added in v1.24.11

func (e Env) AllEnvs() []string

AllEnvs returns all environment variables that needs to be passed to the command.

func (Env) DAGRunRef added in v1.24.11

func (e Env) DAGRunRef() execution.DAGRunRef

DAGRunRef returns the DAGRunRef for the current execution context.

func (Env) EvalBool added in v1.24.11

func (e Env) EvalBool(ctx context.Context, value any) (bool, error)

EvalBool evaluates the given value with the variables within the execution context

func (Env) EvalString added in v1.24.11

func (e Env) EvalString(ctx context.Context, s string, opts ...cmdutil.EvalOption) (string, error)

EvalString evaluates the given string with the variables within the execution context.

func (Env) ForceLoadOutputVariables added in v1.24.11

func (e Env) ForceLoadOutputVariables(vars *collections.SyncMap)

ForceLoadOutputVariables forces loading of output variables into the execution context. This is the same as LoadOutputVariables, but it does not check if the key already exists.

func (Env) LoadOutputVariables added in v1.24.11

func (e Env) LoadOutputVariables(vars *collections.SyncMap)

LoadOutputVariables loads the output variables from the given DAG into the

func (Env) MailerConfig added in v1.24.11

func (e Env) MailerConfig(ctx context.Context) (mailer.Config, error)

func (Env) Shell added in v1.24.11

func (e Env) Shell(ctx context.Context) []string

Shell returns the shell command to use for this execution context.

func (Env) UserEnvsMap added in v1.24.11

func (e Env) UserEnvsMap() map[string]string

UserEnvsMap returns user-defined environment variables as a map, excluding OS environment (BaseEnv). Use this for isolated execution environments. Precedence: Step.Env > Envs > Variables > SecretEnvs > DAGContext.Envs > DAG.Env

func (Env) WithEnvVars added in v1.24.11

func (e Env) WithEnvVars(envs ...string) Env

WithEnvVars returns a new execution context with the given environment variable(s).

func (Env) WithVariables added in v1.24.11

func (e Env) WithVariables(vars ...string) Env

WithVariables returns a new execution context with the given variable(s).

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager provides methods to interact with DAGs, including starting, stopping, restarting, and retrieving status information. It communicates with the DAG through a socket interface and manages dag-run data.

func NewManager

func NewManager(drs execution.DAGRunStore, ps execution.ProcStore, cfg *config.Config) Manager

New creates a new Manager instance. The Manager is used to interact with the DAG.

func (*Manager) FindSubDAGRunStatus added in v1.24.0

func (m *Manager) FindSubDAGRunStatus(ctx context.Context, rootDAGRun execution.DAGRunRef, subRunID string) (*execution.DAGRunStatus, error)

FindSubDAGRunStatus retrieves the status of a sub dag-run by its ID. It looks up the child attempt in the dag-run store and reads its status.

func (*Manager) GenDAGRunID

func (m *Manager) GenDAGRunID(_ context.Context) (string, error)

GenDAGRunID generates a unique ID for a dag-run using UUID version 7.

func (*Manager) GetCurrentStatus

func (m *Manager) GetCurrentStatus(ctx context.Context, dag *core.DAG, dagRunID string) (*execution.DAGRunStatus, error)

GetCurrentStatus retrieves the current status of a dag-run by its run ID. If the dag-run is running, it queries the socket for the current status. If the socket doesn't exist or times out, it falls back to stored status or creates an initial status.

func (*Manager) GetLatestStatus

func (m *Manager) GetLatestStatus(ctx context.Context, dag *core.DAG) (execution.DAGRunStatus, error)

GetLatestStatus retrieves the latest status of a DAG. If the DAG is running, it attempts to get the current status from the socket. If that fails or no status exists, it returns an initial status or an error.

func (*Manager) GetSavedStatus

func (m *Manager) GetSavedStatus(ctx context.Context, dagRun execution.DAGRunRef) (*execution.DAGRunStatus, error)

GetSavedStatus retrieves the saved status of a dag-run by its core.DAGRun reference.

func (*Manager) IsRunning

func (m *Manager) IsRunning(ctx context.Context, dag *core.DAG, dagRunID string) bool

IsRunning checks if a dag-run is currently running by querying its status. Returns true if the status can be retrieved without error, indicating the DAG is running.

func (*Manager) ListRecentStatus

func (m *Manager) ListRecentStatus(ctx context.Context, name string, n int) []execution.DAGRunStatus

ListRecentStatus retrieves the n most recent statuses for a DAG by name. It returns a slice of Status objects, filtering out any that cannot be read.

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context, dag *core.DAG, dagRunID string) error

Stop stops a running DAG by sending a stop request to its socket. If the DAG is not running, it logs a message and returns nil.

func (*Manager) UpdateStatus

func (m *Manager) UpdateStatus(ctx context.Context, rootDAGRun execution.DAGRunRef, newStatus execution.DAGRunStatus) error

UpdateStatus updates the status of a dag-run.

type Node

type Node struct {
	Data
	// contains filtered or unexported fields
}

Node is a node in a DAG. It executes a command.

func NewNode

func NewNode(step core.Step, state NodeState) *Node

func NodeWithData

func NodeWithData(data NodeData) *Node

func (*Node) BuildSubDAGRuns added in v1.24.0

func (n *Node) BuildSubDAGRuns(ctx context.Context, subDAG *core.SubDAG) ([]SubDAGRun, error)

BuildSubDAGRuns constructs the sub DAG runs based on parallel configuration

func (*Node) Cancel

func (n *Node) Cancel()

func (*Node) Execute

func (n *Node) Execute(ctx context.Context) error

func (*Node) ID added in v1.24.8

func (n *Node) ID() int

func (*Node) Init

func (n *Node) Init()

func (*Node) ItemToParam

func (n *Node) ItemToParam(item any) (string, error)

ItemToParam converts a parallel item to a parameter string

func (*Node) LogContainsPattern

func (n *Node) LogContainsPattern(ctx context.Context, patterns []string) (bool, error)

LogContainsPattern checks if any of the given patterns exist in the node's log file. If a pattern starts with "regexp:", it will be treated as a regular expression. Returns false if no log file exists or no pattern is found. Returns error if there are issues reading the file or invalid regex pattern.

func (*Node) NodeData

func (n *Node) NodeData() NodeData

func (*Node) Prepare added in v1.24.8

func (n *Node) Prepare(ctx context.Context, logDir string, dagRunID string) error

func (*Node) SetupEnv added in v1.24.8

func (n *Node) SetupEnv(ctx context.Context) context.Context

func (*Node) ShouldContinue

func (n *Node) ShouldContinue(ctx context.Context) bool

func (*Node) ShouldMarkSuccess

func (n *Node) ShouldMarkSuccess(ctx context.Context) bool

func (*Node) Signal

func (n *Node) Signal(ctx context.Context, sig os.Signal, allowOverride bool)

func (*Node) StdoutFile

func (n *Node) StdoutFile() string

func (*Node) Teardown

func (n *Node) Teardown() error

type NodeData

type NodeData struct {
	Step  core.Step
	State NodeState
}

NodeData represents the data of a node.

type NodeState

type NodeState struct {
	// Status represents the state of the node.
	Status core.NodeStatus
	// Stdout is the log file path from the node.
	Stdout string
	// Stderr is the log file path for the error log (stderr).
	Stderr string
	// StartedAt is the time when the node started.
	StartedAt time.Time
	// FinishedAt is the time when the node finished.
	FinishedAt time.Time
	// RetryCount is the number of retries happened based on the retry policy.
	RetryCount int
	// RetriedAt is the time when the node was retried last time.
	RetriedAt time.Time
	// DoneCount is the number of times the node was executed.
	DoneCount int
	// Repeated is true if the node is a repeated step.
	// This is used to generate unique run IDs for repeated steps in case the node
	// runs nested DAGs.
	Repeated bool
	// Error is the error that the executor encountered.
	Error error
	// ExitCode is the exit code that the command exited with.
	// It only makes sense when the node is a command executor.
	ExitCode int
	// Parallel contains the evaluated parallel execution state for the node.
	// This is populated when a step has parallel configuration and tracks
	// all the items that need to be executed in parallel.
	*Parallel
	// SubRuns stores the sub dag-runs.
	SubRuns []SubDAGRun
	// SubRunsRepeated stores the repeated sub dag-runs.
	SubRunsRepeated []SubDAGRun
	// OutputVariables stores the output variables for the following steps.
	// It only contains the local output variables.
	OutputVariables *collections.SyncMap
	// ChatMessages stores the chat conversation messages for message passing between steps.
	ChatMessages []execution.LLMMessage
	// ApprovalInputs stores key-value parameters provided during HITL approval.
	// These are available as environment variables in subsequent steps.
	ApprovalInputs map[string]string
	// ApprovedAt is the time when the HITL step was approved.
	ApprovedAt string
	// ApprovedBy is the username of the user who approved the HITL step.
	ApprovedBy string
	// RejectedAt is the time when the HITL step was rejected.
	RejectedAt string
	// RejectedBy is the username of the user who rejected the HITL step.
	RejectedBy string
	// RejectionReason stores the optional reason for rejection.
	RejectionReason string
}

type OutputCoordinator

type OutputCoordinator struct {
	StderrRedirectFile *os.File
	// contains filtered or unexported fields
}

func (*OutputCoordinator) StdoutFile

func (oc *OutputCoordinator) StdoutFile() string

type Parallel

type Parallel struct {
	// Items contains all the parallel items to be executed.
	// Each item will result in a separate sub DAG run.
	Items []ParallelItem
}

Parallel represents the evaluated parallel execution configuration for a node. It contains the expanded list of items to be processed in parallel.

type ParallelItem

type ParallelItem struct {
	// Item contains the actual data for this parallel execution.
	// It can be either a simple value or a map of parameters from core.ParallelItem.
	Item core.ParallelItem
}

ParallelItem represents a single item in a parallel execution. It combines the item data with a unique identifier for tracking.

type Plan added in v1.24.8

type Plan struct {

	// Immutable adjacency lists (exposing for unit tests)
	DependencyMap map[int][]int // node ID -> list of dependency node IDs (upstream)
	DependantMap  map[int][]int // node ID -> list of dependent node IDs (downstream)
	// contains filtered or unexported fields
}

Plan represents a plan of execution for a set of steps. It encapsulates the graph structure and ensures thread-safe access.

func CreateRetryPlan added in v1.24.8

func CreateRetryPlan(ctx context.Context, dag *core.DAG, nodes ...*Node) (*Plan, error)

CreateRetryPlan creates a new execution plan for retrying specific nodes.

func CreateStepRetryPlan added in v1.24.8

func CreateStepRetryPlan(dag *core.DAG, nodes []*Node, stepName string) (*Plan, error)

CreateStepRetryPlan creates a new execution plan for retrying a specific step.

func NewPlan added in v1.24.8

func NewPlan(steps ...core.Step) (*Plan, error)

NewPlan creates a new execution plan from the given steps. It builds the graph, validates it (checking for cycles), and returns the plan.

func NewPlanFromNodes added in v1.30.0

func NewPlanFromNodes(nodes ...*Node) (*Plan, error)

NewPlanFromNodes creates a plan from existing nodes without modifying their states.

func (*Plan) CheckFinished added in v1.24.8

func (p *Plan) CheckFinished() bool

CheckFinished checks if all nodes have completed (successfully or otherwise).

func (*Plan) Dependencies added in v1.24.8

func (p *Plan) Dependencies(nodeID int) []int

Dependencies returns the IDs of the nodes that the given node depends on.

func (*Plan) Dependents added in v1.24.8

func (p *Plan) Dependents(nodeID int) []int

Dependents returns the IDs of the nodes that depend on the given node.

func (*Plan) Duration added in v1.24.8

func (p *Plan) Duration() time.Duration

func (*Plan) Finish added in v1.24.8

func (p *Plan) Finish()

func (*Plan) FinishAt added in v1.24.8

func (p *Plan) FinishAt() time.Time

func (*Plan) GetNode added in v1.24.8

func (p *Plan) GetNode(id int) *Node

GetNode returns the node with the given ID.

func (*Plan) GetNodeByName added in v1.24.8

func (p *Plan) GetNodeByName(name string) *Node

GetNodeByName returns the node with the given name.

func (*Plan) IsFinished added in v1.24.8

func (p *Plan) IsFinished() bool

func (*Plan) IsRunning added in v1.24.8

func (p *Plan) IsRunning() bool

IsRunning checks if any node is currently running or pending.

func (*Plan) IsStarted added in v1.24.8

func (p *Plan) IsStarted() bool

func (*Plan) NodeData added in v1.24.8

func (p *Plan) NodeData() []NodeData

NodeData returns a snapshot of data for all nodes.

func (*Plan) NodeStates added in v1.30.0

func (p *Plan) NodeStates() PlanNodeStates

NodeStates returns whether any nodes are running, waiting, not started, or rejected. Single pass, single lock for atomic read.

func (*Plan) Nodes added in v1.24.8

func (p *Plan) Nodes() []*Node

Nodes returns a slice of all nodes in the plan.

func (*Plan) StartAt added in v1.24.8

func (p *Plan) StartAt() time.Time

func (*Plan) WaitingStepNames added in v1.30.0

func (p *Plan) WaitingStepNames() []string

WaitingStepNames returns the names of steps that are waiting for approval.

type PlanNodeStates added in v1.30.0

type PlanNodeStates struct {
	HasRunning    bool
	HasWaiting    bool
	HasNotStarted bool
	HasRejected   bool
}

PlanNodeStates holds the state flags for nodes in a plan.

type RestartOptions

type RestartOptions struct {
	Quiet bool // Whether to run in quiet mode
}

RestartOptions contains options for restarting a dag-run.

type RetryPolicy

type RetryPolicy struct {
	Limit     int
	Interval  time.Duration
	ExitCodes []int
}

func (*RetryPolicy) ShouldRetry

func (r *RetryPolicy) ShouldRetry(exitCode int) bool

ShouldRetry determines if a node should be retried based on the exit code and retry policy

type RunStatus added in v1.26.0

type RunStatus = execution.RunStatus

RunStatus is an alias for execution.RunStatus

type Runner added in v1.24.8

type Runner struct {
	// contains filtered or unexported fields
}

Runner runs a plan of steps.

func New

func New(cfg *Config) *Runner

func (*Runner) Cancel added in v1.24.8

func (r *Runner) Cancel(p *Plan)

Cancel sends -1 signal to all nodes.

func (*Runner) GetMetrics added in v1.24.8

func (r *Runner) GetMetrics() map[string]any

GetMetrics returns the current metrics for the runner

func (*Runner) HandlerNode added in v1.24.8

func (r *Runner) HandlerNode(name core.HandlerType) *Node

HandlerNode returns the handler node with the given name.

func (*Runner) Run added in v1.24.8

func (r *Runner) Run(ctx context.Context, plan *Plan, progressCh chan *Node) error

Run runs the plan of steps.

func (*Runner) Signal added in v1.24.8

func (r *Runner) Signal(
	ctx context.Context, plan *Plan, sig os.Signal, done chan bool, allowOverride bool,
)

Signal sends a signal to the runner. for a node with repeat policy, it does not stop the node and wait to finish current run.

func (*Runner) Status added in v1.24.8

func (r *Runner) Status(ctx context.Context, p *Plan) core.Status

Status returns the status of the runner.

type StartOptions

type StartOptions struct {
	Params   string // Parameters to pass to the DAG
	Quiet    bool   // Whether to run in quiet mode
	DAGRunID string // ID for the dag-run

	NameOverride string // Optional DAG name override
	FromRunID    string // Historic dag-run ID to use as a template
	Target       string // Optional CLI argument override (DAG name or file path)
}

StartOptions contains options for initiating a dag-run.

type SubCmdBuilder

type SubCmdBuilder struct {
	// contains filtered or unexported fields
}

SubCmdBuilder centralizes CLI command argument construction.

func NewSubCmdBuilder

func NewSubCmdBuilder(cfg *config.Config) *SubCmdBuilder

NewSubCmdBuilder returns a new SubCmdBuilder initialized from cfg. It sets Executable to cfg.Paths.Executable, ConfigFile to cfg.Paths.ConfigFileUsed, and base environment to cfg.Core.BaseEnv.

func (*SubCmdBuilder) Dequeue

func (b *SubCmdBuilder) Dequeue(dag *core.DAG, dagRun execution.DAGRunRef) CmdSpec

Dequeue creates a dequeue command spec.

func (*SubCmdBuilder) Enqueue

func (b *SubCmdBuilder) Enqueue(dag *core.DAG, opts EnqueueOptions) CmdSpec

Enqueue creates an enqueue command spec.

func (*SubCmdBuilder) Restart

func (b *SubCmdBuilder) Restart(dag *core.DAG, opts RestartOptions) CmdSpec

Restart creates a restart command spec.

func (*SubCmdBuilder) Retry

func (b *SubCmdBuilder) Retry(dag *core.DAG, dagRunID string, stepName string) CmdSpec

Retry creates a retry command spec.

func (*SubCmdBuilder) Start

func (b *SubCmdBuilder) Start(dag *core.DAG, opts StartOptions) CmdSpec

Start creates a start command spec.

func (*SubCmdBuilder) TaskRetry

func (b *SubCmdBuilder) TaskRetry(task *coordinatorv1.Task) CmdSpec

TaskRetry creates a retry command spec for coordinator tasks.

func (*SubCmdBuilder) TaskStart

func (b *SubCmdBuilder) TaskStart(task *coordinatorv1.Task) CmdSpec

TaskStart creates a start command spec for coordinator tasks.

type SubDAGRun added in v1.24.0

type SubDAGRun struct {
	// DAGRunID is the unique identifier for the sub dag-run.
	// It is generated as a base58-encoded SHA-256 hash of the string:
	// "<parent-dag-run-id>:<step-name>:<deterministic-json-params>"
	//
	// This deterministic ID generation ensures:
	// - Same parameters always produce the same sub DAG run ID
	// - Retries reuse existing sub DAG runs instead of creating duplicates
	// - Each step's children are namespaced by step name to prevent collisions
	//
	// The params are encoded as deterministic JSON (sorted keys) before hashing.
	// Example input: "abc123:process-regions:{"REGION":"us-east-1","VERSION":"1.0.0"}"
	// Example output: "5Kd3NBUAdUnhyzenEwVLy9pBKxSwXvE9FMPyR4UKZvpe"
	DAGRunID string
	// Params contains the raw parameters passed to the sub DAG run.
	// This can be:
	// - A simple string: "param1 param2"
	// - Key-value pairs: "KEY1=value1 KEY2=value2"
	// - Raw JSON: '{"region": "us-east-1", "config": {"timeout": 30}}'
	// The exact format depends on how the DAG expects to receive parameters.
	Params string
}

SubDAGRun represents a sub DAG execution within a parent DAG. Each sub DAG run has a deterministic ID based on its parameters to ensure idempotency.

Directories

Path Synopsis
chat
Package chat provides an executor for chat (LLM-based conversation) steps.
Package chat provides an executor for chat (LLM-based conversation) steps.
dag
gha
jq
ssh

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL