observe

package
v1.0.1
Published: May 17, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

README

observe

Default EventSink implementations for agentcore packages — typed-event observers for logs, metrics, and custom side-effects. Each implementation is a standalone sink; combine what you need with Tee.

Sink                                  | Purpose                                                    | Dispatch
--------------------------------------|------------------------------------------------------------|---------------------------------------
observe.Logger(*slog.Logger)          | Structured logs, one slog record per event                 | synchronous
observe.Counter(metric.Meter)         | One OTel counter per event name, tagged by level + errored | synchronous
observe.NewHandlers(observe.Limits{}) | Typed-callback registry; register with observe.On[E]       | async, bounded queue, drop-on-overflow
observe.Tee(...)                      | Fans an event out to several sinks in declared order       | passthrough

Tracing is not in this package. agentcore packages emit OTel spans directly via otel.Tracer(...) at their internal boundaries (see workflow/tracing.go). observe is the channel for typed domain events; OTel is the channel for spans. The two are complementary: spans give you the structural timeline of a run; events give you the lifecycle moments worth logging or counting.

Usage

observe works against any value that satisfies observe.Event. The producer of the events doesn't matter — observe doesn't import any agentcore producer package. Define an event type, fire it through a sink, and observe.

import (
    "context"
    "log/slog"

    "go.opentelemetry.io/otel"

    "github.com/vinayprograms/agentcore/observe"
)

// 1. Define an event type that satisfies observe.Event.
type RequestCompleted struct {
    RequestID string
    Failure   error
}

func (e RequestCompleted) Name() string { return "request.completed" }
func (e RequestCompleted) Level() slog.Level {
    if e.Failure != nil {
        return slog.LevelError
    }
    return slog.LevelInfo
}
func (e RequestCompleted) Attrs() []slog.Attr {
    return []slog.Attr{slog.String("request_id", e.RequestID)}
}
func (e RequestCompleted) Err() error { return e.Failure }

// 2. Build the sink. Each piece is independent; combine with Tee.
handlers := observe.NewHandlers(observe.Limits{})
defer handlers.Close()

observe.On(handlers, func(ctx context.Context, e RequestCompleted) {
    if e.Err() != nil {
        // custom side effect: metric, alert, downstream call, etc.
    }
})

sink := observe.Tee(
    observe.Logger(slog.Default()),       // structured logs
    observe.Counter(otel.Meter("svc")),   // OTel metric counters
    handlers,                             // typed callbacks
)

// 3. Fire events.
sink.Fire(context.Background(), RequestCompleted{RequestID: "r-42"})

Any producer that defines its events to satisfy this interface plugs into the same sinks. See the Producers section below for what's wired today.

Producers

Any package that fires events through an observe.EventSink is a producer. The producer defines its event types (each implementing observe.Event) and exposes a sink-shaped field on its config. The same observe sinks consume them all without code changes.

The workflow package is one such producer. It fires WorkflowStarted, GoalEnded, etc. through Runtime.Telemetry:

import (
    "log/slog"

    "go.opentelemetry.io/otel"

    "github.com/vinayprograms/agentcore/observe"
    "github.com/vinayprograms/agentcore/workflow"
)

rt := &workflow.Runtime{
    Model:     model,
    Telemetry: observe.Tee(
        observe.Logger(slog.Default()),
        observe.Counter(otel.Meter("agent")),
    ),
}
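
As an illustration of the producer side, here is a minimal sketch of a hypothetical producer that owns its event type and exposes a sink-shaped field. Uploader, UploadFinished, nameSink and firedNames are invented for this example; Event and EventSink are local copies of the interfaces documented on this page so the sketch compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
)

// Event and EventSink mirror the interfaces documented for this package.
// They are redeclared here only to keep the sketch self-contained.
type Event interface {
	Name() string
	Level() slog.Level
	Attrs() []slog.Attr
	Err() error
}

type EventSink interface {
	Fire(ctx context.Context, event any)
}

// UploadFinished is a hypothetical event type owned by the producer.
type UploadFinished struct {
	Path    string
	Failure error
}

func (e UploadFinished) Name() string { return "upload.finished" }
func (e UploadFinished) Level() slog.Level {
	if e.Failure != nil {
		return slog.LevelError
	}
	return slog.LevelInfo
}
func (e UploadFinished) Attrs() []slog.Attr { return []slog.Attr{slog.String("path", e.Path)} }
func (e UploadFinished) Err() error         { return e.Failure }

// Uploader is a hypothetical producer with a sink-shaped config field.
type Uploader struct {
	Telemetry EventSink // nil means "no telemetry" in this sketch
}

func (u *Uploader) Upload(ctx context.Context, path string) {
	// ... the real work would happen here ...
	if u.Telemetry != nil {
		u.Telemetry.Fire(ctx, UploadFinished{Path: path})
	}
}

// nameSink is a stand-in consumer that records event names. It is not safe
// for concurrent use, which is acceptable for this single-goroutine demo.
type nameSink struct{ names []string }

func (s *nameSink) Fire(_ context.Context, event any) {
	if e, ok := event.(Event); ok {
		s.names = append(s.names, e.Name())
	}
}

// firedNames runs the producer once per path and returns what the sink saw.
func firedNames(paths ...string) []string {
	sink := &nameSink{}
	u := &Uploader{Telemetry: sink}
	for _, p := range paths {
		u.Upload(context.Background(), p)
	}
	return sink.names
}

func main() {
	fmt.Println(firedNames("a.txt", "b.txt")) // [upload.finished upload.finished]
}
```

In real code the Telemetry field would be typed as observe.EventSink and filled with observe.Tee(...), exactly as the workflow example does.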

Counter

Every event becomes an OTel int64 counter under agentcore.<event-name>, tagged with level and errored:

agentcore.workflow.started{level=info,errored=false}
agentcore.goal.ended{level=info,errored=false}
agentcore.subagent.completed{level=error,errored=true}
agentcore.preflight.failed{level=error,errored=true}
...

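That's enough to graph rates, failure ratios per event, and error budgets without wiring per-event rules. The queries below assume a Prometheus-style export in which dots in the counter name become underscores and a _total suffix is added. The second query is schematic: PromQL has no wildcard in metric names, so a real query selects the counters with a __name__ regex matcher such as {__name__=~"agentcore_.+_total"}, and the event label it groups by is not one of the two attributes Counter attaches (level and errored):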

rate(agentcore_goal_ended_total{errored="false"}[5m])
sum by (event) (rate(agentcore_*_total{errored="true"}[5m]))

A new event type added in any agentcore package automatically gets its own counter the first time it fires.

Drop-on-overflow design

A slow handler (network call, disk write, etc.) must not block the producer. Handlers enqueues onto a bounded channel (default capacity 1024); when the queue is full, Fire increments Dropped() and returns immediately.
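
The non-blocking enqueue described above can be sketched with a buffered channel and a select with a default branch. This is an illustration of the technique, not the package's actual code; boundedQueue, offer and offerMany are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// boundedQueue is a hypothetical stand-in for the queue inside Handlers.
type boundedQueue struct {
	ch      chan any
	dropped atomic.Int64
}

func newBoundedQueue(capacity int) *boundedQueue {
	return &boundedQueue{ch: make(chan any, capacity)}
}

// offer tries to enqueue without blocking. When the buffer is full the
// default branch runs: the drop counter is bumped and false is returned.
func (q *boundedQueue) offer(ev any) bool {
	select {
	case q.ch <- ev:
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}

// offerMany enqueues n events with no consumer running and returns how many
// were accepted and how many were dropped.
func offerMany(capacity, n int) (accepted int, dropped int64) {
	q := newBoundedQueue(capacity)
	for i := 0; i < n; i++ {
		if q.offer(i) {
			accepted++
		}
	}
	return accepted, q.dropped.Load()
}

func main() {
	accepted, dropped := offerMany(2, 5)
	fmt.Println(accepted, dropped) // 2 3
}
```

The trade-off is explicit: the producer never waits, and the cost of a burst larger than the buffer is lost events that show up only in the drop counter.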

Size the queue at construction via observe.Limits:

// Size the queue to absorb up to 4096 in-flight events.
handlers := observe.NewHandlers(observe.Limits{QueueSize: 4096})
defer handlers.Close()

// Periodically check for drops — emit a metric, log a warning, or fail
// the build if the count is non-zero.
if dropped := handlers.Dropped(); dropped > 0 {
    slog.Warn("observe handlers dropped events", "count", dropped)
}
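
The snippet above checks the counter once. A periodic check could look like the following sketch, which polls on a ticker and logs only the drops that are new since the previous tick. watchDrops, newDrops, dropSource and fakeSource are hypothetical helpers; the only package API assumed is the documented Dropped() int64 method.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"
)

// dropSource is the one method this sketch needs. *observe.Handlers
// satisfies it through its documented Dropped() int64 method.
type dropSource interface {
	Dropped() int64
}

// newDrops returns how many drops happened since the previous reading.
func newDrops(prev, cur int64) int64 {
	if cur < prev { // defensive: the counter should never go backwards
		return 0
	}
	return cur - prev
}

// watchDrops polls src every interval until ctx is cancelled and logs a
// warning whenever the counter has grown since the last tick.
func watchDrops(ctx context.Context, src dropSource, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var prev int64
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			cur := src.Dropped()
			if n := newDrops(prev, cur); n > 0 {
				slog.Warn("observe handlers dropped events", "new", n, "total", cur)
			}
			prev = cur
		}
	}
}

// fakeSource is a test double standing in for *observe.Handlers.
type fakeSource struct{ n int64 }

func (f fakeSource) Dropped() int64 { return f.n }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	watchDrops(ctx, fakeSource{n: 3}, 10*time.Millisecond) // warns once, then stays quiet
	fmt.Println(newDrops(3, 7))                             // 4
}
```

In a service this would typically run as go watchDrops(ctx, handlers, time.Minute) next to the deferred Close.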

Pass observe.Limits{} (zero value) to use the default queue capacity of 1024. Limits is the home for any future per-handler resource cap (timeout, overflow action) — they'd be added as fields on the same struct, so consumer code never breaks when we grow the surface.

Logger and Counter are synchronous because slog and OTel batch downstream — the extra goroutine layer would only add latency and lose ordering guarantees.

How dispatch works

Logger and Counter program against the observe.Event interface, not against specific event types. Every event from any agentcore package implements Event (Name, Level, Attrs, Err). Both sinks pull the data out of those methods, so a new event type added to a producer package flows through them automatically — no edits to this package required.

Lifecycle

Handlers.Close() signals the dispatcher to stop, drains anything queued at the time, waits for in-flight handlers to return, then exits. Always pair NewHandlers() with a deferred Close() so events queued at the end of a run don't get dropped on process exit.
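
One common way to get this stop-drain-wait behavior in Go is to close the queue channel and let the worker range over it until it is empty. The sketch below shows that pattern under stated assumptions; worker, newWorker and drainCount are hypothetical, and the package's real implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical single-goroutine dispatcher.
type worker struct {
	queue     chan int
	done      chan struct{}
	closeOnce sync.Once
	handled   []int // written only by the worker goroutine
}

func newWorker(capacity int) *worker {
	w := &worker{queue: make(chan int, capacity), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		// range keeps receiving until the channel is closed AND empty,
		// so everything queued before Close is still handled.
		for ev := range w.queue {
			w.handled = append(w.handled, ev)
		}
	}()
	return w
}

// Close stops intake, waits for the drain to finish, and is safe to call
// more than once. In this sketch callers must not enqueue after Close,
// because sending on a closed channel panics.
func (w *worker) Close() error {
	w.closeOnce.Do(func() { close(w.queue) })
	<-w.done
	return nil
}

// drainCount queues n events, closes the worker twice, and reports how many
// events were handled.
func drainCount(n int) int {
	w := newWorker(n)
	for i := 0; i < n; i++ {
		w.queue <- i
	}
	_ = w.Close()
	_ = w.Close() // second call is a no-op
	return len(w.handled)
}

func main() {
	fmt.Println(drainCount(10)) // 10
}
```

Reading handled after Close is race-free here because closing done happens after the worker's last append.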

Logger is stateless and Counter holds only its instrument cache, so neither needs a Close call.

Local-only / CLI setup

observe.Logger works without any OTel infrastructure — it writes to whatever *slog.Logger you give it:

slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, nil)))

rt := &workflow.Runtime{
    Model:     model,
    Telemetry: observe.Logger(slog.Default()),
}

If you also want OTel spans (workflow's own per-step spans, agentkit's per-Chat spans) captured locally, install the stdout exporter — no collector required:

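import (
    "context"
    "os"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
    "go.opentelemetry.io/otel/sdk/trace"
)

exp, err := stdouttrace.New(stdouttrace.WithWriter(os.Stderr))
if err != nil {
    panic(err) // or handle as your program prefers
}
tp := trace.NewTracerProvider(trace.WithBatcher(exp))
defer tp.Shutdown(context.Background()) // flush batched spans before exit
otel.SetTracerProvider(tp)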

After this, every span anywhere in the process — including agentcore's workflow spans and agentkit's LLM-call spans — emits to stderr as JSON. No extra wiring through observe.

Panic safety

A handler that panics does not kill the dispatcher. Panics are recovered silently, and the next handler for the same event still runs. The package deliberately does not log panics raised by arbitrary user code; if you need to know about them, install your own recover-and-log inside the handler:

observe.On(handlers, func(ctx context.Context, e workflow.GoalEnded) {
    defer func() {
        if r := recover(); r != nil {
            slog.Error("handler panic", "event", e.Name(), "panic", r)
        }
    }()
    riskyHandler(ctx, e)
})

Documentation

Overview

Package observe provides default EventSink implementations for agentcore packages — structured logging via slog, OTel metric counters, and a typed-callback registry. Each implementation is a standalone EventSink; combine what you need with Tee.

sink := observe.Tee(
    observe.Logger(slog.Default()),
    observe.Counter(otel.Meter("agent")),
    observe.NewHandlers(observe.Limits{}),
)
rt := &workflow.Runtime{Model: m, Telemetry: sink}

Logger and Counter are synchronous — slog and OTel batch downstream, so the extra goroutine layer is wasted overhead. The Handlers registry is async with a bounded queue (drop-on-overflow) so a slow handler can never block the producer.

Tracing is NOT in this package: agentcore packages use OTel directly via otel.Tracer(...) at their internal boundaries (see workflow/tracing.go). observe is the channel for typed domain events; OTel is the channel for spans.

Functions

func On

func On[E any](h *Handlers, fn func(context.Context, E))

On registers a typed handler keyed by the static type E. Multiple handlers can register for the same event type; they are invoked in registration order.

observe.On(reg, func(ctx context.Context, e workflow.GoalEnded) { ... })
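
To show how registration keyed by a static type can work, here is a minimal sketch of a generic registry that uses reflect.Type as the map key. It is an assumption about the general technique, not the package's implementation; registry, on, dispatch and route are hypothetical, and the context parameter is omitted for brevity.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// registry is a hypothetical stand-in for the routing table in Handlers.
type registry struct {
	mu       sync.RWMutex
	handlers map[reflect.Type][]func(any)
}

func newRegistry() *registry {
	return &registry{handlers: make(map[reflect.Type][]func(any))}
}

// on registers fn under the static type E. The wrapper performs the single
// type assertion, so consumer code never needs a type switch.
func on[E any](r *registry, fn func(E)) {
	t := reflect.TypeOf((*E)(nil)).Elem()
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[t] = append(r.handlers[t], func(ev any) { fn(ev.(E)) })
}

// dispatch looks up handlers by the event's dynamic type and calls them in
// registration order. Events with no registered handler are ignored.
func (r *registry) dispatch(ev any) {
	r.mu.RLock()
	hs := r.handlers[reflect.TypeOf(ev)]
	r.mu.RUnlock()
	for _, h := range hs {
		h(ev)
	}
}

// GoalEnded and WorkflowStarted are local placeholder event types.
type GoalEnded struct{ Goal string }
type WorkflowStarted struct{ ID string }

// route fires one event of each type and returns the order of callbacks.
func route() []string {
	var calls []string
	r := newRegistry()
	on(r, func(e GoalEnded) { calls = append(calls, "first:"+e.Goal) })
	on(r, func(e GoalEnded) { calls = append(calls, "second:"+e.Goal) })
	r.dispatch(WorkflowStarted{ID: "w-1"}) // no handler registered: ignored
	r.dispatch(GoalEnded{Goal: "g-1"})
	return calls
}

func main() {
	fmt.Println(route()) // [first:g-1 second:g-1]
}
```

One consequence of keying on the dynamic type in this sketch: a handler registered with an interface type as E would never match, because reflect.TypeOf on a fired value always reports its concrete type.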

Types

type Event

type Event interface {
	Name() string       // stable identifier, e.g. "workflow.started"
	Level() slog.Level  // slog.LevelInfo / Warn / Error
	Attrs() []slog.Attr // structured fields, type-safe at construction
	Err() error         // nil unless the event represents a failure
}

Event is the contract every observed value satisfies. agentcore packages (workflow, future supervision, etc.) define their own concrete event types and implement this interface; observers in this package consume them through the interface and never need to switch on the concrete type. New event types added to any agentcore package flow through every observer here automatically.

slog.Attr is the carrier vocabulary because it is stdlib, type-safe at construction, and already what the consumer's logging stack speaks. Tracer-style observers translate slog.Attr to OTel attribute.KeyValue at the boundary where they are needed.

type EventSink

type EventSink interface {
	Fire(ctx context.Context, event any)
}

EventSink receives events from a producer (e.g. a running Workflow). Implementations must be safe for concurrent use — Fire may be called from multiple goroutines (parallel fan-out, async dispatchers).

The interface is single-method on purpose: the producer fires and forgets; the sink decides what to do (log, count, dispatch to handlers, drop). Slow or panicking sinks must not affect the producer.
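
Writing a custom sink only requires the one method. The following sketch shows a hypothetical in-memory Recorder, useful for assertions in tests, with a mutex to meet the concurrency requirement. EventSink is redeclared locally so the example compiles on its own; Recorder and fireConcurrently are invented names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// EventSink mirrors the single-method interface documented above.
type EventSink interface {
	Fire(ctx context.Context, event any)
}

// Recorder is a hypothetical sink that keeps events in memory. The mutex
// makes Fire safe for concurrent callers.
type Recorder struct {
	mu     sync.Mutex
	events []any
}

func (r *Recorder) Fire(_ context.Context, event any) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, event)
}

// Len reports how many events have been recorded so far.
func (r *Recorder) Len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.events)
}

// fireConcurrently calls Fire from n goroutines and returns the final count.
func fireConcurrently(n int) int {
	rec := &Recorder{}
	var sink EventSink = rec // compile-time check that Recorder is a sink
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sink.Fire(context.Background(), i)
		}(i)
	}
	wg.Wait()
	return rec.Len()
}

func main() {
	fmt.Println(fireConcurrently(100)) // 100
}
```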

func Counter

func Counter(meter metric.Meter) EventSink

Counter returns an EventSink that emits one OTel metric counter per observed event, namespaced by the event's Name(). For an event with Name() == "goal.ended", the emitted counter is "agentcore.goal.ended".

Each increment carries two standard attributes derived from the event:

level=info|warn|error   (from Event.Level())
errored=true|false      (true iff Event.Err() != nil)

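That lets operators graph rates and failure ratios per event type without configuring per-event rules. The second query below is schematic, since PromQL selects multiple metric names with a __name__ regex matcher rather than a wildcard: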

rate(agentcore_goal_ended_total{errored="false"}[5m])
sum by (event) (rate(agentcore_*_total{errored="true"}[5m]))

Counter caches one Int64Counter instrument per event name (sync.Map). Instruments that fail to register (e.g. invalid name) are skipped silently — observability must not break the producer.
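
The lazy per-name cache can be sketched with sync.Map and LoadOrStore. To stay free of external dependencies, this example replaces the OTel Int64Counter with a local instrument type; that substitution, along with counterCache, get and count, is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// instrument is a stand-in for an OTel Int64Counter. A real sink would
// obtain it from metric.Meter instead of allocating it locally.
type instrument struct{ n atomic.Int64 }

// counterCache lazily creates one instrument per event name.
type counterCache struct {
	byName  sync.Map // effectively map[string]*instrument
	created atomic.Int64
}

// get returns the cached instrument for name, creating it on first use.
// LoadOrStore guarantees that goroutines racing to create the same name
// all end up sharing a single winner.
func (c *counterCache) get(name string) *instrument {
	if v, ok := c.byName.Load(name); ok {
		return v.(*instrument)
	}
	v, loaded := c.byName.LoadOrStore(name, &instrument{})
	if !loaded {
		c.created.Add(1)
	}
	return v.(*instrument)
}

// count increments one counter per fired name, using the documented
// "agentcore." prefix, and returns how many instruments were created plus
// the total recorded for the probe name.
func count(names []string, probe string) (created, total int64) {
	c := &counterCache{}
	for _, n := range names {
		c.get("agentcore." + n).n.Add(1)
	}
	return c.created.Load(), c.get("agentcore." + probe).n.Load()
}

func main() {
	created, total := count([]string{"goal.ended", "goal.ended", "workflow.started"}, "goal.ended")
	fmt.Println(created, total) // 2 2
}
```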

New event types in any agentcore package automatically get their own counter the first time they fire — no code changes here.

func Logger

func Logger(l *slog.Logger) EventSink

Logger returns an EventSink that emits one structured slog record per event. The event itself supplies its name, level, attribute list, and (when applicable) error — see Event. A non-nil Err is attached as an "err" attribute so the structured log retains the failure cause.

The sink is synchronous because slog handlers either are fast (text / JSON to a writer) or do their own batching. Wrapping in a goroutine here would only add overhead.

New event types in any agentcore package automatically flow through this sink without code changes — the dispatch is data-driven via the Event interface.
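
The data-driven dispatch can be illustrated with a small sink built on slog's LogAttrs. This is a sketch of the documented behavior, not the package's source; logSink, render, logged and failed are hypothetical, and skipping values that do not implement Event is an assumption.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
)

// Event mirrors the interface documented for this package.
type Event interface {
	Name() string
	Level() slog.Level
	Attrs() []slog.Attr
	Err() error
}

// logSink is a hypothetical re-implementation of the documented behavior.
type logSink struct{ l *slog.Logger }

func (s logSink) Fire(ctx context.Context, event any) {
	e, ok := event.(Event)
	if !ok {
		return // assumption: values that are not Events are skipped
	}
	attrs := e.Attrs()
	if err := e.Err(); err != nil {
		// Copy before appending so the event's own slice is never mutated.
		attrs = append(append([]slog.Attr(nil), attrs...), slog.Any("err", err))
	}
	s.l.LogAttrs(ctx, e.Level(), e.Name(), attrs...)
}

// RequestCompleted is the event type from the README usage example.
type RequestCompleted struct {
	RequestID string
	Failure   error
}

func (e RequestCompleted) Name() string { return "request.completed" }
func (e RequestCompleted) Level() slog.Level {
	if e.Failure != nil {
		return slog.LevelError
	}
	return slog.LevelInfo
}
func (e RequestCompleted) Attrs() []slog.Attr {
	return []slog.Attr{slog.String("request_id", e.RequestID)}
}
func (e RequestCompleted) Err() error { return e.Failure }

// failed builds an errored event without the caller importing errors.
func failed(id, msg string) RequestCompleted {
	return RequestCompleted{RequestID: id, Failure: errors.New(msg)}
}

// render logs one event through a text handler and returns the output.
func render(e Event) string {
	var buf bytes.Buffer
	sink := logSink{l: slog.New(slog.NewTextHandler(&buf, nil))}
	sink.Fire(context.Background(), e)
	return buf.String()
}

// logged reports whether the rendered record contains every fragment.
func logged(e Event, fragments ...string) bool {
	out := render(e)
	for _, f := range fragments {
		if !strings.Contains(out, f) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Print(render(failed("r-42", "boom")))
}
```

Nothing in logSink names RequestCompleted, which is why a new event type needs no changes in the sink.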

func Tee

func Tee(sinks ...EventSink) EventSink

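Tee fans an event out to every sink in declared order, synchronously. Each sink is fire-and-forget: the EventSink contract has no error return, so a failure in one sink is never reported to the others. Because the calls are synchronous, a sink that blocks delays the sinks declared after it; put slow work behind Handlers, whose Fire returns immediately.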

Use Tee to compose Logger + Counter + Handlers into the single sink that a producer's Telemetry field expects. A zero-arg Tee() is a valid no-op.
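
A fan-out sink of this shape is small enough to sketch in full. The version below is a hypothetical re-implementation for illustration (tee, tagSink and order are invented names); it shows both the declared-order guarantee and why a zero-argument call is a no-op.

```go
package main

import (
	"context"
	"fmt"
)

// EventSink mirrors the single-method interface documented for this package.
type EventSink interface {
	Fire(ctx context.Context, event any)
}

// tee is a hypothetical re-implementation of the documented fan-out.
type tee []EventSink

func (t tee) Fire(ctx context.Context, event any) {
	for _, s := range t {
		s.Fire(ctx, event)
	}
}

// Tee copies its arguments so later changes to the caller's slice cannot
// alter the fan-out. With no sinks the loop body never runs: a no-op.
func Tee(sinks ...EventSink) EventSink {
	return tee(append([]EventSink(nil), sinks...))
}

// tagSink appends its tag to a shared log so the call order is visible.
type tagSink struct {
	tag string
	log *[]string
}

func (s tagSink) Fire(_ context.Context, _ any) { *s.log = append(*s.log, s.tag) }

// order fires one event through an empty Tee and one through a two-sink
// Tee, then returns the tags in the order the sinks were called.
func order() []string {
	var log []string
	ctx := context.Background()
	Tee().Fire(ctx, "ignored") // zero sinks: nothing happens
	Tee(
		tagSink{tag: "logger", log: &log},
		tagSink{tag: "counter", log: &log},
	).Fire(ctx, "ev")
	return log
}

func main() {
	fmt.Println(order()) // [logger counter]
}
```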

type Handlers

type Handlers struct {
	// contains filtered or unexported fields
}

Handlers is an EventSink that routes events to typed callbacks registered via the package-level On generic function. Registration is keyed by the event's Go type, so a callback registered for workflow.GoalEnded only fires on GoalEnded values — no type switches in consumer code. The registry doesn't need to know about specific producer packages; any value can be Fire'd, and any type used as E in On[E] will be routed.

Dispatch is asynchronous via a bounded queue (default size 1024). When the queue is full, Fire returns immediately and the dropped event is counted in Dropped(). This is the explicit policy choice: a slow handler must never block the producer's goroutine.

Handlers are invoked in registration order, one event at a time, on a dedicated goroutine. Handler panics are recovered and ignored — the goroutine never dies. Use Close to drain the queue and stop the worker at the end of a run.
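
The panic isolation described here is usually achieved by wrapping each handler call in its own deferred recover. The sketch below shows that pattern; safeCall and runAll are hypothetical helpers, and unlike the package (which recovers silently) the sketch reports whether a panic occurred so the behavior can be checked.

```go
package main

import "fmt"

// safeCall runs fn and swallows any panic, reporting whether one occurred.
// The deferred recover protects only this one call, so the caller's loop
// continues with the next handler.
func safeCall(fn func()) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	fn()
	return false
}

// runAll invokes every handler in order and returns how many completed
// normally and how many panicked.
func runAll(handlers []func()) (completed, panicked int) {
	for _, h := range handlers {
		if safeCall(h) {
			panicked++
		} else {
			completed++
		}
	}
	return completed, panicked
}

func main() {
	ran := 0
	completed, panicked := runAll([]func(){
		func() { ran++ },
		func() { panic("boom") },
		func() { ran++ }, // still runs after the panic above
	})
	fmt.Println(completed, panicked, ran) // 2 1 2
}
```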

func NewHandlers

func NewHandlers(limits Limits) *Handlers

NewHandlers builds a Handlers registry and starts its background dispatcher. Always pair with Close at the end of the run so the worker goroutine exits and the queue drains.

Pass observe.Limits{} for defaults, or set QueueSize to size the drop-on-overflow buffer to your expected burst.

func (*Handlers) Close

func (h *Handlers) Close() error

Close signals the worker goroutine to stop, drains any queued events, and returns once the worker has exited. Calling Close more than once is safe — subsequent calls are no-ops.

func (*Handlers) Dropped

func (h *Handlers) Dropped() int64

Dropped returns the number of events the queue rejected because it was full. Useful as a metric / health signal.

func (*Handlers) Fire

func (h *Handlers) Fire(ctx context.Context, event any)

Fire enqueues the event for asynchronous dispatch. Returns immediately; Dropped() increments if the queue is full.

type Limits

type Limits struct {
	// QueueSize is the bounded channel capacity for pending dispatches.
	// Values < 1 fall back to the default of 1024.
	QueueSize int
}

Limits caps the runtime resources a Handlers instance will consume. Today only the queue capacity matters; future fields (e.g. per-handler timeout, overflow action) belong here too. A zero value is valid and uses sensible defaults.
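
The documented fallback for QueueSize can be expressed as a small accessor. This is a sketch under the assumption that the default is applied at construction time; the queueSize method and the defaultQueueSize constant are hypothetical names, while the struct field and the value 1024 come from the documentation above.

```go
package main

import "fmt"

// defaultQueueSize is the documented default capacity.
const defaultQueueSize = 1024

// Limits mirrors the struct documented for this package.
type Limits struct {
	QueueSize int
}

// queueSize resolves the effective capacity: values below 1 fall back to
// the default, so the zero value of Limits is always usable.
func (l Limits) queueSize() int {
	if l.QueueSize < 1 {
		return defaultQueueSize
	}
	return l.QueueSize
}

func main() {
	fmt.Println(Limits{}.queueSize(), Limits{QueueSize: 4096}.queueSize()) // 1024 4096
}
```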
