Documentation ¶
Overview ¶
Package flow contains pure business logic for ArkTeam DAG execution. It has no dependency on Kubernetes controller-runtime or Redis, so both the operator and the ark CLI can import it.
Index ¶
- Constants
- func BuildRunStatusByName(f *arkonisv1alpha1.ArkRun) map[string]*arkonisv1alpha1.ArkFlowStepStatus
- func BuildRunTemplateData(f *arkonisv1alpha1.ArkRun, ...) map[string]any
- func DepsSucceeded(deps []string, statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus) bool
- func EnforceRunTimeout(f *arkonisv1alpha1.ArkRun, now metav1.Time) bool
- func EvaluateRunLoops(f *arkonisv1alpha1.ArkRun, ...)
- func ExtractJSON(s string) string
- func InitializeRouteStep(f *arkonisv1alpha1.ArkRun)
- func InitializeRunSteps(f *arkonisv1alpha1.ArkRun)
- func IsTerminalRunPhase(phase arkonisv1alpha1.ArkRunPhase) bool
- func IsTerminalTeamPhase(phase arkonisv1alpha1.ArkTeamPhase) bool
- func IsTruthy(s string) bool
- func ParseRunOutputJSON(f *arkonisv1alpha1.ArkRun, ...)
- func ParseSemanticResult(response string) (bool, string)
- func ResolveTeamPrompt(step arkonisv1alpha1.ArkTeamPipelineStep, data map[string]any) (string, error)
- func ResolveTemplate(tmplStr string, data map[string]any) (string, error)
- func SemanticValidatorPrompt(validatorTmpl, output string) string
- func SetRunCondition(f *arkonisv1alpha1.ArkRun, status metav1.ConditionStatus, ...)
- func SetTeamCondition(f *arkonisv1alpha1.ArkTeam, status metav1.ConditionStatus, ...)
- func SumRunStepTokens(f *arkonisv1alpha1.ArkRun) int64
- func ToInt64(v any) int64
- func UpdateRunPipelinePhase(f *arkonisv1alpha1.ArkRun, templateData map[string]any)
- func ValidateRunDAG(f *arkonisv1alpha1.ArkRun) error
- func ValidateStepOutput(output string, v *arkonisv1alpha1.StepValidation) (bool, string)
- func ValidateTeamDAG(f *arkonisv1alpha1.ArkTeam) error
Constants ¶
const InjectionDefenceFragment = `` /* 243-byte string literal not displayed */
InjectionDefenceFragment is appended to the system prompt of every agent pod managed by the operator. It instructs the agent to treat <ark:step-output> content as untrusted inter-step data regardless of what it says (RFC-0016 Area 4a).
Variables ¶
This section is empty.
Functions ¶
func BuildRunStatusByName ¶
func BuildRunStatusByName(f *arkonisv1alpha1.ArkRun) map[string]*arkonisv1alpha1.ArkFlowStepStatus
BuildRunStatusByName returns a name→status pointer map for ArkRun pipeline step lookups.
func BuildRunTemplateData ¶
func BuildRunTemplateData( f *arkonisv1alpha1.ArkRun, statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus, ) map[string]any
BuildRunTemplateData assembles the Go template context for an ArkRun pipeline. Template keys:
- .input.<key> — pipeline input values (spec.input)
- .steps.<name>.output — wrapped step output (ark:step-output envelope)
- .steps.<name>.data — parsed JSON map (only when OutputJSON is populated)
- .steps.<name>.artifacts.<k> — file artifact URL/path (when artifactStore is configured)
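To make the key layout above concrete, here is a minimal sketch of how such a context could be assembled as nested maps and read from a template. The stepResult type, the buildTemplateDataSketch helper, and the sample values are hypothetical stand-ins rather than the package's own types, and the real output value is wrapped in the ark:step-output envelope, which this sketch does not reproduce.

```go
package main

import (
	"fmt"
	"os"
	"text/template"
)

// stepResult is a hypothetical, simplified view of one step's status.
type stepResult struct {
	Output    string            // step output (the real value is wrapped in an envelope)
	Data      map[string]any    // parsed JSON, nil when nothing was parsed
	Artifacts map[string]string // artifact name to URL or path
}

// buildTemplateDataSketch assembles the nested map that templates read from:
// .input.<key>, .steps.<name>.output, .steps.<name>.data and
// .steps.<name>.artifacts.<k>.
func buildTemplateDataSketch(input map[string]string, steps map[string]stepResult) map[string]any {
	in := make(map[string]any, len(input))
	for k, v := range input {
		in[k] = v
	}
	st := make(map[string]any, len(steps))
	for name, r := range steps {
		entry := map[string]any{"output": r.Output}
		if r.Data != nil {
			entry["data"] = r.Data // only present when JSON was parsed
		}
		if len(r.Artifacts) > 0 {
			entry["artifacts"] = r.Artifacts
		}
		st[name] = entry
	}
	return map[string]any{"input": in, "steps": st}
}

func main() {
	data := buildTemplateDataSketch(
		map[string]string{"topic": "go generics"},
		map[string]stepResult{
			"research": {
				Output: "raw research text",
				Data:   map[string]any{"title": "Generics in Go"},
			},
		},
	)
	t := template.Must(template.New("prompt").Parse(
		"Topic: {{.input.topic}}; title: {{.steps.research.data.title}}\n"))
	if err := t.Execute(os.Stdout, data); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```

Step names that are not valid Go identifiers (for example names containing a hyphen) cannot be reached with dotted access in text/template and would need the built-in index function instead.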
func DepsSucceeded ¶
func DepsSucceeded(deps []string, statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus) bool
DepsSucceeded returns true when every name in deps has completed (Succeeded or Skipped).
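The check can be sketched as follows. This is an illustration only: stepStatusSketch is a hypothetical stand-in for ArkFlowStepStatus, the phase values are plain strings, and treating a dependency that is missing from the map as not completed is an assumption of the sketch.

```go
package main

import "fmt"

// stepStatusSketch is a hypothetical stand-in for ArkFlowStepStatus.
type stepStatusSketch struct {
	Phase string
}

// depsSucceededSketch reports whether every named dependency is Succeeded or Skipped.
// A dependency with no status entry is treated as not completed (an assumption).
func depsSucceededSketch(deps []string, byName map[string]*stepStatusSketch) bool {
	for _, d := range deps {
		st, ok := byName[d]
		if !ok || st == nil {
			return false
		}
		if st.Phase != "Succeeded" && st.Phase != "Skipped" {
			return false
		}
	}
	return true
}

func main() {
	byName := map[string]*stepStatusSketch{
		"fetch":  {Phase: "Succeeded"},
		"filter": {Phase: "Skipped"},
		"draft":  {Phase: "Running"},
	}
	fmt.Println(depsSucceededSketch([]string{"fetch", "filter"}, byName))
	fmt.Println(depsSucceededSketch([]string{"fetch", "draft"}, byName))
}
```

A step with an empty dependency list is ready immediately under this sketch, because the loop body never runs.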
func EnforceRunTimeout ¶
func EnforceRunTimeout(f *arkonisv1alpha1.ArkRun, now metav1.Time) bool
EnforceRunTimeout sets the ArkRun to Failed if the pipeline timeout has elapsed. Any steps still in Running or Pending are marked Failed so retry can reset them.
func EvaluateRunLoops ¶
func EvaluateRunLoops( f *arkonisv1alpha1.ArkRun, statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus, templateData map[string]any, )
EvaluateRunLoops checks every Succeeded ArkRun pipeline step that has a Loop spec.
func ExtractJSON ¶
func ExtractJSON(s string) string
ExtractJSON returns the first JSON object or array found in s. It handles two cases:
- The whole string is valid JSON — returned as-is.
- JSON is wrapped in a markdown code fence (```json ... ``` or ``` ... ```) — the fenced block is extracted and returned.
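The two cases above can be approximated with the standard library alone. The following is a sketch, not the package's implementation: extractJSONSketch and its helpers are hypothetical names, and returning an empty string when no JSON is found is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
)

// fence is the three-backtick markdown code fence delimiter.
var fence = strings.Repeat("`", 3)

// fenceRe captures the body of a fenced block, with or without a "json" tag.
var fenceRe = regexp.MustCompile("(?s)" + fence + "(?:json)?\\s*(.*?)" + fence)

// isObjectOrArray reports whether s is valid JSON whose top-level value is an
// object or an array (scalars such as 42 are rejected).
func isObjectOrArray(s string) bool {
	t := strings.TrimSpace(s)
	if !strings.HasPrefix(t, "{") && !strings.HasPrefix(t, "[") {
		return false
	}
	return json.Valid([]byte(t))
}

// extractJSONSketch is a hypothetical stand-in for ExtractJSON.
func extractJSONSketch(s string) string {
	// Case 1: the whole string is valid JSON, returned as-is.
	if isObjectOrArray(s) {
		return s
	}
	// Case 2: JSON wrapped in a markdown code fence.
	if m := fenceRe.FindStringSubmatch(s); m != nil {
		if body := strings.TrimSpace(m[1]); isObjectOrArray(body) {
			return body
		}
	}
	return "" // assumption: nothing found yields an empty string
}

func main() {
	reply := "Here is the result:\n" + fence + "json\n{\"ok\": true}\n" + fence + "\nDone."
	fmt.Println(extractJSONSketch(reply))
	fmt.Println(extractJSONSketch("[1, 2, 3]"))
}
```

The sketch only inspects the first fenced block; a reply with several fences where only a later one holds JSON would need a loop over all matches.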
func InitializeRouteStep ¶ added in v0.2.0
func InitializeRouteStep(f *arkonisv1alpha1.ArkRun)
InitializeRouteStep sets up the single synthetic "route" step for a routed-mode ArkRun. It is a no-op if the run is already initialised (phase non-empty).
func InitializeRunSteps ¶
func InitializeRunSteps(f *arkonisv1alpha1.ArkRun)
InitializeRunSteps sets up step statuses for an ArkRun and marks it Running. It is a no-op if the run is already initialised (phase non-empty).
func IsTerminalRunPhase ¶
func IsTerminalRunPhase(phase arkonisv1alpha1.ArkRunPhase) bool
IsTerminalRunPhase reports whether an ArkRun has reached a final state.
func IsTerminalTeamPhase ¶
func IsTerminalTeamPhase(phase arkonisv1alpha1.ArkTeamPhase) bool
IsTerminalTeamPhase reports whether an ArkTeam pipeline has reached a final state.
func ParseRunOutputJSON ¶
func ParseRunOutputJSON( f *arkonisv1alpha1.ArkRun, statusByName map[string]*arkonisv1alpha1.ArkFlowStepStatus, )
ParseRunOutputJSON parses completed ArkRun pipeline step outputs as JSON when OutputSchema is set.
func ParseSemanticResult ¶
func ParseSemanticResult(response string) (bool, string)
ParseSemanticResult interprets the LLM response from a semantic validation call. Returns (true, "") when the response contains VALID (case-insensitive). Returns (false, reason) when INVALID is found; reason is extracted after the colon. Returns (false, "unexpected response: <resp>") when neither keyword is found.
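One way to implement these rules is sketched below. Because the string INVALID contains VALID, the sketch tests for INVALID first; that ordering, the hypothetical name parseSemanticSketch, and the handling of an INVALID response with no colon are assumptions, not documented behaviour.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

var (
	invalidRe = regexp.MustCompile(`(?i)invalid`)
	validRe   = regexp.MustCompile(`(?i)valid`)
)

// parseSemanticSketch is a hypothetical stand-in for ParseSemanticResult.
func parseSemanticSketch(response string) (bool, string) {
	// INVALID is checked first because it contains VALID as a substring.
	if loc := invalidRe.FindStringIndex(response); loc != nil {
		rest := response[loc[1]:]
		if i := strings.Index(rest, ":"); i >= 0 {
			return false, strings.TrimSpace(rest[i+1:])
		}
		// Assumption: with no colon, whatever follows the keyword is the reason.
		return false, strings.TrimSpace(rest)
	}
	if validRe.MatchString(response) {
		return true, ""
	}
	return false, "unexpected response: " + response
}

func main() {
	for _, resp := range []string{"VALID", "Invalid: summary is missing", "not sure"} {
		ok, reason := parseSemanticSketch(resp)
		fmt.Printf("%q -> ok=%v reason=%q\n", resp, ok, reason)
	}
}
```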
func ResolveTeamPrompt ¶
func ResolveTeamPrompt(step arkonisv1alpha1.ArkTeamPipelineStep, data map[string]any) (string, error)
ResolveTeamPrompt resolves inputs and optional OutputSchema for an ArkTeam pipeline step.
func ResolveTemplate ¶
func ResolveTemplate(tmplStr string, data map[string]any) (string, error)
ResolveTemplate executes a Go template string against the provided data.
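A function with this signature can be written with text/template in a few lines. The sketch below shows one plausible shape; resolveTemplateSketch is a hypothetical name, and the error wrapping and the absence of any custom template functions or options are assumptions.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// resolveTemplateSketch parses tmplStr and executes it against data.
// It is a hypothetical stand-in for ResolveTemplate.
func resolveTemplateSketch(tmplStr string, data map[string]any) (string, error) {
	t, err := template.New("step").Parse(tmplStr)
	if err != nil {
		return "", fmt.Errorf("parse template: %w", err)
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, data); err != nil {
		return "", fmt.Errorf("execute template: %w", err)
	}
	return buf.String(), nil
}

func main() {
	out, err := resolveTemplateSketch(
		"Summarise {{.input.topic}} in three bullets.",
		map[string]any{"input": map[string]any{"topic": "DAG scheduling"}},
	)
	fmt.Println(out, err)

	// A malformed template surfaces as an error rather than a panic.
	_, err = resolveTemplateSketch("{{.input.topic", nil)
	fmt.Println(err != nil)
}
```

Rendering into a buffer rather than directly into the destination means a template that fails halfway through never produces a partial prompt.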
func SemanticValidatorPrompt ¶
func SemanticValidatorPrompt(validatorTmpl, output string) string
SemanticValidatorPrompt builds the prompt sent to the LLM for semantic validation. validatorTmpl is the user-supplied criteria from StepValidation.Semantic. output is the step's raw output text.
func SetRunCondition ¶
func SetRunCondition(f *arkonisv1alpha1.ArkRun, status metav1.ConditionStatus, reason, message string)
SetRunCondition updates the "Ready" condition on an ArkRun status.
func SetTeamCondition ¶
func SetTeamCondition(f *arkonisv1alpha1.ArkTeam, status metav1.ConditionStatus, reason, message string)
SetTeamCondition updates the "Ready" condition on an ArkTeam status.
func SumRunStepTokens ¶
func SumRunStepTokens(f *arkonisv1alpha1.ArkRun) int64
SumRunStepTokens accumulates token counts and cost for ArkRun pipeline steps.
func UpdateRunPipelinePhase ¶
func UpdateRunPipelinePhase(f *arkonisv1alpha1.ArkRun, templateData map[string]any)
UpdateRunPipelinePhase inspects ArkRun pipeline step statuses and transitions to Succeeded or Failed.
func ValidateRunDAG ¶
func ValidateRunDAG(f *arkonisv1alpha1.ArkRun) error
ValidateRunDAG checks an ArkRun pipeline spec for unknown dependencies and cycles.
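Both checks can be illustrated with a depth-first search over the dependency graph. This sketch uses a hypothetical dagStep type with Name and DependsOn fields in place of the real pipeline step spec, and the error messages are invented for the example.

```go
package main

import "fmt"

// dagStep is a hypothetical, simplified pipeline step.
type dagStep struct {
	Name      string
	DependsOn []string
}

// validateDAGSketch rejects unknown dependencies first, then detects cycles
// with a three-state depth-first search.
func validateDAGSketch(steps []dagStep) error {
	byName := make(map[string]dagStep, len(steps))
	for _, s := range steps {
		byName[s.Name] = s
	}
	for _, s := range steps {
		for _, d := range s.DependsOn {
			if _, ok := byName[d]; !ok {
				return fmt.Errorf("step %q depends on unknown step %q", s.Name, d)
			}
		}
	}

	const (
		unvisited = iota
		visiting // on the current DFS path
		done     // fully explored, known to be cycle-free
	)
	state := make(map[string]int, len(steps))
	var visit func(name string) error
	visit = func(name string) error {
		switch state[name] {
		case visiting:
			return fmt.Errorf("dependency cycle through step %q", name)
		case done:
			return nil
		}
		state[name] = visiting
		for _, d := range byName[name].DependsOn {
			if err := visit(d); err != nil {
				return err
			}
		}
		state[name] = done
		return nil
	}
	for _, s := range steps {
		if err := visit(s.Name); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ok := []dagStep{{Name: "fetch"}, {Name: "draft", DependsOn: []string{"fetch"}}}
	cyclic := []dagStep{
		{Name: "a", DependsOn: []string{"b"}},
		{Name: "b", DependsOn: []string{"a"}},
	}
	fmt.Println(validateDAGSketch(ok))
	fmt.Println(validateDAGSketch(cyclic))
}
```

Running the unknown-dependency pass before the traversal means the search never follows an edge to a step that does not exist.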
func ValidateStepOutput ¶
func ValidateStepOutput(output string, v *arkonisv1alpha1.StepValidation) (bool, string)
ValidateStepOutput runs the contains and schema checks from v against output. Semantic validation is intentionally excluded here — it requires an LLM call and is handled separately by the caller (goroutine in controller, inline in ark run).
Returns (true, "") when all configured checks pass. Returns (false, reason) on the first failing check. If v is nil or no non-semantic checks are configured, returns (true, "").
func ValidateTeamDAG ¶
func ValidateTeamDAG(f *arkonisv1alpha1.ArkTeam) error
ValidateTeamDAG checks the ArkTeam pipeline steps for unknown dependencies and cycles.
Types ¶
This section is empty.