Documentation ¶
Overview ¶
Package writer contains a set of components that accept a single type of SignalFx data (e.g. datapoints, trace spans) in a simple manner (e.g. an input channel) and then sorts out the complexities of sending that data to SignalFx's ingest (or agent/gateway) endpoints. They are intended to be used when high volumes of data are expected. Some of the issues that a writer should deal with are:
- Batching of data that should be sent to SignalFx. It is infeasible to send every single data item as a single request but too much batching will reduce the timeliness of data into the system.
- Buffering of data items while waiting to be transmitted to SignalFx. The buffering could use all available memory to the process or have a limit on data waiting to be sent, after which point data is dropped.
- If buffering is limited, then the writer must decide what to do when the limit is exceeded. Given the nature of data in our system, newer data is usually more valuable than older data, so the writer should not necessarily just drop all new incoming data (although this is relatively simple to implement), as that would prioritize old data.
- Sending data concurrently to SignalFx. At large volumes, sending one request at a time to ingest/gateway is probably not going to get enough throughput, as usually the network and HTTP RTT is the bottleneck at that point.
Index ¶
Examples ¶
Constants ¶
const ( DefaultDatapointMaxBuffered = 10000 DefaultDatapointMaxRequests = 10 DefaultDatapointMaxBatchSize = 1000 )
const ( DefaultSpanMaxBuffered = 10000 DefaultSpanMaxRequests = 10 DefaultSpanMaxBatchSize = 1000 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DatapointPreprocessor ¶
DatapointPreprocessor is used to filter out or otherwise change datapoints before being sent. If the return value is false, the datapoint won't be sent.
type DatapointRingBuffer ¶
type DatapointRingBuffer struct {
// contains filtered or unexported fields
}
DatapointRingBuffer is a ring buffer that supports inserting and reading chunks of elements in an orderly fashion. It is NOT thread-safe and the returned batches are not copied, they are a slice against the original backing array of this datapoint. This means that if the buffer wraps around, elements in the slice returned by NextBatch will be changed, and you are subject to all of the rules of Go's memory model if accessing the data in a separate goroutine.
func NewDatapointRingBuffer ¶
func NewDatapointRingBuffer(size int) *DatapointRingBuffer
NewDatapointRingBuffer creates a new initialized buffer ready for use.
func (*DatapointRingBuffer) Add ¶
func (b *DatapointRingBuffer) Add(inst *datapoint.Datapoint) (isOverwrite bool)
Add an Datapoint:datapoint.Datapoint to the buffer. It will overwrite any existing element in the buffer as the buffer wraps around. Returns whether the new element overwrites an uncommitted element already in the buffer.
func (*DatapointRingBuffer) NextBatch ¶
func (b *DatapointRingBuffer) NextBatch(maxSize int) []*datapoint.Datapoint
NextBatch returns the next batch of unprocessed elements. If there are none, this can return nil.
func (*DatapointRingBuffer) Size ¶
func (b *DatapointRingBuffer) Size() int
Size returns how many elements can fit in the buffer at once.
func (*DatapointRingBuffer) UnprocessedCount ¶
func (b *DatapointRingBuffer) UnprocessedCount() int
UnprocessedCount returns the number of elements that have been written to the buffer but not read via NextBatch.
type DatapointSender ¶
DatapointSender is what sends a slice of datapoints. It should block until the datapoints have been sent, or an error has occurred.
type DatapointWriter ¶
type DatapointWriter struct { // This must be provided by the user of this writer. InputChan chan []*datapoint.Datapoint // PreprocessFunc can be used for filtering or modifying datapoints before // being sent. If PreprocessFunc returns false, the datapoint will not be // sent. PreprocessFunc can be left nil, in which case all datapoints will // be sent. PreprocessFunc DatapointPreprocessor // SendFunc must be provided as the writer is useless without it. SendFunc // should synchronously process/send the Datapoints passed to it and not // return until they have been dealt with. The slice passed to SendFunc // should not be used after the function returns, as its backing array // might get reused. SendFunc DatapointSender // OverwriteFunc can be set to a function that will be called // whenever an Add call to the underlying ring buffer results in the // overwriting of an unprocessed datapoint. OverwriteFunc func() // The maximum number of Datapoints that this writer will hold before // overwriting. You must set this before calling Start. MaxBuffered int // The maximum number of concurrent calls to sendFunc that can be // active at a given datapoint. You must set this before calling Start. MaxRequests int // The biggest batch of Datapoints the writer will emit to sendFunc at once. // You must set this before calling Start. MaxBatchSize int // Purely internal metrics. If accessing any of these externally, use // atomic.LoadInt64! TotalReceived int64 TotalFilteredOut int64 TotalInFlight int64 TotalSent int64 TotalFailedToSend int64 TotalOverwritten int64 // contains filtered or unexported fields }
DatapointWriter is an abstraction that accepts a bunch of datapoints, buffers them in a circular buffer and sends them out in concurrent batches. This prioritizes newer Datapoints at the expense of older ones, which is generally desirable from a monitoring standpoint.
You must call the non-blocking method Start on a created datapoint for it to do anything.
Example ¶
client := sfxclient.NewHTTPSink() filterFunc := func(dp *datapoint.Datapoint) bool { return dp.Meta["shouldSend"].(bool) } in := make(chan []*datapoint.Datapoint, 1) // filterFunc can also be nil if no filtering/modification is needed. writer := &DatapointWriter{ PreprocessFunc: filterFunc, SendFunc: client.AddDatapoints, InputChan: in, } ctx, cancel := context.WithCancel(context.Background()) writer.Start(ctx) // Send datapoints with the writer in <- []*datapoint.Datapoint{} // Close the context passed to Run() cancel() // Will wait for all pending datapoints to be written. writer.WaitForShutdown()
Output:
func (*DatapointWriter) InternalMetrics ¶
func (w *DatapointWriter) InternalMetrics(prefix string) []*datapoint.Datapoint
InternalMetrics about the datapoint writer
func (*DatapointWriter) Start ¶
func (w *DatapointWriter) Start(ctx context.Context)
Start the writer processing loop
func (*DatapointWriter) WaitForShutdown ¶
func (w *DatapointWriter) WaitForShutdown()
WaitForShutdown will block until all of the elements inserted to the writer have been processed.
type SpanPreprocessor ¶
SpanPreprocessor is used to filter out or otherwise change spans before being sent. If the return value is false, the span won't be sent.
type SpanRingBuffer ¶
type SpanRingBuffer struct {
// contains filtered or unexported fields
}
SpanRingBuffer is a ring buffer that supports inserting and reading chunks of elements in an orderly fashion. It is NOT thread-safe and the returned batches are not copied, they are a slice against the original backing array of this span. This means that if the buffer wraps around, elements in the slice returned by NextBatch will be changed, and you are subject to all of the rules of Go's memory model if accessing the data in a separate goroutine.
func NewSpanRingBuffer ¶
func NewSpanRingBuffer(size int) *SpanRingBuffer
NewSpanRingBuffer creates a new initialized buffer ready for use.
func (*SpanRingBuffer) Add ¶
func (b *SpanRingBuffer) Add(inst *trace.Span) (isOverwrite bool)
Add an Span:trace.Span to the buffer. It will overwrite any existing element in the buffer as the buffer wraps around. Returns whether the new element overwrites an uncommitted element already in the buffer.
func (*SpanRingBuffer) NextBatch ¶
func (b *SpanRingBuffer) NextBatch(maxSize int) []*trace.Span
NextBatch returns the next batch of unprocessed elements. If there are none, this can return nil.
func (*SpanRingBuffer) Size ¶
func (b *SpanRingBuffer) Size() int
Size returns how many elements can fit in the buffer at once.
func (*SpanRingBuffer) UnprocessedCount ¶
func (b *SpanRingBuffer) UnprocessedCount() int
UnprocessedCount returns the number of elements that have been written to the buffer but not read via NextBatch.
type SpanSender ¶
SpanSender is what sends a slice of spans. It should block until the spans have been sent, or an error has occurred.
type SpanWriter ¶
type SpanWriter struct { // This must be provided by the user of this writer. InputChan chan []*trace.Span // PreprocessFunc can be used for filtering or modifying spans before // being sent. If PreprocessFunc returns false, the span will not be // sent. PreprocessFunc can be left nil, in which case all spans will // be sent. PreprocessFunc SpanPreprocessor // SendFunc must be provided as the writer is useless without it. SendFunc // should synchronously process/send the Spans passed to it and not // return until they have been dealt with. The slice passed to SendFunc // should not be used after the function returns, as its backing array // might get reused. SendFunc SpanSender // OverwriteFunc can be set to a function that will be called // whenever an Add call to the underlying ring buffer results in the // overwriting of an unprocessed span. OverwriteFunc func() // The maximum number of Spans that this writer will hold before // overwriting. You must set this before calling Start. MaxBuffered int // The maximum number of concurrent calls to sendFunc that can be // active at a given span. You must set this before calling Start. MaxRequests int // The biggest batch of Spans the writer will emit to sendFunc at once. // You must set this before calling Start. MaxBatchSize int // Purely internal metrics. If accessing any of these externally, use // atomic.LoadInt64! TotalReceived int64 TotalFilteredOut int64 TotalInFlight int64 TotalSent int64 TotalFailedToSend int64 TotalOverwritten int64 // contains filtered or unexported fields }
SpanWriter is an abstraction that accepts a bunch of spans, buffers them in a circular buffer and sends them out in concurrent batches. This prioritizes newer Spans at the expense of older ones, which is generally desirable from a monitoring standpoint.
You must call the non-blocking method Start on a created span for it to do anything.
Example ¶
client := sfxclient.NewHTTPSink() filterFunc := func(dp *trace.Span) bool { return dp.Meta["shouldSend"].(bool) } in := make(chan []*trace.Span, 1) // filterFunc can also be nil if no filtering/modification is needed. writer := &SpanWriter{ PreprocessFunc: filterFunc, SendFunc: client.AddSpans, InputChan: in, } ctx, cancel := context.WithCancel(context.Background()) writer.Start(ctx) // Send traces with the writer in <- []*trace.Span{} // Close the context passed to Run() cancel() // Will wait for all pending traces to be written. writer.WaitForShutdown()
Output:
func (*SpanWriter) InternalMetrics ¶
func (w *SpanWriter) InternalMetrics(prefix string) []*datapoint.Datapoint
InternalMetrics about the span writer
func (*SpanWriter) Start ¶
func (w *SpanWriter) Start(ctx context.Context)
Start the writer processing loop
func (*SpanWriter) WaitForShutdown ¶
func (w *SpanWriter) WaitForShutdown()
WaitForShutdown will block until all of the elements inserted to the writer have been processed.