recv

package
v1.9.15
Published: Mar 12, 2026 | License: MIT | Imports: 26 | Imported by: 0

Documentation

Constants

const APIVersion = 1

APIVersion is incremented on breaking changes to the push API.

Variables

This section is empty.

Functions

func EntryMatchesRegex added in v1.9.2

func EntryMatchesRegex(entry LogEntry, re *regexp.Regexp) bool

EntryMatchesRegex checks whether a log entry matches the given regex.

func ExportFilteredLines added in v1.9.6

func ExportFilteredLines(entries []LogEntry, dir string) (int64, error)

ExportFilteredLines writes a slice of log entries as a new capture directory. Returns the number of lines written.

func FindTimeIndex added in v1.9.4

func FindTimeIndex(lines []LogEntry, input string) int

FindTimeIndex finds the first line whose formatted timestamp contains the input fragment. Supports fragments like: "14:32", "14:32:05", "2026-03-05T14:32". Returns -1 if no match.
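
The fragment search described above can be approximated with a substring match on each formatted timestamp. The following is a minimal sketch, not the package's implementation: findTimeIndex, sampleLines, and assumedLayout are hypothetical names, and the layout string is a guess, since this page does not say how timestamps are formatted before matching.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// LogEntry mirrors the documented struct, trimmed to the fields used here.
type LogEntry struct {
	Timestamp time.Time
	Message   string
}

// assumedLayout is an assumption: the real format is not documented here.
const assumedLayout = "2006-01-02T15:04:05"

// findTimeIndex returns the index of the first entry whose formatted
// timestamp contains fragment, or -1 if no entry matches.
func findTimeIndex(lines []LogEntry, fragment string) int {
	for i, e := range lines {
		if strings.Contains(e.Timestamp.Format(assumedLayout), fragment) {
			return i
		}
	}
	return -1
}

// sampleLines builds entries at 14:31:58, 14:32:05 and 14:33:08 UTC.
func sampleLines() []LogEntry {
	base := time.Date(2026, 3, 5, 14, 31, 58, 0, time.UTC)
	return []LogEntry{
		{Timestamp: base, Message: "a"},
		{Timestamp: base.Add(7 * time.Second), Message: "b"},
		{Timestamp: base.Add(70 * time.Second), Message: "c"},
	}
}

func main() {
	lines := sampleLines()
	fmt.Println(findTimeIndex(lines, "14:32"))            // 1
	fmt.Println(findTimeIndex(lines, "2026-03-05T14:33")) // 2
	fmt.Println(findTimeIndex(lines, "09:00"))            // -1
}
```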

func IsLiveCapture added in v1.0.5

func IsLiveCapture(dir string) bool

IsLiveCapture checks if a capture directory is still actively receiving.

func ParseRedactFlag

func ParseRedactFlag(val string) (enabled bool, names []string)

ParseRedactFlag parses the --redact flag value. An empty string means redaction is disabled; "true", or the flag given without a value, enables all patterns; a comma-separated list such as "a,b" enables only the named patterns.
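
One way to read these rules is sketched below. parseRedactFlag is a hypothetical stand-in written for illustration. It assumes that an empty names slice stands for "all patterns" (which matches the NewRedactor description on this page) and that whitespace around names is trimmed, which the documentation does not state.

```go
package main

import (
	"fmt"
	"strings"
)

// parseRedactFlag is an illustrative sketch of the documented rules.
// A nil names slice with enabled == true is taken to mean "all patterns".
func parseRedactFlag(val string) (enabled bool, names []string) {
	switch val {
	case "":
		return false, nil // redaction disabled
	case "true":
		return true, nil // all patterns
	}
	for _, n := range strings.Split(val, ",") {
		// Trimming and skipping empty items is an assumption of this sketch.
		if n = strings.TrimSpace(n); n != "" {
			names = append(names, n)
		}
	}
	return true, names
}

func main() {
	for _, v := range []string{"", "true", "email,ipv4"} {
		enabled, names := parseRedactFlag(v)
		fmt.Printf("%q -> enabled=%v names=%v\n", v, enabled, names)
	}
}
```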

func ParseWebhookAuth added in v1.2.0

func ParseWebhookAuth(spec string) (mode, value string, err error)

ParseWebhookAuth validates and splits an auth spec into mode and value. Accepted formats: "" (no auth), "bearer:<token>", "hmac-sha256:<secret>".
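
The accepted formats suggest a split on the first colon followed by a check of the mode. The sketch below illustrates that idea. parseWebhookAuth is a hypothetical re-implementation, and both the error messages and the rejection of an empty value are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// parseWebhookAuth splits "<mode>:<value>" and validates the mode.
// Splitting at the first colon keeps any later colons inside the value.
func parseWebhookAuth(spec string) (string, string, error) {
	if spec == "" {
		return "", "", nil // no auth configured
	}
	mode, value, ok := strings.Cut(spec, ":")
	if !ok || value == "" {
		return "", "", fmt.Errorf("invalid auth spec %q: want <mode>:<value>", spec)
	}
	switch mode {
	case "bearer", "hmac-sha256":
		return mode, value, nil
	default:
		return "", "", fmt.Errorf("unsupported auth mode %q", mode)
	}
}

func main() {
	for _, spec := range []string{"", "bearer:tok", "hmac-sha256:s3cret", "basic:x"} {
		mode, value, err := parseWebhookAuth(spec)
		fmt.Printf("%q -> mode=%q value=%q err=%v\n", spec, mode, value, err)
	}
}
```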

func WriteMetadata

func WriteMetadata(dir string, meta *Metadata) error

WriteMetadata writes metadata.json to the given directory.

Types

type AlertEngine added in v1.0.5

type AlertEngine struct {
	// contains filtered or unexported fields
}

AlertEngine evaluates alert rules against pipeline snapshots and fires webhook events when thresholds are crossed.

func NewAlertEngine added in v1.0.5

func NewAlertEngine(rules []AlertRule, dispatcher *WebhookDispatcher) *AlertEngine

NewAlertEngine creates an engine with the given rules and webhook dispatcher.

func (*AlertEngine) Evaluate added in v1.0.5

func (e *AlertEngine) Evaluate(snap Snapshot)

Evaluate checks all rules against the current snapshot. Call it on every tick (e.g. every 1s). It computes derived metrics from the delta between the current and previous snapshots, then fires webhook events for rules that cross their thresholds. Firing uses hysteresis: once a rule has fired, it will not fire again until its condition resolves.
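
The hysteresis behaviour can be illustrated with a small state machine. This is a simplified sketch under stated assumptions, not the engine's code: rule, engine, and crossed are hypothetical, every rule is evaluated against a single metric value, and the derived-metric step is left out.

```go
package main

import "fmt"

// rule is a trimmed, hypothetical stand-in for AlertRule.
type rule struct {
	Name      string
	Op        string // gt, lt, gte, lte
	Threshold float64
}

// crossed reports whether value v satisfies op against threshold t.
func crossed(op string, v, t float64) bool {
	switch op {
	case "gt":
		return v > t
	case "lt":
		return v < t
	case "gte":
		return v >= t
	case "lte":
		return v <= t
	}
	return false
}

type engine struct {
	rules []rule
	fired map[string]bool // rules currently in the fired state
}

func newEngine(rules []rule) *engine {
	return &engine{rules: rules, fired: make(map[string]bool)}
}

// evaluate returns the rules that fire on this tick. A rule that is
// already fired stays silent until its condition stops holding.
func (e *engine) evaluate(value float64) []string {
	var out []string
	for _, r := range e.rules {
		hit := crossed(r.Op, value, r.Threshold)
		switch {
		case hit && !e.fired[r.Name]:
			e.fired[r.Name] = true
			out = append(out, r.Name)
		case !hit:
			delete(e.fired, r.Name) // condition resolved, re-arm the rule
		}
	}
	return out
}

func main() {
	e := newEngine([]rule{{Name: "high_drops", Op: "gt", Threshold: 4}})
	for _, v := range []float64{1, 5, 7, 2, 6} {
		fmt.Println(v, e.evaluate(v))
	}
}
```

In this run the rule fires at 5, stays quiet at 7, re-arms at 2, and fires again at 6.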

func (*AlertEngine) Fired added in v1.0.5

func (e *AlertEngine) Fired() []string

Fired returns the names of rules currently in the fired state.

type AlertRule added in v1.0.5

type AlertRule struct {
	Name      string  `yaml:"name"`
	Metric    string  `yaml:"metric"` // logs_dropped, drop_rate, disk_pct
	Op        string  `yaml:"op"`     // gt, lt, gte, lte
	Threshold float64 `yaml:"threshold"`
	Detail    string  `yaml:"detail"`
}

AlertRule defines a threshold-based alert that fires a webhook.

func LoadAlertRules added in v1.0.5

func LoadAlertRules(path string) ([]AlertRule, error)

LoadAlertRules loads alert rules from a YAML file.

type AlertRulesFile added in v1.0.5

type AlertRulesFile struct {
	Rules []AlertRule `yaml:"rules"`
}

AlertRulesFile is the YAML structure for alert rules.

type AuditEntry

type AuditEntry struct {
	Timestamp time.Time     `json:"timestamp"`
	Event     string        `json:"event"`
	RemoteIP  string        `json:"remote_ip,omitempty"`
	Lines     int           `json:"lines,omitempty"`
	Bytes     int           `json:"bytes,omitempty"`
	Duration  time.Duration `json:"duration_ms,omitempty"`
}

AuditEntry records a single auditable event.

type AuditLogger

type AuditLogger struct {
	// contains filtered or unexported fields
}

AuditLogger writes append-only JSONL audit records.

func NewAuditLogger

func NewAuditLogger(dir string) (*AuditLogger, error)

NewAuditLogger creates an audit logger writing to <dir>/audit.jsonl.

func (*AuditLogger) Close

func (a *AuditLogger) Close() error

Close flushes and closes the audit log file.

func (*AuditLogger) Log

func (a *AuditLogger) Log(entry AuditEntry)

Log writes an audit entry. Safe to call from multiple goroutines. If a is nil, the call is a no-op.
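
The nil-receiver and concurrency guarantees follow a common Go pattern, sketched below with hypothetical types. auditLogger and demo are illustrative names, and writing through a mutex-guarded json.Encoder is an assumption about how such a logger could be built, not a description of AuditLogger's internals.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sync"
)

// auditLogger is a hypothetical stand-in that writes one JSON object per line.
type auditLogger struct {
	mu  sync.Mutex
	enc *json.Encoder
}

// log is safe on a nil receiver, so callers need no "is auditing on?" check.
func (a *auditLogger) log(event string) {
	if a == nil {
		return
	}
	a.mu.Lock()
	defer a.mu.Unlock()
	// Encode appends a newline, which yields JSONL output.
	_ = a.enc.Encode(struct {
		Event string `json:"event"`
	}{event})
}

// demo logs through a nil logger and a real one, returning what was written.
func demo() string {
	var buf bytes.Buffer
	var disabled *auditLogger
	disabled.log("ignored") // no-op, does not panic

	enabled := &auditLogger{enc: json.NewEncoder(&buf)}
	enabled.log("push")
	enabled.log("rotate")
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```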

type DiskReporter

type DiskReporter interface {
	DiskUsage() int64
}

DiskReporter provides disk usage for the TUI.

type LogEntry

type LogEntry struct {
	Timestamp time.Time         `json:"ts"`
	Labels    map[string]string `json:"labels,omitempty"`
	Message   string            `json:"msg"`
}

LogEntry represents a single parsed log line.

type LogRing

type LogRing struct {
	// contains filtered or unexported fields
}

LogRing is a fixed-size circular buffer of log entries for TUI display. All methods are safe for concurrent use.

func NewLogRing

func NewLogRing(cap int) *LogRing

NewLogRing creates a ring buffer with the given capacity. If cap ≤ 0, defaultRingSize is used.

func (*LogRing) Push

func (r *LogRing) Push(entry LogEntry)

Push adds an entry to the ring. If full, the oldest entry is overwritten. Never blocks.

func (*LogRing) Snapshot

func (r *LogRing) Snapshot() []LogEntry

Snapshot returns a chronological copy of all entries in the ring.
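
The overwrite-oldest and chronological-copy behaviour can be sketched with a plain circular buffer. The ring type below is a hypothetical illustration that stores strings instead of LogEntry values. The fallback capacity of 4 is invented for the example and is not the package's defaultRingSize.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a fixed-size circular buffer guarded by a mutex.
type ring struct {
	mu    sync.Mutex
	buf   []string
	next  int // index the next push writes to
	count int // number of valid entries, at most len(buf)
}

func newRing(capacity int) *ring {
	if capacity <= 0 {
		capacity = 4 // illustrative default only
	}
	return &ring{buf: make([]string, capacity)}
}

// push stores s, overwriting the oldest entry once the ring is full.
func (r *ring) push(s string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.buf[r.next] = s
	r.next = (r.next + 1) % len(r.buf)
	if r.count < len(r.buf) {
		r.count++
	}
}

// snapshot copies the entries out, oldest first.
func (r *ring) snapshot() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]string, 0, r.count)
	start := (r.next - r.count + len(r.buf)) % len(r.buf)
	for i := 0; i < r.count; i++ {
		out = append(out, r.buf[(start+i)%len(r.buf)])
	}
	return out
}

func main() {
	r := newRing(3)
	for _, s := range []string{"a", "b", "c", "d"} {
		r.push(s)
	}
	fmt.Println(r.snapshot()) // [b c d]
}
```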

func (*LogRing) SnapshotFiltered added in v1.0.5

func (r *LogRing) SnapshotFiltered(fn func(LogEntry) bool) []LogEntry

SnapshotFiltered returns a chronological copy of entries matching the predicate.

func (*LogRing) Version

func (r *LogRing) Version() int

Version returns a monotonic counter that increments on every Push.

type LokiPushRequest

type LokiPushRequest struct {
	Streams []LokiStream `json:"streams"`
}

LokiPushRequest is the Loki push API JSON payload.

type LokiStream

type LokiStream struct {
	Stream map[string]string `json:"stream"`
	Values [][]string        `json:"values"` // [ns_timestamp, message]
}

LokiStream is one stream within a Loki push request.
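
To show how the two payload types fit together, here is a sketch that decodes a push body and flattens it into LogEntry values. The struct definitions are copied from this page. flatten and samplePayload are hypothetical, and treating the first element of each value pair as a decimal nanosecond Unix timestamp follows the `[ns_timestamp, message]` comment above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"
)

type LokiPushRequest struct {
	Streams []LokiStream `json:"streams"`
}

type LokiStream struct {
	Stream map[string]string `json:"stream"`
	Values [][]string        `json:"values"` // [ns_timestamp, message]
}

type LogEntry struct {
	Timestamp time.Time         `json:"ts"`
	Labels    map[string]string `json:"labels,omitempty"`
	Message   string            `json:"msg"`
}

const samplePayload = `{"streams":[{"stream":{"app":"api"},"values":[["1000000000000000000","hello"],["1000000001000000000","world"]]}]}`

// flatten decodes a push body into one LogEntry per value pair.
func flatten(body []byte) ([]LogEntry, error) {
	var req LokiPushRequest
	if err := json.Unmarshal(body, &req); err != nil {
		return nil, err
	}
	var out []LogEntry
	for _, s := range req.Streams {
		for _, v := range s.Values {
			if len(v) < 2 {
				return nil, fmt.Errorf("value needs [timestamp, message], got %d fields", len(v))
			}
			ns, err := strconv.ParseInt(v[0], 10, 64)
			if err != nil {
				return nil, fmt.Errorf("bad timestamp %q: %w", v[0], err)
			}
			out = append(out, LogEntry{
				Timestamp: time.Unix(0, ns).UTC(),
				Labels:    s.Stream,
				Message:   v[1],
			})
		}
	}
	return out, nil
}

func main() {
	entries, err := flatten([]byte(samplePayload))
	if err != nil {
		panic(err)
	}
	for _, e := range entries {
		fmt.Println(e.Timestamp.Format(time.RFC3339), e.Labels["app"], e.Message)
	}
}
```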

type Metadata

type Metadata struct {
	Version    int            `json:"version"`
	Format     string         `json:"format"`
	Started    time.Time      `json:"started"`
	Stopped    time.Time      `json:"stopped,omitempty"`
	TotalLines int64          `json:"total_lines"`
	TotalBytes int64          `json:"total_bytes"`
	LabelsSeen []string       `json:"labels_seen"`
	Redaction  *RedactionInfo `json:"redaction,omitempty"`
}

Metadata records session-level information for a capture directory.

func ReadMetadata

func ReadMetadata(dir string) (*Metadata, error)

ReadMetadata reads metadata.json from the given directory.

type Metrics

type Metrics struct {
	LogsReceived       prometheus.Counter
	LogsDropped        prometheus.Counter
	BytesWritten       prometheus.Counter
	DiskUsage          prometheus.Gauge
	ActiveConnections  prometheus.Gauge
	BackpressureEvents prometheus.Counter
	RedactionsTotal    *prometheus.CounterVec
	PushDuration       prometheus.Histogram
	WriterQueueLength  prometheus.Gauge
	RotationTotal      *prometheus.CounterVec
	RotationErrors     prometheus.Counter
}

Metrics holds all Prometheus metrics for the receiver pipeline.

func NewMetrics

func NewMetrics(reg prometheus.Registerer) *Metrics

NewMetrics creates and registers all receiver metrics.

type RedactPattern

type RedactPattern struct {
	Name        string `yaml:"name"`
	Pattern     string `yaml:"pattern"`
	Replacement string `yaml:"replacement"`
	// contains filtered or unexported fields
}

RedactPattern defines a named PII pattern with its compiled regex.

type RedactionInfo

type RedactionInfo struct {
	Enabled  bool     `json:"enabled"`
	Patterns []string `json:"patterns"`
}

RedactionInfo records which redaction patterns were active.

type Redactor

type Redactor struct {
	// contains filtered or unexported fields
}

Redactor holds active patterns and redacts matching content.

func NewRedactor

func NewRedactor(names []string) (*Redactor, error)

NewRedactor creates a Redactor with the specified patterns enabled. If names is empty, all built-in patterns are enabled.

func (*Redactor) LoadCustomPatterns

func (r *Redactor) LoadCustomPatterns(path string) error

LoadCustomPatterns loads additional patterns from a YAML file.

func (*Redactor) PatternNames

func (r *Redactor) PatternNames() []string

PatternNames returns the names of active patterns.

func (*Redactor) Redact

func (r *Redactor) Redact(msg string) string

Redact replaces all matching PII in msg with redaction markers.

func (*Redactor) SetOnRedact added in v1.0.4

func (r *Redactor) SetOnRedact(fn func(pattern string))

SetOnRedact sets a callback invoked for each redaction hit with the pattern name.

type SearchFilter added in v1.9.2

type SearchFilter struct {
	Regex *regexp.Regexp
	Mode  SearchMode
}

SearchFilter is one entry in the filter stack.

type SearchMode added in v1.9.2

type SearchMode int

SearchMode identifies the type of search filter.

const (
	ModeHide SearchMode = iota // /!pattern — remove matching lines
	ModeGrep                   // /=pattern — keep only matching lines
)
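
A filter stack built from these two modes could be applied as sketched below. The SearchMode and SearchFilter definitions are copied from this page. visible and sampleStack are hypothetical, and the sketch assumes that filters combine with AND and match against the message text only.

```go
package main

import (
	"fmt"
	"regexp"
)

type SearchMode int

const (
	ModeHide SearchMode = iota // remove matching lines
	ModeGrep                   // keep only matching lines
)

type SearchFilter struct {
	Regex *regexp.Regexp
	Mode  SearchMode
}

// visible reports whether msg survives every filter in the stack.
func visible(msg string, stack []SearchFilter) bool {
	for _, f := range stack {
		matched := f.Regex.MatchString(msg)
		if (f.Mode == ModeHide && matched) || (f.Mode == ModeGrep && !matched) {
			return false
		}
	}
	return true
}

// sampleStack keeps lines containing "error" and hides health checks.
func sampleStack() []SearchFilter {
	return []SearchFilter{
		{Regex: regexp.MustCompile(`error`), Mode: ModeGrep},
		{Regex: regexp.MustCompile(`healthz`), Mode: ModeHide},
	}
}

func main() {
	for _, msg := range []string{"error: disk full", "error on /healthz", "request ok"} {
		fmt.Printf("%-20q visible=%v\n", msg, visible(msg, sampleStack()))
	}
}
```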

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the HTTP receiver server.

func NewServer

func NewServer(addr string, writer *Writer, redactor *Redactor, metrics *Metrics, stats *Stats, ring *LogRing) *Server

NewServer creates an HTTP server bound to addr.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe() error

ListenAndServe starts the HTTP server.

func (*Server) ListenAndServeTLS

func (s *Server) ListenAndServeTLS(certFile, keyFile string) error

ListenAndServeTLS starts the HTTP server with TLS.

func (*Server) Serve

func (s *Server) Serve(ln net.Listener) error

Serve accepts connections on a listener.

func (*Server) SetAuditLogger

func (s *Server) SetAuditLogger(a *AuditLogger)

SetAuditLogger attaches an audit logger to the server.

func (*Server) SetVersion added in v0.3.0

func (s *Server) SetVersion(v string)

SetVersion sets the application version reported by /api/version.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server.

type Snapshot

type Snapshot struct {
	LogsReceived int64
	LogsDropped  int64
	ActiveConns  int64
	DiskUsage    int64
	DiskCap      int64
	BytesWritten int64
	Talkers      []Talker
}

Snapshot is a point-in-time copy of pipeline stats.

type Stats

type Stats struct {
	LogsReceived atomic.Int64
	LogsDropped  atomic.Int64
	ActiveConns  atomic.Int64
	// contains filtered or unexported fields
}

Stats collects pipeline counters for TUI display. All methods are safe for concurrent use.

func NewStats

func NewStats() *Stats

NewStats creates a Stats collector.

func (*Stats) RecordDrop

func (s *Stats) RecordDrop()

RecordDrop increments the dropped counter.

func (*Stats) RecordEntry

func (s *Stats) RecordEntry(labels map[string]string)

RecordEntry increments the received counter and tracks the talker. The talker name is the "app" label value, falling back to the first label value.
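
The talker-name rule can be sketched as follows. Go maps have no defined iteration order, so "first label value" needs a tie-break. This hypothetical talkerName helper assumes that means the value of the lexicographically smallest key, which may differ from what the package does.

```go
package main

import (
	"fmt"
	"sort"
)

// talkerName prefers the "app" label and otherwise falls back to the
// value of the smallest key (an assumption made for determinism).
func talkerName(labels map[string]string) string {
	if v, ok := labels["app"]; ok && v != "" {
		return v
	}
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	if len(keys) == 0 {
		return ""
	}
	sort.Strings(keys)
	return labels[keys[0]]
}

func main() {
	fmt.Println(talkerName(map[string]string{"app": "api", "host": "h1"}))  // api
	fmt.Println(talkerName(map[string]string{"job": "cron", "host": "h1"})) // h1
}
```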

func (*Stats) Snapshot

func (s *Stats) Snapshot(diskUsage, diskCap, bytesWritten int64) Snapshot

Snapshot returns a point-in-time copy of all stats.

type TUIModel

type TUIModel struct {
	// contains filtered or unexported fields
}

TUIModel is the bubbletea model for the live receiver dashboard.

func NewTUIModel

func NewTUIModel(stats *Stats, ring *LogRing, disk DiskReporter, diskCap int64, writer *Writer, listen, dir, redactInfo string) TUIModel

NewTUIModel creates a TUI model wired to the pipeline data sources.

func (TUIModel) Init

func (m TUIModel) Init() tea.Cmd

Init starts the tick timer.

func (TUIModel) Update

func (m TUIModel) Update(msg tea.Msg) (tea.Model, tea.Cmd)

Update handles messages.

func (TUIModel) View

func (m TUIModel) View() string

View renders the TUI.

type Tailer added in v1.0.5

type Tailer struct {
	// contains filtered or unexported fields
}

Tailer follows the active JSONL file in a capture directory, emitting new lines as they are appended. It handles file rotation by switching to the newest uncompressed .jsonl file when a new one appears.

func NewTailer added in v1.0.5

func NewTailer(dir string) (*Tailer, error)

NewTailer opens the newest .jsonl file in dir and seeks to the end. Use Tail() in a loop to read new entries.

func NewTailerFromStart added in v1.0.5

func NewTailerFromStart(dir string) (*Tailer, error)

NewTailerFromStart opens the newest .jsonl file and reads from the beginning.

func (*Tailer) Close added in v1.0.5

func (t *Tailer) Close() error

Close closes the underlying file.

func (*Tailer) ReadLast added in v1.0.5

func (t *Tailer) ReadLast(n int) ([]LogEntry, error)

ReadLast reads the last n lines from the current file.

func (*Tailer) Tail added in v1.0.5

func (t *Tailer) Tail() ([]LogEntry, error)

Tail reads any new complete lines from the current file. On rotation (new .jsonl file detected), it switches to the new file. Returns entries read and any error. Returns nil, nil when no new data is available.
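
Returning only complete lines matters because the receiver may be part-way through writing a record when the file is polled. The sketch below shows one way to split buffered text at the last newline and carry the unfinished remainder into the next poll. completeLines is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// completeLines returns the newline-terminated lines in pending and the
// trailing partial line (if any) that should wait for more data.
func completeLines(pending string) (lines []string, rest string) {
	idx := strings.LastIndexByte(pending, '\n')
	if idx < 0 {
		return nil, pending // nothing complete yet
	}
	return strings.Split(pending[:idx], "\n"), pending[idx+1:]
}

func main() {
	// Two polls: the second one completes the record left over from the first.
	lines, rest := completeLines("{\"msg\":\"a\"}\n{\"msg\":")
	fmt.Println(len(lines), rest)

	lines, rest = completeLines(rest + "\"b\"}\n")
	fmt.Println(lines, rest == "")
}
```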

type Talker

type Talker struct {
	Name  string
	Count int64
}

Talker is a name and its cumulative entry count.

type WebhookDispatcher added in v1.0.0

type WebhookDispatcher struct {
	// contains filtered or unexported fields
}

WebhookDispatcher sends fire-and-forget HTTP POST notifications.

func NewWebhookDispatcher added in v1.0.0

func NewWebhookDispatcher(urls []string, eventFilter []string, authSpec string) (*WebhookDispatcher, error)

NewWebhookDispatcher creates a dispatcher for the given URLs and event filter. If eventFilter is empty, all events are accepted. authSpec controls request authentication: "" for none, "bearer:<token>", or "hmac-sha256:<secret>".
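
For the "hmac-sha256:<secret>" mode, a receiving endpoint would typically recompute a signature over the request body and compare it in constant time. The sketch below shows that computation with the standard library. sign and verify are hypothetical helpers, and hex encoding is an assumption. This page does not document which header or encoding the dispatcher uses.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns the hex-encoded HMAC-SHA256 of body under secret.
func sign(secret, body string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(body))
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the MAC and compares it in constant time.
func verify(secret, body, sig string) bool {
	want, err := hex.DecodeString(sig)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(body))
	return hmac.Equal(mac.Sum(nil), want)
}

func main() {
	body := `{"event":"capture_stopped"}` // illustrative payload only
	sig := sign("s3cret", body)
	fmt.Println(len(sig), verify("s3cret", body, sig), verify("wrong", body, sig))
}
```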

func (*WebhookDispatcher) Fire added in v1.0.0

func (d *WebhookDispatcher) Fire(evt WebhookEvent)

Fire sends the event to all configured webhooks in background goroutines. It returns immediately (non-blocking). Errors are silently dropped.

type WebhookEvent added in v1.0.0

type WebhookEvent struct {
	Event     string        `json:"event"`
	Timestamp time.Time     `json:"timestamp"`
	Dir       string        `json:"dir,omitempty"`
	Stats     *WebhookStats `json:"stats,omitempty"`
	Detail    string        `json:"detail,omitempty"`
}

WebhookEvent is the JSON payload sent to webhook URLs.

type WebhookStats added in v1.0.0

type WebhookStats struct {
	LinesWritten int64 `json:"lines_written"`
	BytesWritten int64 `json:"bytes_written"`
	DiskUsage    int64 `json:"disk_usage"`
	DiskCap      int64 `json:"disk_cap"`
}

WebhookStats contains capture metrics included in webhook payloads.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer drains LogEntry values from a bounded channel and writes them as JSONL to a destination.

func NewWriter

func NewWriter(bufSize int, dst io.Writer, track func(time.Time, map[string]string)) *Writer

NewWriter creates a Writer with the given buffer size. dst receives JSONL output; track is called per line for metadata tracking (may be nil).

func (*Writer) BytesWritten

func (w *Writer) BytesWritten() int64

BytesWritten returns total bytes written.

func (*Writer) Close

func (w *Writer) Close()

Close signals the writer to stop, drains remaining entries, and waits.

func (*Writer) Healthy added in v0.3.0

func (w *Writer) Healthy() bool

Healthy returns true if the writer channel has capacity (not in backpressure).

func (*Writer) LinesWritten

func (w *Writer) LinesWritten() int64

LinesWritten returns total lines written.

func (*Writer) Send

func (w *Writer) Send(entry LogEntry) bool

Send attempts a non-blocking send of entry to the writer channel. Returns false if the channel is full (caller should count as dropped).
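
A non-blocking send on a bounded channel is usually written with select and a default case. The sketch below illustrates the pattern with a hypothetical writer type that carries strings. Its healthy method is one plausible reading of "has capacity", not the package's definition.

```go
package main

import "fmt"

// writer is a hypothetical stand-in holding only the bounded queue.
type writer struct {
	ch chan string
}

func newWriter(bufSize int) *writer {
	return &writer{ch: make(chan string, bufSize)}
}

// send enqueues s without blocking and reports whether it was accepted.
func (w *writer) send(s string) bool {
	select {
	case w.ch <- s:
		return true
	default:
		return false // queue full: the caller counts this as a drop
	}
}

// healthy reports whether the queue still has free slots.
func (w *writer) healthy() bool {
	return len(w.ch) < cap(w.ch)
}

func main() {
	w := newWriter(2)
	dropped := 0
	for _, s := range []string{"a", "b", "c"} {
		if !w.send(s) {
			dropped++
		}
	}
	fmt.Println("dropped:", dropped, "healthy:", w.healthy())
}
```

With no consumer draining the channel, the third send is rejected, so the caller can record the drop instead of stalling the HTTP handler.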

func (*Writer) SetQueueGauge added in v0.3.0

func (w *Writer) SetQueueGauge(fn func(float64))

SetQueueGauge sets a callback to report queue length changes.
