wf

package
v0.10.0
Warning

This package is not in the latest version of its module.

Published: Apr 9, 2025 License: AGPL-3.0 Imports: 6 Imported by: 0

README

WF (Workflow) Package

The wf package provides a flexible and extensible framework for defining and executing sequential operations in Go.

The RunnableInterface interface is implemented by the Step, Pipeline, and Dag types.

The Step type represents a single unit of work that is executed with a given context and data map.

The Pipeline type is used for grouping related runnables and executing them in sequence.

The Dag (Directed Acyclic Graph) type is used for advanced workflow management with dependencies between runnables.

Key Features

  • Simple Step Definitions: Easily define individual operations as reusable steps
  • Organized Pipelines: Group related operations into logical pipelines for better maintainability
  • Flexible Dependencies: Create complex workflows with step dependencies using DAG
  • Cycle Detection: Automatically detects and prevents circular dependencies
  • Context Management: Share data between steps using a context object
  • Error Handling: Proper error propagation through the entire workflow
  • Testable: Designed with testing in mind

Core Components

  • Step: Represents a single execution step with a unique ID, a name, and an execution handler
  • Pipeline: Groups related steps into a logical unit that can be treated as a single step
  • Dag: Manages a collection of steps and their dependencies, executing them in the correct order

Component Hierarchy

Runnable
├── Step (basic unit of work, single operation)
├── Pipeline (runs a set of runnables in the sequence they are added)
└── Dag (advanced workflow manager with dependencies between runnables)

Usage Examples

Creating Steps
// Create a step with an execution function
step := NewStep()
step.SetName("My Step")
step.SetHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
    data["key"] = "value"
    return ctx, data, nil
})
Creating a Pipeline
// Create steps for a pipeline
step1 := NewStep()
step1.SetName("Process Data")
step1.SetHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
    data["processed"] = true
    return ctx, data, nil
})

step2 := NewStep()
step2.SetName("Validate Data")
step2.SetHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
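    // The comma-ok assertion avoids a panic if the key is missing or not a bool
    if processed, ok := data["processed"].(bool); !ok || !processed {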
        return ctx, data, errors.New("data not processed")
    }
    return ctx, data, nil
})

// Create a pipeline
pipeline := NewPipeline()
pipeline.SetName("Data Processing Pipeline")

// Add steps to pipeline
pipeline.RunnableAdd(step1, step2)
Creating a DAG
// Create a DAG
dag := NewDag()
dag.SetName("My DAG")

// Add steps
dag.RunnableAdd(step1, step2)

// Add dependencies
dag.DependencyAdd(step2, step1) // step2 depends on step1
Using a Pipeline in a DAG
// Create a pipeline with steps
pipeline := NewPipeline()
pipeline.SetName("Data Processing Pipeline")

// Add steps to pipeline
pipeline.RunnableAdd(step1, step2)

// Create a DAG
dag := NewDag()
dag.SetName("My DAG")

// Add pipeline to DAG
dag.RunnableAdd(pipeline)

// Add other steps that depend on the pipeline
// (step3 is assumed to be another step created with NewStep)
dag.RunnableAdd(step3)
dag.DependencyAdd(step3, pipeline) // step3 depends on pipeline
Executing Steps
// Create a context and data map
ctx := context.Background()
data := make(map[string]any)

// Execute all steps
_, data, err := dag.Run(ctx, data)
if err != nil {
    // Handle error
}

Testing

The package includes comprehensive tests that verify:

  • Successful step execution
  • Error propagation
  • Dependency handling
  • Context data sharing
  • Cycle detection
  • Parallel execution
  • Serialization

Dependencies

  • github.com/gouniverse/uid: For generating unique IDs

Best Practices

  1. Always use the provided interfaces for type safety
  2. Handle errors in every step handler and return them so they propagate to the caller
  3. Share data between steps through the context and the data map passed to each handler
  4. Define dependencies when steps must be executed in a specific order
  5. Avoid creating circular dependencies between steps
  6. Use pipelines to group related steps into logical units

Examples

The package includes several examples demonstrating different use cases:

Basic Usage Example
Conditional Logic Example
  • Demonstrates how to implement conditional logic using DAGs and pipelines
  • Shows different step chains for different scenarios
  • Location: examples/dag_conditional_logic
Dependencies Example
  • Shows how to create steps with complex dependencies
  • Demonstrates proper execution order through dependencies
  • Location: examples/dag_dependencies
Error Handling Example

Error Handling

The package will return errors in the following cases:

  • If a cycle is detected in the dependency graph
  • If any step execution fails
  • If a step is added multiple times
  • If dependencies are not properly defined
  • If pipeline execution fails
  • If conditional logic conditions are not met
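
The cases above all surface as a plain error value returned from Run. The page does not say whether the package adds the failing step's name to that error, so the following is only a sketch of what a caller's own code can do, using the standard library alone. runStep and errNotProcessed are hypothetical names and are not part of the wf package.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotProcessed is a hypothetical sentinel error defined by the caller.
var errNotProcessed = errors.New("data not processed")

// runStep stands in for executing a named unit of work. It adds the step
// name to a failure while keeping the original error reachable through %w.
func runStep(name string, fn func() error) error {
	if err := fn(); err != nil {
		return fmt.Errorf("step %q: %w", name, err)
	}
	return nil
}

func main() {
	err := runStep("Validate Data", func() error { return errNotProcessed })
	fmt.Println(err)

	// errors.Is still matches the sentinel through the wrapping.
	fmt.Println(errors.Is(err, errNotProcessed))
}
```

Wrapping with %w keeps the cause inspectable with errors.Is and errors.As, which helps when several steps can fail for the same underlying reason.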

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dag

type Dag struct {
	// contains filtered or unexported fields
}

func (*Dag) DependencyAdd

func (d *Dag) DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)

DependencyAdd makes the dependent node depend on one or more dependency nodes.

func (*Dag) DependencyList

func (d *Dag) DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface

DependencyList returns all dependencies for a given node.

func (*Dag) GetID

func (d *Dag) GetID() string

func (*Dag) GetName

func (d *Dag) GetName() string

func (*Dag) Run

func (d *Dag) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

Run executes all nodes in the DAG in the correct order

func (*Dag) RunnableAdd

func (d *Dag) RunnableAdd(node ...RunnableInterface)

RunnableAdd adds one or more nodes to the DAG.

func (*Dag) RunnableList

func (d *Dag) RunnableList() []RunnableInterface

RunnableList returns all runnable nodes in the DAG.

func (*Dag) RunnableRemove

func (d *Dag) RunnableRemove(node RunnableInterface) bool

RunnableRemove removes a node from the DAG.

func (*Dag) SetID

func (d *Dag) SetID(id string)

func (*Dag) SetName

func (d *Dag) SetName(name string)

type DagInterface

type DagInterface interface {
	RunnableInterface

	// RunnableAdd adds one or more nodes to the DAG.
	// Runnable nodes can be added in any order, as their execution order will be determined by their dependencies.
	RunnableAdd(node ...RunnableInterface)

	// RunnableRemove removes a node from the DAG.
	// Returns true if the node was found and removed, false if it wasn't found.
	RunnableRemove(node RunnableInterface) bool

	// RunnableList returns all runnable nodes in the DAG.
	// The order of nodes in the returned slice is not guaranteed to be their execution order.
	// Use Run() to execute nodes in the correct order based on their dependencies.
	RunnableList() []RunnableInterface

	// DependencyAdd makes the dependent node depend on one or more dependency nodes.
	// The dependent node will only execute after the dependency node has completed successfully.
	DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)

	// DependencyList returns all dependencies for a given node.
	// The actual dependencies may vary based on the context and any conditional dependencies.
	DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface
}

DagInterface represents a Directed Acyclic Graph (DAG) of steps that can be executed in a specific order. It manages the dependencies between steps and ensures they are executed in the correct sequence.
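
How the package derives the execution order is not documented on this page. As an illustration of the general idea only, the sketch below orders nodes with Kahn's algorithm and reports a cycle when some nodes can never become ready. The function order, the sentinel errCycle, and the use of plain string IDs are assumptions made for this example. They do not describe the package's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// errCycle is a hypothetical sentinel; the package's own error is not shown here.
var errCycle = errors.New("cycle detected")

// order returns node IDs so that every dependency precedes its dependents.
// deps maps a node ID to the IDs it depends on. Every dependency is assumed
// to be listed in nodes as well.
func order(nodes []string, deps map[string][]string) ([]string, error) {
	pending := make(map[string]int, len(nodes)) // unmet dependencies per node
	dependents := make(map[string][]string)     // reverse edges
	for _, n := range nodes {
		pending[n] = len(deps[n])
		for _, d := range deps[n] {
			dependents[d] = append(dependents[d], n)
		}
	}

	// Nodes without dependencies can run immediately.
	var ready []string
	for _, n := range nodes {
		if pending[n] == 0 {
			ready = append(ready, n)
		}
	}

	var out []string
	for len(ready) > 0 {
		n := ready[0]
		ready = ready[1:]
		out = append(out, n)
		// Completing n may unblock the nodes that depend on it.
		for _, m := range dependents[n] {
			pending[m]--
			if pending[m] == 0 {
				ready = append(ready, m)
			}
		}
	}

	// Any node left over is part of, or blocked by, a cycle.
	if len(out) != len(nodes) {
		return nil, errCycle
	}
	return out, nil
}

func main() {
	nodes := []string{"report", "load", "clean"}
	deps := map[string][]string{"clean": {"load"}, "report": {"clean"}}
	fmt.Println(order(nodes, deps))

	deps["load"] = []string{"report"} // closes the loop load -> report -> clean -> load
	fmt.Println(order(nodes, deps))
}
```

The nodes are listed in an arbitrary order on purpose: as with RunnableAdd, the order of insertion does not matter because the dependencies decide the result.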

func NewDag

func NewDag() DagInterface

NewDag creates a new DAG

type PipelineInterface

type PipelineInterface interface {
	RunnableInterface

	// RunnableAdd adds a runnable node(s) to the pipeline.
	RunnableAdd(node ...RunnableInterface)

	// RunnableRemove removes a runnable node from the pipeline.
	RunnableRemove(node RunnableInterface) bool

	// RunnableList returns all runnable nodes in the pipeline.
	// The order of nodes in the returned slice is the order they were added.
	RunnableList() []RunnableInterface
}

PipelineInterface represents a sequence of runnable nodes that will be executed in the sequence they are added.
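
A minimal sketch of sequential execution, assuming each node receives the context and data returned by the previous node and that the run stops at the first error. Neither assumption is confirmed on this page. The types runnable, stepFunc, and sequence and the helpers appendTrace and demo are local stand-ins written for this example; only the Run signature is taken from RunnableInterface.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runnable is a local stand-in exposing only the documented Run signature.
type runnable interface {
	Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
}

// stepFunc adapts a plain function to runnable.
type stepFunc func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

func (f stepFunc) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	return f(ctx, data)
}

// sequence runs its nodes in insertion order. Because it also satisfies
// runnable, one sequence can be nested inside another.
type sequence []runnable

func (s sequence) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	var err error
	for _, node := range s {
		// Each node receives what the previous node returned.
		ctx, data, err = node.Run(ctx, data)
		if err != nil {
			return ctx, data, err // assumption: stop at the first failure
		}
	}
	return ctx, data, nil
}

// appendTrace returns a step that records its name in data["trace"].
func appendTrace(name string, fail bool) stepFunc {
	return func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
		trace, _ := data["trace"].([]string)
		data["trace"] = append(trace, name)
		if fail {
			return ctx, data, errors.New(name + " failed")
		}
		return ctx, data, nil
	}
}

// demo nests one sequence inside another and returns the recorded trace.
func demo() ([]string, error) {
	inner := sequence{appendTrace("b", false), appendTrace("c", true)}
	outer := sequence{appendTrace("a", false), inner, appendTrace("d", false)}
	_, data, err := outer.Run(context.Background(), map[string]any{})
	trace, _ := data["trace"].([]string)
	return trace, err
}

func main() {
	fmt.Println(demo())
}
```

In this sketch step "d" never runs, because the failure of "c" inside the nested sequence is returned up through the outer one.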

func NewPipeline

func NewPipeline() PipelineInterface

type RunnableInterface

type RunnableInterface interface {
	GetID() string
	SetID(id string)
	GetName() string
	SetName(name string)
	Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
}

RunnableInterface represents a single unit of work that can be executed with a given context and data. It can work with the data and return the result of the work.

It can be used as a single step, or combined with other nodes to form a Pipeline, Workflow or DAG.

type Step

type Step struct {
	// contains filtered or unexported fields
}

func (*Step) GetHandler

func (s *Step) GetHandler() StepHandler

GetHandler returns the step's execution function

func (*Step) GetID

func (s *Step) GetID() string

func (*Step) GetName

func (s *Step) GetName() string

func (*Step) Run

func (s *Step) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

Run executes the step's handler with the given context and data.

func (*Step) SetHandler

func (s *Step) SetHandler(fn StepHandler)

SetHandler sets the step's execution function

func (*Step) SetID

func (s *Step) SetID(id string)

func (*Step) SetName

func (s *Step) SetName(name string)

type StepHandler

type StepHandler func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
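
A handler with this signature can pass values forward in two ways: by writing to the data map, or by returning a derived context. The sketch below shows both. The handler type is redeclared locally so the example compiles without importing the package, and requestIDKey, validate, and runOnce are hypothetical names used only here. Whether a returned context reaches later steps depends on the caller forwarding it, which this page does not state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handler mirrors the documented StepHandler signature.
type handler func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

// requestIDKey is a hypothetical context key. An unexported struct type
// avoids collisions with keys defined in other packages.
type requestIDKey struct{}

// validate checks a flag written by an earlier step. The comma-ok assertion
// avoids a panic when the key is missing or holds a non-bool value.
func validate(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	processed, ok := data["processed"].(bool)
	if !ok || !processed {
		return ctx, data, errors.New("data not processed")
	}
	data["validated"] = true
	// A derived context carries the value to whoever receives the return value.
	ctx = context.WithValue(ctx, requestIDKey{}, "req-1")
	return ctx, data, nil
}

// runOnce invokes h with a background context and reports the resulting
// data, the request ID found in the returned context, and the error.
func runOnce(h handler, data map[string]any) (map[string]any, any, error) {
	ctx, out, err := h(context.Background(), data)
	return out, ctx.Value(requestIDKey{}), err
}

func main() {
	_, _, err := runOnce(validate, map[string]any{})
	fmt.Println(err)

	out, id, err := runOnce(validate, map[string]any{"processed": true})
	fmt.Println(out["validated"], id, err)
}
```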

type StepInterface

type StepInterface interface {
	RunnableInterface

	// GetHandler returns the function that implements the step's execution logic.
	GetHandler() StepHandler

	// SetHandler allows setting or modifying the step's execution logic.
	SetHandler(handler StepHandler)
}

StepInterface represents a single node in a Pipeline, Workflow or DAG. A step is a unit of work that can be executed within a given context. A step is executed by a Pipeline, Workflow or DAG which manages its dependencies and execution order.

func NewStep

func NewStep() StepInterface

NewStep creates a new step. It takes no arguments; set the execution function with SetHandler and, if needed, the ID with SetID.

Directories

Path Synopsis
examples
