Documentation ¶
Overview ¶
Package graph provides deterministic DAG orchestration for multi-agent pipelines.
Orchestration is always deterministic Go code: routing conditions are pure functions, and flow control is never decided by an LLM.
Index ¶
- type Branch
- type BranchResult
- type Checkpoint
- type CheckpointStore
- type Edge
- type Graph
- func (g *Graph) AddEdge(from, to string, condition func(agent.StepResult) bool)
- func (g *Graph) AddNode(a agent.Agent)
- func (g *Graph) ID() string
- func (g *Graph) Run(ctx context.Context, sess *session.Session) (agent.StepResult, error)
- func (g *Graph) SetCheckpointStore(store CheckpointStore)
- func (g *Graph) Step(ctx context.Context, sess *session.Session) (agent.StepResult, error)
- func (g *Graph) StreamTurn(ctx context.Context, sess *session.Session, chunkFn func(*llm.Chunk)) (agent.StepResult, error)
- func (g *Graph) Turn(ctx context.Context, sess *session.Session) (agent.StepResult, error)
- func (g *Graph) Validate() error
- type JoinFunc
- type LoopNode
- func (n *LoopNode) ID() string
- func (n *LoopNode) Step(ctx context.Context, sess *session.Session) (agent.StepResult, error)
- func (n *LoopNode) StreamTurn(ctx context.Context, sess *session.Session, chunkFn func(*llm.Chunk)) (agent.StepResult, error)
- func (n *LoopNode) Turn(ctx context.Context, sess *session.Session) (agent.StepResult, error)
- type ParallelNode
- type SQLiteCheckpointStore
Types ¶
type BranchResult ¶
type BranchResult struct {
Name string
Session *session.Session
Result agent.StepResult
}
BranchResult is the gathered outcome for one fan-out branch.
type Checkpoint ¶
type Checkpoint struct {
GraphID string
SessionID string
NextNode string
Steps int
LastEventID string
Usage llm.Usage
Result agent.StepResult
Completed bool
}
Checkpoint captures the durable resume point for a graph execution.
type CheckpointStore ¶
type CheckpointStore interface {
Load(ctx context.Context, graphID, sessionID string) (*Checkpoint, error)
Save(ctx context.Context, checkpoint Checkpoint) error
Clear(ctx context.Context, graphID, sessionID string) error
}
CheckpointStore persists graph superstep progress between node turns.
type Edge ¶
type Edge struct {
From string
To string
Condition func(result agent.StepResult) bool
}
Edge connects two nodes in the graph. Condition is evaluated against the StepResult of the From agent to decide whether to follow the edge. Conditions must be pure Go and must not make LLM calls.
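As an illustration of what a pure condition can look like, the sketch below uses local stand-ins rather than the real types: the `Output` field on `StepResult` is hypothetical, as are `needsReview` and `nextNode`. Picking the first matching edge and treating a nil condition as always true are assumptions made for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// StepResult is a stand-in for agent.StepResult; the Output field is hypothetical.
type StepResult struct {
	Output string
}

// Edge mirrors the documented struct shape, using the local StepResult.
type Edge struct {
	From      string
	To        string
	Condition func(result StepResult) bool
}

// needsReview is a pure predicate: it depends only on its argument,
// performs no I/O, and makes no model calls.
func needsReview(r StepResult) bool {
	return strings.Contains(r.Output, "TODO")
}

// nextNode returns the target of the first edge leaving `from` whose
// condition holds. A nil condition is treated as unconditional (assumption).
func nextNode(edges []Edge, from string, r StepResult) (string, bool) {
	for _, e := range edges {
		if e.From != from {
			continue
		}
		if e.Condition == nil || e.Condition(r) {
			return e.To, true
		}
	}
	return "", false
}

func main() {
	edges := []Edge{
		{From: "draft", To: "review", Condition: needsReview},
		{From: "draft", To: "publish"}, // fallback, no condition
	}
	to, _ := nextNode(edges, "draft", StepResult{Output: "TODO: cite sources"})
	fmt.Println(to) // prints "review"
	to, _ = nextNode(edges, "draft", StepResult{Output: "final text"})
	fmt.Println(to) // prints "publish"
	_, ok := nextNode(edges, "publish", StepResult{})
	fmt.Println(ok) // prints "false"
}
```

Because the predicate is a plain function of the result, the same input always routes the same way, which is what makes the routing testable without a model in the loop.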
type Graph ¶
type Graph struct {
// contains filtered or unexported fields
}
Graph is a directed acyclic graph of agents with conditional routing. Execution starts at the entry node and follows edges whose conditions are satisfied by each agent's StepResult. Execution stops at a terminal node (one with no satisfied outgoing edge) or when the context is cancelled.
func (*Graph) AddEdge ¶
func (g *Graph) AddEdge(from, to string, condition func(agent.StepResult) bool)
AddEdge adds a conditional edge between two nodes. If no condition is provided, the edge is unconditional (always followed).
func (*Graph) Run ¶
func (g *Graph) Run(ctx context.Context, sess *session.Session) (agent.StepResult, error)
Run executes the graph starting at the entry node. It follows edges whose conditions are satisfied by each StepResult. It returns the final StepResult (from the last agent to execute) and any error.
func (*Graph) SetCheckpointStore ¶
func (g *Graph) SetCheckpointStore(store CheckpointStore)
SetCheckpointStore configures durable checkpoint persistence for graph runs.
func (*Graph) StreamTurn ¶
func (g *Graph) StreamTurn(ctx context.Context, sess *session.Session, chunkFn func(*llm.Chunk)) (agent.StepResult, error)
StreamTurn executes the graph pipeline, relaying chunks from any streaming nodes.
type JoinFunc ¶
type JoinFunc func([]BranchResult) agent.StepResult
JoinFunc reduces all branch results into the single StepResult returned from the macro-graph node.
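A join function is an ordinary reducer over the gathered branch results. The sketch below uses local stand-ins: `BranchResult` omits the Session field, the `Output` field on `StepResult` is hypothetical, and `concatJoin` is just one possible reduction.

```go
package main

import (
	"fmt"
	"strings"
)

// StepResult is a stand-in for agent.StepResult; Output is hypothetical.
type StepResult struct{ Output string }

// BranchResult is a trimmed stand-in (the Session field is omitted).
type BranchResult struct {
	Name   string
	Result StepResult
}

// JoinFunc has the same shape as the documented type, over the local stand-ins.
type JoinFunc func([]BranchResult) StepResult

// concatJoin labels each branch output with its name and joins them
// in the order the results were passed in.
func concatJoin(results []BranchResult) StepResult {
	parts := make([]string, 0, len(results))
	for _, br := range results {
		parts = append(parts, br.Name+": "+br.Result.Output)
	}
	return StepResult{Output: strings.Join(parts, "\n")}
}

// Compile-time check that concatJoin has the JoinFunc shape.
var _ JoinFunc = concatJoin

func main() {
	joined := concatJoin([]BranchResult{
		{Name: "security", Result: StepResult{Output: "no issues"}},
		{Name: "style", Result: StepResult{Output: "2 nits"}},
	})
	fmt.Println(joined.Output)
	// prints:
	// security: no issues
	// style: 2 nits
}
```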
type LoopNode ¶
type LoopNode struct {
// contains filtered or unexported fields
}
LoopNode wraps an agent and allows bounded iteration inside a single graph node so the outer graph can remain acyclic.
func NewLoopNode ¶
func NewLoopNode(a agent.Agent, maxIterations int, exitCondition func(agent.StepResult) bool) *LoopNode
NewLoopNode creates a node that can run the wrapped agent multiple times before yielding one final StepResult back to the macro-graph.
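func (*LoopNode) StreamTurn ¶
func (n *LoopNode) StreamTurn(ctx context.Context, sess *session.Session, chunkFn func(*llm.Chunk)) (agent.StepResult, error)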
type ParallelNode ¶
type ParallelNode struct {
// contains filtered or unexported fields
}
ParallelNode runs multiple branch agents concurrently against forked child sessions, then joins their results back in branch declaration order.
func NewParallelNode ¶
func NewParallelNode(id string, branches []Branch, join JoinFunc) *ParallelNode
NewParallelNode creates a graph node that scatters work across concurrent branches and joins the results into one StepResult.
func (*ParallelNode) ID ¶
func (n *ParallelNode) ID() string
func (*ParallelNode) Step ¶
func (n *ParallelNode) Step(ctx context.Context, sess *session.Session) (agent.StepResult, error)
func (*ParallelNode) Turn ¶
func (n *ParallelNode) Turn(ctx context.Context, sess *session.Session) (agent.StepResult, error)
type SQLiteCheckpointStore ¶
type SQLiteCheckpointStore struct {
// contains filtered or unexported fields
}
SQLiteCheckpointStore persists graph checkpoints in SQLite.
func NewSQLiteCheckpointStore ¶
func NewSQLiteCheckpointStore(dsn string) (*SQLiteCheckpointStore, error)
NewSQLiteCheckpointStore creates a new checkpoint store backed by SQLite.
func (*SQLiteCheckpointStore) Clear ¶
func (s *SQLiteCheckpointStore) Clear(ctx context.Context, graphID, sessionID string) error
func (*SQLiteCheckpointStore) Load ¶
func (s *SQLiteCheckpointStore) Load(ctx context.Context, graphID, sessionID string) (*Checkpoint, error)
func (*SQLiteCheckpointStore) Save ¶
func (s *SQLiteCheckpointStore) Save(ctx context.Context, checkpoint Checkpoint) error