Documentation
Overview ¶
Package queuebatch provides helper functions for exporter's queueing and batching.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrQueueIsFull = errors.New("sending queue is full")
ErrQueueIsFull is the error returned when an item is offered to the Queue while the queue is full and configured not to block.
Experimental: This API is at an early stage of development and may change without backward compatibility until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Functions ¶
This section is empty.
Types ¶
type BatchConfig ¶
type BatchConfig struct {
	// FlushTimeout sets the time after which a batch will be sent regardless of its size.
	FlushTimeout time.Duration `mapstructure:"flush_timeout"`

	// MinSize defines the configuration for the minimum size of a batch.
	MinSize int64 `mapstructure:"min_size"`

	// MaxSize defines the configuration for the maximum size of a batch.
	MaxSize int64 `mapstructure:"max_size"`
}
BatchConfig defines a configuration for batching requests based on a timeout and a minimum number of items.
func (*BatchConfig) Validate ¶
func (cfg *BatchConfig) Validate() error
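The page does not say what Validate checks. The sketch below shows the kind of invariants such a method could plausibly enforce on the three fields. Everything in it is an assumption for illustration: batchConfig is a local mirror of BatchConfig, the rules (positive timeout, non-negative sizes, a zero MaxSize meaning "no upper limit") and the error messages are hypothetical, and the real method may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// batchConfig is a local mirror of BatchConfig so the sketch compiles
// without the collector module.
type batchConfig struct {
	FlushTimeout time.Duration
	MinSize      int64
	MaxSize      int64
}

// validate applies hypothetical consistency rules. These are assumptions,
// not the documented behavior of BatchConfig.Validate.
func (cfg *batchConfig) validate() error {
	if cfg.FlushTimeout <= 0 {
		return errors.New("flush_timeout must be positive")
	}
	if cfg.MinSize < 0 {
		return errors.New("min_size must be non-negative")
	}
	if cfg.MaxSize < 0 {
		return errors.New("max_size must be non-negative")
	}
	// Assumption: MaxSize == 0 means "no upper limit".
	if cfg.MaxSize > 0 && cfg.MaxSize < cfg.MinSize {
		return errors.New("max_size must be greater than or equal to min_size")
	}
	return nil
}

func main() {
	ok := &batchConfig{FlushTimeout: 200 * time.Millisecond, MinSize: 8192, MaxSize: 16384}
	bad := &batchConfig{FlushTimeout: time.Second, MinSize: 100, MaxSize: 10}
	fmt.Println("ok config:", ok.validate())
	fmt.Println("bad config:", bad.validate())
}
```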
type Batcher ¶
Batcher is in charge of reading items from the queue and sending them out asynchronously.
type Config ¶
type Config struct {
	// Enabled indicates whether to enqueue and batch requests before exporting.
	Enabled bool `mapstructure:"enabled"`

	// WaitForResult determines whether incoming requests are blocked until the request is processed.
	// Currently, this option is not available when a persistent queue is configured using the storage configuration.
	WaitForResult bool `mapstructure:"wait_for_result"`

	// Sizer determines the type of size measurement used by this component.
	// It accepts "requests", "items", or "bytes".
	Sizer request.SizerType `mapstructure:"sizer"`

	// QueueSize represents the maximum data size allowed for concurrent storage and processing.
	QueueSize int64 `mapstructure:"queue_size"`

	// BlockOnOverflow determines the behavior when the component's TotalSize limit is reached.
	// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
	BlockOnOverflow bool `mapstructure:"block_on_overflow"`

	// Deprecated: [v0.123.0] use `block_on_overflow`.
	Blocking bool `mapstructure:"blocking"`

	// StorageID, if not empty, enables persistent storage and uses the specified component
	// as the storage extension for the persistent queue.
	// TODO: This will be changed to Optional when available.
	StorageID *component.ID `mapstructure:"storage"`

	// NumConsumers is the maximum number of concurrent consumers from the queue.
	// This applies across all the optional configurations above (e.g. wait_for_result, block_on_overflow, persistent, etc.).
	// TODO: This will also control the maximum number of shards, when supported:
	// https://github.com/open-telemetry/opentelemetry-collector/issues/12473.
	NumConsumers int `mapstructure:"num_consumers"`

	// Batch configures how requests are consumed from the queue and batched together during consumption.
	// TODO: This will be changed to Optional when available.
	Batch *BatchConfig `mapstructure:"batch"`
	// contains filtered or unexported fields
}
Config defines configuration for queueing and batching incoming requests.
type Done ¶
type Done interface {
	// OnDone needs to be called when processing of the queue item is done.
	OnDone(error)
}
Done represents the callback that is called once the read request has been completely processed by the downstream components.
Experimental: This API is at an early stage of development and may change without backward compatibility until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type Queue ¶
type Queue[T any] interface {
	component.Component

	// Offer inserts the specified element into this queue if it is possible to do so immediately
	// without violating capacity restrictions. It returns no error on success.
	// It returns ErrQueueIsFull if no space is currently available.
	Offer(ctx context.Context, item T) error

	// Size returns the current size of the queue.
	Size() int64

	// Capacity returns the capacity of the queue.
	Capacity() int64
}
Queue defines a producer-consumer exchange that can be backed by, for example, the memory-based ring buffer queue (boundedMemoryQueue) or a disk-based queue (persistentQueue).
Experimental: This API is at an early stage of development and may change without backward compatibility until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
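The Offer, Size, and Capacity semantics described above can be sketched with a toy in-memory queue. This is not the package's boundedMemoryQueue: the boundedQueue type, the offerAll helper, and the local errQueueIsFull sentinel are hypothetical, size is counted in items (roughly what a "requests" sizer would do, as an assumption), and the component.Component lifecycle methods are omitted to keep the example self-contained.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errQueueIsFull is a local stand-in for queuebatch.ErrQueueIsFull.
var errQueueIsFull = errors.New("sending queue is full")

// boundedQueue is a hypothetical, non-blocking, slice-backed queue.
type boundedQueue[T any] struct {
	mu       sync.Mutex
	items    []T
	capacity int64
}

func newBoundedQueue[T any](capacity int64) *boundedQueue[T] {
	return &boundedQueue[T]{capacity: capacity}
}

// Offer appends the item if there is room and returns errQueueIsFull
// otherwise. It never blocks.
func (q *boundedQueue[T]) Offer(ctx context.Context, item T) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	if int64(len(q.items)) >= q.capacity {
		return errQueueIsFull
	}
	q.items = append(q.items, item)
	return nil
}

// Size returns the number of items currently held.
func (q *boundedQueue[T]) Size() int64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	return int64(len(q.items))
}

// Capacity returns the maximum number of items the queue accepts.
func (q *boundedQueue[T]) Capacity() int64 { return q.capacity }

// offerAll offers each item in order and returns how many were accepted
// together with the last error seen, if any.
func offerAll(q *boundedQueue[string], items ...string) (accepted int, lastErr error) {
	for _, item := range items {
		if err := q.Offer(context.Background(), item); err != nil {
			lastErr = err
			continue
		}
		accepted++
	}
	return accepted, lastErr
}

func main() {
	q := newBoundedQueue[string](2)
	accepted, err := offerAll(q, "a", "b", "c")
	fmt.Printf("accepted=%d err=%v size=%d capacity=%d\n", accepted, err, q.Size(), q.Capacity())
}
```

A blocking variant, as suggested by the block_on_overflow setting, would wait for space instead of returning the error; that behavior is not shown here.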
type QueueBatch ¶
type QueueBatch struct {
// contains filtered or unexported fields
}
func NewQueueBatch ¶
func (*QueueBatch) Send ¶
Send implements the requestSender interface. It puts the request in the queue.