application

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2026 License: EUPL-1.2 Imports: 25 Imported by: 0

Documentation

Overview

Package application provides application services that orchestrate domain operations.

The application layer sits between the interfaces layer (CLI, API) and the domain layer, coordinating workflow execution through ports and domain entities. Services in this package implement use cases and business workflows without containing business rules (which reside in the domain layer). All infrastructure dependencies are injected via port interfaces.

Architecture Role

In the hexagonal architecture:

  • Application services receive requests from interfaces layer (CLI commands)
  • Services orchestrate domain entities (Workflow, Step, ExecutionContext)
  • Services delegate infrastructure work to ports (CommandExecutor, StateStore)
  • Services coordinate cross-cutting concerns (logging, state persistence, history)

The application layer enforces the dependency rule: it depends on domain ports (inward) but is depended upon by the interfaces layer (outward). Infrastructure adapters implement the ports that services consume.

Core Orchestration Services

## WorkflowService (service.go)

High-level workflow operations:

  • ListWorkflows: Enumerate available workflows
  • GetWorkflow: Load workflow definition by name
  • ValidateWorkflow: Static validation of workflow structure
  • WorkflowExists: Check if workflow is available

Dependencies: WorkflowRepository, StateStore, CommandExecutor, Logger

## ExecutionService (execution_service.go)

Core workflow execution orchestrator:

  • ExecuteWorkflow: Full workflow execution with state machine traversal
  • ExecuteStep: Single step execution dispatcher (routes by StepType)
  • HandleTransitions: Conditional transition evaluation
  • SaveCheckpoint: Persist execution state for resumption

Step type handlers:

  • executeCommandStep: Shell command execution with capture
  • executeAgentStep: AI agent invocation (single-shot and conversation)
  • executeParallelStep: Concurrent branch execution
  • executeForEachStep: Loop over items
  • executeWhileStep: Conditional loop
  • executeOperationStep: Plugin operation invocation
  • executeCallWorkflowStep: Nested sub-workflow execution
  • executeTerminalStep: Workflow completion

Dependencies: WorkflowService, CommandExecutor, ParallelExecutor, StateStore, Logger, ExpressionEvaluator, HookExecutor, LoopExecutor, TemplateService, HistoryService, OperationProvider, AgentRegistry, ConversationExecutor

Optional configuration:

  • SetOutputWriters: Configure stdout/stderr streaming
  • SetTemplateService: Enable template expansion
  • SetOperationProvider: Enable plugin operations
  • SetAgentRegistry: Enable AI agent steps
  • SetEvaluator: Enable conditional transitions
  • SetConversationManager: Enable multi-turn conversations

## HistoryService (history_service.go)

Workflow execution history and analytics:

  • RecordExecution: Persist workflow run record with metadata
  • GetExecutionHistory: Query execution records (filtering, pagination)
  • GetExecutionStats: Aggregate statistics (counts, durations, success rates)
  • PruneOldHistory: Clean up old execution records

Dependencies: HistoryStore, Logger

## TemplateService (template_service.go)

Workflow template expansion and validation:

  • ExpandTemplate: Resolve template reference into concrete steps
  • ValidateTemplate: Check template syntax and parameter bindings
  • LoadTemplate: Retrieve template definition by name

Dependencies: TemplateRepository, Logger, ExpressionValidator

## PluginService (plugin_service.go)

Plugin lifecycle management:

  • DiscoverPlugins: Scan and list available plugins
  • LoadPlugin: Initialize plugin with configuration
  • EnablePlugin: Activate plugin for workflow use
  • DisablePlugin: Deactivate plugin
  • GetPluginStatus: Check plugin enabled/disabled state

Dependencies: PluginManager, PluginStateStore, Logger

## InputCollectionService (input_collection_service.go)

Interactive input gathering:

  • CollectInputs: Prompt user for missing workflow inputs
  • ValidateInputs: Check input values against validation rules
  • BuildInputMap: Construct workflow input map from user responses

Dependencies: InputCollector, Logger

Specialized Executors

## ParallelExecutor (parallel_executor.go)

Concurrent branch execution with strategies:

  • Execute: Run branches concurrently with semaphore control
  • all_succeed: All branches must succeed
  • any_succeed: At least one branch succeeds
  • best_effort: Execute all, report best outcome

Features:

  • MaxConcurrent: Limit concurrent goroutines
  • DependsOn: Branch execution ordering constraints
  • Graceful cancellation via context

Dependencies: StepExecutor, Logger

## LoopExecutor (loop_executor.go)

Iterative step execution:

  • ExecuteForEach: Iterate over items collection
  • ExecuteWhile: Conditional loop with guard expression
  • BuildLoopContext: Construct loop variables (item, index, first, last)
  • EvaluateLoopCondition: Check while loop guard

Loop context variables:

  • {{loop.item}}: Current item value
  • {{loop.index}}: Zero-based iteration index
  • {{loop.first}}: Boolean first iteration flag
  • {{loop.last}}: Boolean last iteration flag
  • {{loop.length}}: Total items count (for_each only)

Dependencies: StepExecutor, ExpressionEvaluator, Logger

## InteractiveExecutor (interactive_executor.go)

Step-by-step interactive execution:

  • ExecuteInteractive: Prompt user before each step
  • PromptUserAction: Present execution options (run, skip, retry, abort, edit)
  • HandleUserChoice: Process user action and update state

User actions:

  • Run: Execute step normally
  • Skip: Skip step and proceed to next
  • Retry: Re-attempt previous step
  • Abort: Halt workflow execution
  • Edit: Modify step parameters before execution

Dependencies: InteractivePrompt, StepExecutor, Logger

## DryRunExecutor (dry_run_executor.go)

Workflow validation without side effects:

  • ExecuteDryRun: Traverse workflow without executing commands
  • LogPlannedActions: Report steps that would execute
  • ValidateGraph: Check state transitions are valid

Dependencies: WorkflowService, Logger

## HookExecutor (hook_executor.go)

Workflow and step lifecycle hooks:

  • ExecuteWorkflowHooks: Workflow start/end/error/cancel events
  • ExecuteStepHooks: Step pre/post execution hooks
  • ExecuteHookAction: Process individual hook (log or command)

Hook types:

  • Log: Structured log message with template interpolation
  • Command: Shell command execution

Dependencies: CommandExecutor, Logger, ExpressionEvaluator

## ConversationManager (conversation_manager.go)

Multi-turn AI agent conversations:

  • ExecuteConversation: Manage conversation loop with turn limits
  • EvaluateStopConditions: Check conversation termination criteria
  • ManageContextWindow: Trim conversation history for token budget
  • RecordMessage: Append user/assistant messages to history

Context window strategies:

  • truncate_middle: Remove middle messages, keep first/last
  • summarize: Use LLM to summarize old messages
  • truncate_oldest: Drop oldest messages first

Dependencies: AgentProvider, Tokenizer, ExpressionEvaluator, Logger

Supporting Services

## OutputStreamer (output_streamer.go)

Real-time command output streaming:

  • StreamOutput: Write command stdout to writer during execution
  • StreamError: Write command stderr to writer during execution
  • CaptureOutput: Buffer output for context interpolation

Dependencies: io.Writer (stdout, stderr)

## OutputLimiter (output_limiter.go)

Memory protection for large outputs:

  • LimitOutput: Enforce maximum output size
  • TruncateOutput: Trim output exceeding limit
  • ReportTruncation: Log truncation events

Prevents out-of-memory errors from unbounded command output accumulation.

Dependencies: Logger

Usage Examples

## Basic Workflow Execution

Orchestrate workflow execution with injected dependencies:

wfSvc := NewWorkflowService(repo, store, executor, logger, validator)
execSvc := NewExecutionService(wfSvc, executor, parallelExec, store, logger, resolver, historySvc)

// Validate workflow before execution
if err := wfSvc.ValidateWorkflow(ctx, "deploy-app"); err != nil {
    log.Fatalf("validation failed: %v", err)
}

// Execute workflow with inputs
result, err := execSvc.ExecuteWorkflow(ctx, "deploy-app", map[string]any{
    "environment": "production",
    "version": "v1.2.3",
})
if err != nil {
    log.Fatalf("execution failed: %v", err)
}

## Configure Optional Features

Enable specialized executors:

execSvc.SetOutputWriters(os.Stdout, os.Stderr)
execSvc.SetTemplateService(templateSvc)
execSvc.SetOperationProvider(pluginRegistry)
execSvc.SetAgentRegistry(agentRegistry)
execSvc.SetEvaluator(expressionEvaluator)
execSvc.SetConversationManager(conversationMgr)

## Interactive Mode Execution

Run workflow with step-by-step control:

interactiveExec := NewInteractiveExecutor(prompter, stepExecutor, logger)
result, err := interactiveExec.ExecuteInteractive(ctx, workflow, inputs)

## Query Execution History

historySvc := NewHistoryService(historyStore, logger)

// Get recent executions
history, err := historySvc.GetExecutionHistory(ctx, &HistoryQuery{
    WorkflowName: "deploy-app",
    Limit: 10,
})

// Get aggregate statistics
stats, err := historySvc.GetExecutionStats(ctx, "deploy-app")
fmt.Printf("Success rate: %.1f%%\n", stats.SuccessRate*100)

## Manage Plugins

pluginSvc := NewPluginService(manager, stateStore, logger)

// Discover available plugins
plugins, err := pluginSvc.DiscoverPlugins(ctx)

// Enable plugin for use
if err := pluginSvc.EnablePlugin(ctx, "my-plugin"); err != nil {
    log.Fatalf("failed to enable plugin: %v", err)
}

## Expand Workflow Template

templateSvc := NewTemplateService(templateRepo, logger)

// Expand template reference
steps, err := templateSvc.ExpandTemplate(ctx, &workflow.WorkflowTemplateRef{
    Template: "deploy-steps",
    With: map[string]any{
        "environment": "staging",
    },
})

## Collect Missing Inputs

inputSvc := NewInputCollectionService(collector, logger)

// Prompt user for undefined inputs
inputs, err := inputSvc.CollectInputs(ctx, workflow, providedInputs)

Design Principles

## Dependency Injection

All services use constructor injection:

  • Port interfaces injected via NewXxxService constructors
  • No global state or singletons
  • Testable with mock implementations
  • Optional dependencies via SetXxx methods

## Orchestration, Not Business Logic

Services coordinate but don't contain rules:

  • Validation logic lives in domain entities (Workflow.Validate)
  • State transitions handled by domain (ExecutionContext)
  • Services delegate to domain methods, don't duplicate logic

## Error Propagation

Services wrap errors with context:

  • Use fmt.Errorf with %w for error chains
  • Preserve underlying error for error.Is checks
  • Add operation context for debugging

Example:

if err := s.repo.Load(ctx, name); err != nil {
    return fmt.Errorf("load workflow %s: %w", name, err)
}

## Context Propagation

All operations accept context.Context:

  • Enables cancellation and timeout control
  • Graceful shutdown of long-running operations
  • Pass context to all port calls

## Thread Safety

Services are stateless and thread-safe:

  • No mutable state fields (all dependencies are ports)
  • Multiple goroutines can call service methods concurrently
  • State mutations delegated to thread-safe ExecutionContext
  • internal/domain/workflow: Core entities orchestrated by services
  • internal/domain/ports: Port interfaces implemented by infrastructure
  • internal/infrastructure: Concrete port implementations
  • internal/interfaces/cli: CLI commands that invoke services

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrCircularWorkflowCall is returned when a circular sub-workflow call is detected.
	ErrCircularWorkflowCall = errors.New("circular workflow call detected")

	// ErrMaxNestingExceeded is returned when sub-workflow nesting depth exceeds the maximum allowed.
	ErrMaxNestingExceeded = errors.New("maximum sub-workflow nesting depth exceeded")

	// ErrSubWorkflowNotFound is returned when a referenced sub-workflow does not exist.
	ErrSubWorkflowNotFound = errors.New("sub-workflow not found")
)

Sub-workflow execution errors.

View Source
var ErrNoAgentRegistry = errors.New("agent registry not configured")

ErrNoAgentRegistry is returned when an agent step is executed without a configured registry.

View Source
var ErrNoOperationProvider = errors.New("operation provider not configured")

ErrNoOperationProvider is returned when an operation step is executed without a configured provider.

View Source
var ErrNoStepTypeProvider = errors.New("step type provider not configured")

ErrNoStepTypeProvider is returned when a custom step type is executed without a configured provider.

View Source
var ErrPluginDisabled = errors.New("plugin is disabled")

ErrPluginDisabled indicates the plugin is disabled and cannot be loaded.

View Source
var ErrPluginNameEmpty = errors.New("plugin name cannot be empty")

ErrPluginNameEmpty indicates an empty plugin name was provided.

View Source
var ErrUnknownPlugin = errors.New("unknown plugin")

ErrUnknownPlugin indicates the plugin name is not registered (builtin or external).

Functions

This section is empty.

Types

type ContextBuilderFunc

type ContextBuilderFunc func(execCtx *workflow.ExecutionContext) *interpolation.Context

ContextBuilderFunc builds an interpolation context from execution context.

type ConversationExecutor

type ConversationExecutor interface {
	ExecuteConversation(
		ctx context.Context,
		step *workflow.Step,
		config *workflow.ConversationConfig,
		execCtx *workflow.ExecutionContext,
		buildContext ContextBuilderFunc,
	) (*workflow.ConversationResult, error)
}

ConversationExecutor defines the interface for executing multi-turn conversations. This interface allows for dependency injection and testing with mocks.

type ConversationManager

type ConversationManager struct {
	// contains filtered or unexported fields
}

ConversationManager orchestrates multi-turn agent conversations with automatic context window management, token counting, and stop condition evaluation.

Following the LoopExecutor pattern, ConversationManager: - Manages turn iteration (analogous to loop iterations) - Evaluates stop conditions (analogous to break conditions) - Maintains conversation state (analogous to loop context) - Integrates with AgentProvider for turn execution

func NewConversationManager

func NewConversationManager(
	logger ports.Logger,
	evaluator ports.ExpressionEvaluator,
	resolver interpolation.Resolver,
	tokenizer ports.Tokenizer,
	agentRegistry ports.AgentRegistry,
) *ConversationManager

func (*ConversationManager) ExecuteConversation

func (m *ConversationManager) ExecuteConversation(
	ctx context.Context,
	step *workflow.Step,
	config *workflow.ConversationConfig,
	execCtx *workflow.ExecutionContext,
	buildContext ContextBuilderFunc,
) (*workflow.ConversationResult, error)

ExecuteConversation orchestrates a multi-turn conversation according to the configuration in the agent step's conversation settings.

Flow:

  1. Initialize conversation state with system prompt (if provided)
  2. Execute initial user prompt to start conversation
  3. For each turn: a. Execute agent provider with conversation history b. Add agent response to conversation state c. Count tokens and apply context window strategy if needed d. Evaluate stop condition e. Check max turns/tokens limits f. If continuing, prepare next user prompt
  4. Return final ConversationResult

Parameters: - ctx: context for cancellation and timeout - step: agent step configuration with conversation settings - config: conversation configuration (max_turns, strategy, stop_condition) - execCtx: execution context with state and inputs - buildContext: function to build interpolation context for template resolution

Returns: - ConversationResult with final state, output, token counts, and stop reason - error if conversation execution fails

type DryRunExecutor

type DryRunExecutor struct {
	// contains filtered or unexported fields
}

DryRunExecutor walks through a workflow without executing commands. It produces an execution plan showing what would happen.

func NewDryRunExecutor

func NewDryRunExecutor(
	wfSvc *WorkflowService,
	resolver interpolation.Resolver,
	evaluator ports.ExpressionEvaluator,
	logger ports.Logger,
) *DryRunExecutor

func (*DryRunExecutor) Execute

func (e *DryRunExecutor) Execute(ctx context.Context, workflowName string, inputs map[string]any) (*workflow.DryRunPlan, error)

func (*DryRunExecutor) SetAWFPaths

func (e *DryRunExecutor) SetAWFPaths(paths map[string]string)

SetAWFPaths configures the AWF XDG directory paths for F063 template interpolation.

func (*DryRunExecutor) SetTemplateService

func (e *DryRunExecutor) SetTemplateService(svc *TemplateService)

type ExecutionService

type ExecutionService struct {
	// contains filtered or unexported fields
}

ExecutionService orchestrates workflow execution.

func NewExecutionService

func NewExecutionService(
	wfSvc *WorkflowService,
	executor ports.CommandExecutor,
	parallelExecutor ports.ParallelExecutor,
	store ports.StateStore,
	logger ports.Logger,
	resolver interpolation.Resolver,
	historySvc *HistoryService,
) *ExecutionService

NewExecutionService - historySvc can be nil to disable history recording.

func NewExecutionServiceWithEvaluator

func NewExecutionServiceWithEvaluator(
	wfSvc *WorkflowService,
	executor ports.CommandExecutor,
	parallelExecutor ports.ParallelExecutor,
	store ports.StateStore,
	logger ports.Logger,
	resolver interpolation.Resolver,
	historySvc *HistoryService,
	evaluator ports.ExpressionEvaluator,
) *ExecutionService

NewExecutionServiceWithEvaluator enables conditional transitions using the `when:` clause.

func (*ExecutionService) ExecuteSingleStep

func (s *ExecutionService) ExecuteSingleStep(
	ctx context.Context,
	workflowName string,
	stepName string,
	inputs map[string]any,
	mocks map[string]string,
) (*SingleStepResult, error)

ExecuteSingleStep executes a single step from a workflow in isolation. It bypasses the state machine and directly runs the specified step. Mocked states can be provided to simulate dependencies on previous steps.

func (*ExecutionService) HandleMaxIterationFailure

func (s *ExecutionService) HandleMaxIterationFailure(
	ctx context.Context,
	result *workflow.LoopResult,
	step *workflow.Step,
	wf *workflow.Workflow,
	execCtx *workflow.ExecutionContext,
	loopState *workflow.StepState,
) (string, error)

HandleMaxIterationFailure handles the case when a loop hits max iterations with problematic patterns. If OnComplete is configured, completes successfully via that transition (e.g., retry patterns). Otherwise, treats as failure and returns error or transitions via OnFailure.

func (*ExecutionService) IsProblematicMaxIterationPattern

func (s *ExecutionService) IsProblematicMaxIterationPattern(
	result *workflow.LoopResult,
	step *workflow.Step,
	wf *workflow.Workflow,
) bool

IsProblematicMaxIterationPattern checks if a loop hit max iterations with problematic patterns. Returns true if the loop completed by hitting max iterations AND has step failures or complex nesting.

func (*ExecutionService) ListResumable

func (s *ExecutionService) ListResumable(ctx context.Context) ([]*workflow.ExecutionContext, error)

ListResumable returns all workflow executions that can be resumed. A workflow is resumable if its status is not completed.

func (*ExecutionService) Resume

func (s *ExecutionService) Resume(
	ctx context.Context,
	workflowID string,
	inputOverrides map[string]any,
) (*workflow.ExecutionContext, error)

Resume continues an interrupted workflow execution from where it left off. It loads persisted state, validates resumability, merges input overrides, and continues execution from CurrentStep while skipping completed steps.

func (*ExecutionService) Run

func (s *ExecutionService) Run(
	ctx context.Context,
	workflowName string,
	inputs map[string]any,
) (*workflow.ExecutionContext, error)

Run executes a workflow by name with the given inputs.

func (*ExecutionService) RunWithWorkflow

func (s *ExecutionService) RunWithWorkflow(
	ctx context.Context,
	wf *workflow.Workflow,
	inputs map[string]any,
) (*workflow.ExecutionContext, error)

RunWithWorkflow executes a pre-loaded workflow with the given inputs. This avoids double-loading when the workflow has already been loaded (e.g., for input validation).

func (*ExecutionService) SetAWFPaths

func (s *ExecutionService) SetAWFPaths(paths map[string]string)

SetAWFPaths configures the AWF XDG directory paths for F063 template interpolation. Keys: prompts_dir, config_dir, data_dir, workflows_dir, plugins_dir, scripts_dir.

func (*ExecutionService) SetAgentRegistry

func (s *ExecutionService) SetAgentRegistry(registry ports.AgentRegistry)

SetAgentRegistry configures the agent registry for F039 agent step execution. When set, agent-type steps can execute AI provider operations.

func (*ExecutionService) SetAuditTrailWriter

func (s *ExecutionService) SetAuditTrailWriter(w ports.AuditTrailWriter)

SetAuditTrailWriter configures the audit trail writer for F071 structured audit events. When nil, audit emission is skipped without error.

func (*ExecutionService) SetConversationManager

func (s *ExecutionService) SetConversationManager(mgr ConversationExecutor)

SetConversationManager configures the conversation manager for F033 multi-turn conversations. When set, agent-type steps with mode: conversation can execute managed conversations. Accepts ConversationExecutor interface to allow dependency injection and testing with mocks.

func (*ExecutionService) SetEvaluator

func (s *ExecutionService) SetEvaluator(evaluator ports.ExpressionEvaluator)

SetEvaluator configures the expression evaluator for conditional transitions. When set, enables evaluation of "when" clauses in workflow transitions.

func (*ExecutionService) SetOperationProvider

func (s *ExecutionService) SetOperationProvider(provider ports.OperationProvider)

SetOperationProvider configures the plugin operation provider for F021. When set, operation-type steps can execute plugin-provided operations.

func (*ExecutionService) SetOutputWriters

func (s *ExecutionService) SetOutputWriters(stdout, stderr io.Writer)

SetOutputWriters configures streaming output writers.

func (*ExecutionService) SetPluginService

func (s *ExecutionService) SetPluginService(svc *PluginService)

SetPluginService configures the plugin service for disabled-plugin detection. When set, executePluginOperation checks if the plugin is disabled before lookup. When nil, the check is skipped (backward compatible).

func (*ExecutionService) SetStepTypeProvider added in v0.5.0

func (s *ExecutionService) SetStepTypeProvider(provider ports.StepTypeProvider)

SetStepTypeProvider configures the plugin step type provider for C069 custom step type execution. When set, unknown step types are delegated to the provider via ExecuteStep. When nil, unknown step types fall through to the default command executor (backward compatible).

func (*ExecutionService) SetTemplateService

func (s *ExecutionService) SetTemplateService(svc *TemplateService)

SetTemplateService configures the template service for expanding template references.

type HistoryService

type HistoryService struct {
	// contains filtered or unexported fields
}

HistoryService provides business logic for workflow execution history.

func NewHistoryService

func NewHistoryService(store ports.HistoryStore, logger ports.Logger) *HistoryService

func (*HistoryService) Cleanup

func (s *HistoryService) Cleanup(ctx context.Context) (int, error)

func (*HistoryService) Close

func (s *HistoryService) Close() error

func (*HistoryService) GetStats

func (*HistoryService) List

func (*HistoryService) Record

func (s *HistoryService) Record(ctx context.Context, record *workflow.ExecutionRecord) error

type HookExecutor

type HookExecutor struct {
	// contains filtered or unexported fields
}

HookExecutor executes workflow and step hooks.

func NewHookExecutor

func NewHookExecutor(
	executor ports.CommandExecutor,
	logger ports.Logger,
	resolver interpolation.Resolver,
) *HookExecutor

func (*HookExecutor) ExecuteHooks

func (h *HookExecutor) ExecuteHooks(
	ctx context.Context,
	hook workflow.Hook,
	intCtx *interpolation.Context,
	failOnError bool,
) error

ExecuteHooks executes a list of hook actions sequentially. If failOnError is false, errors are logged but execution continues. If failOnError is true, execution stops on first error.

type InputCollectionService

type InputCollectionService struct {
	// contains filtered or unexported fields
}

InputCollectionService coordinates interactive collection of missing workflow inputs. This service detects which required inputs are missing from command-line arguments and delegates to the InputCollector port for user interaction.

Key responsibilities:

  • Detect missing required inputs by comparing workflow definition with provided values
  • Coordinate input collection via InputCollector port
  • Merge collected inputs with provided inputs
  • Handle optional inputs with default values

Usage:

service := NewInputCollectionService(collector, logger)
allInputs, err := service.CollectMissingInputs(workflow, providedInputs)
if err != nil {
    return fmt.Errorf("input collection failed: %w", err)
}

func NewInputCollectionService

func NewInputCollectionService(
	collector ports.InputCollector,
	logger ports.Logger,
) *InputCollectionService

NewInputCollectionService creates a new input collection service.

func (*InputCollectionService) CollectMissingInputs

func (s *InputCollectionService) CollectMissingInputs(
	ctx context.Context,
	wf *workflow.Workflow,
	providedInputs map[string]any,
) (map[string]any, error)

CollectMissingInputs detects missing required inputs and prompts the user interactively.

Logic:

  • Iterates through workflow.Inputs
  • For each required input not in providedInputs: prompt user and collect value
  • For each optional input not in providedInputs: prompt user (can skip with Enter)
  • Returns merged map containing both provided and collected inputs

Parameters:

  • ctx: Context for cancellation (e.g., Ctrl+C via signal)
  • wf: Workflow definition containing input specifications
  • providedInputs: Input values already provided via command-line flags

Returns:

  • Complete input map (provided + collected)
  • Error if collection fails or user cancels

type InteractiveExecutor

type InteractiveExecutor struct {
	// contains filtered or unexported fields
}

InteractiveExecutor executes workflows in step-by-step interactive mode. It pauses before each step, prompts for user action, and displays results.

func NewInteractiveExecutor

func NewInteractiveExecutor(
	wfSvc *WorkflowService,
	executor ports.CommandExecutor,
	parallelExecutor ports.ParallelExecutor,
	store ports.StateStore,
	logger ports.Logger,
	resolver interpolation.Resolver,
	evaluator ports.ExpressionEvaluator,
	prompt ports.InteractivePrompt,
) *InteractiveExecutor

NewInteractiveExecutor creates a new interactive executor.

func (*InteractiveExecutor) HandleExecutionError

func (e *InteractiveExecutor) HandleExecutionError(
	ctx context.Context,
	step *workflow.Step,
	state *workflow.StepState,
	execErr error,
	intCtx *interpolation.Context,
) (string, error)

HandleExecutionError handles the case where command execution returns an error. It records the failure state, executes post-hooks, and determines the next step based on ContinueOnError or OnFailure configuration.

func (*InteractiveExecutor) HandleNonZeroExit

func (e *InteractiveExecutor) HandleNonZeroExit(
	ctx context.Context,
	step *workflow.Step,
	state *workflow.StepState,
	result *ports.CommandResult,
	intCtx *interpolation.Context,
) (string, error)

HandleNonZeroExit handles the case where command execution succeeds but returns non-zero exit code. It records the failure state, executes post-hooks, and determines the next step based on ContinueOnError or OnFailure configuration.

func (*InteractiveExecutor) HandleSuccess

func (e *InteractiveExecutor) HandleSuccess(
	ctx context.Context,
	step *workflow.Step,
	state *workflow.StepState,
	intCtx *interpolation.Context,
) (string, error)

HandleSuccess handles the case where command execution succeeds with zero exit code. It records the success state, executes post-hooks, and determines the next step based on transitions or legacy OnSuccess configuration.

func (*InteractiveExecutor) Run

func (e *InteractiveExecutor) Run(ctx context.Context, workflowName string, inputs map[string]any) (*workflow.ExecutionContext, error)

Run executes the workflow in interactive mode. It returns the final execution context and any error.

func (*InteractiveExecutor) SetAWFPaths

func (e *InteractiveExecutor) SetAWFPaths(paths map[string]string)

SetAWFPaths configures the AWF XDG directory paths for template interpolation. Keys: prompts_dir, config_dir, data_dir, workflows_dir, plugins_dir, scripts_dir.

func (*InteractiveExecutor) SetBreakpoints

func (e *InteractiveExecutor) SetBreakpoints(breakpoints []string)

SetBreakpoints sets specific steps to pause at. If breakpoints is nil or empty, all steps will pause.

func (*InteractiveExecutor) SetOutputWriters

func (e *InteractiveExecutor) SetOutputWriters(stdout, stderr io.Writer)

SetOutputWriters configures streaming output writers.

func (*InteractiveExecutor) SetTemplateService

func (e *InteractiveExecutor) SetTemplateService(svc *TemplateService)

SetTemplateService configures the template service for workflow expansion.

type LimitResult

type LimitResult struct {
	Output     string // Truncated or full output
	Stderr     string // Truncated or full stderr
	OutputPath string // Path to temp file if streamed
	StderrPath string // Path to temp file if streamed
	Truncated  bool   // True if output was truncated
}

LimitResult contains the result of applying output limits.

type LoopExecutor

type LoopExecutor struct {
	// contains filtered or unexported fields
}

LoopExecutor executes for_each and while loop constructs.

func NewLoopExecutor

func NewLoopExecutor(
	logger ports.Logger,
	evaluator ports.ExpressionEvaluator,
	resolver interpolation.Resolver,
) *LoopExecutor

NewLoopExecutor creates a new LoopExecutor.

func (*LoopExecutor) BuildBodyStepIndices

func (e *LoopExecutor) BuildBodyStepIndices(body []string) (map[string]int, error)

BuildBodyStepIndices creates a map from step name to index in the body slice. Used by F048 to support transitions within loop bodies by enabling jump-to-index logic. Returns a map where keys are step names and values are their positions in the body array, or an error if duplicate step names are detected (to prevent silent configuration errors). F048 T004: Body Step Index Mapping

INTERNAL: This method is exported for testing purposes only.

func (*LoopExecutor) ExecuteForEach

func (e *LoopExecutor) ExecuteForEach(
	ctx context.Context,
	wf *workflow.Workflow,
	step *workflow.Step,
	execCtx *workflow.ExecutionContext,
	stepExecutor StepExecutorFunc,
	buildContext ContextBuilderFunc,
) (*workflow.LoopResult, error)

ExecuteForEach iterates over items and executes body steps for each.

func (*LoopExecutor) ExecuteWhile

func (e *LoopExecutor) ExecuteWhile(
	ctx context.Context,
	wf *workflow.Workflow,
	step *workflow.Step,
	execCtx *workflow.ExecutionContext,
	stepExecutor StepExecutorFunc,
	buildContext ContextBuilderFunc,
) (*workflow.LoopResult, error)

ExecuteWhile repeats body steps while condition is true.

func (*LoopExecutor) ParseItems

func (e *LoopExecutor) ParseItems(itemsStr string) ([]any, error)

ParseItems converts items string to slice. Used by ExecuteForEach to parse the items expression result. Supports JSON arrays and comma-separated values.

func (*LoopExecutor) PopLoopContext

func (e *LoopExecutor) PopLoopContext(execCtx *workflow.ExecutionContext) *workflow.LoopContext

PopLoopContext restores the parent loop context. Returns the popped context for inspection if needed. F043: Nested Loop Execution

func (*LoopExecutor) PushLoopContext

func (e *LoopExecutor) PushLoopContext(
	execCtx *workflow.ExecutionContext,
	item any,
	index int,
	first, last bool,
	length int,
)

PushLoopContext sets a new loop context, linking to existing as parent. This enables nested loops by preserving outer loop state. F043: Nested Loop Execution

func (*LoopExecutor) ResolveMaxIterations

func (e *LoopExecutor) ResolveMaxIterations(maxIterExpr string, ctx *interpolation.Context) (int, error)

ResolveMaxIterations resolves a dynamic max_iterations expression to an integer. It performs template interpolation first ({{var}} substitution), then evaluates any arithmetic expressions, and finally validates the result is within bounds. Returns error if the expression cannot be resolved, is not a valid integer, or the value is outside the allowed range (1-10000). F037: Dynamic Variable Interpolation in Loops

type OutputLimiter

type OutputLimiter struct {
	// contains filtered or unexported fields
}

OutputLimiter handles output size management for step execution. C019: Prevents OOM from unbounded StepState.Output/Stderr growth.

func NewOutputLimiter

func NewOutputLimiter(config workflow.OutputLimits) *OutputLimiter

NewOutputLimiter creates a new OutputLimiter with the given configuration.

func (*OutputLimiter) Apply

func (l *OutputLimiter) Apply(output, stderr string) (*LimitResult, error)

Apply applies output limits to the given output and stderr strings. Returns a LimitResult indicating what happened (truncation, streaming, or pass-through).

func (*OutputLimiter) ShouldLimit

func (l *OutputLimiter) ShouldLimit(output string) bool

ShouldLimit returns true if the output exceeds the configured limit.

func (*OutputLimiter) Truncate

func (l *OutputLimiter) Truncate(output string) string

Truncate truncates the output to the configured limit with a truncation marker.

type OutputStreamer

type OutputStreamer struct {
	// contains filtered or unexported fields
}

OutputStreamer handles streaming large outputs to temporary files. C019: Prevents OOM from unbounded StepState.Output/Stderr growth by streaming to disk.

func NewOutputStreamer

func NewOutputStreamer(config workflow.OutputLimits) *OutputStreamer

NewOutputStreamer creates a new OutputStreamer with the given configuration.

func (*OutputStreamer) StreamBoth

func (s *OutputStreamer) StreamBoth(output, stderr string) (outputPath, stderrPath string, err error)

StreamBoth streams both output and stderr if they exceed limits. Returns paths to temp files (empty if not streamed) and any error.

func (*OutputStreamer) StreamOutput

func (s *OutputStreamer) StreamOutput(content string) (string, error)

StreamOutput writes output to a temp file if it exceeds the configured limit. Returns the path to the temp file if streamed, or empty string if not streamed. Returns an error if temp file creation fails.

type ParallelExecutor

type ParallelExecutor struct {
	// contains filtered or unexported fields
}

ParallelExecutor executes parallel branches using errgroup with semaphore.

func NewParallelExecutor

func NewParallelExecutor(logger ports.Logger) *ParallelExecutor

NewParallelExecutor creates a new ParallelExecutor.

func (*ParallelExecutor) CheckBranchSuccess

func (e *ParallelExecutor) CheckBranchSuccess(
	branchResult *workflow.BranchResult,
	firstSuccess *bool,
	mu *sync.Mutex,
	successChan chan struct{},
	cancel context.CancelFunc,
)

CheckBranchSuccess checks if a branch succeeded and signals cancellation if it's the first success. Handles synchronization for success detection in any_succeed strategy.

NOTE: Exported temporarily for T008 testing. Will be made private after TDD cycle completes.

func (*ParallelExecutor) Execute

func (e *ParallelExecutor) Execute(
	ctx context.Context,
	wf *workflow.Workflow,
	branches []string,
	config workflow.ParallelConfig,
	execCtx *workflow.ExecutionContext,
	stepExecutor ports.StepExecutor,
) (*workflow.ParallelResult, error)

Execute runs multiple branches concurrently according to the given strategy.

func (*ParallelExecutor) RunBranchWithSemaphore

func (e *ParallelExecutor) RunBranchWithSemaphore(
	ctx context.Context,
	branch string,
	sem chan struct{},
	stepExecutor ports.StepExecutor,
	wf *workflow.Workflow,
	execCtx *workflow.ExecutionContext,
	result *workflow.ParallelResult,
	mu *sync.Mutex,
) (*workflow.BranchResult, error)

RunBranchWithSemaphore executes a single branch with semaphore control. It acquires the semaphore, executes the branch, and adds the result to the parallel result. Returns the branch result and any error encountered during execution.

NOTE: Exported temporarily for T007 testing. Will be made private after TDD cycle completes.

type PluginService

type PluginService struct {
	// contains filtered or unexported fields
}

PluginService orchestrates plugin lifecycle operations. It coordinates between the PluginManager (loading/init/shutdown) and PluginStateStore (enable/disable persistence).

func NewPluginService

func NewPluginService(
	manager ports.PluginManager,
	stateStore ports.PluginStateStore,
	logger ports.Logger,
) *PluginService

NewPluginService creates a new PluginService with injected dependencies.

func (*PluginService) DisablePlugin

func (s *PluginService) DisablePlugin(ctx context.Context, name string) error

DisablePlugin disables a plugin, shuts it down if running, and persists the state.

func (*PluginService) DiscoverPlugins

func (s *PluginService) DiscoverPlugins(ctx context.Context) ([]*pluginmodel.PluginInfo, error)

DiscoverPlugins scans the plugins directory and returns discovered plugins. It filters out disabled plugins based on the state store.

func (*PluginService) EnablePlugin

func (s *PluginService) EnablePlugin(ctx context.Context, name string) error

EnablePlugin enables a plugin and persists the state. Does not automatically load or initialize the plugin.

func (*PluginService) GetPlugin

func (s *PluginService) GetPlugin(name string) (*pluginmodel.PluginInfo, bool)

GetPlugin returns plugin info by name. Returns (nil, false) if plugin not found.

func (*PluginService) GetPluginConfig

func (s *PluginService) GetPluginConfig(name string) map[string]any

GetPluginConfig retrieves stored configuration for a plugin.

func (*PluginService) InitPlugin

func (s *PluginService) InitPlugin(ctx context.Context, name string) error

InitPlugin initializes a loaded plugin with stored configuration. The configuration is retrieved from the state store.

func (*PluginService) IsPluginEnabled

func (s *PluginService) IsPluginEnabled(name string) bool

IsPluginEnabled returns whether a plugin is enabled.

func (*PluginService) ListDisabledPlugins

func (s *PluginService) ListDisabledPlugins() []string

ListDisabledPlugins returns names of disabled plugins.

func (*PluginService) ListEnabledPlugins

func (s *PluginService) ListEnabledPlugins() []*pluginmodel.PluginInfo

ListEnabledPlugins returns only enabled plugins.

func (*PluginService) ListPlugins

func (s *PluginService) ListPlugins() []*pluginmodel.PluginInfo

ListPlugins returns all known plugins.

func (*PluginService) LoadAndInitPlugin

func (s *PluginService) LoadAndInitPlugin(ctx context.Context, name string) error

LoadAndInitPlugin loads and initializes a plugin in one operation. Convenience method for typical plugin startup.

func (*PluginService) LoadPlugin

func (s *PluginService) LoadPlugin(ctx context.Context, name string) error

LoadPlugin loads a single plugin by name. Returns an error if the plugin is disabled.

func (*PluginService) LoadState

func (s *PluginService) LoadState(ctx context.Context) error

LoadState loads plugin states from storage.

func (*PluginService) RegisterBuiltin

func (s *PluginService) RegisterBuiltin(name, description, version string, operations []string)

RegisterBuiltin registers a built-in operation provider as a synthesized PluginInfo entry. Must be called during initialization before any concurrent access.

func (*PluginService) SaveState

func (s *PluginService) SaveState(ctx context.Context) error

SaveState persists all plugin states to storage.

func (*PluginService) SetPluginConfig

func (s *PluginService) SetPluginConfig(ctx context.Context, name string, config map[string]any) error

SetPluginConfig stores configuration for a plugin.

func (*PluginService) ShutdownAll

func (s *PluginService) ShutdownAll(ctx context.Context) error

ShutdownAll gracefully stops all running plugins.

func (*PluginService) ShutdownPlugin

func (s *PluginService) ShutdownPlugin(ctx context.Context, name string) error

ShutdownPlugin gracefully stops a running plugin.

func (*PluginService) StartupEnabledPlugins

func (s *PluginService) StartupEnabledPlugins(ctx context.Context) error

StartupEnabledPlugins discovers, loads, and initializes all enabled plugins. Typically called at application startup.

type SingleStepResult

type SingleStepResult struct {
	StepName    string
	Output      string
	Stderr      string
	ExitCode    int
	Status      workflow.ExecutionStatus
	Error       string
	StartedAt   time.Time
	CompletedAt time.Time
}

SingleStepResult holds the result of executing a single step.

type StepExecutorFunc

type StepExecutorFunc func(ctx context.Context, stepName string, intCtx *interpolation.Context) (string, error)

StepExecutorFunc executes a step by name within the current loop context. Returns the next step name (if transition matched) and any error encountered. F048: Support transitions within loop body

type TemplateService

type TemplateService struct {
	// contains filtered or unexported fields
}

TemplateService handles template resolution and expansion.

func NewTemplateService

func NewTemplateService(
	repo ports.TemplateRepository,
	logger ports.Logger,
) *TemplateService

NewTemplateService creates a new template service.

func (*TemplateService) ApplyTemplateFields

func (s *TemplateService) ApplyTemplateFields(
	step *workflow.Step,
	templateStep *workflow.Step,
	params map[string]any,
) error

ApplyTemplateFields merges template fields into the workflow step. Step values take precedence over template values for most fields. Exported for testing during TDD RED phase (C005-T001).

func (*TemplateService) ExpandNestedTemplate

func (s *TemplateService) ExpandNestedTemplate(
	ctx context.Context,
	stepName string,
	templateStep *workflow.Step,
	visited map[string]bool,
) error

ExpandNestedTemplate recursively expands nested template references. Exported for testing during TDD RED phase (C005-T001).

func (*TemplateService) ExpandWorkflow

func (s *TemplateService) ExpandWorkflow(ctx context.Context, wf *workflow.Workflow) error

ExpandWorkflow resolves all template references in a workflow.

func (*TemplateService) SelectPrimaryStep

func (s *TemplateService) SelectPrimaryStep(tmpl *workflow.Template) (*workflow.Step, error)

SelectPrimaryStep determines which step to use from the template. Priority: step with same name as template > first step. Exported for testing during TDD RED phase (C005-T001).

func (*TemplateService) ValidateAndLoadTemplate

func (s *TemplateService) ValidateAndLoadTemplate(
	ctx context.Context,
	ref *workflow.WorkflowTemplateRef,
	visited map[string]bool,
) (*workflow.Template, error)

ValidateAndLoadTemplate checks for circular references and loads the template. Combines circular detection with template loading. Exported for testing during TDD RED phase (C005-T001). Note: Does not clean up visited map - caller is responsible for cleanup.

func (*TemplateService) ValidateTemplateRef

func (s *TemplateService) ValidateTemplateRef(ctx context.Context, ref *workflow.WorkflowTemplateRef) error

ValidateTemplateRef validates a template reference without expanding.

type WorkflowService

type WorkflowService struct {
	// contains filtered or unexported fields
}

func NewWorkflowService

func NewWorkflowService(
	repo ports.WorkflowRepository,
	store ports.StateStore,
	executor ports.CommandExecutor,
	logger ports.Logger,
	validator ports.ExpressionValidator,
) *WorkflowService

func (*WorkflowService) GetWorkflow

func (s *WorkflowService) GetWorkflow(ctx context.Context, name string) (*workflow.Workflow, error)

func (*WorkflowService) ListWorkflows

func (s *WorkflowService) ListWorkflows(ctx context.Context) ([]string, error)

func (*WorkflowService) SetValidatorProvider added in v0.5.0

func (s *WorkflowService) SetValidatorProvider(p ports.WorkflowValidatorProvider)

func (*WorkflowService) ValidateWorkflow

func (s *WorkflowService) ValidateWorkflow(ctx context.Context, name string) error

func (*WorkflowService) WorkflowExists

func (s *WorkflowService) WorkflowExists(ctx context.Context, name string) (bool, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL