queue

package
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2023 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTooManyRequests = errors.New("too many outstanding requests")
	ErrStopped         = errors.New("queue is stopped")
)
View Source
var ErrOutOfBounds = errors.New("queue index out of bounds")

Functions

This section is empty.

Types

type Mapable

type Mapable interface {
	*tenantQueue | *TreeQueue
	// https://github.com/golang/go/issues/48522#issuecomment-924348755
	Pos() QueueIndex
	SetPos(index QueueIndex)
}

type Mapping

type Mapping[v Mapable] struct {
	// contains filtered or unexported fields
}

Mapping is a map-like data structure that allows accessing its items not only by key but also by index. When an item is removed, the internal key array is not resized, but the removed place is marked as empty. This allows to remove keys without changing the index of the remaining items after the removed key. Mapping uses *tenantQueue as concrete value and keys of type string. The data structure is not thread-safe.

func (*Mapping[v]) Get

func (m *Mapping[v]) Get(idx QueueIndex) v

func (*Mapping[v]) GetByKey

func (m *Mapping[v]) GetByKey(key string) v

func (*Mapping[v]) GetNext

func (m *Mapping[v]) GetNext(idx QueueIndex) (v, error)

func (*Mapping[v]) Init

func (m *Mapping[v]) Init(size int)

func (*Mapping[v]) Keys

func (m *Mapping[v]) Keys() []string

func (*Mapping[v]) Len

func (m *Mapping[v]) Len() int

func (*Mapping[v]) Put

func (m *Mapping[v]) Put(key string, value v) bool

func (*Mapping[v]) Remove

func (m *Mapping[v]) Remove(key string) bool

func (*Mapping[v]) Values

func (m *Mapping[v]) Values() []v

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(subsystem string, registerer prometheus.Registerer) *Metrics

func (*Metrics) Cleanup

func (m *Metrics) Cleanup(user string)

type Queue

type Queue interface {
	Chan() RequestChannel
	Dequeue() Request
	Name() string
	Len() int
}

type QueueIndex

type QueueIndex int // nolint:revive

QueueIndex is opaque type that allows to resume iteration over tenants between successive calls of RequestQueue.GetNextRequestForQuerier method.

var StartIndex QueueIndex = -1

StartIndex is the index of the queue that starts iteration over sub queues.

var StartIndexWithLocalQueue QueueIndex = -2

StartIndexWithLocalQueue is the index of the queue that starts iteration over local and sub queues.

func (QueueIndex) ReuseLastIndex

func (ui QueueIndex) ReuseLastIndex() QueueIndex

Modify index to start iteration on the same tenant, for which last queue was returned.

type QueuePath

type QueuePath []string //nolint:revive

type Request

type Request any

Request stored into the queue.

type RequestChannel

type RequestChannel chan Request

RequestChannel is a channel that queues Requests

type RequestQueue

type RequestQueue struct {
	services.Service
	// contains filtered or unexported fields
}

RequestQueue holds incoming requests in per-tenant queues. It also assigns each tenant specified number of queriers, and when querier asks for next request to handle (using GetNextRequestForQuerier), it returns requests in a fair fashion.

func NewRequestQueue

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue

func (*RequestQueue) Dequeue

func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID string) (Request, QueueIndex, error)

Dequeue find next tenant queue and takes the next request off of it. Will block if there are no requests. By passing tenant index from previous call of this method, querier guarantees that it iterates over all tenants fairly. If querier finds that request from the tenant is already expired, it can get a request for the same tenant by using UserIndex.ReuseLastUser.

func (*RequestQueue) Enqueue

func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQueriers int, successFn func()) error

Enqueue puts the request into the queue. MaxQueries is tenant-specific value that specifies how many queriers can this tenant use (zero or negative = all queriers). It is passed to each Enqueue, because it can change between calls.

If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.

func (*RequestQueue) GetConnectedQuerierWorkersMetric

func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64

func (*RequestQueue) NotifyQuerierShutdown

func (q *RequestQueue) NotifyQuerierShutdown(querierID string)

func (*RequestQueue) RegisterQuerierConnection

func (q *RequestQueue) RegisterQuerierConnection(querier string)

func (*RequestQueue) UnregisterQuerierConnection

func (q *RequestQueue) UnregisterQuerierConnection(querier string)

type TreeQueue

type TreeQueue struct {
	// contains filtered or unexported fields
}

TreeQueue is an hierarchical queue implementation where each sub-queue has the same guarantees to be chosen from. Each queue has also a local queue, which gets chosen with equal preference as the sub-queues.

func (*TreeQueue) Chan

func (q *TreeQueue) Chan() RequestChannel

Chan implements Queue

func (*TreeQueue) Dequeue

func (q *TreeQueue) Dequeue() Request

Dequeue implements Queue

func (*TreeQueue) Len

func (q *TreeQueue) Len() int

Len implements Queue It returns the length of the local queue and all sub-queues. This may be expensive depending on the size of the queue tree.

func (*TreeQueue) Name

func (q *TreeQueue) Name() string

Name implements Queue

func (*TreeQueue) Pos

func (q *TreeQueue) Pos() QueueIndex

Index implements Mapable

func (*TreeQueue) SetPos

func (q *TreeQueue) SetPos(index QueueIndex)

Index implements Mapable

func (*TreeQueue) String

func (q *TreeQueue) String() string

String makes the queue printable

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL