streamaggr

package
v1.93.8-cluster
Published: Nov 15, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Types

type Aggregators

type Aggregators struct {
	// contains filtered or unexported fields
}

Aggregators aggregates the metrics passed to Push and calls pushFunc with the aggregated data.

func LoadFromFile

func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error)

LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.

If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, i.e. only the last sample of each time series within each dedupInterval is aggregated.

The returned Aggregators must be stopped with MustStop() when no longer needed.
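The de-duplication rule described above can be illustrated with a small standalone sketch. It does not use this package and is not its implementation: the sample type, the dedupLast function and the fixed-window bucketing are all assumptions made for illustration. The sketch keeps only the last sample of each series within each dedupInterval window.

```go
package main

import (
	"fmt"
	"time"
)

// sample is a hypothetical stand-in for one input sample; it is not a type from streamaggr.
type sample struct {
	series      string // series identity, e.g. the metric name plus its labels
	timestampMs int64  // Unix timestamp in milliseconds
	value       float64
}

// dedupLast keeps only the last sample of each series within each dedupInterval window.
// Samples are assumed to arrive in timestamp order, so a later sample overwrites
// an earlier one that falls into the same window.
// The result maps series -> window index -> surviving value.
func dedupLast(samples []sample, dedupInterval time.Duration) map[string]map[int64]float64 {
	intervalMs := dedupInterval.Milliseconds()
	if intervalMs <= 0 {
		// De-duplication is disabled in this sketch for intervals below 1ms.
		return nil
	}
	out := make(map[string]map[int64]float64)
	for _, s := range samples {
		window := s.timestampMs / intervalMs
		if out[s.series] == nil {
			out[s.series] = make(map[int64]float64)
		}
		out[s.series][window] = s.value
	}
	return out
}

func main() {
	samples := []sample{
		{"foo", 1000, 1},
		{"foo", 4000, 2},  // same 10s window as the previous sample, so it replaces it
		{"foo", 11000, 3}, // next window
		{"bar", 2000, 10},
	}
	kept := dedupLast(samples, 10*time.Second)
	fmt.Println(kept["foo"][0], kept["foo"][1], kept["bar"][0]) // prints "2 3 10"
}
```

With a 10s interval, the two foo samples at 1s and 4s collapse into one, so an aggregation such as count_samples would see three samples instead of four.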

func NewAggregators

func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error)

NewAggregators creates Aggregators from the given cfgs.

pushFunc is called when the aggregated data must be flushed.

If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, i.e. only the last sample of each time series within each dedupInterval is aggregated.

MustStop must be called on the returned Aggregators when they are no longer needed.

func NewAggregatorsFromData

func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error)

NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.

If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, i.e. only the last sample of each time series within each dedupInterval is aggregated.

The returned Aggregators must be stopped with MustStop() when no longer needed.

func (*Aggregators) Equal added in v1.90.0

func (a *Aggregators) Equal(b *Aggregators) bool

Equal returns true if a and b are initialized from identical configs.

func (*Aggregators) MustStop

func (a *Aggregators) MustStop()

MustStop stops a.

func (*Aggregators) Push

func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte

Push pushes tss to a.

Push sets matchIdxs[idx] to 1 if the corresponding tss[idx] was used in aggregations. Otherwise matchIdxs[idx] is set to 0.

Push returns matchIdxs with len equal to len(tss). It re-uses the matchIdxs if it has enough capacity to hold len(tss) items. Otherwise it allocates new matchIdxs.
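The buffer contract of matchIdxs (re-use when capacity allows, allocate otherwise, one byte per input series) can be sketched without the package itself. The helper names resizeMatchIdxs and markMatches below are hypothetical, and matching by exact name is a simplification standing in for the real selector logic.

```go
package main

import "fmt"

// resizeMatchIdxs returns a zeroed slice of length n.
// It re-uses buf when buf has enough capacity and allocates a new slice otherwise.
func resizeMatchIdxs(buf []byte, n int) []byte {
	if cap(buf) >= n {
		buf = buf[:n]
		for i := range buf {
			buf[i] = 0 // clear stale flags left over from a previous call
		}
		return buf
	}
	return make([]byte, n)
}

// markMatches is a hypothetical stand-in for Push: it sets matchIdxs[i] to 1
// when names[i] equals wanted and leaves it at 0 otherwise.
func markMatches(names []string, wanted string, matchIdxs []byte) []byte {
	matchIdxs = resizeMatchIdxs(matchIdxs, len(names))
	for i, name := range names {
		if name == wanted {
			matchIdxs[i] = 1
		}
	}
	return matchIdxs
}

func main() {
	buf := make([]byte, 0, 8)
	idxs := markMatches([]string{"foo", "bar", "foo"}, "foo", buf)
	fmt.Println(idxs, cap(idxs) == cap(buf)) // prints "[1 0 1] true"
}
```

Given the signature of Push, a caller would presumably follow the same pattern: keep the returned slice and pass it back on the next call, as in matchIdxs = a.Push(tss, matchIdxs), so that steady-state pushes avoid allocations.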

type Config

type Config struct {
	// Match is a label selector for filtering the input time series to aggregate.
	//
	// If the match isn't set, then all the input time series are processed.
	Match *promrelabel.IfExpression `yaml:"match,omitempty"`

	// Interval is the interval between aggregations.
	Interval string `yaml:"interval"`

	// StalenessInterval is the interval after which the series state is reset if no samples have been received during it.
	// The parameter is only relevant for outputs: total, increase and histogram_bucket.
	StalenessInterval string `yaml:"staleness_interval,omitempty"`

	// Outputs is a list of output aggregate functions to produce.
	//
	// The following names are allowed:
	//
	// - total - aggregates input counters
	// - increase - counts the increase over input counters
	// - count_series - counts the input series
	// - count_samples - counts the input samples
	// - sum_samples - sums the input samples
	// - last - the last sample value
	// - min - the minimum sample value
	// - max - the maximum sample value
	// - avg - the average value across all the samples
	// - stddev - standard deviation across all the samples
	// - stdvar - standard variance across all the samples
	// - histogram_bucket - creates VictoriaMetrics histogram for input samples
	// - quantiles(phi1, ..., phiN) - estimated quantiles for each phi in the range [0..1]
	//
	// The output time series will have the following names:
	//
	//   input_name:aggr_<interval>_<output>
	//
	Outputs []string `yaml:"outputs"`

	// By is an optional list of labels for grouping input series.
	//
	// See also Without.
	//
	// If neither By nor Without are set, then the Outputs are calculated
	// individually per each input time series.
	By []string `yaml:"by,omitempty"`

	// Without is an optional list of labels, which must be excluded when grouping input series.
	//
	// See also By.
	//
	// If neither By nor Without are set, then the Outputs are calculated
	// individually per each input time series.
	Without []string `yaml:"without,omitempty"`

	// InputRelabelConfigs is an optional list of relabeling rules, which are applied on the input
	// before aggregation.
	InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"`

	// OutputRelabelConfigs is an optional list of relabeling rules, which are applied
	// on the aggregated output before being sent to remote storage.
	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
}

Config is a configuration for a single stream aggregation.
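How By and Without shape the output groups, and how output names follow the input_name:aggr_<interval>_<output> pattern from the Outputs comment, can be sketched in plain Go. The functions groupLabels, groupKey and outputName are hypothetical and are not part of this package. The sketch ignores the special handling a metric name label may need, and it gives By precedence when both lists are set, which is an assumption of the sketch rather than documented behavior.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// groupLabels returns the labels that identify the output group for the given input labels.
// by and without mirror Config.By and Config.Without. If both are empty, all the labels
// are kept, so every input series forms its own group.
func groupLabels(labels map[string]string, by, without []string) map[string]string {
	out := make(map[string]string)
	if len(by) > 0 {
		// Keep only the listed labels (sketch assumption: by wins over without).
		for _, name := range by {
			if v, ok := labels[name]; ok {
				out[name] = v
			}
		}
		return out
	}
	drop := make(map[string]bool, len(without))
	for _, name := range without {
		drop[name] = true
	}
	for name, v := range labels {
		if !drop[name] {
			out[name] = v
		}
	}
	return out
}

// groupKey renders labels in sorted order, so equal label sets produce equal keys.
func groupKey(labels map[string]string) string {
	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)
	var sb strings.Builder
	for _, name := range names {
		fmt.Fprintf(&sb, "%s=%q,", name, labels[name])
	}
	return sb.String()
}

// outputName builds a name following the input_name:aggr_<interval>_<output> pattern.
func outputName(inputName, interval, output string) string {
	return inputName + ":aggr_" + interval + "_" + output
}

func main() {
	a := map[string]string{"job": "api", "instance": "host1", "path": "/x"}
	b := map[string]string{"job": "api", "instance": "host2", "path": "/x"}

	// Grouping by job puts both series into the same group.
	sameGroup := groupKey(groupLabels(a, []string{"job"}, nil)) == groupKey(groupLabels(b, []string{"job"}, nil))
	fmt.Println(sameGroup)

	// Excluding instance keeps the remaining labels.
	fmt.Println(groupKey(groupLabels(a, nil, []string{"instance"})))

	fmt.Println(outputName("http_requests_total", "1m", "sum_samples"))
}
```

With neither list set, a and b keep all their labels and therefore land in different groups, which matches the per-series behavior described in the By and Without comments.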

type PushFunc

type PushFunc func(tss []prompbmarshal.TimeSeries)

PushFunc is called by Aggregators when it needs to push its state to metrics storage.
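A callback of this shape often just hands the flushed series to some sink. The sketch below shows one way to write such a sink defensively. It uses a hypothetical timeSeries type in place of prompbmarshal.TimeSeries, and it assumes, without confirmation from this page, that the callback may be invoked from several goroutines and that the passed slice may be re-used after the callback returns.

```go
package main

import (
	"fmt"
	"sync"
)

// timeSeries is a hypothetical, simplified stand-in for prompbmarshal.TimeSeries.
type timeSeries struct {
	name  string
	value float64
}

// collector accumulates flushed series.
// The mutex guards against concurrent callback invocations (an assumption of this sketch).
type collector struct {
	mu  sync.Mutex
	tss []timeSeries
}

// push has the same shape as PushFunc, but with the stand-in type.
// append copies the elements, so the collector does not retain the caller's slice.
func (c *collector) push(tss []timeSeries) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tss = append(c.tss, tss...)
}

func main() {
	var c collector
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.push([]timeSeries{{name: fmt.Sprintf("m%d:aggr_1m_max", i), value: float64(i)}})
		}(i)
	}
	wg.Wait()
	fmt.Println(len(c.tss)) // prints "4"
}
```

With the real package, a method value such as c.push could be passed as pushFunc once its parameter type is changed to []prompbmarshal.TimeSeries.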
