otel

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package otel wires synapse to OpenTelemetry tracing and metrics. It is an optional, sibling module: importing it pulls in the OTel SDK, importing only the core synapse module does not.

A single Observer value, constructed with the user's OTel providers, exposes three plug-in points that match synapse's existing middleware shapes:

Each plug-in emits a span and a duration histogram around the underlying call. Because the child context the middleware produces is the one threaded through to the wrapped operation, OTel parent- span propagation lights up the entire command-driven saga — across aggregates, across projections — as a single connected trace, with no additional wiring. See ADR-0034.

Typical wiring:

o := synotel.New(tp, mp)

repo := es.NewRepository(store, reg, newCounter,
    es.WithMiddleware(o.Execute()))

bus := commandbus.New(
    commandbus.WithMiddleware(o.Dispatch()))

view := o.WrapProjection("counter-view", myView)
runner := projection.NewRunner("counter-view", store, reg, view)

Metric names follow OpenTelemetry conventions:

  • synapse.command.duration (Histogram, seconds)
  • synapse.dispatch.duration (Histogram, seconds)
  • synapse.projection.event.duration (Histogram, seconds)

On failure each emits an error.type attribute with a stable classification (synapse.conflict, synapse.unknown_command, synapse.decode, synapse.handler, synapse.panic, timeout, canceled). Success calls omit the attribute, matching OTel convention.

Index

Constants

This section is empty.

Variables

View Source
var (
	AttrCommandName    = attribute.Key("synapse.command.name")
	AttrAggregateType  = attribute.Key("synapse.aggregate.type")
	AttrStreamID       = attribute.Key("synapse.stream.id")
	AttrEventType      = attribute.Key("synapse.event.type")
	AttrEventCount     = attribute.Key("synapse.event.count")
	AttrProjectionName = attribute.Key("synapse.projection.name")
	AttrCheckpointName = attribute.Key("synapse.checkpoint.name")
	AttrCausationID    = attribute.Key("synapse.causation.id")
	AttrCorrelationID  = attribute.Key("synapse.correlation.id")
	AttrErrorType      = attribute.Key("error.type")
)

Attribute keys. Kept as exported attribute.Key values so users can reference them when building custom filters or processors.

Functions

This section is empty.

Types

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

Observer wraps a trace.TracerProvider and a metric.MeterProvider and pre-creates the synapse-named tracer, meter, and instruments. A single Observer is shared across every middleware and projection wrapper an application installs, so the instruments are created once and the per-call cost is the OTel SDK's own.

Observer is safe for concurrent use; the underlying tracer and meter must themselves be concurrent-safe per the OTel spec, which both the noop and SDK implementations are.

func Must

Must is a convenience for callers that prefer to fail fast at startup. Identical to New but panics on error.

func New

New returns an Observer wired to the given OpenTelemetry providers. Both are required; pass otel.GetTracerProvider and otel.GetMeterProvider from the go.opentelemetry.io/otel root package to use the SDK's globals.

Returns an error if either provider is nil or if instrument creation fails. Instrument creation succeeds against the OTel noop and SDK providers; an error here indicates a bug in a third-party provider.

func (*Observer) Dispatch

func (o *Observer) Dispatch() commandbus.Middleware

Dispatch returns a commandbus.Middleware that opens a "synapse.dispatch" span and records the wrapped Operation's duration into the synapse.dispatch.duration histogram. The command name from the inbound dispatch is carried as synapse.command.name on both the span and the metric.

Install it via commandbus.WithMiddleware when constructing the Bus. Compose it with other bus middleware in the conventional order: observability outermost, recover next, then domain concerns.

func (*Observer) Execute

func (o *Observer) Execute() es.Middleware

Execute returns an es.Middleware that opens a "synapse.execute" span and records the wrapped Operation's duration into the synapse.command.duration histogram.

Install it via es.WithMiddleware when constructing the Repository. It composes left-to-right with the synapse-provided middleware in es/middleware (locking, retry) — put Execute first so per-aggregate locking and retry attempts happen inside the span.

func (*Observer) WrapCheckpointStore

func (o *Observer) WrapCheckpointStore(inner es.CheckpointStore) es.CheckpointStore

WrapCheckpointStore returns an es.CheckpointStore that emits a span and duration histogram around Save, Load, and Reset. Names gets a span around the iterator walk (no histogram — listing is admin-only and infrequent). An inner store that does not support Names yields its es.ErrUnsupported terminal directly through.

func (*Observer) WrapEventStore

func (o *Observer) WrapEventStore(inner es.EventStore) es.EventStore

WrapEventStore returns an es.EventStore that emits a span and duration histogram around Append and Load. Streams and StreamsBySubject get a span around the iterator walk (no histogram — list/index operations are admin-only and infrequent). Subscribe and SubscribeStream pass through unmodified: they're long-running iterators whose per-event cost is already covered by Observer.WrapProjection.

The inner store's es.ErrUnsupported terminal yields flow through unchanged.

func (*Observer) WrapProjection

func (o *Observer) WrapProjection(name string, inner es.Projection) es.Projection

WrapProjection returns an es.Projection that wraps inner with a "synapse.project" span and records each event's processing duration into the synapse.projection.event.duration histogram.

name is recorded as synapse.projection.name on both the span and the metric; it should match the name the caller passes to [projection.NewRunner] so traces and dashboards line up.

The wrapper preserves inner's existing identity: it does not buffer, reorder, or batch events; it propagates the child context from tracer.Start through to inner.Project, so any downstream span the projection opens (or any es.Execute / commandbus.Dispatch it calls in turn) is parented under synapse.project automatically.

func (*Observer) WrapSnapshotStore

func (o *Observer) WrapSnapshotStore(inner es.SnapshotStore) es.SnapshotStore

WrapSnapshotStore returns an es.SnapshotStore that emits a span and a duration histogram around every Latest and Save call on inner. The wrapper preserves inner's contract — including the (snap, false) "no snapshot yet" return shape — verbatim.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL