telemetry

package
v2.1.0
Published: Jul 11, 2025 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 25 Imported by: 0

README

Instrumentation Telemetry Client Architecture

This documentation details the current architecture of the Instrumentation Telemetry Client of dd-trace-go and what its capabilities are. For API documentation, please refer to the contents of the api.go file.

Please make sure to read the Specification Documentation before reading this document.

Data Flow
flowchart TD
    linkStyle default interpolate basis
    globalclient@{ shape: circle } -->|client == nil| recorder
    globalclient -->|client != nil| client
    recorder@{ shape: cyl } --> client@{ shape: circle }

    subgraph datasources
        integrations@{ shape: cyl }
        configuration@{ shape: cyl }
        dependencies@{ shape: cyl }
        products@{ shape: cyl }
        logs@{ shape: cyl }
        metrics@{ shape: cyl }
    end

    client --> datasources

    subgraph mapper
        direction LR
        app-started -->
        default[message-batch<div>heartbeat<div>extended-heartbeat] --> app-closing
    end

    flush@{ shape:rounded }

    queue@{ shape: cyl } --> flush

    datasources -..->|at flush| mapper --> flush
    flush -->|if writer fails| queue

    flush --> writer

    writer --> agent@{ shape: das }
    writer --> backend@{ shape: stadium }
    agent --> backend
Low Level Components
  • RingQueue[T]: The ring queue is a general-purpose data structure that supports growing buffers, a buffer pool, and overflow. It is used as the backing data structure for the payload queue, the recorder, and distribution metrics.
  • Recorder[T]: The recorder is a RingQueue[func(T)] that stores functions until the actual value T has been created. Calling Replay(T) dequeues all functions from the recorder and applies them to the value T. By default, it can store at most 512 functions.
  • Range[T]: A simple data structure that stores a start and end value, a minimum and maximum interval, and has utility functions to help manage ranges.
  • SyncMap[K, V]: Typed version of sync.Map
  • SyncPool[T]: Typed version of sync.Pool
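The overflow behaviour mentioned for the ring queue can be illustrated with a minimal sketch. The type `ring` and its methods below are hypothetical names written for this example, not the package's RingQueue[T]; the sketch assumes a fixed, positive capacity, overwrites the oldest element when full, and leaves out the growing buffers and the buffer pool.

```go
package main

import "fmt"

// ring is a hypothetical fixed-capacity circular buffer (illustration only).
type ring[T any] struct {
	buf   []T
	head  int // index of the oldest element
	count int // number of stored elements
}

// newRing allocates a ring; capacity must be positive.
func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

// Enqueue stores v. When the ring is full it overwrites the oldest
// element and returns false to signal the overflow.
func (r *ring[T]) Enqueue(v T) bool {
	if r.count == len(r.buf) {
		r.buf[r.head] = v
		r.head = (r.head + 1) % len(r.buf)
		return false
	}
	r.buf[(r.head+r.count)%len(r.buf)] = v
	r.count++
	return true
}

// Dequeue removes and returns the oldest element, if any.
func (r *ring[T]) Dequeue() (T, bool) {
	var zero T
	if r.count == 0 {
		return zero, false
	}
	v := r.buf[r.head]
	r.buf[r.head] = zero // drop the reference so it can be collected
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return v, true
}

func main() {
	q := newRing[string](2)
	q.Enqueue("a")
	q.Enqueue("b")
	q.Enqueue("c") // ring is full: "a" is overwritten
	for {
		v, ok := q.Dequeue()
		if !ok {
			break
		}
		fmt.Println(v) // prints "b", then "c"
	}
}
```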
High Level Components
  • GlobalClient: The global client is a singleton that is used to access the client instance. It is used to create a new client instance if none exists yet, and to access the existing instance otherwise. The global client's recorder records calls to the client until the StartApp function is called.
  • Client: The actual Client interface implementation. Its main job is to steer data to the corresponding data source. Beyond that, it manages the client's configuration and gathers data from the data sources to call Flush with it.
  • Data Sources: Each data source implements the dataSource interface, whose method Payload() transport.Payload is supposed to flush all data from the data source and turn it into a payload ready to be serialized and sent to the backend.
    • Integrations: The integrations data source is responsible for creating the app-integrations-change payload. A very simple slice and mutex are used as the backing store.
    • Configuration: The configuration data source is responsible for creating the app-client-configuration-change payload. A map and mutex are used as the backing store.
    • Dependencies: The dependencies data source is responsible for gathering data for the app-dependencies-loaded payload. No public API is available for this, as it is done in-house with the ClientConfig.DependencyLoader function output.
    • Product: The product data source is responsible for gathering data for the app-product-change payload. A map and mutex are used as the backing store.
    • Metrics: The metrics data source is responsible for gathering data for the generate-metrics payload. A SyncMap[metrickey, metricHandle] is used as the backing store. More on that in the metrics-specific section.
    • Distributions: The distributions data source is responsible for gathering data for the distributions payload. A SyncMap[distributionkey, distributionHandle] is used as the backing store. More on that in the metrics-specific section.
    • Logs: The logs data source is responsible for gathering data for the generate-logs payload. A SyncMap[logkey, logValue] is used as the backing store. More on that in the logs-specific section.
  • Mapper: The mapper is responsible for creating the app-started, app-closing, heartbeat, extended-heartbeat and message-batch payloads from the data sources. These payloads need data from other payloads but not from the API user. Each mapper returns another mapper that will be used in the next call to Flush.
  • Writer: The writer is responsible for sending the payload to the backend. It is a simple interface that has a Write method that receives a transport.Payload and returns statistics about the write operation.
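The idea of a mapper that returns the mapper for the next Flush can be sketched as a small state machine. Everything below is an assumption made for illustration: the `mapper` type, the function names, and the use of plain strings in place of transport.Payload values do not come from the package.

```go
package main

import "fmt"

// mapper is a hypothetical stand-in: it receives the payload names gathered
// from the data sources, returns what should be flushed, and returns the
// mapper to use on the next flush.
type mapper func(payloads []string) ([]string, mapper)

// withLead returns a new slice with lead placed before the payloads.
func withLead(lead string, payloads []string) []string {
	out := make([]string, 0, len(payloads)+1)
	out = append(out, lead)
	return append(out, payloads...)
}

// appStarted is used for the first flush only, then hands over to batch.
func appStarted(payloads []string) ([]string, mapper) {
	return withLead("app-started", payloads), batch
}

// batch is the steady state: it adds a heartbeat and keeps returning itself.
func batch(payloads []string) ([]string, mapper) {
	return withLead("heartbeat", payloads), batch
}

// appClosing is used for the final flush and has no successor.
func appClosing(payloads []string) ([]string, mapper) {
	out := make([]string, 0, len(payloads)+1)
	out = append(out, payloads...)
	return append(out, "app-closing"), nil
}

func main() {
	next := mapper(appStarted)
	var sent []string

	sent, next = next([]string{"app-client-configuration-change"})
	fmt.Println(sent)

	sent, next = next([]string{"generate-metrics"})
	fmt.Println(sent)

	// On shutdown the closing mapper replaces whatever was scheduled next.
	sent, next = appClosing(nil)
	fmt.Println(sent, next == nil)
}
```

Returning the successor from each call keeps the lifecycle ordering in one place, so the flush loop never has to track which phase it is in.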

Documentation

Overview

Package telemetry provides a thread-safe, burden-less telemetry client following the Datadog instrumentation telemetry specification. Specification here: https://github.com/DataDog/instrumentation-telemetry-api-docs/tree/main

The telemetry package has 6 main capabilities:

  • Metrics: Support for Count, Rate, Gauge, Distribution metrics.
  • Logs: Support Debug, Warn, Error logs with tags and stack traces via the subpackage log or the Log function.
  • Product: Start, Stop and Startup errors reporting to the backend
  • App Config: Register and change the configuration of the application and declare its origin
  • Integration: Loading and errors
  • Dependencies: Sending all the dependencies of the application to the backend (for SCA purposes for example)

Each of these capabilities is exposed through the Client interface, but mainly through the package-level functions that mirror it and call the global client, which is started through the StartApp function.

Before the StartApp function is called, all calls to the global client are recorded; they are replayed synchronously when StartApp is called. The telemetry client is allowed to record at most 512 calls.
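The record-then-replay behaviour can be sketched as follows. This is a simplified illustration under stated assumptions: `recorder` and `fakeClient` are hypothetical names, the store is a plain slice rather than a ring queue, and no locking is shown, so unlike the real client this sketch is not safe for concurrent use.

```go
package main

import "fmt"

// recorder is a hypothetical bounded store of deferred calls.
type recorder[T any] struct {
	calls []func(T)
	limit int
}

// Record keeps f for later; it reports false once the limit is reached.
func (r *recorder[T]) Record(f func(T)) bool {
	if len(r.calls) >= r.limit {
		return false
	}
	r.calls = append(r.calls, f)
	return true
}

// Replay applies every recorded call to t, in order, then empties the store.
func (r *recorder[T]) Replay(t T) {
	for _, f := range r.calls {
		f(t)
	}
	r.calls = nil
}

// fakeClient stands in for the telemetry client created at start-up.
type fakeClient struct{ integrations []string }

func main() {
	rec := &recorder[*fakeClient]{limit: 512}

	// Calls made before the client exists are only recorded.
	rec.Record(func(c *fakeClient) { c.integrations = append(c.integrations, "net/http") })
	rec.Record(func(c *fakeClient) { c.integrations = append(c.integrations, "database/sql") })

	// Once the client is created, the recorded calls are replayed on it.
	c := &fakeClient{}
	rec.Replay(c)
	fmt.Println(c.integrations) // [net/http database/sql]
}
```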

At the end of the app lifetime, if [tracer.Stop] is called, the client should be stopped with the StopApp function so that all data is flushed to the backend appropriately.

Note: No public API is available for the dependencies payloads, as this is done in-house with the `ClientConfig.DependencyLoader` function output.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddFlushTicker

func AddFlushTicker(ticker func(Client))

AddFlushTicker adds a function that is called at each telemetry Flush. By default, this happens every minute.

func Disabled

func Disabled() bool

Disabled returns whether instrumentation telemetry is disabled according to the DD_INSTRUMENTATION_TELEMETRY_ENABLED environment variable.

func LoadIntegration

func LoadIntegration(integration string)

LoadIntegration marks an integration as loaded in the telemetry client. If telemetry is disabled, it will do nothing. If the telemetry client has not started yet, it will record the action and replay it once the client is started.

func Log

func Log(level LogLevel, text string, options ...LogOption)

func MarkIntegrationAsLoaded

func MarkIntegrationAsLoaded(integration Integration)

MarkIntegrationAsLoaded marks an integration as loaded in the telemetry. If telemetry is disabled or the client has not started yet, it will record the action and replay it once the client is started.

func MockClient

func MockClient(client Client) func()

MockClient swaps the global client with the given client and clears the recorder to make sure external calls are not replayed. It returns a function that can be used to swap back the global client.
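The swap-and-restore shape described here is a common Go test pattern. A minimal sketch, assuming a hypothetical package-level variable `current` and helper `mock` (neither is part of this package, and the recorder clearing is not modelled):

```go
package main

import "fmt"

// current is a hypothetical global standing in for the global client.
var current = "real"

// mock installs replacement and returns a function that restores the
// previous value, which makes it convenient to use with defer in tests.
func mock(replacement string) (restore func()) {
	old := current
	current = replacement
	return func() { current = old }
}

func main() {
	restore := mock("fake")
	fmt.Println(current) // fake
	restore()
	fmt.Println(current) // real
}
```

In a test this would typically read `defer mock("fake")()`, so the original value comes back even if the test fails early.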

func ProductStartError

func ProductStartError(product Namespace, err error)

ProductStartError declares that a product could not start because of the following error. If telemetry is disabled, it will do nothing. If the telemetry client has not started yet, it will record the action and replay it once the client is started.

func ProductStarted

func ProductStarted(product Namespace)

ProductStarted declares a product to have started at the customer’s request. If telemetry is disabled, it will do nothing. If the telemetry client has not started yet, it will record the action and replay it once the client is started.

func ProductStopped

func ProductStopped(product Namespace)

ProductStopped declares a product to have been stopped by the customer. If telemetry is disabled, it will do nothing. If the telemetry client has not started yet, it will record the action and replay it once the client is started.

func RegisterAppConfig

func RegisterAppConfig(key string, value any, origin Origin)

RegisterAppConfig adds a key-value pair to the app configuration and sends the change to telemetry. The value has to be JSON-serializable, and the origin is the source of the change. If telemetry is disabled, it will do nothing. If the telemetry client has not started yet, it will record the action and replay it once the client is started.

func RegisterAppConfigs

func RegisterAppConfigs(kvs ...Configuration)

RegisterAppConfigs adds a list of key-value pairs to the app configuration and sends the change to telemetry. Same as RegisterAppConfig but for multiple values. If telemetry is disabled, it will do nothing. If the telemetry client has not started yet, it will record the action and replay it once the client is started.

func SanitizeConfigValue

func SanitizeConfigValue(value any) any

SanitizeConfigValue sanitizes the value of a configuration key to ensure it can be marshalled.

func StartApp

func StartApp(client Client)

StartApp starts the telemetry client with the given client, sends the app-started telemetry, and sets it as the global (*client). It then calls client.Flush on the client asynchronously.

func StopApp

func StopApp()

StopApp creates the app-stopped telemetry, adds it to the queue, and flushes the whole queue before stopping the (*client).

Types

type Client

type Client interface {
	io.Closer

	// Count obtains the metric handle for the given parameters, or creates a new one if none was created just yet.
	// Tags cannot contain commas.
	Count(namespace Namespace, name string, tags []string) MetricHandle

	// Rate obtains the metric handle for the given parameters, or creates a new one if none was created just yet.
	// Tags cannot contain commas.
	Rate(namespace Namespace, name string, tags []string) MetricHandle

	// Gauge obtains the metric handle for the given parameters, or creates a new one if none was created just yet.
	// Tags cannot contain commas.
	Gauge(namespace Namespace, name string, tags []string) MetricHandle

	// Distribution obtains the metric handle for the given parameters, or creates a new one if none was created just yet.
	// Tags cannot contain commas.
	Distribution(namespace Namespace, name string, tags []string) MetricHandle

	// Log sends a telemetry log at the desired level with the given text and options.
	// Options include sending key-value pairs as tags, and a stack trace frozen from inside the Log function.
	Log(level LogLevel, text string, options ...LogOption)

	// ProductStarted declares a product to have started at the customer’s request
	ProductStarted(product Namespace)

	// ProductStopped declares a product to have been stopped by the customer
	ProductStopped(product Namespace)

	// ProductStartError declares that a product could not start because of the following error
	ProductStartError(product Namespace, err error)

	// RegisterAppConfig adds a key-value pair to the app configuration and sends the change to telemetry.
	// The value has to be JSON-serializable and the origin is the source of the change.
	RegisterAppConfig(key string, value any, origin Origin)

	// RegisterAppConfigs adds a list of key-value pairs to the app configuration and sends the change to telemetry.
	// Same as RegisterAppConfig but for multiple values.
	RegisterAppConfigs(kvs ...Configuration)

	// MarkIntegrationAsLoaded marks an integration as loaded in the telemetry
	MarkIntegrationAsLoaded(integration Integration)

	// Flush closes the client and flushes any remaining data.
	Flush()

	// AppStart sends the telemetry necessary to signal that the app is starting.
	// Preferred use via [StartApp] package level function
	AppStart()

	// AppStop sends the telemetry necessary to signal that the app is stopping.
	// Preferred use via [StopApp] package level function
	AppStop()

	// AddFlushTicker adds a function that is called at each telemetry Flush. By default, this happens every minute.
	AddFlushTicker(ticker func(Client))
}

Client constitutes all the functions available concurrently for the telemetry users. All methods are thread-safe. This is an interface for easier testing, but all functions are mirrored at the package level to call the global client.

func GlobalClient

func GlobalClient() Client

GlobalClient returns the global telemetry client.

func NewClient

func NewClient(service, env, version string, config ClientConfig) (Client, error)

NewClient creates a new telemetry client with the given service, environment, and version and config.

func SwapClient

func SwapClient(client Client) Client

SwapClient swaps the global client with the given client and flushes the old (*client).

type ClientConfig

type ClientConfig struct {
	// DependencyLoader determines how dependency data is sent via telemetry.
	// The default value is [debug.ReadBuildInfo] since Application Security Monitoring uses this data to detect vulnerabilities in the ASM-SCA product
	// To disable this feature, please implement a function that returns nil, false.
	// This can only be controlled via the env var DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED
	DependencyLoader func() (*debug.BuildInfo, bool)

	// MetricsEnabled determines whether metrics are sent via telemetry.
	// If false, libraries should not send the generate-metrics or distributions events.
	// This can only be controlled via the env var DD_TELEMETRY_METRICS_ENABLED
	MetricsEnabled bool

	// LogsEnabled determines whether logs are sent via telemetry.
	// This can only be controlled via the env var DD_TELEMETRY_LOG_COLLECTION_ENABLED
	LogsEnabled bool

	// AgentlessURL is the full URL to the agentless telemetry endpoint. (optional)
	// Defaults to https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry
	AgentlessURL string

	// AgentURL is the url of the agent to send telemetry to. (optional)
	// If the AgentURL is not set, the telemetry client will not attempt to connect to the agent before sending to the agentless endpoint.
	AgentURL string

	// HTTPClient is the http client to use for sending telemetry, defaults to a http.DefaultClient copy.
	HTTPClient *http.Client

	// HeartbeatInterval is the interval at which to send a heartbeat payload, defaults to 60s.
	// The maximum value is 60s.
	HeartbeatInterval time.Duration

	// ExtendedHeartbeatInterval is the interval at which to send an extended heartbeat payload, defaults to 24h.
	ExtendedHeartbeatInterval time.Duration

	// FlushInterval is the interval at which the client flushes the data.
	// By default, the client starts flushing at 60s intervals and reduces the interval based on the load until it hits 15s.
	// Both values cannot be higher than 60s because the heartbeat needs to be sent at least every 60s. Values will be clamped otherwise.
	FlushInterval internal.Range[time.Duration]

	// PayloadQueueSize is the size of the payload queue. Default range is [4, 32].
	PayloadQueueSize internal.Range[int]

	// DistributionsSize is the size of the distribution queue. Default range is [2^8, 2^14].
	DistributionsSize internal.Range[int]

	// Debug enables debug mode for the telemetry client and sends it to the backend so it logs the request. The
	// DD_TELEMETRY_DEBUG environment variable, when set to a truthy value, overrides this setting.
	Debug bool

	// APIKey is the API key to use for sending telemetry to the agentless endpoint. (using DD_API_KEY env var by default)
	APIKey string

	// EarlyFlushPayloadSize is the size of the payload that will trigger an early flush.
	// This is necessary because backend won't allow bodies larger than 5MB.
	// The default value here will be 2MB to take into account the large inaccuracy in estimating the size of bodies
	EarlyFlushPayloadSize int

	// MaxDistinctLogs is the maximum number of logs with distinct message, level and tags that can be stored per flush window.
	// If the limit is reached, logs will be dropped and a log will be sent to the backend about it.
	// The default value is 1024.
	MaxDistinctLogs int32
	// contains filtered or unexported fields
}
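The FlushInterval comment states that both bounds are clamped to 60s. A minimal sketch of that clamping, assuming a hypothetical `durationRange` type with Min and Max fields and a hypothetical `clampFlush` helper (the real field is an internal.Range[time.Duration], whose exact shape is not shown on this page):

```go
package main

import (
	"fmt"
	"time"
)

// durationRange is a hypothetical stand-in for internal.Range[time.Duration].
type durationRange struct {
	Min, Max time.Duration
}

// maxHeartbeat is the upper bound quoted in the FlushInterval comment.
const maxHeartbeat = 60 * time.Second

// clampFlush caps both bounds at maxHeartbeat and keeps Min <= Max.
func clampFlush(r durationRange) durationRange {
	if r.Max > maxHeartbeat {
		r.Max = maxHeartbeat
	}
	if r.Min > r.Max {
		r.Min = r.Max
	}
	return r
}

func main() {
	r := clampFlush(durationRange{Min: 15 * time.Second, Max: 5 * time.Minute})
	fmt.Println(r.Min, r.Max) // 15s 1m0s
}
```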

type Configuration

type Configuration struct {
	// Name is the key of the configuration.
	Name string
	// Value is the value of the configuration. Needs to be JSON-serializable.
	Value any
	// Origin is the source of the configuration change.
	Origin Origin
}

Configuration is a key-value pair that is used to configure the application.

type Integration

type Integration struct {
	// Name is an arbitrary string that must stay constant for the integration.
	Name string
	// Version is the version of the integration/dependency that is being loaded.
	Version string
	// Error is the error that occurred while loading the integration. If this field is specified, the integration is
	// considered to have been forcefully disabled because of the error.
	Error string
}

Integration is an integration that is configured to be traced.

type LogOption

type LogOption func(key *loggerKey, value *loggerValue)

LogOption is a function that modifies the log message that is sent to the telemetry.

func WithStacktrace

func WithStacktrace() LogOption

WithStacktrace returns a LogOption that sets the stacktrace for the telemetry log message. The stacktrace is a string that is generated inside the WithStacktrace function. Log deduplication does not take the stacktrace into account. This means that a deduplicated log will only show the stacktrace of the first log.
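To make the consequence concrete, here is a minimal sketch of deduplication keyed on level, message and tags only. The types `logKey`, `logEntry` and `logStore` are hypothetical and use a plain map without locking; they only illustrate why the stacktrace of the first occurrence is the one that survives.

```go
package main

import "fmt"

// logKey is a hypothetical deduplication key: the stacktrace is not part of it.
type logKey struct {
	level, message, tags string
}

// logEntry holds what is kept per distinct key.
type logEntry struct {
	stack string // stacktrace of the first occurrence only
	count int    // number of occurrences seen
}

// logStore keeps at most max distinct logs.
type logStore struct {
	entries map[logKey]*logEntry
	max     int
}

// add records one occurrence and reports false if the log had to be dropped.
func (s *logStore) add(k logKey, stack string) bool {
	if e, ok := s.entries[k]; ok {
		e.count++ // same key: count it, keep the first stacktrace
		return true
	}
	if len(s.entries) >= s.max {
		return false // too many distinct logs in this window
	}
	s.entries[k] = &logEntry{stack: stack, count: 1}
	return true
}

func main() {
	s := &logStore{entries: map[logKey]*logEntry{}, max: 1024}
	k := logKey{level: "ERROR", message: "dial failed", tags: "product:appsec"}
	s.add(k, "stack-A")
	s.add(k, "stack-B")
	e := s.entries[k]
	fmt.Println(e.count, e.stack) // 2 stack-A
}
```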

func WithTags

func WithTags(tags []string) LogOption

WithTags returns a LogOption that sets the tags for the telemetry log message. Tags are key-value pairs that are then serialized into a simple "key:value,key2:value2" format. No quoting or escaping is performed.
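LogOption follows the functional options pattern. The sketch below shows that pattern together with the comma-joined tag format described above; `logRecord`, `option`, `withTags` and `newLog` are hypothetical names for this example and do not reflect the package's unexported loggerKey and loggerValue types.

```go
package main

import (
	"fmt"
	"strings"
)

// logRecord is a hypothetical log message being built.
type logRecord struct {
	text string
	tags string
}

// option mutates a logRecord, in the spirit of LogOption.
type option func(*logRecord)

// withTags joins the tags with commas; no quoting or escaping is performed.
func withTags(tags []string) option {
	return func(r *logRecord) { r.tags = strings.Join(tags, ",") }
}

// newLog builds a record and applies the options in order.
func newLog(text string, opts ...option) logRecord {
	r := logRecord{text: text}
	for _, o := range opts {
		o(&r)
	}
	return r
}

func main() {
	r := newLog("dial failed", withTags([]string{"product:appsec", "retry:true"}))
	fmt.Println(r.tags) // product:appsec,retry:true
}
```

Because nothing is escaped, a tag that itself contains a comma would be indistinguishable from two tags once joined.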

type MetricHandle

type MetricHandle interface {
	// Submit submits a value to the metric handle.
	Submit(value float64)
	// Get returns the last value submitted to the metric handle.
	Get() float64
}

MetricHandle can be used to submit different values for the same metric. MetricHandle is used to reduce lock contention when submitting metrics. It can also be used ephemerally to submit a single metric value like this:

telemetry.Count(telemetry.NamespaceAppSec, "my-count", []string{"tag1:true", "tag2:1.0"}).Submit(1.0)

func Count

func Count(namespace Namespace, name string, tags []string) MetricHandle

Count creates a new metric handle for the given parameters that can be used to submit values. Count will always return a MetricHandle, even if telemetry is disabled or the client has yet to start. The MetricHandle is then swapped with the actual MetricHandle once the client is started.
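One way a handle can be handed out before the client exists and later "swapped" for the real one is an atomic pointer to the current implementation. This is a sketch of that general technique, not the package's code: `handle`, `noop`, `gauge` and `swappable` are hypothetical, and values submitted before the swap are simply dropped here, which may differ from what the real client does.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// handle mirrors the shape of MetricHandle for this sketch.
type handle interface {
	Submit(value float64)
	Get() float64
}

// noop is what callers talk to before the client has started.
type noop struct{}

func (noop) Submit(float64) {}
func (noop) Get() float64   { return 0 }

// gauge remembers the last submitted value.
type gauge struct {
	mu    sync.Mutex
	value float64
}

func (g *gauge) Submit(v float64) { g.mu.Lock(); g.value = v; g.mu.Unlock() }
func (g *gauge) Get() float64     { g.mu.Lock(); defer g.mu.Unlock(); return g.value }

// swappable forwards to whichever handle is currently installed.
type swappable struct {
	inner atomic.Pointer[handle]
}

func newSwappable() *swappable {
	s := &swappable{}
	var h handle = noop{}
	s.inner.Store(&h)
	return s
}

// swap installs the real handle once it is available.
func (s *swappable) swap(h handle) { s.inner.Store(&h) }

func (s *swappable) Submit(v float64) { (*s.inner.Load()).Submit(v) }
func (s *swappable) Get() float64     { return (*s.inner.Load()).Get() }

func main() {
	h := newSwappable()
	h.Submit(1) // no client yet: dropped in this sketch
	fmt.Println(h.Get()) // 0

	h.swap(&gauge{}) // client started: install the real handle
	h.Submit(42)
	fmt.Println(h.Get()) // 42
}
```

Callers can keep the same handle in a variable across the swap, which is the property the description above relies on.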

func Distribution

func Distribution(namespace Namespace, name string, tags []string) MetricHandle

Distribution creates a new metric handle for the given parameters that can be used to submit values. Distribution will always return a MetricHandle, even if telemetry is disabled or the client has yet to start. The MetricHandle is then swapped with the actual MetricHandle once the client is started. The Get() method of the MetricHandle will return the last value submitted. It is advisable to hold a Distribution MetricHandle in a variable, more so than for the other metric types, to avoid too many useless allocations.

func Gauge

func Gauge(namespace Namespace, name string, tags []string) MetricHandle

Gauge creates a new metric handle for the given parameters that can be used to submit values. Gauge will always return a MetricHandle, even if telemetry is disabled or the client has yet to start. The MetricHandle is then swapped with the actual MetricHandle once the client is started.

func Rate

func Rate(namespace Namespace, name string, tags []string) MetricHandle

Rate creates a new metric handle for the given parameters that can be used to submit values. Rate will always return a MetricHandle, even if telemetry is disabled or the client has yet to start. The MetricHandle is then swapped with the actual MetricHandle once the client is started.

type Namespace

type Namespace = transport.Namespace

Namespace describes a product to distinguish telemetry coming from different products used by the same application

const (
	NamespaceGeneral      Namespace = transport.NamespaceGeneral
	NamespaceTracers      Namespace = transport.NamespaceTracers
	NamespaceProfilers    Namespace = transport.NamespaceProfilers
	NamespaceAppSec       Namespace = transport.NamespaceAppSec
	NamespaceIAST         Namespace = transport.NamespaceIAST
	NamespaceCIVisibility Namespace = transport.NamespaceCIVisibility
	NamespaceMLOps        Namespace = transport.NamespaceMLOps
	NamespaceRUM          Namespace = transport.NamespaceRUM
)

type Origin

type Origin = transport.Origin

const (
	OriginDefault             Origin = transport.OriginDefault
	OriginCode                Origin = transport.OriginCode
	OriginDDConfig            Origin = transport.OriginDDConfig
	OriginEnvVar              Origin = transport.OriginEnvVar
	OriginRemoteConfig        Origin = transport.OriginRemoteConfig
	OriginLocalStableConfig   Origin = transport.OriginLocalStableConfig
	OriginManagedStableConfig Origin = transport.OriginManagedStableConfig
)

Directories

Path Synopsis
Package telemetrytest provides a mock implementation of the telemetry client for testing purposes
