orchestrate

package
v0.4.0
Warning

This package is not in the latest version of its module.

Published: Apr 18, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComputeCriticalPath

func ComputeCriticalPath(nodes []Node) map[string]int

ComputeCriticalPath analyzes the DAG topology using the Critical Path Method (CPM) and assigns priority values to nodes. Nodes on the critical path get higher priority. The weight of each node is assumed to be 1 (unit weight) unless Duration metadata is available.
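
For the unit-weight case, one way to picture the result is to give each node a priority equal to the length of the longest chain of dependents that starts at it. The sketch below does exactly that. The node type and criticalPathSketch are hypothetical stand-ins written for this illustration, and the priority values the package actually returns may be computed differently.

```go
package main

import "fmt"

// node is a hypothetical stand-in for Node; only ID and Deps matter here.
type node struct {
	ID   string
	Deps []string
}

// criticalPathSketch returns, for each node, the number of nodes on the longest
// downstream chain starting at that node (every node has weight 1).
// It assumes the input is already a valid DAG (no cycles, no missing deps).
func criticalPathSketch(nodes []node) map[string]int {
	// dependents[x] lists the nodes that depend on x.
	dependents := make(map[string][]string, len(nodes))
	for _, n := range nodes {
		for _, d := range n.Deps {
			dependents[d] = append(dependents[d], n.ID)
		}
	}

	prio := make(map[string]int, len(nodes))
	var longest func(id string) int
	longest = func(id string) int {
		if v, ok := prio[id]; ok {
			return v // already computed (memoized)
		}
		best := 0
		for _, next := range dependents[id] {
			if l := longest(next); l > best {
				best = l
			}
		}
		prio[id] = best + 1
		return prio[id]
	}
	for _, n := range nodes {
		longest(n.ID)
	}
	return prio
}

func main() {
	nodes := []node{
		{ID: "a"},
		{ID: "b", Deps: []string{"a"}},
		{ID: "c", Deps: []string{"b"}},
		{ID: "d", Deps: []string{"a"}},
	}
	// "a" starts the longest chain (a, b, c), so it gets the highest value.
	fmt.Println(criticalPathSketch(nodes))
}
```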

func ExecuteConditional

func ExecuteConditional(ctx context.Context, cn *ConditionalNode, req *schema.RunRequest,
	upstreamResults map[string]*schema.RunResponse,
) (*schema.RunResponse, string, error)

ExecuteConditional runs a ConditionalNode: it first executes the node's runner, then evaluates the branches to select a target node. It returns the runner's response and the selected target node ID.

func ExecuteDynamicSpawn

func ExecuteDynamicSpawn(ctx context.Context, dsn *DynamicSpawnNode, req *schema.RunRequest) (*schema.RunResponse, error)

ExecuteDynamicSpawn executes a DynamicSpawnNode:

1. Runs the node's own runner to produce output.
2. Calls Spawner to generate child nodes.
3. Executes child nodes in parallel.
4. Aggregates results using SpawnAggregator.
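
The four steps above can be sketched with plain goroutines. Everything in this block (response, child, spawnSketch, demo) is a hypothetical stand-in rather than the package's code, and treating an exceeded spawn limit as an error is an assumption; the package may handle MaxSpawnCount differently.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// response and child are hypothetical stand-ins for schema.RunResponse and Node.
type response struct{ Output string }

type child struct {
	ID  string
	Run func(ctx context.Context) (*response, error)
}

type runFunc func(ctx context.Context) (*response, error)
type spawnFunc func(ctx context.Context, parentOut *response) ([]child, error)
type aggregateFunc func(results []*response) *response

func spawnSketch(ctx context.Context, parent runFunc, spawn spawnFunc, agg aggregateFunc, maxSpawn int) (*response, error) {
	out, err := parent(ctx) // 1. run the node's own runner
	if err != nil {
		return nil, err
	}
	children, err := spawn(ctx, out) // 2. generate child nodes
	if err != nil {
		return nil, err
	}
	// Assumption: exceeding the limit is reported as an error (0 = unlimited).
	if maxSpawn > 0 && len(children) > maxSpawn {
		return nil, fmt.Errorf("spawned %d children, limit is %d", len(children), maxSpawn)
	}
	results := make([]*response, len(children))
	errs := make([]error, len(children))
	var wg sync.WaitGroup
	for i, c := range children { // 3. run children in parallel
		wg.Add(1)
		go func(i int, c child) {
			defer wg.Done()
			results[i], errs[i] = c.Run(ctx) // each goroutine writes its own slot
		}(i, c)
	}
	wg.Wait()
	for i, e := range errs {
		if e != nil {
			return nil, fmt.Errorf("child %q: %w", children[i].ID, e)
		}
	}
	return agg(results), nil // 4. aggregate child results
}

// demo splits the parent's comma-separated output into one child per item
// and joins the upper-cased child outputs with "+".
func demo() (string, error) {
	parent := func(context.Context) (*response, error) {
		return &response{Output: "a,b,c"}, nil
	}
	spawn := func(_ context.Context, out *response) ([]child, error) {
		var cs []child
		for _, item := range strings.Split(out.Output, ",") {
			item := item // capture per iteration
			cs = append(cs, child{ID: item, Run: func(context.Context) (*response, error) {
				return &response{Output: strings.ToUpper(item)}, nil
			}})
		}
		return cs, nil
	}
	agg := func(rs []*response) *response {
		parts := make([]string, len(rs))
		for i, r := range rs {
			parts[i] = r.Output
		}
		return &response{Output: strings.Join(parts, "+")}
	}
	res, err := spawnSketch(context.Background(), parent, spawn, agg, 10)
	if err != nil {
		return "", err
	}
	return res.Output, nil
}

func main() {
	out, err := demo()
	fmt.Println(out, err)
}
```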

func ExecuteLoop

func ExecuteLoop(ctx context.Context, loop LoopNode, req *schema.RunRequest) (*schema.RunResponse, error)

ExecuteLoop runs a loop with the given body runner and termination conditions.

func ValidateConditionalNode

func ValidateConditionalNode(cn *ConditionalNode) error

ValidateConditionalNode validates a ConditionalNode's configuration.

func ValidateDAG

func ValidateDAG(nodes []Node) error

ValidateDAG performs comprehensive validation on DAG nodes: duplicate IDs, missing dependencies, cycle detection, and connectivity check. Multiple root nodes (no deps) and multiple terminal nodes are allowed, but all nodes must be part of a single connected graph.
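
The four checks listed above can be sketched as follows, assuming Kahn's algorithm for cycle detection and an undirected traversal for connectivity. The node type and validateSketch are hypothetical; the package's own algorithms and error messages are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical stand-in for Node; only ID and Deps matter here.
type node struct {
	ID   string
	Deps []string
}

func validateSketch(nodes []node) error {
	if len(nodes) == 0 {
		return nil // assumption: an empty graph is treated as trivially valid here
	}
	// 1. Duplicate IDs.
	known := make(map[string]bool, len(nodes))
	for _, n := range nodes {
		if known[n.ID] {
			return fmt.Errorf("duplicate node ID %q", n.ID)
		}
		known[n.ID] = true
	}
	// 2. Missing dependencies, while building the edge tables.
	indeg := make(map[string]int, len(nodes))
	dependents := make(map[string][]string)
	neighbors := make(map[string][]string) // undirected view for connectivity
	for _, n := range nodes {
		for _, d := range n.Deps {
			if !known[d] {
				return fmt.Errorf("node %q depends on unknown node %q", n.ID, d)
			}
			indeg[n.ID]++
			dependents[d] = append(dependents[d], n.ID)
			neighbors[d] = append(neighbors[d], n.ID)
			neighbors[n.ID] = append(neighbors[n.ID], d)
		}
	}
	// 3. Cycle detection: Kahn's algorithm must visit every node.
	var queue []string
	for _, n := range nodes {
		if indeg[n.ID] == 0 {
			queue = append(queue, n.ID)
		}
	}
	visited := 0
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		visited++
		for _, next := range dependents[id] {
			indeg[next]--
			if indeg[next] == 0 {
				queue = append(queue, next)
			}
		}
	}
	if visited != len(nodes) {
		return errors.New("cycle detected")
	}
	// 4. Connectivity: every node must be reachable when edges are undirected.
	seen := map[string]bool{nodes[0].ID: true}
	stack := []string{nodes[0].ID}
	for len(stack) > 0 {
		id := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		for _, nb := range neighbors[id] {
			if !seen[nb] {
				seen[nb] = true
				stack = append(stack, nb)
			}
		}
	}
	if len(seen) != len(nodes) {
		return errors.New("graph is not connected")
	}
	return nil
}

func main() {
	diamond := []node{
		{ID: "a"},
		{ID: "b", Deps: []string{"a"}},
		{ID: "c", Deps: []string{"a"}},
		{ID: "d", Deps: []string{"b", "c"}},
	}
	fmt.Println(validateSketch(diamond))
	fmt.Println(validateSketch([]node{{ID: "x"}, {ID: "y"}}))
}
```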

Types

type Aggregator

type Aggregator interface {
	Aggregate(ctx context.Context, results map[string]*schema.RunResponse) (*schema.RunResponse, error)
}

Aggregator merges terminal node results into a single response.

func ConcatMessagesAggregator

func ConcatMessagesAggregator() Aggregator

ConcatMessagesAggregator returns an Aggregator that concatenates messages from all terminal nodes ordered by sorted node ID.

func LastResultAggregator

func LastResultAggregator() Aggregator

LastResultAggregator returns an Aggregator that picks the last terminal node result by sorted node ID.
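
Both built-in aggregators order terminal results by sorted node ID, which makes the output independent of Go's random map iteration order. A minimal sketch of that ordering follows. The response type with a Messages field is a hypothetical stand-in, since the fields of schema.RunResponse are not shown here, and the two functions are illustrations rather than the package's implementations.

```go
package main

import (
	"fmt"
	"sort"
)

// response is a hypothetical stand-in for schema.RunResponse.
type response struct{ Messages []string }

// sortedIDs returns the map keys in ascending order.
func sortedIDs(results map[string]*response) []string {
	ids := make([]string, 0, len(results))
	for id := range results {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

// concatMessages appends every node's messages in sorted node ID order.
func concatMessages(results map[string]*response) *response {
	out := &response{}
	for _, id := range sortedIDs(results) {
		if r := results[id]; r != nil {
			out.Messages = append(out.Messages, r.Messages...)
		}
	}
	return out
}

// lastResult returns the result whose node ID sorts last, or nil if there are none.
func lastResult(results map[string]*response) *response {
	ids := sortedIDs(results)
	if len(ids) == 0 {
		return nil
	}
	return results[ids[len(ids)-1]]
}

func main() {
	results := map[string]*response{
		"summarize": {Messages: []string{"summary"}},
		"critique":  {Messages: []string{"issue 1", "issue 2"}},
	}
	fmt.Println(concatMessages(results).Messages) // critique's messages come first
	fmt.Println(lastResult(results).Messages)     // summarize sorts last
}
```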

type BackpressureConfig

type BackpressureConfig struct {
	InitialConcurrency int           // Starting concurrency level.
	MinConcurrency     int           // Minimum concurrency level.
	MaxConcurrency     int           // Maximum concurrency level.
	LatencyThreshold   time.Duration // Latency above this triggers concurrency decrease.
	AdjustInterval     time.Duration // How often to adjust concurrency.
}

BackpressureConfig configures adaptive concurrency control.
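
To make the roles of the fields concrete, here is a hedged sketch of a single adjustment step that would run once per AdjustInterval. The documentation only states that latency above LatencyThreshold triggers a decrease; the step size of one and the increase when latency is at or below the threshold are assumptions, and adjust is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// backpressureConfig mirrors the documented fields of BackpressureConfig.
type backpressureConfig struct {
	InitialConcurrency int
	MinConcurrency     int
	MaxConcurrency     int
	LatencyThreshold   time.Duration
	AdjustInterval     time.Duration
}

// adjust returns the next concurrency level given the latency observed
// during the last interval. Assumption: move by one slot per step.
func adjust(cfg backpressureConfig, current int, observed time.Duration) int {
	next := current
	if observed > cfg.LatencyThreshold {
		next-- // too slow: back off
	} else {
		next++ // healthy: probe for more throughput
	}
	// Clamp to the configured bounds.
	if next < cfg.MinConcurrency {
		next = cfg.MinConcurrency
	}
	if next > cfg.MaxConcurrency {
		next = cfg.MaxConcurrency
	}
	return next
}

func main() {
	cfg := backpressureConfig{
		InitialConcurrency: 4,
		MinConcurrency:     1,
		MaxConcurrency:     8,
		LatencyThreshold:   200 * time.Millisecond,
		AdjustInterval:     time.Second,
	}
	level := cfg.InitialConcurrency
	for _, latency := range []time.Duration{50 * time.Millisecond, 500 * time.Millisecond, 500 * time.Millisecond} {
		level = adjust(cfg, level, latency)
		fmt.Println(latency, "->", level)
	}
}
```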

type Branch

type Branch struct {
	// Condition evaluates upstream results and returns true if this branch should be taken.
	Condition func(upstreamResults map[string]*schema.RunResponse) bool
	// TargetID is the ID of the node to activate when this branch is taken.
	TargetID string
}

Branch represents a conditional branch with a condition function and target node ID.

type CheckpointStore

type CheckpointStore interface {
	// Save persists a node's result.
	Save(ctx context.Context, dagID, nodeID string, resp *schema.RunResponse) error
	// Load retrieves a single node's result.
	Load(ctx context.Context, dagID, nodeID string) (*schema.RunResponse, error)
	// LoadAll retrieves all saved node results for a DAG execution.
	LoadAll(ctx context.Context, dagID string) (map[string]*schema.RunResponse, error)
	// Clear removes all saved results for a DAG execution.
	Clear(ctx context.Context, dagID string) error
}

CheckpointStore persists node results for resume and replay.
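
A custom store only needs these four methods. The sketch below keeps results in a nested map guarded by a mutex; it is not the package's InMemoryCheckpointStore. The response type is a hypothetical stand-in for schema.RunResponse, and returning (nil, nil) for a missing entry is an assumption, since the interface does not say how a miss is reported.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// response is a hypothetical stand-in for schema.RunResponse.
type response struct{ Output string }

// memStore keeps results per DAG execution: dagID -> nodeID -> result.
type memStore struct {
	mu   sync.RWMutex
	data map[string]map[string]*response
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string]map[string]*response)}
}

func (s *memStore) Save(_ context.Context, dagID, nodeID string, resp *response) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data[dagID] == nil {
		s.data[dagID] = make(map[string]*response)
	}
	s.data[dagID][nodeID] = resp
	return nil
}

// Load returns (nil, nil) when nothing was saved (an assumption of this sketch).
func (s *memStore) Load(_ context.Context, dagID, nodeID string) (*response, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.data[dagID][nodeID], nil
}

// LoadAll returns a copy so callers cannot mutate the store's map.
func (s *memStore) LoadAll(_ context.Context, dagID string) (map[string]*response, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]*response, len(s.data[dagID]))
	for id, r := range s.data[dagID] {
		out[id] = r
	}
	return out, nil
}

func (s *memStore) Clear(_ context.Context, dagID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, dagID)
	return nil
}

// demo saves two node results, then clears them, and reports how many
// results LoadAll returned before and after the Clear.
func demo() (before, after int, err error) {
	ctx := context.Background()
	s := newMemStore()
	if err := s.Save(ctx, "run-1", "fetch", &response{Output: "rows"}); err != nil {
		return 0, 0, err
	}
	if err := s.Save(ctx, "run-1", "parse", &response{Output: "records"}); err != nil {
		return 0, 0, err
	}
	all, err := s.LoadAll(ctx, "run-1")
	if err != nil {
		return 0, 0, err
	}
	if err := s.Clear(ctx, "run-1"); err != nil {
		return 0, 0, err
	}
	rest, err := s.LoadAll(ctx, "run-1")
	if err != nil {
		return 0, 0, err
	}
	return len(all), len(rest), nil
}

func main() {
	before, after, err := demo()
	fmt.Println(before, after, err)
}
```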

type Compensatable

type Compensatable interface {
	// Compensate rolls back the effects of a previously successful execution.
	Compensate(ctx context.Context, original *schema.RunResponse) error
	// Idempotent returns true if the Compensate operation is idempotent (safe to retry).
	// Deprecated: implement IdempotentChecker instead for new code.
	Idempotent() bool
}

Compensatable is implemented by Runners that support compensation (rollback).

type CompensateConfig

type CompensateConfig struct {
	Strategy   CompensateStrategy // Compensation approach.
	Timeout    time.Duration      // Timeout for each individual compensation operation.
	MaxRetries int                // Max retries for compensation (only if Idempotent).
}

CompensateConfig configures compensation behavior.

type CompensateStrategy

type CompensateStrategy int

CompensateStrategy defines the compensation approach.

const (
	// BackwardCompensate rolls back completed nodes in reverse topological order (Saga pattern).
	BackwardCompensate CompensateStrategy = iota
	// ForwardRecovery retries the failed node until success or max retries.
	ForwardRecovery
)
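
As an illustration of BackwardCompensate, the sketch below computes a topological order and then walks it in reverse, compensating only the nodes that completed. All names are hypothetical, and stopping at the first compensation error is an assumption; the package's retry and timeout handling (see CompensateConfig) is not modelled.

```go
package main

import "fmt"

// node is a hypothetical stand-in for Node; only ID and Deps matter here.
type node struct {
	ID   string
	Deps []string
}

// topoOrder returns node IDs in a topological order (Kahn's algorithm).
// It assumes the input is a valid DAG.
func topoOrder(nodes []node) []string {
	indeg := make(map[string]int, len(nodes))
	dependents := make(map[string][]string, len(nodes))
	for _, n := range nodes {
		for _, d := range n.Deps {
			indeg[n.ID]++
			dependents[d] = append(dependents[d], n.ID)
		}
	}
	var queue, order []string
	for _, n := range nodes {
		if indeg[n.ID] == 0 {
			queue = append(queue, n.ID)
		}
	}
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		order = append(order, id)
		for _, next := range dependents[id] {
			indeg[next]--
			if indeg[next] == 0 {
				queue = append(queue, next)
			}
		}
	}
	return order
}

// compensateBackward calls compensate for every completed node, latest first,
// and returns the IDs that were rolled back.
func compensateBackward(nodes []node, done map[string]bool, compensate func(id string) error) ([]string, error) {
	order := topoOrder(nodes)
	var rolledBack []string
	for i := len(order) - 1; i >= 0; i-- {
		id := order[i]
		if !done[id] {
			continue // never completed, nothing to undo
		}
		if err := compensate(id); err != nil {
			return rolledBack, fmt.Errorf("compensate %q: %w", id, err)
		}
		rolledBack = append(rolledBack, id)
	}
	return rolledBack, nil
}

func main() {
	nodes := []node{
		{ID: "reserve"},
		{ID: "charge", Deps: []string{"reserve"}},
		{ID: "ship", Deps: []string{"charge"}},
	}
	// "ship" failed, so only the two completed nodes are compensated.
	done := map[string]bool{"reserve": true, "charge": true}
	rolled, err := compensateBackward(nodes, done, func(id string) error {
		fmt.Println("undo", id)
		return nil
	})
	fmt.Println(rolled, err)
}
```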

type ConditionalNode

type ConditionalNode struct {
	Node                // Embedded base node (Runner executes first, then branches are evaluated).
	Branches   []Branch // Conditional branches, evaluated in order.
	Default    string   // Default target node ID when no branch matches (empty = skip).
	Exhaustive bool     // When true, validation requires Default to be non-empty.
}

ConditionalNode represents a node that routes execution to different branches based on upstream results. Branches are evaluated in order; the first match wins.

func (*ConditionalNode) EvaluateBranches

func (cn *ConditionalNode) EvaluateBranches(upstreamResults map[string]*schema.RunResponse) string

EvaluateBranches evaluates the conditional branches and returns the target node ID. Returns empty string if no branch matches and no default is set.
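
The first-match-wins rule with a Default fallback can be sketched in a few lines. The types below are hypothetical stand-ins that mirror the documented fields, and skipping branches with a nil Condition is an assumption of this sketch.

```go
package main

import "fmt"

// response is a hypothetical stand-in for schema.RunResponse.
type response struct{ Output string }

type branch struct {
	Condition func(upstream map[string]*response) bool
	TargetID  string
}

type conditionalNode struct {
	Branches []branch
	Default  string
}

// evaluate returns the target of the first matching branch, or Default
// (possibly empty) when no branch matches.
func (cn *conditionalNode) evaluate(upstream map[string]*response) string {
	for _, b := range cn.Branches {
		if b.Condition != nil && b.Condition(upstream) {
			return b.TargetID
		}
	}
	return cn.Default
}

// outputIs builds a condition that checks one upstream node's output.
func outputIs(nodeID, want string) func(map[string]*response) bool {
	return func(upstream map[string]*response) bool {
		r := upstream[nodeID]
		return r != nil && r.Output == want
	}
}

func main() {
	cn := &conditionalNode{
		Branches: []branch{
			{Condition: outputIs("classify", "urgent"), TargetID: "escalate"},
			{Condition: outputIs("classify", "normal"), TargetID: "queue"},
		},
		Default: "fallback",
	}
	fmt.Println(cn.evaluate(map[string]*response{"classify": {Output: "urgent"}}))
	fmt.Println(cn.evaluate(map[string]*response{"classify": {Output: "spam"}}))
}
```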

type DAGConfig

type DAGConfig struct {
	MaxConcurrency     int
	ErrorStrategy      ErrorStrategy
	EarlyExitFunc      func(nodeID string, resp *schema.RunResponse) bool
	Aggregator         Aggregator
	CheckpointStore    CheckpointStore     // Optional checkpoint store for save/resume.
	ReplayMode         bool                // When true, replay from checkpoint without executing runners.
	PriorityScheduling bool                // Use priority queue for ready nodes (default: FIFO).
	CriticalPathAuto   bool                // Auto-compute critical path priorities (requires PriorityScheduling).
	BackpressureCfg    *BackpressureConfig // Adaptive concurrency control (nil = disabled).
	ResourceLimits     map[string]int      // Per-resource-tag concurrency limits.
	ResourceRateLimits map[string]float64  // Per-resource-tag rate limits (requests/second).
	CompensateCfg      *CompensateConfig   // Compensation configuration (nil = disabled).
	EventHandler       DAGEventHandler     // Optional event handler for observability (nil = disabled).
}

DAGConfig holds configuration for DAG execution.

type DAGEventHandler

type DAGEventHandler interface {
	OnNodeStart(nodeID string)
	OnNodeComplete(nodeID string, status NodeStatus, err error)
	OnCheckpointError(nodeID string, err error)
}

DAGEventHandler receives lifecycle events during DAG execution. All methods must be safe for concurrent use.
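
Because nodes run concurrently, a handler that keeps any state needs its own synchronization. The sketch below guards its counters with a mutex. The nodeStatus type and countingHandler are hypothetical stand-ins with the same method set as DAGEventHandler, and demo simulates concurrent callbacks instead of running a real DAG.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeStatus is a hypothetical stand-in for NodeStatus.
type nodeStatus int

const (
	nodePending nodeStatus = iota
	nodeRunning
	nodeDone
	nodeFailed
)

// countingHandler counts lifecycle events; every method takes the mutex
// so it is safe to call from many goroutines at once.
type countingHandler struct {
	mu               sync.Mutex
	started          int
	completed        map[nodeStatus]int
	checkpointErrors int
}

func newCountingHandler() *countingHandler {
	return &countingHandler{completed: make(map[nodeStatus]int)}
}

func (h *countingHandler) OnNodeStart(nodeID string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.started++
}

func (h *countingHandler) OnNodeComplete(nodeID string, status nodeStatus, err error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.completed[status]++
}

func (h *countingHandler) OnCheckpointError(nodeID string, err error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.checkpointErrors++
}

// demo fires the callbacks from 50 goroutines and returns the final counts.
func demo() (started, done int) {
	h := newCountingHandler()
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id := fmt.Sprintf("node-%d", i)
			h.OnNodeStart(id)
			h.OnNodeComplete(id, nodeDone, nil)
		}(i)
	}
	wg.Wait()
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.started, h.completed[nodeDone]
}

func main() {
	started, done := demo()
	fmt.Println(started, done)
}
```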

type DAGOption

type DAGOption func(*DAGConfig)

DAGOption is a functional option for configuring DAG execution.

func WithAggregator

func WithAggregator(a Aggregator) DAGOption

WithAggregator sets the aggregator for combining terminal node results.

func WithBackpressure

func WithBackpressure(cfg *BackpressureConfig) DAGOption

WithBackpressure enables adaptive concurrency control.

func WithCheckpointStore

func WithCheckpointStore(cs CheckpointStore) DAGOption

WithCheckpointStore enables checkpoint-based save/resume.

func WithCompensation

func WithCompensation(cfg *CompensateConfig) DAGOption

WithCompensation enables compensation (Saga pattern) on failure.

func WithEarlyExit

func WithEarlyExit(fn func(nodeID string, resp *schema.RunResponse) bool) DAGOption

WithEarlyExit sets a function that can trigger early DAG termination.

func WithErrorStrategy

func WithErrorStrategy(s ErrorStrategy) DAGOption

WithErrorStrategy sets the error handling strategy.

func WithEventHandler

func WithEventHandler(h DAGEventHandler) DAGOption

WithEventHandler sets the event handler for observability.

func WithMaxConcurrency

func WithMaxConcurrency(n int) DAGOption

WithMaxConcurrency sets the maximum number of concurrently running nodes.

func WithPriorityScheduling

func WithPriorityScheduling(criticalPathAuto bool) DAGOption

WithPriorityScheduling enables priority-based scheduling with optional critical path auto-computation.

func WithReplayMode

func WithReplayMode() DAGOption

WithReplayMode enables replaying from checkpoints without re-executing runners.

func WithResourceLimits

func WithResourceLimits(limits map[string]int) DAGOption

WithResourceLimits sets per-resource-tag concurrency limits.

func WithResourceRateLimits

func WithResourceRateLimits(limits map[string]float64) DAGOption

WithResourceRateLimits sets per-resource-tag rate limits (requests/second).

type DAGResult

type DAGResult struct {
	NodeResults map[string]*schema.RunResponse
	NodeStatus  map[string]NodeStatus
	FinalOutput *schema.RunResponse
	Usage       *aimodel.Usage
	Timeline    []NodeTimeline // Node execution timeline (Gantt chart data).
}

DAGResult holds the results of a DAG execution.

func ExecuteDAG

func ExecuteDAG(ctx context.Context, cfg DAGConfig, nodes []Node, req *schema.RunRequest) (*DAGResult, error)

ExecuteDAG runs a DAG of nodes with the given config and initial request.

func RunDAG

func RunDAG(ctx context.Context, nodes []Node, req *schema.RunRequest, opts ...DAGOption) (*DAGResult, error)

RunDAG is a convenience entry point that builds a DAGConfig from functional options and delegates to ExecuteDAG. For advanced use cases, use ExecuteDAG directly.
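
The relationship between DAGOption, DAGConfig, and RunDAG follows the usual functional-options pattern. The sketch below reproduces that pattern with a trimmed-down, hypothetical dagConfig. Starting from the zero value is an assumption, since the defaults RunDAG applies are not documented here, and having withPriorityScheduling set both related fields is one reading of its description.

```go
package main

import "fmt"

type errorStrategy int

const (
	abort errorStrategy = iota
	skip
	compensate
)

// dagConfig is a hypothetical subset of DAGConfig.
type dagConfig struct {
	MaxConcurrency     int
	ErrorStrategy      errorStrategy
	ReplayMode         bool
	PriorityScheduling bool
	CriticalPathAuto   bool
}

// dagOption mutates a config, like DAGOption.
type dagOption func(*dagConfig)

func withMaxConcurrency(n int) dagOption {
	return func(c *dagConfig) { c.MaxConcurrency = n }
}

func withErrorStrategy(s errorStrategy) dagOption {
	return func(c *dagConfig) { c.ErrorStrategy = s }
}

func withReplayMode() dagOption {
	return func(c *dagConfig) { c.ReplayMode = true }
}

func withPriorityScheduling(criticalPathAuto bool) dagOption {
	return func(c *dagConfig) {
		c.PriorityScheduling = true
		c.CriticalPathAuto = criticalPathAuto
	}
}

// buildConfig applies options in order; later options override earlier ones.
func buildConfig(opts ...dagOption) dagConfig {
	var cfg dagConfig // assumption: zero-value defaults
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(
		withMaxConcurrency(4),
		withErrorStrategy(skip),
		withPriorityScheduling(true),
	)
	fmt.Printf("%+v\n", cfg)
}
```

A caller that already holds a fully populated config has no need for the options layer, which matches the advice to call ExecuteDAG directly in advanced cases.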

type DynamicSpawnNode

type DynamicSpawnNode struct {
	Node                                                                                  // Embedded base node.
	Spawner         func(ctx context.Context, output *schema.RunResponse) ([]Node, error) // Generates child nodes.
	SpawnAggregator Aggregator                                                            // Aggregates child results.
	MaxSpawnCount   int                                                                   // Max number of spawned nodes (0 = unlimited).
	SpawnTimeout    time.Duration                                                         // Timeout for all spawned nodes (0 = no timeout).
	SpawnDepthLimit int                                                                   // Max recursion depth for nested spawns (0 = no nesting).
}

DynamicSpawnNode generates child nodes at runtime from the parent's output (Map-Reduce pattern).

type Edge

type Edge struct {
	From string
	To   string
}

Edge represents a dependency edge from one node to another.

type ErrorStrategy

type ErrorStrategy int

ErrorStrategy controls how the DAG engine handles node failures.

const (
	Abort ErrorStrategy = iota
	Skip
	Compensate
)

type IdempotentChecker

type IdempotentChecker interface {
	Idempotent() bool
}

IdempotentChecker is implemented by operations that can indicate whether they are idempotent. This is checked via type assertion and can be implemented by any Runner or Compensatable.

type InMemoryCheckpointStore

type InMemoryCheckpointStore struct {
	// contains filtered or unexported fields
}

InMemoryCheckpointStore is an in-memory implementation of CheckpointStore.

func NewInMemoryCheckpointStore

func NewInMemoryCheckpointStore() *InMemoryCheckpointStore

NewInMemoryCheckpointStore creates a new in-memory checkpoint store.

func (*InMemoryCheckpointStore) Clear

func (s *InMemoryCheckpointStore) Clear(_ context.Context, dagID string) error

func (*InMemoryCheckpointStore) Load

func (s *InMemoryCheckpointStore) Load(_ context.Context, dagID, nodeID string) (*schema.RunResponse, error)

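func (*InMemoryCheckpointStore) LoadAll

func (s *InMemoryCheckpointStore) LoadAll(_ context.Context, dagID string) (map[string]*schema.RunResponse, error)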

func (*InMemoryCheckpointStore) Save

func (s *InMemoryCheckpointStore) Save(_ context.Context, dagID, nodeID string, resp *schema.RunResponse) error

type InputMapFunc

type InputMapFunc func(upstreamResults map[string]*schema.RunResponse) (*schema.RunRequest, error)

InputMapFunc maps upstream results to the current node's input.

type LoopNode

type LoopNode struct {
	Body            Runner
	Condition       func(*schema.RunResponse) bool
	MaxIters        int
	ConvergenceFunc func(prev, curr *schema.RunResponse) bool
}

LoopNode defines a loop with a body runner and termination conditions.
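
The documentation does not spell out how the three termination fields interact, so the following sketch fixes one plausible reading and labels it as such: Condition is checked before each iteration and returning false stops the loop, ConvergenceFunc returning true after an iteration stops the loop, and MaxIters is a hard cap. The context parameter is omitted for brevity, and all names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// response is a hypothetical stand-in for schema.RunResponse.
type response struct{ Value float64 }

// loopSketch mirrors the documented LoopNode fields with a simplified body.
type loopSketch struct {
	Body            func(prev *response) (*response, error)
	Condition       func(*response) bool
	MaxIters        int
	ConvergenceFunc func(prev, curr *response) bool
}

// runLoop returns the last response and the number of body executions.
func runLoop(l loopSketch, initial *response) (*response, int, error) {
	curr := initial
	iters := 0
	for iters < l.MaxIters {
		// Assumption: Condition == false means "stop".
		if l.Condition != nil && !l.Condition(curr) {
			break
		}
		next, err := l.Body(curr)
		if err != nil {
			return nil, iters, err
		}
		iters++
		prev := curr
		curr = next
		// Assumption: ConvergenceFunc == true means "stop".
		if l.ConvergenceFunc != nil && l.ConvergenceFunc(prev, curr) {
			break
		}
	}
	return curr, iters, nil
}

// newtonSqrt2 refines an estimate of sqrt(2) until successive values agree.
func newtonSqrt2() loopSketch {
	return loopSketch{
		Body: func(prev *response) (*response, error) {
			x := prev.Value
			return &response{Value: (x + 2/x) / 2}, nil
		},
		MaxIters: 50,
		ConvergenceFunc: func(prev, curr *response) bool {
			return math.Abs(curr.Value-prev.Value) < 1e-9
		},
	}
}

func main() {
	res, iters, err := runLoop(newtonSqrt2(), &response{Value: 1})
	fmt.Println(res.Value, iters, err)
}
```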

type Node

type Node struct {
	ID           string
	Runner       Runner
	Deps         []string
	InputMapper  InputMapFunc
	Optional     bool
	Condition    func(upstreamResults map[string]*schema.RunResponse) bool
	Timeout      time.Duration // Per-node execution timeout (0 = no limit).
	Retries      int           // Max retry count on failure (0 = no retry).
	ResourceTags []string      // Resource tags for concurrency/rate control.
	Priority     int           // Scheduling priority (higher = more priority).
}

Node is a single node in a DAG execution graph.

func BuildDAG

func BuildDAG(nodes []Node, edges []Edge) ([]Node, error)

BuildDAG converts a list of nodes and edges into nodes with Deps populated. It validates that all edge references exist and that no node has pre-existing Deps (to avoid mixing the two definition styles).
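
The conversion described above can be sketched as follows. The types and buildSketch are hypothetical stand-ins, and the direction of an edge (To depends on From) is an assumption based on the description of Edge.

```go
package main

import "fmt"

// node and edge are hypothetical stand-ins for Node and Edge.
type node struct {
	ID   string
	Deps []string
}

type edge struct {
	From string
	To   string
}

// buildSketch returns copies of nodes with Deps filled in from edges.
func buildSketch(nodes []node, edges []edge) ([]node, error) {
	index := make(map[string]int, len(nodes))
	out := make([]node, len(nodes))
	for i, n := range nodes {
		if len(n.Deps) > 0 {
			// Reject mixing the Deps style with the edge-list style.
			return nil, fmt.Errorf("node %q already has Deps", n.ID)
		}
		index[n.ID] = i
		out[i] = n
	}
	for _, e := range edges {
		if _, ok := index[e.From]; !ok {
			return nil, fmt.Errorf("edge references unknown node %q", e.From)
		}
		to, ok := index[e.To]
		if !ok {
			return nil, fmt.Errorf("edge references unknown node %q", e.To)
		}
		// Assumption: an edge From -> To means To depends on From.
		out[to].Deps = append(out[to].Deps, e.From)
	}
	return out, nil
}

func main() {
	nodes := []node{{ID: "fetch"}, {ID: "parse"}, {ID: "store"}}
	edges := []edge{{From: "fetch", To: "parse"}, {From: "parse", To: "store"}}
	built, err := buildSketch(nodes, edges)
	fmt.Println(built, err)
}
```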

type NodeStatus

type NodeStatus int

NodeStatus represents the execution status of a node.

const (
	NodePending NodeStatus = iota
	NodeRunning
	NodeDone
	NodeFailed
	NodeSkipped
	NodeCompensated
)

type NodeTimeline

type NodeTimeline struct {
	NodeID    string        `json:"node_id"`
	StartTime time.Time     `json:"start_time"`
	EndTime   time.Time     `json:"end_time"`
	Duration  time.Duration `json:"duration_ns"`
	Status    NodeStatus    `json:"status"`
}

NodeTimeline records the execution timing of a single node.
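
The duration_ns tag reflects how encoding/json handles time.Duration: the value is an int64 count of nanoseconds and is written as a plain JSON number. The sketch below copies the documented struct into a local type to show the encoded form; nodeStatus is a stand-in for NodeStatus, and encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// nodeStatus is a hypothetical stand-in for NodeStatus.
type nodeStatus int

// nodeTimeline mirrors the documented NodeTimeline fields and tags.
type nodeTimeline struct {
	NodeID    string        `json:"node_id"`
	StartTime time.Time     `json:"start_time"`
	EndTime   time.Time     `json:"end_time"`
	Duration  time.Duration `json:"duration_ns"`
	Status    nodeStatus    `json:"status"`
}

// encode marshals one timeline entry to a JSON string.
func encode(t nodeTimeline) (string, error) {
	b, err := json.Marshal(t)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	start := time.Date(2024, time.January, 2, 15, 4, 5, 0, time.UTC)
	entry := nodeTimeline{
		NodeID:    "fetch",
		StartTime: start,
		EndTime:   start.Add(1500 * time.Millisecond),
		Duration:  1500 * time.Millisecond,
		Status:    2,
	}
	js, err := encode(entry)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// duration_ns is written as the integer 1500000000 (nanoseconds).
	fmt.Println(js)
}
```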

type Runner

type Runner interface {
	Run(ctx context.Context, req *schema.RunRequest) (*schema.RunResponse, error)
}

Runner executes a unit of work. agent.Agent satisfies this interface.
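
Since Runner has a single method, a plain function can satisfy it through a small adapter type, in the same spirit as http.HandlerFunc in the standard library. The adapter below is a hypothetical sketch and is not part of this package; request and response stand in for schema.RunRequest and schema.RunResponse, whose fields are not shown here.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// request and response are hypothetical stand-ins for the schema types.
type request struct{ Input string }
type response struct{ Output string }

// runner has the same shape as the documented Runner interface.
type runner interface {
	Run(ctx context.Context, req *request) (*response, error)
}

// runnerFunc adapts an ordinary function to the runner interface.
type runnerFunc func(ctx context.Context, req *request) (*response, error)

// Run calls the underlying function.
func (f runnerFunc) Run(ctx context.Context, req *request) (*response, error) {
	return f(ctx, req)
}

// demo wraps a function as a runner and invokes it once.
func demo() (string, error) {
	var r runner = runnerFunc(func(_ context.Context, req *request) (*response, error) {
		return &response{Output: strings.ToUpper(req.Input)}, nil
	})
	resp, err := r.Run(context.Background(), &request{Input: "hello"})
	if err != nil {
		return "", err
	}
	return resp.Output, nil
}

func main() {
	out, err := demo()
	fmt.Println(out, err)
}
```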
