Documentation
¶
Index ¶
- func LoadKnownBadOverride(path string) error
- func PacketInfo(packet gopacket.Packet) (srcIP, dstIP string, srcPort, dstPort uint16, proto string, ok bool)
- func PacketTimestamp(packet gopacket.Packet) time.Time
- type Engine
- func (e *Engine) ConnectionCount() int
- func (e *Engine) EvictAll(now time.Time)
- func (e *Engine) GetShard(i int) *Shard
- func (e *Engine) IsPcapMode() bool
- func (e *Engine) NumShards() int
- func (e *Engine) RunShard(shardIdx int, packets <-chan gopacket.Packet)
- func (e *Engine) SetFpSink(fn func(srcIP string, srcPort uint16, dstIP string, dstPort uint16, ...))
- func (e *Engine) SetPcapMode(enabled bool)
- func (e *Engine) ShardIndex(packet gopacket.Packet) int
- func (e *Engine) Stats() *EngineStats
- type EngineStats
- func (s *EngineStats) IncAlertDrop()
- func (s *EngineStats) IncConnChanDrop()
- func (s *EngineStats) IncFanoutDrop()
- func (s *EngineStats) IncFlushFailure()
- func (s *EngineStats) IncMigrationIssue()
- func (s *EngineStats) IncPacket()
- func (s *EngineStats) SetProcessingLag(ns int64)
- func (s *EngineStats) Snapshot() StatsSnapshot
- type Enricher
- type FirstSeenMap
- func (fm *FirstSeenMap) Check(fpType, fingerprint string) bool
- func (fm *FirstSeenMap) Count() int
- func (fm *FirstSeenMap) IsLearning() bool
- func (fm *FirstSeenMap) LearningRemaining() time.Duration
- func (fm *FirstSeenMap) Load(entries map[string]time.Time)
- func (fm *FirstSeenMap) Snapshot() map[string]time.Time
- type KnownBadPattern
- type RemoteLookup
- type Shard
- type StatsSnapshot
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func LoadKnownBadOverride ¶
LoadKnownBadOverride replaces the default list with one parsed from an external TOML file. On error the existing list is unchanged.
Types ¶
type Engine ¶
type Engine struct {
ConnChan chan *tracker.Connection
// contains filtered or unexported fields
}
Engine manages sharded packet processing.
func NewEngine ¶
func NewEngine(numShards int, firstSeen *FirstSeenMap, evaluator *anomaly.Evaluator, stats *EngineStats, remote *RemoteLookup) *Engine
NewEngine creates an Engine with n shards. Evaluator may be nil to skip anomaly detection. remote may be nil to disable remote lookup (the default).
func (*Engine) ConnectionCount ¶
ConnectionCount returns total connections across all shards.
func (*Engine) EvictAll ¶
EvictAll runs eviction on all shards. ONLY safe to call from single-threaded contexts like runJSON — if the shard goroutines are running concurrently (runTUI / runDaemon), use RunShard's internal eviction loop instead, otherwise Evict's OnEvict callbacks will race with ProcessPacket's map writes and the Go runtime will panic.
In pcap mode, periodic calls should pass time.Time{} so that idle-timeout eviction is skipped (only LRU capacity eviction runs). Pass time.Now() for the final shutdown call to persist all remaining connections via OnEvict.
func (*Engine) IsPcapMode ¶
IsPcapMode returns whether the engine is in pcap replay mode.
func (*Engine) RunShard ¶
RunShard is the main loop for a shard. It reads packets from the given channel AND ticks a flush interval AND ticks an eviction interval, all in one goroutine. This is the key to concurrency safety: the shard goroutine is the ONLY goroutine that touches a connection, so there's no race between packet processing and snapshotting / eviction.
┌──────────────────────────────────────────────┐ │ shard goroutine (single) │ │ │ │ select: │ │ ├── packet arrived → ProcessPacket │ │ │ markDirty │ │ │ │ │ ├── 20ms tick → flushDirty │ │ │ Snapshot() each │ │ │ send to TUI │ │ │ │ │ └── 1s tick → evictShard │ │ Tracker.Evict │ │ OnEvict callbacks │ │ Processor.Cleanup │ └──────────────────────────────────────────────┘
Call this from main.go, one goroutine per shard. Exits when packets channel is closed; performs a final flush before returning.
func (*Engine) SetFpSink ¶
func (e *Engine) SetFpSink(fn func(srcIP string, srcPort uint16, dstIP string, dstPort uint16, proto, fpType, fpValue string, ts time.Time))
SetFpSink installs a fingerprint sink callback on every shard. Must be called BEFORE starting RunShard goroutines to avoid data races.
func (*Engine) SetPcapMode ¶
SetPcapMode enables pcap replay mode: idle-timeout eviction is disabled during periodic ticks (connections won't be dropped mid-replay for being "idle" in pcap time). Only LRU capacity eviction still runs. The final eviction on shutdown fires normally so all connections are persisted.
func (*Engine) ShardIndex ¶
ShardIndex returns which shard should process this packet.
func (*Engine) Stats ¶
func (e *Engine) Stats() *EngineStats
Stats returns the engine's shared stats counter.
type EngineStats ¶
type EngineStats struct {
// contains filtered or unexported fields
}
EngineStats holds atomic counters for data drops and error events that happen on the hot path. Replaces the previous pattern of silently dropping packets, alerts, and flush failures without user visibility.
All counters are monotonic. Snapshot() returns a consistent read of all values at a single point in time.
+-----------------------+ | EngineStats (atomic) | +-----------------------+ | ConnChanDrops uint64| shard → TUI channel full | AlertDrops uint64| evaluator alert channel full | FanoutDrops uint64| slow subscriber channel full | FlushFailures uint64| SQLite flush transaction error | MigrationIssues uint64| rows skipped during schema migration | PacketsTotal uint64| total packets processed across all shards | ProcessingLagNs int64| last observed packet capture-to-eval lag (ns) +-----------------------+
func NewEngineStats ¶
func NewEngineStats() *EngineStats
NewEngineStats creates a zeroed stats struct.
func (*EngineStats) IncAlertDrop ¶
func (s *EngineStats) IncAlertDrop()
IncAlertDrop records that an alert was dropped because the evaluator alert channel was full.
func (*EngineStats) IncConnChanDrop ¶
func (s *EngineStats) IncConnChanDrop()
IncConnChanDrop records that a connection update was dropped because the shard→TUI channel was full.
func (*EngineStats) IncFanoutDrop ¶
func (s *EngineStats) IncFanoutDrop()
IncFanoutDrop records that an alert was dropped to a slow subscriber during fan-out broadcast. Each subscriber has its own channel; if that channel is full, the drop is counted here rather than blocking the other subscribers.
func (*EngineStats) IncFlushFailure ¶
func (s *EngineStats) IncFlushFailure()
IncFlushFailure records that a SQLite flush transaction failed.
func (*EngineStats) IncMigrationIssue ¶
func (s *EngineStats) IncMigrationIssue()
IncMigrationIssue records that a row was skipped during schema migration (for example, corrupt JSON in the fingerprints column).
func (*EngineStats) IncPacket ¶
func (s *EngineStats) IncPacket()
IncPacket records that a packet was processed by a shard.
func (*EngineStats) SetProcessingLag ¶
func (s *EngineStats) SetProcessingLag(ns int64)
SetProcessingLag records the most recently observed lag (in nanoseconds) between packet capture time and rule evaluation. Only meaningful in live capture mode — in pcap replay mode packet timestamps are historical.
func (*EngineStats) Snapshot ¶
func (s *EngineStats) Snapshot() StatsSnapshot
Snapshot reads all counters into a StatsSnapshot. The reads are not atomic with respect to each other, but each individual read is atomic, so the snapshot is internally consistent enough for display purposes.
type Enricher ¶
type Enricher struct {
// contains filtered or unexported fields
}
Enricher adds application identification and threat intelligence to connections.
func (*Enricher) Enrich ¶
func (e *Enricher) Enrich(conn *tracker.Connection, fpType, fingerprint string)
Enrich updates a connection with lookup results and JA4T analysis.
func (*Enricher) WithRemoteLookup ¶
WithRemoteLookup returns a NEW Enricher that consults the given remote-lookup wrapper after a local DB miss. Pass nil to disable (the default).
type FirstSeenMap ¶
type FirstSeenMap struct {
// contains filtered or unexported fields
}
FirstSeenMap tracks when fingerprints were first observed on the network. Thread-safe for concurrent access from multiple shards.
func NewFirstSeenMap ¶
func NewFirstSeenMap(learnDuration time.Duration) *FirstSeenMap
NewFirstSeenMap creates a FirstSeenMap with an optional learning period. During the learning period, all fingerprints are auto-approved (Check returns false). After the learning period, new fingerprints return true from Check. Set learnDuration to 0 to disable learning (flag everything new immediately).
func (*FirstSeenMap) Check ¶
func (fm *FirstSeenMap) Check(fpType, fingerprint string) bool
Check returns true if this fingerprint is NEW (never seen before AND we are past the learning period). Returns false during learning mode or if the fingerprint has been seen before.
func (*FirstSeenMap) Count ¶
func (fm *FirstSeenMap) Count() int
Count returns the number of unique fingerprints seen.
func (*FirstSeenMap) IsLearning ¶
func (fm *FirstSeenMap) IsLearning() bool
IsLearning returns true if currently in learning mode.
func (*FirstSeenMap) LearningRemaining ¶
func (fm *FirstSeenMap) LearningRemaining() time.Duration
LearningRemaining returns the time remaining in learning mode.
type KnownBadPattern ¶
type KnownBadPattern struct {
Match string `toml:"match"`
Category string `toml:"category"`
Confidence string `toml:"confidence"`
}
KnownBadPattern is one entry in the labeling backstop list.
type RemoteLookup ¶
type RemoteLookup struct {
// contains filtered or unexported fields
}
RemoteLookup is a small adapter around ja4plus.LookupFingerprintRemote with an in-process cache and a circuit breaker. The wrapper's job is best-effort enrichment: it must never block live capture, never surface a transport error to the enricher, and never grow unbounded.
Safe for concurrent use.
func NewRemoteLookup ¶
func NewRemoteLookup(cfg *ja4plus.RemoteLookupConfig) *RemoteLookup
NewRemoteLookup returns a wrapper. cfg may be nil to use defaults (https://ja4db.com).
func (*RemoteLookup) Lookup ¶
func (r *RemoteLookup) Lookup(ctx context.Context, fingerprint string) *ja4plus.LookupResult
Lookup returns a *LookupResult on hit, nil if not found, and nil if the breaker is open or any transport error occurred. Errors are absorbed by design.
type Shard ¶
type Shard struct {
Processor *ja4plus.Processor
Tracker *tracker.Tracker
// FpSink, if non-nil, is called for every fingerprint computed by
// this shard. It is called from the shard goroutine, so it must not
// block (use a buffered channel internally). Set it before starting
// RunShard goroutines to avoid data races.
FpSink func(srcIP string, srcPort uint16, dstIP string, dstPort uint16,
proto, fpType, fpValue string, ts time.Time)
// contains filtered or unexported fields
}
Shard is a single processing unit: one Processor + one Tracker. Each shard runs on a dedicated goroutine.
shard packet loop (owns Tracker, Connections, dirty set)
│
│ markDirty(id) on each packet
▼
dirty set (mutex-protected)
│
│ drainDirty() every 20ms
▼
flush loop goroutine
│
│ conn.Snapshot() then send
▼
ConnChan → TUI
func (*Shard) ProcessPacket ¶
ProcessPacket routes a packet to the correct shard and processes it. Must be called from a single goroutine per shard.
This function mutates the Connection in place. The shard goroutine owns the connection; the TUI never reads this pointer directly. Updates flow to the TUI through the dirty-flush loop which takes a Snapshot() before sending, guaranteeing the TUI sees an immutable copy.
type StatsSnapshot ¶
type StatsSnapshot struct {
ConnChanDrops uint64
AlertDrops uint64
FanoutDrops uint64
FlushFailures uint64
MigrationIssues uint64
PacketsTotal uint64
ProcessingLagNs int64
}
StatsSnapshot is a consistent point-in-time read of EngineStats counters.
func (StatsSnapshot) Total ¶
func (s StatsSnapshot) Total() uint64
Total returns the sum of all drop counters. Useful for a single "drops" number in the TUI status bar.