flyt

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2025 License: MIT Imports: 5 Imported by: 0

README

Flyt

A minimalist workflow framework for Go with zero dependencies.

Installation

go get github.com/mark3labs/flyt

Quick Start

package main

import (
    "context"
    "fmt"
    "github.com/mark3labs/flyt"
)

func main() {
    // Create a simple node using the helper
    node := flyt.NewNode(
        flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
            fmt.Println("Hello, Flyt!")
            return "done", nil
        }),
    )

    // Run it
    ctx := context.Background()
    shared := flyt.NewSharedStore()
    
    action, err := flyt.Run(ctx, node, shared)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Completed with action: %s\n", action)
}

Core Concepts

Nodes

Nodes are the building blocks. Each node has three phases:

  1. Prep - Read from shared store and prepare data
  2. Exec - Execute main logic (can be retried)
  3. Post - Process results and decide next action
// Using the helper
node := flyt.NewNode(
    flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (any, error) {
        data, _ := shared.Get("input")
        return data, nil
    }),
    flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
        // Process data
        return "result", nil
    }),
    flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult any) (flyt.Action, error) {
        shared.Set("output", execResult)
        return flyt.DefaultAction, nil
    }),
)
Actions

Actions are strings returned by a node's Post phase that determine what happens next:

func (n *MyNode) Post(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult any) (flyt.Action, error) {
    if execResult.(bool) {
        return "success", nil  // Go to node connected with "success"
    }
    return "retry", nil       // Go to node connected with "retry"
}

The default action is flyt.DefaultAction (value: "default"). If no connection exists for an action, the flow ends.

Flows

Connect nodes to create workflows:

// Create nodes
validateNode := createValidateNode()
processNode := createProcessNode()
errorNode := createErrorNode()

// Build flow with action-based routing
flow := flyt.NewFlow(validateNode)
flow.Connect(validateNode, "valid", processNode)    // If validation succeeds
flow.Connect(validateNode, "invalid", errorNode)    // If validation fails
flow.Connect(processNode, "done", nil)              // End flow after processing

// Run flow
err := flow.Run(ctx, shared)
Shared Store

Thread-safe data sharing between nodes:

shared := flyt.NewSharedStore()
shared.Set("key", "value")
value, ok := shared.Get("key")

Intermediate Patterns

Configuration via Closures

Pass configuration to nodes using closures:

func createAPINode(apiKey string, baseURL string) flyt.Node {
    return flyt.NewNode(
        flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
            // apiKey and baseURL are captured in the closure
            url := fmt.Sprintf("%s/data", baseURL)
            req, _ := http.NewRequest("GET", url, nil)
            req.Header.Set("Authorization", apiKey)
            // ... make request
            return data, nil
        }),
    )
}

// Usage
node := createAPINode("secret-key", "https://api.example.com")
Error Handling & Retries

Add retry logic to handle transient failures:

node := flyt.NewNode(
    flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
        // This will be retried up to 3 times
        return callFlakeyAPI()
    }),
    flyt.WithMaxRetries(3),
    flyt.WithWait(time.Second),
)
Conditional Branching

Control flow based on results:

decisionNode := flyt.NewNode(
    flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
        value := prepResult.(int)
        return value > 100, nil
    }),
    flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult any) (flyt.Action, error) {
        if execResult.(bool) {
            return "high", nil
        }
        return "low", nil
    }),
)

flow := flyt.NewFlow(decisionNode)
flow.Connect(decisionNode, "high", highNode)
flow.Connect(decisionNode, "low", lowNode)

Advanced Usage

Custom Node Types

For complex nodes with state, create custom types:

type RateLimitedNode struct {
    *flyt.BaseNode
    limiter *rate.Limiter
}

func NewRateLimitedNode(rps int) *RateLimitedNode {
    return &RateLimitedNode{
        BaseNode: flyt.NewBaseNode(),
        limiter:  rate.NewLimiter(rate.Limit(rps), 1),
    }
}

func (n *RateLimitedNode) Exec(ctx context.Context, prepResult any) (any, error) {
    if err := n.limiter.Wait(ctx); err != nil {
        return nil, err
    }
    // Process with rate limiting
    return process(prepResult)
}
Batch Processing

Process multiple items concurrently:

// Simple batch node for processing items
processFunc := func(ctx context.Context, item any) (any, error) {
    // Process each item
    return fmt.Sprintf("processed: %v", item), nil
}

batchNode := flyt.NewBatchNode(processFunc, true) // true for concurrent
shared.Set("items", []string{"item1", "item2", "item3"})
Batch Flows

Run the same flow multiple times with different parameters:

// Create a flow factory - returns a new flow instance for each iteration
flowFactory := func() *flyt.Flow {
    validateNode := flyt.NewNode(
        flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (any, error) {
            // Each flow has its own SharedStore with merged FlowInputs
            userID, _ := shared.Get("user_id")
            email, _ := shared.Get("email")
            return map[string]any{"user_id": userID, "email": email}, nil
        }),
        flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
            data := prepResult.(map[string]any)
            // Process user data
            return processUser(data), nil
        }),
    )
    return flyt.NewFlow(validateNode)
}

// Define input parameters for each flow iteration
// Each FlowInputs map is merged into that flow's isolated SharedStore
batchFunc := func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.FlowInputs, error) {
    // Could fetch from database, API, etc.
    return []flyt.FlowInputs{
        {"user_id": 1, "email": "user1@example.com"},
        {"user_id": 2, "email": "user2@example.com"},
        {"user_id": 3, "email": "user3@example.com"},
    }, nil
}

// Create and run batch flow
batchFlow := flyt.NewBatchFlow(flowFactory, batchFunc, true) // true for concurrent
err := batchFlow.Run(ctx, shared)

// Each flow runs in isolation with its own SharedStore containing the FlowInputs
Nested Flows

Compose flows for complex workflows:

// Sub-flow for data validation
validationFlow := createValidationFlow()

// Main flow
mainFlow := flyt.NewFlow(fetchNode)
mainFlow.Connect(fetchNode, "validate", validationFlow)
mainFlow.Connect(validationFlow, flyt.DefaultAction, processNode)

Best Practices

  1. Single Responsibility: Each node should do one thing well
  2. Idempotency: Nodes should be idempotent when possible
  3. Error Handling: Always handle errors appropriately
  4. Context Awareness: Respect context cancellation
  5. Concurrency Safety: Don't share node instances across flows

Examples

Check out the cookbook directory for complete, real-world examples.

License

MIT

Documentation

Overview

Package flyt is a minimalist workflow framework for Go, inspired by Pocket Flow. It provides a simple graph-based abstraction for orchestrating tasks.

Thread Safety: When using concurrent batch operations, ensure that your Node implementations are thread-safe. The framework provides SharedStore for safe concurrent access to shared data.

Example:

// Define a simple node
type PrintNode struct {
    *flyt.BaseNode
}

func (n *PrintNode) Exec(ctx context.Context, prepResult any) (any, error) {
    fmt.Println("Hello from node!")
    return nil, nil
}

// Create and run a flow
node := &PrintNode{BaseNode: flyt.NewBaseNode()}
shared := flyt.NewSharedStore()

ctx := context.Background()
action, err := flyt.Run(ctx, node, shared)
if err != nil {
    log.Fatal(err)
}

Index

Examples

Constants

View Source
const (
	// DefaultAction is the default action if none is specified
	DefaultAction Action = "default"

	// KeyItems is the shared store key for items to be processed
	KeyItems = "items"
	// KeyResults is the shared store key for processing results
	KeyResults = "results"
	// KeyBatchCount is the shared store key for batch count
	KeyBatchCount = "batch_count"
)

Variables

This section is empty.

Functions

func ToSlice

func ToSlice(v any) []any

ToSlice converts various types to []any (exported for testing)

Types

type Action

type Action string

Action represents the next action to take after a node executes

func Run

func Run(ctx context.Context, node Node, shared *SharedStore) (Action, error)

Run executes the node with the prep->exec->post lifecycle

type BaseNode

type BaseNode struct {
	// contains filtered or unexported fields
}

BaseNode provides a base implementation of Node

func NewBaseNode

func NewBaseNode(opts ...NodeOption) *BaseNode

NewBaseNode creates a new BaseNode with options

func (*BaseNode) Exec

func (n *BaseNode) Exec(ctx context.Context, prepResult any) (any, error)

Exec is the default exec implementation (must be overridden)

func (*BaseNode) ExecFallback

func (n *BaseNode) ExecFallback(prepResult any, err error) (any, error)

ExecFallback handles errors after all retries are exhausted

func (*BaseNode) GetMaxRetries

func (n *BaseNode) GetMaxRetries() int

GetMaxRetries returns the maximum number of retries

func (*BaseNode) GetWait

func (n *BaseNode) GetWait() time.Duration

GetWait returns the wait duration between retries

func (*BaseNode) Post

func (n *BaseNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)

Post is the default post implementation (can be overridden)

func (*BaseNode) Prep

func (n *BaseNode) Prep(ctx context.Context, shared *SharedStore) (any, error)

Prep is the default prep implementation (can be overridden)

type BatchConfig

type BatchConfig struct {
	MaxBatchSize   int
	MaxConcurrency int
	ItemsKey       string // Key to read items from SharedStore (defaults to KeyItems)
	ResultsKey     string // Key to write results to SharedStore (defaults to KeyResults)
	BatchCountKey  string // Key to write batch count for BatchFlow (defaults to KeyBatchCount)
}

BatchConfig holds configuration for batch operations

func DefaultBatchConfig

func DefaultBatchConfig() *BatchConfig

DefaultBatchConfig returns sensible defaults

type BatchError

type BatchError struct {
	Errors []error
}

BatchError aggregates multiple errors from batch operations

func (*BatchError) Error

func (e *BatchError) Error() string

type BatchFlowFunc

type BatchFlowFunc func(ctx context.Context, shared *SharedStore) ([]FlowInputs, error)

BatchFlowFunc returns input parameters for each flow iteration in batch processing

type BatchProcessFunc

type BatchProcessFunc func(ctx context.Context, item any) (any, error)

BatchProcessFunc is a function that processes a single item

type CustomNode added in v0.3.0

type CustomNode struct {
	*BaseNode
	// contains filtered or unexported fields
}

CustomNode is a node implementation that uses custom functions

func (*CustomNode) Exec added in v0.3.0

func (n *CustomNode) Exec(ctx context.Context, prepResult any) (any, error)

Exec implements Node.Exec by calling the custom execFunc if provided

func (*CustomNode) Post added in v0.3.0

func (n *CustomNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)

Post implements Node.Post by calling the custom postFunc if provided

func (*CustomNode) Prep added in v0.3.0

func (n *CustomNode) Prep(ctx context.Context, shared *SharedStore) (any, error)

Prep implements Node.Prep by calling the custom prepFunc if provided

type CustomNodeOption added in v0.3.0

type CustomNodeOption interface {
	// contains filtered or unexported methods
}

CustomNodeOption is an option for configuring a CustomNode

func WithExecFunc added in v0.3.0

func WithExecFunc(fn func(context.Context, any) (any, error)) CustomNodeOption

WithExecFunc sets a custom Exec implementation

func WithPostFunc added in v0.3.0

func WithPostFunc(fn func(context.Context, *SharedStore, any, any) (Action, error)) CustomNodeOption

WithPostFunc sets a custom Post implementation

func WithPrepFunc added in v0.3.0

func WithPrepFunc(fn func(context.Context, *SharedStore) (any, error)) CustomNodeOption

WithPrepFunc sets a custom Prep implementation

type FallbackNode

type FallbackNode interface {
	ExecFallback(prepResult any, err error) (any, error)
}

FallbackNode is a node that supports fallback on error

type Flow

type Flow struct {
	*BaseNode
	// contains filtered or unexported fields
}

Flow represents a workflow of connected nodes

func NewBatchFlow

func NewBatchFlow(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool) *Flow

NewBatchFlow creates a flow that runs multiple times with different parameters. The flowFactory must create new flow instances for concurrent execution to avoid race conditions.

func NewBatchFlowWithConfig

func NewBatchFlowWithConfig(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, config *BatchConfig) *Flow

NewBatchFlowWithConfig creates a batch flow with custom configuration

func NewBatchFlowWithCountKey added in v0.2.0

func NewBatchFlowWithCountKey(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, countKey string) *Flow

NewBatchFlowWithCountKey creates a batch flow with a custom key for storing the batch count

func NewFlow

func NewFlow(start Node) *Flow

NewFlow creates a new Flow with a start node

func (*Flow) Connect

func (f *Flow) Connect(from Node, action Action, to Node)

Connect adds a transition from one node to another based on an action

func (*Flow) Exec

func (f *Flow) Exec(ctx context.Context, prepResult any) (any, error)

Exec orchestrates the flow execution by running nodes in sequence

func (*Flow) Post

func (f *Flow) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)

Post implements Node interface for Flow

func (*Flow) Prep

func (f *Flow) Prep(ctx context.Context, shared *SharedStore) (any, error)

Prep implements Node interface for Flow

func (*Flow) Run

func (f *Flow) Run(ctx context.Context, shared *SharedStore) error

Run executes the flow starting from the start node

type FlowFactory

type FlowFactory func() *Flow

FlowFactory creates new instances of a flow

type FlowInputs added in v0.4.0

type FlowInputs map[string]any

FlowInputs holds input parameters for a flow iteration in batch processing. These parameters are merged into each flow's isolated SharedStore.

type Node

type Node interface {
	// Prep reads and preprocesses data from shared store
	Prep(ctx context.Context, shared *SharedStore) (any, error)

	// Exec executes the main logic with optional retries
	Exec(ctx context.Context, prepResult any) (any, error)

	// Post processes results and writes back to shared store
	Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
}

Node is the interface that all nodes must implement.

Important: Nodes should not be shared across concurrent flow executions. If you need to run the same logic concurrently, create separate node instances.

func NewBatchNode

func NewBatchNode(processFunc BatchProcessFunc, concurrent bool, opts ...NodeOption) Node

NewBatchNode creates a node that processes items in batch

func NewBatchNodeWithConfig

func NewBatchNodeWithConfig(processFunc BatchProcessFunc, concurrent bool, config *BatchConfig, opts ...NodeOption) Node

NewBatchNodeWithConfig creates a batch node with custom configuration

func NewBatchNodeWithKeys added in v0.2.0

func NewBatchNodeWithKeys(processFunc BatchProcessFunc, concurrent bool, itemsKey, resultsKey string, opts ...NodeOption) Node

NewBatchNodeWithKeys creates a batch node with custom keys for items and results

func NewNode added in v0.3.0

func NewNode(opts ...any) Node

NewNode creates a new node with custom function implementations

Example

ExampleNewNode demonstrates creating a simple node using the NewNode helper

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/mark3labs/flyt"
)

func main() {
	// Create a node with custom exec function
	node := flyt.NewNode(
		flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
			fmt.Println("Hello from custom node!")
			return "success", nil
		}),
	)

	// Run the node
	shared := flyt.NewSharedStore()
	ctx := context.Background()

	action, err := flyt.Run(ctx, node, shared)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Action: %s\n", action)
}
Output:

Hello from custom node!
Action: default
Example (WithAllFunctions)

ExampleNewNode_withAllFunctions demonstrates using all three functions

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/mark3labs/flyt"
)

func main() {
	// Create a node that processes data through all phases
	node := flyt.NewNode(
		flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (any, error) {
			// Read input from shared store
			input, _ := shared.Get("input")
			fmt.Printf("Prep: reading input '%v'\n", input)
			return input, nil
		}),
		flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
			// Process the input
			result := fmt.Sprintf("processed: %v", prepResult)
			fmt.Printf("Exec: %s\n", result)
			return result, nil
		}),
		flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult any) (flyt.Action, error) {
			// Store the result
			shared.Set("output", execResult)
			fmt.Printf("Post: stored result\n")
			return flyt.DefaultAction, nil
		}),
	)

	// Setup and run
	shared := flyt.NewSharedStore()
	shared.Set("input", "hello world")
	ctx := context.Background()

	_, err := flyt.Run(ctx, node, shared)
	if err != nil {
		log.Fatal(err)
	}

	output, _ := shared.Get("output")
	fmt.Printf("Final output: %v\n", output)
}
Output:

Prep: reading input 'hello world'
Exec: processed: hello world
Post: stored result
Final output: processed: hello world
Example (WithRetries)

ExampleNewNode_withRetries demonstrates combining with BaseNode options

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/mark3labs/flyt"
)

func main() {
	attempts := 0
	node := flyt.NewNode(
		flyt.WithExecFunc(func(ctx context.Context, prepResult any) (any, error) {
			attempts++
			fmt.Printf("Attempt %d\n", attempts)
			if attempts < 3 {
				return nil, fmt.Errorf("temporary failure")
			}
			return "success after retries", nil
		}),
		flyt.WithMaxRetries(3),
	)

	shared := flyt.NewSharedStore()
	ctx := context.Background()

	_, err := flyt.Run(ctx, node, shared)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Success!")
}
Output:

Attempt 1
Attempt 2
Attempt 3
Success!

type NodeOption

type NodeOption func(*BaseNode)

NodeOption is a function that configures a BaseNode

func WithMaxRetries

func WithMaxRetries(retries int) NodeOption

WithMaxRetries sets the maximum number of retries

func WithWait

func WithWait(wait time.Duration) NodeOption

WithWait sets the wait duration between retries

type RetryableNode

type RetryableNode interface {
	Node
	GetMaxRetries() int
	GetWait() time.Duration
}

RetryableNode is a node that supports retries

type SharedStore

type SharedStore struct {
	// contains filtered or unexported fields
}

SharedStore provides thread-safe access to shared data

func NewSharedStore

func NewSharedStore() *SharedStore

NewSharedStore creates a new thread-safe shared store

func (*SharedStore) Get

func (s *SharedStore) Get(key string) (any, bool)

Get retrieves a value from the store

func (*SharedStore) GetAll

func (s *SharedStore) GetAll() map[string]any

GetAll returns a copy of all data

func (*SharedStore) Merge

func (s *SharedStore) Merge(data map[string]any)

Merge merges another map into the store

func (*SharedStore) Set

func (s *SharedStore) Set(key string, value any)

Set stores a value in the store

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages concurrent task execution

func NewWorkerPool

func NewWorkerPool(workers int) *WorkerPool

NewWorkerPool creates a new worker pool

func (*WorkerPool) Close

func (p *WorkerPool) Close()

Close closes the worker pool and waits for all workers to finish

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(task func())

Submit submits a task to the pool

func (*WorkerPool) Wait

func (p *WorkerPool) Wait()

Wait waits for all tasks to complete

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL