Documentation ¶
Overview ¶
Package flyt provides batch processing capabilities for the flyt workflow framework.
The batch utilities process collections of items either sequentially or concurrently, with configurable batch sizes and concurrency limits.
Key Features:
- Batch processing nodes for item collections
- Batch flow execution with parameter variations
- Concurrent and sequential processing modes
- Configurable batch sizes and concurrency limits
- Error aggregation for batch operations
Example (Batch Processing):
// Process items concurrently
processFunc := func(ctx context.Context, item any) (any, error) {
	// Process each item
	return fmt.Sprintf("processed: %v", item), nil
}
node := flyt.NewBatchNode(processFunc, true) // true for concurrent
shared := flyt.NewSharedStore()
shared.Set(flyt.KeyItems, []string{"item1", "item2", "item3"})
ctx := context.Background()
// The returned action is not needed here, so it is discarded.
_, err := flyt.Run(ctx, node, shared)
if err != nil {
	log.Fatal(err)
}
results, _ := shared.Get(flyt.KeyResults)
fmt.Println(results) // ["processed: item1", "processed: item2", "processed: item3"]
Example (Batch Flow):
// Run a flow multiple times with different parameters
flowFactory := func() *flyt.Flow {
	// Create your flow here
	return flyt.NewFlow(startNode)
}
batchFunc := func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.FlowInputs, error) {
	return []flyt.FlowInputs{
		{"user_id": 1, "action": "process"},
		{"user_id": 2, "action": "process"},
	}, nil
}
batchFlow := flyt.NewBatchFlow(flowFactory, batchFunc, true)
if err := batchFlow.Run(ctx, shared); err != nil {
	log.Fatal(err)
}
Package flyt is a minimalist workflow framework for Go, inspired by Pocket Flow. It provides a simple graph-based abstraction for orchestrating tasks.
Thread Safety: When using concurrent batch operations, ensure that your Node implementations are thread-safe. The framework provides SharedStore for safe concurrent access to shared data.
Example:
// Define a simple node
type PrintNode struct {
	*flyt.BaseNode
}
func (n *PrintNode) Exec(ctx context.Context, prepResult any) (any, error) {
	fmt.Println("Hello from node!")
	return nil, nil
}
// Create and run a flow
node := &PrintNode{BaseNode: flyt.NewBaseNode()}
shared := flyt.NewSharedStore()
ctx := context.Background()
// The returned action is not needed here, so it is discarded.
_, err := flyt.Run(ctx, node, shared)
if err != nil {
	log.Fatal(err)
}
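The thread-safety note above can be illustrated with a stand-alone sketch. The `safeStore` type below is hypothetical and is not flyt's SharedStore; it only shows the usual mutex-guarded map pattern that makes concurrent `Set` and `Get` calls safe, which is the kind of guarantee the overview attributes to SharedStore.

```go
package main

import (
	"fmt"
	"sync"
)

// safeStore is a hypothetical stand-in for a concurrency-safe key/value store.
// It is not flyt's SharedStore; it only illustrates the locking pattern.
type safeStore struct {
	mu   sync.RWMutex
	data map[string]any
}

func newSafeStore() *safeStore {
	return &safeStore{data: make(map[string]any)}
}

// Set stores a value under key while holding the write lock.
func (s *safeStore) Set(key string, value any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// Get returns the value for key and whether it was present.
func (s *safeStore) Get(key string) (any, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Len reports the number of stored keys.
func (s *safeStore) Len() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.data)
}

// fillConcurrently writes n distinct keys from n goroutines.
func fillConcurrently(s *safeStore, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Set(fmt.Sprintf("item-%d", i), i)
		}(i)
	}
	wg.Wait()
}

func main() {
	s := newSafeStore()
	fillConcurrently(s, 100)
	fmt.Println(s.Len()) // 100
}
```

State that lives inside a node (counters, caches, buffers) gets no such protection automatically, which is why the note asks for thread-safe Node implementations.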
Index ¶
- Constants
- func As[T any](r Result) (T, bool)
- func MustAs[T any](r Result) T
- func ToSlice(v any) []any
- type Action
- type BaseNode
- func (n *BaseNode) Exec(ctx context.Context, prepResult any) (any, error)
- func (n *BaseNode) ExecFallback(prepResult any, err error) (any, error)
- func (n *BaseNode) GetMaxRetries() int
- func (n *BaseNode) GetWait() time.Duration
- func (n *BaseNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
- func (n *BaseNode) Prep(ctx context.Context, shared *SharedStore) (any, error)
- type BaseNodeG
- type BatchConfig
- type BatchError
- type BatchFlowFunc
- type BatchProcessFunc
- type CustomNode
- func (n *CustomNode) Exec(ctx context.Context, prepResult any) (any, error)
- func (n *CustomNode) ExecFallback(prepResult any, err error) (any, error)
- func (n *CustomNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
- func (n *CustomNode) Prep(ctx context.Context, shared *SharedStore) (any, error)
- type CustomNodeOption
- func WithExecFallbackFunc(fn func(any, error) (any, error)) CustomNodeOption
- func WithExecFunc(fn func(context.Context, Result) (Result, error)) CustomNodeOption
- func WithExecFuncAny(fn func(context.Context, any) (any, error)) CustomNodeOption
- func WithPostFunc(fn func(context.Context, *SharedStore, Result, Result) (Action, error)) CustomNodeOption
- func WithPostFuncAny(fn func(context.Context, *SharedStore, any, any) (Action, error)) CustomNodeOption
- func WithPrepFunc(fn func(context.Context, *SharedStore) (Result, error)) CustomNodeOption
- func WithPrepFuncAny(fn func(context.Context, *SharedStore) (any, error)) CustomNodeOption
- type FallbackNode
- type Flow
- func NewBatchFlow(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool) *Flow
- func NewBatchFlowWithConfig(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, ...) *Flow
- func NewBatchFlowWithCountKey(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, ...) *Flow
- func NewFlow(start Node) *Flow
- func (f *Flow) Connect(from Node, action Action, to Node) *Flow
- func (f *Flow) Exec(ctx context.Context, prepResult any) (any, error)
- func (f *Flow) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
- func (f *Flow) Prep(ctx context.Context, shared *SharedStore) (any, error)
- func (f *Flow) Run(ctx context.Context, shared *SharedStore) error
- type FlowFactory
- type FlowInputs
- type Node
- func NewBatchNode(processFunc BatchProcessFunc, concurrent bool, opts ...NodeOption) Node
- func NewBatchNodeWithConfig(processFunc BatchProcessFunc, concurrent bool, config *BatchConfig, ...) Node
- func NewBatchNodeWithKeys(processFunc BatchProcessFunc, concurrent bool, itemsKey, resultsKey string, ...) Node
- func NewNodeAdapter[P, E any](node NodeG[P, E]) Node
- type NodeAdapter
- type NodeBuilder
- func (b *NodeBuilder) Exec(ctx context.Context, prepResult any) (any, error)
- func (b *NodeBuilder) ExecFallback(prepResult any, err error) (any, error)
- func (b *NodeBuilder) GetMaxRetries() int
- func (b *NodeBuilder) GetWait() time.Duration
- func (b *NodeBuilder) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
- func (b *NodeBuilder) Prep(ctx context.Context, shared *SharedStore) (any, error)
- func (b *NodeBuilder) WithExecFallbackFunc(fn func(any, error) (any, error)) *NodeBuilder
- func (b *NodeBuilder) WithExecFunc(fn func(context.Context, Result) (Result, error)) *NodeBuilder
- func (b *NodeBuilder) WithExecFuncAny(fn func(context.Context, any) (any, error)) *NodeBuilder
- func (b *NodeBuilder) WithMaxRetries(retries int) *NodeBuilder
- func (b *NodeBuilder) WithPostFunc(fn func(context.Context, *SharedStore, Result, Result) (Action, error)) *NodeBuilder
- func (b *NodeBuilder) WithPostFuncAny(fn func(context.Context, *SharedStore, any, any) (Action, error)) *NodeBuilder
- func (b *NodeBuilder) WithPrepFunc(fn func(context.Context, *SharedStore) (Result, error)) *NodeBuilder
- func (b *NodeBuilder) WithPrepFuncAny(fn func(context.Context, *SharedStore) (any, error)) *NodeBuilder
- func (b *NodeBuilder) WithWait(wait time.Duration) *NodeBuilder
- type NodeG
- type NodeOption
- type Result
- func (r Result) AsBool() (bool, bool)
- func (r Result) AsBoolOr(defaultVal bool) bool
- func (r Result) AsFloat64() (float64, bool)
- func (r Result) AsFloat64Or(defaultVal float64) float64
- func (r Result) AsInt() (int, bool)
- func (r Result) AsIntOr(defaultVal int) int
- func (r Result) AsMap() (map[string]any, bool)
- func (r Result) AsMapOr(defaultVal map[string]any) map[string]any
- func (r Result) AsSlice() ([]any, bool)
- func (r Result) AsSliceOr(defaultVal []any) []any
- func (r Result) AsString() (string, bool)
- func (r Result) AsStringOr(defaultVal string) string
- func (r Result) Bind(dest any) error
- func (r Result) IsNil() bool
- func (r Result) MustBind(dest any)
- func (r Result) MustBool() bool
- func (r Result) MustFloat64() float64
- func (r Result) MustInt() int
- func (r Result) MustMap() map[string]any
- func (r Result) MustSlice() []any
- func (r Result) MustString() string
- func (r Result) Type() string
- func (r Result) Value() any
- type RetryableNode
- type SharedStore
- func (s *SharedStore) Bind(key string, dest any) error
- func (s *SharedStore) Clear()
- func (s *SharedStore) Delete(key string)
- func (s *SharedStore) Get(key string) (any, bool)
- func (s *SharedStore) GetAll() map[string]any
- func (s *SharedStore) GetBool(key string) bool
- func (s *SharedStore) GetBoolOr(key string, defaultVal bool) bool
- func (s *SharedStore) GetFloat64(key string) float64
- func (s *SharedStore) GetFloat64Or(key string, defaultVal float64) float64
- func (s *SharedStore) GetInt(key string) int
- func (s *SharedStore) GetIntOr(key string, defaultVal int) int
- func (s *SharedStore) GetMap(key string) map[string]any
- func (s *SharedStore) GetMapOr(key string, defaultVal map[string]any) map[string]any
- func (s *SharedStore) GetSlice(key string) []any
- func (s *SharedStore) GetSliceOr(key string, defaultVal []any) []any
- func (s *SharedStore) GetString(key string) string
- func (s *SharedStore) GetStringOr(key string, defaultVal string) string
- func (s *SharedStore) Has(key string) bool
- func (s *SharedStore) Keys() []string
- func (s *SharedStore) Len() int
- func (s *SharedStore) Merge(data map[string]any)
- func (s *SharedStore) MustBind(key string, dest any)
- func (s *SharedStore) Set(key string, value any)
- type WorkerPool
Constants ¶
const (
	// DefaultAction is the default action if none is specified.
	// Flows use this when a node doesn't explicitly return an action.
	DefaultAction Action = "default"
	// KeyItems is the shared store key for items to be processed.
	// Batch nodes look for items under this key by default.
	KeyItems = "items"
	// KeyResults is the shared store key for processing results.
	// Batch nodes store their results under this key by default.
	KeyResults = "results"
	// KeyBatchCount is the shared store key for batch count.
	// Batch flows store the number of iterations under this key.
	KeyBatchCount = "batch_count"
)
Variables ¶
This section is empty.
Functions ¶
func As ¶ added in v0.7.0
func As[T any](r Result) (T, bool)
As attempts to retrieve the Result as the specified type T. Returns the typed value and true if successful, or zero value and false if not.
Example:
if user, ok := flyt.As[*User](result); ok {
	// Use typed user
}
func MustAs ¶ added in v0.7.0
func MustAs[T any](r Result) T
MustAs retrieves the Result as the specified type T. Panics if the Result is nil or not of type T.
Example:
user := flyt.MustAs[*User](result)
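As and MustAs are package-level generic functions rather than methods, because Go methods cannot declare their own type parameters. The sketch below shows the underlying pattern on a plain `any` value. The helpers `as` and `mustAs` and the `user` type are hypothetical and written for this illustration; they are not the library's implementation.

```go
package main

import "fmt"

// as is a hypothetical helper: it returns v as a T and true when v holds a T,
// or the zero value of T and false otherwise.
func as[T any](v any) (T, bool) {
	t, ok := v.(T)
	return t, ok
}

// mustAs is the panicking variant of as.
func mustAs[T any](v any) T {
	t, ok := v.(T)
	if !ok {
		panic(fmt.Sprintf("value of type %T is not the requested type", v))
	}
	return t
}

// user is a sample type for the demo.
type user struct{ Name string }

func main() {
	var v any = &user{Name: "ada"}

	if u, ok := as[*user](v); ok {
		fmt.Println(u.Name) // ada
	}

	_, ok := as[string](v)
	fmt.Println(ok) // false

	fmt.Println(mustAs[*user](v).Name) // ada
}
```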
func ToSlice ¶
func ToSlice(v any) []any
ToSlice converts various types to []any for batch processing. This utility function handles common slice types and single values, making it easier to work with batch operations.
Supported types:
- []any: returned as-is
- []string, []int, []float64: converted to []any
- []map[string]any: converted to []any
- Other slice types: converted using reflection
- Single values: wrapped in a slice
- nil: returns empty slice
Example:
items1 := flyt.ToSlice([]string{"a", "b", "c"}) // []any{"a", "b", "c"}
items2 := flyt.ToSlice("single") // []any{"single"}
items3 := flyt.ToSlice(nil) // []any{}
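The conversion rules listed above can be sketched with the standard reflect package. The function `toSlice` below is a hypothetical re-implementation written for illustration only. It takes the reflection path for every typed slice, whereas the library may special-case common types as the list describes.

```go
package main

import (
	"fmt"
	"reflect"
)

// toSlice is a hypothetical helper that mirrors the documented rules:
// nil becomes an empty slice, []any is returned as-is, other slices are
// converted element by element, and single values are wrapped.
func toSlice(v any) []any {
	if v == nil {
		return []any{}
	}
	if s, ok := v.([]any); ok {
		return s // already the target type
	}
	rv := reflect.ValueOf(v)
	if rv.Kind() == reflect.Slice {
		out := make([]any, rv.Len())
		for i := 0; i < rv.Len(); i++ {
			out[i] = rv.Index(i).Interface()
		}
		return out
	}
	return []any{v} // single value: wrap it
}

// point is a sample element type that has no dedicated fast path.
type point struct{ X, Y int }

func main() {
	fmt.Println(len(toSlice([]string{"a", "b", "c"}))) // 3
	fmt.Println(len(toSlice("single")))                // 1
	fmt.Println(len(toSlice(nil)))                     // 0
	fmt.Println(len(toSlice([]point{{1, 2}, {3, 4}}))) // 2
}
```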
Types ¶
type Action ¶
type Action string
Action represents the next action to take after a node executes. Actions are used to determine flow control in workflows, allowing nodes to direct execution to different paths based on their results.
Example:
const (
	ActionSuccess Action = "success"
	ActionRetry   Action = "retry"
	ActionFail    Action = "fail"
)
func Run ¶
func Run(ctx context.Context, node Node, shared *SharedStore) (Action, error)
Run executes a node with the standard prep->exec->post lifecycle. This is the main entry point for executing individual nodes.
The execution flow is:
- Prep: Read and preprocess data from SharedStore
- Exec: Execute main logic with automatic retries if configured
- Post: Process results and write back to SharedStore
If the node implements RetryableNode, the Exec phase will automatically retry on failure according to the configured settings.
If the node implements FallbackNode and all retries fail, ExecFallback is called to provide alternative handling.
Parameters:
- ctx: Context for cancellation and timeouts
- node: The node to execute
- shared: SharedStore for data exchange
Returns:
- Action: The next action to take in the flow
- error: Any error that occurred during execution
Example:
node := &MyNode{BaseNode: flyt.NewBaseNode()}
shared := flyt.NewSharedStore()
shared.Set("input", "data")
// The returned action is not needed here, so it is discarded.
_, err := flyt.Run(ctx, node, shared)
if err != nil {
	log.Fatal(err)
}
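The prep, exec, post lifecycle with retries and fallback can be sketched without the framework. Everything below is hypothetical: the `node` struct, `run`, and `flaky` are illustration-only names. The sketch assumes that `maxRetries` counts total Exec attempts, in line with the documented default of 1 meaning no retries, and it omits the context argument and the wait between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical, trimmed-down node with the three lifecycle phases.
type node struct {
	maxRetries int // assumed to mean total Exec attempts; 1 means no retries
	prep       func() (any, error)
	exec       func(prepResult any) (any, error)
	fallback   func(prepResult any, err error) (any, error) // optional
	post       func(prepResult, execResult any) (string, error)
}

// run executes prep -> exec (with retries) -> post and returns post's action.
func run(n node) (string, error) {
	prepResult, err := n.prep()
	if err != nil {
		return "", fmt.Errorf("prep: %w", err)
	}

	attempts := n.maxRetries
	if attempts < 1 {
		attempts = 1
	}
	var execResult any
	for attempt := 1; attempt <= attempts; attempt++ {
		execResult, err = n.exec(prepResult)
		if err == nil {
			break
		}
	}
	// All attempts failed: give the fallback a chance to recover.
	if err != nil && n.fallback != nil {
		execResult, err = n.fallback(prepResult, err)
	}
	if err != nil {
		return "", fmt.Errorf("exec: %w", err)
	}
	return n.post(prepResult, execResult)
}

// flaky returns an exec function that fails its first `failures` calls,
// plus a pointer to its call counter.
func flaky(failures int) (func(any) (any, error), *int) {
	calls := 0
	return func(p any) (any, error) {
		calls++
		if calls <= failures {
			return nil, errors.New("transient failure")
		}
		return p, nil
	}, &calls
}

func main() {
	exec, calls := flaky(2) // fails twice, succeeds on the third attempt
	n := node{
		maxRetries: 3,
		prep:       func() (any, error) { return "input", nil },
		exec:       exec,
		post:       func(_, execResult any) (string, error) { return "default", nil },
	}
	action, err := run(n)
	fmt.Println(action, err, *calls) // default <nil> 3
}
```

Post only runs once Exec or the fallback has produced a result without error, so it can decide the next action from a settled outcome.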
type BaseNode ¶
type BaseNode struct {
	// contains filtered or unexported fields
}
BaseNode provides a base implementation of the Node interface. It includes common functionality like retry configuration and default implementations of Prep, Exec, and Post methods that can be overridden.
BaseNode is designed to be embedded in custom node implementations:
type MyNode struct {
	*flyt.BaseNode
	// custom fields
}
func (n *MyNode) Exec(ctx context.Context, prepResult any) (any, error) {
	// custom implementation
	return nil, nil
}
func NewBaseNode ¶
func NewBaseNode(opts ...NodeOption) *BaseNode
NewBaseNode creates a new BaseNode with the provided options. By default, maxRetries is set to 1 (no retries) and wait is 0.
Example:
node := flyt.NewBaseNode(
	flyt.WithMaxRetries(3),
	flyt.WithWait(time.Second),
)
func (*BaseNode) Exec ¶
func (n *BaseNode) Exec(ctx context.Context, prepResult any) (any, error)
Exec is the default exec implementation that returns nil. This method should be overridden in your node implementation to provide the main processing logic.
func (*BaseNode) ExecFallback ¶
func (n *BaseNode) ExecFallback(prepResult any, err error) (any, error)
ExecFallback handles errors after all retries are exhausted. The default implementation simply returns the error. Override this method to provide custom fallback behavior.
func (*BaseNode) GetMaxRetries ¶
func (n *BaseNode) GetMaxRetries() int
GetMaxRetries returns the maximum number of retries configured for this node. This method is thread-safe.
func (*BaseNode) GetWait ¶
func (n *BaseNode) GetWait() time.Duration
GetWait returns the wait duration between retries configured for this node. This method is thread-safe.
func (*BaseNode) Post ¶
func (n *BaseNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
Post is the default post implementation that returns DefaultAction. Override this method in your node implementation to process results and determine the next action in the flow.
type BaseNodeG ¶ added in v0.7.0
BaseNodeG provides a generic base implementation
func NewBaseNodeG ¶ added in v0.7.0
func NewBaseNodeG[P, E any](opts ...NodeOption) *BaseNodeG[P, E]
NewBaseNodeG creates a new generic BaseNode
type BatchConfig ¶
type BatchConfig struct {
	// MaxBatchSize is the maximum number of items to process in a single batch.
	// If a batch exceeds this size, an error will be returned.
	// Default: 1000
	MaxBatchSize int
	// MaxConcurrency is the maximum number of concurrent workers for parallel processing.
	// This is only used when concurrent processing is enabled.
	// Default: 10
	MaxConcurrency int
	// ItemsKey is the SharedStore key to read items from.
	// Default: "items"
	ItemsKey string
	// ResultsKey is the SharedStore key to write results to.
	// Default: "results"
	ResultsKey string
	// BatchCountKey is the SharedStore key to write the batch count for BatchFlow.
	// Default: "batch_count"
	BatchCountKey string
}
BatchConfig holds configuration for batch operations. It allows customization of batch processing behavior including size limits, concurrency settings, and storage keys.
func DefaultBatchConfig ¶
func DefaultBatchConfig() *BatchConfig
DefaultBatchConfig returns a BatchConfig with sensible defaults. The defaults are:
- MaxBatchSize: 1000
- MaxConcurrency: 10
- ItemsKey: "items"
- ResultsKey: "results"
- BatchCountKey: "batch_count"
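To make the two numeric limits concrete, here is a minimal sketch of how a size check and a concurrency cap are commonly combined. The function `processBatch` is hypothetical and does not reproduce flyt's worker pool. It rejects oversized batches, as the MaxBatchSize comment describes, and uses a buffered channel as a counting semaphore so that no more than `maxConcurrency` items are processed at once.

```go
package main

import (
	"fmt"
	"sync"
)

// processBatch is a hypothetical helper, not flyt's implementation.
// It rejects batches larger than maxBatchSize and runs fn on each item with
// at most maxConcurrency goroutines in flight. Results keep the input order.
func processBatch(items []any, fn func(any) (any, error), maxBatchSize, maxConcurrency int) ([]any, error) {
	if len(items) > maxBatchSize {
		return nil, fmt.Errorf("batch size %d exceeds limit %d", len(items), maxBatchSize)
	}
	if maxConcurrency < 1 {
		maxConcurrency = 1
	}

	results := make([]any, len(items))
	errs := make([]error, len(items))
	sem := make(chan struct{}, maxConcurrency) // counting semaphore
	var wg sync.WaitGroup

	for i, item := range items {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxConcurrency workers are busy
		go func(i int, item any) {
			defer wg.Done()
			defer func() { <-sem }()
			// Each goroutine writes only to its own index, so no lock is needed.
			results[i], errs[i] = fn(item)
		}(i, item)
	}
	wg.Wait()

	// Report the first failure in input order; aggregation is out of scope here.
	for _, err := range errs {
		if err != nil {
			return results, err
		}
	}
	return results, nil
}

func main() {
	double := func(item any) (any, error) { return item.(int) * 2, nil }
	out, err := processBatch([]any{1, 2, 3}, double, 1000, 10)
	fmt.Println(out, err) // [2 4 6] <nil>
}
```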
type BatchError ¶
type BatchError struct {
	Errors []error
}
BatchError aggregates multiple errors from batch operations. It implements the error interface and provides detailed information about all errors that occurred during batch processing.
func (*BatchError) Error ¶
func (e *BatchError) Error() string
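An aggregate error of this shape is usually inspected with errors.As to reach the individual failures. The sketch below uses a hypothetical `batchError` type with the same single `Errors` field. Its message format and the helpers `collect`, `failures`, and `itemFailed` are assumptions made for illustration, not the library's behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// batchError is a hypothetical aggregate error with the same field layout as
// the documented BatchError. The message format is an assumption.
type batchError struct {
	Errors []error
}

// Error joins the individual messages into one line.
func (e *batchError) Error() string {
	msgs := make([]string, len(e.Errors))
	for i, err := range e.Errors {
		msgs[i] = err.Error()
	}
	return fmt.Sprintf("%d batch error(s): %s", len(e.Errors), strings.Join(msgs, "; "))
}

// collect returns nil when no item failed, otherwise a *batchError holding
// every non-nil error.
func collect(errs []error) error {
	var failed []error
	for _, err := range errs {
		if err != nil {
			failed = append(failed, err)
		}
	}
	if len(failed) == 0 {
		return nil // plain nil, so callers' `err != nil` checks behave
	}
	return &batchError{Errors: failed}
}

// failures unwraps err and returns the per-item errors, if any.
func failures(err error) []error {
	var be *batchError
	if errors.As(err, &be) {
		return be.Errors
	}
	return nil
}

// itemFailed builds a sample per-item error for the demo.
func itemFailed(i int) error { return fmt.Errorf("item %d failed", i) }

func main() {
	err := collect([]error{nil, itemFailed(1), itemFailed(2)})
	fmt.Println(err)                // 2 batch error(s): item 1 failed; item 2 failed
	fmt.Println(len(failures(err))) // 2
}
```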
type BatchFlowFunc ¶
type BatchFlowFunc func(ctx context.Context, shared *SharedStore) ([]FlowInputs, error)
BatchFlowFunc returns input parameters for each flow iteration in batch processing. This function is called during the Prep phase to determine how many flow instances to run and what parameters each should receive.
The function receives the parent flow's SharedStore and should return a slice of FlowInputs, where each element represents parameters for one flow iteration.
type BatchProcessFunc ¶
type BatchProcessFunc func(ctx context.Context, item any) (any, error)
BatchProcessFunc is a function that processes a single item in a batch. It receives a context and an item, and returns the processed result or an error. The function should be thread-safe if concurrent processing is enabled.
type CustomNode ¶ added in v0.3.0
type CustomNode struct {
	*BaseNode
	// contains filtered or unexported fields
}
CustomNode is a node implementation that uses custom functions for Prep, Exec, and Post phases. This allows creating nodes without defining new types, useful for simple operations.
CustomNode supports two styles of functions:
Result-based functions (WithPrepFunc, WithExecFunc, WithPostFunc): These use the Result type which provides convenient type assertion methods. Best for complex data processing where type safety and conversion helpers are valuable.
Any-based functions (WithPrepFuncAny, WithExecFuncAny, WithPostFuncAny): These use standard any types matching the Node interface directly. Best for simple operations or when migrating existing code.
Example with Result types:
node := flyt.NewNode(
	flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
		data := prepResult.MustString() // Type-safe access
		return flyt.NewResult(processData(data)), nil
	}),
)
Example with any types:
node := flyt.NewNode(
	flyt.WithExecFuncAny(func(ctx context.Context, prepResult any) (any, error) {
		data := prepResult.(string) // Manual type assertion
		return processData(data), nil
	}),
)
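The difference between the two styles comes down to where the type assertion lives. The following sketch contrasts them with a hypothetical `result` wrapper. It is not flyt.Result, and its lower-case method names are chosen only to echo the AsString, AsIntOr, and MustString helpers listed in the index.

```go
package main

import "fmt"

// result is a hypothetical wrapper around an arbitrary value, used here only
// to illustrate helper-based access. It is not flyt.Result.
type result struct{ value any }

// asString returns the value as a string and whether the assertion held.
func (r result) asString() (string, bool) {
	s, ok := r.value.(string)
	return s, ok
}

// asIntOr returns the value as an int, or def when it is not an int.
func (r result) asIntOr(def int) int {
	if n, ok := r.value.(int); ok {
		return n
	}
	return def
}

// mustString returns the value as a string and panics otherwise.
func (r result) mustString() string {
	s, ok := r.value.(string)
	if !ok {
		panic(fmt.Sprintf("result holds %T, not string", r.value))
	}
	return s
}

func main() {
	// Wrapper style: the helper hides the assertion and the fallback.
	r := result{value: "hello"}
	if s, ok := r.asString(); ok {
		fmt.Println(s) // hello
	}
	fmt.Println(r.asIntOr(-1)) // -1, because the value is not an int

	// Any style: the caller asserts by hand.
	var raw any = "hello"
	s, ok := raw.(string)
	fmt.Println(s, ok) // hello true
}
```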
func (*CustomNode) Exec ¶ added in v0.3.0
func (n *CustomNode) Exec(ctx context.Context, prepResult any) (any, error)
Exec implements Node.Exec by calling the custom execFunc if provided
func (*CustomNode) ExecFallback ¶ added in v0.5.0
func (n *CustomNode) ExecFallback(prepResult any, err error) (any, error)
ExecFallback implements FallbackNode.ExecFallback by calling the custom execFallbackFunc if provided
func (*CustomNode) Post ¶ added in v0.3.0
func (n *CustomNode) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
Post implements Node.Post by calling the custom postFunc if provided
func (*CustomNode) Prep ¶ added in v0.3.0
func (n *CustomNode) Prep(ctx context.Context, shared *SharedStore) (any, error)
Prep implements Node.Prep by calling the custom prepFunc if provided
type CustomNodeOption ¶ added in v0.3.0
type CustomNodeOption interface {
	// contains filtered or unexported methods
}
CustomNodeOption is an option for configuring a CustomNode. It allows setting custom implementations for Prep, Exec, and Post methods.
func WithExecFallbackFunc ¶ added in v0.5.0
func WithExecFallbackFunc(fn func(any, error) (any, error)) CustomNodeOption
WithExecFallbackFunc sets a custom ExecFallback implementation for a CustomNode. The provided function will be called when Exec fails after all retries are exhausted. This allows for custom error handling or returning a default value.
Example:
flyt.WithExecFallbackFunc(func(prepResult any, err error) (any, error) {
	log.Printf("Exec failed after retries: %v", err)
	// Return nil to indicate failure, which can be handled in Post
	return nil, nil
})
func WithExecFunc ¶ added in v0.3.0
func WithExecFunc(fn func(context.Context, Result) (Result, error)) CustomNodeOption
WithExecFunc sets a custom Exec implementation for a CustomNode. The provided function will be called during the Exec phase with the result from Prep as input.
Example:
flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
	// Process the data
	return flyt.NewResult(processedResult), nil
})
func WithExecFuncAny ¶ added in v0.8.0
func WithExecFuncAny(fn func(context.Context, any) (any, error)) CustomNodeOption
WithExecFuncAny sets a custom Exec implementation for a CustomNode using any types. This is an alternative to WithExecFunc that doesn't require Result types, useful for simpler cases or when you don't need the type assertion helpers.
Example:
flyt.WithExecFuncAny(func(ctx context.Context, prepResult any) (any, error) {
	// Process the data
	return processedResult, nil
})
func WithPostFunc ¶ added in v0.3.0
func WithPostFunc(fn func(context.Context, *SharedStore, Result, Result) (Action, error)) CustomNodeOption
WithPostFunc sets a custom Post implementation for a CustomNode. The provided function will be called during the Post phase to process results and determine the next action.
Example:
flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
	shared.Set("output", execResult.Value())
	// success stands for your own success check
	if success {
		return "success", nil
	}
	return "retry", nil
})
func WithPostFuncAny ¶ added in v0.8.0
func WithPostFuncAny(fn func(context.Context, *SharedStore, any, any) (Action, error)) CustomNodeOption
WithPostFuncAny sets a custom Post implementation for a CustomNode using any types. This is an alternative to WithPostFunc that doesn't require Result types, useful for simpler cases or backward compatibility.
Example:
flyt.WithPostFuncAny(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult any) (flyt.Action, error) {
	shared.Set("output", execResult)
	// success stands for your own success check
	if success {
		return "success", nil
	}
	return "retry", nil
})
func WithPrepFunc ¶ added in v0.3.0
func WithPrepFunc(fn func(context.Context, *SharedStore) (Result, error)) CustomNodeOption
WithPrepFunc sets a custom Prep implementation for a CustomNode. The provided function will be called during the Prep phase to read and preprocess data from the SharedStore.
Example:
flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
	data, _ := shared.Get("input")
	// Preprocess data
	return flyt.NewResult(preprocessedData), nil
})
func WithPrepFuncAny ¶ added in v0.8.0
func WithPrepFuncAny(fn func(context.Context, *SharedStore) (any, error)) CustomNodeOption
WithPrepFuncAny sets a custom Prep implementation for a CustomNode using any types. This is an alternative to WithPrepFunc that doesn't require Result types, useful for simpler cases or when migrating existing code.
Example:
flyt.WithPrepFuncAny(func(ctx context.Context, shared *flyt.SharedStore) (any, error) {
	data, _ := shared.Get("input")
	// Preprocess data
	return preprocessedData, nil
})
type FallbackNode ¶
FallbackNode is a node that supports custom fallback behavior on error. When all retries are exhausted, the ExecFallback method is called to provide an alternative result or handle the error gracefully.
type Flow ¶
type Flow struct {
	*BaseNode
	// contains filtered or unexported fields
}
Flow represents a workflow of connected nodes. A Flow is itself a Node, allowing flows to be nested within other flows. Nodes are connected via actions, creating a directed graph of execution.
Example:
// Create nodes
validateNode := &ValidateNode{BaseNode: flyt.NewBaseNode()}
processNode := &ProcessNode{BaseNode: flyt.NewBaseNode()}
errorNode := &ErrorNode{BaseNode: flyt.NewBaseNode()}
// Create flow
flow := flyt.NewFlow(validateNode)
flow.Connect(validateNode, ActionSuccess, processNode)
flow.Connect(validateNode, ActionFail, errorNode)
// Or with chaining (the dot must end the line, not start the next one):
flow.Connect(validateNode, ActionSuccess, processNode).
	Connect(validateNode, ActionFail, errorNode)
// Run flow
err := flow.Run(ctx, shared)
func NewBatchFlow ¶
func NewBatchFlow(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool) *Flow
NewBatchFlow creates a flow that runs multiple times with different parameters. Each iteration gets its own isolated SharedStore with merged parameters.
Important: The flowFactory must create new flow instances for concurrent execution to avoid race conditions. Do not reuse flow instances.
Parameters:
- flowFactory: Function that creates new flow instances
- batchFunc: Function that returns parameters for each iteration
- concurrent: If true, flows run concurrently
Example:
flowFactory := func() *flyt.Flow {
	// Create and return a new flow instance
	return flyt.NewFlow(startNode)
}
batchFunc := func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.FlowInputs, error) {
	users, _ := shared.Get("users")
	var inputs []flyt.FlowInputs
	for _, user := range users.([]User) {
		inputs = append(inputs, flyt.FlowInputs{"user": user})
	}
	return inputs, nil
}
batchFlow := flyt.NewBatchFlow(flowFactory, batchFunc, true)
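The isolation rule (a fresh flow and a private store per iteration) can be sketched in plain Go. All names below (`inputs`, `flow`, `runBatch`) are hypothetical stand-ins, and the sketch assumes that each iteration's store is seeded only with that iteration's parameters. Whether the library also copies data from the parent store is not stated here, so it is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// inputs is a hypothetical per-iteration parameter set.
type inputs map[string]any

// flow is a hypothetical flow: a function that works on its own store.
type flow func(store map[string]any) error

// runBatch calls factory once per iteration so no flow instance is shared,
// and gives every iteration a private store seeded with its inputs.
// It returns the number of iterations and one error slot per iteration.
func runBatch(factory func() flow, batch []inputs, concurrent bool) (int, []error) {
	errs := make([]error, len(batch))
	runOne := func(i int) {
		store := make(map[string]any, len(batch[i])) // isolated per iteration
		for k, v := range batch[i] {
			store[k] = v
		}
		errs[i] = factory()(store) // fresh flow instance per iteration
	}

	if !concurrent {
		for i := range batch {
			runOne(i)
		}
		return len(batch), errs
	}

	var wg sync.WaitGroup
	for i := range batch {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			runOne(i)
		}(i)
	}
	wg.Wait()
	return len(batch), errs
}

func main() {
	var mu sync.Mutex
	seen := map[int]string{}

	factory := func() flow {
		return func(store map[string]any) error {
			// seen is shared across iterations, so it needs its own lock.
			mu.Lock()
			defer mu.Unlock()
			seen[store["user_id"].(int)] = store["action"].(string)
			return nil
		}
	}

	count, _ := runBatch(factory, []inputs{
		{"user_id": 1, "action": "process"},
		{"user_id": 2, "action": "process"},
	}, true)
	fmt.Println(count, len(seen)) // 2 2
}
```

The per-iteration stores need no locking because each is touched by a single goroutine. Only state captured from outside, like `seen` in the demo, has to be guarded.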
func NewBatchFlowWithConfig ¶
func NewBatchFlowWithConfig(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, config *BatchConfig) *Flow
NewBatchFlowWithConfig creates a batch flow with custom configuration. This provides full control over batch flow execution settings.
Parameters:
- flowFactory: Function that creates new flow instances
- batchFunc: Function that returns parameters for each iteration
- concurrent: If true, flows run concurrently
- config: Custom batch configuration
Example:
config := &flyt.BatchConfig{
	MaxBatchSize:   100,     // Limit to 100 flows
	MaxConcurrency: 5,       // Run max 5 flows concurrently
	BatchCountKey:  "total", // Store count in "total" key
}
batchFlow := flyt.NewBatchFlowWithConfig(flowFactory, batchFunc, true, config)
func NewBatchFlowWithCountKey ¶ added in v0.2.0
func NewBatchFlowWithCountKey(flowFactory FlowFactory, batchFunc BatchFlowFunc, concurrent bool, countKey string) *Flow
NewBatchFlowWithCountKey creates a batch flow with a custom key for storing the batch count. After execution, the number of flows that were run is stored in the SharedStore using the specified key instead of the default "batch_count" key.
Parameters:
- flowFactory: Function that creates new flow instances
- batchFunc: Function that returns parameters for each iteration
- concurrent: If true, flows run concurrently
- countKey: SharedStore key to store the batch count
Example:
batchFlow := flyt.NewBatchFlowWithCountKey(flowFactory, batchFunc, true, "processed_count")
// After execution, retrieve count with: count, _ := shared.Get("processed_count")
func NewFlow ¶
func NewFlow(start Node) *Flow
NewFlow creates a new Flow with a start node. The flow begins execution at the start node and follows transitions based on the actions returned by each node.
Parameters:
- start: The first node to execute in the flow
Example:
flow := flyt.NewFlow(startNode)
func (*Flow) Connect ¶
func (f *Flow) Connect(from Node, action Action, to Node) *Flow
Connect adds a transition from one node to another based on an action. When the 'from' node returns the specified action, execution continues with the 'to' node. Multiple actions can be connected from a single node. Returns the flow instance for method chaining.
Parameters:
- from: The source node
- action: The action that triggers this transition
- to: The destination node
Example:
flow.Connect(nodeA, "success", nodeB)
flow.Connect(nodeA, "retry", nodeA) // Self-loop for retry
flow.Connect(nodeA, "fail", errorNode)
Example with chaining:
flow.Connect(nodeA, "success", nodeB).
	Connect(nodeB, "success", finalNode).
	Connect(nodeB, "fail", errorNode)
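Action-based transitions amount to a lookup table keyed by source node and action. The sketch below models that idea with hypothetical `step` and `graph` types. It is not the Flow implementation, only an illustration of how a walk follows returned actions until no transition matches, including a self-loop for retries.

```go
package main

import "fmt"

// step is a hypothetical node: it does some work and returns an action name.
type step struct {
	name string
	run  func() string
}

// graph stores transitions as transitions[from][action] = to.
type graph struct {
	start       *step
	transitions map[*step]map[string]*step
}

func newGraph(start *step) *graph {
	return &graph{start: start, transitions: make(map[*step]map[string]*step)}
}

// connect registers a transition and returns the graph so calls can be chained.
func (g *graph) connect(from *step, action string, to *step) *graph {
	if g.transitions[from] == nil {
		g.transitions[from] = make(map[string]*step)
	}
	g.transitions[from][action] = to
	return g
}

// execute walks the graph until a step returns an action with no transition,
// and returns the names of the steps visited in order.
func (g *graph) execute() []string {
	var visited []string
	for cur := g.start; cur != nil; {
		visited = append(visited, cur.name)
		// A missing entry yields nil, which ends the loop.
		cur = g.transitions[cur][cur.run()]
	}
	return visited
}

func main() {
	attempts := 0
	validate := &step{name: "validate", run: func() string {
		attempts++
		if attempts < 2 {
			return "retry"
		}
		return "success"
	}}
	process := &step{name: "process", run: func() string { return "default" }}

	g := newGraph(validate)
	g.connect(validate, "retry", validate).
		connect(validate, "success", process)

	fmt.Println(g.execute()) // [validate validate process]
}
```

A self-loop repeats for as long as the node keeps returning the looping action, so the node itself has to bound the number of retries.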
func (*Flow) Post ¶
func (f *Flow) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
Post implements Node interface for Flow
func (*Flow) Run ¶
func (f *Flow) Run(ctx context.Context, shared *SharedStore) error
Run executes the flow starting from the start node. This is a convenience method that wraps the standard Run function. The flow executes nodes in sequence based on their action transitions until no more transitions are available or an error occurs.
Parameters:
- ctx: Context for cancellation and timeouts
- shared: SharedStore for data exchange between nodes
Returns:
- error: Any error that occurred during flow execution
type FlowFactory ¶
type FlowFactory func() *Flow
FlowFactory creates new instances of a flow. This is used by batch flow operations to create isolated flow instances for concurrent execution. Each call should return a new flow instance to avoid race conditions.
Example:
factory := func() *flyt.Flow {
	node1 := &ProcessNode{BaseNode: flyt.NewBaseNode()}
	node2 := &SaveNode{BaseNode: flyt.NewBaseNode()}
	flow := flyt.NewFlow(node1)
	flow.Connect(node1, flyt.DefaultAction, node2)
	return flow
}
type FlowInputs ¶ added in v0.4.0
FlowInputs holds input parameters for a flow iteration in batch processing. These parameters are merged into each flow's isolated SharedStore, allowing each flow instance to have its own set of input data.
Example:
inputs := flyt.FlowInputs{
	"user_id":  123,
	"action":   "process",
	"priority": "high",
}
type Node ¶
type Node interface {
	// Prep reads and preprocesses data from shared store
	Prep(ctx context.Context, shared *SharedStore) (any, error)
	// Exec executes the main logic with optional retries
	Exec(ctx context.Context, prepResult any) (any, error)
	// Post processes results and writes back to shared store
	Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
}
Node is the interface that all nodes must implement.
Important: Nodes should not be shared across concurrent flow executions. If you need to run the same logic concurrently, create separate node instances.
func NewBatchNode ¶
func NewBatchNode(processFunc BatchProcessFunc, concurrent bool, opts ...NodeOption) Node
NewBatchNode creates a node that processes items in batch. The node reads items from the SharedStore (using the key "items" by default), processes each item using the provided function, and stores results back to the SharedStore (using the key "results" by default).
Parameters:
- processFunc: Function to process each item
- concurrent: If true, items are processed concurrently using a worker pool
- opts: Additional node options (e.g., WithMaxRetries, WithWait)
Example:
processFunc := func(ctx context.Context, item any) (any, error) {
	// Process the item
	return processedItem, nil
}
node := flyt.NewBatchNode(processFunc, true)
func NewBatchNodeWithConfig ¶
func NewBatchNodeWithConfig(processFunc BatchProcessFunc, concurrent bool, config *BatchConfig, opts ...NodeOption) Node
NewBatchNodeWithConfig creates a batch node with custom configuration. This provides full control over all batch processing settings.
Parameters:
- processFunc: Function to process each item
- concurrent: If true, items are processed concurrently
- config: Custom batch configuration
- opts: Additional node options
Example:
config := &flyt.BatchConfig{
	MaxBatchSize:   500,
	MaxConcurrency: 20,
	ItemsKey:       "tasks",
	ResultsKey:     "completed_tasks",
}
node := flyt.NewBatchNodeWithConfig(processFunc, true, config)
func NewBatchNodeWithKeys ¶ added in v0.2.0
func NewBatchNodeWithKeys(processFunc BatchProcessFunc, concurrent bool, itemsKey, resultsKey string, opts ...NodeOption) Node
NewBatchNodeWithKeys creates a batch node with custom keys for items and results. This allows you to specify which keys in the SharedStore to read items from and write results to, instead of using the default keys.
Parameters:
- processFunc: Function to process each item
- concurrent: If true, items are processed concurrently
- itemsKey: SharedStore key to read items from
- resultsKey: SharedStore key to write results to
- opts: Additional node options
Example:
node := flyt.NewBatchNodeWithKeys(processFunc, true, "input_data", "output_data")
func NewNodeAdapter ¶ added in v0.7.0
func NewNodeAdapter[P, E any](node NodeG[P, E]) Node
NewNodeAdapter creates an adapter from generic to standard node
type NodeAdapter ¶ added in v0.7.0
type NodeAdapter[P, E any] struct {
	// contains filtered or unexported fields
}
NodeAdapter adapts a generic node to the standard Node interface
func (*NodeAdapter[P, E]) Post ¶ added in v0.7.0
func (a *NodeAdapter[P, E]) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
func (*NodeAdapter[P, E]) Prep ¶ added in v0.7.0
func (a *NodeAdapter[P, E]) Prep(ctx context.Context, shared *SharedStore) (any, error)
type NodeBuilder ¶ added in v0.9.0
type NodeBuilder struct {
	*CustomNode
}
NodeBuilder provides a fluent interface for creating and configuring nodes. It implements the Node interface while also providing chainable methods for configuration. This allows both styles:
- Traditional: flyt.NewNode(WithExecFunc(...), WithMaxRetries(3))
- Builder: flyt.NewNode().WithExecFunc(...).WithMaxRetries(3)
func NewNode ¶ added in v0.3.0
func NewNode(opts ...any) *NodeBuilder
NewNode creates a new node with custom function implementations. Returns a NodeBuilder that implements Node and provides chainable configuration methods.
Can be used in two styles:
Traditional (backwards compatible):
node := flyt.NewNode(
	flyt.WithMaxRetries(3),
	flyt.WithExecFunc(execFn),
)
Builder pattern:
node := flyt.NewNode().
	WithMaxRetries(3).
	WithExecFunc(execFn).
	Build() // Build() is optional
func (*NodeBuilder) Exec ¶ added in v0.9.0
Exec implements Node.Exec by delegating to the embedded CustomNode
func (*NodeBuilder) ExecFallback ¶ added in v0.9.0
func (b *NodeBuilder) ExecFallback(prepResult any, err error) (any, error)
ExecFallback implements FallbackNode.ExecFallback by delegating to the embedded CustomNode
func (*NodeBuilder) GetMaxRetries ¶ added in v0.9.0
func (b *NodeBuilder) GetMaxRetries() int
GetMaxRetries implements RetryableNode.GetMaxRetries by delegating to the embedded BaseNode
func (*NodeBuilder) GetWait ¶ added in v0.9.0
func (b *NodeBuilder) GetWait() time.Duration
GetWait implements RetryableNode.GetWait by delegating to the embedded BaseNode
func (*NodeBuilder) Post ¶ added in v0.9.0
func (b *NodeBuilder) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error)
Post implements Node.Post by delegating to the embedded CustomNode
func (*NodeBuilder) Prep ¶ added in v0.9.0
func (b *NodeBuilder) Prep(ctx context.Context, shared *SharedStore) (any, error)
Prep implements Node.Prep by delegating to the embedded CustomNode
func (*NodeBuilder) WithExecFallbackFunc ¶ added in v0.9.0
func (b *NodeBuilder) WithExecFallbackFunc(fn func(any, error) (any, error)) *NodeBuilder
WithExecFallbackFunc sets a custom ExecFallback implementation. Returns the builder for method chaining.
func (*NodeBuilder) WithExecFunc ¶ added in v0.9.0
func (b *NodeBuilder) WithExecFunc(fn func(context.Context, Result) (Result, error)) *NodeBuilder
WithExecFunc sets a custom Exec implementation using Result types. Returns the builder for method chaining.
func (*NodeBuilder) WithExecFuncAny ¶ added in v0.9.0
func (b *NodeBuilder) WithExecFuncAny(fn func(context.Context, any) (any, error)) *NodeBuilder
WithExecFuncAny sets a custom Exec implementation using any types. Returns the builder for method chaining.
func (*NodeBuilder) WithMaxRetries ¶ added in v0.9.0
func (b *NodeBuilder) WithMaxRetries(retries int) *NodeBuilder
WithMaxRetries sets the maximum number of retries for the node's Exec phase. Returns the builder for method chaining.
func (*NodeBuilder) WithPostFunc ¶ added in v0.9.0
func (b *NodeBuilder) WithPostFunc(fn func(context.Context, *SharedStore, Result, Result) (Action, error)) *NodeBuilder
WithPostFunc sets a custom Post implementation using Result types. Returns the builder for method chaining.
func (*NodeBuilder) WithPostFuncAny ¶ added in v0.9.0
func (b *NodeBuilder) WithPostFuncAny(fn func(context.Context, *SharedStore, any, any) (Action, error)) *NodeBuilder
WithPostFuncAny sets a custom Post implementation using any types. Returns the builder for method chaining.
func (*NodeBuilder) WithPrepFunc ¶ added in v0.9.0
func (b *NodeBuilder) WithPrepFunc(fn func(context.Context, *SharedStore) (Result, error)) *NodeBuilder
WithPrepFunc sets a custom Prep implementation using Result types. Returns the builder for method chaining.
func (*NodeBuilder) WithPrepFuncAny ¶ added in v0.9.0
func (b *NodeBuilder) WithPrepFuncAny(fn func(context.Context, *SharedStore) (any, error)) *NodeBuilder
WithPrepFuncAny sets a custom Prep implementation using any types. Returns the builder for method chaining.
func (*NodeBuilder) WithWait ¶ added in v0.9.0
func (b *NodeBuilder) WithWait(wait time.Duration) *NodeBuilder
WithWait sets the wait duration between retries. Returns the builder for method chaining.
type NodeG ¶ added in v0.7.0
type NodeG[P, E any] interface {
	// PrepG reads and preprocesses data from shared store with typed result
	PrepG(ctx context.Context, shared *SharedStore) (P, error)
	// ExecG executes the main logic with typed input and output
	ExecG(ctx context.Context, prepResult P) (E, error)
	// PostG processes results with typed inputs
	PostG(ctx context.Context, shared *SharedStore, prepResult P, execResult E) (Action, error)
}
NodeG is a generic version of Node interface for type-safe implementations
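The relationship between a typed interface and an adapter to an untyped one can be shown in miniature. This is a sketch under stated assumptions, not the NodeAdapter source: typedStep, anyStep, adapter, and wordLen are hypothetical, and only a single Exec phase is modeled.

```go
package main

import (
	"errors"
	"fmt"
)

// typedStep is a hypothetical typed interface, standing in for a generic node.
type typedStep[P, E any] interface {
	Exec(prep P) (E, error)
}

// anyStep is its untyped counterpart, standing in for a standard node.
type anyStep interface {
	Exec(prep any) (any, error)
}

// adapter wraps a typedStep so it satisfies anyStep.
type adapter[P, E any] struct {
	inner typedStep[P, E]
}

func (a adapter[P, E]) Exec(prep any) (any, error) {
	// Recover the typed input; fail cleanly instead of panicking on a mismatch.
	p, ok := prep.(P)
	if !ok {
		return nil, fmt.Errorf("adapter: unexpected prep type %T", prep)
	}
	e, err := a.inner.Exec(p)
	if err != nil {
		return nil, err
	}
	return e, nil
}

// wordLen is a typed step from string to int.
type wordLen struct{}

func (wordLen) Exec(s string) (int, error) {
	if s == "" {
		return 0, errors.New("empty input")
	}
	return len(s), nil
}

func main() {
	var step anyStep = adapter[string, int]{inner: wordLen{}}
	out, err := step.Exec("hello")
	fmt.Println(out, err) // 5 <nil>
	_, err = step.Exec(42)
	fmt.Println(err) // adapter: unexpected prep type int
}
```

The typed implementation never performs a type assertion itself; the single assertion lives in the adapter, which is the main appeal of this arrangement.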
type NodeOption ¶
type NodeOption func(*BaseNode)
NodeOption is a function that configures a BaseNode. Options can be passed to NewBaseNode to customize its behavior.
func WithMaxRetries ¶
func WithMaxRetries(retries int) NodeOption
WithMaxRetries sets the maximum number of retries for the Exec phase. The default is 1, which means a single attempt with no retries. Setting this to a value greater than 1 enables automatic retry on Exec failures.
Example:
node := flyt.NewBaseNode(flyt.WithMaxRetries(3))
func WithWait ¶
func WithWait(wait time.Duration) NodeOption
WithWait sets the wait duration between retries. This only applies when maxRetries is greater than 1. The default is 0 (no wait between retries).
Example:
node := flyt.NewBaseNode(
	flyt.WithMaxRetries(3),
	flyt.WithWait(time.Second * 2),
)
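The interaction between the retry count and the wait duration can be sketched as a plain loop. This is an illustration only, assuming the retry setting counts total attempts (as the default of 1 meaning "no retries" suggests); execWithRetry and errTransient are hypothetical names, not part of flyt.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a sample error used to simulate a flaky operation.
var errTransient = errors.New("transient failure")

// execWithRetry calls exec up to maxAttempts times, sleeping for wait
// between failed attempts, and returns the last error if all attempts fail.
func execWithRetry(maxAttempts int, wait time.Duration, exec func() (any, error)) (any, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		res, err := exec()
		if err == nil {
			return res, nil
		}
		lastErr = err
		// Only wait when another attempt will follow.
		if attempt < maxAttempts && wait > 0 {
			time.Sleep(wait)
		}
	}
	return nil, lastErr
}

func main() {
	calls := 0
	res, err := execWithRetry(3, 10*time.Millisecond, func() (any, error) {
		calls++
		if calls < 3 {
			return nil, errTransient
		}
		return "ok", nil
	})
	fmt.Println(res, err, calls) // ok <nil> 3
}
```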
type Result ¶ added in v0.7.0
type Result struct {
	// contains filtered or unexported fields
}
Result is a wrapper type that provides convenient type assertion methods. It is used as the return type of Prep and Exec methods so that callers can avoid manual type assertions. It can hold any value and provides type-safe accessors.
func (Result) AsBool ¶ added in v0.7.0
AsBool retrieves the Result as a bool. Returns (false, false) if the Result value is nil or not a bool.
func (Result) AsBoolOr ¶ added in v0.7.0
AsBoolOr retrieves the Result as a bool. Returns the provided default value if the Result is nil or not a bool.
func (Result) AsFloat64 ¶ added in v0.7.0
AsFloat64 retrieves the Result as a float64. Returns 0.0 and false if the Result value is nil or cannot be converted to float64. Supports conversion from various numeric types.
func (Result) AsFloat64Or ¶ added in v0.7.0
AsFloat64Or retrieves the Result as a float64. Returns the provided default value if the Result is nil or cannot be converted to float64.
func (Result) AsInt ¶ added in v0.7.0
AsInt retrieves the Result as an int. Returns 0 and false if the Result value is nil or cannot be converted to int. Supports conversion from int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, and float types.
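A conversion of this kind is commonly written as a type switch over the supported numeric types. The sketch below is one plausible shape, not the library's code: toInt is a hypothetical helper, and truncating floats toward zero is an assumption about behavior the text does not specify.

```go
package main

import "fmt"

// toInt converts common numeric types to int and reports whether it succeeded.
// Floats are truncated toward zero (an assumption made for this sketch).
func toInt(v any) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case int8:
		return int(n), true
	case int16:
		return int(n), true
	case int32:
		return int(n), true
	case int64:
		return int(n), true
	case uint:
		return int(n), true
	case uint8:
		return int(n), true
	case uint16:
		return int(n), true
	case uint32:
		return int(n), true
	case uint64:
		return int(n), true
	case float32:
		return int(n), true
	case float64:
		return int(n), true
	default:
		// Covers nil and every non-numeric type.
		return 0, false
	}
}

func main() {
	fmt.Println(toInt(int64(42))) // 42 true
	fmt.Println(toInt(3.9))       // 3 true
	fmt.Println(toInt("42"))      // 0 false
}
```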
func (Result) AsIntOr ¶ added in v0.7.0
AsIntOr retrieves the Result as an int. Returns the provided default value if the Result is nil or cannot be converted to int.
func (Result) AsMap ¶ added in v0.7.0
AsMap retrieves the Result as a map[string]any. Returns nil and false if the Result value is nil or not a map[string]any.
func (Result) AsMapOr ¶ added in v0.7.0
AsMapOr retrieves the Result as a map[string]any. Returns the provided default value if the Result is nil or not a map[string]any.
func (Result) AsSlice ¶ added in v0.7.0
AsSlice retrieves the Result as a []any slice. Returns nil and false if the Result value is nil or not a slice. Uses ToSlice to convert various slice types to []any.
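Converting an arbitrary slice type to []any generally requires reflection, because Go does not convert []string or []int to []any implicitly. The following is a rough sketch of that technique; toSlice is a hypothetical helper and may differ from how ToSlice is actually written.

```go
package main

import (
	"fmt"
	"reflect"
)

// toSlice converts any slice value to []any and reports whether v was a slice.
func toSlice(v any) ([]any, bool) {
	if v == nil {
		return nil, false
	}
	// Fast path: already the target type.
	if s, ok := v.([]any); ok {
		return s, true
	}
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Slice {
		return nil, false
	}
	out := make([]any, rv.Len())
	for i := range out {
		out[i] = rv.Index(i).Interface()
	}
	return out, true
}

func main() {
	fmt.Println(toSlice([]int{1, 2, 3})) // [1 2 3] true
	fmt.Println(toSlice("not a slice"))  // [] false
}
```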
func (Result) AsSliceOr ¶ added in v0.7.0
AsSliceOr retrieves the Result as a []any slice. Returns the provided default value if the Result is nil or not a slice.
func (Result) AsString ¶ added in v0.7.0
AsString retrieves the Result as a string. Returns empty string and false if the Result value is nil or not a string.
func (Result) AsStringOr ¶ added in v0.7.0
AsStringOr retrieves the Result as a string. Returns the provided default value if the Result is nil or not a string.
func (Result) Bind ¶ added in v0.7.0
Bind binds the Result to a struct using JSON marshaling/unmarshaling. This allows for easy conversion of complex types. The destination must be a pointer to the target struct. Returns an error if the Result value is nil or binding fails.
Example:
type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}
var user User
err := result.Bind(&user)
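Binding through JSON means marshaling the held value and unmarshaling the bytes into the destination. A minimal sketch of that round trip, using only encoding/json, might look like this; the bind function is a hypothetical stand-in and not the method's actual source.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// User mirrors the struct used in the example above.
type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// bind copies value into dest by marshaling to JSON and unmarshaling back.
// dest must be a pointer to the target struct.
func bind(value any, dest any) error {
	if value == nil {
		return errors.New("bind: value is nil")
	}
	data, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("bind: marshal: %w", err)
	}
	if err := json.Unmarshal(data, dest); err != nil {
		return fmt.Errorf("bind: unmarshal: %w", err)
	}
	return nil
}

func main() {
	raw := map[string]any{"id": 7, "name": "Ada"}
	var user User
	if err := bind(raw, &user); err != nil {
		fmt.Println("bind failed:", err)
		return
	}
	fmt.Printf("%+v\n", user) // {ID:7 Name:Ada}
}
```

A consequence of this approach is that only exported fields with matching JSON names are populated, and values that cannot be represented in JSON cause the bind to fail.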
func (Result) MustBind ¶ added in v0.7.0
MustBind is like Bind but panics if binding fails. Use this only when binding failure should be considered a programming error.
Example:
var config Config
result.MustBind(&config) // Panics if binding fails
func (Result) MustBool ¶ added in v0.7.0
MustBool retrieves the Result as a bool. Panics if the Result is nil or not a bool.
func (Result) MustFloat64 ¶ added in v0.7.0
MustFloat64 retrieves the Result as a float64. Panics if the Result is nil or cannot be converted to float64.
func (Result) MustInt ¶ added in v0.7.0
MustInt retrieves the Result as an int. Panics if the Result is nil or cannot be converted to int.
func (Result) MustMap ¶ added in v0.7.0
MustMap retrieves the Result as a map[string]any. Panics if the Result is nil or not a map[string]any.
func (Result) MustSlice ¶ added in v0.7.0
MustSlice retrieves the Result as a []any slice. Panics if the Result is nil or not a slice.
func (Result) MustString ¶ added in v0.7.0
MustString retrieves the Result as a string. Panics if the Result is nil or not a string.
type RetryableNode ¶
RetryableNode is a node that supports automatic retries on Exec failures. Nodes implementing this interface can specify retry behavior through GetMaxRetries and GetWait methods.
type SharedStore ¶
type SharedStore struct {
	// contains filtered or unexported fields
}
SharedStore provides thread-safe access to shared data across nodes in a flow. It acts as a key-value store that can be safely accessed by multiple goroutines during concurrent batch processing.
Example:
shared := flyt.NewSharedStore()
shared.Set("user_id", 123)
shared.Set("config", map[string]any{"timeout": 30})
if val, ok := shared.Get("user_id"); ok {
	userID := val.(int)
	// Use userID
}
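A thread-safe key-value store of this kind is typically a map guarded by a read-write mutex. The sketch below illustrates that pattern and the shallow-copy behavior described for GetAll; the store type and its methods are hypothetical and should not be read as SharedStore's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a minimal map guarded by a read-write mutex.
type store struct {
	mu   sync.RWMutex
	data map[string]any
}

func newStore() *store {
	return &store{data: make(map[string]any)}
}

// Set takes the write lock, so concurrent writers are serialized.
func (s *store) Set(key string, value any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// Get takes the read lock, so concurrent readers do not block each other.
func (s *store) Get(key string) (any, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// GetAll returns a shallow copy: the map is new, the values are shared.
func (s *store) GetAll() map[string]any {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]any, len(s.data))
	for k, v := range s.data {
		out[k] = v
	}
	return out
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Set(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()
	fmt.Println(len(s.GetAll())) // 100
}
```

Note that locking protects the map itself, not the values stored in it: a map or slice placed in the store can still be mutated unsafely by two goroutines that both hold a reference to it.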
func NewSharedStore ¶
func NewSharedStore() *SharedStore
NewSharedStore creates a new thread-safe shared store. The store is initialized empty and ready for use.
func (*SharedStore) Bind ¶ added in v0.6.0
func (s *SharedStore) Bind(key string, dest any) error
Bind binds a value from the store to a struct using JSON marshaling/unmarshaling. This allows for easy conversion of complex types stored in the SharedStore. The destination must be a pointer to the target struct. Returns an error if the key is not found or binding fails. This method is safe for concurrent access.
Example:
type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}
var user User
err := shared.Bind("user", &user)
func (*SharedStore) Clear ¶ added in v0.6.0
func (s *SharedStore) Clear()
Clear removes all keys from the store. This method is safe for concurrent access.
func (*SharedStore) Delete ¶ added in v0.6.0
func (s *SharedStore) Delete(key string)
Delete removes a key from the store. This method is safe for concurrent access.
func (*SharedStore) Get ¶
func (s *SharedStore) Get(key string) (any, bool)
Get retrieves a value from the store by key. Returns the value and true if the key exists, or nil and false if not found. This method is safe for concurrent access.
func (*SharedStore) GetAll ¶
func (s *SharedStore) GetAll() map[string]any
GetAll returns a copy of all data in the store. The returned map is a shallow copy and can be safely modified without affecting the store's internal data. This method is safe for concurrent access.
func (*SharedStore) GetBool ¶ added in v0.6.0
func (s *SharedStore) GetBool(key string) bool
GetBool retrieves a bool value from the store. Returns false if the key doesn't exist or the value is not a bool. This method is safe for concurrent access.
func (*SharedStore) GetBoolOr ¶ added in v0.6.0
func (s *SharedStore) GetBoolOr(key string, defaultVal bool) bool
GetBoolOr retrieves a bool value from the store. Returns the provided default value if the key doesn't exist or the value is not a bool. This method is safe for concurrent access.
func (*SharedStore) GetFloat64 ¶ added in v0.6.0
func (s *SharedStore) GetFloat64(key string) float64
GetFloat64 retrieves a float64 value from the store. Returns 0.0 if the key doesn't exist or the value cannot be converted to float64. Supports conversion from int, float32, and other numeric types. This method is safe for concurrent access.
func (*SharedStore) GetFloat64Or ¶ added in v0.6.0
func (s *SharedStore) GetFloat64Or(key string, defaultVal float64) float64
GetFloat64Or retrieves a float64 value from the store. Returns the provided default value if the key doesn't exist or the value cannot be converted to float64. Supports conversion from various numeric types. This method is safe for concurrent access.
func (*SharedStore) GetInt ¶ added in v0.6.0
func (s *SharedStore) GetInt(key string) int
GetInt retrieves an int value from the store. Returns 0 if the key doesn't exist or the value cannot be converted to int. Supports conversion from int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, and float types. This method is safe for concurrent access.
func (*SharedStore) GetIntOr ¶ added in v0.6.0
func (s *SharedStore) GetIntOr(key string, defaultVal int) int
GetIntOr retrieves an int value from the store. Returns the provided default value if the key doesn't exist or the value cannot be converted to int. Supports conversion from various numeric types. This method is safe for concurrent access.
func (*SharedStore) GetMap ¶ added in v0.6.0
func (s *SharedStore) GetMap(key string) map[string]any
GetMap retrieves a map[string]any from the store. Returns nil if the key doesn't exist or the value is not a map[string]any. This method is safe for concurrent access.
func (*SharedStore) GetMapOr ¶ added in v0.6.0
GetMapOr retrieves a map[string]any from the store. Returns the provided default value if the key doesn't exist or the value is not a map[string]any. This method is safe for concurrent access.
func (*SharedStore) GetSlice ¶ added in v0.6.0
func (s *SharedStore) GetSlice(key string) []any
GetSlice retrieves a []any slice from the store. Returns nil if the key doesn't exist or the value is not a slice. This method is safe for concurrent access.
func (*SharedStore) GetSliceOr ¶ added in v0.6.0
func (s *SharedStore) GetSliceOr(key string, defaultVal []any) []any
GetSliceOr retrieves a []any slice from the store. Returns the provided default value if the key doesn't exist or the value is not a slice. Uses ToSlice to convert various slice types to []any. This method is safe for concurrent access.
func (*SharedStore) GetString ¶ added in v0.6.0
func (s *SharedStore) GetString(key string) string
GetString retrieves a string value from the store. Returns empty string if the key doesn't exist or the value is not a string. This method is safe for concurrent access.
func (*SharedStore) GetStringOr ¶ added in v0.6.0
func (s *SharedStore) GetStringOr(key string, defaultVal string) string
GetStringOr retrieves a string value from the store. Returns the provided default value if the key doesn't exist or the value is not a string. This method is safe for concurrent access.
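The "Or" accessors all follow the same fallback rule: return the stored value when it exists and has the expected type, otherwise return the default. With generics this rule fits in a few lines. The helper below is a hypothetical sketch over a plain map; it uses a strict type assertion and therefore does not perform the numeric conversions that the int and float64 getters describe.

```go
package main

import "fmt"

// getOr returns data[key] if it is present and of type T, otherwise def.
func getOr[T any](data map[string]any, key string, def T) T {
	if v, ok := data[key].(T); ok {
		return v
	}
	return def
}

func main() {
	data := map[string]any{"name": "flyt", "timeout": 30}
	fmt.Println(getOr(data, "name", "unknown")) // flyt
	fmt.Println(getOr(data, "timeout", "none")) // none (the value is an int, not a string)
	fmt.Println(getOr(data, "missing", true))   // true
}
```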
func (*SharedStore) Has ¶ added in v0.6.0
func (s *SharedStore) Has(key string) bool
Has checks if a key exists in the store. This method is safe for concurrent access.
func (*SharedStore) Keys ¶ added in v0.6.0
func (s *SharedStore) Keys() []string
Keys returns all keys in the store. The returned slice is a snapshot and can be safely modified without affecting the store's internal data. This method is safe for concurrent access.
func (*SharedStore) Len ¶ added in v0.6.0
func (s *SharedStore) Len() int
Len returns the number of items in the store. This method is safe for concurrent access.
func (*SharedStore) Merge ¶
func (s *SharedStore) Merge(data map[string]any)
Merge merges another map into the store. Existing keys are overwritten with values from the provided map. If the provided map is nil, this method does nothing. This method is safe for concurrent access.
func (*SharedStore) MustBind ¶ added in v0.6.0
func (s *SharedStore) MustBind(key string, dest any)
MustBind is like Bind but panics if binding fails. Use this only when binding failure should be considered a programming error. This method is safe for concurrent access.
Example:
var config Config
shared.MustBind("config", &config) // Panics if binding fails
func (*SharedStore) Set ¶
func (s *SharedStore) Set(key string, value any)
Set stores a value in the store with the given key. If the key already exists, its value is overwritten. This method is safe for concurrent access.
type WorkerPool ¶
type WorkerPool struct {
	// contains filtered or unexported fields
}
WorkerPool manages concurrent task execution with a fixed number of workers. It provides a simple way to limit concurrency and execute tasks in parallel. WorkerPool is used internally by batch processing nodes but can also be used directly for custom concurrent operations.
Example:
pool := flyt.NewWorkerPool(5)
defer pool.Close()
for _, item := range items {
	item := item // capture loop variable
	pool.Submit(func() {
		// Process item
	})
}
pool.Wait()
func NewWorkerPool ¶
func NewWorkerPool(workers int) *WorkerPool
NewWorkerPool creates a new worker pool with the specified number of workers. If workers is less than or equal to 0, it defaults to 1. The pool starts workers immediately and is ready to accept tasks.
Parameters:
- workers: Number of concurrent workers
Returns:
- *WorkerPool: A new worker pool ready for use
Remember to call Close() when done to clean up resources.
func (*WorkerPool) Close ¶
func (p *WorkerPool) Close()
Close closes the worker pool and waits for all workers to finish. After calling Close, no new tasks can be submitted. This method should be called when the pool is no longer needed to properly clean up resources.
func (*WorkerPool) Submit ¶
func (p *WorkerPool) Submit(task func())
Submit submits a task to the pool for execution. The task will be executed by one of the available workers. This method blocks if all workers are busy and the task buffer is full.
Parameters:
- task: Function to execute
func (*WorkerPool) Wait ¶
func (p *WorkerPool) Wait()
Wait waits for all submitted tasks to complete. This method blocks until all tasks have finished executing.
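The Submit, Wait, and Close semantics described above map naturally onto a buffered channel and two wait groups. The following sketch shows one way such a pool could be built; the pool type, its buffer size (equal to the worker count), and its field names are assumptions for illustration and are not taken from flyt's source.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool runs submitted tasks on a fixed number of worker goroutines.
type pool struct {
	tasks   chan func()
	pending sync.WaitGroup // tasks submitted but not yet finished
	workers sync.WaitGroup // worker goroutines still running
}

func newPool(n int) *pool {
	if n <= 0 {
		n = 1
	}
	p := &pool{tasks: make(chan func(), n)}
	p.workers.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer p.workers.Done()
			// The loop ends when the channel is closed and drained.
			for task := range p.tasks {
				task()
				p.pending.Done()
			}
		}()
	}
	return p
}

// Submit blocks when every worker is busy and the buffer is full.
func (p *pool) Submit(task func()) {
	p.pending.Add(1)
	p.tasks <- task
}

// Wait blocks until every submitted task has finished.
func (p *pool) Wait() {
	p.pending.Wait()
}

// Close stops accepting tasks and waits for the workers to exit.
func (p *pool) Close() {
	close(p.tasks)
	p.workers.Wait()
}

func main() {
	p := newPool(4)
	defer p.Close()
	var sum int64
	for i := 1; i <= 10; i++ {
		n := int64(i)
		p.Submit(func() { atomic.AddInt64(&sum, n) })
	}
	p.Wait()
	fmt.Println(atomic.LoadInt64(&sum)) // 55
}
```

In this design, calling Submit after Close would panic on the closed channel, which is one reason to pair a deferred Close with a Wait placed after the last Submit.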
