wf

package
v0.14.0
Published: Apr 13, 2025 License: AGPL-3.0 Imports: 11 Imported by: 0

README

WF (Workflow) Package

The wf package provides a flexible and extensible framework for defining and executing automated tasks and processes in Go. It is designed for computational workflows where steps are executed programmatically without requiring manual intervention.

The Runnable interface (RunnableInterface in the API) is implemented by the Step, Pipeline, and Dag types, so all three are executed the same way, through Run.

The Step type represents a single unit of automated work that is executed with a given context and data map.

The Pipeline type is used for grouping related automated tasks and executing them in sequence.

The Dag (Directed Acyclic Graph) type is used for advanced workflow management with dependencies between automated tasks.

Key Features

  • Automated Task Execution: Define and execute tasks programmatically without manual intervention
  • Simple Step Definitions: Easily define individual operations as reusable steps
  • Organized Pipelines: Group related operations into logical pipelines for better maintainability
  • Flexible Dependencies: Create complex workflows with step dependencies using DAG
  • Cycle Detection: Automatically detects and prevents circular dependencies
  • Context Management: Share data between steps using a context object
  • Error Handling: Proper error propagation through the entire workflow
  • State Management: Track and persist workflow execution state
  • Pause and Resume: Ability to pause, save, and resume workflow execution
  • Testable: Designed with testing in mind

When to Use This Package

This package is ideal for:

  • Data processing pipelines
  • Automated build processes
  • ETL workflows
  • Batch processing jobs
  • Service orchestration
  • Automated testing pipelines
  • Any process that can be executed programmatically

It is not suitable for:

  • Human-driven approval workflows
  • Manual review processes
  • Step-by-step form completion
  • Processes requiring manual intervention
  • Business processes with human decision points

Core Components

  • Step: Represents a single execution step with unique ID, name, and execution handler
  • Pipeline: Groups related steps into a logical unit that can be treated as a single step
  • Dag: Manages a collection of steps and their dependencies, executing them in the correct order
  • State: Manages workflow execution state, including status, completed steps, and workflow data

Component Hierarchy

Runnable
├── Step (basic unit of work, single operation)
├── Pipeline (runs a set of runnables in the sequence they are added)
└── Dag (advanced workflow manager with dependencies between runnables)
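
The hierarchy above can be illustrated with a small, self-contained sketch. It does not use the wf package: Runnable, stepFunc, sequence, and appendTo are hypothetical stand-ins, and only the Run signature is taken from the documented RunnableInterface. The point is that a group of runnables is itself a runnable, which is what allows nesting.

```go
package main

import (
	"context"
	"fmt"
)

// Runnable is a hypothetical stand-in for the package's RunnableInterface,
// reduced to the Run method only.
type Runnable interface {
	Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
}

// stepFunc adapts a plain function to Runnable (a single unit of work).
type stepFunc func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

func (f stepFunc) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	return f(ctx, data)
}

// sequence runs its children in the order they were added. It is itself a
// Runnable, so it can be nested inside another sequence.
type sequence []Runnable

func (s sequence) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	var err error
	for _, r := range s {
		if ctx, data, err = r.Run(ctx, data); err != nil {
			return ctx, data, err
		}
	}
	return ctx, data, nil
}

// appendTo returns a step that appends name to the "trace" slice in data.
func appendTo(name string) Runnable {
	return stepFunc(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
		trace, _ := data["trace"].([]string)
		data["trace"] = append(trace, name)
		return ctx, data, nil
	})
}

// runTrace executes a nested sequence and returns the recorded order.
func runTrace() []string {
	inner := sequence{appendTo("b"), appendTo("c")}
	outer := sequence{appendTo("a"), inner, appendTo("d")}
	_, data, err := outer.Run(context.Background(), map[string]any{})
	if err != nil {
		return nil
	}
	trace, _ := data["trace"].([]string)
	return trace
}

func main() {
	fmt.Println(runTrace()) // prints [a b c d]
}
```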

Usage Examples

Creating Steps
// Create a step with an execution function
step := NewStep()
step.SetName("My Step")
step.SetHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
    data["key"] = "value"
    return ctx, data, nil
})
Creating a Pipeline
// Create steps for a pipeline
step1 := NewStep()
step1.SetName("Process Data")
step1.SetHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
    data["processed"] = true
    return ctx, data, nil
})

step2 := NewStep()
step2.SetName("Validate Data")
step2.SetHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
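    // Use the comma-ok form so a missing or non-bool value returns an error instead of panicking
    if processed, ok := data["processed"].(bool); !ok || !processed {
        return ctx, data, errors.New("data not processed")
    }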
    return ctx, data, nil
})

// Create a pipeline
pipeline := NewPipeline()
pipeline.SetName("Data Processing Pipeline")

// Add steps to pipeline
pipeline.RunnableAdd(step1, step2)
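
Values in the data map are typed any, so a handler has to assert the type before using a value. One way to keep that check in one place is a small helper. The sketch below is self-contained; boolFrom and validate are hypothetical names and are not part of the wf package.

```go
package main

import (
	"errors"
	"fmt"
)

// boolFrom is a hypothetical helper, not part of the wf package. It reads a
// bool from the shared data map and returns an error instead of panicking
// when the key is missing or holds another type.
func boolFrom(data map[string]any, key string) (bool, error) {
	v, ok := data[key]
	if !ok {
		return false, fmt.Errorf("key %q is missing", key)
	}
	b, ok := v.(bool)
	if !ok {
		return false, fmt.Errorf("key %q holds %T, not bool", key, v)
	}
	return b, nil
}

// validate mirrors the body of a "Validate Data" style handler.
func validate(data map[string]any) error {
	processed, err := boolFrom(data, "processed")
	if err != nil {
		return err
	}
	if !processed {
		return errors.New("data not processed")
	}
	return nil
}

func main() {
	fmt.Println(validate(map[string]any{"processed": true}))
	fmt.Println(validate(map[string]any{}))
	fmt.Println(validate(map[string]any{"processed": "yes"}))
}
```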
Creating a DAG
// Create a DAG
dag := NewDag()
dag.SetName("My DAG")

// Add steps
dag.RunnableAdd(step1, step2)

// Add dependencies
dag.DependencyAdd(step2, step1) // step2 depends on step1
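
To make "executed in the correct order" concrete, here is a minimal sketch of dependency ordering using Kahn's algorithm, which also reports cycles. This is a common technique for DAG scheduling and is not claimed to be what the wf package does internally. topoOrder and the string node names are illustrative, and the sketch assumes every dependency is also listed in nodes.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// topoOrder returns an order in which every node comes after all of its
// dependencies. deps maps a node to the nodes it depends on. An error is
// returned when the graph contains a cycle.
func topoOrder(nodes []string, deps map[string][]string) ([]string, error) {
	remaining := make(map[string]int, len(nodes)) // unmet dependency count
	dependents := make(map[string][]string)       // dependency -> dependents
	for _, n := range nodes {
		remaining[n] = len(deps[n])
		for _, d := range deps[n] {
			dependents[d] = append(dependents[d], n)
		}
	}

	var ready []string
	for _, n := range nodes {
		if remaining[n] == 0 {
			ready = append(ready, n)
		}
	}

	var order []string
	for len(ready) > 0 {
		sort.Strings(ready) // deterministic output for the example
		n := ready[0]
		ready = ready[1:]
		order = append(order, n)
		for _, m := range dependents[n] {
			remaining[m]--
			if remaining[m] == 0 {
				ready = append(ready, m)
			}
		}
	}
	// Nodes on a cycle never reach zero unmet dependencies.
	if len(order) != len(nodes) {
		return nil, errors.New("cycle detected in dependency graph")
	}
	return order, nil
}

func main() {
	nodes := []string{"step1", "step2", "step3"}
	order, err := topoOrder(nodes, map[string][]string{
		"step2": {"step1"},
		"step3": {"step2"},
	})
	fmt.Println(order, err)

	_, err = topoOrder(nodes, map[string][]string{
		"step1": {"step3"},
		"step2": {"step1"},
		"step3": {"step2"},
	})
	fmt.Println(err)
}
```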
Using a Pipeline in a DAG

// Create a pipeline with steps
pipeline := NewPipeline()
pipeline.SetName("Data Processing Pipeline")

// Add steps to pipeline
pipeline.RunnableAdd(step1, step2)

// Create a DAG
dag := NewDag()
dag.SetName("My DAG")

// Add pipeline to DAG
dag.RunnableAdd(pipeline)

// Add other steps that depend on the pipeline (step3 is assumed to be created like step1 and step2)
dag.RunnableAdd(step3)
dag.DependencyAdd(step3, pipeline)
Executing Steps
// Create a context and data map
ctx := context.Background()
data := make(map[string]any)

// Execute all steps
_, data, err := dag.Run(ctx, data)
if err != nil {
    // Handle error
}
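
Because handlers and Run both return a context as well as the data map, a step can in principle attach a value to the context for later steps and can honor cancellation. The sketch below is self-contained and does not call the wf package. The handler type copies the documented StepHandler signature, while runAll, tag, record, and requestIDKey are hypothetical. That the package forwards the returned context from one step to the next is an assumption here, not something the documentation states.

```go
package main

import (
	"context"
	"fmt"
)

// handler has the same shape as the package's StepHandler type.
type handler func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

// requestIDKey is an unexported key type, the usual way to avoid collisions
// between context values set by different packages.
type requestIDKey struct{}

// tag attaches a request ID to the context it returns.
func tag(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	return context.WithValue(ctx, requestIDKey{}, "req-42"), data, nil
}

// record copies the request ID from the context into the data map.
func record(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	if id, ok := ctx.Value(requestIDKey{}).(string); ok {
		data["request_id"] = id
	}
	return ctx, data, nil
}

// runAll threads ctx and data through each handler, stopping when the
// context is cancelled or a handler returns an error.
func runAll(ctx context.Context, data map[string]any, hs ...handler) (context.Context, map[string]any, error) {
	for _, h := range hs {
		if err := ctx.Err(); err != nil {
			return ctx, data, err
		}
		var err error
		if ctx, data, err = h(ctx, data); err != nil {
			return ctx, data, err
		}
	}
	return ctx, data, nil
}

// runTagged runs tag then record and returns the recorded request ID.
func runTagged() (any, error) {
	_, data, err := runAll(context.Background(), map[string]any{}, tag, record)
	return data["request_id"], err
}

// runCancelled runs the same handlers with an already-cancelled context.
func runCancelled() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, _, err := runAll(ctx, map[string]any{}, tag, record)
	return err
}

func main() {
	fmt.Println(runTagged())    // prints req-42 <nil>
	fmt.Println(runCancelled()) // prints context canceled
}
```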
State Management

The wf package lets you track, save, and restore the execution state of a workflow.

This is particularly useful for long-running workflows that may need to be paused, saved, and resumed later.

// Create a workflow
dag := NewDag()
dag.SetName("My Workflow")

// Add steps and dependencies...

// Start workflow
ctx := context.Background()
data := make(map[string]any)
ctx, data, err := dag.Run(ctx, data)
if err != nil {
    // Handle error
}

// Pause workflow
if err := dag.Pause(); err != nil {
    // Handle error
}

// Save state
state := dag.GetState()
stateJSON, err := state.ToJSON()
if err != nil {
    // Handle error
}

// Create new workflow instance
newDag := NewDag()

// Load saved state
newState := NewState()
if err := newState.FromJSON(stateJSON); err != nil {
    // Handle error
}
newDag.SetState(newState)

// Resume workflow
ctx, data, err = newDag.Resume(ctx, data)
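
To show what a save and restore round trip involves, the following self-contained sketch serializes a struct whose fields mirror the documented State struct. The snapshot type and the JSON layout it produces are assumptions made for illustration, so the bytes returned by State.ToJSON may look different.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// snapshot mirrors the exported fields of the package's State struct. The
// JSON layout produced here is an assumption, not the package's format.
type snapshot struct {
	Status         string
	Data           map[string]any
	CurrentStepID  string
	CompletedSteps []string
	LastUpdated    time.Time
}

// roundTrip encodes a snapshot to JSON and decodes it into a fresh value.
func roundTrip(in snapshot) (snapshot, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return snapshot{}, err
	}
	var out snapshot
	if err := json.Unmarshal(raw, &out); err != nil {
		return snapshot{}, err
	}
	return out, nil
}

// pausedExample builds a paused snapshot, round-trips it, and returns the
// restored status and completed step IDs.
func pausedExample() (string, []string, error) {
	in := snapshot{
		Status:         "paused",
		Data:           map[string]any{"processed": true},
		CurrentStepID:  "step2",
		CompletedSteps: []string{"step1"},
		LastUpdated:    time.Now().UTC(),
	}
	out, err := roundTrip(in)
	return out.Status, out.CompletedSteps, err
}

func main() {
	status, done, err := pausedExample()
	fmt.Println(status, done, err) // prints paused [step1] <nil>
}
```

One detail worth knowing when data is a map[string]any: encoding/json decodes every JSON number into float64, so an int stored before saving comes back as a float64 after loading.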
How State Management Works
  1. State Tracking: The workflow automatically tracks its execution state, including:

    • Current status (Running, Paused, Complete, Failed)
    • Completed steps
    • Current step being executed
    • Workflow data
  2. State Transitions: The workflow enforces valid state transitions:

    • A workflow starts in the "Running" state
    • A running workflow can transition to "Paused", "Complete", or "Failed"
    • A paused workflow can only transition back to "Running"
    • Completed or failed workflows are terminal states with no valid transitions
  3. Pause and Resume: You can pause a running workflow at any time:

    • The Pause() method sets the state to "Paused"
    • The Resume() method continues execution from where it was paused
    • The workflow remembers which steps were completed
  4. State Serialization: The state can be serialized to JSON:

    • ToJSON() converts the state to a JSON byte array
    • FromJSON() loads a state from a JSON byte array
    • This allows saving the state to a file or database
  5. State Restoration: You can restore a workflow to a previous state:

    • Create a new workflow instance
    • Load the saved state using FromJSON()
    • Set the state using SetState()
    • Resume execution using Resume()
  6. Helper Methods: The workflow provides helper methods to check the current state:

    • IsRunning() - checks if the workflow is currently running
    • IsPaused() - checks if the workflow is paused
    • IsCompleted() - checks if the workflow has completed successfully
    • IsFailed() - checks if the workflow has failed
    • IsWaiting() - checks if the workflow is waiting to start
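
The transition rules in item 2 can be written down as a small lookup table. This is a sketch of those rules only. The status type, the allowed map, and transition are hypothetical names, and the package's own enforcement may be structured differently.

```go
package main

import "fmt"

type status string

const (
	running  status = "running"
	paused   status = "paused"
	complete status = "complete"
	failed   status = "failed"
)

// allowed lists the permitted transitions. Terminal states (complete,
// failed) have no entry, so nothing can follow them.
var allowed = map[status][]status{
	running: {paused, complete, failed},
	paused:  {running},
}

// transition returns the new status, or the old status and an error if the
// move is not permitted.
func transition(from, to status) (status, error) {
	for _, s := range allowed[from] {
		if s == to {
			return to, nil
		}
	}
	return from, fmt.Errorf("invalid transition %q -> %q", from, to)
}

func main() {
	fmt.Println(transition(running, paused))
	fmt.Println(transition(paused, complete))
	fmt.Println(transition(failed, running))
}
```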

Because the state can be serialized and restored, a workflow can be resumed after an interruption or a system restart, or continued on a different machine.
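
Resuming relies on knowing which steps have already finished. A minimal sketch of that idea, assuming steps are identified by string IDs and run in a fixed order, is shown below. namedStep, resume, and counter are hypothetical and do not reflect how Resume is implemented in the package.

```go
package main

import "fmt"

// namedStep pairs an ID with a function. Both are illustrative only.
type namedStep struct {
	id  string
	run func(data map[string]any) error
}

// resume runs the steps in order, skipping IDs already listed in completed,
// and returns the updated completed list.
func resume(steps []namedStep, completed []string, data map[string]any) ([]string, error) {
	done := make(map[string]bool, len(completed))
	for _, id := range completed {
		done[id] = true
	}
	for _, s := range steps {
		if done[s.id] {
			continue // finished before the pause, do not run again
		}
		if err := s.run(data); err != nil {
			return completed, fmt.Errorf("step %s: %w", s.id, err)
		}
		completed = append(completed, s.id)
	}
	return completed, nil
}

// counter returns a step that increments data[key] each time it runs.
func counter(id, key string) namedStep {
	return namedStep{id: id, run: func(data map[string]any) error {
		n, _ := data[key].(int)
		data[key] = n + 1
		return nil
	}}
}

// resumeExample pretends step "a" finished before a pause, then resumes.
func resumeExample() (int, int, []string) {
	data := map[string]any{"a_runs": 1}
	steps := []namedStep{counter("a", "a_runs"), counter("b", "b_runs")}
	completed, _ := resume(steps, []string{"a"}, data)
	return data["a_runs"].(int), data["b_runs"].(int), completed
}

func main() {
	fmt.Println(resumeExample()) // prints 1 1 [a b]
}
```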

Testing

The package includes comprehensive tests that verify:

  • Successful step execution
  • Error propagation
  • Dependency handling
  • Context data sharing
  • Cycle detection
  • Parallel execution
  • Serialization
  • State management
  • Pause and resume functionality

Dependencies

  • github.com/gouniverse/uid: For generating unique IDs

Best Practices

  1. Always use the provided interfaces for type safety
  2. Return errors from step handlers rather than ignoring them, so they propagate through the workflow
  3. Use the context for data sharing between steps
  4. Define dependencies when steps must be executed in a specific order
  5. Avoid creating circular dependencies between steps
  6. Use pipelines to group related steps into logical units
  7. Save workflow state at appropriate points for recovery
  8. Use the state management features for long-running workflows

Examples

The package includes several examples demonstrating different use cases:

Basic Usage Example
Conditional Logic Example
  • Demonstrates how to implement conditional logic using DAGs and pipelines
  • Shows different step chains for different scenarios
  • Location: examples/dag_conditional_logic
Dependencies Example
  • Shows how to create steps with complex dependencies
  • Demonstrates proper execution order through dependencies
  • Location: examples/dag_dependencies
Error Handling Example
State Management Example
  • Demonstrates workflow state management
  • Shows how to pause, save, and resume workflows
  • Location: examples/dag_state

Error Handling

The package will return errors in the following cases:

  • If a cycle is detected in the dependency graph
  • If any step execution fails
  • If a step is added multiple times
  • If dependencies are not properly defined
  • If pipeline execution fails
  • If conditional logic conditions are not met
  • If state management operations fail
  • If workflow cannot be paused or resumed
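
For the step-failure case, a common Go pattern is to stop at the first failing step and wrap its error so the caller can still match the underlying cause. The sketch below is self-contained. step, runSteps, and errNotProcessed are hypothetical, and the error messages the wf package actually returns are not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotProcessed is a hypothetical sentinel error used only in this sketch.
var errNotProcessed = errors.New("data not processed")

type step struct {
	name string
	run  func(data map[string]any) error
}

// runSteps stops at the first failing step and wraps its error with the step
// name, so callers can both read the message and match the cause.
func runSteps(steps []step, data map[string]any) error {
	for _, s := range steps {
		if err := s.run(data); err != nil {
			return fmt.Errorf("step %q failed: %w", s.name, err)
		}
	}
	return nil
}

// failingRun runs three steps where the second fails. It reports whether the
// cause is still matchable, whether the third step ran, and the message.
func failingRun() (matched bool, thirdRan bool, msg string) {
	data := map[string]any{}
	err := runSteps([]step{
		{"Load", func(d map[string]any) error { return nil }},
		{"Validate", func(d map[string]any) error { return errNotProcessed }},
		{"Save", func(d map[string]any) error { d["saved"] = true; return nil }},
	}, data)
	_, thirdRan = data["saved"]
	return errors.Is(err, errNotProcessed), thirdRan, err.Error()
}

func main() {
	fmt.Println(failingRun())
}
```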

Documentation

Constants

const (
	// State status constants
	StateStatusRunning  = "running"
	StateStatusPaused   = "paused"
	StateStatusComplete = "complete"
	StateStatusFailed   = "failed"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Dag

type Dag struct {
	// contains filtered or unexported fields
}

func (*Dag) DependencyAdd

func (d *Dag) DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)

DependencyAdd adds a dependency between two nodes.

func (*Dag) DependencyList

func (d *Dag) DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface

DependencyList returns all dependencies for a given node.

func (*Dag) GetID

func (d *Dag) GetID() string

func (*Dag) GetName

func (d *Dag) GetName() string

func (*Dag) GetState added in v0.12.0

func (d *Dag) GetState() StateInterface

GetState returns the current workflow state

func (*Dag) IsCompleted added in v0.12.0

func (d *Dag) IsCompleted() bool

func (*Dag) IsFailed added in v0.12.0

func (d *Dag) IsFailed() bool

func (*Dag) IsPaused added in v0.12.0

func (d *Dag) IsPaused() bool

func (*Dag) IsRunning added in v0.12.0

func (d *Dag) IsRunning() bool

State helper methods

func (*Dag) IsWaiting added in v0.12.0

func (d *Dag) IsWaiting() bool

func (*Dag) Pause added in v0.12.0

func (d *Dag) Pause() error

Pause pauses the workflow execution

func (*Dag) Resume added in v0.12.0

func (d *Dag) Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

Resume resumes the workflow execution from the last saved state

func (*Dag) Run

func (d *Dag) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

Run executes all nodes in the DAG in the correct order

func (*Dag) RunnableAdd

func (d *Dag) RunnableAdd(node ...RunnableInterface)

RunnableAdd adds one or more nodes to the DAG.

func (*Dag) RunnableList

func (d *Dag) RunnableList() []RunnableInterface

RunnableList returns all runnable nodes in the DAG.

func (*Dag) RunnableRemove

func (d *Dag) RunnableRemove(node RunnableInterface) bool

RunnableRemove removes a node from the DAG.

func (*Dag) SetID

func (d *Dag) SetID(id string)

func (*Dag) SetName

func (d *Dag) SetName(name string)

func (*Dag) SetState added in v0.12.0

func (d *Dag) SetState(state StateInterface)

SetState sets the workflow state

func (*Dag) Visualize added in v0.13.0

func (d *Dag) Visualize() string

Visualize returns a DOT graph representation of the DAG.

type DagInterface

type DagInterface interface {
	RunnableInterface

	// RunnableAdd adds one or more nodes to the DAG.
	// Runnable nodes can be added in any order, as their execution order will be determined by their dependencies.
	RunnableAdd(node ...RunnableInterface)

	// RunnableRemove removes a node from the DAG.
	// Returns true if the node was found and removed, false if it wasn't found.
	RunnableRemove(node RunnableInterface) bool

	// RunnableList returns all runnable nodes in the DAG.
	// The order of nodes in the returned slice is not guaranteed to be their execution order.
	// Use Run() to execute nodes in the correct order based on their dependencies.
	RunnableList() []RunnableInterface

	// DependencyAdd adds a dependency between two nodes.
	// The dependent node will only execute after the dependency node has completed successfully.
	DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)

	// DependencyList returns all dependencies for a given node.
	// The actual dependencies may vary based on the context and any conditional dependencies.
	DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface

	// Pause pauses the workflow execution
	Pause() error

	// Resume resumes the workflow execution from the last saved state
	Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

	// GetState returns the current workflow state
	GetState() StateInterface

	// SetState sets the workflow state
	SetState(state StateInterface)
}

DagInterface represents a Directed Acyclic Graph (DAG) of steps that can be executed in a specific order. It manages the dependencies between steps and ensures they are executed in the correct sequence.

func NewDag

func NewDag() DagInterface

NewDag creates a new DAG

type DotEdgeSpec added in v0.13.0

type DotEdgeSpec struct {
	FromNodeName string
	ToNodeName   string
	Tooltip      string
	Style        string // Use edgeStyleSolid, etc.
	Color        string
}

DotEdgeSpec represents an edge in the DOT graph

type DotNodeSpec added in v0.13.0

type DotNodeSpec struct {
	Name        string
	DisplayName string
	Tooltip     string
	Shape       string
	Style       string // Use nodeStyleSolid or nodeStyleFilled
	FillColor   string
}

DotNodeSpec represents a node in the DOT graph
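
As a rough picture of how node and edge specs map onto DOT text, here is a self-contained sketch. The node and edge types are simplified stand-ins for DotNodeSpec and DotEdgeSpec, and toDot is hypothetical. The exact output of Visualize, including attribute names and edge direction, is not documented here and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// node and edge are simplified stand-ins for DotNodeSpec and DotEdgeSpec.
type node struct{ Name, DisplayName, Shape string }
type edge struct{ From, To string }

// toDot renders nodes and edges as a Graphviz digraph. Names are quoted with
// %q, which is adequate for plain ASCII identifiers like the ones used here.
func toDot(name string, nodes []node, edges []edge) string {
	var b strings.Builder
	fmt.Fprintf(&b, "digraph %q {\n", name)
	for _, n := range nodes {
		fmt.Fprintf(&b, "  %q [label=%q, shape=%s];\n", n.Name, n.DisplayName, n.Shape)
	}
	for _, e := range edges {
		fmt.Fprintf(&b, "  %q -> %q;\n", e.From, e.To)
	}
	b.WriteString("}\n")
	return b.String()
}

func main() {
	fmt.Print(toDot("My DAG",
		[]node{
			{"step1", "Process Data", "box"},
			{"step2", "Validate Data", "box"},
		},
		// Drawn here from dependency to dependent. This direction is an assumption.
		[]edge{{"step1", "step2"}},
	))
}
```

The resulting text can be saved to a file and rendered with Graphviz, for example with dot -Tsvg.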

type PipelineInterface

type PipelineInterface interface {
	RunnableInterface

	// RunnableAdd adds a runnable node(s) to the pipeline.
	RunnableAdd(node ...RunnableInterface)

	// RunnableRemove removes a runnable node from the pipeline.
	RunnableRemove(node RunnableInterface) bool

	// RunnableList returns all runnable nodes in the pipeline.
	// The order of nodes in the returned slice is the order they were added.
	RunnableList() []RunnableInterface

	// Pause pauses the workflow execution
	Pause() error

	// Resume resumes the workflow execution from the last saved state
	Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

	// GetState returns the current workflow state
	GetState() StateInterface

	// SetState sets the workflow state
	SetState(state StateInterface)
}

PipelineInterface defines the interface for a pipeline

func NewPipeline

func NewPipeline() PipelineInterface

NewPipeline creates a new pipeline

type RunnableInterface

type RunnableInterface interface {
	GetID() string
	SetID(id string)
	GetName() string
	SetName(name string)
	Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

	// State helper methods
	IsRunning() bool
	IsPaused() bool
	IsCompleted() bool
	IsFailed() bool
	IsWaiting() bool

	// Visualize returns a DOT graph representation of the workflow component
	Visualize() string
}

RunnableInterface represents a single unit of work that can be executed with a given context and data. It can work with the data and return the result of the work.

It can be used as a single step, or combined with other nodes to form a Pipeline, Workflow or DAG.

type State added in v0.12.0

type State struct {
	Status         StateStatus
	Data           map[string]any
	CurrentStepID  string
	CompletedSteps []string
	LastUpdated    time.Time
}

State represents the current state of a workflow

func (*State) AddCompletedStep added in v0.12.0

func (s *State) AddCompletedStep(id string)

AddCompletedStep adds a step ID to the completed steps list

func (*State) FromJSON added in v0.12.0

func (s *State) FromJSON(data []byte) error

FromJSON loads the state from JSON

func (*State) GetCompletedSteps added in v0.12.0

func (s *State) GetCompletedSteps() []string

GetCompletedSteps returns the list of completed step IDs

func (*State) GetCurrentStepID added in v0.12.0

func (s *State) GetCurrentStepID() string

GetCurrentStepID returns the ID of the current step

func (*State) GetData added in v0.12.0

func (s *State) GetData() map[string]any

GetData returns the current data of the workflow

func (*State) GetLastUpdated added in v0.12.0

func (s *State) GetLastUpdated() time.Time

GetLastUpdated returns the timestamp of the last update

func (*State) GetStatus added in v0.12.0

func (s *State) GetStatus() StateStatus

GetStatus returns the current status of the workflow

func (*State) GetWorkflowData added in v0.12.0

func (s *State) GetWorkflowData() map[string]any

GetWorkflowData returns the workflow data

func (*State) SetCurrentStepID added in v0.12.0

func (s *State) SetCurrentStepID(id string)

SetCurrentStepID sets the ID of the current step

func (*State) SetData added in v0.12.0

func (s *State) SetData(data map[string]any)

SetData sets the current data of the workflow

func (*State) SetLastUpdated added in v0.12.0

func (s *State) SetLastUpdated(t time.Time)

SetLastUpdated sets the timestamp of the last update

func (*State) SetStatus added in v0.12.0

func (s *State) SetStatus(status StateStatus)

SetStatus sets the current status of the workflow

func (*State) SetWorkflowData added in v0.12.0

func (s *State) SetWorkflowData(data map[string]any)

SetWorkflowData sets the workflow data

func (*State) ToJSON added in v0.12.0

func (s *State) ToJSON() ([]byte, error)

ToJSON converts the state to JSON

type StateInterface added in v0.12.0

type StateInterface interface {
	GetStatus() StateStatus
	SetStatus(status StateStatus)

	GetData() map[string]any
	SetData(data map[string]any)

	ToJSON() ([]byte, error)
	FromJSON(data []byte) error

	GetCurrentStepID() string
	SetCurrentStepID(id string)

	GetCompletedSteps() []string
	AddCompletedStep(id string)

	GetWorkflowData() map[string]any
	SetWorkflowData(data map[string]any)

	GetLastUpdated() time.Time
	SetLastUpdated(t time.Time)
}

StateInterface defines the interface for workflow state management

func NewState added in v0.12.0

func NewState() StateInterface

NewState creates a new workflow state

type StateStatus added in v0.12.0

type StateStatus string

StateStatus represents the current status of a workflow

type StepHandler

type StepHandler func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

type StepInterface

type StepInterface interface {
	RunnableInterface

	// GetHandler returns the function that implements the step's execution logic.
	GetHandler() StepHandler

	// SetHandler allows setting or modifying the step's execution logic.
	SetHandler(handler StepHandler)

	// Pause pauses the workflow execution
	Pause() error

	// Resume resumes the workflow execution from the last saved state
	Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)

	// GetState returns the current workflow state
	GetState() StateInterface

	// SetState sets the workflow state
	SetState(state StateInterface)
}

StepInterface represents a single node in a Pipeline, Workflow or DAG. A step is a unit of work that can be executed within a given context. A step is executed by a Pipeline, Workflow or DAG which manages its dependencies and execution order.

func NewStep

func NewStep() StepInterface

NewStep creates a new step. It takes no arguments; the execution function and ID are set afterwards with SetHandler and SetID.

Directories

Path Synopsis
examples/pipeline (command)
