flyt

package module
v0.9.0 Latest
Published: Sep 25, 2025 License: MIT Imports: 6 Imported by: 0

README

Flyt

Norwegian for "flow" • Pronounced "fleet"

A minimalist workflow framework for Go with zero dependencies, inspired by Pocket Flow.

Installation

go get github.com/mark3labs/flyt

Getting Started

The fastest way to start a new Flyt project is using the official template:

# Create a new project from the template
git clone https://github.com/mark3labs/flyt-project-template my-flyt-project
cd my-flyt-project

# Remove the template git history and start fresh
rm -rf .git
git init

# Install dependencies
go mod tidy

# Run the example
go run main.go

The template provides a starting point for your Flyt project with a basic structure and example code.

Manual Setup

package main

import (
    "context"
    "fmt"
    "github.com/mark3labs/flyt"
)

func main() {
    // Create a simple node using the helper
    node := flyt.NewNode(
        flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
            fmt.Println("Hello, Flyt!")
            return flyt.R("done"), nil
        }),
    )

    // Run it
    ctx := context.Background()
    shared := flyt.NewSharedStore()
    
    action, err := flyt.Run(ctx, node, shared)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Completed with action: %s\n", action)
}

Builder Pattern (New)

Flyt now supports a fluent builder pattern for creating nodes:

node := flyt.NewNode().
    WithMaxRetries(3).
    WithWait(time.Second).
    WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        fmt.Println("Hello from builder pattern!")
        return flyt.R("done"), nil
    })

// NodeBuilder directly implements Node interface, so you can use it as-is
// node := flyt.NewNode().WithExecFunc(fn)

You can mix traditional and builder patterns:

// Start with traditional options
node := flyt.NewNode(
    flyt.WithMaxRetries(3),
    flyt.WithWait(time.Second),
)

// Continue with builder pattern
node = node.
    WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        return flyt.R("processed"), nil
    }).
    WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        shared.Set("result", execResult.Value())
        return flyt.DefaultAction, nil
    })

Core Concepts

Nodes

Nodes are the building blocks. Each node has three phases:

  1. Prep - Read from shared store and prepare data
  2. Exec - Execute main logic (can be retried)
  3. Post - Process results and decide next action

// Simple node with type-safe Result handling
node := flyt.NewNode(
    flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
        // Use type-safe getters to retrieve data
        input := shared.GetString("input")
        return flyt.R(input), nil
    }),
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        // Use Result's type-safe accessors
        input := prepResult.AsStringOr("")
        // Process data (input must be used, or the Go compiler rejects the function)
        return flyt.R("processed: " + input), nil
    }),
    flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        shared.Set("output", execResult.Value())
        return flyt.DefaultAction, nil
    }),
)

// Working with structured data
type ProcessRequest struct {
    UserID    int      `json:"user_id"`
    Operation string   `json:"operation"`
    Resources []string `json:"resources"`
}

processNode := flyt.NewNode(
    flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
        // Bind structured data from shared store
        var request ProcessRequest
        if err := shared.Bind("request", &request); err != nil {
            return flyt.R(nil), fmt.Errorf("invalid request: %w", err)
        }
        return flyt.R(request), nil
    }),
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        // Use Result's Bind method for type-safe access
        var request ProcessRequest
        prepResult.MustBind(&request)  // Or use Bind() with error handling
        
        // Process the structured request
        result := processUserRequest(request.UserID, request.Operation, request.Resources)
        return flyt.R(result), nil
    }),
    flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        shared.Set("process_result", execResult.Value())
        return flyt.DefaultAction, nil
    }),
)

Actions

Actions are strings returned by a node's Post phase that determine what happens next:

func (n *MyNode) Post(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult any) (flyt.Action, error) {
    // Convert to Result for type-safe access
    result := flyt.R(execResult)
    if result.AsBoolOr(false) {
        return "success", nil  // Go to node connected with "success"
    }
    return "retry", nil       // Go to node connected with "retry"
}

The default action is flyt.DefaultAction (value: "default"). If no connection exists for an action, the flow ends.
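
The routing rule above can be pictured as a lookup table keyed by node and action. The following is a conceptual sketch using only the standard library, not Flyt's implementation; the `router` type and its `connect` and `walk` methods are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// router is a hypothetical stand-in for a flow's connection table:
// it maps a node name and an action to the next node name.
type router struct {
	next map[string]map[string]string
}

func newRouter() *router {
	return &router{next: make(map[string]map[string]string)}
}

// connect records that the given action returned by from leads to to.
func (r *router) connect(from, action, to string) {
	if r.next[from] == nil {
		r.next[from] = make(map[string]string)
	}
	r.next[from][action] = to
}

// walk follows connections from start, asking decide for each node's action.
// It stops as soon as a node returns an action that has no connection.
func (r *router) walk(start string, decide func(node string) string) []string {
	visited := []string{start}
	current := start
	for {
		to, ok := r.next[current][decide(current)]
		if !ok {
			return visited // no connection for this action: the flow ends
		}
		visited = append(visited, to)
		current = to
	}
}

func main() {
	r := newRouter()
	r.connect("validate", "valid", "process")
	r.connect("validate", "invalid", "report")

	path := r.walk("validate", func(node string) string {
		if node == "validate" {
			return "valid"
		}
		return "default" // nothing is connected to "default" here
	})
	fmt.Println(path) // [validate process]
}
```

In this model, an unconnected action is not an error; it is simply the point where the walk stops.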

Flows

Connect nodes to create workflows:

// Create nodes
validateNode := createValidateNode()
processNode := createProcessNode()
errorNode := createErrorNode()

// Build flow with action-based routing
flow := flyt.NewFlow(validateNode)
flow.Connect(validateNode, "valid", processNode)    // If validation succeeds
flow.Connect(validateNode, "invalid", errorNode)    // If validation fails
flow.Connect(processNode, "done", nil)              // End flow after processing

// Run flow
err := flow.Run(ctx, shared)

Shared Store

Thread-safe data sharing between nodes with type-safe helpers:

shared := flyt.NewSharedStore()

// Set values
shared.Set("name", "Alice")
shared.Set("count", 42)
shared.Set("price", 19.99)
shared.Set("enabled", true)
shared.Set("items", []string{"apple", "banana"})
shared.Set("config", map[string]any{"timeout": 30})

// Type-safe getters (return zero values if not found or wrong type)
str := shared.GetString("name")           // Returns "Alice"
num := shared.GetInt("count")             // Returns 42
price := shared.GetFloat64("price")       // Returns 19.99
enabled := shared.GetBool("enabled")      // Returns true
items := shared.GetSlice("items")         // Returns []any{"apple", "banana"}
config := shared.GetMap("config")         // Returns map[string]any{"timeout": 30}

// Type-safe getters with custom defaults
str = shared.GetStringOr("missing", "anonymous")     // Returns "anonymous"
num = shared.GetIntOr("missing", -1)                 // Returns -1
price = shared.GetFloat64Or("missing", 99.99)        // Returns 99.99
enabled = shared.GetBoolOr("missing", true)          // Returns true

// Bind complex types (similar to Echo framework)
type User struct {
    ID    int      `json:"id"`
    Name  string   `json:"name"`
    Email string   `json:"email"`
    Tags  []string `json:"tags"`
}

// Store a typed struct - it gets stored as-is
user := User{
    ID:    123,
    Name:  "Alice",
    Email: "alice@example.com",
    Tags:  []string{"admin", "developer"},
}
shared.Set("user", user)

// Later, in a node's Prep function, bind it back to a struct
func (n *MyNode) Prep(ctx context.Context, shared *flyt.SharedStore) (any, error) {
    var user User
    err := shared.Bind("user", &user)  // Binds stored data to struct
    if err != nil {
        return nil, err
    }
    // Or use MustBind (panics on failure - use for required data)
    // shared.MustBind("user", &user)
    
    return user, nil
}

// Utility methods
exists := shared.Has("key")       // Check if key exists
shared.Delete("key")               // Remove a key
keys := shared.Keys()              // Get all keys
length := shared.Len()             // Get number of items
shared.Clear()                     // Remove all items

// Get all data as a map (returns a copy)
allData := shared.GetAll()

// Merge multiple values at once
shared.Merge(map[string]any{
    "user_id": 123,
    "config": map[string]any{"timeout": 30},
})

The type-safe getters handle numeric conversions automatically:

  • GetInt() converts from int8, int16, int32, int64, uint variants, and float types
  • GetFloat64() converts from all numeric types including int and float32
  • GetSlice() uses the same conversion logic as ToSlice() utility

Intermediate Patterns

Creating Nodes with Builder Pattern

The builder pattern provides a fluent interface for creating nodes:

node := flyt.NewNode().
    WithMaxRetries(3).
    WithWait(time.Second).
    WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
        // Read input data
        data, _ := shared.Get("input") // Get returns the value and a found flag
        return flyt.NewResult(data), nil
    }).
    WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        // Process data
        result := processData(prepResult.Value())
        return flyt.NewResult(result), nil
    }).
    WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        // Store result
        shared.Set("output", execResult.Value())
        return flyt.DefaultAction, nil
    })

Configuration via Closures

Pass configuration to nodes using closures:

func createAPINode(apiKey string, baseURL string) flyt.Node {
    return flyt.NewNode(
        flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
            // apiKey and baseURL are captured in the closure
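            url := fmt.Sprintf("%s/data", baseURL)
            // Bind the request to ctx so cancellation reaches the HTTP call
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
            if err != nil {
                return flyt.R(nil), err
            }
            req.Header.Set("Authorization", apiKey)
            // ... make request
            return flyt.R(data), nil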
        }),
    )
}

// Usage
node := createAPINode("secret-key", "https://api.example.com")

Error Handling & Retries

Add retry logic to handle transient failures:

node := flyt.NewNode(
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        // This will be retried up to 3 times
        data, err := callFlakeyAPI()
        return flyt.R(data), err
    }),
    flyt.WithMaxRetries(3),
    flyt.WithWait(time.Second),
    flyt.WithExecFallbackFunc(func(prepResult any, err error) (any, error) {
        // Called after all retries fail
        return "default-value", nil
    }),
)

Fallback on Failure

Handle failures gracefully by implementing the FallbackNode interface:

type CachedAPINode struct {
    *flyt.BaseNode
    cache map[string]any
}

func (n *CachedAPINode) ExecFallback(prepResult any, err error) (any, error) {
    // Return cached data when API fails
    result := flyt.R(prepResult)
    key := result.MustString()
    if cached, ok := n.cache[key]; ok {
        return cached, nil
    }
    // Return default value if no cache
    return map[string]any{"status": "unavailable"}, nil
}

func (n *CachedAPINode) Exec(ctx context.Context, prepResult any) (any, error) {
    result := flyt.R(prepResult)
    key := result.MustString()
    data, err := callAPI(key)
    if err == nil {
        n.cache[key] = data // Update cache on success
    }
    return data, err
}

The ExecFallback method is called automatically after all retries are exhausted, allowing you to provide degraded functionality, cached results, or default values.
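
The control flow described above can be sketched with the standard library alone. The `execWithRetry` helper and `errUnavailable` value below are hypothetical and do not reproduce Flyt's internals. The sketch assumes the retry setting counts total attempts, which is how the NewBaseNode documentation describes its default of 1.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a sample error used by the demonstration below.
var errUnavailable = errors.New("service unavailable")

// execWithRetry is a hypothetical helper. It calls exec up to maxAttempts
// times, sleeping wait between attempts, and hands the last error to fallback
// only when every attempt has failed.
func execWithRetry(
	maxAttempts int,
	wait time.Duration,
	exec func() (any, error),
	fallback func(err error) (any, error),
) (any, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		result, err := exec()
		if err == nil {
			return result, nil
		}
		lastErr = err
		if attempt < maxAttempts {
			time.Sleep(wait) // no sleep after the final attempt
		}
	}
	return fallback(lastErr)
}

func main() {
	calls := 0
	result, err := execWithRetry(3, 10*time.Millisecond,
		func() (any, error) {
			calls++
			return nil, errUnavailable
		},
		func(err error) (any, error) {
			return "cached value", nil // degrade instead of failing
		},
	)
	fmt.Println(calls, result, err) // 3 cached value <nil>
}
```

Because the fallback returns a nil error, the caller sees a successful result and the flow can continue with degraded data.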

Conditional Branching

Control flow based on results:

decisionNode := flyt.NewNode(
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        value := prepResult.MustInt()
        return flyt.R(value > 100), nil
    }),
    flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        if execResult.MustBool() {
            return "high", nil
        }
        return "low", nil
    }),
)

flow := flyt.NewFlow(decisionNode)
flow.Connect(decisionNode, "high", highNode)
flow.Connect(decisionNode, "low", lowNode)

Advanced Usage

Custom Node Types

For complex nodes with state, create custom types:

type RateLimitedNode struct {
    *flyt.BaseNode
    limiter *rate.Limiter
}

func NewRateLimitedNode(rps int) *RateLimitedNode {
    return &RateLimitedNode{
        BaseNode: flyt.NewBaseNode(),
        limiter:  rate.NewLimiter(rate.Limit(rps), 1),
    }
}

func (n *RateLimitedNode) Exec(ctx context.Context, prepResult any) (any, error) {
    if err := n.limiter.Wait(ctx); err != nil {
        return nil, err
    }
    // Process with rate limiting
    data, err := process(prepResult)
    return data, err
}

RetryableNode Interface

For custom retry logic, implement the RetryableNode interface:

type CustomRetryNode struct {
    *flyt.BaseNode
    attempts int
}

func (n *CustomRetryNode) GetMaxRetries() int {
    // Dynamic retry count based on state
    if n.attempts > 5 {
        return 0 // Stop retrying once more than 5 attempts have been made
    }
    return 3
}

func (n *CustomRetryNode) GetWait() time.Duration {
    // Linear backoff: the wait grows by one second per attempt
    return time.Duration(n.attempts) * time.Second
}

func (n *CustomRetryNode) Exec(ctx context.Context, prepResult any) (any, error) {
    n.attempts++
    data, err := callAPI(prepResult)
    return data, err
}
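
If a doubling schedule is preferred for the wait between retries, the duration can be computed from the attempt number. The `backoff` function below is a hypothetical helper, not part of Flyt; a GetWait implementation could call something like it with the node's attempt counter.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a hypothetical helper that returns base doubled once per
// completed attempt (base, 2*base, 4*base, ...) and never exceeds max.
func backoff(attempt int, base, max time.Duration) time.Duration {
	if attempt < 1 {
		return 0 // nothing has been attempted yet
	}
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, backoff(attempt, time.Second, 10*time.Second))
	}
	// Prints 1s, 2s, 4s, 8s, then 10s once the cap is reached.
}
```

Capping the delay keeps a long run of failures from producing waits that outlive the surrounding context deadline.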
Batch Processing

Process multiple items concurrently:

// Simple batch node for processing items
processFunc := func(ctx context.Context, item any) (any, error) {
    // Process each item
    return fmt.Sprintf("processed: %v", item), nil
}

batchNode := flyt.NewBatchNode(processFunc, true) // true for concurrent
shared.Set("items", []string{"item1", "item2", "item3"})

Advanced Batch Configuration

For more control over batch processing:

config := &flyt.BatchConfig{
    MaxBatchSize:   10,        // Return an error if a batch has more than 10 items
    MaxConcurrency: 5,         // Use at most 5 concurrent workers
    ItemsKey:       "data",    // Custom key for input items
    ResultsKey:     "output",  // Custom key for results
    BatchCountKey:  "total",   // Custom key for the BatchFlow batch count
}

processFunc := func(ctx context.Context, item any) (any, error) {
    data, err := processItem(item)
    return data, err
}

batchNode := flyt.NewBatchNodeWithConfig(processFunc, true, config)

Batch Error Handling

Batch operations aggregate errors:

_, err := flyt.Run(ctx, batchNode, shared)
if err != nil {
    // errors.As also matches a BatchError that has been wrapped
    var batchErr *flyt.BatchError
    if errors.As(err, &batchErr) {
        // Access individual errors
        for i, e := range batchErr.Errors {
            if e != nil {
                fmt.Printf("Item %d failed: %v\n", i, e)
            }
        }
    }
}

Batch Flows

Run the same flow multiple times with different parameters:

// Create a flow factory - returns a new flow instance for each iteration
flowFactory := func() *flyt.Flow {
    validateNode := flyt.NewNode(
        flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
            // Each flow has its own SharedStore with merged FlowInputs
            userID := shared.GetInt("user_id")
            email := shared.GetString("email")
            return flyt.R(map[string]any{"user_id": userID, "email": email}), nil
        }),
        flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
            data := prepResult.MustMap()
            // Process user data
            result := processUser(data)
            return flyt.R(result), nil
        }),
    )
    return flyt.NewFlow(validateNode)
}

// Define input parameters for each flow iteration
// Each FlowInputs map is merged into that flow's isolated SharedStore
batchFunc := func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.FlowInputs, error) {
    // Could fetch from database, API, etc.
    return []flyt.FlowInputs{
        {"user_id": 1, "email": "user1@example.com"},
        {"user_id": 2, "email": "user2@example.com"},
        {"user_id": 3, "email": "user3@example.com"},
    }, nil
}

// Create and run batch flow
batchFlow := flyt.NewBatchFlow(flowFactory, batchFunc, true) // true for concurrent
err := batchFlow.Run(ctx, shared)

// Each flow runs in isolation with its own SharedStore containing the FlowInputs

Nested Flows

Compose flows for complex workflows:

// Sub-flow for data validation
validationFlow := createValidationFlow()

// Main flow
mainFlow := flyt.NewFlow(fetchNode)
mainFlow.Connect(fetchNode, "validate", validationFlow)
mainFlow.Connect(validationFlow, flyt.DefaultAction, processNode)

Flow as Node

Flows implement the Node interface and can be used anywhere a node is expected:

// Create a reusable flow
func createProcessingFlow() *flyt.Flow {
    validateNode := createValidateNode()
    transformNode := createTransformNode()
    
    flow := flyt.NewFlow(validateNode)
    flow.Connect(validateNode, "valid", transformNode)
    return flow
}

// Use the flow as a node in another flow
processingFlow := createProcessingFlow()
mainFlow := flyt.NewFlow(fetchNode)
mainFlow.Connect(fetchNode, flyt.DefaultAction, processingFlow) // Flow used as node
mainFlow.Connect(processingFlow, flyt.DefaultAction, saveNode)

Worker Pool

For custom concurrent task management:

// Create a worker pool with 10 workers
pool := flyt.NewWorkerPool(10)

// Submit tasks
for _, item := range items {
    item := item // Capture loop variable
    pool.Submit(func() {
        // Process item
        result := processItem(item)
        // Store result safely
        mu.Lock()
        results = append(results, result)
        mu.Unlock()
    })
}

// Wait for all tasks to complete
pool.Wait()

// Clean up
pool.Close()
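
For readers curious how a pool with this Submit, Wait and Close shape can be built, here is a minimal sketch on top of a channel and a sync.WaitGroup. The `pool` type and its methods are hypothetical and are not Flyt's WorkerPool implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical fixed-size worker pool built on a task channel.
type pool struct {
	tasks chan func()
	wg    sync.WaitGroup // counts submitted tasks that have not finished
}

// newPool starts the given number of workers (at least one).
func newPool(workers int) *pool {
	if workers < 1 {
		workers = 1
	}
	p := &pool{tasks: make(chan func())}
	for i := 0; i < workers; i++ {
		go func() {
			for task := range p.tasks {
				task()
				p.wg.Done()
			}
		}()
	}
	return p
}

// submit blocks until a worker is free to take the task.
func (p *pool) submit(task func()) {
	p.wg.Add(1)
	p.tasks <- task
}

// wait blocks until every submitted task has finished.
func (p *pool) wait() { p.wg.Wait() }

// close stops the workers; no tasks may be submitted afterwards.
func (p *pool) close() { close(p.tasks) }

func main() {
	p := newPool(4)
	var mu sync.Mutex
	sum := 0
	for i := 1; i <= 10; i++ {
		n := i // copy so each task sees its own value on any Go version
		p.submit(func() {
			mu.Lock()
			sum += n
			mu.Unlock()
		})
	}
	p.wait()
	p.close()
	fmt.Println(sum) // 55
}
```

As in the usage above, shared results still need their own synchronization; the pool only bounds how many tasks run at once.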
Utility Functions

ToSlice

Convert various types to slices for batch processing:

// Convert different types to []any
items1 := flyt.ToSlice([]string{"a", "b", "c"})
items2 := flyt.ToSlice([]int{1, 2, 3})
items3 := flyt.ToSlice("single item") // Returns []any{"single item"}

// Useful for batch processing with mixed types
shared.Set("items", flyt.ToSlice(data))

Best Practices

  1. Single Responsibility: Each node should do one thing well
  2. Idempotency: Nodes should be idempotent when possible
  3. Error Handling: Always handle errors appropriately
  4. Context Awareness: Respect context cancellation
  5. Concurrency Safety: Don't share node instances across flows

Examples

Check out the cookbook directory for complete, real-world examples:

  • Agent - AI agent with web search capabilities using LLM and search providers
  • Chat - Interactive chat application with conversation history
  • LLM Streaming - Real-time streaming of LLM responses with OpenAI SSE
  • MCP - Model Context Protocol integration with OpenAI function calling
  • Summarize - Text summarization with error handling and retries
  • Tracing - Distributed tracing with Langfuse for observability

License

MIT

Documentation

Overview

Package flyt provides batch processing capabilities for the flyt workflow framework.

The batch package includes utilities for processing collections of items either sequentially or concurrently, with configurable batch sizes and concurrency limits.

Key Features:

  • Batch processing nodes for item collections
  • Batch flow execution with parameter variations
  • Concurrent and sequential processing modes
  • Configurable batch sizes and concurrency limits
  • Error aggregation for batch operations

Example (Batch Processing):

// Process items concurrently
processFunc := func(ctx context.Context, item any) (any, error) {
    // Process each item
    return fmt.Sprintf("processed: %v", item), nil
}

node := flyt.NewBatchNode(processFunc, true) // true for concurrent
shared := flyt.NewSharedStore()
shared.Set(flyt.KeyItems, []string{"item1", "item2", "item3"})

action, err := flyt.Run(ctx, node, shared)
if err != nil {
    log.Fatal(err)
}

results, _ := shared.Get(flyt.KeyResults)
fmt.Println(results) // ["processed: item1", "processed: item2", "processed: item3"]

Example (Batch Flow):

// Run a flow multiple times with different parameters
flowFactory := func() *flyt.Flow {
    // Create your flow here
    return flyt.NewFlow(startNode)
}

batchFunc := func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.FlowInputs, error) {
    return []flyt.FlowInputs{
        {"user_id": 1, "action": "process"},
        {"user_id": 2, "action": "process"},
    }, nil
}

batchFlow := flyt.NewBatchFlow(flowFactory, batchFunc, true)
err := batchFlow.Run(ctx, shared)

Package flyt is a minimalist workflow framework for Go, inspired by Pocket Flow. It provides a simple graph-based abstraction for orchestrating tasks.

Thread Safety: When using concurrent batch operations, ensure that your Node implementations are thread-safe. The framework provides SharedStore for safe concurrent access to shared data.

Example:

// Define a simple node
type PrintNode struct {
    *flyt.BaseNode
}

func (n *PrintNode) Exec(ctx context.Context, prepResult any) (any, error) {
    fmt.Println("Hello from node!")
    return nil, nil
}

// Create and run a flow
node := &PrintNode{BaseNode: flyt.NewBaseNode()}
shared := flyt.NewSharedStore()

ctx := context.Background()
action, err := flyt.Run(ctx, node, shared)
if err != nil {
    log.Fatal(err)
}

Constants

const (
	// DefaultAction is the default action if none is specified.
	// Flows use this when a node doesn't explicitly return an action.
	DefaultAction Action = "default"

	// KeyItems is the shared store key for items to be processed.
	// Batch nodes look for items under this key by default.
	KeyItems = "items"

	// KeyResults is the shared store key for processing results.
	// Batch nodes store their results under this key by default.
	KeyResults = "results"

	// KeyBatchCount is the shared store key for batch count.
	// Batch flows store the number of iterations under this key.
	KeyBatchCount = "batch_count"
)

Variables

This section is empty.

Functions

func As added in v0.7.0

func As[T any](r Result) (T, bool)

As attempts to retrieve the Result as the specified type T. Returns the typed value and true if successful, or zero value and false if not.

Example:

if user, ok := flyt.As[*User](result); ok {
    // Use typed user
}

func MustAs added in v0.7.0

func MustAs[T any](r Result) T

MustAs retrieves the Result as the specified type T. Panics if the Result is nil or not of type T.

Example:

user := MustAs[*User](result)

func ToSlice

func ToSlice(v any) []any

ToSlice converts various types to []any for batch processing. This utility function handles common slice types and single values, making it easier to work with batch operations.

Supported types:

  • []any: returned as-is
  • []string, []int, []float64: converted to []any
  • []map[string]any: converted to []any
  • Other slice types: converted using reflection
  • Single values: wrapped in a slice
  • nil: returns empty slice

Example:

items1 := flyt.ToSlice([]string{"a", "b", "c"})  // []any{"a", "b", "c"}
items2 := flyt.ToSlice("single")                 // []any{"single"}
items3 := flyt.ToSlice(nil)                      // []any{}
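
The rules listed above can be approximated in a few lines with the reflect package. The `toSlice` function below is a hypothetical re-implementation for illustration and may differ from Flyt's own code, for example in how it treats arrays.

```go
package main

import (
	"fmt"
	"reflect"
)

// toSlice is a hypothetical helper that follows the documented rules:
// nil becomes an empty slice, []any is returned as-is, any other slice is
// copied element by element, and a single value is wrapped in a slice.
func toSlice(v any) []any {
	if v == nil {
		return []any{}
	}
	if s, ok := v.([]any); ok {
		return s
	}
	rv := reflect.ValueOf(v)
	if rv.Kind() == reflect.Slice || rv.Kind() == reflect.Array {
		out := make([]any, rv.Len())
		for i := range out {
			out[i] = rv.Index(i).Interface()
		}
		return out
	}
	return []any{v}
}

func main() {
	fmt.Println(toSlice([]int{1, 2, 3}), toSlice("single"), toSlice(nil))
	// [1 2 3] [single] []
}
```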

Types

type Action

type Action string

Action represents the next action to take after a node executes. Actions are used to determine flow control in workflows, allowing nodes to direct execution to different paths based on their results.

Example:

const (
    ActionSuccess Action = "success"
    ActionRetry   Action = "retry"
    ActionFail    Action = "fail"
)

func Run

func Run(ctx context.Context, node Node, shared *SharedStore) (Action, error)

Run executes a node with the standard prep->exec->post lifecycle. This is the main entry point for executing individual nodes.

The execution flow is:

  1. Prep: Read and preprocess data from SharedStore
  2. Exec: Execute main logic with automatic retries if configured
  3. Post: Process results and write back to SharedStore

If the node implements RetryableNode, the Exec phase will automatically retry on failure according to the configured settings.

If the node implements FallbackNode and all retries fail, ExecFallback is called to provide alternative handling.

Parameters:

  • ctx: Context for cancellation and timeouts
  • node: The node to execute
  • shared: SharedStore for data exchange

Returns:

  • Action: The next action to take in the flow
  • error: Any error that occurred during execution

Example:

node := &MyNode{BaseNode: flyt.NewBaseNode()}
shared := flyt.NewSharedStore()
shared.Set("input", "data")

action, err := flyt.Run(ctx, node, shared)
if err != nil {
    log.Fatal(err)
}
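
The three-phase lifecycle can also be modelled without the library, which may help when reasoning about what each phase receives. In this sketch the `store`, `node`, `run` and `upper` names are hypothetical, and retries and fallbacks are deliberately left out.

```go
package main

import (
	"fmt"
	"strings"
)

// store is a hypothetical, single-goroutine stand-in for SharedStore.
type store map[string]any

// node mirrors the three-phase shape described above.
type node interface {
	prep(s store) (any, error)
	exec(prepResult any) (any, error)
	post(s store, prepResult, execResult any) (string, error)
}

// run wires the phases together: each phase receives the previous phase's
// output, and the first error aborts the lifecycle.
func run(n node, s store) (string, error) {
	p, err := n.prep(s)
	if err != nil {
		return "", fmt.Errorf("prep: %w", err)
	}
	e, err := n.exec(p)
	if err != nil {
		return "", fmt.Errorf("exec: %w", err)
	}
	return n.post(s, p, e)
}

// upper is a sample node that upper-cases the "input" value.
type upper struct{}

func (upper) prep(s store) (any, error) { return s["input"], nil }

func (upper) exec(prepResult any) (any, error) {
	text, ok := prepResult.(string)
	if !ok {
		return nil, fmt.Errorf("expected string, got %T", prepResult)
	}
	return strings.ToUpper(text), nil
}

func (upper) post(s store, prepResult, execResult any) (string, error) {
	s["output"] = execResult
	return "default", nil
}

func main() {
	s := store{"input": "data"}
	action, err := run(upper{}, s)
	fmt.Println(action, s["output"], err) // default DATA <nil>
}
```

Note that only prep and post touch the store in this model; exec works purely on the value handed over by prep, which is what makes it safe to retry.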

func RunG added in v0.7.0

func RunG[P, E any](ctx context.Context, node NodeG[P, E], shared *SharedStore) (Action, error)

RunG executes a generic node with type safety

type BaseNode

type BaseNode struct {
	// contains filtered or unexported fields
}

BaseNode provides a base implementation of the Node interface. It includes common functionality like retry configuration and default implementations of Prep, Exec, and Post methods that can be overridden.

BaseNode is designed to be embedded in custom node implementations:

type MyNode struct {
    *flyt.BaseNode
    // custom fields
}

func (n *MyNode) Exec(ctx context.Context, prepResult any) (any, error) {
    // custom implementation
}

func NewBaseNode

func NewBaseNode(opts ...NodeOption) *BaseNode

NewBaseNode creates a new BaseNode with the provided options. By default, maxRetries is set to 1 (no retries) and wait is 0.

Example:

node := flyt.NewBaseNode(
    flyt.WithMaxRetries(3),
    flyt.WithWait(time.Second),
)

func (*BaseNode) Exec

func (n *BaseNode) Exec(ctx context.Context, prepResult any) (any, error)

Exec is the default exec implementation that returns nil. This method should be overridden in your node implementation to provide the main processing logic.

func (*BaseNode) ExecFallback

func (n *BaseNode) ExecFallback(prepResult any, err error) (any, error)

ExecFallback handles errors after all retries are exhausted. The default implementation simply returns the error. Override this method to provide custom fallback behavior.

func (*BaseNode) GetMaxRetries

func (n *BaseNode) GetMaxRetries() int

GetMaxRetries returns the maximum number of retries configured for this node. This method is thread-safe.

func (*BaseNode) GetWait

func (n *BaseNode) GetWait() time.Duration

GetWait returns the wait duration between retries configured for this node. This method is thread-safe.

func (*BaseNode) Post

func (n *BaseNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)

Post is the default post implementation that returns DefaultAction. Override this method in your node implementation to process results and determine the next action in the flow.

func (*BaseNode) Prep

func (n *BaseNode) Prep(ctx context.Context, shared *SharedStore) (any, error)

Prep is the default prep implementation that returns nil. Override this method in your node implementation to read and preprocess data from the SharedStore before execution.

type BaseNodeG added in v0.7.0

type BaseNodeG[P, E any] struct {
	*BaseNode
}

BaseNodeG provides a generic base implementation

func NewBaseNodeG added in v0.7.0

func NewBaseNodeG[P, E any](opts ...NodeOption) *BaseNodeG[P, E]

NewBaseNodeG creates a new generic BaseNode

func (*BaseNodeG[P, E]) ExecG added in v0.7.0

func (n *BaseNodeG[P, E]) ExecG(ctx context.Context, prepResult P) (E, error)

ExecG default implementation

func (*BaseNodeG[P, E]) PostG added in v0.7.0

func (n *BaseNodeG[P, E]) PostG(ctx context.Context, shared *SharedStore, prepResult P, execResult E) (Action, error)

PostG default implementation

func (*BaseNodeG[P, E]) PrepG added in v0.7.0

func (n *BaseNodeG[P, E]) PrepG(ctx context.Context, shared *SharedStore) (P, error)

PrepG default implementation

type BatchConfig

type BatchConfig struct {
	// MaxBatchSize is the maximum number of items to process in a single batch.
	// If a batch exceeds this size, an error will be returned.
	// Default: 1000
	MaxBatchSize int

	// MaxConcurrency is the maximum number of concurrent workers for parallel processing.
	// This is only used when concurrent processing is enabled.
	// Default: 10
	MaxConcurrency int

	// ItemsKey is the SharedStore key to read items from.
	// Default: "items"
	ItemsKey string

	// ResultsKey is the SharedStore key to write results to.
	// Default: "results"
	ResultsKey string

	// BatchCountKey is the SharedStore key to write the batch count for BatchFlow.
	// Default: "batch_count"
	BatchCountKey string
}

BatchConfig holds configuration for batch operations. It allows customization of batch processing behavior including size limits, concurrency settings, and storage keys.

func DefaultBatchConfig

func DefaultBatchConfig() *BatchConfig

DefaultBatchConfig returns a BatchConfig with sensible defaults. The defaults are:

  • MaxBatchSize: 1000
  • MaxConcurrency: 10
  • ItemsKey: "items"
  • ResultsKey: "results"
  • BatchCountKey: "batch_count"

type BatchError

type BatchError struct {
	Errors []error
}

BatchError aggregates multiple errors from batch operations. It implements the error interface and provides detailed information about all errors that occurred during batch processing.

func (*BatchError) Error

func (e *BatchError) Error() string
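
To show how an aggregate error of this kind behaves with the standard errors package, here is a small self-contained sketch. The `multiError` type and `collect` function are hypothetical; they mimic the shape of BatchError (a slice of per-item errors) but the message format is invented for this example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// multiError is a hypothetical aggregate error with one slot per item;
// a nil slot means that item succeeded.
type multiError struct {
	Errors []error
}

func (e *multiError) Error() string {
	var msgs []string
	for i, err := range e.Errors {
		if err != nil {
			msgs = append(msgs, fmt.Sprintf("item %d: %v", i, err))
		}
	}
	return fmt.Sprintf("%d of %d items failed: %s",
		len(msgs), len(e.Errors), strings.Join(msgs, "; "))
}

// collect returns an untyped nil when every slot is nil, so callers can keep
// using the usual `if err != nil` check.
func collect(errs []error) error {
	for _, err := range errs {
		if err != nil {
			return &multiError{Errors: errs}
		}
	}
	return nil
}

func main() {
	err := collect([]error{nil, errors.New("timeout"), nil})

	var me *multiError
	if errors.As(err, &me) {
		fmt.Println(me) // 1 of 3 items failed: item 1: timeout
	}
}
```

Keeping one slot per input item, rather than a compact list of failures, lets the index identify which item failed.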

type BatchFlowFunc

type BatchFlowFunc func(ctx context.Context, shared *SharedStore) ([]FlowInputs, error)

BatchFlowFunc returns input parameters for each flow iteration in batch processing. This function is called during the Prep phase to determine how many flow instances to run and what parameters each should receive.

The function receives the parent flow's SharedStore and should return a slice of FlowInputs, where each element represents parameters for one flow iteration.

type BatchProcessFunc

type BatchProcessFunc func(ctx context.Context, item any) (any, error)

BatchProcessFunc is a function that processes a single item in a batch. It receives a context and an item, and returns the processed result or an error. The function should be thread-safe if concurrent processing is enabled.

type CustomNode added in v0.3.0

type CustomNode struct {
	*BaseNode
	// contains filtered or unexported fields
}

CustomNode is a node implementation that uses custom functions for Prep, Exec, and Post phases. This allows creating nodes without defining new types, useful for simple operations.

CustomNode supports two styles of functions:

  1. Result-based functions (WithPrepFunc, WithExecFunc, WithPostFunc): These use the Result type which provides convenient type assertion methods. Best for complex data processing where type safety and conversion helpers are valuable.

  2. Any-based functions (WithPrepFuncAny, WithExecFuncAny, WithPostFuncAny): These use standard any types matching the Node interface directly. Best for simple operations or when migrating existing code.

Example with Result types:

node := flyt.NewNode(
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        data := prepResult.MustString()  // Type-safe access
        return flyt.NewResult(processData(data)), nil
    }),
)

Example with any types:

node := flyt.NewNode(
    flyt.WithExecFuncAny(func(ctx context.Context, prepResult any) (any, error) {
        data := prepResult.(string)  // Manual type assertion
        return processData(data), nil
    }),
)
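
The difference between the two styles comes down to where the type assertion lives. The sketch below shows how a thin wrapper around `any` can offer typed accessors; the `result` type and its `asStringOr` and `mustString` methods are hypothetical analogues written for this example, not Flyt's Result type.

```go
package main

import "fmt"

// result is a hypothetical wrapper around any, sketching how typed accessors
// can replace hand-written type assertions.
type result struct {
	value any
}

// asStringOr returns the wrapped string, or def when the value is not a string.
func (r result) asStringOr(def string) string {
	if s, ok := r.value.(string); ok {
		return s
	}
	return def
}

// mustString returns the wrapped string and panics on any other type.
func (r result) mustString() string {
	s, ok := r.value.(string)
	if !ok {
		panic(fmt.Sprintf("result holds %T, not string", r.value))
	}
	return s
}

func main() {
	fmt.Println(result{value: "hello"}.mustString())      // hello
	fmt.Println(result{value: 42}.asStringOr("fallback")) // fallback
}
```

The "or default" form suits optional inputs, while the panicking form is best kept for values the node cannot run without.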

func (*CustomNode) Exec added in v0.3.0

func (n *CustomNode) Exec(ctx context.Context, prepResult any) (any, error)

Exec implements Node.Exec by calling the custom execFunc if provided

func (*CustomNode) ExecFallback added in v0.5.0

func (n *CustomNode) ExecFallback(prepResult any, err error) (any, error)

ExecFallback implements FallbackNode.ExecFallback by calling the custom execFallbackFunc if provided.

func (*CustomNode) Post added in v0.3.0

func (n *CustomNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)

Post implements Node.Post by calling the custom postFunc if provided.

func (*CustomNode) Prep added in v0.3.0

func (n *CustomNode) Prep(ctx context.Context, shared *SharedStore) (any, error)

Prep implements Node.Prep by calling the custom prepFunc if provided.

type CustomNodeOption added in v0.3.0

type CustomNodeOption interface {
	// contains filtered or unexported methods
}

CustomNodeOption is an option for configuring a CustomNode. It allows setting custom implementations for Prep, Exec, and Post methods.

func WithExecFallbackFunc added in v0.5.0

func WithExecFallbackFunc(fn func(any, error) (any, error)) CustomNodeOption

WithExecFallbackFunc sets a custom ExecFallback implementation for a CustomNode. The provided function will be called when Exec fails after all retries are exhausted. This allows for custom error handling or returning a default value.

Example:

flyt.WithExecFallbackFunc(func(prepResult any, err error) (any, error) {
    log.Printf("Exec failed after retries: %v", err)
    // Return nil to indicate failure, which can be handled in Post
    return nil, nil
})

func WithExecFunc added in v0.3.0

func WithExecFunc(fn func(context.Context, Result) (Result, error)) CustomNodeOption

WithExecFunc sets a custom Exec implementation for a CustomNode. The provided function will be called during the Exec phase with the result from Prep as input.

Example:

flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
    // Process the data
    return flyt.NewResult(processedResult), nil
})

func WithExecFuncAny added in v0.8.0

func WithExecFuncAny(fn func(context.Context, any) (any, error)) CustomNodeOption

WithExecFuncAny sets a custom Exec implementation for a CustomNode using any types. This is an alternative to WithExecFunc that doesn't require Result types, useful for simpler cases or when you don't need the type assertion helpers.

Example:

flyt.WithExecFuncAny(func(ctx context.Context, prepResult any) (any, error) {
    // Process the data
    return processedResult, nil
})

func WithPostFunc added in v0.3.0

func WithPostFunc(fn func(context.Context, *SharedStore, Result, Result) (Action, error)) CustomNodeOption

WithPostFunc sets a custom Post implementation for a CustomNode. The provided function will be called during the Post phase to process results and determine the next action.

Example:

flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
    // Illustrative convention: a nil Exec result means the step failed
    if execResult.IsNil() {
        return "retry", nil
    }
    shared.Set("output", execResult.Value())
    return "success", nil
})

func WithPostFuncAny added in v0.8.0

func WithPostFuncAny(fn func(context.Context, *SharedStore, any, any) (Action, error)) CustomNodeOption

WithPostFuncAny sets a custom Post implementation for a CustomNode using any types. This is an alternative to WithPostFunc that doesn't require Result types, useful for simpler cases or backward compatibility.

Example:

flyt.WithPostFuncAny(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult any) (flyt.Action, error) {
    // Illustrative convention: a nil Exec result means the step failed
    if execResult == nil {
        return "retry", nil
    }
    shared.Set("output", execResult)
    return "success", nil
})

func WithPrepFunc added in v0.3.0

func WithPrepFunc(fn func(context.Context, *SharedStore) (Result, error)) CustomNodeOption

WithPrepFunc sets a custom Prep implementation for a CustomNode. The provided function will be called during the Prep phase to read and preprocess data from the SharedStore.

Example:

flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
    data, _ := shared.Get("input")
    // Preprocess data
    return flyt.NewResult(preprocessedData), nil
})

func WithPrepFuncAny added in v0.8.0

func WithPrepFuncAny(fn func(context.Context, *SharedStore) (any, error)) CustomNodeOption

WithPrepFuncAny sets a custom Prep implementation for a CustomNode using any types. This is an alternative to WithPrepFunc that doesn't require Result types, useful for simpler cases or when migrating existing code.

Example:

flyt.WithPrepFuncAny(func(ctx context.Context, shared *flyt.SharedStore) (any, error) {
    data, _ := shared.Get("input")
    // Preprocess data
    return preprocessedData, nil
})

type FallbackNode

type FallbackNode interface {
	ExecFallback(prepResult any, err error) (any, error)
}

FallbackNode is a node that supports custom fallback behavior on error. When all retries are exhausted, the ExecFallback method is called to provide an alternative result or handle the error gracefully.

type Flow

type Flow struct {
	*BaseNode
	// contains filtered or unexported fields
}

Flow represents a workflow of connected nodes. A Flow is itself a Node, allowing flows to be nested within other flows. Nodes are connected via actions, creating a directed graph of execution.

Example:

// Create nodes
validateNode := &ValidateNode{BaseNode: flyt.NewBaseNode()}
processNode := &ProcessNode{BaseNode: flyt.NewBaseNode()}
errorNode := &ErrorNode{BaseNode: flyt.NewBaseNode()}

// Create flow
flow := flyt.NewFlow(validateNode)
flow.Connect(validateNode, ActionSuccess, processNode)
flow.Connect(validateNode, ActionFail, errorNode)

// Or with chaining:
flow.Connect(validateNode, ActionSuccess, processNode).
    Connect(validateNode, ActionFail, errorNode)

// Run flow
err := flow.Run(ctx, shared)

func NewBatchFlow

func NewBatchFlow(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool) *Flow

NewBatchFlow creates a flow that runs multiple times with different parameters. Each iteration gets its own isolated SharedStore with merged parameters.

Important: The flowFactory must create new flow instances for concurrent execution to avoid race conditions. Do not reuse flow instances.

Parameters:

  • flowFactory: Function that creates new flow instances
  • batchFunc: Function that returns parameters for each iteration
  • concurrent: If true, flows run concurrently

Example:

flowFactory := func() *flyt.Flow {
    // Create and return a new flow instance
    return flyt.NewFlow(startNode)
}

batchFunc := func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.FlowInputs, error) {
    users, _ := shared.Get("users")
    var inputs []flyt.FlowInputs
    for _, user := range users.([]User) {
        inputs = append(inputs, flyt.FlowInputs{"user": user})
    }
    return inputs, nil
}

batchFlow := flyt.NewBatchFlow(flowFactory, batchFunc, true)

func NewBatchFlowWithConfig

func NewBatchFlowWithConfig(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, config *BatchConfig) *Flow

NewBatchFlowWithConfig creates a batch flow with custom configuration. This provides full control over batch flow execution settings.

Parameters:

  • flowFactory: Function that creates new flow instances
  • batchFunc: Function that returns parameters for each iteration
  • concurrent: If true, flows run concurrently
  • config: Custom batch configuration

Example:

config := &flyt.BatchConfig{
    MaxBatchSize:   100,      // Limit to 100 flows
    MaxConcurrency: 5,        // Run max 5 flows concurrently
    BatchCountKey:  "total",  // Store count in "total" key
}
batchFlow := flyt.NewBatchFlowWithConfig(flowFactory, batchFunc, true, config)

func NewBatchFlowWithCountKey added in v0.2.0

func NewBatchFlowWithCountKey(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, countKey string) *Flow

NewBatchFlowWithCountKey creates a batch flow with a custom key for storing the batch count. After execution, the number of flows that were run is stored in the SharedStore using the specified key instead of the default "batch_count" key.

Parameters:

  • flowFactory: Function that creates new flow instances
  • batchFunc: Function that returns parameters for each iteration
  • concurrent: If true, flows run concurrently
  • countKey: SharedStore key to store the batch count

Example:

batchFlow := flyt.NewBatchFlowWithCountKey(flowFactory, batchFunc, true, "processed_count")
// After execution, retrieve count with: count, _ := shared.Get("processed_count")

func NewFlow

func NewFlow(start Node) *Flow

NewFlow creates a new Flow with a start node. The flow begins execution at the start node and follows transitions based on the actions returned by each node.

Parameters:

  • start: The first node to execute in the flow

Example:

flow := flyt.NewFlow(startNode)

func (*Flow) Connect

func (f *Flow) Connect(from Node, action Action, to Node) *Flow

Connect adds a transition from one node to another based on an action. When the 'from' node returns the specified action, execution continues with the 'to' node. Multiple actions can be connected from a single node. Returns the flow instance for method chaining.

Parameters:

  • from: The source node
  • action: The action that triggers this transition
  • to: The destination node

Example:

flow.Connect(nodeA, "success", nodeB)
flow.Connect(nodeA, "retry", nodeA)  // Self-loop for retry
flow.Connect(nodeA, "fail", errorNode)

Example with chaining:

flow.Connect(nodeA, "success", nodeB).
    Connect(nodeB, "success", finalNode).
    Connect(nodeB, "fail", errorNode)

func (*Flow) Exec

func (f *Flow) Exec(ctx context.Context, prepResult any) (any, error)

Exec orchestrates the flow execution by running nodes in sequence.

func (*Flow) Post

func (f *Flow) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)

Post implements the Node interface for Flow.

func (*Flow) Prep

func (f *Flow) Prep(ctx context.Context, shared *SharedStore) (any, error)

Prep implements the Node interface for Flow.

func (*Flow) Run

func (f *Flow) Run(ctx context.Context, shared *SharedStore) error

Run executes the flow starting from the start node. This is a convenience method that wraps the standard Run function. The flow executes nodes in sequence based on their action transitions until no more transitions are available or an error occurs.

Parameters:

  • ctx: Context for cancellation and timeouts
  • shared: SharedStore for data exchange between nodes

Returns:

  • error: Any error that occurred during flow execution

type FlowFactory

type FlowFactory func() *Flow

FlowFactory creates new instances of a flow. This is used by batch flow operations to create isolated flow instances for concurrent execution. Each call should return a new flow instance to avoid race conditions.

Example:

factory := func() *flyt.Flow {
    node1 := &ProcessNode{BaseNode: flyt.NewBaseNode()}
    node2 := &SaveNode{BaseNode: flyt.NewBaseNode()}

    flow := flyt.NewFlow(node1)
    flow.Connect(node1, flyt.DefaultAction, node2)
    return flow
}

type FlowInputs added in v0.4.0

type FlowInputs map[string]any

FlowInputs holds input parameters for a flow iteration in batch processing. These parameters are merged into each flow's isolated SharedStore, allowing each flow instance to have its own set of input data.

Example:

inputs := flyt.FlowInputs{
    "user_id": 123,
    "action": "process",
    "priority": "high",
}

type Node

type Node interface {
	// Prep reads and preprocesses data from shared store
	Prep(ctx context.Context, shared *SharedStore) (any, error)

	// Exec executes the main logic with optional retries
	Exec(ctx context.Context, prepResult any) (any, error)

	// Post processes results and writes back to shared store
	Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
}

Node is the interface that all nodes must implement.

Important: Nodes should not be shared across concurrent flow executions. If you need to run the same logic concurrently, create separate node instances.

func NewBatchNode

func NewBatchNode(processFunc BatchProcessFunc, concurrent bool, opts ...NodeOption) Node

NewBatchNode creates a node that processes items in batch. The node reads items from the SharedStore (using the key "items" by default), processes each item using the provided function, and stores results back to the SharedStore (using the key "results" by default).

Parameters:

  • processFunc: Function to process each item
  • concurrent: If true, items are processed concurrently using a worker pool
  • opts: Additional node options (e.g., WithMaxRetries, WithWait)

Example:

processFunc := func(ctx context.Context, item any) (any, error) {
    // Process the item
    return processedItem, nil
}
node := flyt.NewBatchNode(processFunc, true)

func NewBatchNodeWithConfig

func NewBatchNodeWithConfig(processFunc BatchProcessFunc, concurrent bool, config *BatchConfig, opts ...NodeOption) Node

NewBatchNodeWithConfig creates a batch node with custom configuration. This provides full control over all batch processing settings.

Parameters:

  • processFunc: Function to process each item
  • concurrent: If true, items are processed concurrently
  • config: Custom batch configuration
  • opts: Additional node options

Example:

config := &flyt.BatchConfig{
    MaxBatchSize:   500,
    MaxConcurrency: 20,
    ItemsKey:       "tasks",
    ResultsKey:     "completed_tasks",
}
node := flyt.NewBatchNodeWithConfig(processFunc, true, config)

func NewBatchNodeWithKeys added in v0.2.0

func NewBatchNodeWithKeys(processFunc BatchProcessFunc, concurrent bool, itemsKey, resultsKey string, opts ...NodeOption) Node

NewBatchNodeWithKeys creates a batch node with custom keys for items and results. This allows you to specify which keys in the SharedStore to read items from and write results to, instead of using the default keys.

Parameters:

  • processFunc: Function to process each item
  • concurrent: If true, items are processed concurrently
  • itemsKey: SharedStore key to read items from
  • resultsKey: SharedStore key to write results to
  • opts: Additional node options

Example:

node := flyt.NewBatchNodeWithKeys(processFunc, true, "input_data", "output_data")

func NewNodeAdapter added in v0.7.0

func NewNodeAdapter[P, E any](node NodeG[P, E]) Node

NewNodeAdapter wraps a generic NodeG in an adapter that satisfies the standard Node interface.

type NodeAdapter added in v0.7.0

type NodeAdapter[P, E any] struct {
	// contains filtered or unexported fields
}

NodeAdapter adapts a generic node to the standard Node interface.

func (*NodeAdapter[P, E]) Exec added in v0.7.0

func (a *NodeAdapter[P, E]) Exec(ctx context.Context, prepResult any) (any, error)

func (*NodeAdapter[P, E]) Post added in v0.7.0

func (a *NodeAdapter[P, E]) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)

func (*NodeAdapter[P, E]) Prep added in v0.7.0

func (a *NodeAdapter[P, E]) Prep(ctx context.Context, shared *SharedStore) (any, error)

type NodeBuilder added in v0.9.0

type NodeBuilder struct {
	*CustomNode
}

NodeBuilder provides a fluent interface for creating and configuring nodes. It implements the Node interface while also providing chainable methods for configuration. This allows both styles:

  • Traditional: flyt.NewNode(WithExecFunc(...), WithMaxRetries(3))
  • Builder: flyt.NewNode().WithExecFunc(...).WithMaxRetries(3)

func NewNode added in v0.3.0

func NewNode(opts ...any) *NodeBuilder

NewNode creates a new node with custom function implementations. Returns a NodeBuilder that implements Node and provides chainable configuration methods.

Can be used in two styles:

Traditional (backwards compatible):

node := flyt.NewNode(
    flyt.WithMaxRetries(3),
    flyt.WithExecFunc(execFn),
)

Builder pattern:

node := flyt.NewNode().
    WithMaxRetries(3).
    WithExecFunc(execFn).
    Build()  // Build() is optional

func (*NodeBuilder) Exec added in v0.9.0

func (b *NodeBuilder) Exec(ctx context.Context, prepResult any) (any, error)

Exec implements Node.Exec by delegating to the embedded CustomNode.

func (*NodeBuilder) ExecFallback added in v0.9.0

func (b *NodeBuilder) ExecFallback(prepResult any, err error) (any, error)

ExecFallback implements FallbackNode.ExecFallback by delegating to the embedded CustomNode.

func (*NodeBuilder) GetMaxRetries added in v0.9.0

func (b *NodeBuilder) GetMaxRetries() int

GetMaxRetries implements RetryableNode.GetMaxRetries by delegating to the embedded BaseNode.

func (*NodeBuilder) GetWait added in v0.9.0

func (b *NodeBuilder) GetWait() time.Duration

GetWait implements RetryableNode.GetWait by delegating to the embedded BaseNode.

func (*NodeBuilder) Post added in v0.9.0

func (b *NodeBuilder) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)

Post implements Node.Post by delegating to the embedded CustomNode.

func (*NodeBuilder) Prep added in v0.9.0

func (b *NodeBuilder) Prep(ctx context.Context, shared *SharedStore) (any, error)

Prep implements Node.Prep by delegating to the embedded CustomNode.

func (*NodeBuilder) WithExecFallbackFunc added in v0.9.0

func (b *NodeBuilder) WithExecFallbackFunc(fn func(any, error) (any, error)) *NodeBuilder

WithExecFallbackFunc sets a custom ExecFallback implementation. Returns the builder for method chaining.

func (*NodeBuilder) WithExecFunc added in v0.9.0

func (b *NodeBuilder) WithExecFunc(fn func(context.Context, Result) (Result, error)) *NodeBuilder

WithExecFunc sets a custom Exec implementation using Result types. Returns the builder for method chaining.

func (*NodeBuilder) WithExecFuncAny added in v0.9.0

func (b *NodeBuilder) WithExecFuncAny(fn func(context.Context, any) (any, error)) *NodeBuilder

WithExecFuncAny sets a custom Exec implementation using any types. Returns the builder for method chaining.

func (*NodeBuilder) WithMaxRetries added in v0.9.0

func (b *NodeBuilder) WithMaxRetries(retries int) *NodeBuilder

WithMaxRetries sets the maximum number of retries for the node's Exec phase. Returns the builder for method chaining.

func (*NodeBuilder) WithPostFunc added in v0.9.0

func (b *NodeBuilder) WithPostFunc(fn func(context.Context, *SharedStore, Result, Result) (Action, error)) *NodeBuilder

WithPostFunc sets a custom Post implementation using Result types. Returns the builder for method chaining.

func (*NodeBuilder) WithPostFuncAny added in v0.9.0

func (b *NodeBuilder) WithPostFuncAny(fn func(context.Context, *SharedStore, any, any) (Action, error)) *NodeBuilder

WithPostFuncAny sets a custom Post implementation using any types. Returns the builder for method chaining.

func (*NodeBuilder) WithPrepFunc added in v0.9.0

func (b *NodeBuilder) WithPrepFunc(fn func(context.Context, *SharedStore) (Result, error)) *NodeBuilder

WithPrepFunc sets a custom Prep implementation using Result types. Returns the builder for method chaining.

func (*NodeBuilder) WithPrepFuncAny added in v0.9.0

func (b *NodeBuilder) WithPrepFuncAny(fn func(context.Context, *SharedStore) (any, error)) *NodeBuilder

WithPrepFuncAny sets a custom Prep implementation using any types. Returns the builder for method chaining.

func (*NodeBuilder) WithWait added in v0.9.0

func (b *NodeBuilder) WithWait(wait time.Duration) *NodeBuilder

WithWait sets the wait duration between retries. Returns the builder for method chaining.

type NodeG added in v0.7.0

type NodeG[P, E any] interface {
	// PrepG reads and preprocesses data from shared store with typed result
	PrepG(ctx context.Context, shared *SharedStore) (P, error)

	// ExecG executes the main logic with typed input and output
	ExecG(ctx context.Context, prepResult P) (E, error)

	// PostG processes results with typed inputs
	PostG(ctx context.Context, shared *SharedStore, prepResult P, execResult E) (Action, error)
}

NodeG is a generic version of the Node interface for type-safe implementations.

type NodeOption

type NodeOption func(*BaseNode)

NodeOption is a function that configures a BaseNode. Options can be passed to NewBaseNode to customize its behavior.

func WithMaxRetries

func WithMaxRetries(retries int) NodeOption

WithMaxRetries sets the maximum number of retries for the Exec phase. The default is 1 (no retries). Setting this to a value greater than 1 enables automatic retry on Exec failures.

Example:

node := flyt.NewBaseNode(flyt.WithMaxRetries(3))

func WithWait

func WithWait(wait time.Duration) NodeOption

WithWait sets the wait duration between retries. This only applies when maxRetries is greater than 1. The default is 0 (no wait between retries).

Example:

node := flyt.NewBaseNode(
    flyt.WithMaxRetries(3),
    flyt.WithWait(time.Second * 2),
)

type Result added in v0.7.0

type Result struct {
	// contains filtered or unexported fields
}

Result is a wrapper type that provides convenient type assertion methods. It is used as the return type for Prep and Exec functions to improve the developer experience. It can hold any value and provides type-safe accessors.

func NewResult added in v0.7.0

func NewResult(v any) Result

NewResult creates a new Result from any value.

func R added in v0.7.0

func R(v any) Result

R is a shorthand for NewResult.

func (Result) AsBool added in v0.7.0

func (r Result) AsBool() (bool, bool)

AsBool retrieves the Result as a bool. Returns false and false if the Result value is nil or not a bool.

func (Result) AsBoolOr added in v0.7.0

func (r Result) AsBoolOr(defaultVal bool) bool

AsBoolOr retrieves the Result as a bool. Returns the provided default value if the Result is nil or not a bool.

func (Result) AsFloat64 added in v0.7.0

func (r Result) AsFloat64() (float64, bool)

AsFloat64 retrieves the Result as a float64. Returns 0.0 and false if the Result value is nil or cannot be converted to float64. Supports conversion from various numeric types.

func (Result) AsFloat64Or added in v0.7.0

func (r Result) AsFloat64Or(defaultVal float64) float64

AsFloat64Or retrieves the Result as a float64. Returns the provided default value if the Result is nil or cannot be converted to float64.

func (Result) AsInt added in v0.7.0

func (r Result) AsInt() (int, bool)

AsInt retrieves the Result as an int. Returns 0 and false if the Result value is nil or cannot be converted to int. Supports conversion from int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, and float types.
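A conversion of this kind is commonly written as a type switch. The toInt helper below is a hypothetical sketch, not the library's source; in particular, truncating floats toward zero and ignoring overflow for large unsigned values are assumptions made for the example.

```go
package main

import "fmt"

// toInt (hypothetical) converts the numeric types listed above to int.
// It returns 0 and false for nil and for non-numeric values.
func toInt(v any) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case int8:
		return int(n), true
	case int16:
		return int(n), true
	case int32:
		return int(n), true
	case int64:
		return int(n), true
	case uint:
		return int(n), true // assumption: no overflow check
	case uint8:
		return int(n), true
	case uint16:
		return int(n), true
	case uint32:
		return int(n), true
	case uint64:
		return int(n), true // assumption: no overflow check
	case float32:
		return int(n), true // assumption: truncates toward zero
	case float64:
		return int(n), true // assumption: truncates toward zero
	default:
		return 0, false
	}
}

func main() {
	n, ok := toInt(int64(7))
	fmt.Println(n, ok) // prints "7 true"

	n, ok = toInt("7")
	fmt.Println(n, ok) // prints "0 false"
}
```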

func (Result) AsIntOr added in v0.7.0

func (r Result) AsIntOr(defaultVal int) int

AsIntOr retrieves the Result as an int. Returns the provided default value if the Result is nil or cannot be converted to int.

func (Result) AsMap added in v0.7.0

func (r Result) AsMap() (map[string]any, bool)

AsMap retrieves the Result as a map[string]any. Returns nil and false if the Result value is nil or not a map[string]any.

func (Result) AsMapOr added in v0.7.0

func (r Result) AsMapOr(defaultVal map[string]any) map[string]any

AsMapOr retrieves the Result as a map[string]any. Returns the provided default value if the Result is nil or not a map[string]any.

func (Result) AsSlice added in v0.7.0

func (r Result) AsSlice() ([]any, bool)

AsSlice retrieves the Result as a []any slice. Returns nil and false if the Result value is nil or not a slice. Uses ToSlice to convert various slice types to []any.

func (Result) AsSliceOr added in v0.7.0

func (r Result) AsSliceOr(defaultVal []any) []any

AsSliceOr retrieves the Result as a []any slice. Returns the provided default value if the Result is nil or not a slice.

func (Result) AsString added in v0.7.0

func (r Result) AsString() (string, bool)

AsString retrieves the Result as a string. Returns empty string and false if the Result value is nil or not a string.

func (Result) AsStringOr added in v0.7.0

func (r Result) AsStringOr(defaultVal string) string

AsStringOr retrieves the Result as a string. Returns the provided default value if the Result is nil or not a string.

func (Result) Bind added in v0.7.0

func (r Result) Bind(dest any) error

Bind binds the Result to a struct using JSON marshaling/unmarshaling. This allows for easy conversion of complex types. The destination must be a pointer to the target struct. Returns an error if the Result value is nil or binding fails.

Example:

type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}
var user User
err := result.Bind(&user)
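The JSON round trip behind this kind of binding can be shown with encoding/json alone. The bind helper is a hypothetical sketch of the technique, not flyt's code: it marshals the stored value and unmarshals the bytes into the destination.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// bind (hypothetical) copies value into dest by encoding it as JSON and
// decoding it again. dest must be a pointer.
func bind(value any, dest any) error {
	if value == nil {
		return errors.New("cannot bind nil value")
	}
	data, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}
	if err := json.Unmarshal(data, dest); err != nil {
		return fmt.Errorf("unmarshal: %w", err)
	}
	return nil
}

type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	// A loosely typed value, such as one decoded from an API response.
	raw := map[string]any{"id": 7, "name": "Ada"}

	var user User
	if err := bind(raw, &user); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", user) // prints "{ID:7 Name:Ada}"
}
```

Because the value passes through JSON, only exported fields with matching json tags are populated, and values that JSON cannot encode (channels, functions) make the binding fail.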

func (Result) IsNil added in v0.7.0

func (r Result) IsNil() bool

IsNil checks if the Result value is nil.

func (Result) MustBind added in v0.7.0

func (r Result) MustBind(dest any)

MustBind is like Bind but panics if binding fails. Use this only when binding failure should be considered a programming error.

Example:

var config Config
result.MustBind(&config)  // Panics if binding fails

func (Result) MustBool added in v0.7.0

func (r Result) MustBool() bool

MustBool retrieves the Result as a bool. Panics if the Result is nil or not a bool.

func (Result) MustFloat64 added in v0.7.0

func (r Result) MustFloat64() float64

MustFloat64 retrieves the Result as a float64. Panics if the Result is nil or cannot be converted to float64.

func (Result) MustInt added in v0.7.0

func (r Result) MustInt() int

MustInt retrieves the Result as an int. Panics if the Result is nil or cannot be converted to int.

func (Result) MustMap added in v0.7.0

func (r Result) MustMap() map[string]any

MustMap retrieves the Result as a map[string]any. Panics if the Result is nil or not a map[string]any.

func (Result) MustSlice added in v0.7.0

func (r Result) MustSlice() []any

MustSlice retrieves the Result as a []any slice. Panics if the Result is nil or not a slice.

func (Result) MustString added in v0.7.0

func (r Result) MustString() string

MustString retrieves the Result as a string. Panics if the Result is nil or not a string.

func (Result) Type added in v0.7.0

func (r Result) Type() string

Type returns the underlying type of the Result value as a string. Returns "nil" if the Result value is nil.

func (Result) Value added in v0.7.0

func (r Result) Value() any

Value returns the underlying value as any. This is useful when you need to pass the Result to functions expecting any.

type RetryableNode

type RetryableNode interface {
	Node
	GetMaxRetries() int
	GetWait() time.Duration
}

RetryableNode is a node that supports automatic retries on Exec failures. Nodes implementing this interface can specify retry behavior through GetMaxRetries and GetWait methods.

type SharedStore

type SharedStore struct {
	// contains filtered or unexported fields
}

SharedStore provides thread-safe access to shared data across nodes in a flow. It acts as a key-value store that can be safely accessed by multiple goroutines during concurrent batch processing.

Example:

shared := flyt.NewSharedStore()
shared.Set("user_id", 123)
shared.Set("config", map[string]any{"timeout": 30})

if val, ok := shared.Get("user_id"); ok {
    userID := val.(int)
    // Use userID
}

func NewSharedStore

func NewSharedStore() *SharedStore

NewSharedStore creates a new thread-safe shared store. The store is initialized empty and ready for use.

func (*SharedStore) Bind added in v0.6.0

func (s *SharedStore) Bind(key string, dest any) error

Bind binds a value from the store to a struct using JSON marshaling/unmarshaling. This allows for easy conversion of complex types stored in the SharedStore. The destination must be a pointer to the target struct. Returns an error if the key is not found or binding fails. This method is safe for concurrent access.

Example:

type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}
var user User
err := shared.Bind("user", &user)

func (*SharedStore) Clear added in v0.6.0

func (s *SharedStore) Clear()

Clear removes all keys from the store. This method is safe for concurrent access.

func (*SharedStore) Delete added in v0.6.0

func (s *SharedStore) Delete(key string)

Delete removes a key from the store. This method is safe for concurrent access.

func (*SharedStore) Get

func (s *SharedStore) Get(key string) (any, bool)

Get retrieves a value from the store by key. Returns the value and true if the key exists, or nil and false if not found. This method is safe for concurrent access.

func (*SharedStore) GetAll

func (s *SharedStore) GetAll() map[string]any

GetAll returns a copy of all data in the store. The returned map is a shallow copy and can be safely modified without affecting the store's internal data. This method is safe for concurrent access.
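What "shallow copy" means in practice is easiest to see in a small model. The store type below is a hypothetical stand-in built on sync.RWMutex, not SharedStore itself. Replacing a top-level key in the returned map leaves the store untouched, but a nested map is still shared between the copy and the store.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for SharedStore.
type store struct {
	mu   sync.RWMutex
	data map[string]any
}

func newStore() *store { return &store{data: make(map[string]any)} }

func (s *store) Set(key string, value any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func (s *store) Get(key string) (any, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// GetAll copies the top-level map only; nested values are not cloned.
func (s *store) GetAll() map[string]any {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]any, len(s.data))
	for k, v := range s.data {
		out[k] = v
	}
	return out
}

func main() {
	s := newStore()
	s.Set("user_id", 123)
	s.Set("config", map[string]any{"timeout": 30})

	snapshot := s.GetAll()
	snapshot["user_id"] = 999                           // top-level: store unaffected
	snapshot["config"].(map[string]any)["timeout"] = 60 // nested: visible in store

	id, _ := s.Get("user_id")
	cfg, _ := s.Get("config")
	fmt.Println(id, cfg.(map[string]any)["timeout"]) // prints "123 60"
}
```

Under this model, nested maps and slices obtained from the copy should be treated as shared and protected accordingly if several goroutines may modify them.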

func (*SharedStore) GetBool added in v0.6.0

func (s *SharedStore) GetBool(key string) bool

GetBool retrieves a bool value from the store. Returns false if the key doesn't exist or the value is not a bool. This method is safe for concurrent access.

func (*SharedStore) GetBoolOr added in v0.6.0

func (s *SharedStore) GetBoolOr(key string, defaultVal bool) bool

GetBoolOr retrieves a bool value from the store. Returns the provided default value if the key doesn't exist or the value is not a bool. This method is safe for concurrent access.

func (*SharedStore) GetFloat64 added in v0.6.0

func (s *SharedStore) GetFloat64(key string) float64

GetFloat64 retrieves a float64 value from the store. Returns 0.0 if the key doesn't exist or the value cannot be converted to float64. Supports conversion from int, float32, and other numeric types. This method is safe for concurrent access.

func (*SharedStore) GetFloat64Or added in v0.6.0

func (s *SharedStore) GetFloat64Or(key string, defaultVal float64) float64

GetFloat64Or retrieves a float64 value from the store. Returns the provided default value if the key doesn't exist or the value cannot be converted to float64. Supports conversion from various numeric types. This method is safe for concurrent access.

func (*SharedStore) GetInt added in v0.6.0

func (s *SharedStore) GetInt(key string) int

GetInt retrieves an int value from the store. Returns 0 if the key doesn't exist or the value cannot be converted to int. Supports conversion from int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, and float types. This method is safe for concurrent access.

func (*SharedStore) GetIntOr added in v0.6.0

func (s *SharedStore) GetIntOr(key string, defaultVal int) int

GetIntOr retrieves an int value from the store. Returns the provided default value if the key doesn't exist or the value cannot be converted to int. Supports conversion from various numeric types. This method is safe for concurrent access.

func (*SharedStore) GetMap added in v0.6.0

func (s *SharedStore) GetMap(key string) map[string]any

GetMap retrieves a map[string]any from the store. Returns nil if the key doesn't exist or the value is not a map[string]any. This method is safe for concurrent access.

func (*SharedStore) GetMapOr added in v0.6.0

func (s *SharedStore) GetMapOr(key string, defaultVal map[string]any) map[string]any

GetMapOr retrieves a map[string]any from the store. Returns the provided default value if the key doesn't exist or the value is not a map[string]any. This method is safe for concurrent access.

func (*SharedStore) GetSlice added in v0.6.0

func (s *SharedStore) GetSlice(key string) []any

GetSlice retrieves a []any slice from the store. Returns nil if the key doesn't exist or the value is not a slice. This method is safe for concurrent access.

func (*SharedStore) GetSliceOr added in v0.6.0

func (s *SharedStore) GetSliceOr(key string, defaultVal []any) []any

GetSliceOr retrieves a []any slice from the store. Returns the provided default value if the key doesn't exist or the value is not a slice. Uses ToSlice to convert various slice types to []any. This method is safe for concurrent access.

func (*SharedStore) GetString added in v0.6.0

func (s *SharedStore) GetString(key string) string

GetString retrieves a string value from the store. Returns empty string if the key doesn't exist or the value is not a string. This method is safe for concurrent access.

func (*SharedStore) GetStringOr added in v0.6.0

func (s *SharedStore) GetStringOr(key string, defaultVal string) string

GetStringOr retrieves a string value from the store. Returns the provided default value if the key doesn't exist or the value is not a string. This method is safe for concurrent access.

func (*SharedStore) Has added in v0.6.0

func (s *SharedStore) Has(key string) bool

Has checks if a key exists in the store. This method is safe for concurrent access.

func (*SharedStore) Keys added in v0.6.0

func (s *SharedStore) Keys() []string

Keys returns all keys in the store. The returned slice is a snapshot and can be safely modified without affecting the store's internal data. This method is safe for concurrent access.

func (*SharedStore) Len added in v0.6.0

func (s *SharedStore) Len() int

Len returns the number of items in the store. This method is safe for concurrent access.

func (*SharedStore) Merge

func (s *SharedStore) Merge(data map[string]any)

Merge merges another map into the store. Existing keys are overwritten with values from the provided map. If the provided map is nil, this method does nothing. This method is safe for concurrent access.
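The overwrite and nil-map rules can be illustrated with a small stand-in. The store type and merge method below are hypothetical and only imitate the documented behavior; in Go, ranging over a nil map runs zero iterations, which makes the nil case a natural no-op.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for a mutex-guarded key/value store.
type store struct {
	mu   sync.RWMutex
	data map[string]any
}

// merge copies every entry of in into the store, overwriting existing keys.
func (s *store) merge(in map[string]any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k, v := range in { // a nil map yields no iterations
		s.data[k] = v
	}
}

func main() {
	s := &store{data: map[string]any{"a": 1, "b": 2}}

	s.merge(map[string]any{"b": 20, "c": 3}) // "b" is overwritten, "c" is added
	s.merge(nil)                             // does nothing

	fmt.Println(s.data) // map[a:1 b:20 c:3]
}
```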

func (*SharedStore) MustBind added in v0.6.0

func (s *SharedStore) MustBind(key string, dest any)

MustBind is like Bind but panics if binding fails. Use this only when binding failure should be considered a programming error. This method is safe for concurrent access.

Example:

var config Config
shared.MustBind("config", &config)  // Panics if binding fails
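MustBind follows the common Go "Must" convention: wrap a function that returns an error and panic instead. The sketch below shows that pattern on a plain map. Both bind and mustBind are hypothetical stand-ins, and the JSON round trip used to fill the struct is an assumption made for illustration, not a statement about how Flyt's Bind works.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config is an example destination type for this sketch.
type Config struct {
	Name    string `json:"name"`
	Retries int    `json:"retries"`
}

// bind is a hypothetical stand-in for Bind. Assumption: the stored value is
// copied into dest through a JSON marshal/unmarshal round trip.
func bind(data map[string]any, key string, dest any) error {
	v, ok := data[key]
	if !ok {
		return fmt.Errorf("key %q not found", key)
	}
	raw, err := json.Marshal(v)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, dest)
}

// mustBind panics where bind would return an error.
func mustBind(data map[string]any, key string, dest any) {
	if err := bind(data, key, dest); err != nil {
		panic(err)
	}
}

func main() {
	data := map[string]any{
		"config": map[string]any{"name": "demo", "retries": 3},
	}

	var cfg Config
	mustBind(data, "config", &cfg)
	fmt.Println(cfg.Name, cfg.Retries) // demo 3

	// A missing key triggers the panic; recover here only to show it.
	defer func() { fmt.Println("recovered:", recover()) }()
	mustBind(data, "missing", &cfg)
}
```

When the key or its shape depends on user input or external data, the error-returning Bind is usually the better choice, since a panic is hard to handle gracefully.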

func (*SharedStore) Set

func (s *SharedStore) Set(key string, value any)

Set stores a value in the store with the given key. If the key already exists, its value is overwritten. This method is safe for concurrent access.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages concurrent task execution with a fixed number of workers. It provides a simple way to limit concurrency and execute tasks in parallel. WorkerPool is used internally by batch processing nodes but can also be used directly for custom concurrent operations.

Example:

pool := flyt.NewWorkerPool(5)
defer pool.Close()

for _, item := range items {
    item := item // capture loop variable (needed before Go 1.22)
    pool.Submit(func() {
        // Process item
    })
}

pool.Wait()

func NewWorkerPool

func NewWorkerPool(workers int) *WorkerPool

NewWorkerPool creates a new worker pool with the specified number of workers. If workers is less than or equal to 0, it defaults to 1. The pool starts workers immediately and is ready to accept tasks.

Parameters:

  • workers: Number of concurrent workers

Returns:

  • *WorkerPool: A new worker pool ready for use

Remember to call Close() when done to clean up resources.

func (*WorkerPool) Close

func (p *WorkerPool) Close()

Close closes the worker pool and waits for all workers to finish. After calling Close, no new tasks can be submitted. This method should be called when the pool is no longer needed to properly clean up resources.

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(task func())

Submit submits a task to the pool for execution. The task will be executed by one of the available workers. This method blocks if all workers are busy and the task buffer is full.

Parameters:

  • task: Function to execute

func (*WorkerPool) Wait

func (p *WorkerPool) Wait()

Wait blocks until all submitted tasks have finished executing.
