Documentation ¶
Overview ¶
Package executor provides functionality to execute workflows defined as directed acyclic graphs (DAGs).
Index ¶
- func WithCleanEnv(ctx context.Context) context.Context
- func WithPrintOutput(ctx context.Context) context.Context
- func WithProgress(ctx context.Context, ch chan<- ProgressEvent) context.Context
- func WithRuntimeVars(ctx context.Context, vars map[string]string) context.Context
- type Executor
- type ParallelExecutor
- type ProgressEvent
- type ProgressEventKind
- type SequentialExecutor
- type WorkStealingExecutor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithCleanEnv ¶
func WithCleanEnv(ctx context.Context) context.Context
WithCleanEnv attaches the clean-env flag to ctx. When set, tasks that do not specify clean_env=true in TOML also start with an empty environment (only task-level env vars are inherited).
func WithPrintOutput ¶
func WithPrintOutput(ctx context.Context) context.Context
WithPrintOutput attaches the print-output flag to ctx. When set, captured task stdout/stderr is emitted as ProgressTaskOutput events.
func WithProgress ¶
func WithProgress(ctx context.Context, ch chan<- ProgressEvent) context.Context
WithProgress attaches a progress-event channel to ctx. The caller owns the channel and is responsible for draining it.
func WithRuntimeVars ¶
func WithRuntimeVars(ctx context.Context, vars map[string]string) context.Context
WithRuntimeVars attaches runtime key=value pairs to ctx. doResume extracts these and injects them into the ContextMap after snapshot restoration, allowing the caller to override or supplement variables.
Types ¶
type Executor ¶
type Executor interface {
	// Execute runs the entire DAG.
	Execute(ctx context.Context, dag *dag.DAG, ctxMap *contextmap.ContextMap) (*storage.Run, error)
	// Resume continues execution from a previous failure.
	Resume(ctx context.Context, runID string) (*storage.Run, error)
	// GetStore returns the underlying database store.
	GetStore() *storage.Store
}
Executor defines the interface for task execution.
type ParallelExecutor ¶
type ParallelExecutor struct {
	// contains filtered or unexported fields
}
func NewParallelExecutor ¶
func NewParallelExecutor(db *storage.Store, maxParallel int) *ParallelExecutor
func (*ParallelExecutor) Execute ¶
func (e *ParallelExecutor) Execute(ctx context.Context, d *dag.DAG, ctxMap *contextmap.ContextMap) (*storage.Run, error)
func (*ParallelExecutor) GetStore ¶
func (e *ParallelExecutor) GetStore() *storage.Store
type ProgressEvent ¶
type ProgressEvent struct {
	Kind      ProgressEventKind
	RunID     string
	TaskID    string
	TaskName  string
	Status    storage.TaskStatus // populated for task events
	RunStatus storage.RunStatus  // populated for run events
	Attempt   int
	Error     error
	// Output is populated for ProgressTaskOutput events.
	Output    string
	Truncated bool // true when output exceeded maxOutputEvent
	At        time.Time
}
ProgressEvent is emitted on the channel returned by WithProgress as execution proceeds. Consumers can render a live status view without touching the DB.
type ProgressEventKind ¶
type ProgressEventKind string
ProgressEventKind identifies what happened in a ProgressEvent.
const (
	ProgressRunStarted   ProgressEventKind = "run_started"
	ProgressTaskStarted  ProgressEventKind = "task_started"
	ProgressTaskDone     ProgressEventKind = "task_done"     // success or skip
	ProgressTaskFailed   ProgressEventKind = "task_failed"   // after all retries
	ProgressTaskRetrying ProgressEventKind = "task_retrying"

	// ProgressTaskOutput carries captured stdout/stderr for a completed task.
	// Emitted only when --print-output is set on the run context.
	// Output is truncated to 64 KiB before being placed on the channel; the
	// full content is always available in the on-disk log file.
	ProgressTaskOutput ProgressEventKind = "task_output"

	ProgressRunDone ProgressEventKind = "run_done"
)
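A consumer of the channel passed to WithProgress might render these kinds as status lines with a switch over Kind. The sketch below is self-contained, so it declares local stand-ins for ProgressEvent and ProgressEventKind rather than importing the package; the formatting strings are illustrative, not the package's output:

```go
package main

import "fmt"

// Local stand-ins for the executor package's types, so this
// sketch compiles on its own.
type ProgressEventKind string

const (
	ProgressTaskStarted ProgressEventKind = "task_started"
	ProgressTaskDone    ProgressEventKind = "task_done"
	ProgressTaskFailed  ProgressEventKind = "task_failed"
	ProgressRunDone     ProgressEventKind = "run_done"
)

type ProgressEvent struct {
	Kind     ProgressEventKind
	TaskName string
	Attempt  int
	Error    error
}

// format renders one event as a single status line.
func format(ev ProgressEvent) string {
	switch ev.Kind {
	case ProgressTaskStarted:
		return fmt.Sprintf("run  %s (attempt %d)", ev.TaskName, ev.Attempt)
	case ProgressTaskDone:
		return fmt.Sprintf("done %s", ev.TaskName)
	case ProgressTaskFailed:
		return fmt.Sprintf("fail %s: %v", ev.TaskName, ev.Error)
	default:
		return string(ev.Kind)
	}
}

func main() {
	// The caller owns the channel and must keep draining it until
	// the run-done event arrives (or the channel is closed),
	// otherwise the executor's sends could block.
	ch := make(chan ProgressEvent, 8)
	go func() {
		ch <- ProgressEvent{Kind: ProgressTaskStarted, TaskName: "build", Attempt: 1}
		ch <- ProgressEvent{Kind: ProgressTaskDone, TaskName: "build"}
		ch <- ProgressEvent{Kind: ProgressRunDone}
		close(ch)
	}()
	for ev := range ch {
		fmt.Println(format(ev))
	}
}
```

Buffering the channel, as above, keeps the executor from stalling on slow renderers; an unbuffered channel also works as long as the consumer loop keeps receiving.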
type SequentialExecutor ¶
type SequentialExecutor struct {
	// contains filtered or unexported fields
}
func NewSequentialExecutor ¶
func NewSequentialExecutor(db *storage.Store) *SequentialExecutor
func (*SequentialExecutor) Execute ¶
func (e *SequentialExecutor) Execute(ctx context.Context, d *dag.DAG, ctxMap *contextmap.ContextMap) (*storage.Run, error)
func (*SequentialExecutor) GetStore ¶
func (e *SequentialExecutor) GetStore() *storage.Store
type WorkStealingExecutor ¶
type WorkStealingExecutor struct {
	// contains filtered or unexported fields
}
WorkStealingExecutor is a dependency-aware scheduler that maintains a ready-queue and dispatches tasks as soon as all their dependencies complete — regardless of topological level. This achieves true maximum concurrency: a task no longer needs to wait for unrelated siblings that happen to share its level.
Concurrency model:
- One goroutine per launched task (capped by semaphore at numWorkers).
- Each goroutine, after finishing its task, decrements the pending-dependency counter of every direct dependent. When a counter hits 0 the goroutine launches that dependent inline (wg.Add before the parent's wg.Done fires), keeping the WaitGroup count > 0 for as long as any work is reachable.
- The main goroutine waits on wg.Wait() and therefore has no polling loop.
func NewWorkStealingExecutor ¶
func NewWorkStealingExecutor(db *storage.Store, numWorkers int) *WorkStealingExecutor
NewWorkStealingExecutor returns a WorkStealingExecutor that runs at most numWorkers tasks concurrently. If numWorkers ≤ 0 it is set to 4.
func (*WorkStealingExecutor) Execute ¶
func (e *WorkStealingExecutor) Execute(ctx context.Context, d *dag.DAG, ctxMap *contextmap.ContextMap) (*storage.Run, error)
func (*WorkStealingExecutor) GetStore ¶
func (e *WorkStealingExecutor) GetStore() *storage.Store