v0.0.0-...-116b634 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2021 License: Apache-2.0 Imports: 20 Imported by: 0



Package receiver manages the receiving end of the data. All of the queueing, caching, perioding flushing and cluster forwarding logic is here.



This section is empty.


View Source
var DftDSSPec = &rrd.DSSpec{
	Step:      10 * time.Second,
	Heartbeat: 2 * time.Hour,
	RRAs: []rrd.RRASpec{
		rrd.RRASpec{Function: rrd.WMEAN,
			Step: 10 * time.Second,
			Span: 6 * time.Hour,
		rrd.RRASpec{Function: rrd.WMEAN,
			Step: 1 * time.Minute,
			Span: 24 * time.Hour,
		rrd.RRASpec{Function: rrd.WMEAN,
			Step: 10 * time.Minute,
			Span: 93 * 24 * time.Hour,
		rrd.RRASpec{Function: rrd.WMEAN,
			Step: 24 * time.Hour,
			Span: 1825 * 24 * time.Hour,

A default "reasonable" spec for those who do not want to think about it.


func ReportRcacheStats

func ReportRcacheStats(ndsf dsl.NamedDSFetcher, sr statReporter)

TODO: This is a hack, there should be a better way to report stats from outside the receiver package.


type MatchingDSSpecFinder

type MatchingDSSpecFinder interface {
	FindMatchingDSSpec(ident serde.Ident) *rrd.DSSpec

A DSSpec Finder can find a DSSpec for a name. For previously unknown DS names that need to be created on-the-fly this interface provides a mechanism for specifying DS/RRA configurations based on the name.

type Receiver

type Receiver struct {
	// Smallest step
	MinStep time.Duration

	// MaxReceiverQueueSize is the limit on the receiver queue. Points
	// are sent to /dev/null when this size is exceeded. Zero or a
	// negative value means unlimited.
	MaxReceiverQueueSize int

	// MaxMemoryBytes is the limit after which points are
	// discarded. It is based on runtime.ReadMemStats() and is rough
	// and approximate, but better than nothing.
	MaxMemoryBytes uint64

	StatFlushDuration time.Duration // Period after which stats are flushed
	StatsNamePrefix   string        // Stat names are prefixed with this

	ReportStats       bool   // report internal stats?
	ReportStatsPrefix string // prefix for internal stats

	// Number of workers and flushers
	NWorkers int

	Blaster *blaster.Blaster
	// contains filtered or unexported fields

Receiver receives and directs incoming datapoints to one of n workers, which is done to provide some parallelism, especially when it comes to flushing data to the database. The job of the workers is to update and maintain an in-memory RRD, and the job of the flushers is to persist the data to a database. The Receiver orchestrates this flow, providing a caching layer which reduces the database I/O.

The Receiver is cluster-aware. In a clustered set up points are forwarded to the node responsible for a particular DS.

The Receiver also creates an Aggregator which can aggregate metrics and send as aggregated data points periodically. In a clustered set up there is one Aggregator per cluster. Default aggregation period is 10 seconds.

Receiver also handles paced metrics. A paced metric is a metric that can come in at a very fast rate (e.g. counting function calls within a process). Paced metrics are similar to aggregator metrics, but in a clustered set up they are accumulated locally in the process, and then sent to the aggregator (counter) or to the receiver (gauge), at which point they may end up getting forwarded to the appropriate node for handling. By default metrics are paced to be send once per second.

func New

func New(serde serde.SerDe, finder MatchingDSSpecFinder) *Receiver

Create a Receiver. The first argument is a SerDe, the second is a MatchingDSSpecFinder used to match previously unknown DS names to a DSSpec with which the DS is to be created. If you pass nil, then the default SimpleDSFinder is used which always returns DftDSSPec.

func NewWithMaxQueue

func NewWithMaxQueue(db serde.SerDe, finder MatchingDSSpecFinder, maxQueue int) *Receiver

func (*Receiver) ClusterReady

func (r *Receiver) ClusterReady(ready bool)

In a clustered set up informes other nodes that we are ready to handle data.

func (*Receiver) Drain

func (r *Receiver) Drain()

Marks the receiver as stopped and waits for the channel to empty

func (*Receiver) DsCache

func (r *Receiver) DsCache() *dsCache

Return a pointer to dsCache

func (*Receiver) QueueAggregatorCommand

func (r *Receiver) QueueAggregatorCommand(agg *aggregator.Command)

Sends a data point (in the form of an aggregator.Command) to the aggregator.

func (*Receiver) QueueDataPoint

func (r *Receiver) QueueDataPoint(ident serde.Ident, ts time.Time, v float64)

Sends a data point to the receiver channel. A Data Source PDP always treats incoming data as a rate, it is the responsibility of the caller to present non-rate values such as counters as a rate. Consider using the Aggregator (QueueAggregatorCommand) or paced metrics (QueueSum/QueueGauge) for non-rate data.

func (*Receiver) QueueGauge

func (r *Receiver) QueueGauge(ident serde.Ident, v float64)

Send a gauge (i.e. a rate). This is a paced metric.

func (*Receiver) QueueSum

func (r *Receiver) QueueSum(ident serde.Ident, v float64)

Send a counter/sum. This is a paced metric which will periodically be passed to the aggregator and from the aggregator to the data source as a rate.

func (*Receiver) SetCluster

func (r *Receiver) SetCluster(c clusterer)

Make the receiver clustered. It will also cause internal stats to be prefixed with the node address by setting ReportStatsPrefix.

func (*Receiver) Start

func (r *Receiver) Start()

Before using the receiver it must be Started. This starts all the worker and flusher goroutines, etc.

func (*Receiver) Stop

func (r *Receiver) Stop()

Stops processing, waits for everything to finish and shuts down all workers/flushers.

type SimpleDSFinder

type SimpleDSFinder struct {

A simple DS finder always returns itself as the only DSSpec it knows.

func (*SimpleDSFinder) FindMatchingDSSpec

func (s *SimpleDSFinder) FindMatchingDSSpec(ident serde.Ident) *rrd.DSSpec

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL