v0.0.0-...-116b634 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2021 License: Apache-2.0 Imports: 3 Imported by: 4



Package rrd contains the logic for updating in-memory partial Round-Robin Archives from incoming and usually unevenly-spaced data points by converting the input to a rate, consolidating and aggregating the data across a list of evenly spaced series of pre-defined resolution and time span.

Throughout this documentation and code the following terms are used (sometimes as abbreviations, listed in parenthesis):

Round-Robin Database (RRD): Collectively all the logic in this package and an instance of the data it maintains is referred to as an RRD.

Data Point (DP): There actually isn't a data structure representing a data point. A datapoint is just a float64.

Round-Robin Archive (RRA): An array of data points at a specific resolutoin and going back a pre-defined duration of time.

Primary Data Point (PDP): A conceptual value which represents a step-sized time slot in a series. Many (or none) actual data points can come in and fall into a PDP. Each DS and each RRA maintain a current (not yet complete) PDP, whose value upon completion is used to update PDPs of lower resolution (i.e. larger Step/PDP) RRAs.

Data Sourse (DS): Data Source loosely represents a "time series": the smalles resolution (PDP size) and other parameters, as well as the data. A DS should have at least one, but usually several RRAs.

Step: Step is the smallest unit of time of a DS and/or RRA's in milliseconds. RRA resolutions and sizes must be multiples of the DS step they belong to. In this implementation a step cannot be smaller than a millisecond.

DS Heartbeat (HB): Duration of time that can pass without data. A gap in data which exceeds HB is filled with NaNs.

Note that this package does not concern itself with loading a series from storage for analysis.



This section is empty.


This section is empty.


func IndexDistance

func IndexDistance(i, j, size int64) int64

Distance between i and j indexes in an RRA. If i > j (the RRA wraps around) then it is the sum of the distance from i to the end and the beginning to j. Size of 0 causes a division by zero panic.

func SlotIndex

func SlotIndex(slotEnd time.Time, step time.Duration, size int64) int64

Given a slot timestamp, RRA step and size, return the slot's (0-based) index in the data points array. Size of zero causes a division by zero panic.

func SlotTime

func SlotTime(n int64, latest time.Time, step time.Duration, size int64) time.Time

Given time of the latest slot, step and size, return the timestamp on which slot n ends. Size of zero causes a division by zero panic.


type ClockPdp

type ClockPdp struct {
	End time.Time

ClockPdp is a PDP that uses current time to determine the duration.

func (*ClockPdp) AddValue

func (p *ClockPdp) AddValue(val float64)

AddValue adds a value using a weighted mean. The first time it is called it only updates the End field and returns a zero value, the second and on will use End to compute the duration.

type Consolidation

type Consolidation int
const (
	WMEAN Consolidation = iota // Time-weighted average
	MAX                        // Max
	MIN                        // Min
	LAST                       // Last

type DSSpec

type DSSpec struct {
	Step      time.Duration
	Heartbeat time.Duration
	RRAs      []RRASpec

	// These can be used to fill the initial value
	LastUpdate time.Time
	Value      float64
	Duration   time.Duration

DSSpec describes a DataSource. DSSpec is a schema that is used to create the DataSource, as an argument to NewDataSource(). DSSpec is used in configuration describing how a DataSource must be created on-the-fly.

type DataSource

type DataSource struct {
	// contains filtered or unexported fields

DataSource contains a time series and its parameters, RRAs and intermediate state (PDP). The DS PDP is the smallest unit of accumulation for this series, all RRAs should have PDPs that are a multiple of the DS PDP. The DS PDP supports only weighted mean as its consolidation function. The reason for not supporting min/max/last/avg (at least initially) is that additional state is required to maintain those, while it seems like that is better done in other places, e.g. the Aggregator anyhow.

func NewDataSource

func NewDataSource(spec DSSpec) *DataSource

NewDataSource returns a new DataSource in accordance with the passed in DSSpec.

func (*DataSource) BestRRA

func (ds *DataSource) BestRRA(start, end time.Time, points int64) RoundRobinArchiver

BestRRA examines the RRAs and returns the one that best matches the given start, end and resolution (as number of points).

func (*DataSource) ClearRRAs

func (ds *DataSource) ClearRRAs()

ClearRRAs clears the data in all RRAs. It is meant to be called immedately after flushing the DS to permanent storage.

func (*DataSource) Copy

func (ds *DataSource) Copy() DataSourcer

Returns a complete copy of this Data Source

func (*DataSource) Heartbeat

func (ds *DataSource) Heartbeat() time.Duration

Heartbeat is the time interval size which if passed without any data renders the interval data NaN. Another way of looking at HB is this is how far back we go to connect adjacent data points. If the points are further apart than HB, the value in between becomes NaN.

A special value of 0 changes the behavior to be closer to that of Whisper files. Whisper logic assigns data points to slots and the last data point to arrive overwrites any previous value in the slot. The duration assigned to the data point is the PDP step, which causes it to be immediately moved to the RRAs. Note that multiple data points in the same PDP will cause multiple RRA updates, and the resulting RRA value is subject to whatever consolidation function the RRA uses. In the case of HB 0 MAX might be more appropriate than WMEAN (default).

Rationale for using HB 0 for this is that an HB of 0 doesn't make much sense otherwise: if a gap larger than HB is filled with NaNs (or just ignored, implementation detail), then HB of zero any incoming value strictly speaking ought to become NaN. We could treat 0 HB as "no limit to how far we go", but a "we just store the value without going back" is a nice compromise and it is actually useful when we want to mimic Whisper-like behavior. One example is using data points to denote that something happened, i.e. "deploy success".

func (*DataSource) LastUpdate

func (ds *DataSource) LastUpdate() time.Time

LastUpdate returns the timestamp of the last Data Point processed

func (*DataSource) PointCount

func (ds *DataSource) PointCount() int

PointCount returns the sum of all point counts of every RRA in this DS.

func (*DataSource) ProcessDataPoint

func (ds *DataSource) ProcessDataPoint(value float64, ts time.Time) error

ProcessDataPoint checks the values and updates the DS PDP. If this the very first call for this DS (lastUpdate is 0), then it only sets lastUpdate and returns.

func (*DataSource) RRAs

func (ds *DataSource) RRAs() []RoundRobinArchiver

List of Round Robin Archives this Data Source has

func (*DataSource) SetRRAs

func (ds *DataSource) SetRRAs(rras []RoundRobinArchiver)

SetRRAs provides a way to set the RRAs (which may contain data)

func (*DataSource) Spec

func (ds *DataSource) Spec() DSSpec

Return a DSSpec corresponding to this DS

func (*DataSource) Step

func (ds *DataSource) Step() time.Duration

Step returns the step, i.e. the size of the PDP. All RRAs this DS has must have steps that are a multiple of this Step.

type DataSourcer

type DataSourcer interface {
	Step() time.Duration
	Heartbeat() time.Duration
	LastUpdate() time.Time
	RRAs() []RoundRobinArchiver
	SetRRAs(rras []RoundRobinArchiver)
	Copy() DataSourcer
	BestRRA(start, end time.Time, points int64) RoundRobinArchiver
	PointCount() int
	ProcessDataPoint(value float64, ts time.Time) error
	Spec() DSSpec

DataSourcer is a DataSource as an interface.

type Pdp

type Pdp struct {
	// contains filtered or unexported fields

Pdp is a Primary Data Point. It provides intermediate state and logic to interpolate and store incoming DP data in a consolidated way using weighted mean.

This is an illustration of how incoming data points are consolidated into a PDP using weighted mean. The PDP below is 4 units long (the actual unit is not relevant). It shows a time period during which 3 values (measurements) arrived: 1.0 at 1, 3.0 at 3 and 2.0 at 4. The final value of this PDP is 2.25.

||    +---------+    ||
||    |	    3.0 +----||
||----+	        | 2.0||
|| 1.0|	        |    ||
 0    1    2    3     4  ---> time

In this PDP 0.25 of the value is 1.0, 0.50 is 3.0 and 0.25 is 2.0, for a total of 0.25*1 + 0.50*3 + 0.25*2 = 2.25.

If a part of the data point is NaN, then that part does not count. Even if NaN is at the end:

||    +---------+    ||
||    |	     3.0|    ||
||----+	        | NaN||
|| 1.0|	        |    ||
 0    1    2    3     4  ---> time

In the above PDP, the size is what is taken up by 1.0 and 3.0, without the NaN. Thus 1/3 of the value is 1.0 and 2/3 of the value is 3.0, for a total of 1/3*1 + 2/3*3 = 2.33333.

An alternative way of looking at the above data point is that it is simply shorter or has a shorter duration:

||    +---------||
||    |      3.0||
||----+         ||
|| 1.0|         ||
 0    1    2    3     4  ---> time

A datapoint must be all NaN for its value to be NaN. If duration is 0, then the value is irrelevant.

To create an "empty" Pdp, simply use its zero value.

func (*Pdp) AddValue

func (p *Pdp) AddValue(val float64, dur time.Duration)

AddValue adds a value to a PDP using weighted mean.

func (*Pdp) AddValueLast

func (p *Pdp) AddValueLast(val float64, dur time.Duration)

AddValueLast replaces the current value. This is different from SetValue in that it's a noop if val is NaN or dur is 0.

func (*Pdp) AddValueMax

func (p *Pdp) AddValueMax(val float64, dur time.Duration)

AddValueMax adds a value using max. A non-NaN value is considered greater than zero value (duration 0) or NaN.

func (*Pdp) AddValueMin

func (p *Pdp) AddValueMin(val float64, dur time.Duration)

AddValueMin adds a value using min. A non-NaN value is considered lesser than zero value (duration 0) or NaN.

func (*Pdp) Duration

func (p *Pdp) Duration() time.Duration

func (*Pdp) Reset

func (p *Pdp) Reset() float64

Reset sets the value to zero value and returns the value of the PDP before Reset.

func (*Pdp) SetValue

func (p *Pdp) SetValue(val float64, dur time.Duration)

SetValue sets both value and duration of the PDP

func (*Pdp) Value

func (p *Pdp) Value() float64

The current value of the PDP. If duration is 0, this returns NaN.

type Pdper

type Pdper interface {
	Value() float64
	Duration() time.Duration

type RRASpec

type RRASpec struct {
	Function Consolidation
	Step     time.Duration // duration of a single step
	Span     time.Duration // duration of the whole series (should be multiple of step)
	Xff      float32

	// These can be used to fill the initial value
	Latest   time.Time
	Value    float64
	Duration time.Duration
	DPs      map[int64]float64 // Careful, these are round-robin

RRASpec is the RRA definition for NewRoundRobinArchive.

type RoundRobinArchive

type RoundRobinArchive struct {
	// Each RRA has its own PDP (duration and value). Note that
	// whatever fits in the DS PDP step will reside there, but
	// anything exceeding the time period that DS PDP can hold will
	// trickle down to RRA PDPs, until they are add to DPs.
	// contains filtered or unexported fields

A Round Robin Archive and all its parameters.

func NewRoundRobinArchive

func NewRoundRobinArchive(spec RRASpec) *RoundRobinArchive

Returns a new RRA in accordance with the provided RRASpec.

func (*RoundRobinArchive) Begins

func (rra *RoundRobinArchive) Begins(now time.Time) time.Time

Begins returns the timestamp of the beginning of this RRA assuming that that the argument "now" is within it. This will be a time approximately but not exactly the RRA length ago, because it is aligned on the RRA step boundary.

func (*RoundRobinArchive) Copy

Returns a complete copy of the RRA.

func (*RoundRobinArchive) DPs

func (rra *RoundRobinArchive) DPs() map[int64]float64

Dps returns data points as a map of floats. It's a map rather than a slice to be more space-efficient for sparse series.

func (*RoundRobinArchive) Latest

func (rra *RoundRobinArchive) Latest() time.Time

Latest returns the time on which the last slot ends.

func (*RoundRobinArchive) PointCount

func (rra *RoundRobinArchive) PointCount() int

PointCount returns the number of points in this RRA.

func (*RoundRobinArchive) Size

func (rra *RoundRobinArchive) Size() int64

Number of data points in this RRA

func (*RoundRobinArchive) Spec

func (rra *RoundRobinArchive) Spec() RRASpec

Spec matching this RRA

func (*RoundRobinArchive) Step

func (rra *RoundRobinArchive) Step() time.Duration

Step of this RRA

type RoundRobinArchiver

type RoundRobinArchiver interface {
	Latest() time.Time
	Step() time.Duration
	Size() int64
	PointCount() int
	DPs() map[int64]float64
	Copy() RoundRobinArchiver
	Begins(now time.Time) time.Time
	Spec() RRASpec
	// contains filtered or unexported methods

RoundRobinArchive as an interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL