Documentation
¶
Index ¶
- func ComputeCriticalPath(nodes []Node) map[string]int
- func ExecuteConditional(ctx context.Context, cn *ConditionalNode, req *schema.RunRequest, ...) (*schema.RunResponse, string, error)
- func ExecuteDynamicSpawn(ctx context.Context, dsn *DynamicSpawnNode, req *schema.RunRequest) (*schema.RunResponse, error)
- func ExecuteLoop(ctx context.Context, loop LoopNode, req *schema.RunRequest) (*schema.RunResponse, error)
- func ValidateConditionalNode(cn *ConditionalNode) error
- func ValidateDAG(nodes []Node) error
- type Aggregator
- type BackpressureConfig
- type Branch
- type CheckpointStore
- type Compensatable
- type CompensateConfig
- type CompensateStrategy
- type ConditionalNode
- type DAGConfig
- type DAGEventHandler
- type DAGOption
- func WithAggregator(a Aggregator) DAGOption
- func WithBackpressure(cfg *BackpressureConfig) DAGOption
- func WithCheckpointStore(cs CheckpointStore) DAGOption
- func WithCompensation(cfg *CompensateConfig) DAGOption
- func WithEarlyExit(fn func(nodeID string, resp *schema.RunResponse) bool) DAGOption
- func WithErrorStrategy(s ErrorStrategy) DAGOption
- func WithEventHandler(h DAGEventHandler) DAGOption
- func WithMaxConcurrency(n int) DAGOption
- func WithPriorityScheduling(criticalPathAuto bool) DAGOption
- func WithReplayMode() DAGOption
- func WithResourceLimits(limits map[string]int) DAGOption
- func WithResourceRateLimits(limits map[string]float64) DAGOption
- type DAGResult
- type DynamicSpawnNode
- type Edge
- type ErrorStrategy
- type IdempotentChecker
- type InMemoryCheckpointStore
- func (s *InMemoryCheckpointStore) Clear(_ context.Context, dagID string) error
- func (s *InMemoryCheckpointStore) Load(_ context.Context, dagID, nodeID string) (*schema.RunResponse, error)
- func (s *InMemoryCheckpointStore) LoadAll(_ context.Context, dagID string) (map[string]*schema.RunResponse, error)
- func (s *InMemoryCheckpointStore) Save(_ context.Context, dagID, nodeID string, resp *schema.RunResponse) error
- type InputMapFunc
- type LoopNode
- type Node
- type NodeStatus
- type NodeTimeline
- type Runner
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ComputeCriticalPath ¶
func ComputeCriticalPath(nodes []Node) map[string]int
ComputeCriticalPath analyzes the DAG topology using the Critical Path Method (CPM) and assigns priority values to nodes. Nodes on the critical path get higher priority. The weight of each node is assumed to be 1 (unit weight) unless Duration metadata is available.
func ExecuteConditional ¶
func ExecuteConditional(ctx context.Context, cn *ConditionalNode, req *schema.RunRequest, upstreamResults map[string]*schema.RunResponse) (*schema.RunResponse, string, error)
ExecuteConditional runs a ConditionalNode: it first executes the node's runner, then evaluates the branches to determine which target node to run. It returns the runner's response and the selected target node ID.
func ExecuteDynamicSpawn ¶
func ExecuteDynamicSpawn(ctx context.Context, dsn *DynamicSpawnNode, req *schema.RunRequest) (*schema.RunResponse, error)
ExecuteDynamicSpawn executes a DynamicSpawnNode:
1. Runs the node's own runner to produce output.
2. Calls Spawner to generate child nodes.
3. Executes child nodes in parallel.
4. Aggregates results using SpawnAggregator.
func ExecuteLoop ¶
func ExecuteLoop(ctx context.Context, loop LoopNode, req *schema.RunRequest) (*schema.RunResponse, error)
ExecuteLoop runs a loop with the given body runner and termination conditions.
func ValidateConditionalNode ¶
func ValidateConditionalNode(cn *ConditionalNode) error
ValidateConditionalNode validates a ConditionalNode's configuration.
func ValidateDAG ¶
func ValidateDAG(nodes []Node) error
ValidateDAG performs comprehensive validation on DAG nodes: it checks for duplicate IDs, missing dependencies, cycles, and connectivity. Multiple root nodes (nodes with no deps) and multiple terminal nodes are allowed, but all nodes must be part of a single connected graph.
Types ¶
type Aggregator ¶
type Aggregator interface {
Aggregate(ctx context.Context, results map[string]*schema.RunResponse) (*schema.RunResponse, error)
}
Aggregator merges terminal node results into a single response.
func ConcatMessagesAggregator ¶
func ConcatMessagesAggregator() Aggregator
ConcatMessagesAggregator returns an Aggregator that concatenates messages from all terminal nodes ordered by sorted node ID.
func LastResultAggregator ¶
func LastResultAggregator() Aggregator
LastResultAggregator returns an Aggregator that picks the last terminal node result by sorted node ID.
type BackpressureConfig ¶
type BackpressureConfig struct {
InitialConcurrency int // Starting concurrency level.
MinConcurrency int // Minimum concurrency level.
MaxConcurrency int // Maximum concurrency level.
LatencyThreshold time.Duration // Latency above this triggers concurrency decrease.
AdjustInterval time.Duration // How often to adjust concurrency.
}
BackpressureConfig configures adaptive concurrency control.
type Branch ¶
type Branch struct {
// Condition evaluates upstream results and returns true if this branch should be taken.
Condition func(upstreamResults map[string]*schema.RunResponse) bool
// TargetID is the ID of the node to activate when this branch is taken.
TargetID string
}
Branch represents a conditional branch with a condition function and target node ID.
type CheckpointStore ¶
type CheckpointStore interface {
// Save persists a node's result.
Save(ctx context.Context, dagID, nodeID string, resp *schema.RunResponse) error
// Load retrieves a single node's result.
Load(ctx context.Context, dagID, nodeID string) (*schema.RunResponse, error)
// LoadAll retrieves all saved node results for a DAG execution.
LoadAll(ctx context.Context, dagID string) (map[string]*schema.RunResponse, error)
// Clear removes all saved results for a DAG execution.
Clear(ctx context.Context, dagID string) error
}
CheckpointStore persists node results for resume and replay.
type Compensatable ¶
type Compensatable interface {
// Compensate rolls back the effects of a previously successful execution.
Compensate(ctx context.Context, original *schema.RunResponse) error
// Idempotent returns true if the Compensate operation is idempotent (safe to retry).
// Deprecated: implement IdempotentChecker instead for new code.
Idempotent() bool
}
Compensatable is implemented by Runners that support compensation (rollback).
type CompensateConfig ¶
type CompensateConfig struct {
Strategy CompensateStrategy // Compensation approach.
Timeout time.Duration // Timeout for each individual compensation operation.
MaxRetries int // Max retries for compensation (only if Idempotent).
}
CompensateConfig configures compensation behavior.
type CompensateStrategy ¶
type CompensateStrategy int
CompensateStrategy defines the compensation approach.
const (
// BackwardCompensate rolls back completed nodes in reverse topological order (Saga pattern).
BackwardCompensate CompensateStrategy = iota
// ForwardRecovery retries the failed node until success or max retries.
ForwardRecovery
)
type ConditionalNode ¶
type ConditionalNode struct {
Node // Embedded base node (Runner executes first, then branches are evaluated).
Branches []Branch // Conditional branches, evaluated in order.
Default string // Default target node ID when no branch matches (empty = skip).
Exhaustive bool // When true, validation requires Default to be non-empty.
}
ConditionalNode represents a node that routes execution to different branches based on upstream results. Branches are evaluated in order; the first match wins.
func (*ConditionalNode) EvaluateBranches ¶
func (cn *ConditionalNode) EvaluateBranches(upstreamResults map[string]*schema.RunResponse) string
EvaluateBranches evaluates the conditional branches and returns the target node ID. It returns an empty string if no branch matches and no default is set.
type DAGConfig ¶
type DAGConfig struct {
MaxConcurrency int
ErrorStrategy ErrorStrategy
EarlyExitFunc func(nodeID string, resp *schema.RunResponse) bool
Aggregator Aggregator
CheckpointStore CheckpointStore // Optional checkpoint store for save/resume.
ReplayMode bool // When true, replay from checkpoint without executing runners.
PriorityScheduling bool // Use priority queue for ready nodes (default: FIFO).
CriticalPathAuto bool // Auto-compute critical path priorities (requires PriorityScheduling).
BackpressureCfg *BackpressureConfig // Adaptive concurrency control (nil = disabled).
ResourceLimits map[string]int // Per-resource-tag concurrency limits.
ResourceRateLimits map[string]float64 // Per-resource-tag rate limits (requests/second).
CompensateCfg *CompensateConfig // Compensation configuration (nil = disabled).
EventHandler DAGEventHandler // Optional event handler for observability (nil = disabled).
}
DAGConfig holds configuration for DAG execution.
type DAGEventHandler ¶
type DAGEventHandler interface {
OnNodeStart(nodeID string)
OnNodeComplete(nodeID string, status NodeStatus, err error)
OnCheckpointError(nodeID string, err error)
}
DAGEventHandler receives lifecycle events during DAG execution. All methods must be safe for concurrent use.
type DAGOption ¶
type DAGOption func(*DAGConfig)
DAGOption is a functional option for configuring DAG execution.
func WithAggregator ¶
func WithAggregator(a Aggregator) DAGOption
WithAggregator sets the aggregator for combining terminal node results.
func WithBackpressure ¶
func WithBackpressure(cfg *BackpressureConfig) DAGOption
WithBackpressure enables adaptive concurrency control.
func WithCheckpointStore ¶
func WithCheckpointStore(cs CheckpointStore) DAGOption
WithCheckpointStore enables checkpoint-based save/resume.
func WithCompensation ¶
func WithCompensation(cfg *CompensateConfig) DAGOption
WithCompensation enables compensation (Saga pattern) on failure.
func WithEarlyExit ¶
func WithEarlyExit(fn func(nodeID string, resp *schema.RunResponse) bool) DAGOption
WithEarlyExit sets a function that can trigger early DAG termination.
func WithErrorStrategy ¶
func WithErrorStrategy(s ErrorStrategy) DAGOption
WithErrorStrategy sets the error handling strategy.
func WithEventHandler ¶
func WithEventHandler(h DAGEventHandler) DAGOption
WithEventHandler sets the event handler for observability.
func WithMaxConcurrency ¶
func WithMaxConcurrency(n int) DAGOption
WithMaxConcurrency sets the maximum number of concurrently running nodes.
func WithPriorityScheduling ¶
func WithPriorityScheduling(criticalPathAuto bool) DAGOption
WithPriorityScheduling enables priority-based scheduling with optional critical path auto-computation.
func WithReplayMode ¶
func WithReplayMode() DAGOption
WithReplayMode enables replaying from checkpoints without re-executing runners.
func WithResourceLimits ¶
func WithResourceLimits(limits map[string]int) DAGOption
WithResourceLimits sets per-resource-tag concurrency limits.
func WithResourceRateLimits ¶
func WithResourceRateLimits(limits map[string]float64) DAGOption
WithResourceRateLimits sets per-resource-tag rate limits (requests/second).
type DAGResult ¶
type DAGResult struct {
NodeResults map[string]*schema.RunResponse
NodeStatus map[string]NodeStatus
FinalOutput *schema.RunResponse
Usage *aimodel.Usage
Timeline []NodeTimeline // Node execution timeline (Gantt chart data).
}
DAGResult holds the results of a DAG execution.
func ExecuteDAG ¶
func ExecuteDAG(ctx context.Context, cfg DAGConfig, nodes []Node, req *schema.RunRequest) (*DAGResult, error)
ExecuteDAG runs a DAG of nodes with the given config and initial request.
type DynamicSpawnNode ¶
type DynamicSpawnNode struct {
Node // Embedded base node.
Spawner func(ctx context.Context, output *schema.RunResponse) ([]Node, error) // Generates child nodes.
SpawnAggregator Aggregator // Aggregates child results.
MaxSpawnCount int // Max number of spawned nodes (0 = unlimited).
SpawnTimeout time.Duration // Timeout for all spawned nodes (0 = no timeout).
SpawnDepthLimit int // Max recursion depth for nested spawns (0 = no nesting).
}
DynamicSpawnNode generates child nodes at runtime from the parent's output (Map-Reduce pattern).
type ErrorStrategy ¶
type ErrorStrategy int
ErrorStrategy controls how the DAG engine handles node failures.
const (
Abort ErrorStrategy = iota
Skip
Compensate
)
type IdempotentChecker ¶
type IdempotentChecker interface {
Idempotent() bool
}
IdempotentChecker is implemented by operations that can indicate whether they are idempotent. This is checked via type assertion and can be implemented by any Runner or Compensatable.
type InMemoryCheckpointStore ¶
type InMemoryCheckpointStore struct {
// contains filtered or unexported fields
}
InMemoryCheckpointStore is an in-memory implementation of CheckpointStore.
func NewInMemoryCheckpointStore ¶
func NewInMemoryCheckpointStore() *InMemoryCheckpointStore
NewInMemoryCheckpointStore creates a new in-memory checkpoint store.
func (*InMemoryCheckpointStore) Clear ¶
func (s *InMemoryCheckpointStore) Clear(_ context.Context, dagID string) error
func (*InMemoryCheckpointStore) Load ¶
func (s *InMemoryCheckpointStore) Load(_ context.Context, dagID, nodeID string) (*schema.RunResponse, error)
func (*InMemoryCheckpointStore) LoadAll ¶
func (s *InMemoryCheckpointStore) LoadAll(_ context.Context, dagID string) (map[string]*schema.RunResponse, error)
func (*InMemoryCheckpointStore) Save ¶
func (s *InMemoryCheckpointStore) Save(_ context.Context, dagID, nodeID string, resp *schema.RunResponse) error
type InputMapFunc ¶
type InputMapFunc func(upstreamResults map[string]*schema.RunResponse) (*schema.RunRequest, error)
InputMapFunc maps upstream results to the current node's input.
type LoopNode ¶
type LoopNode struct {
Body Runner
Condition func(*schema.RunResponse) bool
MaxIters int
ConvergenceFunc func(prev, curr *schema.RunResponse) bool
}
LoopNode defines a loop with a body runner and termination conditions.
type Node ¶
type Node struct {
ID string
Runner Runner
Deps []string
InputMapper InputMapFunc
Optional bool
Condition func(upstreamResults map[string]*schema.RunResponse) bool
Timeout time.Duration // Per-node execution timeout (0 = no limit).
Retries int // Max retry count on failure (0 = no retry).
ResourceTags []string // Resource tags for concurrency/rate control.
Priority int // Scheduling priority (higher = more priority).
}
Node is a single node in a DAG execution graph.
type NodeStatus ¶
type NodeStatus int
NodeStatus represents the execution status of a node.
const (
NodePending NodeStatus = iota
NodeRunning
NodeDone
NodeFailed
NodeSkipped
NodeCompensated
)
type NodeTimeline ¶
type NodeTimeline struct {
NodeID string `json:"node_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration_ns"`
Status NodeStatus `json:"status"`
}
NodeTimeline records the execution timing of a single node.
type Runner ¶
type Runner interface {
Run(ctx context.Context, req *schema.RunRequest) (*schema.RunResponse, error)
}
Runner executes a unit of work. agent.Agent satisfies this interface.