controller

package
v0.2.1
Published: Mar 29, 2026 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Overview

Package controller contains helpers for creating and managing KEDA ScaledObjects. The operator uses the unstructured client so that it has no hard compile-time dependency on the KEDA CRDs — the cluster just needs KEDA v2 installed at runtime.

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithMetrics

func WithMetrics(r reconcile.Reconciler, controller string) reconcile.Reconciler

WithMetrics wraps r so that every Reconcile call records operator metrics. controller is the label value for the "controller" metric attribute (e.g. "arkagent"). If NewOperatorMetrics fails (which only happens on SDK misconfiguration), the original reconciler is returned unwrapped.
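The wrap-or-fall-back behaviour described above can be illustrated with a small decorator. This is only a sketch under stated assumptions: `Request`, `Result` and `Reconciler` are local stand-ins for the controller-runtime types, and `counters`, `newCounters` and the `failInit` flag are hypothetical substitutes for the package's real metrics setup, which is not shown in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Request, Result and Reconciler are local stand-ins for the
// controller-runtime reconcile types so the sketch compiles on its own.
type Request struct{ Name string }

type Result struct{}

type Reconciler interface {
	Reconcile(ctx context.Context, req Request) (Result, error)
}

// counters is a hypothetical in-memory metrics sink keyed by controller label.
type counters struct {
	mu     sync.Mutex
	total  map[string]int
	failed map[string]int
}

// newCounters stands in for NewOperatorMetrics; fail simulates a misconfigured SDK.
func newCounters(fail bool) (*counters, error) {
	if fail {
		return nil, errors.New("metrics SDK misconfigured")
	}
	return &counters{total: map[string]int{}, failed: map[string]int{}}, nil
}

// instrumented decorates a Reconciler and counts calls and errors.
type instrumented struct {
	inner      Reconciler
	controller string
	m          *counters
}

func (i *instrumented) Reconcile(ctx context.Context, req Request) (Result, error) {
	res, err := i.inner.Reconcile(ctx, req)
	i.m.mu.Lock()
	defer i.m.mu.Unlock()
	i.m.total[i.controller]++
	if err != nil {
		i.m.failed[i.controller]++
	}
	return res, err
}

// withMetrics wraps r, or returns r unchanged when metrics setup fails.
func withMetrics(r Reconciler, controller string, failInit bool) Reconciler {
	m, err := newCounters(failInit)
	if err != nil {
		return r
	}
	return &instrumented{inner: r, controller: controller, m: m}
}

// flaky is a toy reconciler that fails for requests named "bad".
type flaky struct{}

func (flaky) Reconcile(_ context.Context, req Request) (Result, error) {
	if req.Name == "bad" {
		return Result{}, errors.New("boom")
	}
	return Result{}, nil
}

// demo returns the recorded totals and whether the fallback path
// handed back the original reconciler.
func demo() (total, failed int, passthrough bool) {
	r := withMetrics(flaky{}, "arkagent", false)
	_, _ = r.Reconcile(context.Background(), Request{Name: "ok"})
	_, _ = r.Reconcile(context.Background(), Request{Name: "bad"})
	w := r.(*instrumented)
	_, passthrough = withMetrics(flaky{}, "arkagent", true).(flaky)
	return w.m.total["arkagent"], w.m.failed["arkagent"], passthrough
}

func main() {
	fmt.Println(demo()) // prints "2 1 true"
}
```

Returning the unwrapped reconciler on setup failure keeps the controller functional even when metrics are unavailable.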

Types

type ArkAgentReconciler

type ArkAgentReconciler struct {
	client.Client
	Scheme               *runtime.Scheme
	AgentImage           string
	AgentImagePullPolicy corev1.PullPolicy
	// MCPGatewayURL is the base URL of the MCP gateway, e.g.
	// "http://ark-mcp-gateway.ark-system.svc:8082". When set, arkAgentRef entries
	// in spec.mcpServers are resolved to gateway URLs at reconcile time.
	// When empty, agents with arkAgentRef entries will fail reconciliation with
	// MCPResolutionError until the gateway is configured.
	MCPGatewayURL string
}

ArkAgentReconciler reconciles an ArkAgent object.

func (*ArkAgentReconciler) Reconcile

func (r *ArkAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*ArkAgentReconciler) SetupWithManager

func (r *ArkAgentReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ArkBudgetReconciler

type ArkBudgetReconciler struct {
	client.Client
	Scheme           *runtime.Scheme
	SpendStore       costs.SpendStore
	BudgetPolicy     costs.BudgetPolicy
	NotifyDispatcher *NotifyDispatcher
}

ArkBudgetReconciler reconciles ArkBudget objects. It runs every 5 minutes per budget to refresh spend totals from the SpendStore and fire notifications when thresholds are crossed.

func (*ArkBudgetReconciler) Reconcile

func (r *ArkBudgetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*ArkBudgetReconciler) SetupWithManager

func (r *ArkBudgetReconciler) SetupWithManager(mgr ctrl.Manager) error

type ArkEventReconciler

type ArkEventReconciler struct {
	client.Client
	Scheme *runtime.Scheme
	// TriggerWebhookURL is the base URL exposed by the operator's webhook HTTP server.
	// Example: "http://ark.ark-system.svc.cluster.local:8092"
	// If empty, webhook-type triggers will record an empty webhookURL in status.
	TriggerWebhookURL string
	Recorder          record.EventRecorder
}

ArkEventReconciler reconciles ArkEvent objects. NOTE: RBAC markers for arkevents are in arkagent_controller.go (controller-gen skips this file).

func (*ArkEventReconciler) Reconcile

func (r *ArkEventReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*ArkEventReconciler) SetupWithManager

func (r *ArkEventReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager registers the controller and sets up watches.

type ArkMemoryReconciler

type ArkMemoryReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

ArkMemoryReconciler reconciles an ArkMemory object.

func (*ArkMemoryReconciler) Reconcile

func (r *ArkMemoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

ArkMemory is a configuration resource (analogous to PersistentVolumeClaim). The reconciler validates the spec and sets a Ready condition. ArkAgents reference it by name; the operator reads it during pod construction.

func (*ArkMemoryReconciler) SetupWithManager

func (r *ArkMemoryReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ArkNotifyReconciler

type ArkNotifyReconciler struct {
	client.Client
	Scheme     *runtime.Scheme
	Dispatcher *NotifyDispatcher
}

ArkNotifyReconciler reconciles ArkNotify objects.

func (*ArkNotifyReconciler) Reconcile

func (r *ArkNotifyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*ArkNotifyReconciler) SetupWithManager

func (r *ArkNotifyReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ArkRegistryReconciler

type ArkRegistryReconciler struct {
	client.Client
	Scheme            *runtime.Scheme
	Registry          *registry.Registry
	TaskQueueURL      string // operator-side connection URL
	AgentTaskQueueURL string // in-cluster URL injected into pods (may differ from above)
}

ArkRegistryReconciler reconciles ArkRegistry objects.

+kubebuilder:rbac:groups=arkonis.dev,resources=arkregistries,verbs=get;list;watch;create;update;patch;delete
+kubebuilder:rbac:groups=arkonis.dev,resources=arkregistries/status,verbs=get;update;patch
+kubebuilder:rbac:groups=arkonis.dev,resources=arkregistries/finalizers,verbs=update

func (*ArkRegistryReconciler) Reconcile

func (r *ArkRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*ArkRegistryReconciler) SetupWithManager

func (r *ArkRegistryReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ArkRunReconciler

type ArkRunReconciler struct {
	client.Client
	Scheme            *runtime.Scheme
	TaskQueueURL      string
	AgentTaskQueueURL string
	TaskQueue         queue.TaskQueue
	Recorder          record.EventRecorder
	NotifyDispatcher  *NotifyDispatcher
	// SemanticValidateFn makes a single-turn LLM call for semantic validation.
	// When nil, semantic validation is skipped (Phase 2 feature; wired by cmd/main.go).
	SemanticValidateFn func(ctx context.Context, model, prompt string) (string, error)
	// RouterFn makes a single-turn LLM call for routed-mode capability dispatch.
	// Uses the same signature and builder as SemanticValidateFn.
	// When nil, routed-mode runs fail immediately with RoutingFailed.
	RouterFn func(ctx context.Context, model, prompt string) (string, error)
	// CostProvider translates token counts into dollar costs.
	// Defaults to costs.Default() (static pricing) when nil.
	CostProvider costs.CostProvider
	// SpendStore records historical spend for dashboards and budget enforcement.
	// When nil, spend history is not recorded (Phase 1 behaviour).
	SpendStore costs.SpendStore
	// BudgetPolicy evaluates current spend against ArkBudget limits.
	// Defaults to costs.DefaultBudgetPolicy() when nil.
	BudgetPolicy costs.BudgetPolicy
	// Registry is the shared capability index maintained by ArkRegistryController.
	// When nil, registryLookup steps fail immediately.
	Registry *registry.Registry
}

ArkRunReconciler reconciles ArkRun objects.

func (*ArkRunReconciler) Reconcile

func (r *ArkRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*ArkRunReconciler) SetupWithManager

func (r *ArkRunReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ArkSecretReconciler

type ArkSecretReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

ArkSecretReconciler reconciles ArkSecret objects.

func (*ArkSecretReconciler) Reconcile

func (r *ArkSecretReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*ArkSecretReconciler) SetupWithManager

func (r *ArkSecretReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ArkSettingsReconciler

type ArkSettingsReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

ArkSettingsReconciler reconciles an ArkSettings object.

func (*ArkSettingsReconciler) Reconcile

func (r *ArkSettingsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

ArkSettings is a storage-only resource (analogous to ConfigMap). The reconciler just acknowledges the resource and sets a Ready condition. ArkAgents reference it by name; the operator reads it during pod construction.

func (*ArkSettingsReconciler) SetupWithManager

func (r *ArkSettingsReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type ArkTeamReconciler

type ArkTeamReconciler struct {
	client.Client
	Scheme            *runtime.Scheme
	TaskQueueURL      string // operator's own Redis connection URL
	AgentTaskQueueURL string // Redis URL injected into agent pods (may differ when operator runs outside cluster)
	TaskQueue         queue.TaskQueue
	Recorder          record.EventRecorder
}

ArkTeamReconciler reconciles ArkTeam objects.

func (*ArkTeamReconciler) Reconcile

func (r *ArkTeamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*ArkTeamReconciler) SetupWithManager

func (r *ArkTeamReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type CostConfigMapWatcher

type CostConfigMapWatcher struct {
	// contains filtered or unexported fields
}

CostConfigMapWatcher is a controller-runtime Runnable that polls a named ConfigMap and reloads the ConfigMapCostProvider whenever its data changes. Polling interval is 60 seconds — pricing changes are not latency-sensitive.

func NewCostConfigMapWatcher

func NewCostConfigMapWatcher(
	c client.Client, namespace, name string, provider *costs.ConfigMapCostProvider,
) *CostConfigMapWatcher

NewCostConfigMapWatcher creates a watcher that keeps provider in sync with the ConfigMap identified by namespace/name.

func (*CostConfigMapWatcher) Start

func (w *CostConfigMapWatcher) Start(ctx context.Context) error

Start polls the ConfigMap every 60 seconds until ctx is cancelled.
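A poll-and-reload loop of the kind described for Start might look roughly like the following. It is a minimal sketch, not the package's implementation: `pollConfig`, `sameData`, the `fetch` and `reload` callbacks, and the sample pricing keys are all hypothetical, and the interval is a parameter so the demo can run in milliseconds rather than 60 seconds. Returning nil on cancellation and retrying after a failed fetch are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sameData reports whether two ConfigMap-style data maps are identical.
func sameData(a, b map[string]string) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if w, ok := b[k]; !ok || w != v {
			return false
		}
	}
	return true
}

// pollConfig fetches immediately, then once per interval, and calls reload
// only when the fetched data differs from the last applied data.
// It returns nil once ctx is cancelled.
func pollConfig(
	ctx context.Context,
	interval time.Duration,
	fetch func(context.Context) (map[string]string, error),
	reload func(map[string]string),
) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var last map[string]string
	first := true
	for {
		data, err := fetch(ctx)
		// A failed fetch is skipped and retried on the next tick (assumption).
		if err == nil && (first || !sameData(data, last)) {
			reload(data)
			last, first = data, false
		}
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
		}
	}
}

// demo simulates a ConfigMap whose data changes once, then cancels the loop.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls, reloads := 0, 0
	fetch := func(context.Context) (map[string]string, error) {
		calls++
		if calls >= 4 {
			cancel() // stop after a few polls
		}
		if calls <= 2 {
			return map[string]string{"model-a": "3.00"}, nil
		}
		return map[string]string{"model-a": "2.50"}, nil
	}
	_ = pollConfig(ctx, 5*time.Millisecond, fetch, func(map[string]string) { reloads++ })
	return reloads
}

func main() {
	fmt.Println("reloads:", demo()) // prints "reloads: 2"
}
```

The loop reloads once for the initial data and once when the data changes; identical polls in between are no-ops.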

type FireContext

type FireContext struct {
	Name      string
	FiredAt   string
	Output    string         // upstream team output (team-output type)
	Body      map[string]any // webhook request body fields
	StreamKey string         // optional Redis List key for token-level streaming (SSE mode)
}

FireContext carries information about a trigger firing event, used to resolve input templates on dispatched team pipelines.

type NotifyDispatcher

type NotifyDispatcher struct {
	// contains filtered or unexported fields
}

NotifyDispatcher dispatches ArkNotify policies on behalf of controllers.

func NewNotifyDispatcher

func NewNotifyDispatcher(c client.Client) *NotifyDispatcher

NewNotifyDispatcher creates a dispatcher with a 10-second HTTP timeout.

func (*NotifyDispatcher) DispatchBudget

DispatchBudget fires a budget notification for a phase transition on an ArkBudget. Best-effort: errors are logged but do not affect the reconcile result.

func (*NotifyDispatcher) DispatchRun

func (d *NotifyDispatcher) DispatchRun(ctx context.Context, run *arkonisv1alpha1.ArkRun)

DispatchRun fires notifications for a terminal ArkRun phase transition. It looks up the parent ArkTeam's notifyRef and dispatches to all matching channels. Best-effort: errors are logged but do not affect the reconcile result.

func (*NotifyDispatcher) DispatchTest

func (d *NotifyDispatcher) DispatchTest(ctx context.Context, policy *arkonisv1alpha1.ArkNotify)

DispatchTest fires a synthetic TeamFailed payload through all channels of the given ArkNotify policy. Used by the dashboard Test button. Rate limiting is intentionally bypassed for test dispatches.

type NotifyPayload

type NotifyPayload struct {
	// Event is the notification trigger type.
	Event arkonisv1alpha1.NotifyEvent `json:"event"`
	// Team is the ArkTeam name (or ArkAgent name for AgentDegraded).
	Team      string `json:"team"`
	Namespace string `json:"namespace"`
	// Phase is the terminal phase of the run.
	Phase string `json:"phase"`
	// RunName is the ArkRun name.
	RunName string `json:"runName,omitempty"`
	// FailedStep is the name of the first step that reached Failed phase.
	FailedStep string `json:"failedStep,omitempty"`
	// TotalTokens is the sum of tokens for the run.
	TotalTokens int64 `json:"totalTokens,omitempty"`
	// DurationMs is wall-clock time from StartTime to CompletionTime.
	DurationMs int64 `json:"durationMs,omitempty"`
	// Output is the resolved pipeline output, capped at 500 characters.
	Output string `json:"output,omitempty"`
	// Message is a human-readable summary line.
	Message string `json:"message"`
	// DashboardURL is a deep-link to the team in ark-dashboard (optional).
	DashboardURL string `json:"dashboardURL,omitempty"`
}

NotifyPayload is the JSON body sent to every notification channel.
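To show what a channel receives, the sketch below marshals a copy of the struct. The field tags are taken from the definition above; the `Event` field is a plain string here as a stand-in for `arkonisv1alpha1.NotifyEvent`, and `capOutput` and `encode` are hypothetical helpers. Whether the real 500-character cap counts runes or bytes is not stated, so counting runes is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NotifyPayload mirrors the documented struct; Event is a plain string
// stand-in for arkonisv1alpha1.NotifyEvent.
type NotifyPayload struct {
	Event        string `json:"event"`
	Team         string `json:"team"`
	Namespace    string `json:"namespace"`
	Phase        string `json:"phase"`
	RunName      string `json:"runName,omitempty"`
	FailedStep   string `json:"failedStep,omitempty"`
	TotalTokens  int64  `json:"totalTokens,omitempty"`
	DurationMs   int64  `json:"durationMs,omitempty"`
	Output       string `json:"output,omitempty"`
	Message      string `json:"message"`
	DashboardURL string `json:"dashboardURL,omitempty"`
}

// capOutput truncates s to at most max runes (assumption: the cap
// counts characters, not bytes).
func capOutput(s string, max int) string {
	r := []rune(s)
	if len(r) <= max {
		return s
	}
	return string(r[:max])
}

// encode applies the output cap and returns the JSON body as a string.
func encode(p NotifyPayload) (string, error) {
	p.Output = capOutput(p.Output, 500)
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	body, err := encode(NotifyPayload{
		Event:     "TeamFailed",
		Team:      "t",
		Namespace: "ns",
		Phase:     "Failed",
		Message:   "m",
	})
	if err != nil {
		panic(err)
	}
	// Fields tagged omitempty are dropped when they hold zero values.
	fmt.Println(body)
	// prints {"event":"TeamFailed","team":"t","namespace":"ns","phase":"Failed","message":"m"}
}
```

Note that `event`, `team`, `namespace`, `phase` and `message` are always present, so receivers can rely on them even for minimal payloads.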

type TriggerWebhookServer

type TriggerWebhookServer struct {
	// contains filtered or unexported fields
}

TriggerWebhookServer handles inbound HTTP requests that fire webhook-type ArkEvents.

Async (default):

POST /triggers/{namespace}/{name}/fire
→ 202 Accepted  { "fired": true, "firedAt": "...", "trigger": "...", "targets": N }

Sync mode — holds the connection open until the flow completes:

POST /triggers/{namespace}/{name}/fire?mode=sync
POST /triggers/{namespace}/{name}/fire?mode=sync&timeout=30s
→ 200 OK        { "status": "succeeded", "output": "...", "durationMs": N, "tokenUsage": { "inputTokens": N, "outputTokens": N, "totalTokens": N } }
→ 500           { "status": "failed",    "error":  "..." }
→ 504           { "error": "timed out" }

SSE mode — streams progress events and individual tokens as the flow runs:

POST /triggers/{namespace}/{name}/fire?mode=sse
POST /triggers/{namespace}/{name}/fire?mode=sse&timeout=5m
→ text/event-stream
  event: step.started
  data: {"step":"<name>","phase":"Running"}
  event: token
  data: {"token":"<text chunk>"}     (one event per generated token)
  event: step.completed
  data: {"step":"<name>","output":"...","tokenUsage":{...},"durationMs":N}
  event: flow.completed
  data: {"status":"succeeded","output":"...","tokenUsage":{...},"durationMs":N}
  event: flow.failed
  data: {"status":"failed","error":"...","durationMs":N}
  event: error
  data: {"error":"timed out"}

Sync and SSE modes require exactly one target flow. The default timeout is 60s; maximum is 5m.
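The timeout rule above can be sketched as a small parser for the `timeout` query parameter. `resolveTimeout` is a hypothetical helper; the documentation gives only the default and the maximum, so clamping oversized values (rather than rejecting them) and rejecting non-positive values are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultTimeout = 60 * time.Second // documented default
	maxTimeout     = 5 * time.Minute  // documented maximum
)

// resolveTimeout turns the raw ?timeout= value into a duration.
// An empty value yields the default; values above the maximum are
// clamped (assumption: the real server may reject them instead).
func resolveTimeout(raw string) (time.Duration, error) {
	if raw == "" {
		return defaultTimeout, nil
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid timeout %q: %w", raw, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("timeout must be positive, got %s", d)
	}
	if d > maxTimeout {
		d = maxTimeout
	}
	return d, nil
}

func main() {
	for _, raw := range []string{"", "30s", "10m"} {
		d, err := resolveTimeout(raw)
		fmt.Printf("%q -> %s (err: %v)\n", raw, d, err)
	}
}
```

The accepted syntax here is whatever `time.ParseDuration` understands, which covers the `30s` and `5m` forms used in the request examples.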

Authentication: pass the trigger's token as a Bearer token in the Authorization header. The token is stored in a Secret named <trigger-name>-webhook-token in the same namespace. Token comparison is constant-time to prevent timing attacks.
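A constant-time Bearer check of the kind described above could be written with `crypto/subtle`. This is a sketch, not the server's code: `bearerToken`, `tokenMatches` and `authorized` are hypothetical names, the expected token is passed in directly instead of being read from the Secret, and hashing both sides first (so that input length does not affect timing) is a design choice made here.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
	"strings"
)

// bearerToken extracts the token from an Authorization header value.
// It reports false when the scheme is not "Bearer " or the token is empty.
func bearerToken(header string) (string, bool) {
	const prefix = "Bearer "
	if !strings.HasPrefix(header, prefix) {
		return "", false
	}
	tok := strings.TrimPrefix(header, prefix)
	return tok, tok != ""
}

// tokenMatches compares two tokens in constant time. Both inputs are
// hashed to a fixed length first so their lengths do not influence timing.
func tokenMatches(presented, expected string) bool {
	a := sha256.Sum256([]byte(presented))
	b := sha256.Sum256([]byte(expected))
	return subtle.ConstantTimeCompare(a[:], b[:]) == 1
}

// authorized combines header parsing with the constant-time comparison.
func authorized(header, expected string) bool {
	tok, ok := bearerToken(header)
	if !ok || expected == "" {
		return false
	}
	return tokenMatches(tok, expected)
}

func main() {
	fmt.Println(authorized("Bearer s3cret", "s3cret")) // prints "true"
	fmt.Println(authorized("Bearer nope", "s3cret"))   // prints "false"
	fmt.Println(authorized("s3cret", "s3cret"))        // prints "false"
}
```

An ordinary `==` comparison can return as soon as the first differing byte is found, which is the timing signal the constant-time comparison avoids.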

The request body is optional JSON. Fields are available in target input templates as {{ .trigger.body.<field> }}.
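As an illustration of how body fields reach a template, the sketch below renders an input template with Go's `text/template`. It assumes the operator uses `text/template` (the `{{ .trigger.body.<field> }}` syntax suggests it, but the documentation does not say so); `renderInput`, the sample field names and the `missingkey=error` option are choices made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"text/template"
)

// renderInput decodes the optional JSON request body and exposes its
// fields to the template under .trigger.body.
func renderInput(tmpl string, rawBody []byte) (string, error) {
	body := map[string]any{}
	if len(rawBody) > 0 {
		if err := json.Unmarshal(rawBody, &body); err != nil {
			return "", fmt.Errorf("decode body: %w", err)
		}
	}

	// missingkey=error makes a reference to an absent field fail loudly
	// instead of rendering "<no value>" (a choice made for this sketch).
	t, err := template.New("input").Option("missingkey=error").Parse(tmpl)
	if err != nil {
		return "", fmt.Errorf("parse template: %w", err)
	}

	data := map[string]any{"trigger": map[string]any{"body": body}}
	var sb strings.Builder
	if err := t.Execute(&sb, data); err != nil {
		return "", fmt.Errorf("render template: %w", err)
	}
	return sb.String(), nil
}

func main() {
	out, err := renderInput(
		"Summarize ticket {{ .trigger.body.ticket }} for {{ .trigger.body.customer }}",
		[]byte(`{"customer":"acme","ticket":42}`),
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // prints "Summarize ticket 42 for acme"
}
```

Because the body is decoded into a `map[string]any`, JSON numbers arrive as `float64`; small integers still render without a decimal point.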

func NewTriggerWebhookServer

func NewTriggerWebhookServer(r *ArkEventReconciler, watcher client.WithWatch, stream queue.StreamChannel) *TriggerWebhookServer

NewTriggerWebhookServer returns a new TriggerWebhookServer. watcher must be a client.WithWatch (the manager's cache-backed client satisfies this). stream may be nil to disable token streaming in SSE mode.
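On the client side, the SSE stream documented for TriggerWebhookServer can be consumed with a small line-based parser. The following is a sketch under assumptions: `sseEvent`, `parseSSE` and `collectTokens` are hypothetical names, events are assumed to be separated by blank lines as in standard server-sent events, and only the `token`, `flow.failed` and `error` events are interpreted.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// sseEvent is one parsed server-sent event.
type sseEvent struct {
	Name string
	Data string
}

// parseSSE reads "event:" and "data:" lines and emits an event at each
// blank line. A trailing event without a final blank line is also kept.
func parseSSE(r io.Reader) ([]sseEvent, error) {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64*1024), 1024*1024) // allow long data lines

	var events []sseEvent
	var cur sseEvent
	var data []string
	flush := func() {
		if cur.Name != "" || len(data) > 0 {
			cur.Data = strings.Join(data, "\n")
			events = append(events, cur)
		}
		cur, data = sseEvent{}, nil
	}

	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			flush()
		case strings.HasPrefix(line, ":"):
			// comment or keep-alive line, ignored
		case strings.HasPrefix(line, "event:"):
			cur.Name = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
		case strings.HasPrefix(line, "data:"):
			v := strings.TrimPrefix(line, "data:")
			data = append(data, strings.TrimPrefix(v, " "))
		}
	}
	flush()
	return events, sc.Err()
}

// collectTokens concatenates the text of all "token" events and returns
// an error if the stream reports flow.failed or error.
func collectTokens(stream string) (string, error) {
	events, err := parseSSE(strings.NewReader(stream))
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	for _, ev := range events {
		switch ev.Name {
		case "token":
			var t struct {
				Token string `json:"token"`
			}
			if err := json.Unmarshal([]byte(ev.Data), &t); err != nil {
				return sb.String(), fmt.Errorf("decode token event: %w", err)
			}
			sb.WriteString(t.Token)
		case "flow.failed", "error":
			var f struct {
				Error string `json:"error"`
			}
			_ = json.Unmarshal([]byte(ev.Data), &f)
			return sb.String(), fmt.Errorf("flow did not succeed: %s", f.Error)
		}
	}
	return sb.String(), nil
}

func main() {
	stream := "event: token\ndata: {\"token\":\"Hel\"}\n\n" +
		"event: token\ndata: {\"token\":\"lo\"}\n\n" +
		"event: flow.completed\ndata: {\"status\":\"succeeded\"}\n\n"
	out, err := collectTokens(stream)
	fmt.Println(out, err) // prints "Hello <nil>"
}
```

In a real client the reader would be the HTTP response body, and events would usually be handled as they arrive rather than collected into a slice.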

func (*TriggerWebhookServer) ServeHTTP

func (s *TriggerWebhookServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.
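For callers, a sync-mode request might be issued as in the sketch below. The URL shape, the Bearer header and the success body follow the TriggerWebhookServer documentation; everything else is an assumption made for illustration: `fireSync`, `syncResult`, `stubServer` and `demo` are hypothetical, the stub is not the real server (it uses a plain token comparison and a 401 response that the documentation does not specify), and the trigger name and token are made up.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// syncResult holds the documented fields of a sync-mode response.
type syncResult struct {
	Status     string `json:"status"`
	Output     string `json:"output"`
	Error      string `json:"error"`
	DurationMs int64  `json:"durationMs"`
}

// fireSync POSTs to /triggers/{namespace}/{name}/fire?mode=sync and decodes the reply.
func fireSync(c *http.Client, baseURL, namespace, name, token, timeout string, body map[string]any) (*syncResult, error) {
	q := url.Values{"mode": {"sync"}}
	if timeout != "" {
		q.Set("timeout", timeout)
	}
	endpoint := fmt.Sprintf("%s/triggers/%s/%s/fire?%s",
		baseURL, url.PathEscape(namespace), url.PathEscape(name), q.Encode())

	if body == nil {
		body = map[string]any{} // the body is optional; send an empty object
	}
	payload, err := json.Marshal(body)
	if err != nil {
		return nil, fmt.Errorf("encode body: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var res syncResult
	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
		return nil, fmt.Errorf("decode response (HTTP %d): %w", resp.StatusCode, err)
	}
	if resp.StatusCode != http.StatusOK {
		return &res, fmt.Errorf("trigger returned HTTP %d: %s", resp.StatusCode, res.Error)
	}
	return &res, nil
}

// stubServer is a test double that mimics the documented success response.
func stubServer(wantToken string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.URL.Path != "/triggers/default/nightly/fire" {
			http.NotFound(w, r)
			return
		}
		if r.Header.Get("Authorization") != "Bearer "+wantToken {
			w.WriteHeader(http.StatusUnauthorized)
			fmt.Fprint(w, `{"error":"unauthorized"}`)
			return
		}
		fmt.Fprint(w, `{"status":"succeeded","output":"done","durationMs":12}`)
	}))
}

// demo fires the hypothetical "nightly" trigger against the stub.
func demo(token string) (string, error) {
	srv := stubServer("s3cret")
	defer srv.Close()
	res, err := fireSync(srv.Client(), srv.URL, "default", "nightly", token, "30s",
		map[string]any{"customer": "acme"})
	if err != nil {
		return "", err
	}
	return res.Output, nil
}

func main() {
	fmt.Println(demo("s3cret")) // prints "done <nil>"
}
```

The HTTP client's own timeout should be longer than the `timeout` query value, otherwise the client gives up before the server can report the result.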
