backpressure

README

backpressure

This package is intended to help with load management in a distributed system.

It provides:

  • Semaphore, which bounds the number of concurrent operations against a resource.

  • RateLimiter, which bounds the rate of operations against a resource.

  • AdaptiveThrottle, which clients use to throttle themselves when a backend shows signs of overload.

Each of these is prioritized and generally auto-tuning.

Read on for motivation and explanation of each.

Background

Resources are in general finite: the number of cores or amount of memory in a machine, the number of IOPS an SSD is capable of, the throughput of shared resources like mutexes. Generally, capacity planning can ensure that enough of these resources are available. However, occasionally these resources are overloaded - often caused by unplanned demand, hotspotting, or accidents. The question is what to do in these scenarios and how to ensure that a system recovers from them.

A truly perfect system would just 'scale up' to handle the load, no matter how much of it arrives.

This is usually not possible. Scaling up generally generates load before it can alleviate it (for example, re-replication is not free). An ideal system (slightly short of a perfect one, let's say) looks like this: an under-capacity ideal system serves every request it gets, and an over-capacity ideal system serves exactly its capacity.

[Graph: requests vs. successful responses, linear and then leveling off]

In reality, systems do not look like this. Managing requests has overhead, even if all we're doing is queueing them. A realistic system looks something like this:

[Graph: requests vs. successful responses, linear at first, then slowing down, leveling off, and eventually falling back to zero]

While under capacity, the system serves every request it gets within a reasonable amount of time. Adding more requests has diminishing returns: some of those requests complete, and some end up waiting in a queue for too long. At some amount of load, the system begins spending a disproportionate amount of time managing requests that it does not end up serving successfully, and the success rate falls off again. This is called congestion collapse, and it is the state we want to avoid reaching.

Rejection and Backpressure

The overloaded system's best option in this situation is to reject requests beyond those that it can serve successfully. (Let's say that serving "successfully" also requires serving within a reasonable time bound, because higher layers of the stack, including the user, may time out and retry.) This widens the part of the curve where the system can serve near its theoretical capacity, because rejecting a request is (usually) cheaper than serving it, especially if the request cannot be finished in time and the work spent on it is wasted.

The Semaphore and RateLimiter types in this package are meant to help with this. Semaphores tend to be the better choice when requests have little transit overhead (say, the resource being accessed is on the same machine as the semaphore, so there's no latency to actually reach it), when requests are highly heterogeneous and their relative expense cannot be easily estimated (e.g. arbitrary SQL queries), or when the resource itself scales very poorly under concurrency (e.g. CPU-bound tasks, or those that involve concurrency primitives that behave poorly under contention). Rate limiters work better than semaphores when requests' relative costs can be easily estimated, when there is difficult-to-measure but inexpensive overhead (e.g. network transit to the resource being protected), or when protecting a throughput resource like disk IOPS or replication speed. Each of these does its job best when placed as close to the resource as possible, usually in the same process or, failing that, on the same machine, because the resource itself has the highest-signal view of its own load; with some care they can also be deployed in proxies.
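
For instance (a sketch only; the priority count, capacity, rate, and burst below are illustrative rather than recommendations), CPU-bound work on the local machine is a natural fit for a Semaphore, while writes against a disk with a known IOPS budget are a natural fit for a RateLimiter:

package example

import (
	"runtime"

	"github.com/bradenaw/backpressure"
)

var (
	// CPU-bound work scales poorly with concurrency, so bound it with a
	// Semaphore sized to the machine.
	cpuSem = backpressure.NewSemaphore(4 /* priorities */, runtime.GOMAXPROCS(0))

	// Disk IOPS is a throughput resource with easily estimated per-request
	// cost, so bound it with a RateLimiter set to a fraction of the disk's
	// measured capacity.
	diskLimiter = backpressure.NewRateLimiter(4 /* priorities */, 1000 /* tokens/sec */, 100 /* burst */)
)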

Astute readers will have noticed by now that rejection can only delay the inevitable: congestion collapse still lurks at some rate of requests, even if rejection can push it further away. This is where backpressure comes in: recruiting the cooperation of clients to slow down when too many requests are being sent. (This is not always possible, because your clients may be, say, a DDoS attack from the wide internet, in which case the only viable avenue is making rejections as cheap as possible by placing them as close to the client as possible.) Naturally, the best way to communicate overload to your clients is with the rejections themselves, including nondescript errors, since we may be too overloaded to even reject properly or promptly. Clients can then use this signal to estimate the rate of requests that they can send without overloading you, and slow themselves down accordingly.

There is plenty of precedent for this: TCP itself behaves this way. Two machines communicating over the internet talk through a huge network of routers and have no idea how much capacity is available along the entire path from A to B. They learn it by watching how many packets successfully reach their destination, since overloaded hardware in the middle simply drops packets on the floor. The AdaptiveThrottle type in this package is meant to help with this: it measures the rate at which requests are being accepted by the upstream, and begins (randomly) rejecting requests in the client so that the rate of requests actually sent stays near what the upstream can handle. Paired with exponential-backoff retries in the proper layer of the stack (choosing that layer is a story for another time), subsystems can fairly accurately estimate the resources available to them without ever explicitly communicating about it.
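
As a sketch of what that client-side cooperation can look like (the generic retry helper below, its backoff schedule, and the shape of f are assumptions, not part of this package): every attempt goes through the adaptive throttle, and failures, including the throttle's own ErrClientRejection, are retried with exponential backoff.

package example

import (
	"context"
	"time"

	"github.com/bradenaw/backpressure"
)

// callWithRetries sends f through the adaptive throttle and retries failures
// with exponential backoff. Real code would cap the backoff and the number of
// attempts, and only retry errors that are actually retryable.
func callWithRetries[T any](
	ctx context.Context,
	at *backpressure.AdaptiveThrottle,
	p backpressure.Priority,
	f func() (T, error),
) (T, error) {
	backoff := 50 * time.Millisecond
	for {
		result, err := backpressure.WithAdaptiveThrottle(at, p, f)
		if err == nil {
			return result, nil
		}
		select {
		case <-ctx.Done():
			return result, err
		case <-time.After(backoff):
		}
		backoff *= 2
	}
}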

Queueing and Buffer Bloat

Rejecting a request outright may cost more resources than it needs to. The client is extremely likely to retry relatively soon, and if we have the resources available to serve it then, we end up paying for both the rejection and for serving the request. This is not an uncommon case - it's desirable to run systems as close to their capacity as possible to reduce costs. Even if the aggregate rate of requests is comfortably below the capacity, it will frequently pass above it for brief moments, because load is usually rather randomly distributed over time and thus looks extremely spiky at a fine enough time resolution.

Naturally, the proper response is queueing: hold onto the requests we do not currently have the capacity to serve, because we'll probably be able to serve them very soon. Queues work extremely well for this purpose: absorbing short-term load spikes.

Queues, however, can become extremely painful under slightly more sustained overload. Again imagine a properly-provisioned server that is serving very near its capacity. When this server inevitably receives a large load spike, it builds a very large queue. Even once the load spike goes away, because the server normally serves very near its capacity, the queue shrinks very slowly. If the queue is very large, then the requests at the front of it have been waiting there for some time, and may even time out before they can be served. This creates a self-sustaining overload: now every request arriving at this server needs to wait its turn in a large backlog, increasing latency, and many requests are unsuccessful because they take so long to actually be processed. This situation is called buffer bloat.

The important thing to note is that good queues, the ones used to absorb short-term load spikes, are frequently empty: a small spike comes in and fills up the queue, and the queue quickly empties again as the server catches up. This is where Controlled Delay (CoDel) comes in. A CoDel is a queue that is processed in FIFO order during "good" times, which allows for fairness and short tail latency, and that switches to LIFO order with a very aggressive timeout during overload to ensure the queue empties again, avoiding buffer bloat.

At the heart of any semaphore or rate limiter is indeed a queue, and the types provided in this package use CoDel as their queueing policy to avoid accidentally introducing buffer bloat while trying to solve load management problems.
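
For illustration, here is a minimal sketch of that queueing policy for a single queue of waiters, ignoring locking, priorities, and how waiters are actually woken; it is not this package's implementation. The queue remembers when it was last empty: if it has failed to drain within the long timeout, it assumes overload, serves the newest waiter first, and drops anything older than the short timeout so the backlog empties.

package example

import "time"

type waiter struct {
	enqueued time.Time
}

type codel struct {
	queue        []waiter
	lastEmpty    time.Time
	shortTimeout time.Duration // aggressive, used during overload
	longTimeout  time.Duration // generous, used during good times
}

func (c *codel) push(now time.Time, w waiter) {
	if len(c.queue) == 0 {
		c.lastEmpty = now
	}
	c.queue = append(c.queue, w)
}

// overloaded reports whether the queue has failed to fully drain recently.
func (c *codel) overloaded(now time.Time) bool {
	return len(c.queue) > 0 && now.Sub(c.lastEmpty) > c.longTimeout
}

// next returns the waiter to admit, if any. During good times it serves the
// oldest waiter (FIFO) with a generous timeout; during overload it serves the
// newest waiter (LIFO) and drops waiters older than the short timeout, which
// is what lets the backlog drain instead of bloating.
func (c *codel) next(now time.Time) (waiter, bool) {
	timeout, lifo := c.longTimeout, false
	if c.overloaded(now) {
		timeout, lifo = c.shortTimeout, true
	}
	for len(c.queue) > 0 {
		var w waiter
		if lifo {
			w, c.queue = c.queue[len(c.queue)-1], c.queue[:len(c.queue)-1]
		} else {
			w, c.queue = c.queue[0], c.queue[1:]
		}
		if len(c.queue) == 0 {
			c.lastEmpty = now
		}
		if now.Sub(w.enqueued) > timeout {
			continue // in real code this waiter would be failed with ErrRejected
		}
		return w, true
	}
	return waiter{}, false
}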

Prioritization

If we're going to reject requests in an overload, we may as well be discerning about which requests to reject. Systems usually have quite a bit of low-priority traffic flowing through them, for example background jobs, backfills, and verifiers. During an overload, we should sacrifice these requests first to ensure that the highest-priority requests, usually those that a user might be waiting for, can be served successfully.

Types in this package are all prioritized. In practice, they work best when prioritization is relatively coarse and only a small handful of different priorities are used. Four has proven to work well, for example:

  • Critical (Priority(0)) - A user is waiting, and critical user flows remain functional (if severely degraded) when only Critical requests are served.

  • High (Priority(1)) - A user is waiting, and this request is important to the overall experience. The default for anything that a user might see.

  • Medium (Priority(2)) - No user is directly waiting, or the request is noncritical to the experience. Examples: type-ahead search suggestions, asynchronous work.

  • Low (Priority(3)) - This is a request that is content to consume only leftover capacity, and may wait until off-peak when there is more capacity available. Examples: daily batch jobs, continuous verification, one-offs, backfills.
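
In Go, these can simply be named constants; the names below mirror the list above and are not required by the package, which only cares about the integer values.

package example

import "github.com/bradenaw/backpressure"

const (
	Critical = backpressure.Priority(iota) // Priority(0)
	High                                   // Priority(1)
	Medium                                 // Priority(2)
	Low                                    // Priority(3)
)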

Documentation

Overview

Package backpressure allows subsystems in a distributed system to reject requests beyond their capacity and for clients to cooperate by slowing down.

See https://github.com/bradenaw/backpressure for more background.

Index

Constants

This section is empty.

Variables

var (
	// ErrRejected is returned by RateLimiter and Semaphore when a request times out before being
	// admitted.
	ErrRejected = errors.New("rejected")

	// ErrClientRejection is returned by an AdaptiveThrottle when the request was not even sent to
	// the backend because it is overloaded.
	ErrClientRejection = errors.New("rejected without sending: backend is unhealthy")
)

Functions

func AcceptedError

func AcceptedError(err error) error

AcceptedError wraps the given err as "accepted" by the backend for the purposes of AdaptiveThrottle. This should be done for regular protocol errors that do not mean the backend is unhealthy, for example a precondition failure as part of an optimistic update or a rejection for a particular request being too large.
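
For example (a sketch; the precondition-failure error and the doUpdate callback are hypothetical stand-ins for a real backend call), a client performing an optimistic update might wrap the expected race-loss error so it does not count against the backend's health:

package example

import (
	"context"
	"errors"

	"github.com/bradenaw/backpressure"
)

// errPreconditionFailed stands in for a protocol error that means the backend
// answered just fine and our optimistic update simply lost a race.
var errPreconditionFailed = errors.New("precondition failed")

func update(
	ctx context.Context,
	at *backpressure.AdaptiveThrottle,
	p backpressure.Priority,
	doUpdate func(context.Context) error,
) error {
	_, err := backpressure.WithAdaptiveThrottle(at, p, func() (struct{}, error) {
		err := doUpdate(ctx)
		if errors.Is(err, errPreconditionFailed) {
			// The backend isn't unhealthy, so don't let this error count as a
			// rejection in the throttle's bookkeeping.
			return struct{}{}, backpressure.AcceptedError(err)
		}
		return struct{}{}, err
	})
	return err
}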

func WithAdaptiveThrottle

func WithAdaptiveThrottle[T any](
	at *AdaptiveThrottle,
	p Priority,
	f func() (T, error),
) (T, error)

WithAdaptiveThrottle is used to send a request to a backend using the given AdaptiveThrottle for client-rejections.

If f returns an error, at considers this to be a rejection unless it is wrapped with AcceptedError(). If there are enough rejections within a given time window, further calls to WithAdaptiveThrottle may begin returning ErrClientRejection immediately without invoking f. The rate at which this happens depends on the error rate of f.

WithAdaptiveThrottle will prefer to reject lower-priority requests if it can.

Types

type AdaptiveThrottle

type AdaptiveThrottle struct {
	// contains filtered or unexported fields
}

AdaptiveThrottle is used in a client to throttle requests to a backend as it becomes unhealthy to help it recover from overload more quickly. Because backends must expend resources to reject requests over their capacity it is vital for clients to ease off on sending load when they are in trouble, lest the backend spend all of its resources on rejecting requests and have none left over to actually serve any.

The adaptive throttle works by tracking the success rate of requests over some time interval (usually a minute or so) and randomly rejecting requests without sending them, so that the rate actually sent does not far exceed the rate expected to succeed. Some slop is included, because even if the backend is serving zero requests successfully, we do need to occasionally send it requests to learn when it becomes healthy again.

More on adaptive throttles in https://sre.google/sre-book/handling-overload/
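
The core of that chapter is a client-side rejection probability; the sketch below shows the formula as described there, with k corresponding to AdaptiveThrottleRatio. This package tracks it per priority and its internals may differ in detail.

package example

import "math"

// rejectionProbability is the client-side rejection probability from the SRE
// book's adaptive throttling section: with k = 2, clients may send roughly
// twice as many requests as have recently been accepted, and anything beyond
// that is rejected locally with this probability.
func rejectionProbability(requests, accepts, k float64) float64 {
	return math.Max(0, (requests-k*accepts)/(requests+1))
}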

func NewAdaptiveThrottle

func NewAdaptiveThrottle(priorities int, options ...AdaptiveThrottleOption) *AdaptiveThrottle

NewAdaptiveThrottle returns an AdaptiveThrottle.

priorities is the number of priorities that the throttle will accept. Giving a priority outside of `[0, priorities)` will panic.
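
For example, a throttle for four priorities that remembers roughly the last minute of requests (the option values are illustrative; the defaults are usually fine):

package example

import (
	"time"

	"github.com/bradenaw/backpressure"
)

var throttle = backpressure.NewAdaptiveThrottle(
	4, // Priority(0) through Priority(3)
	backpressure.AdaptiveThrottleWindow(time.Minute),
	backpressure.AdaptiveThrottleRatio(2),
)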

func (*AdaptiveThrottle) Accepted

func (at *AdaptiveThrottle) Accepted(p Priority)

Accepted records that a request was accepted by the backend.

func (*AdaptiveThrottle) Attempt

func (at *AdaptiveThrottle) Attempt(
	p Priority,
) bool

Attempt returns true if the request should be attempted, and false if it should be rejected.

func (*AdaptiveThrottle) Rejected

func (at *AdaptiveThrottle) Rejected(p Priority)

Rejected records that a request was rejected by the backend.
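
Attempt, Accepted, and Rejected can be used directly when WithAdaptiveThrottle's closure shape is inconvenient. A sketch (f stands in for the real backend call; for brevity every error is recorded as a rejection, whereas real code should record protocol errors that don't indicate overload as Accepted):

package example

import (
	"context"

	"github.com/bradenaw/backpressure"
)

func send(
	ctx context.Context,
	at *backpressure.AdaptiveThrottle,
	p backpressure.Priority,
	f func(context.Context) error,
) error {
	if !at.Attempt(p) {
		return backpressure.ErrClientRejection
	}
	if err := f(ctx); err != nil {
		at.Rejected(p)
		return err
	}
	at.Accepted(p)
	return nil
}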

type AdaptiveThrottleOption

type AdaptiveThrottleOption struct {
	// contains filtered or unexported fields
}

Additional options for the AdaptiveThrottle type. These options do not frequently need to be tuned as the defaults work in a majority of cases.

func AdaptiveThrottleMinimumRate

func AdaptiveThrottleMinimumRate(x float64) AdaptiveThrottleOption

AdaptiveThrottleMinimumRate sets the minimum number of requests per second that the adaptive throttle will allow (approximately) through to the upstream, even if every request is failing. This is important because this is how the adaptive throttle 'learns' when the upstream becomes healthy again.

func AdaptiveThrottleRatio

func AdaptiveThrottleRatio(k float64) AdaptiveThrottleOption

AdaptiveThrottleRatio sets the ratio of the measured success rate and the rate that the throttle will admit. For example, when k is 2 the throttle will allow twice as many requests to actually reach the backend as it believes will succeed. Higher values of k mean that the throttle will react more slowly when a backend becomes unhealthy, but react more quickly when it becomes healthy again, and will allow more load to an unhealthy backend. k=2 is usually a good place to start, but backends that serve "cheap" requests (e.g. in-memory caches) may need a lower value.

func AdaptiveThrottleWindow

func AdaptiveThrottleWindow(d time.Duration) AdaptiveThrottleOption

AdaptiveThrottleWindow sets the time window over which the throttle remembers requests for use in figuring out the success rate.

type Priority

type Priority int

Priority is the importance of a request. Types in this package prefer higher priority requests.

Priorities are non-negative integers and are numbered in decreasing order of priority. `Priority(0)` is the highest priority, meaning it is preferred over all other priorities. `Priority(1)` is higher priority than all priorities except `Priority(0)`, and so on.

Types in this package generally have overhead per-priority and function best when priorities are quite coarse, and so it's recommended to use a relatively small number of priorities. Around four works well in practice, for example:

Critical - A user is waiting, and critical user flows remain functional (if severely degraded) when only Critical requests are served.

High - A user is waiting, and this request is important to the overall experience. The default for anything that a user might see.

Medium - No user is directly waiting, or the request is noncritical to the experience. Examples: type-ahead search suggestions, asynchronous work.

Low - This is a request that is content to consume only leftover capacity, and may wait until off-peak when there is more capacity available. Examples: daily batch jobs, continuous verification, one-offs, backfills.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter is used to bound the rate of some operation. It is a token bucket similar to golang.org/x/time/rate. Conceptually, it is a bucket of some capacity (the "burst") which refills at some rate. Callers ask for some number of tokens from the bucket using Wait; when enough tokens are available, they are taken from the bucket and Wait returns. If there are not enough tokens, Wait blocks until enough tokens have been replenished.

It has two major differences from golang.org/x/time/rate:

1. It is prioritized, preferring to accept higher priority requests first.

2. Each queue of waiters is a CoDel, which is not fair but can behave better in a real-time system under load.

In order to minimize wait times for high-priority requests, it self balances using "debt." Debt is tracked per priority and is the number of tokens that must be left in the bucket before a given priority may be admitted. For example, if `Priority(1)` has a debt of 5, then a `Wait(ctx, Priority(1), 3)` cannot be admitted until there are 8 tokens in the bucket.

Debt is self-adjusting: whenever a high-priority `Wait()` cannot immediately be accepted, the debt for all lower priorities is increased. Intuitively, this request would not have had to wait if this debt already existed, so the bucket self-corrects by adding it. Whenever a high-priority `Wait()` can be admitted without waiting, then any existing debt may not have been necessary and so some of it is forgiven. Additionally, debt decays over time, since anything the bucket has learned about a load pattern may become out-of-date as load changes.

func NewRateLimiter

func NewRateLimiter(
	priorities int,
	rate float64,
	burst float64,
	options ...RateLimiterOption,
) *RateLimiter

NewRateLimiter returns a rate limiter with the given number of priorities, allowing the given aggregate rate, and up to the given burst. That is, the rate limiter is initially empty and its tokens refill at `rate` up to `burst` total tokens.

The other options do not frequently need to be modified.

func (*RateLimiter) Close

func (rl *RateLimiter) Close()

Close frees background resources used by the rate limiter.

func (*RateLimiter) SetRate

func (rl *RateLimiter) SetRate(rate float64, burst float64)

SetRate adjusts the rate limiter's refill rate and burst, with the same meaning as in NewRateLimiter.

func (*RateLimiter) Wait

func (rl *RateLimiter) Wait(ctx context.Context, p Priority, tokens float64) error

Wait blocks until the given number of tokens is available for the given priority. It returns nil if the tokens were successfully acquired or ErrRejected if the internal CoDel times out the request before it can succeed.
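
A sketch of the call shape (the limiter's numbers and the one-token-per-operation cost are illustrative):

package example

import (
	"context"
	"errors"

	"github.com/bradenaw/backpressure"
)

// Roughly 500 tokens per second with bursts of up to 100, across four
// priorities.
var writeLimiter = backpressure.NewRateLimiter(4, 500, 100)

func limitedWrite(ctx context.Context, p backpressure.Priority) error {
	err := writeLimiter.Wait(ctx, p, 1)
	if errors.Is(err, backpressure.ErrRejected) {
		// The limiter's internal CoDel timed this request out; surface the
		// overload rather than queueing indefinitely.
		return err
	}
	if err != nil {
		return err // e.g. ctx expired while waiting
	}
	// ... perform the write that the token paid for ...
	return nil
}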

type RateLimiterOption

type RateLimiterOption struct {
	// contains filtered or unexported fields
}

Additional options for the RateLimiter type. These options do not frequently need to be tuned as the defaults work in a majority of cases, but they're included for completeness.

func RateLimiterDebtDecayInterval

func RateLimiterDebtDecayInterval(d time.Duration) RateLimiterOption

The time it takes for 100% debt to be completely forgiven. Debt decays linearly over time since load patterns change and a previously learned debt amount may no longer be relevant.

func RateLimiterDebtForgivePerSuccess

func RateLimiterDebtForgivePerSuccess(x float64) RateLimiterOption

The proportion of debt that is forgiven for lower priorities whenever a higher-priority request succeeds, in [0, 1].

func RateLimiterLongTimeout

func RateLimiterLongTimeout(d time.Duration) RateLimiterOption

The long timeout for the internal CoDels. See the README for more on CoDel.

func RateLimiterShortTimeout

func RateLimiterShortTimeout(d time.Duration) RateLimiterOption

The short timeout for the internal CoDels. See the README for more on CoDel.

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore is a way to bound concurrency, similar to golang.org/x/sync/semaphore. Conceptually, it is a bucket of some number of tokens. Callers take tokens out of this bucket using Acquire, do whatever operation needs its concurrency bounded, and then return the tokens with Release. If the bucket does not have enough tokens to satisfy an Acquire, Acquire blocks for some time in case another holder Releases its tokens.

It has two major differences from golang.org/x/sync/semaphore:

1. It is prioritized, preferring to accept higher priority requests first.

2. Each queue of waiters is a CoDel, which is not fair but can behave better in a real-time system under load.

In order to minimize wait times for high-priority requests, it self balances using "debt." Debt is tracked per priority and is the number of tokens that must be left in the semaphore before a given request may be admitted.

Debt is self-adjusting: whenever a high-priority `Acquire()` cannot immediately be accepted, the debt for all lower priorities is increased. Intuitively, this request would not have had to wait if this debt already existed, so the semaphore self-corrects by adding it. Whenever a high-priority `Acquire()` can be admitted without waiting, then any existing debt may not have been necessary and so some of it is forgiven. Additionally, debt decays over time, since anything the semaphore has learned about a load pattern may become out-of-date as load changes.

func NewSemaphore

func NewSemaphore(
	priorities int,
	capacity int,
	options ...SemaphoreOption,
) *Semaphore

NewSemaphore returns a semaphore with the given number of priorities that allows at most capacity tokens to be held at once.

The other options do not frequently need to be modified.

func (*Semaphore) Acquire

func (s *Semaphore) Acquire(ctx context.Context, p Priority, tokens int) error

Acquire attempts to acquire some number of tokens from the semaphore on behalf of the given priority. If Acquire returns nil, these tokens should be returned to the semaphore with Release when the caller is finished with them. Acquire returns a non-nil error if the given context expires before the tokens can be acquired, or ErrRejected if the semaphore's internal CoDel times the request out.

func (*Semaphore) Close

func (s *Semaphore) Close()

Close frees background resources used by the semaphore.

func (*Semaphore) Release

func (s *Semaphore) Release(tokens int)

Release returns the given number of tokens to the semaphore. It should only be called if these tokens are known to be acquired from the semaphore with a corresponding Acquire.
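
A sketch of the Acquire/Release pairing (the capacity and the job callback are illustrative):

package example

import (
	"context"

	"github.com/bradenaw/backpressure"
)

// At most 64 tokens held at once, across four priorities.
var jobSem = backpressure.NewSemaphore(4, 64)

func runJob(ctx context.Context, p backpressure.Priority, job func(context.Context) error) error {
	// Acquire a single token; this returns backpressure.ErrRejected if the
	// semaphore's internal CoDel times the request out.
	if err := jobSem.Acquire(ctx, p, 1); err != nil {
		return err
	}
	// Return exactly what was acquired once the work is done.
	defer jobSem.Release(1)

	return job(ctx)
}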

func (*Semaphore) SetCapacity

func (s *Semaphore) SetCapacity(capacity int)

SetCapacity sets the maximum number of outstanding tokens for the semaphore. If more tokens than this new value are already outstanding, the semaphore simply waits for them to be released; it has no way of recalling them. If the new capacity is higher than the old, this will immediately admit any waiters it can.

type SemaphoreOption

type SemaphoreOption struct {
	// contains filtered or unexported fields
}

Additional options for the Semaphore type. These options do not frequently need to be tuned as the defaults work in a majority of cases, but they're included for completeness.

func SemaphoreDebtDecayInterval

func SemaphoreDebtDecayInterval(x time.Duration) SemaphoreOption

The time it takes for 100% debt to be completely forgiven. Debt decays linearly over time since load patterns change and a previously learned debt amount may no longer be relevant.

func SemaphoreDebtForgivePerSuccess

func SemaphoreDebtForgivePerSuccess(x float64) SemaphoreOption

The proportion of debt that is forgiven for lower priorities whenever a higher-priority request succeeds, in [0, 1].

func SemaphoreLongTimeout

func SemaphoreLongTimeout(d time.Duration) SemaphoreOption

The long timeout for the internal CoDels. See the README for more on CoDel.

func SemaphoreShortTimeout

func SemaphoreShortTimeout(d time.Duration) SemaphoreOption

The short timeout for the internal CoDels. See the README for more on CoDel.
