concurrentbatchprocessor

package module
v0.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2024 License: Apache-2.0 Imports: 25 Imported by: 3

README

Concurrent Batch Processor

This component is an experimental processor, forked from the core OpenTelemetry Collector batchprocessor component. The differences in this component, relative to that component are:

  1. Synchronous pipeline support: this component blocks each producer until the request returns with success or an error status code.
  2. Maximim in-flight-bytes setting. This component measures the in-memory size of each request it admits to the pipeline and otherwise stalls requests until they timeout.
  3. Unlimited concurrency: this component will start as many goroutines as needed to send batches through the pipeline.

Here is an example configuration:

    processors:
      concurrentbatch:
        send_batch_max_size: 1500
        send_batch_size: 1000
        timeout: 1s
        max_in_flight_size_mib: 128

In this configuration, the component will admit up to 128MiB of request data before stalling.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFactory

func NewFactory() processor.Factory

NewFactory returns a new factory for the Batch processor.

Types

type Config

type Config struct {
	// Timeout sets the time after which a batch will be sent regardless of size.
	// When this is set to zero, batched data will be sent immediately.
	Timeout time.Duration `mapstructure:"timeout"`

	// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
	// When this is set to zero, the batch size is ignored and data will be sent immediately
	// subject to only send_batch_max_size.
	SendBatchSize uint32 `mapstructure:"send_batch_size"`

	// SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize.
	// Larger batches are split into smaller units.
	// Default value is 0, that means no maximum size.
	SendBatchMaxSize uint32 `mapstructure:"send_batch_max_size"`

	// MetadataKeys is a list of client.Metadata keys that will be
	// used to form distinct batchers.  If this setting is empty,
	// a single batcher instance will be used.  When this setting
	// is not empty, one batcher will be used per distinct
	// combination of values for the listed metadata keys.
	//
	// Empty value and unset metadata are treated as distinct cases.
	//
	// Entries are case-insensitive.  Duplicated entries will
	// trigger a validation error.
	MetadataKeys []string `mapstructure:"metadata_keys"`

	// MetadataCardinalityLimit indicates the maximum number of
	// batcher instances that will be created through a distinct
	// combination of MetadataKeys.
	MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"`

	// MaxInFlightSizeMiB limits the number of bytes in queue waiting to be
	// processed by the senders.
	MaxInFlightSizeMiB uint32 `mapstructure:"max_in_flight_size_mib"`
}

Config defines configuration for batch processor.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks if the processor configuration is valid

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL