engine

package
v0.0.0-...-45fb939

This package is not in the latest version of its module.
Published: May 9, 2026 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadKnownBadOverride

func LoadKnownBadOverride(path string) error

LoadKnownBadOverride replaces the default list with one parsed from an external TOML file. On error the existing list is unchanged.

func PacketInfo

func PacketInfo(packet gopacket.Packet) (srcIP, dstIP string, srcPort, dstPort uint16, proto string, ok bool)

PacketInfo extracts IP and transport info from a gopacket.Packet.

func PacketTimestamp

func PacketTimestamp(packet gopacket.Packet) time.Time

PacketTimestamp extracts the timestamp from a packet.

Types

type Engine

type Engine struct {
	ConnChan chan *tracker.Connection
	// contains filtered or unexported fields
}

Engine manages sharded packet processing.

func NewEngine

func NewEngine(numShards int, firstSeen *FirstSeenMap, evaluator *anomaly.Evaluator, stats *EngineStats, remote *RemoteLookup) *Engine

NewEngine creates an Engine with numShards shards. evaluator may be nil to skip anomaly detection. remote may be nil to disable remote lookup (the default).

func (*Engine) ConnectionCount

func (e *Engine) ConnectionCount() int

ConnectionCount returns total connections across all shards.

func (*Engine) EvictAll

func (e *Engine) EvictAll(now time.Time)

EvictAll runs eviction on all shards. It is safe to call only from single-threaded contexts such as runJSON. If the shard goroutines are running concurrently (runTUI / runDaemon), rely on RunShard's internal eviction loop instead; otherwise Evict's OnEvict callbacks race with ProcessPacket's map writes, and the Go runtime can abort the process when it detects the concurrent map access.

In pcap mode, periodic calls should pass time.Time{} so that idle-timeout eviction is skipped (only LRU capacity eviction runs). Pass time.Now() for the final shutdown call to persist all remaining connections via OnEvict.
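The zero-time convention above can be illustrated with a minimal sketch. The conn and tracker types, the evict method, and the replay helper are hypothetical stand-ins written for this example; the package's real Tracker may be organized differently.

```go
package main

import (
	"fmt"
	"time"
)

// conn and tracker are hypothetical stand-ins, not the package's real types.
type conn struct {
	id       int
	lastSeen time.Time
}

type tracker struct {
	conns    []conn        // least recently used first
	maxConns int           // capacity limit for LRU eviction
	idle     time.Duration // idle timeout
}

// evict returns the IDs it removed. A zero now skips the idle-timeout pass,
// so only capacity (LRU) eviction runs.
func (t *tracker) evict(now time.Time) []int {
	var evicted []int
	for len(t.conns) > t.maxConns {
		evicted = append(evicted, t.conns[0].id)
		t.conns = t.conns[1:]
	}
	if now.IsZero() {
		return evicted
	}
	kept := t.conns[:0]
	for _, c := range t.conns {
		if now.Sub(c.lastSeen) > t.idle {
			evicted = append(evicted, c.id)
		} else {
			kept = append(kept, c)
		}
	}
	t.conns = kept
	return evicted
}

// replay simulates a pcap run: one periodic call, then the shutdown call.
func replay() (periodic, final []int) {
	old := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) // historical pcap timestamp
	t := &tracker{
		conns:    []conn{{1, old}, {2, old}, {3, old}},
		maxConns: 2,
		idle:     5 * time.Minute,
	}
	periodic = t.evict(time.Time{}) // idle pass skipped, capacity pass still runs
	final = t.evict(time.Now())     // everything left is past the idle timeout
	return periodic, final
}

func main() {
	periodic, final := replay()
	fmt.Println(periodic, final) // [1] [2 3]
}
```

Because pcap timestamps are historical, passing time.Now() on shutdown makes every remaining connection look idle, which is what flushes them all in this sketch.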

func (*Engine) GetShard

func (e *Engine) GetShard(i int) *Shard

GetShard returns the shard at index i.

func (*Engine) IsPcapMode

func (e *Engine) IsPcapMode() bool

IsPcapMode returns whether the engine is in pcap replay mode.

func (*Engine) NumShards

func (e *Engine) NumShards() int

NumShards returns the number of shards.

func (*Engine) RunShard

func (e *Engine) RunShard(shardIdx int, packets <-chan gopacket.Packet)

RunShard is the main loop for a shard. It reads packets from the given channel AND ticks a flush interval AND ticks an eviction interval, all in one goroutine. This is the key to concurrency safety: the shard goroutine is the ONLY goroutine that touches a connection, so there's no race between packet processing and snapshotting / eviction.

┌──────────────────────────────────────────────┐
│         shard goroutine (single)             │
│                                               │
│   select:                                     │
│     ├── packet arrived  → ProcessPacket       │
│     │                      markDirty          │
│     │                                         │
│     ├── 20ms tick       → flushDirty          │
│     │                      Snapshot() each    │
│     │                      send to TUI        │
│     │                                         │
│     └── 1s tick         → evictShard          │
│                            Tracker.Evict      │
│                            OnEvict callbacks  │
│                            Processor.Cleanup  │
└──────────────────────────────────────────────┘

Call this from main.go, one goroutine per shard. Exits when packets channel is closed; performs a final flush before returning.
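The single-goroutine select loop described above can be sketched as follows. This is an illustration only: the shard type, its fields, and runOnce are hypothetical, and integers stand in for packets and connection IDs.

```go
package main

import (
	"fmt"
	"time"
)

// shard is a hypothetical stand-in. Its state is touched by one goroutine
// only, so the dirty set needs no lock.
type shard struct {
	dirty   map[int]struct{} // connection IDs changed since the last flush
	flushed int              // snapshots sent so far
	evicts  int              // eviction passes run so far
}

func (s *shard) flushDirty() {
	for id := range s.dirty {
		s.flushed++ // a real loop would snapshot the connection and send it
		delete(s.dirty, id)
	}
}

// run multiplexes packets, the flush tick, and the eviction tick in one loop.
func (s *shard) run(packets <-chan int) {
	flush := time.NewTicker(20 * time.Millisecond)
	defer flush.Stop()
	evict := time.NewTicker(time.Second)
	defer evict.Stop()
	for {
		select {
		case id, ok := <-packets:
			if !ok {
				s.flushDirty() // final flush before returning
				return
			}
			s.dirty[id] = struct{}{}
		case <-flush.C:
			s.flushDirty()
		case <-evict.C:
			s.evicts++ // a real loop would evict idle connections here
		}
	}
}

// runOnce feeds ids to a shard goroutine and reports how many were flushed.
func runOnce(ids []int) int {
	packets := make(chan int)
	s := &shard{dirty: make(map[int]struct{})}
	done := make(chan struct{})
	go func() {
		s.run(packets)
		close(done)
	}()
	for _, id := range ids {
		packets <- id
	}
	close(packets)
	<-done
	return s.flushed
}

func main() {
	fmt.Println(runOnce([]int{7, 8, 9})) // 3: each distinct ID is flushed once
}
```

Closing the channel is the only shutdown signal in this sketch, which mirrors the documented behavior of exiting after a final flush.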

func (*Engine) SetFpSink

func (e *Engine) SetFpSink(fn func(srcIP string, srcPort uint16, dstIP string, dstPort uint16,
	proto, fpType, fpValue string, ts time.Time))

SetFpSink installs a fingerprint sink callback on every shard. Must be called BEFORE starting RunShard goroutines to avoid data races.
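Since the sink runs on the shard goroutine, it should hand work off without blocking. A minimal sketch of such a sink, assuming a buffered channel and a drop counter (the fpEvent and fpQueue types and the sample values are hypothetical):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// fpEvent is a hypothetical record holding the sink's arguments.
type fpEvent struct {
	SrcIP, DstIP           string
	SrcPort, DstPort       uint16
	Proto, FpType, FpValue string
	TS                     time.Time
}

// fpQueue buffers events for a slower consumer and counts what it drops.
type fpQueue struct {
	events chan fpEvent
	drops  atomic.Uint64
}

// Sink has the same signature as the SetFpSink callback. It never blocks:
// when the buffer is full the event is dropped and counted.
func (q *fpQueue) Sink(srcIP string, srcPort uint16, dstIP string, dstPort uint16,
	proto, fpType, fpValue string, ts time.Time) {
	ev := fpEvent{
		SrcIP: srcIP, SrcPort: srcPort, DstIP: dstIP, DstPort: dstPort,
		Proto: proto, FpType: fpType, FpValue: fpValue, TS: ts,
	}
	select {
	case q.events <- ev:
	default:
		q.drops.Add(1)
	}
}

// demo pushes five events into a queue of capacity two with no consumer.
func demo() (queued int, dropped uint64) {
	q := &fpQueue{events: make(chan fpEvent, 2)}
	for i := 0; i < 5; i++ {
		q.Sink("10.0.0.1", 40000, "10.0.0.2", 443, "tcp", "ja4", "example", time.Unix(0, 0))
	}
	return len(q.events), q.drops.Load()
}

func main() {
	queued, dropped := demo()
	fmt.Println(queued, dropped) // 2 3
}
```

With the real package, a method value such as q.Sink could be passed to SetFpSink before any RunShard goroutine starts.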

func (*Engine) SetPcapMode

func (e *Engine) SetPcapMode(enabled bool)

SetPcapMode enables pcap replay mode: idle-timeout eviction is disabled during periodic ticks (connections won't be dropped mid-replay for being "idle" in pcap time). Only LRU capacity eviction still runs. The final eviction on shutdown fires normally so all connections are persisted.

func (*Engine) ShardIndex

func (e *Engine) ShardIndex(packet gopacket.Packet) int

ShardIndex returns which shard should process this packet.
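The documentation does not say how the shard is chosen. One common approach, shown here purely as an assumption, is a direction-independent hash of the flow's endpoints so that both directions of a connection land on the same shard. The shardIndex function below is hypothetical and uses FNV-1a over strings rather than a gopacket.Packet.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

func endpoint(ip string, port uint16) string {
	return fmt.Sprintf("%s:%d", ip, port)
}

// shardIndex is a hypothetical sketch, not the package's algorithm.
// numShards must be positive.
func shardIndex(srcIP string, srcPort uint16, dstIP string, dstPort uint16,
	proto string, numShards int) int {
	a, b := endpoint(srcIP, srcPort), endpoint(dstIP, dstPort)
	if a > b {
		a, b = b, a // order endpoints so both directions hash alike
	}
	h := fnv.New32a()
	h.Write([]byte(proto + "|" + a + "|" + b))
	return int(h.Sum32() % uint32(numShards))
}

func main() {
	fwd := shardIndex("10.0.0.1", 40000, "10.0.0.2", 443, "tcp", 8)
	rev := shardIndex("10.0.0.2", 443, "10.0.0.1", 40000, "tcp", 8)
	fmt.Println(fwd == rev) // true
}
```

Keeping a flow on one shard is what lets each shard own its connections without locking.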

func (*Engine) Stats

func (e *Engine) Stats() *EngineStats

Stats returns the engine's shared stats counter.

type EngineStats

type EngineStats struct {
	// contains filtered or unexported fields
}

EngineStats holds atomic counters for data drops and error events that happen on the hot path. Replaces the previous pattern of silently dropping packets, alerts, and flush failures without user visibility.

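All drop and packet counters are monotonic; ProcessingLagNs is a gauge holding the last observed value. Snapshot() reads each counter atomically, but the reads are not atomic with respect to each other, so the result is an approximate point-in-time view suitable for display.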

+-----------------------+
| EngineStats (atomic)  |
+-----------------------+
| ConnChanDrops   uint64|  shard → TUI channel full
| AlertDrops      uint64|  evaluator alert channel full
| FanoutDrops     uint64|  slow subscriber channel full
| FlushFailures   uint64|  SQLite flush transaction error
| MigrationIssues uint64|  rows skipped during schema migration
| PacketsTotal    uint64|  total packets processed across all shards
| ProcessingLagNs  int64|  last observed packet capture-to-eval lag (ns)
+-----------------------+

func NewEngineStats

func NewEngineStats() *EngineStats

NewEngineStats creates a zeroed stats struct.

func (*EngineStats) IncAlertDrop

func (s *EngineStats) IncAlertDrop()

IncAlertDrop records that an alert was dropped because the evaluator alert channel was full.

func (*EngineStats) IncConnChanDrop

func (s *EngineStats) IncConnChanDrop()

IncConnChanDrop records that a connection update was dropped because the shard→TUI channel was full.

func (*EngineStats) IncFanoutDrop

func (s *EngineStats) IncFanoutDrop()

IncFanoutDrop records that an alert was dropped to a slow subscriber during fan-out broadcast. Each subscriber has its own channel; if that channel is full, the drop is counted here rather than blocking the other subscribers.

func (*EngineStats) IncFlushFailure

func (s *EngineStats) IncFlushFailure()

IncFlushFailure records that a SQLite flush transaction failed.

func (*EngineStats) IncMigrationIssue

func (s *EngineStats) IncMigrationIssue()

IncMigrationIssue records that a row was skipped during schema migration (for example, corrupt JSON in the fingerprints column).

func (*EngineStats) IncPacket

func (s *EngineStats) IncPacket()

IncPacket records that a packet was processed by a shard.

func (*EngineStats) SetProcessingLag

func (s *EngineStats) SetProcessingLag(ns int64)

SetProcessingLag records the most recently observed lag (in nanoseconds) between packet capture time and rule evaluation. Only meaningful in live capture mode — in pcap replay mode packet timestamps are historical.

func (*EngineStats) Snapshot

func (s *EngineStats) Snapshot() StatsSnapshot

Snapshot reads all counters into a StatsSnapshot. The reads are not atomic with respect to each other, but each individual read is atomic, so the snapshot is internally consistent enough for display purposes.
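A minimal sketch of this pattern, assuming the counters are built on sync/atomic. The stats and snapshot types below are hypothetical and cover only three of the fields.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stats is a hypothetical stand-in with a subset of the documented counters.
type stats struct {
	connChanDrops atomic.Uint64
	packetsTotal  atomic.Uint64
	lagNs         atomic.Int64 // gauge: last observed value, not a running sum
}

type snapshot struct {
	ConnChanDrops   uint64
	PacketsTotal    uint64
	ProcessingLagNs int64
}

// Snapshot loads each field atomically. The loads are not taken together,
// so a concurrent writer may land between two of them.
func (s *stats) Snapshot() snapshot {
	return snapshot{
		ConnChanDrops:   s.connChanDrops.Load(),
		PacketsTotal:    s.packetsTotal.Load(),
		ProcessingLagNs: s.lagNs.Load(),
	}
}

// countPackets increments one counter from several goroutines without a mutex.
func countPackets(workers, perWorker int) snapshot {
	var s stats
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				s.packetsTotal.Add(1)
			}
		}()
	}
	wg.Wait()
	s.lagNs.Store(1500)
	return s.Snapshot()
}

func main() {
	snap := countPackets(4, 1000)
	fmt.Println(snap.PacketsTotal, snap.ProcessingLagNs) // 4000 1500
}
```

Per-field atomics keep the hot path free of locks, at the cost of a snapshot whose fields may be a few increments apart.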

type Enricher

type Enricher struct {
	// contains filtered or unexported fields
}

Enricher adds application identification and threat intelligence to connections.

func NewEnricher

func NewEnricher() *Enricher

NewEnricher creates a new Enricher.

func (*Enricher) Enrich

func (e *Enricher) Enrich(conn *tracker.Connection, fpType, fingerprint string)

Enrich updates a connection with lookup results and JA4T analysis.

func (*Enricher) WithRemoteLookup

func (e *Enricher) WithRemoteLookup(rl remoteLookuper) *Enricher

WithRemoteLookup returns a NEW Enricher that consults the given remote-lookup wrapper after a local DB miss. Pass nil to disable (the default).

type FirstSeenMap

type FirstSeenMap struct {
	// contains filtered or unexported fields
}

FirstSeenMap tracks when fingerprints were first observed on the network. Thread-safe for concurrent access from multiple shards.

func NewFirstSeenMap

func NewFirstSeenMap(learnDuration time.Duration) *FirstSeenMap

NewFirstSeenMap creates a FirstSeenMap with an optional learning period. During the learning period, all fingerprints are auto-approved (Check returns false). After the learning period, new fingerprints return true from Check. Set learnDuration to 0 to disable learning (flag everything new immediately).

func (*FirstSeenMap) Check

func (fm *FirstSeenMap) Check(fpType, fingerprint string) bool

Check returns true if this fingerprint is NEW (never seen before AND we are past the learning period). Returns false during learning mode or if the fingerprint has been seen before.
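The learning-period semantics can be sketched as below. The firstSeen type, the injected clock, the key format, and the sample fingerprint values are assumptions made for this example, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// firstSeen is a hypothetical stand-in for a first-seen map.
type firstSeen struct {
	mu         sync.Mutex
	seen       map[string]time.Time
	learnUntil time.Time
	now        func() time.Time // injected clock so the demo is deterministic
}

func newFirstSeen(learn time.Duration, now func() time.Time) *firstSeen {
	return &firstSeen{
		seen:       make(map[string]time.Time),
		learnUntil: now().Add(learn),
		now:        now,
	}
}

// Check records the fingerprint and reports whether it is new AND the
// learning period is over.
func (f *firstSeen) Check(fpType, fingerprint string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	key := fpType + ":" + fingerprint // key format is an assumption
	if _, ok := f.seen[key]; ok {
		return false
	}
	t := f.now()
	f.seen[key] = t
	return !t.Before(f.learnUntil)
}

// demo checks one fingerprint during learning and one afterwards, twice.
func demo() (during, after, repeat bool) {
	clock := time.Unix(0, 0)
	fm := newFirstSeen(time.Minute, func() time.Time { return clock })
	during = fm.Check("ja4", "aaa") // learning: auto-approved
	clock = clock.Add(2 * time.Minute)
	after = fm.Check("ja4", "bbb")  // past learning: flagged as new
	repeat = fm.Check("ja4", "bbb") // already seen
	return during, after, repeat
}

func main() {
	during, after, repeat := demo()
	fmt.Println(during, after, repeat) // false true false
}
```

A fingerprint seen during learning is still recorded, so it stays approved once the learning period ends.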

func (*FirstSeenMap) Count

func (fm *FirstSeenMap) Count() int

Count returns the number of unique fingerprints seen.

func (*FirstSeenMap) IsLearning

func (fm *FirstSeenMap) IsLearning() bool

IsLearning returns true if currently in learning mode.

func (*FirstSeenMap) LearningRemaining

func (fm *FirstSeenMap) LearningRemaining() time.Duration

LearningRemaining returns the time remaining in learning mode.

func (*FirstSeenMap) Load

func (fm *FirstSeenMap) Load(entries map[string]time.Time)

Load populates the map from an existing set (e.g., from SQLite on startup).

func (*FirstSeenMap) Snapshot

func (fm *FirstSeenMap) Snapshot() map[string]time.Time

Snapshot returns a copy of all entries for persistence.

type KnownBadPattern

type KnownBadPattern struct {
	Match      string `toml:"match"`
	Category   string `toml:"category"`
	Confidence string `toml:"confidence"`
}

KnownBadPattern is one entry in the labeling backstop list.

type RemoteLookup

type RemoteLookup struct {
	// contains filtered or unexported fields
}

RemoteLookup is a small adapter around ja4plus.LookupFingerprintRemote with an in-process cache and a circuit breaker. The wrapper's job is best-effort enrichment: it must never block live capture, never surface a transport error to the enricher, and never grow unbounded.

Safe for concurrent use.

func NewRemoteLookup

func NewRemoteLookup(cfg *ja4plus.RemoteLookupConfig) *RemoteLookup

NewRemoteLookup returns a wrapper. cfg may be nil to use defaults (https://ja4db.com).

func (*RemoteLookup) Lookup

func (r *RemoteLookup) Lookup(ctx context.Context, fingerprint string) *ja4plus.LookupResult

Lookup returns a *LookupResult on hit, nil if not found, and nil if the breaker is open or any transport error occurred. Errors are absorbed by design.
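A rough sketch of a cache plus circuit breaker that absorbs errors in this way. Everything here is hypothetical: the result and lookup types, the consecutive-failure threshold, and the arbitrary-entry cache bound. The breaker reset timer and the context parameter are omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result and lookup are hypothetical stand-ins, not the package's types.
type result struct{ App string }

type lookup struct {
	mu         sync.Mutex
	cache      map[string]*result
	maxEntries int // cache bound
	failures   int // consecutive transport failures
	maxFail    int // breaker opens at this many failures
	fetch      func(fp string) (*result, error)
}

func newLookup(maxFail, maxEntries int, fetch func(string) (*result, error)) *lookup {
	return &lookup{cache: make(map[string]*result), maxEntries: maxEntries,
		maxFail: maxFail, fetch: fetch}
}

// Lookup never returns an error: failures and an open breaker both yield nil.
func (l *lookup) Lookup(fp string) *result {
	l.mu.Lock()
	if r, ok := l.cache[fp]; ok {
		l.mu.Unlock()
		return r
	}
	open := l.failures >= l.maxFail
	l.mu.Unlock()
	if open {
		return nil
	}
	r, err := l.fetch(fp) // no lock held during the slow call
	l.mu.Lock()
	defer l.mu.Unlock()
	if err != nil {
		l.failures++
		return nil
	}
	l.failures = 0
	if len(l.cache) >= l.maxEntries {
		for k := range l.cache { // drop one arbitrary entry to stay bounded
			delete(l.cache, k)
			break
		}
	}
	l.cache[fp] = r // a nil r caches "not found" as well
	return r
}

// demo returns the app name of a hit and how many remote calls were made.
func demo() (string, int) {
	calls := 0
	l := newLookup(2, 16, func(fp string) (*result, error) {
		calls++
		if fp == "known" {
			return &result{App: "curl"}, nil
		}
		return nil, errors.New("timeout")
	})
	hit := l.Lookup("known")
	l.Lookup("known") // served from cache
	l.Lookup("x")     // failure 1
	l.Lookup("y")     // failure 2: breaker opens
	l.Lookup("z")     // breaker open: no remote call
	return hit.App, calls
}

func main() {
	app, calls := demo()
	fmt.Println(app, calls) // curl 3
}
```

Callers cannot distinguish "not found" from "lookup unavailable" in this design, which matches the best-effort contract described above.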

type Shard

type Shard struct {
	Processor *ja4plus.Processor
	Tracker   *tracker.Tracker

	// FpSink, if non-nil, is called for every fingerprint computed by
	// this shard. It is called from the shard goroutine, so it must not
	// block (use a buffered channel internally). Set it before starting
	// RunShard goroutines to avoid data races.
	FpSink func(srcIP string, srcPort uint16, dstIP string, dstPort uint16,
		proto, fpType, fpValue string, ts time.Time)
	// contains filtered or unexported fields
}

Shard is a single processing unit: one Processor + one Tracker. Each shard runs on a dedicated goroutine.

shard packet loop (owns Tracker, Connections, dirty set)
                    │
                    │ markDirty(id) on each packet
                    ▼
              dirty set (mutex-protected)
                    │
                    │ drainDirty() every 20ms
                    ▼
              flush loop goroutine
                    │
                    │ conn.Snapshot() then send
                    ▼
              ConnChan → TUI

func (*Shard) ProcessPacket

func (s *Shard) ProcessPacket(packet gopacket.Packet)

ProcessPacket processes a packet that has already been routed to this shard (see Engine.ShardIndex). Must be called from a single goroutine per shard.

This function mutates the Connection in place. The shard goroutine owns the connection; the TUI never reads this pointer directly. Updates flow to the TUI through the dirty-flush loop which takes a Snapshot() before sending, guaranteeing the TUI sees an immutable copy.

type StatsSnapshot

type StatsSnapshot struct {
	ConnChanDrops   uint64
	AlertDrops      uint64
	FanoutDrops     uint64
	FlushFailures   uint64
	MigrationIssues uint64
	PacketsTotal    uint64
	ProcessingLagNs int64
}

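StatsSnapshot is a point-in-time copy of the EngineStats counters, as returned by EngineStats.Snapshot. Each field is read atomically, but the fields are not read atomically as a group.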

func (StatsSnapshot) Total

func (s StatsSnapshot) Total() uint64

Total returns the sum of all drop counters. Useful for a single "drops" number in the TUI status bar.
