v0.0.0-...-116b634

Published: Dec 22, 2021 License: Apache-2.0 Imports: 7 Imported by: 0



Package aggregator aggregates data points from various sources, similar to statsd. On flush, the aggregator passes the consolidated values to a DataPointQueuer (e.g. tgres.receiver). The aggregator only aggregates data; periodic flushing is the responsibility of its user.





type AggCmd

type AggCmd int
const (
	CmdAdd      AggCmd = iota // Add the value; the flushed value is a per-second rate.
	CmdAddGauge               // Add the value; the flushed value is the sum as is (e.g. total traffic for all routers).
	CmdSetGauge               // Overwrite the value; the flushed value is the last value as is.
	CmdAppend                 // Append the value to a slice. The flushed values are upper/lower/sum/mean and the Thresholds percentiles.
)
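
The four commands differ only in how values are combined and what is reported on flush. The following is a minimal, self-contained sketch of those semantics, not the package's actual implementation: the function names, the Summary type, and the statsd-style percentile rule (the largest value among the lowest pct percent of samples) are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// Summary holds the statistics a CmdAppend-style flush might report.
// Hypothetical type, not part of the aggregator package.
type Summary struct {
	Lower, Upper, Sum, Mean, Pct float64
}

// sum mimics CmdAddGauge: values are added and the total is flushed as is.
func sum(values []float64) float64 {
	total := 0.0
	for _, v := range values {
		total += v
	}
	return total
}

// rate mimics CmdAdd: values are added and the flush reports a per-second
// rate over the number of seconds since the previous flush.
func rate(values []float64, seconds float64) float64 {
	return sum(values) / seconds
}

// last mimics CmdSetGauge: each value overwrites the previous one.
// values must be non-empty.
func last(values []float64) float64 {
	return values[len(values)-1]
}

// summarize mimics CmdAppend: all values are kept until the flush.
// The percentile rule is an assumption borrowed from statsd.
// values must be non-empty.
func summarize(values []float64, pct int) Summary {
	sorted := append([]float64(nil), values...) // copy, leave the input untouched
	sort.Float64s(sorted)
	n := len(sorted)
	total := sum(sorted)
	keep := int(math.Round(float64(pct) / 100 * float64(n)))
	if keep < 1 {
		keep = 1
	}
	return Summary{
		Lower: sorted[0],
		Upper: sorted[n-1],
		Sum:   total,
		Mean:  total / float64(n),
		Pct:   sorted[keep-1],
	}
}

func main() {
	values := []float64{4, 1, 10, 5}
	fmt.Println("rate over 10s:", rate(values, 10))
	fmt.Println("sum:", sum(values))
	fmt.Println("last:", last(values))
	fmt.Printf("summary: %+v\n", summarize(values, 90))
}
```

The same four inputs produce a different flushed value under each mode, which is why the command has to be chosen per data point.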

type Aggregator

type Aggregator interface {
	// Process an aggregator command, which is a data point with instructions on how to process it.
	ProcessCmd(cmd *Command)
	// Flush all aggregations to the underlying DataPointQueuer. If now is zero, time.Now() is used.
	// All internal state is cleared after a flush.
	Flush(now time.Time)
}

The Aggregator keeps the intermediate state for all data that is being aggregated.

type Command

type Command struct {
	Hops int // For cluster forwarding
	// contains filtered or unexported fields
}

An aggregator command. Use NewCommand() to create one.

func NewCommand

func NewCommand(cmd AggCmd, ident serde.Ident, value float64) *Command

NewCommand creates an aggregator command. The cmd argument dictates how the data is aggregated; see AggCmd.

func (*Command) GobDecode

func (ac *Command) GobDecode(b []byte) error

func (*Command) GobEncode

func (ac *Command) GobEncode() ([]byte, error)

type DataPointQueuer

type DataPointQueuer interface {
	QueueDataPoint(serde.Ident, time.Time, float64)
}

type State

type State struct {
	Thresholds []int // List of percentiles for CmdAppend
	AppendAttr string
	// contains filtered or unexported fields
}

func NewAggregator

func NewAggregator(t DataPointQueuer) *State

NewAggregator returns a new aggregator. Its only argument must provide a QueueDataPoint() method, which the aggregator uses to queue the aggregated points. The returned aggregator state has Thresholds set to {90}.

func (*State) Flush

func (a *State) Flush(now time.Time)

func (*State) ProcessCmd

func (a *State) ProcessCmd(cmd *Command)
