Overview ¶
Package workflow provides workflow management and execution capabilities for muster.
This package manages workflow definitions that can be stored as YAML files and executed as multi-step operations. Workflows are automatically registered as MCP tools when loaded, enabling programmatic access through the aggregator API with the "action_" prefix.
Workflow Definition Structure ¶
Workflows are defined in YAML format with the following structure:
    name: "my-workflow"
    description: "A sample workflow that demonstrates multi-step operations"
    args:
      environment:
        type: string
        description: "Target environment"
        default: "development"
        required: false
    steps:
      - id: "step1"
        tool: "some_tool"
        args:
          key: "value"
          env: "{{.environment}}"
      - id: "step2"
        tool: "another_tool"
        args:
          input: "{{step1.result}}"
          environment: "{{.environment}}"
Tool Integration ¶
Each workflow is automatically registered as an MCP tool with the name pattern: **"action_{workflow_name}"**
This allows workflows to be executed through:
- MCP aggregator API
- Other MCP clients
- AI assistants via MCP protocol
- Direct API calls through the workflow handler
The tool registration happens immediately when workflows are loaded.
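The naming pattern can be illustrated with a small sketch. The helper names `toolNameFor` and `workflowNameFor` are hypothetical and not part of this package; only the "action_" prefix comes from the text above.

```go
package main

import (
	"fmt"
	"strings"
)

// toolPrefix is the prefix the documentation describes for workflow tools.
const toolPrefix = "action_"

// toolNameFor returns the MCP tool name for a workflow name (hypothetical helper).
func toolNameFor(workflowName string) string {
	return toolPrefix + workflowName
}

// workflowNameFor reverses the mapping. The bool is false when the tool name
// does not carry the workflow prefix.
func workflowNameFor(toolName string) (string, bool) {
	if !strings.HasPrefix(toolName, toolPrefix) {
		return "", false
	}
	return strings.TrimPrefix(toolName, toolPrefix), true
}

func main() {
	fmt.Println(toolNameFor("deploy-app"))
	fmt.Println(workflowNameFor("list_tools"))
}
```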
Workflow Execution ¶
Workflows are executed step by step in the defined order. Each step:
- Calls the specified tool with the provided arguments
- Can reference outputs from previous steps using {{stepId.field}} syntax
- Can reference input args using {{.argumentName}} syntax
- Has access to the workflow's execution context
- Supports arg templating for dynamic argument construction
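The step-by-step model above might look roughly like the following sketch. It is not the package's executor: `step`, `toolCaller`, `runSteps`, and the fake `echoCaller` are simplified, hypothetical stand-ins, and the sketch assumes execution stops at the first failing step.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// step is a reduced, hypothetical version of a workflow step.
type step struct {
	ID   string
	Tool string
	Args map[string]interface{}
}

// toolCaller is a hypothetical function form of a tool-calling interface.
type toolCaller func(ctx context.Context, tool string, args map[string]interface{}) (interface{}, error)

// runSteps calls each step's tool in order and stores its output under the
// step ID. It stops at the first error and returns the results gathered so far.
func runSteps(ctx context.Context, steps []step, call toolCaller) (map[string]interface{}, error) {
	results := make(map[string]interface{}, len(steps))
	for _, s := range steps {
		out, err := call(ctx, s.Tool, s.Args)
		if err != nil {
			return results, fmt.Errorf("step %q (tool %q): %w", s.ID, s.Tool, err)
		}
		results[s.ID] = out
	}
	return results, nil
}

// echoCaller is a fake tool backend that fails for the tool name "broken".
func echoCaller(_ context.Context, tool string, _ map[string]interface{}) (interface{}, error) {
	if tool == "broken" {
		return nil, errors.New("tool failed")
	}
	return "ran " + tool, nil
}

// runDemo executes the given steps against echoCaller.
func runDemo(steps []step) (map[string]interface{}, error) {
	return runSteps(context.Background(), steps, echoCaller)
}

func main() {
	res, err := runDemo([]step{
		{ID: "step1", Tool: "some_tool"},
		{ID: "step2", Tool: "another_tool"},
	})
	fmt.Println(res["step1"], res["step2"], err)
}
```

Keying results by step ID is what makes a later reference to an earlier step's output possible.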
## Arg Templating
Workflows support Go template syntax for dynamic arg substitution:
- **Input arguments**: {{.argumentName}}
- **Step outputs**: {{stepId.result}} or {{stepId.specificField}}
- **Default values**: Via input schema default properties
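For the input-argument form, substitution can be sketched with the standard `text/template` package. This is only an illustration of Go template expansion; `renderArg` is a hypothetical helper, and how the package resolves step-output references is not shown in this documentation, so the sketch leaves them out.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// renderArg expands a Go template string against the workflow's input
// arguments. The missingkey=error option makes a reference to an argument
// that was not supplied fail instead of rendering "<no value>".
func renderArg(tmpl string, args map[string]interface{}) (string, error) {
	t, err := template.New("arg").Option("missingkey=error").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, args); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	out, err := renderArg("deploy-{{.environment}}", map[string]interface{}{
		"environment": "production",
	})
	fmt.Println(out, err)
}
```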
Workflow Adapter ¶
The Workflow Adapter provides comprehensive workflow lifecycle management through the unified client interface with automatic backend selection (Kubernetes CRDs or filesystem):
- **Backend Selection**: Automatic choice between Kubernetes CRDs and filesystem storage
- **Definition Management**: Load workflows from CRDs or YAML files
- **Validation**: Comprehensive validation of workflow definitions
- **CRUD Operations**: Create, read, update, and delete workflows
- **Execution**: Execute workflows with arg validation and tracking
- **Tool Integration**: Automatic registration as MCP tools via API layer
- **Availability Checking**: Dynamic tool availability validation
Input Argument Support ¶
Workflows can define input arguments for validation:
    args:
      environment:
        type: string
        description: "Deployment environment"
        default: "development"
        required: false
      version:
        type: string
        description: "Application version"
        required: true
      replicas:
        type: number
        description: "Number of replicas"
        default: 3
This enables:
- **Arg validation** before execution
- **Default value** assignment for optional arguments
- **Type checking** for input arguments
- **Documentation** for workflow consumers
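A minimal sketch of how required checks, defaults, and type checks could fit together is shown below. `argDef`, `typeMatches`, and `resolveArgs` are hypothetical names; the package's own validation may apply different rules, for example in which Go types it accepts as a number.

```go
package main

import "fmt"

// argDef mirrors the schema fields used above (hypothetical, reduced type).
type argDef struct {
	Type     string
	Default  interface{}
	Required bool
}

// typeMatches reports whether v is acceptable for the declared type name.
// Only the two type names that appear in the schema above are checked.
func typeMatches(typ string, v interface{}) bool {
	switch typ {
	case "string":
		_, ok := v.(string)
		return ok
	case "number":
		switch v.(type) {
		case int, int64, float64:
			return true
		}
		return false
	}
	return true // other type names are not checked in this sketch
}

// resolveArgs validates supplied values against the schema and fills in
// defaults for optional arguments. Values not named in the schema are dropped.
func resolveArgs(schema map[string]argDef, supplied map[string]interface{}) (map[string]interface{}, error) {
	out := make(map[string]interface{}, len(schema))
	for name, def := range schema {
		v, ok := supplied[name]
		if !ok {
			if def.Required {
				return nil, fmt.Errorf("missing required argument %q", name)
			}
			if def.Default != nil {
				out[name] = def.Default
			}
			continue
		}
		if !typeMatches(def.Type, v) {
			return nil, fmt.Errorf("argument %q: expected %s, got %T", name, def.Type, v)
		}
		out[name] = v
	}
	return out, nil
}

func main() {
	schema := map[string]argDef{
		"environment": {Type: "string", Default: "development"},
		"version":     {Type: "string", Required: true},
		"replicas":    {Type: "number", Default: 3},
	}
	args, err := resolveArgs(schema, map[string]interface{}{"version": "v1.2.3"})
	fmt.Println(args, err)
}
```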
Error Handling ¶
The workflow adapter provides comprehensive error handling:
- Invalid workflow files are logged but don't prevent other workflows from loading
- Missing tools are detected and reported during validation
- Execution errors can be configured to stop or continue the workflow
- Arg validation errors prevent workflow execution
- Tool execution errors are propagated with context
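The first point, that one invalid definition does not block the rest, can be sketched as a loading loop that logs and skips. `definition`, `check`, and `loadAll` are hypothetical, and the validity check is reduced to a non-empty name.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// definition is a reduced, hypothetical workflow definition.
type definition struct {
	Name string
}

// check is a stand-in for real validation.
func check(d definition) error {
	if d.Name == "" {
		return errors.New("workflow name is empty")
	}
	return nil
}

// loadAll keeps every definition that passes check and logs the others,
// so a single bad definition does not prevent the rest from loading.
func loadAll(defs []definition) (loaded []string, skipped int) {
	for i, d := range defs {
		if err := check(d); err != nil {
			log.Printf("skipping definition %d: %v", i, err)
			skipped++
			continue
		}
		loaded = append(loaded, d.Name)
	}
	return loaded, skipped
}

func main() {
	loaded, skipped := loadAll([]definition{{Name: "deploy-app"}, {}, {Name: "rollback"}})
	fmt.Println(loaded, skipped)
}
```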
Dynamic Tool Availability ¶
Workflows use dynamic tool availability checking:
- Tools are validated against the aggregator at execution time
- Workflows automatically become available when all required tools are present
- Tool availability is not cached, so each check reflects the current state
- Integration with tool update events for logging
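An uncached availability check amounts to recomputing the set of missing tools on every call. The sketch below assumes the currently available tools are given as a set; `missingTools` is a hypothetical helper, not the package's checker.

```go
package main

import "fmt"

// missingTools returns the required tool names that are absent from the
// available set. Nothing is cached: every call inspects the set as it is now.
func missingTools(available map[string]bool, required []string) []string {
	var missing []string
	for _, name := range required {
		if !available[name] {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	available := map[string]bool{"validate_environment": true}
	required := []string{"validate_environment", "deploy_application"}

	fmt.Println(missingTools(available, required)) // one tool is still missing

	// Once the tool appears, the same check reports the workflow as available.
	available["deploy_application"] = true
	fmt.Println(len(missingTools(available, required)) == 0)
}
```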
Usage Examples ¶
## Creating Workflows
	workflow := api.Workflow{
		Name:        "deploy-app",
		Description: "Deploy application to environment",
		Args: map[string]api.ArgDefinition{
			"environment": {
				Type:        "string",
				Description: "Target environment",
				Default:     "development",
				Required:    false,
			},
			"version": {
				Type:        "string",
				Description: "Application version",
				Required:    true,
			},
		},
		Steps: []api.WorkflowStep{
			{
				ID:   "validate",
				Tool: "validate_environment",
				Args: map[string]interface{}{
					"environment": "{{.environment}}",
				},
			},
			{
				ID:   "deploy",
				Tool: "deploy_application",
				Args: map[string]interface{}{
					"environment": "{{.environment}}",
					"version":     "{{.version}}",
				},
			},
		},
	}

	// workflowData is assumed to be the map[string]interface{} form of the
	// definition above; CreateWorkflowFromStructured takes structured arguments.
	workflowHandler := api.GetWorkflow()
	if err := workflowHandler.CreateWorkflowFromStructured(workflowData); err != nil {
		log.Fatal(err)
	}
## Executing Workflows
	// Execute through API handler
	workflowHandler := api.GetWorkflow()
	result, err := workflowHandler.ExecuteWorkflow(ctx, "deploy-app", map[string]interface{}{
		"environment": "production",
		"version":     "v1.2.3",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Execute as MCP tool (through aggregator)
	// Tool name: "action_deploy-app"
	toolResult, err := toolCaller.CallTool(ctx, "action_deploy-app", map[string]interface{}{
		"environment": "production",
		"version":     "v1.2.3",
	})
	if err != nil {
		log.Fatal(err)
	}
## Querying Workflows
	// List all workflows
	workflowHandler := api.GetWorkflow()
	workflows := workflowHandler.GetWorkflows()
	for _, wf := range workflows {
		fmt.Printf("Workflow: %s - %s\n", wf.Name, wf.Description)
	}

	// Get specific workflow
	workflow, err := workflowHandler.GetWorkflow("deploy-app")
	if err != nil {
		log.Fatal(err)
	}

	// Get workflows as MCP tools (through tool provider interface)
	if provider, ok := workflowHandler.(api.ToolProvider); ok {
		tools := provider.GetTools()
		for _, tool := range tools {
			fmt.Printf("MCP Tool: %s - %s\n", tool.Name, tool.Description)
		}
	}
File Management ¶
Workflows can be created, updated, and deleted at runtime through the unified client interface:
- **Create**: Save workflow as CRD (Kubernetes) or YAML file (filesystem)
- **Update**: Modify existing workflow CRDs or files
- **Delete**: Remove workflow CRDs or files
The adapter automatically detects changes through the client interface and updates the available tools accordingly. Storage backend is automatically selected based on environment (Kubernetes vs filesystem).
API Integration ¶
The workflow package integrates with muster's API layer:
- **WorkflowHandler**: API interface for workflow management
- **Tool provider**: Exposes workflows as MCP tools
- **Event integration**: Subscribes to tool update events
- **Registration pattern**: The adapter registers itself with the API layer through Register
Thread Safety ¶
All workflow operations are thread-safe:
- Concurrent workflow execution
- Thread-safe definition management
- Protected access to workflow registry
- Safe tool availability checking
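Protected registry access of this kind is commonly built on `sync.RWMutex`. The sketch below is a generic illustration under that assumption; `registry` and its methods are hypothetical and say nothing about how the package actually guards its state.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical workflow registry guarded by a read-write mutex.
type registry struct {
	mu    sync.RWMutex
	items map[string]string // workflow name -> description
}

func newRegistry() *registry {
	return &registry{items: make(map[string]string)}
}

// set takes the write lock because it mutates the map.
func (r *registry) set(name, description string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.items[name] = description
}

// get takes only the read lock, so concurrent readers do not block each other.
func (r *registry) get(name string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	d, ok := r.items[name]
	return d, ok
}

// size returns the number of registered workflows.
func (r *registry) size() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.items)
}

// fillConcurrently registers n workflows from n goroutines.
func fillConcurrently(r *registry, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.set(fmt.Sprintf("wf-%d", i), "demo")
		}(i)
	}
	wg.Wait()
}

func main() {
	r := newRegistry()
	fillConcurrently(r, 50)
	fmt.Println(r.size())
}
```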
Validation ¶
Comprehensive validation ensures workflow quality:
- **Name validation**: Unique and valid workflow names
- **Step validation**: Required fields and step ID uniqueness
- **Tool validation**: Tool names cannot be empty
- **Schema validation**: Input schema structure and types
- **Arg validation**: Required and optional arg checking
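The name, step, and tool rules listed above could be checked along these lines. This is a sketch with hypothetical types (`workflow`, `step`) and a hypothetical `validateWorkflow`; it covers only non-empty names, non-empty and unique step IDs, and non-empty tool names.

```go
package main

import (
	"errors"
	"fmt"
)

// step and workflow are reduced, hypothetical definition types.
type step struct {
	ID   string
	Tool string
}

type workflow struct {
	Name  string
	Steps []step
}

// validateWorkflow returns the first rule violation it finds, or nil.
func validateWorkflow(wf workflow) error {
	if wf.Name == "" {
		return errors.New("workflow name is empty")
	}
	seen := make(map[string]bool, len(wf.Steps))
	for i, s := range wf.Steps {
		if s.ID == "" {
			return fmt.Errorf("step %d: id is empty", i)
		}
		if seen[s.ID] {
			return fmt.Errorf("step %d: duplicate id %q", i, s.ID)
		}
		seen[s.ID] = true
		if s.Tool == "" {
			return fmt.Errorf("step %q: tool name is empty", s.ID)
		}
	}
	return nil
}

func main() {
	wf := workflow{
		Name: "deploy-app",
		Steps: []step{
			{ID: "validate", Tool: "validate_environment"},
			{ID: "validate", Tool: "deploy_application"}, // duplicate ID
		},
	}
	fmt.Println(validateWorkflow(wf))
}
```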
Performance Characteristics ¶
- **Dynamic checking**: Tool availability checked at execution time
- **No caching**: Ensures real-time tool availability
- **Efficient loading**: Workflows loaded only when needed
- **Template caching**: Template compilation cached for performance
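Template caching can be pictured as a map from template text to its parsed form, so each distinct template is compiled once. The sketch below assumes `text/template` and a mutex-guarded map; `templateCache` is hypothetical and not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
	"text/template"
)

// templateCache stores parsed templates keyed by their source text.
type templateCache struct {
	mu     sync.Mutex
	byText map[string]*template.Template
	parses int // number of successful parses, kept only for illustration
}

func newTemplateCache() *templateCache {
	return &templateCache{byText: make(map[string]*template.Template)}
}

// get returns the cached template for text, parsing it on first use.
// Parse failures are returned and not cached.
func (c *templateCache) get(text string) (*template.Template, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if t, ok := c.byText[text]; ok {
		return t, nil
	}
	t, err := template.New("arg").Parse(text)
	if err != nil {
		return nil, err
	}
	c.byText[text] = t
	c.parses++
	return t, nil
}

func main() {
	c := newTemplateCache()
	for i := 0; i < 3; i++ {
		if _, err := c.get("{{.environment}}"); err != nil {
			fmt.Println("parse error:", err)
		}
	}
	fmt.Println("parses:", c.parses)
}
```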
This package enables powerful workflow orchestration capabilities within muster, allowing users to define complex multi-step operations that can be executed programmatically or through AI assistant interactions.
Index ¶
- type Adapter
- func (a *Adapter) CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)
- func (a *Adapter) CreateWorkflowFromStructured(args map[string]interface{}) error
- func (a *Adapter) DeleteWorkflow(name string) error
- func (a *Adapter) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (*api.CallToolResult, error)
- func (a *Adapter) ExecuteWorkflow(ctx context.Context, workflowName string, args map[string]interface{}) (*api.CallToolResult, error)
- func (a *Adapter) GenerateStepEvent(workflowName string, stepID string, eventType string, ...)
- func (a *Adapter) GetTools() []api.ToolMetadata
- func (a *Adapter) GetWorkflow(name string) (*api.Workflow, error)
- func (a *Adapter) GetWorkflowExecution(ctx context.Context, req *api.GetWorkflowExecutionRequest) (*api.WorkflowExecution, error)
- func (a *Adapter) GetWorkflows() []api.Workflow
- func (a *Adapter) ListWorkflowExecutions(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error)
- func (a *Adapter) Register()
- func (a *Adapter) ReloadWorkflows() error
- func (a *Adapter) Stop()
- func (a *Adapter) UpdateWorkflowFromStructured(name string, args map[string]interface{}) error
- func (a *Adapter) ValidateWorkflowFromStructured(args map[string]interface{}) error
- type EventCallback
- type ExecutionStorage
- type ExecutionStorageImpl
- func (es *ExecutionStorageImpl) Delete(ctx context.Context, executionID string) error
- func (es *ExecutionStorageImpl) Get(ctx context.Context, executionID string) (*api.WorkflowExecution, error)
- func (es *ExecutionStorageImpl) List(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error)
- func (es *ExecutionStorageImpl) Store(ctx context.Context, execution *api.WorkflowExecution) error
- type ExecutionTracker
- func (et *ExecutionTracker) GetExecution(ctx context.Context, req *api.GetWorkflowExecutionRequest) (*api.WorkflowExecution, error)
- func (et *ExecutionTracker) ListExecutions(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error)
- func (et *ExecutionTracker) TrackExecution(ctx context.Context, workflowName string, args map[string]interface{}, ...) (*mcp.CallToolResult, *api.WorkflowExecution, error)
- type NoOpEventCallback
- type ToolAvailabilityChecker
- type ToolCaller
- type WorkflowExecutor
Types ¶
type Adapter ¶
type Adapter struct {
// contains filtered or unexported fields
}
Adapter provides the API adapter for workflow management
func NewAdapterWithClient ¶
func NewAdapterWithClient(musterClient client.MusterClient, namespace string, toolCaller ToolCaller, toolChecker ToolAvailabilityChecker, configPath string) *Adapter
NewAdapterWithClient creates a new workflow adapter with a pre-configured client
func (*Adapter) CallToolInternal ¶
func (a *Adapter) CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)
CallToolInternal calls a tool internally - required by ToolCaller interface
func (*Adapter) CreateWorkflowFromStructured ¶
func (a *Adapter) CreateWorkflowFromStructured(args map[string]interface{}) error
CreateWorkflowFromStructured creates a new workflow from structured arguments
func (*Adapter) DeleteWorkflow ¶
func (a *Adapter) DeleteWorkflow(name string) error
DeleteWorkflow deletes a workflow
func (*Adapter) ExecuteTool ¶
func (a *Adapter) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (*api.CallToolResult, error)
ExecuteTool executes a tool by name
func (*Adapter) ExecuteWorkflow ¶
func (a *Adapter) ExecuteWorkflow(ctx context.Context, workflowName string, args map[string]interface{}) (*api.CallToolResult, error)
ExecuteWorkflow executes a workflow and returns MCP result
func (*Adapter) GenerateStepEvent ¶
func (a *Adapter) GenerateStepEvent(workflowName string, stepID string, eventType string, data map[string]interface{})
GenerateStepEvent implements the EventCallback interface for step-level events
func (*Adapter) GetTools ¶
func (a *Adapter) GetTools() []api.ToolMetadata
GetTools returns all tools this provider offers
func (*Adapter) GetWorkflow ¶
func (a *Adapter) GetWorkflow(name string) (*api.Workflow, error)
GetWorkflow returns a specific workflow definition.
Availability is computed against the process-global tool view (no session context). Session-aware availability is computed in the tool-dispatch path (handleGet / handleWorkflowAvailable) via getWorkflow.
func (*Adapter) GetWorkflowExecution ¶
func (a *Adapter) GetWorkflowExecution(ctx context.Context, req *api.GetWorkflowExecutionRequest) (*api.WorkflowExecution, error)
GetWorkflowExecution returns detailed information about a specific workflow execution
func (*Adapter) GetWorkflows ¶
func (a *Adapter) GetWorkflows() []api.Workflow
GetWorkflows returns information about all workflows.
Availability is computed against the process-global tool view (no session context). Session-aware availability is computed in the tool-dispatch path (handleList) via getWorkflows.
func (*Adapter) ListWorkflowExecutions ¶
func (a *Adapter) ListWorkflowExecutions(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error)
ListWorkflowExecutions returns paginated list of workflow executions with optional filtering
func (*Adapter) Register ¶
func (a *Adapter) Register()
Register registers this adapter with the API layer
func (*Adapter) ReloadWorkflows ¶
func (a *Adapter) ReloadWorkflows() error
ReloadWorkflows reloads workflow definitions - not needed for CRD-based approach
func (*Adapter) UpdateWorkflowFromStructured ¶
func (a *Adapter) UpdateWorkflowFromStructured(name string, args map[string]interface{}) error
UpdateWorkflowFromStructured updates an existing workflow from structured arguments
func (*Adapter) ValidateWorkflowFromStructured ¶
func (a *Adapter) ValidateWorkflowFromStructured(args map[string]interface{}) error
ValidateWorkflowFromStructured validates a workflow definition from structured arguments
type EventCallback ¶
type EventCallback interface {
// GenerateStepEvent generates an event for a workflow step operation
GenerateStepEvent(workflowName string, stepID string, eventType string, data map[string]interface{})
}
EventCallback interface for generating workflow step events
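A callback that records events in memory can be handy in tests. The sketch below copies the method signature of EventCallback into a local `eventCallback` interface so that it compiles on its own; `recorder`, `recordedEvent`, and the event type string "step_started" are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// eventCallback repeats the method set of EventCallback for this standalone sketch.
type eventCallback interface {
	GenerateStepEvent(workflowName string, stepID string, eventType string, data map[string]interface{})
}

// recordedEvent keeps the identifying fields of one step event.
type recordedEvent struct {
	Workflow string
	StepID   string
	Type     string
}

// recorder collects step events in memory; the mutex allows concurrent use.
type recorder struct {
	mu     sync.Mutex
	events []recordedEvent
}

// GenerateStepEvent appends the event and ignores the data payload.
func (r *recorder) GenerateStepEvent(workflowName string, stepID string, eventType string, _ map[string]interface{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, recordedEvent{Workflow: workflowName, StepID: stepID, Type: eventType})
}

// Compile-time check that recorder satisfies the interface.
var _ eventCallback = (*recorder)(nil)

func main() {
	r := &recorder{}
	// "step_started" is an invented event type used only for this example.
	r.GenerateStepEvent("deploy-app", "validate", "step_started", nil)
	fmt.Printf("%+v\n", r.events)
}
```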
type ExecutionStorage ¶
type ExecutionStorage interface {
// Store persists a workflow execution record
Store(ctx context.Context, execution *api.WorkflowExecution) error
// Get retrieves a specific workflow execution by ID
Get(ctx context.Context, executionID string) (*api.WorkflowExecution, error)
// List returns paginated workflow executions with optional filtering
List(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error)
// Delete removes a workflow execution record (for cleanup)
Delete(ctx context.Context, executionID string) error
}
ExecutionStorage defines the interface for persisting workflow executions. This interface abstracts the storage mechanism to enable different implementations while maintaining consistent behavior for execution persistence and retrieval.
The storage interface follows the same patterns as other config storage in the muster system, enabling consistent path resolution and file management.
func NewExecutionStorage ¶
func NewExecutionStorage(configPath string) ExecutionStorage
NewExecutionStorage creates a new execution storage instance that integrates with the existing configuration storage system.
The storage follows the established precedence of project directory over user directory and supports custom configuration paths for standalone mode.
type ExecutionStorageImpl ¶
type ExecutionStorageImpl struct {
// contains filtered or unexported fields
}
ExecutionStorageImpl implements ExecutionStorage using the existing config.Storage patterns for consistent file management and path resolution.
This implementation stores each execution as a separate JSON file for optimal performance with concurrent access and efficient individual file operations.
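The one-file-per-execution layout can be sketched with the standard library alone. `executionRecord` is a hypothetical, much reduced stand-in for api.WorkflowExecution, the JSON field names are invented, and the sketch does not sanitize the ID before using it as a file name.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// executionRecord is a hypothetical, reduced execution record.
type executionRecord struct {
	ExecutionID  string `json:"execution_id"`
	WorkflowName string `json:"workflow_name"`
	Status       string `json:"status"`
}

// storeRecord writes the record to <dir>/<execution id>.json.
func storeRecord(dir string, rec executionRecord) error {
	data, err := json.MarshalIndent(rec, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, rec.ExecutionID+".json"), data, 0o600)
}

// loadRecord reads a single record back by its ID.
func loadRecord(dir, id string) (executionRecord, error) {
	var rec executionRecord
	data, err := os.ReadFile(filepath.Join(dir, id+".json"))
	if err != nil {
		return rec, err
	}
	err = json.Unmarshal(data, &rec)
	return rec, err
}

// roundTrip stores and reloads a record in a temporary directory.
func roundTrip(rec executionRecord) (executionRecord, error) {
	dir, err := os.MkdirTemp("", "executions-")
	if err != nil {
		return executionRecord{}, err
	}
	defer os.RemoveAll(dir)
	if err := storeRecord(dir, rec); err != nil {
		return executionRecord{}, err
	}
	return loadRecord(dir, rec.ExecutionID)
}

func main() {
	rec, err := roundTrip(executionRecord{ExecutionID: "exec-1", WorkflowName: "deploy-app", Status: "completed"})
	fmt.Println(rec, err)
}
```

Because each record lives in its own file, reading or deleting one execution never touches the others.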
func (*ExecutionStorageImpl) Delete ¶
func (es *ExecutionStorageImpl) Delete(ctx context.Context, executionID string) error
Delete removes a workflow execution record from storage. This is used for cleanup operations and maintenance.
func (*ExecutionStorageImpl) Get ¶
func (es *ExecutionStorageImpl) Get(ctx context.Context, executionID string) (*api.WorkflowExecution, error)
Get retrieves a specific workflow execution by ID from storage. This loads the complete execution record including all step details.
func (*ExecutionStorageImpl) List ¶
func (es *ExecutionStorageImpl) List(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error)
List returns paginated workflow executions with optional filtering. This method efficiently scans execution files and applies filtering and pagination.
func (*ExecutionStorageImpl) Store ¶
func (es *ExecutionStorageImpl) Store(ctx context.Context, execution *api.WorkflowExecution) error
Store persists a workflow execution record as a JSON file. Each execution is stored in a separate file using the execution ID as the filename to enable efficient concurrent access and individual file operations.
type ExecutionTracker ¶
type ExecutionTracker struct {
// contains filtered or unexported fields
}
ExecutionTracker handles automatic workflow execution tracking. It wraps workflow execution with comprehensive tracking including timing, step-by-step results, and error handling while preserving the original execution behavior and results.
The tracker integrates seamlessly with the existing workflow execution flow, providing transparent tracking without modifying workflow execution logic.
func NewExecutionTracker ¶
func NewExecutionTracker(storage ExecutionStorage) *ExecutionTracker
NewExecutionTracker creates a new execution tracker with the specified storage. The tracker provides automatic execution tracking for all workflow executions while maintaining thread safety for concurrent workflow executions.
func (*ExecutionTracker) GetExecution ¶
func (et *ExecutionTracker) GetExecution(ctx context.Context, req *api.GetWorkflowExecutionRequest) (*api.WorkflowExecution, error)
GetExecution returns detailed information about a specific workflow execution. This provides a convenient way to access individual execution records through the tracker.
func (*ExecutionTracker) ListExecutions ¶
func (et *ExecutionTracker) ListExecutions(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error)
ListExecutions returns paginated workflow executions with optional filtering. This provides a convenient way to access execution history through the tracker.
func (*ExecutionTracker) TrackExecution ¶
func (et *ExecutionTracker) TrackExecution(ctx context.Context, workflowName string, args map[string]interface{}, executeFn func() (*mcp.CallToolResult, error)) (*mcp.CallToolResult, *api.WorkflowExecution, error)
TrackExecution wraps workflow execution with automatic tracking. This method creates an execution record, tracks step-by-step progress, and persists the complete execution history while preserving the original execution behavior and results.
Arguments:
- ctx: Context for the operation
- workflowName: Name of the workflow being executed
- args: Arguments passed to the workflow
- executeFn: Function that performs the actual workflow execution
Returns:
- *mcp.CallToolResult: Original workflow execution result (unchanged)
- *api.WorkflowExecution: Complete execution record for reference
- error: Error if execution or tracking fails
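The wrapping idea behind TrackExecution, run the supplied function, leave its result untouched, and build a record alongside it, can be sketched as follows. `track`, `record`, and the status strings are hypothetical; the result type is simplified to a string, and persistence and per-step tracking are left out.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// record is a hypothetical, reduced execution record.
type record struct {
	WorkflowName string
	StartedAt    time.Time
	Duration     time.Duration
	Status       string
	Error        string
}

// errDemo is a sample failure used in the usage example.
var errDemo = errors.New("demo failure")

// track runs executeFn, measures how long it took, and returns the original
// result and error unchanged together with the record describing the run.
func track(workflowName string, executeFn func() (string, error)) (string, *record, error) {
	rec := &record{WorkflowName: workflowName, StartedAt: time.Now()}
	result, err := executeFn()
	rec.Duration = time.Since(rec.StartedAt)
	if err != nil {
		rec.Status = "failed" // invented status value
		rec.Error = err.Error()
	} else {
		rec.Status = "completed" // invented status value
	}
	return result, rec, err
}

func main() {
	result, rec, err := track("deploy-app", func() (string, error) {
		return "deployed v1.2.3", nil
	})
	fmt.Println(result, rec.Status, err)

	_, rec, err = track("deploy-app", func() (string, error) {
		return "", errDemo
	})
	fmt.Println(rec.Status, rec.Error, err)
}
```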
type NoOpEventCallback ¶
type NoOpEventCallback struct{}
NoOpEventCallback provides a no-operation implementation of EventCallback
func (*NoOpEventCallback) GenerateStepEvent ¶
func (n *NoOpEventCallback) GenerateStepEvent(workflowName string, stepID string, eventType string, data map[string]interface{})
type ToolAvailabilityChecker ¶
type ToolAvailabilityChecker interface {
// MissingToolsForSession returns the subset of toolNames that is unavailable
// for the calling session. Availability is resolved against the session's
// accessible tools, so SSO family tools are considered even without a prior
// list_tools call (see #764). The session tool set is resolved once, so
// checking all of a workflow's step tools is a single pass.
MissingToolsForSession(ctx context.Context, toolNames []string) []string
}
ToolAvailabilityChecker interface for checking tool availability
type ToolCaller ¶
type ToolCaller interface {
CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)
}
ToolCaller interface - what we need from the aggregator
type WorkflowExecutor ¶
type WorkflowExecutor struct {
// contains filtered or unexported fields
}
WorkflowExecutor executes workflow steps
func NewWorkflowExecutor ¶
func NewWorkflowExecutor(toolCaller ToolCaller, eventCallback EventCallback) *WorkflowExecutor
NewWorkflowExecutor creates a new workflow executor
func (*WorkflowExecutor) ExecuteWorkflow ¶
func (we *WorkflowExecutor) ExecuteWorkflow(ctx context.Context, workflow *api.Workflow, args map[string]interface{}) (*mcp.CallToolResult, error)
ExecuteWorkflow executes a workflow with the given arguments