observability

package
v0.34.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseOTLPHeaders

func ParseOTLPHeaders(raw string) map[string]string

ParseOTLPHeaders parses the OTEL_EXPORTER_OTLP_HEADERS env var format.

func RecordBrokerAutoclaim

func RecordBrokerAutoclaim(ctx context.Context, jobID, result string, count int)

result: "reclaimed" | "dead_letter".

func RecordBrokerCounterPELSkew

func RecordBrokerCounterPELSkew(ctx context.Context, jobID string, skew float64)

A persistent non-zero skew means the counter has leaked (see fix-broker-counter-drift). jobID is kept for call-site stability but omitted from metric labels to limit cardinality.

func RecordBrokerCounterSyncSkew

func RecordBrokerCounterSyncSkew(ctx context.Context, jobID string, skew float64)

jobID kept for call-site stability; omitted from labels (cardinality).

func RecordBrokerDispatch

func RecordBrokerDispatch(ctx context.Context, jobID, outcome string)

outcome: "ok" | "err" | "capacity" | "paced". jobID kept for call-site stability.

func RecordBrokerMessageAge

func RecordBrokerMessageAge(ctx context.Context, jobID string, ageMs float64)

Call once per parsed XREADGROUP message.

func RecordBrokerOutbox

func RecordBrokerOutbox(ctx context.Context, backlog int64, oldestAgeSeconds float64)

func RecordBrokerOutboxSweep

func RecordBrokerOutboxSweep(ctx context.Context, outcome string, count int)

outcome must be "dispatched" | "retried" | "dead_lettered" (mutually exclusive per row).

func RecordBrokerPELWithoutConsumer

func RecordBrokerPELWithoutConsumer(ctx context.Context, count int64)

A healthy reading is always zero; a non-zero value means dispatch and consume have diverged and the affected jobs' tasks are stalled.

func RecordBrokerPacerDelay

func RecordBrokerPacerDelay(ctx context.Context, domain string, delayMs float64)

domain must NOT be a label — unbounded cardinality.

func RecordBrokerPacerPushback

func RecordBrokerPacerPushback(ctx context.Context, domain, reason string)

reason: "gate" (domain-gate NX hold) | "rate_limited" (release feedback). domain must NOT be a label — unbounded cardinality.

func RecordBrokerRedisPing

func RecordBrokerRedisPing(ctx context.Context, duration time.Duration, ok bool)

func RecordBrokerRedisPool

func RecordBrokerRedisPool(ctx context.Context, snap RedisPoolSnapshot)

func RecordBrokerStreamStats

func RecordBrokerStreamStats(ctx context.Context, s BrokerStreamStats)

Per-job drill-down lives on traces/logs (which carry job_id), not metrics.

func RecordCrawlerPhase

func RecordCrawlerPhase(ctx context.Context, metrics CrawlerPhaseMetrics)

func RecordDBPoolRejection

func RecordDBPoolRejection(ctx context.Context)

func RecordDBPoolStats

func RecordDBPoolStats(ctx context.Context, snapshot DBPoolSnapshot)

func RecordDBPressureAdjustment

func RecordDBPressureAdjustment(ctx context.Context, direction string)

direction must be "up" or "down".

func RecordDBPressureStats

func RecordDBPressureStats(ctx context.Context, emaMs float64, limit int32)

Call alongside RecordDBPoolStats for a complete pool+pressure snapshot.

func RecordFDStats

func RecordFDStats(ctx context.Context, current, limit int, pressure float64)

func RecordHTMLPersistBodyBytes

func RecordHTMLPersistBodyBytes(ctx context.Context, bytes int64)

func RecordHTMLPersistQueueDepth

func RecordHTMLPersistQueueDepth(ctx context.Context, depth int)

Used to tune the HTML_PERSIST_QUEUE / HTML_PERSIST_WORKERS ratio.

func RecordHTMLPersistUpload

func RecordHTMLPersistUpload(ctx context.Context, outcome string)

outcome: "ok" | "err" | "skipped".

func RecordHTMLPersistUploadDuration

func RecordHTMLPersistUploadDuration(ctx context.Context, duration time.Duration)

func RecordJobConcurrencySnapshot

func RecordJobConcurrencySnapshot(ctx context.Context, jobID string, runningTasks int64, concurrencyLimit int64, unlimited bool)

jobID kept for call-site stability; omitted from labels (cardinality).

func RecordJobInfoCacheHit

func RecordJobInfoCacheHit(ctx context.Context, jobID string)

jobID kept for call-site stability; omitted from labels (cardinality).

func RecordJobInfoCacheInvalidation

func RecordJobInfoCacheInvalidation(ctx context.Context, jobID, reason string)

func RecordJobInfoCacheMiss

func RecordJobInfoCacheMiss(ctx context.Context, jobID string)

func RecordJobInfoCacheSize

func RecordJobInfoCacheSize(ctx context.Context, size int)

func RecordLighthouseRun

func RecordLighthouseRun(ctx context.Context, jobID, outcome string)

outcome: "succeeded" | "failed" | "skipped_quota" | "shed". "shed" = soft memory-shed circuit breaker deferred the audit; a rising shed rate means the analysis fleet is memory-saturated.

func RecordLighthouseRunDuration

func RecordLighthouseRunDuration(ctx context.Context, jobID, outcome string, durationMs float64)

outcome label is load-bearing — failures cluster at timeout and would otherwise distort the success-path latency distribution.

func RecordLighthouseRunRetry

func RecordLighthouseRunRetry(ctx context.Context, jobID, reason string)

reason MUST come from the runner's transientStderrSubstrings list (e.g. "target_crashed"); never pass free-form error text — cardinality.

func RecordLighthouseScheduled

func RecordLighthouseScheduled(ctx context.Context, jobID, band string, count int)

band: "fastest" | "slowest" | "reconcile". jobID kept for call-site stability.

func RecordSemaphoreWait

func RecordSemaphoreWait(ctx context.Context, waitMs float64)

func RecordTaskClaimAttempt

func RecordTaskClaimAttempt(ctx context.Context, jobID string, latency time.Duration, status string)

jobID kept for call-site stability; omitted from labels (cardinality).

func RecordTaskWaiting

func RecordTaskWaiting(ctx context.Context, jobID string, reason string, count int)

func RecordWorkerConcurrency

func RecordWorkerConcurrency(ctx context.Context, workerID int, delta int64, capacity int64)

delta: +1 starting, -1 completing. capacity: pass >0 only on startup.

func RecordWorkerTask

func RecordWorkerTask(ctx context.Context, metrics WorkerTaskMetrics)

job.id must NOT be a metric label — Prometheus active-series cardinality. Per-job pivot lives on the worker.process_task span attributes.

func RecordWorkerTaskFailure

func RecordWorkerTaskFailure(ctx context.Context, jobID string, reason string)

func RecordWorkerTaskOutcome

func RecordWorkerTaskOutcome(ctx context.Context, metrics WorkerTaskOutcomeMetrics)

func RecordWorkerTaskRetry

func RecordWorkerTaskRetry(ctx context.Context, jobID string, reason string)

func StartWorkerTaskSpan

func StartWorkerTaskSpan(ctx context.Context, info WorkerTaskSpanInfo) (context.Context, trace.Span)

func WrapHandler

func WrapHandler(handler http.Handler, prov *Providers) http.Handler

Types

type BrokerStreamStats

// BrokerStreamStats holds broker stream readings pre-aggregated across active
// jobs; it carries no per-job dimension. It is the argument to
// RecordBrokerStreamStats.
type BrokerStreamStats struct {
	// Worker* fields describe the crawl side: the crawl stream length (XLEN),
	// the schedule ZSET cardinality (ZCARD), and the worker consumer-group
	// pending count.
	WorkerStreamLength     int64
	WorkerScheduledDepth   int64
	WorkerPending          int64
	// Lighthouse* fields cover only the lighthouse stream and its
	// consumer-group pending count. There is no lighthouse ZSET, hence no
	// scheduled-depth counterpart.
	LighthouseStreamLength int64
	LighthousePending      int64
}

Pre-aggregated across active jobs; the per-job job.id label was dropped for Mimir cardinality and dashboards already used sum(...).

Worker covers the crawl stream (XLEN), schedule ZSET (ZCARD), and worker consumer-group pending. Lighthouse covers only the lighthouse stream and its consumer-group pending — there is no lighthouse ZSET.

type Config

// Config is the observability configuration passed to Init. Providers also
// carries a Config value — presumably the one Init was called with; confirm.
type Config struct {
	// NOTE(review): presumably gates whether telemetry is initialised at all;
	// how Init treats Enabled == false is not visible here — confirm.
	Enabled        bool
	// NOTE(review): ServiceName and Environment presumably identify the
	// emitting process in exported telemetry — confirm where they are attached.
	ServiceName    string
	Environment    string
	// OTLP* fields configure the OTLP exporter. OTLPHeaders has the same type
	// ParseOTLPHeaders returns (map[string]string).
	// NOTE(review): OTLPInsecure presumably disables transport security for
	// the OTLP connection — confirm.
	OTLPEndpoint   string
	OTLPHeaders    map[string]string
	OTLPInsecure   bool
	// NOTE(review): same name as MetricsServerOptions.MetricsAddress, the
	// address StartMetricsServer binds /metrics on; whether Init itself reads
	// this field is not visible here.
	MetricsAddress string
}

type CrawlerPhaseMetrics

// CrawlerPhaseMetrics is the argument to RecordCrawlerPhase: one observation
// of a crawler phase.
type CrawlerPhaseMetrics struct {
	// NOTE(review): the allowed Phase and Outcome values are not documented
	// here. If they become metric labels they should come from a bounded set
	// (other recorders in this package call out label cardinality) — confirm.
	Phase    string
	Outcome  string
	// Duration is presumably how long the phase took — confirm.
	Duration time.Duration
}

type DBPoolSnapshot

// DBPoolSnapshot is the argument to RecordDBPoolStats: a point-in-time view of
// the database connection pool.
type DBPoolSnapshot struct {
	// NOTE(review): InUse, Idle, WaitCount and WaitDuration share names with
	// fields of database/sql.DBStats, and MaxOpen resembles its
	// MaxOpenConnections — confirm that is where the caller sources them.
	InUse        int
	Idle         int
	WaitCount    int64
	WaitDuration time.Duration
	MaxOpen      int
	// NOTE(review): the meaning of Reserved is not visible here — confirm.
	Reserved     int
	// NOTE(review): Usage is presumably a utilisation ratio; its range
	// (0–1 vs. percent) is not visible here — TODO confirm.
	Usage        float64
}

type MetricsServer

// MetricsServer ties the OTel providers to the metrics HTTP server so that a
// single Shutdown call covers both. Obtain one from StartMetricsServer.
type MetricsServer struct {
	// Providers is the OTel provider set that Shutdown flushes, in addition
	// to stopping the HTTP server.
	// NOTE(review): Shutdown is documented as safe after a partial init, so
	// this may be nil in that state — confirm before dereferencing.
	Providers *Providers
	// contains filtered or unexported fields
}

MetricsServer ties the OTel providers to the metrics HTTP server so the caller manages a single Shutdown.

func StartMetricsServer

func StartMetricsServer(ctx context.Context, opts MetricsServerOptions) (*MetricsServer, error)

StartMetricsServer initialises the OTel providers and, when the providers expose a Prometheus handler and MetricsAddress is set, binds /metrics (plus /debug/pprof when EnablePprof is true) on that address.

Telemetry is treated as best-effort: a bind failure is logged but does not return an error, since the rest of the binary should still come up. Init failures are returned because they imply a malformed config.

func (*MetricsServer) Shutdown

func (s *MetricsServer) Shutdown(ctx context.Context)

Shutdown stops the HTTP server (5s grace) and flushes the OTel providers (10s grace, inherited from Providers.Shutdown). Safe on a nil receiver and after a partial init.

type MetricsServerOptions

// MetricsServerOptions bundles the observability config and the HTTP listener
// settings consumed by StartMetricsServer.
type MetricsServerOptions struct {
	// The next six fields have the same names and types as the like-named
	// Config fields (Config additionally has Enabled).
	// NOTE(review): presumably StartMetricsServer copies them into the Config
	// it hands to Init — confirm.
	ServiceName    string
	Environment    string
	OTLPEndpoint   string
	OTLPHeaders    map[string]string
	OTLPInsecure   bool
	// MetricsAddress is the address /metrics is bound on. It is only bound
	// when this is set and the providers expose a Prometheus handler.
	MetricsAddress string
	// EnablePprof additionally serves /debug/pprof on MetricsAddress.
	EnablePprof    bool         // worker + analysis enable pprof; cmd/app does not.
	Logger         *slog.Logger // optional; defaults to slog.Default().
}

MetricsServerOptions bundles the observability config and HTTP listener settings that all three Fly binaries reproduce verbatim.

type Providers

// Providers bundles the OpenTelemetry handles returned by Init.
type Providers struct {
	TracerProvider *sdktrace.TracerProvider
	MeterProvider  *sdkmetric.MeterProvider
	Propagator     propagation.TextMapPropagator
	// NOTE(review): presumably the Prometheus scrape handler that
	// StartMetricsServer mounts at /metrics. That function's doc says it binds
	// only "when the providers expose a Prometheus handler", so this may be
	// nil — confirm.
	MetricsHandler http.Handler
	// Shutdown flushes the providers. MetricsServer.Shutdown's doc attributes
	// a 10s grace period to this function.
	Shutdown       func(ctx context.Context) error
	// NOTE(review): presumably the Config that Init was called with — confirm.
	Config         Config
}

func Init

func Init(ctx context.Context, cfg Config) (*Providers, error)

type RedisPoolSnapshot

// RedisPoolSnapshot is the argument to RecordBrokerRedisPool: a point-in-time
// view of the broker's Redis connection pool.
type RedisPoolSnapshot struct {
	// NOTE(review): InUse and Idle are presumably connection counts — confirm
	// against the Redis client's pool statistics.
	InUse int64
	Idle  int64
	// NOTE(review): whether Waits is a cumulative counter or a per-interval
	// delta is not visible here — TODO confirm.
	Waits int64
}

type WorkerTaskMetrics

// WorkerTaskMetrics is the argument to RecordWorkerTask: one processed task.
type WorkerTaskMetrics struct {
	// JobID must not end up as a metric label (Prometheus active-series
	// cardinality); RecordWorkerTask's doc places the per-job pivot on the
	// worker.process_task span attributes instead.
	JobID         string
	// NOTE(review): the allowed Status values are not documented here.
	Status        string
	// NOTE(review): presumably Duration is processing time, QueueWait is time
	// spent queued, and TotalDuration spans both — confirm against the caller.
	Duration      time.Duration
	QueueWait     time.Duration
	TotalDuration time.Duration
}

type WorkerTaskOutcomeMetrics

// WorkerTaskOutcomeMetrics is the argument to RecordWorkerTaskOutcome.
type WorkerTaskOutcomeMetrics struct {
	// NOTE(review): other recorders in this package omit job IDs from metric
	// labels for cardinality; presumably the same applies here — confirm.
	JobID    string
	// NOTE(review): the allowed Outcome and Reason values are not documented
	// here; if they are labels they should be a bounded set, never free-form
	// error text — confirm.
	Outcome  string
	Reason   string
	Duration time.Duration
}

type WorkerTaskSpanInfo

// WorkerTaskSpanInfo is the argument to StartWorkerTaskSpan: the task
// identity to associate with the span it starts.
type WorkerTaskSpanInfo struct {
	// NOTE(review): RecordWorkerTask's doc says the per-job pivot lives on the
	// worker.process_task span attributes, so JobID is presumably recorded as
	// one of them — confirm the attribute keys in StartWorkerTaskSpan.
	JobID     string
	TaskID    string
	// NOTE(review): Domain and Path presumably identify the URL being crawled
	// by this task — confirm.
	Domain    string
	Path      string
	// NOTE(review): FindLinks presumably mirrors the task's link-discovery
	// flag — confirm.
	FindLinks bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL