controller

package
v0.0.0-...-1679dbc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: Apache-2.0 Imports: 23 Imported by: 3

Documentation

Index

Constants

View Source
const (
	FromPeriodReport = "period_report"
	FromLowRU        = "low_ru"
)

Source List

Inf is the infinite rate limit; it allows all events (even if burst is zero).

View Source
const InfDuration = time.Duration(1<<63 - 1)

InfDuration is the duration returned by Delay when a Reservation is not OK.

Variables

This section is empty.

Functions

func WaitReservations

func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) (time.Duration, error)

WaitReservations is used to process a series of reservations so that all limiter tokens are returned if one reservation fails

Types

type Config

type Config struct {
	// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
	DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

	// LTBMaxWaitDuration is the max wait time duration for local token bucket.
	LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

	// WaitRetryInterval is the interval to retry when waiting for the token.
	WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"`

	// WaitRetryTimes is the times to retry when waiting for the token.
	WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"`

	// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
	// This configuration should be modified carefully.
	RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`

	// EnableControllerTraceLog is to control whether resource control client enable trace.
	EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"`
}

Config is the configuration of the resource manager controller which includes some option for client needed.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns the default resource manager controller configuration.

type Duration

type Duration struct {
	time.Duration
}

Duration is a wrapper of time.Duration for TOML and JSON.

func NewDuration

func NewDuration(duration time.Duration) Duration

NewDuration creates a Duration from time.Duration.

func (*Duration) MarshalJSON

func (d *Duration) MarshalJSON() ([]byte, error)

MarshalJSON returns the duration as a JSON string.

func (Duration) MarshalText

func (d Duration) MarshalText() ([]byte, error)

MarshalText returns the duration as a JSON string.

func (*Duration) UnmarshalJSON

func (d *Duration) UnmarshalJSON(text []byte) error

UnmarshalJSON parses a JSON string into the duration.

func (*Duration) UnmarshalText

func (d *Duration) UnmarshalText(text []byte) error

UnmarshalText parses a TOML string into the duration.

type KVCalculator

type KVCalculator struct {
	*RUConfig
}

KVCalculator is used to calculate the KV-side consumption.

func (*KVCalculator) AfterKVRequest

func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo)

AfterKVRequest ...

func (*KVCalculator) BeforeKVRequest

func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req RequestInfo)

BeforeKVRequest ...

func (*KVCalculator) Trickle

func (*KVCalculator) Trickle(*rmpb.Consumption)

Trickle ...

type Limit

type Limit float64

Limit defines the maximum frequency of some events. Limit is represented as number of events per second. A zero Limit allows no events.

func Every

func Every(interval time.Duration) Limit

Every converts a minimum time interval between events to a Limit.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

A Limiter controls how frequently events are allowed to happen. It implements a "token bucket" of size b, initially full and refilled at rate r tokens per second. Informally, in any large enough time interval, the Limiter limits the rate to r tokens per second, with a maximum burst size of b events. As a special case, if r == Inf (the infinite rate), b is ignored. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.

The zero value is a valid Limiter, but it will reject all events. Use NewLimiter to create non-zero Limiters.

Limiter has one main methods Reserve. If no token is available, Reserve returns a reservation for a future token and the amount of time the caller must wait before using it, or its associated context.Context is canceled.

Some changes about burst(b):

  • If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within an unlimited capacity).
  • If b < 0, that means the limiter is unlimited capacity and r is ignored, can be seen as r == Inf (burst within an unlimited capacity).
  • If b > 0, that means the limiter is limited capacity.

func NewLimiter

func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter

NewLimiter returns a new Limiter that allows events up to rate r and permits bursts of at most b tokens.

func NewLimiterWithCfg

func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter

NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits bursts of at most b tokens.

func (*Limiter) AvailableTokens

func (lim *Limiter) AvailableTokens(now time.Time) float64

AvailableTokens decreases the amount of tokens currently available.

func (*Limiter) GetBurst

func (lim *Limiter) GetBurst() int64

GetBurst returns the burst size of the limiter

func (*Limiter) IsLowTokens

func (lim *Limiter) IsLowTokens() bool

IsLowTokens returns whether the limiter is in low tokens

func (*Limiter) Limit

func (lim *Limiter) Limit() Limit

Limit returns the maximum overall event rate.

func (*Limiter) Reconfigure

func (lim *Limiter) Reconfigure(now time.Time,
	args tokenBucketReconfigureArgs,
	opts ...LimiterOption,
)

Reconfigure modifies all setting for limiter

func (*Limiter) RemoveTokens

func (lim *Limiter) RemoveTokens(now time.Time, amount float64)

RemoveTokens decreases the amount of tokens currently available.

func (*Limiter) Reserve

func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now time.Time, n float64) *Reservation

Reserve returns a Reservation that indicates how long the caller must wait before n events happen. The Limiter takes this Reservation into account when allowing future events. The returned Reservation's OK() method returns false if wait duration exceeds deadline. Usage example:

r := lim.Reserve(time.Now(), 1)
if !r.OK() {
  // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
  return
}
time.Sleep(r.Delay())
Act()

Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.

func (*Limiter) ResetRemainingNotifyTimes

func (lim *Limiter) ResetRemainingNotifyTimes()

ResetRemainingNotifyTimes resets the remaining notify times to 3.

func (*Limiter) SetupNotificationThreshold

func (lim *Limiter) SetupNotificationThreshold(threshold float64)

SetupNotificationThreshold enables the notification at the given threshold.

type LimiterOption

type LimiterOption func(*Limiter)

LimiterOption configures Limiter.

type RUConfig

type RUConfig struct {
	// RU model config
	ReadBaseCost          RequestUnit
	ReadPerBatchBaseCost  RequestUnit
	ReadBytesCost         RequestUnit
	WriteBaseCost         RequestUnit
	WritePerBatchBaseCost RequestUnit
	WriteBytesCost        RequestUnit
	CPUMsCost             RequestUnit

	// some config for client
	LTBMaxWaitDuration       time.Duration
	WaitRetryInterval        time.Duration
	WaitRetryTimes           int
	DegradedModeWaitDuration time.Duration
	// contains filtered or unexported fields
}

RUConfig is the configuration of the resource units, which gives the read/write request units or request resource cost standards. It should be calculated by a given `RequestUnitConfig` or `RequestResourceConfig`.

func DefaultRUConfig

func DefaultRUConfig() *RUConfig

DefaultRUConfig returns the default configuration.

func GenerateRUConfig

func GenerateRUConfig(config *Config) *RUConfig

GenerateRUConfig generates the configuration by the given request unit configuration.

type RequestInfo

type RequestInfo interface {
	IsWrite() bool
	WriteBytes() uint64
	ReplicaNumber() int64
	StoreID() uint64
}

RequestInfo is the interface of the request information provider. A request should be able to tell whether it's a write request and if so, the written bytes would also be provided.

type RequestUnit

type RequestUnit float64

RequestUnit is the basic unit of the resource request management, which has two types:

  • RRU: read request unit
  • WRU: write request unit

type RequestUnitConfig

type RequestUnitConfig struct {
	// ReadBaseCost is the base cost for a read request. No matter how many bytes read/written or
	// the CPU times taken for a request, this cost is inevitable.
	ReadBaseCost float64 `toml:"read-base-cost" json:"read-base-cost"`
	// ReadPerBatchBaseCost is the base cost for a read request with batch.
	ReadPerBatchBaseCost float64 `toml:"read-per-batch-base-cost" json:"read-per-batch-base-cost"`
	// ReadCostPerByte is the cost for each byte read. It's 1 RU = 64 KiB by default.
	ReadCostPerByte float64 `toml:"read-cost-per-byte" json:"read-cost-per-byte"`
	// WriteBaseCost is the base cost for a write request. No matter how many bytes read/written or
	// the CPU times taken for a request, this cost is inevitable.
	WriteBaseCost float64 `toml:"write-base-cost" json:"write-base-cost"`
	// WritePerBatchBaseCost is the base cost for a write request with batch.
	WritePerBatchBaseCost float64 `toml:"write-per-batch-base-cost" json:"write-per-batch-base-cost"`
	// WriteCostPerByte is the cost for each byte written. It's 1 RU = 1 KiB by default.
	WriteCostPerByte float64 `toml:"write-cost-per-byte" json:"write-cost-per-byte"`
	// CPUMsCost is the cost for each millisecond of CPU time taken.
	// It's 1 RU = 3 millisecond by default.
	CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}

RequestUnitConfig is the configuration of the request units, which determines the coefficients of the RRU and WRU cost. This configuration should be modified carefully.

func DefaultRequestUnitConfig

func DefaultRequestUnitConfig() RequestUnitConfig

DefaultRequestUnitConfig returns the default request unit configuration.

type Reservation

type Reservation struct {
	// contains filtered or unexported fields
}

A Reservation holds information about events that are permitted by a Limiter to happen after a delay. A Reservation may be canceled, which may enable the Limiter to permit additional events.

func (*Reservation) CancelAt

func (r *Reservation) CancelAt(now time.Time)

CancelAt indicates that the reservation holder will not perform the reserved action and reverses tokens which be refilled into limiter.

func (*Reservation) Delay

func (r *Reservation) Delay() time.Duration

Delay is shorthand for DelayFrom(time.Now()).

func (*Reservation) DelayFrom

func (r *Reservation) DelayFrom(now time.Time) time.Duration

DelayFrom returns the duration for which the reservation holder must wait before taking the reserved action. Zero duration means act immediately. InfDuration means the limiter cannot grant the tokens requested in this Reservation within the maximum wait time.

func (*Reservation) OK

func (r *Reservation) OK() bool

OK returns whether the limiter can provide the requested number of tokens within the maximum wait time. If OK is false, Delay returns InfDuration, and Cancel does nothing.

type ResourceCalculator

type ResourceCalculator interface {
	// Trickle is used to calculate the resource consumption periodically rather than on the request path.
	// It's mainly used to calculate like the SQL CPU cost.
	// Need to check if it is a serverless environment
	Trickle(*rmpb.Consumption)
	// BeforeKVRequest is used to calculate the resource consumption before the KV request.
	// It's mainly used to calculate the base and write request cost.
	BeforeKVRequest(*rmpb.Consumption, RequestInfo)
	// AfterKVRequest is used to calculate the resource consumption after the KV request.
	// It's mainly used to calculate the read request cost and KV CPU cost.
	AfterKVRequest(*rmpb.Consumption, RequestInfo, ResponseInfo)
}

ResourceCalculator is used to calculate the resource consumption of a request.

type ResourceControlCreateOption

type ResourceControlCreateOption func(controller *ResourceGroupsController)

ResourceControlCreateOption create a ResourceGroupsController with the optional settings.

func EnableSingleGroupByKeyspace

func EnableSingleGroupByKeyspace() ResourceControlCreateOption

EnableSingleGroupByKeyspace is the option to enable single group by keyspace feature.

func WithMaxWaitDuration

func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption

WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets.

func WithWaitRetryInterval

func WithWaitRetryInterval(d time.Duration) ResourceControlCreateOption

WithWaitRetryInterval is the option to set the retry interval when waiting for the token.

func WithWaitRetryTimes

func WithWaitRetryTimes(times int) ResourceControlCreateOption

WithWaitRetryTimes is the option to set the times to retry when waiting for the token.

type ResourceGroupKVInterceptor

type ResourceGroupKVInterceptor interface {
	// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
	OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error)
	// OnResponse is used to consume tokens after receiving response.
	OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
	// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
	IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool
}

ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.

type ResourceGroupProvider

type ResourceGroupProvider interface {
	GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error)
	ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error)
	AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
	ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
	DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
	AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)

	// meta storage client
	LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
	Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error)
	Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error)
}

ResourceGroupProvider provides some api to interact with resource manager server.

type ResourceGroupsController

type ResourceGroupsController struct {
	// contains filtered or unexported fields
}

ResourceGroupsController implements ResourceGroupKVInterceptor.

func NewResourceGroupController

func NewResourceGroupController(
	ctx context.Context,
	clientUniqueID uint64,
	provider ResourceGroupProvider,
	requestUnitConfig *RequestUnitConfig,
	opts ...ResourceControlCreateOption,
) (*ResourceGroupsController, error)

NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor

func (*ResourceGroupsController) GetActiveResourceGroup

func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup

GetActiveResourceGroup is used to get action resource group. This is used for test only.

func (*ResourceGroupsController) GetConfig

func (c *ResourceGroupsController) GetConfig() *RUConfig

GetConfig returns the config of controller.

func (*ResourceGroupsController) GetResourceGroup

func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error)

GetResourceGroup returns the meta setting of the given resource group name.

func (*ResourceGroupsController) IsBackgroundRequest

func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
	resourceGroupName, requestResource string) bool

IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.

func (*ResourceGroupsController) OnRequestWait

func (c *ResourceGroupsController) OnRequestWait(
	ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error)

OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.

func (*ResourceGroupsController) OnResponse

func (c *ResourceGroupsController) OnResponse(
	resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error)

OnResponse is used to consume tokens after receiving response

func (*ResourceGroupsController) Start

Start starts ResourceGroupController service.

func (*ResourceGroupsController) Stop

func (c *ResourceGroupsController) Stop() error

Stop stops ResourceGroupController service.

type ResponseInfo

type ResponseInfo interface {
	ReadBytes() uint64
	KVCPU() time.Duration
	// Succeed is used to tell whether the request is successfully returned.
	// If not, we need to pay back the WRU cost of the request.
	Succeed() bool
}

ResponseInfo is the interface of the response information provider. A response should be able to tell how many bytes it read and KV CPU cost in milliseconds.

type SQLCalculator

type SQLCalculator struct {
	*RUConfig
}

SQLCalculator is used to calculate the SQL-side consumption.

func (*SQLCalculator) AfterKVRequest

AfterKVRequest ...

func (*SQLCalculator) BeforeKVRequest

func (*SQLCalculator) BeforeKVRequest(*rmpb.Consumption, RequestInfo)

BeforeKVRequest ...

func (*SQLCalculator) Trickle

func (dsc *SQLCalculator) Trickle(consumption *rmpb.Consumption)

Trickle update sql layer CPU consumption.

type TestRequestInfo

type TestRequestInfo struct {
	// contains filtered or unexported fields
}

TestRequestInfo is used to test the request info interface.

func NewTestRequestInfo

func NewTestRequestInfo(isWrite bool, writeBytes uint64, storeID uint64) *TestRequestInfo

NewTestRequestInfo creates a new TestRequestInfo.

func (*TestRequestInfo) IsWrite

func (tri *TestRequestInfo) IsWrite() bool

IsWrite implements the RequestInfo interface.

func (*TestRequestInfo) ReplicaNumber

func (*TestRequestInfo) ReplicaNumber() int64

ReplicaNumber implements the RequestInfo interface.

func (*TestRequestInfo) StoreID

func (tri *TestRequestInfo) StoreID() uint64

StoreID implements the RequestInfo interface.

func (*TestRequestInfo) WriteBytes

func (tri *TestRequestInfo) WriteBytes() uint64

WriteBytes implements the RequestInfo interface.

type TestResponseInfo

type TestResponseInfo struct {
	// contains filtered or unexported fields
}

TestResponseInfo is used to test the response info interface.

func NewTestResponseInfo

func NewTestResponseInfo(readBytes uint64, kvCPU time.Duration, succeed bool) *TestResponseInfo

NewTestResponseInfo creates a new TestResponseInfo.

func (*TestResponseInfo) KVCPU

func (tri *TestResponseInfo) KVCPU() time.Duration

KVCPU implements the ResponseInfo interface.

func (*TestResponseInfo) ReadBytes

func (tri *TestResponseInfo) ReadBytes() uint64

ReadBytes implements the ResponseInfo interface.

func (*TestResponseInfo) Succeed

func (tri *TestResponseInfo) Succeed() bool

Succeed implements the ResponseInfo interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL