trace

package
v1.1.9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package trace defines trace models plus storage and async writer components.

Index

Constants

View Source
const (
	WriteErrorClassConnection = "connection"
	WriteErrorClassTimeout    = "timeout"
	WriteErrorClassContention = "contention"
	WriteErrorClassConstraint = "constraint"
	WriteErrorClassUnknown    = "unknown"
)

Error class constants for trace write failure classification.

View Source
const (
	TraceQueuePressureOK        = "ok"
	TraceQueuePressureElevated  = "elevated"
	TraceQueuePressureHigh      = "high"
	TraceQueuePressureSaturated = "saturated"
)

Variables

View Source
var ErrInvalidCursor = errors.New("trace cursor is invalid")
View Source
var ErrNotFound = errors.New("trace store record not found")
View Source
var ErrNotImplemented = errors.New("trace store method not implemented")

Functions

func ClassifyWriteError added in v1.1.8

func ClassifyWriteError(err error) string

ClassifyWriteError maps a trace write error to one of the defined error classes so that operators can build alerts and dashboards on failure categories rather than on opaque Go type names.

func CoerceInt64 added in v1.1.7

func CoerceInt64(value any) (int64, bool)

CoerceInt64 converts a loosely-typed value to int64, handling float64, float32, int, int64, int32, json.Number, and string representations.

func DecodeMetadataMap added in v1.1.7

func DecodeMetadataMap(raw string) map[string]any

DecodeMetadataMap decodes a JSON metadata string into a generic map. It returns nil when the input is empty or cannot be parsed as JSON.

func MetadataBool added in v1.1.7

func MetadataBool(metadata map[string]any, key string) (bool, bool)

MetadataBool extracts the boolean value stored under key in a metadata map, handling native bools and "true"/"false" strings.

func MetadataInt64 added in v1.1.7

func MetadataInt64(metadata map[string]any, key string) (int64, bool)

MetadataInt64 extracts the int64 value stored under key in a metadata map.

func MetadataString added in v1.1.7

func MetadataString(metadata map[string]any, key string) string

MetadataString extracts the trimmed string value stored under key in a metadata map.

func OrderTime added in v1.1.7

func OrderTime(item *Trace) time.Time

OrderTime returns the canonical ordering timestamp for a trace, preferring CreatedAt over Timestamp.

func SortLineageTraces added in v1.1.7

func SortLineageTraces(items []*Trace)

SortLineageTraces orders traces using lineage checkpoint metadata when present, then falls back to deterministic timestamp/id ordering.

Types

type AnalyticsFilter

type AnalyticsFilter struct {
	OrgID        string
	WorkspaceID  string
	GatewayKeyID string
	Provider     string
	Model        string
	From         time.Time
	To           time.Time
}

type CostPoint

type CostPoint struct {
	BucketStart  time.Time
	Group        string
	TotalCostUSD float64
	RequestCount int64
	AvgCostUSD   float64
}

type CostSummary

type CostSummary struct {
	TotalCostUSD float64
}

type ErrorRateStats added in v1.1.8

type ErrorRateStats struct {
	Group         string
	TotalRequests int64
	ErrorCount4xx int64
	ErrorCount5xx int64
	ErrorRate     float64
}

type KeyStats

type KeyStats struct {
	APIKeyHash   string
	RequestCount int64
	TotalTokens  int64
	TotalCostUSD float64
	LastActiveAt time.Time
}

type LatencyStats added in v1.1.8

type LatencyStats struct {
	Group        string
	RequestCount int64
	AvgMS        float64
	MinMS        int64
	MaxMS        int64
	P50MS        float64
	P95MS        float64
	P99MS        float64
}

type ModelStats

type ModelStats struct {
	Model        string
	RequestCount int64
	AvgLatencyMS float64
	AvgTTFTMS    float64
	TotalTokens  int64
	TotalCostUSD float64
}

type PostgresStore

type PostgresStore struct {
	DSN string
	// contains filtered or unexported fields
}

func NewPostgresStore

func NewPostgresStore(dsn string) (*PostgresStore, error)

func (*PostgresStore) Close

func (s *PostgresStore) Close() error

func (*PostgresStore) GetCostSeries

func (s *PostgresStore) GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)

func (*PostgresStore) GetCostSummary

func (s *PostgresStore) GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)

func (*PostgresStore) GetErrorRateBreakdown added in v1.1.8

func (s *PostgresStore) GetErrorRateBreakdown(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]ErrorRateStats, error)

func (*PostgresStore) GetKeyStats

func (s *PostgresStore) GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)

func (*PostgresStore) GetLatencyPercentiles added in v1.1.8

func (s *PostgresStore) GetLatencyPercentiles(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]LatencyStats, error)

func (*PostgresStore) GetModelStats

func (s *PostgresStore) GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)

func (*PostgresStore) GetTrace

func (s *PostgresStore) GetTrace(ctx context.Context, id string) (*Trace, error)

func (*PostgresStore) GetUsageSeries

func (s *PostgresStore) GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)

func (*PostgresStore) GetUsageSummary

func (s *PostgresStore) GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)

func (*PostgresStore) QueryTraces

func (s *PostgresStore) QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)

func (*PostgresStore) WriteBatch

func (s *PostgresStore) WriteBatch(ctx context.Context, traces []*Trace) error

func (*PostgresStore) WriteTrace

func (s *PostgresStore) WriteTrace(ctx context.Context, trace *Trace) error

type SQLiteStore

type SQLiteStore struct {
	Path string
	// contains filtered or unexported fields
}

func NewSQLiteStore

func NewSQLiteStore(path string) (*SQLiteStore, error)

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

func (*SQLiteStore) GetCostSeries

func (s *SQLiteStore) GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)

func (*SQLiteStore) GetCostSummary

func (s *SQLiteStore) GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)

func (*SQLiteStore) GetErrorRateBreakdown added in v1.1.8

func (s *SQLiteStore) GetErrorRateBreakdown(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]ErrorRateStats, error)

func (*SQLiteStore) GetKeyStats

func (s *SQLiteStore) GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)

func (*SQLiteStore) GetLatencyPercentiles added in v1.1.8

func (s *SQLiteStore) GetLatencyPercentiles(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]LatencyStats, error)

func (*SQLiteStore) GetModelStats

func (s *SQLiteStore) GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)

func (*SQLiteStore) GetTrace

func (s *SQLiteStore) GetTrace(ctx context.Context, id string) (*Trace, error)

func (*SQLiteStore) GetUsageSeries

func (s *SQLiteStore) GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)

func (*SQLiteStore) GetUsageSummary

func (s *SQLiteStore) GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)

func (*SQLiteStore) QueryTraces

func (s *SQLiteStore) QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)

func (*SQLiteStore) WriteBatch

func (s *SQLiteStore) WriteBatch(ctx context.Context, traces []*Trace) error

func (*SQLiteStore) WriteTrace

func (s *SQLiteStore) WriteTrace(ctx context.Context, trace *Trace) error

type Trace

type Trace struct {
	ID                 string
	TraceGroupID       string
	OrgID              string
	WorkspaceID        string
	Timestamp          time.Time
	Provider           string
	Model              string
	RequestMethod      string
	RequestPath        string
	RequestHeaders     string
	RequestBody        string
	ResponseStatus     int
	ResponseHeaders    string
	ResponseBody       string
	InputTokens        int
	OutputTokens       int
	TotalTokens        int
	LatencyMS          int64
	TimeToFirstTokenMS int64
	TimeToFirstTokenUS int64
	APIKeyHash         string
	GatewayKeyID       string
	EstimatedCostUSD   float64
	Metadata           string
	CreatedAt          time.Time
}

type TraceFilter

type TraceFilter struct {
	OrgID        string
	WorkspaceID  string
	TraceGroupID string
	ThreadID     string
	RunID        string
	Provider     string
	Model        string
	APIKeyHash   string
	StatusCode   int
	MinTokens    int
	MaxTokens    int
	From         time.Time
	To           time.Time
	Limit        int
	Cursor       string
}

type TracePipelineDiagnostics added in v1.1.7

type TracePipelineDiagnostics struct {
	QueueCapacity                    int              `json:"queue_capacity"`
	QueueDepth                       int              `json:"queue_depth"`
	QueueDepthHighWatermark          int              `json:"queue_depth_high_watermark"`
	QueueUtilizationPct              int              `json:"queue_utilization_pct"`
	QueueHighWatermarkUtilizationPct int              `json:"queue_high_watermark_utilization_pct"`
	QueuePressureState               string           `json:"queue_pressure_state"`
	QueueHighWatermarkPressureState  string           `json:"queue_high_watermark_pressure_state"`
	EnqueueAcceptedTotal             int64            `json:"enqueue_accepted_total"`
	EnqueueDroppedTotal              int64            `json:"enqueue_dropped_total"`
	WriteDroppedTotal                int64            `json:"write_dropped_total"`
	TotalDroppedTotal                int64            `json:"total_dropped_total"`
	LastEnqueueDropAt                *time.Time       `json:"last_enqueue_drop_at,omitempty"`
	LastWriteDropAt                  *time.Time       `json:"last_write_drop_at,omitempty"`
	LastWriteDropOperation           string           `json:"last_write_drop_operation,omitempty"`
	WriteFailuresByClass             map[string]int64 `json:"write_failures_by_class,omitempty"`
	StoreDriver                      string           `json:"store_driver,omitempty"`
}

TracePipelineDiagnostics captures trace pipeline queue pressure and drop signals.

type TracePipelineDiagnosticsReader added in v1.1.7

type TracePipelineDiagnosticsReader interface {
	TracePipelineDiagnostics() TracePipelineDiagnostics
}

TracePipelineDiagnosticsReader exposes runtime queue/drop diagnostics.

type TraceResult

type TraceResult struct {
	Items      []*Trace
	NextCursor string
}

type TraceStore

type TraceStore interface {
	WriteTrace(ctx context.Context, trace *Trace) error
	WriteBatch(ctx context.Context, traces []*Trace) error
	GetTrace(ctx context.Context, id string) (*Trace, error)
	QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)
	GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)
	GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)
	GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)
	GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)
	GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)
	GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)
	GetLatencyPercentiles(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]LatencyStats, error)
	GetErrorRateBreakdown(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]ErrorRateStats, error)
}

type UsagePoint

type UsagePoint struct {
	BucketStart  time.Time
	Group        string
	InputTokens  int64
	OutputTokens int64
	TotalTokens  int64
}

type UsageSummary

type UsageSummary struct {
	TotalInputTokens  int64
	TotalOutputTokens int64
	TotalTokens       int64
}

type WriteFailure

type WriteFailure struct {
	Operation   string
	BatchSize   int
	FailedCount int
	Err         error
	ErrorClass  string
}

WriteFailure describes trace records that could not be persisted.

type WriteFailureHandler

type WriteFailureHandler func(WriteFailure)

WriteFailureHandler receives asynchronous trace write failure signals.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(store TraceStore, bufferSize int) *Writer

func (*Writer) Enqueue

func (w *Writer) Enqueue(t *Trace) bool

func (*Writer) QueueCap added in v1.1.8

func (w *Writer) QueueCap() int

QueueCap returns the capacity of the write queue.

func (*Writer) QueueLen added in v1.1.8

func (w *Writer) QueueLen() int

QueueLen returns the current number of items waiting in the write queue.

func (*Writer) SetMetrics added in v1.1.8

func (w *Writer) SetMetrics(m *WriterMetrics)

SetMetrics replaces the metric callbacks used by the writer pipeline.

func (*Writer) SetWriteFailureHandler

func (w *Writer) SetWriteFailureHandler(handler WriteFailureHandler)

SetWriteFailureHandler replaces the callback that receives dropped-trace write signals.

func (*Writer) Shutdown

func (w *Writer) Shutdown(ctx context.Context) error

func (*Writer) Start

func (w *Writer) Start(ctx context.Context)

func (*Writer) Stop

func (w *Writer) Stop()

func (*Writer) TracePipelineDiagnostics added in v1.1.7

func (w *Writer) TracePipelineDiagnostics() TracePipelineDiagnostics

TracePipelineDiagnostics returns a point-in-time snapshot of queue pressure and dropped-trace counters for operator diagnostics.

type WriterMetrics added in v1.1.8

type WriterMetrics struct {
	// OnEnqueue is called each time a trace is successfully placed on the queue.
	OnEnqueue func()
	// OnDrop is called each time a trace is dropped because the queue is full.
	OnDrop func()
	// OnFlush is called after each batch is flushed to storage.
	OnFlush func(batchSize int, duration time.Duration)
	// OnWriteStart is called before each storage write. It returns an end
	// function that the writer calls after the write completes (with error or nil).
	OnWriteStart func(batchSize int) func(error)
	// OnWriteSuccess is called after traces are successfully persisted to storage.
	// The count parameter indicates how many traces were written.
	OnWriteSuccess func(count int)
}

WriterMetrics holds optional callbacks the Writer invokes at key pipeline points.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL