Documentation ¶
Overview ¶
Package observerimpl implements the observer component.
Index ¶
- Constants
- type AddResult
- type Aggregate
- type BOCPDConfig
- type BOCPDDetector
- type CUSUMConfig
- type CUSUMDetector
- type CatalogEntry
- type ComponentSettings
- type ComponentStateInfo
- type CompressedGroup
- type ConfigReader
- type ConnectionErrorExtractor
- type ConnectionErrorExtractorConfig
- type CorrelatorConfig
- type CrossSignalCorrelator
- type DebugView
- type DetectDivergence
- type DetectorPassthroughCorrelator
- func (c *DetectorPassthroughCorrelator) ActiveCorrelations() []observer.ActiveCorrelation
- func (c *DetectorPassthroughCorrelator) Advance(_ int64)
- func (c *DetectorPassthroughCorrelator) Name() string
- func (c *DetectorPassthroughCorrelator) ProcessAnomaly(anomaly observer.Anomaly)
- func (c *DetectorPassthroughCorrelator) Reset()
- type EvictedCluster
- type LogMetricsExtractor
- type LogMetricsExtractorConfig
- type LogPatternExtractor
- type LogPatternExtractorConfig
- type MetricPattern
- type Provides
- type RRCFConfig
- type RRCFConfigSummary
- type RRCFDetector
- type RRCFMetricDef
- type RRCFScoreStats
- type RRCFScoredPoint
- type ReplayProgress
- type Requires
- type ScanMWDetector
- type ScanWelchDetector
- type StateView
- type TagGroupByKey
- type TagGroupByKeyRegistry
- type TaggedClusterEntry
- type TaggedPatternClusterer
- func (tc *TaggedPatternClusterer) Classify(groupHash uint64, message string) *patterns.Cluster
- func (tc *TaggedPatternClusterer) DrainLRUEvictions() []EvictedCluster
- func (tc *TaggedPatternClusterer) GarbageCollectBefore(cutoff int64) []EvictedCluster
- func (tc *TaggedPatternClusterer) GetAllClusters() []TaggedClusterEntry
- func (tc *TaggedPatternClusterer) GetCluster(groupHash uint64, clusterID int64) (*patterns.Cluster, error)
- func (tc *TaggedPatternClusterer) NumSubClusterers() int
- func (tc *TaggedPatternClusterer) Process(tags []string, message string, unixSec int64) (uint64, *patterns.Cluster, bool)
- func (tc *TaggedPatternClusterer) Reset()
- type TimeClusterConfig
- type TimeClusterCorrelator
- func (c *TimeClusterCorrelator) ActiveCorrelations() []observer.ActiveCorrelation
- func (c *TimeClusterCorrelator) Advance(dataTime int64)
- func (c *TimeClusterCorrelator) GetClusters() []TimeClusterInfo
- func (c *TimeClusterCorrelator) GetExtraData() interface{}
- func (c *TimeClusterCorrelator) GetStats() map[string]interface{}
- func (c *TimeClusterCorrelator) Name() string
- func (c *TimeClusterCorrelator) ProcessAnomaly(anomaly observer.Anomaly)
- func (c *TimeClusterCorrelator) Reset()
- type TimeClusterInfo
Constants ¶
const (
	AggregateAverage = observer.AggregateAverage
	AggregateSum     = observer.AggregateSum
	AggregateCount   = observer.AggregateCount
	AggregateMin     = observer.AggregateMin
	AggregateMax     = observer.AggregateMax
)
Re-export aggregate constants for internal use.
const LogMetricsExtractorName = "log_metrics_extractor"
LogMetricsExtractorName is the canonical name for the log metrics extractor.
const LogPatternExtractorName = "log_pattern_extractor"
LogPatternExtractorName is the canonical name for the log pattern extractor. It is used as the storage namespace for emitted metrics, as the component name in the catalog, and in notify formatting for log-derived anomalies.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AddResult ¶
type AddResult struct {
// IsNew is true if this Add created a brand-new series (cardinality +1).
IsNew bool
// Ref is the SeriesRef assigned to this point's series.
// -1 when the point is dropped (non-finite or sentinel values).
Ref observer.SeriesRef
}
AddResult bundles the outputs of timeSeriesStorage.Add.
type Aggregate ¶
Aggregate is an alias for the definition in the observer component, for internal use.
type BOCPDConfig ¶
type BOCPDConfig struct {
// WarmupPoints is the number of initial points used for baseline estimation.
// A longer warmup captures more of the metric's natural variability, reducing
// false positives from normal fluctuation. Default: 120 (~2 minutes at 1Hz).
WarmupPoints int `json:"warmup_points"`
// Hazard is the constant changepoint hazard probability.
// Default: 0.05
Hazard float64 `json:"hazard"`
// CPThreshold is the posterior P(changepoint at t) threshold to emit.
// Default: 0.6
CPThreshold float64 `json:"cp_threshold"`
// ShortRunLength is the run-length horizon k for short-run posterior mass P(r_t <= k).
// Default: 5
ShortRunLength int `json:"short_run_length"`
// CPMassThreshold is the threshold for short-run posterior mass P(r_t <= k).
// Default: 0.7
CPMassThreshold float64 `json:"cp_mass_threshold"`
// MaxRunLength caps tracked run-length hypotheses for bounded compute.
// Default: 200
MaxRunLength int `json:"max_run_length"`
// PriorVarianceScale controls prior variance over the mean relative to observed variance.
// Default: 10.0
PriorVarianceScale float64 `json:"prior_variance_scale"`
// MinVariance is the floor for observation variance. When warmup data has
// near-zero variance (e.g. constant series), this prevents pathologically
// sharp PDFs that would flag any tiny fluctuation as anomalous. Default: 1.0
MinVariance float64 `json:"min_variance"`
// RecoveryPoints is how many consecutive non-triggering points are needed
// to exit alert state. Default: 10
RecoveryPoints int `json:"recovery_points"`
// Aggregations to run detection on. Default: [Average, Count]
Aggregations []observer.Aggregate `json:"-"`
}
BOCPDConfig holds configuration for the BOCPD detector.
func DefaultBOCPDConfig ¶
func DefaultBOCPDConfig() BOCPDConfig
DefaultBOCPDConfig returns a BOCPDConfig with default values.
type BOCPDDetector ¶
type BOCPDDetector struct {
// contains filtered or unexported fields
}
BOCPDDetector detects changepoints using Bayesian Online Changepoint Detection. This is a streaming, stateful Detector implementation that maintains per-series posterior state and processes only newly visible points on each advance.
func NewBOCPDDetector ¶
func NewBOCPDDetector(config BOCPDConfig) *BOCPDDetector
NewBOCPDDetector creates a streaming BOCPD detector with the given config. Zero-valued fields are filled from DefaultBOCPDConfig().
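A minimal construction sketch (the field values below are illustrative, not recommendations):

cfg := BOCPDConfig{
	CPThreshold: 0.5,  // more sensitive than the 0.6 default
	Hazard:      0.02, // changepoints expected rarer than the 0.05 default
}
det := NewBOCPDDetector(cfg) // remaining fields fall back to DefaultBOCPDConfig()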
func (*BOCPDDetector) Detect ¶
func (b *BOCPDDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult
Detect implements Detector. It discovers series, reads only newly visible points, and updates per-series BOCPD posterior state incrementally.
Correctness takes priority over positional cursoring: storage may insert points into existing history, so this detector gates incremental work on visible point counts rather than raw slice positions.
func (*BOCPDDetector) RemoveSeries ¶
func (b *BOCPDDetector) RemoveSeries(refs []observer.SeriesRef)
RemoveSeries drops posterior state for refs that storage has freed. Each (ref, agg) entry in the per-series map carries six float64 arrays of size MaxRunLength+2 (~9.7 KB at default config), so without this teardown the map grows with the cumulative number of series ever seen even after their storage payload is gone. Called by the engine right after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs.
func (*BOCPDDetector) Reset ¶
func (b *BOCPDDetector) Reset()
Reset clears all per-series state for replay/reanalysis.
type CUSUMConfig ¶
type CUSUMConfig struct {
// MinPoints is the minimum number of points required for analysis.
// Default: 5
MinPoints int `json:"min_points"`
// BaselineFraction is the fraction of points to use for baseline estimation.
// Default: 0.25 (first 25% of data)
BaselineFraction float64 `json:"baseline_fraction"`
// SlackFactor is multiplied by baseline stddev to get k (slack parameter).
// Higher values make detection less sensitive to small shifts.
// Default: 0.5
SlackFactor float64 `json:"slack_factor"`
// ThresholdFactor is multiplied by baseline stddev to get h (threshold).
// Higher values require larger cumulative deviation to trigger.
// Default: 4.0
ThresholdFactor float64 `json:"threshold_factor"`
}
CUSUMConfig holds configuration for the CUSUM detector.
func DefaultCUSUMConfig ¶
func DefaultCUSUMConfig() CUSUMConfig
DefaultCUSUMConfig returns a CUSUMConfig with default values.
type CUSUMDetector ¶
type CUSUMDetector struct {
// contains filtered or unexported fields
}
CUSUMDetector uses the Cumulative Sum (CUSUM) algorithm to detect when a metric shifts from its baseline. CUSUM is designed for detecting change points.
Algorithm:
S[0] = 0
S[t] = max(0, S[t-1] + (x[t] - μ - k))
Where μ is the baseline mean and k is the slack parameter (allowance for noise). An anomaly is emitted when S[t] first exceeds threshold h, representing the point of change detection.
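A standalone sketch of this recurrence, not the detector's internal code (mu, k, and h are the baseline mean, slack, and threshold defined above; assumes the standard library math package is imported):

// cusumFirstCrossing returns the index at which the one-sided CUSUM
// statistic first exceeds h, or -1 if it never does.
func cusumFirstCrossing(xs []float64, mu, k, h float64) int {
	s := 0.0
	for i, x := range xs {
		s = math.Max(0, s+(x-mu-k)) // the recurrence above
		if s > h {
			return i
		}
	}
	return -1
}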
func NewCUSUMDetector ¶
func NewCUSUMDetector(config CUSUMConfig) *CUSUMDetector
NewCUSUMDetector creates a CUSUMDetector with the given config. Zero-valued fields are filled from DefaultCUSUMConfig().
func (*CUSUMDetector) Detect ¶
func (c *CUSUMDetector) Detect(series observer.Series) observer.DetectionResult
Detect runs CUSUM on the series and returns an anomaly if a shift is detected. The anomaly's Timestamp indicates when the shift was first detected (threshold crossing).
type CatalogEntry ¶
type CatalogEntry struct {
Name string
DisplayName string
Kind string // "detector", "correlator", or "extractor"
DefaultEnabled bool
}
CatalogEntry is a public view of a catalog component.
func TestbenchCatalogEntries ¶
func TestbenchCatalogEntries() []CatalogEntry
TestbenchCatalogEntries returns all component names and kinds from the testbench catalog. Used by the CLI to implement --only without hardcoding component lists.
type ComponentSettings ¶
type ComponentSettings struct {
// Enabled maps component name to whether it should be active.
// Components not listed use their catalog default.
Enabled map[string]bool
// contains filtered or unexported fields
}
ComponentSettings holds per-component configuration provided by the consumer. Both the live observer and the testbench build this from their respective config sources, giving a single path through instantiation.
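For example, a consumer might disable one component and leave the rest on catalog defaults (the component name below is hypothetical; see TestbenchCatalogEntries for the real names):

settings := ComponentSettings{
	Enabled: map[string]bool{
		"rrcf": false, // hypothetical name: disable one detector
	},
}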
type ComponentStateInfo ¶
ComponentStateInfo describes a component currently active in the engine.
type CompressedGroup ¶
type CompressedGroup struct {
CorrelatorName string `json:"correlator"`
GroupID string `json:"groupId"`
Title string `json:"title"`
CommonTags map[string]string `json:"commonTags"`
Patterns []MetricPattern `json:"patterns"`
MemberSources []string `json:"memberSources"`
SeriesCount int `json:"seriesCount"`
Precision float64 `json:"precision"`
FirstSeen int64 `json:"firstSeen,omitempty"`
LastUpdated int64 `json:"lastUpdated,omitempty"`
}
CompressedGroup is a compact structural description of a correlated group of anomalies.
func CompressGroup ¶
func CompressGroup(correlatorName, groupID, title string, members []seriesCompact, universe []seriesCompact, threshold float64) CompressedGroup
CompressGroup produces a CompressedGroup from a set of member series and a universe of all series.
type ConfigReader ¶
type ConfigReader interface {
GetBool(key string) bool
GetInt(key string) int
GetFloat64(key string) float64
GetString(key string) string
IsConfigured(key string) bool
}
ConfigReader provides read access to a key-value configuration source. This is a minimal interface satisfied by the agent's config.Component, allowing component configs to read values without depending on the full agent config package.
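A minimal in-memory implementation for tests, as a sketch (in production the agent's config.Component satisfies this interface):

type mapConfig map[string]interface{}

func (m mapConfig) GetBool(key string) bool       { v, _ := m[key].(bool); return v }
func (m mapConfig) GetInt(key string) int         { v, _ := m[key].(int); return v }
func (m mapConfig) GetFloat64(key string) float64 { v, _ := m[key].(float64); return v }
func (m mapConfig) GetString(key string) string   { v, _ := m[key].(string); return v }
func (m mapConfig) IsConfigured(key string) bool  { _, ok := m[key]; return ok }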
type ConnectionErrorExtractor ¶
type ConnectionErrorExtractor struct{}
ConnectionErrorExtractor detects connection errors in logs and emits a connection.errors metric with inline MetricContext for anomaly enrichment.
func (*ConnectionErrorExtractor) Name ¶
func (c *ConnectionErrorExtractor) Name() string
Name returns the detector name.
func (*ConnectionErrorExtractor) ProcessLog ¶
func (c *ConnectionErrorExtractor) ProcessLog(log observer.LogView) observer.LogMetricsExtractorOutput
ProcessLog checks if a log contains connection error patterns and returns a metric if so. Anomaly detection is handled by metrics detection on the count aggregation of the emitted metric.
type ConnectionErrorExtractorConfig ¶
type ConnectionErrorExtractorConfig struct{}
ConnectionErrorExtractorConfig holds configuration for the ConnectionErrorExtractor.
func DefaultConnectionErrorExtractorConfig ¶
func DefaultConnectionErrorExtractorConfig() ConnectionErrorExtractorConfig
DefaultConnectionErrorExtractorConfig returns a ConnectionErrorExtractorConfig with default values.
type CorrelatorConfig ¶
type CorrelatorConfig struct {
// WindowSeconds is the time window (in seconds) for clustering anomalies.
// Anomalies with data timestamps older than (currentDataTime - WindowSeconds) are evicted.
// Default: 30 seconds.
WindowSeconds int64 `json:"window_seconds"`
}
CorrelatorConfig configures the CrossSignalCorrelator.
func DefaultCorrelatorConfig ¶
func DefaultCorrelatorConfig() CorrelatorConfig
DefaultCorrelatorConfig returns a CorrelatorConfig with default values.
type CrossSignalCorrelator ¶
type CrossSignalCorrelator struct {
// contains filtered or unexported fields
}
CrossSignalCorrelator clusters anomalies from different sources within a time window and detects known patterns. It implements Correlator. Reporters read the current correlation state.
Time is derived entirely from input data timestamps (anomaly.Timestamp), making the correlator deterministic with respect to input data.
func NewCorrelator ¶
func NewCorrelator(config CorrelatorConfig) *CrossSignalCorrelator
NewCorrelator creates a new CrossSignalCorrelator with the given config. If config has zero values, defaults are applied.
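A sketch of the resulting call pattern (anomalies and latestDataTime are assumed to be in scope; only the method order is implied by the Correlator interface):

corr := NewCorrelator(DefaultCorrelatorConfig())
for _, a := range anomalies { // anomalies []observer.Anomaly
	corr.ProcessAnomaly(a)
}
corr.Advance(latestDataTime) // evicts anomalies older than WindowSeconds
active := corr.ActiveCorrelations()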
func (*CrossSignalCorrelator) ActiveCorrelations ¶
func (c *CrossSignalCorrelator) ActiveCorrelations() []observer.ActiveCorrelation
ActiveCorrelations returns a copy of the currently active correlation patterns.
func (*CrossSignalCorrelator) Advance ¶
func (c *CrossSignalCorrelator) Advance(dataTime int64)
Advance implements Correlator. It checks for known patterns in the anomaly buffer and updates activeCorrelations state.
func (*CrossSignalCorrelator) Name ¶
func (c *CrossSignalCorrelator) Name() string
Name returns the correlator name.
func (*CrossSignalCorrelator) ProcessAnomaly ¶
func (c *CrossSignalCorrelator) ProcessAnomaly(anomaly observer.Anomaly)
ProcessAnomaly implements Correlator. It adds an anomaly to the buffer using its data timestamp and evicts old entries.
func (*CrossSignalCorrelator) Reset ¶
func (c *CrossSignalCorrelator) Reset()
Reset clears all internal state for reanalysis.
type DebugView ¶
type DebugView interface {
StateView() StateView
CatalogEntries() []CatalogEntry
// Flush blocks until all observations queued in the dispatch channel have
// been processed by the engine. The testbench calls this after feeding
// parquet data to ensure StateView reflects all ingested observations.
Flush()
// Reset clears all engine state, resets storage, and reconfigures components.
Reset(settings ComponentSettings)
// GetReplayProgress returns lock-free replay progress counters.
GetReplayProgress() ReplayProgress
// SetReplayPhase updates the replay phase string for progress reporting.
SetReplayPhase(phase string)
// ExtractorCount returns the number of extractors active in the engine.
ExtractorCount() int
// AddTelemetry writes a data point into the engine's telemetry namespace.
// Used by the testbench to store per-detector timing stats for UI display.
AddTelemetry(name string, value float64, timestamp int64, tags []string)
// ReplayStoredData resets analysis state (preserving extractor context)
// then replays all data currently in storage through the scheduler in
// chronological order. Call after Flush().
ReplayStoredData()
// StorageReader returns a read-only view of the engine's time-series storage.
// Used by the testbench to compute windowed log rates in change messages.
StorageReader() observerdef.StorageReader
// IngestLogSync feeds a log directly into the engine, bypassing the
// dispatch channel. Synchronous: returns after IngestLog and any
// scheduler-triggered advances complete. Testbench-only — never call
// from production hot paths; not safe to interleave with live ObserveLog.
IngestLogSync(source string, msg observerdef.LogView)
// IngestMetricSync feeds a metric directly into the engine, bypassing
// the dispatch channel. Synchronous; same caveats as IngestLogSync.
IngestMetricSync(source string, sample observerdef.MetricView)
}
DebugView is a read-only introspection surface implemented by observerImpl. It is defined in observer/impl (not observer/def) so production code is never coupled to the debug surface. The testbench obtains it via type assertion:
debug := obs.(observerimpl.DebugView)
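A hypothetical testbench flow built on that handle (call order follows the method comments above):

debug.Flush()            // wait for queued observations to be processed
debug.ReplayStoredData() // deterministic reanalysis of stored data
p := debug.GetReplayProgress()
fmt.Printf("%s: %d/%d timestamps, %d anomalies\n",
	p.Phase, p.TimestampsDone, p.TimestampsTotal, p.Anomalies)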
type DetectDivergence ¶
type DetectDivergence struct {
DetectorName string
DataTime int64
LiveAnomalyCount int
ReplayAnomalyCount int
LiveFingerprints []string
ReplayFingerprints []string
InputHashMatch bool
LiveInputHash uint64
ReplayInputHash uint64
}
DetectDivergence describes a mismatch between live and replay detection results.
func (DetectDivergence) String ¶
func (d DetectDivergence) String() string
type DetectorPassthroughCorrelator ¶
type DetectorPassthroughCorrelator struct {
// contains filtered or unexported fields
}
DetectorPassthroughCorrelator passes all anomalies straight through, grouped by detector name. It does no time-clustering or filtering — each detector's anomalies become a separate ActiveCorrelation. This is used for Level 1 detector-specific evaluation where we want to score each detector's raw output independently.
func NewDetectorPassthroughCorrelator ¶
func NewDetectorPassthroughCorrelator() *DetectorPassthroughCorrelator
NewDetectorPassthroughCorrelator creates a new DetectorPassthroughCorrelator.
func (*DetectorPassthroughCorrelator) ActiveCorrelations ¶
func (c *DetectorPassthroughCorrelator) ActiveCorrelations() []observer.ActiveCorrelation
ActiveCorrelations returns one ActiveCorrelation per individual anomaly, sorted by detector name then timestamp.
Each anomaly becomes its own correlation with:
- Pattern: "passthrough_{detectorName}_{index}"
- Title: "Passthrough[{detectorName}]: {anomaly.Source}"
- Anomalies: single-element slice containing the original anomaly
- FirstSeen/LastUpdated: the anomaly's timestamp
When serialized via WriteObserverOutput, each correlation becomes an ObserverCorrelation where period_start == period_end == anomaly.Timestamp. This allows the scorer to evaluate each detection independently.
func (*DetectorPassthroughCorrelator) Advance ¶
func (c *DetectorPassthroughCorrelator) Advance(_ int64)
Advance is a no-op for the passthrough correlator (no windowing).
func (*DetectorPassthroughCorrelator) Name ¶
func (c *DetectorPassthroughCorrelator) Name() string
Name returns the correlator name.
func (*DetectorPassthroughCorrelator) ProcessAnomaly ¶
func (c *DetectorPassthroughCorrelator) ProcessAnomaly(anomaly observer.Anomaly)
ProcessAnomaly stores the anomaly, grouped by its DetectorName.
func (*DetectorPassthroughCorrelator) Reset ¶
func (c *DetectorPassthroughCorrelator) Reset()
Reset clears all internal state for reanalysis.
type EvictedCluster ¶
EvictedCluster identifies a cluster that was removed during garbage collection, pairing its tag-group hash with its intra-clusterer ID.
type LogMetricsExtractor ¶
type LogMetricsExtractor struct {
// contains filtered or unexported fields
}
LogMetricsExtractor converts logs into timeseries metric outputs:
- JSON logs: numeric fields -> Avg aggregation
- Unstructured logs: pattern frequency -> Sum aggregation
This is intentionally minimal; cardinality controls live in the observer storage (Step 5).
func NewLogMetricsExtractor ¶
func NewLogMetricsExtractor(config LogMetricsExtractorConfig) *LogMetricsExtractor
NewLogMetricsExtractor creates a LogMetricsExtractor with the given config.
func (*LogMetricsExtractor) Name ¶
func (a *LogMetricsExtractor) Name() string
func (*LogMetricsExtractor) ProcessLog ¶
func (a *LogMetricsExtractor) ProcessLog(log observer.LogView) observer.LogMetricsExtractorOutput
type LogMetricsExtractorConfig ¶
type LogMetricsExtractorConfig struct {
// MaxEvalBytes caps how many bytes we evaluate for unstructured signature generation (0 = no cap).
MaxEvalBytes int
// IncludeFields, if non-empty, restricts JSON numeric extraction to these field names.
IncludeFields map[string]struct{}
// ExcludeFields always excludes JSON fields from numeric extraction.
ExcludeFields map[string]struct{}
}
LogMetricsExtractorConfig holds configuration for the LogMetricsExtractor.
func DefaultLogMetricsExtractorConfig ¶
func DefaultLogMetricsExtractorConfig() LogMetricsExtractorConfig
DefaultLogMetricsExtractorConfig returns a LogMetricsExtractorConfig with default values.
type LogPatternExtractor ¶
type LogPatternExtractor struct {
NextGarbageCollectionTime int64
// contains filtered or unexported fields
}
LogPatternExtractor is a LogMetricsExtractor that clusters log messages into patterns and emits a count metric per pattern.
func NewLogPatternExtractor ¶
func NewLogPatternExtractor(cfg LogPatternExtractorConfig) *LogPatternExtractor
NewLogPatternExtractor creates a new LogPatternExtractor. A zero-value cfg is accepted; zero fields fall back to DefaultLogPatternExtractorConfig values. MaxPatternsPerGroup and MaxTagGroups follow the same convention: 0 → default, negative → disabled (unbounded).
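For example (a sketch using the zero/negative conventions documented above):

ex := NewLogPatternExtractor(LogPatternExtractorConfig{
	ClusterTimeToLiveSec: 300, // GC clusters idle for 5 minutes
	MaxTagGroups:         -1,  // negative disables the tag-group cap
})
// All remaining fields take DefaultLogPatternExtractorConfig values.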
func (*LogPatternExtractor) Name ¶
func (e *LogPatternExtractor) Name() string
Name returns the extractor name.
func (*LogPatternExtractor) ProcessLog ¶
func (e *LogPatternExtractor) ProcessLog(log observerdef.LogView) observerdef.LogMetricsExtractorOutput
ProcessLog clusters the log message and emits a count metric for its pattern.
func (*LogPatternExtractor) Reset ¶
func (e *LogPatternExtractor) Reset()
Reset clears clustering state so reanalysis starts from the currently observed logs. The registry is kept so that previously registered hashes remain resolvable.
type LogPatternExtractorConfig ¶
type LogPatternExtractorConfig struct {
// DisableOptimizations disables all optimizations such as
// MinClusterSizeBeforeEmit, ClusterTimeToLiveSec, etc.
DisableOptimizations bool `json:"disable_optimizations,omitempty"`
// MinClusterSizeBeforeEmit is the minimum number of logs matching a pattern
// before emitting metrics. Zero means the default from DefaultLogPatternExtractorConfig.
MinClusterSizeBeforeEmit int `json:"min_cluster_size_before_emit,omitempty"`
// MaxTokenizedStringLength caps input length before tokenization (0 = patterns default).
MaxTokenizedStringLength int `json:"max_tokenized_string_length,omitempty"`
// MaxNumTokens caps token count per message (0 = patterns default).
MaxNumTokens int `json:"max_num_tokens,omitempty"`
// ParseHexDump controls hex-dump recognition in the tokenizer. When nil, the
// patterns package default applies (true).
ParseHexDump *bool `json:"parse_hex_dump,omitempty"`
// MinTokenMatchRatio is the minimum fraction of token positions (by value)
// that must match for two log lines to merge into one pattern. Range (0,1];
// zero means the default 0.5 (Drain-style).
MinTokenMatchRatio float64 `json:"min_token_match_ratio,omitempty"`
// ClusterTimeToLiveSec is how long (seconds) a cluster may go without a matching log before it is removed.
// Zero disables cluster garbage collection.
ClusterTimeToLiveSec int64 `json:"cluster_time_to_live_sec,omitempty"`
// GarbageCollectionIntervalSec is the minimum time between GC passes when ClusterTimeToLiveSec > 0.
GarbageCollectionIntervalSec int64 `json:"garbage_collection_interval_sec,omitempty"`
// MaxPatternsPerGroup caps the number of live clusters in any single tag
// group. When exceeded, the least-recently-seen cluster is evicted (LRU)
// and its engine context is dropped. Zero means use the default; set
// negative to disable. Bounds memory/series-cardinality on workloads with
// high pattern diversity (e.g. container log churn).
MaxPatternsPerGroup int `json:"max_patterns_per_group,omitempty"`
// MaxTagGroups caps the number of distinct tag groups (source/service/env/host
// combinations) tracked simultaneously. When exceeded, the least-recently-
// touched group's clusters are all evicted at once. Zero means use the
// default; set negative to disable.
MaxTagGroups int `json:"max_tag_groups,omitempty"`
}
LogPatternExtractorConfig holds hyperparameters for the log pattern extractor.
func DefaultLogPatternExtractorConfig ¶
func DefaultLogPatternExtractorConfig() LogPatternExtractorConfig
DefaultLogPatternExtractorConfig returns defaults aligned with the patterns package.
func (*LogPatternExtractorConfig) RefreshConfig ¶
func (c *LogPatternExtractorConfig) RefreshConfig()
type MetricPattern ¶
type MetricPattern struct {
Pattern string `json:"pattern"`
Matched int `json:"matched"`
Universe int `json:"universe"`
Precision float64 `json:"precision"`
}
MetricPattern describes a wildcard or exact metric name pattern within a compressed group.
type Provides ¶
type Provides struct {
Comp observerdef.Component
}
Provides defines the output of the observer component.
func NewComponent ¶
NewComponent creates an observer.Component.
type RRCFConfig ¶
type RRCFConfig struct {
// NumTrees is the number of trees in the forest. More trees = more robust but slower.
NumTrees int `json:"num_trees"`
// TreeSize is the maximum number of points per tree (sliding window size).
TreeSize int `json:"tree_size"`
// ShingleSize is the number of consecutive timestamps to combine into one point.
// ShingleSize=4 means each "point" is 4 consecutive samples, enabling temporal pattern detection.
ShingleSize int `json:"shingle_size"`
// ThresholdSigma controls dynamic anomaly thresholding. A point is flagged if its
// CoDisp score exceeds mean + ThresholdSigma*stddev of the recent score window.
// Set to 0 to disable anomaly detection (scores still computed for analysis).
ThresholdSigma float64 `json:"threshold_sigma"`
// Metrics defines which series to include. If nil, uses DefaultRRCFMetrics().
Metrics []RRCFMetricDef `json:"-"`
}
RRCFConfig holds configuration for the RRCF analysis.
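An illustrative sketch of the shingling described for ShingleSize, not the detector's internal code: each emitted point is a window of ShingleSize consecutive samples.

// shingle([1 2 3 4 5], 4) -> [[1 2 3 4] [2 3 4 5]]
func shingle(xs []float64, size int) [][]float64 {
	var out [][]float64
	for i := 0; i+size <= len(xs); i++ {
		out = append(out, xs[i:i+size])
	}
	return out
}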
func DefaultRRCFConfig ¶
func DefaultRRCFConfig() RRCFConfig
DefaultRRCFConfig returns sensible defaults for RRCF.
type RRCFConfigSummary ¶
type RRCFConfigSummary struct {
NumTrees int `json:"numTrees"`
TreeSize int `json:"treeSize"`
ShingleSize int `json:"shingleSize"`
ShingleDim int `json:"shingleDim"`
ThresholdSigma float64 `json:"thresholdSigma"`
}
RRCFConfigSummary is a JSON-friendly summary of RRCF configuration.
type RRCFDetector ¶
type RRCFDetector struct {
// contains filtered or unexported fields
}
RRCFDetector implements multivariate anomaly detection using Robust Random Cut Forest. It queries multiple system metrics and detects unusual combinations/trajectories.
func NewRRCFDetector ¶
func NewRRCFDetector(config RRCFConfig) *RRCFDetector
NewRRCFDetector creates an RRCF detector with the given config.
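For example, to compute scores for analysis without emitting anomalies (per the ThresholdSigma comment above):

cfg := DefaultRRCFConfig()
cfg.ThresholdSigma = 0 // scores still computed, no anomalies emitted
det := NewRRCFDetector(cfg)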
func (*RRCFDetector) Detect ¶
func (r *RRCFDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult
Detect implements Detector. It queries storage for system metrics, builds multivariate shingles, and detects anomalies using RRCF.
func (*RRCFDetector) GetExtraData ¶
func (r *RRCFDetector) GetExtraData() interface{}
GetExtraData implements ComponentDataProvider, exposing score stats via /api/components/rrcf/data.
func (*RRCFDetector) GetScoreStats ¶
func (r *RRCFDetector) GetScoreStats() RRCFScoreStats
GetScoreStats returns distribution statistics and full score history.
func (*RRCFDetector) Reset ¶
func (r *RRCFDetector) Reset()
Reset clears all state, useful for testing or after major regime changes.
type RRCFMetricDef ¶
RRCFMetricDef defines a metric to include in the RRCF analysis.
func DefaultRRCFMetrics ¶
func DefaultRRCFMetrics() []RRCFMetricDef
DefaultRRCFMetrics returns the default metric set for RRCF. These match cgroup v2 metrics from FGM parquet exports, which is the format used by the testbench scenarios where RRCF has been validated.
type RRCFScoreStats ¶
type RRCFScoreStats struct {
Enabled bool `json:"enabled"`
SampleCount int `json:"sampleCount"`
AlignedPoints int `json:"alignedPoints"`
ShinglesBuilt int `json:"shinglesBuilt"`
MinScore float64 `json:"minScore"`
MaxScore float64 `json:"maxScore"`
MeanScore float64 `json:"meanScore"`
StddevScore float64 `json:"stddevScore"`
P50 float64 `json:"p50"`
P75 float64 `json:"p75"`
P90 float64 `json:"p90"`
P95 float64 `json:"p95"`
P99 float64 `json:"p99"`
Config RRCFConfigSummary `json:"config"`
Metrics []string `json:"metrics"`
Scores []RRCFScoredPoint `json:"scores"`
}
RRCFScoreStats contains distribution statistics and full score history for threshold analysis.
type RRCFScoredPoint ¶
RRCFScoredPoint records a CoDisp score at a specific timestamp.
type ReplayProgress ¶
type ReplayProgress struct {
Phase string `json:"phase"` // "", "loading", "detecting", "done"
TimestampsDone int64 `json:"timestampsDone"`
TimestampsTotal int64 `json:"timestampsTotal"`
Advances int64 `json:"advances"`
Anomalies int64 `json:"anomalies"`
}
ReplayProgress holds lock-free replay progress counters.
type Requires ¶
type Requires struct {
Lifecycle compdef.Lifecycle
Config config.Component
Log log.Component
Telemetry telemetry.Component
// Recorder is an optional component for transparent metric recording.
// If provided, all handles will be wrapped to record metrics to parquet files.
Recorder option.Option[recorderdef.Component]
// Reporters are provided by reporter/fx, reporter/fx-testbench, etc. via the
// `anomalydetection_reporters` Fx group. Each reporter gets its own subscription
// so it receives advance events independently. StorageConsumer reporters receive
// storage for windowed log-rate annotations.
Reporters []reporterdef.Reporter `group:"anomalydetection_reporters"`
// HFRunner runs system and container checks at 1s and routes them into the
// observer pipeline. The noop variant (hfrunner/fx-noop) is wired for the
// main agent build; the real implementation lands with the algorithm PRs.
HFRunner hfrunnerdef.Component
}
Requires declares the input types to the observer component constructor.
type ScanMWDetector ¶
type ScanMWDetector struct {
// MinSegment is the minimum number of points in each segment.
// Default: 12
MinSegment int
// MinPoints is the minimum total points before detection runs.
// Default: 30
MinPoints int
// SignificanceThreshold is the maximum p-value for the best split to be
// considered a changepoint. Default: 1e-8
SignificanceThreshold float64
// MinEffectSize is the minimum |rank-biserial correlation| for reporting.
// Default: 0.85
MinEffectSize float64
// MinDeviationMAD is the minimum |post_median - pre_median| / MAD.
// Default: 3.0
MinDeviationMAD float64
// Aggregations to run detection on. Default: [Average, Count]
Aggregations []observer.Aggregate
// contains filtered or unexported fields
}
ScanMWDetector detects changepoints by scanning all possible split points with the Mann-Whitney U test. It picks the split that gives the most significant test result (smallest p-value), making it a non-parametric changepoint detector that's robust to distribution shape.
Uses an efficient O(n log n) implementation: ranks are assigned once via sorting, then the rank sum is updated incrementally as the split point moves.
Implements Detector (streaming) — after finding a changepoint, advances the segment start so subsequent scans only examine post-change data.
func NewScanMWDetector ¶
func NewScanMWDetector() *ScanMWDetector
NewScanMWDetector creates a ScanMW detector with default settings.
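The tuning fields are exported, so defaults can be tightened after construction (the values below are illustrative):

d := NewScanMWDetector()
d.MinEffectSize = 0.9          // stricter than the 0.85 default
d.SignificanceThreshold = 1e-10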
func (*ScanMWDetector) Detect ¶
func (d *ScanMWDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult
Detect implements Detector. It discovers series, reads segment data, and scans for changepoints. After finding one, the segment start advances so subsequent calls only examine post-change data.
Iteration pattern is the same as BOCPD (metrics_detector_bocpd.go:140-221) and ScanWelch — consider dedup if more scan-based detectors are added.
func (*ScanMWDetector) Name ¶
func (d *ScanMWDetector) Name() string
Name returns the detector name.
func (*ScanMWDetector) RemoveSeries ¶
func (d *ScanMWDetector) RemoveSeries(refs []observer.SeriesRef)
RemoveSeries drops segment-tracking state for refs that storage has freed. Each per-series entry holds a reusable point buffer that grows to the segment size, so without this teardown the map keeps growing with the cumulative series count even after storage shrinks. Called by the engine right after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs.
func (*ScanMWDetector) Reset ¶
func (d *ScanMWDetector) Reset()
Reset clears all per-series state for replay/reanalysis.
type ScanWelchDetector ¶
type ScanWelchDetector struct {
// MinSegment is the minimum number of points in each segment.
MinSegment int
// MinPoints is the minimum total points before detection runs.
MinPoints int
// MinTStatistic is the minimum |t| for the candidate selection phase.
MinTStatistic float64
// SignificanceThreshold is the maximum MW p-value for reporting.
SignificanceThreshold float64
// MinEffectSize is the minimum |rank-biserial correlation|.
MinEffectSize float64
// MinDeviationMAD is the minimum |post_median - pre_median| / MAD.
MinDeviationMAD float64
// Aggregations to run detection on. Default: [Average, Count]
Aggregations []observer.Aggregate
// contains filtered or unexported fields
}
ScanWelchDetector detects changepoints by scanning all possible split points with Welch's t-test for candidate selection, then verifies each candidate with a Mann-Whitney p-value filter and MAD-based deviation check.
This hybrid uses parametric detection (t-test finds mean shifts efficiently) combined with nonparametric verification (MW p-value for selectivity).
Implements Detector (streaming) — after finding a changepoint, advances the segment start so subsequent scans only examine post-change data.
func NewScanWelchDetector ¶
func NewScanWelchDetector() *ScanWelchDetector
NewScanWelchDetector creates a ScanWelch detector with default settings.
func (*ScanWelchDetector) Detect ¶
func (d *ScanWelchDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult
Detect implements Detector. Same iteration pattern as ScanMW and BOCPD — consider dedup if more scan-based detectors are added.
func (*ScanWelchDetector) Name ¶
func (d *ScanWelchDetector) Name() string
Name returns the detector name.
func (*ScanWelchDetector) RemoveSeries ¶
func (d *ScanWelchDetector) RemoveSeries(refs []observer.SeriesRef)
RemoveSeries drops segment-tracking state for refs that storage has freed. Each per-series entry holds a reusable point buffer that grows to the segment size, so without this teardown the map keeps growing with the cumulative series count even after storage shrinks. Called by the engine right after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs.
func (*ScanWelchDetector) Reset ¶
func (d *ScanWelchDetector) Reset()
Reset clears all per-series state for replay/reanalysis.
type StateView ¶
type StateView interface {
// Storage
ListSeries(filter observerdef.SeriesFilter) []observerdef.SeriesMeta
GetSeriesRange(ref observerdef.SeriesRef, start, end int64, agg observerdef.Aggregate) *observerdef.Series
ScenarioBounds() (start, end int64, ok bool)
// Anomalies
Anomalies() []observerdef.Anomaly
TotalAnomalyCount() int
UniqueAnomalySourceCount() int
DetectorAnomalies(name string) []observerdef.Anomaly
AnomaliesByDetector() map[string][]observerdef.Anomaly
AnomaliesForSource(sd observerdef.SeriesDescriptor) []observerdef.Anomaly
// Correlations
ActiveCorrelations() []observerdef.ActiveCorrelation
CorrelationHistory() []observerdef.ActiveCorrelation
// Detector / correlator metadata
ListDetectors() []ComponentStateInfo
ListCorrelators() []ComponentStateInfo
// Timing
LastAnalyzedTime() int64
LatestDataTime() int64
MaxTimestamp() int64
// Storage stats (excluding a given namespace, typically TelemetryNamespace)
TotalSeriesCount(excludeNamespace string) int
TotalSampleCount(excludeNamespace string) int64
// GetSeriesAll returns all points for a series.
GetSeriesAll(ref observerdef.SeriesRef, agg observerdef.Aggregate) *observerdef.Series
}
StateView is a read-only window into engine state. All methods correspond to existing methods on the unexported stateView struct in stateview.go — they are being promoted to a public interface.
type TagGroupByKey ¶
type TagGroupByKey struct {
// Warning: Don't forget to update functions parsing tags when adding new fields
Source string
Service string
Env string
Host string
}
TagGroupByKey holds the tags that are responsible for grouping logs into different clusters. Absent tags (e.g. a log with no "env" tag) are represented by an empty string.
func (TagGroupByKey) AsMap ¶
func (c TagGroupByKey) AsMap() map[string]string
AsMap returns a map of non-empty tag key→value pairs for this group.
type TagGroupByKeyRegistry ¶
type TagGroupByKeyRegistry struct {
// contains filtered or unexported fields
}
TagGroupByKeyRegistry is a bidirectional, append-only store between a uint64 hash and a TagGroupByKey. It is NOT thread-safe; access must be confined to a single goroutine.
func NewTagGroupByKeyRegistry ¶
func NewTagGroupByKeyRegistry() *TagGroupByKeyRegistry
NewTagGroupByKeyRegistry creates an empty TagGroupByKeyRegistry.
func (*TagGroupByKeyRegistry) Lookup ¶
func (r *TagGroupByKeyRegistry) Lookup(hash uint64) (TagGroupByKey, bool)
Lookup returns the TagGroupByKey for the given hash, and whether it was found.
func (*TagGroupByKeyRegistry) Register ¶
func (r *TagGroupByKeyRegistry) Register(group TagGroupByKey) uint64
Register inserts (or confirms) a TagGroupByKey and returns its stable hash. Calling Register twice with the same group returns the same hash.
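For example (tag values are illustrative):

reg := NewTagGroupByKeyRegistry()
h := reg.Register(TagGroupByKey{Source: "nginx", Env: "prod"})
if g, ok := reg.Lookup(h); ok {
	fmt.Println(g.AsMap()) // map[env:prod source:nginx]
}
_ = reg.Register(TagGroupByKey{Source: "nginx", Env: "prod"}) == h // always true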
type TaggedClusterEntry ¶
TaggedClusterEntry pairs a cluster with its tag-group hash so callers can compute the correct globalClusterHash.
type TaggedPatternClusterer ¶
type TaggedPatternClusterer struct {
// MaxClustersPerGroup, when > 0, is propagated as patterns.PatternClusterer.MaxClusters
// on each newly created sub-clusterer; existing sub-clusterers are NOT
// retroactively updated. Zero means unbounded.
MaxClustersPerGroup int
// MaxTagGroups, when > 0, caps the number of live sub-clusterers. When a new
// tag group would push subClusterers past this size, the least-recently-
// touched group (smallest entry in lastTouchByGroup) is evicted; all of
// its clusters are surfaced via DrainLRUEvictions. Zero disables the cap.
MaxTagGroups int
// contains filtered or unexported fields
}
TaggedPatternClusterer wraps one *patterns.PatternClusterer per tag-group hash so that each unique (source, service, env, host) combination is clustered independently.
NOT thread-safe: all calls must be made from the same goroutine.
func NewTaggedPatternClusterer ¶
func NewTaggedPatternClusterer(registry *TagGroupByKeyRegistry) *TaggedPatternClusterer
NewTaggedPatternClusterer creates a TaggedPatternClusterer that writes group hashes into registry.
func NewTaggedPatternClustererWithFactory ¶
func NewTaggedPatternClustererWithFactory(registry *TagGroupByKeyRegistry, newPC func() *patterns.PatternClusterer) *TaggedPatternClusterer
NewTaggedPatternClustererWithFactory is like NewTaggedPatternClusterer but uses newPC to construct each per-tag-group sub-clusterer (e.g. to plug tokenizer hyperparameters).
func (*TaggedPatternClusterer) Classify ¶
func (tc *TaggedPatternClusterer) Classify(groupHash uint64, message string) *patterns.Cluster
Classify returns the cluster that best matches message within the sub-clusterer identified by groupHash, or nil if no match is found.
func (*TaggedPatternClusterer) DrainLRUEvictions ¶
func (tc *TaggedPatternClusterer) DrainLRUEvictions() []EvictedCluster
DrainLRUEvictions returns and clears all LRU evictions accumulated since the last call. Includes both per-group MaxClusters evictions and whole-group MaxTagGroups evictions. GC evictions go through GarbageCollectBefore instead.
func (*TaggedPatternClusterer) GarbageCollectBefore ¶
func (tc *TaggedPatternClusterer) GarbageCollectBefore(cutoff int64) []EvictedCluster
GarbageCollectBefore removes all clusters whose LastSeenUnix is strictly less than cutoff from every sub-clusterer and returns the (GroupHash, ClusterID) pairs that were removed.
func (*TaggedPatternClusterer) GetAllClusters ¶
func (tc *TaggedPatternClusterer) GetAllClusters() []TaggedClusterEntry
GetAllClusters returns every cluster across all sub-clusterers, each paired with its group hash.
func (*TaggedPatternClusterer) GetCluster ¶
func (tc *TaggedPatternClusterer) GetCluster(groupHash uint64, clusterID int64) (*patterns.Cluster, error)
GetCluster retrieves a cluster by group hash and intra-clusterer ID.
func (*TaggedPatternClusterer) NumSubClusterers ¶
func (tc *TaggedPatternClusterer) NumSubClusterers() int
NumSubClusterers returns the number of currently active sub-clusterers.
func (*TaggedPatternClusterer) Process ¶
func (tc *TaggedPatternClusterer) Process(tags []string, message string, unixSec int64) (uint64, *patterns.Cluster, bool)
Process extracts the tag group from tags, routes the message to the matching sub-clusterer (created lazily), and returns the group hash plus the cluster. unixSec is Unix seconds for timestamp tracking (use time.Now().Unix() when unknown).
LRU evictions (if any) caused by this call — from per-group MaxClusters or global MaxTagGroups caps — must be retrieved via DrainLRUEvictions before the next Process call to avoid silently dropping eviction context.
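A sketch of that contract (tags, msg, and the cleanup hook are assumptions; only the drain-before-next-Process requirement comes from the documentation above):

reg := NewTagGroupByKeyRegistry()
tc := NewTaggedPatternClusterer(reg)
tc.MaxClustersPerGroup = 1000 // applies only to sub-clusterers created from here on

hash, cluster, _ := tc.Process(tags, msg, time.Now().Unix())
_ = hash    // resolvable back to a TagGroupByKey via the registry
_ = cluster
for _, ev := range tc.DrainLRUEvictions() { // drain before the next Process call
	releaseEngineContext(ev.GroupHash, ev.ClusterID) // hypothetical cleanup hook
}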
func (*TaggedPatternClusterer) Reset ¶
func (tc *TaggedPatternClusterer) Reset()
Reset drops all sub-clusterers. The registry is intentionally kept so that previously registered hashes remain resolvable after a reset. LRU bookkeeping (lastTouchByGroup, pending evictions) is also cleared.
type TimeClusterConfig ¶
type TimeClusterConfig struct {
// ProximitySeconds is the maximum time difference between anomaly timestamps
// for them to be considered part of the same cluster.
// Default: 10 seconds.
ProximitySeconds int64 `json:"proximity_seconds"`
// WindowSeconds is how long to keep anomalies before eviction.
// Default: 60 seconds.
WindowSeconds int64 `json:"window_seconds"`
// MinClusterSize is the minimum number of anomalies a cluster must contain
// to be included in output (ActiveCorrelations, GetClusters).
// Clusters below this threshold are still tracked internally but not reported.
// Default: 0 (no minimum).
MinClusterSize int `json:"min_cluster_size"`
}
TimeClusterConfig configures the TimeClusterCorrelator.
func DefaultTimeClusterConfig ¶
func DefaultTimeClusterConfig() TimeClusterConfig
DefaultTimeClusterConfig returns a TimeClusterConfig with default values.
type TimeClusterCorrelator ¶
type TimeClusterCorrelator struct {
// contains filtered or unexported fields
}
TimeClusterCorrelator clusters anomalies based on timestamp proximity. Anomalies whose timestamps are within ProximitySeconds of each other are grouped together.
func NewTimeClusterCorrelator ¶
func NewTimeClusterCorrelator(config TimeClusterConfig) *TimeClusterCorrelator
NewTimeClusterCorrelator creates a new TimeClusterCorrelator with the given config.
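For example (values illustrative):

cfg := DefaultTimeClusterConfig()
cfg.ProximitySeconds = 5 // tighter than the 10s default
cfg.MinClusterSize = 2   // suppress single-anomaly clusters in output
corr := NewTimeClusterCorrelator(cfg)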
func (*TimeClusterCorrelator) ActiveCorrelations ¶
func (c *TimeClusterCorrelator) ActiveCorrelations() []observer.ActiveCorrelation
ActiveCorrelations returns clusters as active correlation patterns.
func (*TimeClusterCorrelator) Advance ¶
func (c *TimeClusterCorrelator) Advance(dataTime int64)
Advance evicts old clusters; reporters pull the current state via ActiveCorrelations.
func (*TimeClusterCorrelator) GetClusters ¶
func (c *TimeClusterCorrelator) GetClusters() []TimeClusterInfo
GetClusters returns all clusters for visualization.
func (*TimeClusterCorrelator) GetExtraData ¶
func (c *TimeClusterCorrelator) GetExtraData() interface{}
GetExtraData implements ComponentDataProvider.
func (*TimeClusterCorrelator) GetStats ¶
func (c *TimeClusterCorrelator) GetStats() map[string]interface{}
GetStats returns statistics about the correlator state.
func (*TimeClusterCorrelator) Name ¶
func (c *TimeClusterCorrelator) Name() string
Name returns the correlator name.
func (*TimeClusterCorrelator) ProcessAnomaly ¶
func (c *TimeClusterCorrelator) ProcessAnomaly(anomaly observer.Anomaly)
ProcessAnomaly adds an anomaly, either to an existing cluster or to a new one.
func (*TimeClusterCorrelator) Reset ¶
func (c *TimeClusterCorrelator) Reset()
Reset clears all internal state for reanalysis.
Source Files ¶
- advance_log.go
- agent_logs.go
- anomaly_correlator_passthrough.go
- anomaly_correlator_time_cluster.go
- anomaly_processor_correlator.go
- component_catalog.go
- context_provider.go
- correlation_identity.go
- debug.go
- detect_digest.go
- detect_digest_log.go
- engine.go
- events.go
- group_compression.go
- log_detector_connection_errors.go
- log_metrics_extractor.go
- log_pattern_extractor.go
- log_tagged_pattern_clusterer.go
- metrics_detector_bocpd.go
- metrics_detector_cusum.go
- metrics_detector_rrcf.go
- metrics_detector_scanmw.go
- metrics_detector_scanwelch.go
- metrics_detector_util.go
- observer.go
- rrcf.go
- scheduler.go
- series_pair_key.go
- stateview.go
- storage.go
- storage_instrumented.go
- telemetry.go