README ¶
Chain: A Flexible Action-Pipeline Library
The chain package provides a flexible framework for building and executing sequential workflows (pipelines) of actions. It allows you to define a series of steps, each represented by an Action, which processes inputs and produces outputs. The package supports conditional branching, customizable execution paths, and error handling, enabling complex workflows with minimal boilerplate.
Features
-
Action and Pipeline Composition
Create modular, reusable units of work (Action
) and orchestrate them into robust workflows usingPipeline
. -
DAG-based Execution Plans
Ensure predictable execution and robust validation with built-in cycle detection to enforce acyclic workflows. -
Nested Pipelines
Use pipelines as actions within other pipelines, enabling modular and hierarchical workflow designs. -
Conditional Branching
Support forSuccess
,Failure
,Abort
and custom direction-based branching within your execution flows. -
AggregateAction Support
Simplify the orchestration of complex workflows by combiningAction
s orPipeline
s of different types into a unified control flow using AggregateAction.
Getting Started
Installation
To install the chain package, use the following command:
go get github.com/JSYoo5B/chain
Key Concepts
Action
An Action
represents a single task in the pipeline. Each action can process input data and return output or an error.
type Action[T any] interface {
Name() string
Run(ctx context.Context, input T) (output T, err error)
}
BranchAction
A BranchAction
extends Action
and supports conditional branching. It can change the execution flow based on the results of the action, allowing for multiple execution paths.
type BranchAction[T any] interface {
Name() string
Run(ctx context.Context, input T) (output T, err error)
Directions() []string
NextDirection(ctx context.Context, output T) (direction string, err error)
}
Pipeline
A Pipeline
is a sequence of Action
s executed in order. It orchestrates the flow of data between actions and handles branching, success, error, and abort conditions.
ActionPlan
An ActionPlan
is a map that associates a direction (e.g., success, error, abort) with the next Action to execute, defining the flow of a pipeline.
type ActionPlan[T any] map[string]Action[T]
Examples
Practical examples for using the chain
package will be added in future updates. Stay tuned!
Documentation ¶
Index ¶
- Constants
- type Action
- type ActionPlan
- type AggregateGetter
- type AggregateSetter
- type BranchAction
- type BranchFunc
- type Pipeline
- func (p *Pipeline[T]) Name() string
- func (p *Pipeline[T]) Run(ctx context.Context, input T) (output T, err error)
- func (p *Pipeline[T]) RunAt(initAction Action[T], ctx context.Context, input T) (output T, lastErr error)
- func (p *Pipeline[T]) SetRunPlan(currentAction Action[T], plan ActionPlan[T])
- func (p *Pipeline[T]) ValidateGraph() error
- type RunFunc
Constants ¶
const ( // Success represents the direction indicating that the action completed successfully // and the pipeline should continue. Success = "success" // Error represents the direction indicating that an error occurred, // and the pipeline should handle it accordingly. Error = "error" // Abort represents the direction indicating that // the pipeline execution should be aborted immediately. // This can occur due to a specific Abort condition or // in cases of unexpected errors or panics that cause the pipeline to halt. Abort = "abort" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action[T any] interface { // Name provides the identifier of this Action. Name() string // Run executes the Action, processing the input and returning output or an error. Run(ctx context.Context, input T) (output T, err error) }
Action is the basic unit of execution in a package. It represents a single task that processes input and produces output.
func NewAggregateAction ¶
func NewAggregateAction[T any, U any]( action Action[U], getter AggregateGetter[T, U], setter AggregateSetter[T, U], ) Action[T]
NewAggregateAction creates an Action that works with a composite data structure (T), where T is a complex type (e.g., a struct with multiple fields) and U is the data type that the Action operates on. The AggregateGetter and AggregateSetter functions are used to extract U from T and re-integrate the processed result back into T.
func NewSimpleAction ¶
NewSimpleAction creates a new Action with a custom Run function, which can be a pure function or closure. The provided runFunc must match the RunFunc signature, where T is a generic type representing the input and output types for the Action's execution.
This allows for the creation of simple Actions without manually defining a separate struct that implements the Action interface.
type ActionPlan ¶
ActionPlan represents a map that associates a direction (Success, Error, Abort, and other custom branching directions) with the next Action to execute. It is used to define the flow of actions in a pipeline based on the direction of execution.
func DefaultPlan ¶
func DefaultPlan[T any](success, error Action[T]) ActionPlan[T]
DefaultPlan returns a standard ActionPlan with valid next actions for Success and Error, and Termination for Abort.
func DefaultPlanWithAbort ¶
func DefaultPlanWithAbort[T any](success, error, abort Action[T]) ActionPlan[T]
DefaultPlanWithAbort returns an ActionPlan with valid next actions for Success, Error, and Abort.
func SuccessOnlyPlan ¶
func SuccessOnlyPlan[T any](success Action[T]) ActionPlan[T]
SuccessOnlyPlan returns an ActionPlan where only a success direction has a valid next action, and Error and Abort both lead to termination.
func TerminationPlan ¶
func TerminationPlan[T any]() ActionPlan[T]
TerminationPlan returns an ActionPlan with all directions leading to termination immediately, providing a clear indication of termination rather than using nil.
type AggregateGetter ¶
AggregateGetter extracts a subpart (U) from a composite data structure (T). It allows access to a part of the structure without modifying the entire one.
type AggregateSetter ¶
AggregateSetter updates a composite data structure (T) with a new subpart (U). It reintegrates the modified part back into the structure and returns the updated structure.
type BranchAction ¶
type BranchAction[T any] interface { // Name returns the name of the BranchAction. Name() string // Run executes the branch action, optionally modifying the input and returning an output. // If the input doesn't need changes, it can be passed through as output. The method also // returns an error if the action cannot be executed successfully. Run(ctx context.Context, input T) (output T, err error) // Directions returns a list of possible directions that the pipeline can take. // These directions are used for validation and must include all possible values that // NextDirection can return. Directions() []string // NextDirection determines the next execution path based on the result of Run. // It is called only if Run succeeds (err == nil). // The method returns a direction from the list defined by Directions. NextDirection(ctx context.Context, output T) (direction string, err error) }
BranchAction is an interface for actions that control branching in the execution flow of a Pipeline. It extends the Action interface and adds methods for handling conditional branching based on the execution results.
func NewSimpleBranchAction ¶
func NewSimpleBranchAction[T any](name string, runFunc RunFunc[T], directions []string, branchFunc BranchFunc[T]) BranchAction[T]
NewSimpleBranchAction creates a new BranchAction with customizable directions. It accepts a name for the action, a slice of directions that define the possible control flow, and a BranchFunc that contains the branching logic, which dictates the next direction based on the action's output.
Additionally, a custom runFunc can be provided to define the execution logic of the action. This function must match the RunFunc signature, where T is the generic type representing the input and output types for the action. If no specific execution logic is needed, the runFunc can be provided as `nil`. In this case, the action will simply pass the input through to the output without modification.
This allows for the creation of simple BranchActions without manually defining a separate struct that implements the BranchAction interface.
type BranchFunc ¶
BranchFunc represents the signature for the function that defines the branching logic for a BranchAction in the package. It takes the running context and output as input, and returns the direction for the next step in the process along with any potential error.
type Pipeline ¶
type Pipeline[T any] struct { // contains filtered or unexported fields }
Pipeline represents a sequence of Actions that are executed in a structured flow. It executes each of its constituent Actions in sequence, with each Action following its own Run method. The flow proceeds based on the defined structure of the Pipeline, allowing flexible and organized execution of actions to build workflows that can be as simple or complex as needed.
Pipeline implements the Action interface, meaning it can be treated as an Action itself. This allows Pipelines to be composed hierarchically, enabling more complex workflows by nesting Pipelines within other Pipelines.
func NewPipeline ¶
NewPipeline creates a new Pipeline by taking a series of Actions as its members. These Actions will be executed sequentially in the order they are provided, with the output of one Action being passed as input to the next, forming a unidirectional flow of execution.
func (*Pipeline[T]) Run ¶
Run executes the Pipeline by running Actions in the order they were configured, starting from the initAction, which is the first one of the memberActions provided by the constructor such as NewPipeline. The actions are executed in order, passing the output of one action as input to the next.
func (*Pipeline[T]) RunAt ¶
func (p *Pipeline[T]) RunAt(initAction Action[T], ctx context.Context, input T) (output T, lastErr error)
RunAt starts the execution of the pipeline from a given Action (initAction). It follows the action plan, executing actions sequentially based on the specified directions. If an action returns an error, the pipeline will proceed to the next action according to the defined plan, potentially directing the flow to an action mapped for the Error direction. The Abort direction, when encountered, will immediately halt the pipeline execution unless the plan specifies otherwise. If no action plan is found for a given direction, the pipeline will terminate with the appropriate error.
func (*Pipeline[T]) SetRunPlan ¶
func (p *Pipeline[T]) SetRunPlan(currentAction Action[T], plan ActionPlan[T])
SetRunPlan updates the execution flow for the given currentAction in the pipeline, by associating it with a specified ActionPlan. The currentAction will be validated to ensure it is a member of the pipeline. The ActionPlan defines the directions (such as Success, Error, Abort) and their corresponding next actions in the execution flow.
If the currentAction is nil or not part of the pipeline, a panic will occur. The plan can be nil, in which case the currentAction will be set to terminate for any direction not explicitly specified in the plan. If a direction is encountered in the plan that is not valid for the currentAction, or if it leads to an invalid action, another panic will occur.
Additionally, self-loops are not allowed in the plan. If the next action for a direction is the current action itself, a panic will be triggered.
func (*Pipeline[T]) ValidateGraph ¶
ValidateGraph ensures the pipeline's graph is connected and acyclic. It checks for cycles first, then verifies that all nodes connected as a single graph.