Documentation ¶
Overview ¶
Package agent contains the AI agent mode pipeline:
SignalSource → Redact → Detector pipeline (Regex → Miner → Catalog → Frequency) → (training: persist | shadow: log | detect: AI → Incident)
Training mode is end-to-end. Shadow and detect share the same classification path; the AI analyzer call in detect mode is wired up separately from the worker.
Index ¶
- func BuildSources(cfg config.AgentConfig) ([]core.SignalSource, []error)
- type AIBundle
- type Catalog
- func (c *Catalog) All() []*Pattern
- func (c *Catalog) AllServices() map[string]ServiceInfo
- func (c *Catalog) Delete(patternID string) bool
- func (c *Catalog) Dirty() bool
- func (c *Catalog) EndServiceGrace(name string) bool
- func (c *Catalog) Get(id string) *Pattern
- func (c *Catalog) IsServiceInGrace(name string, graceDuration time.Duration) bool
- func (c *Catalog) Label(patternID, verdict string, tags []string) bool
- func (c *Catalog) Len() int
- func (c *Catalog) MarkKnown(patternID string) bool
- func (c *Catalog) Persist() error
- func (c *Catalog) RegisterService(name string) bool
- func (c *Catalog) RestartServiceGrace(name string) bool
- func (c *Catalog) Upsert(patternID, template, source string, tickCount int, alpha float64, ...) *Pattern
- type CursorStore
- type DetectEvent
- type DetectLog
- func (d *DetectLog) All() []*DetectEvent
- func (d *DetectLog) Clear() int
- func (d *DetectLog) Dirty() bool
- func (d *DetectLog) Get(id string) *DetectEvent
- func (d *DetectLog) Len() int
- func (d *DetectLog) Persist() error
- func (d *DetectLog) Record(e *DetectEvent)
- func (d *DetectLog) Stats() map[string]int
- type Emitter
- type MatchResult
- type Miner
- type MinerCluster
- type Pattern
- type Redactor
- type RegexMatcher
- type RegexRule
- type ServiceInfo
- type ServiceMatcher
- type ShadowEvent
- type ShadowLog
- func (s *ShadowLog) All() []*ShadowEvent
- func (s *ShadowLog) Clear() int
- func (s *ShadowLog) Dirty() bool
- func (s *ShadowLog) Len() int
- func (s *ShadowLog) Persist() error
- func (s *ShadowLog) Record(source, patternID, template, sample, rule, verdict string, freq int)
- func (s *ShadowLog) Stats() map[string]int
- type Worker
- type WorkerOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildSources ¶
func BuildSources(cfg config.AgentConfig) ([]core.SignalSource, []error)
BuildSources constructs every enabled SignalSource from AgentConfig. Sources with type set to an unknown value are skipped (with a non-fatal error in the returned slice) so a config typo cannot stop the agent from starting.
Types ¶
type AIBundle ¶ added in v1.4.0
type AIBundle struct {
SRE core.AISRE
Cache *ai.ResultCache
Rate *ai.RateLimiter
}
AIBundle bundles the three AI-side dependencies the worker needs. All three are nil-safe: the worker accepts a zero bundle and falls back to "dry detect" (classify but do not emit).
func BuildAI ¶ added in v1.4.0
BuildAI constructs the AI SRE, result cache, and rate limiter for detect mode.
Returns a zero AIBundle when cfg.AI.Enable is false so callers can pass it straight to NewWorker without nil checks.
httpClient may be nil — a default *http.Client (30s timeout, no proxy) is used. store may be nil — the cache then degrades to in-memory only.
type Catalog ¶
type Catalog struct {
// contains filtered or unexported fields
}
Catalog is the in-memory + on-disk pattern store.
All public methods are safe for concurrent use. Disk persistence is debounced — calls to MarkDirty() set a flag that the agent worker flushes at most once per `persist_interval`.
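The dirty-flag debounce described above can be sketched as follows. This is a hypothetical illustration, not the package's actual implementation: mutating calls only set a flag, and the worker's periodic flush performs the expensive write at most once per interval.

```go
package main

import (
	"fmt"
	"sync"
)

// catalogSketch illustrates the debounce: markDirty is cheap and
// lock-protected; flush does the (expensive) persist only when
// something actually changed since the last flush.
type catalogSketch struct {
	mu    sync.Mutex
	dirty bool
	saves int // number of real persist operations performed
}

func (c *catalogSketch) markDirty() {
	c.mu.Lock()
	c.dirty = true
	c.mu.Unlock()
}

// flush is what the worker would call once per persist_interval.
func (c *catalogSketch) flush() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.dirty {
		return // nothing changed; skip the write
	}
	c.saves++ // stand-in for the real storage write
	c.dirty = false
}

func main() {
	c := &catalogSketch{}
	for i := 0; i < 100; i++ { // 100 upserts in one tick...
		c.markDirty()
	}
	c.flush() // ...collapse into a single persist
	c.flush() // second flush is a no-op
	fmt.Println(c.saves) // 1
}
```

The point of the pattern: write amplification is bounded by the flush interval, not by the upsert rate.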
func LoadCatalog ¶
LoadCatalog opens an existing patterns blob from the storage provider or returns an empty catalog if none exists. The blob name is config.CatalogBlobName ("patterns").
func (*Catalog) All ¶
All returns a stable, sorted snapshot of every pattern (sorted by Count descending so the most-frequent patterns appear first in admin views).
func (*Catalog) AllServices ¶
func (c *Catalog) AllServices() map[string]ServiceInfo
AllServices returns a snapshot of every tracked service, sorted by FirstSeen ascending.
func (*Catalog) EndServiceGrace ¶
EndServiceGrace forces a service out of its grace period by setting FirstSeen to the zero time. Returns false when the service doesn't exist.
func (*Catalog) Get ¶
Get returns a deep copy of the pattern keyed by id, or nil when not found. Returning a copy (rather than the live pointer) prevents callers from observing torn reads while a concurrent Upsert mutates the same struct.
func (*Catalog) IsServiceInGrace ¶
IsServiceInGrace reports whether the named service is still inside its new-service grace window. A zero graceDuration means grace is disabled (always returns false). An unknown service is registered on the spot and enters grace.
func (*Catalog) Label ¶
Label updates operator-curated metadata for a pattern. Empty fields are left unchanged. Returns false when the pattern doesn't exist.
func (*Catalog) Persist ¶
Persist flushes the in-memory catalog to the storage backend. Safe to call concurrently with Upsert/Label/Delete.
func (*Catalog) RegisterService ¶
RegisterService records a service name the first time it is seen. Returns true when the service was newly registered (first sighting), false when it was already known. The caller uses this to decide whether to log a "new service discovered" message.
func (*Catalog) RestartServiceGrace ¶
RestartServiceGrace resets a service's FirstSeen to now, restarting the grace window. Returns false when the service doesn't exist.
func (*Catalog) Upsert ¶
func (c *Catalog) Upsert(patternID, template, source string, tickCount int, alpha float64, ruleName, service string) *Pattern
Upsert records an observation against patternID. If the pattern is new it is created with FirstSeen=now; otherwise Count is incremented and LastSeen is updated. tickCount is the number of matches observed in the current worker tick — used to update the EWMA baseline.
service is the service name extracted from the log via agent.service_patterns. It is stamped on the pattern only on first sighting (subsequent observations preserve the original attribution to keep the catalog stable). Pass "" when service detection is disabled or the pattern's regexes did not match.
ruleName comes from the regex pre-filter and is applied:
- on first-seen: always
- subsequently: only when a non-default named rule supersedes a previous default tag, or when the previous tag was empty
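The EWMA baseline update driven by tickCount and alpha can be sketched as the standard recurrence. The exact form the package uses is not reproduced here; this shows the conventional convex blend of the current tick's count and the previous baseline:

```go
package main

import "fmt"

// ewma applies one baseline update: higher alpha weights the current
// tick more heavily, lower alpha makes the baseline more stable.
func ewma(baseline float64, tickCount int, alpha float64) float64 {
	return alpha*float64(tickCount) + (1-alpha)*baseline
}

func main() {
	b := 0.0
	for _, tick := range []int{10, 10, 10, 10} {
		b = ewma(b, tick, 0.5)
	}
	// Starting from 0 and observing 10 each tick, the baseline walks
	// 5 -> 7.5 -> 8.75 -> 9.375, converging toward 10.
	fmt.Printf("%.3f\n", b) // 9.375
}
```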
type CursorStore ¶
type CursorStore struct {
// contains filtered or unexported fields
}
CursorStore persists the last-seen timestamp for each SignalSource. Redis is preferred so cursors survive crashes and replica restarts; the worker falls back to in-memory storage when Redis is unavailable so that development setups don't require Redis just to try training mode.
func NewCursorStore ¶
func NewCursorStore(rdb *redis.Client) *CursorStore
NewCursorStore returns a CursorStore. Pass `rdb=nil` for in-memory only.
type DetectEvent ¶ added in v1.4.0
type DetectEvent struct {
ID string `json:"id"` // 16-byte hex; stable over restarts
Timestamp time.Time `json:"timestamp"` // worker decision time
// Pattern context
Source string `json:"source"`
PatternID string `json:"pattern_id"`
Template string `json:"template"`
Service string `json:"service,omitempty"`
Verdict string `json:"verdict"` // unknown | spike
Frequency int `json:"frequency"`
Baseline float64 `json:"baseline"`
Samples []string `json:"samples,omitempty"` // up to 3, redacted
	// AI call details (empty when Outcome != "emitted": cached/dry/quota
	// outcomes did not invoke the model; ai_error fills RawResponse only
	// when available).
Model string `json:"model,omitempty"`
UserPrompt string `json:"user_prompt,omitempty"`
RawResponse string `json:"raw_response,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
// Final structured finding (nil when no finding was produced).
Finding *core.AIFinding `json:"finding,omitempty"`
// Outcome label — see Worker.emitDetect: emitted | cached | dry |
// quota | ai_error | send_error.
Outcome string `json:"outcome"`
// Error message when Outcome is ai_error / send_error.
Error string `json:"error,omitempty"`
}
DetectEvent is the audit record for a single detect-mode handling of a pattern: what was sent to the model, what came back, what the final structured finding looked like, and what the worker did with it. Surfaced to the UI so operators can validate AI behavior end to end.
One DetectEvent is recorded per worker decision — including "cached", "quota", "ai_error", and "send_error" outcomes — so the log doubles as a debugging aid when alerts disappear.
type DetectLog ¶ added in v1.4.0
type DetectLog struct {
// contains filtered or unexported fields
}
DetectLog is the in-memory + on-disk store of detect-mode AI calls. Bounded ring of the most-recent N events; older entries are evicted FIFO by Timestamp.
All public methods are safe for concurrent use. Disk persistence is debounced — Record sets a dirty flag that the worker flushes at most once per persist tick.
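The bounded-ring behavior can be sketched as a capped append. This hypothetical version uses insertion order where the real DetectLog orders by Timestamp, but the FIFO cap works the same way:

```go
package main

import "fmt"

// recordCapped appends an event and evicts the oldest entries once
// more than max are retained.
func recordCapped(log []string, e string, max int) []string {
	log = append(log, e)
	if len(log) > max {
		log = log[len(log)-max:] // drop the oldest
	}
	return log
}

func main() {
	var log []string
	for _, e := range []string{"e1", "e2", "e3", "e4"} {
		log = recordCapped(log, e, 3)
	}
	fmt.Println(log) // [e2 e3 e4]
}
```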
func LoadDetectLog ¶ added in v1.4.0
LoadDetectLog opens an existing detect log from the storage provider or returns an empty one when no blob is present. `max` caps retained events; pass 0 for the default. A nil store disables persistence (in-memory only).
func (*DetectLog) All ¶ added in v1.4.0
func (d *DetectLog) All() []*DetectEvent
All returns a snapshot of every event, sorted by Timestamp descending (newest first). Returned events are shallow copies; the embedded *AIFinding pointer is shared (read-only by convention).
func (*DetectLog) Get ¶ added in v1.4.0
func (d *DetectLog) Get(id string) *DetectEvent
Get returns a copy of the event with the given ID, or nil.
func (*DetectLog) Persist ¶ added in v1.4.0
Persist atomically writes the detect log via the storage backend. No-op when store is nil or there are no unflushed changes.
func (*DetectLog) Record ¶ added in v1.4.0
func (d *DetectLog) Record(e *DetectEvent)
Record appends a DetectEvent. The caller passes a partially-filled event; ID and Timestamp are assigned here if zero.
Field-size caps are applied defensively so a misbehaving model can't bloat the on-disk file.
type Emitter ¶ added in v1.4.0
Emitter delivers an AI finding to the rest of the system. In production this is services.CreateIncidentFromFinding; tests inject a capturing stub. A nil Emitter disables emission (worker logs the would-be call and moves on).
type MatchResult ¶
type MatchResult struct {
RuleName string // empty when no rule matched
Default bool // true when only the default pattern matched
}
MatchResult is the (possibly empty) tag returned by Match.
func (MatchResult) Matched ¶
func (m MatchResult) Matched() bool
Matched reports whether at least one rule (named or default) hit. The worker uses this to decide whether a signal is interesting enough to learn from. To train on every line, set `regex.default_pattern: ".*"`.
type Miner ¶
type Miner struct {
// contains filtered or unexported fields
}
Miner is a small, dependency-free, Drain-style log clusterer.
The classic Drain algorithm uses a fixed-depth prefix tree where the first few tokens index into buckets, and each leaf holds a list of templates. To classify an incoming log line:
- Tokenize the message and replace tokens that look like variables (numbers, IPs, UUIDs, hex, IDs) with the wildcard `<*>` so that "user_id=42" and "user_id=99" share a structural shape.
- Walk the tree using `len(tokens)` as the first bucket key (so messages of different lengths never collide), then the first N non-wildcard tokens as inner bucket keys.
- At the leaf, pick the existing template with the highest token-by-token similarity. If similarity ≥ threshold, merge (replace differing tokens with `<*>`) and return that template's ID. Otherwise, register a new template.
This implementation is intentionally simpler than full Drain3 (no parameter masking learning, no parser tree compaction) but covers the 95% case for production logs. ADR-0001 tracks the longer-term plan.
func NewMiner ¶
NewMiner builds a Miner from config. Sensible defaults are applied when values are zero.
func (*Miner) AddCluster ¶
AddCluster pre-registers a known cluster (used when loading a catalog from disk so mining can resume against existing patterns).
func (*Miner) Cluster ¶
Cluster classifies a single message and returns the matched (or newly created) cluster ID, the (post-merge) template tokens joined by space, and a flag indicating whether this was the first time we saw the pattern.
func (*Miner) Snapshot ¶
func (m *Miner) Snapshot() []MinerCluster
Snapshot returns a copy of every learned cluster — used for catalog persistence and admin endpoints.
type MinerCluster ¶
type MinerCluster struct {
ID string // "p-<sha1[:12]>"
Tokens []string // current template tokens (with `<*>` for variables)
Size int // total observations matched into this cluster
}
MinerCluster is one learned log template plus its observation count.
type Pattern ¶
type Pattern struct {
ID string `json:"id"`
Template string `json:"template"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
Count int `json:"count"`
// BaselineFrequency is the EWMA of per-tick counts. Computed during
// training; consumed by the spike detector in detect mode.
BaselineFrequency float64 `json:"baseline_frequency"`
// Verdict is the agent's classification of this pattern: "known" once
// it is part of baseline (auto-promoted by count or set explicitly via
// the admin API), otherwise empty. Operators flip a pattern to
// "known" by POSTing {"verdict":"known"} to /api/agent/patterns/:id.
Verdict string `json:"verdict"`
// RuleName is the regex tag attached on first sighting ("default" when
// only the default pattern matched, or the named rule otherwise).
RuleName string `json:"rule_name"`
// Source is the SignalSource name where the pattern was first observed.
Source string `json:"source"`
// Service is the service name extracted from the pattern's first
	// matching log message (via the agent.service_patterns regexes). Empty
// when service detection is disabled or the regex did not match. The
// agent uses this to gate detect-mode AI analysis behind the
// new-service grace window.
Service string `json:"service,omitempty"`
// Tags are arbitrary operator-supplied markers.
Tags []string `json:"tags,omitempty"`
}
Pattern is one entry in the on-disk catalog (`patterns.json`).
The catalog is the agent's long-term memory. During training we add patterns; during shadow / detect we look them up to decide whether a signal is "known". Operators curate it via the admin REST endpoints.
type Redactor ¶
type Redactor struct {
// contains filtered or unexported fields
}
Redactor scrubs sensitive substrings before any other component sees them. It is intentionally regex-based (not a full parser) — the goal is to make it operationally reasonable to send log content to an external LLM, not to be a perfect DLP solution.
func NewRedactor ¶
NewRedactor builds a Redactor from defaults + user-supplied extra patterns. Invalid extra patterns are skipped (with their compile error returned in the slice for the caller to log).
func (*Redactor) Scrub ¶
Scrub returns s with every match of every rule replaced by `<REDACTED:<rule>>`.
func (*Redactor) ScrubFields ¶
ScrubFields recursively scrubs string values inside a fields map. Maps and slices are walked; non-string scalars are returned untouched.
type RegexMatcher ¶
type RegexMatcher struct {
// contains filtered or unexported fields
}
RegexMatcher is a small helper around a list of RegexRule plus an optional "default" pattern. It acts as the agent's pre-filter: only signals whose message matches at least one rule (named or default) are forwarded to the pattern miner / catalog. Set `regex.default_pattern: ".*"` to learn from every line.
func NewRegexMatcher ¶
func NewRegexMatcher(cfg config.AgentRegexConfig) (*RegexMatcher, []error)
NewRegexMatcher compiles user-supplied rules. Compilation errors are reported but the matcher is still returned with the rules that did compile, so a single bad rule cannot disable the entire pipeline.
func (*RegexMatcher) Match ¶
func (m *RegexMatcher) Match(message string) MatchResult
Match runs explicit rules first (in declaration order — first hit wins), then falls back to the default pattern.
type ServiceInfo ¶
ServiceInfo tracks when a service was first seen by the agent. Stored in the same patterns.json file alongside patterns — one data store, no Redis dependency for this feature.
type ServiceMatcher ¶
type ServiceMatcher struct {
// contains filtered or unexported fields
}
ServiceMatcher extracts a service name from a log message using an ordered list of regexes. The first pattern that matches wins; the FIRST capture group is returned. A nil/empty matcher returns "" — service detection is off and every signal is attributed to "_unknown" in the worker. There is no built-in default list: operators who want service detection MUST configure `agent.service_patterns` (or set `AGENT_SERVICE_PATTERNS`).
func NewServiceMatcher ¶
func NewServiceMatcher(patterns []string) (*ServiceMatcher, []error)
NewServiceMatcher compiles the supplied regexes. Bad patterns are reported in the returned error slice but do not prevent the matcher from being built with whatever did compile, so a single typo cannot disable the entire pipeline. An empty/nil `patterns` list yields a matcher whose Extract always returns "" (service detection disabled).
func (*ServiceMatcher) Extract ¶
func (m *ServiceMatcher) Extract(message string) string
Extract returns the first capture group of the first matching pattern, or "" when nothing matches.
type ShadowEvent ¶
type ShadowEvent struct {
PatternID string `json:"pattern_id"`
Template string `json:"template"`
Source string `json:"source"`
RuleName string `json:"rule_name,omitempty"`
Verdict string `json:"verdict"` // "unknown" | "spike"
SampleMessage string `json:"sample_message"`
Count int `json:"count"` // total signals across all ticks
Occurrences int `json:"occurrences"` // number of ticks that flagged this
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
}
ShadowEvent is one "we would have alerted on this" record produced by the worker while running in shadow mode. Events are coalesced per (source, pattern_id): repeat hits update Count / LastSeen / Occurrences rather than appending new rows, so the on-disk log stays small and operator-reviewable.
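The per-(source, pattern_id) coalescing can be sketched with a composite map key. Field names here are illustrative stand-ins for the ShadowEvent counters:

```go
package main

import "fmt"

type shadowRow struct {
	Count       int // total signals across all ticks
	Occurrences int // number of ticks that flagged this pattern
}

// record coalesces by (source, pattern_id): repeat hits update the
// existing row's counters instead of appending a new one.
func record(log map[[2]string]*shadowRow, source, patternID string, freq int) {
	k := [2]string{source, patternID}
	row, ok := log[k]
	if !ok {
		row = &shadowRow{}
		log[k] = row
	}
	row.Count += freq
	row.Occurrences++
}

func main() {
	log := map[[2]string]*shadowRow{}
	record(log, "loki", "p-abc", 5)
	record(log, "loki", "p-abc", 3)
	fmt.Println(len(log))                              // 1: two hits, one row
	fmt.Println(log[[2]string{"loki", "p-abc"}].Count) // 8
}
```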
type ShadowLog ¶
type ShadowLog struct {
// contains filtered or unexported fields
}
ShadowLog is the in-memory + on-disk store of shadow-mode verdicts.
All public methods are safe for concurrent use. Disk persistence is debounced — Record sets a dirty flag that the worker flushes at most once per `persist_interval`.
func LoadShadowLog ¶
LoadShadowLog opens an existing shadow log from the storage provider or returns an empty one when no blob is present. `max` caps distinct events; pass 0 for the default. A nil store disables persistence (in-memory only).
func (*ShadowLog) All ¶
func (s *ShadowLog) All() []*ShadowEvent
All returns a snapshot of every shadow event, sorted by LastSeen descending (most recent first). Returned events are copies — callers cannot mutate the log.
func (*ShadowLog) Persist ¶
Persist atomically writes the shadow log via the storage backend. Safe to call concurrently with Record / Clear. No-op when store is nil.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker runs the agent loop:
Pull (per source) → Redact → Cluster → Catalog upsert → (mode-specific tail)
Training mode is fully end-to-end. Shadow and detect modes share the same classification path; shadow records would-have-alerted events to the shadow log, while detect calls the AI SRE and emits an incident through the existing services.CreateIncident pipeline.
func NewWorker ¶
func NewWorker(opt WorkerOptions) (*Worker, error)
NewWorker validates options and applies defaults.
type WorkerOptions ¶
type WorkerOptions struct {
Cfg config.AgentConfig
Sources []core.SignalSource
Cursors *CursorStore // optional; pass nil for in-memory cursors
Redactor *Redactor
Matcher *RegexMatcher
Miner *Miner
Catalog *Catalog
Shadow *ShadowLog // optional; pass nil to disable shadow recording
Detect *DetectLog // optional; pass nil to disable detect-call audit log
Services *ServiceMatcher // optional; pass nil to disable service detection
// AI is the detect-mode bundle (analyzer + cache + rate limiter).
// Pass a zero-value AIBundle to disable AI emission.
AI AIBundle
// Emitter is invoked for each finding in detect mode. nil disables
// emission (worker still calls AI and caches the result, but the
// finding does not flow through to channels).
Emitter Emitter
}
WorkerOptions bundles the dependencies a Worker needs. Construction does not connect to anything; the worker dials lazily inside Run.