Version: v1.0.6 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2021 License: GPL-3.0 Imports: 7 Imported by: 0



Package flowcontrol implements a client side flow control mechanism



View Source
const (

	// DecParamDelay is applied at server side when decreasing capacity in order to
	// avoid a buffer underrun error due to requests sent by the client before
	// receiving the capacity update announcement
	DecParamDelay = time.Second * 2
View Source
const FixedPointMultiplier = 1000000

FixedPointMultiplier is applied to the recharge integrator and the recharge curve.

Note: fixed point arithmetic is required for the integrator because it is a constantly increasing value that can wrap around int64 limits (which behavior is also supported by the priority queue). A floating point value would gradually lose precision in this application. The recharge curve and all recharge values are encoded as fixed point because sumRecharge is frequently updated by adding or subtracting individual recharge values and perfect precision is required.


This section is empty.


This section is empty.


type ClientManager

type ClientManager struct {
	// contains filtered or unexported fields

ClientManager controls the capacity assigned to the clients of a server. Since ServerParams guarantee a safe lower estimate for processable requests even in case of all clients being active, ClientManager calculates a corrigated buffer value and usually allows a higher remaining buffer value to be returned with each reply.

func NewClientManager

func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager

NewClientManager returns a new client manager. Client manager enhances flow control performance by allowing client buffers to recharge quicker than the minimum guaranteed recharge rate if possible. The sum of all minimum recharge rates (sumRecharge) is updated each time a clients starts or finishes buffer recharging. Then an adjusted total recharge rate is calculated using a piecewise linear recharge curve:

totalRecharge = curve(sumRecharge) (totalRecharge >= sumRecharge is enforced)

Then the "bonus" buffer recharge is distributed between currently recharging clients proportionally to their minimum recharge rates.

Note: total recharge is proportional to the average number of parallel running serving threads. A recharge value of 1000000 corresponds to one thread in average. The maximum number of allowed serving threads should always be considerably higher than the targeted average number.

Note 2: although it is possible to specify a curve allowing the total target recharge starting from zero sumRecharge, it makes sense to add a linear ramp starting from zero in order to not let a single low-priority client use up the entire server capacity and thus ensure quick availability for others at any moment.

func (*ClientManager) SetCapacityLimits

func (cm *ClientManager) SetCapacityLimits(min, max, raiseThreshold uint64)

SetCapacityRaiseThreshold sets a threshold value used for raising capFactor. Either if the difference between total allowed and connected capacity is less than this threshold or if their ratio is less than capacityRaiseThresholdRatio then capFactor is allowed to slowly raise.

func (*ClientManager) SetRechargeCurve

func (cm *ClientManager) SetRechargeCurve(curve PieceWiseLinear)

SetRechargeCurve updates the recharge curve

func (*ClientManager) Stop

func (cm *ClientManager) Stop()

Stop stops the client manager

func (*ClientManager) SubscribeTotalCapacity

func (cm *ClientManager) SubscribeTotalCapacity(ch chan uint64) uint64

SubscribeTotalCapacity returns all future updates to the total capacity value through a channel and also returns the current value

type ClientNode

type ClientNode struct {
	// contains filtered or unexported fields

ClientNode is the flow control system's representation of a client (used in server mode only)

func NewClientNode

func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode

NewClientNode returns a new ClientNode

func (*ClientNode) AcceptRequest

func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bool, bufShort uint64, priority int64)

AcceptRequest returns whether a new request can be accepted and the missing buffer amount if it was rejected due to a buffer underrun. If accepted, maxCost is deducted from the flow control buffer.

func (*ClientNode) BufferStatus

func (node *ClientNode) BufferStatus() (uint64, uint64)

BufferStatus returns the current buffer value and limit

func (*ClientNode) Disconnect

func (node *ClientNode) Disconnect()

Disconnect should be called when a client is disconnected

func (*ClientNode) Freeze

func (node *ClientNode) Freeze()

Freeze notifies the client manager about a client freeze event in which case the total capacity allowance is slightly reduced.

func (*ClientNode) OneTimeCost

func (node *ClientNode) OneTimeCost(cost uint64)

OneTimeCost subtracts the given amount from the node's buffer.

Note: this call can take the buffer into the negative region internally. In this case zero buffer value is returned by exported calls and no requests are accepted.

func (*ClientNode) RequestProcessed

func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) uint64

RequestProcessed should be called when the request has been processed

func (*ClientNode) UpdateParams

func (node *ClientNode) UpdateParams(params ServerParams)

UpdateParams updates the flow control parameters of a client node

type PieceWiseLinear

type PieceWiseLinear []struct{ X, Y uint64 }

PieceWiseLinear is used to describe recharge curves

func (PieceWiseLinear) Valid

func (pwl PieceWiseLinear) Valid() bool

Valid returns true if the X coordinates of the curve points are non-strictly monotonic

func (PieceWiseLinear) ValueAt

func (pwl PieceWiseLinear) ValueAt(x uint64) float64

ValueAt returns the curve's value at a given point

type ServerNode

type ServerNode struct {
	// contains filtered or unexported fields

ServerNode is the flow control system's representation of a server (used in client mode only)

func NewServerNode

func NewServerNode(params ServerParams, clock mclock.Clock) *ServerNode

NewServerNode returns a new ServerNode

func (*ServerNode) CanSend

func (node *ServerNode) CanSend(maxCost uint64) (time.Duration, float64)

CanSend returns the minimum waiting time required before sending a request with the given maximum estimated cost. Second return value is the relative estimated buffer level after sending the request (divided by BufLimit).

func (*ServerNode) DumpLogs

func (node *ServerNode) DumpLogs()

DumpLogs dumps the event log if logging is used

func (*ServerNode) QueuedRequest

func (node *ServerNode) QueuedRequest(reqID, maxCost uint64)

QueuedRequest should be called when the request has been assigned to the given server node, before putting it in the send queue. It is mandatory that requests are sent in the same order as the QueuedRequest calls are made.

func (*ServerNode) ReceivedReply

func (node *ServerNode) ReceivedReply(reqID, bv uint64)

ReceivedReply adjusts estimated buffer value according to the value included in the latest request reply.

func (*ServerNode) ResumeFreeze

func (node *ServerNode) ResumeFreeze(bv uint64)

ResumeFreeze cleans all pending requests and sets the buffer estimate to the reported value after resuming from a frozen state

func (*ServerNode) UpdateParams

func (node *ServerNode) UpdateParams(params ServerParams)

UpdateParams updates the flow control parameters of the node

type ServerParams

type ServerParams struct {
	BufLimit, MinRecharge uint64

ServerParams are the flow control parameters specified by a server for a client

Note: a server can assign different amounts of capacity to each client by giving different parameters to them.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL