flow

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2026 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation

Overview

Package flow contains pure business logic for ArkTeam DAG execution. It has no dependency on Kubernetes controller-runtime or Redis — the operator and the ark CLI both import it.

Index

Constants

View Source
const InjectionDefenceFragment = `` /* 243-byte string literal not displayed */

InjectionDefenceFragment is appended to the system prompt of every agent pod managed by the operator. It instructs the agent to treat <ark:step-output> content as untrusted inter-step data regardless of what it says (RFC-0016 Area 4a).

Variables

This section is empty.

Functions

func BuildRunStatusByName

func BuildRunStatusByName(f *arkonisv1alpha1.ArkRun) map[string]*arkonisv1alpha1.ArkFlowStepStatus

BuildRunStatusByName returns a name→status pointer map for ArkRun pipeline step lookups.

func BuildRunTemplateData

func BuildRunTemplateData(
	f *arkonisv1alpha1.ArkRun,
	statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus,
) map[string]any

BuildRunTemplateData assembles the Go template context for an ArkRun pipeline. Template keys:

  • .input.<key> — pipeline input values (spec.input)
  • .steps.<name>.output — wrapped step output (ark:step-output envelope)
  • .steps.<name>.data — parsed JSON map (only when OutputJSON is populated)
  • .steps.<name>.artifacts.<k> — file artifact URL/path (when artifactStore is configured)

func DepsSucceeded

func DepsSucceeded(deps []string, statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus) bool

DepsSucceeded returns true when every name in deps has completed (Succeeded or Skipped).

func EnforceRunTimeout

func EnforceRunTimeout(f *arkonisv1alpha1.ArkRun, now metav1.Time) bool

EnforceRunTimeout sets the ArkRun to Failed if the pipeline timeout has elapsed. Any steps still in Running or Pending are marked Failed so retry can reset them.

func EvaluateRunLoops

func EvaluateRunLoops(
	f *arkonisv1alpha1.ArkRun,
	statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus,
	templateData map[string]any,
)

EvaluateRunLoops checks every Succeeded ArkRun pipeline step that has a Loop spec.

func ExtractJSON

func ExtractJSON(s string) string

ExtractJSON returns the first JSON object or array found in s. It handles two cases:

  1. The whole string is valid JSON — returned as-is.
  2. JSON is wrapped in a markdown code fence (```json ... ``` or ``` ... ```) — the fenced block is extracted and returned.

func InitializeRouteStep added in v0.2.0

func InitializeRouteStep(f *arkonisv1alpha1.ArkRun)

InitializeRouteStep sets up the single synthetic "route" step for a routed-mode ArkRun. It is a no-op if the run is already initialised (phase non-empty).

func InitializeRunSteps

func InitializeRunSteps(f *arkonisv1alpha1.ArkRun)

InitializeRunSteps sets up step statuses for an ArkRun and marks it Running. It is a no-op if the run is already initialised (phase non-empty).

func IsTerminalRunPhase

func IsTerminalRunPhase(phase arkonisv1alpha1.ArkRunPhase) bool

IsTerminalRunPhase reports whether an ArkRun has reached a final state.

func IsTerminalTeamPhase

func IsTerminalTeamPhase(phase arkonisv1alpha1.ArkTeamPhase) bool

IsTerminalTeamPhase reports whether an ArkTeam pipeline has reached a final state.

func IsTruthy

func IsTruthy(s string) bool

IsTruthy returns false for blank, "false", "0", or "no" (case-insensitive).

func ParseRunOutputJSON

func ParseRunOutputJSON(
	f *arkonisv1alpha1.ArkRun,
	statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus,
)

ParseRunOutputJSON parses completed ArkRun pipeline step outputs as JSON when OutputSchema is set.

func ParseSemanticResult

func ParseSemanticResult(response string) (bool, string)

ParseSemanticResult interprets the LLM response from a semantic validation call. Returns (true, "") when the response contains VALID (case-insensitive). Returns (false, reason) when INVALID is found; reason is extracted after the colon. Returns (false, "unexpected response: <resp>") when neither keyword is found.

func ResolveTeamPrompt

func ResolveTeamPrompt(step arkonisv1alpha1.ArkTeamPipelineStep, data map[string]any) (string, error)

ResolveTeamPrompt resolves inputs and optional OutputSchema for an ArkTeam pipeline step.

func ResolveTemplate

func ResolveTemplate(tmplStr string, data map[string]any) (string, error)

ResolveTemplate executes a Go template string against the provided data.

func SemanticValidatorPrompt

func SemanticValidatorPrompt(validatorTmpl, output string) string

SemanticValidatorPrompt builds the prompt sent to the LLM for semantic validation. validatorTmpl is the user-supplied criteria from StepValidation.Semantic. output is the step's raw output text.

func SetRunCondition

func SetRunCondition(f *arkonisv1alpha1.ArkRun, status metav1.ConditionStatus, reason, message string)

SetRunCondition updates the "Ready" condition on an ArkRun status.

func SetTeamCondition

func SetTeamCondition(f *arkonisv1alpha1.ArkTeam, status metav1.ConditionStatus, reason, message string)

SetTeamCondition updates the "Ready" condition on an ArkTeam status.

func SumRunStepTokens

func SumRunStepTokens(f *arkonisv1alpha1.ArkRun) int64

SumRunStepTokens accumulates token counts and cost for ArkRun pipeline steps.

func ToInt64

func ToInt64(v any) int64

ToInt64 coerces a Redis value (string or int64) to int64.

func UpdateRunPipelinePhase

func UpdateRunPipelinePhase(f *arkonisv1alpha1.ArkRun, templateData map[string]any)

UpdateRunPipelinePhase inspects ArkRun pipeline step statuses and transitions to Succeeded or Failed.

func ValidateRunDAG

func ValidateRunDAG(f *arkonisv1alpha1.ArkRun) error

ValidateRunDAG checks an ArkRun pipeline spec for unknown dependencies and cycles.

func ValidateStepOutput

func ValidateStepOutput(output string, v *arkonisv1alpha1.StepValidation) (bool, string)

ValidateStepOutput runs the contains and schema checks from v against output. Semantic validation is intentionally excluded here — it requires an LLM call and is handled separately by the caller (goroutine in controller, inline in ark run).

Returns (true, "") when all configured checks pass. Returns (false, reason) on the first failing check. If v is nil or no non-semantic checks are configured, returns (true, "").

func ValidateTeamDAG

func ValidateTeamDAG(f *arkonisv1alpha1.ArkTeam) error

ValidateTeamDAG checks the ArkTeam pipeline steps for unknown dependencies and cycles.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL