graph

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package graph is the core graph execution engine.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrGraphNotCompiled = errors.New("graph not compiled")
	ErrGraphEmpty       = errors.New("graph has no nodes")
	ErrNodeNotFound     = errors.New("node not found")
	ErrEntryNotSet      = errors.New("entry point not set")
	ErrMaxIterations    = errors.New("max iterations reached")
	ErrCircuitOpen      = errors.New("circuit breaker is open")
	ErrBulkheadFull     = errors.New("bulkhead concurrency limit reached")
	ErrValidation       = errors.New("validation failed")
)

Functions

func IsRetryable

func IsRetryable(err error) bool

IsRetryable reports whether err (or any in its chain) is retryable.

Types

type Branch

type Branch struct {
	When string `yaml:"when"` // "default" = fallback
	To   string `yaml:"to"`
}

Branch is one arm of a conditional edge.

type CheckpointManager

type CheckpointManager = checkpoint.Manager

CheckpointManager is the persistence interface used by Engine.Run. checkpoint.InMemoryManager and checkpoint.FileManager both satisfy this interface.

type Condition

type Condition[S any] struct {
	If     func(ctx context.Context, state S) bool
	Target string
}

Condition is one arm of a conditional edge. If If is nil the edge acts as the fallback (default) path.

type Config

type Config struct {
	Name    string    `yaml:"name"`
	Version string    `yaml:"version"`
	Entry   string    `yaml:"entry"`
	Include []string  `yaml:"include,omitempty"`
	Nodes   []NodeDef `yaml:"nodes"`
	Edges   []EdgeDef `yaml:"edges"`
	Loops   []LoopDef `yaml:"loops,omitempty"`
}

Config is the top-level structure for a YAML workflow definition.

type EdgeDef

type EdgeDef struct {
	From      OneOrMany `yaml:"from"`
	To        OneOrMany `yaml:"to,omitempty"`
	Condition string    `yaml:"condition,omitempty"`
	Branches  []Branch  `yaml:"branches,omitempty"`
	Merge     string    `yaml:"merge,omitempty"`
}

EdgeDef describes an edge (or fan-out/fan-in) in the config.

type Engine

type Engine[S any] struct {
	// contains filtered or unexported fields
}

Engine executes a compiled Graph.

func NewEngine

func NewEngine[S any](g *Graph[S]) *Engine[S]

NewEngine creates an engine for the given compiled graph.

func (*Engine[S]) Run

func (e *Engine[S]) Run(ctx context.Context, initialState S, opts ...Option) (*ExecutionResult[S], error)

Run executes the graph from its entry point and returns the execution result. The graph must have been compiled with Compile before calling Run.

type ExecutionResult

type ExecutionResult[S any] struct {
	FinalState    S                 `json:"final_state"`
	GraphName     string            `json:"graph_name"`
	ExecutionID   string            `json:"execution_id"`
	StartTime     time.Time         `json:"start_time"`
	EndTime       time.Time         `json:"end_time"`
	Termination   TerminationReason `json:"termination"`
	Error         error             `json:"error,omitempty"`
	NodeCount     int               `json:"node_count"`
	TotalNodes    int               `json:"total_nodes"`
	TotalSteps    int               `json:"total_steps"`
	TotalDuration time.Duration     `json:"total_duration"`
	CheckpointID  string            `json:"checkpoint_id,omitempty"`
	TraceID       string            `json:"trace_id,omitempty"`
	SpanID        string            `json:"span_id,omitempty"`
}

ExecutionResult is returned by Engine.Run.

type Graph

type Graph[S any] struct {
	// contains filtered or unexported fields
}

Graph is a type-safe, compiled execution graph. Build it with the Add*/Set* methods, then call Compile before passing to NewEngine.

func LoadFromFile

func LoadFromFile[S any](path string, reg *Registry[S]) (*Graph[S], error)

LoadFromFile loads a graph from a YAML config file and the provided registry. TODO(P1): implement — requires gopkg.in/yaml.v3; use programmatic API for now.

func NewGraph

func NewGraph[S any](name string) *Graph[S]

NewGraph creates an empty, uncompiled graph.

func (*Graph[S]) AddCondition

func (g *Graph[S]) AddCondition(from string, c Condition[S])

AddCondition adds a conditional edge from from. Conditions are evaluated in the order they were added; first match wins. A nil If acts as the fallback.

func (*Graph[S]) AddEdge

func (g *Graph[S]) AddEdge(from, to string)

AddEdge adds an unconditional edge from → to.

func (*Graph[S]) AddNode

func (g *Graph[S]) AddNode(name string, fn NodeFunc[S])

AddNode registers fn under name. Must be called before Compile.

func (*Graph[S]) Compile

func (g *Graph[S]) Compile() error

Compile validates the graph structure and detects back edges. Must be called once before passing the graph to NewEngine.

func (*Graph[S]) DOT

func (g *Graph[S]) DOT() string

DOT returns a Graphviz DOT string useful for debugging and visualisation.

func (*Graph[S]) MarkTerminalNode

func (g *Graph[S]) MarkTerminalNode(node string, reason TerminationReason)

MarkTerminalNode marks node as a terminal node that stops execution with reason.

func (*Graph[S]) SetEntryPoint

func (g *Graph[S]) SetEntryPoint(name string)

SetEntryPoint designates the node where execution begins.

func (*Graph[S]) SetLoopExit

func (g *Graph[S]) SetLoopExit(node string, fn func(ctx context.Context, state S) bool)

SetLoopExit sets the exit predicate for a loop node. The predicate receives state just before re-entry; returning false exits the loop.

func (*Graph[S]) SetMaxIterations

func (g *Graph[S]) SetMaxIterations(node string, n int)

SetMaxIterations caps the number of times node may be re-entered via a back edge. Default: 1000.

func (*Graph[S]) SetMergeFunc

func (g *Graph[S]) SetMergeFunc(node string, fn MergeFunc[S])

SetMergeFunc sets the merge function called when multiple branches converge on node.

func (*Graph[S]) SetNodeTimeout

func (g *Graph[S]) SetNodeTimeout(name string, d time.Duration)

SetNodeTimeout sets a per-invocation deadline for name.

type GraphError

type GraphError struct {
	GraphName   string
	ExecutionID string
	Cause       error
}

GraphError is the top-level error returned by Engine.Run.

func (*GraphError) Error

func (e *GraphError) Error() string

func (*GraphError) Unwrap

func (e *GraphError) Unwrap() error

type Hook

type Hook[S any] interface {
	OnGraphStart(ctx context.Context, graphName string, state S)
	OnGraphEnd(ctx context.Context, graphName string, state S, err error)
	OnNodeStart(ctx context.Context, nodeName string, state S)
	OnNodeEnd(ctx context.Context, nodeName string, state S, err error, duration time.Duration)
	OnRetry(ctx context.Context, nodeName string, attempt int, lastErr error)
}

Hook receives lifecycle events from the engine.

func ComposeHooks

func ComposeHooks[S any](hooks ...Hook[S]) Hook[S]

ComposeHooks combines multiple hooks into one; calls are forwarded in order.

type LoopDef

type LoopDef struct {
	Node          string `yaml:"node"`
	MaxIterations int    `yaml:"max_iterations,omitempty"`
	ExitCondition string `yaml:"exit_condition,omitempty"`
}

LoopDef configures loop behaviour for a node.

type MergeFunc

type MergeFunc[S any] func(ctx context.Context, parent S, branches []S) (S, error)

MergeFunc merges parallel branch states into one for fan-in nodes.

type MultiError

type MultiError struct{ Errs []error }

MultiError aggregates errors from parallel branches.

func (*MultiError) Append

func (e *MultiError) Append(err error)

Append adds a non-nil error to the collection.

func (*MultiError) Error

func (e *MultiError) Error() string

func (*MultiError) ToError

func (e *MultiError) ToError() error

ToError returns nil if no errors were collected.

type NodeDef

type NodeDef struct {
	Name    string    `yaml:"name"`
	Type    string    `yaml:"type"`
	Timeout string    `yaml:"timeout,omitempty"`
	Retry   *RetryDef `yaml:"retry,omitempty"`
}

NodeDef describes a single node in the config.

type NodeError

type NodeError struct {
	NodeName string
	Attempt  int
	Cause    error
}

NodeError wraps an error produced by a specific node execution.

func (*NodeError) Error

func (e *NodeError) Error() string

func (*NodeError) Unwrap

func (e *NodeError) Unwrap() error

type NodeFunc

type NodeFunc[S any] func(ctx context.Context, state S) (S, error)

NodeFunc is the signature every node must implement.

type OTelHook

type OTelHook[S any] struct{}

OTelHook is a Hook implementation that emits OpenTelemetry spans and metrics.

TODO(P7): implement using go.opentelemetry.io/otel

spans: graph.graph.<name>, graph.node.<name>, graph.checkpoint.*
metrics: graph.node.duration_ms, graph.node.executions, graph.graph.duration_ms

type OneOrMany

type OneOrMany []string

OneOrMany deserialises a YAML field that can be a single string or a list.

type Option

type Option func(*runConfig)

Option configures a single Run call.

func WithCheckpoint

func WithCheckpoint(mgr CheckpointManager) Option

WithCheckpoint enables checkpoint persistence via mgr.

func WithCheckpointFreq

func WithCheckpointFreq(n int) Option

WithCheckpointFreq sets how often (every N nodes) the engine saves a checkpoint.

func WithHook

func WithHook[S any](h Hook[S]) Option

WithHook attaches a lifecycle hook to this Run.

func WithMaxConcurrency

func WithMaxConcurrency(n int) Option

WithMaxConcurrency caps the goroutines used for parallel branches.

func WithPanicRecovery

func WithPanicRecovery(enabled bool) Option

WithPanicRecovery auto-wraps every node with panic recovery.

func WithResumeFrom

func WithResumeFrom(checkpointID string) Option

WithResumeFrom resumes execution from checkpointID. Pass "" to use the latest checkpoint for this graph.

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout sets a global deadline for the entire graph execution.

type PanicError

type PanicError struct {
	NodeName string
	Value    any
	Stack    []byte
}

PanicError wraps a recovered panic value.

func (*PanicError) Error

func (e *PanicError) Error() string

type Registry

type Registry[S any] struct {
	// contains filtered or unexported fields
}

Registry holds all named components referenced by config files.

func NewRegistry

func NewRegistry[S any]() *Registry[S]

NewRegistry creates an empty registry.

func (*Registry[S]) RegisterCondition

func (r *Registry[S]) RegisterCondition(name string, fn func(ctx context.Context, state S) string)

RegisterCondition registers a condition function; it returns the branch name to follow.

func (*Registry[S]) RegisterExitCondition

func (r *Registry[S]) RegisterExitCondition(name string, fn func(ctx context.Context, state S) bool)

RegisterExitCondition registers a loop exit predicate (false = exit loop).

func (*Registry[S]) RegisterMerge

func (r *Registry[S]) RegisterMerge(name string, fn MergeFunc[S])

RegisterMerge registers a merge function for fan-in nodes.

func (*Registry[S]) RegisterNode

func (r *Registry[S]) RegisterNode(typeName string, fn NodeFunc[S])

RegisterNode registers a node implementation under typeName.

type RetryDef

type RetryDef struct {
	MaxAttempts int    `yaml:"max_attempts"`
	Backoff     string `yaml:"backoff,omitempty"`
	MaxBackoff  string `yaml:"max_backoff,omitempty"`
}

RetryDef holds retry policy parameters.

type Retryable

type Retryable struct{ Cause error }

Retryable wraps an error to signal that the engine should retry the node.

func (*Retryable) Error

func (e *Retryable) Error() string

func (*Retryable) Unwrap

func (e *Retryable) Unwrap() error

type StateDeepCopier

type StateDeepCopier[S any] interface {
	DeepCopy() S
}

StateDeepCopier is optionally implemented by S for parallel branch isolation. Default implementation uses a gob round-trip.

type StateSerializer

type StateSerializer interface {
	MarshalState() ([]byte, error)
	UnmarshalState([]byte) error
}

StateSerializer is optionally implemented by S to customise checkpoint encoding. Default implementation uses gob.

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream is a generic, closeable channel wrapper for streaming node output.

func Broadcast

func Broadcast[T any](ctx context.Context, in <-chan T, n int) []*Stream[T]

Broadcast fans-out one channel into n Streams. Each stream receives every value; all streams close when in closes or ctx is cancelled.

func Merge

func Merge[T any](ctx context.Context, streams ...<-chan T) *Stream[T]

Merge fans-in multiple channels into a single Stream. The returned stream closes once all inputs are drained or ctx is cancelled.

func NewStream

func NewStream[T any](buf int) *Stream[T]

NewStream creates a Stream with the given channel buffer size.

func (*Stream[T]) Chan

func (s *Stream[T]) Chan() <-chan T

Chan returns the read-only channel consumers read from.

func (*Stream[T]) Close

func (s *Stream[T]) Close()

Close closes the stream normally.

func (*Stream[T]) CloseWithError

func (s *Stream[T]) CloseWithError(err error)

CloseWithError closes the stream and records an error retrievable via Err.

func (*Stream[T]) Err

func (s *Stream[T]) Err() error

Err returns the error passed to CloseWithError, or nil for a clean close.

func (*Stream[T]) Send

func (s *Stream[T]) Send(v T) bool

Send sends a value into the stream. Returns false if the stream is already closed.

type TerminationReason

type TerminationReason string

TerminationReason describes why a graph execution ended.

const (
	TerminationCompleted TerminationReason = "completed"
	TerminationCancelled TerminationReason = "cancelled"
	TerminationError     TerminationReason = "error"
	TerminationTimeout   TerminationReason = "timeout"
	TerminationMaxSteps  TerminationReason = "max_steps"
)

Directories

Path Synopsis
redis
Package redis provides a Redis-backed checkpoint.Manager.
Package redis provides a Redis-backed checkpoint.Manager.
sqlite
Package sqlite provides a SQLite-backed checkpoint.Manager.
Package sqlite provides a SQLite-backed checkpoint.Manager.
Package node provides reusable built-in node implementations for the graph engine.
Package node provides reusable built-in node implementations for the graph engine.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL