Documentation ¶
Overview ¶
Package graph is the core graph execution engine.
Index ¶
- Variables
- func IsRetryable(err error) bool
- type Branch
- type CheckpointManager
- type Condition
- type Config
- type EdgeDef
- type Engine
- type ExecutionResult
- type Graph
- func (g *Graph[S]) AddCondition(from string, c Condition[S])
- func (g *Graph[S]) AddEdge(from, to string)
- func (g *Graph[S]) AddNode(name string, fn NodeFunc[S])
- func (g *Graph[S]) Compile() error
- func (g *Graph[S]) DOT() string
- func (g *Graph[S]) MarkTerminalNode(node string, reason TerminationReason)
- func (g *Graph[S]) SetEntryPoint(name string)
- func (g *Graph[S]) SetLoopExit(node string, fn func(ctx context.Context, state S) bool)
- func (g *Graph[S]) SetMaxIterations(node string, n int)
- func (g *Graph[S]) SetMergeFunc(node string, fn MergeFunc[S])
- func (g *Graph[S]) SetNodeTimeout(name string, d time.Duration)
- type GraphError
- type Hook
- type LoopDef
- type MergeFunc
- type MultiError
- type NodeDef
- type NodeError
- type NodeFunc
- type OTelHook
- type OneOrMany
- type Option
- type PanicError
- type Registry
- func (r *Registry[S]) RegisterCondition(name string, fn func(ctx context.Context, state S) string)
- func (r *Registry[S]) RegisterExitCondition(name string, fn func(ctx context.Context, state S) bool)
- func (r *Registry[S]) RegisterMerge(name string, fn MergeFunc[S])
- func (r *Registry[S]) RegisterNode(typeName string, fn NodeFunc[S])
- type RetryDef
- type Retryable
- type StateDeepCopier
- type StateSerializer
- type Stream
- type TerminationReason
Constants ¶
This section is empty.
Variables ¶
var (
	ErrGraphNotCompiled = errors.New("graph not compiled")
	ErrGraphEmpty       = errors.New("graph has no nodes")
	ErrNodeNotFound     = errors.New("node not found")
	ErrEntryNotSet      = errors.New("entry point not set")
	ErrMaxIterations    = errors.New("max iterations reached")
	ErrCircuitOpen      = errors.New("circuit breaker is open")
	ErrBulkheadFull     = errors.New("bulkhead concurrency limit reached")
	ErrValidation       = errors.New("validation failed")
)
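Sentinel errors declared with errors.New are normally matched with errors.Is, which also sees through wrapping. The sketch below redeclares one sentinel locally so that it compiles on its own; `runLoop` is a hypothetical stand-in, and whether the engine wraps these sentinels in exactly this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrMaxIterations is redeclared locally so the sketch compiles on its own.
var ErrMaxIterations = errors.New("max iterations reached")

// runLoop is a hypothetical stand-in for an engine call that fails
// with the sentinel wrapped in extra context.
func runLoop(node string) error {
	return fmt.Errorf("node %q: %w", node, ErrMaxIterations)
}

// hitIterationCap reports whether err is, or wraps, ErrMaxIterations.
func hitIterationCap(err error) bool {
	return errors.Is(err, ErrMaxIterations)
}

func main() {
	err := runLoop("refine")
	fmt.Println(hitIterationCap(err))
}
```

Comparing with `==` would miss the wrapped case, which is why errors.Is is the safer check here.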
Functions ¶
func IsRetryable ¶
IsRetryable reports whether err, or any error in its chain, is retryable.
Types ¶
type CheckpointManager ¶
type CheckpointManager = checkpoint.Manager
CheckpointManager is the persistence interface used by Engine.Run. checkpoint.InMemoryManager and checkpoint.FileManager both satisfy this interface.
type Condition ¶
Condition is one arm of a conditional edge. If the If field is nil, the edge acts as the fallback (default) path.
type Config ¶
type Config struct {
	Name    string    `yaml:"name"`
	Version string    `yaml:"version"`
	Entry   string    `yaml:"entry"`
	Include []string  `yaml:"include,omitempty"`
	Nodes   []NodeDef `yaml:"nodes"`
	Edges   []EdgeDef `yaml:"edges"`
	Loops   []LoopDef `yaml:"loops,omitempty"`
}
Config is the top-level structure for a YAML workflow definition.
type EdgeDef ¶
type EdgeDef struct {
	From      OneOrMany `yaml:"from"`
	To        OneOrMany `yaml:"to,omitempty"`
	Condition string    `yaml:"condition,omitempty"`
	Branches  []Branch  `yaml:"branches,omitempty"`
	Merge     string    `yaml:"merge,omitempty"`
}
EdgeDef describes an edge (or fan-out/fan-in) in the config.
type Engine ¶
type Engine[S any] struct {
	// contains filtered or unexported fields
}
Engine executes a compiled Graph.
type ExecutionResult ¶
type ExecutionResult[S any] struct {
	FinalState    S                 `json:"final_state"`
	GraphName     string            `json:"graph_name"`
	ExecutionID   string            `json:"execution_id"`
	StartTime     time.Time         `json:"start_time"`
	EndTime       time.Time         `json:"end_time"`
	Termination   TerminationReason `json:"termination"`
	Error         error             `json:"error,omitempty"`
	NodeCount     int               `json:"node_count"`
	TotalNodes    int               `json:"total_nodes"`
	TotalSteps    int               `json:"total_steps"`
	TotalDuration time.Duration     `json:"total_duration"`
	CheckpointID  string            `json:"checkpoint_id,omitempty"`
	TraceID       string            `json:"trace_id,omitempty"`
	SpanID        string            `json:"span_id,omitempty"`
}
ExecutionResult is returned by Engine.Run.
type Graph ¶
type Graph[S any] struct {
	// contains filtered or unexported fields
}
Graph is a type-safe, compiled execution graph. Build it with the Add*/Set* methods, then call Compile before passing it to NewEngine.
func LoadFromFile ¶
LoadFromFile loads a graph from a YAML config file, resolving named components through the provided registry. TODO(P1): not yet implemented; requires gopkg.in/yaml.v3. Use the programmatic API for now.
func (*Graph[S]) AddCondition ¶
AddCondition adds a conditional edge leaving the node named from. Conditions are evaluated in the order they were added; the first match wins. A Condition with a nil If acts as the fallback.
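The first-match-wins rule can be sketched with a local stand-in type. The Condition fields are not shown on this page, so the field name To, the helper `pick`, and the choice to consult the fallback only after every predicate has failed are all assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Condition is a local stand-in; the field name To is an assumption.
type Condition[S any] struct {
	If func(ctx context.Context, state S) bool
	To string
}

// pick returns the target of the first arm whose predicate matches,
// checking arms in insertion order. The first arm with a nil If is
// remembered and used only when no predicate matches.
func pick[S any](ctx context.Context, state S, arms []Condition[S]) (string, bool) {
	fallback, hasFallback := "", false
	for _, arm := range arms {
		if arm.If == nil {
			if !hasFallback {
				fallback, hasFallback = arm.To, true
			}
			continue
		}
		if arm.If(ctx, state) {
			return arm.To, true
		}
	}
	return fallback, hasFallback
}

// route picks a next node for a score using three hypothetical arms.
func route(score int) string {
	arms := []Condition[int]{
		{If: func(_ context.Context, s int) bool { return s >= 90 }, To: "publish"},
		{If: func(_ context.Context, s int) bool { return s >= 50 }, To: "review"},
		{To: "reject"}, // nil If: fallback arm
	}
	to, _ := pick(context.Background(), score, arms)
	return to
}

func main() {
	fmt.Println(route(95), route(60), route(10))
}
```

A score of 95 satisfies both predicates, but only the earlier arm is taken, which is the practical consequence of ordering conditions from most to least specific.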
func (*Graph[S]) Compile ¶
Compile validates the graph structure and detects back edges. It must be called once before the graph is passed to NewEngine.
func (*Graph[S]) MarkTerminalNode ¶
func (g *Graph[S]) MarkTerminalNode(node string, reason TerminationReason)
MarkTerminalNode marks node as a terminal node that stops execution with reason.
func (*Graph[S]) SetEntryPoint ¶
SetEntryPoint designates the node where execution begins.
func (*Graph[S]) SetLoopExit ¶
SetLoopExit sets the exit predicate for a loop node. The predicate receives state just before re-entry; returning false exits the loop.
func (*Graph[S]) SetMaxIterations ¶
SetMaxIterations caps the number of times node may be re-entered via a back edge. Default: 1000.
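The interplay between the exit predicate and the iteration cap can be modelled as below. This is a simplified sketch, not the engine's code: `runLoop` and `countUntil` are hypothetical, the predicate is consulted before each re-entry with false meaning exit, and returning ErrMaxIterations when the cap is hit is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrMaxIterations is redeclared locally so the sketch compiles on its own.
var ErrMaxIterations = errors.New("max iterations reached")

// runLoop is a hypothetical model of a loop node. The body runs once,
// then re-runs for as long as keepGoing returns true, up to maxIter
// re-entries. A false predicate exits the loop normally.
func runLoop[S any](ctx context.Context, state S, body func(S) S,
	keepGoing func(context.Context, S) bool, maxIter int) (S, error) {
	state = body(state) // the first entry does not count as a re-entry
	for reentries := 0; keepGoing(ctx, state); reentries++ {
		if reentries >= maxIter {
			return state, ErrMaxIterations
		}
		state = body(state)
	}
	return state, nil
}

// countUntil increments a counter until it reaches target, allowing at
// most maxIter re-entries. It returns the final counter and whether the
// cap was hit.
func countUntil(target, maxIter int) (int, bool) {
	inc := func(n int) int { return n + 1 }
	below := func(_ context.Context, n int) bool { return n < target }
	n, err := runLoop(context.Background(), 0, inc, below, maxIter)
	return n, errors.Is(err, ErrMaxIterations)
}

func main() {
	fmt.Println(countUntil(3, 1000)) // predicate ends the loop
	fmt.Println(countUntil(100, 5))  // cap ends the loop
}
```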
func (*Graph[S]) SetMergeFunc ¶
SetMergeFunc sets the merge function called when multiple branches converge on node.
type GraphError ¶
GraphError is the top-level error returned by Engine.Run.
func (*GraphError) Error ¶
func (e *GraphError) Error() string
func (*GraphError) Unwrap ¶
func (e *GraphError) Unwrap() error
type Hook ¶
type Hook[S any] interface {
	OnGraphStart(ctx context.Context, graphName string, state S)
	OnGraphEnd(ctx context.Context, graphName string, state S, err error)
	OnNodeStart(ctx context.Context, nodeName string, state S)
	OnNodeEnd(ctx context.Context, nodeName string, state S, err error, duration time.Duration)
	OnRetry(ctx context.Context, nodeName string, attempt int, lastErr error)
}
Hook receives lifecycle events from the engine.
func ComposeHooks ¶
ComposeHooks combines multiple hooks into one; calls are forwarded in order.
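In-order forwarding can be illustrated with a composite type that loops over its members. To keep the sketch short it uses a trimmed, hypothetical interface `nodeHook` with a single event rather than the full Hook; `compose` and `recorder` are likewise stand-ins, not the package's API.

```go
package main

import (
	"context"
	"fmt"
)

// nodeHook is a trimmed, hypothetical subset of Hook with a single event.
type nodeHook[S any] interface {
	OnNodeStart(ctx context.Context, nodeName string, state S)
}

// composite forwards each event to its members in slice order.
type composite[S any] []nodeHook[S]

func (c composite[S]) OnNodeStart(ctx context.Context, nodeName string, state S) {
	for _, h := range c {
		h.OnNodeStart(ctx, nodeName, state)
	}
}

// compose is a stand-in for ComposeHooks.
func compose[S any](hooks ...nodeHook[S]) nodeHook[S] {
	return composite[S](hooks)
}

// recorder appends its label and the node name to a shared log.
type recorder struct {
	label string
	log   *[]string
}

func (r recorder) OnNodeStart(_ context.Context, nodeName string, _ int) {
	*r.log = append(*r.log, r.label+":"+nodeName)
}

// callOrder fires one event through two composed hooks and returns the log.
func callOrder() []string {
	var log []string
	h := compose[int](recorder{"metrics", &log}, recorder{"trace", &log})
	h.OnNodeStart(context.Background(), "fetch", 0)
	return log
}

func main() {
	fmt.Println(callOrder())
}
```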
type LoopDef ¶
type LoopDef struct {
	Node          string `yaml:"node"`
	MaxIterations int    `yaml:"max_iterations,omitempty"`
	ExitCondition string `yaml:"exit_condition,omitempty"`
}
LoopDef configures loop behaviour for a node.
type MultiError ¶
type MultiError struct{ Errs []error }
MultiError aggregates errors from parallel branches.
func (*MultiError) Append ¶
func (e *MultiError) Append(err error)
Append adds a non-nil error to the collection.
func (*MultiError) Error ¶
func (e *MultiError) Error() string
func (*MultiError) ToError ¶
func (e *MultiError) ToError() error
ToError returns nil if no errors were collected.
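ToError exists because a non-nil *MultiError stored in an error interface is never equal to nil, even when it holds no errors. The sketch below re-implements the documented methods locally to show the pattern; the method bodies, the "; " separator, and the `runBranches` helper are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// MultiError mirrors the documented struct; the method bodies are assumptions.
type MultiError struct{ Errs []error }

// Append records err, ignoring nil.
func (e *MultiError) Append(err error) {
	if err != nil {
		e.Errs = append(e.Errs, err)
	}
}

// Error joins the collected messages; the separator is hypothetical.
func (e *MultiError) Error() string {
	msgs := make([]string, len(e.Errs))
	for i, err := range e.Errs {
		msgs[i] = err.Error()
	}
	return strings.Join(msgs, "; ")
}

// ToError returns a true nil interface when nothing was collected.
func (e *MultiError) ToError() error {
	if len(e.Errs) == 0 {
		return nil
	}
	return e
}

// runBranches simulates parallel branch outcomes; an empty string means success.
func runBranches(outcomes ...string) error {
	var merr MultiError
	for _, msg := range outcomes {
		var err error
		if msg != "" {
			err = errors.New(msg)
		}
		merr.Append(err)
	}
	return merr.ToError()
}

func main() {
	fmt.Println(runBranches("", "") == nil)
	fmt.Println(runBranches("a failed", "", "c failed"))
}
```

Returning `&merr` directly instead of `merr.ToError()` would make the all-success case look like a failure to any caller that checks `err != nil`.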
type NodeDef ¶
type NodeDef struct {
	Name    string    `yaml:"name"`
	Type    string    `yaml:"type"`
	Timeout string    `yaml:"timeout,omitempty"`
	Retry   *RetryDef `yaml:"retry,omitempty"`
}
NodeDef describes a single node in the config.
type OTelHook ¶
type OTelHook[S any] struct{}
OTelHook is a Hook implementation that emits OpenTelemetry spans and metrics.
TODO(P7): implement using go.opentelemetry.io/otel
spans: graph.graph.<name>, graph.node.<name>, graph.checkpoint.*
metrics: graph.node.duration_ms, graph.node.executions, graph.graph.duration_ms
type OneOrMany ¶
type OneOrMany []string
OneOrMany deserialises a YAML field that can be a single string or a list.
type Option ¶
type Option func(*runConfig)
Option configures a single Run call.
func WithCheckpoint ¶
func WithCheckpoint(mgr CheckpointManager) Option
WithCheckpoint enables checkpoint persistence via mgr.
func WithCheckpointFreq ¶
WithCheckpointFreq sets how often (every N nodes) the engine saves a checkpoint.
func WithMaxConcurrency ¶
WithMaxConcurrency caps the goroutines used for parallel branches.
func WithPanicRecovery ¶
WithPanicRecovery auto-wraps every node with panic recovery.
func WithResumeFrom ¶
WithResumeFrom resumes execution from checkpointID. Pass "" to use the latest checkpoint for this graph.
func WithTimeout ¶
WithTimeout sets a global deadline for the entire graph execution.
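Option follows the functional-options pattern: each With* constructor returns a closure that mutates a private config. The sketch below shows how such options might be applied. The runConfig fields, the parameter types of WithTimeout and WithMaxConcurrency, the default value, and `newRunConfig` are assumptions, since this page does not show them.

```go
package main

import (
	"fmt"
	"time"
)

// runConfig is a stand-in; the real struct's fields are unexported and not shown.
type runConfig struct {
	timeout        time.Duration
	maxConcurrency int
}

// Option matches the documented type: a function that edits a runConfig.
type Option func(*runConfig)

// WithTimeout's parameter type is an assumption.
func WithTimeout(d time.Duration) Option {
	return func(c *runConfig) { c.timeout = d }
}

// WithMaxConcurrency's parameter type is an assumption.
func WithMaxConcurrency(n int) Option {
	return func(c *runConfig) { c.maxConcurrency = n }
}

// newRunConfig starts from a hypothetical default and applies opts in order,
// so a later option overrides an earlier one.
func newRunConfig(opts ...Option) runConfig {
	cfg := runConfig{maxConcurrency: 1}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := newRunConfig(WithTimeout(30*time.Second), WithMaxConcurrency(4))
	fmt.Println(cfg.timeout, cfg.maxConcurrency)
}
```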
type PanicError ¶
PanicError wraps a recovered panic value.
func (*PanicError) Error ¶
func (e *PanicError) Error() string
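Panic recovery around a node is commonly done with a deferred recover that converts the panic value into an error. The following is a sketch under stated assumptions: the NodeFunc signature, the PanicError field name Value, and the `withRecovery` wrapper are all hypothetical, since none of them is shown on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// PanicError is a stand-in; the field name Value is an assumption.
type PanicError struct{ Value any }

func (e *PanicError) Error() string { return fmt.Sprintf("panic: %v", e.Value) }

// NodeFunc's signature is an assumption.
type NodeFunc[S any] func(ctx context.Context, state S) (S, error)

// withRecovery wraps fn so that a panic becomes a *PanicError and the
// incoming state is returned unchanged.
func withRecovery[S any](fn NodeFunc[S]) NodeFunc[S] {
	return func(ctx context.Context, state S) (out S, err error) {
		defer func() {
			if r := recover(); r != nil {
				out, err = state, &PanicError{Value: r}
			}
		}()
		return fn(ctx, state)
	}
}

// recovered runs a node that divides by its state and reports whether
// the result was a *PanicError.
func recovered(divisor int) bool {
	node := withRecovery(func(_ context.Context, s int) (int, error) {
		return 100 / s, nil // panics when s is zero
	})
	_, err := node(context.Background(), divisor)
	var pe *PanicError
	return errors.As(err, &pe)
}

func main() {
	fmt.Println(recovered(0), recovered(5))
}
```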
type Registry ¶
type Registry[S any] struct {
	// contains filtered or unexported fields
}
Registry holds all named components referenced by config files.
func (*Registry[S]) RegisterCondition ¶
RegisterCondition registers a condition function; it returns the branch name to follow.
func (*Registry[S]) RegisterExitCondition ¶
func (r *Registry[S]) RegisterExitCondition(name string, fn func(ctx context.Context, state S) bool)
RegisterExitCondition registers a loop exit predicate (false = exit loop).
func (*Registry[S]) RegisterMerge ¶
RegisterMerge registers a merge function for fan-in nodes.
func (*Registry[S]) RegisterNode ¶
RegisterNode registers a node implementation under typeName.
type RetryDef ¶
type RetryDef struct {
	MaxAttempts int    `yaml:"max_attempts"`
	Backoff     string `yaml:"backoff,omitempty"`
	MaxBackoff  string `yaml:"max_backoff,omitempty"`
}
RetryDef holds retry policy parameters.
type Retryable ¶
type Retryable struct{ Cause error }
Retryable wraps an error to signal that the engine should retry the node.
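A chain-aware check of this kind is usually written with errors.As. The sketch below mirrors the documented struct but is not the package's implementation: the Error and Unwrap methods, the pointer receiver, and the helpers `isRetryable`, `transient`, and `permanent` are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// Retryable mirrors the documented struct; its methods are assumptions.
type Retryable struct{ Cause error }

func (r *Retryable) Error() string { return "retryable: " + r.Cause.Error() }
func (r *Retryable) Unwrap() error { return r.Cause }

// isRetryable is a hypothetical re-implementation of IsRetryable:
// errors.As walks the chain looking for a *Retryable.
func isRetryable(err error) bool {
	var r *Retryable
	return errors.As(err, &r)
}

// transient marks a failure as retryable, then adds outer context.
func transient(msg string) error {
	return fmt.Errorf("fetch node: %w", &Retryable{Cause: errors.New(msg)})
}

// permanent returns a plain error with no retry marker.
func permanent(msg string) error {
	return errors.New(msg)
}

func main() {
	fmt.Println(isRetryable(transient("connection reset")))
	fmt.Println(isRetryable(permanent("bad input")))
}
```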
type StateDeepCopier ¶
type StateDeepCopier[S any] interface { DeepCopy() S }
StateDeepCopier is optionally implemented by S for parallel branch isolation. Default implementation uses a gob round-trip.
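A gob round-trip copy encodes the value into a buffer and decodes it into a fresh variable, so slices and maps in the copy no longer share memory with the original. The sketch below shows the technique with a hypothetical `state` type and `deepCopy` helper; how the engine itself performs the fallback is not shown on this page.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// state is a hypothetical graph state with reference-typed fields.
type state struct {
	Visited []string
	Scores  map[string]int
}

// deepCopy clones v by encoding it with gob and decoding into a new value.
// Only exported fields survive the round-trip.
func deepCopy[S any](v S) (S, error) {
	var out S
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return out, err
	}
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

// branchIsolated mutates a copy and reports whether the original is untouched.
func branchIsolated() bool {
	orig := state{Visited: []string{"a"}, Scores: map[string]int{"a": 1}}
	cp, err := deepCopy(orig)
	if err != nil {
		return false
	}
	cp.Visited[0] = "b"
	cp.Scores["a"] = 2
	return orig.Visited[0] == "a" && orig.Scores["a"] == 1
}

func main() {
	fmt.Println(branchIsolated())
}
```

The round-trip is convenient but skips unexported fields and costs an encode and decode per branch, which is the usual reason for a state type to supply its own DeepCopy.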
type StateSerializer ¶
StateSerializer is optionally implemented by S to customise checkpoint encoding. Default implementation uses gob.
type Stream ¶
type Stream[T any] struct {
	// contains filtered or unexported fields
}
Stream is a generic, closeable channel wrapper for streaming node output.
func Broadcast ¶
Broadcast fans out one channel into n Streams. Each stream receives every value; all streams close when the input channel in closes or ctx is cancelled.
func Merge ¶
Merge fans in multiple channels into a single Stream. The returned stream closes once all inputs are drained or ctx is cancelled.
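The fan-in behaviour described for Merge can be sketched with plain channels and a sync.WaitGroup. The version below returns a bare receive-only channel rather than a Stream, and `merge`, `emit`, and `sumMerged` are hypothetical names; it illustrates the closing rule, not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// merge forwards every value from ins to one output channel. The output
// closes once all inputs are drained or ctx is cancelled.
func merge[T any](ctx context.Context, ins ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-in:
					if !ok {
						return // this input is drained
					}
					select {
					case out <- v:
					case <-ctx.Done():
						return
					}
				}
			}
		}(in)
	}
	// Close out only after every forwarding goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a pre-filled, already closed channel.
func emit(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// sumMerged drains a merge of two inputs and totals the values.
func sumMerged() int {
	total := 0
	for v := range merge(context.Background(), emit(1, 2), emit(3, 4)) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumMerged())
}
```

Values from different inputs may arrive in any interleaving, so the example checks a sum rather than an order.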
func (*Stream[T]) Chan ¶
func (s *Stream[T]) Chan() <-chan T
Chan returns the read-only channel consumers read from.
func (*Stream[T]) CloseWithError ¶
CloseWithError closes the stream and records an error retrievable via Err.
type TerminationReason ¶
type TerminationReason string
TerminationReason describes why a graph execution ended.
const (
	TerminationCompleted TerminationReason = "completed"
	TerminationCancelled TerminationReason = "cancelled"
	TerminationError     TerminationReason = "error"
	TerminationTimeout   TerminationReason = "timeout"
	TerminationMaxSteps  TerminationReason = "max_steps"
)
Directories ¶
| Path | Synopsis |
|---|---|
| redis | Package redis provides a Redis-backed checkpoint.Manager. |
| sqlite | Package sqlite provides a SQLite-backed checkpoint.Manager. |
| | Package node provides reusable built-in node implementations for the graph engine. |