Overview
Package contextagent assembles a production-ready TMP context-match service. It composes the targeting.ContextEngine with a Valkey-backed storage layer (per-domain reader services from mediabuystore, pkgconfigstore, urlliststore, suppressionstore, and topicstore), optionally fronted by LRU caches. It serves the engine on /context behind TMP-signature verification, exports Prometheus-compatible metrics, and coordinates a graceful shutdown.
Index
- Constants
- func NewAdminServer(cfg AdminServerConfig) *http.Server
- func NewHandler(cfg HandlerConfig) http.Handler
- func NewServer(cfg ServerConfig) *http.Server
- func Run(ctx context.Context, cfg Config, logger *slog.Logger, version string, ...) (retErr error)
- type AdminServerConfig
- type CacheConfig
- type Config
- type DomainCacheConfig
- type HandlerConfig
- type LivenessCheck
- type MediaBuyCacheConfig
- type MetricsConfig
- type MetricsProvider
- func BuildMetrics(cfg MetricsConfig) (*MetricsProvider, error)
- func (m *MetricsProvider) RegisterOpenConnectionsObserver(observerFn func() int64) error
- func (m *MetricsProvider) RegisterSuppressionSnapshotObservers(failures func() int64, lastRefreshUnix func() int64, propertySize func() int64, ...) error
- func (m *MetricsProvider) Shutdown(ctx context.Context) error
- type Option
- func WithLivenessChecks(checks ...LivenessCheck) Option
- type PprofConfig
- type Recorder
- type ServerConfig
- type TMPConfig
- type TopicsCacheConfig
- type ValkeyBlock
- func (b ValkeyBlock) ToRedisStoreConfig() redisstore.Config
Constants
const (
OutcomePass = "pass"
OutcomeFail = "fail"
OutcomeTimeout = "timeout"
OutcomeError = "error"
)
Outcome labels paired with stages. The set is fixed so the resulting `<ns>_stage_outcome_total{stage, outcome}` series stays finite. The values mirror identityagent's vocabulary so dashboards and alerts can share PromQL across the two agents.
const (
StatusOK = "ok"
StatusClientError = "client_error"
StatusServerError = "server_error"
StatusTimeout = "timeout"
)
Request-level status labels used by RequestCompleted. The set is deliberately small: ok (200), client_error (4xx), server_error (5xx), and timeout (504). The handler maps each HTTP status to one of these before recording.
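As a sketch of the mapping described above, the hypothetical `statusLabel` helper below collapses an HTTP status code into the bounded label set. Two choices are assumptions rather than documented behavior: 504 is checked before the generic 5xx bucket, and any status below 400 is labeled `ok`.

```go
package main

import (
	"fmt"
	"net/http"
)

// Label values copied from the package's exported constants.
const (
	StatusOK          = "ok"
	StatusClientError = "client_error"
	StatusServerError = "server_error"
	StatusTimeout     = "timeout"
)

// statusLabel is a hypothetical helper that maps an HTTP status code onto
// the fixed label set, so RequestCompleted can never mint a new series.
// Assumptions: 504 takes precedence over the generic 5xx bucket, and
// anything below 400 counts as "ok".
func statusLabel(code int) string {
	switch {
	case code == http.StatusGatewayTimeout:
		return StatusTimeout
	case code >= 500:
		return StatusServerError
	case code >= 400:
		return StatusClientError
	default:
		return StatusOK
	}
}

func main() {
	for _, code := range []int{200, 400, 413, 500, 504} {
		fmt.Println(code, statusLabel(code))
	}
}
```

Keeping the mapping in a single switch means an unexpected status code falls into an existing bucket instead of widening the label set.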
const SuppressionStaleMaxAge = 30 * time.Minute
SuppressionStaleMaxAge is the upper bound on the age of the in-memory suppression snapshot before /live flips to 503, independent of the consecutive-failure counter. It covers the failure mode where the refresh-loop goroutine exits cleanly (e.g. its context is cancelled by a future plumbing change) without ever incrementing ConsecutiveFailures. In that scenario the snapshot freezes indefinitely while a counter-only check keeps reporting healthy. 30 minutes matches the effective coverage of the failure-count threshold at the default 5-minute refresh interval. Because this bound is a constant, an operator who raises SUPPRESSION_REFRESH_INTERVAL should keep the interval under 15 minutes, so that the bound stays longer than 2× the interval.
const SuppressionStaleThreshold = 6
SuppressionStaleThreshold is the consecutive-failure count at which /live flips to 503 for the suppression-snapshot liveness check. With the default 5-minute refresh interval, 6 consecutive failures means roughly 30 minutes of serving from the snapshot taken at the last successful refresh. That is long enough that an operator wants the pod recycled rather than continuing to serve traffic with a kill-switch list that may no longer be authoritative.
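The two guards (failure streak and snapshot age) combine into a single predicate. The sketch below is a minimal illustration, assuming the check sees the failure count and the last-refresh Unix timestamp as plain int64 values; `suppressionLive` is a hypothetical name, not the package's actual check.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the package's exported constants.
const (
	SuppressionStaleThreshold = 6
	SuppressionStaleMaxAge    = 30 * time.Minute
)

// suppressionLive is a hypothetical /live predicate. It fails when the
// refresh-failure streak reaches the threshold, or when the last
// successful refresh is older than the max age (the case of a refresh
// loop that stopped without ever recording a failure).
func suppressionLive(failures, lastRefreshUnix, nowUnix int64) error {
	if failures >= SuppressionStaleThreshold {
		return fmt.Errorf("suppression snapshot: %d consecutive refresh failures", failures)
	}
	age := time.Duration(nowUnix-lastRefreshUnix) * time.Second
	if age > SuppressionStaleMaxAge {
		return fmt.Errorf("suppression snapshot: last successful refresh %s ago", age)
	}
	return nil
}

func main() {
	now := time.Now().Unix()
	fmt.Println(suppressionLive(0, now-4*60, now))  // recent refresh, no failures
	fmt.Println(suppressionLive(6, now-4*60, now))  // failure streak at threshold
	fmt.Println(suppressionLive(0, now-45*60, now)) // refresh loop silently stopped
}
```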
Functions
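func NewAdminServer
func NewAdminServer(cfg AdminServerConfig) *http.Server
NewAdminServer builds an *http.Server for the observability mux (/live, /metrics, /debug/pprof) on its own listener, separate from the /context server. As elsewhere, /metrics is mounted only when a Registry is supplied, and /debug/pprof only when PprofEnabled is true.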
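func NewHandler
func NewHandler(cfg HandlerConfig) http.Handler
NewHandler returns the http.Handler for POST /context. It does not verify TMP signatures itself; NewServer mounts it as the inner handler behind the verifier configured by ServerConfig.KeyStore and RequireSig.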
func NewServer
func NewServer(cfg ServerConfig) *http.Server
NewServer builds the *http.Server for /context and /health. When AdminPort == 0 the operator endpoints (/live, /metrics, /debug/pprof) also mount on this server's mux. When AdminPort > 0 those are omitted here and the caller wires NewAdminServer on a second listener.
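A minimal sketch of the AdminPort split using only net/http. `buildMuxes` and `probe` are hypothetical; the placeholder handlers stand in for the real /context, /health, and /live handlers, `metricsEnabled` models a non-nil Registry, and /debug/pprof is omitted for brevity.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// buildMuxes mounts the operator endpoints on the public mux when
// adminPort == 0, and on a separate admin mux (for a second listener)
// when adminPort > 0. /metrics is mounted only if metricsEnabled.
func buildMuxes(adminPort int, metricsEnabled bool) (public, admin *http.ServeMux) {
	ok := func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }

	public = http.NewServeMux()
	public.HandleFunc("/context", ok) // placeholder for the signed /context handler
	public.HandleFunc("/health", ok)

	ops := public
	if adminPort > 0 {
		admin = http.NewServeMux()
		ops = admin
	}
	ops.HandleFunc("/live", ok)
	if metricsEnabled {
		ops.HandleFunc("/metrics", ok)
	}
	return public, admin
}

// probe issues a GET for path against h and returns the status code.
func probe(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	pub, adm := buildMuxes(9090, true)
	fmt.Println(probe(pub, "/live"), probe(adm, "/live"), probe(adm, "/metrics"))
}
```

Selecting the target mux once (`ops`) keeps the operator routes identical in both layouts; only their listener changes.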
func Run
func Run(ctx context.Context, cfg Config, logger *slog.Logger, version string, opts ...Option) (retErr error)
Run executes the agent lifecycle: it builds dependencies, starts the HTTP server, blocks until SIGINT or SIGTERM, and then runs an orderly shutdown. It returns a non-nil error only when startup fails or shutdown surfaces errors.
Options inject dependencies the package cannot construct from env alone, for example a registry-fed property bitmap (see WithPropertyGlobal). All options are optional; anything left unset falls back to the env-derived defaults in cfg.
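The shutdown half of this lifecycle can be sketched with the standard library. This is an illustration under assumptions, not Run's actual code: `serve` and `shutdownDemo` are hypothetical, and the ordering (clear the readiness flag, wait a grace period, then drain with a deadline) is modeled on the ShutdownGrace and ShutdownTimeout fields of Config. The demo stops on SIGINT/SIGTERM or after 100 ms, whichever comes first.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
	"time"
)

// serve runs srv on ln until ctx is done, then shuts down in order:
// clear the readiness flag, wait out the grace period so load balancers
// stop routing new traffic, and drain in-flight requests with a deadline.
func serve(ctx context.Context, srv *http.Server, ln net.Listener,
	ready *atomic.Bool, grace, timeout time.Duration) error {
	errCh := make(chan error, 1)
	ready.Store(true)
	go func() { errCh <- srv.Serve(ln) }()

	select {
	case err := <-errCh: // the server stopped on its own, e.g. a listener failure
		ready.Store(false)
		return err
	case <-ctx.Done():
	}

	ready.Store(false) // a readiness probe would now report not-ready
	time.Sleep(grace)

	drainCtx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	if err := srv.Shutdown(drainCtx); err != nil {
		return fmt.Errorf("shutdown: %w", err)
	}
	// After a clean Shutdown, Serve returns http.ErrServerClosed.
	if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// shutdownDemo serves on an ephemeral loopback port until SIGINT/SIGTERM
// arrives or 100ms pass, then reports whether the flag is still set.
func shutdownDemo() (bool, error) {
	sigCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(sigCtx, 100*time.Millisecond)
	defer cancel()

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	srv := &http.Server{Handler: http.NotFoundHandler(), ReadHeaderTimeout: 5 * time.Second}
	var ready atomic.Bool
	err = serve(ctx, srv, ln, &ready, 10*time.Millisecond, 2*time.Second)
	return ready.Load(), err
}

func main() {
	stillReady, err := shutdownDemo()
	fmt.Println("ready after shutdown:", stillReady, "error:", err)
}
```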
Types
type AdminServerConfig
type AdminServerConfig struct {
Port int
Registry *prometheus.Registry
Version string
IsRunning func() bool
PprofEnabled bool
ReadHeaderTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
MaxHeaderBytes int
LivenessChecks []LivenessCheck
Recorder Recorder
Logger *slog.Logger
}
AdminServerConfig packages the inputs for NewAdminServer.
type CacheConfig
type CacheConfig struct {
// Enabled is the master switch. When false, every reader is
// constructed as a direct (uncached) reader regardless of
// per-domain settings. Default true.
Enabled bool
MediaBuy MediaBuyCacheConfig
PkgConfig DomainCacheConfig
URLList DomainCacheConfig
Topics TopicsCacheConfig
}
CacheConfig groups the per-domain LRU knobs. Each domain has its own size / TTL plus an enable flag; the master Enabled toggle bypasses every cache layer regardless of per-domain settings.
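The layering described above (master switch first, then the per-domain flag) can be sketched as a reader wrapper. Everything here is hypothetical: `Reader`, `domainCache` (an assumed enable/size/TTL shape), and `lruReader` stand in for the package's real reader services and cache types, and the cache is not safe for concurrent use as written.

```go
package main

import (
	"container/list"
	"fmt"
	"time"
)

// Reader is a hypothetical stand-in for a per-domain store reader.
type Reader interface {
	Get(key string) (string, error)
}

// domainCache is an assumed shape for one domain's knobs.
type domainCache struct {
	Enabled bool
	Size    int
	TTL     time.Duration
}

type entry struct {
	key     string
	val     string
	expires time.Time
}

// lruReader fronts a Reader with a size-bounded LRU whose entries also
// expire after cfg.TTL. A production cache would guard it with a mutex.
type lruReader struct {
	next  Reader
	cfg   domainCache
	order *list.List               // front = most recently used
	items map[string]*list.Element // key -> element whose Value is *entry
}

func (c *lruReader) Get(key string) (string, error) {
	if el, ok := c.items[key]; ok {
		e := el.Value.(*entry)
		if time.Now().Before(e.expires) {
			c.order.MoveToFront(el)
			return e.val, nil
		}
		c.order.Remove(el) // expired: drop it and refetch below
		delete(c.items, key)
	}
	val, err := c.next.Get(key)
	if err != nil {
		return "", err // errors are not cached
	}
	c.items[key] = c.order.PushFront(&entry{key: key, val: val, expires: time.Now().Add(c.cfg.TTL)})
	if c.order.Len() > c.cfg.Size {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	return val, nil
}

// newReader checks the master switch first, so a disabled master
// bypasses caching regardless of the per-domain flag.
func newReader(direct Reader, master bool, cfg domainCache) Reader {
	if !master || !cfg.Enabled || cfg.Size <= 0 {
		return direct
	}
	return &lruReader{next: direct, cfg: cfg, order: list.New(), items: map[string]*list.Element{}}
}

// countingStore is a fake backend that counts lookups.
type countingStore struct{ hits int }

func (s *countingStore) Get(key string) (string, error) {
	s.hits++
	return "value-for-" + key, nil
}

// backendHits reads keys through a reader built from the given switches
// and returns how many lookups reached the backend.
func backendHits(master, enabled bool, size int, keys ...string) int {
	store := &countingStore{}
	r := newReader(store, master, domainCache{Enabled: enabled, Size: size, TTL: time.Minute})
	for _, k := range keys {
		if _, err := r.Get(k); err != nil {
			panic(err)
		}
	}
	return store.hits
}

func main() {
	fmt.Println(backendHits(true, true, 2, "a", "a", "b", "a"))  // cache absorbs repeats
	fmt.Println(backendHits(false, true, 2, "a", "a", "b", "a")) // master switch off
}
```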
type Config
type Config struct {
HTTPPort int
RequestTimeout time.Duration
HTTPReadHeaderTimeout time.Duration
HTTPReadTimeout time.Duration
HTTPWriteTimeout time.Duration
HTTPIdleTimeout time.Duration
ShutdownGrace time.Duration
ShutdownTimeout time.Duration
RequestBodyLimitBytes int
MaxHeaderBytes int
MaxOpenConnections int
ResponseTTL time.Duration
StrictContentType bool
AdminPort int
SupportedADCPMajorVersions []int
LogLevel string
// ProviderID is stamped into suppression keys (see
// suppressionstore) and emitted on logs / metrics.
ProviderID string
// AcceptedTaxonomies enumerates the topic taxonomies the engine
// trusts on inbound ContextSignals and consults on Valkey lookups.
// Required: an empty list fails closed on every TopicTargets
// package.
AcceptedTaxonomies []topicstore.Taxonomy
// PropertyRIDs is the global property bitmap the engine checks at
// the top of every request. A request whose property_rid is not in
// this list short-circuits before any storage lookup.
//
// Static fallback: this list is used only when the binary entry
// point does not inject a bitmap via WithPropertyGlobal. The
// production wiring in cmd/context-agent feeds a live
// registry.PropertyIndex through that option when REGISTRY_ENABLED
// is true; PropertyRIDs covers stand-alone / smoke deployments.
PropertyRIDs []string
// SuppressionRefreshInterval is how often the agent re-scans
// suppress:{provider_id}:* into its in-memory snapshot. External
// writes (operator action, future writer pipeline) become visible
// within this interval.
SuppressionRefreshInterval time.Duration
TMP TMPConfig
Valkey ValkeyBlock
Cache CacheConfig
Metrics MetricsConfig
Pprof PprofConfig
}
Config is the env-derived configuration for a single context-agent process. Build it with LoadConfigFromEnv and check it with Validate before passing it to Run.
func LoadConfigFromEnv
LoadConfigFromEnv reads every recognized environment variable into a Config. Validation is the caller's responsibility — call Validate before using the result.
type DomainCacheConfig
DomainCacheConfig is the common shape for domains with a single LRU.
type HandlerConfig
type HandlerConfig struct {
Engine *targeting.ContextEngine
RequestTimeout time.Duration
RequestBodyLimit int64
ResponseTTL time.Duration
SupportedADCPMajorVersions []int
Recorder Recorder
Logger *slog.Logger
}
HandlerConfig packages the inputs for NewHandler. Recorder is accepted for symmetry with identity-agent and for future per-stage instrumentation. The request lifecycle is already observed by requestMetricsMiddleware, so the handler itself consults Recorder only for outcomes the middleware cannot see (e.g. a request rejected for an unknown adcp_major_version before any engine call).
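The early-rejection path can be sketched as a small gate in front of the engine. The sketch assumes the version travels as a JSON body field named `adcp_major_version` and that a rejection is a 400; `versionGate`, `newDemo`, and `post` are hypothetical names. It reads the body once through http.MaxBytesReader, so an oversized request fails with an explicit 413 instead of being silently truncated.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"slices"
	"strings"
)

// versionGate caps the body at limit bytes, decodes only the version
// field, and rejects unsupported majors before engine ever runs.
// Assumptions: the version is a JSON body field, and rejections are 400s.
func versionGate(supported []int, limit int64, engine http.HandlerFunc) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, limit))
		if err != nil {
			var tooBig *http.MaxBytesError
			if errors.As(err, &tooBig) {
				http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
				return
			}
			http.Error(w, "unreadable body", http.StatusBadRequest)
			return
		}
		var hdr struct {
			Version int `json:"adcp_major_version"`
		}
		if err := json.Unmarshal(body, &hdr); err != nil {
			http.Error(w, "malformed JSON", http.StatusBadRequest)
			return
		}
		if !slices.Contains(supported, hdr.Version) {
			// A real handler would also record this outcome on its Recorder.
			http.Error(w, "unsupported adcp_major_version", http.StatusBadRequest)
			return
		}
		r.Body = io.NopCloser(bytes.NewReader(body)) // let the engine re-read the body
		engine(w, r)
	})
}

// newDemo builds a gate that accepts major version 1 with a 64-byte
// limit, plus a pointer to the number of times the stub engine ran.
func newDemo() (http.Handler, *int) {
	calls := 0
	h := versionGate([]int{1}, 64, func(w http.ResponseWriter, _ *http.Request) {
		calls++
		w.WriteHeader(http.StatusOK)
	})
	return h, &calls
}

// post sends body to h as POST /context and returns the status code.
func post(h http.Handler, body string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/context", strings.NewReader(body)))
	return rec.Code
}

func main() {
	h, calls := newDemo()
	fmt.Println(post(h, `{"adcp_major_version":1}`), post(h, `{"adcp_major_version":2}`), *calls)
}
```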
type LivenessCheck
LivenessCheck is one named predicate consulted by /live. Name is included in the response when Fn returns an error so an operator can see which subsystem is degraded without reading agent logs.
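A sketch of how /live might evaluate its checks, assuming LivenessCheck carries `Name string` and `Fn func() error` (the Fn signature is an assumption). Failing checks are wrapped as `name: error`, joined with errors.Join, and returned with a 503, so the response body names each degraded subsystem. `liveHandler`, `probeLive`, `passing`, and `failing` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// LivenessCheck mirrors the documented Name/Fn pair; Fn's signature is assumed.
type LivenessCheck struct {
	Name string
	Fn   func() error
}

// liveHandler runs every check; any failure flips the status to 503 and
// lists each failing check as "name: error" in the body.
func liveHandler(checks []LivenessCheck) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		var errs []error
		for _, c := range checks {
			if err := c.Fn(); err != nil {
				errs = append(errs, fmt.Errorf("%s: %w", c.Name, err))
			}
		}
		if joined := errors.Join(errs...); joined != nil {
			http.Error(w, joined.Error(), http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
		fmt.Fprintln(w, "ok")
	})
}

func passing(name string) LivenessCheck {
	return LivenessCheck{Name: name, Fn: func() error { return nil }}
}

func failing(name, msg string) LivenessCheck {
	return LivenessCheck{Name: name, Fn: func() error { return errors.New(msg) }}
}

// probeLive runs one /live request and returns the status and body.
func probeLive(checks ...LivenessCheck) (int, string) {
	rec := httptest.NewRecorder()
	liveHandler(checks).ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/live", nil))
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := probeLive(passing("registry"), failing("suppression", "6 consecutive refresh failures"))
	fmt.Print(code, " ", body)
}
```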
type MediaBuyCacheConfig
type MediaBuyCacheConfig struct {
Enabled bool
SellerSetSize int
SellerSetTTL time.Duration
MediaBuySize int
MediaBuyTTL time.Duration
}
MediaBuyCacheConfig configures two underlying caches: the seller-set cache and the per-buy record cache. Each can be sized independently.
type MetricsConfig
MetricsConfig drives the Prometheus exporter and /metrics endpoint.
type MetricsProvider
type MetricsProvider struct {
Registry *prometheus.Registry
Recorder Recorder
// contains filtered or unexported fields
}
MetricsProvider wires together a Prometheus registry, an OTEL meter provider that writes to it, and a Recorder. BuildMetrics is the only constructor.
A disabled provider returns a noop Recorder and a nil Registry. The /metrics endpoint isn't mounted when Registry is nil.
func BuildMetrics
func BuildMetrics(cfg MetricsConfig) (*MetricsProvider, error)
BuildMetrics constructs a MetricsProvider per the supplied config.
- cfg.Enabled=false → provider with a noop recorder and nil Registry; never fails.
- cfg.Enabled=true → constructs prometheus.NewRegistry, plugs it into the OTEL Prometheus exporter, and builds the OtelRecorder. Any failure surfaces as a startup error, so an invalid metrics config fails startup, matching the identity-agent contract.
Side effect: BuildMetrics calls otel.SetTextMapPropagator with a fresh composite of (TraceContext, Baggage). This is a *global* install. If the same process hosts both context-agent and identity-agent (not the current deployment shape, but possible in a future co-located build) and both call their respective Build functions, the last caller wins. The two install identical composites today, so the install is idempotent in practice, but co-locators MUST keep the two propagator lists in sync or one agent will lose inbound trace context to the other.
func (*MetricsProvider) RegisterOpenConnectionsObserver
func (m *MetricsProvider) RegisterOpenConnectionsObserver(observerFn func() int64) error
RegisterOpenConnectionsObserver mirrors identityagent: registers an OTEL ObservableGauge "<namespace>_open_connections" whose value is read from observerFn on every scrape. No-op when metrics are disabled. observerFn must be cheap and concurrent-safe.
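One cheap, concurrency-safe source for observerFn is a counter driven by http.Server.ConnState. The sketch below is an assumption about how such an observer could be built, not the package's implementation; `connCounter` and `simulate` are hypothetical. StateHijacked and StateClosed are both terminal states in net/http, so each connection is decremented exactly once.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"sync/atomic"
)

// connCounter tracks live connections in an atomic, so reading it from a
// metrics scrape is cheap and safe from any goroutine.
type connCounter struct{ open atomic.Int64 }

// track is meant to be installed as http.Server.ConnState.
func (c *connCounter) track(_ net.Conn, state http.ConnState) {
	switch state {
	case http.StateNew:
		c.open.Add(1)
	case http.StateHijacked, http.StateClosed: // terminal states
		c.open.Add(-1)
	}
}

// Load has the func() int64 shape RegisterOpenConnectionsObserver takes.
func (c *connCounter) Load() int64 { return c.open.Load() }

// simulate opens n connections, then closes `closed` and hijacks
// `hijacked` of them, returning the final open count.
func simulate(opened, closed, hijacked int) int64 {
	var cc connCounter
	for i := 0; i < opened; i++ {
		cc.track(nil, http.StateNew)
		cc.track(nil, http.StateActive)
		cc.track(nil, http.StateIdle)
	}
	for i := 0; i < closed; i++ {
		cc.track(nil, http.StateClosed)
	}
	for i := 0; i < hijacked; i++ {
		cc.track(nil, http.StateHijacked)
	}
	return cc.Load()
}

func main() {
	var cc connCounter
	srv := &http.Server{Addr: "127.0.0.1:8080", ConnState: cc.track}
	_ = srv // srv.ListenAndServe() would start feeding the counter;
	// cc.Load would then be passed to RegisterOpenConnectionsObserver.
	fmt.Println(simulate(3, 1, 1))
}
```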
func (*MetricsProvider) RegisterSuppressionSnapshotObservers
func (m *MetricsProvider) RegisterSuppressionSnapshotObservers(
failures func() int64,
lastRefreshUnix func() int64,
propertySize func() int64,
geoSize func() int64,
) error
RegisterSuppressionSnapshotObservers wires the suppression snapshot's health onto three gauges read on every Prometheus scrape:
- <ns>_suppression_consecutive_failures — current uninterrupted refresh-failure streak; resets to 0 on every success. Alerting on `> 0 for 15m` catches a Valkey outage that the snapshot is surviving on stale data.
- <ns>_suppression_last_refresh_unix — Unix-second timestamp of the most recent successful refresh. Pair with `time()` to compute snapshot age in PromQL.
- <ns>_suppression_entries{kind=property|geo} — current snapshot size, segmented by suppression dimension.
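All four parameters are callbacks, so the snapshot's internal atomics drive the gauges directly: failures feeds the failure-streak gauge, lastRefreshUnix the timestamp gauge, and propertySize and geoSize the two kind series of the entries gauge. No-op when metrics are disabled.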
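A sketch of the snapshot side of this wiring, assuming the snapshot keeps its health in sync/atomic values that the refresh loop writes and the gauge callbacks read. `snapshotHealth`, `recordRefresh`, `callbacks`, and `demoHealth` are hypothetical. A failed refresh leaves the previous sizes and timestamp in place, because the agent keeps serving the snapshot from its last success.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// snapshotHealth is written by the refresh loop and read lock-free by
// metric callbacks on every scrape.
type snapshotHealth struct {
	failures     atomic.Int64 // consecutive failed refreshes; reset on success
	lastRefresh  atomic.Int64 // Unix seconds of the last successful refresh
	propertySize atomic.Int64 // property suppressions in the current snapshot
	geoSize      atomic.Int64 // geo suppressions in the current snapshot
}

// recordRefresh updates the health after one refresh attempt.
func (h *snapshotHealth) recordRefresh(now time.Time, props, geos int, err error) {
	if err != nil {
		h.failures.Add(1) // keep serving the old snapshot; sizes unchanged
		return
	}
	h.failures.Store(0)
	h.lastRefresh.Store(now.Unix())
	h.propertySize.Store(int64(props))
	h.geoSize.Store(int64(geos))
}

// callbacks returns the four func() int64 values in the order
// RegisterSuppressionSnapshotObservers takes them.
func (h *snapshotHealth) callbacks() (failures, lastRefreshUnix, propertySize, geoSize func() int64) {
	return h.failures.Load, h.lastRefresh.Load, h.propertySize.Load, h.geoSize.Load
}

// demoHealth replays one success followed by two failures and returns
// what each gauge would read.
func demoHealth() (failures, lastRefreshUnix, props, geos int64) {
	var h snapshotHealth
	t0 := time.Unix(1_700_000_000, 0)
	h.recordRefresh(t0, 120, 8, nil)
	h.recordRefresh(t0.Add(5*time.Minute), 0, 0, errors.New("valkey: i/o timeout"))
	h.recordRefresh(t0.Add(10*time.Minute), 0, 0, errors.New("valkey: i/o timeout"))
	f, l, p, g := h.callbacks()
	return f(), l(), p(), g()
}

func main() {
	fmt.Println(demoHealth())
}
```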
type Option
type Option func(*runOptions)
Option customizes a Run invocation. Options carry dependencies the agent cannot construct from env alone — most commonly an externally hydrated property bitmap fed by a registry sync loop in the binary entrypoint, instead of the PROPERTY_RIDS env fallback.
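Option follows Go's functional-options pattern. The sketch below reconstructs the pattern under assumptions: the unexported runOptions fields, the `PropertySet` interface, `staticSet`, and `resolve` are hypothetical, and the real WithPropertyGlobal parameter type is not given here. It illustrates the semantics documented for the two options: liveness checks accumulate, while an injected property bitmap replaces the PROPERTY_RIDS fallback.

```go
package main

import "fmt"

// LivenessCheck mirrors the documented Name/Fn pair (Fn signature assumed).
type LivenessCheck struct {
	Name string
	Fn   func() error
}

// PropertySet is an assumed interface for the global property bitmap.
type PropertySet interface{ Contains(rid string) bool }

type runOptions struct {
	livenessChecks []LivenessCheck
	propertyGlobal PropertySet // nil means "derive from the env fallback"
}

type Option func(*runOptions)

// WithLivenessChecks appends, so repeated calls accumulate checks.
func WithLivenessChecks(checks ...LivenessCheck) Option {
	return func(o *runOptions) { o.livenessChecks = append(o.livenessChecks, checks...) }
}

// WithPropertyGlobal replaces; the last call wins.
func WithPropertyGlobal(p PropertySet) Option {
	return func(o *runOptions) { o.propertyGlobal = p }
}

// staticSet is a fixed bitmap, like one built from PROPERTY_RIDS.
type staticSet map[string]bool

func (s staticSet) Contains(rid string) bool { return s[rid] }

// resolve applies opts, then fills anything left unset from env defaults.
func resolve(envRIDs []string, opts ...Option) runOptions {
	var o runOptions
	for _, opt := range opts {
		opt(&o)
	}
	if o.propertyGlobal == nil {
		set := staticSet{}
		for _, rid := range envRIDs {
			set[rid] = true
		}
		o.propertyGlobal = set
	}
	return o
}

func main() {
	o := resolve([]string{"p1"}, WithPropertyGlobal(staticSet{"p2": true}))
	fmt.Println(o.propertyGlobal.Contains("p1"), o.propertyGlobal.Contains("p2"))
}
```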
func WithLivenessChecks
func WithLivenessChecks(checks ...LivenessCheck) Option
WithLivenessChecks appends additional /live predicates to the default suppression-snapshot check. Use this to surface the health of binary-owned dependencies (e.g. a registry sync loop) into the same /live response the rest of the agent reports through.
func WithPropertyGlobal
WithPropertyGlobal injects the global property bitmap the engine consults at the top of every request. When supplied, it replaces the bitmap derived from Config.PropertyRIDs. Use it to feed the agent from a registry.PropertyIndex kept fresh by a background syncer; because the bitmap is read on every request, a dynamic implementation gets eventual consistency for free.
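One way to build such a dynamic bitmap is copy-on-write behind sync/atomic.Pointer: the syncer builds a fresh immutable map and publishes it with a single store, so request goroutines never block and never observe a half-built map. `liveSet` is a hypothetical type; any implementation satisfying the engine's bitmap interface (not shown in this documentation) would do.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// liveSet is a hypothetical dynamic property bitmap. Readers load the
// current immutable map; a background syncer swaps in a new one.
type liveSet struct{ cur atomic.Pointer[map[string]struct{}] }

func newLiveSet(rids ...string) *liveSet {
	s := &liveSet{}
	s.Replace(rids)
	return s
}

// Replace builds a new map off to the side, then publishes it atomically.
func (s *liveSet) Replace(rids []string) {
	m := make(map[string]struct{}, len(rids))
	for _, rid := range rids {
		m[rid] = struct{}{}
	}
	s.cur.Store(&m)
}

// Contains is what the engine would call at the top of every request.
func (s *liveSet) Contains(rid string) bool {
	_, ok := (*s.cur.Load())[rid]
	return ok
}

func main() {
	props := newLiveSet("p1")
	fmt.Println(props.Contains("p1"), props.Contains("p2"))
	props.Replace([]string{"p2"}) // what a background registry syncer would do
	fmt.Println(props.Contains("p1"), props.Contains("p2"))
}
```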
type PprofConfig
type PprofConfig struct {
Enabled bool
}
PprofConfig toggles /debug/pprof on the observability mux.
type Recorder
type Recorder interface {
RequestStarted(ctx context.Context)
RequestCompleted(ctx context.Context, status string, d time.Duration)
StageOutcome(ctx context.Context, stage, outcome string)
StageDuration(ctx context.Context, stage string, d time.Duration)
StoreError(ctx context.Context, store string)
KeystoreRefresh(ctx context.Context, outcome string)
HandlerPanic(ctx context.Context)
BackgroundPanic(ctx context.Context, where string)
}
Recorder is the metric API the context-agent hot path uses. The OTEL implementation is the production recorder; tests use noopRecorder.
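A noop Recorder is simply a type whose methods do nothing. The sketch below copies the documented interface; `noopRecorder` is named after the type the tests use, but its body here, `orNoop` (modeling ServerConfig's "nil → noop" rule), and the `countingRecorder` test double are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Recorder copies the documented interface.
type Recorder interface {
	RequestStarted(ctx context.Context)
	RequestCompleted(ctx context.Context, status string, d time.Duration)
	StageOutcome(ctx context.Context, stage, outcome string)
	StageDuration(ctx context.Context, stage string, d time.Duration)
	StoreError(ctx context.Context, store string)
	KeystoreRefresh(ctx context.Context, outcome string)
	HandlerPanic(ctx context.Context)
	BackgroundPanic(ctx context.Context, where string)
}

// noopRecorder discards every observation. It has no state, so it is
// free to copy and safe for concurrent use.
type noopRecorder struct{}

func (noopRecorder) RequestStarted(context.Context) {}
func (noopRecorder) RequestCompleted(context.Context, string, time.Duration) {}
func (noopRecorder) StageOutcome(context.Context, string, string) {}
func (noopRecorder) StageDuration(context.Context, string, time.Duration) {}
func (noopRecorder) StoreError(context.Context, string) {}
func (noopRecorder) KeystoreRefresh(context.Context, string) {}
func (noopRecorder) HandlerPanic(context.Context) {}
func (noopRecorder) BackgroundPanic(context.Context, string) {}

// Compile-time check that noopRecorder satisfies Recorder.
var _ Recorder = noopRecorder{}

// orNoop applies a "nil means noop" default.
func orNoop(r Recorder) Recorder {
	if r == nil {
		return noopRecorder{}
	}
	return r
}

// countingRecorder embeds noopRecorder and overrides one method, a
// compact way to build a focused test double.
type countingRecorder struct {
	noopRecorder
	started int
}

func (c *countingRecorder) RequestStarted(context.Context) { c.started++ }

// simulateRequests reports n request starts to r.
func simulateRequests(r Recorder, n int) {
	for i := 0; i < n; i++ {
		r.RequestStarted(context.Background())
	}
}

func main() {
	c := &countingRecorder{}
	simulateRequests(orNoop(c), 3)
	simulateRequests(orNoop(nil), 3) // safe: falls back to the noop recorder
	fmt.Println(c.started)
}
```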
type ServerConfig
type ServerConfig struct {
Port int
ContextHandler http.Handler
KeyStore tmproto.KeyStore
OwnEndpointURL string
RequireSig bool
Registry *prometheus.Registry
IsRunning func() bool
Version string
PprofEnabled bool
ReadHeaderTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
MaxHeaderBytes int
// RequestBodyLimit caps the bytes the verifier reads from the
// signed request before computing the signature. Must match the
// body limit the inner handler enforces, otherwise a signed
// request between the verifier limit and the handler limit gets
// silently truncated and rejected as malformed JSON.
RequestBodyLimit int64
AdminPort int
StrictContentType bool
// LivenessChecks run in /live; any check returning a non-nil
// error flips /live from 200 to 503, with the joined error
// messages in the response body. Intended for "the agent is up
// but the data plane is broken" signals (e.g. the suppression
// snapshot is N consecutive refreshes behind). /health stays
// purely about process readiness; the orchestrator decides
// which one to probe.
LivenessChecks []LivenessCheck
// Recorder observes request lifecycle, panics, and store errors.
// nil → noop.
Recorder Recorder
Logger *slog.Logger
}
ServerConfig packages the inputs for NewServer.
type TopicsCacheConfig
type TopicsCacheConfig struct {
Enabled bool
ArtifactSize int
ArtifactTTL time.Duration
PackageSize int
PackageTTL time.Duration
}
TopicsCacheConfig configures two underlying caches: artifact-side and package-side. Each can be sized independently.
type ValkeyBlock
type ValkeyBlock struct {
Enabled bool
// ShardsSupplied is true when VALKEY_SHARDS was set on the
// environment, even if it failed to parse. Lets Validate
// distinguish "operator forgot to set the shards" from "operator
// set them but the JSON is malformed" so it doesn't pile a
// generic "required" error on top of the parser's specific one.
// Hand-constructed Config{} for tests should set this when they
// want Validate to think the env was supplied.
ShardsSupplied bool
Mode string
Shards map[string]string
Username string
Password string //nolint:gosec // G117 false positive: the field name describes purpose; the value is intentionally a secret.
DB int
TLS bool
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
}
ValkeyBlock is the Valkey configuration. The context-agent issues reads against media-buy, package-config, URL-list, topic, and suppression keys, plus writes for operator-driven suppressions (via the suppressionstore.Service).
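A sketch of the ShardsSupplied logic, reduced to the two relevant fields. Assumptions: VALKEY_SHARDS holds a JSON object mapping strings to strings (matching the Shards field type), the shard name and address in the demo are invented, and `valkeyShards`, `loadShards`, `validate`, `configErrors`, and `isRequired` are hypothetical names. The point it shows is that presence is recorded before parsing, so a malformed value produces only the parser's error.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
)

// errShardsRequired is reported only when VALKEY_SHARDS is absent.
var errShardsRequired = errors.New("VALKEY_SHARDS is required")

// valkeyShards is a hypothetical reduction of ValkeyBlock.
type valkeyShards struct {
	ShardsSupplied bool
	Shards         map[string]string
}

// loadShards records presence before parsing, so ShardsSupplied is true
// even when the JSON turns out to be malformed.
func loadShards() (valkeyShards, error) {
	raw, ok := os.LookupEnv("VALKEY_SHARDS")
	v := valkeyShards{ShardsSupplied: ok}
	if !ok {
		return v, nil
	}
	if err := json.Unmarshal([]byte(raw), &v.Shards); err != nil {
		return v, fmt.Errorf("VALKEY_SHARDS: %w", err)
	}
	return v, nil
}

// validate adds "required" only when the variable was never set; a
// supplied-but-malformed value was already reported by loadShards.
func validate(v valkeyShards) error {
	if !v.ShardsSupplied {
		return errShardsRequired
	}
	return nil
}

// configErrors simulates one startup with VALKEY_SHARDS unset (set=false)
// or set to raw, returning every load and validation error joined.
func configErrors(raw string, set bool) error {
	os.Unsetenv("VALKEY_SHARDS")
	if set {
		os.Setenv("VALKEY_SHARDS", raw)
	}
	v, loadErr := loadShards()
	return errors.Join(loadErr, validate(v))
}

// isRequired reports whether err includes the "required" error.
func isRequired(err error) bool { return errors.Is(err, errShardsRequired) }

func main() {
	fmt.Println(configErrors("", false))
	fmt.Println(configErrors("{not json", true))
	fmt.Println(configErrors(`{"shard-0":"valkey-0:6379"}`, true))
}
```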
func (ValkeyBlock) ToRedisStoreConfig
func (b ValkeyBlock) ToRedisStoreConfig() redisstore.Config
ToRedisStoreConfig projects onto the redisstore.Config the Build helper consumes.