Back to godoc.org

Package bitflow

v0.0.60
Latest Go to latest

The highest tagged major version is .

Published: Apr 28, 2020 | License: Apache-2.0 | Module: github.com/bitflow-stream/go-bitflow

Index

Constants

const (
	UndefinedEndpoint = EndpointType("")
	TcpEndpoint       = EndpointType("tcp")
	TcpListenEndpoint = EndpointType("listen")
	FileEndpoint      = EndpointType("file")
	StdEndpoint       = EndpointType("std")
	HttpEndpoint      = EndpointType("http")
	EmptyEndpoint     = EndpointType("empty")
	ClosedEndpoint    = EndpointType("closed")

	UndefinedFormat = MarshallingFormat("")
	TextFormat      = MarshallingFormat("text")
	CsvFormat       = MarshallingFormat("csv")
	BinaryFormat    = MarshallingFormat("bin")

	StdTransportTarget = "-"
	BinaryFileSuffix   = ".bin"
)
const (
	DefaultCsvTimeColumn = "time"
	TagsColumn           = "tags"
	BinaryTimeColumn     = "timB" // Must not collide with DefaultCsvTimeColumn, but have same length

)
const (

	// This is arbitrary and was chosen human-readable for convenience. It must
	// not collide with BinaryTimeColumn.
	BinarySampleStart = "X"

	// BinarySeparator is the character separating fields in the marshalled output
	// of BinaryMarshaller. Every field is marshalled on a separate line.
	BinarySeparator = '\n'
)
const (
	// CsvSeparator is the character separating fields in the marshalled output
	// of CsvMarshaller.
	DefaultCsvSeparator = ','

	// CsvNewline is used by CsvMarshaller after outputting the header line and
	// each sample.
	DefaultCsvNewline = '\n'

	// CsvDateFormat is the format used by CsvMarshaller to marshall the timestamp
	// of samples.
	DefaultCsvDateFormat = "2006-01-02 15:04:05.999999999"
)
const (
	// TextMarshallerDateFormat is the date format used by TextMarshaller to
	// print the timestamp of each sample.
	TextMarshallerDateFormat = "2006-01-02 15:04:05.999"

	// TextMarshallerDefaultSpacing is the default spacing between the columns
	// printed by TextMarshaller.
	TextMarshallerDefaultSpacing = 3

	// TextMarshallerHeaderChar is used as fill-character in the header line
	// preceding each sample marshalled by TextMarshaller.
	TextMarshallerHeaderChar = '='
)
const (
	// MaxOutputFileErrors is the number of retries that are accepted before
	// giving up to open a new output file. After each try, the output filename
	// will be changed.
	MaxOutputFileErrors = 5

	// MkdirsPermissions defines the permission bits used when creating new
	// directories for storing output files.
	MkdirsPermissions = 0755
)
const MinimumInputIoBuffer = 16 // Needed for auto-detecting stream format
const TAG_TEMPLATE_ENV_PREFIX = "ENV_"

Variables

var StopWalking = errors.New("stop walking")

StopWalking can be returned from the walk function parameter for WalkFiles to indicate, that the tree should not be walked any further down the current directory.

var (
	TagStringEscaper = strings.NewReplacer(
		tag_equals, tag_replacement,
		tag_separator, tag_replacement,
		string(BinarySeparator), tag_replacement,
		string(DefaultCsvSeparator), tag_replacement,
		string(DefaultCsvNewline), tag_replacement)
)

func EncodeTags

func EncodeTags(tags map[string]string) string

func IsBrokenPipeError

func IsBrokenPipeError(err error) bool

func IsConsoleOutput

func IsConsoleOutput(sink SampleSink) bool

IsConsoleOutput returns true if the given processor will output to the standard output when started.

func IsFileClosedError

func IsFileClosedError(err error) bool

IsFileClosedError returns true, if the given error likely originates from intentionally closing a file, while it is still being read concurrently.

func IsValidFilename

func IsValidFilename(path string) bool

IsValidFilename tries to infer in a system-independent way, if the given path is a valid file name.

func RequiredValues

func RequiredValues(numFields int, sink SampleSink) int

RequiredValues the number of Values that should be large enough to hold the end-result after processing a Sample by all intermediate SampleProcessors. The result is based on ResizingSampleProcessor.OutputSampleSize(). SampleProcessor instances that do not implement the ResizingSampleProcessor interface are assumed to not increase the number metrics.

func ResolveTagTemplate

func ResolveTagTemplate(template string, missingValues string, sample *Sample) string

type AbstractMarshallingSampleOutput

type AbstractMarshallingSampleOutput struct {
	AbstractSampleOutput

	// Marshaller will be used when converting Samples to byte buffers before
	// writing them to the given output stream.
	Marshaller Marshaller

	// Writer contains variables that control the marshalling and writing process.
	// They must be configured before calling Start() on this AbstractSampleOutput.
	Writer SampleWriter
}

AbstractMarshallingSampleOutput is a partial implementation of MarshallingSampleOutput with a simple implementation of SetMarshaller().

func (*AbstractMarshallingSampleOutput) SetMarshaller

func (out *AbstractMarshallingSampleOutput) SetMarshaller(marshaller Marshaller)

SetMarshaller implements the SampleOutput interface.

type AbstractSampleOutput

type AbstractSampleOutput struct {
	AbstractSampleProcessor

	// DontForwardSamples can be set to true to disable forwarding of received samples
	// to the subsequent SampleProcessor.
	DontForwardSamples bool

	// DropOutputErrors can be set to true to make this AbstractSampleOutput ignore
	// errors that occurred from outputting samples to byte streams like files or network Connections.
	// In that case, such errors will be logged and the samples will be forwarded to subsequent
	// processing steps.
	DropOutputErrors bool
}

AbstractSampleOutput is a partial implementation of SampleProcessor intended for processors that output samples to an external data sink (e.g. console, file, ...). Configuration variables are provided for controlling the error handling.

func (*AbstractSampleOutput) Sample

func (out *AbstractSampleOutput) Sample(err error, sample *Sample, header *Header) error

Sample forwards the received header and sample the the subsequent SampleProcessor, unless the DontForwardSamples flag has been set. Actual implementations of SampleOutput should provide an implementation that writes the samples to some destination. The error parameter should be an error (possibly nil), that resulted from previously writing the sample to some byte stream output (like a file or network connection). Depending on the configuration of this AbstractSampleOutput, this error will be returned immediately or simply logged so that the sample can be forwarded to the subsequent processing step.

type AbstractSampleProcessor

type AbstractSampleProcessor struct {
	AbstractSampleSource
}

AbstractSampleProcessor provides a few basic methods for implementations of SampleProcessor. It currently simply embeds the AbstractSampleSource type, but should be used instead of it to make the purpose more clear.

type AbstractSampleSource

type AbstractSampleSource struct {
	// contains filtered or unexported fields
}

AbstractSampleSource is a partial implementation of SampleSource that stores the SampleProcessor and closes the outgoing SampleProcessor after all samples have been generated.

func (*AbstractSampleSource) CloseSink

func (s *AbstractSampleSource) CloseSink()

CloseSink closes the subsequent SampleProcessor. It must be called after the receiving AbstractSampleSource has finished producing samples.

func (*AbstractSampleSource) CloseSinkParallel

func (s *AbstractSampleSource) CloseSinkParallel(wg *sync.WaitGroup)

CloseSinkParallel closes the subsequent SampleProcessor in a concurrent goroutine, which is registered in the WaitGroup. This can be useful compared to CloseSink() in certain cases to avoid deadlocks due to long-running Close() invocations. As a general rule of thumb, Implementations of SampleSource should use CloseSinkParallel(), while SampleProcessors should simply use CloseSink().

func (*AbstractSampleSource) GetSink

func (s *AbstractSampleSource) GetSink() SampleProcessor

GetSink implements the SampleSource interface.

func (*AbstractSampleSource) SetSink

func (s *AbstractSampleSource) SetSink(sink SampleProcessor)

SetSink implements the SampleSource interface.

type AbstractTcpSink

type AbstractTcpSink struct {
	AbstractMarshallingSampleOutput
	TCPConnCounter

	// LogReceivedTraffic enables logging received TCP traffic, which is usually not expected.
	// Only the values log.ErrorLevel, log.WarnLevel, log.InfoLevel, log.DebugLevel enable logging.
	LogReceivedTraffic log.Level

	// Protocol is used for more detailed logging
	Protocol string
}

AbstractTcpSink is a helper type for TCP-based SampleSink implementations. The two fields AbstractSampleOutput and TCPConnCounter can be used to configure different aspects of the marshalling and writing of the data. The purpose of AbstractTcpSink is to create instances of TcpWriteConn with the configured parameters.

func (*AbstractTcpSink) OpenWriteConn

func (sink *AbstractTcpSink) OpenWriteConn(wg *sync.WaitGroup, remoteAddr string, conn io.WriteCloser) *TcpWriteConn

OpenWriteConn wraps a net.TCPConn in a new TcpWriteConn using the parameters defined in the receiving AbstractTcpSink.

type AbstractUnmarshallingSampleSource

type AbstractUnmarshallingSampleSource struct {
	AbstractSampleSource

	// Reader configures aspects of parallel reading and parsing. See SampleReader for more info.
	Reader SampleReader
}

AbstractUnmarshallingSampleSource extends AbstractSampleSource by adding configuration fields required for unmarshalling samples.

func (*AbstractUnmarshallingSampleSource) SetSampleHandler

func (s *AbstractUnmarshallingSampleSource) SetSampleHandler(handler ReadSampleHandler)

SetSampleHandler implements the UnmarshallingSampleSource interface

type BatchProcessingStep

type BatchProcessingStep interface {
	ProcessBatch(header *Header, samples []*Sample) (*Header, []*Sample, error)
	String() string
}

type BatchProcessor

type BatchProcessor struct {
	NoopProcessor

	Steps []BatchProcessingStep

	// ForwardImmediately enables a mode where each received sample is immediately forwarded to the subsequent processing
	// step for further processing. The sample is also stored for batch processing, and all flush semantics are still active,
	// but the results of the batch processing will be dropped. Errors that occur during batch flushing are logged, but are
	// not reported to the caller of the Sample() method.
	ForwardImmediately bool

	// DontFlushOnClose disables the automatic final flushing when this processing step is closed. This is useful only in
	// certain special cases.
	DontFlushOnClose bool

	FlushNoSampleTimeout time.Duration // If > 0, flush when no new samples are received for the given duration. The wall-time is used for this (not sample timestamps)
	FlushSampleLag       time.Duration // If > 0, flush when a sample is received with a timestamp jump bigger than this
	FlushAfterNumSamples int           // If > 0, flush after batch  window contains this amount of samples
	FlushAfterTime       time.Duration // If > 0, flush after time difference between the first and the last received sample in batch is greater than this

	FlushTags []string // If set, flush every time any of these tags change
	// contains filtered or unexported fields
}

func (*BatchProcessor) Add

func (p *BatchProcessor) Add(step BatchProcessingStep) *BatchProcessor

func (*BatchProcessor) Close

func (p *BatchProcessor) Close()

func (*BatchProcessor) ContainedStringers

func (p *BatchProcessor) ContainedStringers() []fmt.Stringer

func (*BatchProcessor) MergeProcessor

func (p *BatchProcessor) MergeProcessor(other SampleProcessor) bool

func (*BatchProcessor) OutputSampleSize

func (p *BatchProcessor) OutputSampleSize(sampleSize int) int

func (*BatchProcessor) Sample

func (p *BatchProcessor) Sample(sample *Sample, header *Header) (err error)

func (*BatchProcessor) Start

func (p *BatchProcessor) Start(wg *sync.WaitGroup) golib.StopChan

func (*BatchProcessor) String

func (p *BatchProcessor) String() string

type BidiMarshaller

type BidiMarshaller interface {
	Read(input *bufio.Reader, previousHeader *UnmarshalledHeader) (newHeader *UnmarshalledHeader, sampleData []byte, err error)
	ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (*Sample, error)
	WriteHeader(header *Header, withTags bool, output io.Writer) error
	WriteSample(sample *Sample, header *Header, withTags bool, output io.Writer) error
	ShouldCloseAfterFirstSample() bool
	String() string
}

BidiMarshaller is a bidirectional marshaller that combines the Marshaller and Unmarshaller interfaces.

type BinaryMarshaller

type BinaryMarshaller struct {
}

BinaryMarshaller marshalled every sample to a dense binary format.

The header is marshalled to a newline-separated list of strings. The first field is 'timB', the second field is 'tags' if the following samples include tags. The following fields are the names of the metrics in the header. An empty line denotes the end of the header.

After the header, every sample is marshalled as follows. A special byte sequence signals the start of a sample. This is used to distinguish between sample data and a new header. Headers always start with the string "time". Then, the timestamp is marshalled as a big-endian unsigned int64 value containing the nanoseconds since the Unix epoch (8 bytes). Then the tags are marshalled as a newline-delimited string containing a space-separated list of key-values pairs for the tags. If the 'tags' field was missing in the header fields, this tags string is missing, including the newline delimiter. After the optional tags string the values for the sample are marshalled as an array of big-endian double-precision values, 8 bytes each. Since the number of metrics is known from the header, the number of bytes for one sample is given as 8 * number of metrics.

func (BinaryMarshaller) ParseSample

func (BinaryMarshaller) ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (sample *Sample, err error)

ParseSample implements the Unmarshaller interface by parsing the byte buffer to a new Sample instance. See the godoc for BinaryMarshaller for details on the format.

func (BinaryMarshaller) Read

func (m BinaryMarshaller) Read(reader *bufio.Reader, previousHeader *UnmarshalledHeader) (*UnmarshalledHeader, []byte, error)

Read implements the Unmarshaller interface. It peeks a few bytes from the input stream to decide if the stream contains a header or a sample. In case of a header, Read() continues reading until an empty line and parse the data to a header instance. In case of a sample, the size is derived from the previousHeader parameter.

func (BinaryMarshaller) ShouldCloseAfterFirstSample

func (BinaryMarshaller) ShouldCloseAfterFirstSample() bool

ShouldCloseAfterFirstSample defines that binary streams can stream without closing

func (BinaryMarshaller) String

func (BinaryMarshaller) String() string

String implements the Marshaller interface.

func (BinaryMarshaller) WriteHeader

func (BinaryMarshaller) WriteHeader(header *Header, withTags bool, writer io.Writer) error

WriteHeader implements the Marshaller interface by writing a newline-separated list of header field strings and an additional empty line.

func (BinaryMarshaller) WriteSample

func (m BinaryMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error

WriteSample implements the Marshaller interface by writing the Sample out in a dense binary format. See the BinaryMarshaller godoc for information on the format.

type BufferedWriteCloser

type BufferedWriteCloser struct {
	*bufio.Writer
	// contains filtered or unexported fields
}

BufferedWriteCloser is a helper type that wraps a bufio.Writer around a io.WriteCloser, while still implementing the io.WriteCloser interface and forwarding all method calls to the correct receiver. The Writer field should not be accessed directly.

func NewBufferedWriteCloser

func NewBufferedWriteCloser(writer io.WriteCloser, io_buffer int) *BufferedWriteCloser

NewBufferedWriteCloser creates a BufferedWriteCloser instance wrapping the writer parameter. It creates a bufio.Writer with a buffer size of the io_buffer parameter.

func (*BufferedWriteCloser) Close

func (writer *BufferedWriteCloser) Close() (err error)

Close implements the Close method in io.WriteCloser by flushing its bufio.Writer and forwarding the Close call to the io.WriteCloser used to create it.

type ClosedSampleSource

type ClosedSampleSource struct {
	AbstractSampleSource
	// contains filtered or unexported fields
}

ClosedSampleSource implements SampleSource, but instead generating samples, it immediately closes. It can be used for debugging purposes, to create a real pipeline that does not

func (*ClosedSampleSource) Close

func (s *ClosedSampleSource) Close()

Close implements the SampleSource interface.

func (*ClosedSampleSource) SetSampleHandler

func (s *ClosedSampleSource) SetSampleHandler(_ ReadSampleHandler)

SetSampleHandler implements the UnmarshallingSampleSource interface.

func (*ClosedSampleSource) Start

func (s *ClosedSampleSource) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the golib.Task interface.

func (*ClosedSampleSource) String

func (s *ClosedSampleSource) String() string

String implements the golib.Task interface.

type ConsoleSampleSink

type ConsoleSampleSink interface {
	WritesToConsole() bool
}

ConsoleSampleSink is a marking interface for SampleSink implementations that notifies the framework that the sink writes to the standard output. This is used to avoid multiple such sinks that would conflict with each other.

type CsvMarshaller

type CsvMarshaller struct {
	ValueSeparator byte
	LineSeparator  byte
	DateFormat     string
	TimeColumn     string
	TagsColumn     string

	UnmarshallSkipColumns    uint
	UnmarshallPreprocessDate func(string) string
}

CsvMarshaller marshals Headers and Samples to a CSV format.

Every header is marshalled as a comma-separated CSV header line. The first field is 'time', the second field is 'tags' (if the following samples contain tags). After that the header contains a list of all metrics.

Every sample is marshalled to a comma-separated line starting with a textual representation of the timestamp (see CsvDateFormat, local timezone), then a space-separated key-value list for the tags (only if the 'tags' field was included in the header), and then all the metric values in the same order as on the preceding header line. To follow the semantics of a correct CSV file, every changed header should start a new CSV file.

Every CSV line must be terminated by a newline character (including the last line in a file).

CsvMarshaller can deal with multiple header declarations in the same file or data stream. A line that begins with the string "time" is assumed to start a new header, since samples usually start with a timestamp, which cannot be formatted as "time".

There are no configuration options for CsvMarshaller.

func (*CsvMarshaller) ParseSample

func (c *CsvMarshaller) ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (sample *Sample, err error)

ParseSample implements the Unmarshaller interface by parsing a CSV line.

func (*CsvMarshaller) Read

func (c *CsvMarshaller) Read(reader *bufio.Reader, previousHeader *UnmarshalledHeader) (*UnmarshalledHeader, []byte, error)

Read implements the Unmarshaller interface by reading CSV line from the input stream. Based on the first field, Read decides whether the line represents a header or a Sample. In case of a header, the CSV fields are split and parsed to a Header instance. In case of a Sample, the data for the line is returned without parsing it.

func (CsvMarshaller) ShouldCloseAfterFirstSample

func (CsvMarshaller) ShouldCloseAfterFirstSample() bool

ShouldCloseAfterFirstSample defines that csv streams can stream without closing

func (CsvMarshaller) String

func (CsvMarshaller) String() string

String implements the Marshaller interface.

func (*CsvMarshaller) WriteHeader

func (c *CsvMarshaller) WriteHeader(header *Header, withTags bool, writer io.Writer) error

WriteHeader implements the Marshaller interface by printing a CSV header line.

func (*CsvMarshaller) WriteSample

func (c *CsvMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error

WriteSample implements the Marshaller interface by writing a CSV line.

type DroppingSampleProcessor

type DroppingSampleProcessor struct {
	AbstractSampleProcessor
}

DroppingSampleProcessor implements the SampleProcessor interface by dropping any incoming samples.

func (*DroppingSampleProcessor) Close

func (s *DroppingSampleProcessor) Close()

Close implements the SampleProcessor interface.

func (*DroppingSampleProcessor) Sample

func (s *DroppingSampleProcessor) Sample(sample *Sample, header *Header) error

Sample implements the SampleProcessor interface.

func (*DroppingSampleProcessor) Start

func (s *DroppingSampleProcessor) Start(wg *sync.WaitGroup) (_ golib.StopChan)

Start implements the golib.Task interface.

func (*DroppingSampleProcessor) String

func (s *DroppingSampleProcessor) String() string

String implements the golib.Task interface.

type EmptySampleSource

type EmptySampleSource struct {
	AbstractSampleSource
	// contains filtered or unexported fields
}

EmptySampleSource implements SampleSource but does not generate any samples. It is used in cases where a source is required but no real implementation is available.

func (*EmptySampleSource) Close

func (s *EmptySampleSource) Close()

Close implements the SampleSource interface.

func (*EmptySampleSource) SetSampleHandler

func (s *EmptySampleSource) SetSampleHandler(_ ReadSampleHandler)

SetSampleHandler implements the UnmarshallingSampleSource interface.

func (*EmptySampleSource) Start

func (s *EmptySampleSource) Start(wg *sync.WaitGroup) (_ golib.StopChan)

Start implements the golib.Task interface.

func (*EmptySampleSource) String

func (s *EmptySampleSource) String() string

String implements the golib.Task interface.

type EndpointDescription

type EndpointDescription struct {
	Format       MarshallingFormat
	Type         EndpointType
	IsCustomType bool
	Target       string
	Params       map[string]string
}

EndpointDescription describes a data endpoint, regardless of the data direction (input or output).

func GuessEndpointDescription

func GuessEndpointDescription(endpoint string) (res EndpointDescription, err error)

GuessEndpointDescription guesses the transport type and format of the given endpoint target. See GuessEndpointType for details.

func (EndpointDescription) DefaultOutputFormat

func (e EndpointDescription) DefaultOutputFormat() MarshallingFormat

DefaultOutputFormat returns the default MarshallingFormat that should be used when sending data to the described endpoint, if no format is specified by the user.

func (EndpointDescription) OutputFormat

func (e EndpointDescription) OutputFormat() MarshallingFormat

OutputFormat returns the MarshallingFormat that should be used when sending data to the described endpoint.

type EndpointFactory

type EndpointFactory struct {
	FlagSourceTag string

	FlagInputFilesRobust  bool
	FlagOutputFilesClean  bool
	FlagIoBuffer          int
	FlagFilesKeepAlive    bool
	FlagFilesAppend       bool
	FlagFileVanishedCheck time.Duration

	FlagOutputTcpListenBuffer uint
	FlagTcpConnectionLimit    uint
	FlagInputTcpAcceptLimit   uint
	FlagTcpSourceDropErrors   bool
	FlagTcpLogReceivedData    bool

	FlagParallelHandler ParallelSampleHandler

	// CustomDataSources can be filled by client code before EndpointFactory.CreateInput or similar
	// methods to allow creation of custom data sources. The map key is a short name of the data source
	// that can be used in URL endpoint descriptions. The parameter for the function will be
	// the URL of the endpoint, with the scheme part stripped.
	// Example: When registering a function with the key "http", the following URL endpoint:
	//   http://localhost:5555/abc
	// will invoke the factory function with the parameter "localhost:5555/abc"
	CustomDataSources map[EndpointType]func(string) (SampleSource, error)

	// CustomDataSinks can be filled by client code before EndpointFactory.CreateOutput or similar
	// methods to allow creation of custom data sinks. See CustomDataSources for the meaning of the
	// map keys and values.
	CustomDataSinks map[EndpointType]func(string) (SampleProcessor, error)

	// Marshallers can be filled by client code before EndpointFactory.CreateOutput or similar
	// methods to allow custom marshalling formats in output files, network connections and so on.
	Marshallers map[MarshallingFormat]func() Marshaller

	// Unmarshallers can be filled by client code before EndpointFactory.CreateInput or similar
	// methods to allow custom unmarshalling formats in input files, network connections and so on.
	Unmarshallers map[MarshallingFormat]func() Unmarshaller

	// CustomGeneralFlags, CustomInputFlags and CustomOutputFlags lets client code
	// register custom command line flags that configure aspects of endpoints created
	// through CustomDataSources and CustomDataSinks.
	CustomGeneralFlags []func(f *flag.FlagSet)
	CustomInputFlags   []func(f *flag.FlagSet)
	CustomOutputFlags  []func(f *flag.FlagSet)
}

EndpointFactory creates SampleSink and SampleSource instances for a SamplePipeline. It defines command line flags for configuring the objects it creates. All fields named Flag* are set by the according command line flags and evaluated in CreateInput() and CreateOutput(). FlagInputs is not set by command line flags automatically. After flag.Parse(), those fields can be modified to override the command line flags defined by the user.

func NewEndpointFactory

func NewEndpointFactory() *EndpointFactory

NewEndpointFactory returns an EndpointFactory object filled with default values.

func (*EndpointFactory) Clear

func (f *EndpointFactory) Clear()

func (*EndpointFactory) Clone

func (f *EndpointFactory) Clone() *EndpointFactory

func (*EndpointFactory) CloneWithParams

func (f *EndpointFactory) CloneWithParams(params map[string]string) (*EndpointFactory, error)

func (*EndpointFactory) CreateInput

func (f *EndpointFactory) CreateInput(inputs ...string) (SampleSource, error)

CreateInput creates a SampleSource object based on the given input endpoint descriptions and the configuration flags in the EndpointFactory.

func (*EndpointFactory) CreateMarshaller

func (f *EndpointFactory) CreateMarshaller(format MarshallingFormat) (Marshaller, error)

func (*EndpointFactory) CreateOutput

func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error)

CreateInput creates a SampleSink object based on the given output endpoint description and the configuration flags in the EndpointFactory.

func (*EndpointFactory) CreateUnmarshaller

func (f *EndpointFactory) CreateUnmarshaller(format MarshallingFormat) (Unmarshaller, error)

func (*EndpointFactory) ParseEndpointDescription

func (f *EndpointFactory) ParseEndpointDescription(endpoint string, isOutput bool) (EndpointDescription, error)

ParseEndpointDescription parses the given string to an EndpointDescription object. The string can be one of two forms: the URL-style description will be parsed by ParseUrlEndpointDescription, other descriptions will be parsed by GuessEndpointDescription.

func (*EndpointFactory) ParseParameters

func (f *EndpointFactory) ParseParameters(params map[string]string) (err error)

func (*EndpointFactory) ParseUrlEndpointDescription

func (f *EndpointFactory) ParseUrlEndpointDescription(endpoint string, isOutput bool) (res EndpointDescription, err error)

ParseUrlEndpointDescription parses the endpoint string as a URL endpoint description. It has the form:

format+transport://target

One of the format and transport parts must be specified, optionally both. If one of format or transport is missing, it will be guessed. The order does not matter. The 'target' part must not be empty.

func (*EndpointFactory) Reader

func (f *EndpointFactory) Reader(um Unmarshaller) SampleReader

Writer returns an instance of SampleReader, configured by the values stored in the EndpointFactory.

func (*EndpointFactory) RegisterFlags

func (f *EndpointFactory) RegisterFlags()

RegisterConfigFlags registers all flags to the global CommandLine object.

func (*EndpointFactory) RegisterGeneralFlagsTo

func (f *EndpointFactory) RegisterGeneralFlagsTo(fs *flag.FlagSet)

RegisterGeneralFlagsTo registers flags that configure different aspects of both data input and data output. These flags affect to both performance and functionality of TCP, file and std I/O.

func (*EndpointFactory) RegisterInputFlagsTo

func (f *EndpointFactory) RegisterInputFlagsTo(fs *flag.FlagSet)

RegisterInputFlagsTo registers flags that configure aspects of data input.

func (*EndpointFactory) RegisterOutputFlagsTo

func (f *EndpointFactory) RegisterOutputFlagsTo(fs *flag.FlagSet)

RegisterOutputConfigFlagsTo registers flags that configure data outputs.

func (*EndpointFactory) Writer

func (f *EndpointFactory) Writer() SampleWriter

Writer returns an instance of SampleWriter, configured by the values stored in the EndpointFactory.

type EndpointType

type EndpointType string

func GuessEndpointType

func GuessEndpointType(target string) (EndpointType, error)

GuessEndpointType guesses the EndpointType for the given target. Three forms of are recognized for the target:

- A host:port pair indicates an active TCP endpoint
- A :port pair (without the host part, but with the colon) indicates a passive TCP endpoint listening on the given port.
- The hyphen '-' is interpreted as standard input/output.
- All other targets are treated as file names.

type FileGroup

type FileGroup struct {
	// contains filtered or unexported fields
}

FileGroup provides utility functionality when dealing with a group of files sharing the same directory, file prefix and file extension. It provides methods for listing, walking or deleting files that belong to that group.

func NewFileGroup

func NewFileGroup(filename string) (group FileGroup)

NewFileGroup returns a new FileGroup instance. The filename parameter is parsed and split into directory, file name prefix and file extension. The file can also have no extension.

func (*FileGroup) AllFiles

func (group *FileGroup) AllFiles() (all []string, err error)

AllFiles returns a slice of all files that belong to the receiving FileGroup, and a non-nil error if the list could not be determined. AllFiles returns all files matching the regular expression returned by FileRegex().

The files are returned sorted in the order they would be written out by FileSink.

func (*FileGroup) BuildFilename

func (group *FileGroup) BuildFilename(num int) string

BuildFilename returns a file belonging to the receiving group, with the added number as suffix. The suffix is added before the file extension, separated with a hyphen, like so:

dir1/dir2/filePrefix-<num>.ext

func (*FileGroup) BuildFilenameStr

func (group *FileGroup) BuildFilenameStr(suffix string) string

BuildFilenameStr returns a file belonging to the receiving group, with the added string as suffix. The suffix is added before the file extension, separated with a hyphen, like so:

dir1/dir2/filePrefix-<suffix>.ext

func (*FileGroup) DeleteFiles

func (group *FileGroup) DeleteFiles() error

DeleteFiles tries to delete all files that belong to the receiving FileGroup and returns a non-nil error when deleting any of the files failed. DeleteFiles deletes all files matching the regular expression returned by FileRegex().

func (*FileGroup) FileRegex

func (group *FileGroup) FileRegex() *regexp.Regexp

FileRegex returns a regular expression that matches file names belonging to the receiving group. Only files with an optional numeric suffix are matched, e.g.:

dir1/dir2/filePrefix(-[0-9]+)?.ext

For empty 'filePrefix':

dir1/dir2/[0-9]+.ext

func (*FileGroup) OpenNewFile

func (group *FileGroup) OpenNewFile(counter *int) (file *os.File, err error)

OpenFile attempts to open a new file that will belong to the file group. An integer suffix is counted up to find a non-existing file. A small number of errors is tolerated before giving up.

func (*FileGroup) WalkFiles

func (group *FileGroup) WalkFiles(walk func(string, os.FileInfo) error) (num int, err error)

WalkFiles walks all files that belong to the receiving FileGroup. It returns the number of walked files and a non-nil error if there was an error while walking. The walk function parameter is called for every file, providing the file name and the respective os.FileInfo.

WalkFiles walks all files that match the regular expression returns by FileRegex().

The files are walked in lexical order, which does not represent the order the files would be written by FileSink.

type FileSink

type FileSink struct {
	// AbstractSampleOutput defines the Marshaller and SampleWriter that will
	// be used when writing Samples. See their documentation for further info.
	AbstractMarshallingSampleOutput

	// Filename defines the file that will be used for writing Samples. Each time a new Header
	// is received be FileSink, a new file will be opened automatically. The file names are built
	// by FileGroup.BuildFilename(), using an automatically incrementing integer suffix. The first
	// filename will not have any suffix, the second file will have suffix "-0", the second "-1", and so on.
	// If one of those files already exists, the suffix keeps incrementing, until a free slot is found.
	// If errors occur while opening output files, a number of retries is attempted while incrementing
	// the suffix, until the number of error exceeds MaxOutputFileErrors. After this, the FileSink stops
	// and reports the last error. All intermediate errors are logged as warnings.
	Filename string

	// IoBuffer defines the output buffer when writing samples to a file. It should be large
	// enough to minimize the number of write() calls in the operating system.
	IoBuffer int

	// CleanFiles can be set to true to delete all files that would potentially collide with output files.
	// In particular, this causes the following when starting the FileSink:
	//   NewFileGroup(sink.Filename).DeleteFiles()
	// When deleting these files fails, the FileSink stops and reports an error.
	CleanFiles bool

	// Append can be set to true to make the FileSink append data to a file, if it exists.
	Append bool

	// VanishedFileCheck can be set to > 0 to enable a periodic check, if the currently opened
	// output file is still available under the same file path as it was opened. The check will
	// be performed whenever a sample is to be written and the last check is older than the given
	// duration. If the check fails, the output file is reopened, including the creation of all necessary directories.
	// This can happen, if the output file is deleted while still being written to, and enabling
	// the VanishedFileCheck leads to the file be recreated, which could be the more expected behavior.
	VanishedFileCheck time.Duration
	// contains filtered or unexported fields
}

FileSink is an implementation of SampleSink that writes output Headers and Samples to a given file. Every time a new Header is received by the FileSink, a new file is opened using an automatically incremented number as suffix (see FileGroup). Other parameters define the parsing behavior of the FileSink.

func (*FileSink) Close

func (sink *FileSink) Close()

Close implements the SampleSink interface. It flushes and closes the currently open file. No more data should be written to Sample/Header after calling Close.

func (*FileSink) Sample

func (sink *FileSink) Sample(sample *Sample, header *Header) error

Sample writes a Sample to the current open file.

func (*FileSink) Start

func (sink *FileSink) Start(wg *sync.WaitGroup) (_ golib.StopChan)

Start implements the SampleSink interface. It does not start any goroutines. It initialized the FileSink, prints some log messages, and depending on the CleanFiles flag tries to delete existing files that would conflict with the output file.

func (*FileSink) String

func (sink *FileSink) String() string

String implements the SampleSink interface.

type FileSource

type FileSource struct {
	AbstractUnmarshallingSampleSource

	// File names is a slice of all files that will be read by the FileSource in sequence.
	// For every Filename, the FileSource will not only read the file itself,
	// but also for all files that belong to the same FileGroup, as returned by:
	//   NewFileGroup(filename).AllFiles()
	FileNames []string

	// ReadFileGroups can be set to true to extend the input files to the associated
	// file groups. For an input file named 'data.bin', all files named 'data-[0-9]+.bin'
	// will be read as well. The file group for 'data' is 'data-[0-9]+', the file
	// group for '.bin' is '[0-9]+.bin'.
	ReadFileGroups bool

	// Robust can be set to true to allow errors when reading or parsing files,
	// and only print Warnings instead. This is useful if the files to be parsed
	// are mostly valid, but have garbage at the end.
	Robust bool

	// IoBuffer configures the buffer size for read files. It should be large enough
	// to allow multiple goroutines to parse the read data in parallel.
	IoBuffer int

	// ConvertFile is an optional hook for converting the filename to a custom string.
	// The custom string will then be passed to the ReadSampleHandler configured in
	// the Reader field, instead of simply using the filename.
	ConvertFilename func(string) string

	// KeepAlive makes this FileSource not close after all files have been read.
	// Instead, it will stay open without producing any more data.
	KeepAlive bool

	// UnsynchronizedFileAccess can be set to true to disable synchronizing Read() and Close()
	// methods of files through a sync.RWMutex. Tests shows no measurable performance difference
	// from the additional Lock/Unlock operations, but they prevent potential race conditions
	// when accessing the underlying fd (file descriptor) field, as reported by the Go race detector.
	UnsynchronizedFileAccess bool
	// contains filtered or unexported fields
}

FileSource is an implementation of UnmarshallingSampleSource that reads samples from one or more files. Various parameters control the behavior and performance of the FileSource.

func (*FileSource) Close

func (source *FileSource) Close()

Close implements the SampleSource interface. it stops all goroutines that are spawned for reading files and prints any errors to the logger. Calling it after the FileSource finished on its own will have no effect.

func (*FileSource) Start

func (source *FileSource) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the SampleSource interface. It starts reading all configured files in sequence using background goroutines. Depending on the Robust flag of the receiving FileSource, the reading exits after the first error, or continues until all configured files have been opened.

func (*FileSource) String

func (source *FileSource) String() string

String implements the SampleSource interface.

type Header struct {
	// Fields defines the names of the metrics of samples belonging to this header.
	Fields []string
}

Header defines the structure of samples that belong to this header. When unmarshalling headers and sample, usually one header precedes a number of samples. Those samples are defined by the header.

func (*Header) BuildIndex

func (h *Header) BuildIndex() map[string]int

BuildIndex creates a dictionary of the header field names to their index in the header for optimize access to sample values.

func (*Header) Clone

func (h *Header) Clone(newFields []string) *Header

Clone creates a copy of the Header receiver, using a new string-array as the header fields.

func (*Header) Equals

func (h *Header) Equals(other *Header) bool

Equals compares the receiving header with the argument header and returns true, if the two represent the same header. This method tried to optimize the comparison by first comparing the header pointers and the length of the Fields slices, and pointers to the arrays backing the Fields slices. If all the checks fail, the last resort is to compare all the fields string-by-string.

func (*Header) String

func (h *Header) String() string

String returns a human-readable string-representation of the header, including all meta-data and field names.

type HeaderChecker

type HeaderChecker struct {
	LastHeader *Header
}

HeaderChecker is a helper type for implementations of SampleSink to find out, when the incoming header changes.

func (*HeaderChecker) HeaderChanged

func (h *HeaderChecker) HeaderChanged(newHeader *Header) bool

HeaderChanged returns true, if the newHeader parameter represents a different header from the last time HeaderChanged was called. The result will also be true for the first time this method is called.

func (*HeaderChecker) InitializedHeaderChanged

func (h *HeaderChecker) InitializedHeaderChanged(newHeader *Header) bool

InitializedHeaderChanged returns true, if the newHeader parameter represents a different header from the last time HeaderChanged was called. The first call to this method will return false, so this can be used in situations where the header has to be initialized.

type HttpServerSink

type HttpServerSink struct {
	AbstractTcpSink

	// Endpoint defines the TCP host and port to listen on for incoming TCP connections.
	// The host can be empty (e.g. ":1234"). If not, it must contain a hostname or IP of the
	// local host.
	Endpoint string

	// If BufferedSamples is >0, the given number of latest samples will be kept in a ring buffer.
	// New requests will first receive all samples currently in the buffer, and will
	// afterwards continue receiving live incoming samples.
	BufferedSamples uint

	// SubPathTag can be set to allow requesting samples on HTTP path /<val>, which will only output that
	// contain the tag <SubPathTag>=<val>. The root path '/' still serves all samples.
	SubPathTag string

	// RootPathPrefix is the base path for requests. A '/' will be appended.
	RootPathPrefix string
	// contains filtered or unexported fields
}

HttpServerSink implements the SampleSink interface as an HTTP server. It listens for incoming HTTP connections on a port and provides incoming data on certain HTTP request paths.

func (*HttpServerSink) Close

func (sink *HttpServerSink) Close()

Close implements the SampleSink interface. It closes any existing connection and shuts down the HTTP server.

func (*HttpServerSink) Sample

func (sink *HttpServerSink) Sample(sample *Sample, header *Header) error

Sample implements the SampleSink interface. It stores the sample in a ring buffer and sends it to all established connections. New connections will first receive all samples stored in the buffer, before getting the live samples directly. If the buffer is disable or full, and there are no established connections, samples are dropped.

func (*HttpServerSink) Start

func (sink *HttpServerSink) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the SampleSink interface. It creates the TCP socket and starts listening on it in a separate goroutine. Any incoming connection is then handled in their own goroutine.

func (*HttpServerSink) String

func (sink *HttpServerSink) String() string

String implements the SampleSink interface.

type IndentPrinter

type IndentPrinter struct {
	OuterIndent         string
	InnerIndent         string
	FillerIndent        string
	CornerIndent        string
	MarkEmptyContainers bool
}

func (IndentPrinter) Print

func (p IndentPrinter) Print(obj fmt.Stringer) string

func (IndentPrinter) PrintLines

func (p IndentPrinter) PrintLines(obj fmt.Stringer) []string

type KeyValuePair

type KeyValuePair struct {
	Key   string
	Value string
}

KeyValuePair represents a key-value string pair

type Marshaller

type Marshaller interface {
	String() string
	WriteHeader(header *Header, withTags bool, output io.Writer) error
	WriteSample(sample *Sample, header *Header, withTags bool, output io.Writer) error
	// Some marshallers might be supposed to send a single sample per request
	ShouldCloseAfterFirstSample() bool
}

Marshaller is an interface for converting Samples and Headers into byte streams. The byte streams can be anything including files, network connections, console output, or in-memory byte buffers.

type MarshallingFormat

type MarshallingFormat string

type MarshallingSampleOutput

type MarshallingSampleOutput interface {
	SampleProcessor

	// SetMarshaller must configure a valid instance of Marshaller before Start() is called.
	// All received samples will be converted to a byte stream using the configured marshaller.
	SetMarshaller(marshaller Marshaller)
}

MarshallingSampleOutput is a SampleProcessor that outputs the received samples to a byte stream that is generated by a Marshaller instance.

type MergeableProcessor

type MergeableProcessor interface {
	SampleProcessor
	MergeProcessor(other SampleProcessor) bool
}

MergeableProcessor is an extension of SampleProcessor, that also allows merging two processor instances of the same time into one. Merging is only allowed when the result of the merge would has exactly the same functionality as using the two separate instances. This can be used as an optional optimization.

type NoopProcessor

type NoopProcessor struct {
	AbstractSampleProcessor
	StopChan golib.StopChan
}

NoopProcessor is an empty implementation of SampleProcessor. It can be directly added to a SamplePipeline and will behave as a no-op processing step. Other implementations of SampleProcessor can embed this and override parts of the methods as required. No initialization is needed for this type, but an instance can only be used once, in one pipeline.

func (*NoopProcessor) Close

func (p *NoopProcessor) Close()

Close implements the SampleProcessor interface by closing the outgoing sink and internal golib.StopChan. Other types that embed NoopProcessor can override this to perform specific actions when closing, but CloseSink() should always be called in the end.

func (*NoopProcessor) CloseSink

func (p *NoopProcessor) CloseSink()

CloseSink reports that this NoopProcessor is finished processing. All goroutines must be stopped, and all Headers and Samples must be already forwarded to the outgoing sink, when this is called. CloseSink forwards the Close() invocation to the outgoing sink.

func (*NoopProcessor) Error

func (p *NoopProcessor) Error(err error)

Error reports that NoopProcessor has encountered an error and has stopped operation. After calling this, no more Headers and Samples can be forwarded to the outgoing sink. Ultimately, p.Close() will be called for cleaning up.

func (*NoopProcessor) Sample

func (p *NoopProcessor) Sample(sample *Sample, header *Header) error

Sample implements the SampleProcessor interface. It forwards the sample to the subsequent processor.

func (*NoopProcessor) Start

func (p *NoopProcessor) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the SampleProcessor interface. It creates an error-channel with a small channel buffer. Calling CloseSink() or Error() writes a value to that channel to signalize that this NoopProcessor is finished.

func (*NoopProcessor) String

func (p *NoopProcessor) String() string

String implements the SampleProcessor interface. This should be overridden by types that are embedding NoopProcessor.

type ParallelSampleHandler

type ParallelSampleHandler struct {
	// BufferedSamples is the number of Samples that are buffered between the
	// marshall/unmarshall routines and the routine that writes/reads the input
	// or output streams.
	// The purpose of the buffer is, for example, to allow the routine reading a file
	// to read the data for multiple Samples in one read operation, which then
	// allows the parallel parsing routines to parse all the read Samples at the same time.
	// Setting BufferedSamples is a trade-off between memory consumption and
	// parallelism, but most of the time a value of around 1000 or so should be enough.
	// If this value is not set, no parallelism will be possible because
	// the channel between the cooperating routines will block on each operation.
	BufferedSamples int

	// ParallelParsers can be set to the number of goroutines that will be
	// used when marshalling or unmarshalling samples. These routines can
	// parallelize the parsing and marshalling operations. The most benefit
	// from the parallelism comes when reading samples from e.g. files, because
	// reading the file into memory can be decoupled from parsing Samples,
	// and multiple Samples can be parsed at the same time.
	//
	// This must be set to a value greater than zero, otherwise no goroutines
	// will be started.
	ParallelParsers int
}

ParallelSampleHandler is a configuration type that is included in SampleReader and SampleWriter. Both the reader and writer can marshall and unmarshall Samples in parallel, and these routines are controlled through the two parameters in ParallelSampleHandler.

type ProcessorTaskWrapper

type ProcessorTaskWrapper struct {
	SampleProcessor
}

ProcessorTaskWrapper can be used to convert an instance of SampleProcessor to a golib.Task. The Stop() method of the resulting Task is ignored.

func (*ProcessorTaskWrapper) Stop

func (t *ProcessorTaskWrapper) Stop()

Stop implements the golib.Task interface. Calls to this Stop() method are ignored, because SampleProcessor instances should be shutdown through the Close() method.

type ReadSampleHandler

type ReadSampleHandler interface {
	// HandleSample allows modifying received Samples. It can be used to modify
	// the tags of the Sample based on the source string. The source string depends on the
	// SampleSource that is using the SampleReader that contains this ReadSampleHandler.
	// In general it represents the data source of the sample. For FileSource this will be
	// the file name, for TCPSource it will be the remote TCP endpoint sending the data.
	// It might also be useful to change the values or the timestamp of the Sample here,
	// but that should rather be done in a later processing step.
	HandleSample(sample *Sample, source string)
}

ReadSampleHandler defines a hook for modifying unmarshalled Samples.

type ReaderSource

type ReaderSource struct {
	AbstractUnmarshallingSampleSource
	Input       io.ReadCloser
	Description string
	// contains filtered or unexported fields
}

ReaderSource implements the SampleSource interface by reading Headers and Samples from an arbitrary io.ReadCloser instance. An instance of SampleReader is used to read the data in parallel.

func NewConsoleSource

func NewConsoleSource() *ReaderSource

NewConsoleSource creates a SampleSource that reads from the standard input.

func (*ReaderSource) Close

func (source *ReaderSource) Close()

Close implements the SampleSource interface. It stops the underlying stream and prints any errors to the logger.

func (*ReaderSource) Start

func (source *ReaderSource) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the SampleSource interface by starting a SampleInputStream instance that reads from the given io.ReadCloser.

func (*ReaderSource) String

func (source *ReaderSource) String() string

String implements the SampleSource interface.

type ResizingBatchProcessingStep

type ResizingBatchProcessingStep interface {
	BatchProcessingStep
	OutputSampleSize(sampleSize int) int
}

type ResizingSampleProcessor

type ResizingSampleProcessor interface {
	SampleProcessor
	OutputSampleSize(sampleSize int) int
}

ResizingSampleProcessor is a helper interface that can be implemented by SampleProcessors in order to make RequiredValues() more reliable. The result of the OutputSampleSize() method should give a worst-case estimation of the number of values that will be present in Samples after this SampleProcessor is done processing a sample. This allows the optimization of pre-allocating a value array large enough to hold the final amount of metrics. The optimization works best when all samples are processed in a one-to-one fashion, i.e. no samples are split into multiple samples.

type Sample

type Sample struct {
	Values []Value
	Time   time.Time
	// contains filtered or unexported fields
}

Sample contains an array of Values, a timestamp, and a string-to-string map of tags. The values are explained by the header belonging to this sample. There is no direct pointer from the sample to the header, so the header must be known from the context. In other words, the Sample should always be passed along with the header it belongs to. Without the header, the meaning of the Value array is not defined. The Values slice and the timestamp an be accessed and modified directly, but the tags map should only be manipulated through methods to ensure concurrency safe map operations. The Values and Tags should be modified by only one goroutine at a time.

func (*Sample) AddTagsFrom

func (sample *Sample) AddTagsFrom(other *Sample)

CopyTagsFrom adds the tags of the parameter sample to the set of tags already present in the receiving sample. Colliding tags are overwritten, but other existing tags are not deleted.

func (*Sample) Clone

func (sample *Sample) Clone() *Sample

Clone returns a copy of the receiving sample. The metadata (timestamp and tags) is copied deeply, but values are referencing the old values. After using this, the old Sample should either not be used anymore, or the Values slice in the new Sample should be replaced by a new slice.

func (*Sample) CopyMetadataFrom

func (sample *Sample) CopyMetadataFrom(other *Sample)

CopyMetadataFrom copies the timestamp and tags from the argument Sample into the receiving Sample. All previous tags in the receiving Sample are discarded.

func (*Sample) DeepClone

func (sample *Sample) DeepClone() *Sample

DeepClone returns a deep copy of the receiving sample, including the timestamp, tags and actual metric values.

func (*Sample) DeleteTag

func (sample *Sample) DeleteTag(name string)

DeleteTag deleted the given tag from the receiving Sample.

func (*Sample) HasTag

func (sample *Sample) HasTag(name string) (ok bool)

HasTag returns true if the receiving Sample includes a tag with the given name.

func (*Sample) Metadata

func (sample *Sample) Metadata() *SampleMetadata

Metadata returns an instance of SampleMetadata containing the tags and timestamp of the receiving Sample.

func (*Sample) NumTags

func (sample *Sample) NumTags() (l int)

NumTags returns the number of tags defined in the receiving Sample.

func (*Sample) ParseTagString

func (sample *Sample) ParseTagString(tags string)

ParseTagString parses a string in the format produced by TagString(). The resulting tags and tag values directly replace the tags inside the receiving Sample. Old tags are discarded.

The parsing is resilient to wrongly formatted inputs. Empty tag values are allowed, additional whitespace separators are ignored.

This method is used on freshly created Samples by CsvMarshaller and BinaryMarshaller when unmarshalling Samples from the respective format.

func (*Sample) Resize

func (sample *Sample) Resize(newSize int) bool

Resize ensures that the Values slice of the sample has the given length. If possible, the current Values slice will be reused (shrinking or growing within the limits if its capacity). Otherwise, a new slice will be allocated, without copying any values. The result value will be true, if the current slice was reused, and false if a new slice was allocated.

func (*Sample) SetTag

func (sample *Sample) SetTag(name, value string)

SetTag sets the tag of the given name to the given value in the receiving Sample.

func (*Sample) SortedTags

func (sample *Sample) SortedTags() (res []KeyValuePair)

SortedTags returns a slice of key-value tag pairs, sorted by key

func (*Sample) Tag

func (sample *Sample) Tag(name string) (value string)

Tag returns the value of the given tag inside the receiving Sample. If the tag is not defined in the sample, an empty string is returned. HasTag can be used to find out if a tag is defined or not.

func (*Sample) TagMap

func (sample *Sample) TagMap() (res map[string]string)

TagMap returns a copy of the tags stored in the receiving sample.

func (*Sample) TagString

func (sample *Sample) TagString() (res string)

TagString returns a string representation of all the tags and tag values in the receiving Sample. This representation is used for marshalling by the CsvMarshaller and BinaryMarshaller. The format is a space-separated string of key-value pairs separated by '=' characters.

Example:

tag1=value1 tag2=value2

type SampleAndHeader

type SampleAndHeader struct {
	*Sample
	*Header
}

SampleAndHeader is a convenience type combining pointers to a Sample and a Header.

func (*SampleAndHeader) AddField

func (s *SampleAndHeader) AddField(name string, value Value) *SampleAndHeader

type SampleHeaderIndex

type SampleHeaderIndex struct {
	// contains filtered or unexported fields
}

SampleHeaderIndex builds a `map[string]int` index of the fields of a Header instance. The index is cached and only updated when the header changes. This allows efficient access to specific header fields.

func (*SampleHeaderIndex) Get

func (index *SampleHeaderIndex) Get(sample *Sample, header *Header, field string) (Value, bool)

func (*SampleHeaderIndex) GetSingle

func (index *SampleHeaderIndex) GetSingle(sample *Sample, field string) (Value, bool)

func (*SampleHeaderIndex) Update

func (index *SampleHeaderIndex) Update(header *Header)

type SampleInputStream

type SampleInputStream struct {
	// contains filtered or unexported fields
}

SampleInputStream represents one input stream of Headers and Samples that reads and parses data from one io.ReadCloser instance. A SampleInputStream can be created using SampleReader.Open or .OpenBuffered. The stream then has to be started using one of the Read* methods. The Read* method will block until the stream is finished. Reading and parsing the Samples will be done in parallel goroutines. The Read* methods behave differently in terms of printing errors. The stream can be closed forcefully using the Close method.

func (*SampleInputStream) Close

func (stream *SampleInputStream) Close() error

Close closes the receiving SampleInputStream. Close should be called even if the Read* method, that started the stream, returns an error. Close() might return the same error as the Read* method.

func (*SampleInputStream) Format

func (stream *SampleInputStream) Format() string

Format returns a string description of the unmarshalling format used by the receiving SampleInputStream. It returns "auto-detected", if no Unmarshaller is configured, and if the unmarshalling format was not yet detected automatically. After the unmarshalling format is detected, Format will return the correct format description.

func (*SampleInputStream) ReadNamedSamples

func (stream *SampleInputStream) ReadNamedSamples(sourceName string) (err error)

ReadNamedSamples calls ReadSamples with the given source string, and prints some additional logging information. It is a convenience function for different implementations of SampleSource.

func (*SampleInputStream) ReadSamples

func (stream *SampleInputStream) ReadSamples(source string) (int, error)

ReadSamples starts the receiving input stream and blocks until the stream is finished or closed by Close(). It returns the number of successfully received samples and a non-nil error, if any occurred while reading or parsing. The source string parameter will be forwarded to the ReadSampleHandler, if one is set in the SampleReader that created this SampleInputStream. The source string will be used for the HandleSample() method.

func (*SampleInputStream) ReadTcpSamples

func (stream *SampleInputStream) ReadTcpSamples(conn io.ReadCloser, remote string, checkClosed func() bool)

ReadTcpSamples reads Samples from the given net.TCPConn and blocks until the connection is closed by the remote host, or Close() is called on the input stream. Any error is logged instead of being returned. The checkClosed() function parameter is used when a read error occurs: if it returns true, ReadTcpSamples assumes that the connection was closed by the local host, because of a call to Close() or some other external reason. If checkClosed() returns false, it is assumed that a network error or timeout caused the connection to be closed.

type SampleMetadata

type SampleMetadata struct {
	Time time.Time
	// contains filtered or unexported fields
}

SampleMetadata is a helper type containing the timestamp and the tags of a Sample. It can be used to store samples in cases where the actual Values of the Sample are not relevant, or stored in a different location. The timestamp can be accessed directly, but the tags are hidden to ensure consistency. If a SampleMetadata instance is required with specific tags, a Sample with those tags should be created, and Metadata() should be called on that Sample to create the desired SampleMetadata instance.

func (*SampleMetadata) NewSample

func (meta *SampleMetadata) NewSample(values []Value) *Sample

NewSample returns a new Sample instances containing the tags and timestamp defined in the receiving SampleMetadata instance and the Values given as argument. The metadata is copied deeply, so the resulting Sample can be modified independently of the receiving SampleMetadata instance.

type SampleOutputStream

type SampleOutputStream struct {
	// contains filtered or unexported fields
}

SampleOutputStream represents one open output stream that marshals and writes Headers and Samples in parallel. It is created by using SampleWriter.Open or SampleWriter.OpenBuffered. The Sample() method can be used to output data on this stream, and the Close() method must be called when no more Samples are expected. No more data can be written after calling Close().

func (*SampleOutputStream) Close

func (stream *SampleOutputStream) Close() error

Close closes the receiving SampleOutputStream. After calling this, neither Sample nor Header can be called anymore! The returned error is the first error that ever occurred in any of the Sample/Header/Close calls on this stream.

func (*SampleOutputStream) Sample

func (stream *SampleOutputStream) Sample(sample *Sample, header *Header) error

Sample marshals the given Sample and writes the resulting byte buffer into the writer behind the stream receiver. If a non-nil error is returned here, the stream should not be used any further, but still must be closed externally.

type SamplePipeline

type SamplePipeline struct {
	Source     SampleSource
	Processors []SampleProcessor
	// contains filtered or unexported fields
}

SamplePipeline reads data from a source and pipes it through zero or more SampleProcessor instances. The job of the SamplePipeline is to connect all the processing steps in the Construct method. After calling Construct, the SamplePipeline should not used any further.

func (*SamplePipeline) Add

func (p *SamplePipeline) Add(processor SampleProcessor) *SamplePipeline

Add adds the SampleProcessor parameter to the list of SampleProcessors in the receiving SamplePipeline. The Source field must be accessed directly. The Processors field can also be accessed directly, but the Add method allows chaining multiple Add invocations like so:

pipeline.Add(processor1).Add(processor2)

func (*SamplePipeline) Batch

func (p *SamplePipeline) Batch(steps ...BatchProcessingStep) *SamplePipeline

func (*SamplePipeline) Construct

func (p *SamplePipeline) Construct(tasks *golib.TaskGroup)

Construct connects the SampleSource and all SampleProcessors. It adds small wrapping golib.StoppableTask instances to the given golib.TaskGroup. Afterwards, tasks.WaitAndStop() can be called to start the entire pipeline. If the Source field is missing, it will be replaced with a new EmptySampleSource instance. nil values in the Processors field will be ignored. A new instance of DroppingSampleProcessor is added to the list of Processors to ensure that every step has a valid subsequent step.

Additionally, all SampleProcessor instances will be wrapped in small wrapper objects that ensure that the samples and headers forwarded between the processors are consistent.

func (*SamplePipeline) ContainedStringers

func (p *SamplePipeline) ContainedStringers() []fmt.Stringer

func (*SamplePipeline) FormatLines

func (p *SamplePipeline) FormatLines() []string

func (*SamplePipeline) StartAndWait

func (p *SamplePipeline) StartAndWait(extraTasks ...golib.Task) int

StartAndWait constructs the pipeline and starts it. It blocks until the pipeline is finished. The Sink and Source fields must be set to non-nil values, for example using Configure* methods or setting the fields directly.

The sequence of operations to start a SamplePipeline should roughly follow the following example:

// ... Define additional flags using the "flag" package (Optional)
var p sample.SamplePipeline
var f EndpointFactory
f.RegisterFlags()
flag.Parse()
// ... Modify f.Flag* values (Optional)
defer golib.ProfileCpu()() // (Optional)
// ... Set p.Processors (Optional, e.g. using f.CreateSink())
// ... Set p.Source using f.CreateSource()
os.Exit(p.StartAndWait()) // os.Exit() should be called in an outer method if 'defer' is used here

An additional golib.Task is started along with the pipeline, which listens for the Ctrl-C user external interrupt and makes the pipeline stoppable cleanly by the user.

StartAndWait returns the number of errors that occurred in the pipeline.

func (*SamplePipeline) String

func (p *SamplePipeline) String() string

type SampleProcessor

type SampleProcessor interface {
	SampleSource
	SampleSink
}

SampleProcessor is the basic interface to receive and process samples. It receives Samples through the Sample method and sends samples to the subsequent SampleProcessor configured over SetSink. The forwarded Samples can be the same as received, completely new generated samples, and also a different number of Samples from the incoming ones. The Header can also be changed, but then the SampleProcessor implementation must take care to adjust the outgoing Samples accordingly. All required goroutines must be started in Start() and stopped when Close() is called. When Start() is called, it can be assumed that SetSink() has already been called to configure a non-nil subsequent SampleProcessor. As a special case, some SampleProcessor implementations output samples to external sinks like files or network connections. In this case, the incoming samples should usually be forwarded to the subsequent SampleProcessor without changes.

type SampleReader

type SampleReader struct {
	ParallelSampleHandler

	// Handler is an optional hook for modifying Headers and Samples that were
	// read by this SampleReader. The hook method receives a string-representation of
	// the data source and can use it to modify tags in the Samples.
	Handler ReadSampleHandler

	// Unmarshaller will be used when reading and parsing Headers and Samples.
	// If this field is nil when creating an input stream, the SampleInputStream will try
	// to automatically determine the format of the incoming data and create
	// a fitting Unmarshaller instance accordingly.
	Unmarshaller Unmarshaller
}

SampleReader is used to read Headers and Samples from an io.Reader, parallelizing the reading and parsing procedures. The parallelization must be configured through the ParallelSampleHandler parameters before starting this SampleReader.

func (*SampleReader) Format

func (r *SampleReader) Format() string

Format returns a string description of the unmarshalling format used by the receiving SampleReader. It returns "auto-detected", if no Unmarshaller is configured.

func (*SampleReader) Open

func (r *SampleReader) Open(input io.ReadCloser, sink SampleSink) *SampleInputStream

Open creates an input stream reading from the given io.ReadCloser and writing the received Headers and Samples to the given SampleSink. Although no buffer size is given, the stream will actually have a small input buffer to enable automatically detecting the format of incoming data, if no Unmarshaller was configured in the receiving SampleReader.

func (*SampleReader) OpenBuffered

func (r *SampleReader) OpenBuffered(input io.ReadCloser, sink SampleSink, bufSize int) *SampleInputStream

OpenBuffered creates an input stream with a given buffer size. The buffer size should be at least MinimumInputIoBuffer bytes to support automatically discovering the input stream format. See Open() for more details.

type SampleRing

type SampleRing struct {
	// contains filtered or unexported fields
}

SampleRing is a one-way circular queue of Sample instances. There is no dequeue operation. The stored samples can be copied into a correctly ordered slice.

func NewSampleRing

func NewSampleRing(capacity int) *SampleRing

func (*SampleRing) Get

func (r *SampleRing) Get() []*SampleAndHeader

func (*SampleRing) IsFull

func (r *SampleRing) IsFull() bool

func (*SampleRing) Len

func (r *SampleRing) Len() int

func (*SampleRing) Push

func (r *SampleRing) Push(sample *Sample, header *Header) *SampleRing

func (*SampleRing) PushSampleAndHeader

func (r *SampleRing) PushSampleAndHeader(sample *SampleAndHeader) *SampleRing

type SampleSink

type SampleSink interface {
	Sample(sample *Sample, header *Header) error
}

A SampleSink receives samples and headers to do arbitrary operations on them. The usual interface for this is SampleProcessor, but sometimes this simpler interface is useful.

type SampleSource

type SampleSource interface {
	golib.Startable
	String() string
	SetSink(sink SampleProcessor)
	GetSink() SampleProcessor
	Close()
}

SampleSource is the interface used for producing Headers and Samples. It should start producing samples in a separate goroutine when Start() is called, and should stop all goroutines when Close() is called. Before Start() is called, SetSink() must be called to inform the SampleSource about the SampleSink it should output the Headers/Samples into. After all samples have been generated (for example because the data source is finished, like a file, or because Close() has been called) the Close() method must be called on the outgoing SampleProcessor. After calling Close(), no more headers or samples are allowed to go into the SampleProcessor. See the golib.Task interface for info about the Start() method.

type SampleWriter

type SampleWriter struct {
	ParallelSampleHandler
}

SampleWriter implements parallel writing of Headers and Samples to an instance of io.WriteCloser. The WriteCloser can be anything like a file or a network connection. The parallel writing must be configured before using the SampleWriter. See ParallelSampleHandler for the configuration variables.

SampleWriter instances are mainly used by implementations of SampleOutput that write to output streams, like FileSink or TCPSink.

func (*SampleWriter) Open

func (w *SampleWriter) Open(writer io.WriteCloser, marshaller Marshaller) *SampleOutputStream

Open returns an output stream that sends the marshalled samples directly to the given writer. Marshalling and writing is done in separate routines, as configured in the SampleWriter configuration parameters.

func (*SampleWriter) OpenBuffered

func (w *SampleWriter) OpenBuffered(writer io.WriteCloser, marshaller Marshaller, io_buffer int) *SampleOutputStream

OpenBuffered returns a buffered output stream with a buffer of the size io_buffer. Samples coming into that stream are marshalled using marshaller and finally written the given writer.

type SimpleBatchProcessingStep

type SimpleBatchProcessingStep struct {
	Description          string
	Process              func(header *Header, samples []*Sample) (*Header, []*Sample, error)
	OutputSampleSizeFunc func(sampleSize int) int
}

func (*SimpleBatchProcessingStep) OutputSampleSize

func (s *SimpleBatchProcessingStep) OutputSampleSize(sampleSize int) int

func (*SimpleBatchProcessingStep) ProcessBatch

func (s *SimpleBatchProcessingStep) ProcessBatch(header *Header, samples []*Sample) (*Header, []*Sample, error)

func (*SimpleBatchProcessingStep) String

func (s *SimpleBatchProcessingStep) String() string

type SimpleProcessor

type SimpleProcessor struct {
	NoopProcessor
	Description          string
	Process              func(sample *Sample, header *Header) (*Sample, *Header, error)
	OnClose              func()
	OutputSampleSizeFunc func(sampleSize int) int
}

func (*SimpleProcessor) Close

func (p *SimpleProcessor) Close()

func (*SimpleProcessor) OutputSampleSize

func (p *SimpleProcessor) OutputSampleSize(sampleSize int) int

func (*SimpleProcessor) Sample

func (p *SimpleProcessor) Sample(sample *Sample, header *Header) error

func (*SimpleProcessor) String

func (p *SimpleProcessor) String() string

type SortedStringPairs

type SortedStringPairs struct {
	Keys   []string
	Values []string
}

func (*SortedStringPairs) FillFromMap

func (s *SortedStringPairs) FillFromMap(values map[string]string)

func (*SortedStringPairs) Len

func (s *SortedStringPairs) Len() int

func (*SortedStringPairs) Less

func (s *SortedStringPairs) Less(i, j int) bool

func (*SortedStringPairs) String

func (s *SortedStringPairs) String() string

func (*SortedStringPairs) Swap

func (s *SortedStringPairs) Swap(i, j int)

type SortedStringers

type SortedStringers []fmt.Stringer

func (SortedStringers) Len

func (t SortedStringers) Len() int

func (SortedStringers) Less

func (t SortedStringers) Less(a, b int) bool

func (SortedStringers) Swap

func (t SortedStringers) Swap(a, b int)

type SourceTaskWrapper

type SourceTaskWrapper struct {
	SampleSource
}

SourceTaskWrapper can be used to convert an instance of SampleSource to a golib.Task. Calls to the Stop() method are mapped to the Close() method of the underlying SampleSource.

func (*SourceTaskWrapper) Stop

func (t *SourceTaskWrapper) Stop()

type String

type String string

String is a trivial implementation of the fmt.Stringer interface

func (String) String

func (s String) String() string

type StringerContainer

type StringerContainer interface {
	ContainedStringers() []fmt.Stringer
}

type SynchronizedReadCloser

type SynchronizedReadCloser struct {
	ReadCloser io.ReadCloser
	// contains filtered or unexported fields
}

SynchronizedReadCloser is a helper type to wrap *os.File and synchronize calls to Read() and Close(). This prevents race condition warnings from the Go race detector due to parallel access to the fd field of the internal os.file type. The performance overhead is not measurable, but this can be deactivated by setting the UnsynchronizedFileAccess flag in FileSource.

func (*SynchronizedReadCloser) Close

func (s *SynchronizedReadCloser) Close() error

func (*SynchronizedReadCloser) Read

func (s *SynchronizedReadCloser) Read(b []byte) (int, error)

type SynchronizingSampleSink

type SynchronizingSampleSink struct {
	Out SampleSink
	// contains filtered or unexported fields
}

SynchronizingSampleSink is a SampleSink implementation that allows multiple goroutines to write data to the same sink and synchronizes these writes through a mutex.

func (*SynchronizingSampleSink) Sample

func (s *SynchronizingSampleSink) Sample(sample *Sample, header *Header) error

Sample implements the SampleSink interface.

type TCPConnCounter

type TCPConnCounter struct {
	// TcpConnLimit defines a limit for the number of TCP connections that should be accepted
	// or initiated. When this is <= 0, the number of not limited.
	TcpConnLimit uint
	// contains filtered or unexported fields
}

TCPConnCounter contains the TcpConnLimit configuration parameter that optionally defines a limit for the number of TCP connection that are accepted or initiated by the SampleSink and SampleSource implementations using TCP connections.

type TCPListenerSink

type TCPListenerSink struct {
	// AbstractTcpSink defines parameters for controlling TCP and marshalling
	// aspects of the TCPListenerSink. See AbstractTcpSink for details.
	AbstractTcpSink

	// Endpoint defines the TCP host and port to listen on for incoming TCP connections.
	// The host can be empty (e.g. ":1234"). If not, it must contain a hostname or IP of the
	// local host.
	Endpoint string

	// If BufferedSamples is >0, the given number of samples will be kept in a ring buffer.
	// New incoming connections will first receive all samples currently in the buffer, and will
	// afterwards continue receiving live incoming samples.
	BufferedSamples uint
	// contains filtered or unexported fields
}

TCPListenerSink implements the SampleSink interface through a TCP server. It creates a socket listening on a local TCP endpoint and listens for incoming TCP connections. Once one or more connections are established, it forwards all incoming Headers and Samples to those connections. If a new header should be sent into a TCP connection, the old connection is instead closed and the TCPListenerSink waits for a new connection to be created.

func (*TCPListenerSink) Close

func (sink *TCPListenerSink) Close()

Close implements the SampleSink interface. It closes any existing connection and closes the TCP socket.

func (*TCPListenerSink) Sample

func (sink *TCPListenerSink) Sample(sample *Sample, header *Header) error

Sample implements the SampleSink interface. It stores the sample in a ring buffer and sends it to all established connections. New connections will first receive all samples stored in the buffer, before getting the live samples directly. If the buffer is disable or full, and there are no established connections, samples are dropped.

func (*TCPListenerSink) Start

func (sink *TCPListenerSink) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the SampleSink interface. It creates the TCP socket and starts listening on it in a separate goroutine. Any incoming connection is then handled in their own goroutine.

func (*TCPListenerSink) String

func (sink *TCPListenerSink) String() string

String implements the SampleSink interface.

type TCPListenerSource

type TCPListenerSource struct {
	AbstractUnmarshallingSampleSource

	// TCPConnCounter has a configuration for limiting the total number of
	// accepted connections. After that number of connections were accepted, no
	// further connections are accepted. After they all are closed, the TCPListenerSource
	// automatically stops.
	TCPConnCounter

	// SimultaneousConnections can limit the number of TCP connections accepted
	// at the same time. Set to >0 to activate the limit. Connections going over
	// the limit will be immediately closed, and a warning will be printed on the logger.
	SimultaneousConnections uint
	// contains filtered or unexported fields
}

TCPListenerSource implements the SampleSource interface as a TCP server. It listens for incoming TCP connections on a port and reads Headers and Samples from every accepted connection. See the doc for the different fields for options affecting the TCP connections and aspects of reading and parsing.

func NewTcpListenerSource

func NewTcpListenerSource(endpoint string) *TCPListenerSource

NewTcpListenerSource creates a new instance of TCPListenerSource listening on the given TCP endpoint. It must be a IP/hostname combined with a port that can be bound on the local machine.

func (*TCPListenerSource) Close

func (source *TCPListenerSource) Close()

Stop implements the SampleSource interface. It closes all active TCP connections and closes the listening socket.

func (*TCPListenerSource) Start

func (source *TCPListenerSource) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the SampleSource interface. It creates a socket listening for incoming connections on the configured endpoint. New connections are handled in separate goroutines.

func (*TCPListenerSource) String

func (source *TCPListenerSource) String() string

String implements the SampleSource interface.

type TCPSink

type TCPSink struct {
	// AbstractTcpSink contains different configuration options regarding the
	// marshalling and writing of data to the remote TCP connection.
	AbstractTcpSink

	// Endpoint is the target TCP endpoint to connect to for sending marshalled data.
	Endpoint string

	// DialTimeout can be set to time out automatically when connecting to a remote TCP endpoint
	DialTimeout time.Duration
	// contains filtered or unexported fields
}

TCPSink implements SampleSink by sending the received Headers and Samples to a given remote TCP endpoint. Every time it receives a Header or a Sample, it checks whether a TCP connection is already established. If so, it sends the data on the existing connection. Otherwise, it tries to connect to the configured endpoint and sends the data there, if the connection is successful.

func (*TCPSink) Close

func (sink *TCPSink) Close()

Close implements the SampleSink interface. It stops the current TCP connection, if one is running, and prevents future connections from being created. No more data can be sent into the TCPSink after this.

func (*TCPSink) Sample

func (sink *TCPSink) Sample(sample *Sample, header *Header) error

Sample implements the SampleSink interface. If a connection is already established, the Sample is directly sent through it. Otherwise, a new connection is established, and the sample is sent there.

func (*TCPSink) Start

func (sink *TCPSink) Start(wg *sync.WaitGroup) (_ golib.StopChan)

Start implements the SampleSink interface. It creates a log message and prepares the TCPSink for sending data.

func (*TCPSink) String

func (sink *TCPSink) String() string

String implements the SampleSink interface.

type TCPSource

type TCPSource struct {
	AbstractUnmarshallingSampleSource
	TCPConnCounter

	// RemoteAddrs defines the list of remote TCP endpoints that the TCPSource will try to
	// connect to. If there are more than one connection, all connections will run in parallel.
	// In that case, an additional instance of SynchronizedSampleSink is used to synchronize all
	// received data. For multiple connections, all samples and headers will be pushed into the
	// outgoing SampleSink in an interleaved fashion, so the outgoing SampleSink must be able to handle that.
	RemoteAddrs []string

	// PrintErrors controls whether errors from establishing download TCP connections are logged or not.
	PrintErrors bool

	// RetryInterval defines the time to wait before trying to reconnect after a closed connection
	// or failed connection attempt.
	RetryInterval time.Duration

	// DialTimeout can be set to time out automatically when connecting to a remote TCP endpoint
	DialTimeout time.Duration

	// UseHTTP instructs this data source to use the HTTP protocol instead of TCP. In this case, the RemoteAddrs
	// strings are treated as HTTP URLs, but without the http:// prefix. This prefix is appended before attempting to
	// send an HTTP request.
	UseHTTP bool
	// contains filtered or unexported fields
}

TCPSource implements the SampleSource interface by connecting to a list of remote TCP endpoints and downloading Header and Sample data from there. A background goroutine continuously tries to establish the required TCP connections and reads data from it whenever a connection succeeds. The contained AbstractUnmarshallingSampleSource and TCPConnCounter fields provide various parameters for configuring different aspects of the TCP connections and reading of data from them.

func (*TCPSource) Close

func (source *TCPSource) Close()

Close implements the SampleSource interface. It stops all background goroutines and tries to gracefully close all established TCP connections.

func (*TCPSource) SourceString

func (source *TCPSource) SourceString() string

SourceString returns a string representation of the TCP endpoints the TCPSource will download data from.

func (*TCPSource) Start

func (source *TCPSource) Start(wg *sync.WaitGroup) golib.StopChan

Start implements the SampleSource interface. It starts one goroutine for every configured TCP endpoint. The goroutines continuously try to connect to the remote endpoints and download Headers and Samples as soon as a connection is established.

func (*TCPSource) String

func (source *TCPSource) String() string

String implements the SampleSource interface.

type TagTemplate

type TagTemplate struct {
	Template      string // Placeholders like ${xxx} will be replaced by tag values. Values matching ENV_* will be replaced by the environment variable.
	MissingValue  string // Replacement for missing values
	IgnoreEnvVars bool   // Set to true to not treat ENV_ replacement templates specially
}

func (TagTemplate) Resolve

func (t TagTemplate) Resolve(sample *Sample) string

type TcpWriteConn

type TcpWriteConn struct {
	// contains filtered or unexported fields
}

TcpWriteConn is a helper type for TCP-base SampleSink implementations. It can send Headers and Samples over an opened TCP connection. It is created from AbstractTcpSink.OpenWriteConn() and can be used until Sample() returns an error or Close() is called explicitly.

func (*TcpWriteConn) Close

func (conn *TcpWriteConn) Close()

Close explicitly closes the underlying TCP connection of the receiving TcpWriteConn.

func (*TcpWriteConn) IsRunning

func (conn *TcpWriteConn) IsRunning() bool

IsRunning returns true, if the receiving TcpWriteConn is connected to a remote TCP endpoint.

func (*TcpWriteConn) Sample

func (conn *TcpWriteConn) Sample(sample *Sample, header *Header)

Sample writes the given sample into the receiving TcpWriteConn and closes the underlying TCP connection if there is an error.

type TextMarshaller

type TextMarshaller struct {
	// TextWidths sets the width of the header line and value table.
	// If Columns > 0, this value is ignored as the width is determined by the
	// number of columns. If this is 0, the width will be determined automatically:
	// If the output is a TTY (or if AssumeStdout is true), the width of the terminal
	// will be used. If it cannot be obtained, golib.GetTerminalSize() will return
	// a default value.
	TextWidth int

	// Columns can be set to > 0 to override TextWidth and set a fixed number of
	// columns in the table. Otherwise it will be computed automatically based
	// on TextWidth.
	Columns int

	// Set additional spacing between the columns of the output table. If <= 0, the
	// default value TextMarshallerDefaultSpacing will be used.
	Spacing int

	// If true, assume the output is a TTY and try to obtain the TextWidth from
	// the operating system.
	AssumeStdout bool
}

TextMarshaller marshals Headers and Samples to a human readable test format. It is mainly intended for easily readable output on the console. Headers are not printed separately. Every Sample is preceded by a header line containing the timestamp and tags. Afterwards, all values are printed in a aligned table in a key = value format. The width of the header line, the number of columns in the table, and the spacing between the columns in the table can be configured.

func (TextMarshaller) ShouldCloseAfterFirstSample

func (TextMarshaller) ShouldCloseAfterFirstSample() bool

ShouldCloseAfterFirstSample defines that text streams can stream without closing

func (TextMarshaller) String

func (TextMarshaller) String() string

String implements the Marshaller interface.

func (TextMarshaller) WriteHeader

func (TextMarshaller) WriteHeader(header *Header, withTags bool, output io.Writer) error

WriteHeader implements the Marshaller interface. It is empty, because TextMarshaller prints a separate header for each Sample.

func (TextMarshaller) WriteSample

func (m TextMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error

WriteSample implements the Marshaller interface. See the TextMarshaller godoc for information about the format.

type TitledSamplePipeline

type TitledSamplePipeline struct {
	*SamplePipeline
	Title string
}

func (*TitledSamplePipeline) String

func (t *TitledSamplePipeline) String() string

type UnmarshalledHeader

type UnmarshalledHeader struct {
	Header
	HasTags bool
}

UnmarshalledHeader extends a Header by adding a flag that indicated whether the unmarshalled samples will contain tags or not. This enables backwards-compatibility for data input without tags.

type Unmarshaller

type Unmarshaller interface {
	// String returns a short description of the Unmarshaller.
	String() string

	// Read must inspect the data in the stream and perform exactly one of two tasks:
	// read a header, or read a sample.
	// The Unmarshaller must be able to distinguish between a header and a sample based
	// on the first bytes received from the stream. If the previousHeader parameter is nil,
	// the Unmarshaller must attempt to receive a header, regardless of the stream contents.
	//
	// If a header is read, it is also parsed and a Header instance is allocated.
	// A pointer to the new header is returned, the sampleData byte-slice must be returned as nil.
	//
	// If sample data is read, Read must read data from the stream, until a full Sample has been read.
	// The sample data is not parsed, the ParseSample() method will be invoked separately.
	// The size of the Sample should be known based on the previousHeader parameter.
	// If sample data is read, is must be returned as the sampleData return value, and the Header pointer
	// must be returned as nil.
	//
	// Error handling:
	// The io.EOF error can be returned in two cases: 1) the read operation was successful and complete,
	// but the stream ended immediately afterwards, or 2) the stream was already empty. In the second
	// case, both other return values must be nil.
	// If io.EOF occurs in the middle of reading the stream, it must be converted to io.ErrUnexpectedEOF
	// to indicate an actual error condition.
	Read(input *bufio.Reader, previousHeader *UnmarshalledHeader) (newHeader *UnmarshalledHeader, sampleData []byte, err error)

	// ParseSample uses a header and a byte buffer to parse it to a newly
	// allocated Sample instance. The resulting Sample must have a Value slice with at least the capacity
	// of minValueCapacity. A non-nil error indicates that the data was in the wrong format.
	ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (*Sample, error)
}

Unmarshaller is an interface for reading Samples and Headers from byte streams. The byte streams can be anything including files, network connections, console output, or in-memory byte buffers. Reading is split into three parts: reading the header, receiving the bytes for a sample, and parsing those bytes into the actual sample. This separation is done for optimization purpose, to enable parallel parsing of samples by separating the data reading part from the parsing part. One goroutine can continuously call ReadSampleData(), while multiple other routines execute ParseSample() in parallel.

func DetectFormatFrom

func DetectFormatFrom(start string) (Unmarshaller, error)

DetectFormatFrom uses the start of a marshalled header to determine what unmarshaller should be used to decode the header and all following samples.

type UnmarshallingSampleSource

type UnmarshallingSampleSource interface {
	SampleSource
	SetSampleHandler(handler ReadSampleHandler)
}

UnmarshallingSampleSource extends SampleSource and adds a configuration setter that gives access to the samples that are read by this data source.

type Value

type Value float64

Value is a type alias for float64 and defines the type for metric values.

func (Value) String

func (v Value) String() string

String formats the receiving float64 value.

type WriteCascade

type WriteCascade struct {
	// Writer must be set before calling Write. It will receive the Write calls.
	Writer io.Writer

	// Err stores the error that occurred in one of the write calls.
	Err error
}

WriteCascade is a helper type for more concise Write code by avoiding error checks on every Write() invocation. Multiple Write calls can be cascaded without intermediate checks for errors. The trade-off/overhead are additional no-op Write()/WriteStr() calls after an error has occurred (which is the exception).

func (*WriteCascade) Write

func (w *WriteCascade) Write(bytes []byte) error

Write forwards the call to the contained Writer, but only of no error has been encountered yet. If an error occurs, it is stored in the Error field.

func (*WriteCascade) WriteAny

func (w *WriteCascade) WriteAny(i interface{})

WriteAny uses the fmt package to format he given object directly into the underlying writer. The write is only executed, if previous writes have been successful.

func (*WriteCascade) WriteByte

func (w *WriteCascade) WriteByte(b byte) error

WriteByte calls Write with the single parameter byte.

func (*WriteCascade) WriteStr

func (w *WriteCascade) WriteStr(str string)

WriteStr calls Write with a []byte representation of the string parameter.

type WriterSink

type WriterSink struct {
	AbstractMarshallingSampleOutput
	Output      io.WriteCloser
	Description string
	// contains filtered or unexported fields
}

WriterSink implements SampleSink by writing all Headers and Samples to a single io.WriteCloser instance. An instance of SampleWriter is used to write the data in parallel.

func NewConsoleSink

func NewConsoleSink() *WriterSink

NewConsoleSink creates a SampleSink that writes to the standard output.

func (*WriterSink) Close

func (sink *WriterSink) Close()

Close implements the SampleSink interface. It flushes the remaining data to the underlying io.WriteCloser and closes it.

func (*WriterSink) Sample

func (sink *WriterSink) Sample(sample *Sample, header *Header) error

Header implements the SampleSink interface by using a SampleOutputStream to write the given Sample to the configured io.WriteCloser.

func (*WriterSink) Start

func (sink *WriterSink) Start(wg *sync.WaitGroup) (_ golib.StopChan)

Start implements the SampleSink interface. No additional goroutines are spawned, only a log message is printed.

func (*WriterSink) String

func (sink *WriterSink) String() string

String implements the SampleSink interface.

func (*WriterSink) WritesToConsole

func (sink *WriterSink) WritesToConsole() bool

Package Files

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier