Documentation ¶
Overview ¶
Package observe provides default EventSink implementations for agentcore packages — structured logging via slog, OTel metric counters, and a typed-callback registry. Each implementation is a standalone EventSink; combine what you need with Tee.
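	// NewHandlers takes a Limits value; keep the handle so the worker can be closed.
	handlers := observe.NewHandlers(observe.Limits{})
	defer handlers.Close()

	sink := observe.Tee(
		observe.Logger(slog.Default()),
		observe.Counter(otel.Meter("agent")),
		handlers,
	)
	rt := &workflow.Runtime{Model: m, Telemetry: sink}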
Logger and Counter are synchronous — slog and OTel batch downstream, so the extra goroutine layer is wasted overhead. The Handlers registry is async with a bounded queue (drop-on-overflow) so a slow handler can never block the producer.
Tracing is NOT in this package: agentcore packages use OTel directly via otel.Tracer(...) at their internal boundaries (see workflow/tracing.go). observe is the channel for typed domain events; OTel is the channel for spans.
Types ¶
type Event ¶
type Event interface {
	Name() string       // stable identifier, e.g. "workflow.started"
	Level() slog.Level  // slog.LevelInfo / Warn / Error
	Attrs() []slog.Attr // structured fields, type-safe at construction
	Err() error         // nil unless the event represents a failure
}
Event is the contract every observed value satisfies. agentcore packages (workflow, future supervision, etc.) define their own concrete event types and implement this interface; observers in this package consume them through the interface and never need to switch on the concrete type. New event types added to any agentcore package flow through every observer here automatically.
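As an illustration of the contract above, a producer-side event type might look like the following. This is a minimal sketch: GoalEnded, its fields, and errTimeout are hypothetical names invented for the example, and the Event interface is copied locally so the snippet compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
	"time"
)

// Event mirrors the interface documented above.
type Event interface {
	Name() string
	Level() slog.Level
	Attrs() []slog.Attr
	Err() error
}

// errTimeout is a hypothetical failure cause used by the demo.
var errTimeout = errors.New("timeout")

// GoalEnded is a hypothetical event type; its fields are illustrative only.
type GoalEnded struct {
	GoalID   string
	Duration time.Duration
	Cause    error // nil on success
}

func (e GoalEnded) Name() string { return "goal.ended" }

// Level escalates to Error when the goal failed.
func (e GoalEnded) Level() slog.Level {
	if e.Cause != nil {
		return slog.LevelError
	}
	return slog.LevelInfo
}

// Attrs builds the structured fields from typed struct fields.
func (e GoalEnded) Attrs() []slog.Attr {
	return []slog.Attr{
		slog.String("goal_id", e.GoalID),
		slog.Duration("duration", e.Duration),
	}
}

func (e GoalEnded) Err() error { return e.Cause }

// Compile-time check that GoalEnded satisfies Event.
var _ Event = GoalEnded{}

func main() {
	ev := GoalEnded{GoalID: "g-1", Duration: 2 * time.Second, Cause: errTimeout}
	fmt.Println(ev.Name(), ev.Level(), len(ev.Attrs()), ev.Err())
}
```

Because observers only see the interface, adding a type like this requires no change on the observer side.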
slog.Attr is the carrier vocabulary because it is stdlib, type-safe at construction, and already what the consumer's logging stack speaks. Tracer-style observers translate slog.Attr to OTel attribute.KeyValue at the boundary where they are needed.
type EventSink ¶
EventSink receives events from a producer (e.g. a running Workflow). Implementations must be safe for concurrent use — Fire may be called from multiple goroutines (parallel fan-out, async dispatchers).
The interface is single-method on purpose: the producer fires and forgets, and the sink decides what to do with the event (log, count, dispatch to handlers, drop). Slow or panicking sinks must not affect the producer.
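The concurrency requirement can be sketched with a small custom sink. The method signature Fire(Event) is an assumption (the interface body is not shown on this page), Event is reduced to a single method for brevity, and countingSink, tick, and fireConcurrently are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is reduced to one method for this sketch; the real interface is richer.
type Event interface{ Name() string }

// EventSink is an assumed shape: a single Fire method with no error return.
type EventSink interface{ Fire(e Event) }

// tick is a hypothetical event used only by the demo.
type tick struct{}

func (tick) Name() string { return "demo.tick" }

// countingSink tallies events by name. The mutex makes Fire safe to call
// from many goroutines at once.
type countingSink struct {
	mu     sync.Mutex
	counts map[string]int
}

func newCountingSink() *countingSink {
	return &countingSink{counts: make(map[string]int)}
}

func (s *countingSink) Fire(e Event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counts[e.Name()]++
}

// Count returns how many events with the given name have been observed.
func (s *countingSink) Count(name string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.counts[name]
}

// fireConcurrently fires n events from n goroutines and waits for all of them.
func fireConcurrently(sink EventSink, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sink.Fire(tick{})
		}()
	}
	wg.Wait()
}

func main() {
	s := newCountingSink()
	fireConcurrently(s, 100)
	fmt.Println(s.Count("demo.tick"))
}
```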
func Counter ¶
Counter returns an EventSink that emits one OTel metric counter per observed event, namespaced by the event's Name(). For an event with Name() == "goal.ended", the emitted counter is "agentcore.goal.ended".
Each increment carries two standard attributes derived from the event:
	level=info|warn|error    (from Event.Level())
	errored=true|false       (true iff Event.Err() != nil)
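That lets operators graph rates and failure ratios per event type without configuring per-event rules. For example, in PromQL, assuming the usual OTel-to-Prometheus name mapping (dots become underscores and counters gain a _total suffix):
	rate(agentcore_goal_ended_total{errored="false"}[5m])
	sum by (event) (rate(agentcore_*_total{errored="true"}[5m]))
The second query is schematic: PromQL has no glob syntax for metric names, so a real query would select the counter family with a regex matcher on __name__, and the event name lives in the metric name rather than in an event label.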
Counter caches one Int64Counter instrument per event name (sync.Map). Instruments that fail to register (e.g. invalid name) are skipped silently — observability must not break the producer.
New event types in any agentcore package automatically get their own counter the first time they fire — no code changes here.
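The per-name instrument cache described above can be sketched with sync.Map. This is not the package's implementation: instrument is a stdlib stand-in for an OTel Int64Counter so the example runs without the OTel SDK, the level and errored attributes are omitted, and registry, lookup, and record are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// instrument stands in for an OTel Int64Counter (assumption for this sketch).
type instrument struct {
	name  string
	total atomic.Int64
}

// registry caches one instrument per event name.
type registry struct {
	instruments sync.Map     // event name (string) -> *instrument
	created     atomic.Int64 // how many instruments were actually stored
}

// lookup returns the cached instrument for eventName, creating it on first use.
func (r *registry) lookup(eventName string) *instrument {
	if v, ok := r.instruments.Load(eventName); ok {
		return v.(*instrument)
	}
	fresh := &instrument{name: "agentcore." + eventName}
	// LoadOrStore keeps whichever instrument won a concurrent first-use race.
	v, loaded := r.instruments.LoadOrStore(eventName, fresh)
	if !loaded {
		r.created.Add(1)
	}
	return v.(*instrument)
}

// record increments the counter that belongs to eventName.
func (r *registry) record(eventName string) {
	r.lookup(eventName).total.Add(1)
}

func main() {
	var r registry
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.record("goal.ended")
		}()
	}
	wg.Wait()
	in := r.lookup("goal.ended")
	fmt.Println(in.name, in.total.Load(), r.created.Load())
}
```

The fast path is a single Load, so the allocation and LoadOrStore only happen the first time a given event name is seen.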
func Logger ¶
Logger returns an EventSink that emits one structured slog record per event. The event itself supplies its name, level, attribute list, and (when applicable) error — see Event. A non-nil Err is attached as an "err" attribute so the structured log retains the failure cause.
The sink is synchronous because slog handlers either are fast (text / JSON to a writer) or do their own batching. Wrapping in a goroutine here would only add overhead.
New event types in any agentcore package automatically flow through this sink without code changes — the dispatch is data-driven via the Event interface.
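A data-driven logging sink along these lines can be written with slog's LogAttrs. The sketch below is one plausible shape under stated assumptions, not the package's code: logEvent, render, recordHas, stepFailed, and sample are hypothetical names, and the Event interface is copied locally.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
)

// Event mirrors the interface documented on this page.
type Event interface {
	Name() string
	Level() slog.Level
	Attrs() []slog.Attr
	Err() error
}

// stepFailed is a hypothetical event used only for this demonstration.
type stepFailed struct {
	step  string
	cause error
}

func (e stepFailed) Name() string       { return "step.failed" }
func (e stepFailed) Level() slog.Level  { return slog.LevelError }
func (e stepFailed) Attrs() []slog.Attr { return []slog.Attr{slog.String("step", e.step)} }
func (e stepFailed) Err() error         { return e.cause }

// sample is a failing event shared by main and the checks.
var sample = stepFailed{step: "fetch", cause: errors.New("timeout")}

// logEvent emits one record for ev. A non-nil Err becomes an "err" attribute.
func logEvent(l *slog.Logger, ev Event) {
	attrs := ev.Attrs()
	if err := ev.Err(); err != nil {
		// Copy before appending so the event's own slice is never mutated.
		attrs = append(append([]slog.Attr(nil), attrs...), slog.Any("err", err))
	}
	l.LogAttrs(context.Background(), ev.Level(), ev.Name(), attrs...)
}

// render logs ev through a text handler and returns the resulting line.
func render(ev Event) string {
	var buf bytes.Buffer
	logEvent(slog.New(slog.NewTextHandler(&buf, nil)), ev)
	return buf.String()
}

// recordHas reports whether the rendered record contains fragment.
func recordHas(ev Event, fragment string) bool {
	return strings.Contains(render(ev), fragment)
}

func main() {
	fmt.Print(render(sample))
	fmt.Println(recordHas(sample, "err=timeout"))
}
```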
func Tee ¶
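Tee fans an event out to every sink in declared order, synchronously. The EventSink contract has no error return, so a failure inside one sink is never reported to the others or to the producer. Because the fan-out is synchronous, a sink that blocks in Fire delays every sink declared after it; each sink is expected to honor the EventSink contract and return promptly.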
Use Tee to compose Logger + Counter + Handlers into the single sink that a producer's Telemetry field expects. A zero-arg Tee() is a valid no-op.
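The fan-out behavior can be sketched in a few lines. The Fire(Event) signature is an assumption, Event is reduced to a single method, and teeSink, tagSink, named, and traceOrder are hypothetical names; the demo runs on one goroutine, so tagSink does no locking.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is reduced to one method for this sketch; the real interface is richer.
type Event interface{ Name() string }

// EventSink is an assumed shape: a single Fire method with no error return.
type EventSink interface{ Fire(e Event) }

// teeSink forwards each event to its sinks in declared order.
type teeSink []EventSink

func (t teeSink) Fire(e Event) {
	for _, s := range t {
		s.Fire(e)
	}
}

// Tee composes sinks. With no arguments the result is a valid no-op sink.
func Tee(sinks ...EventSink) EventSink { return teeSink(sinks) }

// named is a hypothetical event whose name is its own string value.
type named string

func (n named) Name() string { return string(n) }

// tagSink appends "<tag>:<event name>" to a shared trace (single-goroutine demo).
type tagSink struct {
	tag   string
	trace *[]string
}

func (s tagSink) Fire(e Event) {
	*s.trace = append(*s.trace, s.tag+":"+e.Name())
}

// traceOrder fires one event through a two-sink Tee and returns the delivery order.
func traceOrder() string {
	var trace []string
	sink := Tee(
		tagSink{tag: "log", trace: &trace},
		tagSink{tag: "count", trace: &trace},
	)
	sink.Fire(named("goal.ended"))
	Tee().Fire(named("ignored")) // zero sinks: nothing happens
	return strings.Join(trace, ",")
}

func main() {
	fmt.Println(traceOrder())
}
```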
type Handlers ¶
type Handlers struct {
	// contains filtered or unexported fields
}
Handlers is an EventSink that routes events to typed callbacks registered via the package-level On generic function. Registration is keyed by the event's Go type, so a callback registered for workflow.GoalEnded only fires on GoalEnded values — no type switches in consumer code. The registry doesn't need to know about specific producer packages; any value can be Fire'd, and any type used as E in On[E] will be routed.
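Type-keyed routing of this kind is commonly built from a generic registration function and a map keyed by reflect.Type. The sketch below assumes a signature of the form On[E any](registry, func(E)), which is not confirmed by this page; it dispatches synchronously for brevity, and registry, dispatch, GoalEnded, StepStarted, and endedCount are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// registry maps an event's dynamic Go type to its callbacks.
type registry struct {
	mu     sync.RWMutex
	byType map[reflect.Type][]func(any)
}

func newRegistry() *registry {
	return &registry{byType: make(map[reflect.Type][]func(any))}
}

// On registers fn for values whose dynamic type is exactly E.
func On[E any](r *registry, fn func(E)) {
	t := reflect.TypeOf((*E)(nil)).Elem()
	r.mu.Lock()
	defer r.mu.Unlock()
	// The wrapper's type assertion is safe: dispatch only looks callbacks up by type.
	r.byType[t] = append(r.byType[t], func(v any) { fn(v.(E)) })
}

// dispatch routes v to the callbacks registered for its type, in registration order.
func (r *registry) dispatch(v any) {
	r.mu.RLock()
	fns := r.byType[reflect.TypeOf(v)]
	r.mu.RUnlock()
	for _, fn := range fns {
		fn(v)
	}
}

// Hypothetical producer event types.
type GoalEnded struct{ ID string }
type StepStarted struct{ Step string }

// endedCount registers one GoalEnded callback and reports how often it ran.
func endedCount() int {
	r := newRegistry()
	n := 0
	On(r, func(e GoalEnded) { n++ })
	r.dispatch(GoalEnded{ID: "g-1"})
	r.dispatch(StepStarted{Step: "fetch"}) // no callback for this type: ignored
	r.dispatch(GoalEnded{ID: "g-2"})
	return n
}

func main() {
	fmt.Println(endedCount())
}
```

The type parameter is inferred from the callback's argument, which is why consumer code needs no type switch.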
Dispatch is asynchronous via a bounded queue (default size 1024). When the queue is full, Fire returns immediately and the dropped event is counted in Dropped(). This is an explicit policy choice: a slow handler must never block the goroutine that fires the event.
Handlers are invoked in registration order, one event at a time, on a dedicated goroutine. Handler panics are recovered and ignored — the goroutine never dies. Use Close to drain the queue and stop the worker at the end of a run.
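The dispatch policy described above (bounded queue, drop on overflow, panic recovery, idempotent Close) can be sketched with a buffered channel and a non-blocking send. This is an illustration under stated assumptions rather than the package's implementation; dispatcher, enqueue, safely, and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// dispatcher runs queued jobs one at a time on a dedicated goroutine.
type dispatcher struct {
	mu      sync.RWMutex
	closed  bool
	queue   chan func()
	done    chan struct{}
	dropped atomic.Int64
}

func newDispatcher(size int) *dispatcher {
	if size < 1 {
		size = 1024 // same fallback the Limits documentation describes
	}
	d := &dispatcher{queue: make(chan func(), size), done: make(chan struct{})}
	go d.run()
	return d
}

// run drains the queue until it is closed, then signals completion.
func (d *dispatcher) run() {
	defer close(d.done)
	for job := range d.queue {
		safely(job)
	}
}

// safely runs job and swallows any panic so the worker keeps going.
func safely(job func()) {
	defer func() { _ = recover() }()
	job()
}

// enqueue never blocks: when the queue is full or closed the job is dropped and counted.
func (d *dispatcher) enqueue(job func()) {
	d.mu.RLock()
	defer d.mu.RUnlock()
	if d.closed {
		d.dropped.Add(1)
		return
	}
	select {
	case d.queue <- job:
	default:
		d.dropped.Add(1)
	}
}

// Close stops intake, waits for queued jobs to finish, and is safe to call twice.
func (d *dispatcher) Close() {
	d.mu.Lock()
	if !d.closed {
		d.closed = true
		close(d.queue)
	}
	d.mu.Unlock()
	<-d.done
}

// demo overflows a two-slot queue while the worker is blocked on the first job.
func demo() (ran, dropped int64) {
	d := newDispatcher(2)
	var count atomic.Int64
	started, gate := make(chan struct{}), make(chan struct{})
	d.enqueue(func() { close(started); <-gate; count.Add(1) })
	<-started                                  // worker is busy; the queue is empty
	d.enqueue(func() { panic("handler bug") }) // recovered by the worker
	d.enqueue(func() { count.Add(1) })         // fills the second slot
	d.enqueue(func() { count.Add(1) })         // queue full: dropped
	close(gate)
	d.Close()
	d.Close() // no-op
	return count.Load(), d.dropped.Load()
}

func main() {
	ran, dropped := demo()
	fmt.Println(ran, dropped)
}
```

The select with a default case is what keeps the producer from ever waiting on a slow handler; the cost is that bursts larger than the queue lose events, which is why the drop counter exists.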
func NewHandlers ¶
NewHandlers builds a Handlers registry and starts its background dispatcher. Always pair with Close at the end of the run so the worker goroutine exits and the queue drains.
Pass observe.Limits{} for defaults, or set QueueSize to size the drop-on-overflow buffer to your expected burst.
func (*Handlers) Close ¶
Close signals the worker goroutine to stop, drains any queued events, and returns once the worker has exited. Calling Close more than once is safe — subsequent calls are no-ops.
type Limits ¶
type Limits struct {
	// QueueSize is the bounded channel capacity for pending dispatches.
	// Values < 1 fall back to the default of 1024.
	QueueSize int
}
Limits caps the runtime resources a Handlers instance will consume. Today only the queue capacity matters; future fields (e.g. per-handler timeout, overflow action) belong here too. A zero value is valid and uses sensible defaults.
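The fallback rule for QueueSize amounts to a one-line normalization. In this sketch Limits is redeclared locally, and defaultQueueSize and effectiveQueueSize are hypothetical names; the package may structure this differently.

```go
package main

import "fmt"

// Limits is redeclared here so the sketch is self-contained.
type Limits struct {
	QueueSize int
}

// defaultQueueSize matches the documented default.
const defaultQueueSize = 1024

// effectiveQueueSize applies the documented rule: values < 1 use the default.
func effectiveQueueSize(l Limits) int {
	if l.QueueSize < 1 {
		return defaultQueueSize
	}
	return l.QueueSize
}

func main() {
	fmt.Println(
		effectiveQueueSize(Limits{}),
		effectiveQueueSize(Limits{QueueSize: -3}),
		effectiveQueueSize(Limits{QueueSize: 64}),
	) // prints "1024 1024 64"
}
```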