executor

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 13, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package executor provides functionality to execute workflows defined as directed acyclic graphs (DAGs).

Functions

func WithCleanEnv

func WithCleanEnv(ctx context.Context) context.Context

WithCleanEnv attaches the clean-env flag to ctx. When the flag is set, every task starts with an empty environment, including tasks that do not specify clean_env=true in TOML; only task-level env vars are passed to the task.

func WithPrintOutput

func WithPrintOutput(ctx context.Context) context.Context

WithPrintOutput attaches the print-output flag to ctx.

func WithProgress

func WithProgress(ctx context.Context, ch chan<- ProgressEvent) context.Context

WithProgress attaches a progress-event channel to ctx. The caller owns the channel and is responsible for draining it.
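Because the caller owns the channel, a typical setup drains it in a separate goroutine for the duration of the run. The sketch below shows that pattern with stand-ins: `event` and `fakeRun` are hypothetical substitutes for ProgressEvent and Execute, and it assumes that no events are sent after the run returns, so the caller can close the channel at that point.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for ProgressEvent.
type event struct {
	Kind   string
	TaskID string
}

// fakeRun stands in for Execute: it only ever sends on the channel and
// never closes it, since the caller owns the channel.
func fakeRun(ch chan<- event) {
	ch <- event{Kind: "run_started"}
	for _, id := range []string{"a", "b"} {
		ch <- event{Kind: "task_started", TaskID: id}
		ch <- event{Kind: "task_done", TaskID: id}
	}
	ch <- event{Kind: "run_done"}
}

// drain consumes events until ch is closed and returns the kinds it saw.
func drain(ch <-chan event) []string {
	var kinds []string
	for ev := range ch {
		kinds = append(kinds, ev.Kind)
	}
	return kinds
}

// runAndCollect starts the consumer before the run so that the producer
// never blocks on a full channel for long, then closes the channel once
// the run has returned.
func runAndCollect() []string {
	ch := make(chan event, 16)
	var wg sync.WaitGroup
	var kinds []string

	wg.Add(1)
	go func() {
		defer wg.Done()
		kinds = drain(ch)
	}()

	fakeRun(ch)
	close(ch) // assumption: nothing sends after the run returns
	wg.Wait() // wait for the consumer to finish before reading kinds
	return kinds
}

func main() {
	fmt.Println(runAndCollect())
}
```

A buffered channel only smooths out bursts; if nothing reads from the channel, a sender can still block once the buffer fills, which is why the consumer is started first.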

func WithRuntimeVars

func WithRuntimeVars(ctx context.Context, vars map[string]string) context.Context

WithRuntimeVars attaches runtime key=value pairs to ctx. doResume extracts these and injects them into the ContextMap after snapshot restoration, allowing the caller to override or supplement variables.
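The override-or-supplement behavior amounts to overlaying the runtime pairs on top of the restored values. A minimal sketch, using plain maps in place of ContextMap (whose API is not shown here) and a hypothetical helper `applyRuntimeVars`:

```go
package main

import "fmt"

// applyRuntimeVars returns a copy of the restored snapshot with the
// runtime pairs applied on top. Keys present in both maps take the
// runtime value (override); keys only in runtime are added (supplement).
func applyRuntimeVars(snapshot, runtime map[string]string) map[string]string {
	merged := make(map[string]string, len(snapshot)+len(runtime))
	for k, v := range snapshot {
		merged[k] = v
	}
	for k, v := range runtime {
		merged[k] = v
	}
	return merged
}

func main() {
	snapshot := map[string]string{"region": "eu", "retries": "3"}
	runtime := map[string]string{"retries": "5", "dry_run": "true"}

	merged := applyRuntimeVars(snapshot, runtime)
	fmt.Println(merged["region"], merged["retries"], merged["dry_run"])
}
```

Applying the runtime pairs after restoration, rather than before, is what lets them win over values stored in the snapshot.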

Types

type Executor

type Executor interface {
	// Execute runs the entire DAG
	Execute(ctx context.Context, dag *dag.DAG, ctxMap *contextmap.ContextMap) (*storage.Run, error)

	// Resume continues execution from a previous failure
	Resume(ctx context.Context, runID string) (*storage.Run, error)

	// GetStore returns the underlying database store
	GetStore() *storage.Store
}

Executor defines the interface for task execution.

type ParallelExecutor

type ParallelExecutor struct {
	// contains filtered or unexported fields
}

func NewParallelExecutor

func NewParallelExecutor(db *storage.Store, maxParallel int) *ParallelExecutor

func (*ParallelExecutor) Execute

func (e *ParallelExecutor) Execute(ctx context.Context, d *dag.DAG, ctxMap *contextmap.ContextMap) (*storage.Run, error)

func (*ParallelExecutor) GetStore

func (e *ParallelExecutor) GetStore() *storage.Store

func (*ParallelExecutor) Resume

func (e *ParallelExecutor) Resume(ctx context.Context, runID string) (*storage.Run, error)

type ProgressEvent

type ProgressEvent struct {
	Kind      ProgressEventKind
	RunID     string
	TaskID    string
	TaskName  string
	Status    storage.TaskStatus // populated for task events
	RunStatus storage.RunStatus  // populated for run events
	Attempt   int
	Error     error
	// Output is populated for ProgressTaskOutput events.
	Output    string
	Truncated bool // true when output exceeded maxOutputEvent
	At        time.Time
}

ProgressEvent is emitted on the channel attached to the context via WithProgress as execution proceeds. Consumers can render a live status view without touching the DB.

type ProgressEventKind

type ProgressEventKind string

ProgressEventKind identifies what happened in a ProgressEvent.

const (
	ProgressRunStarted   ProgressEventKind = "run_started"
	ProgressTaskStarted  ProgressEventKind = "task_started"
	ProgressTaskDone     ProgressEventKind = "task_done"   // success or skip
	ProgressTaskFailed   ProgressEventKind = "task_failed" // after all retries
	ProgressTaskRetrying ProgressEventKind = "task_retrying"
	// ProgressTaskOutput carries captured stdout/stderr for a completed task.
	// Emitted only when the print-output flag is set on the run context.
	// Output is truncated to 64 KiB before being placed on the channel; the
	// full content is always available in the on-disk log file.
	ProgressTaskOutput ProgressEventKind = "task_output"
	ProgressRunDone    ProgressEventKind = "run_done"
)
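The 64 KiB cap on ProgressTaskOutput events and the Truncated field suggest a helper along these lines. This is a sketch under assumptions: `truncateOutput` is a hypothetical name, and keeping the head of the output (rather than the tail) and cutting on a byte boundary are guesses, since the documentation does not say which part survives.

```go
package main

import (
	"fmt"
	"strings"
)

// maxOutputEvent is the per-event cap, 64 KiB as stated in the comment on
// ProgressTaskOutput. The constant's real definition is not shown.
const maxOutputEvent = 64 * 1024

// truncateOutput returns at most maxOutputEvent bytes of out and reports
// whether anything was cut. The cut is byte-based, so it may split a
// multi-byte UTF-8 character at the boundary.
func truncateOutput(out string) (string, bool) {
	if len(out) <= maxOutputEvent {
		return out, false
	}
	return out[:maxOutputEvent], true
}

func main() {
	short, cut := truncateOutput("build ok\n")
	fmt.Println(len(short), cut)

	long, cut := truncateOutput(strings.Repeat("x", maxOutputEvent+100))
	fmt.Println(len(long), cut)
}
```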

type SequentialExecutor

type SequentialExecutor struct {
	// contains filtered or unexported fields
}

func NewSequentialExecutor

func NewSequentialExecutor(db *storage.Store) *SequentialExecutor

func (*SequentialExecutor) Execute

func (e *SequentialExecutor) Execute(ctx context.Context, d *dag.DAG, ctxMap *contextmap.ContextMap) (*storage.Run, error)

func (*SequentialExecutor) GetStore

func (e *SequentialExecutor) GetStore() *storage.Store

func (*SequentialExecutor) Resume

func (e *SequentialExecutor) Resume(ctx context.Context, runID string) (*storage.Run, error)

type WorkStealingExecutor

type WorkStealingExecutor struct {
	// contains filtered or unexported fields
}

WorkStealingExecutor is a dependency-aware scheduler that maintains a ready-queue and dispatches each task as soon as all of its dependencies complete, regardless of topological level. A task therefore never waits for unrelated siblings that happen to share its level, so concurrency is limited only by the dependency graph and numWorkers.

Concurrency model:

  • One goroutine per launched task (capped by semaphore at numWorkers).
  • Each goroutine, after finishing its task, decrements the pending-dependency counter of every direct dependent. When a counter hits 0 the goroutine launches that dependent inline (wg.Add before the parent's wg.Done fires), keeping the WaitGroup count > 0 for as long as any work is reachable.
  • The main goroutine waits on wg.Wait() and therefore has no polling loop.

func NewWorkStealingExecutor

func NewWorkStealingExecutor(db *storage.Store, numWorkers int) *WorkStealingExecutor

NewWorkStealingExecutor returns a WorkStealingExecutor that runs at most numWorkers tasks concurrently. If numWorkers ≤ 0 it is set to 4.

func (*WorkStealingExecutor) Execute

func (e *WorkStealingExecutor) Execute(ctx context.Context, d *dag.DAG, ctxMap *contextmap.ContextMap) (*storage.Run, error)

func (*WorkStealingExecutor) GetStore

func (e *WorkStealingExecutor) GetStore() *storage.Store

func (*WorkStealingExecutor) Resume

func (e *WorkStealingExecutor) Resume(ctx context.Context, runID string) (*storage.Run, error)
