internal

package
v0.93.0 Latest
Warning

This package is not in the latest version of its module.
Published: Jan 24, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full.
	ErrQueueIsFull = errors.New("sending queue is full")
)

Functions

func NewMockStorageExtension added in v0.84.0

func NewMockStorageExtension(getClientError error) storage.Extension

func NewShutdownErr added in v0.92.0

func NewShutdownErr(err error) error

Types

type ItemsSizer added in v0.92.0

type ItemsSizer[T itemsCounter] struct{}

ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains.

func (*ItemsSizer[T]) SizeOf added in v0.92.0

func (is *ItemsSizer[T]) SizeOf(el T) uint64
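
The itemsCounter constraint is unexported and its method set is not shown on this page. The sketch below assumes it exposes a single ItemsCount() int method; that method name, the batch type, and the lower-case itemsSizer are all local stand-ins used only to illustrate sizing an element by its item count.

```go
package main

import "fmt"

// itemsCounter is ASSUMED to look like this; the real constraint is unexported.
type itemsCounter interface {
	ItemsCount() int
}

// batch is a hypothetical request type holding several items.
type batch struct{ items []string }

func (b batch) ItemsCount() int { return len(b.items) }

// itemsSizer mirrors the shape of ItemsSizer: size equals the number of items.
type itemsSizer[T itemsCounter] struct{}

func (*itemsSizer[T]) SizeOf(el T) uint64 {
	return uint64(el.ItemsCount())
}

func main() {
	s := &itemsSizer[batch]{}
	fmt.Println(s.SizeOf(batch{items: []string{"a", "b", "c"}})) // prints 3
}
```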

type MemoryQueueSettings added in v0.92.0

type MemoryQueueSettings[T any] struct {
	Sizer    Sizer[T]
	Capacity int
}

MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation.

type PersistentQueueSettings added in v0.84.0

type PersistentQueueSettings[T any] struct {
	Sizer            Sizer[T]
	Capacity         int
	DataType         component.DataType
	StorageID        component.ID
	Marshaler        func(req T) ([]byte, error)
	Unmarshaler      func([]byte) (T, error)
	ExporterSettings exporter.CreateSettings
}

type Queue added in v0.89.0

type Queue[T any] interface {
	component.Component
	// Offer inserts the specified element into this queue if it is possible to do so immediately
	// without violating capacity restrictions. It returns nil on success and
	// ErrQueueIsFull if no space is currently available.
	Offer(ctx context.Context, item T) error
	// Consume applies the provided function to the item at the head of the queue.
	// The call blocks until an item is available or the queue is stopped.
	// It returns true when an item is consumed and false if the queue is stopped.
	Consume(func(ctx context.Context, item T) error) bool
	// Size returns the current size of the queue.
	Size() int
	// Capacity returns the capacity of the queue.
	Capacity() int
}

Queue defines a producer-consumer exchange that can be backed by, for example, a memory-based ring buffer queue (boundedMemoryQueue) or a disk-based queue (persistentQueue).

func NewBoundedMemoryQueue

func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T]

NewBoundedMemoryQueue constructs a new queue with the capacity and Sizer specified in set.

func NewPersistentQueue

func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T]

NewPersistentQueue creates a new queue backed by file storage. The name and signal must form a unique combination that identifies the queue storage.

type QueueConsumers added in v0.90.0

type QueueConsumers[T any] struct {
	// contains filtered or unexported fields
}

func NewQueueConsumers added in v0.90.0

func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *QueueConsumers[T]

func (*QueueConsumers[T]) Shutdown added in v0.90.0

func (qc *QueueConsumers[T]) Shutdown(ctx context.Context) error

Shutdown ensures that queue and all consumers are stopped.

func (*QueueConsumers[T]) Start added in v0.90.0

func (qc *QueueConsumers[T]) Start(ctx context.Context, host component.Host) error

Start ensures that queue and all consumers are started.

type RequestSizer added in v0.92.0

type RequestSizer[T any] struct{}

RequestSizer is a Sizer implementation that returns the size of a queue element as one request.

func (*RequestSizer[T]) SizeOf added in v0.92.0

func (rs *RequestSizer[T]) SizeOf(T) uint64

type Sizer added in v0.92.0

type Sizer[T any] interface {
	SizeOf(T) uint64
}

Sizer is an interface that returns the size of the given element.
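
The choice of Sizer determines the unit in which a queue's capacity is counted. The sketch below contrasts a per-request sizer with a hypothetical length-based one; the fits accounting rule and the lower-case type names are assumptions made for illustration and are not taken from the package's implementation.

```go
package main

import "fmt"

// sizer mirrors the Sizer interface shown above.
type sizer[T any] interface {
	SizeOf(T) uint64
}

// requestSizer counts every element as one unit, like RequestSizer.
type requestSizer[T any] struct{}

func (*requestSizer[T]) SizeOf(T) uint64 { return 1 }

// lenSizer is a hypothetical sizer that measures a slice by its length.
type lenSizer struct{}

func (lenSizer) SizeOf(el []string) uint64 { return uint64(len(el)) }

// fits reports whether el can be added without exceeding capacity.
// This rule is an assumption for illustration only.
func fits[T any](s sizer[T], used, capacity uint64, el T) bool {
	return used+s.SizeOf(el) <= capacity
}

func main() {
	req := []string{"a", "b", "c"}
	fmt.Println(fits[[]string](&requestSizer[[]string]{}, 1, 2, req)) // true: counts as 1
	fmt.Println(fits[[]string](lenSizer{}, 1, 2, req))                // false: counts as 3
}
```

With a per-request sizer, Capacity bounds the number of queued requests; with an item-based sizer, it bounds the total number of items across all queued requests.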