Package fairqueuing

Published: Oct 20, 2020 | License: Apache-2.0 | Module:


type DispatchingConfig

type DispatchingConfig struct {
	// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
	ConcurrencyLimit int
}

DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet.

type Integrator

type Integrator interface {
	metrics.TimedObserver

	// GetResults returns the results of integrating to now
	GetResults() IntegratorResults

	// Reset returns the results of integrating to now, and resets integration to start now
	Reset() IntegratorResults
}

Integrator computes the moments of some variable X over time as read from a particular clock. The integrals start when the Integrator is created and end at the latest operation on the Integrator. As a `metrics.TimedObserver` this fixes X1=1 and ignores attempts to change X1.
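
The time-weighted integration described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `sketchIntegrator` and its methods are hypothetical names, and timestamps are passed explicitly as seconds instead of being read from a `clock.PassiveClock`.

```go
package main

import (
	"fmt"
	"math"
)

// sketchIntegrator is a hypothetical, simplified stand-in for an Integrator.
// It takes explicit timestamps (in seconds) instead of reading a clock.
type sketchIntegrator struct {
	lastTime float64 // time of the latest operation
	x        float64 // current value of X
	elapsed  float64 // integral of dt
	sumX     float64 // integral of x dt
	sumXX    float64 // integral of x*x dt
	min, max float64 // extremes of X seen so far
}

func newSketchIntegrator(now, x float64) *sketchIntegrator {
	return &sketchIntegrator{lastTime: now, x: x, min: x, max: x}
}

// advance accumulates the integrals over the interval since the last operation.
func (s *sketchIntegrator) advance(now float64) {
	dt := now - s.lastTime
	s.lastTime = now
	s.elapsed += dt
	s.sumX += s.x * dt
	s.sumXX += s.x * s.x * dt
}

// set records that X changes to x at time now.
func (s *sketchIntegrator) set(now, x float64) {
	s.advance(now)
	s.x = x
	s.min = math.Min(s.min, x)
	s.max = math.Max(s.max, x)
}

// results returns the duration, time-weighted average, and standard
// deviation of X up to time now.
func (s *sketchIntegrator) results(now float64) (duration, avg, dev float64) {
	s.advance(now)
	avg = s.sumX / s.elapsed
	variance := s.sumXX/s.elapsed - avg*avg
	if variance < 0 {
		variance = 0 // guard against rounding error
	}
	return s.elapsed, avg, math.Sqrt(variance)
}

func main() {
	s := newSketchIntegrator(0, 1) // X = 1 from t = 0
	s.set(1, 3)                    // X = 3 from t = 1
	d, avg, dev := s.results(2)    // integrate up to t = 2
	fmt.Println(d, avg, dev, s.min, s.max) // 2 2 1 1 3
}
```

With zero elapsed time the divisions above yield NaN, which is one reason a NaN-tolerant comparison of results is useful.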

func NewIntegrator

func NewIntegrator(clock clock.PassiveClock) Integrator

NewIntegrator makes an Integrator that uses the given clock.

type IntegratorResults

type IntegratorResults struct {
	Duration  float64 // seconds
	Average   float64 // time-weighted
	Deviation float64 // standard deviation: sqrt(avg((value-avg)^2))
	Min, Max  float64
}

IntegratorResults holds statistical abstracts of the integration

func (*IntegratorResults) Equal

func (x *IntegratorResults) Equal(y *IntegratorResults) bool

Equal tests for semantic equality. This considers all NaN values to be equal to each other.
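
A NaN-tolerant comparison is needed because Go's `==` never reports two NaN values as equal. The sketch below shows one way to write such a check; `sketchResults`, `sameFloat`, and `equal` are hypothetical names, and the nil handling is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"math"
)

// sketchResults is a hypothetical local mirror of IntegratorResults.
type sketchResults struct {
	Duration, Average, Deviation, Min, Max float64
}

// sameFloat treats two NaN values as equal, unlike the == operator.
func sameFloat(a, b float64) bool {
	return a == b || (math.IsNaN(a) && math.IsNaN(b))
}

// equal reports semantic equality. Assumption: two nil pointers are equal,
// and a nil pointer never equals a non-nil one.
func (x *sketchResults) equal(y *sketchResults) bool {
	if x == nil || y == nil {
		return x == y
	}
	return sameFloat(x.Duration, y.Duration) &&
		sameFloat(x.Average, y.Average) &&
		sameFloat(x.Deviation, y.Deviation) &&
		sameFloat(x.Min, y.Min) &&
		sameFloat(x.Max, y.Max)
}

func main() {
	a := &sketchResults{Average: math.NaN(), Deviation: math.NaN()}
	b := &sketchResults{Average: math.NaN(), Deviation: math.NaN()}
	fmt.Println(*a == *b)   // false: NaN != NaN under ==
	fmt.Println(a.equal(b)) // true
}
```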

type Moments

type Moments struct {
	ElapsedSeconds float64 // integral of dt
	IntegralX      float64 // integral of x dt
	IntegralXX     float64 // integral of x*x dt
}

Moments are the integrals of the 0th, 1st, and 2nd powers of some variable X over some range of time.

func ConstantMoments

func ConstantMoments(dt, x float64) Moments

ConstantMoments returns the Moments of a variable X that holds the constant value x for dt seconds.

func (Moments) Add

func (igr Moments) Add(ogr Moments) Moments

Add combines over two ranges of time

func (Moments) AvgAndStdDev

func (igr Moments) AvgAndStdDev() (float64, float64)

AvgAndStdDev returns the average and standard deviation

func (Moments) Sub

func (igr Moments) Sub(ogr Moments) Moments

Sub finds the difference between a range of time and a subrange
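
Because each moment is a plain integral, moments over adjacent ranges of time combine by field-wise addition, and a subrange is removed by field-wise subtraction. The sketch below mirrors the four operations on a hypothetical local type `sketchMoments`. Returning NaN for an empty range is an assumption made for this example.

```go
package main

import (
	"fmt"
	"math"
)

// sketchMoments is a hypothetical local mirror of Moments.
type sketchMoments struct {
	ElapsedSeconds float64 // integral of dt
	IntegralX      float64 // integral of x dt
	IntegralXX     float64 // integral of x*x dt
}

// constantMoments covers dt seconds during which X holds the value x.
func constantMoments(dt, x float64) sketchMoments {
	return sketchMoments{ElapsedSeconds: dt, IntegralX: x * dt, IntegralXX: x * x * dt}
}

// add combines the moments of two disjoint ranges of time.
func (m sketchMoments) add(o sketchMoments) sketchMoments {
	return sketchMoments{
		ElapsedSeconds: m.ElapsedSeconds + o.ElapsedSeconds,
		IntegralX:      m.IntegralX + o.IntegralX,
		IntegralXX:     m.IntegralXX + o.IntegralXX,
	}
}

// sub removes the moments of a subrange from those of the enclosing range.
func (m sketchMoments) sub(o sketchMoments) sketchMoments {
	return sketchMoments{
		ElapsedSeconds: m.ElapsedSeconds - o.ElapsedSeconds,
		IntegralX:      m.IntegralX - o.IntegralX,
		IntegralXX:     m.IntegralXX - o.IntegralXX,
	}
}

// avgAndStdDev derives the time-weighted statistics:
// avg = IntegralX / ElapsedSeconds
// variance = IntegralXX / ElapsedSeconds - avg*avg
func (m sketchMoments) avgAndStdDev() (float64, float64) {
	if m.ElapsedSeconds <= 0 {
		return math.NaN(), math.NaN() // assumption: an empty range yields NaN
	}
	avg := m.IntegralX / m.ElapsedSeconds
	variance := m.IntegralXX/m.ElapsedSeconds - avg*avg
	return avg, math.Sqrt(math.Max(variance, 0))
}

func main() {
	first := constantMoments(1, 1)  // X = 1 for one second
	second := constantMoments(1, 3) // X = 3 for the next second
	total := first.add(second)
	avg, dev := total.avgAndStdDev()
	fmt.Println(avg, dev)                   // 2 1
	fmt.Println(total.sub(second) == first) // true
}
```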

type QueueNoteFn

type QueueNoteFn func(inQueue bool)

QueueNoteFn is called when a request enters and leaves a queue

type QueueSet

type QueueSet interface {
	// BeginConfigChange starts the two-step process of updating the
	// configuration.  No change is made until Complete is called.  If
	// `C := X.BeginConfigChange(q)` then `C.Complete(d)` returns the
	// same value `X`.  If the QueuingConfig's DesiredNumQueues field
	// is zero then the other queuing-specific config parameters are
	// not changed, so that the queues continue draining as before.
	// In any case, reconfiguration does not discard any queue unless
	// and until it is undesired and empty.
	BeginConfigChange(QueuingConfig) (QueueSetCompleter, error)

	// IsIdle returns a bool indicating whether the QueueSet was idle
	// at the moment of the return.  Idle means the QueueSet has zero
	// requests queued and zero executing.  This bit can change only
	// (1) during a call to StartRequest and (2) during a call to
	// Request::Finish.  In the latter case idleness can only change
	// from false to true.
	IsIdle() bool

	// StartRequest begins the process of handling a request.  If the
	// request gets queued and the number of queues is greater than 1
	// then StartRequest uses the given hashValue as the source of
	// entropy as it shuffle-shards the request into a queue.  The
	// descr1 and descr2 values play no role in the logic but appear
	// in log messages.  This method always returns quickly (without
	// waiting for the request to be dequeued).  If this method
	// returns a nil Request value then the caller should reject the
	// request and the returned bool indicates whether the QueueSet
	// was idle at the moment of the return.  Otherwise idle==false
	// and the client must call the Finish method of the Request
	// exactly once.
	StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)

	// UpdateObservations makes sure any time-based statistics have
	// caught up with the current clock reading
	UpdateObservations()

	// Dump saves and returns the instant internal state of the queue-set.
	// Note that the dumping process will stop the queue-set from
	// processing any requests.
	// For debugging only.
	Dump(includeRequestDetails bool) debug.QueueSetDump
}

QueueSet is the abstraction for the queuing and dispatching functionality of one non-exempt priority level. It covers the functionality described in the "Assignment to a Queue", "Queuing", and "Dispatching" sections of . Some day we may have connections between priority levels, but today is not that day.
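
The caller's side of the StartRequest and Finish contract can be sketched with a toy stand-in. Everything below is hypothetical: `toyQueueSet` enforces a concurrency limit but never queues, its StartRequest takes no arguments, and its Finish always executes, whereas a real Request may decide to reject. It is not safe for concurrent use.

```go
package main

import "fmt"

// request is a trimmed-down, hypothetical mirror of the Request interface.
type request interface {
	Finish(execute func()) (idle bool)
}

// toyQueueSet is a hypothetical stand-in that enforces a concurrency limit
// but never queues: requests over the limit are rejected immediately.
type toyQueueSet struct {
	limit     int
	executing int
}

type toyRequest struct{ qs *toyQueueSet }

// StartRequest returns a nil request when the request must be rejected.
func (qs *toyQueueSet) StartRequest() (req request, idle bool) {
	if qs.executing >= qs.limit {
		return nil, qs.executing == 0
	}
	qs.executing++
	return &toyRequest{qs: qs}, false
}

// Finish runs execute and then releases the concurrency slot.
func (r *toyRequest) Finish(execute func()) (idle bool) {
	execute()
	r.qs.executing--
	return r.qs.executing == 0
}

// handle shows the caller's side of the protocol.
func handle(qs *toyQueueSet, work func()) (executed bool) {
	req, _ := qs.StartRequest()
	if req == nil {
		return false // rejected: there is no Request to finish
	}
	// A non-nil Request obliges the caller to call Finish exactly once.
	req.Finish(func() {
		executed = true
		work()
	})
	return executed
}

func main() {
	qs := &toyQueueSet{limit: 1}
	outer := handle(qs, func() {
		// The single slot is taken, so a nested request is rejected.
		fmt.Println("nested executed:", handle(qs, func() {}))
	})
	fmt.Println("outer executed:", outer)
}
```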

type QueueSetCompleter

type QueueSetCompleter interface {
	// Complete returns a QueueSet configured by the given
	// dispatching configuration.
	Complete(DispatchingConfig) QueueSet
}

QueueSetCompleter finishes the two-step process of creating or reconfiguring a QueueSet

type QueueSetFactory

type QueueSetFactory interface {
	// BeginConstruction does the first phase of creating a QueueSet
	BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error)
}

QueueSetFactory is used to create QueueSet objects. Creation, like config update, is done in two phases: the first phase consumes the QueuingConfig and the second consumes the DispatchingConfig. They are separated so that errors from the first phase can be found before committing to a concurrency allotment for the second.
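
The two-phase pattern can be illustrated with local stand-in types. In this sketch the names `queuingConfig`, `dispatchingConfig`, `completer`, and `beginConstruction` are hypothetical, and the validation rule in phase one is an assumption chosen for illustration. The point is only that phase one may fail while phase two cannot.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical local mirrors of the two config types, trimmed to a few fields.
type queuingConfig struct {
	Name             string
	DesiredNumQueues int
	HandSize         int
}

type dispatchingConfig struct{ ConcurrencyLimit int }

type toyQueueSet struct {
	qCfg queuingConfig
	dCfg dispatchingConfig
}

// completer holds the validated queuing config between the two phases.
type completer struct{ qCfg queuingConfig }

// beginConstruction is phase one: it validates the queuing config.
// The rule checked here is an assumption made for illustration.
func beginConstruction(q queuingConfig) (*completer, error) {
	if q.DesiredNumQueues > 0 && q.HandSize > q.DesiredNumQueues {
		return nil, errors.New("hand size exceeds number of queues")
	}
	return &completer{qCfg: q}, nil
}

// Complete is phase two: it cannot fail, so a concurrency allotment is
// only committed after phase one has succeeded.
func (c *completer) Complete(d dispatchingConfig) *toyQueueSet {
	return &toyQueueSet{qCfg: c.qCfg, dCfg: d}
}

func main() {
	bad := queuingConfig{Name: "bad", DesiredNumQueues: 2, HandSize: 4}
	if _, err := beginConstruction(bad); err != nil {
		fmt.Println("phase one failed:", err)
	}
	c, err := beginConstruction(queuingConfig{Name: "ok", DesiredNumQueues: 8, HandSize: 4})
	if err != nil {
		panic(err)
	}
	qs := c.Complete(dispatchingConfig{ConcurrencyLimit: 10})
	fmt.Println(qs.qCfg.Name, qs.dCfg.ConcurrencyLimit)
}
```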

type QueuingConfig

type QueuingConfig struct {
	// Name is used to identify a queue set, allowing for descriptive information about its intended use
	Name string

	// DesiredNumQueues is the number of queues that the API says
	// should exist now.  This may be zero, in which case
	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
	DesiredNumQueues int

	// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
	QueueLengthLimit int

	// HandSize is a parameter of shuffle sharding.  Upon arrival of a request, a queue is chosen by randomly
	// dealing a "hand" of this many queues and then picking one of minimum length.
	HandSize int

	// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
	// If, by the end of that time, the request has not been dispatched then it is rejected.
	RequestWaitLimit time.Duration
}

QueuingConfig defines the configuration of the queuing aspect of a QueueSet.
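
The role of HandSize can be made concrete with a small shuffle-sharding sketch: a hash value is used to deal a hand of distinct queues, and the shortest queue in the hand is chosen. The functions `dealHand` and `chooseQueue` are hypothetical, and the dealing scheme is a simplified one that may differ from what the package uses. It assumes 0 < handSize <= number of queues.

```go
package main

import "fmt"

// dealHand deals handSize distinct queue indices out of numQueues, using
// hashValue as the only source of entropy. It assumes
// 0 < handSize <= numQueues.
func dealHand(hashValue uint64, numQueues, handSize int) []int {
	deck := make([]int, numQueues)
	for i := range deck {
		deck[i] = i
	}
	hand := make([]int, 0, handSize)
	for i := 0; i < handSize; i++ {
		remaining := uint64(numQueues - i)
		pick := int(hashValue % remaining)
		hashValue /= remaining
		hand = append(hand, deck[pick])
		// Fill the hole with the last undealt card so no card repeats.
		deck[pick] = deck[remaining-1]
	}
	return hand
}

// chooseQueue picks, from the dealt hand, a queue of minimum length.
func chooseQueue(hashValue uint64, queueLengths []int, handSize int) int {
	hand := dealHand(hashValue, len(queueLengths), handSize)
	best := hand[0]
	for _, q := range hand[1:] {
		if queueLengths[q] < queueLengths[best] {
			best = q
		}
	}
	return best
}

func main() {
	lengths := []int{5, 0, 3, 7, 2, 9, 1, 4} // current length of each queue
	fmt.Println("hand:", dealHand(12345, len(lengths), 3))
	fmt.Println("chosen queue:", chooseQueue(12345, lengths, 3))
}
```

Because the hand depends only on the hash value, requests with the same hash always compete for the same small set of queues, which limits how much one heavy flow can crowd out the others.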

type Request

type Request interface {
	// Finish determines whether to execute or reject the request and
	// invokes `execute` if the decision is to execute the request.
	// The returned `idle bool` value indicates whether the QueueSet
	// was idle when the value was calculated, but might no longer be
	// accurate by the time the client examines that value.
	Finish(execute func()) (idle bool)
}

Request represents the remainder of the handling of one request

Documentation was rendered with GOOS=linux and GOARCH=amd64.
