Documentation ¶
Overview ¶
Package metrics implements the Beam metrics API, described at http://s.apache.org/beam-metrics-api
Metrics in the Beam model are uniquely identified by a namespace, a name, and the PTransform context in which they are used. Further, they are reported as a delta against the bundle being processed, so that overcounting doesn't occur if a bundle needs to be retried. Each metric is scoped to their bundle, and ptransform.
Cells (or metric cells) are defined for each Beam model metric type, and the serve as concurrency safe storage of a given metric's values. Proxys are exported values representing the metric, for use in user ptransform code. They don't retain their cells, since they don't have the context to be able to store them for export back to the pipeline runner.
Metric cells aren't initialized until their first mutation, which follows from the Beam model design, where metrics are only sent for a bundle if they have changed. This is particularly convenient for distributions which means their min and max fields can be set to the first value on creation rather than have some marker of uninitialized state, which would otherwise need to be checked for on every update.
Metric values are implemented as lightweight proxies of the user provided namespace and name. This allows them to be declared globally, and used in any ParDo. Further, as per the design, they can be declared dynamically at runtime.
To handle reporting deltas on the metrics by bundle, metrics are keyed by bundleID,PTransformID,namespace, and name, so metrics that are identical except for bundles are treated as distinct, effectively providing per bundle deltas, since a new value cell is used per bundle.
Index ¶
- Constants
- func DumpToLog(ctx context.Context)
- func DumpToLogFromStore(ctx context.Context, store *Store)
- func DumpToOutFromContext(ctx context.Context)
- func DumpToOutFromStore(store *Store)
- func SetBundleID(ctx context.Context, id string) context.Context
- func SetPTransformID(ctx context.Context, id string) context.Context
- type BundleState
- type Counter
- type CounterResult
- type Distribution
- type DistributionResult
- type DistributionValue
- type ExecutionState
- type Extractor
- type Gauge
- type GaugeResult
- type GaugeValue
- type Labels
- type MsecResult
- type MsecValue
- type PColResult
- type PColValue
- type PTransformState
- type QueryResults
- type Results
- type SingleResult
- type StateSampler
- type StepKey
- type Store
Constants ¶
const ( // StartBundle indicates starting state of a bundle StartBundle bundleProcState = 0 // ProcessBundle indicates processing state of a bundle ProcessBundle bundleProcState = 1 // FinishBundle indicates finishing state of a bundle FinishBundle bundleProcState = 2 // TotalBundle (not a state) used for aggregating above states of a bundle TotalBundle bundleProcState = 3 )
Variables ¶
This section is empty.
Functions ¶
func DumpToLog ¶
DumpToLog is a debugging function that outputs all metrics available locally to beam.Log.
func DumpToLogFromStore ¶
DumpToLogFromStore dumps the metrics in the provided Store to beam.Log.
func DumpToOutFromContext ¶
DumpToOutFromContext is a debugging function that outputs all metrics available locally to std out, extracting the metric store from the context.
func DumpToOutFromStore ¶
func DumpToOutFromStore(store *Store)
DumpToOutFromStore is a debugging function that outputs all metrics available locally to std out directly from the store.
func SetBundleID ¶
SetBundleID sets the id of the current Bundle, and populates the store.
Types ¶
type BundleState ¶ added in v2.35.0
type BundleState struct {
// contains filtered or unexported fields
}
BundleState stores information about a PTransform for execution time metrics.
func (BundleState) String ¶ added in v2.40.0
func (b BundleState) String() string
String implements the Stringer interface.
type Counter ¶
type Counter struct {
// contains filtered or unexported fields
}
Counter is a simple counter for incrementing and decrementing a value.
func NewCounter ¶
NewCounter returns the Counter with the given namespace and name.
type CounterResult ¶
CounterResult is an attempted and a commited value of a counter metric plus key.
func MergeCounters ¶
func MergeCounters( attempted map[StepKey]int64, committed map[StepKey]int64) []CounterResult
MergeCounters combines counter metrics that share a common key.
func (CounterResult) Name ¶ added in v2.34.0
func (r CounterResult) Name() string
Name returns the Name of this Counter.
func (CounterResult) Namespace ¶ added in v2.34.0
func (r CounterResult) Namespace() string
Namespace returns the Namespace of this Counter.
func (CounterResult) Result ¶
func (r CounterResult) Result() int64
Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).
func (CounterResult) Transform ¶ added in v2.35.0
func (r CounterResult) Transform() string
Transform returns the Transform step for this CounterResult.
type Distribution ¶
type Distribution struct {
// contains filtered or unexported fields
}
Distribution is a simple distribution of values.
func NewDistribution ¶
func NewDistribution(ns, n string) *Distribution
NewDistribution returns the Distribution with the given namespace and name.
func (*Distribution) String ¶
func (m *Distribution) String() string
type DistributionResult ¶
type DistributionResult struct {
Attempted, Committed DistributionValue
Key StepKey
}
DistributionResult is an attempted and a commited value of a distribution metric plus key.
func MergeDistributions ¶
func MergeDistributions( attempted map[StepKey]DistributionValue, committed map[StepKey]DistributionValue) []DistributionResult
MergeDistributions combines distribution metrics that share a common key.
func (DistributionResult) Name ¶ added in v2.34.0
func (r DistributionResult) Name() string
Name returns the Name of this Distribution.
func (DistributionResult) Namespace ¶ added in v2.34.0
func (r DistributionResult) Namespace() string
Namespace returns the Namespace of this Distribution.
func (DistributionResult) Result ¶
func (r DistributionResult) Result() DistributionValue
Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).
func (DistributionResult) Transform ¶ added in v2.35.0
func (r DistributionResult) Transform() string
Transform returns the Transform step for this DistributionResult.
type DistributionValue ¶
type DistributionValue struct {
Count, Sum, Min, Max int64
}
DistributionValue is the value of a Distribution metric.
type ExecutionState ¶ added in v2.35.0
type ExecutionState struct { State bundleProcState IsProcessing bool // set to true when sent as a response to ProcessBundleProgress Request TotalTime time.Duration }
ExecutionState stores the information about a bundle in a particular state.
func (ExecutionState) String ¶ added in v2.40.0
func (e ExecutionState) String() string
String implements the Stringer interface.
type Extractor ¶
type Extractor struct { // SumInt64 extracts data from Sum Int64 counters. SumInt64 func(labels Labels, v int64) // DistributionInt64 extracts data from Distribution Int64 counters. DistributionInt64 func(labels Labels, count, sum, min, max int64) // GaugeInt64 extracts data from Gauge Int64 counters. GaugeInt64 func(labels Labels, v int64, t time.Time) // MsecsInt64 extracts data from StateRegistry of ExecutionState. // Extraction of Msec counters is experimental and subject to change. MsecsInt64 func(labels string, e *[4]ExecutionState) }
Extractor allows users to access metrics programatically after pipeline completion. Users assign functions to fields that interest them, and that function is called for each metric of the associated kind.
func (Extractor) ExtractFrom ¶
ExtractFrom the given metrics Store all the metrics for populated function fields. Returns an error if no fields were set.
type Gauge ¶
type Gauge struct {
// contains filtered or unexported fields
}
Gauge is a time, value pair metric.
type GaugeResult ¶
type GaugeResult struct {
Attempted, Committed GaugeValue
Key StepKey
}
GaugeResult is an attempted and a commited value of a gauge metric plus key.
func MergeGauges ¶
func MergeGauges( attempted map[StepKey]GaugeValue, committed map[StepKey]GaugeValue) []GaugeResult
MergeGauges combines gauge metrics that share a common key.
func (GaugeResult) Name ¶ added in v2.34.0
func (r GaugeResult) Name() string
Name returns the Name of this Gauge.
func (GaugeResult) Namespace ¶ added in v2.34.0
func (r GaugeResult) Namespace() string
Namespace returns the Namespace of this Gauge.
func (GaugeResult) Result ¶
func (r GaugeResult) Result() GaugeValue
Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).
func (GaugeResult) Transform ¶ added in v2.35.0
func (r GaugeResult) Transform() string
Transform returns the Transform step for this GaugeResult.
type GaugeValue ¶
GaugeValue is the value of a Gauge metric.
type Labels ¶
type Labels struct {
// contains filtered or unexported fields
}
Labels provide the context for the given metric.
func PCollectionLabels ¶
PCollectionLabels builds a Labels for pcollection metrics. Intended for framework use.
func PTransformLabels ¶
PTransformLabels builds a Labels for transform metrics. Intended for framework use.
func UserLabels ¶
UserLabels builds a Labels for user metrics. Intended for framework use.
func (Labels) Map ¶
Map produces a map of present labels to their values.
Returns nil map if invalid.
func (Labels) PCollection ¶
PCollection returns the PCollection id for this metric.
type MsecResult ¶ added in v2.35.0
MsecResult is an attempted and a commited value of a counter metric plus key.
func MergeMsecs ¶ added in v2.35.0
func MergeMsecs( attempted map[StepKey]MsecValue, committed map[StepKey]MsecValue) []MsecResult
MergeMsecs combines counter metrics that share a common key.
func (MsecResult) Name ¶ added in v2.35.0
func (r MsecResult) Name() string
Name returns the Name of this MsecResult.
func (MsecResult) Namespace ¶ added in v2.35.0
func (r MsecResult) Namespace() string
Namespace returns the Namespace of this MsecResult.
func (MsecResult) Result ¶ added in v2.35.0
func (r MsecResult) Result() MsecValue
Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).
func (MsecResult) Transform ¶ added in v2.35.0
func (r MsecResult) Transform() string
Transform returns the Transform step for this MsecResult.
type PColResult ¶ added in v2.35.0
PColResult is an attempted and a commited value of a pcollection metric plus key.
func MergePCols ¶ added in v2.35.0
func MergePCols( attempted map[StepKey]PColValue, committed map[StepKey]PColValue) []PColResult
MergePCols combines pcollection metrics that share a common key.
func (PColResult) Name ¶ added in v2.35.0
func (r PColResult) Name() string
Name returns the Name of this Pcollection Result.
func (PColResult) Namespace ¶ added in v2.35.0
func (r PColResult) Namespace() string
Namespace returns the Namespace of this Pcollection Result.
func (PColResult) Result ¶ added in v2.35.0
func (r PColResult) Result() PColValue
Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).
func (PColResult) Transform ¶ added in v2.35.0
func (r PColResult) Transform() string
Transform returns the Transform step for this Pcollection Result.
type PColValue ¶ added in v2.35.0
type PColValue struct { ElementCount int64 SampledByteSize DistributionValue }
PColValue is the value of a single PCollection metric.
type PTransformState ¶ added in v2.35.0
type PTransformState struct {
// contains filtered or unexported fields
}
PTransformState stores the state of PTransform for DoFn metrics.
func NewPTransformState ¶ added in v2.35.0
func NewPTransformState(pid string) *PTransformState
NewPTransformState creates a new PTransformState.
func (*PTransformState) Set ¶ added in v2.35.0
func (s *PTransformState) Set(ctx context.Context, state bundleProcState)
Set stores the state of PTransform in its bundle.
type QueryResults ¶
type QueryResults struct {
// contains filtered or unexported fields
}
QueryResults is the result of a query. Allows accessing all of the metrics that matched the filter.
func (QueryResults) Counters ¶
func (qr QueryResults) Counters() []CounterResult
Counters returns a slice of counter metrics.
func (QueryResults) Distributions ¶
func (qr QueryResults) Distributions() []DistributionResult
Distributions returns a slice of distribution metrics.
func (QueryResults) Gauges ¶
func (qr QueryResults) Gauges() []GaugeResult
Gauges returns a slice of gauge metrics.
func (QueryResults) Msecs ¶ added in v2.35.0
func (qr QueryResults) Msecs() []MsecResult
Msecs returns a slice of DoFn metrics
func (QueryResults) PCols ¶ added in v2.35.0
func (qr QueryResults) PCols() []PColResult
PCols returns a slice of PCollection metrics.
type Results ¶
type Results struct {
// contains filtered or unexported fields
}
Results represents all metrics gathered during the job's execution. It allows for querying metrics using a provided filter.
func NewResults ¶
func NewResults( counters []CounterResult, distributions []DistributionResult, gauges []GaugeResult, msecs []MsecResult, pCols []PColResult) *Results
NewResults creates a new Results.
func ResultsExtractor ¶ added in v2.34.0
ResultsExtractor extracts the metrics.Results from Store using ctx. This is same as what metrics.dumperExtractor and metrics.dumpTo would do together.
func (Results) AllMetrics ¶
func (mr Results) AllMetrics() QueryResults
AllMetrics returns all metrics from a Results instance.
func (Results) Query ¶ added in v2.34.0
func (mr Results) Query(f func(SingleResult) bool) QueryResults
Query allows metrics querying with filter. The filter takes the form of predicate function. Example:
qr = pr.Metrics().Query(func(mr beam.MetricResult) bool { return sr.Namespace() == test.namespace })
type SingleResult ¶ added in v2.34.0
SingleResult interface facilitates metrics query filtering methods.
type StateSampler ¶ added in v2.35.0
type StateSampler struct {
// contains filtered or unexported fields
}
StateSampler tracks the state of a bundle.
func NewSampler ¶ added in v2.35.0
func NewSampler(store *Store) StateSampler
NewSampler creates a new state sampler.
func (*StateSampler) Sample ¶ added in v2.35.0
func (s *StateSampler) Sample(ctx context.Context, t time.Duration)
Sample checks for state transition in processing a DoFn
func (*StateSampler) SetLogInterval ¶ added in v2.35.0
func (s *StateSampler) SetLogInterval(t time.Duration)
SetLogInterval sets the logging interval for lull reporting.
type StepKey ¶
type StepKey struct {
Step, Name, Namespace string
}
StepKey uniquely identifies a metric within a pipeline graph.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store retains per transform countersets, intended for per bundle use.
func GetStore ¶
GetStore extracts the metrics Store for the given context for a bundle.
Returns nil if the context doesn't contain a metric Store.
func (*Store) BundleState ¶ added in v2.40.0
BundleState returns the bundle state.
func (*Store) StateRegistry ¶ added in v2.40.0
StateRegistry returns the state registry that stores bundleID to executions states mapping.