Documentation
¶
Overview ¶
Package pocketflow provides a minimalist workflow orchestration framework.
Package pocketflow is a minimalist workflow orchestration framework for Go.
It is a port of the Python PocketFlow library, providing the same core functionality with idiomatic Go patterns.
Core Concepts ¶
Nodes are the building blocks of workflows. Each node has three lifecycle phases:
- Prep: Prepare data from shared state
- Exec: Execute the main logic
- Post: Post-process and determine the next action
Flows orchestrate the execution of connected nodes based on action strings returned from Post methods.
Node Chaining ¶
Instead of Python's >> operator, Go uses method chaining:
// Python: node_a >> node_b >> node_c
nodeA.Then(nodeB).Then(nodeC)
// Python: node_a - "error" >> error_handler
nodeA.On("error").Then(errorHandler)
Basic Usage ¶
// Create nodes
node1 := pocketflow.NewFuncNode(func(prepRes any) (any, error) {
return "processed", nil
})
node2 := pocketflow.NewFuncNode(func(prepRes any) (any, error) {
return "done", nil
})
// Chain nodes
node1.Then(node2)
// Create and run flow
flow := pocketflow.NewFlow(node1)
shared := pocketflow.Shared{"data": "input"}
action, err := flow.Run(shared)
Async Support ¶
For concurrent execution, use AsyncNode and AsyncFlow with context:
node := pocketflow.NewAsyncNode()
node.ExecFunc = func(ctx context.Context, prepRes any) (any, error) {
// async work here
return result, nil
}
flow := pocketflow.NewAsyncFlow(node)
ctx := context.Background()
action, err := flow.RunAsync(ctx, shared)
Retry Support ¶
Nodes support automatic retry with configurable attempts and wait time:
node := pocketflow.NewRetryNode(
pocketflow.WithMaxRetries(3),
pocketflow.WithWait(time.Second),
)
Index ¶
- Constants
- Variables
- type AsyncBatchFlow
- type AsyncBatchNode
- type AsyncFlow
- func (f *AsyncFlow) Exec(prepRes any) (any, error)
- func (f *AsyncFlow) GetNextNode(curr Node, action string) Node
- func (f *AsyncFlow) Post(shared Shared, prepRes, execRes any) (string, error)
- func (f *AsyncFlow) PostAsync(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)
- func (f *AsyncFlow) Prep(shared Shared) (any, error)
- func (f *AsyncFlow) PrepAsync(ctx context.Context, shared Shared) (any, error)
- func (f *AsyncFlow) Run(shared Shared) (string, error)
- func (f *AsyncFlow) RunAsync(ctx context.Context, shared Shared) (string, error)
- func (f *AsyncFlow) Start(node Node) Node
- type AsyncNode
- func (n *AsyncNode) Exec(prepRes any) (any, error)
- func (n *AsyncNode) ExecAsync(ctx context.Context, prepRes any) (any, error)
- func (n *AsyncNode) ExecFallbackAsync(ctx context.Context, prepRes any, err error) (any, error)
- func (n *AsyncNode) Post(shared Shared, prepRes, execRes any) (string, error)
- func (n *AsyncNode) PostAsync(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)
- func (n *AsyncNode) Prep(shared Shared) (any, error)
- func (n *AsyncNode) PrepAsync(ctx context.Context, shared Shared) (any, error)
- func (n *AsyncNode) Run(shared Shared) (string, error)
- func (n *AsyncNode) RunAsync(ctx context.Context, shared Shared) (string, error)
- type AsyncParallelBatchFlow
- type AsyncParallelBatchNode
- type BaseNode
- func (n *BaseNode) Exec(prepRes any) (any, error)
- func (n *BaseNode) GetParams() Params
- func (n *BaseNode) GetSuccessors() map[string]Node
- func (n *BaseNode) Next(node Node, action string) Node
- func (n *BaseNode) On(action string) *ConditionalTransition
- func (n *BaseNode) Post(shared Shared, prepRes, execRes any) (string, error)
- func (n *BaseNode) Prep(shared Shared) (any, error)
- func (n *BaseNode) Run(shared Shared) (string, error)
- func (n *BaseNode) SetParams(params Params)
- func (n *BaseNode) Then(node Node) Node
- type BatchFlow
- type BatchNode
- type ConditionalTransition
- type Flow
- type FlowOption
- type FuncNode
- type Node
- type NodeOption
- type Params
- type RetryNode
- func (n *RetryNode) Exec(prepRes any) (any, error)
- func (n *RetryNode) ExecFallback(prepRes any, err error) (any, error)
- func (n *RetryNode) Post(shared Shared, prepRes, execRes any) (string, error)
- func (n *RetryNode) Prep(shared Shared) (any, error)
- func (n *RetryNode) Run(shared Shared) (string, error)
- type Shared
Constants ¶
const DefaultAction = "default"
Action is the default action name for node transitions
const Version = "0.1.0"
Version is the current version of PocketFlowGo
Variables ¶
var ( // ErrNodeNotFound indicates the requested node was not found ErrNodeNotFound = errors.New("node not found") // ErrActionNotFound indicates no successor for the given action ErrActionNotFound = errors.New("action not found in successors") // ErrMaxRetriesExceeded indicates all retry attempts failed ErrMaxRetriesExceeded = errors.New("max retries exceeded") // ErrNilNode indicates a nil node was provided ErrNilNode = errors.New("node cannot be nil") // ErrNoStartNode indicates flow has no start node ErrNoStartNode = errors.New("flow has no start node") )
Functions ¶
This section is empty.
Types ¶
type AsyncBatchFlow ¶
type AsyncBatchFlow struct {
*AsyncFlow
BatchPrepFunc func(ctx context.Context, shared Shared) ([]Params, error)
}
AsyncBatchFlow processes multiple parameter sets sequentially with async support
func NewAsyncBatchFlow ¶
func NewAsyncBatchFlow(startNode Node) *AsyncBatchFlow
NewAsyncBatchFlow creates a new AsyncBatchFlow
type AsyncBatchNode ¶
type AsyncBatchNode struct {
*AsyncNode
}
AsyncBatchNode processes batches sequentially with async support
func NewAsyncBatchNode ¶
func NewAsyncBatchNode(opts ...NodeOption) *AsyncBatchNode
NewAsyncBatchNode creates a new AsyncBatchNode
type AsyncFlow ¶
type AsyncFlow struct {
*BaseNode
StartNode Node
PrepFunc func(ctx context.Context, shared Shared) (any, error)
PostFunc func(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)
}
AsyncFlow orchestrates async node execution
func NewAsyncFlow ¶
NewAsyncFlow creates a new AsyncFlow
func (*AsyncFlow) GetNextNode ¶
GetNextNode returns the next node based on action
func (*AsyncFlow) PostAsync ¶
func (f *AsyncFlow) PostAsync(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)
PostAsync executes async post-processing
type AsyncNode ¶
type AsyncNode struct {
*BaseNode
MaxRetries int
Wait time.Duration
CurRetry int
// Async hooks
PrepFunc func(ctx context.Context, shared Shared) (any, error)
ExecFunc func(ctx context.Context, prepRes any) (any, error)
PostFunc func(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)
ExecFallbackFunc func(ctx context.Context, prepRes any, err error) (any, error)
}
AsyncNode provides asynchronous execution with context support
func NewAsyncNode ¶
func NewAsyncNode(opts ...NodeOption) *AsyncNode
NewAsyncNode creates a new AsyncNode
func (*AsyncNode) ExecFallbackAsync ¶
ExecFallbackAsync handles async execution failures
func (*AsyncNode) PostAsync ¶
func (n *AsyncNode) PostAsync(ctx context.Context, shared Shared, prepRes, execRes any) (string, error)
PostAsync executes async post-processing
type AsyncParallelBatchFlow ¶
type AsyncParallelBatchFlow struct {
*AsyncFlow
BatchPrepFunc func(ctx context.Context, shared Shared) ([]Params, error)
}
AsyncParallelBatchFlow processes parameter sets concurrently
func NewAsyncParallelBatchFlow ¶
func NewAsyncParallelBatchFlow(startNode Node) *AsyncParallelBatchFlow
NewAsyncParallelBatchFlow creates a new AsyncParallelBatchFlow
type AsyncParallelBatchNode ¶
type AsyncParallelBatchNode struct {
*AsyncNode
}
AsyncParallelBatchNode processes batches concurrently using goroutines
func NewAsyncParallelBatchNode ¶
func NewAsyncParallelBatchNode(opts ...NodeOption) *AsyncParallelBatchNode
NewAsyncParallelBatchNode creates a new AsyncParallelBatchNode
type BaseNode ¶
type BaseNode struct {
// contains filtered or unexported fields
}
BaseNode provides the foundation for all node types. It implements basic parameter and successor management.
func (*BaseNode) GetSuccessors ¶
GetSuccessors returns the map of successor nodes
func (*BaseNode) Next ¶
Next links a successor node for a given action. This is the Go equivalent of Python's next() method and >> operator.
Usage:
nodeA.Next(nodeB, "default") nodeA.Next(errorNode, "error")
func (*BaseNode) On ¶
func (n *BaseNode) On(action string) *ConditionalTransition
On returns a ConditionalTransition for conditional routing. This is the Go equivalent of Python's - operator for actions.
Usage:
nodeA.On("error").Then(errorHandler)
func (*BaseNode) Post ¶
Post is the post-processing phase - override in implementations Returns the action string for determining the next node
type BatchFlow ¶
BatchFlow processes multiple parameter sets through the flow
func NewBatchFlow ¶
NewBatchFlow creates a new BatchFlow
type BatchNode ¶
type BatchNode struct {
*RetryNode
}
BatchNode processes collections of items
func NewBatchNode ¶
func NewBatchNode(opts ...NodeOption) *BatchNode
NewBatchNode creates a new BatchNode
type ConditionalTransition ¶
type ConditionalTransition struct {
// contains filtered or unexported fields
}
ConditionalTransition represents a pending conditional link between nodes. It is the Go equivalent of Python's _ConditionalTransition class.
Usage:
nodeA.On("error").Then(errorHandler)
This is equivalent to Python's:
node_a - "error" >> error_handler
func (*ConditionalTransition) Then ¶
func (ct *ConditionalTransition) Then(target Node) Node
Then completes the conditional transition by linking the target node. Returns the target node for further chaining.
type Flow ¶
Flow orchestrates the execution of connected nodes
func (*Flow) GetNextNode ¶
GetNextNode returns the next node based on the action
type FlowOption ¶
type FlowOption func(*flowConfig)
FlowOption is a functional option for configuring flows
func WithStartNode ¶
func WithStartNode(node Node) FlowOption
WithStartNode sets the starting node for a flow
type FuncNode ¶
type FuncNode struct {
*RetryNode
}
FuncNode is a convenient node that accepts functions directly
func NewFuncNode ¶
func NewFuncNode(exec func(prepRes any) (any, error), opts ...NodeOption) *FuncNode
NewFuncNode creates a node with an exec function
type Node ¶
type Node interface {
// Parameter management
SetParams(params Params)
GetParams() Params
// Successor management
GetSuccessors() map[string]Node
Next(node Node, action string) Node
Then(node Node) Node
On(action string) *ConditionalTransition
// Lifecycle methods
Prep(shared Shared) (any, error)
Exec(prepRes any) (any, error)
Post(shared Shared, prepRes, execRes any) (string, error)
// Execution
Run(shared Shared) (string, error)
// contains filtered or unexported methods
}
Node is the interface that all node types must implement
type NodeOption ¶
type NodeOption func(*nodeConfig)
NodeOption is a functional option for configuring nodes
func WithMaxRetries ¶
func WithMaxRetries(n int) NodeOption
WithMaxRetries sets the maximum number of retry attempts
func WithWait ¶
func WithWait(d time.Duration) NodeOption
WithWait sets the wait duration between retries
type RetryNode ¶
type RetryNode struct {
*BaseNode
MaxRetries int
Wait time.Duration
CurRetry int
// Hooks for customization
PrepFunc func(shared Shared) (any, error)
ExecFunc func(prepRes any) (any, error)
PostFunc func(shared Shared, prepRes, execRes any) (string, error)
ExecFallbackFunc func(prepRes any, err error) (any, error)
}
RetryNode extends BaseNode with retry logic
func NewRetryNode ¶
func NewRetryNode(opts ...NodeOption) *RetryNode
NewRetryNode creates a new RetryNode with options
func (*RetryNode) ExecFallback ¶
ExecFallback handles execution failures after all retries
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
basic
command
Example: Basic node chaining and flow execution
|
Example: Basic node chaining and flow execution |
|
conditional
command
Example: Conditional routing based on actions
|
Example: Conditional routing based on actions |
|
parallel
command
Example: Parallel batch processing with AsyncParallelBatchNode
|
Example: Parallel batch processing with AsyncParallelBatchNode |
|
retry
command
Example: Retry mechanism with fallback
|
Example: Retry mechanism with fallback |
|
internal
|
|
|
util
Package util provides internal utility functions for pocketflow.
|
Package util provides internal utility functions for pocketflow. |