Documentation ¶
Overview ¶
Package flowy provides a type-safe directed graph engine for orchestrating AI agents with support for conditional routing, parallel execution (fan-out/fan-in), checkpointing, and human-in-the-loop interrupts.
A node's update can either replace the state outright (suitable for simple types) or be merged into it as a delta (suitable for complex state); see the README section "State Management Patterns" for the recommended approach.
Example:
ctx := context.Background()
b := flowy.NewGraph[string](func(_, u string) string { return u })
b.AddNode("greet", func(ctx context.Context, s string) (string, error) { return "hello " + s, nil })
b.SetEntryPoint("greet")
b.SetFinishPoint("greet")
graph, err := b.Compile()
if err != nil {
	// handle validation error
}
result, _, err := graph.Invoke(ctx, "world")
if err != nil {
	// handle error
}
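The two update styles described in the overview can be sketched with plain functions that match the Reducer shape, func(current, update T) T. This is only an illustration: it does not import flowy, and the chatState type and both reducer names are hypothetical. The README remains the reference for the recommended pattern.

```go
package main

import "fmt"

// chatState is a hypothetical state type used only for this sketch.
type chatState struct {
	Messages []string
	Approved bool
}

// replaceReducer discards the current state and keeps the update (full replace).
func replaceReducer(_, update string) string { return update }

// mergeReducer treats the update as a delta: new messages are appended to a
// fresh slice and the Approved flag stays set once any update sets it.
func mergeReducer(current, update chatState) chatState {
	return chatState{
		Messages: append(append([]string{}, current.Messages...), update.Messages...),
		Approved: current.Approved || update.Approved,
	}
}

func main() {
	fmt.Println(replaceReducer("old", "new"))

	current := chatState{Messages: []string{"hi"}}
	delta := chatState{Messages: []string{"there"}, Approved: true}
	fmt.Printf("%+v\n", mergeReducer(current, delta))
}
```

With a merge-style reducer a node only needs to return the fields it changed, which tends to matter once several branches update the same state.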
Example (ConditionalEdges) ¶
Example_conditionalEdges shows a graph with a conditional edge: the router chooses the next node from state. Two Invoke calls with different state demonstrate different paths.
reducer := func(_, update string) string { return update }
b := NewGraph[string](reducer)
b.AddNode("start", func(_ context.Context, s string) (string, error) { return s + "[start]", nil })
b.AddNode("left", func(_ context.Context, s string) (string, error) { return s + "[left]", nil })
b.AddNode("right", func(_ context.Context, s string) (string, error) { return s + "[right]", nil })
b.AddConditionalEdge("start", func(_ context.Context, s string) (string, error) {
	if len(s) > 0 && s[0] == 'R' {
		return "right", nil
	}
	return "left", nil
})
b.SetEntryPoint("start")
b.SetFinishPoint("left")
b.SetFinishPoint("right")
graph, err := b.Compile()
if err != nil {
	fmt.Println("compile error:", err)
	return
}
ctx := context.Background()
out1, _, _ := graph.Invoke(ctx, "")
out2, _, _ := graph.Invoke(ctx, "R")
fmt.Println(out1)
fmt.Println(out2)
Output:
[start][left]
R[start][right]
Example (LinearGraph) ¶
Example_linearGraph demonstrates minimal graph construction: reducer, two nodes, one edge, entry and finish points, and Compile.
reducer := func(_, update string) string { return update }
b := NewGraph[string](reducer)
b.AddNode("a", func(_ context.Context, s string) (string, error) { return s + "a", nil })
b.AddNode("b", func(_ context.Context, s string) (string, error) { return s + "b", nil })
b.AddEdge("a", "b")
b.SetEntryPoint("a")
b.SetFinishPoint("b")
graph, err := b.Compile()
if err != nil {
	fmt.Println("compile error:", err)
	return
}
out, _, err := graph.Invoke(context.Background(), "")
if err != nil {
	fmt.Println("invoke error:", err)
	return
}
fmt.Println(out)
Output: ab
Example (MermaidExport) ¶
Example_mermaidExport builds a small graph and exports it to Mermaid flowchart syntax. Use this to log or inspect the graph structure before running it.
reducer := func(_, update string) string { return update }
b := NewGraph[string](reducer)
b.AddNode("a", func(_ context.Context, s string) (string, error) { return s, nil })
b.AddNode("b", func(_ context.Context, s string) (string, error) { return s, nil })
b.AddEdge("a", "b")
b.SetEntryPoint("a")
b.SetFinishPoint("b")
graph, err := b.Compile()
if err != nil {
	fmt.Println("compile error:", err)
	return
}
mermaid := graph.ExportMermaid()
fmt.Print(mermaid)
Output:
flowchart TD
a --> b
Index ¶
- Variables
- type BuildOption
- type Checkpoint
- type ConditionalEdge
- type DynamicRouter
- type Graph
- func (g *Graph[T]) AsNode() Node[T]
- func (g *Graph[T]) ExportMermaid() string
- func (g *Graph[T]) Invoke(ctx context.Context, state T) (T, *Checkpoint, error)
- func (g *Graph[T]) Resume(ctx context.Context, state T, cp *Checkpoint) (T, *Checkpoint, error)
- func (g *Graph[T]) ResumeStream(ctx context.Context, state T, cp *Checkpoint) iter.Seq2[Step[T], error]
- func (g *Graph[T]) Stream(ctx context.Context, state T) iter.Seq2[Step[T], error]
- type GraphBuilder
- func (b *GraphBuilder[T]) AddConditionalEdge(from string, router ConditionalEdge[T]) *GraphBuilder[T]
- func (b *GraphBuilder[T]) AddDynamicFanOut(from string, router DynamicRouter[T], joinNode string) *GraphBuilder[T]
- func (b *GraphBuilder[T]) AddEdge(from, to string) *GraphBuilder[T]
- func (b *GraphBuilder[T]) AddFanOut(from string, targets []string, joinNode string) *GraphBuilder[T]
- func (b *GraphBuilder[T]) AddNode(name string, fn Node[T]) *GraphBuilder[T]
- func (b *GraphBuilder[T]) Compile(opts ...BuildOption[T]) (*Graph[T], error)
- func (b *GraphBuilder[T]) SetEntryPoint(name string) *GraphBuilder[T]
- func (b *GraphBuilder[T]) SetFinishPoint(name string) *GraphBuilder[T]
- func (b *GraphBuilder[T]) Use(mw ...Middleware[T]) *GraphBuilder[T]
- type Middleware
- type Node
- type NodeHandler
- type Reducer
- type Step
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrSuspend is returned when a node suspends execution (e.g. human-in-the-loop).
	// Invoke returns (state, checkpoint, ErrSuspend).
	ErrSuspend = errors.New("flowy: suspend execution")
	// ErrMaxStepsExceeded is returned when the step limit is reached (e.g. infinite loop).
	ErrMaxStepsExceeded = errors.New("flowy: max steps exceeded")
)
Sentinel errors for flow control and validation.
Functions ¶
This section is empty.
Types ¶
type BuildOption ¶ added in v0.3.5
type BuildOption[T any] func(*buildOpts[T])
BuildOption configures the graph at compile time. Generic over state type T so WithMiddleware can be typed.
func WithMaxConcurrency ¶ added in v0.2.1
func WithMaxConcurrency[T any](n int) BuildOption[T]
WithMaxConcurrency sets the maximum number of goroutines that run concurrently during a fan-out. If n <= 0, there is no limit.
func WithMaxSteps ¶
func WithMaxSteps[T any](limit int) BuildOption[T]
WithMaxSteps sets the maximum number of steps, which guards against infinite loops. If limit <= 0, the default of 1000 is used.
func WithMiddleware ¶ added in v0.3.6
func WithMiddleware[T any](mw Middleware[T]) BuildOption[T]
WithMiddleware adds a node-level interceptor at compile time. Can be combined with Use() middlewares.
func WithNodeTimeout ¶
func WithNodeTimeout[T any](d time.Duration) BuildOption[T]
WithNodeTimeout sets a timeout for each node execution.
type Checkpoint ¶
type Checkpoint struct {
	NextNode string // Name of the node to run next when resuming
}
Checkpoint holds the node name to resume from (v2). When a graph returns ErrSuspend, Invoke returns (state, &Checkpoint{NextNode: nodeName}, ErrSuspend). The caller persists state and this checkpoint; Resume(ctx, state, cp) continues from cp.NextNode.
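Since the caller is responsible for persisting state and checkpoint between Invoke and Resume, one possible approach is to serialize both together. The sketch below assumes JSON is an acceptable storage format; the checkpoint type is a local mirror of the documented struct, and savedRun, save and load are hypothetical names that flowy does not provide.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkpoint mirrors the documented Checkpoint struct (one exported field).
type checkpoint struct {
	NextNode string
}

// savedRun is a hypothetical envelope that stores state and checkpoint together.
type savedRun[T any] struct {
	State      T
	Checkpoint checkpoint
}

// save encodes the suspended run so it can be written to a file or database.
func save[T any](state T, cp checkpoint) ([]byte, error) {
	return json.Marshal(savedRun[T]{State: state, Checkpoint: cp})
}

// load decodes a run previously produced by save.
func load[T any](data []byte) (T, checkpoint, error) {
	var run savedRun[T]
	err := json.Unmarshal(data, &run)
	return run.State, run.Checkpoint, err
}

func main() {
	data, err := save("draft v1", checkpoint{NextNode: "approve"})
	if err != nil {
		fmt.Println("save error:", err)
		return
	}
	state, cp, err := load[string](data)
	if err != nil {
		fmt.Println("load error:", err)
		return
	}
	// At this point the caller would pass state and &cp to Resume.
	fmt.Println(state, cp.NextNode)
}
```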
type ConditionalEdge ¶
ConditionalEdge is a routing function that decides which node to execute next.
type DynamicRouter ¶ added in v0.2.0
DynamicRouter decides at runtime which nodes should be executed in parallel (dynamic fan-out). Returned node names must be registered nodes.
type Graph ¶
type Graph[T any] struct {
	// contains filtered or unexported fields
}
Graph is the compiled, immutable graph. Created only via GraphBuilder.Compile.
func (*Graph[T]) AsNode ¶
func (g *Graph[T]) AsNode() Node[T]
AsNode returns a node that runs this graph (for composition when state types match).
func (*Graph[T]) ExportMermaid ¶
func (g *Graph[T]) ExportMermaid() string
ExportMermaid returns a Mermaid flowchart (TD) representation of the graph. Output is deterministic (keys sorted) for stable snapshots and documentation. Node names that sanitize to the same ID get unique suffixes to avoid diagram collisions.
func (*Graph[T]) Invoke ¶
func (g *Graph[T]) Invoke(ctx context.Context, state T) (T, *Checkpoint, error)
Invoke runs the graph to completion. It returns (finalState, nil, nil) on success, or (state, &Checkpoint{NextNode: node}, ErrSuspend) when a node suspends.
func (*Graph[T]) Resume ¶
func (g *Graph[T]) Resume(ctx context.Context, state T, cp *Checkpoint) (T, *Checkpoint, error)
Resume continues execution from cp.NextNode. Pass the state and checkpoint returned by a previous Invoke that ended with ErrSuspend.
func (*Graph[T]) ResumeStream ¶ added in v0.3.5
func (g *Graph[T]) ResumeStream(ctx context.Context, state T, cp *Checkpoint) iter.Seq2[Step[T], error]
ResumeStream continues execution from cp.NextNode with the given state. Same yielding contract as Stream.
func (*Graph[T]) Stream ¶
func (g *Graph[T]) Stream(ctx context.Context, state T) iter.Seq2[Step[T], error]
Stream runs the graph from the entry point and yields (Step, nil) after each successful node. On error or ErrSuspend, it yields one final (Step{}, err) and stops. Use: for step, err := range graph.Stream(ctx, state).
Example ¶
ctx := context.Background()
reducer := func(_, u string) string { return u }
b := NewGraph[string](reducer)
b.AddNode("a", func(_ context.Context, s string) (string, error) { return s + "a", nil })
b.AddNode("b", func(_ context.Context, s string) (string, error) { return s + "b", nil })
b.AddEdge("a", "b")
b.SetEntryPoint("a")
b.SetFinishPoint("b")
graph, err := b.Compile()
if err != nil {
	return
}
for step, err := range graph.Stream(ctx, ".") {
	if err != nil {
		return
	}
	fmt.Println(step.NodeName, step.State)
}
Output:
a .a
b .ab
type GraphBuilder ¶
type GraphBuilder[T any] struct {
	// contains filtered or unexported fields
}
GraphBuilder assembles a graph before compilation. It exposes a fluent API: builder methods return the builder so calls can be chained.
func NewGraph ¶
func NewGraph[T any](reducer Reducer[T]) *GraphBuilder[T]
NewGraph creates a new graph builder with the given reducer.
Example ¶
ctx := context.Background()
reducer := func(_, u string) string { return u }
b := NewGraph[string](reducer)
b.AddNode("greet", func(_ context.Context, s string) (string, error) { return "hello " + s, nil })
b.AddNode("bye", func(_ context.Context, s string) (string, error) { return s + " bye", nil })
b.AddEdge("greet", "bye")
b.SetEntryPoint("greet")
b.SetFinishPoint("bye")
graph, err := b.Compile()
if err != nil {
	return
}
out, _, _ := graph.Invoke(ctx, "world")
fmt.Println(out)
Output: hello world bye
func (*GraphBuilder[T]) AddConditionalEdge ¶
func (b *GraphBuilder[T]) AddConditionalEdge(from string, router ConditionalEdge[T]) *GraphBuilder[T]
AddConditionalEdge sets a router for the given node; the router returns the next node name.
func (*GraphBuilder[T]) AddDynamicFanOut ¶ added in v0.2.0
func (b *GraphBuilder[T]) AddDynamicFanOut(from string, router DynamicRouter[T], joinNode string) *GraphBuilder[T]
AddDynamicFanOut sets up dynamic parallel execution: when routing reaches 'from', the router is called to get target node names at runtime; those nodes run in parallel, then results are reduced and joinNode runs. joinNode must be a registered node (AddNode). 'from' is a routing label, not an executable node.
func (*GraphBuilder[T]) AddEdge ¶
func (b *GraphBuilder[T]) AddEdge(from, to string) *GraphBuilder[T]
AddEdge adds a static edge from -> to.
func (*GraphBuilder[T]) AddFanOut ¶
func (b *GraphBuilder[T]) AddFanOut(from string, targets []string, joinNode string) *GraphBuilder[T]
AddFanOut sets up parallel execution: when routing reaches 'from', all target nodes run in parallel, then results are reduced and joinNode executes. 'from' is a routing label, not an executable node. joinNode must be a registered node (AddNode); it cannot be a fan-out or dynamic fan-out source.
func (*GraphBuilder[T]) AddNode ¶
func (b *GraphBuilder[T]) AddNode(name string, fn Node[T]) *GraphBuilder[T]
AddNode registers a node by name. Returns the builder for chaining.
func (*GraphBuilder[T]) Compile ¶
func (b *GraphBuilder[T]) Compile(opts ...BuildOption[T]) (*Graph[T], error)
Compile validates the graph and returns an immutable Graph. BuildOptions set run config (e.g. WithMaxSteps) and optional middlewares (WithMiddleware).
func (*GraphBuilder[T]) SetEntryPoint ¶
func (b *GraphBuilder[T]) SetEntryPoint(name string) *GraphBuilder[T]
SetEntryPoint sets the node where execution starts.
func (*GraphBuilder[T]) SetFinishPoint ¶
func (b *GraphBuilder[T]) SetFinishPoint(name string) *GraphBuilder[T]
SetFinishPoint marks a node as a valid terminal (execution stops when reached).
func (*GraphBuilder[T]) Use ¶ added in v0.2.0
func (b *GraphBuilder[T]) Use(mw ...Middleware[T]) *GraphBuilder[T]
Use adds middlewares that wrap every node at compile time (first added runs first).
type Middleware ¶ added in v0.2.0
type Middleware[T any] func(ctx context.Context, state T, nodeName string, next NodeHandler[T]) (T, error)
Middleware is an interceptor that wraps node execution. It receives ctx, state, node name, and the next handler. Use it for logging, tracing, or metrics without changing business logic.
type Node ¶
Node is the basic computation unit. It receives the current state and returns the updated state (or a delta to be merged by the Reducer).
func SubgraphNode ¶ added in v0.3.5
func SubgraphNode[Parent, Sub any](sub *Graph[Sub], mapIn func(Parent) Sub, mapOut func(Parent, Sub) Parent) Node[Parent]
SubgraphNode returns a Node[Parent] that runs the subgraph with state mapped from Parent to Sub and back. Use it to embed a graph with a different (e.g. nested) state type into a parent graph. mapIn extracts the sub-state from the parent state; mapOut merges the final sub-state into the parent. If the subgraph returns ErrSuspend, it is propagated; the top-level checkpoint will reference this SubgraphNode.
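The mapIn and mapOut arguments are ordinary functions with the shapes func(Parent) Sub and func(Parent, Sub) Parent. A minimal sketch of such a pair follows; parentState, researchState and runSub are hypothetical, and runSub merely stands in for the subgraph run so the snippet works without flowy.

```go
package main

import "fmt"

// researchState is a hypothetical sub-state owned by the subgraph.
type researchState struct {
	Query   string
	Summary string
}

// parentState is a hypothetical parent state that nests the sub-state.
type parentState struct {
	Topic    string
	Research researchState
}

// mapIn extracts the sub-state from the parent, matching func(Parent) Sub.
func mapIn(p parentState) researchState {
	r := p.Research
	r.Query = p.Topic
	return r
}

// mapOut merges the final sub-state back, matching func(Parent, Sub) Parent.
func mapOut(p parentState, r researchState) parentState {
	p.Research = r
	return p
}

// runSub stands in for running the subgraph to completion.
func runSub(r researchState) researchState {
	r.Summary = "summary of " + r.Query
	return r
}

func main() {
	p := parentState{Topic: "graphs"}
	p = mapOut(p, runSub(mapIn(p)))
	fmt.Printf("%+v\n", p)
}
```

Keeping both mappers free of side effects means the parent state only changes through the value mapOut returns.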
type NodeHandler ¶ added in v0.3.6
NodeHandler is the standard node signature; it is the same as Node and used for clarity in middleware contracts.
type Reducer ¶
type Reducer[T any] func(current T, update T) T
Reducer defines how to merge the current state with the update returned by a node.
type Step ¶ added in v0.3.5
type Step[T any] struct {
	State    T      // State after the node run
	NodeName string // Name of the node that just completed
}
Step describes the graph state after one node execution, yielded by Stream and ResumeStream. Each successful node run produces one Step; the iterator stops after an error or ErrSuspend.
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| hitl_agent (command) | Package main demonstrates Human-in-the-Loop (v2): "approve" node returns ErrSuspend; caller persists state and checkpoint (e.g. |
| multi_agent (command) | Package main shows composition: the seller graph embeds the analyst graph as a node via AsNode(), so the inner graph runs with the same state type. |
| react_agent (command) | Package main runs a minimal ReAct-style agent: reason -> tools -> reason -> finish (with cycle). |
| streaming_agent (command) | Package main demonstrates token streaming: flowy.Stream emits graph state events, while LLM token streaming is done via a channel passed through context. |
| subgraph_agent (command) | Package main demonstrates embedding a subgraph via SubgraphNode: parent state has a nested sub-state, mapIn/mapOut adapt between parent and sub; the subgraph runs as one node in the parent graph. |
| | Package testutil provides test helpers for flowy (e.g. |