rhizome

package module
v0.5.0 Latest
Published: Apr 19, 2026 License: MIT Imports: 9 Imported by: 0

README

rhizome

A lightweight, generic graph execution engine in Go. Define nodes and edges, compile the graph, and run it.

go get github.com/jefflinse/rhizome

Quick Start

Linear Pipeline
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jefflinse/rhizome"
)

func main() {
	g := rhizome.New[int]()

	g.AddNode("double", func(_ context.Context, n int) (int, error) {
		return n * 2, nil
	})
	g.AddNode("add-ten", func(_ context.Context, n int) (int, error) {
		return n + 10, nil
	})

	g.AddEdge(rhizome.Start, "double")
	g.AddEdge("double", "add-ten")
	g.AddEdge("add-ten", rhizome.End)

	// Compile validates the graph; Run fails if any node returns an error.
	compiled, err := g.Compile()
	if err != nil {
		log.Fatal(err)
	}
	result, err := compiled.Run(context.Background(), 5)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(result) // 20
}
Conditional Branching

Route dynamically based on state. Declare the set of targets the router may return so Compile can verify reachability:

type state struct {
	Value  int
	Status string
}

g := rhizome.New[*state]()

g.AddNode("classify", func(_ context.Context, s *state) (*state, error) {
	if s.Value >= 100 {
		s.Status = "high"
	} else {
		s.Status = "low"
	}
	return s, nil
})
// handleHigh and handleLow are NodeFunc[*state] implementations defined elsewhere.
g.AddNode("handle-high", handleHigh)
g.AddNode("handle-low", handleLow)

g.AddEdge(rhizome.Start, "classify")
g.AddConditionalEdge("classify", func(_ context.Context, s *state) (string, error) {
	if s.Status == "high" {
		return "handle-high", nil
	}
	return "handle-low", nil
}, "handle-high", "handle-low")
g.AddEdge("handle-high", rhizome.End)
g.AddEdge("handle-low", rhizome.End)

compiled, _ := g.Compile()
result, _ := compiled.Run(ctx, &state{Value: 150})
Loops

Nodes can loop back to themselves or to earlier nodes. Built-in cycle protection prevents infinite loops: once a single node exceeds its execution limit (default: 10 executions per node), Run returns ErrCycleLimit.

g := rhizome.New[int]()

g.AddNode("increment", func(_ context.Context, n int) (int, error) {
	return n + 1, nil
})

g.AddEdge(rhizome.Start, "increment")
g.AddConditionalEdge("increment", func(_ context.Context, n int) (string, error) {
	if n >= 5 {
		return rhizome.End, nil
	}
	return "increment", nil
}, "increment", rhizome.End)

compiled, _ := g.Compile(rhizome.WithMaxNodeExecs(20)) // compile-time default
result, _ := compiled.Run(context.Background(), 0)     // result: 5

// Per-Run override:
result, _ = compiled.Run(context.Background(), 0, rhizome.WithRunMaxNodeExecs[int](50))
Middleware

Wrap node execution for logging, timing, or anything else:

logger := func(ctx context.Context, node string, state int, next rhizome.NodeFunc[int]) (int, error) {
	fmt.Printf("entering %s\n", node)
	result, err := next(ctx, state)
	fmt.Printf("leaving %s\n", node)
	return result, err
}

result, _ := compiled.Run(ctx, 0, rhizome.WithMiddleware(logger))
Built-in middleware

Three resilience primitives are included:

result, err := compiled.Run(ctx, initial, rhizome.WithMiddleware(
    rhizome.Recover[*State](),               // trap panics, return ErrNodePanic
    rhizome.Retry[*State](                   // retry transient failures
        rhizome.WithMaxAttempts(3),
    ),
    rhizome.Timeout[*State](30*time.Second), // per-attempt deadline
))
  • Recover converts panics into an error wrapping ErrNodePanic, with the panic value and stack trace included. The input state is returned unchanged since the node produced no valid output.
  • Timeout(d) passes the node a child context created with context.WithTimeout(ctx, d). Node code must honor ctx.Done() for the deadline to take effect.
  • Retry(opts...) re-invokes the node on error. Defaults: 3 attempts, exponential backoff starting at 100ms, and a classifier that retries everything except context.Canceled and context.DeadlineExceeded (so cancelling a run unwinds promptly instead of retrying). Override with WithMaxAttempts, WithBackoff, and WithRetryIf.

When combining Retry with Timeout, place Retry before Timeout in the middleware list as shown above. That way each retry attempt gets its own deadline; reversing the order makes one deadline span all attempts.
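The ordering rule follows from how wrapping works: the outer middleware runs once per node call, and everything inside it runs again on every retry. This sketch counts how many deadlines a timeout layer creates under each ordering. `retry`, `timeout`, and `chain` are simplified local stand-ins (no backoff, every error retried), not rhizome's Retry and Timeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type nodeFunc func(ctx context.Context, state int) (int, error)
type middleware func(ctx context.Context, node string, state int, next nodeFunc) (int, error)

// chain wraps fn so that mws[0] is the outermost layer.
func chain(fn nodeFunc, mws ...middleware) nodeFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		mw, next := mws[i], fn
		fn = func(ctx context.Context, s int) (int, error) { return mw(ctx, "n", s, next) }
	}
	return fn
}

// retry re-invokes next up to attempts times (no backoff, retries any error).
func retry(attempts int) middleware {
	return func(ctx context.Context, _ string, s int, next nodeFunc) (int, error) {
		var lastErr error
		for i := 0; i < attempts; i++ {
			out, err := next(ctx, s)
			if err == nil {
				return out, nil
			}
			lastErr = err
		}
		return s, lastErr
	}
}

// timeout derives a new deadline each time it runs and counts how often it does.
func timeout(d time.Duration, deadlines *int) middleware {
	return func(ctx context.Context, _ string, s int, next nodeFunc) (int, error) {
		*deadlines++
		ctx, cancel := context.WithTimeout(ctx, d)
		defer cancel()
		return next(ctx, s)
	}
}

var errFlaky = errors.New("transient failure")

// deadlinesCreated runs an always-failing node and reports how many
// deadlines the timeout layer created.
func deadlinesCreated(retryFirst bool) int {
	count := 0
	fail := func(context.Context, int) (int, error) { return 0, errFlaky }
	r, t := retry(3), timeout(time.Second, &count)
	var fn nodeFunc
	if retryFirst {
		fn = chain(fail, r, t) // each attempt gets its own deadline
	} else {
		fn = chain(fail, t, r) // one deadline spans all attempts
	}
	_, _ = fn(context.Background(), 0)
	return count
}

func main() {
	fmt.Println(deadlinesCreated(true), deadlinesCreated(false)) // 3 1
}
```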

Checkpointing

Opt in to checkpointing and the graph saves a snapshot of the state after every node. A crashed run can then be resumed later from the last successful node, possibly in a different process.

State must implement Snapshotter (composed from the stdlib encoding.BinaryMarshaler/BinaryUnmarshaler pair). The type check runs at Compile time, so misconfiguration fails early.

type MyState struct {
	Step int
	Logs []string
}

func (s *MyState) MarshalBinary() ([]byte, error)    { return json.Marshal(s) }
func (s *MyState) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, s) }

store := &rhizome.MemoryStore{} // or any CheckpointStore implementation

g := rhizome.New[*MyState]()
// ... AddNode/AddEdge ...

compiled, err := g.Compile(rhizome.WithCheckpointing(store))
if err != nil {
	// Returns ErrCheckpointRequiresSnapshotter if *MyState does not satisfy Snapshotter.
	panic(err)
}

// Thread IDs correlate runs with checkpoints; required when checkpointing is enabled.
final, err := compiled.Run(ctx, &MyState{}, rhizome.WithThreadID[*MyState]("conversation-123"))

// If the run stopped before reaching End, resume from the last checkpoint into a fresh state instance:
resumed, err := compiled.Resume(ctx, "conversation-123", &MyState{})

CheckpointStore is a two-method interface; MemoryStore ships for tests and single-process use. Persistent backends (SQLite, Postgres, etc.) are intentionally left to separate modules so the core stays dependency-free.

Interrupts

A node can pause execution and delegate to a consumer-provided handler by calling Interrupt. The graph's goroutine blocks inside the handler until it returns. Common uses include human-in-the-loop approvals (CLI prompt, dialog, web request awaiting a response), waiting on asynchronous external events (webhook callbacks, queued work results), policy gates, and debugging breakpoints.

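g.AddNode("confirm", func(ctx context.Context, s *State) (*State, error) {
    resp, err := rhizome.Interrupt(ctx, rhizome.InterruptRequest{
        Kind:    "approve",
        Payload: s.Proposal,
    })
    if err != nil {
        return s, err
    }
    // Value is consumer-defined; check the assertion instead of risking a panic.
    approved, ok := resp.Value.(bool)
    if !ok {
        return s, fmt.Errorf("confirm: expected bool response, got %T", resp.Value)
    }
    s.Approved = approved
    return s, nil
})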

handler := func(ctx context.Context, req rhizome.InterruptRequest) (rhizome.InterruptResponse, error) {
    // Blocking is expected here: the graph waits until the handler returns.
    // Any blocking call should select on ctx.Done() to honor cancellation.
    approved := promptUser(req.Payload) // promptUser: your own UI or I/O code
    return rhizome.InterruptResponse{Value: approved}, nil
}

final, err := compiled.Run(ctx, &State{},
    rhizome.WithInterruptHandler[*State](handler))

InterruptRequest.Node is populated by the runtime, so node code doesn't need to know its own name. Kind and Payload are consumer-defined — use Kind as a discriminator if a single graph raises multiple kinds of interrupts. Calling Interrupt on a run without a handler returns ErrNoInterruptHandler.

This is an in-process primitive: the graph's goroutine parks inside the handler and resumes when it returns. Durable pause-and-resume (where the responder answers minutes or days later, possibly in a different process) is a separate feature that layers on top of Snapshotter.

API

| Function / Type | Description |
|---|---|
| `New[S]()` | Create a new graph builder |
| `AddNode(name, fn)` | Register a named node |
| `AddEdge(from, to)` | Add a static edge between nodes |
| `AddConditionalEdge(from, router, targets...)` | Add dynamic routing; the router may only return declared targets |
| `Compile(opts...)` | Validate and freeze the graph |
| `Run(ctx, state, opts...)` | Execute the compiled graph |
| `Resume(ctx, threadID, empty, opts...)` | Continue a checkpointed run from its last saved node |
| `Start` / `End` | Virtual entry and exit points |
| `WithMaxNodeExecs(n)` | Compile option: default per-node execution limit for every Run (10 if unset) |
| `WithCheckpointing(store)` | Compile option: persist state after each node; requires S to satisfy Snapshotter |
| `WithRunMaxNodeExecs[S](n)` | Run option: override the per-node execution limit for one Run |
| `WithMiddleware(mw...)` | Run option: add a middleware chain |
| `WithThreadID[S](id)` | Run option: set the checkpoint thread ID; required when checkpointing is enabled |
| `WithInterruptHandler[S](h)` | Run option: handler invoked when a node calls Interrupt |
| `Interrupt(ctx, req)` | Called inside a node to pause and request input from the handler |
| `Snapshotter` | Interface state must satisfy for checkpointing (stdlib binary marshal/unmarshal) |
| `CheckpointStore` | Interface for persisting snapshots |
| `MemoryStore` | In-memory CheckpointStore for tests and single-process use (zero value is ready to use) |
| `Recover[S]()` | Built-in middleware: trap panics as ErrNodePanic |
| `Timeout[S](d)` | Built-in middleware: bound each node call with a deadline |
| `Retry[S](opts...)` | Built-in middleware: retry failed nodes with backoff |
| `WithMaxAttempts(n)` / `WithBackoff(fn)` / `WithRetryIf(fn)` | Retry options |

License

See LICENSE.

Documentation

Overview

Package rhizome provides a lightweight, generic graph execution engine.


Constants

const (
	Start = "__start__" // Virtual entry point; use in AddEdge to set the entrypoint.
	End   = "__end__"   // Virtual terminal; use in AddEdge or return from a router to end execution.
)
const DefaultMaxNodeExecs = 10

DefaultMaxNodeExecs is the default maximum number of times a single node can execute before Run returns ErrCycleLimit.

const DefaultRetryMaxAttempts = 3

DefaultRetryMaxAttempts is the default number of times Retry will attempt a node before giving up. The count includes the initial call, so by default a node is retried at most twice.

Variables

var (
	ErrDuplicateNode   = errors.New("rhizome: duplicate node")
	ErrNodeNotFound    = errors.New("rhizome: node not found")
	ErrReservedName    = errors.New("rhizome: reserved name")
	ErrDuplicateEdge   = errors.New("rhizome: duplicate edge")
	ErrConflictingEdge = errors.New("rhizome: conflicting edge")
	ErrNoEntrypoint    = errors.New("rhizome: no entrypoint")
	ErrUnreachableNode = errors.New("rhizome: unreachable node")
	ErrNoOutgoingEdge  = errors.New("rhizome: node has no outgoing edge")
	ErrNoTargets       = errors.New("rhizome: conditional edge requires at least one target")
)

Construction errors (returned by AddNode, AddEdge, AddConditionalEdge, Compile).

var (
	ErrCycleLimit       = errors.New("rhizome: node exceeded max execution count")
	ErrInvalidRoute     = errors.New("rhizome: router returned invalid route")
	ErrUndeclaredTarget = errors.New("rhizome: router returned undeclared target")
)

Runtime errors (returned by Run).

var (
	ErrCheckpointRequiresSnapshotter = errors.New("rhizome: checkpointing requires state to implement Snapshotter")
	ErrThreadIDRequired              = errors.New("rhizome: thread ID required when checkpointing is enabled")
	ErrNoCheckpoint                  = errors.New("rhizome: no checkpoint found")
	ErrCheckpointingDisabled         = errors.New("rhizome: checkpointing is not enabled on this graph")
)

Checkpointing errors.

var (
	ErrNoInterruptHandler = errors.New("rhizome: no interrupt handler configured for this run")
)

Human-in-the-loop errors.

var (
	ErrNodePanic = errors.New("rhizome: node panicked")
)

Built-in middleware errors.

Functions

func DefaultRetryBackoff added in v0.5.0

func DefaultRetryBackoff(attempt int) time.Duration

DefaultRetryBackoff returns an exponential backoff of 100ms * 2^(attempt-1): 100ms before the second attempt, 200ms before the third, and so on.

func DefaultRetryClassifier added in v0.5.0

func DefaultRetryClassifier(err error) bool

DefaultRetryClassifier retries every error except context.Canceled and context.DeadlineExceeded. Retrying on context errors would defeat the caller's ability to cancel a running graph.

Types

type CheckpointStore added in v0.3.0

type CheckpointStore interface {
	Save(ctx context.Context, threadID, nodeName string, data []byte) error
	Load(ctx context.Context, threadID string) (nodeName string, data []byte, err error)
}

CheckpointStore persists graph state between node executions so that a run can be resumed later, possibly in a different process.

Save records the state produced by nodeName for threadID. Load returns the last saved node name and state bytes for threadID, or ErrNoCheckpoint if the thread is unknown.

Implementations must be safe for concurrent use.

type CompileOption

type CompileOption func(*compileConfig)

CompileOption configures graph compilation.

func WithCheckpointing added in v0.3.0

func WithCheckpointing(store CheckpointStore) CompileOption

WithCheckpointing enables persistence of state after every node execution. The state type S must satisfy Snapshotter; if it does not, Compile returns an error wrapping ErrCheckpointRequiresSnapshotter.

func WithMaxNodeExecs

func WithMaxNodeExecs(n int) CompileOption

WithMaxNodeExecs sets the maximum number of times a single node can execute before Run returns ErrCycleLimit. This is the default for every Run on the compiled graph; individual Run calls can override with WithRunMaxNodeExecs.

type CompiledGraph

type CompiledGraph[S any] struct {
	// contains filtered or unexported fields
}

CompiledGraph is an immutable, validated graph ready for execution.

CompiledGraph is safe for concurrent use. Run may be called from multiple goroutines simultaneously; each invocation uses only local state and never mutates the graph.

func (*CompiledGraph[S]) Resume added in v0.3.0

func (cg *CompiledGraph[S]) Resume(ctx context.Context, threadID string, empty S, opts ...RunOption[S]) (S, error)

Resume loads the latest checkpoint for threadID and continues execution from the node after the one that produced the checkpoint. The empty parameter is an uninitialized instance of S used as the target for unmarshaling the saved state — typically &YourState{} when S is a pointer type. The populated state is returned on success.

Resume requires the graph to have been compiled with WithCheckpointing. The threadID passed here overrides any WithThreadID option provided.

func (*CompiledGraph[S]) Run

func (cg *CompiledGraph[S]) Run(ctx context.Context, initial S, opts ...RunOption[S]) (S, error)

Run executes the graph from the entry node until End is reached. Returns the final state on success, or the partial state and error on failure.

If the graph was compiled with WithCheckpointing, WithThreadID is required and state is persisted to the configured CheckpointStore after each node.

type Graph

type Graph[S any] struct {
	// contains filtered or unexported fields
}

Graph is a mutable builder for defining nodes and edges. Call Compile to validate and produce an executable CompiledGraph.

func New

func New[S any]() *Graph[S]

New creates an empty graph.

func (*Graph[S]) AddConditionalEdge

func (g *Graph[S]) AddConditionalEdge(from string, router Router[S], targets ...string) error

AddConditionalEdge adds a dynamic routing function for a node. The router receives the current state and returns the name of the next node to execute, or End to terminate.

Targets declares the complete set of node names the router may return (End is permitted). Declaring targets allows Compile to verify reachability and to catch typos; at runtime, a router returning a name not in targets yields ErrUndeclaredTarget.

func (*Graph[S]) AddEdge

func (g *Graph[S]) AddEdge(from, to string) error

AddEdge adds a static edge from one node to another. Use Start and End constants for entry and exit points.

func (*Graph[S]) AddNode

func (g *Graph[S]) AddNode(name string, fn NodeFunc[S]) error

AddNode registers a named node with the given function.

func (*Graph[S]) Compile

func (g *Graph[S]) Compile(opts ...CompileOption) (*CompiledGraph[S], error)

Compile validates the graph structure and returns an immutable, executable CompiledGraph. Validation checks:

  • At least one edge from Start exists
  • Every edge target, static or declared conditional, references an existing node or End
  • Every registered node has an outgoing edge (static or conditional)
  • Every node is reachable from Start
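The reachability check amounts to a breadth-first search from Start over both static edges and declared conditional targets. The sketch below runs that search on a plain adjacency map; `unreachable` and the map representation are assumptions for illustration, not how Compile stores or validates the graph internally.

```go
package main

import (
	"fmt"
	"sort"
)

const (
	start = "__start__" // same value as rhizome.Start
	end   = "__end__"   // same value as rhizome.End
)

// unreachable returns registered nodes that cannot be reached from start.
// edges maps each source to its static target plus any declared conditional targets.
func unreachable(nodes []string, edges map[string][]string) []string {
	seen := map[string]bool{start: true}
	queue := []string{start}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		for _, next := range edges[cur] {
			if !seen[next] {
				seen[next] = true
				queue = append(queue, next)
			}
		}
	}
	var missing []string
	for _, n := range nodes {
		if !seen[n] {
			missing = append(missing, n)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	nodes := []string{"classify", "handle-high", "handle-low", "orphan"}
	edges := map[string][]string{
		start:         {"classify"},
		"classify":    {"handle-high", "handle-low"}, // declared conditional targets
		"handle-high": {end},
		"handle-low":  {end},
		"orphan":      {end},
	}
	fmt.Println(unreachable(nodes, edges)) // [orphan]
}
```

This is also why declared targets matter: without them, the successors of a conditional edge would be unknown until run time, and this search could not see them.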

type InterruptHandler added in v0.4.0

type InterruptHandler func(ctx context.Context, req InterruptRequest) (InterruptResponse, error)

InterruptHandler is invoked when a node calls Interrupt. It blocks the graph's goroutine until it returns, so implementations must honor ctx cancellation — any blocking call (channel recv, network IO, stdin read) should select on ctx.Done().

type InterruptRequest added in v0.4.0

type InterruptRequest struct {
	Node    string
	Kind    string
	Payload any
}

InterruptRequest is passed from a node to the configured InterruptHandler. Node is set by the runtime before the handler is called, so node code may leave it blank. Kind and Payload are consumer-defined: Kind is a discriminator the handler can switch on, and Payload carries whatever data the handler needs to produce a response.

type InterruptResponse added in v0.4.0

type InterruptResponse struct {
	Value any
}

InterruptResponse is returned by the handler to resume the paused node. Value is consumer-defined; the node type-asserts it into whatever shape it expects.

func Interrupt added in v0.4.0

Interrupt pauses the current node by invoking the InterruptHandler configured on the Run. It blocks until the handler returns. If no handler was configured it returns ErrNoInterruptHandler.

The runtime overwrites InterruptRequest.Node with the actual executing node name, so node code may leave that field blank.

type MemoryStore added in v0.3.0

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory CheckpointStore suitable for tests and ephemeral single-process use. The zero value is ready to use.

func (*MemoryStore) Load added in v0.3.0

func (m *MemoryStore) Load(_ context.Context, threadID string) (string, []byte, error)

Load returns the most recent checkpoint for threadID, or ErrNoCheckpoint if no checkpoint has been recorded.

func (*MemoryStore) Save added in v0.3.0

func (m *MemoryStore) Save(_ context.Context, threadID, nodeName string, data []byte) error

Save records the checkpoint for threadID, overwriting any previous entry.

type Middleware

type Middleware[S any] func(ctx context.Context, node string, state S, next NodeFunc[S]) (S, error)

Middleware wraps node execution. It receives the node name, the current state, and the next function in the chain. Call next to continue execution.

func Recover added in v0.5.0

func Recover[S any]() Middleware[S]

Recover returns a Middleware that converts panics in the wrapped node into an error wrapping ErrNodePanic. The panic value is included in the error text and the stack trace at the point of panic is captured.

Because a panicking node produces no valid output, the middleware returns the state as it was when the node was entered.

func Retry added in v0.5.0

func Retry[S any](opts ...RetryOption) Middleware[S]

Retry returns a Middleware that re-invokes the wrapped node on error.

By default it attempts DefaultRetryMaxAttempts times total, sleeps DefaultRetryBackoff between attempts, and retries every error except context.Canceled and context.DeadlineExceeded. Use WithMaxAttempts, WithBackoff, and WithRetryIf to override.

The sleep between attempts aborts immediately when ctx is cancelled or hits its deadline; the context error is returned in that case.

When composed with Timeout, place Timeout *inside* Retry (i.e. Retry is added before Timeout in WithMiddleware) so that each attempt gets its own deadline rather than sharing one across attempts.

func Timeout added in v0.5.0

func Timeout[S any](d time.Duration) Middleware[S]

Timeout returns a Middleware that bounds the execution of the wrapped node by d. A child context that expires after d is passed to next; if the node does not respect ctx.Done(), the deadline has no effect.

A non-positive duration disables the timeout, making the middleware a no-op.

type NodeFunc

type NodeFunc[S any] func(ctx context.Context, state S) (S, error)

NodeFunc is a function that transforms state.

type RetryOption added in v0.5.0

type RetryOption func(*retryConfig)

RetryOption configures a Retry middleware.

func WithBackoff added in v0.5.0

func WithBackoff(fn func(attempt int) time.Duration) RetryOption

WithBackoff sets the backoff function used between attempts. The argument is the 1-based index of the attempt that just failed (so the sleep before attempt 2 receives attempt=1).

func WithMaxAttempts added in v0.5.0

func WithMaxAttempts(n int) RetryOption

WithMaxAttempts sets the maximum number of attempts, including the initial call. Values less than 1 are treated as 1 (no retries).

func WithRetryIf added in v0.5.0

func WithRetryIf(fn func(error) bool) RetryOption

WithRetryIf sets the classifier that decides whether an error is retryable. When the classifier returns false for an error, Retry returns that error immediately without making further attempts.

type Router added in v0.2.0

type Router[S any] func(ctx context.Context, state S) (string, error)

Router decides which node to execute next based on the current state. It must return one of the target names declared when the conditional edge was registered, or End.

type RunOption

type RunOption[S any] func(*runConfig[S])

RunOption configures a single Run invocation.

func WithInterruptHandler added in v0.4.0

func WithInterruptHandler[S any](h InterruptHandler) RunOption[S]

WithInterruptHandler registers the handler invoked when a node calls Interrupt. Without this option, Interrupt returns ErrNoInterruptHandler.

func WithMiddleware

func WithMiddleware[S any](mw ...Middleware[S]) RunOption[S]

WithMiddleware adds middleware to a Run invocation. Middleware executes in the order provided: the first middleware is the outermost wrapper. Multiple calls to WithMiddleware append to the chain.

func WithRunMaxNodeExecs added in v0.2.0

func WithRunMaxNodeExecs[S any](n int) RunOption[S]

WithRunMaxNodeExecs overrides the compile-time max node execution count for this Run invocation only.

func WithThreadID added in v0.3.0

func WithThreadID[S any](id string) RunOption[S]

WithThreadID sets the identifier under which a Run or Resume records checkpoints. It is required when the compiled graph was built with WithCheckpointing and ignored otherwise.

type Snapshotter added in v0.3.0

type Snapshotter interface {
	encoding.BinaryMarshaler
	encoding.BinaryUnmarshaler
}

Snapshotter is implemented by state types that can be checkpointed. It composes the standard encoding.BinaryMarshaler and BinaryUnmarshaler interfaces, so a type that already implements those for other reasons satisfies Snapshotter automatically.

Because UnmarshalBinary must mutate its receiver, Snapshotter is typically satisfied by a pointer type (e.g., S = *MyState).
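Since the methods in the checkpointing example have pointer receivers, only *MyState satisfies the two stdlib interfaces. A compile-time assertion surfaces a mistake even before Compile would report ErrCheckpointRequiresSnapshotter. The assertions and the `roundTrip` helper below are illustrative additions using only encoding and encoding/json; `roundTrip` loosely mimics a save followed by a restore into a fresh instance.

```go
package main

import (
	"encoding"
	"encoding/json"
	"fmt"
)

type MyState struct {
	Step int
	Logs []string
}

func (s *MyState) MarshalBinary() ([]byte, error)    { return json.Marshal(s) }
func (s *MyState) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, s) }

// Compile-time checks: *MyState satisfies both halves of Snapshotter.
// The value type MyState would fail both, since the methods have pointer receivers.
var (
	_ encoding.BinaryMarshaler   = (*MyState)(nil)
	_ encoding.BinaryUnmarshaler = (*MyState)(nil)
)

// roundTrip marshals src and unmarshals the bytes into a fresh instance.
func roundTrip(src *MyState) (*MyState, error) {
	data, err := src.MarshalBinary()
	if err != nil {
		return nil, err
	}
	dst := &MyState{} // plays the role of the "empty" argument to Resume
	if err := dst.UnmarshalBinary(data); err != nil {
		return nil, err
	}
	return dst, nil
}

func main() {
	got, err := roundTrip(&MyState{Step: 2, Logs: []string{"a", "b"}})
	fmt.Println(got.Step, got.Logs, err) // 2 [a b] <nil>
}
```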
