queuebatch

package
v0.130.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package queuebatch provides helper functions for exporter's queueing and batching.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LinksFromContext added in v0.124.0

func LinksFromContext(ctx context.Context) []trace.Link

LinksFromContext returns a list of trace links registered in the context.

Types

type BatchConfig

type BatchConfig struct {
	// FlushTimeout sets the time after which a batch will be sent regardless of its size.
	FlushTimeout time.Duration `mapstructure:"flush_timeout"`

	// Sizer determines the type of size measurement used by the batch.
	// If not configured, use the same configuration as the queue.
	// It accepts "requests", "items", or "bytes".
	Sizer request.SizerType `mapstructure:"sizer"`

	// MinSize defines the configuration for the minimum size of a batch.
	MinSize int64 `mapstructure:"min_size"`

	// MaxSize defines the configuration for the maximum size of a batch.
	MaxSize int64 `mapstructure:"max_size"`
}

BatchConfig defines a configuration for batching requests based on a timeout and a minimum number of items.

func (*BatchConfig) Validate

func (cfg *BatchConfig) Validate() error

type Batcher

type Batcher[T any] interface {
	component.Component
	Consume(context.Context, T, queue.Done)
}

Batcher is in charge of reading items from the queue and send them out asynchronously.

func NewBatcher added in v0.128.0

func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[request.Request]) (Batcher[request.Request], error)

type Config

type Config struct {
	// Enabled indicates whether to not enqueue and batch before exporting.
	Enabled bool `mapstructure:"enabled"`

	// WaitForResult determines if incoming requests are blocked until the request is processed or not.
	// Currently, this option is not available when persistent queue is configured using the storage configuration.
	WaitForResult bool `mapstructure:"wait_for_result"`

	// Sizer determines the type of size measurement used by this component.
	// It accepts "requests", "items", or "bytes".
	Sizer request.SizerType `mapstructure:"sizer"`

	// QueueSize represents the maximum data size allowed for concurrent storage and processing.
	QueueSize int64 `mapstructure:"queue_size"`

	// BlockOnOverflow determines the behavior when the component's TotalSize limit is reached.
	// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
	BlockOnOverflow bool `mapstructure:"block_on_overflow"`

	// StorageID if not empty, enables the persistent storage and uses the component specified
	// as a storage extension for the persistent queue.
	// TODO: This will be changed to Optional when available.
	StorageID *component.ID `mapstructure:"storage"`

	// NumConsumers is the maximum number of concurrent consumers from the queue.
	// This applies across all different optional configurations from above (e.g. wait_for_result, blockOnOverflow, persistent, etc.).
	// TODO: This will also control the maximum number of shards, when supported:
	//  https://github.com/open-telemetry/opentelemetry-collector/issues/12473.
	NumConsumers int `mapstructure:"num_consumers"`

	// BatchConfig it configures how the requests are consumed from the queue and batch together during consumption.
	Batch configoptional.Optional[BatchConfig] `mapstructure:"batch"`
}

Config defines configuration for queueing and batching incoming requests.

func (*Config) Unmarshal

func (cfg *Config) Unmarshal(conf *confmap.Conf) error

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks if the Config is valid

type GetKeyFunc added in v0.125.0

type GetKeyFunc[T any] func(context.Context, T) string

func (GetKeyFunc[T]) GetKey added in v0.125.0

func (f GetKeyFunc[T]) GetKey(ctx context.Context, t T) string

type Partitioner added in v0.125.0

type Partitioner[T any] interface {
	GetKey(context.Context, T) string
}

Partitioner is an interface that returns the the partition key of the given element.

func NewPartitioner added in v0.125.0

func NewPartitioner(
	getKeyFunc GetKeyFunc[request.Request],
) Partitioner[request.Request]

type QueueBatch

type QueueBatch struct {
	// contains filtered or unexported fields
}

func NewQueueBatch

func NewQueueBatch(
	set Settings[request.Request],
	cfg Config,
	next sender.SendFunc[request.Request],
) (*QueueBatch, error)

func (*QueueBatch) Send

func (qs *QueueBatch) Send(ctx context.Context, req request.Request) error

Send implements the requestSender interface. It puts the request in the queue.

func (*QueueBatch) Shutdown

func (qs *QueueBatch) Shutdown(ctx context.Context) error

Shutdown is invoked during service shutdown.

func (*QueueBatch) Start

func (qs *QueueBatch) Start(ctx context.Context, host component.Host) error

Start is invoked during service startup.

type Settings

type Settings[T any] struct {
	Signal      pipeline.Signal
	ID          component.ID
	Telemetry   component.TelemetrySettings
	Encoding    queue.Encoding[T]
	ItemsSizer  request.Sizer[T]
	BytesSizer  request.Sizer[T]
	Partitioner Partitioner[T]
}

Settings defines settings for creating a QueueBatch.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL