Documentation
Overview
Package observability initialises OpenTelemetry for the operator and agent runtime. Call Init (cluster mode, OTLP export) or InitStdout (local development, pretty-printed JSON on stdout). Both return a shutdown function that the caller must defer.
When OTEL_EXPORTER_OTLP_ENDPOINT is empty and Init is called, no-op providers are installed, which is safe for local development and unit tests.
The package also provides a best-effort Kubernetes Event emitter for agent pods (k8sevents.go). When the pod is not running inside a Kubernetes cluster, all of its calls are no-ops, so local development is unaffected.
Index
- Constants
- func Init(ctx context.Context, serviceName string) (shutdown func(), err error)
- func InitStdout(ctx context.Context, serviceName string) (shutdown func(), err error)
- func Meter(name string) metric.Meter
- func PrintRemoteTraceTree(w io.Writer, spans []RemoteSpan)
- func PrintTraceTree(w io.Writer, spans []sdktrace.ReadOnlySpan)
- func Tracer(name string) trace.Tracer
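- type AgentEventRecorder
- func NewAgentEventRecorder(namespace, agentName string) *AgentEventRecorder
- func (r *AgentEventRecorder) TaskCompleted(taskID string, inputTokens, outputTokens int64)
- func (r *AgentEventRecorder) TaskDelegated(taskID, toRole, childTaskID string)
- func (r *AgentEventRecorder) TaskFailed(taskID, reason string)
- func (r *AgentEventRecorder) TaskStarted(taskID, role string)
- type AgentMetrics
- func NewAgentMetrics() (*AgentMetrics, error)
- func (am *AgentMetrics) RecordDelegate(ctx context.Context, attrs ...attribute.KeyValue)
- func (am *AgentMetrics) RecordLLMCall(ctx context.Context, since time.Time, inputTokens, outputTokens int64, ...)
- func (am *AgentMetrics) RecordQueueWait(ctx context.Context, enqueuedAt string, attrs ...attribute.KeyValue)
- func (am *AgentMetrics) RecordTaskCompleted(ctx context.Context, since time.Time, attrs ...attribute.KeyValue)
- func (am *AgentMetrics) RecordTaskFailed(ctx context.Context, attrs ...attribute.KeyValue)
- func (am *AgentMetrics) RecordTaskStarted(ctx context.Context, attrs ...attribute.KeyValue)
- func (am *AgentMetrics) RecordToolCall(ctx context.Context, since time.Time, failed bool, attrs ...attribute.KeyValue)
- type OperatorMetrics
- func NewOperatorMetrics() (*OperatorMetrics, error)
- func (om *OperatorMetrics) RecordReconcile(ctx context.Context, since time.Time, failed bool, attrs ...attribute.KeyValue)
- type RemoteSpan
- func FetchTraceByTaskID(ctx context.Context, endpoint, service, taskID string) ([]RemoteSpan, error)
- type SpanCollector
- func InitCollector(ctx context.Context, serviceName string) (*SpanCollector, func(), error)
- func (c *SpanCollector) ExportSpans(_ context.Context, spans []sdktrace.ReadOnlySpan) error
- func (c *SpanCollector) Shutdown(_ context.Context) error
- func (c *SpanCollector) Spans() []sdktrace.ReadOnlySpan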
Constants
const (
	// EnvOTLPEndpoint is the standard OTel env var for the OTLP collector endpoint.
	EnvOTLPEndpoint = "OTEL_EXPORTER_OTLP_ENDPOINT"
)
Variables
This section is empty.
Functions
func Init
func Init(ctx context.Context, serviceName string) (shutdown func(), err error)
Init sets up the global TracerProvider and MeterProvider for cluster mode. If OTEL_EXPORTER_OTLP_ENDPOINT is empty, no-op providers are installed and the returned shutdown does nothing, so no collector is required.
Init also installs W3C TraceContext propagation globally, so trace context flows across Redis queue boundaries through task.Meta["traceparent"].
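What travels in task.Meta["traceparent"] is a W3C traceparent value of the form version-traceid-parentid-flags. In OpenTelemetry Go this is typically written and read by the `propagation.TraceContext{}` propagator through a `propagation.MapCarrier`; whether this package does exactly that is an assumption. The stdlib-only sketch below shows the value itself: a producer stores it in a hypothetical `task` type, and a consumer validates it after dequeueing.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// task is a hypothetical queue payload; only the Meta map matters here.
type task struct {
	ID   string
	Meta map[string]string
}

// traceParent holds the four fields of a version-00 W3C traceparent value.
type traceParent struct {
	Version, TraceID, ParentID, Flags string
}

// parseTraceParent checks the version-00 layout: four dash-separated hex
// fields of 2, 32, 16 and 2 characters, with non-zero trace and parent IDs.
func parseTraceParent(s string) (traceParent, error) {
	parts := strings.Split(s, "-")
	if len(parts) != 4 {
		return traceParent{}, errors.New("traceparent must have 4 fields")
	}
	want := []int{2, 32, 16, 2}
	for i, p := range parts {
		if len(p) != want[i] {
			return traceParent{}, fmt.Errorf("field %d has length %d, want %d", i, len(p), want[i])
		}
		if _, err := hex.DecodeString(p); err != nil {
			return traceParent{}, fmt.Errorf("field %d is not hex: %w", i, err)
		}
	}
	// All-zero trace or parent IDs are invalid per the specification.
	if parts[1] == strings.Repeat("0", 32) || parts[2] == strings.Repeat("0", 16) {
		return traceParent{}, errors.New("all-zero trace or parent id")
	}
	return traceParent{parts[0], parts[1], parts[2], parts[3]}, nil
}

func main() {
	// Producer side: the trace context is stored in the task before enqueueing.
	t := task{ID: "task-1", Meta: map[string]string{
		"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
	}}
	// Consumer side: read it back after dequeueing and continue the trace.
	tp, err := parseTraceParent(t.Meta["traceparent"])
	if err != nil {
		fmt.Println("no valid parent, starting a new trace:", err)
		return
	}
	fmt.Printf("task %s continues trace %s (parent span %s)\n", t.ID, tp.TraceID, tp.ParentID)
}
```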
func InitStdout
func InitStdout(ctx context.Context, serviceName string) (shutdown func(), err error)
InitStdout sets up a stdout trace exporter for local `ark run` mode. Spans are written to stdout as JSON as they complete. The caller must defer the returned shutdown, which flushes any pending spans. Metrics are no-ops in stdout mode because they are not useful for single local runs.
func PrintRemoteTraceTree
func PrintRemoteTraceTree(w io.Writer, spans []RemoteSpan)
PrintRemoteTraceTree renders a slice of RemoteSpan to w as a human-readable tree. The output format matches PrintTraceTree, which renders spans captured by the in-process collector.
func PrintTraceTree
func PrintTraceTree(w io.Writer, spans []sdktrace.ReadOnlySpan)
PrintTraceTree renders the collected spans as a human-readable tree to w. Spans are grouped by trace, then printed parent-first with indentation. Format mirrors the --watch step output for visual consistency.
Types
type AgentEventRecorder
type AgentEventRecorder struct {
	// contains filtered or unexported fields
}
AgentEventRecorder emits Kubernetes Events on the ArkAgent object that owns this pod. All methods are safe to call outside a cluster, where they are no-ops.
func NewAgentEventRecorder
func NewAgentEventRecorder(namespace, agentName string) *AgentEventRecorder
NewAgentEventRecorder attempts to create an in-cluster EventRecorder for the given ArkAgent. When not running in a cluster, it returns a no-op recorder instead of failing.
func (*AgentEventRecorder) TaskCompleted
func (r *AgentEventRecorder) TaskCompleted(taskID string, inputTokens, outputTokens int64)
TaskCompleted emits a Normal event when a task finishes successfully.
func (*AgentEventRecorder) TaskDelegated
func (r *AgentEventRecorder) TaskDelegated(taskID, toRole, childTaskID string)
TaskDelegated emits a Normal event when a task is delegated to another role.
func (*AgentEventRecorder) TaskFailed
func (r *AgentEventRecorder) TaskFailed(taskID, reason string)
TaskFailed emits a Warning event when a task errors out.
func (*AgentEventRecorder) TaskStarted
func (r *AgentEventRecorder) TaskStarted(taskID, role string)
TaskStarted emits a Normal event when a task begins processing.
type AgentMetrics
type AgentMetrics struct {
	// contains filtered or unexported fields
}
AgentMetrics holds all OTel instruments for the agent runtime. Obtain one via NewAgentMetrics and reuse it for the lifetime of the process.
func NewAgentMetrics
func NewAgentMetrics() (*AgentMetrics, error)
NewAgentMetrics creates and registers all agent runtime instruments.
func (*AgentMetrics) RecordDelegate
func (am *AgentMetrics) RecordDelegate(ctx context.Context, attrs ...attribute.KeyValue)
RecordDelegate increments the delegate submission counter.
func (*AgentMetrics) RecordLLMCall
func (am *AgentMetrics) RecordLLMCall(ctx context.Context, since time.Time, inputTokens, outputTokens int64, attrs ...attribute.KeyValue)
RecordLLMCall records the duration and token usage of a single LLM round trip.
func (*AgentMetrics) RecordQueueWait
func (am *AgentMetrics) RecordQueueWait(ctx context.Context, enqueuedAt string, attrs ...attribute.KeyValue)
RecordQueueWait records how long a task waited in the queue, computed from the enqueued_at RFC 3339 timestamp passed as enqueuedAt. If enqueuedAt is empty or unparseable, no observation is recorded.
func (*AgentMetrics) RecordTaskCompleted
func (am *AgentMetrics) RecordTaskCompleted(ctx context.Context, since time.Time, attrs ...attribute.KeyValue)
RecordTaskCompleted increments the completed counter and records the task duration.
func (*AgentMetrics) RecordTaskFailed
func (am *AgentMetrics) RecordTaskFailed(ctx context.Context, attrs ...attribute.KeyValue)
RecordTaskFailed increments the failed counter.
func (*AgentMetrics) RecordTaskStarted
func (am *AgentMetrics) RecordTaskStarted(ctx context.Context, attrs ...attribute.KeyValue)
RecordTaskStarted increments the started counter.
func (*AgentMetrics) RecordToolCall
func (am *AgentMetrics) RecordToolCall(ctx context.Context, since time.Time, failed bool, attrs ...attribute.KeyValue)
RecordToolCall records the duration of a tool invocation and, when failed is true, also records an error.
type OperatorMetrics
type OperatorMetrics struct {
	// contains filtered or unexported fields
}
OperatorMetrics holds OTel instruments for the operator reconcile loops.
func NewOperatorMetrics
func NewOperatorMetrics() (*OperatorMetrics, error)
NewOperatorMetrics creates and registers all operator instruments.
func (*OperatorMetrics) RecordReconcile
func (om *OperatorMetrics) RecordReconcile(ctx context.Context, since time.Time, failed bool, attrs ...attribute.KeyValue)
RecordReconcile records reconcile latency and, when failed is true, also records an error.
type RemoteSpan
type RemoteSpan struct {
	TraceID   string
	SpanID    string
	ParentID  string // empty for root spans
	Name      string
	StartTime time.Time
	Duration  time.Duration
	Error     bool
	ErrorMsg  string
}
RemoteSpan is a simplified span representation fetched from a Jaeger-compatible HTTP API. It carries only the fields needed for tree rendering.
func FetchTraceByTaskID
func FetchTraceByTaskID(ctx context.Context, endpoint, service, taskID string) ([]RemoteSpan, error)
FetchTraceByTaskID queries a Jaeger-compatible HTTP API for all spans tagged with `task.id=<taskID>` under the given service. It returns a flat list of spans that can be passed to PrintRemoteTraceTree.
Supported backends:
- Jaeger (default port :16686): GET /api/traces?service=<svc>&tags=task.id:<id>
- Grafana Tempo (default port :3200): GET /api/search?tags=task.id=<id>&service.name=<svc>, followed by a fetch of each trace at /api/traces/<traceID>
type SpanCollector
type SpanCollector struct {
	// contains filtered or unexported fields
}
SpanCollector is an in-memory OTel span exporter used by `ark run --trace` to capture spans locally without a Tempo or Jaeger backend. It implements sdktrace.SpanExporter.
func InitCollector
func InitCollector(ctx context.Context, serviceName string) (*SpanCollector, func(), error)
InitCollector sets up an in-memory TracerProvider for local `ark run --trace` mode. Metrics are no-ops because they are not needed for single local runs. It returns the collector and a shutdown function; call Spans on the collector after calling shutdown to get all spans.
func (*SpanCollector) ExportSpans
func (c *SpanCollector) ExportSpans(_ context.Context, spans []sdktrace.ReadOnlySpan) error
ExportSpans accumulates completed spans in memory.
func (*SpanCollector) Shutdown
func (c *SpanCollector) Shutdown(_ context.Context) error
Shutdown is a no-op because all spans are already held in memory.
func (*SpanCollector) Spans
func (c *SpanCollector) Spans() []sdktrace.ReadOnlySpan
Spans returns a snapshot of all collected spans.