engine

package
v0.1.1
Warning

This package is not in the latest version of its module.

Published: Feb 11, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Constants

const DefaultOnTimeout = "fail"

DefaultOnTimeout is the default behavior when a workflow-level timeout fires.

const DefaultPoolSize = 10

DefaultPoolSize is the default worker pool concurrency.

Variables

var ErrPoolShutdown = errors.New("worker pool is shut down")

ErrPoolShutdown is returned when work is submitted to a shut-down pool.

ValidStepTransitions defines the allowed state transitions for steps.

ValidWorkflowTransitions defines the allowed state transitions for workflows.

Functions

func CancelWorkflow

func CancelWorkflow(ctx context.Context, wfFSM *WorkflowFSM, stepFSM *StepFSM, workflowID string, currentStatus schema.WorkflowStatus, stepStates map[string]schema.StepStatus) error

CancelWorkflow transitions a workflow to cancelled and skips all non-terminal steps. stepStates is a map of step_id -> current StepStatus for all known steps.
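The "skip all non-terminal steps" part of the description can be illustrated with a small standalone sketch. The `stepStatus` type, the status names, and the choice of which statuses are terminal are all assumptions made for this example; the real `schema.StepStatus` values are not listed on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// stepStatus and the values below are hypothetical stand-ins for
// schema.StepStatus; the real status names are not listed on this page.
type stepStatus string

const (
	stepPending   stepStatus = "pending"
	stepRunning   stepStatus = "running"
	stepCompleted stepStatus = "completed"
	stepFailed    stepStatus = "failed"
	stepSkipped   stepStatus = "skipped"
)

// isTerminal reports whether a step can no longer change state.
// Which statuses count as terminal is an assumption of this sketch.
func isTerminal(s stepStatus) bool {
	return s == stepCompleted || s == stepFailed || s == stepSkipped
}

// skipNonTerminal marks every non-terminal step as skipped and returns the
// IDs it changed, sorted for stable output.
func skipNonTerminal(states map[string]stepStatus) []string {
	var changed []string
	for id, s := range states {
		if !isTerminal(s) {
			states[id] = stepSkipped
			changed = append(changed, id)
		}
	}
	sort.Strings(changed)
	return changed
}

func main() {
	states := map[string]stepStatus{
		"fetch":  stepCompleted,
		"parse":  stepRunning,
		"report": stepPending,
	}
	fmt.Println(skipNonTerminal(states)) // [parse report]
	fmt.Println(states["fetch"])         // completed
}
```

Steps that already finished keep their status, which is why the caller is asked to pass the current status of every known step.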

func ComputeBackoff

func ComputeBackoff(policy *schema.RetryPolicy, attempt int) time.Duration

ComputeBackoff calculates the delay before the next retry attempt. Supports none, constant, linear, and exponential backoff with optional max_delay cap.
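As a rough illustration of how the four strategies and the cap might combine, here is a minimal sketch. The `retryPolicy` struct, its field names, the 1-based attempt numbering, and the doubling factor for exponential backoff are assumptions; the actual `schema.RetryPolicy` fields are not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// retryPolicy is a hypothetical stand-in for schema.RetryPolicy.
type retryPolicy struct {
	Strategy string        // "none", "constant", "linear", or "exponential"
	Delay    time.Duration // base delay
	MaxDelay time.Duration // optional cap; zero means no cap
}

// computeBackoff returns the delay before the given retry attempt.
// Attempts are assumed to be numbered from 1.
func computeBackoff(p *retryPolicy, attempt int) time.Duration {
	if p == nil || attempt < 1 {
		return 0
	}
	var d time.Duration
	switch p.Strategy {
	case "constant":
		d = p.Delay
	case "linear":
		d = p.Delay * time.Duration(attempt)
	case "exponential":
		d = p.Delay
		for i := 1; i < attempt; i++ {
			d *= 2
			if p.MaxDelay > 0 && d >= p.MaxDelay {
				break // no need to keep doubling once the cap is reached
			}
		}
	default: // "none" or an unknown strategy: retry immediately
		return 0
	}
	if p.MaxDelay > 0 && d > p.MaxDelay {
		d = p.MaxDelay
	}
	return d
}

func main() {
	p := &retryPolicy{Strategy: "exponential", Delay: 100 * time.Millisecond, MaxDelay: time.Second}
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, computeBackoff(p, attempt))
	}
	// Prints delays of 100ms, 200ms, 400ms, 800ms, then 1s (capped).
}
```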

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError classifies whether an error should be retried. Retryable by default: network errors, timeouts, context.DeadlineExceeded. Non-retryable: validation errors, permission denied, typed OpcodeErrors with non-retryable codes.
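A classifier along these lines can be built from `errors.Is` and `errors.As` in the standard library. The sketch below is not the package's implementation: `codedError` is a hypothetical stand-in for a typed OpcodeError, and treating unknown errors as non-retryable is an assumption of this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
)

// codedError is a hypothetical stand-in for a typed OpcodeError that
// carries its own retryability.
type codedError struct {
	Code      string
	Retryable bool
}

func (e *codedError) Error() string { return e.Code }

// isRetryable sketches one possible ordering of the checks: typed errors
// decide for themselves, then permission errors, deadlines, network errors.
func isRetryable(err error) bool {
	if err == nil {
		return false
	}
	var ce *codedError
	if errors.As(err, &ce) {
		return ce.Retryable
	}
	if errors.Is(err, os.ErrPermission) {
		return false
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return true
	}
	var ne net.Error
	if errors.As(err, &ne) {
		return true
	}
	return false // assumption: unknown errors are not retried
}

// examples classifies four representative errors.
func examples() [4]bool {
	return [4]bool{
		isRetryable(fmt.Errorf("call failed: %w", context.DeadlineExceeded)),
		isRetryable(&net.DNSError{Err: "no such host", Name: "example.invalid"}),
		isRetryable(fmt.Errorf("open config: %w", os.ErrPermission)),
		isRetryable(&codedError{Code: "VALIDATION_ERROR"}),
	}
}

func main() {
	fmt.Println(examples()) // [true true false false]
}
```

Using `errors.Is` and `errors.As` rather than direct comparison means wrapped errors are classified the same way as bare ones.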

func WaitForBackoff

func WaitForBackoff(ctx context.Context, delay time.Duration) error

WaitForBackoff sleeps for the computed backoff duration or returns early if the context is cancelled. Returns an error if the context was cancelled during the wait.
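The usual way to write a cancellable sleep in Go is a `select` over a timer and `ctx.Done()`. The following is a sketch of that pattern with hypothetical helper names, not the package's source.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitFor blocks for delay, or returns ctx.Err() if the context ends first.
func waitFor(ctx context.Context, delay time.Duration) error {
	timer := time.NewTimer(delay)
	defer timer.Stop() // release the timer if we return early
	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// waitCancelled waits on an already-cancelled context, so it returns at once.
func waitCancelled() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return waitFor(ctx, time.Hour)
}

// waitElapsed waits out a short delay with a context that never ends.
func waitElapsed() error {
	return waitFor(context.Background(), 10*time.Millisecond)
}

func main() {
	fmt.Println(waitElapsed())   // <nil>
	fmt.Println(waitCancelled()) // context canceled
}
```

Compared with `time.Sleep`, this lets a cancelled workflow stop waiting immediately instead of finishing the full backoff.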

Types

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	// FailureThreshold is the number of consecutive failures before opening the circuit.
	FailureThreshold int
	// Cooldown is how long the circuit stays open before transitioning to half-open.
	Cooldown time.Duration
	// HalfOpenMax is the number of test requests allowed in half-open state.
	HalfOpenMax int
}

CircuitBreakerConfig configures the circuit breaker behavior.

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() CircuitBreakerConfig

DefaultCircuitBreakerConfig returns a sensible default configuration.

type CircuitBreakerRegistry

type CircuitBreakerRegistry struct {
	// contains filtered or unexported fields
}

CircuitBreakerRegistry manages per-action circuit breakers.

func NewCircuitBreakerRegistry

func NewCircuitBreakerRegistry(config CircuitBreakerConfig) *CircuitBreakerRegistry

NewCircuitBreakerRegistry creates a new registry with the given config.

func (*CircuitBreakerRegistry) AllowRequest

func (r *CircuitBreakerRegistry) AllowRequest(actionName string) error

AllowRequest checks whether a request to the given action is allowed. Returns nil if allowed, or an OpcodeError if the circuit is open.

func (*CircuitBreakerRegistry) GetState

func (r *CircuitBreakerRegistry) GetState(actionName string) CircuitState

GetState returns the current state of the circuit for an action.

func (*CircuitBreakerRegistry) GetStats

func (r *CircuitBreakerRegistry) GetStats(actionName string) map[string]any

GetStats returns diagnostic information about a circuit breaker.

func (*CircuitBreakerRegistry) RecordFailure

func (r *CircuitBreakerRegistry) RecordFailure(actionName string) CircuitState

RecordFailure records a failed execution for the action. Returns the new circuit state.

func (*CircuitBreakerRegistry) RecordSuccess

func (r *CircuitBreakerRegistry) RecordSuccess(actionName string)

RecordSuccess records a successful execution for the action.

type CircuitState

type CircuitState int

CircuitState represents the state of a circuit breaker.

const (
	CircuitClosed   CircuitState = iota // Normal operation
	CircuitOpen                         // Failing, rejecting calls
	CircuitHalfOpen                     // Testing recovery
)
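
The three states and the configuration fields fit together roughly as in the sketch below. It models a single breaker with an injectable clock so the cooldown can be exercised without sleeping. The type and method names are hypothetical, and details such as reopening on any half-open failure and closing on the first success are assumptions of this example rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type state int

const (
	closed state = iota
	open
	halfOpen
)

func (s state) String() string {
	return [...]string{"closed", "open", "half-open"}[s]
}

// breaker is a single-action circuit breaker; a registry could keep one
// per action name in a map.
type breaker struct {
	mu          sync.Mutex
	threshold   int              // consecutive failures before opening
	cooldown    time.Duration    // time spent open before probing
	halfOpenMax int              // probe requests allowed while half-open
	now         func() time.Time // injectable clock

	cur      state
	failures int
	probes   int
	openedAt time.Time
}

// allow reports whether a request may proceed.
func (b *breaker) allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.cur == open && b.now().Sub(b.openedAt) >= b.cooldown {
		b.cur, b.probes = halfOpen, 0
	}
	switch b.cur {
	case open:
		return false
	case halfOpen:
		if b.probes >= b.halfOpenMax {
			return false
		}
		b.probes++
		return true
	default:
		return true
	}
}

// recordFailure counts a failure and returns the resulting state.
func (b *breaker) recordFailure() state {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures++
	if b.cur == halfOpen || b.failures >= b.threshold {
		b.cur, b.openedAt = open, b.now()
	}
	return b.cur
}

// recordSuccess closes the circuit and resets the failure count.
func (b *breaker) recordSuccess() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.cur, b.failures = closed, 0
}

// demo walks one breaker through open, half-open, and closed.
func demo() []string {
	clock := time.Unix(0, 0)
	b := &breaker{threshold: 2, cooldown: time.Minute, halfOpenMax: 1,
		now: func() time.Time { return clock }}
	var trace []string
	b.recordFailure()
	trace = append(trace, b.recordFailure().String()) // threshold reached
	trace = append(trace, fmt.Sprint(b.allow()))      // still cooling down
	clock = clock.Add(2 * time.Minute)
	trace = append(trace, fmt.Sprint(b.allow())) // first probe is let through
	trace = append(trace, fmt.Sprint(b.allow())) // probe budget is used up
	b.recordSuccess()
	trace = append(trace, b.cur.String())
	return trace
}

func main() {
	fmt.Println(demo()) // [open false true false closed]
}
```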

func (CircuitState) String

func (s CircuitState) String() string

type DAG

type DAG struct {
	Steps   map[string]*schema.StepDefinition // step ID → definition
	Edges   map[string][]string               // step ID → dependencies (depends_on)
	Reverse map[string][]string               // step ID → dependents (who depends on me)
	Sorted  []string                          // topological order
	Roots   []string                          // steps with no dependencies
	Levels  [][]string                        // parallel execution levels
}

DAG is the in-memory directed acyclic graph representation of a workflow. Built from a WorkflowDefinition, used by the Executor to determine execution order.

func ParseDAG

func ParseDAG(def *schema.WorkflowDefinition) (*DAG, error)

ParseDAG parses a WorkflowDefinition into an executable DAG. It validates the definition, builds adjacency lists, performs topological sorting using Kahn's algorithm, detects cycles, and computes parallel execution levels.
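Kahn's algorithm lends itself to computing the parallel levels directly: each round takes every step whose remaining dependency count is zero. The sketch below works on a plain `map[string][]string` of step ID to dependencies (the same shape as the `Edges` field) and is an illustration only; the function name and error messages are made up for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// levels groups steps into parallel execution levels using Kahn's algorithm.
// deps maps each step ID to the IDs it depends on.
func levels(deps map[string][]string) ([][]string, error) {
	indegree := make(map[string]int, len(deps))
	dependents := make(map[string][]string)
	for id, ds := range deps {
		indegree[id] = len(ds)
		for _, d := range ds {
			if _, ok := deps[d]; !ok {
				return nil, fmt.Errorf("step %q depends on unknown step %q", id, d)
			}
			dependents[d] = append(dependents[d], id)
		}
	}

	// Roots: steps with no dependencies.
	var ready []string
	for id, n := range indegree {
		if n == 0 {
			ready = append(ready, id)
		}
	}

	var out [][]string
	done := 0
	for len(ready) > 0 {
		sort.Strings(ready) // map iteration is random; sort for stable output
		out = append(out, ready)
		done += len(ready)
		var next []string
		for _, id := range ready {
			for _, dep := range dependents[id] {
				indegree[dep]--
				if indegree[dep] == 0 {
					next = append(next, dep)
				}
			}
		}
		ready = next
	}

	// Any step never released must sit on a cycle.
	if done != len(deps) {
		return nil, errors.New("cycle detected in step dependencies")
	}
	return out, nil
}

func main() {
	lv, err := levels(map[string][]string{
		"fetch":  nil,
		"parse":  {"fetch"},
		"lint":   {"fetch"},
		"report": {"parse", "lint"},
	})
	fmt.Println(lv, err) // [[fetch] [lint parse] [report]] <nil>
}
```

Flattening the levels in order gives one valid topological order, and the first level corresponds to the roots.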

type ErrorHandlerResult

type ErrorHandlerResult struct {
	// Handled is true if the error was handled (ignore, fallback).
	Handled bool
	// FallbackStepID is set when the handler triggers a fallback step.
	FallbackStepID string
	// ShouldFailWorkflow is true when the handler explicitly requests workflow failure.
	ShouldFailWorkflow bool
}

ErrorHandlerResult describes the outcome of an error handler invocation.

func HandleStepError

func HandleStepError(
	ctx context.Context,
	eventLog EventAppender,
	workflowID, stepID string,
	handler *schema.ErrorHandler,
	stepErr error,
) (*ErrorHandlerResult, error)

HandleStepError evaluates the on_error handler for a step and determines the next action. It logs the invocation as an event. If no handler is configured, it returns unhandled.

type EventAppender

type EventAppender interface {
	AppendEvent(ctx context.Context, event *store.Event) error
}

EventAppender is satisfied by the Store and EventLog; used by FSMs to emit events on transitions.

type EventLogger

type EventLogger interface {
	EventAppender
	GetEvents(ctx context.Context, workflowID string, since int64) ([]*store.Event, error)
	GetEventsByType(ctx context.Context, eventType string, filter store.EventFilter) ([]*store.Event, error)
	ReplayEvents(ctx context.Context, workflowID string) (map[string]*store.StepState, error)
}

EventLogger abstracts the event log operations needed by the executor. Satisfied by *store.EventLog and test mocks.

type ExecutionResult

type ExecutionResult struct {
	WorkflowID  string                 `json:"workflow_id"`
	Status      schema.WorkflowStatus  `json:"status"`
	Output      json.RawMessage        `json:"output,omitempty"`
	Error       *schema.OpcodeError    `json:"error,omitempty"`
	StartedAt   time.Time              `json:"started_at"`
	CompletedAt *time.Time             `json:"completed_at,omitempty"`
	Steps       map[string]*StepResult `json:"steps,omitempty"`
}

ExecutionResult is returned by Run and Resume with the workflow outcome.

type Executor

type Executor interface {
	// Run starts a new workflow from a persisted Workflow record.
	Run(ctx context.Context, wf *store.Workflow, params map[string]any) (*ExecutionResult, error)

	// Resume continues a suspended or interrupted workflow from its last checkpoint.
	// Replays events to rebuild state, then continues from first pending step.
	// Reasoning nodes are NEVER replayed — stored decisions are injected.
	Resume(ctx context.Context, workflowID string) (*ExecutionResult, error)

	// Signal delivers an agent message to a suspended workflow.
	Signal(ctx context.Context, workflowID string, signal schema.Signal) error

	// Extend mutates the DAG of a running workflow in-flight.
	Extend(ctx context.Context, workflowID string, mutation schema.DAGMutation) error

	// Cancel terminates a workflow with a reason, cascading to active steps.
	Cancel(ctx context.Context, workflowID string, reason string) error

	// Status returns the current state of a workflow.
	Status(ctx context.Context, workflowID string) (*WorkflowStatus, error)
}

Executor is the central workflow execution coordinator.
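
The Resume comment describes rebuilding state by replaying events and then continuing from the first pending step. A simplified sketch of that idea follows. The `event` struct and the event type names are assumptions made for illustration; the real `store.Event` and `store.StepState` types carry more information.

```go
package main

import "fmt"

// event is a hypothetical, simplified stand-in for store.Event.
type event struct {
	Type   string // event type names here are assumptions
	StepID string
}

// replay folds the event log into a map of step ID to last known status.
func replay(events []event) map[string]string {
	states := make(map[string]string)
	for _, e := range events {
		switch e.Type {
		case "step_started":
			states[e.StepID] = "running"
		case "step_completed":
			states[e.StepID] = "completed"
		case "step_failed":
			states[e.StepID] = "failed"
		}
	}
	return states
}

// firstPending returns the first step, in execution order, that has not
// completed according to the replayed state.
func firstPending(order []string, states map[string]string) (string, bool) {
	for _, id := range order {
		if states[id] != "completed" {
			return id, true
		}
	}
	return "", false
}

func main() {
	log := []event{
		{"step_started", "fetch"},
		{"step_completed", "fetch"},
		{"step_started", "parse"}, // interrupted before completion
	}
	states := replay(log)
	fmt.Println(firstPending([]string{"fetch", "parse", "report"}, states)) // parse true
}
```

Because state is derived from the log rather than stored separately, an interrupted workflow can be resumed by any process that can read the events.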

func NewExecutor

func NewExecutor(s store.Store, el EventLogger, registry actions.ActionRegistry, cfg ExecutorConfig, vault ...secrets.Vault) Executor

NewExecutor creates a new Executor with the given dependencies. The vault argument is optional: when it is omitted or nil, secrets interpolation is disabled.
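
A trailing variadic parameter is a common Go idiom for an optional dependency. The sketch below shows the pattern in isolation; the `vault` interface, its `Resolve` method, and the rule that only the first value is used are assumptions of this example, not the documented behavior of `secrets.Vault` or `NewExecutor`.

```go
package main

import (
	"errors"
	"fmt"
)

// vault is a hypothetical stand-in for secrets.Vault.
type vault interface {
	Resolve(name string) (string, error)
}

// mapVault is a trivial in-memory vault used only for the demonstration.
type mapVault map[string]string

func (m mapVault) Resolve(name string) (string, error) {
	v, ok := m[name]
	if !ok {
		return "", errors.New("secret not found: " + name)
	}
	return v, nil
}

type executor struct {
	vault vault // nil means secrets interpolation is disabled
}

// newExecutor accepts zero or one vault; extra values are ignored here.
func newExecutor(v ...vault) *executor {
	e := &executor{}
	if len(v) > 0 {
		e.vault = v[0]
	}
	return e
}

func (e *executor) secretsEnabled() bool { return e.vault != nil }

func main() {
	fmt.Println(newExecutor().secretsEnabled())                       // false
	fmt.Println(newExecutor(nil).secretsEnabled())                    // false
	fmt.Println(newExecutor(mapVault{"token": "x"}).secretsEnabled()) // true
}
```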

type ExecutorConfig

type ExecutorConfig struct {
	PoolSize       int                   // max concurrent step goroutines
	CircuitBreaker *CircuitBreakerConfig // circuit breaker config (nil = defaults)
}

ExecutorConfig holds configuration for the executor.

type PoolMetrics

type PoolMetrics struct {
	Active    int64 `json:"active"`
	Completed int64 `json:"completed"`
	Failed    int64 `json:"failed"`
	Panics    int64 `json:"panics"`
}

PoolMetrics tracks worker pool operational metrics.

type StepFSM

type StepFSM struct {
	// contains filtered or unexported fields
}

StepFSM manages step lifecycle state transitions.

func NewStepFSM

func NewStepFSM(appender EventAppender) *StepFSM

NewStepFSM creates a new StepFSM that emits events via the given appender.

func (*StepFSM) OnAfter

func (f *StepFSM) OnAfter(from, to schema.StepStatus, hook TransitionHook)

OnAfter registers a hook called after a step transition.

func (*StepFSM) OnBefore

func (f *StepFSM) OnBefore(from, to schema.StepStatus, hook TransitionHook)

OnBefore registers a hook called before a step transition.

func (*StepFSM) Transition

func (f *StepFSM) Transition(ctx context.Context, workflowID, stepID string, from, to schema.StepStatus) error

Transition validates and executes a step state transition. It emits the corresponding event via the appender.

type StepResult

type StepResult struct {
	StepID     string              `json:"step_id"`
	Status     schema.StepStatus   `json:"status"`
	Output     json.RawMessage     `json:"output,omitempty"`
	Error      *schema.OpcodeError `json:"error,omitempty"`
	DurationMs int64               `json:"duration_ms,omitempty"`
}

StepResult summarizes the outcome of a single step.

type TransitionHook

type TransitionHook func(from, to string) error

TransitionHook is called before or after a state transition.
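
Putting the pieces together, a table-driven state machine with before and after hooks could be sketched as follows. The transition table here is illustrative (the contents of `ValidStepTransitions` are not shown on this page), the recorded event strings stand in for calls to `EventAppender.AppendEvent`, and letting a failing before-hook abort the transition is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

type status string

// validTransitions is an illustrative table, not the package's real one.
var validTransitions = map[status][]status{
	"pending": {"running", "skipped"},
	"running": {"completed", "failed"},
}

type hook func(from, to string) error

type fsm struct {
	before map[[2]status][]hook
	after  map[[2]status][]hook
	events []string // stands in for events sent to an EventAppender
}

func newFSM() *fsm {
	return &fsm{before: map[[2]status][]hook{}, after: map[[2]status][]hook{}}
}

func (f *fsm) onBefore(from, to status, h hook) {
	key := [2]status{from, to}
	f.before[key] = append(f.before[key], h)
}

// transition validates the move, runs before-hooks, records an event,
// then runs after-hooks.
func (f *fsm) transition(stepID string, from, to status) error {
	ok := false
	for _, next := range validTransitions[from] {
		if next == to {
			ok = true
			break
		}
	}
	if !ok {
		return fmt.Errorf("invalid transition %s -> %s", from, to)
	}
	key := [2]status{from, to}
	for _, h := range f.before[key] {
		if err := h(string(from), string(to)); err != nil {
			return fmt.Errorf("before hook: %w", err)
		}
	}
	f.events = append(f.events, fmt.Sprintf("%s:%s->%s", stepID, from, to))
	for _, h := range f.after[key] {
		if err := h(string(from), string(to)); err != nil {
			return fmt.Errorf("after hook: %w", err)
		}
	}
	return nil
}

// demo performs one valid, one vetoed, and one invalid transition.
func demo() (events []string, hookErr, invalidErr error) {
	f := newFSM()
	f.onBefore("running", "failed", func(from, to string) error {
		return errors.New("vetoed")
	})
	_ = f.transition("fetch", "pending", "running")
	hookErr = f.transition("fetch", "running", "failed")
	invalidErr = f.transition("fetch", "completed", "running")
	return f.events, hookErr, invalidErr
}

func main() {
	events, hookErr, invalidErr := demo()
	fmt.Println(events)     // [fetch:pending->running]
	fmt.Println(hookErr)    // before hook: vetoed
	fmt.Println(invalidErr) // invalid transition completed -> running
}
```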

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a bounded goroutine pool for concurrent step execution.

func NewWorkerPool

func NewWorkerPool(size int) *WorkerPool

NewWorkerPool creates a pool with the given max concurrency.

func (*WorkerPool) Metrics

func (p *WorkerPool) Metrics() PoolMetrics

Metrics returns a snapshot of the current pool metrics.

func (*WorkerPool) Shutdown

func (p *WorkerPool) Shutdown()

Shutdown gracefully stops the pool. It prevents new submissions and waits for all active work to complete.

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(ctx context.Context, fn func(ctx context.Context) error) error

Submit enqueues work into the pool. It blocks if the pool is at capacity (backpressure) and respects context cancellation while waiting. Returns ErrPoolShutdown if the pool has been shut down.

func (*WorkerPool) Wait

func (p *WorkerPool) Wait()

Wait blocks until all submitted work completes.
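
A bounded pool with backpressure is often built on a buffered channel used as a semaphore. The sketch below shows that approach together with panic recovery and the counters from `PoolMetrics`. It is one possible design under stated assumptions, not the package's source; all names are hypothetical, and it needs Go 1.19 or later for `atomic.Int64`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errShutdown = errors.New("worker pool is shut down")

type pool struct {
	slots  chan struct{} // buffered channel used as a counting semaphore
	wg     sync.WaitGroup
	mu     sync.Mutex
	closed bool

	completed, failed, panics atomic.Int64
}

func newPool(size int) *pool { return &pool{slots: make(chan struct{}, size)} }

// submit blocks until a slot is free, the context ends, or the pool is closed.
func (p *pool) submit(ctx context.Context, fn func(context.Context) error) error {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return errShutdown
	}
	p.wg.Add(1) // counted under the lock so shutdown cannot miss it
	p.mu.Unlock()

	select {
	case p.slots <- struct{}{}: // acquire a slot (backpressure when full)
	case <-ctx.Done():
		p.wg.Done()
		return ctx.Err()
	}
	go func() {
		defer p.wg.Done()
		defer func() { <-p.slots }() // release the slot
		defer func() {
			if r := recover(); r != nil {
				p.panics.Add(1)
			}
		}()
		if err := fn(ctx); err != nil {
			p.failed.Add(1)
			return
		}
		p.completed.Add(1)
	}()
	return nil
}

// shutdown rejects new work and waits for in-flight work to finish.
func (p *pool) shutdown() {
	p.mu.Lock()
	p.closed = true
	p.mu.Unlock()
	p.wg.Wait()
}

// demo runs five tasks on two slots: one fails and one panics.
func demo() (completed, failed, panics int64, afterShutdown error) {
	p := newPool(2)
	ctx := context.Background()
	for i := 0; i < 5; i++ {
		i := i
		_ = p.submit(ctx, func(context.Context) error {
			switch i {
			case 1:
				return errors.New("boom")
			case 2:
				panic("unexpected")
			}
			return nil
		})
	}
	p.shutdown()
	afterShutdown = p.submit(ctx, func(context.Context) error { return nil })
	return p.completed.Load(), p.failed.Load(), p.panics.Load(), afterShutdown
}

func main() {
	fmt.Println(demo()) // 3 1 1 worker pool is shut down
}
```

Recovering inside the worker keeps one panicking step from taking down the whole process, which is presumably why panics are tracked as a separate metric.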

type WorkflowFSM

type WorkflowFSM struct {
	// contains filtered or unexported fields
}

WorkflowFSM manages workflow lifecycle state transitions.

func NewWorkflowFSM

func NewWorkflowFSM(appender EventAppender) *WorkflowFSM

NewWorkflowFSM creates a new WorkflowFSM that emits events via the given appender.

func (*WorkflowFSM) OnAfter

func (f *WorkflowFSM) OnAfter(from, to schema.WorkflowStatus, hook TransitionHook)

OnAfter registers a hook called after a workflow transition.

func (*WorkflowFSM) OnBefore

func (f *WorkflowFSM) OnBefore(from, to schema.WorkflowStatus, hook TransitionHook)

OnBefore registers a hook called before a workflow transition.

func (*WorkflowFSM) Transition

func (f *WorkflowFSM) Transition(ctx context.Context, workflowID string, from, to schema.WorkflowStatus) error

Transition validates and executes a workflow state transition. It emits the corresponding event via the appender. The caller (Executor) is responsible for persisting the new state to the store.

type WorkflowStatus

type WorkflowStatus struct {
	WorkflowID       string                      `json:"workflow_id"`
	Status           schema.WorkflowStatus       `json:"status"`
	Steps            map[string]*store.StepState `json:"steps,omitempty"`
	PendingDecisions []*store.PendingDecision    `json:"pending_decisions,omitempty"`
	Context          *store.WorkflowContext      `json:"context,omitempty"`
	Events           []*store.Event              `json:"events,omitempty"`
}

WorkflowStatus is a snapshot of a workflow's current state for querying.
