package writer

v1.34.0 · Published: Jan 5, 2024 · License: Apache-2.0 · Imports: 5 · Imported by: 1

README

Writers

See the godoc for information on what they are and how to use them.

Compiling

The buffer and writer are generated from a common template in the template package. This module has a //go:generate comment that will be recognized when you run go generate ./... on the repo. For the generate script to work, you must install the code generation tool with go get github.com/mauricelam/genny.

Also, span_[writer|buffer]_test.go is automatically generated from datapoint_[writer|buffer]_test.go by the same go generate command, so be sure to make changes in the datapoint test files.
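
For reference, genny wires generation together through //go:generate directives. A directive of roughly the following shape is typical (this is an illustration only: the file names and the substitution token are hypothetical, so check the actual directive in the template package):

//go:generate genny -in=template/ringbuffer.go -out=datapoint_buffer.go gen "Elem=*datapoint.Datapoint"

Running go generate ./... executes every such directive found in the repo.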

Documentation

Overview

Package writer contains a set of components that accept a single type of SignalFx data (e.g. datapoints, trace spans) in a simple manner (e.g. via an input channel) and then sort out the complexities of sending that data to SignalFx's ingest (or agent/gateway) endpoints. They are intended to be used when high volumes of data are expected. Some of the issues that a writer should deal with are:

- Batching of data that should be sent to SignalFx. It is infeasible to send every single data item as its own request, but too much batching reduces the timeliness of data in the system.

- Buffering of data items while they wait to be transmitted to SignalFx. The buffering could use all of the memory available to the process, or it could impose a limit on data waiting to be sent, after which point data is dropped.

- If buffering is limited, the writer must decide what to do when the limit is exceeded. Given the nature of data in our system, newer data is usually more valuable than older data, so the writer should not necessarily just drop all new incoming data (although that is relatively simple to implement), as doing so would prioritize old data.

- Sending data concurrently to SignalFx. At large volumes, sending one request at a time to ingest/gateway will probably not provide enough throughput, since the network and HTTP round-trip time are usually the bottleneck at that point.

Constants

const (
	DefaultDatapointMaxBuffered  = 10000
	DefaultDatapointMaxRequests  = 10
	DefaultDatapointMaxBatchSize = 1000
)

const (
	DefaultSpanMaxBuffered  = 10000
	DefaultSpanMaxRequests  = 10
	DefaultSpanMaxBatchSize = 1000
)
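
These defaults can be used when configuring a writer by hand. A minimal sketch (the defaults are set explicitly here rather than assuming the writer applies them automatically; sendFunc and the channel size are placeholders):

w := &DatapointWriter{
	InputChan:    make(chan []*datapoint.Datapoint, 16),
	SendFunc:     sendFunc, // any DatapointSender supplied by the caller
	MaxBuffered:  DefaultDatapointMaxBuffered,
	MaxRequests:  DefaultDatapointMaxRequests,
	MaxBatchSize: DefaultDatapointMaxBatchSize,
}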

Variables

This section is empty.

Functions

This section is empty.

Types

type DatapointPreprocessor

type DatapointPreprocessor func(*datapoint.Datapoint) bool

DatapointPreprocessor is used to filter out or otherwise modify datapoints before they are sent. If the return value is false, the datapoint won't be sent.
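
For example, a preprocessor might drop datapoints that are missing a required dimension and normalize the metric name on the rest. A sketch (the "host" dimension and the lowercasing are arbitrary choices for illustration; strings is the standard-library package):

var keepHosted DatapointPreprocessor = func(dp *datapoint.Datapoint) bool {
	if _, ok := dp.Dimensions["host"]; !ok {
		return false // filtered out: this datapoint will not be sent
	}
	dp.Metric = strings.ToLower(dp.Metric) // in-place modification before sending
	return true
}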

type DatapointRingBuffer

type DatapointRingBuffer struct {
	// contains filtered or unexported fields
}

DatapointRingBuffer is a ring buffer that supports inserting and reading chunks of elements in an orderly fashion. It is NOT thread-safe, and the returned batches are not copied: they are slices over the buffer's original backing array. This means that if the buffer wraps around, elements in the slice returned by NextBatch will be changed, and you are subject to all of the rules of Go's memory model if accessing the data from a separate goroutine.

func NewDatapointRingBuffer

func NewDatapointRingBuffer(size int) *DatapointRingBuffer

NewDatapointRingBuffer creates a new initialized buffer ready for use.

func (*DatapointRingBuffer) Add

func (b *DatapointRingBuffer) Add(inst *datapoint.Datapoint) (isOverwrite bool)

Add adds a datapoint.Datapoint to the buffer. It will overwrite any existing element in the buffer as the buffer wraps around. It returns whether the new element overwrote an unprocessed element already in the buffer.

func (*DatapointRingBuffer) NextBatch

func (b *DatapointRingBuffer) NextBatch(maxSize int) []*datapoint.Datapoint

NextBatch returns the next batch of unprocessed elements. If there are none, this can return nil.

func (*DatapointRingBuffer) Size

func (b *DatapointRingBuffer) Size() int

Size returns how many elements can fit in the buffer at once.

func (*DatapointRingBuffer) UnprocessedCount

func (b *DatapointRingBuffer) UnprocessedCount() int

UnprocessedCount returns the number of elements that have been written to the buffer but not read via NextBatch.
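
Putting these methods together, a single-goroutine usage sketch looks like the following (incoming and send are hypothetical placeholders; remember that the buffer is not thread-safe and that batches alias its backing array):

buf := NewDatapointRingBuffer(1000)

for _, dp := range incoming { // incoming is a []*datapoint.Datapoint
	if buf.Add(dp) {
		// An unprocessed datapoint was overwritten; count or log it here.
	}
}

// Drain in batches of at most 100, handing each batch off before calling
// Add again, since later Adds may reuse the slice's backing array.
for batch := buf.NextBatch(100); len(batch) > 0; batch = buf.NextBatch(100) {
	_ = send(batch)
}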

type DatapointSender

type DatapointSender func(context.Context, []*datapoint.Datapoint) error

DatapointSender is what sends a slice of datapoints. It should block until the datapoints have been sent, or an error has occurred.
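
In practice a sender is often a thin wrapper around an HTTP sink. A sketch (client is assumed to be an *sfxclient.HTTPSink created elsewhere, and log is the standard-library logger):

var send DatapointSender = func(ctx context.Context, dps []*datapoint.Datapoint) error {
	err := client.AddDatapoints(ctx, dps) // blocks until sent or failed
	if err != nil {
		log.Printf("failed to send %d datapoints: %v", len(dps), err)
	}
	return err
}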

type DatapointWriter

type DatapointWriter struct {
	// This must be provided by the user of this writer.
	InputChan chan []*datapoint.Datapoint

	// PreprocessFunc can be used for filtering or modifying datapoints before
	// being sent.  If PreprocessFunc returns false, the datapoint will not be
	// sent. PreprocessFunc can be left nil, in which case all datapoints will
	// be sent.
	PreprocessFunc DatapointPreprocessor

	// SendFunc must be provided as the writer is useless without it.  SendFunc
	// should synchronously process/send the Datapoints passed to it and not
	// return until they have been dealt with.  The slice passed to SendFunc
	// should not be used after the function returns, as its backing array
	// might get reused.
	SendFunc DatapointSender

	// OverwriteFunc can be set to a function that will be called
	// whenever an Add call to the underlying ring buffer results in the
	// overwriting of an unprocessed datapoint.
	OverwriteFunc func()

	// The maximum number of Datapoints that this writer will hold before
	// overwriting.  You must set this before calling Start.
	MaxBuffered int
	// The maximum number of concurrent calls to sendFunc that can be
	// active at a given time.  You must set this before calling Start.
	MaxRequests int
	// The biggest batch of Datapoints the writer will emit to sendFunc at once.
	// You must set this before calling Start.
	MaxBatchSize int

	// Purely internal metrics.  If accessing any of these externally, use
	// atomic.LoadInt64!
	TotalReceived     int64
	TotalFilteredOut  int64
	TotalInFlight     int64
	TotalSent         int64
	TotalFailedToSend int64
	TotalOverwritten  int64
	// contains filtered or unexported fields
}

DatapointWriter is an abstraction that accepts a bunch of datapoints, buffers them in a circular buffer and sends them out in concurrent batches. This prioritizes newer Datapoints at the expense of older ones, which is generally desirable from a monitoring standpoint.

You must call the non-blocking method Start on a created writer for it to do anything.

Example
client := sfxclient.NewHTTPSink()
filterFunc := func(dp *datapoint.Datapoint) bool {
	return dp.Meta["shouldSend"].(bool)
}

in := make(chan []*datapoint.Datapoint, 1)

// filterFunc can also be nil if no filtering/modification is needed.
writer := &DatapointWriter{
	PreprocessFunc: filterFunc,
	SendFunc:       client.AddDatapoints,
	InputChan:      in,
}

ctx, cancel := context.WithCancel(context.Background())
writer.Start(ctx)

// Send datapoints with the writer
in <- []*datapoint.Datapoint{}

// Cancel the context passed to Start()
cancel()
// Will wait for all pending datapoints to be written.
writer.WaitForShutdown()
Output:

func (*DatapointWriter) InternalMetrics

func (w *DatapointWriter) InternalMetrics(prefix string) []*datapoint.Datapoint

InternalMetrics returns internal metrics about the datapoint writer.

func (*DatapointWriter) Start

func (w *DatapointWriter) Start(ctx context.Context)

Start the writer processing loop

func (*DatapointWriter) WaitForShutdown

func (w *DatapointWriter) WaitForShutdown()

WaitForShutdown will block until all of the elements inserted to the writer have been processed.
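
As the struct comment notes, the Total* counters on DatapointWriter must be read with sync/atomic when accessed from outside the writer. A sketch, assuming w is a *DatapointWriter that has already been started:

sent := atomic.LoadInt64(&w.TotalSent)
overwritten := atomic.LoadInt64(&w.TotalOverwritten)
log.Printf("sent=%d overwritten=%d", sent, overwritten)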

type SpanPreprocessor

type SpanPreprocessor func(*trace.Span) bool

SpanPreprocessor is used to filter out or otherwise modify spans before they are sent. If the return value is false, the span won't be sent.

type SpanRingBuffer

type SpanRingBuffer struct {
	// contains filtered or unexported fields
}

SpanRingBuffer is a ring buffer that supports inserting and reading chunks of elements in an orderly fashion. It is NOT thread-safe, and the returned batches are not copied: they are slices over the buffer's original backing array. This means that if the buffer wraps around, elements in the slice returned by NextBatch will be changed, and you are subject to all of the rules of Go's memory model if accessing the data from a separate goroutine.

func NewSpanRingBuffer

func NewSpanRingBuffer(size int) *SpanRingBuffer

NewSpanRingBuffer creates a new initialized buffer ready for use.

func (*SpanRingBuffer) Add

func (b *SpanRingBuffer) Add(inst *trace.Span) (isOverwrite bool)

Add adds a trace.Span to the buffer. It will overwrite any existing element in the buffer as the buffer wraps around. It returns whether the new element overwrote an unprocessed element already in the buffer.

func (*SpanRingBuffer) NextBatch

func (b *SpanRingBuffer) NextBatch(maxSize int) []*trace.Span

NextBatch returns the next batch of unprocessed elements. If there are none, this can return nil.

func (*SpanRingBuffer) Size

func (b *SpanRingBuffer) Size() int

Size returns how many elements can fit in the buffer at once.

func (*SpanRingBuffer) UnprocessedCount

func (b *SpanRingBuffer) UnprocessedCount() int

UnprocessedCount returns the number of elements that have been written to the buffer but not read via NextBatch.

type SpanSender

type SpanSender func(context.Context, []*trace.Span) error

SpanSender is what sends a slice of spans. It should block until the spans have been sent, or an error has occurred.

type SpanWriter

type SpanWriter struct {
	// This must be provided by the user of this writer.
	InputChan chan []*trace.Span

	// PreprocessFunc can be used for filtering or modifying spans before
	// being sent.  If PreprocessFunc returns false, the span will not be
	// sent. PreprocessFunc can be left nil, in which case all spans will
	// be sent.
	PreprocessFunc SpanPreprocessor

	// SendFunc must be provided as the writer is useless without it.  SendFunc
	// should synchronously process/send the Spans passed to it and not
	// return until they have been dealt with.  The slice passed to SendFunc
	// should not be used after the function returns, as its backing array
	// might get reused.
	SendFunc SpanSender

	// OverwriteFunc can be set to a function that will be called
	// whenever an Add call to the underlying ring buffer results in the
	// overwriting of an unprocessed span.
	OverwriteFunc func()

	// The maximum number of Spans that this writer will hold before
	// overwriting.  You must set this before calling Start.
	MaxBuffered int
	// The maximum number of concurrent calls to sendFunc that can be
	// active at a given time.  You must set this before calling Start.
	MaxRequests int
	// The biggest batch of Spans the writer will emit to sendFunc at once.
	// You must set this before calling Start.
	MaxBatchSize int

	// Purely internal metrics.  If accessing any of these externally, use
	// atomic.LoadInt64!
	TotalReceived     int64
	TotalFilteredOut  int64
	TotalInFlight     int64
	TotalSent         int64
	TotalFailedToSend int64
	TotalOverwritten  int64
	// contains filtered or unexported fields
}

SpanWriter is an abstraction that accepts a bunch of spans, buffers them in a circular buffer and sends them out in concurrent batches. This prioritizes newer Spans at the expense of older ones, which is generally desirable from a monitoring standpoint.

You must call the non-blocking method Start on a created writer for it to do anything.

Example
client := sfxclient.NewHTTPSink()
filterFunc := func(dp *trace.Span) bool {
	return dp.Meta["shouldSend"].(bool)
}

in := make(chan []*trace.Span, 1)

// filterFunc can also be nil if no filtering/modification is needed.
writer := &SpanWriter{
	PreprocessFunc: filterFunc,
	SendFunc:       client.AddSpans,
	InputChan:      in,
}

ctx, cancel := context.WithCancel(context.Background())
writer.Start(ctx)

// Send traces with the writer
in <- []*trace.Span{}

// Cancel the context passed to Start()
cancel()
// Will wait for all pending traces to be written.
writer.WaitForShutdown()
Output:

func (*SpanWriter) InternalMetrics

func (w *SpanWriter) InternalMetrics(prefix string) []*datapoint.Datapoint

InternalMetrics returns internal metrics about the span writer.

func (*SpanWriter) Start

func (w *SpanWriter) Start(ctx context.Context)

Start the writer processing loop

func (*SpanWriter) WaitForShutdown

func (w *SpanWriter) WaitForShutdown()

WaitForShutdown will block until all of the elements inserted to the writer have been processed.

type Writer

type Writer interface {
	InternalMetrics(prefix string) []*datapoint.Datapoint
	Start(context.Context)
}

Writer is common to both datapoint and span writers
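
Since both DatapointWriter and SpanWriter satisfy this interface, code that only needs to start writers and collect their internal metrics can treat them uniformly. A sketch (dpWriter, spanWriter, and ctx are assumed to be set up already; the "writer." prefix is arbitrary):

var metrics []*datapoint.Datapoint
for _, w := range []Writer{dpWriter, spanWriter} {
	w.Start(ctx)
	metrics = append(metrics, w.InternalMetrics("writer.")...)
}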
