workflow

package
v0.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2026 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package workflow provides workflow management and execution capabilities for muster.

This package manages workflow definitions that can be stored as YAML files and executed as multi-step operations. Workflows are automatically registered as MCP tools when loaded, enabling programmatic access through the aggregator API with the "action_" prefix.

Workflow Definition Structure

Workflows are defined in YAML format with the following structure:

name: "my-workflow"
description: "A sample workflow that demonstrates multi-step operations"
args:
  environment:
    type: string
    description: "Target environment"
    default: "development"
    required: false
steps:
- id: "step1"
  tool: "some_tool"
  args:
    key: "value"
    env: "{{.environment}}"
- id: "step2"
  tool: "another_tool"
  args:
    input: "{{step1.result}}"
    environment: "{{.environment}}"

Tool Integration

Each workflow is automatically registered as an MCP tool with the name pattern: **"action_{workflow_name}"**

This allows workflows to be executed through:

  • MCP aggregator API
  • Other MCP clients
  • AI assistants via MCP protocol
  • Direct API calls through the workflow handler

The tool registration happens immediately when workflows are loaded.

Workflow Execution

Workflows are executed step by step in the defined order. Each step:

  • Calls the specified tool with the provided arguments
  • Can reference outputs from previous steps using {{stepId.field}} syntax
  • Can reference input args using {{.argumentName}} syntax
  • Has access to the workflow's execution context
  • Supports arg templating for dynamic argument construction

## Arg Templating

Workflows support Go template syntax for dynamic arg substitution:

  • **Input arguments**: {{.argumentName}}
  • **Step outputs**: {{stepId.result}} or {{stepId.specificField}}
  • **Default values**: Via input schema default properties

Workflow Adapter

The Workflow Adapter provides comprehensive workflow lifecycle management through the unified client interface with automatic backend selection (Kubernetes CRDs or filesystem):

  • **Backend Selection**: Automatic choice between Kubernetes CRDs and filesystem storage
  • **Definition Management**: Load workflows from CRDs or YAML files
  • **Validation**: Comprehensive validation of workflow definitions
  • **CRUD Operations**: Create, read, update, and delete workflows
  • **Execution**: Execute workflows with arg validation and tracking
  • **Tool Integration**: Automatic registration as MCP tools via API layer
  • **Availability Checking**: Dynamic tool availability validation

Input Argument Support

Workflows can define input arguments for validation:

args:
  environment:
    type: string
    description: "Deployment environment"
    default: "development"
    required: false
  version:
    type: string
    description: "Application version"
    required: true
  replicas:
    type: number
    description: "Number of replicas"
    default: 3

This enables:

  • **Arg validation** before execution
  • **Default value** assignment for optional arguments
  • **Type checking** for input arguments
  • **Documentation** for workflow consumers

Error Handling

The workflow adapter provides comprehensive error handling:

  • Invalid workflow files are logged but don't prevent other workflows from loading
  • Missing tools are detected and reported during validation
  • Execution errors can be configured to stop or continue the workflow
  • Arg validation errors prevent workflow execution
  • Tool execution errors are propagated with context

Dynamic Tool Availability

Workflows use dynamic tool availability checking:

  • Tools are validated against the aggregator at execution time
  • Workflows automatically become available when all required tools are present
  • No caching of tool availability to ensure real-time accuracy
  • Integration with tool update events for logging

Usage Examples

## Creating Workflows

workflow := api.Workflow{
    Name:        "deploy-app",
    Description: "Deploy application to environment",
    Args: map[string]api.ArgDefinition{
        "environment": {
            Type:        "string",
            Description: "Target environment",
            Default:     "development",
            Required:    false,
        },
        "version": {
            Type:        "string",
            Description: "Application version",
            Required:    true,
        },
    },
    Steps: []api.WorkflowStep{
        {
            ID:   "validate",
            Tool: "validate_environment",
            Args: map[string]interface{}{
                "environment": "{{.environment}}",
            },
        },
        {
            ID:   "deploy",
            Tool: "deploy_application",
            Args: map[string]interface{}{
                "environment": "{{.environment}}",
                "version":     "{{.version}}",
            },
        },
    },
}

workflowHandler := api.GetWorkflow()
if err := workflowHandler.CreateWorkflowFromStructured(workflowData); err != nil {
    log.Fatal(err)
}

## Executing Workflows

// Execute through API handler
workflowHandler := api.GetWorkflow()
result, err := workflowHandler.ExecuteWorkflow(ctx, "deploy-app", map[string]interface{}{
    "environment": "production",
    "version":     "v1.2.3",
})
if err != nil {
    log.Fatal(err)
}

// Execute as MCP tool (through aggregator)
// Tool name: "action_deploy-app"
result, err := toolCaller.CallTool(ctx, "action_deploy-app", map[string]interface{}{
    "environment": "production",
    "version":     "v1.2.3",
})

## Querying Workflows

// List all workflows
workflowHandler := api.GetWorkflow()
workflows := workflowHandler.GetWorkflows()
for _, wf := range workflows {
    fmt.Printf("Workflow: %s - %s\n", wf.Name, wf.Description)
}

// Get specific workflow
workflow, err := workflowHandler.GetWorkflow("deploy-app")
if err != nil {
    log.Fatal(err)
}

// Get workflows as MCP tools (through tool provider interface)
if provider, ok := workflowHandler.(api.ToolProvider); ok {
    tools := provider.GetTools()
    for _, tool := range tools {
        fmt.Printf("MCP Tool: %s - %s\n", tool.Name, tool.Description)
    }
}

File Management

Workflows can be created, updated, and deleted at runtime through the unified client interface:

  • **Create**: Save workflow as CRD (Kubernetes) or YAML file (filesystem)
  • **Update**: Modify existing workflow CRDs or files
  • **Delete**: Remove workflow CRDs or files

The adapter automatically detects changes through the client interface and updates the available tools accordingly. Storage backend is automatically selected based on environment (Kubernetes vs filesystem).

API Integration

The workflow package integrates with muster's API layer:

  • **WorkflowHandler**: API interface for workflow management
  • **Tool provider**: Exposes workflows as MCP tools
  • **Event integration**: Subscribes to tool update events
  • **Registration pattern**: Proper API layer registration

Thread Safety

All workflow operations are thread-safe:

  • Concurrent workflow execution
  • Thread-safe definition management
  • Protected access to workflow registry
  • Safe tool availability checking

Validation

Comprehensive validation ensures workflow quality:

  • **Name validation**: Unique and valid workflow names
  • **Step validation**: Required fields and step ID uniqueness
  • **Tool validation**: Tool names cannot be empty
  • **Schema validation**: Input schema structure and types
  • **Arg validation**: Required and optional arg checking

Performance Characteristics

  • **Dynamic checking**: Tool availability checked at execution time
  • **No caching**: Ensures real-time tool availability
  • **Efficient loading**: Workflows loaded only when needed
  • **Template caching**: Template compilation cached for performance

This package enables powerful workflow orchestration capabilities within muster, allowing users to define complex multi-step operations that can be executed programmatically or through AI assistant interactions.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

Adapter provides the API adapter for workflow management

func NewAdapterWithClient

func NewAdapterWithClient(musterClient client.MusterClient, namespace string, toolCaller ToolCaller, toolChecker ToolAvailabilityChecker, configPath string) *Adapter

NewAdapterWithClient creates a new workflow adapter with a pre-configured client

func (*Adapter) CallToolInternal

func (a *Adapter) CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)

CallToolInternal calls a tool internally - required by ToolCaller interface

func (*Adapter) CreateWorkflowFromStructured

func (a *Adapter) CreateWorkflowFromStructured(args map[string]interface{}) error

CreateWorkflowFromStructured creates a new workflow from structured arguments

func (*Adapter) DeleteWorkflow

func (a *Adapter) DeleteWorkflow(name string) error

DeleteWorkflow deletes a workflow

func (*Adapter) ExecuteTool

func (a *Adapter) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (*api.CallToolResult, error)

ExecuteTool executes a tool by name

func (*Adapter) ExecuteWorkflow

func (a *Adapter) ExecuteWorkflow(ctx context.Context, workflowName string, args map[string]interface{}) (*api.CallToolResult, error)

ExecuteWorkflow executes a workflow and returns MCP result

func (*Adapter) GenerateStepEvent

func (a *Adapter) GenerateStepEvent(workflowName string, stepID string, eventType string, data map[string]interface{})

GenerateStepEvent implements the EventCallback interface for step-level events

func (*Adapter) GetTools

func (a *Adapter) GetTools() []api.ToolMetadata

GetTools returns all tools this provider offers

func (*Adapter) GetWorkflow

func (a *Adapter) GetWorkflow(name string) (*api.Workflow, error)

GetWorkflow returns a specific workflow definition.

Availability is computed against the process-global tool view (no session context). Session-aware availability is computed in the tool-dispatch path (handleGet / handleWorkflowAvailable) via getWorkflow.

func (*Adapter) GetWorkflowExecution

func (a *Adapter) GetWorkflowExecution(ctx context.Context, req *api.GetWorkflowExecutionRequest) (*api.WorkflowExecution, error)

GetWorkflowExecution returns detailed information about a specific workflow execution

func (*Adapter) GetWorkflows

func (a *Adapter) GetWorkflows() []api.Workflow

GetWorkflows returns information about all workflows.

Availability is computed against the process-global tool view (no session context). Session-aware availability is computed in the tool-dispatch path (handleList) via getWorkflows.

func (*Adapter) ListWorkflowExecutions

ListWorkflowExecutions returns paginated list of workflow executions with optional filtering

func (*Adapter) Register

func (a *Adapter) Register()

Register registers this adapter with the API layer

func (*Adapter) ReloadWorkflows

func (a *Adapter) ReloadWorkflows() error

ReloadWorkflows reloads workflow definitions - not needed for CRD-based approach

func (*Adapter) Stop

func (a *Adapter) Stop()

Stop stops the workflow adapter

func (*Adapter) UpdateWorkflowFromStructured

func (a *Adapter) UpdateWorkflowFromStructured(name string, args map[string]interface{}) error

UpdateWorkflowFromStructured updates an existing workflow from structured arguments

func (*Adapter) ValidateWorkflowFromStructured

func (a *Adapter) ValidateWorkflowFromStructured(args map[string]interface{}) error

ValidateWorkflowFromStructured validates a workflow definition from structured arguments

type EventCallback

type EventCallback interface {
	// GenerateStepEvent generates an event for a workflow step operation
	GenerateStepEvent(workflowName string, stepID string, eventType string, data map[string]interface{})
}

EventCallback interface for generating workflow step events

type ExecutionStorage

type ExecutionStorage interface {
	// Store persists a workflow execution record
	Store(ctx context.Context, execution *api.WorkflowExecution) error

	// Get retrieves a specific workflow execution by ID
	Get(ctx context.Context, executionID string) (*api.WorkflowExecution, error)

	// List returns paginated workflow executions with optional filtering
	List(ctx context.Context, req *api.ListWorkflowExecutionsRequest) (*api.ListWorkflowExecutionsResponse, error)

	// Delete removes a workflow execution record (for cleanup)
	Delete(ctx context.Context, executionID string) error
}

ExecutionStorage defines the interface for persisting workflow executions. This interface abstracts the storage mechanism to enable different implementations while maintaining consistent behavior for execution persistence and retrieval.

The storage interface follows the same patterns as other config storage in the muster system, enabling consistent path resolution and file management.

func NewExecutionStorage

func NewExecutionStorage(configPath string) ExecutionStorage

NewExecutionStorage creates a new execution storage instance that integrates with the existing configuration storage system.

The storage follows the established precedence of project directory over user directory and supports custom configuration paths for standalone mode.

type ExecutionStorageImpl

type ExecutionStorageImpl struct {
	// contains filtered or unexported fields
}

ExecutionStorageImpl implements ExecutionStorage using the existing config.Storage patterns for consistent file management and path resolution.

This implementation stores each execution as a separate JSON file for optimal performance with concurrent access and efficient individual file operations.

func (*ExecutionStorageImpl) Delete

func (es *ExecutionStorageImpl) Delete(ctx context.Context, executionID string) error

Delete removes a workflow execution record from storage. This is used for cleanup operations and maintenance.

func (*ExecutionStorageImpl) Get

func (es *ExecutionStorageImpl) Get(ctx context.Context, executionID string) (*api.WorkflowExecution, error)

Get retrieves a specific workflow execution by ID from storage. This loads the complete execution record including all step details.

func (*ExecutionStorageImpl) List

List returns paginated workflow executions with optional filtering. This method efficiently scans execution files and applies filtering and pagination.

func (*ExecutionStorageImpl) Store

func (es *ExecutionStorageImpl) Store(ctx context.Context, execution *api.WorkflowExecution) error

Store persists a workflow execution record as a JSON file. Each execution is stored in a separate file using the execution ID as the filename to enable efficient concurrent access and individual file operations.

type ExecutionTracker

type ExecutionTracker struct {
	// contains filtered or unexported fields
}

ExecutionTracker handles automatic workflow execution tracking. It wraps workflow execution with comprehensive tracking including timing, step-by-step results, and error handling while preserving the original execution behavior and results.

The tracker integrates seamlessly with the existing workflow execution flow, providing transparent tracking without modifying workflow execution logic.

func NewExecutionTracker

func NewExecutionTracker(storage ExecutionStorage) *ExecutionTracker

NewExecutionTracker creates a new execution tracker with the specified storage. The tracker provides automatic execution tracking for all workflow executions while maintaining thread safety for concurrent workflow executions.

func (*ExecutionTracker) GetExecution

GetExecution returns detailed information about a specific workflow execution. This provides a convenient way to access individual execution records through the tracker.

func (*ExecutionTracker) ListExecutions

ListExecutions returns paginated workflow executions with optional filtering. This provides a convenient way to access execution history through the tracker.

func (*ExecutionTracker) TrackExecution

func (et *ExecutionTracker) TrackExecution(ctx context.Context, workflowName string, args map[string]interface{}, executeFn func() (*mcp.CallToolResult, error)) (*mcp.CallToolResult, *api.WorkflowExecution, error)

TrackExecution wraps workflow execution with automatic tracking. This method creates an execution record, tracks step-by-step progress, and persists the complete execution history while preserving the original execution behavior and results.

Arguments:

  • ctx: Context for the operation
  • workflowName: Name of the workflow being executed
  • args: Arguments passed to the workflow
  • executeFn: Function that performs the actual workflow execution

Returns:

  • *mcp.CallToolResult: Original workflow execution result (unchanged)
  • *api.WorkflowExecution: Complete execution record for reference
  • error: Error if execution or tracking fails

type NoOpEventCallback

type NoOpEventCallback struct{}

NoOpEventCallback provides a no-operation implementation of EventCallback

func (*NoOpEventCallback) GenerateStepEvent

func (n *NoOpEventCallback) GenerateStepEvent(workflowName string, stepID string, eventType string, data map[string]interface{})

type ToolAvailabilityChecker

type ToolAvailabilityChecker interface {
	// MissingToolsForSession returns the subset of toolNames that is unavailable
	// for the calling session. Availability is resolved against the session's
	// accessible tools, so SSO family tools are considered even without a prior
	// list_tools call (see #764). The session tool set is resolved once, so
	// checking all of a workflow's step tools is a single pass.
	MissingToolsForSession(ctx context.Context, toolNames []string) []string
}

ToolAvailabilityChecker interface for checking tool availability

type ToolCaller

type ToolCaller interface {
	CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)
}

ToolCaller interface - what we need from the aggregator

type WorkflowExecutor

type WorkflowExecutor struct {
	// contains filtered or unexported fields
}

WorkflowExecutor executes workflow steps

func NewWorkflowExecutor

func NewWorkflowExecutor(toolCaller ToolCaller, eventCallback EventCallback) *WorkflowExecutor

NewWorkflowExecutor creates a new workflow executor

func (*WorkflowExecutor) ExecuteWorkflow

func (we *WorkflowExecutor) ExecuteWorkflow(ctx context.Context, workflow *api.Workflow, args map[string]interface{}) (*mcp.CallToolResult, error)

ExecuteWorkflow executes a workflow with the given arguments

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL