Documentation ¶
Overview ¶
Package collector implements an asynchronous event buffer that decouples event ingestion from SQLite writes. Events are accepted into a ring buffer and flushed to the store in batches on a configurable interval, keeping overhead on the ingestion hot path below 1 ms.
Index ¶
- type Collector
- func New(st *pulsestore.Store, capacity int, flushIntervalMs int) *Collector
- func (c *Collector) DropRate() float64
- func (c *Collector) Dropped() int64
- func (c *Collector) HighWaterMark() int64
- func (c *Collector) Len() int
- func (c *Collector) RecordAgentLLMUsage(ev pulsetypes.AgentLLMUsageEvent)
- func (c *Collector) RecordBrainUsage(ev pulsetypes.BrainUsageEvent)
- func (c *Collector) RecordConfigReload(ev pulsetypes.ConfigReloadEvent)
- func (c *Collector) RecordContextDelivery(ev pulsetypes.ContextDeliveryEvent)
- func (c *Collector) RecordEmbeddingEvent(ev pulsetypes.EmbeddingEvent)
- func (c *Collector) RecordEnrichmentEvent(ev pulsetypes.EnrichmentEvent)
- func (c *Collector) RecordFederationEvent(ev pulsetypes.FederationDetectEvent)
- func (c *Collector) RecordGraphSnapshot(ev pulsetypes.GraphSnapshotEvent)
- func (c *Collector) RecordGuardEvent(ev pulsetypes.GuardEvent)
- func (c *Collector) RecordHeartbeat()
- func (c *Collector) RecordIndexEvent(ev pulsetypes.IndexEvent)
- func (c *Collector) RecordMemoryOp(ev pulsetypes.MemoryOperationEvent)
- func (c *Collector) RecordOutcomeSignal(ev pulsetypes.OutcomeSignalEvent)
- func (c *Collector) RecordParseEvent(ev pulsetypes.ParseEvent)
- func (c *Collector) RecordPersistenceEvent(ev pulsetypes.PersistenceEvent)
- func (c *Collector) RecordReparseEvent(ev pulsetypes.ReparseEvent)
- func (c *Collector) RecordRuleEvalEvent(ev pulsetypes.RuleEvalEvent)
- func (c *Collector) RecordSearchEvent(ev pulsetypes.SearchEvent)
- func (c *Collector) RecordSessionEvent(id, agentID, projectID, eventType string)
- func (c *Collector) RecordSessionEventFull(id, agentID, projectID, eventType, agentVersion string)
- func (c *Collector) RecordSessionModel(sessionID, agentID, projectID, model, provider string)
- func (c *Collector) RecordToolCall(ev pulsetypes.ToolCallEvent)
- func (c *Collector) RecordToolSequenceEntry(sessionID, toolName string, position int, success bool)
- func (c *Collector) RecordValidationEvent(ev pulsetypes.ValidationEvent)
- func (c *Collector) Start()
- func (c *Collector) Stop()
- func (c *Collector) WriteErrors() int64
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Collector ¶
type Collector struct {
// P12-7: optional callback invoked after each flush with the batch size.
OnFlush func(count int)
// contains filtered or unexported fields
}
Collector buffers analytics events and batch-writes them to the store.
func New ¶
func New(st *pulsestore.Store, capacity int, flushIntervalMs int) *Collector
New creates a Collector with the given buffer capacity and flush interval in milliseconds.
func (*Collector) DropRate ¶
func (c *Collector) DropRate() float64
DropRate returns the fraction of enqueued events that were dropped (Bug 28 — ROI-E8). Returns 0.0 if no events have been dropped or the high-water mark is zero. If either counter has overflowed to negative (extremely high-throughput case), returns 1.0 (100% drop rate) as a conservative safe value rather than 0.0.
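The guard logic described for DropRate might look roughly like the sketch below. The helper name dropRate and its total parameter are hypothetical; in particular, which counter the package actually uses as the denominator is an assumption here.

```go
package main

import "fmt"

// dropRate is a hypothetical helper. total is assumed to be the number of
// events offered to the buffer; the real denominator may differ.
func dropRate(dropped, total int64) float64 {
	if dropped < 0 || total < 0 {
		return 1.0 // a counter wrapped to negative: report the worst case
	}
	if dropped == 0 || total == 0 {
		return 0.0 // nothing dropped, or nothing to divide by
	}
	return float64(dropped) / float64(total)
}

func main() {
	fmt.Println(dropRate(0, 100))  // 0
	fmt.Println(dropRate(25, 100)) // 0.25
	fmt.Println(dropRate(-1, 100)) // 1
}
```

Checking for negative counters before the zero checks is what makes the overflow case report 1.0 instead of silently looking healthy.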
func (*Collector) Dropped ¶
func (c *Collector) Dropped() int64
Dropped returns the number of events dropped due to buffer overflow (P2-19).
func (*Collector) HighWaterMark ¶
func (c *Collector) HighWaterMark() int64
HighWaterMark returns the peak buffer depth since the collector started (P2-17).
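One common way to track a peak value from several goroutines without a lock is a compare-and-swap loop. The sketch below assumes that approach purely for illustration; the peak type and observe method are hypothetical, and the package may track its high-water mark differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// peak tracks the largest depth observed so far (hypothetical type).
type peak struct{ v atomic.Int64 }

// observe raises the stored peak if depth exceeds it, retrying on contention.
func (p *peak) observe(depth int64) {
	for {
		cur := p.v.Load()
		if depth <= cur || p.v.CompareAndSwap(cur, depth) {
			return
		}
	}
}

func main() {
	var p peak
	var wg sync.WaitGroup
	for _, d := range []int64{3, 9, 4, 7} {
		wg.Add(1)
		go func(d int64) {
			defer wg.Done()
			p.observe(d)
		}(d)
	}
	wg.Wait()
	fmt.Println(p.v.Load()) // 9
}
```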
func (*Collector) RecordAgentLLMUsage ¶
func (c *Collector) RecordAgentLLMUsage(ev pulsetypes.AgentLLMUsageEvent)
RecordAgentLLMUsage enqueues an agent-reported LLM usage event (Option B).
func (*Collector) RecordBrainUsage ¶
func (c *Collector) RecordBrainUsage(ev pulsetypes.BrainUsageEvent)
RecordBrainUsage enqueues a brain usage event.
func (*Collector) RecordConfigReload ¶
func (c *Collector) RecordConfigReload(ev pulsetypes.ConfigReloadEvent)
RecordConfigReload enqueues a configuration hot-reload event (Bug 68 — COV-9).
func (*Collector) RecordContextDelivery ¶
func (c *Collector) RecordContextDelivery(ev pulsetypes.ContextDeliveryEvent)
RecordContextDelivery enqueues a context delivery event.
func (*Collector) RecordEmbeddingEvent ¶
func (c *Collector) RecordEmbeddingEvent(ev pulsetypes.EmbeddingEvent)
RecordEmbeddingEvent enqueues an embedding batch event (P2-6).
func (*Collector) RecordEnrichmentEvent ¶
func (c *Collector) RecordEnrichmentEvent(ev pulsetypes.EnrichmentEvent)
RecordEnrichmentEvent enqueues a code enrichment pass outcome event (Bug 70 — COV-Subsys).
func (*Collector) RecordFederationEvent ¶
func (c *Collector) RecordFederationEvent(ev pulsetypes.FederationDetectEvent)
RecordFederationEvent enqueues a federation detection event (P5 — COV-8).
func (*Collector) RecordGraphSnapshot ¶
func (c *Collector) RecordGraphSnapshot(ev pulsetypes.GraphSnapshotEvent)
RecordGraphSnapshot enqueues a graph topology snapshot (P2-7).
func (*Collector) RecordGuardEvent ¶
func (c *Collector) RecordGuardEvent(ev pulsetypes.GuardEvent)
RecordGuardEvent enqueues a loop-guard or rate-limiter block event (P3-2/P3-3).
func (*Collector) RecordHeartbeat ¶
func (c *Collector) RecordHeartbeat()
RecordHeartbeat enqueues a system uptime heartbeat tick (P5 — ROI-E1).
func (*Collector) RecordIndexEvent ¶
func (c *Collector) RecordIndexEvent(ev pulsetypes.IndexEvent)
RecordIndexEvent enqueues a full-index completion event (P2-8).
func (*Collector) RecordMemoryOp ¶
func (c *Collector) RecordMemoryOp(ev pulsetypes.MemoryOperationEvent)
RecordMemoryOp enqueues a recall hit/miss or memory write event (P3-4).
func (*Collector) RecordOutcomeSignal ¶
func (c *Collector) RecordOutcomeSignal(ev pulsetypes.OutcomeSignalEvent)
RecordOutcomeSignal enqueues an intent alignment outcome signal (R29).
func (*Collector) RecordParseEvent ¶
func (c *Collector) RecordParseEvent(ev pulsetypes.ParseEvent)
RecordParseEvent enqueues a per-file parse event (P2-2).
func (*Collector) RecordPersistenceEvent ¶
func (c *Collector) RecordPersistenceEvent(ev pulsetypes.PersistenceEvent)
RecordPersistenceEvent enqueues a store write duration/size event (Bug 69 — COV-12).
func (*Collector) RecordReparseEvent ¶
func (c *Collector) RecordReparseEvent(ev pulsetypes.ReparseEvent)
RecordReparseEvent enqueues an incremental reparse event (P2-3).
func (*Collector) RecordRuleEvalEvent ¶
func (c *Collector) RecordRuleEvalEvent(ev pulsetypes.RuleEvalEvent)
RecordRuleEvalEvent enqueues an architecture rule evaluation event (Bug 71 — COV-Subsys).
func (*Collector) RecordSearchEvent ¶
func (c *Collector) RecordSearchEvent(ev pulsetypes.SearchEvent)
RecordSearchEvent enqueues a search or find_entity analytics event (P4-8).
func (*Collector) RecordSessionEvent ¶
func (c *Collector) RecordSessionEvent(id, agentID, projectID, eventType string)
RecordSessionEvent enqueues a session lifecycle event.
func (*Collector) RecordSessionEventFull ¶
func (c *Collector) RecordSessionEventFull(id, agentID, projectID, eventType, agentVersion string)
RecordSessionEventFull enqueues a session lifecycle event with agent version (Bug 16 — DQ-C.6).
func (*Collector) RecordSessionModel ¶
func (c *Collector) RecordSessionModel(sessionID, agentID, projectID, model, provider string)
RecordSessionModel enqueues a model/provider update for an existing session. Called when an agent reports its model via session_init (Option A).
func (*Collector) RecordToolCall ¶
func (c *Collector) RecordToolCall(ev pulsetypes.ToolCallEvent)
RecordToolCall enqueues a tool call event.
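func (*Collector) RecordToolSequenceEntry ¶
func (c *Collector) RecordToolSequenceEntry(sessionID, toolName string, position int, success bool)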
RecordToolSequenceEntry enqueues a tool call sequence entry (P5 — SA-C1).
func (*Collector) RecordValidationEvent ¶
func (c *Collector) RecordValidationEvent(ev pulsetypes.ValidationEvent)
RecordValidationEvent enqueues a validate_plan or verify_implementation outcome event (P3-5).
func (*Collector) Stop ¶
func (c *Collector) Stop()
Stop signals the flush loop to exit and waits for a final flush. Safe to call multiple times.
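A stop method that is safe to call repeatedly is often built on sync.Once, since closing a channel twice panics. The sketch below assumes that pattern for illustration; the worker type and its fields are hypothetical and are not taken from the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a collector with a background loop.
type worker struct {
	done     chan struct{}
	stopOnce sync.Once
	wg       sync.WaitGroup
	flushes  int // counts final flushes; written only by the loop goroutine
}

func startWorker() *worker {
	w := &worker{done: make(chan struct{})}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		<-w.done    // wait for the stop signal
		w.flushes++ // stands in for the final flush
	}()
	return w
}

// stop closes the signal channel at most once, then waits for the loop to exit.
func (w *worker) stop() {
	w.stopOnce.Do(func() { close(w.done) })
	w.wg.Wait()
}

func main() {
	w := startWorker()
	w.stop()
	w.stop() // second call is a no-op and returns immediately
	fmt.Println(w.flushes) // 1
}
```

Waiting on the WaitGroup in every call means a caller that loses the race to stop still returns only after the final flush has completed.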
func (*Collector) WriteErrors ¶
func (c *Collector) WriteErrors() int64
WriteErrors returns the total number of batch-write errors since collector start (P5 — DQ-Integrity.1).