Documentation
¶
Index ¶
- Constants
- type Dag
- func (d *Dag) DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)
- func (d *Dag) DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface
- func (d *Dag) GetID() string
- func (d *Dag) GetName() string
- func (d *Dag) GetState() StateInterface
- func (d *Dag) IsCompleted() bool
- func (d *Dag) IsFailed() bool
- func (d *Dag) IsPaused() bool
- func (d *Dag) IsRunning() bool
- func (d *Dag) IsWaiting() bool
- func (d *Dag) Pause() error
- func (d *Dag) Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
- func (d *Dag) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
- func (d *Dag) RunnableAdd(node ...RunnableInterface)
- func (d *Dag) RunnableList() []RunnableInterface
- func (d *Dag) RunnableRemove(node RunnableInterface) bool
- func (d *Dag) SetID(id string)
- func (d *Dag) SetName(name string)
- func (d *Dag) SetState(state StateInterface)
- func (d *Dag) Visualize() string
- type DagInterface
- type DotEdgeSpec
- type DotNodeSpec
- type PipelineInterface
- type RunnableInterface
- type State
- func (s *State) AddCompletedStep(id string)
- func (s *State) FromJSON(data []byte) error
- func (s *State) GetCompletedSteps() []string
- func (s *State) GetCurrentStepID() string
- func (s *State) GetData() map[string]any
- func (s *State) GetLastUpdated() time.Time
- func (s *State) GetStatus() StateStatus
- func (s *State) GetWorkflowData() map[string]any
- func (s *State) SetCurrentStepID(id string)
- func (s *State) SetData(data map[string]any)
- func (s *State) SetLastUpdated(t time.Time)
- func (s *State) SetStatus(status StateStatus)
- func (s *State) SetWorkflowData(data map[string]any)
- func (s *State) ToJSON() ([]byte, error)
- type StateInterface
- type StateStatus
- type StepHandler
- type StepInterface
Constants ¶
const ( // State status constants StateStatusRunning = "running" StateStatusPaused = "paused" StateStatusComplete = "complete" StateStatusFailed = "failed" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Dag ¶
type Dag struct {
// contains filtered or unexported fields
}
func (*Dag) DependencyAdd ¶
func (d *Dag) DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)
DependencyAdd adds a dependency between two nodes.
func (*Dag) DependencyList ¶
func (d *Dag) DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface
DependencyList returns all dependencies for a given node.
func (*Dag) GetState ¶ added in v0.12.0
func (d *Dag) GetState() StateInterface
GetState returns the current workflow state
func (*Dag) IsCompleted ¶ added in v0.12.0
func (*Dag) Resume ¶ added in v0.12.0
func (d *Dag) Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
Resume resumes the workflow execution from the last saved state
func (*Dag) Run ¶
func (d *Dag) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
Run executes all nodes in the DAG in the correct order
func (*Dag) RunnableAdd ¶
func (d *Dag) RunnableAdd(node ...RunnableInterface)
RunnableAdd adds a single node to the DAG.
func (*Dag) RunnableList ¶
func (d *Dag) RunnableList() []RunnableInterface
RunnableList returns all runnable nodes in the DAG.
func (*Dag) RunnableRemove ¶
func (d *Dag) RunnableRemove(node RunnableInterface) bool
RunnableRemove removes a node from the DAG.
func (*Dag) SetState ¶ added in v0.12.0
func (d *Dag) SetState(state StateInterface)
SetState sets the workflow state
type DagInterface ¶
type DagInterface interface {
RunnableInterface
// RunnableAdd adds a single node to the DAG.
// Runnable nodes can be added in any order, as their execution order will be determined by their dependencies.
RunnableAdd(node ...RunnableInterface)
// RunnableRemove removes a node from the DAG.
// Returns true if the node was found and removed, false if it wasn't found.
RunnableRemove(node RunnableInterface) bool
// RunnableList returns all runnable nodes in the DAG.
// The order of nodes in the returned slice is not guaranteed to be their execution order.
// Use Run() to execute nodes in the correct order based on their dependencies.
RunnableList() []RunnableInterface
// DependencyAdd adds a dependency between two nodes.
// The dependent node will only execute after the dependency node has completed successfully.
DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)
// DependencyList returns all dependencies for a given node.
// The actual dependencies may vary based on the context and any conditional dependencies.
DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface
// Pause pauses the workflow execution
Pause() error
// Resume resumes the workflow execution from the last saved state
Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
// GetState returns the current workflow state
GetState() StateInterface
// SetState sets the workflow state
SetState(state StateInterface)
}
DagInterface represents a Directed Acyclic Graph (DAG) of steps that can be executed in a specific order. It manages the dependencies between steps and ensures they are executed in the correct sequence.
type DotEdgeSpec ¶ added in v0.13.0
type DotEdgeSpec struct {
FromNodeName string
ToNodeName string
Tooltip string
Style string // Use edgeStyleSolid, etc.
Color string
}
DotEdgeSpec represents an edge in the DOT graph
type DotNodeSpec ¶ added in v0.13.0
type DotNodeSpec struct {
Name string
DisplayName string
Tooltip string
Shape string
Style string // Use nodeStyleSolid or nodeStyleFilled
FillColor string
}
DotNodeSpec represents a node in the DOT graph
type PipelineInterface ¶
type PipelineInterface interface {
RunnableInterface
// RunnableAdd adds a runnable node(s) to the pipeline.
RunnableAdd(node ...RunnableInterface)
// RunnableRemove removes a runnable node from the pipeline.
RunnableRemove(node RunnableInterface) bool
// RunnableList returns all runnable nodes in the pipeline.
// The order of nodes in the returned slice is the order they were added.
RunnableList() []RunnableInterface
// Pause pauses the workflow execution
Pause() error
// Resume resumes the workflow execution from the last saved state
Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
// GetState returns the current workflow state
GetState() StateInterface
// SetState sets the workflow state
SetState(state StateInterface)
}
PipelineInterface defines the interface for a pipeline
type RunnableInterface ¶
type RunnableInterface interface {
GetID() string
SetID(id string)
GetName() string
SetName(name string)
Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
// State helper methods
IsRunning() bool
IsPaused() bool
IsCompleted() bool
IsFailed() bool
IsWaiting() bool
// Visualize returns a DOT graph representation of the workflow component
Visualize() string
}
RunnableInterface represents a single unit of work, that can be executed within a given context, and specified data. It can work wuth the data and return the result of the work.
It can be used as a single step, or combined with other nodes to form a Pipeline, Workflow or DAG.
type State ¶ added in v0.12.0
type State struct {
Status StateStatus
Data map[string]any
CurrentStepID string
CompletedSteps []string
LastUpdated time.Time
}
State represents the current state of a workflow
func (*State) AddCompletedStep ¶ added in v0.12.0
AddCompletedStep adds a step ID to the completed steps list
func (*State) GetCompletedSteps ¶ added in v0.12.0
GetCompletedSteps returns the list of completed step IDs
func (*State) GetCurrentStepID ¶ added in v0.12.0
GetCurrentStepID returns the ID of the current step
func (*State) GetLastUpdated ¶ added in v0.12.0
GetLastUpdated returns the timestamp of the last update
func (*State) GetStatus ¶ added in v0.12.0
func (s *State) GetStatus() StateStatus
GetStatus returns the current status of the workflow
func (*State) GetWorkflowData ¶ added in v0.12.0
GetWorkflowData returns the workflow data
func (*State) SetCurrentStepID ¶ added in v0.12.0
SetCurrentStepID sets the ID of the current step
func (*State) SetLastUpdated ¶ added in v0.12.0
SetLastUpdated sets the timestamp of the last update
func (*State) SetStatus ¶ added in v0.12.0
func (s *State) SetStatus(status StateStatus)
SetStatus sets the current status of the workflow
func (*State) SetWorkflowData ¶ added in v0.12.0
SetWorkflowData sets the workflow data
type StateInterface ¶ added in v0.12.0
type StateInterface interface {
GetStatus() StateStatus
SetStatus(status StateStatus)
GetData() map[string]any
SetData(data map[string]any)
ToJSON() ([]byte, error)
FromJSON(data []byte) error
GetCurrentStepID() string
SetCurrentStepID(id string)
GetCompletedSteps() []string
AddCompletedStep(id string)
GetWorkflowData() map[string]any
SetWorkflowData(data map[string]any)
GetLastUpdated() time.Time
SetLastUpdated(t time.Time)
}
StateInterface defines the interface for workflow state management
func NewState ¶ added in v0.12.0
func NewState() StateInterface
NewState creates a new workflow state
type StateStatus ¶ added in v0.12.0
type StateStatus string
StateStatus represents the current status of a workflow
type StepHandler ¶
type StepInterface ¶
type StepInterface interface {
RunnableInterface
// GetHandler returns the function that implements the step's execution logic.
GetHandler() StepHandler
// SetHandler allows setting or modifying the step's execution logic.
SetHandler(handler StepHandler)
// Pause pauses the workflow execution
Pause() error
// Resume resumes the workflow execution from the last saved state
Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
// GetState returns the current workflow state
GetState() StateInterface
// SetState sets the workflow state
SetState(state StateInterface)
}
StepInterface represents a single node in a Pipeline, Workflow or DAG. A step is a unit of work that can be executed within a given context. A step is executed by a Pipeline, Workflow or DAG which manages its dependencies and execution order.
func NewStep ¶
func NewStep() StepInterface
NewStep creates a new step with the given execution function and optional ID.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
dag_conditional_logic
command
|
|
|
dag_dependencies
command
|
|
|
dag_error_handling
command
|
|
|
pipeline
command
|
|