flodk

package module
v0.0.0-...-38e0290 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

README

flodk

A Go framework for building stateful, graph-based workflows with support for human-in-the-loop interrupts and LLM integration.

Overview

flodk is a workflow orchestration framework that enables you to build complex, multi-step processes as directed graphs. It provides built-in support for:

  • Graph-based workflows: Define nodes and edges to create complex execution flows
  • State management: Thread-safe state persistence across workflow steps
  • Human-in-the-loop interrupts: Pause execution and request user input with validation
  • LLM integration: Built-in nodes for data extraction and processing using LLM providers
  • Checkpoint/resume: Resume workflow execution from interruption points
  • Conditional routing: Dynamic edge resolution based on state

Architecture

Core Components
  • Node: The basic unit of work in a workflow. Executes logic and transforms state.
  • Graph: A directed graph of nodes connected by edges that define execution flow.
  • Edge/EdgeResolver: Determines the next node to execute based on current state.
  • Flow: Manages the execution of a graph with callbacks at various stages.
  • Pipe: Supervises graph execution, manages state persistence, and handles interrupts.
  • Store: Persists execution state for resumption after interrupts.
State Management

The framework uses a checkpoint-based approach to maintain execution state:

  • CheckpointState: Stores the current node, visited nodes, and interrupt history
  • ExecutionState: Combines checkpoint state with application-specific state
  • ExecutionID: Uniquely identifies an execution (ID + flow name)
Callback ordering
  • Callbacks run synchronously inside Flow.Execute. Any error returned by a callback is returned by Execute.
  • When a node returns a HITLInterrupt, OnInterrupt is invoked before Execute returns. Use OnInterrupt to persist interrupt state.
  • When a node succeeds, the edge resolver computes the next node, CheckpointState.CheckpointID is updated, and then OnNodeExec is invoked. Use OnNodeExec to persist the checkpoint for the next step.
  • OnNodeExec is not invoked on the interrupt path. Persist both OnInterrupt and OnNodeExec to ensure safe resumption.
  • OnGraphEnd runs once after the graph finishes. Its error is propagated.

Usage

Basic Workflow
import (
 "context"
 "github.com/aki-kong/flodk"
)

type MyState struct {
 Value string
}

func main() {
 ctx := context.Background()

 // Build the graph
 gb := flodk.NewGraphBuilder[MyState]()
 graph, _ := gb.
  AddNode("start", flodk.Noop[MyState]()).
  AddNode("end", flodk.Noop[MyState]()).
  AddEdge("start", "end").
  SetStartNode("start").
  Build()

 // Create a pipe with an in-memory store
 store := flodk.NewInMemoryStore[MyState]()
 pipe := flodk.NewPipe("my_workflow", graph, store)

 // Execute the workflow
 state := MyState{Value: "initial"}
 result, err := pipe.Invoke(ctx, "thread-123", state)
}
Custom Nodes

Implement the Node interface to create custom processing steps:

type MyNode struct{}

func (n MyNode) Execute(ctx context.Context, state MyState) (MyState, error) {
 state.Value = "processed"
 return state, nil
}
Conditional Routing

Route execution based on state values:

graph, _ := gb.
 AddConditionalEdge(
  "decision",
  flodk.ConditionalFunction[MyState](func(ctx context.Context, state MyState) string {
   if state.Value == "proceed" {
    return "next_step"
   }
   return "alternate_step"
  }),
  map[string]string{
   "next_step":      "next_step",
   "alternate_step": "alternate_step",
  },
 ).
 Build()
Human-in-the-Loop Interrupts

Request user input during workflow execution:

values, err := flodk.InterruptWithValidation(
 ctx,
 "Please provide your name",
 "name_required",
 flodk.Requirements{
  "name": {Type: flodk.Custom},
 },
 flodk.Requirements.Validate,
)

if err != nil {
 // Handle interrupt error
 // User will need to call pipe.Continue() with values
 return state, err
}

state.Name = values["name"]
LLM Integration

Extract structured data using LLM providers:

llmClient := ollama.NewOllamaClient(baseURL)

extractionNode := llm.NewDataExtraction[MyState](llmClient, "model-name").
 Extract("field1", llm.DTString).
 Extract("field2", llm.DTInteger)

graph, _ := gb.
 AddNode("extract", extractionNode).
 Build()

Supported LLM Providers

  • Ollama: Local LLM inference
  • Custom: Implement the llm.Client interface

Workflow Resumption

When an interrupt occurs, the workflow state is persisted. Resume execution with:

state, err := pipe.Continue(ctx, "thread-123", flodk.ResumeConfig{
 InterruptValues: map[string]string{
  "name": "John Doe",
 },
})

Example

See the example/main.go for a complete flight booking workflow that demonstrates:

  • Data extraction with LLM
  • User input collection with validation
  • Conditional routing
  • State persistence and resumption

Installation

go get github.com/aki-kong/flodk

License

Licensed under the Apache License, Version 2.0. See the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetNodeID

func GetNodeID(ctx context.Context) (string, bool)

GetNodeID is used to retrieve the current node id (node name) from the context.

func Interrupt

func Interrupt(
	ctx context.Context,
	message string,
	reason string,
	values Requirements,
) (map[string]string, error)

Interrupt is a helper function which calls InterruptWithValidation with a no validation.

func InterruptWithValidation

func InterruptWithValidation(
	ctx context.Context,
	message string,
	reason string,
	values Requirements,
	fn func(map[string]string) error,
) (map[string]string, error)

InterruptWithValidation is used to test and return if the execution context contains any resolved interrupts or return a newly created interrupt. This also validates the resolved values passed for the interrupt in the context.

Usage: When a node calls this function for the first time (read: no HITL is present for a graph nodeID) a new interrupt is created and returned as a error. When the smae node calls this function again (read: a resolved HITL interrupt is possibly stored in the context) the previously created HITL interrupt is found with resolved answer from the user.

Validation: When the resolved interrupt is found, the values sumbitted by the user is passed through the validation function. Any error returned will be bubbled up as a HITL Interrupt with a validation error attached to it.

func LoadInterrupt

func LoadInterrupt(ctx context.Context, interrupt HITLInterrupt, values map[string]string) context.Context

LoadInterrupt is used to load the context with a resolved interrupt (original HITLInterrupt and answer values).

func LoadNodeID

func LoadNodeID(ctx context.Context, nodeID string) context.Context

LoadNodeID is used to store the current graph node id (node name) into the passed context.

Types

type CheckpointState

type CheckpointState struct {
	// CheckpointID is the name of the graph node which will be picked up next when
	// the flow is executed.
	CheckpointID string `json:"checkpoint_id"`
	// Visited stores all the visited graph node (node IDs).
	Visited []string `json:"visited"`
	// Interrupt stores the Human in the loop interrupt when any node return a HITLInterrupt error.
	Interrupt HITLInterrupt `json:"interrupt"`
	// InterruptHistory stores all the resolved HITL interrupts.
	InterruptHistory []ResolvedHITLInterrupt `json:"interrupt_history"`
}

CheckpointState stores flow execution state which will be used to resume when the execution is interrupted.

type ConditionalEdge

type ConditionalEdge[T any] struct {
	// contains filtered or unexported fields
}

ConditionalEdge is used to redirect to different branches based on the value returned by the ConditionalNode.

func (ConditionalEdge[T]) Resolve

func (ce ConditionalEdge[T]) Resolve(ctx context.Context, state T) string

Resolve implements the EdgeResolver interface for ConditionalEdge.

type ConditionalFunction

type ConditionalFunction[T any] func(ctx context.Context, state T) string

ConditionalFunction is a function type which implements the ConditionalNode interface. This is used when no pre-defined state is necessary for the conditional node.

func (ConditionalFunction[T]) Execute

func (fn ConditionalFunction[T]) Execute(ctx context.Context, state T) string

Execute implements the ConditionalNode interface for FunctionNode

type ConditionalNode

type ConditionalNode[T any] interface {
	Execute(ctx context.Context, state T) string
}

ConditionalNode is a node type which inspects the state to decide the next Node.

type ConitionalInterrupt

type ConitionalInterrupt struct {
	Value string
}

ConditionalInterrupt is used to direct the execution of a flow using a alias value. This value will then be used to choose the next edge of the graph.

func (ConitionalInterrupt) Error

func (ci ConitionalInterrupt) Error() string

Error implements the error interface for the conditional interrupt.

type ConstEdge

type ConstEdge[T any] string

ConstEdge is simple implementation of the EdgeResolver which returns a constant next node id no matter what the current the state.

func (ConstEdge[T]) Resolve

func (c ConstEdge[T]) Resolve(ctx context.Context, state T) string

Resolve implements the EdgeResolver which is used to resolve to a constant node regardless of the current state.

type EdgeResolver

type EdgeResolver[T any] interface {
	Edger
	Resolve(ctx context.Context, state T) string
}

EdgeResolver interface defines the edge routing configuration which returns the next node id for the passed state.

type Edger

type Edger interface {
	// contains filtered or unexported methods
}

Edger returns the static list of possible target node names for an edge.

type ErrRequirementInvalidValue

type ErrRequirementInvalidValue struct {
	Key         string
	Value       string
	Suggestions string
}

func RequirementInvalid

func RequirementInvalid(key string, value string, suggestions []string) ErrRequirementInvalidValue

func (ErrRequirementInvalidValue) Error

type ErrRequirmentKeyNotFound

type ErrRequirmentKeyNotFound string

func RequirementKeyNotFound

func RequirementKeyNotFound(key string) ErrRequirmentKeyNotFound

func (ErrRequirmentKeyNotFound) Error

func (k ErrRequirmentKeyNotFound) Error() string

type ExecutionID

type ExecutionID struct {
	ID       string `json:"id"`
	FlowName string `json:"flow_name"`
}

ExecutionID is a compound ID of a unique ID passed by the modules callers and name of the flow that is being executed.

type ExecutionState

type ExecutionState[T any] struct {
	CheckpointState  CheckpointState `json:"checkpoint_state"`
	ApplicationState T               `json:"application_state"`
}

ExecutionState stores the execution state CheckpointState and app state between executions.

type Flow

type Flow[T any] struct {
	// contains filtered or unexported fields
}

Flow is a construct used start or resume execution of a graph with the passed initial app and checkpoint state.

func NewFlow

func NewFlow[T any](
	name string,
	graph Graph[T],
) *Flow[T]

NewFlow create a new flow construct for the passed graph and name.

func (*Flow[T]) Execute

func (f *Flow[T]) Execute(ctx context.Context, state T) (T, error)

Execute executes the graph with provided initial state and resumes based on the passed checkpoint state configuration.

func (*Flow[T]) Name

func (f *Flow[T]) Name() string

Name returns the name of the flow.

func (*Flow[T]) OnGraphEnd

func (f *Flow[T]) OnGraphEnd(cb FlowCallback[T]) *Flow[T]

OnGraphEnd sets the callback function to be called after the graph execution is completed.

func (*Flow[T]) OnInterrupt

func (f *Flow[T]) OnInterrupt(cb FlowCallback[T]) *Flow[T]

OnInterrupt sets the callback function to be called when the flow is interrupted.

func (*Flow[T]) OnNodeExec

func (f *Flow[T]) OnNodeExec(cb FlowCallback[T]) *Flow[T]

OnNodeExec sets the callback function to be called after a node is executed.

func (*Flow[T]) WithCheckpoint

func (f *Flow[T]) WithCheckpoint(cp CheckpointState) *Flow[T]

WithCheckpoint is used to set the checkpoint state for this flow execution.

type FlowCallback

type FlowCallback[T any] func(cs CheckpointState, runState T) error

FlowCallback is a helper type which will be called during flow execution during different steps.

func (FlowCallback[T]) Call

func (fc FlowCallback[T]) Call(cs CheckpointState, runState T) error

Call is a helper method which checks if the flow function passed is not nil and calls the same.

type FunctionNode

type FunctionNode[T any] func(ctx context.Context, state T) (T, error)

FunctionNode is a function type which implements the Node interface. This is useful when the [Node]s don't require any preloaded state.

func (FunctionNode[T]) Execute

func (fn FunctionNode[T]) Execute(ctx context.Context, state T) (T, error)

Execute implements the Node interface for FunctionNode

type Graph

type Graph[T any] struct {
	// contains filtered or unexported fields
}

Graph stores the graph nodes and edge configuration.

type GraphBuilder

type GraphBuilder[T any] struct {
	// contains filtered or unexported fields
}

GraphBuilder is a helper type which contains methods to build a graph.

func NewGraphBuilder

func NewGraphBuilder[T any]() *GraphBuilder[T]

NewGraphBuilder returns a new GraphBuilder. Chain this return with other methods to GraphBuilder.Build a graph.

func (*GraphBuilder[T]) AddConditionalEdge

func (gb *GraphBuilder[T]) AddConditionalEdge(start string, end ConditionalNode[T], redirections map[string]string) *GraphBuilder[T]

AddEdge adds a single edge relation with a conditional redirection.

func (*GraphBuilder[T]) AddEdge

func (gb *GraphBuilder[T]) AddEdge(start, end string) *GraphBuilder[T]

AddEdge adds a single edge relation.

func (*GraphBuilder[T]) AddNode

func (gb *GraphBuilder[T]) AddNode(name string, node Node[T]) *GraphBuilder[T]

AddNode adds a node to the graph.

func (*GraphBuilder[T]) AddNodes

func (gb *GraphBuilder[T]) AddNodes(nodes map[string]Node[T]) *GraphBuilder[T]

AddNodes adds multiple named nodes to the graph.

func (*GraphBuilder[T]) Build

func (gb *GraphBuilder[T]) Build() (Graph[T], error)

Build checks for the validity of the graph and returns the graph.

func (*GraphBuilder[T]) SetStartNode

func (gb *GraphBuilder[T]) SetStartNode(start string) *GraphBuilder[T]

SetStartNode sets the start node of the graph.

type HITLInterrupt

type HITLInterrupt struct {
	Reason          string       `json:"reason"`
	Message         string       `json:"message"`
	ValidationError error        `json:"validation_error"`
	Requirements    Requirements `json:"requirements"`
	InterruptID     InterruptID  `json:"interrupt_id"`
}

HITLInterrupt is used to return a invoke a human in the loop routine as a part of the flow.

func (HITLInterrupt) Error

func (it HITLInterrupt) Error() string

Error implements the error interface for the task interrupt.

type InMemoryStore

type InMemoryStore[T any] struct {
	// contains filtered or unexported fields
}

InMemoryStore implements the Store interface to store the checkpointing data in a in-memory map.

func NewInMemoryStore

func NewInMemoryStore[T any]() *InMemoryStore[T]

NewInMemoryStore create a new InMemoryStore.

func (*InMemoryStore[T]) Get

func (s *InMemoryStore[T]) Get(ctx context.Context, id ExecutionID) (ExecutionState[T], error)

Get implements the [Store.Get] method of the Store interface.

func (*InMemoryStore[T]) Set

func (s *InMemoryStore[T]) Set(ctx context.Context, id ExecutionID, state ExecutionState[T]) error

Set implements the [Store.Set] method of the Store interface.

type InterruptID

type InterruptID struct {
	NodeID string `json:"node_id"`
	ID     string `json:"id"`
}

InterruptID is used to identify the interrupt against the Node which threw the interrupt.

func (InterruptID) String

func (i InterruptID) String() string

String returns the string representation of the interrupt.

type Node

type Node[T any] interface {
	// Execute runs the node logic with the passed in state.
	Execute(ctx context.Context, state T) (T, error)
}

Node represents any node of the execution graph.

func Noop

func Noop[T any]() Node[T]

Noop returns a Node which does nothing.

type Pipe

type Pipe[T any] struct {
	// contains filtered or unexported fields
}

Pipe is a graph execution supervisor which loads the necessary data from the checkpoint store and validates any HITL responses during resumption, after which it executes the flow with the right context.

func NewPipe

func NewPipe[T any](
	name string,
	graph Graph[T],
	store Store[T],
) *Pipe[T]

NewPipe creates a new Pipe state for the passed flow name, graph and store implementation.

func (*Pipe[T]) Continue

func (p *Pipe[T]) Continue(
	ctx context.Context,
	id string,
	rc ResumeConfig,
) (T, error)

Continue is used to continue the flow execution right after interrupt. This method fetches the execution state for this flow (flow name) and the provided ID, validates the interrupt values provided against the original interrupt requirements.

func (*Pipe[T]) Invoke

func (p *Pipe[T]) Invoke(
	ctx context.Context,
	id string,
	initState T,
) (T, error)

Invoke is used to start a flow execution for a given unique identifier with the passed initial state.

type Requirement

type Requirement struct {
	// Defines the type of requirmenet.
	Type RequirementTypes `json:"type"`
	// Suggestions for the values of the requirement.
	Suggestions []string `json:"suggestions"`
}

Requirement defines the constaints for the interrupt requirements.

type RequirementTypes

type RequirementTypes string

RequirementTypes defines the type of the requirement.

const (
	// Enum requirements can only choose values from the provided suggestions.
	Enum RequirementTypes = "enum"
	// Custom requirements can input any text as the interrupt value.
	Custom RequirementTypes = "custom"
	// CustomWithSuggestions requirements can input any with a given suggestions as hints.
	CustomWithSuggestions RequirementTypes = "custom_with_suggestions"
)

type Requirements

type Requirements map[string]Requirement

Requirements is hash map of all the requirements which needs input from the user.

func (Requirements) Validate

func (r Requirements) Validate(values map[string]string) error

Validate method does a basic validation on the provided values against the constrainsts defined in the Requirements.

type ResolvedHITLInterrupt

type ResolvedHITLInterrupt struct {
	HITLInterrupt
	Values map[string]string
}

ResolvedHITLInterrupt contains the original HITL interrupt and the answer values submitted by the user.

type ResumeConfig

type ResumeConfig struct {
	// InterruptValues stores answer values provided during the HITL interration
	// for the HITL Interrupt in the previous execution step.
	InterruptValues map[string]string
}

ResumeConfig defines the values required for resuming the flow execution.

type Store

type Store[T any] interface {
	Get(ctx context.Context, id ExecutionID) (ExecutionState[T], error)
	Set(ctx context.Context, id ExecutionID, state ExecutionState[T]) error
}

Store interface defines all the necessary functions used to store the execution and application state.

Directories

Path Synopsis
llm

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL