pocket

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2025 License: MIT Imports: 6 Imported by: 0

README

Pocket

Go Reference Go Report Card CI Status codecov Go Version Release Made with Go

A minimalist Go framework for building LLM workflows with composable nodes and built-in concurrency patterns. Inspired by PocketFlow, Pocket embraces Go idioms with small interfaces, functional options, and zero dependencies.

Philosophy

  • Small interfaces: Single-purpose interfaces that compose naturally
  • Idiomatic Go: Follows Go best practices and patterns
  • Zero dependencies: Core has no external dependencies
  • Built-in concurrency: First-class support for parallel execution
  • Type-safe: Leverages generics for compile-time safety
  • Functional options: Clean, extensible configuration

Requirements

  • Go 1.21 or higher
  • No external dependencies for core functionality

Installation

go get github.com/agentstation/pocket

To verify the installation:

go doc github.com/agentstation/pocket

Quick Start

package main

import (
    "context"
    "fmt"
    "log"
    
    "github.com/agentstation/pocket"
)

func main() {
    // Create a simple processor using a function
    greet := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
        name := input.(string)
        return fmt.Sprintf("Hello, %s!", name), nil
    })
    
    // Create a node
    node := pocket.NewNode("greeter", greet)
    
    // Create and run a flow
    store := pocket.NewStore()
    flow := pocket.NewFlow(node, store)
    
    result, err := flow.Run(context.Background(), "World")
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Println(result) // "Hello, World!"
}

Core Concepts

Small, Composable Interfaces
// Process data
type Processor interface {
    Process(ctx context.Context, input any) (output any, err error)
}

// Manage state
type Stateful interface {
    LoadState(ctx context.Context, store Store) (state any, err error)
    SaveState(ctx context.Context, store Store, state any) error
}

// Route to next node
type Router interface {
    Route(ctx context.Context, result any) (next string, err error)
}
Nodes

Nodes combine processing, state management, and routing:

// Create from a function
processor := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
    // Process input
    return result, nil
})

// Create a node with options
node := pocket.NewNode("myNode", processor,
    pocket.WithRetry(3, time.Second),
    pocket.WithTimeout(30*time.Second),
)

// Connect nodes
node.Connect("success", successNode)
node.Connect("error", errorNode)
node.Default(defaultNode)
Flows

Flows orchestrate node execution:

// Simple flow
flow := pocket.NewFlow(startNode, store)
result, err := flow.Run(ctx, input)

// Using the builder
flow, err := pocket.NewBuilder(store).
    Add(nodeA).
    Add(nodeB).
    Connect("nodeA", "success", "nodeB").
    Start("nodeA").
    Build()

Concurrency Patterns

Built-in Patterns

Pocket provides idiomatic Go concurrency patterns:

// Run nodes concurrently
results, err := pocket.RunConcurrent(ctx, nodes, store)

// Pipeline - output feeds next input
result, err := pocket.Pipeline(ctx, nodes, store, input)

// Fan-out - process items in parallel
results, err := pocket.FanOut(ctx, processor, store, items)

// Fan-in - aggregate from multiple sources
fanIn := pocket.NewFanIn(aggregator, source1, source2, source3)
result, err := fanIn.Run(ctx, store)
Batch Processing

Type-safe batch operations with generics:

import "github.com/agentstation/pocket/batch"

// Map-reduce pattern
processor := batch.MapReduce(
    extractItems,    // func(ctx, store) ([]T, error)
    transformItem,   // func(ctx, T) (R, error)  
    aggregateResults,// func(ctx, []R) (any, error)
    batch.WithConcurrency(10),
)

// Process each item
batch.ForEach(extractItems, processItem,
    batch.WithConcurrency(5),
)

// Filter items
filtered := batch.Filter(extractItems, predicate)

Design Patterns

Agent with Think-Act Loop
// Think node decides actions
think := pocket.NewNode("think", &ThinkAgent{})
think.Router = &ThinkAgent{} // Implements Router interface

// Action nodes execute and loop back
research := pocket.NewNode("research", &ResearchAction{})
research.Router = pocket.RouterFunc(func(ctx, result) (string, error) {
    return "think", nil // Loop back
})

// Connect the loop
think.Connect("research", research)
think.Connect("draft", draft)
think.Connect("complete", complete)
Conditional Routing
router := pocket.NewNode("router", processor)
router.Router = pocket.RouterFunc(func(ctx context.Context, result any) (string, error) {
    value := result.(int)
    switch {
    case value > 100:
        return "large", nil
    case value < 0:
        return "negative", nil
    default:
        return "normal", nil
    }
})

Type Safety

Generic Store Operations
// Type-safe store wrapper
userStore := pocket.NewTypedStore[User](store)

// Compile-time type checking
user := User{ID: "123", Name: "Alice"}
err := userStore.Set(ctx, "user:123", user)

retrieved, exists, err := userStore.Get(ctx, "user:123")
// retrieved is typed as User, not any
Scoped Stores
// Isolated key namespaces
userScope := pocket.NewScopedStore(store, "user")
adminScope := pocket.NewScopedStore(store, "admin")

// Keys are automatically prefixed
userScope.Set("name", "Alice")  // Stored as "user:name"
adminScope.Set("name", "Bob")   // Stored as "admin:name"

Configuration

Functional Options
node := pocket.NewNode("processor", processor,
    pocket.WithRetry(3, time.Second),
    pocket.WithTimeout(30*time.Second),
    pocket.WithErrorHandler(func(err error) {
        log.Printf("Node error: %v", err)
    }),
)

flow := pocket.NewFlow(start, store,
    pocket.WithLogger(logger),
    pocket.WithTracer(tracer),
)

Examples

Testing

# Run all tests
go test ./...

# Run with race detector
go test -race ./...

# Run benchmarks
go test -bench=. ./...

# Coverage report
go test -cover ./...

Project Structure

pocket/
├── pocket.go          # Main API - interfaces and node implementation
├── flow.go           # Flow orchestration and concurrency patterns
├── store.go          # Store implementations and type-safe wrappers
├── doc.go            # Package documentation
├── batch/            # Generic batch processing
├── internal/         # Internal implementation details
└── examples/         # Example applications

Philosophy Comparison

Aspect Traditional Approach Pocket Approach
Interfaces Large, multi-method Small, focused
Concurrency External libraries Built-in patterns
Configuration Struct fields Functional options
Type Safety Interface{} everywhere Generics where useful
Dependencies Many external Zero in core

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing)
  3. Write tests for your changes
  4. Ensure all tests pass with race detector
  5. Commit your changes (git commit -m 'Add amazing feature')
  6. Push to the branch (git push origin feature/amazing)
  7. Open a Pull Request

Performance

Pocket is designed for efficiency:

  • Zero allocations in hot paths
  • Minimal overhead for node execution
  • Efficient concurrent patterns using sync.Pool where appropriate
  • Benchmarks included for critical paths
# Run benchmarks
go test -bench=. -benchmem ./...

Stability

While Pocket is a young project, we follow semantic versioning and strive for:

  • Stable interfaces (no breaking changes without major version bump)
  • Comprehensive test coverage
  • Race condition free (tested with -race)
  • Production-ready error handling

License

MIT License - see LICENSE file for details.

Acknowledgments

  • Inspired by PocketFlow's minimalist philosophy
  • Built with Go's idioms and best practices in mind
  • Designed for the modern LLM application stack

Status

  • ✅ Core functionality complete
  • ✅ Full test coverage
  • ✅ Examples and documentation
  • ✅ CI/CD pipeline
  • 🚧 Community feedback incorporation
  • 🚧 Performance optimizations
  • 🚧 Additional patterns and helpers

Documentation

Overview

Package pocket provides a minimalist framework for building LLM workflows using composable nodes in a directed graph structure.

Key features:

  • Small, composable interfaces
  • Type-safe operations with generics
  • Built-in concurrency patterns
  • Functional options for configuration
  • Zero external dependencies in core

Basic usage:

// Create a simple processor
greet := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
	name := input.(string)
	return fmt.Sprintf("Hello, %s!", name), nil
})

// Create a node
node := pocket.NewNode("greeter", greet)

// Create and run a flow
store := pocket.NewStore()
flow := pocket.NewFlow(node, store)
result, err := flow.Run(context.Background(), "World")

Building complex flows:

// Use the builder API
builder := pocket.NewBuilder(store).
	Add(pocket.NewNode("fetch", fetchData)).
	Add(pocket.NewNode("process", processData)).
	Add(pocket.NewNode("save", saveData)).
	Connect("fetch", "success", "process").
	Connect("process", "success", "save").
	Start("fetch")

flow, err := builder.Build()

Concurrent patterns:

// Fan-out processing
results, err := pocket.FanOut(ctx, processNode, store, items)

// Pipeline
result, err := pocket.Pipeline(ctx, nodes, store, input)

// Concurrent execution
results, err := pocket.RunConcurrent(ctx, nodes, store)

Type-safe operations:

// Create a typed store
userStore := pocket.NewTypedStore[User](store)

// Type-safe get/set
user, exists, err := userStore.Get(ctx, "user:123")
err = userStore.Set(ctx, "user:123", newUser)

Package pocket provides a minimalist framework for building LLM workflows using composable nodes in a directed graph structure.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoStartNode is returned when a flow has no start node defined.
	ErrNoStartNode = errors.New("pocket: no start node defined")

	// ErrNodeNotFound is returned when a referenced node doesn't exist.
	ErrNodeNotFound = errors.New("pocket: node not found")

	// ErrInvalidInput is returned when input type doesn't match expected type.
	ErrInvalidInput = errors.New("pocket: invalid input type")
)

Common errors

Functions

func FanOut

func FanOut[T any](ctx context.Context, node *Node, store Store, items []T) ([]any, error)

FanOut executes a node for each input item concurrently.

Example

ExampleFanOut demonstrates parallel processing of items.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/agentstation/pocket"
)

func main() {
	// Create a processor that simulates work
	processor := pocket.NewNode("process",
		pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
			num := input.(int)
			return num * num, nil
		}),
	)

	store := pocket.NewStore()
	items := []int{1, 2, 3, 4, 5}

	// Process items concurrently
	results, err := pocket.FanOut(context.Background(), processor, store, items)
	if err != nil {
		log.Fatal(err)
	}

	// Results maintain order
	for i, result := range results {
		fmt.Printf("%d -> %v\n", items[i], result)
	}
}
Output:
1 -> 1
2 -> 4
3 -> 9
4 -> 16
5 -> 25

func Pipeline

func Pipeline(ctx context.Context, nodes []*Node, store Store, input any) (any, error)

Pipeline executes nodes sequentially, passing output to input.

Example

ExamplePipeline demonstrates sequential processing.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/agentstation/pocket"
)

func main() {
	store := pocket.NewStore()

	// Create a pipeline of transformations
	double := pocket.NewNode("double",
		pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
			return input.(int) * 2, nil
		}),
	)

	addTen := pocket.NewNode("addTen",
		pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
			return input.(int) + 10, nil
		}),
	)

	toString := pocket.NewNode("toString",
		pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
			return fmt.Sprintf("Result: %d", input.(int)), nil
		}),
	)

	nodes := []*pocket.Node{double, addTen, toString}

	result, err := pocket.Pipeline(context.Background(), nodes, store, 5)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(result)
}
Output:
Result: 20

func RunConcurrent

func RunConcurrent(ctx context.Context, nodes []*Node, store Store) ([]any, error)

RunConcurrent executes multiple nodes concurrently.

Types

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder provides a fluent API for constructing flows.

Example

ExampleBuilder demonstrates the fluent builder API.

package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/agentstation/pocket"
)

func main() {
	store := pocket.NewStore()

	// Define processors
	validate := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
		email := input.(string)
		if !strings.Contains(email, "@") {
			return nil, fmt.Errorf("invalid email")
		}
		return email, nil
	})

	normalize := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
		email := input.(string)
		return strings.ToLower(strings.TrimSpace(email)), nil
	})

	// Build the flow
	flow, err := pocket.NewBuilder(store).
		Add(pocket.NewNode("validate", validate)).
		Add(pocket.NewNode("normalize", normalize)).
		Connect("validate", "default", "normalize").
		Start("validate").
		Build()

	if err != nil {
		log.Fatal(err)
	}

	result, err := flow.Run(context.Background(), "  USER@EXAMPLE.COM  ")
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(result)
}
Output:
user@example.com

func NewBuilder

func NewBuilder(store Store) *Builder

NewBuilder creates a new flow builder.

func (*Builder) Add

func (b *Builder) Add(node *Node) *Builder

Add registers a node in the flow.

func (*Builder) Build

func (b *Builder) Build() (*Flow, error)

Build creates the flow.

func (*Builder) Connect

func (b *Builder) Connect(from, action, to string) *Builder

Connect creates a connection between nodes.

func (*Builder) Start

func (b *Builder) Start(name string) *Builder

Start sets the starting node.

func (*Builder) WithOptions

func (b *Builder) WithOptions(opts ...FlowOption) *Builder

WithOptions adds flow options.

type FanIn

type FanIn struct {
	// contains filtered or unexported fields
}

FanIn collects results from multiple sources.

func NewFanIn

func NewFanIn(combine func([]any) (any, error), sources ...*Node) *FanIn

NewFanIn creates a fan-in pattern.

func (*FanIn) Run

func (f *FanIn) Run(ctx context.Context, store Store) (any, error)

Run executes the fan-in pattern.

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow orchestrates the execution of connected nodes.

func NewFlow

func NewFlow(start *Node, store Store, opts ...FlowOption) *Flow

NewFlow creates a new flow starting from the given node.

func (*Flow) Run

func (f *Flow) Run(ctx context.Context, input any) (output any, err error)

Run executes the flow with the given input.

type FlowOption

type FlowOption func(*flowOptions)

FlowOption configures a Flow.

func WithLogger

func WithLogger(logger Logger) FlowOption

WithLogger adds logging to the flow.

func WithTracer

func WithTracer(tracer Tracer) FlowOption

WithTracer adds distributed tracing.

type Logger

type Logger interface {
	Debug(ctx context.Context, msg string, keysAndValues ...any)
	Info(ctx context.Context, msg string, keysAndValues ...any)
	Error(ctx context.Context, msg string, keysAndValues ...any)
}

Logger provides structured logging.

type Node

type Node struct {
	// Name identifies the node in the flow.
	Name string

	// Processor handles the main logic.
	Processor

	// Optional state management.
	Stateful

	// Optional routing logic.
	Router
	// contains filtered or unexported fields
}

Node represents a processing unit in a workflow. It combines processing, state management, and routing.

Example (Routing)

ExampleNode_routing demonstrates conditional routing between nodes.

package main

import (
	"context"
	"fmt"

	"github.com/agentstation/pocket"
)

func main() {
	store := pocket.NewStore()

	// Router node that checks input
	router := pocket.NewNode("router",
		pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
			return input, nil
		}),
	)

	// Set up routing logic
	router.Router = pocket.RouterFunc(func(ctx context.Context, result any) (string, error) {
		value := result.(int)
		if value > 100 {
			return "large", nil
		}
		return "small", nil
	})

	// Handler nodes
	largeHandler := pocket.NewNode("large",
		pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
			return fmt.Sprintf("Large number: %v", input), nil
		}),
	)

	smallHandler := pocket.NewNode("small",
		pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
			return fmt.Sprintf("Small number: %v", input), nil
		}),
	)

	// Connect nodes
	router.Connect("large", largeHandler)
	router.Connect("small", smallHandler)

	// Run with different inputs
	flow := pocket.NewFlow(router, store)

	result1, _ := flow.Run(context.Background(), 50)
	result2, _ := flow.Run(context.Background(), 150)

	fmt.Println(result1)
	fmt.Println(result2)
}
Output:
Small number: 50
Large number: 150

func NewNode

func NewNode(name string, processor Processor, opts ...Option) *Node

NewNode creates a new node with the given processor.

func (*Node) Connect

func (n *Node) Connect(action string, next *Node) *Node

Connect adds a successor node for the given action.

func (*Node) Default

func (n *Node) Default(next *Node) *Node

Default connects to the default next node.

type Option

type Option func(*nodeOptions)

Option configures a Node.

func WithErrorHandler

func WithErrorHandler(handler func(error)) Option

WithErrorHandler sets a custom error handler.

func WithRetry

func WithRetry(maxRetries int, delay time.Duration) Option

WithRetry configures retry behavior.

Example

ExampleWithRetry demonstrates retry configuration.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/agentstation/pocket"
)

func main() {
	attempts := 0

	// Create a node that fails twice before succeeding
	flaky := pocket.NewNode("flaky",
		pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
			attempts++
			if attempts < 3 {
				return nil, fmt.Errorf("temporary failure %d", attempts)
			}
			return "success", nil
		}),
		pocket.WithRetry(3, 0), // 3 retries, no delay for example
	)

	store := pocket.NewStore()
	flow := pocket.NewFlow(flaky, store)

	result, err := flow.Run(context.Background(), nil)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Result after %d attempts: %v\n", attempts, result)
}
Output:
Result after 3 attempts: success

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout sets execution timeout.

type Processor

type Processor interface {
	// Process executes the node's main logic.
	Process(ctx context.Context, input any) (output any, err error)
}

Processor handles the main execution logic of a node.

type ProcessorFunc

type ProcessorFunc func(ctx context.Context, input any) (any, error)

ProcessorFunc is an adapter to allow ordinary functions to be used as Processors.

Example

ExampleProcessorFunc demonstrates using a simple function as a processor.

package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/agentstation/pocket"
)

func main() {
	// Create a processor from a function
	uppercase := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
		text, ok := input.(string)
		if !ok {
			return nil, fmt.Errorf("expected string, got %T", input)
		}
		return strings.ToUpper(text), nil
	})

	// Use it in a node
	node := pocket.NewNode("uppercase", uppercase)
	store := pocket.NewStore()
	flow := pocket.NewFlow(node, store)

	result, err := flow.Run(context.Background(), "hello world")
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(result)
}
Output:
HELLO WORLD

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(ctx context.Context, input any) (any, error)

Process calls f(ctx, input).

type Router

type Router interface {
	// Route returns the name of the next node to execute.
	Route(ctx context.Context, result any) (next string, err error)
}

Router determines the next node based on processing results.

type RouterFunc

type RouterFunc func(ctx context.Context, result any) (string, error)

RouterFunc is an adapter for router functions.

func (RouterFunc) Route

func (f RouterFunc) Route(ctx context.Context, result any) (string, error)

Route calls f(ctx, result).

type ScopedStore

type ScopedStore struct {
	// contains filtered or unexported fields
}

ScopedStore provides isolated storage with a prefix.

func NewScopedStore

func NewScopedStore(store Store, prefix string) *ScopedStore

NewScopedStore creates a store with key prefixing.

func (*ScopedStore) Delete

func (s *ScopedStore) Delete(key string)

func (*ScopedStore) Get

func (s *ScopedStore) Get(key string) (any, bool)

func (*ScopedStore) Set

func (s *ScopedStore) Set(key string, value any)

type Stateful

type Stateful interface {
	// LoadState retrieves state from the store before processing.
	LoadState(ctx context.Context, store Store) (state any, err error)

	// SaveState persists state to the store after processing.
	SaveState(ctx context.Context, store Store, state any) error
}

Stateful manages state interaction with a Store.

type Store

type Store interface {
	// Get retrieves a value by key.
	Get(key string) (value any, exists bool)

	// Set stores a value with the given key.
	Set(key string, value any)

	// Delete removes a key from the store.
	Delete(key string)
}

Store provides thread-safe storage for shared state.

func NewStore

func NewStore() Store

NewStore creates a new thread-safe store.

type SyncStore

type SyncStore struct {
	// contains filtered or unexported fields
}

SyncStore is a thread-safe implementation of Store using sync.Map.

func (*SyncStore) Delete

func (s *SyncStore) Delete(key string)

Delete removes a key from the store.

func (*SyncStore) Get

func (s *SyncStore) Get(key string) (any, bool)

Get retrieves a value by key.

func (*SyncStore) Set

func (s *SyncStore) Set(key string, value any)

Set stores a value with the given key.

type Tracer

type Tracer interface {
	StartSpan(ctx context.Context, name string) (context.Context, func())
}

Tracer provides distributed tracing capabilities.

type TypedStore

type TypedStore[T any] interface {
	Get(ctx context.Context, key string) (T, bool, error)
	Set(ctx context.Context, key string, value T) error
	Delete(ctx context.Context, key string) error
}

TypedStore provides type-safe storage operations.

Example

ExampleTypedStore demonstrates type-safe storage.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/agentstation/pocket"
)

func main() {
	type User struct {
		ID   string
		Name string
	}

	// Create a typed store
	store := pocket.NewStore()
	userStore := pocket.NewTypedStore[User](store)

	ctx := context.Background()

	// Store a user
	user := User{ID: "123", Name: "Alice"}
	err := userStore.Set(ctx, "user:123", user)
	if err != nil {
		log.Fatal(err)
	}

	// Retrieve with type safety
	retrieved, exists, err := userStore.Get(ctx, "user:123")
	if err != nil {
		log.Fatal(err)
	}

	if exists {
		fmt.Printf("Found user: %+v\n", retrieved)
	}
}
Output:
Found user: {ID:123 Name:Alice}

func NewTypedStore

func NewTypedStore[T any](store Store) TypedStore[T]

NewTypedStore creates a type-safe wrapper around a Store.

Directories

Path Synopsis
Package batch provides generic batch processing capabilities for pocket workflows.
Package batch provides generic batch processing capabilities for pocket workflows.
examples
agent command
chat command
parallel command
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL