package observers
v0.5.1
Warning

This package is not in the latest version of its module.

Published: Jun 23, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package observers provides ready-made, dependency-free implementations of flywheel.Observer — the telemetry seam the Runner invokes around each attempt — so a consumer gets metrics, structured logs, and a Prometheus endpoint without hand-rolling and wiring an adapter first.

The package depends only on the standard library and the flywheel root package, so importing it adds no metrics or tracing dependency to a consumer. The one boundary that does carry a third-party stack is MetricsRecorder: a three-method sink (Count, Gauge, Observe) that a consumer implements against Prometheus, OpenTelemetry, statsd, or CloudWatch. flywheel itself imports none of them; MemRecorder, the in-memory reference recorder, backs the local `/metrics` endpoint with no external system.

The three building blocks compose:

  • MetricsObserver translates lifecycle events into MetricsRecorder calls (the metric taxonomy lives on MetricsObserver).
  • SlogObserver logs each event at debug level for `--log debug` diagnosis.
  • Multi fans one event out to several observers, so a Node can run metrics and logging side by side: NewMulti(NewSlog(logger), NewMetrics(rec)).

WritePrometheus and MetricsHandler render a MemRecorder's counters plus a freshly sampled flywheel.QueueHealth into the Prometheus text-exposition format.

Every method runs synchronously on the dispatch path and must not block, the same contract as flywheel.Observer. MemRecorder takes only a short mutex-guarded map write, and SlogObserver logs at debug level, which a daemon running at info never emits, so neither stalls a worker.

Index

Constants

const (
	MetricJobsClaimed  = "flywheel_jobs_claimed_total"
	MetricJobsStarted  = "flywheel_jobs_started_total"
	MetricJobsFinished = "flywheel_jobs_finished_total"
	MetricJobsErrored  = "flywheel_jobs_errored_total"
	MetricJobsRetried  = "flywheel_jobs_retried_total"
	MetricJobDuration  = "flywheel_job_duration_seconds"
)

Metric names. They follow Prometheus convention: a flywheel_ prefix, a unit suffix where one applies, and a _total suffix on monotonic counters.
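A consumer adding its own metrics beside these might want to hold them to the same convention. The sketch below is a hypothetical helper (followsConvention is not part of this package) that checks the Prometheus metric-name character set, the flywheel_ prefix, and that _total appears on counters and only on counters. The unit-suffix rule is left to review, since it cannot be checked mechanically.

```go
package main

import (
	"regexp"
	"strings"
)

// metricNameRE is the metric-name character set of the Prometheus data model.
var metricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)

// followsConvention is a hypothetical helper: it reports whether name is a
// valid Prometheus metric name with the flywheel_ prefix, and whether its
// _total suffix matches its kind (present on monotonic counters, absent
// on everything else).
func followsConvention(name string, isCounter bool) bool {
	if !metricNameRE.MatchString(name) || !strings.HasPrefix(name, "flywheel_") {
		return false
	}
	return strings.HasSuffix(name, "_total") == isCounter
}
```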

const (
	TagExecutorClass = "executor_class"
	TagKind          = "kind"
	TagQueue         = "queue"
	TagOutcome       = "outcome"
	TagErrorClass    = "error_class"
)

Tag keys. They are the label dimensions the taxonomy slices each metric by.

const (
	MetricQueueJobs        = "flywheel_queue_jobs"
	MetricQueueReady       = "flywheel_queue_ready"
	MetricQueueInFlight    = "flywheel_queue_inflight"
	MetricQueueOldestReady = "flywheel_queue_oldest_ready_seconds"
)

Queue-health gauge metric names. They are sampled fresh on each scrape from a flywheel.QueueHealth, not accumulated in the recorder.

Variables

This section is empty.

Functions

func MetricsHandler

func MetricsHandler(rec *MemRecorder, sample func(ctx context.Context) (flywheel.QueueHealth, error)) http.Handler

MetricsHandler returns an http.Handler that serves rec's counters together with queue-health gauges sampled fresh on every scrape via sample. A nil sample renders counters only (no gauges). When sample returns an error the handler responds 500 so the scrape is recorded as failed rather than silently dropping the lag signal.

func WritePrometheus

func WritePrometheus(w io.Writer, rec *MemRecorder, qh flywheel.QueueHealth) error

WritePrometheus renders rec's accumulated counters and distributions plus the point-in-time queue-health gauges in qh into the Prometheus text-exposition format (version 0.0.4): a # HELP and # TYPE line per metric family followed by its series lines. Output is deterministic — series are emitted in sorted order — so it is stable to diff and assert against.

Types

type CounterSeries

type CounterSeries struct {
	Name  string
	Tags  map[string]string
	Value int64
}

CounterSeries is one counter series in a Snapshot.

type GaugeSeries

type GaugeSeries struct {
	Name  string
	Tags  map[string]string
	Value float64
}

GaugeSeries is one gauge series in a Snapshot.

type MemRecorder

type MemRecorder struct {
	// contains filtered or unexported fields
}

MemRecorder is a concurrent-safe, in-memory MetricsRecorder. It is three things at once: the reference implementation of the interface, the test double the adapter tests assert against, and the source of the process-lifetime counters the local `/metrics` endpoint renders. A series is identified by its name plus its sorted tag set, so repeated calls with equal tags accumulate into one cell.
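The identity rule (name plus sorted tag set) means two calls whose tag maps hold the same pairs land in the same cell regardless of map iteration order. A minimal sketch of such a key follows. seriesKey is hypothetical, and its \x00 and \x01 separators assume that names, tag keys, and tag values never contain those bytes.

```go
package main

import (
	"sort"
	"strings"
)

// seriesKey is a hypothetical series identity: the metric name followed by
// each tag pair in key order. Sorting the keys makes the result independent
// of Go's randomized map iteration order.
func seriesKey(name string, tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	b.WriteString(name)
	for _, k := range keys {
		b.WriteByte(0) // separates pairs (assumed absent from names and tags)
		b.WriteString(k)
		b.WriteByte(1) // separates a key from its value
		b.WriteString(tags[k])
	}
	return b.String()
}
```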

func NewMemRecorder

func NewMemRecorder() *MemRecorder

NewMemRecorder returns an empty MemRecorder ready for concurrent use.

func (*MemRecorder) Count

func (m *MemRecorder) Count(name string, delta int64, tags map[string]string)

Count adds delta to the named counter series.

func (*MemRecorder) Gauge

func (m *MemRecorder) Gauge(name string, value float64, tags map[string]string)

Gauge sets the named gauge series to value (last write wins).

func (*MemRecorder) Observe

func (m *MemRecorder) Observe(name string, value float64, tags map[string]string)

Observe folds value into the named distribution series as a running sum and count, so an average is sum/count.

func (*MemRecorder) Snapshot

func (m *MemRecorder) Snapshot() Snapshot

Snapshot copies every series out under the lock. The returned slices, and the tag map of each series in them, are private copies that the caller may read freely without racing concurrent recording.

type MetricsObserver

type MetricsObserver struct {
	// contains filtered or unexported fields
}

MetricsObserver implements flywheel.Observer by translating each lifecycle event into MetricsRecorder calls, per this taxonomy:

OnClaim  -> Count(flywheel_jobs_claimed_total, batch, {executor_class})
OnStart  -> Count(flywheel_jobs_started_total, 1, {kind, queue})
OnFinish -> Count(flywheel_jobs_finished_total, 1, {kind, queue, outcome})
            Observe(flywheel_job_duration_seconds, secs, {kind, outcome})
            and, when the attempt carried a classified error,
            Count(flywheel_jobs_errored_total, 1, {kind, error_class})
OnRetry  -> Count(flywheel_jobs_retried_total, 1, {kind, error_class})

It holds no state of its own; all accumulation lives in the recorder.

func NewMetrics

func NewMetrics(rec MetricsRecorder) *MetricsObserver

NewMetrics returns a MetricsObserver that records into rec.

func (*MetricsObserver) OnClaim

func (m *MetricsObserver) OnClaim(_ context.Context, ev flywheel.ClaimEvent)

OnClaim counts the jobs claimed in a batch, sliced by executor class.

func (*MetricsObserver) OnFinish

func (m *MetricsObserver) OnFinish(_ context.Context, ev flywheel.FinishEvent)

OnFinish counts each finished attempt by outcome, records its duration, and — when the attempt carried a classified error — counts it by error class.

func (*MetricsObserver) OnRetry

func (m *MetricsObserver) OnRetry(_ context.Context, ev flywheel.RetryEvent)

OnRetry counts each scheduled retry, sliced by kind and the error class that triggered it.

func (*MetricsObserver) OnStart

func (m *MetricsObserver) OnStart(_ context.Context, ev flywheel.JobEvent)

OnStart counts each started attempt, sliced by kind and queue.

type MetricsRecorder

type MetricsRecorder interface {
	// Count adds delta to the counter named name with the given tags.
	Count(name string, delta int64, tags map[string]string)
	// Gauge sets the gauge named name with the given tags to value.
	Gauge(name string, value float64, tags map[string]string)
	// Observe records one value into the distribution named name with the given
	// tags (kept as a running sum and count — duration/histogram telemetry).
	Observe(name string, value float64, tags map[string]string)
}

MetricsRecorder is the consumer-pluggable metrics sink. It is the one seam in this package that a third-party stack plugs into: a consumer implements it against Prometheus, OpenTelemetry, statsd, or CloudWatch, and flywheel imports none of them. MemRecorder is the in-memory reference implementation.

Every method is called on the synchronous dispatch path (via MetricsObserver) and must not block. tags is a small label set; an implementation must treat it as read-only and not retain it past the call.
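One way a consumer recorder can honor both rules is sketched below: each call is formatted into a DogStatsD-style line immediately, so the tags map is never retained, and the line is handed to a background shipper through a buffered channel with a non-blocking send. lineRecorder, its buffer size, and the drop-on-full policy are hypothetical choices for illustration, not part of this package.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync/atomic"
)

// lineRecorder is a hypothetical consumer recorder. A shipper goroutine (not
// shown) would drain out and write lines to a statsd agent.
type lineRecorder struct {
	out     chan string
	dropped atomic.Int64
}

func newLineRecorder(buffer int) *lineRecorder {
	return &lineRecorder{out: make(chan string, buffer)}
}

// send formats name:value|type|#k:v,... with tags in key order, then offers
// the line without blocking; a full buffer drops the sample and counts it.
func (r *lineRecorder) send(name, value, typ string, tags map[string]string) {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	line := name + ":" + value + "|" + typ
	if len(keys) > 0 {
		pairs := make([]string, len(keys))
		for i, k := range keys {
			pairs[i] = k + ":" + tags[k]
		}
		line += "|#" + strings.Join(pairs, ",")
	}
	select {
	case r.out <- line:
	default:
		r.dropped.Add(1) // never block the worker; lose the sample instead
	}
}

func (r *lineRecorder) Count(name string, delta int64, tags map[string]string) {
	r.send(name, fmt.Sprint(delta), "c", tags)
}

func (r *lineRecorder) Gauge(name string, value float64, tags map[string]string) {
	r.send(name, fmt.Sprint(value), "g", tags)
}

func (r *lineRecorder) Observe(name string, value float64, tags map[string]string) {
	r.send(name, fmt.Sprint(value), "h", tags)
}
```

Dropping under back-pressure trades a little telemetry for a guarantee that a slow metrics agent can never stall job dispatch; exporting the dropped count makes that loss visible.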

type Multi

type Multi struct {
	// contains filtered or unexported fields
}

Multi is a flywheel.Observer that fans every event out to several observers in turn. It is how a Node runs more than one observer at once — typically a SlogObserver beside a MetricsObserver — behind the single RunnerConfig.Observer slot.

Fan-out is synchronous and ordered: each event is delivered to every child in the order they were given before the call returns, preserving the Observer contract that the dispatch path must not be left with work outstanding. A child that blocks therefore blocks its siblings, so each must honor the non-blocking contract — the same as any single Observer.

func NewMulti

func NewMulti(obs ...flywheel.Observer) *Multi

NewMulti returns a Multi that fans out to the given observers, in order. With no observers it is a valid no-op.

func (*Multi) OnClaim

func (m *Multi) OnClaim(ctx context.Context, ev flywheel.ClaimEvent)

OnClaim delivers the claim event to every child in order.

func (*Multi) OnFinish

func (m *Multi) OnFinish(ctx context.Context, ev flywheel.FinishEvent)

OnFinish delivers the finish event to every child in order.

func (*Multi) OnRetry

func (m *Multi) OnRetry(ctx context.Context, ev flywheel.RetryEvent)

OnRetry delivers the retry event to every child in order.

func (*Multi) OnStart

func (m *Multi) OnStart(ctx context.Context, ev flywheel.JobEvent)

OnStart delivers the start event to every child in order.

type ObservationSeries

type ObservationSeries struct {
	Name  string
	Tags  map[string]string
	Sum   float64
	Count int64
}

ObservationSeries is one distribution series in a Snapshot, as a sum and count.

type SlogObserver

type SlogObserver struct {
	// contains filtered or unexported fields
}

SlogObserver implements flywheel.Observer by logging every lifecycle event through an *slog.Logger. It logs at debug level so a daemon running at info stays quiet, and a `--log debug` run gets a rich, per-attempt trace — claim, start, finish, and retry — with structured attributes for filtering.

func NewSlog

func NewSlog(logger *slog.Logger) *SlogObserver

NewSlog returns a SlogObserver that logs through logger at debug level. A nil logger falls back to slog.Default(), so passing nil never causes a panic.

func (*SlogObserver) OnClaim

func (s *SlogObserver) OnClaim(ctx context.Context, ev flywheel.ClaimEvent)

OnClaim logs a claimed batch.

func (*SlogObserver) OnFinish

func (s *SlogObserver) OnFinish(ctx context.Context, ev flywheel.FinishEvent)

OnFinish logs a decided attempt, including the error when one occurred.

func (*SlogObserver) OnRetry

func (s *SlogObserver) OnRetry(ctx context.Context, ev flywheel.RetryEvent)

OnRetry logs a scheduled retry.

func (*SlogObserver) OnStart

func (s *SlogObserver) OnStart(ctx context.Context, ev flywheel.JobEvent)

OnStart logs an attempt about to run.

type Snapshot

type Snapshot struct {
	Counters     []CounterSeries
	Gauges       []GaugeSeries
	Observations []ObservationSeries
}

Snapshot is an immutable copy of a MemRecorder's series, taken under the lock, for rendering or assertion. Each slice is sorted by name then tags so the output is deterministic.
