graph

package
v0.0.0-...-3613e4e Latest
Warning

This package is not in the latest version of its module.

Published: May 14, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package graph provides deterministic DAG orchestration for multi-agent pipelines.

Orchestration is always deterministic Go code: routing conditions are pure functions, and flow control is never decided by an LLM.


Types

type Branch

type Branch struct {
	Name  string
	Agent agent.Agent
}

Branch describes one branch in a fan-out node.

type BranchResult

type BranchResult struct {
	Name    string
	Session *session.Session
	Result  agent.StepResult
}

BranchResult is the gathered outcome for one fan-out branch.

type Checkpoint

type Checkpoint struct {
	GraphID     string
	SessionID   string
	NextNode    string
	Steps       int
	LastEventID string
	Usage       llm.Usage
	Result      agent.StepResult
	Completed   bool
}

Checkpoint captures the durable resume point for a graph execution.

type CheckpointStore

type CheckpointStore interface {
	Load(ctx context.Context, graphID, sessionID string) (*Checkpoint, error)
	Save(ctx context.Context, checkpoint Checkpoint) error
	Clear(ctx context.Context, graphID, sessionID string) error
}

CheckpointStore persists graph superstep progress between node turns.

type Edge

type Edge struct {
	From      string
	To        string
	Condition func(result agent.StepResult) bool
}

Edge connects two nodes in the graph. Condition is evaluated against the StepResult of the From agent to decide whether the edge is followed. Conditions must be pure Go code and must not make LLM calls.
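As a rough illustration of a pure condition, the sketch below routes on a string field. StepResult here is a hypothetical stand-in with an assumed Output field (the real agent.StepResult fields are not shown on this page), and needsReview is an invented example name.

```go
package main

import (
	"fmt"
	"strings"
)

// StepResult is a hypothetical stand-in for agent.StepResult; the Output
// field is an assumption made for this example.
type StepResult struct {
	Output string
}

// Edge mirrors the documented shape of graph.Edge using the stand-in type.
type Edge struct {
	From      string
	To        string
	Condition func(result StepResult) bool
}

// needsReview is pure: it inspects only its argument and has no side effects.
func needsReview(r StepResult) bool {
	return strings.Contains(r.Output, "REVIEW")
}

func main() {
	e := Edge{From: "writer", To: "reviewer", Condition: needsReview}
	fmt.Println(e.Condition(StepResult{Output: "draft: REVIEW requested"})) // true
	fmt.Println(e.Condition(StepResult{Output: "draft: done"}))             // false
}
```

Because the condition depends only on its input, the same StepResult always selects the same route, which is what keeps orchestration deterministic.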

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

Graph is a directed acyclic graph of agents with conditional routing. Execution starts at the entry node and follows edges whose conditions are satisfied by each agent's StepResult. Execution stops at a terminal node (one with no outgoing edge whose condition is satisfied) or when the context is cancelled.
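The routing described above can be sketched as a simple loop. This is not the package's implementation: the types are stand-ins, and following the first satisfied edge in declaration order is an assumption, since the documentation does not say how ties between several satisfied edges are resolved. Context cancellation is left out for brevity.

```go
package main

import "fmt"

// StepResult is a hypothetical stand-in for agent.StepResult.
type StepResult struct {
	Output string
}

// edge is a simplified outgoing edge; a nil condition means unconditional.
type edge struct {
	to        string
	condition func(StepResult) bool
}

// run starts at entry, executes each node, and follows the first outgoing
// edge whose condition holds. It returns the visited path and stops at a
// node with no satisfied outgoing edge. It assumes the graph is acyclic and
// that every referenced node exists.
func run(entry string, nodes map[string]func() StepResult, edges map[string][]edge) []string {
	var path []string
	current := entry
	for {
		path = append(path, current)
		result := nodes[current]()
		next := ""
		for _, e := range edges[current] {
			if e.condition == nil || e.condition(result) {
				next = e.to
				break
			}
		}
		if next == "" {
			return path // terminal node reached
		}
		current = next
	}
}

func main() {
	nodes := map[string]func() StepResult{
		"classify": func() StepResult { return StepResult{Output: "urgent"} },
		"escalate": func() StepResult { return StepResult{Output: "paged"} },
		"archive":  func() StepResult { return StepResult{Output: "stored"} },
	}
	edges := map[string][]edge{
		"classify": {
			{to: "escalate", condition: func(r StepResult) bool { return r.Output == "urgent" }},
			{to: "archive"},
		},
	}
	fmt.Println(run("classify", nodes, edges)) // [classify escalate]
}
```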

func New

func New(id, entry string) *Graph

New creates an empty Graph with the given ID and entry agent ID.

func (*Graph) AddEdge

func (g *Graph) AddEdge(from, to string, condition func(agent.StepResult) bool)

AddEdge adds a conditional edge between two nodes. If no condition is provided, the edge is unconditional (always followed).

func (*Graph) AddNode

func (g *Graph) AddNode(a agent.Agent)

AddNode registers an agent as a node in the graph.

func (*Graph) ID

func (g *Graph) ID() string

ID returns the graph's unique identifier.

func (*Graph) Run

func (g *Graph) Run(ctx context.Context, sess *session.Session) (agent.StepResult, error)

Run executes the graph starting at the entry node and follows edges whose conditions are satisfied by each StepResult. It returns the final StepResult (from the last agent to execute) and any error.

func (*Graph) SetCheckpointStore

func (g *Graph) SetCheckpointStore(store CheckpointStore)

SetCheckpointStore configures durable checkpoint persistence for graph runs.

func (*Graph) Step

func (g *Graph) Step(ctx context.Context, sess *session.Session) (agent.StepResult, error)

Step executes the graph pipeline. For a graph, Step and Turn are equivalent.

func (*Graph) StreamTurn

func (g *Graph) StreamTurn(
	ctx context.Context,
	sess *session.Session,
	chunkFn func(*llm.Chunk),
) (agent.StepResult, error)

StreamTurn executes the graph pipeline, relaying chunks from any streaming nodes.

func (*Graph) Turn

func (g *Graph) Turn(ctx context.Context, sess *session.Session) (agent.StepResult, error)

Turn executes the graph pipeline.

func (*Graph) Validate

func (g *Graph) Validate() error

Validate checks the graph for structural errors such as missing nodes and cycles. Call it before Run to ensure the graph is a valid DAG.
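One common way to perform such checks is a depth-first search with three node states. The sketch below shows that technique on plain string IDs; it is an illustration under that assumption and not necessarily how Validate is implemented. The validate function and its parameters are hypothetical.

```go
package main

import "fmt"

// validate reports an error if any edge references an unregistered node or
// if the edges contain a cycle. nodes is the set of registered node IDs and
// edges maps a node ID to the IDs it points at.
func validate(nodes map[string]bool, edges map[string][]string) error {
	for from, targets := range edges {
		if !nodes[from] {
			return fmt.Errorf("edge from unknown node %q", from)
		}
		for _, to := range targets {
			if !nodes[to] {
				return fmt.Errorf("edge to unknown node %q", to)
			}
		}
	}

	const (
		unvisited  = iota // zero value: not seen yet
		inProgress        // on the current DFS stack
		done              // fully explored, known to be cycle-free
	)
	state := make(map[string]int)

	var visit func(n string) error
	visit = func(n string) error {
		switch state[n] {
		case inProgress:
			return fmt.Errorf("cycle detected at node %q", n)
		case done:
			return nil
		}
		state[n] = inProgress
		for _, to := range edges[n] {
			if err := visit(to); err != nil {
				return err
			}
		}
		state[n] = done
		return nil
	}

	for n := range nodes {
		if err := visit(n); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	nodes := map[string]bool{"a": true, "b": true, "c": true}
	acyclic := map[string][]string{"a": {"b"}, "b": {"c"}}
	cyclic := map[string][]string{"a": {"b"}, "b": {"c"}, "c": {"a"}}
	fmt.Println(validate(nodes, acyclic) == nil) // true
	fmt.Println(validate(nodes, cyclic) == nil)  // false
}
```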

type JoinFunc

type JoinFunc func([]BranchResult) agent.StepResult

JoinFunc reduces all branch results into the single StepResult returned from the macro-graph node.
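As an example of such a reduction, the sketch below concatenates branch outputs in the order received. The StepResult and BranchResult types are trimmed stand-ins with an assumed Output field, and joinOutputs is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// StepResult is a hypothetical stand-in for agent.StepResult.
type StepResult struct {
	Output string
}

// BranchResult is a trimmed stand-in for graph.BranchResult; the Session
// field is omitted.
type BranchResult struct {
	Name   string
	Result StepResult
}

// joinOutputs labels each branch output with its branch name and joins them
// with newlines, preserving the order of the input slice.
func joinOutputs(results []BranchResult) StepResult {
	parts := make([]string, 0, len(results))
	for _, r := range results {
		parts = append(parts, r.Name+": "+r.Result.Output)
	}
	return StepResult{Output: strings.Join(parts, "\n")}
}

func main() {
	joined := joinOutputs([]BranchResult{
		{Name: "facts", Result: StepResult{Output: "3 sources found"}},
		{Name: "style", Result: StepResult{Output: "tone is formal"}},
	})
	fmt.Println(joined.Output)
}
```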

type LoopNode

type LoopNode struct {
	// contains filtered or unexported fields
}

LoopNode wraps an agent and allows bounded iteration inside a single graph node so the outer graph can remain acyclic.

func NewLoopNode

func NewLoopNode(
	a agent.Agent,
	maxIterations int,
	exitCondition func(agent.StepResult) bool,
) *LoopNode

NewLoopNode creates a node that can run the wrapped agent multiple times before yielding one final StepResult back to the macro-graph.
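The bounded iteration can be pictured as follows. This is a sketch under assumptions: the exit condition is checked after every iteration, and hitting the cap simply returns the last result. What LoopNode actually does when maxIterations is reached is not documented here. The loop function and the stand-in StepResult are hypothetical.

```go
package main

import "fmt"

// StepResult is a hypothetical stand-in for agent.StepResult.
type StepResult struct {
	Output string
}

// loop calls step up to maxIterations times and stops early once exit
// reports true. It returns the last result and the number of iterations run.
func loop(step func(i int) StepResult, maxIterations int, exit func(StepResult) bool) (StepResult, int) {
	var last StepResult
	n := 0
	for i := 0; i < maxIterations; i++ {
		last = step(i)
		n++
		if exit(last) {
			break
		}
	}
	return last, n
}

func main() {
	// The simulated agent succeeds on its third attempt (i == 2).
	step := func(i int) StepResult {
		if i >= 2 {
			return StepResult{Output: "ok"}
		}
		return StepResult{Output: "retry"}
	}
	isOK := func(r StepResult) bool { return r.Output == "ok" }

	result, n := loop(step, 5, isOK)
	fmt.Println(result.Output, n) // ok 3

	result, n = loop(step, 2, isOK)
	fmt.Println(result.Output, n) // retry 2
}
```

Keeping the retry inside one node means the outer graph never needs a back edge, so it still passes a cycle check.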

func (*LoopNode) ID

func (n *LoopNode) ID() string

func (*LoopNode) Step

func (n *LoopNode) Step(ctx context.Context, sess *session.Session) (agent.StepResult, error)

func (*LoopNode) StreamTurn

func (n *LoopNode) StreamTurn(
	ctx context.Context,
	sess *session.Session,
	chunkFn func(*llm.Chunk),
) (agent.StepResult, error)

func (*LoopNode) Turn

func (n *LoopNode) Turn(ctx context.Context, sess *session.Session) (agent.StepResult, error)

type ParallelNode

type ParallelNode struct {
	// contains filtered or unexported fields
}

ParallelNode runs multiple branch agents concurrently against forked child sessions, then joins their results back in branch declaration order.
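One way to get declaration-order results from concurrent work is to have each goroutine write into its own slot of a preallocated slice. The sketch below shows that pattern with stand-in types; session forking is not modeled, and fanOut, branch, and branchResult are hypothetical names rather than the package's internals.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// branch is a simplified stand-in for graph.Branch: a name plus the work to run.
type branch struct {
	name string
	run  func() string
}

// branchResult is a simplified stand-in for graph.BranchResult.
type branchResult struct {
	Name   string
	Output string
}

// fanOut runs every branch in its own goroutine. Each goroutine writes only
// to its own index, so results keep declaration order regardless of which
// branch finishes first, and no extra locking is needed.
func fanOut(branches []branch) []branchResult {
	results := make([]branchResult, len(branches))
	var wg sync.WaitGroup
	for i, b := range branches {
		wg.Add(1)
		go func(i int, b branch) {
			defer wg.Done()
			results[i] = branchResult{Name: b.name, Output: b.run()}
		}(i, b)
	}
	wg.Wait()
	return results
}

func main() {
	results := fanOut([]branch{
		{name: "slow", run: func() string { time.Sleep(20 * time.Millisecond); return "A" }},
		{name: "fast", run: func() string { return "B" }},
	})
	for _, r := range results {
		fmt.Println(r.Name, r.Output)
	}
}
```

Even though the "fast" branch completes first, the output lists "slow" before "fast" because ordering follows the input slice.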

func NewParallelNode

func NewParallelNode(id string, branches []Branch, join JoinFunc) *ParallelNode

NewParallelNode creates a graph node that scatters work across concurrent branches and joins the results into one StepResult.

func (*ParallelNode) ID

func (n *ParallelNode) ID() string

func (*ParallelNode) Step

func (*ParallelNode) Turn

type SQLiteCheckpointStore

type SQLiteCheckpointStore struct {
	// contains filtered or unexported fields
}

SQLiteCheckpointStore persists graph checkpoints in SQLite.

func NewSQLiteCheckpointStore

func NewSQLiteCheckpointStore(dsn string) (*SQLiteCheckpointStore, error)

NewSQLiteCheckpointStore creates a new checkpoint store backed by SQLite.

func (*SQLiteCheckpointStore) Clear

func (s *SQLiteCheckpointStore) Clear(ctx context.Context, graphID, sessionID string) error

func (*SQLiteCheckpointStore) Load

func (s *SQLiteCheckpointStore) Load(
	ctx context.Context,
	graphID, sessionID string,
) (*Checkpoint, error)

func (*SQLiteCheckpointStore) Save

func (s *SQLiteCheckpointStore) Save(ctx context.Context, checkpoint Checkpoint) error
