sampler

package
v0.0.0-...-4646cf5 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 16, 2020 License: Apache-2.0 Imports: 12 Imported by: 1

Documentation

Overview

Package sampler contains all the logic of the agent-side trace sampling.

The current implementation scores the "signature" of each trace. From that score, we derive the sample rate to apply to the given trace.

The current score implementation is deliberately simple: a per-signature counter with exponential decay. We increment it for each incoming trace, then divide the score by two every X seconds. Right after the division, the score approximates the number of times the signature was received over X seconds. It is different from the scoring in the Agent.
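To make the scoring concrete, here is a minimal sketch of a per-signature counter that is halved once per period. The names decayingCounter, count and decay are hypothetical and are not part of this package; the real MemoryBackend may differ in details such as scaling and locking.

```go
package main

import "fmt"

// decayingCounter is a hypothetical stand-in for a per-signature score store.
type decayingCounter struct {
	scores      map[uint64]float64
	decayFactor float64 // every score is divided by this on each decay tick
}

func newDecayingCounter(decayFactor float64) *decayingCounter {
	return &decayingCounter{scores: make(map[uint64]float64), decayFactor: decayFactor}
}

// count records one incoming trace for the given signature.
func (c *decayingCounter) count(sig uint64) {
	c.scores[sig]++
}

// decay divides every score by decayFactor and forgets negligible entries.
func (c *decayingCounter) decay() {
	for sig, s := range c.scores {
		s /= c.decayFactor
		if s < 0.01 {
			delete(c.scores, sig)
			continue
		}
		c.scores[sig] = s
	}
}

func main() {
	c := newDecayingCounter(2)
	// Simulate five periods receiving 100 traces each for one signature.
	for period := 0; period < 5; period++ {
		for i := 0; i < 100; i++ {
			c.count(42)
		}
		c.decay()
	}
	// With a steady input, the score right after the division converges
	// toward the number of traces received per period (100 here).
	fmt.Println("score after 5 periods:", c.scores[42])
}
```

With a decay factor of 2 and a steady 100 traces per period, the score right after each division moves 50, 75, 87.5, ... toward 100, which matches the "approximation of the count over X seconds" described above.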

Since sampling can happen at different levels (client, agent, server) or according to different rules, we have to track the sample rate applied at previous steps. This way, sampling twice at 50% results in an effective sampling rate of 25%. The rate is stored as a metric in the trace root.

Constants

const (
	// KeySamplingRateGlobal is a metric key holding the global sampling rate.
	KeySamplingRateGlobal = "_sample_rate"

	// KeySamplingRateClient is a metric key holding the client-set sampling rate for APM events.
	KeySamplingRateClient = "_dd1.sr.rcusr"

	// KeySamplingRatePreSampler is a metric key holding the API rate limiter's rate for APM events.
	KeySamplingRatePreSampler = "_dd1.sr.rapre"

	// KeySamplingRateEventExtraction is the key of the metric storing the event extraction rate on an APM event.
	KeySamplingRateEventExtraction = "_dd1.sr.eausr"

	// KeySamplingRateMaxEPSSampler is the key of the metric storing the max eps sampler rate on an APM event.
	KeySamplingRateMaxEPSSampler = "_dd1.sr.eamax"

	// KeySamplingPriority is the key of the sampling priority value in the metrics map of the root span
	KeySamplingPriority = "_sampling_priority_v1"

	// KeyErrorType is the key of the error type in the meta map
	KeyErrorType = "error.type"

	// KeyHTTPStatusCode is the key of the http status code in the meta map
	KeyHTTPStatusCode = "http.status_code"
)
const (
	// SamplingPriorityRateKey is the metrics key holding the sampling rate at which this trace
	// was sampled.
	SamplingPriorityRateKey = "_sampling_priority_rate_v1"
)

Functions

func AddGlobalRate

func AddGlobalRate(s *pb.Span, rate float64)

AddGlobalRate updates the cumulative sample rate of the trace to which this span belongs with the provided rate, which is assumed to come from an independent sampler. The rates are combined by simple multiplication.
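The multiplication can be illustrated with a small sketch. The span type below is a hypothetical stand-in for pb.Span that only carries a metrics map, and defaulting a missing rate to 1 is an assumption made for the example, not something this page states for the global rate.

```go
package main

import "fmt"

// span is a hypothetical stand-in for pb.Span, reduced to its metrics map.
type span struct {
	Metrics map[string]float64
}

// keyGlobalRate mirrors the value of KeySamplingRateGlobal listed in Constants.
const keyGlobalRate = "_sample_rate"

// globalRate returns the stored cumulative rate, or 1 if none is stored
// (assumed default for this sketch).
func globalRate(s *span) float64 {
	if rate, ok := s.Metrics[keyGlobalRate]; ok {
		return rate
	}
	return 1
}

// addGlobalRate multiplies the stored cumulative rate by the given rate.
func addGlobalRate(s *span, rate float64) {
	if s.Metrics == nil {
		s.Metrics = make(map[string]float64)
	}
	s.Metrics[keyGlobalRate] = globalRate(s) * rate
}

func main() {
	root := &span{}
	addGlobalRate(root, 0.5) // e.g. sampled at 50% by the client
	addGlobalRate(root, 0.5) // then at 50% by the agent
	fmt.Println(globalRate(root)) // prints 0.25
}
```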

func CombineRates

func CombineRates(rate1 float64, rate2 float64) float64

CombineRates merges two rates from Sampler1 and Sampler2. The two samplers are assumed to be independent, and {sampled} = {sampled by Sampler1} or {sampled by Sampler2}.
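For two independent events, the probability of their union is r1 + r2 - r1*r2. The sketch below applies that identity; combineRates is a hypothetical name, and clamping to 1 when either rate is already 1 or more is an assumption of this example.

```go
package main

import "fmt"

// combineRates returns the probability that a trace is kept by at least one
// of two independent samplers with rates rate1 and rate2.
func combineRates(rate1, rate2 float64) float64 {
	// If either sampler keeps everything, so does the combination.
	if rate1 >= 1 || rate2 >= 1 {
		return 1
	}
	// P(A or B) = P(A) + P(B) - P(A)P(B) for independent A and B.
	return rate1 + rate2 - rate1*rate2
}

func main() {
	fmt.Println(combineRates(0.5, 0.5)) // prints 0.75
}
```

Note the contrast with AddGlobalRate: successive samplers multiply their rates (a trace must pass both), whereas alternative samplers combine as a union (a trace is kept if either keeps it).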

func GetClientRate

func GetClientRate(s *pb.Span) float64

GetClientRate gets the rate at which the trace this span belongs to was sampled by the tracer. NOTE: This defaults to 1 if no rate is stored.

func GetEventExtractionRate

func GetEventExtractionRate(s *pb.Span) float64

GetEventExtractionRate gets the rate at which the trace from which we extracted this event was sampled at the tracer. This defaults to 1 if no rate is stored.

func GetGlobalRate

func GetGlobalRate(s *pb.Span) float64

GetGlobalRate gets the cumulative sample rate of the trace to which this span belongs.

func GetMaxEPSRate

func GetMaxEPSRate(s *pb.Span) float64

GetMaxEPSRate gets the rate at which this event was sampled by the max eps event sampler.

func GetPreSampleRate

func GetPreSampleRate(s *pb.Span) float64

GetPreSampleRate returns the rate at which the trace this span belongs to was sampled by the agent's presampler. NOTE: This defaults to 1 if no rate is stored.

func SampleByRate

func SampleByRate(traceID uint64, rate float64) bool

SampleByRate tells whether a trace, identified by its ID, should be sampled at the given rate. It uses Knuth multiplicative hashing to cope with imbalanced traceID generators.
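A minimal sketch of rate-based sampling with a multiplicative hash follows. The multiplier knuthFactor is an illustrative large odd constant chosen for this example and should not be read as the value this package uses; sampleByRate is likewise a hypothetical name.

```go
package main

import "fmt"

// knuthFactor is an illustrative large odd multiplier (an assumption).
const knuthFactor = uint64(1111111111111111111)

// maxTraceIDFloat is the largest trace ID, as a float64.
const maxTraceIDFloat = float64(^uint64(0))

// sampleByRate keeps a trace when its hashed ID falls in the lowest
// `rate` fraction of the uint64 range. The decision depends only on the
// trace ID, so every component using the same rate agrees on it.
func sampleByRate(traceID uint64, rate float64) bool {
	if rate >= 1 {
		return true
	}
	if rate <= 0 {
		return false
	}
	// The multiplication wraps modulo 2^64, spreading clustered IDs
	// (for example sequential ones) across the whole range.
	return traceID*knuthFactor < uint64(rate*maxTraceIDFloat)
}

func main() {
	kept := 0
	for id := uint64(1); id <= 10000; id++ {
		if sampleByRate(id, 0.5) {
			kept++
		}
	}
	fmt.Printf("kept %d of 10000 sequential trace IDs at rate 0.5\n", kept)
}
```

Hashing before comparing matters because a generator that emits small or sequential IDs would otherwise place every ID below the threshold and keep everything.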

func SetClientRate

func SetClientRate(s *pb.Span, rate float64)

SetClientRate sets the rate at which the trace this span belongs to was sampled by the tracer.

func SetEventExtractionRate

func SetEventExtractionRate(s *pb.Span, rate float64)

SetEventExtractionRate sets the rate at which the trace from which we extracted this event was sampled at the tracer.

func SetGlobalRate

func SetGlobalRate(s *pb.Span, rate float64)

SetGlobalRate sets the cumulative sample rate of the trace to which this span belongs.

func SetMaxEPSRate

func SetMaxEPSRate(s *pb.Span, rate float64)

SetMaxEPSRate sets the rate at which this event was sampled by the max eps event sampler.

func SetPreSampleRate

func SetPreSampleRate(s *pb.Span, rate float64)

SetPreSampleRate sets the rate at which the trace this span belongs to was sampled by the agent's presampler.

func SetSamplingPriority

func SetSamplingPriority(s *pb.Span, priority SamplingPriority)

SetSamplingPriority sets the sampling priority value on this span, overwriting any previously set value.

Types

type Backend

type Backend interface {
	// Run runs the blocking execution of the backend main loop.
	Run()

	// Stop stops the backend main loop.
	Stop()

	// CountSample counts that 1 trace is going through the sampler.
	CountSample()

	// CountSignature counts that 1 signature is going through the sampler.
	CountSignature(signature Signature)

	// GetTotalScore returns the TPS (Traces Per Second) of all traces ingested.
	GetTotalScore() float64

	// GetSampledScore returns the TPS of all traces sampled.
	GetSampledScore() float64

	// GetUpperSampledScore is similar to GetSampledScore, but with the upper approximation.
	GetUpperSampledScore() float64

	// GetSignatureScore returns the TPS of traces ingested of a given signature.
	GetSignatureScore(signature Signature) float64

	// GetSignatureScores returns the TPS of traces ingested for all signatures.
	GetSignatureScores() map[Signature]float64

	// GetCardinality returns the number of different signatures seen.
	GetCardinality() int64
}

Backend stores and counts traces and signatures ingested by a sampler.

type DynamicConfig

type DynamicConfig struct {
	// RateByService contains the rate for each service/env tuple,
	// used in priority sampling by client libs.
	RateByService RateByService
}

DynamicConfig contains configuration items which may change dynamically over time.

func NewDynamicConfig

func NewDynamicConfig(env string) *DynamicConfig

NewDynamicConfig creates a new dynamic config object which maps service signatures to their corresponding sampling rates. Each service will have a default assigned matching the service rate of the specified env.

type Engine

type Engine interface {
	// Run the sampler.
	Run()
	// Stop the sampler.
	Stop()
	// Sample a trace.
	Sample(trace pb.Trace, root *pb.Span, env string) (sampled bool, samplingRate float64)
	// GetState returns information about the sampler.
	GetState() interface{}
	// GetType returns the type of the sampler.
	GetType() EngineType
}

Engine is a common basic interface for sampler engines.

type EngineType

type EngineType int

EngineType represents the type of a sampler engine.

const (
	// NormalScoreEngineType is the type of the ScoreEngine sampling non-error traces.
	NormalScoreEngineType EngineType = iota
	// ErrorsScoreEngineType is the type of the ScoreEngine sampling error traces.
	ErrorsScoreEngineType
	// PriorityEngineType is the type of the priority sampler engine.
	PriorityEngineType
)

type ExceptionSampler

type ExceptionSampler struct {
	// contains filtered or unexported fields
}

ExceptionSampler samples traces that are not caught by the Priority sampler. It ensures that we sample traces for each combination of (env, service, name, resource, error type, http status) seen on a top-level or measured span for which we did not see any span with a priority > 0 (sampled by Priority). The resulting sampled traces will likely be incomplete and will be flagged with an exceptionKey metric set to 1.
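One way to picture this behavior is a set of recently covered combinations: a combination is kept only if nothing covered it recently. The sketch below is an illustration under stated assumptions, not the package's algorithm. The expiry window (ttl), the integer timestamps and all names (comboKey, exceptionSet, markCovered, add) are hypothetical.

```go
package main

import "fmt"

// comboKey identifies one combination of span attributes.
type comboKey struct {
	env, service, name, resource, errorType, httpStatus string
}

// exceptionSet remembers when each combination was last covered.
type exceptionSet struct {
	ttl  int64              // seconds a combination stays covered (assumed)
	seen map[comboKey]int64 // last time, in seconds, the combination was covered
}

func newExceptionSet(ttl int64) *exceptionSet {
	return &exceptionSet{ttl: ttl, seen: make(map[comboKey]int64)}
}

// markCovered records that priority sampling kept a span with this combination.
func (e *exceptionSet) markCovered(k comboKey, now int64) {
	e.seen[k] = now
}

// add reports whether a trace carrying this combination should be kept:
// true only if the combination was not covered within the last ttl seconds.
func (e *exceptionSet) add(k comboKey, now int64) bool {
	if last, ok := e.seen[k]; ok && now-last < e.ttl {
		return false
	}
	e.seen[k] = now
	return true
}

func main() {
	e := newExceptionSet(60)
	k := comboKey{env: "prod", service: "web", name: "http.request", resource: "GET /rare"}
	fmt.Println(e.add(k, 0))  // true: first time this combination is seen
	fmt.Println(e.add(k, 10)) // false: covered 10 seconds ago
	fmt.Println(e.add(k, 70)) // true: coverage expired
}
```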

func NewExceptionSampler

func NewExceptionSampler() *ExceptionSampler

NewExceptionSampler returns an ExceptionSampler that ensures that we sample combinations of env, service, name, resource, http-status and error type for each top-level or measured span.

func (*ExceptionSampler) Add

func (e *ExceptionSampler) Add(env string, root *pb.Span, t pb.Trace) (sampled bool)

Add samples a trace and returns true if the trace was sampled (should be kept).

func (*ExceptionSampler) Stop

func (e *ExceptionSampler) Stop()

Stop stops reporting stats

type InternalState

type InternalState struct {
	Offset      float64
	Slope       float64
	Cardinality int64
	InTPS       float64
	OutTPS      float64
	MaxTPS      float64
}

InternalState exposes all the main internal settings of the score sampler

type MemoryBackend

type MemoryBackend struct {
	// contains filtered or unexported fields
}

MemoryBackend stores any state required to run the sampling algorithms.

The current implementation is based only on counters with exponential decay. Its bias with steady counts is 1 * decayFactor. The stored scores approximate the real count values (up to a countScaleFactor factor).

func NewMemoryBackend

func NewMemoryBackend(decayPeriod time.Duration, decayFactor float64) *MemoryBackend

NewMemoryBackend returns an initialized Backend.

func (*MemoryBackend) CountSample

func (b *MemoryBackend) CountSample()

CountSample counts a trace sampled by the sampler.

func (*MemoryBackend) CountSignature

func (b *MemoryBackend) CountSignature(signature Signature)

CountSignature counts an incoming signature.

func (*MemoryBackend) GetCardinality

func (b *MemoryBackend) GetCardinality() int64

GetCardinality returns the number of different signatures seen recently.

func (*MemoryBackend) GetSampledScore

func (b *MemoryBackend) GetSampledScore() float64

GetSampledScore returns the global score of all sampled traces.

func (*MemoryBackend) GetSignatureScore

func (b *MemoryBackend) GetSignatureScore(signature Signature) float64

GetSignatureScore returns the score of a signature. It is normalized to represent a number of signatures per second.

func (*MemoryBackend) GetSignatureScores

func (b *MemoryBackend) GetSignatureScores() map[Signature]float64

GetSignatureScores returns the scores for all signatures. They are normalized to represent a number of signatures per second.

func (*MemoryBackend) GetTotalScore

func (b *MemoryBackend) GetTotalScore() float64

GetTotalScore returns the global score of all traces ingested.

func (*MemoryBackend) GetUpperSampledScore

func (b *MemoryBackend) GetUpperSampledScore() float64

GetUpperSampledScore returns a certain upper bound of the global count of all sampled traces.

func (*MemoryBackend) Run

func (b *MemoryBackend) Run()

Run runs and blocks on the Sampler main loop.

func (*MemoryBackend) Stop

func (b *MemoryBackend) Stop()

Stop stops the main Run loop.

type PriorityEngine

type PriorityEngine struct {
	// Sampler is the underlying sampler used by this engine, sharing logic among various engines.
	Sampler *Sampler
	// contains filtered or unexported fields
}

PriorityEngine is the main component of the sampling logic

func NewPriorityEngine

func NewPriorityEngine(extraRate float64, maxTPS float64, rateByService *RateByService) *PriorityEngine

NewPriorityEngine returns an initialized PriorityEngine.

func (*PriorityEngine) GetState

func (s *PriorityEngine) GetState() interface{}

GetState collects and returns internal statistics and coefficients for informational purposes. It returns an interface{}, as other samplers might return other information.

func (*PriorityEngine) GetType

func (s *PriorityEngine) GetType() EngineType

GetType returns the type of the sampler engine.

func (*PriorityEngine) Run

func (s *PriorityEngine) Run()

Run runs and blocks on the Sampler main loop.

func (*PriorityEngine) Sample

func (s *PriorityEngine) Sample(trace pb.Trace, root *pb.Span, env string) (sampled bool, rate float64)

Sample counts an incoming trace and returns the trace sampling decision and the applied sampling rate

func (*PriorityEngine) Stop

func (s *PriorityEngine) Stop()

Stop stops the main Run loop

type RateByService

type RateByService struct {
	// contains filtered or unexported fields
}

RateByService stores the sampling rate per service. It is thread-safe, so one can read/write on it concurrently, using getters and setters.

func (*RateByService) GetAll

func (rbs *RateByService) GetAll() map[string]float64

GetAll returns all sampling rates for all services.

func (*RateByService) SetAll

func (rbs *RateByService) SetAll(rates map[ServiceSignature]float64)

SetAll sets the sampling rates for all services. If a service/env is not in the map, then the entry is removed.
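The replace-everything semantics of SetAll and the concurrent access described above can be sketched with a map guarded by a sync.RWMutex. The lowercase types here are hypothetical stand-ins, and the "service:<name>,env:<env>" key format used by getAll is an assumption of this example.

```go
package main

import (
	"fmt"
	"sync"
)

// serviceSignature is a hypothetical stand-in for ServiceSignature.
type serviceSignature struct{ Name, Env string }

// rateByService stores one sampling rate per service/env pair.
type rateByService struct {
	mu    sync.RWMutex
	rates map[serviceSignature]float64
}

// setAll replaces the whole table, so entries absent from rates disappear.
func (r *rateByService) setAll(rates map[serviceSignature]float64) {
	fresh := make(map[serviceSignature]float64, len(rates))
	for sig, rate := range rates {
		fresh[sig] = rate
	}
	r.mu.Lock()
	r.rates = fresh
	r.mu.Unlock()
}

// getAll returns a copy keyed by a string form of the signature, so callers
// never share the internal map.
func (r *rateByService) getAll() map[string]float64 {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make(map[string]float64, len(r.rates))
	for sig, rate := range r.rates {
		out["service:"+sig.Name+",env:"+sig.Env] = rate
	}
	return out
}

func main() {
	var rbs rateByService
	rbs.setAll(map[serviceSignature]float64{
		{Name: "web", Env: "prod"}: 0.1,
		{Name: "db", Env: "prod"}:  0.5,
	})
	// A later update that omits "db" removes it from the table.
	rbs.setAll(map[serviceSignature]float64{{Name: "web", Env: "prod"}: 0.2})
	fmt.Println(rbs.getAll()) // prints map[service:web,env:prod:0.2]
}
```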

type Sampler

type Sampler struct {
	// Storage of the state of the sampler
	Backend Backend
	// contains filtered or unexported fields
}

Sampler is the main component of the sampling logic

func (*Sampler) AdjustScoring

func (s *Sampler) AdjustScoring()

AdjustScoring modifies the sampler coefficients to better fit the `maxTPS` condition.

func (*Sampler) GetAllCountScores

func (s *Sampler) GetAllCountScores() map[Signature]float64

GetAllCountScores scores all signatures based on their recent throughput. The score value can be seen as the sample rate if the count were the only factor. Since other factors can intervene (such as extra global sampling), its value can be larger than 1.

func (*Sampler) GetAllSignatureSampleRates

func (s *Sampler) GetAllSignatureSampleRates() map[Signature]float64

GetAllSignatureSampleRates gives the sample rate to apply to all signatures. For now, only based on count score.

func (*Sampler) GetCountScore

func (s *Sampler) GetCountScore(signature Signature) float64

GetCountScore scores any signature based on its recent throughput. The score value can be seen as the sample rate if the count were the only factor. Since other factors can intervene (such as extra global sampling), its value can be larger than 1.

func (*Sampler) GetDefaultCountScore

func (s *Sampler) GetDefaultCountScore() float64

GetDefaultCountScore returns a default score to use when the signature is unknown. Since other factors can intervene (such as extra global sampling), its value can be larger than 1.

func (*Sampler) GetDefaultSampleRate

func (s *Sampler) GetDefaultSampleRate() float64

GetDefaultSampleRate gives the sample rate to apply to an unknown signature. For now, only based on count score.

func (*Sampler) GetMaxTPSSampleRate

func (s *Sampler) GetMaxTPSSampleRate() float64

GetMaxTPSSampleRate returns an extra sample rate to apply if we are above maxTPS.
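A natural reading of this limit is a proportional cut: when the sampled throughput exceeds maxTPS, scale it back by maxTPS / currentTPS. The sketch below shows that arithmetic. The function name, its parameters, and treating a non-positive maxTPS as "no limit" are assumptions of this example.

```go
package main

import "fmt"

// maxTPSSampleRate returns the extra rate needed to bring currentTPS down to
// maxTPS, or 1 when the limit is disabled or not exceeded.
func maxTPSSampleRate(currentTPS, maxTPS float64) float64 {
	if maxTPS > 0 && currentTPS > maxTPS {
		return maxTPS / currentTPS
	}
	return 1
}

func main() {
	// 200 sampled traces per second against a limit of 50 calls for keeping 1 in 4.
	fmt.Println(maxTPSSampleRate(200, 50)) // prints 0.25
	// Below the limit, no extra sampling is applied.
	fmt.Println(maxTPSSampleRate(10, 50)) // prints 1
}
```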

func (*Sampler) GetSampleRate

func (s *Sampler) GetSampleRate(trace pb.Trace, root *pb.Span, signature Signature) float64

GetSampleRate returns the sample rate to apply to a trace.

func (*Sampler) GetSignatureSampleRate

func (s *Sampler) GetSignatureSampleRate(signature Signature) float64

GetSignatureSampleRate gives the sample rate to apply to any signature. For now, only based on count score.

func (*Sampler) GetState

func (s *Sampler) GetState() InternalState

GetState collects and returns internal statistics and coefficients for informational purposes.

func (*Sampler) Run

func (s *Sampler) Run()

Run runs and blocks on the Sampler main loop.

func (*Sampler) RunAdjustScoring

func (s *Sampler) RunAdjustScoring()

RunAdjustScoring is the sampler feedback loop to adjust the scoring coefficients

func (*Sampler) SetSignatureCoefficients

func (s *Sampler) SetSignatureCoefficients(offset float64, slope float64)

SetSignatureCoefficients updates the internal scoring coefficients used by the signature scoring

func (*Sampler) Stop

func (s *Sampler) Stop()

Stop stops the main Run loop

func (*Sampler) UpdateExtraRate

func (s *Sampler) UpdateExtraRate(extraRate float64)

UpdateExtraRate updates the extra sample rate

func (*Sampler) UpdateMaxTPS

func (s *Sampler) UpdateMaxTPS(maxTPS float64)

UpdateMaxTPS updates the max TPS limit

type SamplingPriority

type SamplingPriority int8

SamplingPriority is the type encoding a priority sampling decision.

const (
	// PriorityNone is the value for SamplingPriority when no priority sampling decision could be found.
	PriorityNone SamplingPriority = math.MinInt8

	// PriorityUserDrop is the value set by a user to explicitly drop a trace.
	PriorityUserDrop SamplingPriority = -1

	// PriorityAutoDrop is the value set by a tracer to suggest dropping a trace.
	PriorityAutoDrop SamplingPriority = 0

	// PriorityAutoKeep is the value set by a tracer to suggest keeping a trace.
	PriorityAutoKeep SamplingPriority = 1

	// PriorityUserKeep is the value set by a user to explicitly keep a trace.
	PriorityUserKeep SamplingPriority = 2
)

func GetSamplingPriority

func GetSamplingPriority(s *pb.Span) (SamplingPriority, bool)

GetSamplingPriority returns the value of the sampling priority metric set on this span and a boolean indicating if such a metric was actually found or not.
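The get/set pair can be sketched against a plain metrics map. The span type is a hypothetical stand-in for pb.Span, and returning 0 alongside false when the metric is absent is an assumption of this example.

```go
package main

import "fmt"

// samplingPriority mirrors the SamplingPriority type for this sketch.
type samplingPriority int8

const (
	priorityUserDrop samplingPriority = -1
	priorityAutoDrop samplingPriority = 0
	priorityAutoKeep samplingPriority = 1
	priorityUserKeep samplingPriority = 2
)

// keySamplingPriority mirrors the value of KeySamplingPriority listed in Constants.
const keySamplingPriority = "_sampling_priority_v1"

// span is a hypothetical stand-in for pb.Span, reduced to its metrics map.
type span struct {
	Metrics map[string]float64
}

// setSamplingPriority stores the priority, overwriting any previous value.
func setSamplingPriority(s *span, p samplingPriority) {
	if s.Metrics == nil {
		s.Metrics = make(map[string]float64)
	}
	s.Metrics[keySamplingPriority] = float64(p)
}

// getSamplingPriority returns the stored priority and whether it was present.
func getSamplingPriority(s *span) (samplingPriority, bool) {
	v, ok := s.Metrics[keySamplingPriority]
	if !ok {
		return 0, false
	}
	return samplingPriority(v), true
}

func main() {
	root := &span{}
	if _, ok := getSamplingPriority(root); !ok {
		fmt.Println("no priority set yet")
	}
	setSamplingPriority(root, priorityUserKeep)
	p, _ := getSamplingPriority(root)
	fmt.Println("keep:", p > priorityAutoDrop) // prints keep: true
}
```

The boolean result matters because PriorityAutoDrop is 0, which is also the zero value: without it, a missing metric would be indistinguishable from a tracer's suggestion to drop.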

type ScoreEngine

type ScoreEngine struct {
	// Sampler is the underlying sampler used by this engine, sharing logic among various engines.
	Sampler *Sampler
	// contains filtered or unexported fields
}

ScoreEngine is the main component of the sampling logic

func NewErrorsEngine

func NewErrorsEngine(extraRate float64, maxTPS float64) *ScoreEngine

NewErrorsEngine returns an initialized ScoreEngine dedicated to errors. It behaves just like the normal ScoreEngine except for its GetType method (useful for reporting).

func NewScoreEngine

func NewScoreEngine(extraRate float64, maxTPS float64) *ScoreEngine

NewScoreEngine returns an initialized ScoreEngine.

func (*ScoreEngine) GetState

func (s *ScoreEngine) GetState() interface{}

GetState collects and returns internal statistics and coefficients for informational purposes. It returns an interface{}, as other samplers might return other information.

func (*ScoreEngine) GetType

func (s *ScoreEngine) GetType() EngineType

GetType returns the type of the sampler

func (*ScoreEngine) Run

func (s *ScoreEngine) Run()

Run runs and blocks on the Sampler main loop.

func (*ScoreEngine) Sample

func (s *ScoreEngine) Sample(trace pb.Trace, root *pb.Span, env string) (sampled bool, rate float64)

Sample counts an incoming trace and tells if it is a sample which has to be kept

func (*ScoreEngine) Stop

func (s *ScoreEngine) Stop()

Stop stops the main Run loop

type ServiceSignature

type ServiceSignature struct{ Name, Env string }

ServiceSignature represents a unique way to identify a service.

func (ServiceSignature) Hash

func (s ServiceSignature) Hash() Signature

Hash generates the signature of a trace from minimal information: service and env. It is typically used by distributed sampling based on priority, as the key under which the desired rate for a given (service, env) tuple is stored.
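As an illustration of deriving a signature from just service and env, the sketch below hashes both strings with FNV-1a from the standard library. The choice of FNV-1a, the comma separator and the lowercase names are assumptions of this example; the package's actual hash function is not shown on this page.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// signature mirrors the Signature type for this sketch.
type signature uint64

// serviceSignature is a hypothetical stand-in for ServiceSignature.
type serviceSignature struct{ Name, Env string }

// hash derives a signature from the service name and env only.
func (s serviceSignature) hash() signature {
	h := fnv.New64a()
	h.Write([]byte(s.Name))
	// The separator keeps ("ab", "c") and ("a", "bc") from hashing the same bytes.
	h.Write([]byte{','})
	h.Write([]byte(s.Env))
	return signature(h.Sum64())
}

func main() {
	prod := serviceSignature{Name: "web", Env: "prod"}
	staging := serviceSignature{Name: "web", Env: "staging"}
	// Equal inputs always give equal signatures, so the value can key a rate table.
	fmt.Println(prod.hash() == serviceSignature{Name: "web", Env: "prod"}.hash())
	fmt.Println(prod.hash() == staging.hash())
}
```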

func (ServiceSignature) String

func (s ServiceSignature) String() string

type Signature

type Signature uint64

Signature is a hash representation of a trace or a service, used to identify similar signatures.
