Package aggregator provides the ability to aggregate data points from various sources similar to statsd. On flush, aggregator passes the consolidated values to a DataPointQueuer (e.g. tgres.receiver). The aggregator only aggregates the data, it does not concern itself with the periodic flushing, that is the job of its user.



    This section is empty.


    This section is empty.


    This section is empty.


    type AggCmd

    type AggCmd int
    const (
    	CmdAdd      AggCmd = iota // Add the value, the flushed value is a per second rate.
    	CmdAddGauge               // Add the value, the flushed value is the sum as is (e.g. total traffic for all routers).
    	CmdSetGauge               // Overwrite the value, the flushed value is the last value as is.
    	CmdAppend                 // Append the value to a slice. The flushed values will be upper/lower/sum/mean and Threshold percentiles.

    type Aggregator

    type Aggregator interface {
    	// Process an aggregator command, which is a data point with insturctions on how to process it.
    	ProcessCmd(cmd *Command)
    	// Flush all aggregations to the undelying DataPointQueuer. If now is zero, time.Now() is used.
    	// All internal state is cleared after a flush.
    	Flush(now time.Time)

      The Aggregator keeps the intermediate state for all data that is being aggregated.

      type Command

      type Command struct {
      	Hops int // For cluster forwarding
      	// contains filtered or unexported fields

        An aggregator command. Use NewCommand() to create one.

        func NewCommand

        func NewCommand(cmd AggCmd, ident serde.Ident, value float64) *Command

          Create an aggregator command. The cmd argument dictates how the data will be aggregated, see AggCmd.

          func (*Command) GobDecode

          func (ac *Command) GobDecode(b []byte) error

          func (*Command) GobEncode

          func (ac *Command) GobEncode() ([]byte, error)

          type DataPointQueuer

          type DataPointQueuer interface {
          	QueueDataPoint(serde.Ident, time.Time, float64)

          type State

          type State struct {
          	Thresholds []int // List of percentiles for CmdAppend
          	AppendAttr string
          	// contains filtered or unexported fields

          func NewAggregator

          func NewAggregator(t DataPointQueuer) *State

            Returns a new aggregator. The only argument needs to provide a QueueDataPoint() method which is what the aggregator will use to queue the aggregated points. The returned aggregator state has Thresholds set to {90}.

            func (*State) Flush

            func (a *State) Flush(now time.Time)

            func (*State) ProcessCmd

            func (a *State) ProcessCmd(cmd *Command)

            Source Files