aggregator

package
Version: v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2021 License: MPL-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSampleQueueSize = 2 * samplesInQueueAfterDiskWriteUpperBound
)

Variables

This section is empty.

Functions

func NewDiscard

func NewDiscard() core.Aggregator

NewDiscard returns Aggregator that just throws reported ammo away.

func NewEncoderAggregator

func NewEncoderAggregator(
	newEncoder NewSampleEncoder,
	conf EncoderAggregatorConfig,
) core.Aggregator

NewEncoderAggregator returns aggregator that use SampleEncoder to marshall samples to core.DataSink. Handles encoder flushing and sample dropping on queue overflow. putSample is optional func, that called on handled sample. Usually returns sample to pool.

func NewJSONLinesAggregator

func NewJSONLinesAggregator(conf JSONLineAggregatorConfig) core.Aggregator

Aggregates samples in JSON Lines format: each output line is a Valid JSON Value of one sample. See http://jsonlines.org/ for details.

func NewLog

func NewLog() core.Aggregator

Types

type EncoderAggregatorConfig

type EncoderAggregatorConfig struct {
	Sink           core.DataSink  `config:"sink" validate:"required"`
	BufferSize     int            `config:"buffer-size"`
	FlushInterval  time.Duration  `config:"flush-interval"`
	ReporterConfig ReporterConfig `config:",squash"`
}

func DefaultEncoderAggregatorConfig

func DefaultEncoderAggregatorConfig() EncoderAggregatorConfig

type JSONIterConfig

type JSONIterConfig struct {
	// MarshalFloatWith6Digits makes float marshalling faster.
	MarshalFloatWith6Digits bool `config:"marshal-float-with-6-digits"`
	// SortMapKeys useful, when sample contains map object, and you want to see them in same order.
	SortMapKeys bool `config:"sort-map-keys"`
}

JSONIterConfig is subset of jsoniter.Config that may be useful to configure.

type JSONLineAggregatorConfig

type JSONLineAggregatorConfig struct {
	EncoderAggregatorConfig `config:",squash"`
	JSONLineEncoderConfig   `config:",squash"`
}

func DefaultJSONLinesAggregatorConfig

func DefaultJSONLinesAggregatorConfig() JSONLineAggregatorConfig

type JSONLineEncoderConfig

type JSONLineEncoderConfig struct {
	JSONIterConfig            `config:",squash"`
	coreutil.BufferSizeConfig `config:",squash"`
}

type NewSampleEncoder

type NewSampleEncoder func(w io.Writer, onFlush func()) SampleEncoder

type Reporter

type Reporter struct {
	Incomming chan core.Sample
	// contains filtered or unexported fields
}

func NewReporter

func NewReporter(conf ReporterConfig) *Reporter

func (*Reporter) DroppedErr

func (a *Reporter) DroppedErr() error

func (*Reporter) Report

func (a *Reporter) Report(s core.Sample)

type ReporterConfig

type ReporterConfig struct {
	// SampleQueueSize is number maximum number of unhandled samples.
	// On queue overflow, samples are dropped.
	SampleQueueSize int `config:"sample-queue-size" validate:"min=1"`
}

func DefaultReporterConfig

func DefaultReporterConfig() ReporterConfig

type SampleEncodeCloser

type SampleEncodeCloser interface {
	SampleEncoder
	io.Closer
}

SampleEncoderCloser is SampleEncoder that REQUIRE Close call to finish encoding.

type SampleEncoder

type SampleEncoder interface {
	// SampleEncoder SHOULD panic, if passed sample type is not supported.
	Encode(s core.Sample) error
	// Flush flushes internal buffer to wrapped io.Writer.
	Flush() error
}

SampleEncoder is efficient, buffered encoder of samples. SampleEncoder MAY support only concrete type of sample. MAY also implement SampleEncodeCloser.

func NewJSONEncoder

func NewJSONEncoder(w io.Writer, conf JSONLineEncoderConfig) SampleEncoder

type SomeSamplesDropped

type SomeSamplesDropped struct {
	Dropped int64
}

func (*SomeSamplesDropped) Error

func (err *SomeSamplesDropped) Error() string

type Test

type Test struct {
	// contains filtered or unexported fields
}

func NewTest

func NewTest() *Test

func (*Test) GetSamples

func (t *Test) GetSamples() []core.Sample

func (*Test) Report

func (t *Test) Report(s core.Sample)

func (*Test) Run

func (t *Test) Run(ctx context.Context, _ core.AggregatorDeps) error

Directories

Path Synopsis
Code generated by mockery v1.0.0 Code generated by mockery v1.0.0
Code generated by mockery v1.0.0 Code generated by mockery v1.0.0

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto