agent

package
v1.4.0
Published: May 13, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package agent contains the AI agent mode pipeline:

SignalSource → Redact → Detector pipeline (Regex → Miner → Catalog →
Frequency) → (training: persist | shadow: log | detect: AI → Incident)

Training mode is end-to-end. Shadow and detect honor the same classification path; the AI analyzer call in detect mode is wired up separately from the worker.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildSources

func BuildSources(cfg config.AgentConfig) ([]core.SignalSource, []error)

BuildSources constructs every enabled SignalSource from AgentConfig. Sources with type set to an unknown value are skipped (with a non-fatal error in the returned slice) so a config typo cannot stop the agent from starting.

Types

type AIBundle added in v1.4.0

type AIBundle struct {
	SRE   core.AISRE
	Cache *ai.ResultCache
	Rate  *ai.RateLimiter
}

AIBundle bundles the three AI-side dependencies the worker needs. All three are nil-safe: the worker accepts a zero bundle and falls back to "dry detect" (classify but do not emit).

func BuildAI added in v1.4.0

func BuildAI(cfg config.AgentConfig, store storage.Provider, httpClient *http.Client) AIBundle

BuildAI constructs the AI SRE, result cache, and rate limiter for detect mode.

Returns a zero AIBundle when cfg.AI.Enable is false so callers can pass it straight to NewWorker without nil checks.

httpClient may be nil — a default *http.Client (30s timeout, no proxy) is used. store may be nil — the cache then degrades to in-memory only.

type Catalog

type Catalog struct {
	// contains filtered or unexported fields
}

Catalog is the in-memory + on-disk pattern store.

All public methods are safe for concurrent use. Disk persistence is debounced — calls to MarkDirty() set a flag that the agent worker flushes at most once per `persist_interval`.
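The debounce described above can be sketched as a dirty flag guarded by a mutex, with the worker paying the disk-write cost at most once per tick. This is a standalone illustration with hypothetical names (`catalog`, `flushIfDirty`); the real Catalog keeps its fields unexported.

```go
package main

import (
	"fmt"
	"sync"
)

// catalog is a minimal stand-in: mutations only set a flag, and the
// worker loop decides when to actually persist.
type catalog struct {
	mu    sync.Mutex
	dirty bool
}

func (c *catalog) MarkDirty() {
	c.mu.Lock()
	c.dirty = true
	c.mu.Unlock()
}

// flushIfDirty writes only when something changed since the last flush.
func (c *catalog) flushIfDirty(persist func() error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.dirty {
		return nil // nothing changed: skip the write entirely
	}
	if err := persist(); err != nil {
		return err // stay dirty so the next tick retries
	}
	c.dirty = false
	return nil
}

func main() {
	c := &catalog{}
	c.MarkDirty()
	c.MarkDirty() // coalesced: still a single pending flush
	writes := 0
	c.flushIfDirty(func() error { writes++; return nil })
	c.flushIfDirty(func() error { writes++; return nil })
	fmt.Println(writes) // two dirty calls, one actual write
}
```

Any number of Upsert/Label/Delete calls between flushes therefore costs at most one storage write per `persist_interval`.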

func LoadCatalog

func LoadCatalog(store storage.Provider) (*Catalog, error)

LoadCatalog opens an existing patterns blob from the storage provider or returns an empty catalog if none exists. The blob name is config.CatalogBlobName ("patterns").

func (*Catalog) All

func (c *Catalog) All() []*Pattern

All returns a stable, sorted snapshot of every pattern (sorted by Count descending so the most-frequent patterns appear first in admin views).

func (*Catalog) AllServices

func (c *Catalog) AllServices() map[string]ServiceInfo

AllServices returns a snapshot of every tracked service, sorted by FirstSeen ascending.

func (*Catalog) Delete

func (c *Catalog) Delete(patternID string) bool

Delete removes a pattern (e.g. operator marks a false-positive cluster).

func (*Catalog) Dirty

func (c *Catalog) Dirty() bool

Dirty reports whether there are unflushed changes.

func (*Catalog) EndServiceGrace

func (c *Catalog) EndServiceGrace(name string) bool

EndServiceGrace forces a service out of its grace period by setting FirstSeen to the zero time. Returns false when the service doesn't exist.

func (*Catalog) Get

func (c *Catalog) Get(id string) *Pattern

Get returns a deep copy of the pattern keyed by id, or nil when not found. Returning a copy (rather than the live pointer) prevents callers from observing torn reads while a concurrent Upsert mutates the same struct.

func (*Catalog) IsServiceInGrace

func (c *Catalog) IsServiceInGrace(name string, graceDuration time.Duration) bool

IsServiceInGrace reports whether the named service is still inside its new-service grace window. A zero graceDuration means grace is disabled (always returns false). An unknown service is registered on the spot and enters grace.

func (*Catalog) Label

func (c *Catalog) Label(patternID, verdict string, tags []string) bool

Label updates operator-curated metadata for a pattern. Empty fields are left unchanged. Returns false when the pattern doesn't exist.

func (*Catalog) Len

func (c *Catalog) Len() int

Len returns the number of patterns currently in the catalog.

func (*Catalog) MarkKnown

func (c *Catalog) MarkKnown(patternID string) bool

MarkKnown stamps a pattern as auto-promoted ("known") in the catalog.

func (*Catalog) Persist

func (c *Catalog) Persist() error

Persist flushes the in-memory catalog to the storage backend. Safe to call concurrently with Upsert/Label/Delete.

func (*Catalog) RegisterService

func (c *Catalog) RegisterService(name string) bool

RegisterService records a service name the first time it is seen. Returns true when the service was newly registered (first sighting), false when it was already known. The caller uses this to decide whether to log a "new service discovered" message.

func (*Catalog) RestartServiceGrace

func (c *Catalog) RestartServiceGrace(name string) bool

RestartServiceGrace resets a service's FirstSeen to now, restarting the grace window. Returns false when the service doesn't exist.

func (*Catalog) Upsert

func (c *Catalog) Upsert(patternID, template, source string, tickCount int, alpha float64, ruleName, service string) *Pattern

Upsert records an observation against patternID. If the pattern is new it is created with FirstSeen=now; otherwise Count is incremented and LastSeen is updated. tickCount is the number of matches observed in the current worker tick — used to update the EWMA baseline.

service is the service name extracted from the log via agent.service_patterns. It is stamped on the pattern only on first sighting (subsequent observations preserve the original attribution to keep the catalog stable). Pass "" when service detection is disabled or the pattern's regexes did not match.

ruleName comes from the regex pre-filter and is applied:

  • on first-seen: always
  • subsequently: only when a non-default named rule supersedes a previous default tag, or when the previous tag was empty

type CursorStore

type CursorStore struct {
	// contains filtered or unexported fields
}

CursorStore persists the last-seen timestamp for each SignalSource. Redis is preferred so cursors survive crashes and replica restarts; the worker falls back to in-memory storage when Redis is unavailable so that development setups don't require Redis just to try training mode.

func NewCursorStore

func NewCursorStore(rdb *redis.Client) *CursorStore

NewCursorStore returns a CursorStore. Pass `rdb=nil` for in-memory only.

func (*CursorStore) Get

func (s *CursorStore) Get(ctx context.Context, source string) (time.Time, bool)

Get returns the cursor for a source. The bool is false when no cursor has been recorded yet (caller should fall back to AgentConfig.Lookback).

func (*CursorStore) Set

func (s *CursorStore) Set(ctx context.Context, source string, t time.Time) error

Set records a cursor. Redis errors are returned to the caller (the worker logs them and continues — mem cache always succeeds).

type DetectEvent added in v1.4.0

type DetectEvent struct {
	ID        string    `json:"id"`        // 16-byte hex; stable over restarts
	Timestamp time.Time `json:"timestamp"` // worker decision time

	// Pattern context
	Source    string   `json:"source"`
	PatternID string   `json:"pattern_id"`
	Template  string   `json:"template"`
	Service   string   `json:"service,omitempty"`
	Verdict   string   `json:"verdict"` // unknown | spike
	Frequency int      `json:"frequency"`
	Baseline  float64  `json:"baseline"`
	Samples   []string `json:"samples,omitempty"` // up to 3, redacted

	// AI call (empty when outcome != "emitted" — cached/dry/quota/etc.
	// did not invoke the model; ai_error fills RawResponse only when
	// available).
	Model       string `json:"model,omitempty"`
	UserPrompt  string `json:"user_prompt,omitempty"`
	RawResponse string `json:"raw_response,omitempty"`
	DurationMs  int64  `json:"duration_ms,omitempty"`

	// Final structured finding (nil when no finding was produced).
	Finding *core.AIFinding `json:"finding,omitempty"`

	// Outcome label — see Worker.emitDetect: emitted | cached | dry |
	// quota | ai_error | send_error.
	Outcome string `json:"outcome"`
	// Error message when Outcome is ai_error / send_error.
	Error string `json:"error,omitempty"`
}

DetectEvent is the audit record for a single detect-mode handling of a pattern: what was sent to the model, what came back, what the final structured finding looked like, and what the worker did with it. Surfaced to the UI so operators can validate AI behavior end to end.

One DetectEvent is recorded per worker decision — including "cached", "quota", "ai_error", and "send_error" outcomes — so the log doubles as a debugging aid when alerts disappear.

type DetectLog added in v1.4.0

type DetectLog struct {
	// contains filtered or unexported fields
}

DetectLog is the in-memory + on-disk store of detect-mode AI calls. It keeps a bounded ring of the most-recent N events; older entries are evicted FIFO by Timestamp.

All public methods are safe for concurrent use. Disk persistence is debounced — Record sets a dirty flag that the worker flushes at most once per persist tick.

func LoadDetectLog added in v1.4.0

func LoadDetectLog(store storage.Provider, max int) (*DetectLog, error)

LoadDetectLog opens an existing detect log from the storage provider or returns an empty one when no blob is present. `max` caps retained events; pass 0 for the default. A nil store disables persistence (in-memory only).

func (*DetectLog) All added in v1.4.0

func (d *DetectLog) All() []*DetectEvent

All returns a snapshot of every event, sorted by Timestamp descending (newest first). Returned events are shallow copies; the embedded *AIFinding pointer is shared (read-only by convention).

func (*DetectLog) Clear added in v1.4.0

func (d *DetectLog) Clear() int

Clear removes every event.

func (*DetectLog) Dirty added in v1.4.0

func (d *DetectLog) Dirty() bool

Dirty reports whether there are unflushed changes.

func (*DetectLog) Get added in v1.4.0

func (d *DetectLog) Get(id string) *DetectEvent

Get returns a copy of the event with the given ID, or nil.

func (*DetectLog) Len added in v1.4.0

func (d *DetectLog) Len() int

Len returns the number of stored events.

func (*DetectLog) Persist added in v1.4.0

func (d *DetectLog) Persist() error

Persist atomically writes the detect log via the storage backend. No-op when store is nil or there are no unflushed changes.

func (*DetectLog) Record added in v1.4.0

func (d *DetectLog) Record(e *DetectEvent)

Record appends a DetectEvent. The caller passes a partially-filled event; ID and Timestamp are assigned here if zero.

Field-size caps are applied defensively so a misbehaving model can't bloat the on-disk file.

func (*DetectLog) Stats added in v1.4.0

func (d *DetectLog) Stats() map[string]int

Stats returns aggregate counts useful for /api/agent/detect/stats.

type Emitter added in v1.4.0

type Emitter func(f *core.AIFinding, r core.AgentResult, source, service string) error

Emitter delivers an AI finding to the rest of the system. In production this is services.CreateIncidentFromFinding; tests inject a capturing stub. A nil Emitter disables emission (worker logs the would-be call and moves on).

type MatchResult

type MatchResult struct {
	RuleName string // empty when no rule matched
	Default  bool   // true when only the default pattern matched
}

MatchResult is the (possibly empty) tag returned by Match.

func (MatchResult) Matched

func (m MatchResult) Matched() bool

Matched reports whether at least one rule (named or default) hit. The worker uses this to decide whether a signal is interesting enough to learn from. To train on every line, set `regex.default_pattern: ".*"`.

type Miner

type Miner struct {
	// contains filtered or unexported fields
}

Miner is a small, dependency-free, Drain-style log clusterer.

The classic Drain algorithm uses a fixed-depth prefix tree where the first few tokens index into buckets, and each leaf holds a list of templates. To classify an incoming log line:

  1. Tokenize the message and replace tokens that look like variables (numbers, IPs, UUIDs, hex, IDs) with the wildcard `<*>` so that "user_id=42" and "user_id=99" share a structural shape.
  2. Walk the tree using `len(tokens)` as the first bucket key (so messages of different lengths never collide), then the first N non-wildcard tokens as inner bucket keys.
  3. At the leaf, pick the existing template with the highest token-by-token similarity. If similarity ≥ threshold, merge (replace differing tokens with `<*>`) and return that template's ID. Otherwise, register a new template.

This implementation is intentionally simpler than full Drain3 (no parameter masking learning, no parser tree compaction) but covers the 95% case for production logs. ADR-0001 tracks the longer-term plan.

func NewMiner

func NewMiner(similarityThreshold float64, depth, maxChildren int) *Miner

NewMiner builds a Miner from config. Sensible defaults are applied when values are zero.

func (*Miner) AddCluster

func (m *Miner) AddCluster(id string, template string, size int)

AddCluster pre-registers a known cluster (used when loading a catalog from disk so mining can resume against existing patterns).

func (*Miner) Cluster

func (m *Miner) Cluster(message string) (id string, template string, isNew bool)

Cluster classifies a single message and returns the matched (or newly created) cluster ID, the (post-merge) template tokens joined by space, and a flag indicating whether this was the first time we saw the pattern.

func (*Miner) Snapshot

func (m *Miner) Snapshot() []MinerCluster

Snapshot returns a copy of every learned cluster — used for catalog persistence and admin endpoints.

type MinerCluster

type MinerCluster struct {
	ID     string   // "p-<sha1[:12]>"
	Tokens []string // current template tokens (with `<*>` for variables)
	Size   int      // total observations matched into this cluster
}

MinerCluster is one learned log template plus its observation count.

type Pattern

type Pattern struct {
	ID        string    `json:"id"`
	Template  string    `json:"template"`
	FirstSeen time.Time `json:"first_seen"`
	LastSeen  time.Time `json:"last_seen"`
	Count     int       `json:"count"`
	// BaselineFrequency is the EWMA of per-tick counts. Computed during
	// training; consumed by the spike detector in detect mode.
	BaselineFrequency float64 `json:"baseline_frequency"`
	// Verdict is the agent's classification of this pattern: "known" once
	// it is part of baseline (auto-promoted by count or set explicitly via
	// the admin API), otherwise empty. Operators flip a pattern to
	// "known" by POSTing {"verdict":"known"} to /api/agent/patterns/:id.
	Verdict string `json:"verdict"`
	// RuleName is the regex tag attached on first sighting ("default" when
	// only the default pattern matched, or the named rule otherwise).
	RuleName string `json:"rule_name"`
	// Source is the SignalSource name where the pattern was first observed.
	Source string `json:"source"`
	// Service is the service name extracted from the pattern's first
	// matching log message (via the agent.service_patterns regexes). Empty
	// when service detection is disabled or no regex matched. The
	// agent uses this to gate detect-mode AI analysis behind the
	// new-service grace window.
	Service string `json:"service,omitempty"`
	// Tags are arbitrary operator-supplied markers.
	Tags []string `json:"tags,omitempty"`
}

Pattern is one entry in the on-disk catalog (`patterns.json`).

The catalog is the agent's long-term memory. During training we add patterns; during shadow / detect we look them up to decide whether a signal is "known". Operators curate it via the admin REST endpoints.

type Redactor

type Redactor struct {
	// contains filtered or unexported fields
}

Redactor scrubs sensitive substrings before any other component sees them. It is intentionally regex-based (not a full parser) — the goal is to make it operationally reasonable to send log content to an external LLM, not to be a perfect DLP solution.

func NewRedactor

func NewRedactor(redactIPs bool, extra []string) (*Redactor, []error)

NewRedactor builds a Redactor from defaults + user-supplied extra patterns. Invalid extra patterns are skipped (with their compile error returned in the slice for the caller to log).

func (*Redactor) Scrub

func (r *Redactor) Scrub(s string) string

Scrub returns s with every match of every rule replaced by `<REDACTED:<rule>>`.

func (*Redactor) ScrubFields

func (r *Redactor) ScrubFields(fields map[string]interface{}) map[string]interface{}

ScrubFields recursively scrubs string values inside a fields map. Maps and slices are walked; non-string scalars are returned untouched.

type RegexMatcher

type RegexMatcher struct {
	// contains filtered or unexported fields
}

RegexMatcher is a small helper around a list of RegexRule plus an optional "default" pattern. It acts as the agent's pre-filter: only signals whose message matches at least one rule (named or default) are forwarded to the pattern miner / catalog. Set `regex.default_pattern: ".*"` to learn from every line.

func NewRegexMatcher

func NewRegexMatcher(cfg config.AgentRegexConfig) (*RegexMatcher, []error)

NewRegexMatcher compiles user-supplied rules. Compilation errors are reported but the matcher is still returned with the rules that did compile, so a single bad rule cannot disable the entire pipeline.

func (*RegexMatcher) Match

func (m *RegexMatcher) Match(message string) MatchResult

Match runs explicit rules first (in declaration order — first hit wins), then falls back to the default pattern.

type RegexRule

type RegexRule struct {
	Name    string
	Pattern *regexp.Regexp
}

RegexRule is a compiled user-defined rule.

type ServiceInfo

type ServiceInfo struct {
	FirstSeen time.Time `json:"first_seen"`
}

ServiceInfo tracks when a service was first seen by the agent. Stored in the same patterns.json file alongside patterns — one data store, no Redis dependency for this feature.

type ServiceMatcher

type ServiceMatcher struct {
	// contains filtered or unexported fields
}

ServiceMatcher extracts a service name from a log message using an ordered list of regexes. The first pattern that matches wins; the FIRST capture group is returned. A nil/empty matcher returns "" — service detection is off and every signal is attributed to "_unknown" in the worker. There is no built-in default list: operators who want service detection MUST configure `agent.service_patterns` (or set `AGENT_SERVICE_PATTERNS`).

func NewServiceMatcher

func NewServiceMatcher(patterns []string) (*ServiceMatcher, []error)

NewServiceMatcher compiles the supplied regexes. Bad patterns are reported in the returned error slice but do not prevent the matcher from being built with whatever did compile, so a single typo cannot disable the entire pipeline. An empty/nil `patterns` list yields a matcher whose Extract always returns "" (service detection disabled).

func (*ServiceMatcher) Extract

func (m *ServiceMatcher) Extract(message string) string

Extract returns the first capture group of the first matching pattern, or "" when nothing matches.

type ShadowEvent

type ShadowEvent struct {
	PatternID     string    `json:"pattern_id"`
	Template      string    `json:"template"`
	Source        string    `json:"source"`
	RuleName      string    `json:"rule_name,omitempty"`
	Verdict       string    `json:"verdict"` // "unknown" | "spike"
	SampleMessage string    `json:"sample_message"`
	Count         int       `json:"count"`       // total signals across all ticks
	Occurrences   int       `json:"occurrences"` // number of ticks that flagged this
	FirstSeen     time.Time `json:"first_seen"`
	LastSeen      time.Time `json:"last_seen"`
}

ShadowEvent is one "we would have alerted on this" record produced by the worker while running in shadow mode. Events are coalesced per (source, pattern_id): repeat hits update Count / LastSeen / Occurrences rather than appending new rows, so the on-disk log stays small and operator-reviewable.

type ShadowLog

type ShadowLog struct {
	// contains filtered or unexported fields
}

ShadowLog is the in-memory + on-disk store of shadow-mode verdicts.

All public methods are safe for concurrent use. Disk persistence is debounced — Record sets a dirty flag that the worker flushes at most once per `persist_interval`.

func LoadShadowLog

func LoadShadowLog(store storage.Provider, max int) (*ShadowLog, error)

LoadShadowLog opens an existing shadow log from the storage provider or returns an empty one when no blob is present. `max` caps distinct events; pass 0 for the default. A nil store disables persistence (in-memory only).

func (*ShadowLog) All

func (s *ShadowLog) All() []*ShadowEvent

All returns a snapshot of every shadow event, sorted by LastSeen descending (most recent first). Returned events are copies — callers cannot mutate the log.

func (*ShadowLog) Clear

func (s *ShadowLog) Clear() int

Clear removes every event. The change is persisted on the next Persist.

func (*ShadowLog) Dirty

func (s *ShadowLog) Dirty() bool

Dirty reports whether there are unflushed changes.

func (*ShadowLog) Len

func (s *ShadowLog) Len() int

Len returns the number of distinct (source, pattern) events.

func (*ShadowLog) Persist

func (s *ShadowLog) Persist() error

Persist atomically writes the shadow log via the storage backend. Safe to call concurrently with Record / Clear. No-op when store is nil.

func (*ShadowLog) Record

func (s *ShadowLog) Record(source, patternID, template, sample, rule, verdict string, freq int)

Record merges a shadow-mode hit into the log. The record is coalesced per (source, pattern_id); repeat hits bump Count and Occurrences instead of appending a new entry. `freq` is the number of signals observed in the current worker tick.

func (*ShadowLog) Stats

func (s *ShadowLog) Stats() map[string]int

Stats returns aggregate counts useful for /api/agent/shadow/stats.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker runs the agent loop:

Pull (per source) → Redact → Cluster → Catalog upsert → (mode-specific tail)

Training mode is fully end-to-end. Shadow and detect modes share the same classification path; shadow records would-have-alerted events to the shadow log, while detect calls the AI SRE and emits an incident through the existing services.CreateIncident pipeline.

func NewWorker

func NewWorker(opt WorkerOptions) (*Worker, error)

NewWorker validates options and applies defaults.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run drives the worker until ctx is canceled. It is intended to be called in a goroutine from cmd/main.go.

type WorkerOptions

type WorkerOptions struct {
	Cfg      config.AgentConfig
	Sources  []core.SignalSource
	Cursors  *CursorStore // optional; pass nil for in-memory cursors
	Redactor *Redactor
	Matcher  *RegexMatcher
	Miner    *Miner
	Catalog  *Catalog
	Shadow   *ShadowLog      // optional; pass nil to disable shadow recording
	Detect   *DetectLog      // optional; pass nil to disable detect-call audit log
	Services *ServiceMatcher // optional; pass nil to disable service detection

	// AI is the detect-mode bundle (analyzer + cache + rate limiter).
	// Pass a zero-value AIBundle to disable AI emission.
	AI AIBundle
	// Emitter is invoked for each finding in detect mode. nil disables
	// emission (worker still calls AI and caches the result, but the
	// finding does not flow through to channels).
	Emitter Emitter
}

WorkerOptions bundles the dependencies a Worker needs. Construction does not connect to anything; the worker dials lazily inside Run.

Directories

Path Synopsis
ai	Package ai contains the AI SRE analyzer that turns Unknown / Spike AgentResults into structured AIFindings.
