contextagent

package
v0.0.0-...-6db8c13
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2026 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Overview

Package contextagent assembles a production-ready TMP context-match service. It composes the targeting.ContextEngine with a Valkey-backed storage layer (per-domain reader services from mediabuystore, pkgconfigstore, urlliststore, suppressionstore, topicstore), optionally fronted by LRU caches; surfaces them behind TMP-signature verification on /context; exports Prometheus-compatible metrics; and orchestrates a coordinated graceful shutdown.


Constants

const (
	OutcomePass    = "pass"
	OutcomeFail    = "fail"
	OutcomeTimeout = "timeout"
	OutcomeError   = "error"
)

Outcome labels paired with stages. The set is fixed so the resulting `<ns>_stage_outcome_total{stage, outcome}` series stays finite. The labels mirror identityagent's vocabulary so dashboards and alerts can share PromQL across the two agents.
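One way to keep the `outcome` label bounded is to fold any unexpected value into a known label before it reaches the metric. The sketch below assumes a hypothetical `boundedOutcome` helper that maps unknown values to `OutcomeError`; the package may enforce the bound differently (for example, by only ever passing the constants).

```go
package main

import "fmt"

// Outcome labels as documented for contextagent.
const (
	OutcomePass    = "pass"
	OutcomeFail    = "fail"
	OutcomeTimeout = "timeout"
	OutcomeError   = "error"
)

// boundedOutcome is a hypothetical guard: any label outside the fixed set
// collapses to OutcomeError, so the {stage, outcome} series count stays finite.
func boundedOutcome(outcome string) string {
	switch outcome {
	case OutcomePass, OutcomeFail, OutcomeTimeout, OutcomeError:
		return outcome
	default:
		return OutcomeError
	}
}

func main() {
	for _, o := range []string{"pass", "timeout", "unexpected-value"} {
		fmt.Printf("%s -> %s\n", o, boundedOutcome(o))
	}
}
```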

const (
	StatusOK          = "ok"
	StatusClientError = "client_error"
	StatusServerError = "server_error"
	StatusTimeout     = "timeout"
)

Request-level status labels used by RequestCompleted. Bounded to a small set: ok (200), client_error (4xx), server_error (5xx), and timeout (504, which is recorded as timeout rather than server_error). The handler maps each HTTP status to one of these before recording.
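The mapping can be sketched as an ordered switch. This is a hypothetical `statusLabel` helper, not the package's code: 504 has to be tested before the generic 5xx branch so it lands on `timeout`, and treating every code below 400 as `ok` is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"net/http"
)

// Status labels as documented for contextagent.
const (
	StatusOK          = "ok"
	StatusClientError = "client_error"
	StatusServerError = "server_error"
	StatusTimeout     = "timeout"
)

// statusLabel is a hypothetical HTTP-status-to-label mapping. 504 is checked
// before the generic 5xx branch so gateway timeouts get their own label.
func statusLabel(code int) string {
	switch {
	case code == http.StatusGatewayTimeout:
		return StatusTimeout
	case code >= 500:
		return StatusServerError
	case code >= 400:
		return StatusClientError
	default:
		return StatusOK // assumption: anything below 400 counts as success
	}
}

func main() {
	for _, c := range []int{200, 400, 413, 500, 504} {
		fmt.Println(c, statusLabel(c))
	}
}
```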

const SuppressionStaleMaxAge = 30 * time.Minute

SuppressionStaleMaxAge is the upper bound on how old the in-memory suppression snapshot can be before /live flips to 503, independent of the consecutive-failure counter. It covers the failure mode where the refresh-loop goroutine exits cleanly (e.g. its context is cancelled by a future plumbing change) without ever incrementing ConsecutiveFailures: in that scenario the snapshot freezes forever while the counter-only check keeps reporting healthy. 30 minutes matches the failure-count threshold's effective coverage at the default 5-minute refresh interval (6 × 5 minutes). Because the bound is a constant, an operator running with a longer SUPPRESSION_REFRESH_INTERVAL should make sure it stays above 2× their interval, i.e. keep the interval under 15 minutes.
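The arithmetic behind the 30-minute bound can be checked directly. The sketch below uses a hypothetical `refreshIntervalCovered` helper (the package does not document such a check) that encodes the 2× guidance: if twice the interval reaches the age bound, a single failed refresh could already be enough to trip /live.

```go
package main

import (
	"fmt"
	"time"
)

// Constants as documented for contextagent.
const (
	SuppressionStaleMaxAge    = 30 * time.Minute
	SuppressionStaleThreshold = 6
)

// refreshIntervalCovered is a hypothetical startup check: the fixed age bound
// must exceed twice the refresh interval, so at least two consecutive
// refreshes have to fail before the age check alone flips /live.
func refreshIntervalCovered(interval time.Duration) bool {
	return interval > 0 && 2*interval < SuppressionStaleMaxAge
}

func main() {
	def := 5 * time.Minute
	// 6 failures × 5m = 30m: both liveness signals line up at the default.
	fmt.Println(SuppressionStaleThreshold*def == SuppressionStaleMaxAge) // true
	fmt.Println(refreshIntervalCovered(def))                             // true
	fmt.Println(refreshIntervalCovered(20 * time.Minute))                // false
}
```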

const SuppressionStaleThreshold = 6

SuppressionStaleThreshold is the consecutive-failure count at which /live flips to 503 for the suppression-snapshot liveness check. With the default 5-minute refresh interval, 6 consecutive failures means ~30 minutes of running on the snapshot the agent had at the last success — long enough that an operator wants the pod recycled rather than continuing to serve traffic with a kill-switch list that may no longer be authoritative.
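Taken together, the two constants describe a /live predicate that fails on either signal. The sketch below is hypothetical (`suppressionLiveness` and `checkAt` are illustrative names, not package API) and assumes the threshold is inclusive and the age bound exclusive, which matches the wording "at which /live flips" and "how old ... before /live flips".

```go
package main

import (
	"fmt"
	"time"
)

// Constants as documented for contextagent.
const (
	SuppressionStaleMaxAge    = 30 * time.Minute
	SuppressionStaleThreshold = 6
)

// suppressionLiveness is a hypothetical /live predicate combining both
// signals: a failure streak at or above the threshold, or a snapshot older
// than the age bound (which catches a refresh loop that exited without ever
// recording a failure).
func suppressionLiveness(consecutiveFailures int64, lastRefresh, now time.Time) error {
	if consecutiveFailures >= SuppressionStaleThreshold {
		return fmt.Errorf("suppression snapshot: %d consecutive refresh failures", consecutiveFailures)
	}
	if age := now.Sub(lastRefresh); age > SuppressionStaleMaxAge {
		return fmt.Errorf("suppression snapshot: last refresh %s ago", age.Round(time.Second))
	}
	return nil
}

// checkAt evaluates the predicate for a snapshot refreshed ageMinutes ago.
func checkAt(failures int64, ageMinutes int) error {
	now := time.Now()
	return suppressionLiveness(failures, now.Add(-time.Duration(ageMinutes)*time.Minute), now)
}

func main() {
	fmt.Println(checkAt(0, 1))  // healthy
	fmt.Println(checkAt(6, 1))  // streak reached the threshold
	fmt.Println(checkAt(0, 45)) // counter looks fine, but the snapshot is frozen
}
```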

Variables

This section is empty.

Functions

func NewAdminServer

func NewAdminServer(cfg AdminServerConfig) *http.Server

NewAdminServer builds the observability mux (/live, /metrics, /debug/pprof) on its own listener.

func NewHandler

func NewHandler(cfg HandlerConfig) http.Handler

NewHandler returns the http.Handler for POST /context.

func NewServer

func NewServer(cfg ServerConfig) *http.Server

NewServer builds the *http.Server for /context and /health. When AdminPort == 0 the operator endpoints (/live, /metrics, /debug/pprof) also mount on this server's mux. When AdminPort > 0 those are omitted here and the caller wires NewAdminServer on a second listener.
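The AdminPort rule can be illustrated with a plain `http.ServeMux`. This is a sketch under stated assumptions: `buildMux`, `mountOperator`, and `mounted` are hypothetical helpers, the handlers are stubs, and the `metricsEnabled` flag stands in for the documented rule that /metrics is not mounted when the metrics Registry is nil.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/pprof"
)

// buildMux sketches the documented mounting rule: /context and /health always
// live here; operator endpoints join this mux only when adminPort == 0.
func buildMux(adminPort int, metricsEnabled bool) *http.ServeMux {
	ok := func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }
	mux := http.NewServeMux()
	mux.HandleFunc("/context", ok)
	mux.HandleFunc("/health", ok)
	if adminPort == 0 {
		mountOperator(mux, metricsEnabled)
	}
	return mux
}

// mountOperator registers the operator endpoints; an admin server on its own
// listener would call this on a separate mux instead.
func mountOperator(mux *http.ServeMux, metricsEnabled bool) {
	mux.HandleFunc("/live", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) })
	if metricsEnabled { // stands in for "Registry != nil"
		mux.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {})
	}
	mux.HandleFunc("/debug/pprof/", pprof.Index)
}

// mounted reports whether path resolves to a registered pattern.
func mounted(mux *http.ServeMux, path string) bool {
	req, err := http.NewRequest(http.MethodGet, path, nil)
	if err != nil {
		return false
	}
	_, pattern := mux.Handler(req)
	return pattern != ""
}

func main() {
	single := buildMux(0, true)
	split := buildMux(9090, true)
	for _, p := range []string{"/context", "/live", "/metrics", "/debug/pprof/heap"} {
		fmt.Printf("%-18s single=%v split=%v\n", p, mounted(single, p), mounted(split, p))
	}
}
```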

func Run

func Run(ctx context.Context, cfg Config, logger *slog.Logger, version string, opts ...Option) (retErr error)

Run executes the agent lifecycle: build dependencies, start the HTTP server, then block until SIGINT/SIGTERM and run an orderly shutdown. Returns non-nil only when startup fails or shutdown surfaces errors.

Options inject dependencies the package cannot construct from env alone, for example a registry-fed property bitmap (see WithPropertyGlobal). Every option may be omitted; unset values fall back to the env-derived defaults on cfg.
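The "block until SIGINT/SIGTERM, then shut down" lifecycle is commonly built from `signal.NotifyContext` and `http.Server.Shutdown`. The sketch below shows that pattern with a single server; `serve`, `signalContext`, and `demoShutdown` are hypothetical names, and how Run actually sequences ShutdownGrace and ShutdownTimeout across its dependencies is not documented here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// signalContext is what a real entrypoint would pass to serve: it is
// cancelled on the first SIGINT or SIGTERM.
func signalContext() (context.Context, context.CancelFunc) {
	return signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
}

// serve runs srv on ln until ctx is done, then drains in-flight requests
// within shutdownTimeout. It returns non-nil only for startup or shutdown errors.
func serve(ctx context.Context, srv *http.Server, ln net.Listener, shutdownTimeout time.Duration) error {
	errCh := make(chan error, 1)
	go func() { errCh <- srv.Serve(ln) }()

	select {
	case err := <-errCh:
		return err // the server failed before any shutdown was requested
	case <-ctx.Done():
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()
	err := srv.Shutdown(shutdownCtx)
	// After Shutdown, Serve returns http.ErrServerClosed; anything else is a real error.
	if serveErr := <-errCh; !errors.Is(serveErr, http.ErrServerClosed) {
		err = errors.Join(err, serveErr)
	}
	return err
}

// demoShutdown binds an ephemeral port and cancels the context immediately,
// exercising the shutdown path without waiting for a real signal.
func demoShutdown() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return serve(ctx, &http.Server{Handler: http.NotFoundHandler()}, ln, time.Second)
}

func main() {
	fmt.Println("shutdown error:", demoShutdown())
}
```

In a real binary, `ctx, stop := signalContext(); defer stop()` would replace the immediately cancelled context.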

Types

type AdminServerConfig

type AdminServerConfig struct {
	Port              int
	Registry          *prometheus.Registry
	Version           string
	IsRunning         func() bool
	PprofEnabled      bool
	ReadHeaderTimeout time.Duration
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	IdleTimeout       time.Duration
	MaxHeaderBytes    int
	LivenessChecks    []LivenessCheck
	Recorder          Recorder
	Logger            *slog.Logger
}

AdminServerConfig packages the inputs for NewAdminServer.

type CacheConfig

type CacheConfig struct {
	// Enabled is the master switch. When false, every reader is
	// constructed as a direct (uncached) reader regardless of
	// per-domain settings. Default true.
	Enabled bool

	MediaBuy  MediaBuyCacheConfig
	PkgConfig DomainCacheConfig
	URLList   DomainCacheConfig
	Topics    TopicsCacheConfig
}

CacheConfig groups the per-domain LRU knobs. Each domain has its own size / TTL plus an enable flag; the master Enabled toggle bypasses every cache layer regardless of per-domain settings.
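The precedence between the master switch and the per-domain flag can be sketched as a decorator choice. All types here are hypothetical stand-ins: `Reader` is not the stores' real interface, and `memoReader` is a simplified unbounded map with no LRU eviction, no TTL, and no locking, used only to make the cache-versus-direct difference observable.

```go
package main

import "fmt"

// Reader is a hypothetical stand-in for a per-domain store reader.
type Reader interface {
	Get(key string) (string, error)
}

// DomainCacheConfig mirrors the documented per-domain knobs (TTL omitted).
type DomainCacheConfig struct {
	Enabled bool
	Size    int
}

// countingReader plays the Valkey-backed direct reader and counts lookups.
type countingReader struct{ calls int }

func (r *countingReader) Get(key string) (string, error) {
	r.calls++
	return "value:" + key, nil
}

// memoReader is a simplified cache layer: unbounded, no TTL, not safe for
// concurrent use. It only stands in for the real LRU.
type memoReader struct {
	next Reader
	memo map[string]string
}

func (m *memoReader) Get(key string) (string, error) {
	if v, ok := m.memo[key]; ok {
		return v, nil
	}
	v, err := m.next.Get(key)
	if err == nil {
		m.memo[key] = v
	}
	return v, err
}

// buildReader applies the documented precedence: the master switch wins,
// then the per-domain flag decides whether to wrap the direct reader.
func buildReader(masterEnabled bool, domain DomainCacheConfig, direct Reader) Reader {
	if !masterEnabled || !domain.Enabled {
		return direct
	}
	return &memoReader{next: direct, memo: make(map[string]string, domain.Size)}
}

// lookupsFor reads the same key three times and reports how many reached the store.
func lookupsFor(master bool, domain DomainCacheConfig) int {
	direct := &countingReader{}
	r := buildReader(master, domain, direct)
	for i := 0; i < 3; i++ {
		_, _ = r.Get("pkg-1")
	}
	return direct.calls
}

func main() {
	on := DomainCacheConfig{Enabled: true, Size: 1024}
	fmt.Println(lookupsFor(true, on))                  // cached: 1
	fmt.Println(lookupsFor(false, on))                 // master off: 3
	fmt.Println(lookupsFor(true, DomainCacheConfig{})) // domain off: 3
}
```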

type Config

type Config struct {
	HTTPPort       int
	RequestTimeout time.Duration

	HTTPReadHeaderTimeout time.Duration
	HTTPReadTimeout       time.Duration
	HTTPWriteTimeout      time.Duration
	HTTPIdleTimeout       time.Duration

	ShutdownGrace   time.Duration
	ShutdownTimeout time.Duration

	RequestBodyLimitBytes int
	MaxHeaderBytes        int
	MaxOpenConnections    int

	ResponseTTL time.Duration

	StrictContentType bool

	AdminPort int

	SupportedADCPMajorVersions []int

	LogLevel string

	// ProviderID is stamped into suppression keys (see
	// suppressionstore) and emitted on logs / metrics.
	ProviderID string

	// AcceptedTaxonomies enumerates the topic taxonomies the engine
	// trusts on inbound ContextSignals and consults on Valkey lookups.
	// Required: an empty list fails-closed on every TopicTargets
	// package.
	AcceptedTaxonomies []topicstore.Taxonomy

	// PropertyRIDs is the global property bitmap the engine checks at
	// the top of every request. A request whose property_rid is not in
	// this list short-circuits before any storage lookup.
	//
	// Static fallback: this list is used only when the binary entry
	// point does not inject a bitmap via WithPropertyGlobal. The
	// production wiring in cmd/context-agent feeds a live
	// registry.PropertyIndex through that option when REGISTRY_ENABLED
	// is true; PropertyRIDs covers stand-alone / smoke deployments.
	PropertyRIDs []string

	// SuppressionRefreshInterval is how often the agent re-scans
	// suppress:{provider_id}:* into its in-memory snapshot. External
	// writes (operator action, future writer pipeline) become visible
	// within this interval.
	SuppressionRefreshInterval time.Duration

	TMP    TMPConfig
	Valkey ValkeyBlock
	Cache  CacheConfig

	Metrics MetricsConfig
	Pprof   PprofConfig
}

Config is the env-derived configuration for a single context-agent process. Build it with LoadConfigFromEnv and check it with Validate before passing it to Run.

func LoadConfigFromEnv

func LoadConfigFromEnv() (Config, error)

LoadConfigFromEnv reads every recognized environment variable into a Config. Validation is the caller's responsibility — call Validate before using the result.

func (Config) Validate

func (c Config) Validate() error

Validate runs cross-field invariants on a loaded Config.
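The split between loading and validating can be sketched with an injectable lookup function shaped like `os.LookupEnv`, which keeps the loader testable. This covers only two documented knobs; `miniConfig`, `loadFrom`, `validate`, and `fakeEnv` are hypothetical, and the Go-duration format for SUPPRESSION_REFRESH_INTERVAL and the comma-separated format for PROPERTY_RIDS are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
	"time"
)

// miniConfig holds two documented knobs; the real Config has many more.
type miniConfig struct {
	SuppressionRefreshInterval time.Duration
	PropertyRIDs               []string
}

// loadFrom reads variables through lookup (os.LookupEnv in production).
// Formats are assumptions: a Go duration and a comma-separated list.
func loadFrom(lookup func(string) (string, bool)) (miniConfig, error) {
	cfg := miniConfig{SuppressionRefreshInterval: 5 * time.Minute} // documented default
	if v, ok := lookup("SUPPRESSION_REFRESH_INTERVAL"); ok {
		d, err := time.ParseDuration(v)
		if err != nil {
			return cfg, fmt.Errorf("SUPPRESSION_REFRESH_INTERVAL: %w", err)
		}
		cfg.SuppressionRefreshInterval = d
	}
	if v, ok := lookup("PROPERTY_RIDS"); ok {
		for _, rid := range strings.Split(v, ",") {
			if rid = strings.TrimSpace(rid); rid != "" {
				cfg.PropertyRIDs = append(cfg.PropertyRIDs, rid)
			}
		}
	}
	return cfg, nil
}

// validate is a hypothetical invariant run after loading, kept separate from
// parsing in the same way LoadConfigFromEnv and Validate are separate.
func (c miniConfig) validate() error {
	if c.SuppressionRefreshInterval <= 0 {
		return errors.New("SUPPRESSION_REFRESH_INTERVAL must be positive")
	}
	return nil
}

// fakeEnv adapts a map to the lookup signature for tests.
func fakeEnv(m map[string]string) func(string) (string, bool) {
	return func(k string) (string, bool) {
		v, ok := m[k]
		return v, ok
	}
}

func main() {
	cfg, err := loadFrom(os.LookupEnv)
	if err == nil {
		err = cfg.validate()
	}
	fmt.Printf("%+v err=%v\n", cfg, err)
}
```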

type DomainCacheConfig

type DomainCacheConfig struct {
	Enabled bool
	Size    int
	TTL     time.Duration
}

DomainCacheConfig is the common shape for domains with a single LRU.

type HandlerConfig

type HandlerConfig struct {
	Engine                     *targeting.ContextEngine
	RequestTimeout             time.Duration
	RequestBodyLimit           int64
	ResponseTTL                time.Duration
	SupportedADCPMajorVersions []int
	Recorder                   Recorder
	Logger                     *slog.Logger
}

HandlerConfig packages the inputs for NewHandler. Recorder is accepted for symmetry with identity-agent and for future per-stage instrumentation. Request lifecycle is observed by requestMetricsMiddleware, so the handler itself consults Recorder only for outcomes the middleware can't see (e.g. an unknown adcp_major_version rejected before any engine call).
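The checks that run before any engine call can be sketched as a small gate in front of the real handler. Everything here is hypothetical: `gate`, `envelope`, and `statusFor` are illustrative, the JSON location of `adcp_major_version` and the chosen status codes (405, 413, 400) are assumptions, and a real handler would decode the full request once instead of consuming the body in a separate step.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"slices"
	"strings"
)

// envelope holds only the field the gate needs; the real request carries more.
type envelope struct {
	ADCPMajorVersion int `json:"adcp_major_version"`
}

// gate checks method, body size, and the major-version allow-list before
// handing off to next (which would invoke the engine).
func gate(supported []int, bodyLimit int64, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		r.Body = http.MaxBytesReader(w, r.Body, bodyLimit)
		var env envelope
		if err := json.NewDecoder(r.Body).Decode(&env); err != nil {
			var tooLarge *http.MaxBytesError
			if errors.As(err, &tooLarge) {
				http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
				return
			}
			http.Error(w, "malformed request", http.StatusBadRequest)
			return
		}
		if !slices.Contains(supported, env.ADCPMajorVersion) {
			http.Error(w, "unsupported adcp_major_version", http.StatusBadRequest)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// statusFor sends one request through a gate that allows versions 2 and 3
// (illustrative values) with a 64-byte body limit.
func statusFor(method, body string) int {
	h := gate([]int{2, 3}, 64, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, "/context", strings.NewReader(body)))
	return rec.Code
}

func main() {
	fmt.Println(statusFor(http.MethodPost, `{"adcp_major_version":3}`))
	fmt.Println(statusFor(http.MethodPost, `{"adcp_major_version":9}`))
	fmt.Println(statusFor(http.MethodGet, ""))
}
```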

type LivenessCheck

type LivenessCheck struct {
	Name string
	Fn   func() error
}

LivenessCheck is one named predicate consulted by /live. Name is included in the response when Fn returns an error so an operator can see which subsystem is degraded without reading agent logs.
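A /live handler built on this type might run every check, prefix each failure with the check's Name, and join the messages into a 503 body, matching what ServerConfig.LivenessChecks describes. The sketch below is hypothetical (`liveHandler`, `probe`, `passing`, `failing`, and the check names are illustrative) and uses `errors.Join` from Go 1.20+.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// LivenessCheck mirrors the documented type.
type LivenessCheck struct {
	Name string
	Fn   func() error
}

// liveHandler runs every check, prefixes failures with the check's Name,
// and answers 503 with the joined messages if any check failed.
func liveHandler(checks []LivenessCheck) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		var errs []error
		for _, c := range checks {
			if err := c.Fn(); err != nil {
				errs = append(errs, fmt.Errorf("%s: %w", c.Name, err))
			}
		}
		if err := errors.Join(errs...); err != nil {
			http.Error(w, err.Error(), http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
		fmt.Fprintln(w, "ok")
	})
}

// probe calls the handler once and returns the status code and body.
func probe(checks []LivenessCheck) (int, string) {
	rec := httptest.NewRecorder()
	liveHandler(checks).ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/live", nil))
	return rec.Code, rec.Body.String()
}

func passing(name string) LivenessCheck {
	return LivenessCheck{Name: name, Fn: func() error { return nil }}
}

func failing(name, msg string) LivenessCheck {
	return LivenessCheck{Name: name, Fn: func() error { return errors.New(msg) }}
}

func main() {
	fmt.Println(probe([]LivenessCheck{passing("suppression_snapshot")}))
	fmt.Println(probe([]LivenessCheck{passing("suppression_snapshot"), failing("registry_sync", "no sync for 10m")}))
}
```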

type MediaBuyCacheConfig

type MediaBuyCacheConfig struct {
	Enabled       bool
	SellerSetSize int
	SellerSetTTL  time.Duration
	MediaBuySize  int
	MediaBuyTTL   time.Duration
}

MediaBuyCacheConfig has two underlying caches: the seller-set cache and the per-buy record cache. They can be sized independently.

type MetricsConfig

type MetricsConfig struct {
	Enabled   bool
	Namespace string
}

MetricsConfig drives the Prometheus exporter and /metrics endpoint.

type MetricsProvider

type MetricsProvider struct {
	Registry *prometheus.Registry
	Recorder Recorder
	// contains filtered or unexported fields
}

MetricsProvider wires together a Prometheus registry, an OTEL meter provider that writes to it, and a Recorder. BuildMetrics is the only constructor.

A disabled provider returns a noop Recorder and a nil Registry. The /metrics endpoint isn't mounted when Registry is nil.

func BuildMetrics

func BuildMetrics(cfg MetricsConfig) (*MetricsProvider, error)

BuildMetrics constructs a MetricsProvider per the supplied config.

  • cfg.Enabled=false → provider with a noop recorder and nil Registry; never fails.
  • cfg.Enabled=true → constructs prometheus.NewRegistry, plugs it into the OTEL Prometheus exporter, builds the OtelRecorder. Any failure surfaces as a startup error — invalid metrics config fails startup, matching the identity-agent contract.

Side effect: BuildMetrics calls otel.SetTextMapPropagator with a fresh composite of (TraceContext, Baggage). This is a *global* install: if the same process hosts both context-agent and identity-agent (not the current deployment shape, but possible in a future co-located build) and both agents build their metrics providers, the last caller wins. The two install identical composites today, so the install is idempotent in practice, but co-locators MUST keep the two propagator lists in sync or one agent will lose inbound trace context to the other.

func (*MetricsProvider) RegisterOpenConnectionsObserver

func (m *MetricsProvider) RegisterOpenConnectionsObserver(observerFn func() int64) error

RegisterOpenConnectionsObserver mirrors identityagent: registers an OTEL ObservableGauge "<namespace>_open_connections" whose value is read from observerFn on every scrape. No-op when metrics are disabled. observerFn must be cheap and concurrent-safe.
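A cheap, concurrency-safe observerFn can be backed by an atomic counter driven by `http.Server.ConnState`. This is one plausible wiring, not the package's documented implementation; `connCounter` and `openAfter` are hypothetical. Hijacked and closed are both terminal states in net/http, so each decrements the count once.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"sync/atomic"
)

// connCounter tracks open connections via http.Server.ConnState.
type connCounter struct{ open atomic.Int64 }

// track is suitable as an http.Server ConnState hook.
func (c *connCounter) track(_ net.Conn, s http.ConnState) {
	switch s {
	case http.StateNew:
		c.open.Add(1)
	case http.StateClosed, http.StateHijacked:
		c.open.Add(-1)
	}
}

// observe matches the observerFn shape func() int64: a single atomic load,
// so it is cheap and safe to call from the metrics scrape goroutine.
func (c *connCounter) observe() int64 { return c.open.Load() }

// openAfter replays a connection lifecycle and returns the final count.
func openAfter(opened, closed, hijacked int) int64 {
	var c connCounter
	for i := 0; i < opened; i++ {
		c.track(nil, http.StateNew)
		c.track(nil, http.StateActive) // activity transitions leave the count alone
	}
	for i := 0; i < closed; i++ {
		c.track(nil, http.StateClosed)
	}
	for i := 0; i < hijacked; i++ {
		c.track(nil, http.StateHijacked)
	}
	return c.observe()
}

func main() {
	var c connCounter
	srv := &http.Server{Addr: "127.0.0.1:8080", ConnState: c.track}
	fmt.Println(srv.Addr, "open connections:", c.observe())
	fmt.Println("after 3 opened, 1 closed, 1 hijacked:", openAfter(3, 1, 1))
}
```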

func (*MetricsProvider) RegisterSuppressionSnapshotObservers

func (m *MetricsProvider) RegisterSuppressionSnapshotObservers(
	failures func() int64,
	lastRefreshUnix func() int64,
	propertySize func() int64,
	geoSize func() int64,
) error

RegisterSuppressionSnapshotObservers wires the suppression snapshot's health onto three gauges read on every Prometheus scrape:

  • <ns>_suppression_consecutive_failures — current uninterrupted refresh-failure streak; resets to 0 on every success. Alerting on `> 0 for 15m` catches a Valkey outage that the snapshot is surviving on stale data.
  • <ns>_suppression_last_refresh_unix — Unix-second timestamp of the most recent successful refresh. Pair with `time()` to compute snapshot age in PromQL.
  • <ns>_suppression_entries{kind=property|geo} — current snapshot size, segmented by suppression dimension.

The gauges read the supplied callbacks (failures, lastRefreshUnix, propertySize, geoSize) on every scrape, so the snapshot's internal atomics drive the metrics directly. No-op when metrics are disabled.
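The snapshot side of this contract can be sketched as a few atomics whose `Load` methods are handed out as the four callbacks. `snapshotHealth` and its methods are hypothetical; the behavior they encode (the failure streak resets on success, the last-refresh timestamp is in Unix seconds) comes from the gauge descriptions. Snapshot age in PromQL would then be `time() - <ns>_suppression_last_refresh_unix`.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// snapshotHealth is a hypothetical set of atomics behind the four callbacks.
type snapshotHealth struct {
	failures      atomic.Int64
	lastRefreshAt atomic.Int64 // Unix seconds of the last successful refresh
	propertyN     atomic.Int64
	geoN          atomic.Int64
}

// recordSuccess resets the failure streak and publishes the new sizes.
func (h *snapshotHealth) recordSuccess(now time.Time, properties, geos int) {
	h.failures.Store(0)
	h.lastRefreshAt.Store(now.Unix())
	h.propertyN.Store(int64(properties))
	h.geoN.Store(int64(geos))
}

// recordFailure extends the streak and leaves the last snapshot untouched.
func (h *snapshotHealth) recordFailure() { h.failures.Add(1) }

// callbacks returns functions in the parameter order of
// RegisterSuppressionSnapshotObservers.
func (h *snapshotHealth) callbacks() (failures, lastRefreshUnix, propertySize, geoSize func() int64) {
	return h.failures.Load, h.lastRefreshAt.Load, h.propertyN.Load, h.geoN.Load
}

// run applies refresh results (true = success) and returns the failure gauge
// and property-size gauge values as a scrape would see them.
func run(results ...bool) (failures, properties int64) {
	var h snapshotHealth
	f, _, p, _ := h.callbacks()
	for i, ok := range results {
		if ok {
			h.recordSuccess(time.Unix(int64(1_700_000_000+i), 0), 10*(i+1), 2)
		} else {
			h.recordFailure()
		}
	}
	return f(), p()
}

func main() {
	var h snapshotHealth
	failures, lastRefresh, props, geos := h.callbacks()
	h.recordSuccess(time.Now(), 120, 8)
	h.recordFailure()
	fmt.Println(failures(), props(), geos(), time.Now().Unix()-lastRefresh())
}
```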

func (*MetricsProvider) Shutdown

func (m *MetricsProvider) Shutdown(ctx context.Context) error

Shutdown flushes any pending metric data and tears down the meter provider. Safe to call when the provider is disabled (no meter provider was built).

type Option

type Option func(*runOptions)

Option customizes a Run invocation. Options carry dependencies the agent cannot construct from env alone — most commonly an externally hydrated property bitmap fed by a registry sync loop in the binary entrypoint, instead of the PROPERTY_RIDS env fallback.

func WithLivenessChecks

func WithLivenessChecks(checks ...LivenessCheck) Option

WithLivenessChecks appends additional /live predicates to the default suppression-snapshot check. Use this to surface the health of binary-owned dependencies (e.g. a registry sync loop) into the same /live response the rest of the agent reports through.

func WithPropertyGlobal

func WithPropertyGlobal(b targeting.Bitmap) Option

WithPropertyGlobal injects the global property bitmap the engine consults at the top of every request. When supplied, the bitmap replaces the one derived from Config.PropertyRIDs. Use this to feed the agent from a registry.PropertyIndex that is kept fresh by a background syncer; the bitmap is read on every request, so a dynamic implementation gets eventual consistency for free.
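The options described above follow the functional-options pattern. The sketch below reproduces the documented `Option func(*runOptions)` shape, but the `runOptions` fields, the `Bitmap` method set (`Contains`), and `resolve` are assumptions; `targeting.Bitmap`'s real interface may differ. `bitmapFunc` plays a dynamic bitmap that is consulted on every call.

```go
package main

import "fmt"

// Bitmap is a hypothetical stand-in for targeting.Bitmap.
type Bitmap interface{ Contains(rid string) bool }

// staticBitmap is the PropertyRIDs-style fallback.
type staticBitmap map[string]struct{}

func (s staticBitmap) Contains(rid string) bool {
	_, ok := s[rid]
	return ok
}

// bitmapFunc adapts a function (e.g. a live index lookup) to Bitmap.
type bitmapFunc func(rid string) bool

func (f bitmapFunc) Contains(rid string) bool { return f(rid) }

type LivenessCheck struct {
	Name string
	Fn   func() error
}

// runOptions field names are assumptions; only the Option shape is documented.
type runOptions struct {
	propertyGlobal Bitmap
	liveness       []LivenessCheck
}

type Option func(*runOptions)

func WithPropertyGlobal(b Bitmap) Option {
	return func(o *runOptions) { o.propertyGlobal = b }
}

func WithLivenessChecks(checks ...LivenessCheck) Option {
	return func(o *runOptions) { o.liveness = append(o.liveness, checks...) }
}

// resolve applies options over env-derived defaults: an injected bitmap
// replaces the static one, and extra checks append to the default check.
func resolve(propertyRIDs []string, defaultCheck LivenessCheck, opts ...Option) runOptions {
	o := runOptions{liveness: []LivenessCheck{defaultCheck}}
	for _, opt := range opts {
		opt(&o)
	}
	if o.propertyGlobal == nil {
		static := staticBitmap{}
		for _, rid := range propertyRIDs {
			static[rid] = struct{}{}
		}
		o.propertyGlobal = static
	}
	return o
}

func main() {
	def := LivenessCheck{Name: "suppression_snapshot", Fn: func() error { return nil }}
	static := resolve([]string{"rid-1"}, def)
	live := resolve([]string{"rid-1"}, def,
		WithPropertyGlobal(bitmapFunc(func(rid string) bool { return rid == "rid-2" })),
		WithLivenessChecks(LivenessCheck{Name: "registry_sync", Fn: func() error { return nil }}),
	)
	fmt.Println(static.propertyGlobal.Contains("rid-1"), len(static.liveness)) // true 1
	fmt.Println(live.propertyGlobal.Contains("rid-1"), len(live.liveness))     // false 2
}
```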

type PprofConfig

type PprofConfig struct {
	Enabled bool
}

PprofConfig toggles /debug/pprof on the observability mux.

type Recorder

type Recorder interface {
	RequestStarted(ctx context.Context)
	RequestCompleted(ctx context.Context, status string, d time.Duration)
	StageOutcome(ctx context.Context, stage, outcome string)
	StageDuration(ctx context.Context, stage string, d time.Duration)
	StoreError(ctx context.Context, store string)
	KeystoreRefresh(ctx context.Context, outcome string)
	HandlerPanic(ctx context.Context)
	BackgroundPanic(ctx context.Context, where string)
}

Recorder is the metric API the context-agent hot path uses. The OTEL implementation is the production recorder; tests use noopRecorder.
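Because Recorder is a plain interface, a no-op implementation plus the "nil → noop" defaulting described on ServerConfig takes only a few lines. The interface below is copied from the documentation; the method bodies, `orNoop`, and `countingRecorder` are a hypothetical sketch of how tests might use it, not the package's unexported noopRecorder.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Recorder reproduces the documented interface.
type Recorder interface {
	RequestStarted(ctx context.Context)
	RequestCompleted(ctx context.Context, status string, d time.Duration)
	StageOutcome(ctx context.Context, stage, outcome string)
	StageDuration(ctx context.Context, stage string, d time.Duration)
	StoreError(ctx context.Context, store string)
	KeystoreRefresh(ctx context.Context, outcome string)
	HandlerPanic(ctx context.Context)
	BackgroundPanic(ctx context.Context, where string)
}

// noopRecorder discards every observation.
type noopRecorder struct{}

func (noopRecorder) RequestStarted(context.Context)                          {}
func (noopRecorder) RequestCompleted(context.Context, string, time.Duration) {}
func (noopRecorder) StageOutcome(context.Context, string, string)            {}
func (noopRecorder) StageDuration(context.Context, string, time.Duration)    {}
func (noopRecorder) StoreError(context.Context, string)                      {}
func (noopRecorder) KeystoreRefresh(context.Context, string)                 {}
func (noopRecorder) HandlerPanic(context.Context)                            {}
func (noopRecorder) BackgroundPanic(context.Context, string)                 {}

// Compile-time check that noopRecorder satisfies Recorder.
var _ Recorder = noopRecorder{}

// orNoop implements "nil → noop" defaulting.
func orNoop(r Recorder) Recorder {
	if r == nil {
		return noopRecorder{}
	}
	return r
}

// countingRecorder embeds noopRecorder and overrides one method, a common way
// to assert on a single metric in tests.
type countingRecorder struct {
	noopRecorder
	panics int
}

func (c *countingRecorder) HandlerPanic(context.Context) { c.panics++ }

// panicsRecorded reports n handler panics through the Recorder interface.
func panicsRecorded(n int) int {
	c := &countingRecorder{}
	r := orNoop(c)
	for i := 0; i < n; i++ {
		r.HandlerPanic(context.Background())
	}
	return c.panics
}

// nilIsNoop reports whether a nil Recorder is replaced by noopRecorder.
func nilIsNoop() bool {
	_, ok := orNoop(nil).(noopRecorder)
	return ok
}

func main() {
	fmt.Println(nilIsNoop(), panicsRecorded(3))
}
```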

type ServerConfig

type ServerConfig struct {
	Port           int
	ContextHandler http.Handler
	KeyStore       tmproto.KeyStore
	OwnEndpointURL string
	RequireSig     bool
	Registry       *prometheus.Registry
	IsRunning      func() bool
	Version        string
	PprofEnabled   bool

	ReadHeaderTimeout time.Duration
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	IdleTimeout       time.Duration
	MaxHeaderBytes    int

	// RequestBodyLimit caps the bytes the verifier reads from the
	// signed request before computing the signature. Must match the
	// body limit the inner handler enforces, otherwise a signed
	// request between the verifier limit and the handler limit gets
	// silently truncated and rejected as malformed JSON.
	RequestBodyLimit int64

	AdminPort int

	StrictContentType bool

	// LivenessChecks runs in /live; any check returning a non-nil
	// error flips /live from 200 to 503 with the joined error
	// messages in the response body. Intended for "the agent is up
	// but the data plane is broken" signals (e.g. suppression
	// snapshot is N consecutive refreshes behind). /health stays
	// purely about process readiness — the orchestrator decides
	// which one to probe.
	LivenessChecks []LivenessCheck

	// Recorder observes request lifecycle, panics, and store errors.
	// nil → noop.
	Recorder Recorder

	Logger *slog.Logger
}

ServerConfig packages the inputs for NewServer.
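The RequestBodyLimit comment describes a mismatch failure mode that is easy to reproduce. The sketch below is hypothetical (`readCapped`, `capped`, `truncatedIsValid`, and `errTooLarge` are illustrative): reading one byte past the limit turns "too large" into an explicit error, whereas a bare `io.LimitReader` cut below the body size silently yields bytes that no longer parse as JSON.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

var errTooLarge = errors.New("request body exceeds limit")

// readCapped reads at most limit bytes and reports oversize explicitly by
// reading one extra byte, instead of silently truncating.
func readCapped(r io.Reader, limit int64) ([]byte, error) {
	b, err := io.ReadAll(io.LimitReader(r, limit+1))
	if err != nil {
		return nil, err
	}
	if int64(len(b)) > limit {
		return nil, errTooLarge
	}
	return b, nil
}

// capped runs readCapped over a string body.
func capped(body string, limit int64) error {
	_, err := readCapped(strings.NewReader(body), limit)
	return err
}

// truncatedIsValid shows the mismatch failure mode: a plain LimitReader cut
// below the body size produces bytes that are no longer valid JSON.
func truncatedIsValid(body string, limit int64) bool {
	b, _ := io.ReadAll(io.LimitReader(strings.NewReader(body), limit))
	return json.Valid(b)
}

func main() {
	body := `{"a":1}` // 7 bytes
	fmt.Println(capped(body, 7), capped(body, 6))
	fmt.Println(truncatedIsValid(body, 7), truncatedIsValid(body, 5))
}
```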

type TMPConfig

type TMPConfig struct {
	RegistryURL    string
	OwnEndpointURL string
	AllowUnsigned  bool
}

TMPConfig drives TMP signature verification on /context.

type TopicsCacheConfig

type TopicsCacheConfig struct {
	Enabled      bool
	ArtifactSize int
	ArtifactTTL  time.Duration
	PackageSize  int
	PackageTTL   time.Duration
}

TopicsCacheConfig has two underlying caches: artifact-side and package-side. They can be sized independently.

type ValkeyBlock

type ValkeyBlock struct {
	Enabled bool

	// ShardsSupplied is true when VALKEY_SHARDS was set on the
	// environment, even if it failed to parse. Lets Validate
	// distinguish "operator forgot to set the shards" from "operator
	// set them but the JSON is malformed" so it doesn't pile a
	// generic "required" error on top of the parser's specific one.
	// Hand-constructed Config{} for tests should set this when they
	// want Validate to think the env was supplied.
	ShardsSupplied bool

	Mode     string
	Shards   map[string]string
	Username string
	Password string //nolint:gosec // G117 false positive: the field name describes purpose; the value is intentionally a secret.
	DB       int
	TLS      bool

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	PoolSize     int
}

ValkeyBlock is the Valkey configuration. The context-agent issues reads against media-buy, package-config, URL-list, topic, and suppression keys, plus writes for operator-driven suppressions (via the suppressionstore.Service).
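The ShardsSupplied field exists so Validate reports one message per cause. The sketch below shows that split with hypothetical `parseShards`, `validateShards`, and `check` helpers; it assumes VALKEY_SHARDS is a JSON object mapping shard names to addresses and that the variable is only required when Valkey is enabled.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
)

// parseShards reads VALKEY_SHARDS; present mirrors os.LookupEnv's second result.
// supplied is true whenever the variable was set, even if parsing fails.
func parseShards(raw string, present bool) (shards map[string]string, supplied bool, err error) {
	if !present {
		return nil, false, nil
	}
	if err := json.Unmarshal([]byte(raw), &shards); err != nil {
		return nil, true, fmt.Errorf("VALKEY_SHARDS: invalid JSON: %w", err)
	}
	return shards, true, nil
}

// validateShards adds the generic "required" error only when the variable
// was never supplied, so a parse error is not buried under a second message.
func validateShards(enabled, supplied bool) error {
	if enabled && !supplied {
		return errors.New("VALKEY_SHARDS is required when Valkey is enabled")
	}
	return nil
}

// check runs parse then validate (with Valkey enabled) and collects errors.
func check(raw string, present bool) []error {
	var errs []error
	_, supplied, err := parseShards(raw, present)
	if err != nil {
		errs = append(errs, err)
	}
	if err := validateShards(true, supplied); err != nil {
		errs = append(errs, err)
	}
	return errs
}

func main() {
	raw, present := os.LookupEnv("VALKEY_SHARDS")
	fmt.Println(check(raw, present))
}
```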

func (ValkeyBlock) ToRedisStoreConfig

func (b ValkeyBlock) ToRedisStoreConfig() redisstore.Config

ToRedisStoreConfig projects the block onto the redisstore.Config that the Build helper consumes.
