Documentation
¶
Overview ¶
Package flyt is a minimalist workflow framework for Go, inspired by Pocket Flow. It provides a simple graph-based abstraction for orchestrating tasks.
Thread Safety: When using concurrent batch operations, ensure that your Node implementations are thread-safe. The framework provides SharedStore for safe concurrent access to shared data.
Example:
// PrintNode is a minimal node: it embeds *flyt.BaseNode and overrides only
// the Exec phase.
type PrintNode struct {
	*flyt.BaseNode
}

// Exec prints a greeting. It ignores prepResult and returns no result and
// no error.
func (n *PrintNode) Exec(ctx context.Context, prepResult any) (any, error) {
	fmt.Println("Hello from node!")
	return nil, nil
}

// Create and run a flow
node := &PrintNode{BaseNode: flyt.NewBaseNode()}
shared := flyt.NewSharedStore()
ctx := context.Background()
action, err := flyt.Run(ctx, node, shared)
if err != nil {
	log.Fatal(err)
}
// Use the returned action: Go rejects a declared-but-unused variable, so the
// snippet would not compile when pasted into a function otherwise.
fmt.Printf("Action: %s\n", action)
Index ¶
- Constants
- func ToSlice(v any) []any
- type Action
- type BaseNode
- func (n *BaseNode) Exec(ctx context.Context, prepResult any) (any, error)
- func (n *BaseNode) ExecFallback(prepResult any, err error) (any, error)
- func (n *BaseNode) GetMaxRetries() int
- func (n *BaseNode) GetWait() time.Duration
- func (n *BaseNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
- func (n *BaseNode) Prep(ctx context.Context, shared *SharedStore) (any, error)
- type BatchConfig
- type BatchError
- type BatchFlowFunc
- type BatchProcessFunc
- type CustomNode
- type CustomNodeOption
- type FallbackNode
- type Flow
- func NewBatchFlow(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool) *Flow
- func NewBatchFlowWithConfig(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, ...) *Flow
- func NewBatchFlowWithCountKey(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, ...) *Flow
- func NewFlow(start Node) *Flow
- func (f *Flow) Connect(from Node, action Action, to Node)
- func (f *Flow) Exec(ctx context.Context, prepResult any) (any, error)
- func (f *Flow) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
- func (f *Flow) Prep(ctx context.Context, shared *SharedStore) (any, error)
- func (f *Flow) Run(ctx context.Context, shared *SharedStore) error
- type FlowFactory
- type FlowInputs
- type Node
- func NewBatchNode(processFunc BatchProcessFunc, concurrent bool, opts ...NodeOption) Node
- func NewBatchNodeWithConfig(processFunc BatchProcessFunc, concurrent bool, config *BatchConfig, ...) Node
- func NewBatchNodeWithKeys(processFunc BatchProcessFunc, concurrent bool, itemsKey, resultsKey string, ...) Node
- func NewNode(opts ...any) Node
- type NodeOption
- type RetryableNode
- type SharedStore
- type WorkerPool
Examples ¶
Constants ¶
const ( // DefaultAction is the default action if none is specified DefaultAction Action = "default" // KeyItems is the shared store key for items to be processed KeyItems = "items" // KeyResults is the shared store key for processing results KeyResults = "results" // KeyBatchCount is the shared store key for batch count KeyBatchCount = "batch_count" )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseNode ¶
type BaseNode struct {
// contains filtered or unexported fields
}
BaseNode provides a base implementation of Node
func NewBaseNode ¶
func NewBaseNode(opts ...NodeOption) *BaseNode
NewBaseNode creates a new BaseNode with options
func (*BaseNode) ExecFallback ¶
ExecFallback handles errors after all retries are exhausted
func (*BaseNode) GetMaxRetries ¶
GetMaxRetries returns the maximum number of retries
type BatchConfig ¶
// BatchConfig holds configuration for batch operations. It is accepted by
// NewBatchNodeWithConfig and NewBatchFlowWithConfig; DefaultBatchConfig
// returns an instance populated with defaults.
type BatchConfig struct {
// NOTE(review): undocumented on this page; presumably an upper bound on the
// number of items handled per batch — confirm against the implementation.
MaxBatchSize int
// NOTE(review): undocumented on this page; presumably an upper bound on
// concurrently running workers when concurrent mode is on — confirm.
MaxConcurrency int
ItemsKey string // Key to read items from SharedStore (defaults to KeyItems)
ResultsKey string // Key to write results to SharedStore (defaults to KeyResults)
BatchCountKey string // Key to write batch count for BatchFlow (defaults to KeyBatchCount)
}
BatchConfig holds configuration for batch operations
func DefaultBatchConfig ¶
func DefaultBatchConfig() *BatchConfig
DefaultBatchConfig returns sensible defaults
type BatchError ¶
// BatchError aggregates multiple errors from batch operations. It satisfies
// the error interface through its Error method.
type BatchError struct {
// Errors holds the individual errors collected from the batch operation.
Errors []error
}
BatchError aggregates multiple errors from batch operations
func (*BatchError) Error ¶
func (e *BatchError) Error() string
type BatchFlowFunc ¶
type BatchFlowFunc func(ctx context.Context, shared *SharedStore) ([]FlowInputs, error)
BatchFlowFunc returns input parameters for each flow iteration in batch processing
type BatchProcessFunc ¶
BatchProcessFunc is a function that processes a single item
type CustomNode ¶ added in v0.3.0
type CustomNode struct {
*BaseNode
// contains filtered or unexported fields
}
CustomNode is a node implementation that uses custom functions
func (*CustomNode) Exec ¶ added in v0.3.0
Exec implements Node.Exec by calling the custom execFunc if provided
func (*CustomNode) Post ¶ added in v0.3.0
func (n *CustomNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
Post implements Node.Post by calling the custom postFunc if provided
func (*CustomNode) Prep ¶ added in v0.3.0
func (n *CustomNode) Prep(ctx context.Context, shared *SharedStore) (any, error)
Prep implements Node.Prep by calling the custom prepFunc if provided
type CustomNodeOption ¶ added in v0.3.0
type CustomNodeOption interface {
// contains filtered or unexported methods
}
CustomNodeOption is an option for configuring a CustomNode
func WithExecFunc ¶ added in v0.3.0
WithExecFunc sets a custom Exec implementation
func WithPostFunc ¶ added in v0.3.0
func WithPostFunc(fn func(context.Context, *SharedStore, any, any) (Action, error)) CustomNodeOption
WithPostFunc sets a custom Post implementation
func WithPrepFunc ¶ added in v0.3.0
func WithPrepFunc(fn func(context.Context, *SharedStore) (any, error)) CustomNodeOption
WithPrepFunc sets a custom Prep implementation
type FallbackNode ¶
FallbackNode is a node that supports fallback on error
type Flow ¶
type Flow struct {
*BaseNode
// contains filtered or unexported fields
}
Flow represents a workflow of connected nodes
func NewBatchFlow ¶
func NewBatchFlow(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool) *Flow
NewBatchFlow creates a flow that runs multiple times with different parameters. The flowFactory must create new flow instances for concurrent execution to avoid race conditions.
func NewBatchFlowWithConfig ¶
func NewBatchFlowWithConfig(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, config *BatchConfig) *Flow
NewBatchFlowWithConfig creates a batch flow with custom configuration
func NewBatchFlowWithCountKey ¶ added in v0.2.0
func NewBatchFlowWithCountKey(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, countKey string) *Flow
NewBatchFlowWithCountKey creates a batch flow with a custom key for storing the batch count
func (*Flow) Post ¶
func (f *Flow) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
Post implements Node interface for Flow
type FlowInputs ¶ added in v0.4.0
FlowInputs holds input parameters for a flow iteration in batch processing. These parameters are merged into each flow's isolated SharedStore.
type Node ¶
// Node is the interface that all nodes must implement. A node is run in three
// phases: Prep, then Exec, then Post.
//
// Nodes should not be shared across concurrent flow executions; create
// separate node instances to run the same logic concurrently.
type Node interface {
// Prep reads and preprocesses data from shared store.
// Its result is passed to Exec as prepResult.
Prep(ctx context.Context, shared *SharedStore) (any, error)
// Exec executes the main logic with optional retries.
// Its result is passed to Post as execResult.
Exec(ctx context.Context, prepResult any) (any, error)
// Post processes results and writes back to shared store.
// NOTE(review): the returned Action presumably selects the next node wired
// with Flow.Connect — confirm against the Flow implementation.
Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
}
Node is the interface that all nodes must implement.
Important: Nodes should not be shared across concurrent flow executions. If you need to run the same logic concurrently, create separate node instances.
func NewBatchNode ¶
func NewBatchNode(processFunc BatchProcessFunc, concurrent bool, opts ...NodeOption) Node
NewBatchNode creates a node that processes items in batch
func NewBatchNodeWithConfig ¶
func NewBatchNodeWithConfig(processFunc BatchProcessFunc, concurrent bool, config *BatchConfig, opts ...NodeOption) Node
NewBatchNodeWithConfig creates a batch node with custom configuration
func NewBatchNodeWithKeys ¶ added in v0.2.0
func NewBatchNodeWithKeys(processFunc BatchProcessFunc, concurrent bool, itemsKey, resultsKey string, opts ...NodeOption) Node
NewBatchNodeWithKeys creates a batch node with custom keys for items and results
func NewNode ¶ added in v0.3.0
NewNode creates a new node with custom function implementations
Example ¶
ExampleNewNode demonstrates creating a simple node using the NewNode helper
package main
import (
"context"
"fmt"
"log"
"github.com/mark3labs/flyt"
)
// main builds a node whose Exec phase is supplied as a plain function, runs
// it once against an empty shared store, and prints the action that comes back.
func main() {
	// Exec phase: print a greeting and hand back a result value.
	greet := func(_ context.Context, _ any) (any, error) {
		fmt.Println("Hello from custom node!")
		return "success", nil
	}
	customNode := flyt.NewNode(flyt.WithExecFunc(greet))

	// Run the node with a fresh store and a background context.
	store := flyt.NewSharedStore()
	next, runErr := flyt.Run(context.Background(), customNode, store)
	if runErr != nil {
		log.Fatal(runErr)
	}
	fmt.Printf("Action: %s\n", next)
}
Output:
Hello from custom node!
Action: default
Example (WithAllFunctions) ¶
ExampleNewNode_withAllFunctions demonstrates using all three functions
package main
import (
"context"
"fmt"
"log"
"github.com/mark3labs/flyt"
)
// main wires custom Prep, Exec and Post functions into a single node, seeds
// the shared store with an "input" value, runs the node, and prints the
// "output" value the Post phase stored.
func main() {
	// Prep phase: fetch the raw input from the shared store.
	readInput := func(_ context.Context, s *flyt.SharedStore) (any, error) {
		raw, _ := s.Get("input")
		fmt.Printf("Prep: reading input '%v'\n", raw)
		return raw, nil
	}

	// Exec phase: turn whatever Prep returned into the processed string.
	transform := func(_ context.Context, prepResult any) (any, error) {
		processed := fmt.Sprintf("processed: %v", prepResult)
		fmt.Printf("Exec: %s\n", processed)
		return processed, nil
	}

	// Post phase: persist the Exec result and continue with the default action.
	saveOutput := func(_ context.Context, s *flyt.SharedStore, _, execResult any) (flyt.Action, error) {
		s.Set("output", execResult)
		fmt.Printf("Post: stored result\n")
		return flyt.DefaultAction, nil
	}

	pipeline := flyt.NewNode(
		flyt.WithPrepFunc(readInput),
		flyt.WithExecFunc(transform),
		flyt.WithPostFunc(saveOutput),
	)

	// Seed the store, run the node, then read the result back out.
	store := flyt.NewSharedStore()
	store.Set("input", "hello world")
	if _, err := flyt.Run(context.Background(), pipeline, store); err != nil {
		log.Fatal(err)
	}
	final, _ := store.Get("output")
	fmt.Printf("Final output: %v\n", final)
}
Output:
Prep: reading input 'hello world'
Exec: processed: hello world
Post: stored result
Final output: processed: hello world
Example (WithRetries) ¶
ExampleNewNode_withRetries demonstrates combining custom functions with BaseNode options such as WithMaxRetries
package main
import (
"context"
"fmt"
"log"
"github.com/mark3labs/flyt"
)
// main runs a node whose Exec function fails on its first two calls and
// succeeds on the third, with the node configured via WithMaxRetries(3).
func main() {
	// calls counts Exec invocations; the closure below captures and updates it.
	var calls int
	flaky := func(_ context.Context, _ any) (any, error) {
		calls++
		fmt.Printf("Attempt %d\n", calls)
		if calls >= 3 {
			return "success after retries", nil
		}
		return nil, fmt.Errorf("temporary failure")
	}

	retrying := flyt.NewNode(flyt.WithExecFunc(flaky), flyt.WithMaxRetries(3))

	store := flyt.NewSharedStore()
	if _, err := flyt.Run(context.Background(), retrying, store); err != nil {
		log.Fatal(err)
	}
	fmt.Println("Success!")
}
Output:
Attempt 1
Attempt 2
Attempt 3
Success!
type NodeOption ¶
type NodeOption func(*BaseNode)
NodeOption is a function that configures a BaseNode
func WithMaxRetries ¶
func WithMaxRetries(retries int) NodeOption
WithMaxRetries sets the maximum number of retries
func WithWait ¶
func WithWait(wait time.Duration) NodeOption
WithWait sets the wait duration between retries
type RetryableNode ¶
RetryableNode is a node that supports retries
type SharedStore ¶
type SharedStore struct {
// contains filtered or unexported fields
}
SharedStore provides thread-safe access to shared data
func NewSharedStore ¶
func NewSharedStore() *SharedStore
NewSharedStore creates a new thread-safe shared store
func (*SharedStore) Get ¶
func (s *SharedStore) Get(key string) (any, bool)
Get retrieves a value from the store
func (*SharedStore) GetAll ¶
func (s *SharedStore) GetAll() map[string]any
GetAll returns a copy of all data
func (*SharedStore) Merge ¶
func (s *SharedStore) Merge(data map[string]any)
Merge merges another map into the store
func (*SharedStore) Set ¶
func (s *SharedStore) Set(key string, value any)
Set stores a value in the store
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages concurrent task execution
func NewWorkerPool ¶
func NewWorkerPool(workers int) *WorkerPool
NewWorkerPool creates a new worker pool
func (*WorkerPool) Close ¶
func (p *WorkerPool) Close()
Close closes the worker pool and waits for all workers to finish
func (*WorkerPool) Submit ¶
func (p *WorkerPool) Submit(task func())
Submit submits a task to the pool