trace

package
v0.0.0-...-a9bc208
Warning

This package is not in the latest version of its module.

Published: Feb 17, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

README

Trace Package

This package provides distributed tracing capabilities for the realtime-ai framework using OpenTelemetry.

Overview

The trace package offers:

  • Easy initialization: One-line setup with sensible defaults
  • Multiple exporters: stdout (development), OTLP (production), or none (disabled)
  • Rich instrumentation: Pre-built helpers for pipelines, elements, connections, and AI operations
  • Production-ready: Configurable sampling, async batching, and integration with popular backends

Quick Start

package main

import (
    "context"
    "log"

    "github.com/realtime-ai/realtime-ai/pkg/trace"
)

func main() {
    ctx := context.Background()

    // Initialize tracing with the default configuration
    cfg := trace.DefaultConfig()
    if err := trace.Initialize(ctx, cfg); err != nil {
        log.Fatal(err)
    }
    // Flush and stop the tracer provider on exit
    defer trace.Shutdown(ctx)

    // Use tracing in your application
    ctx, span := trace.StartSpan(ctx, "my-operation")
    defer span.End()

    // ... your code ...
}

Configuration

Environment Variables

  • TRACE_EXPORTER: Exporter type (stdout, otlp, none) - default: stdout
  • OTEL_EXPORTER_OTLP_ENDPOINT: OTLP collector endpoint - default: localhost:4317
  • ENVIRONMENT: Deployment environment - default: development

Programmatic Configuration

cfg := &trace.Config{
    ServiceName:    "my-service",
    ServiceVersion: "1.0.0",
    Environment:    "production",
    ExporterType:   "otlp",
    OTLPEndpoint:   "collector.example.com:4317",
    SamplingRate:   0.1,  // Sample 10% of traces
}

// Initialize returns an error; do not discard it
if err := trace.Initialize(ctx, cfg); err != nil {
    log.Fatal(err)
}

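Instrumentation Helpers

Each snippet below stands alone. Inside a single function, declare ctx and span once with := and reassign them with = afterwards, since a second := with no new variables does not compile.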

Pipeline Operations

// Start pipeline
ctx, span := trace.InstrumentPipelineStart(ctx, "my-pipeline")
defer span.End()

// Process message
ctx, span := trace.InstrumentElementProcess(ctx, "element-name", msg)
defer span.End()

// Push a message (for the pull side, use trace.InstrumentPipelinePull(ctx, "pipeline-name"))
ctx, span := trace.InstrumentPipelinePush(ctx, "pipeline-name", msg)
defer span.End()

Connection Operations

// Connection created
ctx, span := trace.InstrumentConnectionCreated(ctx, connID, "webrtc")
defer span.End()

// State change
ctx, span := trace.InstrumentConnectionStateChange(ctx, connID, "webrtc", "new", "connected")
defer span.End()

// Send/receive messages
ctx, span := trace.InstrumentConnectionMessage(ctx, connID, "webrtc", "send", len(data))
defer span.End()

AI Operations

// LLM request
ctx, span := trace.InstrumentLLMRequest(ctx, "gemini", "gemini-2.0-flash-exp")
defer span.End()

// STT request
ctx, span := trace.InstrumentSTTRequest(ctx, "azure", len(audioData))
defer span.End()

// TTS request
ctx, span := trace.InstrumentTTSRequest(ctx, "azure", "en-US-Neural", text)
defer span.End()

Audio/Video Processing

// Audio processing
ctx, span := trace.InstrumentAudioProcessing(ctx, "resample", inputSize, outputSize)
defer span.End()

// Video processing
ctx, span := trace.InstrumentVideoProcessing(ctx, "encode", inputSize, outputSize)
defer span.End()

Attributes

Predefined Attribute Keys

The package defines standard attribute keys for consistency:

const (
    // Pipeline
    AttrPipelineName     = "pipeline.name"
    AttrPipelineElement  = "pipeline.element"
    AttrSessionID        = "session.id"

    // Audio
    AttrAudioSampleRate  = "audio.sample_rate"
    AttrAudioChannels    = "audio.channels"
    AttrAudioMediaType   = "audio.media_type"

    // Connection
    AttrConnectionID     = "connection.id"
    AttrConnectionType   = "connection.type"
    AttrConnectionState  = "connection.state"

    // AI
    AttrLLMProvider      = "llm.provider"
    AttrLLMModel         = "llm.model"
    // ... and more
)

Helper Functions

// Create an attribute set (each line below is an independent alternative)
attrs := trace.PipelineAttrs("my-pipeline", "element-name")
attrs := trace.AudioAttrs(16000, 1, 3200, "audio/x-raw", "opus")
attrs := trace.ConnectionAttrs("conn-123", "webrtc", "connected")
attrs := trace.LLMAttrs("gemini", "gemini-2.0-flash-exp")

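// Use in spans. WithAttributes comes from the OpenTelemetry API package,
// imported here as oteltrace "go.opentelemetry.io/otel/trace".
ctx, span := trace.StartSpan(ctx, "operation",
    oteltrace.WithAttributes(attrs...),
)
defer span.End()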

Utility Functions

Error Recording

if err != nil {
    trace.RecordError(span, err)
    return err
}

Events

trace.AddEvent(span, "processing.started",
    attribute.String("batch.id", batchID),
)

Attributes

trace.SetAttributes(span,
    attribute.String("user.id", userID),
    attribute.Int("batch.size", batchSize),
)

Trace IDs

// Get trace ID from context
traceID := trace.TraceID(ctx)

// Get span ID from context
spanID := trace.SpanID(ctx)

// Format log message with trace info
log.Println(trace.LogWithTrace(ctx, "Processing data"))
// Output: [trace_id=abc123 span_id=def456] Processing data
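The output format shown above can be reproduced with a small formatter. This is a minimal sketch, not the package's implementation: formatWithTrace is a hypothetical name, it takes the IDs as plain strings instead of reading them from a context, and the no-trace fallback is an assumption.

```go
package main

import "fmt"

// formatWithTrace is a hypothetical stand-in for trace.LogWithTrace that takes
// the IDs as plain strings instead of extracting them from a context.
func formatWithTrace(traceID, spanID, message string) string {
	if traceID == "" {
		// Assumption: with no active trace, the message is returned unchanged.
		return message
	}
	return fmt.Sprintf("[trace_id=%s span_id=%s] %s", traceID, spanID, message)
}

func main() {
	// Prints: [trace_id=abc123 span_id=def456] Processing data
	fmt.Println(formatWithTrace("abc123", "def456", "Processing data"))
}
```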

Examples

See the complete example in examples/tracing-demo/main.go for a working demonstration.

Documentation

For detailed documentation, see docs/tracing.md.

Architecture

Initialization Flow

  1. Call trace.Initialize(ctx, cfg) at application startup
  2. The package creates an OpenTelemetry tracer provider with the configured exporter
  3. It sets the global tracer provider and propagators
  4. Initialize returns an error if setup fails; afterwards, trace.GetTracer() returns the global tracer for use throughout the application

Span Lifecycle

  1. Start span: ctx, span := trace.StartSpan(ctx, "operation")
  2. Add attributes: trace.SetAttributes(span, ...)
  3. Add events: trace.AddEvent(span, "event-name", ...)
  4. Record errors: trace.RecordError(span, err)
  5. End span: span.End(), usually deferred right after the span is started

Context Propagation

Traces are propagated through context:

func parent(ctx context.Context) {
    ctx, span := trace.StartSpan(ctx, "parent")
    defer span.End()

    // Child span automatically linked via context
    child(ctx)
}

func child(ctx context.Context) {
    ctx, span := trace.StartSpan(ctx, "child")
    defer span.End()
    // ...
}

Integration with Backends

Jaeger

# Run Jaeger
docker run -d -p 16686:16686 -p 4317:4317 jaegertracing/all-in-one:latest

# Configure app
export TRACE_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_ENDPOINT=localhost:4317

# View traces at http://localhost:16686

Zipkin

# Run Zipkin
docker run -d -p 9411:9411 openzipkin/zipkin

# Configure OpenTelemetry Collector with Zipkin exporter
# Point app to collector

Cloud Providers

Set TRACE_EXPORTER=otlp and point OTEL_EXPORTER_OTLP_ENDPOINT at the OTLP endpoint for:

  • Google Cloud Trace
  • AWS X-Ray (via ADOT Collector)
  • Azure Monitor

Performance

  • Overhead: Minimal with batched async export
  • Sampling: Configurable sampling rate (0.0-1.0)
  • No-op mode: Set TRACE_EXPORTER=none for zero overhead
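How the package turns SamplingRate into a sampler is not shown here. Ratio-based samplers commonly make a deterministic decision from the trace ID, and the stdlib-only sketch below illustrates that idea. shouldSample is a hypothetical name, and the comparison against rate * 2^63 is one common scheme assumed for the example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// shouldSample reports whether a trace with the given ID is kept at the given
// rate. The decision depends only on the trace ID, so every span of a trace
// gets the same answer.
func shouldSample(traceID [16]byte, rate float64) bool {
	if rate >= 1 {
		return true
	}
	if rate <= 0 {
		return false
	}
	// Interpret the last 8 bytes as a 63-bit number and compare it with rate * 2^63.
	x := binary.BigEndian.Uint64(traceID[8:16]) >> 1
	bound := uint64(rate * (1 << 63))
	return x < bound
}

func main() {
	var low, high [16]byte // low stays all zeros
	for i := range high {
		high[i] = 0xFF
	}
	fmt.Println(shouldSample(low, 0.1), shouldSample(high, 0.1))
}
```

With a rate of 0.1, roughly one in ten uniformly distributed trace IDs falls below the bound, which matches the "sample 10% of traces" reading of the configuration example.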

Best Practices

  1. Always propagate context: Pass ctx through your call chain
  2. Add meaningful attributes: Help with debugging and filtering
  3. Record errors: Use trace.RecordError(span, err)
  4. Use events for milestones: Mark important points in processing
  5. Control sampling: Reduce overhead in high-throughput scenarios
  6. Include trace IDs in logs: Correlate logs with traces

License

Same as the realtime-ai project.

Documentation

Constants

const (
	// Pipeline attributes
	AttrPipelineName    = "pipeline.name"
	AttrPipelineElement = "pipeline.element"
	AttrSessionID       = "session.id"
	AttrMessageType     = "message.type"

	// Audio attributes
	AttrAudioSampleRate = "audio.sample_rate"
	AttrAudioChannels   = "audio.channels"
	AttrAudioMediaType  = "audio.media_type"
	AttrAudioCodec      = "audio.codec"
	AttrAudioDataSize   = "audio.data_size"

	// Video attributes
	AttrVideoWidth     = "video.width"
	AttrVideoHeight    = "video.height"
	AttrVideoMediaType = "video.media_type"
	AttrVideoCodec     = "video.codec"
	AttrVideoDataSize  = "video.data_size"

	// Connection attributes
	AttrConnectionID    = "connection.id"
	AttrConnectionType  = "connection.type"
	AttrConnectionState = "connection.state"

	// AI/LLM attributes
	AttrLLMProvider     = "llm.provider"
	AttrLLMModel        = "llm.model"
	AttrLLMResponseType = "llm.response_type"

	// STT/TTS attributes
	AttrSTTProvider = "stt.provider"
	AttrTTSProvider = "tts.provider"
	AttrTTSVoice    = "tts.voice"

	// Error attributes
	AttrErrorType    = "error.type"
	AttrErrorMessage = "error.message"
)

Common attribute keys used throughout the application

const (
	// TracerName is the name of the tracer used throughout the application
	TracerName = "github.com/realtime-ai/realtime-ai"
)

Functions

func AddEvent

func AddEvent(span trace.Span, name string, attrs ...attribute.KeyValue)

AddEvent adds an event to a span

func AudioAttrs

func AudioAttrs(sampleRate, channels, dataSize int, mediaType, codec string) []attribute.KeyValue

AudioAttrs creates attributes for audio data
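The shape of such an attribute helper can be sketched without the OpenTelemetry dependency. In the example below, keyValue is a simplified stand-in for attribute.KeyValue, and the set of keys is an assumption inferred from the Audio attribute constants above, not a confirmed description of what AudioAttrs emits.

```go
package main

import "fmt"

// keyValue is a simplified stand-in for attribute.KeyValue.
type keyValue struct {
	Key   string
	Value interface{}
}

// audioAttrs mirrors the documented AudioAttrs parameter list. Which keys the
// real helper sets is an assumption based on the Audio attribute constants.
func audioAttrs(sampleRate, channels, dataSize int, mediaType, codec string) []keyValue {
	return []keyValue{
		{"audio.sample_rate", sampleRate},
		{"audio.channels", channels},
		{"audio.data_size", dataSize},
		{"audio.media_type", mediaType},
		{"audio.codec", codec},
	}
}

func main() {
	for _, kv := range audioAttrs(16000, 1, 3200, "audio/x-raw", "opus") {
		fmt.Printf("%s=%v\n", kv.Key, kv.Value)
	}
}
```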

func ConnectionAttrs

func ConnectionAttrs(connID, connType, state string) []attribute.KeyValue

ConnectionAttrs creates attributes for connection information

func ContextWithSpan

func ContextWithSpan(ctx context.Context, span trace.Span) context.Context

ContextWithSpan returns a new context with the given span

func ErrorAttrs

func ErrorAttrs(errType, errMsg string) []attribute.KeyValue

ErrorAttrs creates attributes for errors

func GetTracer

func GetTracer() trace.Tracer

GetTracer returns the global tracer

func Initialize

func Initialize(ctx context.Context, cfg *Config) error

Initialize sets up the global tracer provider

func InstrumentAudioProcessing

func InstrumentAudioProcessing(ctx context.Context, operation string, inputSize, outputSize int) (context.Context, trace.Span)

InstrumentAudioProcessing creates a span for audio processing operations

func InstrumentConnectionClosed

func InstrumentConnectionClosed(ctx context.Context, connID, connType string) (context.Context, trace.Span)

InstrumentConnectionClosed creates a span for connection closure

func InstrumentConnectionCreated

func InstrumentConnectionCreated(ctx context.Context, connID, connType string) (context.Context, trace.Span)

InstrumentConnectionCreated creates a span for connection creation

func InstrumentConnectionError

func InstrumentConnectionError(ctx context.Context, connID, connType string, err error) (context.Context, trace.Span)

InstrumentConnectionError creates a span for connection errors

func InstrumentConnectionMessage

func InstrumentConnectionMessage(ctx context.Context, connID, connType, direction string, dataSize int) (context.Context, trace.Span)

InstrumentConnectionMessage creates a span for sending/receiving messages over a connection

func InstrumentConnectionStateChange

func InstrumentConnectionStateChange(ctx context.Context, connID, connType, oldState, newState string) (context.Context, trace.Span)

InstrumentConnectionStateChange creates a span for connection state changes

func InstrumentElementProcess

func InstrumentElementProcess(ctx context.Context, elementName string, msg *pipeline.PipelineMessage) (context.Context, trace.Span)

InstrumentElementProcess creates a span for element message processing

func InstrumentElementStart

func InstrumentElementStart(ctx context.Context, elementName string) (context.Context, trace.Span)

InstrumentElementStart creates a span for element start

func InstrumentElementStop

func InstrumentElementStop(ctx context.Context, elementName string) (context.Context, trace.Span)

InstrumentElementStop creates a span for element stop

func InstrumentLLMRequest

func InstrumentLLMRequest(ctx context.Context, provider, model string) (context.Context, trace.Span)

InstrumentLLMRequest creates a span for LLM requests

func InstrumentLLMResponse

func InstrumentLLMResponse(ctx context.Context, provider, model, responseType string, dataSize int) (context.Context, trace.Span)

InstrumentLLMResponse creates a span for LLM responses

func InstrumentPipelinePull

func InstrumentPipelinePull(ctx context.Context, pipelineName string) (context.Context, trace.Span)

InstrumentPipelinePull creates a span for pulling a message from the pipeline

func InstrumentPipelinePush

func InstrumentPipelinePush(ctx context.Context, pipelineName string, msg *pipeline.PipelineMessage) (context.Context, trace.Span)

InstrumentPipelinePush creates a span for pushing a message to the pipeline

func InstrumentPipelineStart

func InstrumentPipelineStart(ctx context.Context, pipelineName string) (context.Context, trace.Span)

InstrumentPipelineStart creates a span for pipeline start

func InstrumentPipelineStop

func InstrumentPipelineStop(ctx context.Context, pipelineName string) (context.Context, trace.Span)

InstrumentPipelineStop creates a span for pipeline stop

func InstrumentSTTRequest

func InstrumentSTTRequest(ctx context.Context, provider string, audioSize int) (context.Context, trace.Span)

InstrumentSTTRequest creates a span for STT (Speech-to-Text) requests

func InstrumentSTTResponse

func InstrumentSTTResponse(ctx context.Context, provider, text string) (context.Context, trace.Span)

InstrumentSTTResponse creates a span for STT responses

func InstrumentTTSRequest

func InstrumentTTSRequest(ctx context.Context, provider, voice, text string) (context.Context, trace.Span)

InstrumentTTSRequest creates a span for TTS (Text-to-Speech) requests

func InstrumentTTSResponse

func InstrumentTTSResponse(ctx context.Context, provider string, audioSize int) (context.Context, trace.Span)

InstrumentTTSResponse creates a span for TTS responses

func InstrumentVideoProcessing

func InstrumentVideoProcessing(ctx context.Context, operation string, inputSize, outputSize int) (context.Context, trace.Span)

InstrumentVideoProcessing creates a span for video processing operations

func LLMAttrs

func LLMAttrs(provider, model string) []attribute.KeyValue

LLMAttrs creates attributes for LLM operations

func LogWithTrace

func LogWithTrace(ctx context.Context, message string) string

LogWithTrace returns a formatted string with trace information

func PipelineAttrs

func PipelineAttrs(pipelineName, elementName string) []attribute.KeyValue

PipelineAttrs creates attributes for pipeline operations

func RecordError

func RecordError(span trace.Span, err error)

RecordError records an error on a span

func SessionAttrs

func SessionAttrs(sessionID string) []attribute.KeyValue

SessionAttrs creates attributes for session information

func SetAttributes

func SetAttributes(span trace.Span, attrs ...attribute.KeyValue)

SetAttributes sets multiple attributes on a span

func Shutdown

func Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the tracer provider

func SpanFromContext

func SpanFromContext(ctx context.Context) trace.Span

SpanFromContext returns the current span from context

func SpanID

func SpanID(ctx context.Context) string

SpanID returns the span ID from the current span in context

func StartSpan

func StartSpan(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span)

StartSpan is a convenience function to start a new span

func TraceID

func TraceID(ctx context.Context) string

TraceID returns the trace ID from the current span in context

func VideoAttrs

func VideoAttrs(width, height, dataSize int, mediaType, codec string) []attribute.KeyValue

VideoAttrs creates attributes for video data

func WithSpan

func WithSpan(ctx context.Context, spanName string, fn func(context.Context) error, opts ...trace.SpanStartOption) error

WithSpan executes a function within a new span

Types

type Config

type Config struct {
	// ServiceName is the name of the service
	ServiceName string
	// ServiceVersion is the version of the service
	ServiceVersion string
	// Environment is the deployment environment (dev, staging, prod)
	Environment string
	// ExporterType defines which exporter to use: "stdout", "otlp", or "none"
	ExporterType string
	// OTLPEndpoint is the endpoint for OTLP exporter (e.g., "localhost:4317")
	OTLPEndpoint string
	// SamplingRate is the rate at which traces are sampled (0.0 to 1.0)
	SamplingRate float64
}

Config holds the configuration for tracing

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a default configuration
