Documentation ¶
Overview ¶
Package backpressure allows subsystems in a distributed system to reject requests beyond their capacity, and allows clients to cooperate by slowing down.
See https://github.com/bradenaw/backpressure for more background.
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrRejected is returned by RateLimiter and Semaphore when a request times out before being
	// admitted.
	ErrRejected = errors.New("rejected")

	// ErrClientRejection is returned by an AdaptiveThrottle when the request was not even sent to
	// the backend because it is overloaded.
	ErrClientRejection = errors.New("rejected without sending: backend is unhealthy")
)
Functions ¶
func AcceptedError ¶
func AcceptedError(err error) error

AcceptedError wraps the given err as "accepted" by the backend for the purposes of AdaptiveThrottle. This should be done for regular protocol errors that do not mean the backend is unhealthy, for example a precondition failure as part of an optimistic update, or a rejection because a particular request is too large.
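For example, a client doing an optimistic update might wrap a version-conflict error before returning it through WithAdaptiveThrottle. A minimal sketch; Store, CompareAndSwap, and ErrVersionMismatch are hypothetical names, not part of this package:

	func update(ctx context.Context, store *Store, key, oldValue, newValue string) error {
		err := store.CompareAndSwap(ctx, key, oldValue, newValue)
		if errors.Is(err, ErrVersionMismatch) {
			// The backend answered normally; this failure is part of the protocol,
			// not a sign of overload, so don't count it against the backend's health.
			return backpressure.AcceptedError(err)
		}
		return err
	}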
func WithAdaptiveThrottle ¶
func WithAdaptiveThrottle[T any](
	at *AdaptiveThrottle,
	p Priority,
	f func() (T, error),
) (T, error)
WithAdaptiveThrottle is used to send a request to a backend using the given AdaptiveThrottle for client-rejections.
If f returns an error, at considers this to be a rejection unless it is wrapped with AcceptedError(). If there are enough rejections within a given time window, further calls to WithAdaptiveThrottle may begin returning ErrClientRejection immediately without invoking f. The rate at which this happens depends on the error rate of f.
WithAdaptiveThrottle will prefer to reject lower-priority requests if it can.
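A sketch of guarding an RPC with the throttle; client.Get, GetResponse, and the chosen priority are assumptions for illustration:

	at := backpressure.NewAdaptiveThrottle(4)

	resp, err := backpressure.WithAdaptiveThrottle(at, backpressure.Priority(1), func() (*GetResponse, error) {
		return client.Get(ctx, req)
	})
	if errors.Is(err, backpressure.ErrClientRejection) {
		// The request was never sent because the backend already looks unhealthy.
	}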
Types ¶
type AdaptiveThrottle ¶
type AdaptiveThrottle struct {
// contains filtered or unexported fields
}
AdaptiveThrottle is used in a client to throttle requests to a backend as the backend becomes unhealthy, helping it recover from overload more quickly. Because backends must expend resources to reject requests over their capacity, it is vital for clients to ease off on sending load when the backend is in trouble, lest it spend all of its resources rejecting requests and have none left over to actually serve any.
The adaptive throttle works by tracking the success rate of requests over some time interval (usually a minute or so) and randomly rejecting requests without sending them, so that the client does not send much more than the rate the backend is expected to serve successfully. Some slop is included because, even if the backend is currently serving zero requests successfully, the client still needs to occasionally send it requests to learn when it becomes healthy again.
More on adaptive throttles in https://sre.google/sre-book/handling-overload/
func NewAdaptiveThrottle ¶
func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *AdaptiveThrottle
NewAdaptiveThrottle returns an AdaptiveThrottle.
priorities is the number of priorities that the throttle will accept. Giving a priority outside of `[0, priorities)` will panic.
func (*AdaptiveThrottle) Accepted ¶
func (at *AdaptiveThrottle) Accepted(p Priority)
Accepted records that a request was accepted by the backend.
func (*AdaptiveThrottle) Attempt ¶
func (at *AdaptiveThrottle) Attempt(p Priority) bool
Attempt returns true if the request should be attempted, and false if it should be rejected.
func (*AdaptiveThrottle) Rejected ¶
func (at *AdaptiveThrottle) Rejected(p Priority)
Rejected records that a request was rejected by the backend.
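Attempt, Accepted, and Rejected can be used together when WithAdaptiveThrottle's closure shape doesn't fit. A sketch, with callBackend hypothetical; for simplicity it counts every error as a rejection, where a real caller would exempt errors of the kind AcceptedError describes:

	func do(ctx context.Context, at *backpressure.AdaptiveThrottle, p backpressure.Priority) error {
		if !at.Attempt(p) {
			return backpressure.ErrClientRejection
		}
		err := callBackend(ctx)
		if err != nil {
			at.Rejected(p)
			return err
		}
		at.Accepted(p)
		return nil
	}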
type AdaptiveThrottleOption ¶
type AdaptiveThrottleOption struct {
// contains filtered or unexported fields
}
Additional options for the AdaptiveThrottle type. These options do not frequently need to be tuned as the defaults work in a majority of cases.
func AdaptiveThrottleMinimumRate ¶
func AdaptiveThrottleMinimumRate(x float64) AdaptiveThrottleOption
AdaptiveThrottleMinimumRate sets the minimum number of requests per second that the adaptive throttle will allow (approximately) through to the upstream, even if every request is failing. This is important because this is how the adaptive throttle 'learns' when the upstream becomes healthy again.
func AdaptiveThrottleRatio ¶
func AdaptiveThrottleRatio(k float64) AdaptiveThrottleOption
AdaptiveThrottleRatio sets the ratio between the rate of requests the throttle will admit and the measured success rate. For example, when k is 2 the throttle will allow twice as many requests to actually reach the backend as it believes will succeed. Higher values of k mean that the throttle reacts more slowly when a backend becomes unhealthy and more quickly when it becomes healthy again, at the cost of allowing more load to an unhealthy backend. k=2 is usually a good place to start, but backends that serve "cheap" requests (e.g. in-memory caches) may need a lower value.
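For intuition, the SRE book linked above formulates the client-side rejection probability as

	P(reject) = max(0, (requests - k*accepts) / (requests + 1))

where requests and accepts are counted over the configured window. This package is modeled on that scheme, with priorities layered on top; the exact internals here are an assumption.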
func AdaptiveThrottleWindow ¶
func AdaptiveThrottleWindow(d time.Duration) AdaptiveThrottleOption
AdaptiveThrottleWindow sets the time window over which the throttle remembers requests for use in figuring out the success rate.
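Putting the options together, a throttle might be constructed like this sketch (the values are illustrative, not recommendations):

	at := backpressure.NewAdaptiveThrottle(
		4, // priorities
		backpressure.AdaptiveThrottleWindow(time.Minute),
		backpressure.AdaptiveThrottleRatio(2),
		backpressure.AdaptiveThrottleMinimumRate(1),
	)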
type Priority ¶
type Priority int
Priority is the importance of a request. Types in this package prefer higher-priority requests.
Priorities are non-negative integers and are numbered in decreasing order of priority. `Priority(0)` is the highest priority, meaning it is preferred over all other priorities. `Priority(1)` is higher priority than all priorities except `Priority(0)`, and so on.
Types in this package generally have per-priority overhead and function best when priorities are quite coarse, so it's recommended to use a relatively small number of priorities. Around four works well in practice, for example the levels below; a sketch of declaring them in Go follows the list:
Critical - A user is waiting, and critical user flows remain functional, if severely degraded, when only Critical requests are served.
High - A user is waiting, and this request is important to the overall experience. The default for anything that a user might see.
Medium - No user is directly waiting, or the request is noncritical to the experience. Examples: type-ahead search suggestions, asynchronous work.
Low - This is a request that is content to consume only leftover capacity, and may wait until off-peak when there is more capacity available. Examples: daily batch jobs, continuous verification, one-offs, backfills.
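As referenced above, these four levels might be declared as Go constants; a sketch (the names are illustrative, not part of the package):

	const (
		PriorityCritical backpressure.Priority = iota // 0, most preferred
		PriorityHigh
		PriorityMedium
		PriorityLow
	)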
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter is used to bound the rate of some operation. It is a token bucket similar to golang.org/x/time/rate. Conceptually, it is a bucket of some capacity (the "burst") which refills at some rate. Callers ask for some number of tokens from the bucket using Wait; when enough tokens are available, Wait takes them from the bucket and returns. If there are not enough tokens, Wait blocks until enough have been replenished.
It has two major differences from golang.org/x/time/rate:
1. It is prioritized, preferring to accept higher priority requests first.
2. Each queue of waiters is a CoDel, which is not fair but can behave better in a real-time system under load.
In order to minimize wait times for high-priority requests, it self balances using "debt." Debt is tracked per priority and is the number of tokens that must be left in the bucket before a given priority may be admitted. For example, if `Priority(1)` has a debt of 5, then a `Wait(ctx, Priority(1), 3)` cannot be admitted until there are 8 tokens in the bucket.
Debt is self-adjusting: whenever a high-priority `Wait()` cannot immediately be admitted, the debt for all lower priorities is increased. Intuitively, this request would not have had to wait if this debt had already existed, so the limiter self-corrects by adding it. Whenever a high-priority `Wait()` can be admitted without waiting, any existing debt may not have been necessary, so some of it is forgiven. Additionally, debt decays over time, since anything the limiter has learned about a load pattern may become out-of-date as load changes.
func NewRateLimiter ¶
func NewRateLimiter(
	priorities int,
	rate float64,
	burst float64,
	options ...RateLimiterOption,
) *RateLimiter
NewRateLimiter returns a rate limiter with the given number of priorities, allowing the given aggregate rate, and up to the given burst. That is, the rate limiter is initially empty and its tokens refill at `rate` up to `burst` total tokens.
The other options do not frequently need to be modified.
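A sketch of bounding the rate of an operation; the rate and burst are illustrative, Wait is assumed to have the shape Wait(ctx, priority, tokens) as described above, and doOperation is a placeholder:

	rl := backpressure.NewRateLimiter(4 /* priorities */, 100 /* tokens per second */, 200 /* burst */)
	defer rl.Close()

	// Ask for one token on behalf of a high-priority request.
	if err := rl.Wait(ctx, backpressure.Priority(1), 1); err != nil {
		return err // ctx expired, or the request was rejected with ErrRejected.
	}
	doOperation()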
func (*RateLimiter) Close ¶
func (rl *RateLimiter) Close()
Close frees background resources used by the rate limiter.
func (*RateLimiter) SetRate ¶
func (rl *RateLimiter) SetRate(rate float64, burst float64)

SetRate sets the refill rate and burst of the rate limiter, as described in NewRateLimiter.
type RateLimiterOption ¶
type RateLimiterOption struct {
// contains filtered or unexported fields
}
Additional options for the RateLimiter type. These options do not frequently need to be tuned as the defaults work in a majority of cases, but they're included for completeness.
func RateLimiterDebtDecayInterval ¶
func RateLimiterDebtDecayInterval(d time.Duration) RateLimiterOption
The time it takes for 100% debt to be completely forgiven. Debt decays linearly over time since load patterns change and a previously learned debt amount may no longer be relevant.
func RateLimiterDebtForgivePerSuccess ¶
func RateLimiterDebtForgivePerSuccess(x float64) RateLimiterOption
The proportion of debt that is forgiven for lower priorities whenever a higher-priority request succeeds, in [0, 1].
func RateLimiterLongTimeout ¶
func RateLimiterLongTimeout(d time.Duration) RateLimiterOption
The long timeout for the internal CoDels. See the README for more on CoDel.
func RateLimiterShortTimeout ¶
func RateLimiterShortTimeout(d time.Duration) RateLimiterOption
The short timeout for the internal CoDels. See the README for more on CoDel.
type Semaphore ¶
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore is a way to bound concurrency, similar to golang.org/x/sync/semaphore. Conceptually, it is a bucket of some number of tokens. Callers take tokens out of this bucket using Acquire, do whatever operation needs its concurrency bounded, and then return the tokens with Release. If the bucket does not have enough tokens for an Acquire, the caller blocks for some time in case another holder Releases.
It has two major differences from golang.org/x/sync/semaphore:
1. It is prioritized, preferring to accept higher priority requests first.
2. Each queue of waiters is a CoDel, which is not fair but can behave better in a real-time system under load.
In order to minimize wait times for high-priority requests, it self balances using "debt." Debt is tracked per priority and is the number of tokens that must be left in the semaphore before a request at a given priority may be admitted.
Debt is self-adjusting: whenever a high-priority `Acquire()` cannot immediately be accepted, the debt for all lower priorities is increased. Intuitively, this request would not have had to wait if this debt already existed, so the semaphore self-corrects by adding it. Whenever a high-priority `Acquire()` can be admitted without waiting, then any existing debt may not have been necessary and so some of it is forgiven. Additionally, debt decays over time, since anything the semaphore has learned about a load pattern may become out-of-date as load changes.
func NewSemaphore ¶
func NewSemaphore(
	priorities int,
	capacity int,
	options ...SemaphoreOption,
) *Semaphore
NewSemaphore returns a semaphore with the given number of priorities that allows at most `capacity` tokens to be held at once.
The other options do not frequently need to be modified.
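A sketch of bounding concurrency with Acquire and Release; the capacity and priority are illustrative and doBoundedWork is a placeholder:

	sem := backpressure.NewSemaphore(4 /* priorities */, 64 /* capacity */)
	defer sem.Close()

	if err := sem.Acquire(ctx, backpressure.Priority(0), 1); err != nil {
		return err // ctx expired, or the request was rejected with ErrRejected.
	}
	defer sem.Release(1)
	doBoundedWork()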
func (*Semaphore) Acquire ¶
func (s *Semaphore) Acquire(ctx context.Context, p Priority, tokens int) error

Acquire attempts to acquire the given number of tokens from the semaphore on behalf of the given priority. If Acquire returns nil, these tokens should be returned to the semaphore when the caller is finished with them by using Release. Acquire returns non-nil if the given context expires before the tokens can be acquired, or if the request times out against the semaphore's own timeout and is rejected with ErrRejected.
func (*Semaphore) Close ¶
func (s *Semaphore) Close()
Close frees background resources used by the semaphore.
func (*Semaphore) Release ¶
func (s *Semaphore) Release(tokens int)

Release returns the given number of tokens to the semaphore. It should only be called with tokens that were acquired from the semaphore by a corresponding Acquire.
func (*Semaphore) SetCapacity ¶
func (s *Semaphore) SetCapacity(capacity int)

SetCapacity sets the maximum number of outstanding tokens for the semaphore. If more tokens than this new value are already outstanding, the semaphore simply waits for them to be released; it has no way of recalling them. If the new capacity is higher than the old, this immediately admits whichever waiters it can.
type SemaphoreOption ¶
type SemaphoreOption struct {
// contains filtered or unexported fields
}
Additional options for the Semaphore type. These options do not frequently need to be tuned as the defaults work in a majority of cases, but they're included for completeness.
func SemaphoreDebtDecayInterval ¶
func SemaphoreDebtDecayInterval(x time.Duration) SemaphoreOption
The time it takes for 100% debt to be completely forgiven. Debt decays linearly over time since load patterns change and a previously learned debt amount may no longer be relevant.
func SemaphoreDebtForgivePerSuccess ¶
func SemaphoreDebtForgivePerSuccess(x float64) SemaphoreOption
The proportion of debt that is forgiven for lower priorities whenever a higher-priority request succeeds, in [0, 1].
func SemaphoreLongTimeout ¶
func SemaphoreLongTimeout(d time.Duration) SemaphoreOption
The long timeout for the internal CoDels. See the README for more on CoDel.
func SemaphoreShortTimeout ¶
func SemaphoreShortTimeout(d time.Duration) SemaphoreOption
The short timeout for the internal CoDels. See the README for more on CoDel.