Documentation
¶
Overview ¶
Package otel wires synapse to OpenTelemetry tracing and metrics. It is an optional, sibling module: importing it pulls in the OTel SDK, importing only the core synapse module does not.
A single Observer value, constructed with the user's OTel providers, exposes three plug-in points that match synapse's existing middleware shapes:
- Observer.Execute — es.Middleware for es.Repository
- Observer.Dispatch — commandbus.Middleware for commandbus.Bus
- Observer.WrapProjection — wraps an es.Projection
Each plug-in emits a span and a duration histogram around the underlying call. Because the child context the middleware produces is the one threaded through to the wrapped operation, OTel parent- span propagation lights up the entire command-driven saga — across aggregates, across projections — as a single connected trace, with no additional wiring. See ADR-0034.
Typical wiring:
o := synotel.New(tp, mp)
repo := es.NewRepository(store, reg, newCounter,
es.WithMiddleware(o.Execute()))
bus := commandbus.New(
commandbus.WithMiddleware(o.Dispatch()))
view := o.WrapProjection("counter-view", myView)
runner := projection.NewRunner("counter-view", store, reg, view)
Metric names follow OpenTelemetry conventions:
- synapse.command.duration (Histogram, seconds)
- synapse.dispatch.duration (Histogram, seconds)
- synapse.projection.event.duration (Histogram, seconds)
On failure each emits an error.type attribute with a stable classification (synapse.conflict, synapse.unknown_command, synapse.decode, synapse.handler, synapse.panic, timeout, canceled). Success calls omit the attribute, matching OTel convention.
Index ¶
- Variables
- type Observer
- func (o *Observer) Dispatch() commandbus.Middleware
- func (o *Observer) Execute() es.Middleware
- func (o *Observer) WrapCheckpointStore(inner es.CheckpointStore) es.CheckpointStore
- func (o *Observer) WrapEventStore(inner es.EventStore) es.EventStore
- func (o *Observer) WrapProjection(name string, inner es.Projection) es.Projection
- func (o *Observer) WrapSnapshotStore(inner es.SnapshotStore) es.SnapshotStore
Constants ¶
This section is empty.
Variables ¶
var ( AttrCommandName = attribute.Key("synapse.command.name") AttrAggregateType = attribute.Key("synapse.aggregate.type") AttrStreamID = attribute.Key("synapse.stream.id") AttrEventType = attribute.Key("synapse.event.type") AttrEventCount = attribute.Key("synapse.event.count") AttrProjectionName = attribute.Key("synapse.projection.name") AttrCheckpointName = attribute.Key("synapse.checkpoint.name") AttrCausationID = attribute.Key("synapse.causation.id") AttrCorrelationID = attribute.Key("synapse.correlation.id") AttrErrorType = attribute.Key("error.type") )
Attribute keys. Kept as exported attribute.Key values so users can reference them when building custom filters or processors.
Functions ¶
This section is empty.
Types ¶
type Observer ¶
type Observer struct {
// contains filtered or unexported fields
}
Observer wraps a trace.TracerProvider and a metric.MeterProvider and pre-creates the synapse-named tracer, meter, and instruments. A single Observer is shared across every middleware and projection wrapper an application installs, so the instruments are created once and the per-call cost is the OTel SDK's own.
Observer is safe for concurrent use; the underlying tracer and meter must themselves be concurrent-safe per the OTel spec, which both the noop and SDK implementations are.
func Must ¶
func Must(tp trace.TracerProvider, mp metric.MeterProvider) *Observer
Must is a convenience for callers that prefer to fail fast at startup. Identical to New but panics on error.
func New ¶
func New(tp trace.TracerProvider, mp metric.MeterProvider) (*Observer, error)
New returns an Observer wired to the given OpenTelemetry providers. Both are required; pass otel.GetTracerProvider and otel.GetMeterProvider from the go.opentelemetry.io/otel root package to use the SDK's globals.
Returns an error if either provider is nil or if instrument creation fails. Instrument creation succeeds against the OTel noop and SDK providers; an error here indicates a bug in a third-party provider.
func (*Observer) Dispatch ¶
func (o *Observer) Dispatch() commandbus.Middleware
Dispatch returns a commandbus.Middleware that opens a "synapse.dispatch" span and records the wrapped Operation's duration into the synapse.dispatch.duration histogram. The command name from the inbound dispatch is carried as synapse.command.name on both the span and the metric.
Install it via commandbus.WithMiddleware when constructing the Bus. Compose it with other bus middleware in the conventional order: observability outermost, recover next, then domain concerns.
func (*Observer) Execute ¶
func (o *Observer) Execute() es.Middleware
Execute returns an es.Middleware that opens a "synapse.execute" span and records the wrapped Operation's duration into the synapse.command.duration histogram.
Install it via es.WithMiddleware when constructing the Repository. It composes left-to-right with the synapse-provided middleware in es/middleware (locking, retry) — put Execute first so per-aggregate locking and retry attempts happen inside the span.
func (*Observer) WrapCheckpointStore ¶
func (o *Observer) WrapCheckpointStore(inner es.CheckpointStore) es.CheckpointStore
WrapCheckpointStore returns an es.CheckpointStore that emits a span and duration histogram around Save, Load, and Reset. Names gets a span around the iterator walk (no histogram — listing is admin-only and infrequent). An inner store that does not support Names yields its es.ErrUnsupported terminal directly through.
func (*Observer) WrapEventStore ¶
func (o *Observer) WrapEventStore(inner es.EventStore) es.EventStore
WrapEventStore returns an es.EventStore that emits a span and duration histogram around Append and Load. Streams and StreamsBySubject get a span around the iterator walk (no histogram — list/index operations are admin-only and infrequent). Subscribe and SubscribeStream pass through unmodified: they're long-running iterators whose per-event cost is already covered by Observer.WrapProjection.
The inner store's es.ErrUnsupported terminal yields flow through unchanged.
func (*Observer) WrapProjection ¶
func (o *Observer) WrapProjection(name string, inner es.Projection) es.Projection
WrapProjection returns an es.Projection that wraps inner with a "synapse.project" span and records each event's processing duration into the synapse.projection.event.duration histogram.
name is recorded as synapse.projection.name on both the span and the metric; it should match the name the caller passes to [projection.NewRunner] so traces and dashboards line up.
The wrapper preserves inner's existing identity: it does not buffer, reorder, or batch events; it propagates the child context from tracer.Start through to inner.Project, so any downstream span the projection opens (or any es.Execute / commandbus.Dispatch it calls in turn) is parented under synapse.project automatically.
func (*Observer) WrapSnapshotStore ¶
func (o *Observer) WrapSnapshotStore(inner es.SnapshotStore) es.SnapshotStore
WrapSnapshotStore returns an es.SnapshotStore that emits a span and a duration histogram around every Latest and Save call on inner. The wrapper preserves inner's contract — including the (snap, false) "no snapshot yet" return shape — verbatim.