Package flowcontrol implements a client-side flow control mechanism.



    const (
    	// DecParamDelay is applied at server side when decreasing capacity in order to
    	// avoid a buffer underrun error due to requests sent by the client before
    	// receiving the capacity update announcement
    	DecParamDelay = time.Second * 2
    )
    const FixedPointMultiplier = 1000000

      FixedPointMultiplier is applied to the recharge integrator and the recharge curve.

      Note: fixed point arithmetic is required for the integrator because it is a constantly increasing value that can wrap around the int64 limits (a behavior the priority queue also supports). A floating point value would gradually lose precision in this application. The recharge curve and all recharge values are encoded as fixed point because sumRecharge is frequently updated by adding or subtracting individual recharge values, and perfect precision is required.
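The note above can be illustrated with a minimal sketch. It is not the package's implementation: `fixedPointMultiplier` only mirrors the documented constant value, and `before` is a hypothetical helper showing one common way to compare counters that may have wrapped around the int64 limits.

```go
package main

import (
	"fmt"
	"math"
)

// fixedPointMultiplier mirrors the documented constant value.
const fixedPointMultiplier = 1000000

// before is a hypothetical helper. It reports whether integrator value a
// precedes b, assuming the two values are less than 2^63 apart. The signed
// difference stays meaningful even after the counter has wrapped around.
func before(a, b int64) bool {
	return a-b < 0
}

func main() {
	// Fixed point: adding and later subtracting the same recharge values
	// brings the sum back to exactly zero.
	rates := []uint64{fixedPointMultiplier / 10, fixedPointMultiplier / 5}
	var sumRecharge uint64
	for _, r := range rates {
		sumRecharge += r
	}
	for _, r := range rates {
		sumRecharge -= r
	}
	fmt.Println("fixed point sum:", sumRecharge)

	// Floating point: the same sequence with 0.1 and 0.2 may leave a residue.
	fa, fb := 0.1, 0.2
	fsum := 0.0
	fsum += fa
	fsum += fb
	fsum -= fa
	fsum -= fb
	fmt.Println("floating point sum:", fsum)

	// Wrap-around: b is logically 10 steps after a, but it has overflowed
	// into the negative range, so a plain comparison gives the wrong order.
	a := int64(math.MaxInt64 - 5)
	b := a + 10
	fmt.Println("naive a < b:", a < b, "wrap-safe before(a, b):", before(a, b))
}
```

The integer sum is exact regardless of how many updates are applied, which is the property the note relies on for sumRecharge.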




      type ClientManager

      type ClientManager struct {
      	// contains filtered or unexported fields
      }

        ClientManager controls the capacity assigned to the clients of a server. Since ServerParams guarantee a safe lower estimate for processable requests even when all clients are active, ClientManager calculates a corrected buffer value and usually allows a higher remaining buffer value to be returned with each reply.

        func NewClientManager

        func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager

          NewClientManager returns a new client manager. The client manager enhances flow control performance by allowing client buffers to recharge quicker than the minimum guaranteed recharge rate if possible. The sum of all minimum recharge rates (sumRecharge) is updated each time a client starts or finishes buffer recharging. Then an adjusted total recharge rate is calculated using a piecewise linear recharge curve:

          totalRecharge = curve(sumRecharge) (totalRecharge >= sumRecharge is enforced)

          Then the "bonus" buffer recharge is distributed between currently recharging clients proportionally to their minimum recharge rates.

          Note: total recharge is proportional to the average number of serving threads running in parallel. A recharge value of 1000000 corresponds to one thread on average. The maximum number of allowed serving threads should always be considerably higher than the targeted average number.

          Note 2: although it is possible to specify a curve that allows the full target total recharge already at zero sumRecharge, it makes sense to add a linear ramp starting from zero so that a single low-priority client cannot use up the entire server capacity, which keeps capacity quickly available for others at any moment.
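The recharge calculation described above can be sketched as follows. This is an illustration under stated assumptions rather than the package's code: `curveFunc`, `totalRecharge`, and `effectiveRates` are hypothetical names, and the curve is modelled as a plain function with a linear ramp from zero, as Note 2 suggests.

```go
package main

import "fmt"

// curveFunc stands in for the piecewise linear recharge curve (hypothetical type).
type curveFunc func(sumRecharge uint64) uint64

// totalRecharge applies the curve and enforces totalRecharge >= sumRecharge.
func totalRecharge(curve curveFunc, sumRecharge uint64) uint64 {
	total := curve(sumRecharge)
	if total < sumRecharge {
		total = sumRecharge
	}
	return total
}

// effectiveRates scales every minimum recharge rate by total/sum, so the
// bonus is shared proportionally to the minimum rates.
func effectiveRates(minRates []uint64, total uint64) []float64 {
	var sum uint64
	for _, r := range minRates {
		sum += r
	}
	out := make([]float64, len(minRates))
	if sum == 0 {
		return out
	}
	factor := float64(total) / float64(sum)
	for i, r := range minRates {
		out[i] = float64(r) * factor
	}
	return out
}

func main() {
	// Example curve: a linear ramp from zero that levels off at 1000000,
	// i.e. one serving thread on average.
	ramp := func(s uint64) uint64 {
		if s < 500000 {
			return 2 * s
		}
		return 1000000
	}
	minRates := []uint64{100000, 300000} // two recharging clients
	total := totalRecharge(ramp, 400000)
	fmt.Println("total recharge:", total)
	fmt.Println("per-client rates:", effectiveRates(minRates, total))
}
```

With the ramp in place, a single client with a small minimum rate only receives a proportionally small total, instead of the whole plateau value.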

          func (*ClientManager) SetCapacityLimits

          func (cm *ClientManager) SetCapacityLimits(min, max, raiseThreshold uint64)

            SetCapacityLimits sets a threshold value (raiseThreshold) used for raising capFactor. If either the difference between total allowed and connected capacity is less than this threshold, or their ratio is less than capacityRaiseThresholdRatio, then capFactor is allowed to rise slowly.

            func (*ClientManager) SetRechargeCurve

            func (cm *ClientManager) SetRechargeCurve(curve PieceWiseLinear)

              SetRechargeCurve updates the recharge curve

              func (*ClientManager) Stop

              func (cm *ClientManager) Stop()

                Stop stops the client manager

                func (*ClientManager) SubscribeTotalCapacity

                func (cm *ClientManager) SubscribeTotalCapacity(ch chan uint64) uint64

                  SubscribeTotalCapacity sends all future updates to the total capacity value through the given channel and returns the current value.

                  type ClientNode

                  type ClientNode struct {
                  	// contains filtered or unexported fields
                  }

                    ClientNode is the flow control system's representation of a client (used in server mode only)

                    func NewClientNode

                    func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode

                      NewClientNode returns a new ClientNode

                      func (*ClientNode) AcceptRequest

                      func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bool, bufShort uint64, priority int64)

                        AcceptRequest returns whether a new request can be accepted and the missing buffer amount if it was rejected due to a buffer underrun. If accepted, maxCost is deducted from the flow control buffer.
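A minimal sketch of the accept-or-reject decision described above, assuming a simplified buffer without recharge over time. The `buffer` type and its `accept` method are hypothetical and do not reproduce the package's internals or the priority return value.

```go
package main

import "fmt"

// buffer is a hypothetical stand-in for a client's flow control buffer.
type buffer struct {
	value uint64
}

// accept deducts maxCost if the buffer can cover it. Otherwise it reports
// how much buffer is missing and leaves the buffer untouched.
func (b *buffer) accept(maxCost uint64) (accepted bool, bufShort uint64) {
	if maxCost > b.value {
		return false, maxCost - b.value
	}
	b.value -= maxCost
	return true, 0
}

func main() {
	b := &buffer{value: 300}

	ok, short := b.accept(200)
	fmt.Println("first request:", ok, short, "remaining:", b.value)

	ok, short = b.accept(150)
	fmt.Println("second request:", ok, short, "remaining:", b.value)
}
```

The missing amount tells the caller how much recharge is still needed before the same request could be accepted.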

                        func (*ClientNode) BufferStatus

                        func (node *ClientNode) BufferStatus() (uint64, uint64)

                          BufferStatus returns the current buffer value and limit

                          func (*ClientNode) Disconnect

                          func (node *ClientNode) Disconnect()

                            Disconnect should be called when a client is disconnected

                            func (*ClientNode) Freeze

                            func (node *ClientNode) Freeze()

                              Freeze notifies the client manager about a client freeze event, in which case the total capacity allowance is slightly reduced.

                              func (*ClientNode) OneTimeCost

                              func (node *ClientNode) OneTimeCost(cost uint64)

                                OneTimeCost subtracts the given amount from the node's buffer.

                                Note: this call can take the buffer into the negative region internally. In this case zero buffer value is returned by exported calls and no requests are accepted.
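The behavior in the note above can be sketched with a signed internal value that is clamped when read. The `node` type and its methods are hypothetical names for illustration only, and the real package may track the buffer differently.

```go
package main

import "fmt"

// node is a hypothetical client representation with a signed internal buffer.
type node struct {
	buf int64
}

// oneTimeCost subtracts cost and may push the internal value below zero.
func (n *node) oneTimeCost(cost uint64) {
	n.buf -= int64(cost)
}

// bufferValue is the exported view: negative values are reported as zero.
func (n *node) bufferValue() uint64 {
	if n.buf < 0 {
		return 0
	}
	return uint64(n.buf)
}

// canAccept rejects every request while the internal value is negative.
func (n *node) canAccept(maxCost uint64) bool {
	return n.buf >= 0 && uint64(n.buf) >= maxCost
}

func main() {
	n := &node{buf: 100}
	n.oneTimeCost(250)
	fmt.Println("internal:", n.buf, "reported:", n.bufferValue())
	fmt.Println("accepts zero-cost request:", n.canAccept(0))
}
```

Keeping the deficit internally means the client has to recharge through the negative region before any request is accepted again.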

                                func (*ClientNode) RequestProcessed

                                func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) uint64

                                  RequestProcessed should be called when the request has been processed

                                  func (*ClientNode) UpdateParams

                                  func (node *ClientNode) UpdateParams(params ServerParams)

                                    UpdateParams updates the flow control parameters of a client node

                                    type PieceWiseLinear

                                    type PieceWiseLinear []struct{ X, Y uint64 }

                                      PieceWiseLinear is used to describe recharge curves

                                      func (PieceWiseLinear) Valid

                                      func (pwl PieceWiseLinear) Valid() bool

                                        Valid returns true if the X coordinates of the curve points are monotonically non-decreasing (each X is greater than or equal to the previous one)

                                        func (PieceWiseLinear) ValueAt

                                        func (pwl PieceWiseLinear) ValueAt(x uint64) float64

                                          ValueAt returns the curve's value at a given point
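To make the curve semantics concrete, here is a minimal sketch of validation and linear interpolation over a list of points. The `curve` type, `valid`, and `valueAt` are hypothetical reimplementations. Returning the first or last Y value outside the covered X range is an assumption of this sketch, not a documented guarantee.

```go
package main

import "fmt"

// curve mirrors the documented shape of PieceWiseLinear (hypothetical copy).
type curve []struct{ X, Y uint64 }

// valid checks that X coordinates never decrease.
func (c curve) valid() bool {
	for i := 1; i < len(c); i++ {
		if c[i].X < c[i-1].X {
			return false
		}
	}
	return true
}

// valueAt interpolates linearly between the two neighbouring points.
// Outside the covered range it returns the nearest end point's Y value.
func (c curve) valueAt(x uint64) float64 {
	if len(c) == 0 {
		return 0
	}
	if x <= c[0].X {
		return float64(c[0].Y)
	}
	for i := 1; i < len(c); i++ {
		if x <= c[i].X {
			// Here c[i-1].X < x <= c[i].X, so the divisor is non-zero.
			x0, y0 := float64(c[i-1].X), float64(c[i-1].Y)
			x1, y1 := float64(c[i].X), float64(c[i].Y)
			return y0 + (y1-y0)*(float64(x)-x0)/(x1-x0)
		}
	}
	return float64(c[len(c)-1].Y)
}

func main() {
	// A ramp from zero followed by a plateau.
	c := curve{{0, 0}, {500000, 1000000}, {2000000, 1000000}}
	fmt.Println("valid:", c.valid())
	fmt.Println("value at 250000:", c.valueAt(250000))
	fmt.Println("value at 3000000:", c.valueAt(3000000))
}
```

A linear scan keeps the sketch short. A binary search over the points would be the natural choice for longer curves.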

                                          type ServerNode

                                          type ServerNode struct {
                                          	// contains filtered or unexported fields
                                          }

                                            ServerNode is the flow control system's representation of a server (used in client mode only)

                                            func NewServerNode

                                            func NewServerNode(params ServerParams, clock mclock.Clock) *ServerNode

                                              NewServerNode returns a new ServerNode

                                              func (*ServerNode) CanSend

                                              func (node *ServerNode) CanSend(maxCost uint64) (time.Duration, float64)

                                                CanSend returns the minimum waiting time required before sending a request with the given maximum estimated cost. The second return value is the relative estimated buffer level after sending the request (divided by BufLimit).
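The waiting-time calculation can be sketched as below. Several points are assumptions of this sketch and not taken from the documentation: MinRecharge is treated as buffer units recharged per millisecond, no safety margin is applied, and `params` and `canSend` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// params mirrors the two documented ServerParams fields (hypothetical copy).
type params struct {
	BufLimit, MinRecharge uint64
}

// canSend returns how long to wait until the estimated buffer covers maxCost,
// and the relative buffer level left after sending if no wait is needed.
// Assumption: MinRecharge is expressed in buffer units per millisecond.
func canSend(bufEstimate, maxCost uint64, p params) (time.Duration, float64) {
	if maxCost > p.BufLimit {
		maxCost = p.BufLimit // the buffer can never hold more than BufLimit
	}
	if bufEstimate >= maxCost {
		return 0, float64(bufEstimate-maxCost) / float64(p.BufLimit)
	}
	if p.MinRecharge == 0 {
		// Without recharge the buffer never refills in this model.
		return time.Duration(1<<63 - 1), 0
	}
	missing := maxCost - bufEstimate
	wait := time.Duration(missing * uint64(time.Millisecond) / p.MinRecharge)
	return wait, 0
}

func main() {
	p := params{BufLimit: 1000, MinRecharge: 10}

	wait, rel := canSend(550, 300, p)
	fmt.Println("enough buffer:", wait, rel)

	wait, rel = canSend(100, 300, p)
	fmt.Println("must wait:", wait, rel)
}
```

A caller would typically sleep for the returned duration, or pick a different server when the wait is too long.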

                                                func (*ServerNode) DumpLogs

                                                func (node *ServerNode) DumpLogs()

                                                  DumpLogs dumps the event log if logging is used

                                                  func (*ServerNode) QueuedRequest

                                                  func (node *ServerNode) QueuedRequest(reqID, maxCost uint64)

                                                    QueuedRequest should be called when the request has been assigned to the given server node, before putting it in the send queue. It is mandatory that requests are sent in the same order as the QueuedRequest calls are made.

                                                    func (*ServerNode) ReceivedReply

                                                    func (node *ServerNode) ReceivedReply(reqID, bv uint64)

                                                      ReceivedReply adjusts estimated buffer value according to the value included in the latest request reply.
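One plausible way to keep such an estimate is sketched below. It also shows why the ordering requirement on QueuedRequest matters. The `serverEstimate` type and its fields are hypothetical, recharge over time is ignored, and the real package may differ in its details.

```go
package main

import "fmt"

// serverEstimate is a hypothetical client-side view of one server's buffer.
type serverEstimate struct {
	bufEstimate uint64
	sumCost     uint64            // total maxCost of all requests queued so far
	pending     map[uint64]uint64 // reqID -> sumCost right after queuing it
}

// queued deducts maxCost from the estimate and remembers the running total.
func (s *serverEstimate) queued(reqID, maxCost uint64) {
	if s.bufEstimate >= maxCost {
		s.bufEstimate -= maxCost
	} else {
		s.bufEstimate = 0
	}
	s.sumCost += maxCost
	s.pending[reqID] = s.sumCost
}

// receivedReply takes the buffer value bv reported by the server and
// subtracts the cost of requests queued after the answered one, because
// the server had not yet seen them when it computed bv.
func (s *serverEstimate) receivedReply(reqID, bv uint64) {
	sc, ok := s.pending[reqID]
	if !ok {
		return // unknown or already answered request
	}
	delete(s.pending, reqID)
	laterCost := s.sumCost - sc
	if bv > laterCost {
		s.bufEstimate = bv - laterCost
	} else {
		s.bufEstimate = 0
	}
}

func main() {
	s := &serverEstimate{bufEstimate: 1000, pending: make(map[uint64]uint64)}
	s.queued(1, 300)
	s.queued(2, 200)
	fmt.Println("after queuing:", s.bufEstimate)

	s.receivedReply(1, 900) // the server charged less than maxCost for request 1
	fmt.Println("after reply to request 1:", s.bufEstimate)
}
```

The subtraction of later costs is only correct if requests reach the server in the order they were queued, since the running total is what identifies the requests the server has not processed yet.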

                                                      func (*ServerNode) ResumeFreeze

                                                      func (node *ServerNode) ResumeFreeze(bv uint64)

                                                        ResumeFreeze clears all pending requests and sets the buffer estimate to the reported value after resuming from a frozen state

                                                        func (*ServerNode) UpdateParams

                                                        func (node *ServerNode) UpdateParams(params ServerParams)

                                                          UpdateParams updates the flow control parameters of the node

                                                          type ServerParams

                                                          type ServerParams struct {
                                                          	BufLimit, MinRecharge uint64
                                                          }

                                                            ServerParams are the flow control parameters specified by a server for a client

                                                            Note: a server can assign different amounts of capacity to each client by giving different parameters to them.