Package rrd contains the logic for updating in-memory partial Round-Robin Archives from incoming and usually unevenly-spaced data points by converting the input to a rate, consolidating and aggregating the data across a list of evenly spaced series of pre-defined resolution and time span.

    Throughout this documentation and code the following terms are used (sometimes as abbreviations, listed in parenthesis):

    Round-Robin Database (RRD): Collectively all the logic in this package and an instance of the data it maintains is referred to as an RRD.

    Data Point (DP): There actually isn't a data structure representing a data point. A datapoint is just a float64.

    Round-Robin Archive (RRA): An array of data points at a specific resolutoin and going back a pre-defined duration of time.

    Primary Data Point (PDP): A conceptual value which represents a step-sized time slot in a series. Many (or none) actual data points can come in and fall into a PDP. Each DS and each RRA maintain a current (not yet complete) PDP, whose value upon completion is used to update PDPs of lower resolution (i.e. larger Step/PDP) RRAs.

    Data Sourse (DS): Data Source loosely represents a "time series": the smalles resolution (PDP size) and other parameters, as well as the data. A DS should have at least one, but usually several RRAs.

    Step: Step is the smallest unit of time of a DS and/or RRA's in milliseconds. RRA resolutions and sizes must be multiples of the DS step they belong to. In this implementation a step cannot be smaller than a millisecond.

    DS Heartbeat (HB): Duration of time that can pass without data. A gap in data which exceeds HB is filled with NaNs.

    Note that this package does not concern itself with loading a series from storage for analysis.



    This section is empty.


    This section is empty.


    func IndexDistance

    func IndexDistance(i, j, size int64) int64

      Distance between i and j indexes in an RRA. If i > j (the RRA wraps around) then it is the sum of the distance from i to the end and the beginning to j. Size of 0 causes a division by zero panic.

      func SlotIndex

      func SlotIndex(slotEnd time.Time, step time.Duration, size int64) int64

        Given a slot timestamp, RRA step and size, return the slot's (0-based) index in the data points array. Size of zero causes a division by zero panic.

        func SlotTime

        func SlotTime(n int64, latest time.Time, step time.Duration, size int64) time.Time

          Given time of the latest slot, step and size, return the timestamp on which slot n ends. Size of zero causes a division by zero panic.


          type ClockPdp

          type ClockPdp struct {
          	End time.Time

            ClockPdp is a PDP that uses current time to determine the duration.

            func (*ClockPdp) AddValue

            func (p *ClockPdp) AddValue(val float64)

              AddValue adds a value using a weighted mean. The first time it is called it only updates the End field and returns a zero value, the second and on will use End to compute the duration.

              type Consolidation

              type Consolidation int
              const (
              	WMEAN Consolidation = iota // Time-weighted average
              	MAX                        // Max
              	MIN                        // Min
              	LAST                       // Last

              type DSSpec

              type DSSpec struct {
              	Step      time.Duration
              	Heartbeat time.Duration
              	RRAs      []RRASpec
              	// These can be used to fill the initial value
              	LastUpdate time.Time
              	Value      float64
              	Duration   time.Duration

                DSSpec describes a DataSource. DSSpec is a schema that is used to create the DataSource, as an argument to NewDataSource(). DSSpec is used in configuration describing how a DataSource must be created on-the-fly.

                type DataSource

                type DataSource struct {
                	// contains filtered or unexported fields

                  DataSource contains a time series and its parameters, RRAs and intermediate state (PDP). The DS PDP is the smallest unit of accumulation for this series, all RRAs should have PDPs that are a multiple of the DS PDP. The DS PDP supports only weighted mean as its consolidation function. The reason for not supporting min/max/last/avg (at least initially) is that additional state is required to maintain those, while it seems like that is better done in other places, e.g. the Aggregator anyhow.

                  func NewDataSource

                  func NewDataSource(spec DSSpec) *DataSource

                    NewDataSource returns a new DataSource in accordance with the passed in DSSpec.

                    func (*DataSource) BestRRA

                    func (ds *DataSource) BestRRA(start, end time.Time, points int64) RoundRobinArchiver

                      BestRRA examines the RRAs and returns the one that best matches the given start, end and resolution (as number of points).

                      func (*DataSource) ClearRRAs

                      func (ds *DataSource) ClearRRAs()

                        ClearRRAs clears the data in all RRAs. It is meant to be called immedately after flushing the DS to permanent storage.

                        func (*DataSource) Copy

                        func (ds *DataSource) Copy() DataSourcer

                          Returns a complete copy of this Data Source

                          func (*DataSource) Heartbeat

                          func (ds *DataSource) Heartbeat() time.Duration

                            Heartbeat is the time interval size which if passed without any data renders the interval data NaN. Another way of looking at HB is this is how far back we go to connect adjacent data points. If the points are further apart than HB, the value in between becomes NaN.

                            A special value of 0 changes the behavior to be closer to that of Whisper files. Whisper logic assigns data points to slots and the last data point to arrive overwrites any previous value in the slot. The duration assigned to the data point is the PDP step, which causes it to be immediately moved to the RRAs. Note that multiple data points in the same PDP will cause multiple RRA updates, and the resulting RRA value is subject to whatever consolidation function the RRA uses. In the case of HB 0 MAX might be more appropriate than WMEAN (default).

                            Rationale for using HB 0 for this is that an HB of 0 doesn't make much sense otherwise: if a gap larger than HB is filled with NaNs (or just ignored, implementation detail), then HB of zero any incoming value strictly speaking ought to become NaN. We could treat 0 HB as "no limit to how far we go", but a "we just store the value without going back" is a nice compromise and it is actually useful when we want to mimic Whisper-like behavior. One example is using data points to denote that something happened, i.e. "deploy success".

                            func (*DataSource) LastUpdate

                            func (ds *DataSource) LastUpdate() time.Time

                              LastUpdate returns the timestamp of the last Data Point processed

                              func (*DataSource) PointCount

                              func (ds *DataSource) PointCount() int

                                PointCount returns the sum of all point counts of every RRA in this DS.

                                func (*DataSource) ProcessDataPoint

                                func (ds *DataSource) ProcessDataPoint(value float64, ts time.Time) error

                                  ProcessDataPoint checks the values and updates the DS PDP. If this the very first call for this DS (lastUpdate is 0), then it only sets lastUpdate and returns.

                                  func (*DataSource) RRAs

                                  func (ds *DataSource) RRAs() []RoundRobinArchiver

                                    List of Round Robin Archives this Data Source has

                                    func (*DataSource) SetRRAs

                                    func (ds *DataSource) SetRRAs(rras []RoundRobinArchiver)

                                      SetRRAs provides a way to set the RRAs (which may contain data)

                                      func (*DataSource) Spec

                                      func (ds *DataSource) Spec() DSSpec

                                        Return a DSSpec corresponding to this DS

                                        func (*DataSource) Step

                                        func (ds *DataSource) Step() time.Duration

                                          Step returns the step, i.e. the size of the PDP. All RRAs this DS has must have steps that are a multiple of this Step.

                                          type DataSourcer

                                          type DataSourcer interface {
                                          	Step() time.Duration
                                          	Heartbeat() time.Duration
                                          	LastUpdate() time.Time
                                          	RRAs() []RoundRobinArchiver
                                          	SetRRAs(rras []RoundRobinArchiver)
                                          	Copy() DataSourcer
                                          	BestRRA(start, end time.Time, points int64) RoundRobinArchiver
                                          	PointCount() int
                                          	ProcessDataPoint(value float64, ts time.Time) error
                                          	Spec() DSSpec

                                            DataSourcer is a DataSource as an interface.

                                            type Pdp

                                            type Pdp struct {
                                            	// contains filtered or unexported fields

                                              Pdp is a Primary Data Point. It provides intermediate state and logic to interpolate and store incoming DP data in a consolidated way using weighted mean.

                                              This is an illustration of how incoming data points are consolidated into a PDP using weighted mean. The PDP below is 4 units long (the actual unit is not relevant). It shows a time period during which 3 values (measurements) arrived: 1.0 at 1, 3.0 at 3 and 2.0 at 4. The final value of this PDP is 2.25.

                                              ||    +---------+    ||
                                              ||    |	    3.0 +----||
                                              ||----+	        | 2.0||
                                              || 1.0|	        |    ||
                                               0    1    2    3     4  ---> time

                                              In this PDP 0.25 of the value is 1.0, 0.50 is 3.0 and 0.25 is 2.0, for a total of 0.25*1 + 0.50*3 + 0.25*2 = 2.25.

                                              If a part of the data point is NaN, then that part does not count. Even if NaN is at the end:

                                              ||    +---------+    ||
                                              ||    |	     3.0|    ||
                                              ||----+	        | NaN||
                                              || 1.0|	        |    ||
                                               0    1    2    3     4  ---> time

                                              In the above PDP, the size is what is taken up by 1.0 and 3.0, without the NaN. Thus 1/3 of the value is 1.0 and 2/3 of the value is 3.0, for a total of 1/3*1 + 2/3*3 = 2.33333.

                                              An alternative way of looking at the above data point is that it is simply shorter or has a shorter duration:

                                              ||    +---------||
                                              ||    |      3.0||
                                              ||----+         ||
                                              || 1.0|         ||
                                               0    1    2    3     4  ---> time

                                              A datapoint must be all NaN for its value to be NaN. If duration is 0, then the value is irrelevant.

                                              To create an "empty" Pdp, simply use its zero value.

                                              func (*Pdp) AddValue

                                              func (p *Pdp) AddValue(val float64, dur time.Duration)

                                                AddValue adds a value to a PDP using weighted mean.

                                                func (*Pdp) AddValueLast

                                                func (p *Pdp) AddValueLast(val float64, dur time.Duration)

                                                  AddValueLast replaces the current value. This is different from SetValue in that it's a noop if val is NaN or dur is 0.

                                                  func (*Pdp) AddValueMax

                                                  func (p *Pdp) AddValueMax(val float64, dur time.Duration)

                                                    AddValueMax adds a value using max. A non-NaN value is considered greater than zero value (duration 0) or NaN.

                                                    func (*Pdp) AddValueMin

                                                    func (p *Pdp) AddValueMin(val float64, dur time.Duration)

                                                      AddValueMin adds a value using min. A non-NaN value is considered lesser than zero value (duration 0) or NaN.

                                                      func (*Pdp) Duration

                                                      func (p *Pdp) Duration() time.Duration

                                                      func (*Pdp) Reset

                                                      func (p *Pdp) Reset() float64

                                                        Reset sets the value to zero value and returns the value of the PDP before Reset.

                                                        func (*Pdp) SetValue

                                                        func (p *Pdp) SetValue(val float64, dur time.Duration)

                                                          SetValue sets both value and duration of the PDP

                                                          func (*Pdp) Value

                                                          func (p *Pdp) Value() float64

                                                            The current value of the PDP. If duration is 0, this returns NaN.

                                                            type Pdper

                                                            type Pdper interface {
                                                            	Value() float64
                                                            	Duration() time.Duration

                                                            type RRASpec

                                                            type RRASpec struct {
                                                            	Function Consolidation
                                                            	Step     time.Duration // duration of a single step
                                                            	Span     time.Duration // duration of the whole series (should be multiple of step)
                                                            	Xff      float32
                                                            	// These can be used to fill the initial value
                                                            	Latest   time.Time
                                                            	Value    float64
                                                            	Duration time.Duration
                                                            	DPs      map[int64]float64 // Careful, these are round-robin

                                                              RRASpec is the RRA definition for NewRoundRobinArchive.

                                                              type RoundRobinArchive

                                                              type RoundRobinArchive struct {
                                                              	// Each RRA has its own PDP (duration and value). Note that
                                                              	// whatever fits in the DS PDP step will reside there, but
                                                              	// anything exceeding the time period that DS PDP can hold will
                                                              	// trickle down to RRA PDPs, until they are add to DPs.
                                                              	// contains filtered or unexported fields

                                                                A Round Robin Archive and all its parameters.

                                                                func NewRoundRobinArchive

                                                                func NewRoundRobinArchive(spec RRASpec) *RoundRobinArchive

                                                                  Returns a new RRA in accordance with the provided RRASpec.

                                                                  func (*RoundRobinArchive) Begins

                                                                  func (rra *RoundRobinArchive) Begins(now time.Time) time.Time

                                                                    Begins returns the timestamp of the beginning of this RRA assuming that that the argument "now" is within it. This will be a time approximately but not exactly the RRA length ago, because it is aligned on the RRA step boundary.

                                                                    func (*RoundRobinArchive) Copy

                                                                      Returns a complete copy of the RRA.

                                                                      func (*RoundRobinArchive) DPs

                                                                      func (rra *RoundRobinArchive) DPs() map[int64]float64

                                                                        Dps returns data points as a map of floats. It's a map rather than a slice to be more space-efficient for sparse series.

                                                                        func (*RoundRobinArchive) Latest

                                                                        func (rra *RoundRobinArchive) Latest() time.Time

                                                                          Latest returns the time on which the last slot ends.

                                                                          func (*RoundRobinArchive) PointCount

                                                                          func (rra *RoundRobinArchive) PointCount() int

                                                                            PointCount returns the number of points in this RRA.

                                                                            func (*RoundRobinArchive) Size

                                                                            func (rra *RoundRobinArchive) Size() int64

                                                                              Number of data points in this RRA

                                                                              func (*RoundRobinArchive) Spec

                                                                              func (rra *RoundRobinArchive) Spec() RRASpec

                                                                                Spec matching this RRA

                                                                                func (*RoundRobinArchive) Step

                                                                                func (rra *RoundRobinArchive) Step() time.Duration

                                                                                  Step of this RRA

                                                                                  type RoundRobinArchiver

                                                                                  type RoundRobinArchiver interface {
                                                                                  	Latest() time.Time
                                                                                  	Step() time.Duration
                                                                                  	Size() int64
                                                                                  	PointCount() int
                                                                                  	DPs() map[int64]float64
                                                                                  	Copy() RoundRobinArchiver
                                                                                  	Begins(now time.Time) time.Time
                                                                                  	Spec() RRASpec
                                                                                  	// contains filtered or unexported methods

                                                                                    RoundRobinArchive as an interface