Documentation ¶
Overview ¶
Package rhizome provides a lightweight, generic graph execution engine.
Index ¶
- Constants
- Variables
- func DefaultRetryBackoff(attempt int) time.Duration
- func DefaultRetryClassifier(err error) bool
- type CheckpointStore
- type CompileOption
- type CompiledGraph
- type Graph
- type InterruptHandler
- type InterruptRequest
- type InterruptResponse
- type MemoryStore
- type Middleware
- type NodeFunc
- type RetryOption
- type Router
- type RunOption
- type Snapshotter
Constants ¶
const (
	Start = "__start__" // Virtual entry point; use in AddEdge to set the entrypoint.
	End   = "__end__"   // Virtual terminal; use in AddEdge or return from a router to end execution.
)
const DefaultMaxNodeExecs = 10
DefaultMaxNodeExecs is the default maximum number of times a single node can execute before Run returns ErrCycleLimit.
const DefaultRetryMaxAttempts = 3
DefaultRetryMaxAttempts is the default number of times Retry will attempt a node before giving up. One attempt is the initial call; attempts above that are retries.
Variables ¶
var (
	ErrDuplicateNode   = errors.New("rhizome: duplicate node")
	ErrNodeNotFound    = errors.New("rhizome: node not found")
	ErrReservedName    = errors.New("rhizome: reserved name")
	ErrDuplicateEdge   = errors.New("rhizome: duplicate edge")
	ErrConflictingEdge = errors.New("rhizome: conflicting edge")
	ErrNoEntrypoint    = errors.New("rhizome: no entrypoint")
	ErrUnreachableNode = errors.New("rhizome: unreachable node")
	ErrNoOutgoingEdge  = errors.New("rhizome: node has no outgoing edge")
	ErrNoTargets       = errors.New("rhizome: conditional edge requires at least one target")
)
Construction errors (returned by AddNode, AddEdge, AddConditionalEdge, Compile).
var (
	ErrCycleLimit       = errors.New("rhizome: node exceeded max execution count")
	ErrInvalidRoute     = errors.New("rhizome: router returned invalid route")
	ErrUndeclaredTarget = errors.New("rhizome: router returned undeclared target")
)
Runtime errors (returned by Run).
var (
	ErrCheckpointRequiresSnapshotter = errors.New("rhizome: checkpointing requires state to implement Snapshotter")
	ErrThreadIDRequired              = errors.New("rhizome: thread ID required when checkpointing is enabled")
	ErrNoCheckpoint                  = errors.New("rhizome: no checkpoint found")
	ErrCheckpointingDisabled         = errors.New("rhizome: checkpointing is not enabled on this graph")
)
Checkpointing errors.
var (
ErrNoInterruptHandler = errors.New("rhizome: no interrupt handler configured for this run")
)
Human-in-the-loop errors.
var (
ErrNodePanic = errors.New("rhizome: node panicked")
)
Built-in middleware errors.
Functions ¶
func DefaultRetryBackoff ¶ added in v0.5.0
func DefaultRetryBackoff(attempt int) time.Duration
DefaultRetryBackoff returns an exponential backoff of 100ms * 2^(attempt-1): 100ms before the second attempt, 200ms before the third, and so on.
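The formula can be checked with a small stand-alone sketch. The function below is a hypothetical stand-in named backoff, not the package's implementation; clamping attempts below 1 is an assumption made only for this example.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a hypothetical stand-in, not the package's code. It reproduces
// the documented formula 100ms * 2^(attempt-1), where attempt is the 1-based
// index of the attempt that just failed.
func backoff(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1 // assumption: clamp out-of-range input instead of panicking
	}
	return (100 * time.Millisecond) << uint(attempt-1)
}

func main() {
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Printf("after failed attempt %d: sleep %v\n", attempt, backoff(attempt))
	}
}
```

Doubling by bit shift keeps the arithmetic in integer nanoseconds, so no floating-point rounding is involved.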
func DefaultRetryClassifier ¶ added in v0.5.0
func DefaultRetryClassifier(err error) bool
DefaultRetryClassifier retries every error except context.Canceled and context.DeadlineExceeded. Retrying on context errors would defeat the caller's ability to cancel a running graph.
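A classifier with this behavior might look like the sketch below. The name retryable and the sample errors are hypothetical, and matching wrapped context errors through errors.Is is an assumption of the sketch rather than something the documentation states.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// retryable is a hypothetical classifier with the documented behavior:
// every error is retryable except the two context errors. Matching wrapped
// errors via errors.Is is an assumption of this sketch.
func retryable(err error) bool {
	return !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded)
}

// Sample errors used only for illustration.
var (
	errTransient = errors.New("connection reset")
	errWrapped   = fmt.Errorf("fetch: %w", context.DeadlineExceeded)
)

func main() {
	fmt.Println("transient:", retryable(errTransient))
	fmt.Println("canceled:", retryable(context.Canceled))
	fmt.Println("wrapped deadline:", retryable(errWrapped))
}
```

A function of this shape can be passed to WithRetryIf when a narrower policy is needed, for example one that retries only errors known to be transient.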
Types ¶
type CheckpointStore ¶ added in v0.3.0
type CheckpointStore interface {
Save(ctx context.Context, threadID, nodeName string, data []byte) error
Load(ctx context.Context, threadID string) (nodeName string, data []byte, err error)
}
CheckpointStore persists graph state between node executions so that a run can be resumed later, possibly in a different process.
Save records the state produced by nodeName for threadID. Load returns the last saved node name and state bytes for threadID, or ErrNoCheckpoint if the thread is unknown.
Implementations must be safe for concurrent use.
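As an illustration of the contract, here is a minimal map-backed store with the same method set. The names mapStore, checkpoint, and errNoCheckpoint are hypothetical stand-ins defined locally so the sketch compiles without the package; a real implementation would return rhizome's ErrNoCheckpoint.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errNoCheckpoint stands in for the package's ErrNoCheckpoint.
var errNoCheckpoint = errors.New("no checkpoint found")

type checkpoint struct {
	node string
	data []byte
}

// mapStore is a hypothetical store; the mutex makes it safe for concurrent use.
type mapStore struct {
	mu   sync.Mutex
	last map[string]checkpoint
}

func (s *mapStore) Save(ctx context.Context, threadID, nodeName string, data []byte) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.last == nil {
		s.last = make(map[string]checkpoint)
	}
	// Copy the bytes so later changes by the caller cannot alter the checkpoint.
	s.last[threadID] = checkpoint{node: nodeName, data: append([]byte(nil), data...)}
	return nil
}

func (s *mapStore) Load(ctx context.Context, threadID string) (string, []byte, error) {
	if err := ctx.Err(); err != nil {
		return "", nil, err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	cp, ok := s.last[threadID]
	if !ok {
		return "", nil, errNoCheckpoint
	}
	return cp.node, append([]byte(nil), cp.data...), nil
}

// demo saves two checkpoints for one thread, then loads the latest one and
// probes an unknown thread.
func demo() (node, data string, missing bool) {
	ctx := context.Background()
	store := &mapStore{}
	_ = store.Save(ctx, "thread-1", "fetch", []byte("state after fetch"))
	_ = store.Save(ctx, "thread-1", "parse", []byte("state after parse"))
	n, d, _ := store.Load(ctx, "thread-1")
	_, _, err := store.Load(ctx, "unknown-thread")
	return n, string(d), errors.Is(err, errNoCheckpoint)
}

func main() {
	fmt.Println(demo())
}
```

Only the most recent checkpoint per thread is kept, which matches what Load is documented to return.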
type CompileOption ¶
type CompileOption func(*compileConfig)
CompileOption configures graph compilation.
func WithCheckpointing ¶ added in v0.3.0
func WithCheckpointing(store CheckpointStore) CompileOption
WithCheckpointing enables persistence of state after every node execution. The state type S must satisfy Snapshotter; if it does not, Compile returns an error wrapping ErrCheckpointRequiresSnapshotter.
func WithMaxNodeExecs ¶
func WithMaxNodeExecs(n int) CompileOption
WithMaxNodeExecs sets the maximum number of times a single node can execute before Run returns ErrCycleLimit. This is the default for every Run on the compiled graph; individual Run calls can override with WithRunMaxNodeExecs.
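The effect of a per-node limit can be sketched with a toy walker over static edges. Everything below is a hypothetical stand-in (walk, errCycleLimit, the edge map); in particular, the sketch assumes a node may run exactly n times and the next attempt fails, which the documentation does not spell out.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	start = "__start__"
	end   = "__end__"
)

// errCycleLimit stands in for the package's ErrCycleLimit.
var errCycleLimit = errors.New("node exceeded max execution count")

// walk follows static edges from start and counts how often each node runs.
// It stops with errCycleLimit when a node would run more than maxExecs times.
func walk(next map[string]string, maxExecs int) (steps int, err error) {
	counts := make(map[string]int)
	for cur := next[start]; cur != end; cur = next[cur] {
		if counts[cur] == maxExecs {
			return steps, fmt.Errorf("%w: %q", errCycleLimit, cur)
		}
		counts[cur]++
		steps++ // a real engine would execute the node here
	}
	return steps, nil
}

func main() {
	// draft and review point at each other, so the walk can never reach end.
	loop := map[string]string{start: "draft", "draft": "review", "review": "draft"}
	steps, err := walk(loop, 10)
	fmt.Println(steps, err)
}
```

Because the count is per node, a two-node loop with a limit of 10 performs 20 executions before the limit trips.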
type CompiledGraph ¶
type CompiledGraph[S any] struct {
	// contains filtered or unexported fields
}
CompiledGraph is an immutable, validated graph ready for execution.
CompiledGraph is safe for concurrent use. Run may be called from multiple goroutines simultaneously; each invocation uses only local state and never mutates the graph.
func (*CompiledGraph[S]) Resume ¶ added in v0.3.0
func (cg *CompiledGraph[S]) Resume(ctx context.Context, threadID string, empty S, opts ...RunOption[S]) (S, error)
Resume loads the latest checkpoint for threadID and continues execution from the node after the one that produced the checkpoint. The empty parameter is a zero-valued instance of S used as the target for unmarshaling the saved state, typically &YourState{} when S is a pointer type. The populated state is returned on success.
Resume requires the graph to have been compiled with WithCheckpointing. The threadID passed here overrides any WithThreadID option provided.
func (*CompiledGraph[S]) Run ¶
func (cg *CompiledGraph[S]) Run(ctx context.Context, initial S, opts ...RunOption[S]) (S, error)
Run executes the graph from the entry node until End is reached. It returns the final state on success, or the partial state and an error on failure.
If the graph was compiled with WithCheckpointing, WithThreadID is required and state is persisted to the configured CheckpointStore after each node.
type Graph ¶
type Graph[S any] struct {
	// contains filtered or unexported fields
}
Graph is a mutable builder for defining nodes and edges. Call Compile to validate and produce an executable CompiledGraph.
func (*Graph[S]) AddConditionalEdge ¶
AddConditionalEdge adds a dynamic routing function for a node. The router receives the current state and returns the name of the next node to execute, or End to terminate.
The targets argument declares the complete set of node names the router may return (End is permitted). Declaring targets lets Compile verify reachability and catch typos; at runtime, a router that returns a name not in targets yields ErrUndeclaredTarget.
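The runtime check on declared targets can be pictured as follows. This is a sketch with local stand-ins: the router signature (state in, name out), the state type, and nextNode are all hypothetical, and treating End as always allowed is an assumption based on the Router description.

```go
package main

import (
	"errors"
	"fmt"
)

const end = "__end__"

// errUndeclaredTarget stands in for the package's ErrUndeclaredTarget.
var errUndeclaredTarget = errors.New("router returned undeclared target")

// state is a hypothetical graph state.
type state struct {
	Attempts int
	Done     bool
}

// route is a sample router: finish when done, otherwise retry up to three
// times and then give up.
func route(s state) string {
	switch {
	case s.Done:
		return end
	case s.Attempts < 3:
		return "retry"
	default:
		return "giveup"
	}
}

// nextNode calls the router and rejects any name that was not declared.
// Assumption: end is accepted even when it is not listed in targets.
func nextNode(router func(state) string, targets []string, s state) (string, error) {
	name := router(s)
	if name == end {
		return name, nil
	}
	for _, t := range targets {
		if t == name {
			return name, nil
		}
	}
	return "", fmt.Errorf("%w: %q", errUndeclaredTarget, name)
}

func main() {
	fmt.Println(nextNode(route, []string{"retry", "giveup"}, state{Attempts: 1}))
	fmt.Println(nextNode(route, []string{"retry"}, state{Attempts: 5}))
}
```

The second call fails because the router can return giveup but only retry was declared, which is the kind of mismatch the declaration is meant to expose.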
func (*Graph[S]) AddEdge ¶
AddEdge adds a static edge from one node to another. Use Start and End constants for entry and exit points.
func (*Graph[S]) Compile ¶
func (g *Graph[S]) Compile(opts ...CompileOption) (*CompiledGraph[S], error)
Compile validates the graph structure and returns an immutable, executable CompiledGraph. Validation checks:
- At least one edge from Start exists
- All static edge targets reference existing nodes or End
- Every declared conditional target references an existing node or End
- Every registered node has an outgoing edge (static or conditional)
- Every node is reachable from Start
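The reachability rule in the list above can be sketched as a breadth-first search from Start. The function unreachable and the edge-map representation are hypothetical; the sketch treats declared conditional targets as ordinary edges and does not reflect how Compile is actually implemented.

```go
package main

import "fmt"

const (
	start = "__start__"
	end   = "__end__"
)

// unreachable returns the registered nodes that cannot be reached from start.
// edges maps a node to its static targets plus any declared conditional targets.
func unreachable(nodes []string, edges map[string][]string) []string {
	seen := map[string]bool{start: true}
	queue := []string{start}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		for _, next := range edges[cur] {
			if !seen[next] {
				seen[next] = true
				queue = append(queue, next)
			}
		}
	}
	var missing []string
	for _, n := range nodes {
		if !seen[n] {
			missing = append(missing, n)
		}
	}
	return missing
}

func main() {
	nodes := []string{"fetch", "parse", "orphan"}
	edges := map[string][]string{
		start:    {"fetch"},
		"fetch":  {"parse"},
		"parse":  {end},
		"orphan": {end}, // has an outgoing edge, but nothing leads to it
	}
	fmt.Println(unreachable(nodes, edges))
}
```

A node such as orphan passes the outgoing-edge check yet still fails validation, which is why the two checks are listed separately.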
type InterruptHandler ¶ added in v0.4.0
type InterruptHandler func(ctx context.Context, req InterruptRequest) (InterruptResponse, error)
InterruptHandler is invoked when a node calls Interrupt. It blocks the graph's goroutine until it returns, so implementations must honor ctx cancellation: any blocking call (channel receive, network I/O, stdin read) should select on ctx.Done().
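A handler that honors cancellation might be shaped like the sketch below. The request and response types are local stand-ins; the field types of interruptRequest (string, string, any) are assumptions, and approvalHandler and ask are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local stand-ins for InterruptRequest and InterruptResponse.
// The field types of interruptRequest are assumptions.
type interruptRequest struct {
	Node    string
	Kind    string
	Payload any
}

type interruptResponse struct {
	Value any
}

// approvalHandler returns a handler that waits for a decision on answers
// but gives up as soon as ctx is cancelled or reaches its deadline.
func approvalHandler(answers <-chan bool) func(context.Context, interruptRequest) (interruptResponse, error) {
	return func(ctx context.Context, req interruptRequest) (interruptResponse, error) {
		select {
		case ok := <-answers:
			return interruptResponse{Value: ok}, nil
		case <-ctx.Done():
			return interruptResponse{}, ctx.Err()
		}
	}
}

// ask invokes the handler under a timeout, the way a caller might bound
// how long a human has to respond.
func ask(answers <-chan bool, timeout time.Duration) (interruptResponse, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	req := interruptRequest{Kind: "approval", Payload: "deploy to production?"}
	return approvalHandler(answers)(ctx, req)
}

func main() {
	answered := make(chan bool, 1)
	answered <- true
	fmt.Println(ask(answered, time.Second))

	// Nobody ever answers on this channel, so the context deadline wins.
	fmt.Println(ask(make(chan bool), 10*time.Millisecond))
}
```

Without the ctx.Done() case, the second call would block forever and the graph's goroutine with it.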
type InterruptRequest ¶ added in v0.4.0
InterruptRequest is passed from a node to the configured InterruptHandler. Node is set by the runtime before the handler is called, so node code may leave it blank. Kind and Payload are consumer-defined: Kind is a discriminator the handler can switch on, and Payload carries whatever data the handler needs to produce a response.
type InterruptResponse ¶ added in v0.4.0
type InterruptResponse struct {
Value any
}
InterruptResponse is returned by the handler to resume the paused node. Value is consumer-defined; the node type-asserts it into whatever shape it expects.
func Interrupt ¶ added in v0.4.0
func Interrupt(ctx context.Context, req InterruptRequest) (InterruptResponse, error)
Interrupt pauses the current node by invoking the InterruptHandler configured on the Run. It blocks until the handler returns. If no handler was configured it returns ErrNoInterruptHandler.
The runtime overwrites InterruptRequest.Node with the actual executing node name, so node code may leave that field blank.
type MemoryStore ¶ added in v0.3.0
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is an in-memory CheckpointStore suitable for tests and ephemeral single-process use. The zero value is ready to use.
type Middleware ¶
Middleware wraps node execution. It receives the node name, the current state, and the next function in the chain. Call next to continue execution.
func Recover ¶ added in v0.5.0
func Recover[S any]() Middleware[S]
Recover returns a Middleware that converts panics in the wrapped node into an error wrapping ErrNodePanic. The panic value is included in the error text and the stack trace at the point of panic is captured.
Because a panicking node produces no valid output, the middleware returns the state as it was when the node was entered.
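The panic-to-error conversion described above follows a common defer and recover pattern, sketched here without the package. The names runRecovered, errNodePanic, state, and panicky are hypothetical, and the plain function signature is a simplification of the real middleware type.

```go
package main

import (
	"errors"
	"fmt"
	"runtime/debug"
)

// errNodePanic stands in for the package's ErrNodePanic.
var errNodePanic = errors.New("node panicked")

// state is a hypothetical value-typed graph state.
type state struct{ Count int }

// runRecovered calls node and converts a panic into an error that wraps
// errNodePanic. On panic it returns the state as it was on entry.
func runRecovered(node func(state) (state, error), in state) (out state, err error) {
	defer func() {
		if r := recover(); r != nil {
			out = in
			// debug.Stack runs before the stack unwinds, so the trace
			// still contains the frame that panicked.
			err = fmt.Errorf("%w: %v\n%s", errNodePanic, r, debug.Stack())
		}
	}()
	return node(in)
}

// panicky modifies its copy of the state and then panics.
func panicky(s state) (state, error) {
	s.Count++
	panic("boom")
}

func main() {
	out, err := runRecovered(panicky, state{Count: 1})
	fmt.Println(out.Count, errors.Is(err, errNodePanic))
}
```

Wrapping with %w lets callers test for the sentinel with errors.Is while still seeing the panic value and stack in the message.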
func Retry ¶ added in v0.5.0
func Retry[S any](opts ...RetryOption) Middleware[S]
Retry returns a Middleware that re-invokes the wrapped node on error.
By default it attempts DefaultRetryMaxAttempts times total, sleeps DefaultRetryBackoff between attempts, and retries every error except context.Canceled and context.DeadlineExceeded. Use WithMaxAttempts, WithBackoff, and WithRetryIf to override.
The sleep between attempts aborts immediately when ctx is cancelled or hits its deadline; the context error is returned in that case.
When composed with Timeout, place Timeout *inside* Retry (i.e. Retry is added before Timeout in WithMiddleware) so that each attempt gets its own deadline rather than sharing one across attempts.
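The retry loop and its cancellable sleep can be sketched as below. The function retry, its parameters, and countCalls are hypothetical and operate on a plain func(context.Context) error rather than the package's middleware type; the defaults are passed in explicitly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry calls op up to maxAttempts times. It stops early on success or on a
// context error, and the sleep between attempts is abandoned as soon as ctx
// is done.
func retry(ctx context.Context, maxAttempts int, backoff func(int) time.Duration, op func(context.Context) error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = op(ctx)
		if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return err
		}
		if attempt == maxAttempts {
			break // no sleep after the final attempt
		}
		timer := time.NewTimer(backoff(attempt))
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
	return err
}

// countCalls runs an operation that fails the given number of times before
// succeeding, with three attempts allowed, and reports how often it ran.
func countCalls(failures int) (calls int, err error) {
	op := func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	}
	short := func(int) time.Duration { return time.Millisecond }
	err = retry(context.Background(), 3, short, op)
	return calls, err
}

func main() {
	fmt.Println(countCalls(2))
	fmt.Println(countCalls(5))
}
```

With two failures the third attempt succeeds; with five, the loop stops after three attempts and returns the last error.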
func Timeout ¶ added in v0.5.0
func Timeout[S any](d time.Duration) Middleware[S]
Timeout returns a Middleware that bounds the execution of the wrapped node by d. A child context with a timeout of d is passed to next; if the node does not observe ctx.Done(), the deadline has no effect.
A non-positive duration disables the timeout, making the middleware a no-op.
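The behavior can be approximated with context.WithTimeout, as in this sketch. The step type and the withTimeout, work, and run helpers are hypothetical simplifications that drop the node name and state from the signature.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// step is a simplified node: it only receives a context.
type step func(ctx context.Context) error

// withTimeout bounds next by d. A non-positive d returns next unchanged.
func withTimeout(d time.Duration, next step) step {
	if d <= 0 {
		return next
	}
	return func(ctx context.Context) error {
		ctx, cancel := context.WithTimeout(ctx, d)
		defer cancel()
		return next(ctx)
	}
}

// work simulates a cooperative node that takes d to finish unless its
// context is done first.
func work(d time.Duration) step {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// run executes a node that needs workTime under the given limit.
func run(limit, workTime time.Duration) error {
	return withTimeout(limit, work(workTime))(context.Background())
}

func main() {
	fmt.Println(run(10*time.Millisecond, 500*time.Millisecond)) // limit is shorter than the work
	fmt.Println(run(0, 5*time.Millisecond))                     // limit disabled
}
```

The deadline only takes effect because work selects on ctx.Done(); a node that ignored its context would run to completion regardless.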
type RetryOption ¶ added in v0.5.0
type RetryOption func(*retryConfig)
RetryOption configures a Retry middleware.
func WithBackoff ¶ added in v0.5.0
func WithBackoff(fn func(attempt int) time.Duration) RetryOption
WithBackoff sets the backoff function used between attempts. The argument is the 1-based index of the attempt that just failed (so the sleep before attempt 2 receives attempt=1).
func WithMaxAttempts ¶ added in v0.5.0
func WithMaxAttempts(n int) RetryOption
WithMaxAttempts sets the maximum number of attempts, including the initial call. Values less than 1 are treated as 1 (no retries).
func WithRetryIf ¶ added in v0.5.0
func WithRetryIf(fn func(error) bool) RetryOption
WithRetryIf sets the classifier that decides whether an error is retryable. When the classifier returns false for an error, Retry returns that error immediately without further attempts.
type Router ¶ added in v0.2.0
Router decides which node to execute next based on the current state. It must return one of the target names declared when the conditional edge was registered, or End.
type RunOption ¶
type RunOption[S any] func(*runConfig[S])
RunOption configures a single Run invocation.
func WithInterruptHandler ¶ added in v0.4.0
func WithInterruptHandler[S any](h InterruptHandler) RunOption[S]
WithInterruptHandler registers the handler invoked when a node calls Interrupt. Without this option, Interrupt returns ErrNoInterruptHandler.
func WithMiddleware ¶
func WithMiddleware[S any](mw ...Middleware[S]) RunOption[S]
WithMiddleware adds middleware to a Run invocation. Middleware executes in the order provided: the first middleware is the outermost wrapper. Multiple calls to WithMiddleware append to the chain.
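The ordering rule can be made concrete with a small chain builder. The nodeFunc and middleware signatures below are assumptions (this page does not show the real type definitions), and chain, tag, and trace are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// Assumed shapes for a node and a middleware; the real definitions may differ.
type nodeFunc[S any] func(ctx context.Context, state S) (S, error)
type middleware[S any] func(ctx context.Context, node string, state S, next nodeFunc[S]) (S, error)

// chain wraps fn so that mws[0] is the outermost layer.
func chain[S any](node string, fn nodeFunc[S], mws ...middleware[S]) nodeFunc[S] {
	// Wrap from last to first so the first middleware ends up outermost.
	for i := len(mws) - 1; i >= 0; i-- {
		mw, next := mws[i], fn
		fn = func(ctx context.Context, s S) (S, error) {
			return mw(ctx, node, s, next)
		}
	}
	return fn
}

// tag returns a middleware that records when it is entered and exited.
func tag(label string, log *[]string) middleware[int] {
	return func(ctx context.Context, node string, s int, next nodeFunc[int]) (int, error) {
		*log = append(*log, label+" enter")
		out, err := next(ctx, s)
		*log = append(*log, label+" exit")
		return out, err
	}
}

// trace runs one node behind middleware A then B and returns the call order.
func trace() []string {
	var log []string
	node := func(ctx context.Context, s int) (int, error) {
		log = append(log, "node")
		return s + 1, nil
	}
	run := chain[int]("work", node, tag("A", &log), tag("B", &log))
	_, _ = run(context.Background(), 0)
	return log
}

func main() {
	fmt.Println(trace())
}
```

The trace reads A enter, B enter, node, B exit, A exit: the first middleware sees the call first and the result last. This is why Retry listed before Timeout gives each attempt its own deadline.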
func WithRunMaxNodeExecs ¶ added in v0.2.0
WithRunMaxNodeExecs overrides the compile-time max node execution count for this Run invocation only.
func WithThreadID ¶ added in v0.3.0
WithThreadID sets the identifier under which a Run or Resume records checkpoints. It is required when the compiled graph was built with WithCheckpointing and ignored otherwise.
type Snapshotter ¶ added in v0.3.0
type Snapshotter interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
Snapshotter is implemented by state types that can be checkpointed. It composes the standard encoding.BinaryMarshaler and BinaryUnmarshaler interfaces, so a type that already implements those for other reasons satisfies Snapshotter automatically.
Because UnmarshalBinary must mutate its receiver, Snapshotter is typically satisfied by a pointer type (e.g., S = *MyState).
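A state type that satisfies the interface through a pointer receiver might look like the sketch below. MyState, its fields, the JSON encoding, and roundTrip are hypothetical choices for illustration; the local snapshotter interface mirrors the definition above so the example compiles on its own.

```go
package main

import (
	"encoding"
	"encoding/json"
	"fmt"
)

// snapshotter mirrors the Snapshotter definition above.
type snapshotter interface {
	encoding.BinaryMarshaler
	encoding.BinaryUnmarshaler
}

// MyState is a hypothetical graph state. JSON is used as the binary
// encoding purely for convenience.
type MyState struct {
	Step  int      `json:"step"`
	Notes []string `json:"notes"`
}

func (s *MyState) MarshalBinary() ([]byte, error) { return json.Marshal(s) }

// UnmarshalBinary needs a pointer receiver because it fills in *s.
func (s *MyState) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, s) }

// Compile-time check that *MyState, not MyState, satisfies the interface.
var _ snapshotter = (*MyState)(nil)

// roundTrip encodes in and decodes the bytes into a fresh, empty state,
// which is roughly what a checkpoint save followed by a resume would do.
func roundTrip(in *MyState) (*MyState, error) {
	data, err := in.MarshalBinary()
	if err != nil {
		return nil, err
	}
	out := &MyState{}
	if err := out.UnmarshalBinary(data); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(&MyState{Step: 3, Notes: []string{"fetched", "parsed"}})
	fmt.Println(out.Step, out.Notes, err)
}
```

The empty &MyState{} passed to UnmarshalBinary plays the same role as the empty argument to Resume.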