Documentation ¶
Overview ¶
Package adapter provides types and logic for polling external systems and converting their metrics into toc ObservationBatches.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Run ¶
func Run(ctx context.Context, cfg Config, sources []NamedSource, pub toc.ObservationPublisher, logger *slog.Logger) error
Run executes the poll loop. It polls sources at cfg.PollInterval (with ±10% jitter), computes deltas, and publishes observation batches. It returns nil on context cancellation (clean shutdown) and closes all sources on exit.
Types ¶
type Config ¶
type Config struct {
NATS NATSConfig `koanf:"nats"`
PipelineID string `koanf:"pipeline_id"`
PollInterval time.Duration `koanf:"poll_interval"`
StalenessWindows int `koanf:"staleness_windows"`
Sources []SourceConfig `koanf:"sources"`
}
Config holds toc-adapter sidecar configuration.
func LoadConfig ¶
LoadConfig reads configuration from a YAML file and overlays environment variables with the TOC_ADAPTER_ prefix.
type DeltaTracker ¶
type DeltaTracker struct {
// contains filtered or unexported fields
}
DeltaTracker computes per-stage deltas from consecutive StageMetrics polls and produces core.StageObservation values. The first poll for each stage establishes a baseline; subsequent polls compute deltas against the previous poll.
Mirrors adapt.go:Adapt but operates on StageMetrics from external sources instead of toc.Stats from in-process stages.
func NewDeltaTracker ¶
func NewDeltaTracker(stalenessLimit int, logger *slog.Logger) *DeltaTracker
NewDeltaTracker creates a tracker. stalenessLimit is the number of consecutive missed polls before a stage's state is evicted (forcing re-baseline on return). Use 0 for no eviction.
func (*DeltaTracker) Step ¶
func (d *DeltaTracker) Step(now time.Time, current map[string]StageMetrics) []core.StageObservation
Step takes the current poll results and returns observations for stages that already have a baseline. It returns nil on the first call (baseline establishment) or when no deltas can be computed.
type FieldConfig ¶
type FieldConfig struct {
Path string `koanf:"path"`
Required bool `koanf:"required"`
Multiplier float64 `koanf:"multiplier"`
}
FieldConfig describes how to extract a single metric field from a JSON response.
type MetricsMask ¶
type MetricsMask uint16
MetricsMask distinguishes "not observed" from "observed zero." Aligned with core.ObservationMask convention.
const (
	HasCompletions MetricsMask = 1 << iota
	HasFailures
	HasArrivals
	HasQueueDepth
	HasWorkers
	HasBusyNs
	HasIdleNs
	HasBlockedNs
)
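As a rough illustration of how these bit flags combine, the sketch below redeclares the type and constants locally so it compiles on its own; real code would import package adapter instead. The helper `has` is hypothetical and not part of the package.

```go
package main

import "fmt"

// Local copy of the documented declarations, so the sketch is self-contained.
type MetricsMask uint16

const (
	HasCompletions MetricsMask = 1 << iota
	HasFailures
	HasArrivals
	HasQueueDepth
	HasWorkers
	HasBusyNs
	HasIdleNs
	HasBlockedNs
)

// has is a hypothetical helper that reports whether every bit in want is set in m.
func has(m, want MetricsMask) bool {
	return m&want == want
}

func main() {
	// A source that observed completions and queue depth, nothing else.
	m := HasCompletions | HasQueueDepth
	fmt.Println(has(m, HasQueueDepth))
	fmt.Println(has(m, HasFailures))
	fmt.Println(has(m, HasCompletions|HasQueueDepth))
}
```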
type NATSConfig ¶
NATSConfig holds NATS connection settings.
type NamedSource ¶
NamedSource pairs a Source with identity for logging.
type Source ¶
type Source interface {
// Poll fetches current metrics from the external system.
// Returns a map of stage name → StageMetrics with cumulative
// counters and point-in-time gauges. Missing stages (extraction
// failure, filtered out) should be omitted from the map, not
// returned with empty masks.
Poll(ctx context.Context) (map[string]StageMetrics, error)
// Close releases resources (HTTP clients, DB connections, etc).
Close() error
}
Source polls an external system and returns metrics for one or more pipeline stages. Implementations are system-scoped: a single Poll call may fetch data from one endpoint and extract metrics for multiple stages.
Poll must be safe for sequential calls from the poll loop. Concurrent calls are not required.
type SourceConfig ¶
type SourceConfig struct {
Type string `koanf:"type"`
URL string `koanf:"url"`
Timeout time.Duration `koanf:"timeout"`
Stages []StageExtraction `koanf:"stages"`
}
SourceConfig describes one external data source to poll.
type StageExtraction ¶
type StageExtraction struct {
Name string `koanf:"name"`
Selector string `koanf:"selector"`
Fields map[string]FieldConfig `koanf:"fields"`
}
StageExtraction describes how to extract metrics for one stage from a source's response.
type StageMetrics ¶
type StageMetrics struct {
Mask MetricsMask
Completions int64 // cumulative
Failures int64 // cumulative
Arrivals int64 // cumulative
QueueDepth int64 // gauge
Workers int64 // gauge
BusyNs int64 // cumulative nanoseconds
IdleNs int64 // cumulative nanoseconds
BlockedNs int64 // cumulative nanoseconds
}
StageMetrics is the common intermediate produced by all Source implementations. Counters are cumulative (monotonically increasing across polls); gauges are point-in-time snapshots. Only fields with corresponding Mask bits set are valid — zero without a mask bit means "not observed," not "observed zero."
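To make the "not observed" versus "observed zero" distinction concrete, here is a small sketch of a consumer-side accessor. The function `queueDepth` is hypothetical, and the types are reduced local copies rather than the package's own declarations.

```go
package main

import "fmt"

// Reduced local copies of the documented types, for a self-contained sketch.
type MetricsMask uint16

const (
	HasCompletions MetricsMask = 1 << iota
	HasFailures
	HasArrivals
	HasQueueDepth
)

type StageMetrics struct {
	Mask        MetricsMask
	Completions int64 // cumulative
	QueueDepth  int64 // gauge
}

// queueDepth is a hypothetical accessor. The second result is false when
// the source did not observe queue depth, in which case the value is meaningless.
func queueDepth(m StageMetrics) (depth int64, observed bool) {
	if m.Mask&HasQueueDepth == 0 {
		return 0, false
	}
	return m.QueueDepth, true
}

func main() {
	empty := StageMetrics{Mask: HasQueueDepth}                     // queue observed, and it is empty
	unknown := StageMetrics{Mask: HasCompletions, Completions: 42} // queue never observed

	fmt.Println(queueDepth(empty))
	fmt.Println(queueDepth(unknown))
}
```

Both values carry a zero QueueDepth field; only the mask tells a consumer whether that zero is a measurement.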
Directories ¶
| Path | Synopsis |
|---|---|
| | Package restsource implements adapter.Source for HTTP/JSON endpoints. |