Documentation ¶
Overview ¶
Package observers provides ready-made, dependency-free implementations of flywheel.Observer — the telemetry seam the Runner invokes around each attempt — so a consumer gets metrics, structured logs, and a Prometheus endpoint without hand-rolling and wiring an adapter first.
The package depends only on the standard library and the flywheel root package, so importing it adds no metrics or tracing dependency to a consumer. The boundary that does carry a third-party stack is MetricsRecorder: a one-method sink a consumer implements against Prometheus, OpenTelemetry, statsd, or CloudWatch. flywheel itself imports none of them — MemRecorder, the in-memory reference recorder, backs the local `/metrics` endpoint with no external system.
The three building blocks compose:
- MetricsObserver translates lifecycle events into MetricsRecorder calls (the metric taxonomy lives on MetricsObserver).
- SlogObserver logs each event at debug level for `--log debug` diagnosis.
- Multi fans one event out to several observers, so a Node can run metrics and logging side by side: NewMulti(NewSlog(logger), NewMetrics(rec)).
WritePrometheus and MetricsHandler render a MemRecorder's counters plus a freshly sampled flywheel.QueueHealth into the Prometheus text-exposition format.
Every method runs synchronously on the dispatch path and must not block, which is the same contract as flywheel.Observer. The built-in implementations honor it: MemRecorder holds its mutex only for a short map write, and SlogObserver logs at debug level, so a daemon running at info emits nothing. Neither stalls a worker.
Index ¶
- Constants
- func MetricsHandler(rec *MemRecorder, ...) http.Handler
- func WritePrometheus(w io.Writer, rec *MemRecorder, qh flywheel.QueueHealth) error
- type CounterSeries
- type GaugeSeries
- type MemRecorder
- type MetricsObserver
- func (m *MetricsObserver) OnClaim(_ context.Context, ev flywheel.ClaimEvent)
- func (m *MetricsObserver) OnFinish(_ context.Context, ev flywheel.FinishEvent)
- func (m *MetricsObserver) OnRetry(_ context.Context, ev flywheel.RetryEvent)
- func (m *MetricsObserver) OnStart(_ context.Context, ev flywheel.JobEvent)
- type MetricsRecorder
- type Multi
- type ObservationSeries
- type SlogObserver
- func (s *SlogObserver) OnClaim(ctx context.Context, ev flywheel.ClaimEvent)
- func (s *SlogObserver) OnFinish(ctx context.Context, ev flywheel.FinishEvent)
- func (s *SlogObserver) OnRetry(ctx context.Context, ev flywheel.RetryEvent)
- func (s *SlogObserver) OnStart(ctx context.Context, ev flywheel.JobEvent)
- type Snapshot
Constants ¶
const (
	MetricJobsClaimed  = "flywheel_jobs_claimed_total"
	MetricJobsStarted  = "flywheel_jobs_started_total"
	MetricJobsFinished = "flywheel_jobs_finished_total"
	MetricJobsErrored  = "flywheel_jobs_errored_total"
	MetricJobsRetried  = "flywheel_jobs_retried_total"
	MetricJobDuration  = "flywheel_job_duration_seconds"
)
Metric names. They follow Prometheus convention: a flywheel_ prefix, a unit suffix where one applies, and a _total suffix on monotonic counters.
const (
	TagExecutorClass = "executor_class"
	TagKind          = "kind"
	TagQueue         = "queue"
	TagOutcome       = "outcome"
	TagErrorClass    = "error_class"
)
Tag keys. They are the label dimensions the taxonomy slices each metric by.
const (
	MetricQueueJobs        = "flywheel_queue_jobs"
	MetricQueueReady       = "flywheel_queue_ready"
	MetricQueueInFlight    = "flywheel_queue_inflight"
	MetricQueueOldestReady = "flywheel_queue_oldest_ready_seconds"
)
Queue-health gauge metric names. They are sampled fresh on each scrape from a flywheel.QueueHealth, not accumulated in the recorder.
Variables ¶
This section is empty.
Functions ¶
func MetricsHandler ¶
func MetricsHandler(rec *MemRecorder, sample func(ctx context.Context) (flywheel.QueueHealth, error)) http.Handler
MetricsHandler returns an http.Handler that serves rec's counters together with queue-health gauges sampled fresh on every scrape via sample. A nil sample renders counters only (no gauges). When sample returns an error the handler responds 500 so the scrape is recorded as failed rather than silently dropping the lag signal.
func WritePrometheus ¶
func WritePrometheus(w io.Writer, rec *MemRecorder, qh flywheel.QueueHealth) error
WritePrometheus renders rec's accumulated counters and distributions plus the point-in-time queue-health gauges in qh into the Prometheus text-exposition format (version 0.0.4): a # HELP and # TYPE line per metric family followed by its series lines. Output is deterministic — series are emitted in sorted order — so it is stable to diff and assert against.
Types ¶
type CounterSeries ¶
CounterSeries is one counter series in a Snapshot.
type GaugeSeries ¶
GaugeSeries is one gauge series in a Snapshot.
type MemRecorder ¶
type MemRecorder struct {
	// contains filtered or unexported fields
}
MemRecorder is a concurrent-safe, in-memory MetricsRecorder. It is three things at once: the reference implementation of the interface, the test double the adapter tests assert against, and the source of the process-lifetime counters the local `/metrics` endpoint renders. A series is identified by its name plus its sorted tag set, so repeated calls with equal tags accumulate into one cell.
func NewMemRecorder ¶
func NewMemRecorder() *MemRecorder
NewMemRecorder returns an empty MemRecorder ready for concurrent use.
func (*MemRecorder) Count ¶
func (m *MemRecorder) Count(name string, delta int64, tags map[string]string)
Count adds delta to the named counter series.
func (*MemRecorder) Gauge ¶
func (m *MemRecorder) Gauge(name string, value float64, tags map[string]string)
Gauge sets the named gauge series to value (last write wins).
func (*MemRecorder) Observe ¶
func (m *MemRecorder) Observe(name string, value float64, tags map[string]string)
Observe folds value into the named distribution series as a running sum and count, so an average is sum/count.
func (*MemRecorder) Snapshot ¶
func (m *MemRecorder) Snapshot() Snapshot
Snapshot copies every series out under the lock. The returned slices are private copies, so the caller may read them freely without racing concurrent recording.
type MetricsObserver ¶
type MetricsObserver struct {
	// contains filtered or unexported fields
}
MetricsObserver implements flywheel.Observer by translating each lifecycle event into MetricsRecorder calls, per this taxonomy:
OnClaim  -> Count(flywheel_jobs_claimed_total, batch, {executor_class})
OnStart  -> Count(flywheel_jobs_started_total, 1, {kind, queue})
OnFinish -> Count(flywheel_jobs_finished_total, 1, {kind, queue, outcome})
            Observe(flywheel_job_duration_seconds, secs, {kind, outcome})
            and, when the attempt carried a classified error,
            Count(flywheel_jobs_errored_total, 1, {kind, error_class})
OnRetry  -> Count(flywheel_jobs_retried_total, 1, {kind, error_class})
It holds no state of its own; all accumulation lives in the recorder.
func NewMetrics ¶
func NewMetrics(rec MetricsRecorder) *MetricsObserver
NewMetrics returns a MetricsObserver that records into rec.
func (*MetricsObserver) OnClaim ¶
func (m *MetricsObserver) OnClaim(_ context.Context, ev flywheel.ClaimEvent)
OnClaim counts the jobs claimed in a batch, sliced by executor class.
func (*MetricsObserver) OnFinish ¶
func (m *MetricsObserver) OnFinish(_ context.Context, ev flywheel.FinishEvent)
OnFinish counts each finished attempt by outcome, records its duration, and — when the attempt carried a classified error — counts it by error class.
func (*MetricsObserver) OnRetry ¶
func (m *MetricsObserver) OnRetry(_ context.Context, ev flywheel.RetryEvent)
OnRetry counts each scheduled retry, sliced by kind and the error class that triggered it.
type MetricsRecorder ¶
type MetricsRecorder interface {
	// Count adds delta to the counter named name with the given tags.
	Count(name string, delta int64, tags map[string]string)
	// Gauge sets the gauge named name with the given tags to value.
	Gauge(name string, value float64, tags map[string]string)
	// Observe records one value into the distribution named name with the given
	// tags (kept as a running sum and count, for duration/histogram telemetry).
	Observe(name string, value float64, tags map[string]string)
}
MetricsRecorder is the consumer-pluggable metrics sink. It is the one seam in this package that a third-party stack plugs into: a consumer implements it against Prometheus, OpenTelemetry, statsd, or CloudWatch, and flywheel imports none of them. MemRecorder is the in-memory reference implementation.
Every method is called on the synchronous dispatch path (via MetricsObserver) and must not block. tags is a small label set; an implementation must treat it as read-only and not retain it past the call.
type Multi ¶
type Multi struct {
	// contains filtered or unexported fields
}
Multi is a flywheel.Observer that fans every event out to several observers in turn. It is how a Node runs more than one observer at once — typically a SlogObserver beside a MetricsObserver — behind the single RunnerConfig.Observer slot.
Fan-out is synchronous and ordered: each event is delivered to every child in the order they were given before the call returns, preserving the Observer contract that the dispatch path must not be left with work outstanding. A child that blocks therefore blocks its siblings, so each must honor the non-blocking contract — the same as any single Observer.
func NewMulti ¶
NewMulti returns a Multi that fans out to the given observers, in order. With no observers it is a valid no-op.
func (*Multi) OnClaim ¶
func (m *Multi) OnClaim(ctx context.Context, ev flywheel.ClaimEvent)
OnClaim delivers the claim event to every child in order.
func (*Multi) OnFinish ¶
func (m *Multi) OnFinish(ctx context.Context, ev flywheel.FinishEvent)
OnFinish delivers the finish event to every child in order.
type ObservationSeries ¶
ObservationSeries is one distribution series in a Snapshot, as a sum and count.
type SlogObserver ¶
type SlogObserver struct {
	// contains filtered or unexported fields
}
SlogObserver implements flywheel.Observer by logging every lifecycle event through an *slog.Logger. It logs at debug level so a daemon running at info stays quiet, and a `--log debug` run gets a rich, per-attempt trace — claim, start, finish, and retry — with structured attributes for filtering.
func NewSlog ¶
func NewSlog(logger *slog.Logger) *SlogObserver
NewSlog returns a SlogObserver that logs through logger at debug level. A nil logger falls back to slog.Default(), so wiring it can never panic on a nil.
func (*SlogObserver) OnClaim ¶
func (s *SlogObserver) OnClaim(ctx context.Context, ev flywheel.ClaimEvent)
OnClaim logs a claimed batch.
func (*SlogObserver) OnFinish ¶
func (s *SlogObserver) OnFinish(ctx context.Context, ev flywheel.FinishEvent)
OnFinish logs a decided attempt, including the error when one occurred.
func (*SlogObserver) OnRetry ¶
func (s *SlogObserver) OnRetry(ctx context.Context, ev flywheel.RetryEvent)
OnRetry logs a scheduled retry.
type Snapshot ¶
type Snapshot struct {
	Counters     []CounterSeries
	Gauges       []GaugeSeries
	Observations []ObservationSeries
}
Snapshot is an immutable copy of a MemRecorder's series, taken under the lock, for rendering or assertion. Each slice is sorted by name then tags so the output is deterministic.