steps

package
v0.0.60 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2020 License: Apache-2.0 Imports: 32 Imported by: 1

Documentation

Index

Constants

View Source
const ConsoleBoxEndpoint = bitflow.EndpointType("box")
View Source
const DynamicSourceEndpointType = "dynamic"
View Source
const GeneratorSourceEndpointType = "generate"
View Source
const PrometheusMarshallingFormat = bitflow.MarshallingFormat("prometheus")

Variables

View Source
var (
	ConsoleBoxSettings = gotermBox.CliLogBox{
		NoUtf8:        false,
		LogLines:      10,
		MessageBuffer: 500,
	}
	ConsoleBoxUpdateInterval    = 500 * time.Millisecond
	ConsoleBoxMinUpdateInterval = 50 * time.Millisecond

	// ConsoleBoxOutputTestMode is a flag used by tests to suppress initialization routines
	// that are not testable. It is a hack to keep the EndpointFactory easy to use
	// while making it testable.
	ConsoleBoxOutputTestMode = false
)
View Source
var BatchProcessorParameters = reg.RegisteredParameters{}.
	Optional("flush-tags", reg.List(reg.String()), []string{}, "Flush the current batch when one or more of the given tags change").
	Optional("flush-no-samples-timeout", reg.Duration(), time.Duration(0)).
	Optional("flush-sample-lag-timeout", reg.Duration(), time.Duration(0)).
	Optional("flush-num-samples", reg.Int(), 0).
	Optional("flush-time-diff", reg.Duration(), time.Duration(0)).
	Optional("ignore-close", reg.Bool(), false, "Do not flush the remaining samples, when the pipeline is closed", "The default behavior is to flush on close").
	Optional("forward-immediately", reg.Bool(), false, "In addition to the regular batching functionality, output each incoming sample immediately", "This will possibly duplicate each incoming sample, since the regular batch processing results are forwarded as well")

TODO "ignore-header-change"

View Source
var DynamicSourceParameters = reg.RegisteredParameters{}.
	Optional("update-time", reg.Duration(), 2*time.Second)
View Source
var GeneratorSourceParameters = reg.RegisteredParameters{}.
	Optional("interval", reg.Duration(), 500*time.Millisecond)
View Source
var MetricSplitterDescription = "" /* 237-byte string literal not displayed */

Functions

func AddDecoupleStep

func AddDecoupleStep(p *bitflow.SamplePipeline, params map[string]interface{}) error

func AddTagChangeListenerParams added in v0.0.42

func AddTagChangeListenerParams(step *reg.RegisteredStep)

func AppendToSample

func AppendToSample(s *bitflow.Sample, values []float64)

func FillSample

func FillSample(s *bitflow.Sample, values []float64)

func FillSampleFromMatrix

func FillSampleFromMatrix(s *bitflow.Sample, row int, mat *mat.Dense)

func FillSamplesFromMatrix

func FillSamplesFromMatrix(s []*bitflow.Sample, mat *mat.Dense)

func GetMinMax

func GetMinMax(header *bitflow.Header, samples []*bitflow.Sample) ([]float64, []float64)

func IsValidNumber

func IsValidNumber(val float64) bool

func MakeBatchProcessor added in v0.0.28

func MakeBatchProcessor(params map[string]interface{}) (res *bitflow.BatchProcessor, err error)

func NewSampleShuffler

func NewSampleShuffler() *bitflow.SimpleBatchProcessingStep

func NewTagMapper added in v0.0.52

func NewTagMapper(sourceTag, targetTag string, mapping map[string]string) bitflow.SampleProcessor

func NewTaggingProcessor

func NewTaggingProcessor(tags map[string]string) bitflow.SampleProcessor

func RegisterAppendTimeDifference

func RegisterAppendTimeDifference(b reg.ProcessorRegistry)

func RegisterConsoleBoxOutput added in v0.0.52

func RegisterConsoleBoxOutput(e *bitflow.EndpointFactory)

func RegisterDecouple

func RegisterDecouple(b reg.ProcessorRegistry)

func RegisterDrop

func RegisterDrop(b reg.ProcessorRegistry)

func RegisterDropErrorsStep

func RegisterDropErrorsStep(b reg.ProcessorRegistry)

func RegisterDropInvalid added in v0.0.50

func RegisterDropInvalid(b reg.ProcessorRegistry)

func RegisterDuplicateTimestampFilter

func RegisterDuplicateTimestampFilter(b reg.ProcessorRegistry)

func RegisterDynamicSource added in v0.0.37

func RegisterDynamicSource(factory *bitflow.EndpointFactory)

func RegisterExcludeMetricsFilter

func RegisterExcludeMetricsFilter(b reg.ProcessorRegistry)

func RegisterExecutable added in v0.0.58

func RegisterExecutable(b reg.ProcessorRegistry, description string) error

func RegisterExpression

func RegisterExpression(b reg.ProcessorRegistry)

func RegisterFillUpStep

func RegisterFillUpStep(b reg.ProcessorRegistry)

func RegisterFilterExpression

func RegisterFilterExpression(b reg.ProcessorRegistry)

func RegisterForks

func RegisterForks(b reg.ProcessorRegistry)

This function is placed in this package to avoid circular dependency between the fork and the query package.

func RegisterGeneratorSource added in v0.0.42

func RegisterGeneratorSource(factory *bitflow.EndpointFactory)

func RegisterGraphiteOutput

func RegisterGraphiteOutput(b reg.ProcessorRegistry)

func RegisterHttpTagger

func RegisterHttpTagger(b reg.ProcessorRegistry)

func RegisterIncludeMetricsFilter

func RegisterIncludeMetricsFilter(b reg.ProcessorRegistry)

func RegisterLoggingSteps

func RegisterLoggingSteps(b reg.ProcessorRegistry)

func RegisterMergeHeaders

func RegisterMergeHeaders(b reg.ProcessorRegistry)

func RegisterMetricMapper

func RegisterMetricMapper(b reg.ProcessorRegistry)

func RegisterMetricRenamer

func RegisterMetricRenamer(b reg.ProcessorRegistry)

func RegisterMetricSplitter added in v0.0.6

func RegisterMetricSplitter(b reg.ProcessorRegistry)

func RegisterNoop

func RegisterNoop(b reg.ProcessorRegistry)

func RegisterOpentsdbOutput

func RegisterOpentsdbOutput(b reg.ProcessorRegistry)

func RegisterOutputFiles

func RegisterOutputFiles(b reg.ProcessorRegistry)

func RegisterParseTags

func RegisterParseTags(b reg.ProcessorRegistry)

func RegisterPauseTagger

func RegisterPauseTagger(b reg.ProcessorRegistry)

func RegisterPickHead

func RegisterPickHead(b reg.ProcessorRegistry)

func RegisterPickPercent

func RegisterPickPercent(b reg.ProcessorRegistry)

func RegisterPickTail

func RegisterPickTail(b reg.ProcessorRegistry)

func RegisterPipelineRateSynchronizer

func RegisterPipelineRateSynchronizer(b reg.ProcessorRegistry)

func RegisterPrometheusMarshaller added in v0.0.52

func RegisterPrometheusMarshaller(endpoints *bitflow.EndpointFactory)

func RegisterResendStep

func RegisterResendStep(b reg.ProcessorRegistry)

func RegisterSampleShuffler

func RegisterSampleShuffler(b reg.ProcessorRegistry)

func RegisterSampleSorter

func RegisterSampleSorter(b reg.ProcessorRegistry)

func RegisterSetCurrentTime

func RegisterSetCurrentTime(b reg.ProcessorRegistry)

func RegisterSkipHead

func RegisterSkipHead(b reg.ProcessorRegistry)

func RegisterSleep

func RegisterSleep(b reg.ProcessorRegistry)

func RegisterStoreStats

func RegisterStoreStats(b reg.ProcessorRegistry)

func RegisterStripMetrics

func RegisterStripMetrics(b reg.ProcessorRegistry)

func RegisterSubProcessRunner added in v0.0.55

func RegisterSubProcessRunner(b reg.ProcessorRegistry)

func RegisterSubpipelineStreamMerger

func RegisterSubpipelineStreamMerger(b reg.ProcessorRegistry)

func RegisterTagChangeRunner added in v0.0.42

func RegisterTagChangeRunner(b reg.ProcessorRegistry)

func RegisterTagMapping added in v0.0.52

func RegisterTagMapping(b reg.ProcessorRegistry)

func RegisterTagSynchronizer

func RegisterTagSynchronizer(b reg.ProcessorRegistry)

func RegisterTaggingProcessor

func RegisterTaggingProcessor(b reg.ProcessorRegistry)

func RegisterVarianceMetricsFilter

func RegisterVarianceMetricsFilter(b reg.ProcessorRegistry)

func SampleToVector

func SampleToVector(sample *bitflow.Sample) []float64

func ScaleMinMax

func ScaleMinMax(val, min, max, outputMin, outputMax float64) float64

func ScaleStddev

func ScaleStddev(val float64, mean, stddev, min, max float64) float64

func SendPeriodically

func SendPeriodically(sample *bitflow.Sample, header *bitflow.Header, receiver bitflow.SampleSink, interval time.Duration, wg *sync.WaitGroup) golib.StopChan

func SplitShellCommand

func SplitShellCommand(s string) []string

func ValuesToVector

func ValuesToVector(input []bitflow.Value) []float64

Types

type AbstractBatchMetricMapper

type AbstractBatchMetricMapper struct {
	Description      fmt.Stringer
	ConstructIndices func(header *bitflow.Header, samples []*bitflow.Sample) ([]int, []string)
}

func NewMetricVarianceFilter

func NewMetricVarianceFilter(minimumWeightedStddev float64) *AbstractBatchMetricMapper

func (*AbstractBatchMetricMapper) ProcessBatch

func (mapper *AbstractBatchMetricMapper) ProcessBatch(header *bitflow.Header, samples []*bitflow.Sample) (*bitflow.Header, []*bitflow.Sample, error)

func (*AbstractBatchMetricMapper) String

func (mapper *AbstractBatchMetricMapper) String() string

type AbstractMetricFilter

type AbstractMetricFilter struct {
	AbstractMetricMapper
	IncludeFilter func(name string) bool // Return true if metric should be included
}

type AbstractMetricMapper

type AbstractMetricMapper struct {
	bitflow.NoopProcessor
	Description      fmt.Stringer
	ConstructIndices func(header *bitflow.Header) ([]int, []string)
	// contains filtered or unexported fields
}

func (*AbstractMetricMapper) Sample

func (m *AbstractMetricMapper) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*AbstractMetricMapper) String

func (m *AbstractMetricMapper) String() string

type BlockManager

type BlockManager struct {
	// contains filtered or unexported fields
}

func NewBlockManager

func NewBlockManager() *BlockManager

func (*BlockManager) GetList

func (m *BlockManager) GetList(key string) *BlockerList

func (*BlockManager) NewBlocker

func (m *BlockManager) NewBlocker(key string) *BlockingProcessor

func (*BlockManager) NewReleaser

func (m *BlockManager) NewReleaser(key string) *ReleasingProcessor

func (*BlockManager) RegisterBlockingProcessor

func (m *BlockManager) RegisterBlockingProcessor(b reg.ProcessorRegistry)

func (*BlockManager) RegisterReleasingProcessor

func (m *BlockManager) RegisterReleasingProcessor(b reg.ProcessorRegistry)

type BlockerList

type BlockerList struct {
	Blockers []*BlockingProcessor
}

func (*BlockerList) Add

func (l *BlockerList) Add(blocker *BlockingProcessor)

func (*BlockerList) ReleaseAll

func (l *BlockerList) ReleaseAll()

type BlockingProcessor

type BlockingProcessor struct {
	bitflow.NoopProcessor
	// contains filtered or unexported fields
}

func (*BlockingProcessor) Close

func (p *BlockingProcessor) Close()

func (*BlockingProcessor) Release

func (p *BlockingProcessor) Release()

func (*BlockingProcessor) Sample

func (p *BlockingProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*BlockingProcessor) String

func (p *BlockingProcessor) String() string

type ConsoleBoxSink added in v0.0.52

type ConsoleBoxSink struct {
	bitflow.AbstractSampleOutput
	gotermBox.CliLogBoxTask

	// ImmediateScreenUpdate causes the console box to be updated immediately
	// whenever a sample is received by this ConsoleBoxSink. Otherwise, the screen
	// will be updated in regular intervals based on the settings in CliLogBoxTask.
	ImmediateScreenUpdate bool
	// contains filtered or unexported fields
}

ConsoleBoxSink implements the SampleSink interface by printing the received samples to the standard out. Contrary to the ConsoleSink, the screen is erased before printing a new sample, and the output is embedded in a box that shows the last lines of log output at the bottom. ConsoleBoxSink does not implement MarshallingSampleSink, because it uses its own, fixed marshaller.

Multiple embedded fields provide access to configuration options.

Init() must be called as early as possible when using ConsoleBoxSink, to make sure that all log messages are capture and none are overwritten by the box.

func (*ConsoleBoxSink) Close added in v0.0.52

func (sink *ConsoleBoxSink) Close()

Close implements the SampleSink interface. It stops the screen refresh goroutine.

func (*ConsoleBoxSink) Sample added in v0.0.52

func (sink *ConsoleBoxSink) Sample(sample *bitflow.Sample, header *bitflow.Header) error

Sample implements the SampleSink interface. The latest sample is stored and displayed on the console on the next screen refresh. Intermediate samples might get lost without being displayed.

func (*ConsoleBoxSink) Start added in v0.0.52

func (sink *ConsoleBoxSink) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the SampleSink interface. It starts a goroutine that regularly refreshes the screen to display the current sample values and latest log output lines.

func (*ConsoleBoxSink) Stop added in v0.0.52

func (sink *ConsoleBoxSink) Stop()

Stop shadows the Stop() method from gotermBox.CliLogBoxTask to make sure that this SampleSink is actually closed in the Close() method.

func (*ConsoleBoxSink) String added in v0.0.52

func (sink *ConsoleBoxSink) String() string

String implements the SampleSink interface.

func (*ConsoleBoxSink) WritesToConsole added in v0.0.52

func (sink *ConsoleBoxSink) WritesToConsole() bool

Implement the bitflow.ConsoleSampleSink interface

type DecouplingProcessor

type DecouplingProcessor struct {
	bitflow.NoopProcessor

	ChannelBuffer int // Must be set before calling Start()
	// contains filtered or unexported fields
}

Decouple the incoming samples from the MetricSink through a looping goroutine and a channel. Creates potential parallelism in the pipeline.

func (*DecouplingProcessor) Close

func (p *DecouplingProcessor) Close()

func (*DecouplingProcessor) Sample

func (p *DecouplingProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*DecouplingProcessor) Start

func (*DecouplingProcessor) String

func (p *DecouplingProcessor) String() string

type DropErrorsProcessor

type DropErrorsProcessor struct {
	bitflow.NoopProcessor
	LogError   bool
	LogWarning bool
	LogDebug   bool
	LogInfo    bool
}

func (*DropErrorsProcessor) Sample

func (p *DropErrorsProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*DropErrorsProcessor) String

func (p *DropErrorsProcessor) String() string

type DynamicSource added in v0.0.37

type DynamicSource struct {
	bitflow.AbstractSampleSource
	URL          string
	FetchTimeout time.Duration
	Endpoints    *bitflow.EndpointFactory
	// contains filtered or unexported fields
}

func (*DynamicSource) Close added in v0.0.37

func (s *DynamicSource) Close()

func (*DynamicSource) Start added in v0.0.37

func (s *DynamicSource) Start(wg *sync.WaitGroup) (_ golib.StopChan)

func (*DynamicSource) String added in v0.0.37

func (s *DynamicSource) String() string

type Expression

type Expression struct {
	// contains filtered or unexported fields
}

func NewExpression

func NewExpression(expressionString string) (*Expression, error)

func (*Expression) Evaluate

func (p *Expression) Evaluate(sample *bitflow.Sample, header *bitflow.Header) (*bitflow.Sample, *bitflow.Header, error)

func (*Expression) EvaluateBool

func (p *Expression) EvaluateBool(sample *bitflow.Sample, header *bitflow.Header) (bool, error)

func (*Expression) UpdateHeader

func (p *Expression) UpdateHeader(header *bitflow.Header) error

type ExpressionProcessor

type ExpressionProcessor struct {
	bitflow.NoopProcessor
	Filter bool
	// contains filtered or unexported fields
}

func (*ExpressionProcessor) AddExpression

func (p *ExpressionProcessor) AddExpression(expressionString string) error

func (*ExpressionProcessor) MergeProcessor

func (p *ExpressionProcessor) MergeProcessor(otherProcessor bitflow.SampleProcessor) bool

func (*ExpressionProcessor) Sample

func (p *ExpressionProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*ExpressionProcessor) String

func (p *ExpressionProcessor) String() string

type FeatureStats

type FeatureStats struct {
	onlinestats.Running
	Min float64
	Max float64
}

func GetStats

func GetStats(header *bitflow.Header, samples []*bitflow.Sample) []FeatureStats

func NewFeatureStats

func NewFeatureStats() *FeatureStats

func (*FeatureStats) Push

func (stats *FeatureStats) Push(values ...float64)

func (*FeatureStats) Reset added in v0.0.56

func (stats *FeatureStats) Reset()

func (*FeatureStats) ScaleMinMax

func (stats *FeatureStats) ScaleMinMax(val float64, outputMin, outputMax float64) float64

func (*FeatureStats) ScaleStddev

func (stats *FeatureStats) ScaleStddev(val float64) float64

type FillUpProcessor

type FillUpProcessor struct {
	bitflow.NoopProcessor
	MinMissingInterval time.Duration
	StepInterval       time.Duration
	// contains filtered or unexported fields
}

func (*FillUpProcessor) Sample

func (p *FillUpProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*FillUpProcessor) String

func (p *FillUpProcessor) String() string

type GeneratorSource added in v0.0.42

type GeneratorSource struct {
	bitflow.AbstractSampleSource
	// contains filtered or unexported fields
}

func (*GeneratorSource) Close added in v0.0.42

func (s *GeneratorSource) Close()

func (*GeneratorSource) Start added in v0.0.42

func (s *GeneratorSource) Start(wg *sync.WaitGroup) (_ golib.StopChan)

func (*GeneratorSource) String added in v0.0.42

func (s *GeneratorSource) String() string

type HttpTagger

type HttpTagger struct {
	bitflow.NoopProcessor
	// contains filtered or unexported fields
}

func NewHttpTagger

func NewHttpTagger(pathPrefix string, r *mux.Router) *HttpTagger

func NewStandaloneHttpTagger

func NewStandaloneHttpTagger(pathPrefix string, endpoint string) *HttpTagger

func (*HttpTagger) HasTags

func (tagger *HttpTagger) HasTags() bool

func (*HttpTagger) NumTags

func (tagger *HttpTagger) NumTags() int

func (*HttpTagger) Sample

func (tagger *HttpTagger) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*HttpTagger) String

func (tagger *HttpTagger) String() string

func (*HttpTagger) Tags

func (tagger *HttpTagger) Tags() map[string]string

type LoggingWriter added in v0.0.59

type LoggingWriter struct {
	Prefix string
	// contains filtered or unexported fields
}

func (*LoggingWriter) Flush added in v0.0.59

func (w *LoggingWriter) Flush()

func (*LoggingWriter) Write added in v0.0.59

func (w *LoggingWriter) Write(p []byte) (int, error)

type MetricFilter

type MetricFilter struct {
	AbstractMetricFilter
	// contains filtered or unexported fields
}

func NewMetricFilter

func NewMetricFilter() *MetricFilter

func (*MetricFilter) Exclude

func (filter *MetricFilter) Exclude(regex *regexp.Regexp) *MetricFilter

func (*MetricFilter) ExcludeRegex

func (filter *MetricFilter) ExcludeRegex(regexStr string) (*MetricFilter, error)

func (*MetricFilter) ExcludeStr

func (filter *MetricFilter) ExcludeStr(substr string) *MetricFilter

func (*MetricFilter) Include

func (filter *MetricFilter) Include(regex *regexp.Regexp) *MetricFilter

func (*MetricFilter) IncludeRegex

func (filter *MetricFilter) IncludeRegex(regexStr string) (*MetricFilter, error)

func (*MetricFilter) IncludeStr

func (filter *MetricFilter) IncludeStr(substr string) *MetricFilter

func (*MetricFilter) MergeProcessor

func (filter *MetricFilter) MergeProcessor(other bitflow.SampleProcessor) bool

func (*MetricFilter) String

func (filter *MetricFilter) String() string

type MetricMapper

type MetricMapper struct {
	AbstractMetricMapper
	Metrics []string
}

func NewMetricMapper

func NewMetricMapper(metrics []string) *MetricMapper

func (*MetricMapper) String

func (mapper *MetricMapper) String() string

type MetricMapperHelper

type MetricMapperHelper struct {
	bitflow.HeaderChecker
	// contains filtered or unexported fields
}

type MetricRenamer

type MetricRenamer struct {
	AbstractMetricMapper
	// contains filtered or unexported fields
}

func NewMetricRenamer

func NewMetricRenamer(regexes []*regexp.Regexp, replacements []string) *MetricRenamer

func (*MetricRenamer) MergeProcessor

func (r *MetricRenamer) MergeProcessor(other bitflow.SampleProcessor) bool

func (*MetricRenamer) String

func (r *MetricRenamer) String() string

type MetricSplitter added in v0.0.6

type MetricSplitter struct {
	NoopProcessor

	Splitters []*regexp.Regexp
	// contains filtered or unexported fields
}

func NewMetricSplitter added in v0.0.6

func NewMetricSplitter(regexes []string) (*MetricSplitter, error)

func (*MetricSplitter) Sample added in v0.0.6

func (m *MetricSplitter) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*MetricSplitter) Split added in v0.0.6

func (m *MetricSplitter) Split(sample *bitflow.Sample, header *bitflow.Header) []bitflow.SampleAndHeader

func (*MetricSplitter) String added in v0.0.6

func (m *MetricSplitter) String() string

type MetricWindow

type MetricWindow struct {
	// contains filtered or unexported fields
}

func NewMetricWindow

func NewMetricWindow(size int) *MetricWindow

func (*MetricWindow) Data

func (w *MetricWindow) Data() []bitflow.Value

func (*MetricWindow) Empty

func (w *MetricWindow) Empty() bool

func (*MetricWindow) FastData

func (w *MetricWindow) FastData() []bitflow.Value

Avoid copying if possible. Dangerous.

func (*MetricWindow) FillData

func (w *MetricWindow) FillData(target []bitflow.Value) []bitflow.Value

func (*MetricWindow) Full

func (w *MetricWindow) Full() bool

func (*MetricWindow) Pop

func (w *MetricWindow) Pop() bitflow.Value

Remove and return the oldest value. The oldest value is also deleted by Push() when the window is full.

func (*MetricWindow) Push

func (w *MetricWindow) Push(val bitflow.Value)

func (*MetricWindow) Size

func (w *MetricWindow) Size() int

type MultiHeaderMerger

type MultiHeaderMerger struct {
	bitflow.NoopProcessor
	// contains filtered or unexported fields
}

Can tolerate multiple headers, fills missing data up with default values.

func NewMultiHeaderMerger

func NewMultiHeaderMerger() *MultiHeaderMerger

func (*MultiHeaderMerger) Close

func (p *MultiHeaderMerger) Close()

func (*MultiHeaderMerger) OutputSampleSize

func (p *MultiHeaderMerger) OutputSampleSize(sampleSize int) int

func (*MultiHeaderMerger) Sample

func (p *MultiHeaderMerger) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*MultiHeaderMerger) String

func (p *MultiHeaderMerger) String() string

type NoopProcessor

type NoopProcessor struct {
	bitflow.NoopProcessor
}

func (*NoopProcessor) String

func (*NoopProcessor) String() string

type PauseTagger

type PauseTagger struct {
	bitflow.NoopProcessor
	MinimumPause time.Duration
	Tag          string
	// contains filtered or unexported fields
}

func (*PauseTagger) Sample

func (d *PauseTagger) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*PauseTagger) String

func (d *PauseTagger) String() string

type PipelineRateSynchronizer

type PipelineRateSynchronizer struct {
	ChannelSize      int
	ChannelCloseHook func(lastSample *bitflow.Sample, lastHeader *bitflow.Header)
	// contains filtered or unexported fields
}

func (*PipelineRateSynchronizer) NewSynchronizationStep

func (s *PipelineRateSynchronizer) NewSynchronizationStep() bitflow.SampleProcessor

type PrometheusMarshaller added in v0.0.52

type PrometheusMarshaller struct {
}

PrometheusMarshaller marshals Headers and Samples to the prometheus exposition format

func (PrometheusMarshaller) ShouldCloseAfterFirstSample added in v0.0.52

func (PrometheusMarshaller) ShouldCloseAfterFirstSample() bool

ShouldCloseAfterFirstSample defines that prometheus streams should close after first sent sample

func (PrometheusMarshaller) String added in v0.0.52

func (PrometheusMarshaller) String() string

String implements the Marshaller interface.

func (PrometheusMarshaller) WriteHeader added in v0.0.52

func (PrometheusMarshaller) WriteHeader(header *bitflow.Header, withTags bool, output io.Writer) error

WriteHeader implements the Marshaller interface. It is empty, because the prometheus exposition format doesn't need one

func (PrometheusMarshaller) WriteSample added in v0.0.52

func (m PrometheusMarshaller) WriteSample(sample *bitflow.Sample, header *bitflow.Header, withTags bool, writer io.Writer) error

WriteSample implements the Marshaller interface. See the PrometheusMarshaller godoc for information about the format.

type ReleasingProcessor

type ReleasingProcessor struct {
	bitflow.NoopProcessor
	// contains filtered or unexported fields
}

func (*ReleasingProcessor) Close

func (p *ReleasingProcessor) Close()

func (*ReleasingProcessor) String

func (p *ReleasingProcessor) String() string

type ResendProcessor

type ResendProcessor struct {
	bitflow.NoopProcessor
	Interval time.Duration
	// contains filtered or unexported fields
}

func (*ResendProcessor) Close

func (p *ResendProcessor) Close()

func (*ResendProcessor) Sample

func (p *ResendProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*ResendProcessor) Start

func (*ResendProcessor) String

func (p *ResendProcessor) String() string

type RestDataSource

type RestDataSource struct {
	bitflow.AbstractSampleSource
	// contains filtered or unexported fields
}

func (*RestDataSource) Close

func (source *RestDataSource) Close()

func (*RestDataSource) EmitSample

func (source *RestDataSource) EmitSample(sample *bitflow.Sample, header *bitflow.Header)

func (*RestDataSource) EmitSampleAndHeader added in v0.0.32

func (source *RestDataSource) EmitSampleAndHeader(sample bitflow.SampleAndHeader)

func (*RestDataSource) EmitSampleAndHeaderTimeout added in v0.0.37

func (source *RestDataSource) EmitSampleAndHeaderTimeout(sample bitflow.SampleAndHeader, timeout time.Duration) bool

func (*RestDataSource) EmitSampleTimeout added in v0.0.37

func (source *RestDataSource) EmitSampleTimeout(sample *bitflow.Sample, header *bitflow.Header, timeout time.Duration) bool

func (*RestDataSource) EmitSamples added in v0.0.32

func (source *RestDataSource) EmitSamples(samples []bitflow.SampleAndHeader)

func (*RestDataSource) EmitSamplesTimeout added in v0.0.37

func (source *RestDataSource) EmitSamplesTimeout(samples []bitflow.SampleAndHeader, timeout time.Duration) bool

func (*RestDataSource) Endpoint

func (source *RestDataSource) Endpoint() string

func (*RestDataSource) Serve

func (source *RestDataSource) Serve(verb string, path string, handlers ...gin.HandlerFunc) error

func (*RestDataSource) Start

func (source *RestDataSource) Start(wg *sync.WaitGroup) golib.StopChan

func (*RestDataSource) String added in v0.0.32

func (source *RestDataSource) String() string

type RestEndpoint

type RestEndpoint struct {
	// contains filtered or unexported fields
}

func (*RestEndpoint) NewDataSource

func (endpoint *RestEndpoint) NewDataSource(outgoingSampleBuffer int) *RestDataSource

type RestEndpointFactory

type RestEndpointFactory struct {
	// contains filtered or unexported fields
}

func (*RestEndpointFactory) GetEndpoint

func (s *RestEndpointFactory) GetEndpoint(endpointString string) *RestEndpoint

type RestReplyHelpers

type RestReplyHelpers struct {
}

func (RestReplyHelpers) Reply added in v0.0.39

func (h RestReplyHelpers) Reply(context *gin.Context, message string, statusCode int, contentType string)

func (RestReplyHelpers) ReplyCode added in v0.0.39

func (h RestReplyHelpers) ReplyCode(context *gin.Context, message string, statusCode int)

func (RestReplyHelpers) ReplyError

func (h RestReplyHelpers) ReplyError(context *gin.Context, errorMessage string)

func (RestReplyHelpers) ReplyGenericError

func (h RestReplyHelpers) ReplyGenericError(context *gin.Context, errorMessage string)

func (RestReplyHelpers) ReplySuccess

func (h RestReplyHelpers) ReplySuccess(context *gin.Context, message string)

type SampleFilter

type SampleFilter struct {
	bitflow.NoopProcessor
	Description   fmt.Stringer
	IncludeFilter func(sample *bitflow.Sample, header *bitflow.Header) (bool, error) // Return true if sample should be included
}

func (*SampleFilter) Sample

func (p *SampleFilter) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*SampleFilter) String

func (p *SampleFilter) String() string

type SampleSlice

type SampleSlice struct {
	// contains filtered or unexported fields
}

func (SampleSlice) Len

func (s SampleSlice) Len() int

func (SampleSlice) Less

func (s SampleSlice) Less(i, j int) bool

func (SampleSlice) Swap

func (s SampleSlice) Swap(i, j int)

type SampleSorter

type SampleSorter struct {
	Tags []string
}

Sort based on given Tags, use Timestamp as last sort criterion

func (*SampleSorter) ProcessBatch

func (sorter *SampleSorter) ProcessBatch(header *bitflow.Header, samples []*bitflow.Sample) (*bitflow.Header, []*bitflow.Sample, error)

func (*SampleSorter) String

func (sorter *SampleSorter) String() string

type SimpleTextMarshaller

type SimpleTextMarshaller struct {
	Description  string
	MetricPrefix string
	NameFixer    func(string) string
	WriteValue   func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error
}

func (SimpleTextMarshaller) ShouldCloseAfterFirstSample added in v0.0.26

func (SimpleTextMarshaller) ShouldCloseAfterFirstSample() bool

ShouldCloseAfterFirstSample defines that text streams can stream without closing

func (*SimpleTextMarshaller) String

func (o *SimpleTextMarshaller) String() string

func (*SimpleTextMarshaller) WriteHeader

func (o *SimpleTextMarshaller) WriteHeader(header *bitflow.Header, hasTags bool, writer io.Writer) error

func (*SimpleTextMarshaller) WriteSample

func (o *SimpleTextMarshaller) WriteSample(sample *bitflow.Sample, header *bitflow.Header, hasTags bool, writer io.Writer) error

type SimpleTextMarshallerFactory

type SimpleTextMarshallerFactory struct {
	Description string
	NameFixer   func(string) string
	WriteValue  func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error

	Endpoints *bitflow.EndpointFactory
}

type StoreStats

type StoreStats struct {
	bitflow.NoopProcessor
	TargetFile string
	// contains filtered or unexported fields
}

func NewStoreStats

func NewStoreStats(targetFile string) *StoreStats

func (*StoreStats) Close

func (stats *StoreStats) Close()

func (*StoreStats) Sample

func (stats *StoreStats) Sample(inSample *bitflow.Sample, header *bitflow.Header) error

func (*StoreStats) StoreStatistics

func (stats *StoreStats) StoreStatistics() error

func (*StoreStats) String

func (stats *StoreStats) String() string

type SubProcessRunner added in v0.0.55

type SubProcessRunner struct {
	bitflow.NoopProcessor
	Cmd  string
	Args []string

	// StderrPrefix enables immediate logging of each line of the stderr of the sub-process. These lines are
	// prefixed with the given string. If StderrPrefix is left empty, the output is instead collected in an in-memory
	// buffer, and logged when the process exits (regardless of the exit condition).
	StderrPrefix string

	Reader     bitflow.SampleReader
	Writer     bitflow.SampleWriter
	Marshaller bitflow.Marshaller
	// contains filtered or unexported fields
}

func (*SubProcessRunner) Close added in v0.0.55

func (r *SubProcessRunner) Close()

func (*SubProcessRunner) Configure added in v0.0.55

func (r *SubProcessRunner) Configure(marshallingFormat string, f *bitflow.EndpointFactory) error

func (*SubProcessRunner) Sample added in v0.0.55

func (r *SubProcessRunner) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*SubProcessRunner) Start added in v0.0.55

func (*SubProcessRunner) String added in v0.0.55

func (r *SubProcessRunner) String() string

type SynchronizedStreamMerger

type SynchronizedStreamMerger struct {
	bitflow.NoopProcessor

	MergeTag        string
	MergeInterval   time.Duration
	ExpectedStreams int
	MergeSamples    func([]*bitflow.Sample, []*bitflow.Header) (*bitflow.Sample, *bitflow.Header)

	Description        string
	DebugQueueLengths  bool
	DebugWaitingQueues bool
	// contains filtered or unexported fields
}

func (*SynchronizedStreamMerger) Close

func (p *SynchronizedStreamMerger) Close()

func (*SynchronizedStreamMerger) Sample

func (p *SynchronizedStreamMerger) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*SynchronizedStreamMerger) StreamClosed

func (p *SynchronizedStreamMerger) StreamClosed(name string)

func (*SynchronizedStreamMerger) StreamOfSampleClosed

func (p *SynchronizedStreamMerger) StreamOfSampleClosed(lastSample *bitflow.Sample, lastHeader *bitflow.Header)

func (*SynchronizedStreamMerger) String

func (p *SynchronizedStreamMerger) String() string

type TagChangeCallback added in v0.0.42

type TagChangeCallback interface {
	// Return value indicates if handling the expiration was successful. Returning false will trigger this method again later.
	Expired(value string, allValues []string) bool
	Updated(value string, sample *bitflow.Sample, allValues []string)
}

type TagChangeListener added in v0.0.42

type TagChangeListener struct {
	bitflow.AbstractSampleProcessor
	Tag               string
	UpdateInterval    time.Duration
	ExpirationTimeout time.Duration
	ExpireOnClose     bool
	Callback          TagChangeCallback
	// contains filtered or unexported fields
}

func (*TagChangeListener) Close added in v0.0.42

func (t *TagChangeListener) Close()

func (*TagChangeListener) ReadParameters added in v0.0.42

func (t *TagChangeListener) ReadParameters(params map[string]interface{})

func (*TagChangeListener) Sample added in v0.0.42

func (t *TagChangeListener) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*TagChangeListener) Start added in v0.0.42

func (*TagChangeListener) String added in v0.0.42

func (t *TagChangeListener) String() string

type TagChangeRunner added in v0.0.42

type TagChangeRunner struct {
	TagChangeListener
	Program        string
	Args           []string
	PreserveStdout bool
	// contains filtered or unexported fields
}

func (*TagChangeRunner) Expired added in v0.0.42

func (r *TagChangeRunner) Expired(value string, allValues []string) bool

func (*TagChangeRunner) String added in v0.0.42

func (r *TagChangeRunner) String() string

func (*TagChangeRunner) Updated added in v0.0.42

func (r *TagChangeRunner) Updated(value string, sample *bitflow.Sample, allValues []string)

type TagSynchronizer

type TagSynchronizer struct {
	bitflow.NoopProcessor

	StreamIdentifierTag string
	ReferenceStream     string
	NumTargetStreams    int
	// contains filtered or unexported fields
}

This processor copies tags from a "reference" sample stream to a number of "target" sample streams. Streams are identified by the value of a given tag, where the reference stream holds a special value that must be given. The target streams can have arbitrary values. The tag synchronization is done by time: one reference sample affects all target samples after its timestamp, and before the timestamp of the follow-up reference sample. Target samples with timestamps BEFORE any reference sample are forwarded unmodified (with a warning). Target samples AFTER the last reference sample will receive the tags from the last reference sample. All streams are assumed to be sorted by time, arrive in parallel, and are forwarded in the same order.

func (*TagSynchronizer) Close

func (s *TagSynchronizer) Close()

func (*TagSynchronizer) Sample

func (s *TagSynchronizer) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*TagSynchronizer) Start

func (*TagSynchronizer) String

func (s *TagSynchronizer) String() string

type UniqueTagPrinter

type UniqueTagPrinter struct {
	bitflow.NoopProcessor
	Tag   string
	Count bool
	// contains filtered or unexported fields
}

func NewUniqueTagCounter

func NewUniqueTagCounter(tag string) *UniqueTagPrinter

func NewUniqueTagPrinter

func NewUniqueTagPrinter(tag string) *UniqueTagPrinter

func (*UniqueTagPrinter) Close

func (printer *UniqueTagPrinter) Close()

func (*UniqueTagPrinter) Sample

func (printer *UniqueTagPrinter) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*UniqueTagPrinter) String

func (printer *UniqueTagPrinter) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL