package queuebatch

v0.123.0
Published: Mar 31, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package queuebatch provides helper functions for the exporter's queueing and batching.

Index

Constants

This section is empty.

Variables

var ErrQueueIsFull = errors.New("sending queue is full")

ErrQueueIsFull is the error returned when an item is offered to the Queue while the queue is full and set up not to block.

Experimental: This API is at the early stage of development and may change without backward compatibility until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.

Functions

This section is empty.

Types

type BatchConfig

type BatchConfig struct {
	// FlushTimeout sets the time after which a batch will be sent regardless of its size.
	FlushTimeout time.Duration `mapstructure:"flush_timeout"`

	// MinSize defines the minimum size of a batch.
	MinSize int64 `mapstructure:"min_size"`

	// MaxSize defines the maximum size of a batch.
	MaxSize int64 `mapstructure:"max_size"`
}

BatchConfig defines a configuration for batching requests based on a timeout and a minimum number of items.

func (*BatchConfig) Validate

func (cfg *BatchConfig) Validate() error
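
A minimal usage sketch (the values below are illustrative, not the component's defaults):

cfg := BatchConfig{
	FlushTimeout: 200 * time.Millisecond, // flush a partial batch after 200ms
	MinSize:      1000,                   // hold items until at least this size is reached
	MaxSize:      10000,                  // split batches that grow beyond this size
}
if err := cfg.Validate(); err != nil {
	// handle invalid configuration
}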

type Batcher

type Batcher[T any] interface {
	component.Component
	Consume(context.Context, T, Done)
}

Batcher is in charge of reading items from the queue and sending them out asynchronously.
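
A minimal sketch of a Batcher implementation for a hypothetical request type Req; it forwards each item immediately and reports the outcome through Done (export is a placeholder send function, not part of this package):

type passthroughBatcher struct{}

func (passthroughBatcher) Start(context.Context, component.Host) error { return nil }
func (passthroughBatcher) Shutdown(context.Context) error              { return nil }

// Consume sends the item downstream and signals completion through Done.
func (passthroughBatcher) Consume(ctx context.Context, req Req, done Done) {
	done.OnDone(export(ctx, req))
}

var _ Batcher[Req] = passthroughBatcher{}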

type Config

type Config struct {
	// Enabled indicates whether requests are enqueued and batched before exporting.
	Enabled bool `mapstructure:"enabled"`

	// WaitForResult determines whether incoming requests block until the request is processed.
	// Currently, this option is not available when the persistent queue is configured using the storage configuration.
	WaitForResult bool `mapstructure:"wait_for_result"`

	// Sizer determines the type of size measurement used by this component.
	// It accepts "requests", "items", or "bytes".
	Sizer request.SizerType `mapstructure:"sizer"`

	// QueueSize represents the maximum data size allowed for concurrent storage and processing.
	QueueSize int64 `mapstructure:"queue_size"`

	// BlockOnOverflow determines the behavior when the component's TotalSize limit is reached.
	// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
	BlockOnOverflow bool `mapstructure:"block_on_overflow"`

	// Deprecated: [v0.123.0] use `block_on_overflow`.
	Blocking bool `mapstructure:"blocking"`

	// StorageID, if not empty, enables persistent storage and uses the specified component
	// as the storage extension for the persistent queue.
	// TODO: This will be changed to Optional when available.
	StorageID *component.ID `mapstructure:"storage"`

	// NumConsumers is the maximum number of concurrent consumers from the queue.
	// This applies across all the different optional configurations above (e.g. wait_for_result, block_on_overflow, persistent storage, etc.).
	// TODO: This will also control the maximum number of shards, when supported:
	//  https://github.com/open-telemetry/opentelemetry-collector/issues/12473.
	NumConsumers int `mapstructure:"num_consumers"`

	// Batch configures how requests are consumed from the queue and batched together during consumption.
	// TODO: This will be changed to Optional when available.
	Batch *BatchConfig `mapstructure:"batch"`
	// contains filtered or unexported fields
}

Config defines configuration for queueing and batching incoming requests.
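
A sketch of a Config assembled in code; in a collector the struct is normally populated from YAML, and the sizer constant name below is an assumption about the companion request package:

cfg := Config{
	Enabled:         true,
	Sizer:           request.SizerTypeItems, // assumed constant; "requests" and "bytes" are the other accepted values
	QueueSize:       1000,
	BlockOnOverflow: true,
	NumConsumers:    10,
	Batch: &BatchConfig{
		FlushTimeout: 200 * time.Millisecond,
		MinSize:      2048,
	},
}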

func (*Config) Unmarshal

func (cfg *Config) Unmarshal(conf *confmap.Conf) error

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks if the Config is valid.
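
A sketch of decoding and validating a Config from a confmap.Conf; the keys mirror the mapstructure tags above, and starting from a zero-value Config (rather than the exporter's defaults) is a simplification:

conf := confmap.NewFromStringMap(map[string]any{
	"enabled":           true,
	"sizer":             "items",
	"queue_size":        1000,
	"block_on_overflow": true,
	"num_consumers":     10,
})

var cfg Config
if err := cfg.Unmarshal(conf); err != nil {
	// handle decode error
}
if err := cfg.Validate(); err != nil {
	// handle invalid configuration
}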

type ConsumeFunc

type ConsumeFunc[T any] func(context.Context, T, Done)

type Done

type Done interface {
	// OnDone needs to be called when processing of the queue item is done.
	OnDone(error)
}

Done represents the callback that will be called when the read request is completely processed by the downstream components.

Experimental: This API is at the early stage of development and may change without backward compatibility until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
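
ConsumeFunc mirrors Batcher's Consume as a plain function and pairs naturally with Done. A sketch, again with a hypothetical request type Req and placeholder export function, that acknowledges the item once an asynchronous send completes:

var consume ConsumeFunc[Req] = func(ctx context.Context, req Req, done Done) {
	go func() {
		// Report the outcome back to the queue once processing finishes.
		done.OnDone(export(ctx, req))
	}()
}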

type Encoding

type Encoding[T any] interface {
	// Marshal is a function that can marshal a request into bytes.
	Marshal(T) ([]byte, error)

	// Unmarshal is a function that can unmarshal bytes into a request.
	Unmarshal([]byte) (T, error)
}
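
A sketch of an Encoding implementation that round-trips requests through encoding/json; JSON here is purely illustrative, a real exporter would typically use the signal's own wire format:

type jsonEncoding[T any] struct{}

func (jsonEncoding[T]) Marshal(req T) ([]byte, error) {
	return json.Marshal(req)
}

func (jsonEncoding[T]) Unmarshal(buf []byte) (T, error) {
	var req T
	err := json.Unmarshal(buf, &req)
	return req, err
}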

type Queue

type Queue[T any] interface {
	component.Component
	// Offer inserts the specified element into this queue if it is possible to do so immediately
	// without violating capacity restrictions. On success it returns no error.
	// It returns ErrQueueIsFull if no space is currently available.
	Offer(ctx context.Context, item T) error
	// Size returns the current size of the queue.
	Size() int64
	// Capacity returns the capacity of the queue.
	Capacity() int64
}

Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue (boundedMemoryQueue) or a disk-based queue (persistentQueue).

Experimental: This API is at the early stage of development and may change without backward compatibility until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
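
A sketch of offering an item and distinguishing a full, non-blocking queue from other failures:

func offer[T any](ctx context.Context, q Queue[T], item T) {
	if err := q.Offer(ctx, item); err != nil {
		if errors.Is(err, ErrQueueIsFull) {
			// The queue is full and configured not to block: drop, buffer, or retry later.
			return
		}
		// Some other failure occurred while enqueueing the item.
	}
}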

type QueueBatch

type QueueBatch struct {
	// contains filtered or unexported fields
}

func NewQueueBatch

func NewQueueBatch(
	set Settings[request.Request],
	cfg Config,
	next sender.SendFunc[request.Request],
) (*QueueBatch, error)

func NewQueueBatchLegacyBatcher

func NewQueueBatchLegacyBatcher(
	set Settings[request.Request],
	cfg Config,
	next sender.SendFunc[request.Request],
) (*QueueBatch, error)

func (*QueueBatch) Send

func (qs *QueueBatch) Send(ctx context.Context, req request.Request) error

Send implements the requestSender interface. It puts the request in the queue.

func (*QueueBatch) Shutdown

func (qs *QueueBatch) Shutdown(ctx context.Context) error

Shutdown is invoked during service shutdown.

func (*QueueBatch) Start

func (qs *QueueBatch) Start(ctx context.Context, host component.Host) error

Start is invoked during service startup.

type Settings

type Settings[T any] struct {
	Signal    pipeline.Signal
	ID        component.ID
	Telemetry component.TelemetrySettings
	Encoding  Encoding[T]
	Sizers    map[request.SizerType]request.Sizer[T]
}

Settings defines settings for creating a QueueBatch.
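
A sketch of wiring Settings, Config, and a send function into NewQueueBatch. The encoding, sizers, ctx, and host are placeholders, and the helper constructors (pipeline.SignalTraces, component.MustNewID, componenttest.NewNopTelemetrySettings) come from companion collector packages:

set := Settings[request.Request]{
	Signal:    pipeline.SignalTraces,
	ID:        component.MustNewID("otlp"),
	Telemetry: componenttest.NewNopTelemetrySettings(),
	Encoding:  myEncoding, // placeholder Encoding[request.Request] implementation
	Sizers:    mySizers,   // placeholder map[request.SizerType]request.Sizer[request.Request]
}

qb, err := NewQueueBatch(set, cfg, func(ctx context.Context, req request.Request) error {
	// Hand the (possibly batched) request to the next sender.
	return nil
})
if err != nil {
	// handle construction error
}
if err := qb.Start(ctx, host); err != nil {
	// handle startup error
}
// Later: qb.Send(ctx, req) puts a request in the queue; qb.Shutdown(ctx) is invoked during service shutdown.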
