Documentation ¶
Overview ¶
Package controller contains helpers for creating and managing KEDA ScaledObjects. The operator uses the unstructured client so that it has no hard compile-time dependency on the KEDA CRDs — the cluster just needs KEDA v2 installed at runtime.
Index ¶
- func WithMetrics(r reconcile.Reconciler, controller string) reconcile.Reconciler
- type ArkAgentReconciler
- type ArkBudgetReconciler
- type ArkEventReconciler
- type ArkMemoryReconciler
- type ArkNotifyReconciler
- type ArkRegistryReconciler
- type ArkRunReconciler
- type ArkSecretReconciler
- type ArkSettingsReconciler
- type ArkTeamReconciler
- type CostConfigMapWatcher
- type FireContext
- type NotifyDispatcher
- type NotifyPayload
- type TriggerWebhookServer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithMetrics ¶
func WithMetrics(r reconcile.Reconciler, controller string) reconcile.Reconciler
WithMetrics wraps r so that every Reconcile call records operator metrics. controller is the label value for the "controller" metric attribute (e.g. "arkagent"). If NewOperatorMetrics fails (which only happens on SDK misconfiguration), the original reconciler is returned unwrapped.
Types ¶
type ArkAgentReconciler ¶
type ArkAgentReconciler struct {
client.Client
Scheme *runtime.Scheme
AgentImage string
AgentImagePullPolicy corev1.PullPolicy
// MCPGatewayURL is the base URL of the MCP gateway, e.g.
// "http://ark-mcp-gateway.ark-system.svc:8082". When set, arkAgentRef entries
// in spec.mcpServers are resolved to gateway URLs at reconcile time.
// When empty, agents with arkAgentRef entries will fail reconciliation with
// MCPResolutionError until the gateway is configured.
MCPGatewayURL string
}
ArkAgentReconciler reconciles an ArkAgent object.
func (*ArkAgentReconciler) SetupWithManager ¶
func (r *ArkAgentReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ArkBudgetReconciler ¶
type ArkBudgetReconciler struct {
client.Client
Scheme *runtime.Scheme
SpendStore costs.SpendStore
BudgetPolicy costs.BudgetPolicy
NotifyDispatcher *NotifyDispatcher
}
ArkBudgetReconciler reconciles ArkBudget objects. It runs every 5 minutes per budget to refresh spend totals from the SpendStore and fire notifications when thresholds are crossed.
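As an illustration of firing a notification only when a threshold is crossed between two refreshes, rather than on every refresh while spend stays above it, here is a small sketch. The function name crossedThresholds and the representation of thresholds as fractions of the limit are assumptions made for this example; the actual decision logic lives behind costs.BudgetPolicy and is not shown in this documentation.

```go
package main

// crossedThresholds returns the thresholds (fractions of limit, e.g. 0.8 for
// 80%) that were crossed between the previous and the current spend total.
// A threshold counts as crossed when prev was below it and cur is at or above
// it, so repeated refreshes above a threshold do not fire again.
// Hypothetical helper: not part of the package API.
func crossedThresholds(prev, cur, limit float64, thresholds []float64) []float64 {
	var out []float64
	if limit <= 0 {
		return out
	}
	for _, t := range thresholds {
		mark := t * limit
		if prev < mark && cur >= mark {
			out = append(out, t)
		}
	}
	return out
}
```

With a limit of 100 and thresholds of 0.5, 0.8, and 1.0, a refresh that moves spend from 40 to 85 crosses the first two thresholds; a later refresh from 85 to 90 crosses none.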
func (*ArkBudgetReconciler) SetupWithManager ¶
func (r *ArkBudgetReconciler) SetupWithManager(mgr ctrl.Manager) error
type ArkEventReconciler ¶
type ArkEventReconciler struct {
client.Client
Scheme *runtime.Scheme
// TriggerWebhookURL is the base URL exposed by the operator's webhook HTTP server.
// Example: "http://ark.ark-system.svc.cluster.local:8092"
// If empty, webhook-type triggers will record an empty webhookURL in status.
TriggerWebhookURL string
Recorder record.EventRecorder
}
ArkEventReconciler reconciles ArkEvent objects. NOTE: RBAC markers for arkevents are in arkagent_controller.go (controller-gen skips this file).
func (*ArkEventReconciler) SetupWithManager ¶
func (r *ArkEventReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager registers the controller and sets up watches.
type ArkMemoryReconciler ¶
ArkMemoryReconciler reconciles an ArkMemory object.
func (*ArkMemoryReconciler) Reconcile ¶
ArkMemory is a configuration resource (analogous to PersistentVolumeClaim). The reconciler validates the spec and sets a Ready condition. ArkAgents reference it by name; the operator reads it during pod construction.
func (*ArkMemoryReconciler) SetupWithManager ¶
func (r *ArkMemoryReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ArkNotifyReconciler ¶
type ArkNotifyReconciler struct {
client.Client
Scheme *runtime.Scheme
Dispatcher *NotifyDispatcher
}
ArkNotifyReconciler reconciles ArkNotify objects.
func (*ArkNotifyReconciler) SetupWithManager ¶
func (r *ArkNotifyReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ArkRegistryReconciler ¶
type ArkRegistryReconciler struct {
client.Client
Scheme *runtime.Scheme
Registry *registry.Registry
TaskQueueURL string // operator-side connection URL
AgentTaskQueueURL string // in-cluster URL injected into pods (may differ from above)
}
ArkRegistryReconciler reconciles ArkRegistry objects.
+kubebuilder:rbac:groups=arkonis.dev,resources=arkregistries,verbs=get;list;watch;create;update;patch;delete
+kubebuilder:rbac:groups=arkonis.dev,resources=arkregistries/status,verbs=get;update;patch
+kubebuilder:rbac:groups=arkonis.dev,resources=arkregistries/finalizers,verbs=update
func (*ArkRegistryReconciler) SetupWithManager ¶
func (r *ArkRegistryReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ArkRunReconciler ¶
type ArkRunReconciler struct {
client.Client
Scheme *runtime.Scheme
TaskQueueURL string
AgentTaskQueueURL string
TaskQueue queue.TaskQueue
Recorder record.EventRecorder
NotifyDispatcher *NotifyDispatcher
// SemanticValidateFn makes a single-turn LLM call for semantic validation.
// When nil, semantic validation is skipped (Phase 2 feature; wired by cmd/main.go).
SemanticValidateFn func(ctx context.Context, model, prompt string) (string, error)
// RouterFn makes a single-turn LLM call for routed-mode capability dispatch.
// Uses the same signature and builder as SemanticValidateFn.
// When nil, routed-mode runs fail immediately with RoutingFailed.
RouterFn func(ctx context.Context, model, prompt string) (string, error)
// CostProvider translates token counts into dollar costs.
// Defaults to costs.Default() (static pricing) when nil.
CostProvider costs.CostProvider
// SpendStore records historical spend for dashboards and budget enforcement.
// When nil, spend history is not recorded (Phase 1 behaviour).
SpendStore costs.SpendStore
// BudgetPolicy evaluates current spend against ArkBudget limits.
// Defaults to costs.DefaultBudgetPolicy() when nil.
BudgetPolicy costs.BudgetPolicy
// Registry is the shared capability index maintained by ArkRegistryController.
// When nil, registryLookup steps fail immediately.
Registry *registry.Registry
}
ArkRunReconciler reconciles ArkRun objects.
func (*ArkRunReconciler) SetupWithManager ¶
func (r *ArkRunReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ArkSecretReconciler ¶
ArkSecretReconciler reconciles ArkSecret objects.
func (*ArkSecretReconciler) SetupWithManager ¶
func (r *ArkSecretReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ArkSettingsReconciler ¶
ArkSettingsReconciler reconciles an ArkSettings object.
func (*ArkSettingsReconciler) Reconcile ¶
func (r *ArkSettingsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
ArkSettings is a storage-only resource (analogous to ConfigMap). The reconciler just acknowledges the resource and sets a Ready condition. ArkAgents reference it by name; the operator reads it during pod construction.
func (*ArkSettingsReconciler) SetupWithManager ¶
func (r *ArkSettingsReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type ArkTeamReconciler ¶
type ArkTeamReconciler struct {
client.Client
Scheme *runtime.Scheme
TaskQueueURL string // operator's own Redis connection URL
AgentTaskQueueURL string // Redis URL injected into agent pods (may differ when operator runs outside cluster)
TaskQueue queue.TaskQueue
Recorder record.EventRecorder
}
ArkTeamReconciler reconciles ArkTeam objects.
func (*ArkTeamReconciler) SetupWithManager ¶
func (r *ArkTeamReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type CostConfigMapWatcher ¶
type CostConfigMapWatcher struct {
// contains filtered or unexported fields
}
CostConfigMapWatcher is a controller-runtime Runnable that polls a named ConfigMap and reloads the ConfigMapCostProvider whenever its data changes. Polling interval is 60 seconds — pricing changes are not latency-sensitive.
func NewCostConfigMapWatcher ¶
func NewCostConfigMapWatcher( c client.Client, namespace, name string, provider *costs.ConfigMapCostProvider, ) *CostConfigMapWatcher
NewCostConfigMapWatcher creates a watcher that keeps provider in sync with the ConfigMap identified by namespace/name.
type FireContext ¶
type FireContext struct {
Name string
FiredAt string
Output string // upstream team output (team-output type)
Body map[string]any // webhook request body fields
StreamKey string // optional Redis List key for token-level streaming (SSE mode)
}
FireContext carries information about a trigger firing event, used to resolve input templates on dispatched team pipelines.
type NotifyDispatcher ¶
type NotifyDispatcher struct {
// contains filtered or unexported fields
}
NotifyDispatcher dispatches ArkNotify policies on behalf of controllers.
func NewNotifyDispatcher ¶
func NewNotifyDispatcher(c client.Client) *NotifyDispatcher
NewNotifyDispatcher creates a dispatcher with a 10-second HTTP timeout.
func (*NotifyDispatcher) DispatchBudget ¶
func (d *NotifyDispatcher) DispatchBudget( ctx context.Context, budget *arkonisv1alpha1.ArkBudget, decision costs.BudgetDecision, event arkonisv1alpha1.NotifyEvent, policy *arkonisv1alpha1.ArkNotify, )
DispatchBudget fires a budget notification for a phase transition on an ArkBudget. Best-effort: errors are logged but do not affect the reconcile result.
func (*NotifyDispatcher) DispatchRun ¶
func (d *NotifyDispatcher) DispatchRun(ctx context.Context, run *arkonisv1alpha1.ArkRun)
DispatchRun fires notifications for a terminal ArkRun phase transition. It looks up the parent ArkTeam's notifyRef and dispatches to all matching channels. Best-effort: errors are logged but do not affect the reconcile result.
func (*NotifyDispatcher) DispatchTest ¶
func (d *NotifyDispatcher) DispatchTest(ctx context.Context, policy *arkonisv1alpha1.ArkNotify)
DispatchTest fires a synthetic TeamFailed payload through all channels of the given ArkNotify policy. Used by the dashboard Test button. Rate limiting is intentionally bypassed for test dispatches.
type NotifyPayload ¶
type NotifyPayload struct {
// Event is the notification trigger type.
Event arkonisv1alpha1.NotifyEvent `json:"event"`
// Team is the ArkTeam name (or ArkAgent name for AgentDegraded).
Team string `json:"team"`
Namespace string `json:"namespace"`
// Phase is the terminal phase of the run.
Phase string `json:"phase"`
// RunName is the ArkRun name.
RunName string `json:"runName,omitempty"`
// FailedStep is the name of the first step that reached Failed phase.
FailedStep string `json:"failedStep,omitempty"`
// TotalTokens is the sum of tokens for the run.
TotalTokens int64 `json:"totalTokens,omitempty"`
// DurationMs is wall-clock time from StartTime to CompletionTime.
DurationMs int64 `json:"durationMs,omitempty"`
// Output is the resolved pipeline output, capped at 500 characters.
Output string `json:"output,omitempty"`
// Message is a human-readable summary line.
Message string `json:"message"`
// DashboardURL is a deep-link to the team in ark-dashboard (optional).
DashboardURL string `json:"dashboardURL,omitempty"`
}
NotifyPayload is the JSON body sent to every notification channel.
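To show how the struct tags shape the wire format, the sketch below marshals a reduced copy of the payload. The payload type here is a hypothetical stand-in with Event declared as a plain string (the real field is an arkonisv1alpha1.NotifyEvent), and capOutput is an assumed helper: the documentation states the 500-character cap but not whether it is applied per byte or per rune, so counting runes is a guess.

```go
package main

import "encoding/json"

// payload mirrors a subset of NotifyPayload's fields and JSON tags.
// Stand-in type for illustration only.
type payload struct {
	Event       string `json:"event"`
	Team        string `json:"team"`
	Namespace   string `json:"namespace"`
	Phase       string `json:"phase"`
	RunName     string `json:"runName,omitempty"`
	TotalTokens int64  `json:"totalTokens,omitempty"`
	Output      string `json:"output,omitempty"`
	Message     string `json:"message"`
}

// capOutput truncates s to at most max runes (assumption: the cap counts
// characters rather than bytes).
func capOutput(s string, max int) string {
	r := []rune(s)
	if len(r) <= max {
		return s
	}
	return string(r[:max])
}

// encode returns the JSON body for p. Fields tagged omitempty are left out
// when they hold their zero value.
func encode(p payload) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```

A payload with only the untagged-omitempty fields set encodes without runName, totalTokens, or output keys, which keeps failure notifications for runs with no output compact.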
type TriggerWebhookServer ¶
type TriggerWebhookServer struct {
// contains filtered or unexported fields
}
TriggerWebhookServer handles inbound HTTP requests that fire webhook-type ArkEvents.
Async (default):
    POST /triggers/{namespace}/{name}/fire
    → 202 Accepted { "fired": true, "firedAt": "...", "trigger": "...", "targets": N }
Sync mode (holds the connection open until the flow completes):
    POST /triggers/{namespace}/{name}/fire?mode=sync
    POST /triggers/{namespace}/{name}/fire?mode=sync&timeout=30s
    → 200 OK { "status": "succeeded", "output": "...", "durationMs": N, "tokenUsage": { "inputTokens": N, "outputTokens": N, "totalTokens": N } }
    → 500 { "status": "failed", "error": "..." }
    → 504 { "error": "timed out" }
SSE mode (streams progress events and individual tokens as the flow runs):
    POST /triggers/{namespace}/{name}/fire?mode=sse
    POST /triggers/{namespace}/{name}/fire?mode=sse&timeout=5m
    → text/event-stream
        event: step.started
        data: {"step":"<name>","phase":"Running"}
        event: token
        data: {"token":"<text chunk>"} (one event per generated token)
        event: step.completed
        data: {"step":"<name>","output":"...","tokenUsage":{...},"durationMs":N}
        event: flow.completed
        data: {"status":"succeeded","output":"...","tokenUsage":{...},"durationMs":N}
        event: flow.failed
        data: {"status":"failed","error":"...","durationMs":N}
        event: error
        data: {"error":"timed out"}
Sync and SSE modes require exactly one target flow. The default timeout is 60s; maximum is 5m.
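The timeout rule can be sketched as a small parser for the timeout query parameter. The default and maximum come from the text above; everything else is an assumption. In particular, parseTimeout is a hypothetical name, and whether the server clamps an oversized value to the maximum (as here) or rejects it is not stated in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultTimeout = 60 * time.Second // documented default
	maxTimeout     = 5 * time.Minute  // documented maximum
)

// parseTimeout interprets the raw "timeout" query value (e.g. "30s", "5m").
// An empty value yields the default. Values above the maximum are clamped
// (assumption). Unparseable or non-positive values return an error.
func parseTimeout(raw string) (time.Duration, error) {
	if raw == "" {
		return defaultTimeout, nil
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid timeout %q: %w", raw, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("timeout must be positive, got %s", d)
	}
	if d > maxTimeout {
		return maxTimeout, nil
	}
	return d, nil
}
```

The values accepted are those understood by time.ParseDuration, which matches the 30s and 5m forms used in the request examples.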
Authentication: pass the trigger's token as a Bearer token in the Authorization header. The token is stored in a Secret named <trigger-name>-webhook-token in the same namespace. Token comparison is constant-time to prevent timing attacks.
The request body is optional JSON. Fields are available in target input templates as {{ .trigger.body.<field> }}.
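The template syntax above is consistent with Go's text/template, so resolution of body fields can be sketched as follows. This assumes text/template is the engine in use, which the documentation does not state outright. resolveInput is a hypothetical helper, and the missingkey=error option is a choice made for this example; the operator may treat missing fields differently.

```go
package main

import (
	"strings"
	"text/template"
)

// resolveInput renders tmpl with the webhook body exposed at .trigger.body.
// Referencing a field that is absent from body returns an error because the
// template is parsed with missingkey=error (assumption).
func resolveInput(tmpl string, body map[string]any) (string, error) {
	t, err := template.New("input").Option("missingkey=error").Parse(tmpl)
	if err != nil {
		return "", err
	}
	data := map[string]any{
		"trigger": map[string]any{"body": body},
	}
	var sb strings.Builder
	if err := t.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}
```

For a request body of {"repo": "acme/api"}, the template "Review {{ .trigger.body.repo }}" renders as "Review acme/api".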
func NewTriggerWebhookServer ¶
func NewTriggerWebhookServer(r *ArkEventReconciler, watcher client.WithWatch, stream queue.StreamChannel) *TriggerWebhookServer
NewTriggerWebhookServer returns a new TriggerWebhookServer. watcher must be a client.WithWatch (the manager's cache-backed client satisfies this). stream may be nil to disable token streaming in SSE mode.
func (*TriggerWebhookServer) ServeHTTP ¶
func (s *TriggerWebhookServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler.
Source Files ¶
- arkagent_controller.go
- arkbudget_controller.go
- arkevent_controller.go
- arkmemory_controller.go
- arknotify_controller.go
- arkregistry_controller.go
- arkrun_controller.go
- arksecret_controller.go
- arksettings_controller.go
- arkteam_controller.go
- cost_configmap_watcher.go
- helpers.go
- keda.go
- metrics_reconciler.go
- notify.go
- trigger_webhook_server.go