observerimpl

package v0.0.0-...-a2e63a5
Published: May 16, 2026 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package observerimpl implements the observer component.

Index

Constants

const (
	AggregateAverage = observer.AggregateAverage
	AggregateSum     = observer.AggregateSum
	AggregateCount   = observer.AggregateCount
	AggregateMin     = observer.AggregateMin
	AggregateMax     = observer.AggregateMax
)

Re-export aggregate constants for internal use.

const LogMetricsExtractorName = "log_metrics_extractor"

LogMetricsExtractorName is the canonical name for the log metrics extractor.

const LogPatternExtractorName = "log_pattern_extractor"

LogPatternExtractorName is the canonical name for the log pattern extractor. It is used as the storage namespace for emitted metrics, as the component name in the catalog, and in notify formatting for log-derived anomalies.

Variables

This section is empty.

Functions

This section is empty.

Types

type AddResult

type AddResult struct {
	// IsNew is true if this Add created a brand-new series (cardinality +1).
	IsNew bool
	// Ref is the SeriesRef assigned to this point's series.
	// -1 when the point is dropped (non-finite or sentinel values).
	Ref observer.SeriesRef
}

AddResult bundles the outputs of timeSeriesStorage.Add.

type Aggregate

type Aggregate = observer.Aggregate

Aggregate is an alias to the definition in the observer component for internal use.

type BOCPDConfig

type BOCPDConfig struct {
	// WarmupPoints is the number of initial points used for baseline estimation.
	// A longer warmup captures more of the metric's natural variability, reducing
	// false positives from normal fluctuation. Default: 120 (~2 minutes at 1Hz).
	WarmupPoints int `json:"warmup_points"`

	// Hazard is the constant changepoint hazard probability.
	// Default: 0.05
	Hazard float64 `json:"hazard"`

	// CPThreshold is the posterior P(changepoint at t) threshold to emit.
	// Default: 0.6
	CPThreshold float64 `json:"cp_threshold"`

	// ShortRunLength is the run-length horizon k for short-run posterior mass P(r_t <= k).
	// Default: 5
	ShortRunLength int `json:"short_run_length"`

	// CPMassThreshold is the threshold for short-run posterior mass P(r_t <= k).
	// Default: 0.7
	CPMassThreshold float64 `json:"cp_mass_threshold"`

	// MaxRunLength caps tracked run-length hypotheses for bounded compute.
	// Default: 200
	MaxRunLength int `json:"max_run_length"`

	// PriorVarianceScale controls prior variance over the mean relative to observed variance.
	// Default: 10.0
	PriorVarianceScale float64 `json:"prior_variance_scale"`

	// MinVariance is the floor for observation variance. When warmup data has
	// near-zero variance (e.g. constant series), this prevents pathologically
	// sharp PDFs that would flag any tiny fluctuation as anomalous. Default: 1.0
	MinVariance float64 `json:"min_variance"`

	// RecoveryPoints is how many consecutive non-triggering points are needed
	// to exit alert state. Default: 10
	RecoveryPoints int `json:"recovery_points"`

	// Aggregations to run detection on. Default: [Average, Count]
	Aggregations []observer.Aggregate `json:"-"`
}

BOCPDConfig holds configuration for the BOCPD detector.

func DefaultBOCPDConfig

func DefaultBOCPDConfig() BOCPDConfig

DefaultBOCPDConfig returns a BOCPDConfig with default values.

type BOCPDDetector

type BOCPDDetector struct {
	// contains filtered or unexported fields
}

BOCPDDetector detects changepoints using Bayesian Online Changepoint Detection. This is a streaming, stateful Detector implementation that maintains per-series posterior state and processes only newly visible points on each advance.

func NewBOCPDDetector

func NewBOCPDDetector(config BOCPDConfig) *BOCPDDetector

NewBOCPDDetector creates a streaming BOCPD detector with the given config. Zero-valued fields are filled from DefaultBOCPDConfig().

func (*BOCPDDetector) Detect

func (b *BOCPDDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult

Detect implements Detector. It discovers series, reads only newly visible points, and updates per-series BOCPD posterior state incrementally.

Correctness takes priority over positional cursoring: storage may insert points into existing history, so this detector gates incremental work on visible point counts rather than raw slice positions.

func (*BOCPDDetector) Name

func (b *BOCPDDetector) Name() string

Name returns the detector name.

func (*BOCPDDetector) RemoveSeries

func (b *BOCPDDetector) RemoveSeries(refs []observer.SeriesRef)

RemoveSeries drops posterior state for refs that storage has freed. Each (ref, agg) entry in the per-series map carries six float64 arrays of size MaxRunLength+2 (~9.7 KB at default config), so without this teardown the map grows with the cumulative number of series ever seen even after their storage payload is gone. Called by the engine right after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs.

func (*BOCPDDetector) Reset

func (b *BOCPDDetector) Reset()

Reset clears all per-series state for replay/reanalysis.

type CUSUMConfig

type CUSUMConfig struct {
	// MinPoints is the minimum number of points required for analysis.
	// Default: 5
	MinPoints int `json:"min_points"`

	// BaselineFraction is the fraction of points to use for baseline estimation.
	// Default: 0.25 (first 25% of data)
	BaselineFraction float64 `json:"baseline_fraction"`

	// SlackFactor is multiplied by baseline stddev to get k (slack parameter).
	// Higher values make detection less sensitive to small shifts.
	// Default: 0.5
	SlackFactor float64 `json:"slack_factor"`

	// ThresholdFactor is multiplied by baseline stddev to get h (threshold).
	// Higher values require larger cumulative deviation to trigger.
	// Default: 4.0
	ThresholdFactor float64 `json:"threshold_factor"`
}

CUSUMConfig holds configuration for the CUSUM detector.

func DefaultCUSUMConfig

func DefaultCUSUMConfig() CUSUMConfig

DefaultCUSUMConfig returns a CUSUMConfig with default values.

type CUSUMDetector

type CUSUMDetector struct {
	// contains filtered or unexported fields
}

CUSUMDetector uses the Cumulative Sum (CUSUM) algorithm to detect when a metric shifts from its baseline. CUSUM is designed for detecting change points.

Algorithm:

S[0] = 0
S[t] = max(0, S[t-1] + (x[t] - μ - k))

Where μ is the baseline mean and k is the slack parameter (allowance for noise). An anomaly is emitted when S[t] first exceeds threshold h, representing the point of change detection.

func NewCUSUMDetector

func NewCUSUMDetector(config CUSUMConfig) *CUSUMDetector

NewCUSUMDetector creates a CUSUMDetector with the given config. Zero-valued fields are filled from DefaultCUSUMConfig().

func (*CUSUMDetector) Detect

Detect runs CUSUM on the series and returns an anomaly if a shift is detected. The anomaly's Timestamp indicates when the shift was first detected (threshold crossing).

func (*CUSUMDetector) Name

func (c *CUSUMDetector) Name() string

Name returns the detector name.

type CatalogEntry

type CatalogEntry struct {
	Name           string
	DisplayName    string
	Kind           string // "detector", "correlator", or "extractor"
	DefaultEnabled bool
}

CatalogEntry is a public view of a catalog component.

func TestbenchCatalogEntries

func TestbenchCatalogEntries() []CatalogEntry

TestbenchCatalogEntries returns all component names and kinds from the testbench catalog. Used by the CLI to implement --only without hardcoding component lists.

type ComponentSettings

type ComponentSettings struct {
	// Enabled maps component name to whether it should be active.
	// Components not listed use their catalog default.
	Enabled map[string]bool
	// contains filtered or unexported fields
}

ComponentSettings holds per-component configuration provided by the consumer. Both the live observer and the testbench build this from their respective config sources, giving a single path through instantiation.

type ComponentStateInfo

type ComponentStateInfo struct {
	Name    string
	Enabled bool
}

ComponentStateInfo describes a component currently active in the engine.

type CompressedGroup

type CompressedGroup struct {
	CorrelatorName string            `json:"correlator"`
	GroupID        string            `json:"groupId"`
	Title          string            `json:"title"`
	CommonTags     map[string]string `json:"commonTags"`
	Patterns       []MetricPattern   `json:"patterns"`
	MemberSources  []string          `json:"memberSources"`
	SeriesCount    int               `json:"seriesCount"`
	Precision      float64           `json:"precision"`
	FirstSeen      int64             `json:"firstSeen,omitempty"`
	LastUpdated    int64             `json:"lastUpdated,omitempty"`
}

CompressedGroup is a compact structural description of a correlated group of anomalies.

func CompressGroup

func CompressGroup(correlatorName, groupID, title string, members []seriesCompact, universe []seriesCompact, threshold float64) CompressedGroup

CompressGroup produces a CompressedGroup from a set of member series and a universe of all series.

type ConfigReader

type ConfigReader interface {
	GetBool(key string) bool
	GetInt(key string) int
	GetFloat64(key string) float64
	GetString(key string) string
	IsConfigured(key string) bool
}

ConfigReader provides read access to a key-value configuration source. This is a minimal interface satisfied by the agent's config.Component, allowing component configs to read values without depending on the full agent config package.

type ConnectionErrorExtractor

type ConnectionErrorExtractor struct{}

ConnectionErrorExtractor detects connection errors in logs and emits a connection.errors metric with inline MetricContext for anomaly enrichment.

func (*ConnectionErrorExtractor) Name

func (c *ConnectionErrorExtractor) Name() string

Name returns the detector name.

func (*ConnectionErrorExtractor) ProcessLog

ProcessLog checks if a log contains connection error patterns and returns a metric if so. Anomaly detection is handled by metrics detection on the count aggregation of the emitted metric.

type ConnectionErrorExtractorConfig

type ConnectionErrorExtractorConfig struct{}

ConnectionErrorExtractorConfig holds configuration for the ConnectionErrorExtractor.

func DefaultConnectionErrorExtractorConfig

func DefaultConnectionErrorExtractorConfig() ConnectionErrorExtractorConfig

DefaultConnectionErrorExtractorConfig returns a ConnectionErrorExtractorConfig with default values.

type CorrelatorConfig

type CorrelatorConfig struct {
	// WindowSeconds is the time window (in seconds) for clustering anomalies.
	// Anomalies with data timestamps older than (currentDataTime - WindowSeconds) are evicted.
	// Default: 30 seconds.
	WindowSeconds int64 `json:"window_seconds"`
}

CorrelatorConfig configures the CrossSignalCorrelator.

func DefaultCorrelatorConfig

func DefaultCorrelatorConfig() CorrelatorConfig

DefaultCorrelatorConfig returns a CorrelatorConfig with default values.

type CrossSignalCorrelator

type CrossSignalCorrelator struct {
	// contains filtered or unexported fields
}

CrossSignalCorrelator clusters anomalies from different sources within a time window and detects known patterns. It implements Correlator. Reporters read the current correlation state.

Time is derived entirely from input data timestamps (anomaly.Timestamp), making the correlator deterministic with respect to input data.

func NewCorrelator

func NewCorrelator(config CorrelatorConfig) *CrossSignalCorrelator

NewCorrelator creates a new CrossSignalCorrelator with the given config. If config has zero values, defaults are applied.

func (*CrossSignalCorrelator) ActiveCorrelations

func (c *CrossSignalCorrelator) ActiveCorrelations() []observer.ActiveCorrelation

ActiveCorrelations returns a copy of the currently active correlation patterns.

func (*CrossSignalCorrelator) Advance

func (c *CrossSignalCorrelator) Advance(dataTime int64)

Advance implements Correlator. It checks for known patterns in the anomaly buffer and updates activeCorrelations state.

func (*CrossSignalCorrelator) Name

func (c *CrossSignalCorrelator) Name() string

Name returns the correlator name.

func (*CrossSignalCorrelator) ProcessAnomaly

func (c *CrossSignalCorrelator) ProcessAnomaly(anomaly observer.Anomaly)

ProcessAnomaly implements Correlator. It adds an anomaly to the buffer using its data timestamp and evicts old entries.

func (*CrossSignalCorrelator) Reset

func (c *CrossSignalCorrelator) Reset()

Reset clears all internal state for reanalysis.

type DebugView

type DebugView interface {
	StateView() StateView
	CatalogEntries() []CatalogEntry
	// Flush blocks until all observations queued in the dispatch channel have
	// been processed by the engine. The testbench calls this after feeding
	// parquet data to ensure StateView reflects all ingested observations.
	Flush()
	// Reset clears all engine state, resets storage, and reconfigures components.
	Reset(settings ComponentSettings)
	// GetReplayProgress returns lock-free replay progress counters.
	GetReplayProgress() ReplayProgress
	// SetReplayPhase updates the replay phase string for progress reporting.
	SetReplayPhase(phase string)
	// ExtractorCount returns the number of extractors active in the engine.
	ExtractorCount() int
	// AddTelemetry writes a data point into the engine's telemetry namespace.
	// Used by the testbench to store per-detector timing stats for UI display.
	AddTelemetry(name string, value float64, timestamp int64, tags []string)
	// ReplayStoredData resets analysis state (preserving extractor context)
	// then replays all data currently in storage through the scheduler in
	// chronological order. Call after Flush().
	ReplayStoredData()
	// StorageReader returns a read-only view of the engine's time-series storage.
	// Used by the testbench to compute windowed log rates in change messages.
	StorageReader() observerdef.StorageReader
	// IngestLogSync feeds a log directly into the engine, bypassing the
	// dispatch channel. Synchronous: returns after IngestLog and any
	// scheduler-triggered advances complete. Testbench-only — never call
	// from production hot paths; not safe to interleave with live ObserveLog.
	IngestLogSync(source string, msg observerdef.LogView)
	// IngestMetricSync feeds a metric directly into the engine, bypassing
	// the dispatch channel. Synchronous; same caveats as IngestLogSync.
	IngestMetricSync(source string, sample observerdef.MetricView)
}

DebugView is a read-only introspection surface implemented by observerImpl. It is defined in observer/impl (not observer/def) so production code is never coupled to the debug surface. The testbench obtains it via type assertion:

debug := obs.(observerimpl.DebugView)

type DetectDivergence

type DetectDivergence struct {
	DetectorName       string
	DataTime           int64
	LiveAnomalyCount   int
	ReplayAnomalyCount int
	LiveFingerprints   []string
	ReplayFingerprints []string
	InputHashMatch     bool
	LiveInputHash      uint64
	ReplayInputHash    uint64
}

DetectDivergence describes a mismatch between live and replay detection results.

func (DetectDivergence) String

func (d DetectDivergence) String() string

type DetectorPassthroughCorrelator

type DetectorPassthroughCorrelator struct {
	// contains filtered or unexported fields
}

DetectorPassthroughCorrelator passes all anomalies straight through, grouped by detector name. It does no time-clustering or filtering — each detector's anomalies become a separate ActiveCorrelation. This is used for Level 1 detector-specific evaluation where we want to score each detector's raw output independently.

func NewDetectorPassthroughCorrelator

func NewDetectorPassthroughCorrelator() *DetectorPassthroughCorrelator

NewDetectorPassthroughCorrelator creates a new DetectorPassthroughCorrelator.

func (*DetectorPassthroughCorrelator) ActiveCorrelations

func (c *DetectorPassthroughCorrelator) ActiveCorrelations() []observer.ActiveCorrelation

ActiveCorrelations returns one ActiveCorrelation per individual anomaly, sorted by detector name then timestamp.

Each anomaly becomes its own correlation with:

  • Pattern: "passthrough_{detectorName}_{index}"
  • Title: "Passthrough[{detectorName}]: {anomaly.Source}"
  • Anomalies: single-element slice containing the original anomaly
  • FirstSeen/LastUpdated: the anomaly's timestamp

When serialized via WriteObserverOutput, each correlation becomes an ObserverCorrelation where period_start == period_end == anomaly.Timestamp. This allows the scorer to evaluate each detection independently.

func (*DetectorPassthroughCorrelator) Advance

func (c *DetectorPassthroughCorrelator) Advance(_ int64)

Advance is a no-op for the passthrough correlator (no windowing).

func (*DetectorPassthroughCorrelator) Name

func (c *DetectorPassthroughCorrelator) Name() string

Name returns the correlator name.

func (*DetectorPassthroughCorrelator) ProcessAnomaly

func (c *DetectorPassthroughCorrelator) ProcessAnomaly(anomaly observer.Anomaly)

ProcessAnomaly stores the anomaly, grouped by its DetectorName.

func (*DetectorPassthroughCorrelator) Reset

func (c *DetectorPassthroughCorrelator) Reset()

Reset clears all internal state for reanalysis.

type EvictedCluster

type EvictedCluster struct {
	GroupHash uint64
	ClusterID int64
}

EvictedCluster identifies a cluster that was removed during garbage collection, pairing its tag-group hash with its intra-clusterer ID.

type LogMetricsExtractor

type LogMetricsExtractor struct {
	// contains filtered or unexported fields
}

LogMetricsExtractor converts logs into timeseries metric outputs:

  • JSON logs: numeric fields -> Avg aggregation
  • Unstructured logs: pattern frequency -> Sum aggregation

This is intentionally minimal; cardinality controls live in the observer storage (Step 5).

func NewLogMetricsExtractor

func NewLogMetricsExtractor(config LogMetricsExtractorConfig) *LogMetricsExtractor

NewLogMetricsExtractor creates a LogMetricsExtractor with the given config.

func (*LogMetricsExtractor) Name

func (a *LogMetricsExtractor) Name() string

func (*LogMetricsExtractor) ProcessLog

type LogMetricsExtractorConfig

type LogMetricsExtractorConfig struct {
	// MaxEvalBytes caps how many bytes we evaluate for unstructured signature generation (0 = no cap).
	MaxEvalBytes int

	// IncludeFields, if non-empty, restricts JSON numeric extraction to these field names.
	IncludeFields map[string]struct{}
	// ExcludeFields always excludes JSON fields from numeric extraction.
	ExcludeFields map[string]struct{}
}

LogMetricsExtractorConfig holds configuration for the LogMetricsExtractor.

func DefaultLogMetricsExtractorConfig

func DefaultLogMetricsExtractorConfig() LogMetricsExtractorConfig

DefaultLogMetricsExtractorConfig returns a LogMetricsExtractorConfig with default values.

type LogPatternExtractor

type LogPatternExtractor struct {
	NextGarbageCollectionTime int64
	// contains filtered or unexported fields
}

LogPatternExtractor is a LogMetricsExtractor that clusters log messages into patterns and emits a count metric per pattern.

func NewLogPatternExtractor

func NewLogPatternExtractor(cfg LogPatternExtractorConfig) *LogPatternExtractor

NewLogPatternExtractor creates a new LogPatternExtractor. A zero-value cfg is accepted; zero fields fall back to DefaultLogPatternExtractorConfig values. MaxPatternsPerGroup and MaxTagGroups follow the same convention: 0 → default, negative → disabled (unbounded).

func (*LogPatternExtractor) Name

func (e *LogPatternExtractor) Name() string

Name returns the extractor name.

func (*LogPatternExtractor) ProcessLog

ProcessLog clusters the log message and emits a count metric for its pattern.

func (*LogPatternExtractor) Reset

func (e *LogPatternExtractor) Reset()

Reset clears clustering state so reanalysis starts from the currently observed logs. The registry is kept so that previously registered hashes remain resolvable.

type LogPatternExtractorConfig

type LogPatternExtractorConfig struct {
	// DisableOptimizations disables all optimizations such as
	// MinClusterSizeBeforeEmit, ClusterTimeToLiveSec, etc.
	DisableOptimizations bool `json:"disable_optimizations,omitempty"`
	// MinClusterSizeBeforeEmit is the minimum number of logs matching a pattern
	// before emitting metrics. Zero means the default from DefaultLogPatternExtractorConfig.
	MinClusterSizeBeforeEmit int `json:"min_cluster_size_before_emit,omitempty"`
	// MaxTokenizedStringLength caps input length before tokenization (0 = patterns default).
	MaxTokenizedStringLength int `json:"max_tokenized_string_length,omitempty"`
	// MaxNumTokens caps token count per message (0 = patterns default).
	MaxNumTokens int `json:"max_num_tokens,omitempty"`
	// ParseHexDump controls hex-dump recognition in the tokenizer. When nil, the
	// patterns package default applies (true).
	ParseHexDump *bool `json:"parse_hex_dump,omitempty"`
	// MinTokenMatchRatio is the minimum fraction of token positions (by value)
	// that must match for two log lines to merge into one pattern. Range (0,1];
	// zero means the default 0.5 (Drain-style).
	MinTokenMatchRatio float64 `json:"min_token_match_ratio,omitempty"`
	// ClusterTimeToLiveSec is how long (seconds) a cluster may go without a matching log before it is removed.
	// Zero disables cluster garbage collection.
	ClusterTimeToLiveSec int64 `json:"cluster_time_to_live_sec,omitempty"`
	// GarbageCollectionIntervalSec is the minimum time between GC passes when ClusterTimeToLiveSec > 0.
	GarbageCollectionIntervalSec int64 `json:"garbage_collection_interval_sec,omitempty"`
	// MaxPatternsPerGroup caps the number of live clusters in any single tag
	// group. When exceeded, the least-recently-seen cluster is evicted (LRU)
	// and its engine context is dropped. Zero means use the default; set
	// negative to disable. Bounds memory/series-cardinality on workloads with
	// high pattern diversity (e.g. container log churn).
	MaxPatternsPerGroup int `json:"max_patterns_per_group,omitempty"`
	// MaxTagGroups caps the number of distinct tag groups (source/service/env/host
	// combinations) tracked simultaneously. When exceeded, the least-recently-
	// touched group's clusters are all evicted at once. Zero means use the
	// default; set negative to disable.
	MaxTagGroups int `json:"max_tag_groups,omitempty"`
}

LogPatternExtractorConfig holds hyperparameters for the log pattern extractor.

func DefaultLogPatternExtractorConfig

func DefaultLogPatternExtractorConfig() LogPatternExtractorConfig

DefaultLogPatternExtractorConfig returns defaults aligned with the patterns package.

func (*LogPatternExtractorConfig) RefreshConfig

func (c *LogPatternExtractorConfig) RefreshConfig()

type MetricPattern

type MetricPattern struct {
	Pattern   string  `json:"pattern"`
	Matched   int     `json:"matched"`
	Universe  int     `json:"universe"`
	Precision float64 `json:"precision"`
}

MetricPattern describes a wildcard or exact metric name pattern within a compressed group.

type Provides

type Provides struct {
	Comp observerdef.Component
}

Provides defines the output of the observer component.

func NewComponent

func NewComponent(deps Requires) Provides

NewComponent creates an observer.Component.

type RRCFConfig

type RRCFConfig struct {
	// NumTrees is the number of trees in the forest. More trees = more robust but slower.
	NumTrees int `json:"num_trees"`
	// TreeSize is the maximum number of points per tree (sliding window size).
	TreeSize int `json:"tree_size"`
	// ShingleSize is the number of consecutive timestamps to combine into one point.
	// ShingleSize=4 means each "point" is 4 consecutive samples, enabling temporal pattern detection.
	ShingleSize int `json:"shingle_size"`
	// ThresholdSigma controls dynamic anomaly thresholding. A point is flagged if its
	// CoDisp score exceeds mean + ThresholdSigma*stddev of the recent score window.
	// Set to 0 to disable anomaly detection (scores still computed for analysis).
	ThresholdSigma float64 `json:"threshold_sigma"`
	// Metrics defines which series to include. If nil, uses DefaultRRCFMetrics().
	Metrics []RRCFMetricDef `json:"-"`
}

RRCFConfig holds configuration for the RRCF analysis.

func DefaultRRCFConfig

func DefaultRRCFConfig() RRCFConfig

DefaultRRCFConfig returns sensible defaults for RRCF.

type RRCFConfigSummary

type RRCFConfigSummary struct {
	NumTrees       int     `json:"numTrees"`
	TreeSize       int     `json:"treeSize"`
	ShingleSize    int     `json:"shingleSize"`
	ShingleDim     int     `json:"shingleDim"`
	ThresholdSigma float64 `json:"thresholdSigma"`
}

RRCFConfigSummary is a JSON-friendly summary of RRCF configuration.

type RRCFDetector

type RRCFDetector struct {
	// contains filtered or unexported fields
}

RRCFDetector implements multivariate anomaly detection using Robust Random Cut Forest. It queries multiple system metrics and detects unusual combinations/trajectories.

func NewRRCFDetector

func NewRRCFDetector(config RRCFConfig) *RRCFDetector

NewRRCFDetector creates an RRCF detector with the given config.

func (*RRCFDetector) Detect

func (r *RRCFDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult

Detect implements Detector. It queries storage for system metrics, builds multivariate shingles, and detects anomalies using RRCF.

func (*RRCFDetector) GetExtraData

func (r *RRCFDetector) GetExtraData() interface{}

GetExtraData implements ComponentDataProvider, exposing score stats via /api/components/rrcf/data.

func (*RRCFDetector) GetScoreStats

func (r *RRCFDetector) GetScoreStats() RRCFScoreStats

GetScoreStats returns distribution statistics and full score history.

func (*RRCFDetector) Name

func (r *RRCFDetector) Name() string

Name returns the detector name.

func (*RRCFDetector) Reset

func (r *RRCFDetector) Reset()

Reset clears all state, useful for testing or after major regime changes.

type RRCFMetricDef

type RRCFMetricDef struct {
	Namespace string
	Name      string
	Agg       observer.Aggregate
}

RRCFMetricDef defines a metric to include in the RRCF analysis.

func DefaultRRCFMetrics

func DefaultRRCFMetrics() []RRCFMetricDef

DefaultRRCFMetrics returns the default metric set for RRCF. These match cgroup v2 metrics from FGM parquet exports, which is the format used by the testbench scenarios where RRCF has been validated.

type RRCFScoreStats

type RRCFScoreStats struct {
	Enabled       bool              `json:"enabled"`
	SampleCount   int               `json:"sampleCount"`
	AlignedPoints int               `json:"alignedPoints"`
	ShinglesBuilt int               `json:"shinglesBuilt"`
	MinScore      float64           `json:"minScore"`
	MaxScore      float64           `json:"maxScore"`
	MeanScore     float64           `json:"meanScore"`
	StddevScore   float64           `json:"stddevScore"`
	P50           float64           `json:"p50"`
	P75           float64           `json:"p75"`
	P90           float64           `json:"p90"`
	P95           float64           `json:"p95"`
	P99           float64           `json:"p99"`
	Config        RRCFConfigSummary `json:"config"`
	Metrics       []string          `json:"metrics"`
	Scores        []RRCFScoredPoint `json:"scores"`
}

RRCFScoreStats contains distribution statistics and full score history for threshold analysis.

type RRCFScoredPoint

type RRCFScoredPoint struct {
	Timestamp int64   `json:"timestamp"`
	Score     float64 `json:"score"`
}

RRCFScoredPoint records a CoDisp score at a specific timestamp.

type ReplayProgress

type ReplayProgress struct {
	Phase           string `json:"phase"` // "", "loading", "detecting", "done"
	TimestampsDone  int64  `json:"timestampsDone"`
	TimestampsTotal int64  `json:"timestampsTotal"`
	Advances        int64  `json:"advances"`
	Anomalies       int64  `json:"anomalies"`
}

ReplayProgress holds lock-free replay progress counters.

type Requires

type Requires struct {
	Lifecycle compdef.Lifecycle
	Config    config.Component
	Log       log.Component
	Telemetry telemetry.Component

	// Recorder is an optional component for transparent metric recording.
	// If provided, all handles will be wrapped to record metrics to parquet files.
	Recorder option.Option[recorderdef.Component]

	// Reporters are provided by reporter/fx, reporter/fx-testbench, etc. via the
	// `anomalydetection_reporters` Fx group. Each reporter gets its own subscription
	// so it receives advance events independently. StorageConsumer reporters receive
	// storage for windowed log-rate annotations.
	Reporters []reporterdef.Reporter `group:"anomalydetection_reporters"`

	// HFRunner runs system and container checks at 1s and routes them into the
	// observer pipeline. The noop variant (hfrunner/fx-noop) is wired for the
	// main agent build; the real implementation lands with the algorithm PRs.
	HFRunner hfrunnerdef.Component
}

Requires declares the input types to the observer component constructor.

type ScanMWDetector

type ScanMWDetector struct {
	// MinSegment is the minimum number of points in each segment.
	// Default: 12
	MinSegment int

	// MinPoints is the minimum total points before detection runs.
	// Default: 30
	MinPoints int

	// SignificanceThreshold is the maximum p-value for the best split to be
	// considered a changepoint. Default: 1e-8
	SignificanceThreshold float64

	// MinEffectSize is the minimum |rank-biserial correlation| for reporting.
	// Default: 0.85
	MinEffectSize float64

	// MinDeviationMAD is the minimum |post_median - pre_median| / MAD.
	// Default: 3.0
	MinDeviationMAD float64

	// Aggregations to run detection on. Default: [Average, Count]
	Aggregations []observer.Aggregate
	// contains filtered or unexported fields
}

ScanMWDetector detects changepoints by scanning all possible split points with the Mann-Whitney U test. It picks the split that gives the most significant test result (smallest p-value), making it a non-parametric changepoint detector that's robust to distribution shape.

Uses an efficient O(n log n) implementation: ranks are assigned once via sorting, then the rank sum is updated incrementally as the split point moves.

Implements Detector (streaming) — after finding a changepoint, advances the segment start so subsequent scans only examine post-change data.

func NewScanMWDetector

func NewScanMWDetector() *ScanMWDetector

NewScanMWDetector creates a ScanMW detector with default settings.

func (*ScanMWDetector) Detect

func (d *ScanMWDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult

Detect implements Detector. It discovers series, reads segment data, and scans for changepoints. After finding one, the segment start advances so subsequent calls only examine post-change data.

Iteration pattern is the same as BOCPD (metrics_detector_bocpd.go:140-221) and ScanWelch — consider dedup if more scan-based detectors are added.

func (*ScanMWDetector) Name

func (d *ScanMWDetector) Name() string

Name returns the detector name.

func (*ScanMWDetector) RemoveSeries

func (d *ScanMWDetector) RemoveSeries(refs []observer.SeriesRef)

RemoveSeries drops segment-tracking state for refs that storage has freed. Each per-series entry holds a reusable point buffer that grows to the segment size, so without this teardown the map keeps growing with the cumulative series count even after storage shrinks. Called by the engine right after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs.

func (*ScanMWDetector) Reset

func (d *ScanMWDetector) Reset()

Reset clears all per-series state for replay/reanalysis.

type ScanWelchDetector

type ScanWelchDetector struct {
	// MinSegment is the minimum number of points in each segment.
	MinSegment int

	// MinPoints is the minimum total points before detection runs.
	MinPoints int

	// MinTStatistic is the minimum |t| for the candidate selection phase.
	MinTStatistic float64

	// SignificanceThreshold is the maximum MW p-value for reporting.
	SignificanceThreshold float64

	// MinEffectSize is the minimum |rank-biserial correlation|.
	MinEffectSize float64

	// MinDeviationMAD is the minimum |post_median - pre_median| / MAD.
	MinDeviationMAD float64

	// Aggregations to run detection on. Default: [Average, Count]
	Aggregations []observer.Aggregate
	// contains filtered or unexported fields
}

ScanWelchDetector detects changepoints by scanning all possible split points with Welch's t-test for candidate selection, then verifies each candidate with a Mann-Whitney p-value filter and MAD-based deviation check.

This hybrid combines parametric candidate selection (the t-test finds mean shifts efficiently) with nonparametric verification (the MW p-value provides selectivity).

Implements Detector (streaming) — after finding a changepoint, advances the segment start so subsequent scans only examine post-change data.

func NewScanWelchDetector

func NewScanWelchDetector() *ScanWelchDetector

NewScanWelchDetector creates a ScanWelch detector with default settings.

func (*ScanWelchDetector) Detect

func (d *ScanWelchDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult

Detect implements Detector. Same iteration pattern as ScanMW and BOCPD — consider dedup if more scan-based detectors are added.

func (*ScanWelchDetector) Name

func (d *ScanWelchDetector) Name() string

Name returns the detector name.

func (*ScanWelchDetector) RemoveSeries

func (d *ScanWelchDetector) RemoveSeries(refs []observer.SeriesRef)

RemoveSeries drops segment-tracking state for refs that storage has freed. Each per-series entry holds a reusable point buffer that grows to the segment size, so without this teardown the map keeps growing with the cumulative series count even after storage shrinks. Called by the engine right after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs.

func (*ScanWelchDetector) Reset

func (d *ScanWelchDetector) Reset()

Reset clears all per-series state for replay/reanalysis.

type StateView

type StateView interface {
	// Storage
	ListSeries(filter observerdef.SeriesFilter) []observerdef.SeriesMeta
	GetSeriesRange(ref observerdef.SeriesRef, start, end int64, agg observerdef.Aggregate) *observerdef.Series
	ScenarioBounds() (start, end int64, ok bool)

	// Anomalies
	Anomalies() []observerdef.Anomaly
	TotalAnomalyCount() int
	UniqueAnomalySourceCount() int
	DetectorAnomalies(name string) []observerdef.Anomaly
	AnomaliesByDetector() map[string][]observerdef.Anomaly
	AnomaliesForSource(sd observerdef.SeriesDescriptor) []observerdef.Anomaly

	// Correlations
	ActiveCorrelations() []observerdef.ActiveCorrelation
	CorrelationHistory() []observerdef.ActiveCorrelation

	// Detector / correlator metadata
	ListDetectors() []ComponentStateInfo
	ListCorrelators() []ComponentStateInfo

	// Timing
	LastAnalyzedTime() int64
	LatestDataTime() int64
	MaxTimestamp() int64

	// Storage stats (excluding a given namespace, typically TelemetryNamespace)
	TotalSeriesCount(excludeNamespace string) int
	TotalSampleCount(excludeNamespace string) int64

	// GetSeriesAll returns all points for a series.
	GetSeriesAll(ref observerdef.SeriesRef, agg observerdef.Aggregate) *observerdef.Series
}

StateView is a read-only window into engine state. All methods correspond to existing methods on the unexported stateView struct in stateview.go — they are being promoted to a public interface.

type TagGroupByKey

type TagGroupByKey struct {
	// Warning: Don't forget to update functions parsing tags when adding new fields
	Source  string
	Service string
	Env     string
	Host    string
}

TagGroupByKey holds the tags that are responsible for grouping logs into different clusters. Absent tags (e.g. a log with no "env" tag) are represented by an empty string.

func (TagGroupByKey) AsMap

func (c TagGroupByKey) AsMap() map[string]string

AsMap returns a map of non-empty tag key→value pairs for this group.

type TagGroupByKeyRegistry

type TagGroupByKeyRegistry struct {
	// contains filtered or unexported fields
}

TagGroupByKeyRegistry is a bidirectional, append-only store between a uint64 hash and a TagGroupByKey. It is NOT thread-safe; access must be confined to a single goroutine.

func NewTagGroupByKeyRegistry

func NewTagGroupByKeyRegistry() *TagGroupByKeyRegistry

NewTagGroupByKeyRegistry creates an empty TagGroupByKeyRegistry.

func (*TagGroupByKeyRegistry) Lookup

func (r *TagGroupByKeyRegistry) Lookup(hash uint64) (TagGroupByKey, bool)

Lookup returns the TagGroupByKey for the given hash, and whether it was found.

func (*TagGroupByKeyRegistry) Register

func (r *TagGroupByKeyRegistry) Register(group TagGroupByKey) uint64

Register inserts (or confirms) a TagGroupByKey and returns its stable hash. Calling Register twice with the same group returns the same hash.
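The append-only, bidirectional contract can be illustrated with a small self-contained sketch (hypothetical names, FNV-1a chosen arbitrarily as the hash; the real registry's hash function is not specified here). Registering the same group twice yields the same hash, and Lookup inverts it.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// key mirrors the shape of a tag-group key for illustration.
type key struct{ Source, Service, Env, Host string }

// registry is an append-only hash→key store.
type registry struct{ byHash map[uint64]key }

// register inserts (or confirms) a key and returns its stable hash.
func (r *registry) register(k key) uint64 {
	h := fnv.New64a()
	for _, s := range []string{k.Source, k.Service, k.Env, k.Host} {
		h.Write([]byte(s))
		h.Write([]byte{0}) // separator avoids ambiguity between fields
	}
	sum := h.Sum64()
	r.byHash[sum] = k // idempotent: same key, same hash, same entry
	return sum
}

// lookup returns the key for a hash and whether it was found.
func (r *registry) lookup(h uint64) (key, bool) {
	k, ok := r.byHash[h]
	return k, ok
}

func main() {
	r := &registry{byHash: map[uint64]key{}}
	k := key{Source: "nginx", Env: "prod"} // absent tags stay empty
	h1 := r.register(k)
	h2 := r.register(k)
	fmt.Println(h1 == h2) // prints: true
	got, ok := r.lookup(h1)
	fmt.Println(ok, got == k) // prints: true true
}
```

The field separator matters: without it, {Source: "ab", Service: ""} and {Source: "a", Service: "b"} would hash identically.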

type TaggedClusterEntry

type TaggedClusterEntry struct {
	GroupHash uint64
	Cluster   *patterns.Cluster
}

TaggedClusterEntry pairs a cluster with its tag-group hash so callers can compute the correct globalClusterHash.

type TaggedPatternClusterer

type TaggedPatternClusterer struct {

	// MaxClustersPerGroup, when > 0, is propagated as patterns.PatternClusterer.MaxClusters
	// on each newly created sub-clusterer; existing sub-clusterers are NOT
	// retroactively updated. Zero means unbounded.
	MaxClustersPerGroup int
	// MaxTagGroups, when > 0, caps the number of live sub-clusterers. When a new
	// tag group would push subClusterers past this size, the least-recently-
	// touched group (smallest entry in lastTouchByGroup) is evicted; all of
	// its clusters are surfaced via DrainLRUEvictions. Zero disables the cap.
	MaxTagGroups int
	// contains filtered or unexported fields
}

TaggedPatternClusterer wraps one *patterns.PatternClusterer per tag-group hash so that each unique (source, service, env, host) combination is clustered independently.

NOT thread-safe: all calls must be made from the same goroutine.

func NewTaggedPatternClusterer

func NewTaggedPatternClusterer(registry *TagGroupByKeyRegistry) *TaggedPatternClusterer

NewTaggedPatternClusterer creates a TaggedPatternClusterer that writes group hashes into registry.

func NewTaggedPatternClustererWithFactory

func NewTaggedPatternClustererWithFactory(registry *TagGroupByKeyRegistry, newPC func() *patterns.PatternClusterer) *TaggedPatternClusterer

NewTaggedPatternClustererWithFactory is like NewTaggedPatternClusterer but uses newPC to construct each per-tag-group sub-clusterer (e.g. to plug tokenizer hyperparameters).

func (*TaggedPatternClusterer) Classify

func (tc *TaggedPatternClusterer) Classify(groupHash uint64, message string) *patterns.Cluster

Classify returns the cluster that best matches message within the sub-clusterer identified by groupHash, or nil if no match is found.

func (*TaggedPatternClusterer) DrainLRUEvictions

func (tc *TaggedPatternClusterer) DrainLRUEvictions() []EvictedCluster

DrainLRUEvictions returns and clears all LRU evictions accumulated since the last call. Includes both per-group MaxClusters evictions and whole-group MaxTagGroups evictions. GC evictions go through GarbageCollectBefore instead.

func (*TaggedPatternClusterer) GarbageCollectBefore

func (tc *TaggedPatternClusterer) GarbageCollectBefore(cutoff int64) []EvictedCluster

GarbageCollectBefore removes all clusters whose LastSeenUnix is strictly less than cutoff from every sub-clusterer and returns the (GroupHash, ClusterID) pairs that were removed.

func (*TaggedPatternClusterer) GetAllClusters

func (tc *TaggedPatternClusterer) GetAllClusters() []TaggedClusterEntry

GetAllClusters returns every cluster across all sub-clusterers, each paired with its group hash.

func (*TaggedPatternClusterer) GetCluster

func (tc *TaggedPatternClusterer) GetCluster(groupHash uint64, clusterID int64) (*patterns.Cluster, error)

GetCluster retrieves a cluster by group hash and intra-clusterer ID.

func (*TaggedPatternClusterer) NumSubClusterers

func (tc *TaggedPatternClusterer) NumSubClusterers() int

NumSubClusterers returns the number of currently active sub-clusterers.

func (*TaggedPatternClusterer) Process

func (tc *TaggedPatternClusterer) Process(tags []string, message string, unixSec int64) (uint64, *patterns.Cluster, bool)

Process extracts the tag group from tags, routes the message to the matching sub-clusterer (created lazily), and returns the group hash plus the cluster. unixSec is Unix seconds for timestamp tracking (use time.Now().Unix() when unknown).

LRU evictions (if any) caused by this call — from per-group MaxClusters or global MaxTagGroups caps — must be retrieved via DrainLRUEvictions before the next Process call to avoid silently dropping eviction context.
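The touch-then-drain contract can be sketched with a minimal self-contained LRU-capped group map (hypothetical names; the real clusterer also evicts per-group clusters, which is omitted here): touching a new group past the cap evicts the least-recently-touched group into a pending list that the caller must drain before the next batch.

```go
package main

import (
	"fmt"
	"math"
)

// lruGroups caps the number of live groups. Touching a new group past the
// cap evicts the least-recently-touched one into a pending list, mirroring
// the Process / DrainLRUEvictions contract.
type lruGroups struct {
	cap       int
	lastTouch map[uint64]int64
	pending   []uint64
	tick      int64
}

func (g *lruGroups) touch(hash uint64) {
	g.tick++
	if _, ok := g.lastTouch[hash]; !ok && len(g.lastTouch) >= g.cap {
		victim, oldest := uint64(0), int64(math.MaxInt64)
		for h, t := range g.lastTouch {
			if t < oldest {
				victim, oldest = h, t
			}
		}
		delete(g.lastTouch, victim)
		g.pending = append(g.pending, victim)
	}
	g.lastTouch[hash] = g.tick
}

// drain returns and clears pending evictions; call it after each touch so
// eviction context is not silently dropped.
func (g *lruGroups) drain() []uint64 {
	out := g.pending
	g.pending = nil
	return out
}

func main() {
	g := &lruGroups{cap: 2, lastTouch: map[uint64]int64{}}
	g.touch(1)
	g.touch(2)
	g.touch(1)             // refresh group 1
	g.touch(3)             // cap exceeded: group 2 is least recently touched
	fmt.Println(g.drain()) // prints: [2]
}
```

Skipping the drain between touches would merge eviction batches, losing which call caused which eviction — the same hazard the documentation warns about.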

func (*TaggedPatternClusterer) Reset

func (tc *TaggedPatternClusterer) Reset()

Reset drops all sub-clusterers. The registry is intentionally kept so that previously registered hashes remain resolvable after a reset. LRU bookkeeping (lastTouchByGroup, pending evictions) is also cleared.

type TimeClusterConfig

type TimeClusterConfig struct {
	// ProximitySeconds is the maximum time difference between anomaly timestamps
	// for them to be considered part of the same cluster.
	// Default: 10 seconds.
	ProximitySeconds int64 `json:"proximity_seconds"`

	// WindowSeconds is how long to keep anomalies before eviction.
	// Default: 60 seconds.
	WindowSeconds int64 `json:"window_seconds"`

	// MinClusterSize is the minimum number of anomalies a cluster must contain
	// to be included in output (ActiveCorrelations, GetClusters).
	// Clusters below this threshold are still tracked internally but not reported.
	// Default: 0 (no minimum).
	MinClusterSize int `json:"min_cluster_size"`
}

TimeClusterConfig configures the TimeClusterCorrelator.

func DefaultTimeClusterConfig

func DefaultTimeClusterConfig() TimeClusterConfig

DefaultTimeClusterConfig returns a TimeClusterConfig with default values.

type TimeClusterCorrelator

type TimeClusterCorrelator struct {
	// contains filtered or unexported fields
}

TimeClusterCorrelator clusters anomalies based on timestamp proximity. Anomalies whose timestamps are within ProximitySeconds of each other are grouped together.
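The proximity rule can be sketched with a minimal self-contained function (hypothetical name; real clusters carry anomaly metadata, not bare timestamps): after sorting, a timestamp joins the current cluster when it is within ProximitySeconds of that cluster's latest member, otherwise it starts a new cluster.

```go
package main

import (
	"fmt"
	"sort"
)

// clusterByProximity groups timestamps: a timestamp joins the current
// cluster when it is within proximity seconds of the cluster's latest
// member, otherwise it starts a new cluster.
func clusterByProximity(ts []int64, proximity int64) [][]int64 {
	sort.Slice(ts, func(a, b int) bool { return ts[a] < ts[b] })
	var clusters [][]int64
	for _, t := range ts {
		if n := len(clusters); n > 0 && t-clusters[n-1][len(clusters[n-1])-1] <= proximity {
			clusters[n-1] = append(clusters[n-1], t)
		} else {
			clusters = append(clusters, []int64{t})
		}
	}
	return clusters
}

func main() {
	ts := []int64{133, 100, 130, 105}
	for _, c := range clusterByProximity(ts, 10) {
		fmt.Println(c)
	}
	// prints:
	// [100 105]
	// [130 133]
}
```

Note that comparing against the cluster's latest member (rather than its first) lets a chain of closely spaced anomalies extend a cluster well past ProximitySeconds from its start.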

func NewTimeClusterCorrelator

func NewTimeClusterCorrelator(config TimeClusterConfig) *TimeClusterCorrelator

NewTimeClusterCorrelator creates a new TimeClusterCorrelator with the given config.

func (*TimeClusterCorrelator) ActiveCorrelations

func (c *TimeClusterCorrelator) ActiveCorrelations() []observer.ActiveCorrelation

ActiveCorrelations returns clusters as active correlation patterns.

func (*TimeClusterCorrelator) Advance

func (c *TimeClusterCorrelator) Advance(dataTime int64)

Advance evicts old clusters and returns nothing (reporters pull state via ActiveCorrelations).

func (*TimeClusterCorrelator) GetClusters

func (c *TimeClusterCorrelator) GetClusters() []TimeClusterInfo

GetClusters returns all clusters for visualization.

func (*TimeClusterCorrelator) GetExtraData

func (c *TimeClusterCorrelator) GetExtraData() interface{}

GetExtraData implements ComponentDataProvider.

func (*TimeClusterCorrelator) GetStats

func (c *TimeClusterCorrelator) GetStats() map[string]interface{}

GetStats returns statistics about the correlator state.

func (*TimeClusterCorrelator) Name

func (c *TimeClusterCorrelator) Name() string

Name returns the correlator name.

func (*TimeClusterCorrelator) ProcessAnomaly

func (c *TimeClusterCorrelator) ProcessAnomaly(anomaly observer.Anomaly)

ProcessAnomaly adds an anomaly, either to an existing cluster or a new one.

func (*TimeClusterCorrelator) Reset

func (c *TimeClusterCorrelator) Reset()

Reset clears all internal state for reanalysis.

type TimeClusterInfo

type TimeClusterInfo struct {
	ID           int      `json:"id"`
	Sources      []string `json:"sources"`
	StartTime    int64    `json:"start_time"`
	EndTime      int64    `json:"end_time"`
	AnomalyCount int      `json:"anomaly_count"`
}

TimeClusterInfo represents a cluster for visualization.

Directories

Path Synopsis
Package patterns provides log tokenization and clustering utilities.
