flowy

v0.3.6
Published: Mar 6, 2026 License: MIT Imports: 10 Imported by: 0

README

Flowy

TL;DR — flowy is a library for building reliable, stateful AI agents and workflows in Go as directed graphs. It supports multi-agent flows, Suspend/Resume (HITL), state streaming via iterators, and Mermaid diagram export.

Features

  • Generics — strictly typed state T, no interface{} or map[string]any
  • Conditional edges — routing based on state (e.g. LLM decides next step)
  • Fan-out / fan-in — parallel execution with reducer-based merge (static AddFanOut or dynamic AddDynamicFanOut at runtime)
  • Middlewares — wrap nodes for logging, tracing, metrics without touching business logic
  • Suspend / Resume — a node returns ErrSuspend; caller persists state and Checkpoint; Resume(ctx, state, cp) continues
  • State streaming — Stream(ctx, state) returns iter.Seq2[Step[T], error] (Go 1.23+); consume with for step, err := range graph.Stream(...)
  • Composition — use a graph as a node (AsNode() for same state type, or SubgraphNode with mapIn/mapOut for nested state)

Requirements

  • Go 1.26+

Installation

go get github.com/skosovsky/flowy

Quick start

Minimal linear graph: state is a string, two nodes append to it, then run.

package main

import (
    "context"
    "github.com/skosovsky/flowy"
)

func main() {
    reducer := func(_, update string) string { return update }
    b := flowy.NewGraph[string](reducer)
    b.AddNode("a", func(ctx context.Context, s string) (string, error) { return s + "a", nil })
    b.AddNode("b", func(ctx context.Context, s string) (string, error) { return s + "b", nil })
    b.AddEdge("a", "b")
    b.SetEntryPoint("a")
    b.SetFinishPoint("b")

    graph, err := b.Compile()
    if err != nil {
        panic(err)
    }
    ctx := context.Background()
    out, _, err := graph.Invoke(ctx, "")
    if err != nil {
        panic(err)
    }
    // out == "ab"
}

State Management Patterns

The Reducer in flowy has a simple signature: func(current, update T) T. How you implement it depends on the complexity of your state.

1. Simple State (Full Replace)

If your state is a simple primitive (like a string or int), your nodes can just return the new absolute value, and your reducer simply returns the update:

func reducer(current, update string) string { return update }

2. Complex State (Merge/Delta)

For real-world agents, your state will likely be a complex struct containing chat history, token counters, and pending tool calls. Do not return a full copy of the state from your nodes: with a full-replace reducer, one node can accidentally overwrite data written by another.

Instead, nodes should return only the fields that changed (a delta). The reducer is then responsible for safely merging these changes into the current state.

Example of a Merge Reducer:

type Message struct{ Text string }
type ToolCall struct{ Name string }

type AgentState struct {
    Messages    []Message
    ToolCalls   []ToolCall
    TotalTokens int
}

func mergeReducer(current, update AgentState) AgentState {
    // 1. Append slices instead of replacing
    if len(update.Messages) > 0 {
        current.Messages = append(current.Messages, update.Messages...)
    }

    // 2. Replace slices only if explicitly needed (e.g., clearing queue)
    if update.ToolCalls != nil {
        current.ToolCalls = update.ToolCalls
    }

    // 3. Sum counters
    if update.TotalTokens > 0 {
        current.TotalTokens += update.TotalTokens
    }

    return current
}

In this pattern, an LLM node that only generates a new message just returns AgentState{Messages: []Message{newMsg}}, and the reducer safely appends it without clearing the TotalTokens counter.
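
The merge behaviour described above can be checked in isolation. The sketch below repeats the reducer from this section and folds three deltas into a starting state; applyDeltas is a hypothetical helper that stands in for the runner, not part of flowy.

```go
package main

import "fmt"

type Message struct{ Text string }
type ToolCall struct{ Name string }

type AgentState struct {
	Messages    []Message
	ToolCalls   []ToolCall
	TotalTokens int
}

// mergeReducer is the merge reducer from this section.
func mergeReducer(current, update AgentState) AgentState {
	if len(update.Messages) > 0 {
		current.Messages = append(current.Messages, update.Messages...)
	}
	if update.ToolCalls != nil {
		current.ToolCalls = update.ToolCalls
	}
	if update.TotalTokens > 0 {
		current.TotalTokens += update.TotalTokens
	}
	return current
}

// applyDeltas folds node deltas into the state one by one.
// It is a stand-in for the runner (assumption), not flowy's code.
func applyDeltas(state AgentState, deltas ...AgentState) AgentState {
	for _, d := range deltas {
		state = mergeReducer(state, d)
	}
	return state
}

func main() {
	final := applyDeltas(AgentState{TotalTokens: 100},
		// A message-only delta leaves the token counter untouched.
		AgentState{Messages: []Message{{Text: "hi"}}},
		// A tool call plus 20 tokens.
		AgentState{ToolCalls: []ToolCall{{Name: "search"}}, TotalTokens: 20},
		// A non-nil empty slice clears the tool-call queue.
		AgentState{ToolCalls: []ToolCall{}},
	)
	fmt.Println(len(final.Messages), len(final.ToolCalls), final.TotalTokens) // 1 0 120
}
```

Note the difference between a nil ToolCalls slice (no change) and an empty non-nil slice (clear the queue); the reducer relies on it.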

Key concepts

State

State has type T and is passed between nodes. Each node returns a delta (update); the reducer merges current state with that delta to produce the next state. Choose full replace for simple types or merge/delta for complex state (see State Management Patterns); see also Advanced State Management for a mutator pattern.

Nodes

A node is a function func(ctx context.Context, state T) (T, error): it receives context and current state, and returns the delta and an error. The runner applies the reducer to merge the delta into state and passes the result along the graph. On error, execution stops and the error is returned (or yielded as the second value when using Stream).
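
To make the node/reducer contract concrete, here is a minimal sketch of a single step. Node and Reducer are local stand-ins that mirror the documented signatures; runStep and runDemo are hypothetical names, and returning the unchanged state on error is a choice of this sketch rather than documented flowy behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented signatures (assumption:
// they are redeclared here so the sketch compiles without flowy).
type Node[T any] func(ctx context.Context, state T) (T, error)
type Reducer[T any] func(current, update T) T

// runStep executes one node and merges its delta into the state.
// On error it returns the state unchanged together with the error.
func runStep[T any](ctx context.Context, state T, node Node[T], reduce Reducer[T]) (T, error) {
	delta, err := node(ctx, state)
	if err != nil {
		return state, err
	}
	return reduce(state, delta), nil
}

// runDemo runs one successful step and one failing step.
func runDemo() (afterOK, afterFail int, err error) {
	sum := func(current, update int) int { return current + update }
	addTwo := func(_ context.Context, _ int) (int, error) { return 2, nil } // returns a delta
	fail := func(_ context.Context, _ int) (int, error) { return 0, errors.New("boom") }

	ctx := context.Background()
	afterOK, err = runStep[int](ctx, 40, addTwo, sum)
	if err != nil {
		return afterOK, afterOK, err
	}
	afterFail, err = runStep[int](ctx, afterOK, fail, sum)
	return afterOK, afterFail, err
}

func main() {
	ok, failed, err := runDemo()
	fmt.Println(ok, failed, err) // 42 42 boom
}
```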

Edges and conditional edges

  • Edges (AddEdge(from, to)) define a fixed next node.
  • Conditional edges (AddConditionalEdge(from, router)) let a router function decide the next node from (ctx, state); the router returns the next node name.

Suspend / Resume (v2)

Execution is suspended when a node returns ErrSuspend (e.g. human-in-the-loop). Invoke returns (state, checkpoint, ErrSuspend); the caller persists the state and the Checkpoint (e.g. in a DB). To continue, call Resume(ctx, state, cp) with the saved state and checkpoint. The Checkpoint holds only NextNode (the node to run next); state is kept by the caller.

state, cp, err := graph.Invoke(ctx, initial)
if errors.Is(err, flowy.ErrSuspend) {
    // Persist state and cp (e.g. store.Save(ctx, "session_1", state, cp)).
    // Later, load them back and continue from cp.NextNode:
    loaded, cpLoaded, err := store.Load(ctx, "session_1")
    if err != nil {
        return err
    }
    final, _, err := graph.Resume(ctx, loaded, cpLoaded)
    if err != nil {
        return err
    }
    fmt.Println(final)
}

Build options

Options are set at Compile(opts...) and apply to all runs of that graph:

Option                 Description
WithMaxSteps(n)        Max steps per run (prevents infinite loops; default 1000 if n <= 0). The run returns ErrMaxStepsExceeded when the limit is exceeded.
WithNodeTimeout(d)     Timeout for each node execution; the context is cancelled after d.
WithMaxConcurrency(n)  Max concurrent goroutines in fan-out; n <= 0 means no limit.

Visualization (Mermaid)

You can export the compiled graph to Mermaid flowchart syntax for diagrams and debugging:

graph, _ := b.Compile()
mermaid := graph.ExportMermaid()
fmt.Println(mermaid) // flowchart TD\n  a --> b ...

Use this to log or inspect the graph structure before running it.

Errors

  • Panics — not recovered by the runner; a panic in a node will terminate execution.
  • ErrSuspend — returned when a node suspends execution (HITL). Invoke returns (state, checkpoint, ErrSuspend); continue with Resume(ctx, state, cp).
  • ErrMaxStepsExceeded — returned when the step limit (WithMaxSteps) is reached (e.g. infinite loop in the graph).

State streaming (Go 1.26+)

Stream(ctx, state) returns iter.Seq2[Step[T], error]. Each successful node yields a Step{State: state, NodeName: name}; on error or ErrSuspend the iterator yields one final (Step{}, err) and stops.

for step, err := range graph.Stream(ctx, state) {
    if err != nil {
        if errors.Is(err, flowy.ErrSuspend) {
            // step is the zero Step{} here; persist the state from the last
            // successfully yielded step if you intend to resume.
        }
        return err
    }
    fmt.Println(step.NodeName, step.State)
}

Middlewares

Use Use(mw...) to wrap every node (including fan-out targets) with cross-cutting logic. You can also add middlewares at compile time with Compile(flowy.WithMiddleware(mw)). The first middleware added runs first (outermost in the chain).

Middleware has the interceptor signature: it receives ctx, state, nodeName, and next (the next handler), and returns (state, error).

b := flowy.NewGraph[string](reducer)
b.AddNode("a", nodeA)
logMw := func(ctx context.Context, state string, nodeName string, next flowy.NodeHandler[string]) (string, error) {
    log.Println("before", nodeName)
    out, err := next(ctx, state)
    log.Println("after", nodeName)
    return out, err
}
b.Use(logMw)
// or pass at compile: graph, _ := b.Compile(flowy.WithMiddleware(logMw))

Fan-out (static and dynamic)

Static fan-out: AddFanOut(from, targets, joinNode) runs all nodes in targets in parallel, merges their results with the reducer in order, then continues at joinNode. joinNode must be a registered node (not a fan-out source).

Dynamic fan-out: when the set of branches is known only at runtime (e.g. from an LLM), use AddDynamicFanOut(from, router, joinNode). The router receives (ctx, state) and returns target node names. If it returns an empty list, execution goes straight to joinNode.

Limit concurrency: to avoid rate limits (e.g. HTTP 429) or resource exhaustion when running many branches, use WithMaxConcurrency(n) at compile time: Compile(flowy.WithMaxConcurrency(5)). It applies to Invoke, Stream, and Resume whenever a fan-out runs.
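
For intuition, a bounded fan-out with an ordered merge might look roughly like the sketch below. It uses a buffered channel as a semaphore and local stand-in types; fanOut and merged are hypothetical names, and this is not flowy's implementation (for example, it limits how many branches run at once rather than how many goroutines exist).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local stand-ins mirroring the documented signatures (assumption).
type Node[T any] func(ctx context.Context, state T) (T, error)
type Reducer[T any] func(current, update T) T

// fanOut runs every branch on the same input state, at most maxConc at a
// time (maxConc <= 0 means no limit), then merges the deltas in branch order.
func fanOut[T any](ctx context.Context, state T, branches []Node[T], reduce Reducer[T], maxConc int) (T, error) {
	deltas := make([]T, len(branches))
	errs := make([]error, len(branches))

	var sem chan struct{}
	if maxConc > 0 {
		sem = make(chan struct{}, maxConc)
	}

	var wg sync.WaitGroup
	for i, branch := range branches {
		wg.Add(1)
		go func(i int, branch Node[T]) {
			defer wg.Done()
			if sem != nil {
				sem <- struct{}{}        // acquire a slot
				defer func() { <-sem }() // release it when the branch returns
			}
			deltas[i], errs[i] = branch(ctx, state)
		}(i, branch)
	}
	wg.Wait()

	// Merge sequentially so the result does not depend on scheduling.
	for i := range branches {
		if errs[i] != nil {
			return state, errs[i]
		}
		state = reduce(state, deltas[i])
	}
	return state, nil
}

// merged runs three hypothetical branches with at most two in flight.
func merged() []string {
	appendAll := func(current, update []string) []string { return append(current, update...) }
	branch := func(name string) Node[[]string] {
		return func(_ context.Context, _ []string) ([]string, error) { return []string{name}, nil }
	}
	out, _ := fanOut[[]string](context.Background(), nil,
		[]Node[[]string]{branch("search"), branch("summarize"), branch("translate")}, appendAll, 2)
	return out
}

func main() {
	fmt.Println(merged()) // [search summarize translate]
}
```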

Advanced State Management (Mutation Slice Pattern)

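To avoid a single giant reducer, nodes can return small mutators and the reducer applies them. Because Reducer[T] and Node[T] use one type T for both state and update, the sketch below carries the mutators inside the state as a slice that is set only in deltas:

type State struct {
    Messages  []string
    Query     string
    Mutations []func(*State) // set only in deltas returned by nodes
}

reducer := func(c, update State) State {
    for _, mutate := range update.Mutations {
        mutate(&c)
    }
    return c
}
b := flowy.NewGraph[State](reducer)
b.AddNode("append", func(ctx context.Context, s State) (State, error) {
    appendQuery := func(st *State) { st.Messages = append(st.Messages, s.Query) }
    return State{Mutations: []func(*State){appendQuery}}, nil
})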

Development

make test    # run tests with race detector
make lint    # golangci-lint
make cover   # coverage report

License

See LICENSE.

Documentation

Overview

Package flowy provides a type-safe directed graph engine for orchestrating AI agents with support for conditional routing, parallel execution (fan-out/fan-in), checkpointing, and human-in-the-loop interrupts.

State updates can be full replace (simple types) or merge/delta (complex state); see the README section "State Management Patterns" for the recommended approach.

Example:

ctx := context.Background()
b := flowy.NewGraph[string](func(_, u string) string { return u })
b.AddNode("greet", func(ctx context.Context, s string) (string, error) { return "hello " + s, nil })
b.SetEntryPoint("greet")
b.SetFinishPoint("greet")
graph, _ := b.Compile()
result, _, err := graph.Invoke(ctx, "world")
if err != nil {
	// handle error
}

Example (ConditionalEdges)

Example_conditionalEdges shows a graph with a conditional edge: the router chooses the next node from state. Two Invoke calls with different state demonstrate different paths.

reducer := func(_, update string) string { return update }
b := NewGraph[string](reducer)
b.AddNode("start", func(_ context.Context, s string) (string, error) { return s + "[start]", nil })
b.AddNode("left", func(_ context.Context, s string) (string, error) { return s + "[left]", nil })
b.AddNode("right", func(_ context.Context, s string) (string, error) { return s + "[right]", nil })
b.AddConditionalEdge("start", func(_ context.Context, s string) (string, error) {
	if len(s) > 0 && s[0] == 'R' {
		return "right", nil
	}
	return "left", nil
})
b.SetEntryPoint("start")
b.SetFinishPoint("left")
b.SetFinishPoint("right")

graph, err := b.Compile()
if err != nil {
	fmt.Println("compile error:", err)
	return
}
ctx := context.Background()
out1, _, _ := graph.Invoke(ctx, "")
out2, _, _ := graph.Invoke(ctx, "R")
fmt.Println(out1)
fmt.Println(out2)

Output:

[start][left]
R[start][right]

Example (LinearGraph)

Example_linearGraph demonstrates minimal graph construction: reducer, two nodes, one edge, entry and finish points, and Compile.

reducer := func(_, update string) string { return update }
b := NewGraph[string](reducer)
b.AddNode("a", func(_ context.Context, s string) (string, error) { return s + "a", nil })
b.AddNode("b", func(_ context.Context, s string) (string, error) { return s + "b", nil })
b.AddEdge("a", "b")
b.SetEntryPoint("a")
b.SetFinishPoint("b")

graph, err := b.Compile()
if err != nil {
	fmt.Println("compile error:", err)
	return
}
out, _, err := graph.Invoke(context.Background(), "")
if err != nil {
	fmt.Println("invoke error:", err)
	return
}
fmt.Println(out)

Output:

ab

Example (MermaidExport)

Example_mermaidExport builds a small graph and exports it to Mermaid flowchart syntax. Use this to log or inspect the graph structure before running it.

reducer := func(_, update string) string { return update }
b := NewGraph[string](reducer)
b.AddNode("a", func(_ context.Context, s string) (string, error) { return s, nil })
b.AddNode("b", func(_ context.Context, s string) (string, error) { return s, nil })
b.AddEdge("a", "b")
b.SetEntryPoint("a")
b.SetFinishPoint("b")

graph, err := b.Compile()
if err != nil {
	fmt.Println("compile error:", err)
	return
}
mermaid := graph.ExportMermaid()
fmt.Print(mermaid)

Output:

flowchart TD
  a --> b

Constants

This section is empty.

Variables

var (
	// ErrSuspend is returned when a node suspends execution (e.g. human-in-the-loop). Invoke returns (state, checkpoint, ErrSuspend).
	ErrSuspend = errors.New("flowy: suspend execution")
	// ErrMaxStepsExceeded is returned when the step limit is reached (e.g. infinite loop).
	ErrMaxStepsExceeded = errors.New("flowy: max steps exceeded")
)

Sentinel errors for flow control and validation.

Functions

This section is empty.

Types

type BuildOption added in v0.3.5

type BuildOption[T any] func(*buildOpts[T])

BuildOption configures the graph at compile time. Generic over state type T so WithMiddleware can be typed.

func WithMaxConcurrency added in v0.2.1

func WithMaxConcurrency[T any](n int) BuildOption[T]

WithMaxConcurrency sets the maximum number of goroutines during a fan-out. If n <= 0, there is no limit.

func WithMaxSteps

func WithMaxSteps[T any](limit int) BuildOption[T]

WithMaxSteps sets the maximum number of steps per run, which guards against infinite loops. If limit <= 0, the default of 1000 is used.
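
Why a step budget matters for cyclic graphs can be seen in a toy walker. The sketch below is not flowy's runner: walk, the edge map and errMaxSteps are hypothetical, and what exactly flowy counts as a step is not specified here.

```go
package main

import (
	"errors"
	"fmt"
)

// errMaxSteps stands in for flowy.ErrMaxStepsExceeded (assumption).
var errMaxSteps = errors.New("max steps exceeded")

// walk follows next-node edges from entry until it reaches finish or runs
// out of its step budget. A limit <= 0 falls back to 1000, mirroring the
// documented default. It returns the number of nodes visited.
func walk(edges map[string]string, entry, finish string, limit int) (int, error) {
	if limit <= 0 {
		limit = 1000
	}
	node := entry
	for steps := 1; steps <= limit; steps++ {
		if node == finish {
			return steps, nil
		}
		next, ok := edges[node]
		if !ok {
			return steps, fmt.Errorf("node %q has no outgoing edge", node)
		}
		node = next
	}
	return limit, errMaxSteps
}

func main() {
	linear := map[string]string{"a": "b", "b": "c"}
	fmt.Println(walk(linear, "a", "c", 0)) // 3 <nil>

	// A reason/tools cycle that never reaches "finish" exhausts the budget.
	cycle := map[string]string{"reason": "tools", "tools": "reason"}
	fmt.Println(walk(cycle, "reason", "finish", 5)) // 5 max steps exceeded
}
```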

func WithMiddleware added in v0.3.6

func WithMiddleware[T any](mw Middleware[T]) BuildOption[T]

WithMiddleware adds a node-level interceptor at compile time. Can be combined with Use() middlewares.

func WithNodeTimeout

func WithNodeTimeout[T any](d time.Duration) BuildOption[T]

WithNodeTimeout sets a timeout for each node execution.
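
The README describes this option as cancelling the node's context after d. A rough stand-alone equivalent can be sketched with context.WithTimeout; withTimeout, slowNode and timedOut are illustrative names, and the sketch only shows the general technique, not how flowy applies the option. The timeout takes effect only if the node observes ctx.Done().

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Node is a local stand-in mirroring the documented signature (assumption).
type Node[T any] func(ctx context.Context, state T) (T, error)

// withTimeout wraps a node so that its context is cancelled after d.
func withTimeout[T any](d time.Duration, node Node[T]) Node[T] {
	return func(ctx context.Context, state T) (T, error) {
		ctx, cancel := context.WithTimeout(ctx, d)
		defer cancel()
		return node(ctx, state)
	}
}

// slowNode simulates a slow call that respects cancellation.
func slowNode(ctx context.Context, s string) (string, error) {
	select {
	case <-time.After(time.Second):
		return s + " done", nil
	case <-ctx.Done():
		return s, ctx.Err()
	}
}

// timedOut reports whether slowNode is cut off by a 10ms timeout.
func timedOut() bool {
	node := withTimeout[string](10*time.Millisecond, slowNode)
	_, err := node(context.Background(), "job")
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut()) // true
}
```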

type Checkpoint

type Checkpoint struct {
	NextNode string // Name of the node to run next when resuming
}

Checkpoint holds the node name to resume from (v2). When a graph returns ErrSuspend, Invoke returns (state, &Checkpoint{NextNode: nodeName}, ErrSuspend). The caller persists state and this checkpoint; Resume(ctx, state, cp) continues from cp.NextNode.
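
Since the caller owns persistence, one simple option is to store state and checkpoint together as JSON. The sketch below assumes the state type is JSON-serializable; Checkpoint is redeclared locally to mirror the documented struct, and State, snapshot, save and load are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Checkpoint mirrors the documented struct (local stand-in).
type Checkpoint struct {
	NextNode string
}

// State is a hypothetical application state.
type State struct {
	Draft    string
	Approved bool
}

// snapshot is a hypothetical record holding state and checkpoint together.
type snapshot struct {
	State      State
	Checkpoint Checkpoint
}

// save encodes state and checkpoint into one JSON document.
func save(s State, cp *Checkpoint) ([]byte, error) {
	if cp == nil {
		return nil, errors.New("nothing to save: checkpoint is nil")
	}
	return json.Marshal(snapshot{State: s, Checkpoint: *cp})
}

// load decodes a document produced by save.
func load(data []byte) (State, *Checkpoint, error) {
	var snap snapshot
	if err := json.Unmarshal(data, &snap); err != nil {
		return State{}, nil, err
	}
	return snap.State, &snap.Checkpoint, nil
}

func main() {
	data, err := save(State{Draft: "refund order 42"}, &Checkpoint{NextNode: "approve"})
	if err != nil {
		panic(err)
	}
	state, cp, err := load(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(state.Draft, cp.NextNode) // refund order 42 approve
}
```

The loaded pair is what you would hand to Resume(ctx, state, cp).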

type ConditionalEdge

type ConditionalEdge[T any] func(ctx context.Context, state T) (string, error)

ConditionalEdge is a routing function that decides which node to execute next.
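
A router is an ordinary function, so it can be written and tested without a graph. The sketch below redeclares ConditionalEdge locally to mirror the documented type; AgentState, the node names "tools" and "finish", and the helper nextNode are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// ConditionalEdge mirrors the documented router signature (local stand-in).
type ConditionalEdge[T any] func(ctx context.Context, state T) (string, error)

// AgentState is a hypothetical state type.
type AgentState struct {
	PendingToolCalls int
}

// routeAfterReason sends the run to "tools" while tool calls are pending
// and to "finish" otherwise.
func routeAfterReason(_ context.Context, s AgentState) (string, error) {
	if s.PendingToolCalls > 0 {
		return "tools", nil
	}
	return "finish", nil
}

// nextNode calls the router directly, without building a graph.
func nextNode(s AgentState) string {
	var router ConditionalEdge[AgentState] = routeAfterReason
	name, _ := router(context.Background(), s)
	return name
}

func main() {
	fmt.Println(nextNode(AgentState{PendingToolCalls: 2})) // tools
	fmt.Println(nextNode(AgentState{}))                    // finish
}
```

With flowy, a function of this shape would be passed to AddConditionalEdge for the node that precedes the decision.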

type DynamicRouter added in v0.2.0

type DynamicRouter[T any] func(ctx context.Context, state T) ([]string, error)

DynamicRouter decides at runtime which nodes should be executed in parallel (dynamic fan-out). Returned node names must be registered nodes.

type Graph

type Graph[T any] struct {
	// contains filtered or unexported fields
}

Graph is the compiled, immutable graph. Created only via GraphBuilder.Compile.

func (*Graph[T]) AsNode

func (g *Graph[T]) AsNode() Node[T]

AsNode returns a node that runs this graph (for composition when state types match).

func (*Graph[T]) ExportMermaid

func (g *Graph[T]) ExportMermaid() string

ExportMermaid returns a Mermaid flowchart (TD) representation of the graph. Output is deterministic (keys sorted) for stable snapshots and documentation. Node names that sanitize to the same ID get unique suffixes to avoid diagram collisions.

func (*Graph[T]) Invoke

func (g *Graph[T]) Invoke(ctx context.Context, state T) (T, *Checkpoint, error)

Invoke runs the graph to completion. It returns (finalState, nil, nil) on success, or (state, &Checkpoint{NextNode: node}, ErrSuspend) when a node suspends.

func (*Graph[T]) Resume

func (g *Graph[T]) Resume(ctx context.Context, state T, cp *Checkpoint) (T, *Checkpoint, error)

Resume continues execution from cp.NextNode. Pass the state and checkpoint returned by a previous Invoke that ended with ErrSuspend.

func (*Graph[T]) ResumeStream added in v0.3.5

func (g *Graph[T]) ResumeStream(ctx context.Context, state T, cp *Checkpoint) iter.Seq2[Step[T], error]

ResumeStream continues execution from cp.NextNode with the given state. Same yielding contract as Stream.

func (*Graph[T]) Stream

func (g *Graph[T]) Stream(ctx context.Context, state T) iter.Seq2[Step[T], error]

Stream runs the graph from the entry point and yields (Step, nil) after each successful node. On error or ErrSuspend, it yields one final (Step{}, err) and stops. Use: for step, err := range graph.Stream(ctx, state).

Example

ctx := context.Background()
reducer := func(_, u string) string { return u }
b := NewGraph[string](reducer)
b.AddNode("a", func(_ context.Context, s string) (string, error) { return s + "a", nil })
b.AddNode("b", func(_ context.Context, s string) (string, error) { return s + "b", nil })
b.AddEdge("a", "b")
b.SetEntryPoint("a")
b.SetFinishPoint("b")
graph, err := b.Compile()
if err != nil {
	return
}
for step, err := range graph.Stream(ctx, ".") {
	if err != nil {
		return
	}
	fmt.Println(step.NodeName, step.State)
}

Output:

a .a
b .ab

type GraphBuilder

type GraphBuilder[T any] struct {
	// contains filtered or unexported fields
}

GraphBuilder builds a graph before compilation. Fluent API.

func NewGraph

func NewGraph[T any](reducer Reducer[T]) *GraphBuilder[T]

NewGraph creates a new graph builder with the given reducer.

Example

ctx := context.Background()
reducer := func(_, u string) string { return u }
b := NewGraph[string](reducer)
b.AddNode("greet", func(_ context.Context, s string) (string, error) { return "hello " + s, nil })
b.AddNode("bye", func(_ context.Context, s string) (string, error) { return s + " bye", nil })
b.AddEdge("greet", "bye")
b.SetEntryPoint("greet")
b.SetFinishPoint("bye")
graph, err := b.Compile()
if err != nil {
	return
}
out, _, _ := graph.Invoke(ctx, "world")
fmt.Println(out)

Output:

hello world bye

func (*GraphBuilder[T]) AddConditionalEdge

func (b *GraphBuilder[T]) AddConditionalEdge(from string, router ConditionalEdge[T]) *GraphBuilder[T]

AddConditionalEdge sets a router for the given node; the router returns the next node name.

func (*GraphBuilder[T]) AddDynamicFanOut added in v0.2.0

func (b *GraphBuilder[T]) AddDynamicFanOut(from string, router DynamicRouter[T], joinNode string) *GraphBuilder[T]

AddDynamicFanOut sets up dynamic parallel execution: when routing reaches 'from', the router is called to get target node names at runtime; those nodes run in parallel, then results are reduced and joinNode runs. joinNode must be a registered node (AddNode). 'from' is a routing label, not an executable node.

func (*GraphBuilder[T]) AddEdge

func (b *GraphBuilder[T]) AddEdge(from, to string) *GraphBuilder[T]

AddEdge adds a static edge from -> to.

func (*GraphBuilder[T]) AddFanOut

func (b *GraphBuilder[T]) AddFanOut(from string, targets []string, joinNode string) *GraphBuilder[T]

AddFanOut sets up parallel execution: when routing reaches 'from', all target nodes run in parallel, then results are reduced and joinNode executes. 'from' is a routing label, not an executable node. joinNode must be a registered node (AddNode); it cannot be a fan-out or dynamic fan-out source.

func (*GraphBuilder[T]) AddNode

func (b *GraphBuilder[T]) AddNode(name string, fn Node[T]) *GraphBuilder[T]

AddNode registers a node by name. Returns the builder for chaining.

func (*GraphBuilder[T]) Compile

func (b *GraphBuilder[T]) Compile(opts ...BuildOption[T]) (*Graph[T], error)

Compile validates the graph and returns an immutable Graph. BuildOptions set run config (e.g. WithMaxSteps) and optional middlewares (WithMiddleware).

func (*GraphBuilder[T]) SetEntryPoint

func (b *GraphBuilder[T]) SetEntryPoint(name string) *GraphBuilder[T]

SetEntryPoint sets the node where execution starts.

func (*GraphBuilder[T]) SetFinishPoint

func (b *GraphBuilder[T]) SetFinishPoint(name string) *GraphBuilder[T]

SetFinishPoint marks a node as a valid terminal (execution stops when reached).

func (*GraphBuilder[T]) Use added in v0.2.0

func (b *GraphBuilder[T]) Use(mw ...Middleware[T]) *GraphBuilder[T]

Use adds middlewares that wrap every node at compile time (first added runs first).

type Middleware added in v0.2.0

type Middleware[T any] func(ctx context.Context, state T, nodeName string, next NodeHandler[T]) (T, error)

Middleware is an interceptor that wraps node execution. It receives ctx, state, node name, and the next handler. Use it for logging, tracing, or metrics without changing business logic.

type Node

type Node[T any] func(ctx context.Context, state T) (T, error)

Node is the basic computation unit. It receives the current state and returns the updated state (or a delta to be merged by the Reducer).

func SubgraphNode added in v0.3.5

func SubgraphNode[Parent, Sub any](sub *Graph[Sub], mapIn func(Parent) Sub, mapOut func(Parent, Sub) Parent) Node[Parent]

SubgraphNode returns a Node[Parent] that runs the subgraph with state mapped from Parent to Sub and back. Use it to embed a graph with a different (e.g. nested) state type into a parent graph. mapIn extracts the sub-state from the parent state; mapOut merges the final sub-state into the parent. If the subgraph returns ErrSuspend, it is propagated; the top-level checkpoint will reference this SubgraphNode.
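
The mapIn/mapOut idea can be sketched independently of the library. Below, embed is a hypothetical stand-in for SubgraphNode that takes a plain function instead of a compiled *Graph[Sub]; the Parent and Research types are illustrative, and whether the real node returns a full parent state or a delta is not covered by this sketch.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Node is a local stand-in mirroring the documented signature (assumption).
type Node[T any] func(ctx context.Context, state T) (T, error)

// embed adapts a function over Sub into a Node over Parent. The run
// parameter stands in for invoking a compiled subgraph.
func embed[Parent, Sub any](
	run func(context.Context, Sub) (Sub, error),
	mapIn func(Parent) Sub,
	mapOut func(Parent, Sub) Parent,
) Node[Parent] {
	return func(ctx context.Context, p Parent) (Parent, error) {
		out, err := run(ctx, mapIn(p))
		if err != nil {
			return p, err // propagate errors, e.g. a suspend signal
		}
		return mapOut(p, out), nil
	}
}

// Hypothetical state types: the parent carries a nested Research sub-state.
type Research struct{ Notes []string }
type Parent struct {
	Topic    string
	Research Research
}

// runParent runs a stand-in "analyst" subgraph as one parent node.
func runParent() (Parent, error) {
	analyst := func(_ context.Context, r Research) (Research, error) {
		r.Notes = append(r.Notes, "checked sources")
		return r, nil
	}
	node := embed[Parent, Research](analyst,
		func(p Parent) Research { return p.Research },
		func(p Parent, r Research) Parent { p.Research = r; return p },
	)
	return node(context.Background(), Parent{Topic: "pricing"})
}

func main() {
	p, err := runParent()
	fmt.Println(p.Topic, strings.Join(p.Research.Notes, ","), err) // pricing checked sources <nil>
}
```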

type NodeHandler added in v0.3.6

type NodeHandler[T any] = Node[T]

NodeHandler is the standard node signature; it is the same as Node and used for clarity in middleware contracts.

type Reducer

type Reducer[T any] func(current T, update T) T

Reducer defines how to merge the current state with the update returned by a node.

type Step added in v0.3.5

type Step[T any] struct {
	State    T      // State after the node run
	NodeName string // Name of the node that just completed
}

Step describes the graph state after one node execution, yielded by Stream and ResumeStream. Each successful node run produces one Step; the iterator stops after an error or ErrSuspend.

Directories

examples
  • hitl_agent (command): Package main demonstrates Human-in-the-Loop (v2): "approve" node returns ErrSuspend; caller persists state and checkpoint (e.g.
  • multi_agent (command): Package main shows composition: the seller graph embeds the analyst graph as a node via AsNode(), so the inner graph runs with the same state type.
  • react_agent (command): Package main runs a minimal ReAct-style agent: reason -> tools -> reason -> finish (with cycle).
  • streaming_agent (command): Package main demonstrates token streaming: flowy.Stream emits graph state events, while LLM token streaming is done via a channel passed through context.
  • subgraph_agent (command): Package main demonstrates embedding a subgraph via SubgraphNode: parent state has a nested sub-state, mapIn/mapOut adapt between parent and sub; the subgraph runs as one node in the parent graph.

Package testutil provides test helpers for flowy (e.g.
