Version: v1.32.0

Published: May 15, 2023 License: Apache-2.0 Imports: 5 Imported by: 0




const (
	DefaultInstanceMaxBuffered  = 10000
	DefaultInstanceMaxRequests  = 10
	DefaultInstanceMaxBatchSize = 1000
)




type Instance

type Instance generic.Type

Instance is the element type held by the ring buffer and handled by the writer.

type InstancePreprocessor

type InstancePreprocessor func(*Instance) bool

InstancePreprocessor is used to filter out or otherwise change instances before they are sent. If it returns false, the instance won't be sent.
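The filtering contract described above can be illustrated with a minimal sketch. The Instance struct with a Name field, the dropEmptyNames preprocessor, and the applyPreprocessor helper are all hypothetical stand-ins written for this example; they are not part of the package, and the real writer may apply the preprocessor differently.

```go
package main

import "fmt"

// Instance is a stand-in for the package's element type
// (assumption: a struct with a Name field, for illustration only).
type Instance struct {
	Name string
}

// InstancePreprocessor mirrors the documented signature.
type InstancePreprocessor func(*Instance) bool

// dropEmptyNames is a hypothetical preprocessor: it rejects nil or unnamed
// instances and modifies the ones it keeps by prefixing their name.
func dropEmptyNames(inst *Instance) bool {
	if inst == nil || inst.Name == "" {
		return false // false means "do not send"
	}
	inst.Name = "prefix." + inst.Name
	return true
}

// applyPreprocessor is a hypothetical helper showing how a caller could honor
// the contract: a nil preprocessor keeps everything, false drops the instance.
func applyPreprocessor(f InstancePreprocessor, in []*Instance) []*Instance {
	if f == nil {
		return in
	}
	out := make([]*Instance, 0, len(in))
	for _, inst := range in {
		if f(inst) {
			out = append(out, inst)
		}
	}
	return out
}

func main() {
	kept := applyPreprocessor(dropEmptyNames, []*Instance{{Name: "cpu"}, {Name: ""}})
	fmt.Println(len(kept), kept[0].Name)
}
```

Because the preprocessor receives a pointer, any change it makes is visible to whoever else holds that same Instance.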

type InstanceRingBuffer

type InstanceRingBuffer struct {
	// contains filtered or unexported fields
}

InstanceRingBuffer is a ring buffer that supports inserting elements and reading them back in ordered batches. It is NOT thread-safe. Returned batches are not copied: each one is a slice of this buffer's own backing array. If the buffer wraps around, the elements in a slice previously returned by NextBatch can therefore change, and you are subject to all of the rules of Go's memory model if you access the data from a separate goroutine.

func NewInstanceRingBuffer

func NewInstanceRingBuffer(size int) *InstanceRingBuffer

NewInstanceRingBuffer creates a new initialized buffer ready for use.

func (*InstanceRingBuffer) Add

func (b *InstanceRingBuffer) Add(inst *Instance) (isOverwrite bool)

Add adds an Instance to the buffer, overwriting existing elements as the buffer wraps around. It reports whether the new element overwrote an uncommitted element already in the buffer.

func (*InstanceRingBuffer) NextBatch

func (b *InstanceRingBuffer) NextBatch(maxSize int) []*Instance

NextBatch returns the next batch of unprocessed elements. If there are none, this can return nil.

func (*InstanceRingBuffer) Size

func (b *InstanceRingBuffer) Size() int

Size returns how many elements can fit in the buffer at once.

func (*InstanceRingBuffer) UnprocessedCount

func (b *InstanceRingBuffer) UnprocessedCount() int

UnprocessedCount returns the number of elements that have been written to the buffer but not read via NextBatch.
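The aliasing and overwrite behavior described for the ring buffer can be seen in a small sketch. This is not the package's implementation: the ring type, its fields, and the stand-in Instance struct are assumptions made for the example, chosen only to reproduce the documented semantics (batches share the backing array, and Add reports when an unread element is overwritten).

```go
package main

import "fmt"

// Instance is a stand-in element type (assumption, for illustration only).
type Instance struct{ Name string }

// ring is a hypothetical, simplified ring buffer; it is not thread-safe.
type ring struct {
	buf         []*Instance
	writePos    int // index the next Add writes to
	readPos     int // index the next NextBatch reads from
	unprocessed int // elements written but not yet read
}

func newRing(size int) *ring { return &ring{buf: make([]*Instance, size)} }

// Add stores inst and reports whether an unread element was overwritten.
func (r *ring) Add(inst *Instance) (isOverwrite bool) {
	if r.unprocessed == len(r.buf) {
		// Buffer is full of unread elements: the oldest one is lost.
		isOverwrite = true
		r.readPos = (r.readPos + 1) % len(r.buf)
	} else {
		r.unprocessed++
	}
	r.buf[r.writePos] = inst
	r.writePos = (r.writePos + 1) % len(r.buf)
	return isOverwrite
}

// NextBatch returns up to maxSize unread elements as a slice of the backing
// array (no copy). A batch never wraps past the end of the array.
func (r *ring) NextBatch(maxSize int) []*Instance {
	if r.unprocessed == 0 || maxSize <= 0 {
		return nil
	}
	n := r.unprocessed
	if n > maxSize {
		n = maxSize
	}
	if r.readPos+n > len(r.buf) {
		n = len(r.buf) - r.readPos
	}
	batch := r.buf[r.readPos : r.readPos+n]
	r.readPos = (r.readPos + n) % len(r.buf)
	r.unprocessed -= n
	return batch
}

func main() {
	r := newRing(3)
	for _, n := range []string{"a", "b", "c"} {
		r.Add(&Instance{Name: n})
	}
	batch := r.NextBatch(2) // aliases buf[0:2]
	r.Add(&Instance{Name: "d"}) // wraps around and lands in buf[0]
	fmt.Println(batch[0].Name)  // the earlier batch now sees the new element
}
```

A caller that needs a batch to stay stable after further Add calls would copy it first, for example with append([]*Instance(nil), batch...).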

type InstanceSender

type InstanceSender func(context.Context, []*Instance) error

InstanceSender is what sends a slice of instances. It should block until the instances have been sent, or an error has occurred.

type InstanceWriter

type InstanceWriter struct {
	// This must be provided by the user of this writer.
	InputChan chan []*Instance

	// PreprocessFunc can be used for filtering or modifying instances before
	// being sent.  If PreprocessFunc returns false, the instance will not be
	// sent. PreprocessFunc can be left nil, in which case all instances will
	// be sent.
	PreprocessFunc InstancePreprocessor

	// SendFunc must be provided as the writer is useless without it.  SendFunc
	// should synchronously process/send the Instances passed to it and not
	// return until they have been dealt with.  The slice passed to SendFunc
	// should not be used after the function returns, as its backing array
	// might get reused.
	SendFunc InstanceSender

	// OverwriteFunc can be set to a function that will be called
	// whenever an Add call to the underlying ring buffer results in the
	// overwriting of an unprocessed instance.
	OverwriteFunc func()

	// The maximum number of Instances that this writer will hold before
	// overwriting.  You must set this before calling Start.
	MaxBuffered int
	// The maximum number of concurrent calls to SendFunc that can be
	// active at any given time.  You must set this before calling Start.
	MaxRequests int
	// The biggest batch of Instances the writer will emit to SendFunc at once.
	// You must set this before calling Start.
	MaxBatchSize int

	// Purely internal metrics.  If accessing any of these externally, use
	// atomic.LoadInt64!
	TotalReceived     int64
	TotalFilteredOut  int64
	TotalInFlight     int64
	TotalSent         int64
	TotalFailedToSend int64
	TotalOverwritten  int64
	// contains filtered or unexported fields
}

InstanceWriter is an abstraction that accepts instances, buffers them in a circular buffer, and sends them out in concurrent batches. When the buffer fills up, older Instances are overwritten in favor of newer ones, which is generally desirable from a monitoring standpoint.

You must call the non-blocking method Start on a created InstanceWriter for it to do anything.
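The struct comment asks external readers of the counter fields to use atomic.LoadInt64. A minimal sketch of that access pattern follows; writerCounters, simulateTraffic, and snapshot are hypothetical names, and the struct only borrows two of the documented field names rather than being the real InstanceWriter.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// writerCounters is a hypothetical stand-in carrying two of the documented
// counter fields; it is not the package's InstanceWriter.
type writerCounters struct {
	TotalReceived int64
	TotalSent     int64
}

// simulateTraffic increments the counters from several goroutines, standing
// in for whatever concurrent updates the real writer performs internally.
func simulateTraffic(c *writerCounters, workers, perWorker int) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				atomic.AddInt64(&c.TotalReceived, 1)
				atomic.AddInt64(&c.TotalSent, 1)
			}
		}()
	}
	wg.Wait()
}

// snapshot reads the counters with atomic loads, as the field comment
// requires for external access.
func snapshot(c *writerCounters) (received, sent int64) {
	return atomic.LoadInt64(&c.TotalReceived), atomic.LoadInt64(&c.TotalSent)
}

func main() {
	c := &writerCounters{}
	simulateTraffic(c, 4, 250)
	received, sent := snapshot(c)
	fmt.Println(received, sent)
}
```

Reading the fields directly while the writer is running would be a data race under Go's memory model, which is why the atomic load matters even for a read-only consumer.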

func (*InstanceWriter) InternalMetrics

func (w *InstanceWriter) InternalMetrics(prefix string) []*datapoint.Datapoint

InternalMetrics returns datapoints about the instance writer.

func (*InstanceWriter) Start

func (w *InstanceWriter) Start(ctx context.Context)

Start starts the writer's processing loop. It does not block.

func (*InstanceWriter) WaitForShutdown

func (w *InstanceWriter) WaitForShutdown()

WaitForShutdown blocks until all of the elements inserted into the writer have been processed.
