collector

package
v0.8.0 Latest
Published: Mar 29, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package collector implements an async event buffer that decouples ingestion from SQLite writes. Events are accepted into a ring buffer and batch-flushed to the store on a configurable interval, ensuring <1ms overhead on the ingestion hot path.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Collector

type Collector struct {

	// P12-7: optional callback invoked after each flush with the batch size.
	OnFlush func(count int)
	// contains filtered or unexported fields
}

Collector buffers analytics events and batch-writes them to the store.
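One way an optional callback such as OnFlush might be wired into a flush step is sketched below. The `flusher` type and its `flush` method are hypothetical and are not taken from the package. Whether the real callback also fires after a failed write is not stated in the documentation; this sketch assumes it fires only on success.

```go
package main

// flusher is a hypothetical type that mirrors only the OnFlush field.
type flusher struct {
	onFlush func(count int)
}

// flush writes a batch and, if a callback is set, reports the batch size.
// Assumption: the callback is skipped for empty batches and failed writes.
func (f *flusher) flush(batch []string, write func([]string) error) error {
	if len(batch) == 0 {
		return nil
	}
	if err := write(batch); err != nil {
		return err
	}
	if f.onFlush != nil { // the callback is optional, so guard against nil
		f.onFlush(len(batch))
	}
	return nil
}
```

The nil check is the important part: leaving the field unset must be safe for callers who do not need flush notifications.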

func New

func New(st *pulsestore.Store, capacity int, flushIntervalMs int) *Collector

New creates a Collector with the given buffer capacity and flush interval.

func (*Collector) DropRate

func (c *Collector) DropRate() float64

DropRate returns the fraction of enqueued events that were dropped (Bug 28 — ROI-E8). Returns 0.0 if no events have been dropped or the high-water mark is zero. If either counter has overflowed to negative (extremely high-throughput case), returns 1.0 (100% drop rate) as a conservative safe value rather than 0.0.
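The guard order described above can be illustrated with a small standalone function. This is a sketch, not the package's code: `dropRate` and its parameters are hypothetical, the choice of denominator is an assumption (the documentation does not say exactly which counter the dropped count is divided by), and the final clamp to 1.0 is an addition of this sketch.

```go
package main

// dropRate returns dropped/total as a fraction in [0, 1].
// Both arguments are hypothetical stand-ins for the collector's counters.
func dropRate(dropped, total int64) float64 {
	// A negative counter means it overflowed; report the conservative
	// worst case instead of a misleading 0.0.
	if dropped < 0 || total < 0 {
		return 1.0
	}
	// Nothing dropped, or nothing to divide by.
	if dropped == 0 || total == 0 {
		return 0.0
	}
	rate := float64(dropped) / float64(total)
	if rate > 1.0 { // clamp added by this sketch, not documented behavior
		return 1.0
	}
	return rate
}
```

Checking for overflow before the zero checks matters: a wrapped counter would otherwise produce a negative or near-zero rate and hide a real problem from dashboards.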

func (*Collector) Dropped

func (c *Collector) Dropped() int64

Dropped returns the number of events dropped due to buffer overflow (P2-19).

func (*Collector) HighWaterMark

func (c *Collector) HighWaterMark() int64

HighWaterMark returns the peak buffer depth since the collector started (P2-17).
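A peak-depth counter like this is commonly kept with a compare-and-swap loop so that concurrent producers never lower the recorded peak. The sketch below assumes Go 1.19 or later for `atomic.Int64`; the `peakTracker` type is hypothetical and says nothing about how the package actually tracks the value.

```go
package main

import "sync/atomic"

// peakTracker records the largest depth ever observed.
type peakTracker struct {
	peak atomic.Int64
}

// observe raises the peak to depth if depth is larger than the current
// peak. The loop retries when another goroutine updated the value first.
func (p *peakTracker) observe(depth int64) {
	for {
		cur := p.peak.Load()
		if depth <= cur {
			return
		}
		if p.peak.CompareAndSwap(cur, depth) {
			return
		}
	}
}

// highWaterMark returns the largest depth seen so far.
func (p *peakTracker) highWaterMark() int64 {
	return p.peak.Load()
}
```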

func (*Collector) Len

func (c *Collector) Len() int

Len returns the current buffer length (for diagnostics).

func (*Collector) RecordAgentLLMUsage

func (c *Collector) RecordAgentLLMUsage(ev pulsetypes.AgentLLMUsageEvent)

RecordAgentLLMUsage enqueues an agent-reported LLM usage event (Option B).

func (*Collector) RecordBrainUsage

func (c *Collector) RecordBrainUsage(ev pulsetypes.BrainUsageEvent)

RecordBrainUsage enqueues a brain usage event.

func (*Collector) RecordConfigReload

func (c *Collector) RecordConfigReload(ev pulsetypes.ConfigReloadEvent)

RecordConfigReload enqueues a configuration hot-reload event (Bug 68 — COV-9).

func (*Collector) RecordContextDelivery

func (c *Collector) RecordContextDelivery(ev pulsetypes.ContextDeliveryEvent)

RecordContextDelivery enqueues a context delivery event.

func (*Collector) RecordEmbeddingEvent

func (c *Collector) RecordEmbeddingEvent(ev pulsetypes.EmbeddingEvent)

RecordEmbeddingEvent enqueues an embedding batch event (P2-6).

func (*Collector) RecordEnrichmentEvent

func (c *Collector) RecordEnrichmentEvent(ev pulsetypes.EnrichmentEvent)

RecordEnrichmentEvent enqueues a code enrichment pass outcome event (Bug 70 — COV-Subsys).

func (*Collector) RecordFederationEvent

func (c *Collector) RecordFederationEvent(ev pulsetypes.FederationDetectEvent)

RecordFederationEvent enqueues a federation detection event (P5 — COV-8).

func (*Collector) RecordGraphSnapshot

func (c *Collector) RecordGraphSnapshot(ev pulsetypes.GraphSnapshotEvent)

RecordGraphSnapshot enqueues a graph topology snapshot (P2-7).

func (*Collector) RecordGuardEvent

func (c *Collector) RecordGuardEvent(ev pulsetypes.GuardEvent)

RecordGuardEvent enqueues a loop-guard or rate-limiter block event (P3-2/P3-3).

func (*Collector) RecordHeartbeat

func (c *Collector) RecordHeartbeat()

RecordHeartbeat enqueues a system uptime heartbeat tick (P5 — ROI-E1).

func (*Collector) RecordIndexEvent

func (c *Collector) RecordIndexEvent(ev pulsetypes.IndexEvent)

RecordIndexEvent enqueues a full-index completion event (P2-8).

func (*Collector) RecordMemoryOp

func (c *Collector) RecordMemoryOp(ev pulsetypes.MemoryOperationEvent)

RecordMemoryOp enqueues a recall hit/miss or memory write event (P3-4).

func (*Collector) RecordOutcomeSignal

func (c *Collector) RecordOutcomeSignal(ev pulsetypes.OutcomeSignalEvent)

RecordOutcomeSignal enqueues an intent alignment outcome signal (R29).

func (*Collector) RecordParseEvent

func (c *Collector) RecordParseEvent(ev pulsetypes.ParseEvent)

RecordParseEvent enqueues a per-file parse event (P2-2).

func (*Collector) RecordPersistenceEvent

func (c *Collector) RecordPersistenceEvent(ev pulsetypes.PersistenceEvent)

RecordPersistenceEvent enqueues a store write duration/size event (Bug 69 — COV-12).

func (*Collector) RecordReparseEvent

func (c *Collector) RecordReparseEvent(ev pulsetypes.ReparseEvent)

RecordReparseEvent enqueues an incremental reparse event (P2-3).

func (*Collector) RecordRuleEvalEvent

func (c *Collector) RecordRuleEvalEvent(ev pulsetypes.RuleEvalEvent)

RecordRuleEvalEvent enqueues an architecture rule evaluation event (Bug 71 — COV-Subsys).

func (*Collector) RecordSearchEvent

func (c *Collector) RecordSearchEvent(ev pulsetypes.SearchEvent)

RecordSearchEvent enqueues a search or find_entity analytics event (P4-8).

func (*Collector) RecordSessionEvent

func (c *Collector) RecordSessionEvent(id, agentID, projectID, eventType string)

RecordSessionEvent enqueues a session lifecycle event.

func (*Collector) RecordSessionEventFull

func (c *Collector) RecordSessionEventFull(id, agentID, projectID, eventType, agentVersion string)

RecordSessionEventFull enqueues a session lifecycle event with agent version (Bug 16 — DQ-C.6).

func (*Collector) RecordSessionModel

func (c *Collector) RecordSessionModel(sessionID, agentID, projectID, model, provider string)

RecordSessionModel enqueues a model/provider update for an existing session. Called when an agent reports its model via session_init (Option A).

func (*Collector) RecordToolCall

func (c *Collector) RecordToolCall(ev pulsetypes.ToolCallEvent)

RecordToolCall enqueues a tool call event.

func (*Collector) RecordToolSequenceEntry

func (c *Collector) RecordToolSequenceEntry(sessionID, toolName string, position int, success bool)

RecordToolSequenceEntry enqueues a tool call sequence entry (P5 — SA-C1).

func (*Collector) RecordValidationEvent

func (c *Collector) RecordValidationEvent(ev pulsetypes.ValidationEvent)

RecordValidationEvent enqueues a validate_plan or verify_implementation outcome event (P3-5).

func (*Collector) Start

func (c *Collector) Start()

Start begins the background flush goroutine.

func (*Collector) Stop

func (c *Collector) Stop()

Stop signals the flush loop to exit and waits for a final flush. Safe to call multiple times.
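An idempotent stop with a final flush is often built from `sync.Once` plus a completion channel. The following is a minimal sketch of that pattern under assumed names (`worker`, `start`, `stop`); it is not the package's source. Note that in this sketch `stop` blocks forever if `start` was never called, which the real Collector may or may not guard against.

```go
package main

import "sync"

// worker is a hypothetical stand-in for the collector's flush goroutine.
type worker struct {
	stopCh   chan struct{}
	done     chan struct{}
	stopOnce sync.Once
	flushes  int // written by the goroutine; read only after stop returns
}

func newWorker() *worker {
	return &worker{stopCh: make(chan struct{}), done: make(chan struct{})}
}

// start launches the background goroutine, which waits for the stop
// signal, performs one final flush, and then closes done.
func (w *worker) start() {
	go func() {
		defer close(w.done)
		<-w.stopCh
		w.flushes++ // stands in for the final batch write
	}()
}

// stop signals the goroutine once and waits for it to finish.
// Repeated calls skip the close and return as soon as done is closed.
func (w *worker) stop() {
	w.stopOnce.Do(func() { close(w.stopCh) })
	<-w.done
}
```

Closing a channel twice panics in Go, so the `sync.Once` wrapper is what makes repeated calls safe; waiting on `done` is what guarantees the final flush has completed before `stop` returns.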

func (*Collector) WriteErrors

func (c *Collector) WriteErrors() int64

WriteErrors returns the total number of batch-write errors since collector start (P5 — DQ-Integrity.1).
