Documentation
¶
Overview ¶
Package pocket provides a minimalist framework for building LLM workflows using composable nodes in a directed graph structure.
Key features:
- Small, composable interfaces
- Type-safe operations with generics
- Built-in concurrency patterns
- Functional options for configuration
- Zero external dependencies in core
Basic usage:
// Create a simple processor
greet := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
name := input.(string)
return fmt.Sprintf("Hello, %s!", name), nil
})
// Create a node
node := pocket.NewNode("greeter", greet)
// Create and run a flow
store := pocket.NewStore()
flow := pocket.NewFlow(node, store)
result, err := flow.Run(context.Background(), "World")
Building complex flows:
// Use the builder API
builder := pocket.NewBuilder(store).
Add(pocket.NewNode("fetch", fetchData)).
Add(pocket.NewNode("process", processData)).
Add(pocket.NewNode("save", saveData)).
Connect("fetch", "success", "process").
Connect("process", "success", "save").
Start("fetch")
flow, err := builder.Build()
Concurrent patterns:
// Fan-out processing results, err := pocket.FanOut(ctx, processNode, store, items) // Pipeline result, err := pocket.Pipeline(ctx, nodes, store, input) // Concurrent execution results, err := pocket.RunConcurrent(ctx, nodes, store)
Type-safe operations:
// Create a typed store userStore := pocket.NewTypedStore[User](store) // Type-safe get/set user, exists, err := userStore.Get(ctx, "user:123") err = userStore.Set(ctx, "user:123", newUser)
Package pocket provides a minimalist framework for building LLM workflows using composable nodes in a directed graph structure.
Index ¶
- Variables
- func FanOut[T any](ctx context.Context, node *Node, store Store, items []T) ([]any, error)
- func Pipeline(ctx context.Context, nodes []*Node, store Store, input any) (any, error)
- func RunConcurrent(ctx context.Context, nodes []*Node, store Store) ([]any, error)
- type Builder
- type FanIn
- type Flow
- type FlowOption
- type Logger
- type Node
- type Option
- type Processor
- type ProcessorFunc
- type Router
- type RouterFunc
- type ScopedStore
- type Stateful
- type Store
- type SyncStore
- type Tracer
- type TypedStore
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoStartNode is returned when a flow has no start node defined. ErrNoStartNode = errors.New("pocket: no start node defined") // ErrNodeNotFound is returned when a referenced node doesn't exist. ErrNodeNotFound = errors.New("pocket: node not found") // ErrInvalidInput is returned when input type doesn't match expected type. ErrInvalidInput = errors.New("pocket: invalid input type") )
Common errors
Functions ¶
func FanOut ¶
FanOut executes a node for each input item concurrently.
Example ¶
ExampleFanOut demonstrates parallel processing of items.
package main
import (
"context"
"fmt"
"log"
"github.com/agentstation/pocket"
)
func main() {
// Create a processor that simulates work
processor := pocket.NewNode("process",
pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
num := input.(int)
return num * num, nil
}),
)
store := pocket.NewStore()
items := []int{1, 2, 3, 4, 5}
// Process items concurrently
results, err := pocket.FanOut(context.Background(), processor, store, items)
if err != nil {
log.Fatal(err)
}
// Results maintain order
for i, result := range results {
fmt.Printf("%d -> %v\n", items[i], result)
}
}
Output: 1 -> 1 2 -> 4 3 -> 9 4 -> 16 5 -> 25
func Pipeline ¶
Pipeline executes nodes sequentially, passing output to input.
Example ¶
ExamplePipeline demonstrates sequential processing.
package main
import (
"context"
"fmt"
"log"
"github.com/agentstation/pocket"
)
func main() {
store := pocket.NewStore()
// Create a pipeline of transformations
double := pocket.NewNode("double",
pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
return input.(int) * 2, nil
}),
)
addTen := pocket.NewNode("addTen",
pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
return input.(int) + 10, nil
}),
)
toString := pocket.NewNode("toString",
pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
return fmt.Sprintf("Result: %d", input.(int)), nil
}),
)
nodes := []*pocket.Node{double, addTen, toString}
result, err := pocket.Pipeline(context.Background(), nodes, store, 5)
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
}
Output: Result: 20
Types ¶
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder provides a fluent API for constructing flows.
Example ¶
ExampleBuilder demonstrates the fluent builder API.
package main
import (
"context"
"fmt"
"log"
"strings"
"github.com/agentstation/pocket"
)
func main() {
store := pocket.NewStore()
// Define processors
validate := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
email := input.(string)
if !strings.Contains(email, "@") {
return nil, fmt.Errorf("invalid email")
}
return email, nil
})
normalize := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
email := input.(string)
return strings.ToLower(strings.TrimSpace(email)), nil
})
// Build the flow
flow, err := pocket.NewBuilder(store).
Add(pocket.NewNode("validate", validate)).
Add(pocket.NewNode("normalize", normalize)).
Connect("validate", "default", "normalize").
Start("validate").
Build()
if err != nil {
log.Fatal(err)
}
result, err := flow.Run(context.Background(), " USER@EXAMPLE.COM ")
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
}
Output: user@example.com
func (*Builder) WithOptions ¶
func (b *Builder) WithOptions(opts ...FlowOption) *Builder
WithOptions adds flow options.
type FanIn ¶
type FanIn struct {
// contains filtered or unexported fields
}
FanIn collects results from multiple sources.
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
Flow orchestrates the execution of connected nodes.
type Logger ¶
type Logger interface {
Debug(ctx context.Context, msg string, keysAndValues ...any)
Info(ctx context.Context, msg string, keysAndValues ...any)
Error(ctx context.Context, msg string, keysAndValues ...any)
}
Logger provides structured logging.
type Node ¶
type Node struct {
// Name identifies the node in the flow.
Name string
// Processor handles the main logic.
Processor
// Optional state management.
Stateful
// Optional routing logic.
Router
// contains filtered or unexported fields
}
Node represents a processing unit in a workflow. It combines processing, state management, and routing.
Example (Routing) ¶
ExampleNode_routing demonstrates conditional routing between nodes.
package main
import (
"context"
"fmt"
"github.com/agentstation/pocket"
)
func main() {
store := pocket.NewStore()
// Router node that checks input
router := pocket.NewNode("router",
pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
return input, nil
}),
)
// Set up routing logic
router.Router = pocket.RouterFunc(func(ctx context.Context, result any) (string, error) {
value := result.(int)
if value > 100 {
return "large", nil
}
return "small", nil
})
// Handler nodes
largeHandler := pocket.NewNode("large",
pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
return fmt.Sprintf("Large number: %v", input), nil
}),
)
smallHandler := pocket.NewNode("small",
pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
return fmt.Sprintf("Small number: %v", input), nil
}),
)
// Connect nodes
router.Connect("large", largeHandler)
router.Connect("small", smallHandler)
// Run with different inputs
flow := pocket.NewFlow(router, store)
result1, _ := flow.Run(context.Background(), 50)
result2, _ := flow.Run(context.Background(), 150)
fmt.Println(result1)
fmt.Println(result2)
}
Output: Small number: 50 Large number: 150
type Option ¶
type Option func(*nodeOptions)
Option configures a Node.
func WithErrorHandler ¶
WithErrorHandler sets a custom error handler.
func WithRetry ¶
WithRetry configures retry behavior.
Example ¶
ExampleWithRetry demonstrates retry configuration.
package main
import (
"context"
"fmt"
"log"
"github.com/agentstation/pocket"
)
func main() {
attempts := 0
// Create a node that fails twice before succeeding
flaky := pocket.NewNode("flaky",
pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
attempts++
if attempts < 3 {
return nil, fmt.Errorf("temporary failure %d", attempts)
}
return "success", nil
}),
pocket.WithRetry(3, 0), // 3 retries, no delay for example
)
store := pocket.NewStore()
flow := pocket.NewFlow(flaky, store)
result, err := flow.Run(context.Background(), nil)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Result after %d attempts: %v\n", attempts, result)
}
Output: Result after 3 attempts: success
func WithTimeout ¶
WithTimeout sets execution timeout.
type Processor ¶
type Processor interface {
// Process executes the node's main logic.
Process(ctx context.Context, input any) (output any, err error)
}
Processor handles the main execution logic of a node.
type ProcessorFunc ¶
ProcessorFunc is an adapter to allow ordinary functions to be used as Processors.
Example ¶
ExampleProcessorFunc demonstrates using a simple function as a processor.
package main
import (
"context"
"fmt"
"log"
"strings"
"github.com/agentstation/pocket"
)
func main() {
// Create a processor from a function
uppercase := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
text, ok := input.(string)
if !ok {
return nil, fmt.Errorf("expected string, got %T", input)
}
return strings.ToUpper(text), nil
})
// Use it in a node
node := pocket.NewNode("uppercase", uppercase)
store := pocket.NewStore()
flow := pocket.NewFlow(node, store)
result, err := flow.Run(context.Background(), "hello world")
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
}
Output: HELLO WORLD
type Router ¶
type Router interface {
// Route returns the name of the next node to execute.
Route(ctx context.Context, result any) (next string, err error)
}
Router determines the next node based on processing results.
type RouterFunc ¶
RouterFunc is an adapter for router functions.
type ScopedStore ¶
type ScopedStore struct {
// contains filtered or unexported fields
}
ScopedStore provides isolated storage with a prefix.
func NewScopedStore ¶
func NewScopedStore(store Store, prefix string) *ScopedStore
NewScopedStore creates a store with key prefixing.
func (*ScopedStore) Delete ¶
func (s *ScopedStore) Delete(key string)
func (*ScopedStore) Set ¶
func (s *ScopedStore) Set(key string, value any)
type Stateful ¶
type Stateful interface {
// LoadState retrieves state from the store before processing.
LoadState(ctx context.Context, store Store) (state any, err error)
// SaveState persists state to the store after processing.
SaveState(ctx context.Context, store Store, state any) error
}
Stateful manages state interaction with a Store.
type Store ¶
type Store interface {
// Get retrieves a value by key.
Get(key string) (value any, exists bool)
// Set stores a value with the given key.
Set(key string, value any)
// Delete removes a key from the store.
Delete(key string)
}
Store provides thread-safe storage for shared state.
type SyncStore ¶
type SyncStore struct {
// contains filtered or unexported fields
}
SyncStore is a thread-safe implementation of Store using sync.Map.
type TypedStore ¶
type TypedStore[T any] interface { Get(ctx context.Context, key string) (T, bool, error) Set(ctx context.Context, key string, value T) error Delete(ctx context.Context, key string) error }
TypedStore provides type-safe storage operations.
Example ¶
ExampleTypedStore demonstrates type-safe storage.
package main
import (
"context"
"fmt"
"log"
"github.com/agentstation/pocket"
)
func main() {
type User struct {
ID string
Name string
}
// Create a typed store
store := pocket.NewStore()
userStore := pocket.NewTypedStore[User](store)
ctx := context.Background()
// Store a user
user := User{ID: "123", Name: "Alice"}
err := userStore.Set(ctx, "user:123", user)
if err != nil {
log.Fatal(err)
}
// Retrieve with type safety
retrieved, exists, err := userStore.Get(ctx, "user:123")
if err != nil {
log.Fatal(err)
}
if exists {
fmt.Printf("Found user: %+v\n", retrieved)
}
}
Output: Found user: {ID:123 Name:Alice}
func NewTypedStore ¶
func NewTypedStore[T any](store Store) TypedStore[T]
NewTypedStore creates a type-safe wrapper around a Store.