ingest

package
v0.19.4
Published: Apr 18, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Constants

const (
	// APIVersion is the current ingestion API version. Increment on breaking changes.
	APIVersion = 1
	// MinClientAPIVersion is the minimum client API version the server accepts.
	MinClientAPIVersion = 1
)
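
As an illustration of how these two constants could gate incoming clients, here is a minimal sketch. The helper clientSupported is hypothetical and not part of this package, and it assumes the client declares its API version as a plain integer; how the real handlers read that version is not documented here.

```go
package main

import "fmt"

const (
	APIVersion          = 1
	MinClientAPIVersion = 1
)

// clientSupported is a hypothetical helper: it accepts any client whose
// declared version is at least MinClientAPIVersion.
func clientSupported(clientVersion int) bool {
	return clientVersion >= MinClientAPIVersion
}

func main() {
	fmt.Println(clientSupported(0), clientSupported(APIVersion))
}
```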

Variables

This section is empty.

Functions

func ApplySamplingRules

func ApplySamplingRules(entries []store.LogEntry, rules []store.SamplingRule) []store.LogEntry

ApplySamplingRules filters log entries according to the given sampling rules. Entries whose service matches a rule are randomly kept based on the rule's rate. Error/warn/fatal logs are always kept when KeepErrors is true for the matching rule.
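
The behaviour described above can be sketched roughly as follows. LogEntry and SamplingRule are simplified stand-ins for store.LogEntry and store.SamplingRule, and their field names (other than KeepErrors) are assumptions. The sketch also assumes that Rate is the fraction of entries to keep and that entries with no matching rule pass through untouched. The random source is injected so the logic can be exercised deterministically.

```go
package main

import (
	"fmt"
	"math/rand"
)

// LogEntry and SamplingRule are hypothetical stand-ins for the store types.
type LogEntry struct {
	Service string
	Level   string
	Message string
}

type SamplingRule struct {
	Service    string
	Rate       float64 // assumed: fraction of entries to keep, 0.0 to 1.0
	KeepErrors bool
}

// isErrorLevel mirrors the documented error/warn/fatal set (spellings assumed).
func isErrorLevel(level string) bool {
	switch level {
	case "error", "warn", "fatal":
		return true
	}
	return false
}

// sample keeps an entry when no rule matches its service, when the rule
// protects error-level logs, or when the random draw falls below the rate.
func sample(entries []LogEntry, rules []SamplingRule, rnd func() float64) []LogEntry {
	byService := make(map[string]SamplingRule, len(rules))
	for _, r := range rules {
		byService[r.Service] = r
	}
	kept := make([]LogEntry, 0, len(entries))
	for _, e := range entries {
		rule, ok := byService[e.Service]
		switch {
		case !ok:
			kept = append(kept, e)
		case rule.KeepErrors && isErrorLevel(e.Level):
			kept = append(kept, e)
		case rnd() < rule.Rate:
			kept = append(kept, e)
		}
	}
	return kept
}

func main() {
	entries := []LogEntry{
		{Service: "api", Level: "info", Message: "request served"},
		{Service: "api", Level: "error", Message: "upstream timeout"},
		{Service: "worker", Level: "info", Message: "job done"},
	}
	rules := []SamplingRule{{Service: "api", Rate: 0.1, KeepErrors: true}}
	fmt.Println(len(sample(entries, rules, rand.Float64)))
}
```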

func GenerateErrorFingerprint

func GenerateErrorFingerprint(e *store.LogEntry) string

GenerateErrorFingerprint computes a server-side error fingerprint for a log entry. The fingerprint groups "same error, different occurrence" using a language-agnostic algorithm inspired by Bugsnag's default grouping:

MD5(service + error_class + source_file)[:16]

When source_file is absent, the function falls back to:

MD5(service + error_class + normalized_message)[:16]

This replaces SDK-side fingerprinting for consistency across all languages.
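
A minimal sketch of the two formulas above. It assumes the parts are concatenated with no separator and that the 16-character prefix is taken from the hex-encoded digest; neither detail is confirmed by this documentation. The function fingerprint and its parameters are illustrative names, not the package's API.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// fingerprint hashes service + errorClass + sourceFile, or falls back to the
// normalized message when sourceFile is empty, and keeps 16 hex characters.
func fingerprint(service, errorClass, sourceFile, normalizedMessage string) string {
	last := sourceFile
	if last == "" {
		last = normalizedMessage
	}
	sum := md5.Sum([]byte(service + errorClass + last))
	return hex.EncodeToString(sum[:])[:16]
}

func main() {
	// Same service, class, and source file: the message does not matter.
	fmt.Println(fingerprint("api", "TimeoutError", "client.go", "timeout after <num>ms"))
	fmt.Println(fingerprint("api", "TimeoutError", "client.go", "a different message"))
	// No source file: the normalized message takes its place.
	fmt.Println(fingerprint("api", "TimeoutError", "", "timeout after <num>ms"))
}
```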

func IsErrorLevel

func IsErrorLevel(level string) bool

IsErrorLevel reports whether the log level is error, warning, or fatal.

func IsValidBatchID

func IsValidBatchID(id string) bool

IsValidBatchID reports whether the given string is formatted as a valid UUID.
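
One way such a format check can be written is with a regular expression over the canonical 8-4-4-4-12 hexadecimal layout. This is a sketch only: whether the real function also accepts other UUID spellings (braces, no hyphens, uppercase) is not stated here, and isValidBatchID is an illustrative name.

```go
package main

import (
	"fmt"
	"regexp"
)

// uuidPattern matches the canonical hyphenated UUID text form, any hex case.
var uuidPattern = regexp.MustCompile(
	`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`)

// isValidBatchID reports whether id has the canonical UUID layout.
func isValidBatchID(id string) bool {
	return uuidPattern.MatchString(id)
}

func main() {
	fmt.Println(isValidBatchID("123e4567-e89b-12d3-a456-426614174000"))
	fmt.Println(isValidBatchID("not-a-uuid"))
}
```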

func NormalizeMessage

func NormalizeMessage(msg string) string

NormalizeMessage strips dynamic values from an error message so that structurally identical messages produce the same fingerprint. Order matters: the most specific patterns are replaced first.
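
To illustrate why ordering matters, here is a sketch with three assumed patterns and placeholder tokens; the actual patterns and placeholders used by NormalizeMessage are not listed in this documentation. UUIDs are replaced before hex literals, and hex literals before bare digit runs, because a digits-first pass would break a UUID into fragments that no longer match the UUID pattern.

```go
package main

import (
	"fmt"
	"regexp"
)

// replacements is ordered from most specific to least specific.
// Both the patterns and the placeholder tokens are assumptions.
var replacements = []struct {
	re   *regexp.Regexp
	with string
}{
	{regexp.MustCompile(`[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}`), "<uuid>"},
	{regexp.MustCompile(`0x[0-9a-fA-F]+`), "<hex>"},
	{regexp.MustCompile(`\d+`), "<num>"},
}

// normalize applies each replacement in order to strip dynamic values.
func normalize(msg string) string {
	for _, r := range replacements {
		msg = r.re.ReplaceAllString(msg, r.with)
	}
	return msg
}

func main() {
	fmt.Println(normalize("user 42 not found"))
	fmt.Println(normalize("order 123e4567-e89b-12d3-a456-426614174000 failed at 0x7ffe after 3 retries"))
}
```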

Types

type FlatHandler

type FlatHandler struct {
	Engine *engine.Store
}

FlatHandler handles log ingestion in the new flat SDK format. This is the preferred format for new SDKs.

func (*FlatHandler) HandleFlatIngest

func (h *FlatHandler) HandleFlatIngest(w http.ResponseWriter, r *http.Request)

HandleFlatIngest is the HTTP handler for POST /api/v2/logs. It accepts the flat SDK format, in which top-level keys map to columns and body is treated as an opaque blob.
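
The flat format can be pictured with the sketch below, which reads one JSON record, treats every top-level key as a column, and keeps a key named "body" as uninterpreted raw JSON. The record layout, the key names "service" and "level", and the helper splitFlat are all assumptions made for illustration; the real wire format may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitFlat separates the top-level keys of one flat record from its "body"
// key, which is returned as raw JSON without being decoded.
func splitFlat(raw []byte) (columns map[string]json.RawMessage, body json.RawMessage, err error) {
	columns = map[string]json.RawMessage{}
	if err = json.Unmarshal(raw, &columns); err != nil {
		return nil, nil, err
	}
	body = columns["body"]
	delete(columns, "body")
	return columns, body, nil
}

func main() {
	record := []byte(`{"service":"api","level":"error","body":{"k":[1,2]}}`)
	columns, body, err := splitFlat(record)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(len(columns), string(body))
}
```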

type Handler

type Handler struct {
	LogStore         store.LogStore
	SettingsStore    store.SettingsStore
	ErrorGroupStore  store.ErrorGroupStore
	ErrorImpactStore store.ErrorImpactStore
	CodeEntityStore  store.CodeEntityStore
	TraceStore       store.TraceStore
	DSStore          store.DataSourceStore
	Registry         *connector.Registry
	Cfg              *config.Config
	WatchStream      WatchStreamEvaluator
	Queue            *Queue
	// contains filtered or unexported fields
}

Handler holds the dependencies for the log ingestion HTTP handler.

func (*Handler) HandleIngestLogs

func (h *Handler) HandleIngestLogs(w http.ResponseWriter, r *http.Request)

HandleIngestLogs is the HTTP handler for POST /api/logs.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue buffers incoming log entries and flushes them in batches to reduce write contention on SQLite (which allows only a single writer at a time). Entries are flushed when: the buffer reaches maxBatchSize, the flush interval fires, or Flush()/Stop() is called explicitly.

func NewQueue

func NewQueue(logStore store.LogStore, cfg QueueConfig) *Queue

NewQueue creates and starts a new Queue with the given configuration. The queue starts a background goroutine that flushes on a timer.

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, entries []store.LogEntry) (int, error)

Enqueue adds entries to the buffer. It normally returns immediately, so it does not block the HTTP handler. If the queue is full, it falls back to a synchronous insert so that no data is lost.
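
The overflow fallback can be sketched as a bounded buffer that writes straight to the store when it has no room. Everything here is a simplified stand-in: entry, sink, memSink, and miniQueue are hypothetical types, the context parameter is omitted, and the returned int is assumed to be the number of accepted entries.

```go
package main

import (
	"fmt"
	"sync"
)

type entry struct{ Message string }

// sink stands in for store.LogStore; only an insert path is modeled.
type sink interface {
	Insert(entries []entry) error
}

// memSink counts inserted rows. It is not safe for concurrent use.
type memSink struct{ rows int }

func (m *memSink) Insert(es []entry) error { m.rows += len(es); return nil }

type miniQueue struct {
	mu        sync.Mutex
	buf       []entry
	maxSize   int
	store     sink
	overflows int64
}

// enqueue buffers entries when there is room. Otherwise it records an
// overflow and inserts the entries synchronously so nothing is dropped.
func (q *miniQueue) enqueue(es []entry) (int, error) {
	q.mu.Lock()
	if len(q.buf)+len(es) <= q.maxSize {
		q.buf = append(q.buf, es...)
		q.mu.Unlock()
		return len(es), nil
	}
	q.overflows++
	q.mu.Unlock()
	if err := q.store.Insert(es); err != nil {
		return 0, err
	}
	return len(es), nil
}

func main() {
	s := &memSink{}
	q := &miniQueue{maxSize: 2, store: s}
	q.enqueue([]entry{{"a"}, {"b"}}) // fits in the buffer
	q.enqueue([]entry{{"c"}})        // buffer full: written synchronously
	fmt.Println(len(q.buf), s.rows, q.overflows)
}
```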

func (*Queue) Flush

func (q *Queue) Flush()

Flush drains the buffer and writes all buffered entries to the store.

func (*Queue) FlushCount

func (q *Queue) FlushCount() int64

FlushCount returns the total number of flush operations performed.

func (*Queue) OverflowCount

func (q *Queue) OverflowCount() int64

OverflowCount returns the total number of times the queue overflowed and entries were inserted synchronously.

func (*Queue) QueueDepth

func (q *Queue) QueueDepth() int

QueueDepth returns the current number of buffered entries.

func (*Queue) Stop

func (q *Queue) Stop()

Stop gracefully shuts down the queue: it marks the queue as stopped, signals the flush loop to exit, and waits for remaining entries to be flushed.
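
The timer-driven flush loop and the shutdown sequence can be sketched together. This is a generic ticker-and-stop-channel pattern under stated assumptions, not the package's implementation: flusher and its fields are hypothetical, batching by maxBatchSize is left out, and calling Stop twice would panic in this simplified version.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type flusher struct {
	mu       sync.Mutex
	buf      []string
	flushed  []string // stands in for rows written to the store
	interval time.Duration
	stop     chan struct{}
	done     chan struct{}
}

// newFlusher starts the background loop, as NewQueue is described as doing.
func newFlusher(interval time.Duration) *flusher {
	f := &flusher{interval: interval, stop: make(chan struct{}), done: make(chan struct{})}
	go f.loop()
	return f
}

func (f *flusher) add(s string) {
	f.mu.Lock()
	f.buf = append(f.buf, s)
	f.mu.Unlock()
}

// flush moves everything currently buffered into flushed.
func (f *flusher) flush() {
	f.mu.Lock()
	f.flushed = append(f.flushed, f.buf...)
	f.buf = f.buf[:0]
	f.mu.Unlock()
}

// loop flushes on every tick and performs one final drain when stopped.
func (f *flusher) loop() {
	defer close(f.done)
	t := time.NewTicker(f.interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			f.flush()
		case <-f.stop:
			f.flush()
			return
		}
	}
}

// Stop signals the loop to exit and waits for the final drain to finish.
func (f *flusher) Stop() {
	close(f.stop)
	<-f.done
}

func main() {
	f := newFlusher(50 * time.Millisecond)
	f.add("a")
	f.add("b")
	f.Stop()
	fmt.Println(len(f.flushed), len(f.buf))
}
```

Waiting on the done channel is what makes the shutdown graceful in this sketch: the caller does not return from Stop until the last buffered entries have been handed off.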

type QueueConfig

type QueueConfig struct {
	MaxQueueSize  int           // Maximum entries buffered before overflow fallback (default 10000)
	MaxBatchSize  int           // Maximum entries per flush batch (default 1000)
	FlushInterval time.Duration // Timer-based flush interval (default 100ms)
}

QueueConfig holds configuration for the Queue.
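
The defaults listed in the field comments could be applied along the following lines. This sketch redeclares QueueConfig locally so it is self-contained, and it assumes that zero or negative values mean "use the default"; how NewQueue actually treats unset fields is not documented here. The helper withDefaults is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// QueueConfig mirrors the documented fields.
type QueueConfig struct {
	MaxQueueSize  int
	MaxBatchSize  int
	FlushInterval time.Duration
}

// withDefaults fills unset fields with the documented default values.
func withDefaults(cfg QueueConfig) QueueConfig {
	if cfg.MaxQueueSize <= 0 {
		cfg.MaxQueueSize = 10000
	}
	if cfg.MaxBatchSize <= 0 {
		cfg.MaxBatchSize = 1000
	}
	if cfg.FlushInterval <= 0 {
		cfg.FlushInterval = 100 * time.Millisecond
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", withDefaults(QueueConfig{MaxBatchSize: 250}))
}
```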

type WatchStreamEvaluator

type WatchStreamEvaluator interface {
	OnLogsReceived(entries []store.LogEntry)
}

WatchStreamEvaluator is a minimal interface to avoid importing the watcher package.
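
Because the interface has a single method, any type with a matching OnLogsReceived method satisfies it. The sketch below redeclares the interface against a stand-in LogEntry type so that it compiles on its own; countingEvaluator and notify are hypothetical, and the nil check reflects an assumption that the WatchStream field may be left unset.

```go
package main

import "fmt"

// LogEntry is a stand-in for store.LogEntry.
type LogEntry struct {
	Service string
	Level   string
}

// WatchStreamEvaluator mirrors the documented single-method interface.
type WatchStreamEvaluator interface {
	OnLogsReceived(entries []LogEntry)
}

// countingEvaluator is a trivial implementation that tallies entries.
type countingEvaluator struct{ seen int }

func (c *countingEvaluator) OnLogsReceived(entries []LogEntry) {
	c.seen += len(entries)
}

// notify forwards entries to the evaluator when one is configured.
func notify(ev WatchStreamEvaluator, entries []LogEntry) {
	if ev != nil {
		ev.OnLogsReceived(entries)
	}
}

func main() {
	c := &countingEvaluator{}
	notify(c, []LogEntry{{Service: "api", Level: "info"}, {Service: "api", Level: "error"}})
	notify(nil, []LogEntry{{Service: "api", Level: "info"}})
	fmt.Println(c.seen)
}
```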
