adapter

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package adapter provides types and logic for polling external systems and converting their metrics into toc ObservationBatches.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Run

func Run(ctx context.Context, cfg Config, sources []NamedSource, pub toc.ObservationPublisher, logger *slog.Logger) error

Run executes the poll loop. It polls sources at cfg.PollInterval (with +/-10% jitter), computes deltas, and publishes observation batches. Returns nil on context cancellation (clean shutdown). Closes all sources on exit.

Types

type Config

type Config struct {
	NATS             NATSConfig     `koanf:"nats"`
	PipelineID       string         `koanf:"pipeline_id"`
	PollInterval     time.Duration  `koanf:"poll_interval"`
	StalenessWindows int            `koanf:"staleness_windows"`
	Sources          []SourceConfig `koanf:"sources"`
}

Config holds toc-adapter sidecar configuration.

func LoadConfig

func LoadConfig(path string) (Config, error)

LoadConfig reads configuration from a YAML file and overlays environment variables with the TOC_ADAPTER_ prefix.

func (Config) Validate

func (c Config) Validate() error

Validate checks that the configuration is well-formed.

type DeltaTracker

type DeltaTracker struct {
	// contains filtered or unexported fields
}

DeltaTracker computes per-stage deltas from consecutive StageMetrics polls and produces core.StageObservation values. First poll per stage establishes a baseline; subsequent polls compute deltas.

Mirrors adapt.go:Adapt but operates on StageMetrics from external sources instead of toc.Stats from in-process stages.

func NewDeltaTracker

func NewDeltaTracker(stalenessLimit int, logger *slog.Logger) *DeltaTracker

NewDeltaTracker creates a tracker. stalenessLimit is the number of consecutive missed polls before a stage's state is evicted (forcing re-baseline on return). Use 0 for no eviction.

func (*DeltaTracker) Step

func (d *DeltaTracker) Step(now time.Time, current map[string]StageMetrics) []core.StageObservation

Step takes the current poll results and returns observations for stages that have a baseline. Returns nil on first call (baseline establishment) or when no deltas are computable.

type FieldConfig

type FieldConfig struct {
	Path       string  `koanf:"path"`
	Required   bool    `koanf:"required"`
	Multiplier float64 `koanf:"multiplier"`
}

FieldConfig describes how to extract a single metric field from a JSON response.

type MetricsMask

type MetricsMask uint16

MetricsMask distinguishes "not observed" from "observed zero." Aligned with core.ObservationMask convention.

const (
	HasCompletions MetricsMask = 1 << iota
	HasFailures
	HasArrivals
	HasQueueDepth
	HasWorkers
	HasBusyNs
	HasIdleNs
	HasBlockedNs
)

type NATSConfig

type NATSConfig struct {
	URL    string `koanf:"url"`
	Prefix string `koanf:"prefix"`
}

NATSConfig holds NATS connection settings.

type NamedSource

type NamedSource struct {
	Source
	Type string
	URL  string
}

NamedSource pairs a Source with identity for logging.

type Source

type Source interface {
	// Poll fetches current metrics from the external system.
	// Returns a map of stage name → StageMetrics with cumulative
	// counters and point-in-time gauges. Missing stages (extraction
	// failure, filtered out) should be omitted from the map, not
	// returned with empty masks.
	Poll(ctx context.Context) (map[string]StageMetrics, error)

	// Close releases resources (HTTP clients, DB connections, etc).
	Close() error
}

Source polls an external system and returns metrics for one or more pipeline stages. Implementations are system-scoped: a single Poll call may fetch data from one endpoint and extract metrics for multiple stages.

Poll must be safe for sequential calls from the poll loop. Concurrent calls are not required.

type SourceConfig

type SourceConfig struct {
	Type    string            `koanf:"type"`
	URL     string            `koanf:"url"`
	Timeout time.Duration     `koanf:"timeout"`
	Stages  []StageExtraction `koanf:"stages"`
}

SourceConfig describes one external data source to poll.

type StageExtraction

type StageExtraction struct {
	Name     string                 `koanf:"name"`
	Selector string                 `koanf:"selector"`
	Fields   map[string]FieldConfig `koanf:"fields"`
}

StageExtraction describes how to extract metrics for one stage from a source's response.

type StageMetrics

type StageMetrics struct {
	Mask        MetricsMask
	Completions int64 // cumulative
	Failures    int64 // cumulative
	Arrivals    int64 // cumulative
	QueueDepth  int64 // gauge
	Workers     int64 // gauge
	BusyNs      int64 // cumulative nanoseconds
	IdleNs      int64 // cumulative nanoseconds
	BlockedNs   int64 // cumulative nanoseconds
}

StageMetrics is the common intermediate produced by all Source implementations. Counters are cumulative (monotonically increasing across polls); gauges are point-in-time snapshots. Only fields with corresponding Mask bits set are valid — zero without a mask bit means "not observed," not "observed zero."

Directories

Path Synopsis
Package restsource implements adapter.Source for HTTP/JSON endpoints.
Package restsource implements adapter.Source for HTTP/JSON endpoints.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL