pocketflow

package module
v0.0.0-...-0b6c0f6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2026 License: MIT Imports: 5 Imported by: 0

README

PocketFlowGo

English | 中文

The Go implementation of PocketFlow — a minimalist LLM Agents framework in just ~100 lines of code.

Why PocketFlowGo?

Why Go for LLM Agents?

While Python dominates the AI/ML ecosystem, Go offers compelling advantages for production LLM agent deployments:

Aspect Python Go
Deployment Requires runtime, virtualenv, dependencies Single static binary
Performance Interpreter overhead, GIL limitations Native compilation, true parallelism
Concurrency asyncio complexity, thread limitations Goroutines + channels (simple & powerful)
Memory Higher footprint Efficient memory usage
Type Safety Runtime errors Compile-time guarantees
Ops Integration Separate tooling Native fit for K8s, Docker, cloud infrastructure
When to Choose PocketFlowGo
  • Building production microservices that orchestrate LLM calls
  • Deploying agents in containerized environments (K8s, Docker)
  • Need high concurrency with thousands of simultaneous agent workflows
  • Teams with Go-based infrastructure wanting consistent tech stack
  • Edge deployment where binary size and startup time matter
  • Building CLI tools that leverage LLM capabilities
Design Philosophy

Like the original PocketFlow, we believe:

"The best framework is the one you can fully understand."

PocketFlowGo maintains the same minimalist philosophy:

  • ~100 lines of core code — easy to understand, modify, and extend
  • Zero external dependencies — just the Go standard library
  • No magic — explicit control flow, no hidden behaviors

Installation

go get github.com/lalolv/PocketFlowGo

Quick Start

package main

import (
    "fmt"
    pf "github.com/lalolv/PocketFlowGo"
)

func main() {
    // Create nodes
    node1 := pf.NewFuncNode(func(prepRes any) (any, error) {
        fmt.Println("Node 1 executing")
        return "result1", nil
    })

    node2 := pf.NewFuncNode(func(prepRes any) (any, error) {
        fmt.Println("Node 2 executing")
        return "result2", nil
    })

    // Chain nodes (equivalent to Python's node1 >> node2)
    node1.Then(node2)

    // Create and run flow
    flow := pf.NewFlow(node1)
    shared := pf.Shared{}
    flow.Run(shared)
}

Syntax Comparison: Python vs Go

Python PocketFlow Go PocketFlowGo
node_a >> node_b nodeA.Then(nodeB)
node_a - "action" >> node_b nodeA.On("action").Then(nodeB)
async def exec_async() func(ctx context.Context, prepRes any)
asyncio.gather() goroutines + sync.WaitGroup

Node Types

Type Description
BaseNode Foundation with params/successors management
RetryNode Adds retry logic with configurable attempts and wait
FuncNode Convenient inline function node
BatchNode Processes collections sequentially
AsyncNode Async execution with context support
AsyncBatchNode Async batch processing (sequential)
AsyncParallelBatchNode Async batch processing (parallel with goroutines)

Flow Types

Type Description
Flow Basic node orchestration
BatchFlow Process multiple parameter sets
AsyncFlow Async node orchestration
AsyncBatchFlow Async batch (sequential)
AsyncParallelBatchFlow Async batch (parallel)

Examples

Conditional Routing
validator := pf.NewFuncNodeFull(
    func(shared pf.Shared) (any, error) {
        return shared["value"], nil
    },
    func(prepRes any) (any, error) {
        return prepRes.(int), nil
    },
    func(shared pf.Shared, prepRes, execRes any) (string, error) {
        if execRes.(int) > 100 {
            return "too_high", nil
        }
        return "valid", nil
    },
)

validator.On("valid").Then(successNode)
validator.On("too_high").Then(errorNode)
Retry with Fallback
node := pf.NewRetryNode(
    pf.WithMaxRetries(3),
    pf.WithWait(time.Second),
)
node.ExecFunc = func(prepRes any) (any, error) {
    return fetchData(prepRes)
}
node.ExecFallbackFunc = func(prepRes any, err error) (any, error) {
    return "fallback data", nil
}
Parallel Processing
node := pf.NewAsyncParallelBatchNode()
node.PrepFunc = func(ctx context.Context, shared pf.Shared) (any, error) {
    return []any{"url1", "url2", "url3"}, nil
}
node.ExecFunc = func(ctx context.Context, prepRes any) (any, error) {
    return fetch(ctx, prepRes.(string))
}

flow := pf.NewAsyncFlow(node)
flow.RunAsync(context.Background(), shared)

Run Examples

go run ./examples/basic/
go run ./examples/retry/
go run ./examples/conditional/
go run ./examples/parallel/

License

MIT

Documentation

Overview

Package pocketflow provides a minimalist workflow orchestration framework.

Package pocketflow is a minimalist workflow orchestration framework for Go.

It is a port of the Python PocketFlow library, providing the same core functionality with idiomatic Go patterns.

Core Concepts

Nodes are the building blocks of workflows. Each node has three lifecycle phases:

  • Prep: Prepare data from shared state
  • Exec: Execute the main logic
  • Post: Post-process and determine the next action

Flows orchestrate the execution of connected nodes based on action strings returned from Post methods.

Node Chaining

Instead of Python's >> operator, Go uses method chaining:

// Python: node_a >> node_b >> node_c
nodeA.Then(nodeB).Then(nodeC)

// Python: node_a - "error" >> error_handler
nodeA.On("error").Then(errorHandler)

Basic Usage

// Create nodes
node1 := pocketflow.NewFuncNode(func(prepRes any) (any, error) {
    return "processed", nil
})

node2 := pocketflow.NewFuncNode(func(prepRes any) (any, error) {
    return "done", nil
})

// Chain nodes
node1.Then(node2)

// Create and run flow
flow := pocketflow.NewFlow(node1)
shared := pocketflow.Shared{"data": "input"}
action, err := flow.Run(shared)

Async Support

For concurrent execution, use AsyncNode and AsyncFlow with context:

node := pocketflow.NewAsyncNode()
node.ExecFunc = func(ctx context.Context, prepRes any) (any, error) {
    // async work here
    return result, nil
}

flow := pocketflow.NewAsyncFlow(node)
ctx := context.Background()
action, err := flow.RunAsync(ctx, shared)

Retry Support

Nodes support automatic retry with configurable attempts and wait time:

node := pocketflow.NewRetryNode(
    pocketflow.WithMaxRetries(3),
    pocketflow.WithWait(time.Second),
)

Index

Constants

View Source
const DefaultAction = "default"

Action is the default action name for node transitions

View Source
const Version = "0.1.0"

Version is the current version of PocketFlowGo

Variables

View Source
var (
	// ErrNodeNotFound indicates the requested node was not found
	ErrNodeNotFound = errors.New("node not found")

	// ErrActionNotFound indicates no successor for the given action
	ErrActionNotFound = errors.New("action not found in successors")

	// ErrMaxRetriesExceeded indicates all retry attempts failed
	ErrMaxRetriesExceeded = errors.New("max retries exceeded")

	// ErrNilNode indicates a nil node was provided
	ErrNilNode = errors.New("node cannot be nil")

	// ErrNoStartNode indicates flow has no start node
	ErrNoStartNode = errors.New("flow has no start node")
)

Functions

This section is empty.

Types

type AsyncBatchFlow

type AsyncBatchFlow struct {
	*AsyncFlow
	BatchPrepFunc func(ctx context.Context, shared Shared) ([]Params, error)
}

AsyncBatchFlow processes multiple parameter sets sequentially with async support

func NewAsyncBatchFlow

func NewAsyncBatchFlow(startNode Node) *AsyncBatchFlow

NewAsyncBatchFlow creates a new AsyncBatchFlow

func (*AsyncBatchFlow) RunAsync

func (f *AsyncBatchFlow) RunAsync(ctx context.Context, shared Shared) (string, error)

RunAsync executes the batch flow

type AsyncBatchNode

type AsyncBatchNode struct {
	*AsyncNode
}

AsyncBatchNode processes batches sequentially with async support

func NewAsyncBatchNode

func NewAsyncBatchNode(opts ...NodeOption) *AsyncBatchNode

NewAsyncBatchNode creates a new AsyncBatchNode

type AsyncFlow

type AsyncFlow struct {
	*BaseNode
	StartNode Node

	PrepFunc func(ctx context.Context, shared Shared) (any, error)
	PostFunc func(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)
}

AsyncFlow orchestrates async node execution

func NewAsyncFlow

func NewAsyncFlow(startNode Node) *AsyncFlow

NewAsyncFlow creates a new AsyncFlow

func (*AsyncFlow) Exec

func (f *AsyncFlow) Exec(prepRes any) (any, error)

func (*AsyncFlow) GetNextNode

func (f *AsyncFlow) GetNextNode(curr Node, action string) Node

GetNextNode returns the next node based on action

func (*AsyncFlow) Post

func (f *AsyncFlow) Post(shared Shared, prepRes, execRes any) (string, error)

func (*AsyncFlow) PostAsync

func (f *AsyncFlow) PostAsync(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)

PostAsync executes async post-processing

func (*AsyncFlow) Prep

func (f *AsyncFlow) Prep(shared Shared) (any, error)

Sync methods - not supported

func (*AsyncFlow) PrepAsync

func (f *AsyncFlow) PrepAsync(ctx context.Context, shared Shared) (any, error)

PrepAsync executes async preparation

func (*AsyncFlow) Run

func (f *AsyncFlow) Run(shared Shared) (string, error)

func (*AsyncFlow) RunAsync

func (f *AsyncFlow) RunAsync(ctx context.Context, shared Shared) (string, error)

RunAsync executes the flow asynchronously

func (*AsyncFlow) Start

func (f *AsyncFlow) Start(node Node) Node

Start sets the starting node

type AsyncNode

type AsyncNode struct {
	*BaseNode
	MaxRetries int
	Wait       time.Duration
	CurRetry   int

	// Async hooks
	PrepFunc         func(ctx context.Context, shared Shared) (any, error)
	ExecFunc         func(ctx context.Context, prepRes any) (any, error)
	PostFunc         func(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)
	ExecFallbackFunc func(ctx context.Context, prepRes any, err error) (any, error)
}

AsyncNode provides asynchronous execution with context support

func NewAsyncNode

func NewAsyncNode(opts ...NodeOption) *AsyncNode

NewAsyncNode creates a new AsyncNode

func (*AsyncNode) Exec

func (n *AsyncNode) Exec(prepRes any) (any, error)

Exec - sync version throws error

func (*AsyncNode) ExecAsync

func (n *AsyncNode) ExecAsync(ctx context.Context, prepRes any) (any, error)

ExecAsync executes async main logic

func (*AsyncNode) ExecFallbackAsync

func (n *AsyncNode) ExecFallbackAsync(ctx context.Context, prepRes any, err error) (any, error)

ExecFallbackAsync handles async execution failures

func (*AsyncNode) Post

func (n *AsyncNode) Post(shared Shared, prepRes, execRes any) (string, error)

Post - sync version throws error

func (*AsyncNode) PostAsync

func (n *AsyncNode) PostAsync(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)

PostAsync executes async post-processing

func (*AsyncNode) Prep

func (n *AsyncNode) Prep(shared Shared) (any, error)

Prep - sync version throws error

func (*AsyncNode) PrepAsync

func (n *AsyncNode) PrepAsync(ctx context.Context, shared Shared) (any, error)

PrepAsync executes async preparation

func (*AsyncNode) Run

func (n *AsyncNode) Run(shared Shared) (string, error)

Run - sync version throws error

func (*AsyncNode) RunAsync

func (n *AsyncNode) RunAsync(ctx context.Context, shared Shared) (string, error)

RunAsync executes the node asynchronously

type AsyncParallelBatchFlow

type AsyncParallelBatchFlow struct {
	*AsyncFlow
	BatchPrepFunc func(ctx context.Context, shared Shared) ([]Params, error)
}

AsyncParallelBatchFlow processes parameter sets concurrently

func NewAsyncParallelBatchFlow

func NewAsyncParallelBatchFlow(startNode Node) *AsyncParallelBatchFlow

NewAsyncParallelBatchFlow creates a new AsyncParallelBatchFlow

func (*AsyncParallelBatchFlow) RunAsync

func (f *AsyncParallelBatchFlow) RunAsync(ctx context.Context, shared Shared) (string, error)

RunAsync executes batch flows in parallel

type AsyncParallelBatchNode

type AsyncParallelBatchNode struct {
	*AsyncNode
}

AsyncParallelBatchNode processes batches concurrently using goroutines

func NewAsyncParallelBatchNode

func NewAsyncParallelBatchNode(opts ...NodeOption) *AsyncParallelBatchNode

NewAsyncParallelBatchNode creates a new AsyncParallelBatchNode

type BaseNode

type BaseNode struct {
	// contains filtered or unexported fields
}

BaseNode provides the foundation for all node types. It implements basic parameter and successor management.

func NewBaseNode

func NewBaseNode() *BaseNode

NewBaseNode creates a new BaseNode instance

func (*BaseNode) Exec

func (n *BaseNode) Exec(prepRes any) (any, error)

Exec is the execution phase - override in implementations

func (*BaseNode) GetParams

func (n *BaseNode) GetParams() Params

GetParams returns the node parameters

func (*BaseNode) GetSuccessors

func (n *BaseNode) GetSuccessors() map[string]Node

GetSuccessors returns the map of successor nodes

func (*BaseNode) Next

func (n *BaseNode) Next(node Node, action string) Node

Next links a successor node for a given action. This is the Go equivalent of Python's next() method and >> operator.

Usage:

nodeA.Next(nodeB, "default")
nodeA.Next(errorNode, "error")

func (*BaseNode) On

func (n *BaseNode) On(action string) *ConditionalTransition

On returns a ConditionalTransition for conditional routing. This is the Go equivalent of Python's - operator for actions.

Usage:

nodeA.On("error").Then(errorHandler)

func (*BaseNode) Post

func (n *BaseNode) Post(shared Shared, prepRes, execRes any) (string, error)

Post is the post-processing phase - override in implementations Returns the action string for determining the next node

func (*BaseNode) Prep

func (n *BaseNode) Prep(shared Shared) (any, error)

Prep is the preparation phase - override in implementations

func (*BaseNode) Run

func (n *BaseNode) Run(shared Shared) (string, error)

Run executes the node (warns if successors exist)

func (*BaseNode) SetParams

func (n *BaseNode) SetParams(params Params)

SetParams sets the node parameters

func (*BaseNode) Then

func (n *BaseNode) Then(node Node) Node

Then links a successor node with the default action. This is syntactic sugar equivalent to Python's >> operator.

Usage:

nodeA.Then(nodeB).Then(nodeC)

type BatchFlow

type BatchFlow struct {
	*Flow
	PrepFunc func(shared Shared) ([]Params, error)
}

BatchFlow processes multiple parameter sets through the flow

func NewBatchFlow

func NewBatchFlow(startNode Node) *BatchFlow

NewBatchFlow creates a new BatchFlow

func (*BatchFlow) Prep

func (f *BatchFlow) Prep(shared Shared) (any, error)

Prep returns the batch parameters

type BatchNode

type BatchNode struct {
	*RetryNode
}

BatchNode processes collections of items

func NewBatchNode

func NewBatchNode(opts ...NodeOption) *BatchNode

NewBatchNode creates a new BatchNode

type ConditionalTransition

type ConditionalTransition struct {
	// contains filtered or unexported fields
}

ConditionalTransition represents a pending conditional link between nodes. It is the Go equivalent of Python's _ConditionalTransition class.

Usage:

nodeA.On("error").Then(errorHandler)

This is equivalent to Python's:

node_a - "error" >> error_handler

func (*ConditionalTransition) Then

func (ct *ConditionalTransition) Then(target Node) Node

Then completes the conditional transition by linking the target node. Returns the target node for further chaining.

func (*ConditionalTransition) ThenFunc

func (ct *ConditionalTransition) ThenFunc(exec func(prepRes any) (any, error)) Node

ThenFunc creates a new FuncNode and links it as the successor. This is a convenience method for inline node creation.

type Flow

type Flow struct {
	*BaseNode
	StartNode Node
}

Flow orchestrates the execution of connected nodes

func NewFlow

func NewFlow(startNode Node) *Flow

NewFlow creates a new Flow instance

func (*Flow) GetNextNode

func (f *Flow) GetNextNode(curr Node, action string) Node

GetNextNode returns the next node based on the action

func (*Flow) Post

func (f *Flow) Post(shared Shared, prepRes, execRes any) (string, error)

Post returns the last action by default

func (*Flow) Run

func (f *Flow) Run(shared Shared) (string, error)

Run executes the flow

func (*Flow) Start

func (f *Flow) Start(node Node) Node

Start sets the starting node and returns it for chaining

type FlowOption

type FlowOption func(*flowConfig)

FlowOption is a functional option for configuring flows

func WithStartNode

func WithStartNode(node Node) FlowOption

WithStartNode sets the starting node for a flow

type FuncNode

type FuncNode struct {
	*RetryNode
}

FuncNode is a convenient node that accepts functions directly

func NewFuncNode

func NewFuncNode(exec func(prepRes any) (any, error), opts ...NodeOption) *FuncNode

NewFuncNode creates a node with an exec function

func NewFuncNodeFull

func NewFuncNodeFull(
	prep func(shared Shared) (any, error),
	exec func(prepRes any) (any, error),
	post func(shared Shared, prepRes, execRes any) (string, error),
	opts ...NodeOption,
) *FuncNode

NewFuncNodeFull creates a node with all lifecycle functions

type Node

type Node interface {
	// Parameter management
	SetParams(params Params)
	GetParams() Params

	// Successor management
	GetSuccessors() map[string]Node
	Next(node Node, action string) Node
	Then(node Node) Node
	On(action string) *ConditionalTransition

	// Lifecycle methods
	Prep(shared Shared) (any, error)
	Exec(prepRes any) (any, error)
	Post(shared Shared, prepRes, execRes any) (string, error)

	// Execution
	Run(shared Shared) (string, error)
	// contains filtered or unexported methods
}

Node is the interface that all node types must implement

type NodeOption

type NodeOption func(*nodeConfig)

NodeOption is a functional option for configuring nodes

func WithMaxRetries

func WithMaxRetries(n int) NodeOption

WithMaxRetries sets the maximum number of retry attempts

func WithWait

func WithWait(d time.Duration) NodeOption

WithWait sets the wait duration between retries

type Params

type Params map[string]any

Params represents node-specific parameters

type RetryNode

type RetryNode struct {
	*BaseNode
	MaxRetries int
	Wait       time.Duration
	CurRetry   int

	// Hooks for customization
	PrepFunc         func(shared Shared) (any, error)
	ExecFunc         func(prepRes any) (any, error)
	PostFunc         func(shared Shared, prepRes, execRes any) (string, error)
	ExecFallbackFunc func(prepRes any, err error) (any, error)
}

RetryNode extends BaseNode with retry logic

func NewRetryNode

func NewRetryNode(opts ...NodeOption) *RetryNode

NewRetryNode creates a new RetryNode with options

func (*RetryNode) Exec

func (n *RetryNode) Exec(prepRes any) (any, error)

Exec executes the main logic

func (*RetryNode) ExecFallback

func (n *RetryNode) ExecFallback(prepRes any, err error) (any, error)

ExecFallback handles execution failures after all retries

func (*RetryNode) Post

func (n *RetryNode) Post(shared Shared, prepRes, execRes any) (string, error)

Post executes the post-processing phase

func (*RetryNode) Prep

func (n *RetryNode) Prep(shared Shared) (any, error)

Prep executes the preparation phase

func (*RetryNode) Run

func (n *RetryNode) Run(shared Shared) (string, error)

Run executes the node

type Shared

type Shared map[string]any

Shared represents shared state passed between nodes during flow execution

Directories

Path Synopsis
examples
basic command
Example: Basic node chaining and flow execution
Example: Basic node chaining and flow execution
conditional command
Example: Conditional routing based on actions
Example: Conditional routing based on actions
parallel command
Example: Parallel batch processing with AsyncParallelBatchNode
Example: Parallel batch processing with AsyncParallelBatchNode
retry command
Example: Retry mechanism with fallback
Example: Retry mechanism with fallback
internal
util
Package util provides internal utility functions for pocketflow.
Package util provides internal utility functions for pocketflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL