Documentation
¶
Index ¶
- Constants
- Variables
- func AllEnvs(ctx context.Context) []string
- func AllEnvsMap(ctx context.Context) map[string]string
- func EvalBool(ctx context.Context, value any) (bool, error)
- func EvalCondition(ctx context.Context, shell []string, c *core.Condition) error
- func EvalConditions(ctx context.Context, shell []string, cond []*core.Condition) error
- func EvalObject[T any](ctx context.Context, obj T) (T, error)
- func EvalString(ctx context.Context, s string, opts ...cmdutil.EvalOption) (string, error)
- func GenerateSubDAGRunID(ctx context.Context, params string, repeated bool) string
- func NewContextForTest(ctx context.Context, dag *core.DAG, dagRunID, logFile string) context.Context
- func NewDAGRunRef(name, runID string) execution.DAGRunRef
- func Run(ctx context.Context, spec CmdSpec) error
- func Start(ctx context.Context, spec CmdSpec) error
- func WithDAGContext(ctx context.Context, rCtx Context) context.Context
- func WithEnv(ctx context.Context, e Env) context.Context
- type ChatMessagesHandler
- type CmdSpec
- type Config
- type Context
- type ContextOption
- type Data
- func (d *Data) AddSubRunsRepeated(subRun ...SubDAGRun)
- func (d *Data) Args() []string
- func (d *Data) ClearState(s core.Step)
- func (d *Data) ClearVariable(key string)
- func (d *Data) ContinueOn() core.ContinueOn
- func (s *Data) Data() NodeData
- func (d *Data) Error() error
- func (d *Data) Finish()
- func (d *Data) GetApprovalInputs() map[string]string
- func (d *Data) GetChatMessages() []execution.LLMMessage
- func (d *Data) GetDoneCount() int
- func (d *Data) GetExitCode() int
- func (d *Data) GetRetryCount() int
- func (d *Data) GetStderr() string
- func (d *Data) GetStdout() string
- func (d *Data) IncDoneCount()
- func (d *Data) IncRetryCount()
- func (d *Data) IsRepeated() bool
- func (d *Data) MarkError(err error)
- func (d *Data) MatchExitCode(exitCodes []int) bool
- func (d *Data) Name() string
- func (d *Data) ResetError()
- func (d *Data) SetApprovalInputs(inputs map[string]string)
- func (d *Data) SetArgs(args []string)
- func (d *Data) SetChatMessages(messages []execution.LLMMessage)
- func (d *Data) SetError(err error)
- func (d *Data) SetExecutorConfig(cfg core.ExecutorConfig)
- func (d *Data) SetExitCode(exitCode int)
- func (d *Data) SetRepeated(repeated bool)
- func (d *Data) SetRetriedAt(retriedAt time.Time)
- func (d *Data) SetScript(script string)
- func (d *Data) SetStatus(s core.NodeStatus)
- func (s *Data) SetStep(step core.Step)
- func (d *Data) SetSubDAG(subDAG core.SubDAG)
- func (d *Data) SetSubRuns(subRuns []SubDAGRun)
- func (d *Data) Setup(ctx context.Context, logFile string, startedAt time.Time) error
- func (d *Data) SignalOnStop() string
- func (d *Data) State() NodeState
- func (d *Data) Status() core.NodeStatus
- func (d *Data) Step() core.Step
- func (d *Data) StepInfo() cmdutil.StepInfo
- type Database
- type Dispatcher
- type EnqueueOptions
- type Env
- func (e Env) AllEnvs() []string
- func (e Env) DAGRunRef() execution.DAGRunRef
- func (e Env) EvalBool(ctx context.Context, value any) (bool, error)
- func (e Env) EvalString(ctx context.Context, s string, opts ...cmdutil.EvalOption) (string, error)
- func (e Env) ForceLoadOutputVariables(vars *collections.SyncMap)
- func (e Env) LoadOutputVariables(vars *collections.SyncMap)
- func (e Env) MailerConfig(ctx context.Context) (mailer.Config, error)
- func (e Env) Shell(ctx context.Context) []string
- func (e Env) UserEnvsMap() map[string]string
- func (e Env) WithEnvVars(envs ...string) Env
- func (e Env) WithVariables(vars ...string) Env
- type Manager
- func (m *Manager) FindSubDAGRunStatus(ctx context.Context, rootDAGRun execution.DAGRunRef, subRunID string) (*execution.DAGRunStatus, error)
- func (m *Manager) GenDAGRunID(_ context.Context) (string, error)
- func (m *Manager) GetCurrentStatus(ctx context.Context, dag *core.DAG, dagRunID string) (*execution.DAGRunStatus, error)
- func (m *Manager) GetLatestStatus(ctx context.Context, dag *core.DAG) (execution.DAGRunStatus, error)
- func (m *Manager) GetSavedStatus(ctx context.Context, dagRun execution.DAGRunRef) (*execution.DAGRunStatus, error)
- func (m *Manager) IsRunning(ctx context.Context, dag *core.DAG, dagRunID string) bool
- func (m *Manager) ListRecentStatus(ctx context.Context, name string, n int) []execution.DAGRunStatus
- func (m *Manager) Stop(ctx context.Context, dag *core.DAG, dagRunID string) error
- func (m *Manager) UpdateStatus(ctx context.Context, rootDAGRun execution.DAGRunRef, ...) error
- type Node
- func (n *Node) BuildSubDAGRuns(ctx context.Context, subDAG *core.SubDAG) ([]SubDAGRun, error)
- func (n *Node) Cancel()
- func (n *Node) Execute(ctx context.Context) error
- func (n *Node) ID() int
- func (n *Node) Init()
- func (n *Node) ItemToParam(item any) (string, error)
- func (n *Node) LogContainsPattern(ctx context.Context, patterns []string) (bool, error)
- func (n *Node) NodeData() NodeData
- func (n *Node) Prepare(ctx context.Context, logDir string, dagRunID string) error
- func (n *Node) SetupEnv(ctx context.Context) context.Context
- func (n *Node) ShouldContinue(ctx context.Context) bool
- func (n *Node) ShouldMarkSuccess(ctx context.Context) bool
- func (n *Node) Signal(ctx context.Context, sig os.Signal, allowOverride bool)
- func (n *Node) StdoutFile() string
- func (n *Node) Teardown() error
- type NodeData
- type NodeState
- type OutputCoordinator
- type Parallel
- type ParallelItem
- type Plan
- func (p *Plan) CheckFinished() bool
- func (p *Plan) Dependencies(nodeID int) []int
- func (p *Plan) Dependents(nodeID int) []int
- func (p *Plan) Duration() time.Duration
- func (p *Plan) Finish()
- func (p *Plan) FinishAt() time.Time
- func (p *Plan) GetNode(id int) *Node
- func (p *Plan) GetNodeByName(name string) *Node
- func (p *Plan) IsFinished() bool
- func (p *Plan) IsRunning() bool
- func (p *Plan) IsStarted() bool
- func (p *Plan) NodeData() []NodeData
- func (p *Plan) NodeStates() PlanNodeStates
- func (p *Plan) Nodes() []*Node
- func (p *Plan) StartAt() time.Time
- func (p *Plan) WaitingStepNames() []string
- type PlanNodeStates
- type RestartOptions
- type RetryPolicy
- type RunStatus
- type Runner
- func (r *Runner) Cancel(p *Plan)
- func (r *Runner) GetMetrics() map[string]any
- func (r *Runner) HandlerNode(name core.HandlerType) *Node
- func (r *Runner) Run(ctx context.Context, plan *Plan, progressCh chan *Node) error
- func (r *Runner) Signal(ctx context.Context, plan *Plan, sig os.Signal, done chan bool, ...)
- func (r *Runner) Status(ctx context.Context, p *Plan) core.Status
- type StartOptions
- type SubCmdBuilder
- func (b *SubCmdBuilder) Dequeue(dag *core.DAG, dagRun execution.DAGRunRef) CmdSpec
- func (b *SubCmdBuilder) Enqueue(dag *core.DAG, opts EnqueueOptions) CmdSpec
- func (b *SubCmdBuilder) Restart(dag *core.DAG, opts RestartOptions) CmdSpec
- func (b *SubCmdBuilder) Retry(dag *core.DAG, dagRunID string, stepName string) CmdSpec
- func (b *SubCmdBuilder) Start(dag *core.DAG, opts StartOptions) CmdSpec
- func (b *SubCmdBuilder) TaskRetry(task *coordinatorv1.Task) CmdSpec
- func (b *SubCmdBuilder) TaskStart(task *coordinatorv1.Task) CmdSpec
- type SubDAGRun
Constants ¶
const ErrMsgOtherConditionNotMet = "other condition was not met"
Error message for the case where not all conditions were met.
Variables ¶
var (
// NewContext creates a new context with DAG execution metadata.
NewContext = execution.NewContext
// WithDatabase sets the database interface.
WithDatabase = execution.WithDatabase
// WithRootDAGRun sets the root DAG run reference for sub-DAG execution.
WithRootDAGRun = execution.WithRootDAGRun
// WithParams sets runtime parameters.
WithParams = execution.WithParams
// WithCoordinator sets the coordinator dispatcher for distributed execution.
WithCoordinator = execution.WithCoordinator
// WithSecrets sets secret environment variables.
WithSecrets = execution.WithSecrets
// WithLogEncoding sets the log file character encoding.
WithLogEncoding = execution.WithLogEncoding
)
Re-export execution package functions for convenience.
var (
ErrCyclicPlan = errors.New("cyclic plan detected")
ErrMissingNode = errors.New("missing node in execution plan")
)
var (
ErrUpstreamFailed = fmt.Errorf("upstream failed")
ErrUpstreamSkipped = fmt.Errorf("upstream skipped")
ErrUpstreamRejected = fmt.Errorf("upstream rejected")
ErrDeadlockDetected = errors.New("deadlock detected: no runnable nodes but DAG not finished")
)
var (
ErrConditionNotMet = fmt.Errorf("condition was not met")
)
Errors for condition evaluation
Functions ¶
func AllEnvs ¶ added in v1.24.11
AllEnvs returns all environment variables that need to be passed to the command. Each element is in the form of "key=value".
func AllEnvsMap ¶ added in v1.26.0
AllEnvsMap builds a map of environment variables from the current Env. It splits each "key=value" entry produced by AllEnvs and maps keys to values; entries that do not contain an "=" separator are ignored.
func EvalBool ¶
EvalBool evaluates the given value with the variables within the execution context and parses it as a boolean.
func EvalCondition ¶
EvalCondition evaluates the condition and returns the actual value. It returns an error if the evaluation failed or the condition is invalid. If c.Negate is true, the result is inverted: the condition passes when it would normally fail, and vice versa.
func EvalConditions ¶
EvalConditions evaluates a list of conditions and checks the results. It returns an error if any of the conditions were not met.
func EvalObject ¶
EvalObject recursively evaluates the string fields of the given object with the variables within the execution context.
func EvalString ¶
EvalString evaluates the given string with the variables within the execution context.
func GenerateSubDAGRunID ¶ added in v1.24.0
GenerateSubDAGRunID generates a unique run ID based on the current DAG run ID, step name, and parameters.
func NewContextForTest ¶ added in v1.26.0
func NewContextForTest(ctx context.Context, dag *core.DAG, dagRunID, logFile string) context.Context
NewContextForTest creates a minimal context for testing purposes. This is useful when you need a context with just basic DAG metadata.
func NewDAGRunRef ¶ added in v1.26.0
NewDAGRunRef is a convenience wrapper for execution.NewDAGRunRef.
func WithDAGContext ¶ added in v1.26.0
WithDAGContext returns a new context with the given DAGContext. This is a convenience wrapper for execution.WithContext.
Types ¶
type ChatMessagesHandler ¶ added in v1.30.0
type ChatMessagesHandler interface {
// WriteStepMessages writes messages for a single step.
WriteStepMessages(ctx context.Context, stepName string, messages []execution.LLMMessage) error
// ReadStepMessages reads messages for a single step.
ReadStepMessages(ctx context.Context, stepName string) ([]execution.LLMMessage, error)
}
ChatMessagesHandler handles chat conversation messages for persistence.
type CmdSpec ¶
type CmdSpec struct {
Executable string
Args []string
Env []string
Stdout *os.File
Stderr *os.File
}
CmdSpec describes a command to be executed with all its configuration.
type Context ¶ added in v1.26.0
Context is an alias for execution.Context
func GetDAGContext ¶ added in v1.26.0
GetDAGContext retrieves the DAGContext from the context. This is a convenience wrapper for execution.GetContext.
type ContextOption ¶ added in v1.26.0
type ContextOption = execution.ContextOption
ContextOption is an alias for execution.ContextOption
type Data ¶
type Data struct {
// contains filtered or unexported fields
}
Data is a thread-safe wrapper around NodeData.
func (*Data) AddSubRunsRepeated ¶ added in v1.24.0
AddSubRunsRepeated appends repeated sub DAG runs to the node.
func (*Data) ClearState ¶
func (*Data) ClearVariable ¶
func (*Data) ContinueOn ¶
func (d *Data) ContinueOn() core.ContinueOn
func (*Data) GetApprovalInputs ¶ added in v1.30.0
GetApprovalInputs returns a copy of the approval inputs map.
func (*Data) GetChatMessages ¶ added in v1.30.0
func (d *Data) GetChatMessages() []execution.LLMMessage
GetChatMessages returns the chat conversation messages for the node.
func (*Data) GetDoneCount ¶
func (*Data) GetExitCode ¶
func (*Data) GetRetryCount ¶
func (*Data) IncDoneCount ¶
func (d *Data) IncDoneCount()
func (*Data) IncRetryCount ¶
func (d *Data) IncRetryCount()
func (*Data) IsRepeated ¶
func (*Data) MatchExitCode ¶
func (*Data) ResetError ¶
func (d *Data) ResetError()
func (*Data) SetApprovalInputs ¶ added in v1.30.0
SetApprovalInputs sets the approval inputs map.
func (*Data) SetChatMessages ¶ added in v1.30.0
func (d *Data) SetChatMessages(messages []execution.LLMMessage)
SetChatMessages sets the chat conversation messages for the node.
func (*Data) SetExecutorConfig ¶
func (d *Data) SetExecutorConfig(cfg core.ExecutorConfig)
func (*Data) SetExitCode ¶
func (*Data) SetRepeated ¶
func (*Data) SetRetriedAt ¶
func (*Data) SetStatus ¶
func (d *Data) SetStatus(s core.NodeStatus)
func (*Data) SetSubRuns ¶ added in v1.24.0
SetSubRuns replaces the sub DAG runs associated with the node.
func (*Data) SignalOnStop ¶
func (*Data) Status ¶
func (d *Data) Status() core.NodeStatus
type Dispatcher ¶ added in v1.26.0
type Dispatcher = execution.Dispatcher
Dispatcher is an alias for execution.Dispatcher
type EnqueueOptions ¶
type EnqueueOptions struct {
Params string // Parameters to pass to the DAG
Quiet bool // Whether to run in quiet mode
DAGRunID string // ID for the dag-run
Queue string // Queue name to enqueue to
NameOverride string // Optional DAG name override
}
EnqueueOptions contains options for enqueuing a dag-run.
type Env ¶ added in v1.24.11
type Env struct {
// Embedded execution metadata from parent DAG run containing DAGRunID,
// RootDAGRun reference, DAG configuration, database interface,
// DAG-level environment variables, and coordinator dispatcher
Context
// Thread-safe map storing output variables from previously executed steps
// in the format "key=value". These variables are populated when a step
// completes and has an Output field defined, making the step's stdout
// available to subsequent steps via variable substitution
Variables *collections.SyncMap
// The current step being executed within this environment context
Step core.Step
// Additional environment variables specific to this step execution,
// including DAG_RUN_STEP_NAME and PWD. These take precedence over
// Variables and DAG-level Envs during variable evaluation
Envs map[string]string
// Maps step IDs to their execution information (stdout, stderr, exitCode)
// allowing steps to reference outputs from other steps using expressions
// like ${stepID.stdout} or ${stepID.exitCode} in their configurations
StepMap map[string]cmdutil.StepInfo
// Resolved absolute path for the step's working directory, determined by:
// 1. Step's Dir field if specified (resolved to absolute path)
// 2. Current working directory if Dir is not specified
// This path is also set as the PWD environment variable
WorkingDir string
}
Env holds information about the DAG and the current step to execute including the variables (environment variables and DAG variables) that are available to the step.
func NewEnv ¶ added in v1.24.8
NewEnv creates a new Env configured for executing the provided step. It resolves the step's working directory and sets initial per-step environment variables: PWD to the resolved working directory and the DAG run step name. The returned Env embeds the DAG context from ctx, stores the provided step, initializes an empty StepMap, and populates Variables from DAG.Params: for each param containing "=", the text before the first "=" is used as the key and the entire param string is stored as the value.
func NewPlanEnv ¶ added in v1.24.11
func (Env) AllEnvs ¶ added in v1.24.11
AllEnvs returns all environment variables that need to be passed to the command.
func (Env) DAGRunRef ¶ added in v1.24.11
DAGRunRef returns the DAGRunRef for the current execution context.
func (Env) EvalBool ¶ added in v1.24.11
EvalBool evaluates the given value with the variables within the execution context and parses it as a boolean.
func (Env) EvalString ¶ added in v1.24.11
EvalString evaluates the given string with the variables within the execution context.
func (Env) ForceLoadOutputVariables ¶ added in v1.24.11
func (e Env) ForceLoadOutputVariables(vars *collections.SyncMap)
ForceLoadOutputVariables forces loading of output variables into the execution context. This is the same as LoadOutputVariables, but it does not check if the key already exists.
func (Env) LoadOutputVariables ¶ added in v1.24.11
func (e Env) LoadOutputVariables(vars *collections.SyncMap)
LoadOutputVariables loads the output variables from the given DAG into the execution context. Unlike ForceLoadOutputVariables, it checks whether the key already exists.
func (Env) MailerConfig ¶ added in v1.24.11
func (Env) Shell ¶ added in v1.24.11
Shell returns the shell command to use for this execution context.
func (Env) UserEnvsMap ¶ added in v1.24.11
UserEnvsMap returns user-defined environment variables as a map, excluding OS environment (BaseEnv). Use this for isolated execution environments. Precedence: Step.Env > Envs > Variables > SecretEnvs > DAGContext.Envs > DAG.Env
func (Env) WithEnvVars ¶ added in v1.24.11
WithEnvVars returns a new execution context with the given environment variable(s).
func (Env) WithVariables ¶ added in v1.24.11
WithVariables returns a new execution context with the given variable(s).
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager provides methods to interact with DAGs, including starting, stopping, restarting, and retrieving status information. It communicates with the DAG through a socket interface and manages dag-run data.
func NewManager ¶
NewManager creates a new Manager instance. The Manager is used to interact with the DAG.
func (*Manager) FindSubDAGRunStatus ¶ added in v1.24.0
func (m *Manager) FindSubDAGRunStatus(ctx context.Context, rootDAGRun execution.DAGRunRef, subRunID string) (*execution.DAGRunStatus, error)
FindSubDAGRunStatus retrieves the status of a sub dag-run by its ID. It looks up the child attempt in the dag-run store and reads its status.
func (*Manager) GenDAGRunID ¶
GenDAGRunID generates a unique ID for a dag-run using UUID version 7.
func (*Manager) GetCurrentStatus ¶
func (m *Manager) GetCurrentStatus(ctx context.Context, dag *core.DAG, dagRunID string) (*execution.DAGRunStatus, error)
GetCurrentStatus retrieves the current status of a dag-run by its run ID. If the dag-run is running, it queries the socket for the current status. If the socket doesn't exist or times out, it falls back to stored status or creates an initial status.
func (*Manager) GetLatestStatus ¶
func (m *Manager) GetLatestStatus(ctx context.Context, dag *core.DAG) (execution.DAGRunStatus, error)
GetLatestStatus retrieves the latest status of a DAG. If the DAG is running, it attempts to get the current status from the socket. If that fails or no status exists, it returns an initial status or an error.
func (*Manager) GetSavedStatus ¶
func (m *Manager) GetSavedStatus(ctx context.Context, dagRun execution.DAGRunRef) (*execution.DAGRunStatus, error)
GetSavedStatus retrieves the saved status of a dag-run by its execution.DAGRunRef reference.
func (*Manager) IsRunning ¶
IsRunning checks if a dag-run is currently running by querying its status. Returns true if the status can be retrieved without error, indicating the DAG is running.
func (*Manager) ListRecentStatus ¶
func (m *Manager) ListRecentStatus(ctx context.Context, name string, n int) []execution.DAGRunStatus
ListRecentStatus retrieves the n most recent statuses for a DAG by name. It returns a slice of Status objects, filtering out any that cannot be read.
func (*Manager) Stop ¶
Stop stops a running DAG by sending a stop request to its socket. If the DAG is not running, it logs a message and returns nil.
func (*Manager) UpdateStatus ¶
func (m *Manager) UpdateStatus(ctx context.Context, rootDAGRun execution.DAGRunRef, newStatus execution.DAGRunStatus) error
UpdateStatus updates the status of a dag-run.
type Node ¶
type Node struct {
Data
// contains filtered or unexported fields
}
Node is a node in a DAG. It executes a command.
func NodeWithData ¶
func (*Node) BuildSubDAGRuns ¶ added in v1.24.0
BuildSubDAGRuns constructs the sub DAG runs based on parallel configuration
func (*Node) ItemToParam ¶
ItemToParam converts a parallel item to a parameter string
func (*Node) LogContainsPattern ¶
LogContainsPattern checks if any of the given patterns exist in the node's log file. If a pattern starts with "regexp:", it will be treated as a regular expression. Returns false if no log file exists or no pattern is found. Returns error if there are issues reading the file or invalid regex pattern.
func (*Node) StdoutFile ¶
type NodeState ¶
type NodeState struct {
// Status represents the state of the node.
Status core.NodeStatus
// Stdout is the log file path from the node.
Stdout string
// Stderr is the log file path for the error log (stderr).
Stderr string
// StartedAt is the time when the node started.
StartedAt time.Time
// FinishedAt is the time when the node finished.
FinishedAt time.Time
// RetryCount is the number of retries that have happened based on the retry policy.
RetryCount int
// RetriedAt is the time when the node was retried last time.
RetriedAt time.Time
// DoneCount is the number of times the node was executed.
DoneCount int
// Repeated is true if the node is a repeated step.
// This is used to generate unique run IDs for repeated steps in case the node
// runs nested DAGs.
Repeated bool
// Error is the error that the executor encountered.
Error error
// ExitCode is the exit code that the command exited with.
// It only makes sense when the node is a command executor.
ExitCode int
// Parallel contains the evaluated parallel execution state for the node.
// This is populated when a step has parallel configuration and tracks
// all the items that need to be executed in parallel.
*Parallel
// SubRuns stores the sub dag-runs.
SubRuns []SubDAGRun
// SubRunsRepeated stores the repeated sub dag-runs.
SubRunsRepeated []SubDAGRun
// OutputVariables stores the output variables for the following steps.
// It only contains the local output variables.
OutputVariables *collections.SyncMap
// ChatMessages stores the chat conversation messages for message passing between steps.
ChatMessages []execution.LLMMessage
// ApprovalInputs stores key-value parameters provided during HITL approval.
// These are available as environment variables in subsequent steps.
ApprovalInputs map[string]string
// ApprovedAt is the time when the HITL step was approved.
ApprovedAt string
// ApprovedBy is the username of the user who approved the HITL step.
ApprovedBy string
// RejectedAt is the time when the HITL step was rejected.
RejectedAt string
// RejectedBy is the username of the user who rejected the HITL step.
RejectedBy string
// RejectionReason stores the optional reason for rejection.
RejectionReason string
}
type OutputCoordinator ¶
type OutputCoordinator struct {
StderrRedirectFile *os.File
// contains filtered or unexported fields
}
func (*OutputCoordinator) StdoutFile ¶
func (oc *OutputCoordinator) StdoutFile() string
type Parallel ¶
type Parallel struct {
// Items contains all the parallel items to be executed.
// Each item will result in a separate sub DAG run.
Items []ParallelItem
}
Parallel represents the evaluated parallel execution configuration for a node. It contains the expanded list of items to be processed in parallel.
type ParallelItem ¶
type ParallelItem struct {
// Item contains the actual data for this parallel execution.
// It can be either a simple value or a map of parameters from core.ParallelItem.
Item core.ParallelItem
}
ParallelItem represents a single item in a parallel execution. It combines the item data with a unique identifier for tracking.
type Plan ¶ added in v1.24.8
type Plan struct {
// Immutable adjacency lists (exposing for unit tests)
DependencyMap map[int][]int // node ID -> list of dependency node IDs (upstream)
DependantMap map[int][]int // node ID -> list of dependent node IDs (downstream)
// contains filtered or unexported fields
}
Plan represents a plan of execution for a set of steps. It encapsulates the graph structure and ensures thread-safe access.
func CreateRetryPlan ¶ added in v1.24.8
CreateRetryPlan creates a new execution plan for retrying specific nodes.
func CreateStepRetryPlan ¶ added in v1.24.8
CreateStepRetryPlan creates a new execution plan for retrying a specific step.
func NewPlan ¶ added in v1.24.8
NewPlan creates a new execution plan from the given steps. It builds the graph, validates it (checking for cycles), and returns the plan.
func NewPlanFromNodes ¶ added in v1.30.0
NewPlanFromNodes creates a plan from existing nodes without modifying their states.
func (*Plan) CheckFinished ¶ added in v1.24.8
CheckFinished checks if all nodes have completed (successfully or otherwise).
func (*Plan) Dependencies ¶ added in v1.24.8
Dependencies returns the IDs of the nodes that the given node depends on.
func (*Plan) Dependents ¶ added in v1.24.8
Dependents returns the IDs of the nodes that depend on the given node.
func (*Plan) GetNodeByName ¶ added in v1.24.8
GetNodeByName returns the node with the given name.
func (*Plan) IsFinished ¶ added in v1.24.8
func (*Plan) IsRunning ¶ added in v1.24.8
IsRunning checks if any node is currently running or pending.
func (*Plan) NodeStates ¶ added in v1.30.0
func (p *Plan) NodeStates() PlanNodeStates
NodeStates returns whether any nodes are running, waiting, not started, or rejected. Single pass, single lock for atomic read.
func (*Plan) WaitingStepNames ¶ added in v1.30.0
WaitingStepNames returns the names of steps that are waiting for approval.
type PlanNodeStates ¶ added in v1.30.0
PlanNodeStates holds the state flags for nodes in a plan.
type RestartOptions ¶
type RestartOptions struct {
Quiet bool // Whether to run in quiet mode
}
RestartOptions contains options for restarting a dag-run.
type RetryPolicy ¶
func (*RetryPolicy) ShouldRetry ¶
func (r *RetryPolicy) ShouldRetry(exitCode int) bool
ShouldRetry determines if a node should be retried based on the exit code and retry policy
type Runner ¶ added in v1.24.8
type Runner struct {
// contains filtered or unexported fields
}
Runner runs a plan of steps.
func (*Runner) GetMetrics ¶ added in v1.24.8
GetMetrics returns the current metrics for the runner
func (*Runner) HandlerNode ¶ added in v1.24.8
func (r *Runner) HandlerNode(name core.HandlerType) *Node
HandlerNode returns the handler node with the given name.
type StartOptions ¶
type StartOptions struct {
Params string // Parameters to pass to the DAG
Quiet bool // Whether to run in quiet mode
DAGRunID string // ID for the dag-run
NameOverride string // Optional DAG name override
FromRunID string // Historic dag-run ID to use as a template
Target string // Optional CLI argument override (DAG name or file path)
}
StartOptions contains options for initiating a dag-run.
type SubCmdBuilder ¶
type SubCmdBuilder struct {
// contains filtered or unexported fields
}
SubCmdBuilder centralizes CLI command argument construction.
func NewSubCmdBuilder ¶
func NewSubCmdBuilder(cfg *config.Config) *SubCmdBuilder
NewSubCmdBuilder returns a new SubCmdBuilder initialized from cfg. It sets Executable to cfg.Paths.Executable, ConfigFile to cfg.Paths.ConfigFileUsed, and base environment to cfg.Core.BaseEnv.
func (*SubCmdBuilder) Enqueue ¶
func (b *SubCmdBuilder) Enqueue(dag *core.DAG, opts EnqueueOptions) CmdSpec
Enqueue creates an enqueue command spec.
func (*SubCmdBuilder) Restart ¶
func (b *SubCmdBuilder) Restart(dag *core.DAG, opts RestartOptions) CmdSpec
Restart creates a restart command spec.
func (*SubCmdBuilder) Start ¶
func (b *SubCmdBuilder) Start(dag *core.DAG, opts StartOptions) CmdSpec
Start creates a start command spec.
func (*SubCmdBuilder) TaskRetry ¶
func (b *SubCmdBuilder) TaskRetry(task *coordinatorv1.Task) CmdSpec
TaskRetry creates a retry command spec for coordinator tasks.
func (*SubCmdBuilder) TaskStart ¶
func (b *SubCmdBuilder) TaskStart(task *coordinatorv1.Task) CmdSpec
TaskStart creates a start command spec for coordinator tasks.
type SubDAGRun ¶ added in v1.24.0
type SubDAGRun struct {
// DAGRunID is the unique identifier for the sub dag-run.
// It is generated as a base58-encoded SHA-256 hash of the string:
// "<parent-dag-run-id>:<step-name>:<deterministic-json-params>"
//
// This deterministic ID generation ensures:
// - Same parameters always produce the same sub DAG run ID
// - Retries reuse existing sub DAG runs instead of creating duplicates
// - Each step's children are namespaced by step name to prevent collisions
//
// The params are encoded as deterministic JSON (sorted keys) before hashing.
// Example input: "abc123:process-regions:{"REGION":"us-east-1","VERSION":"1.0.0"}"
// Example output: "5Kd3NBUAdUnhyzenEwVLy9pBKxSwXvE9FMPyR4UKZvpe"
DAGRunID string
// Params contains the raw parameters passed to the sub DAG run.
// This can be:
// - A simple string: "param1 param2"
// - Key-value pairs: "KEY1=value1 KEY2=value2"
// - Raw JSON: '{"region": "us-east-1", "config": {"timeout": 30}}'
// The exact format depends on how the DAG expects to receive parameters.
Params string
}
SubDAGRun represents a sub DAG execution within a parent DAG. Each sub DAG run has a deterministic ID based on its parameters to ensure idempotency.