ratelimiter

package v1.81.0

Warning: This package is not in the latest version of its module.

Published: Mar 22, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package ratelimiter provides admission-control primitives for protecting services from overload.

The package intentionally contains multiple limiter types because overload is not one single problem. Each limiter answers a different operational question:

  • WindowLimiter: "Has this shared key already consumed too many requests in this window?"

  • LeasedWindowLimiter: "Same as WindowLimiter, but can we answer it without turning the cache backend into the bottleneck at very high request volume?"

  • ConcurrencyLimiter: "Do we already have too many expensive operations running at the same time in this process?"

  • QueueDepthLimiter: "Is downstream backlog already high enough that accepting more work would be unsafe?"

Choosing the correct limiter starts with identifying the first resource that fails in production:

  • If a tenant or API key can send too many requests in a minute, use WindowLimiter or LeasedWindowLimiter.

  • If connector calls, CPU-heavy work, or database-heavy handlers collapse when too many run in parallel, use ConcurrencyLimiter.

  • If the real danger is backlog growth in a queue, outbox, or scheduler pipeline, use QueueDepthLimiter.

These limiters are not interchangeable:

  • Do not use ConcurrencyLimiter for fleet-wide fairness. It is process-local. Ten replicas with a limit of 100 each permit roughly 1,000 concurrent operations fleet-wide.

  • Do not use QueueDepthLimiter as a generic API rate limiter. It does not shape traffic over time. It only admits or rejects based on observed backlog.

  • Do not use WindowLimiter or LeasedWindowLimiter to protect a resource whose failure mode is in-flight saturation. Request-count limits do not prevent a single expensive class of work from exhausting a process.

The usual production composition is layered rather than exclusive:

  1. WindowLimiter or LeasedWindowLimiter at the ingress boundary for fairness and abuse protection.
  2. QueueDepthLimiter before enqueue or ingest when backlog itself is the overload signal.
  3. ConcurrencyLimiter around expensive local execution such as worker dispatch, connector calls, or CPU-heavy transforms.

In other words:

  • request budgets protect fairness,
  • queue-depth admission protects drainability,
  • concurrency caps protect finite execution capacity.

If the service needs all three protections, use all three protections. They solve different failure modes.
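As a sketch of that layered composition, the three protections reduce to three gates checked in order. The plain functions below are illustrative stand-ins for WindowLimiter.Allow, QueueDepthLimiter.Allow, and ConcurrencyLimiter.TryAcquire; none of these names come from the package itself.

```go
package main

import (
	"errors"
	"fmt"
)

// admit runs the three layered checks from the overview in order:
// fairness first, drainability second, local capacity last.
func admit(withinBudget, backlogOK, slotFree func() bool) error {
	if !withinBudget() {
		return errors.New("request budget exceeded") // fairness layer
	}
	if !backlogOK() {
		return errors.New("backlog too high") // drainability layer
	}
	if !slotFree() {
		return errors.New("no execution slot") // capacity layer
	}
	return nil
}

func main() {
	yes := func() bool { return true }
	no := func() bool { return false }

	fmt.Println(admit(yes, yes, yes)) // all layers pass
	fmt.Println(admit(yes, no, yes))  // drainability layer rejects
}
```

Ordering matters: checking the cheap, fleet-wide budget first means a throttled tenant never consumes a local execution slot.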

Index

Constants

This section is empty.

Variables

var ErrCacheDoesNotSupportPerKeyTTL = errors.New("cache backend does not support per-key TTL")

var ErrConcurrencyLimitReached = errors.New("concurrency limit reached")

ErrConcurrencyLimitReached is returned by callers that choose to convert a failed TryAcquire call into an error.

The limiter itself exposes TryAcquire as a boolean because the common fast path is "acquired" vs "not acquired". This sentinel exists so services can map that condition into their own transport or business error model consistently.

Functions

func GetIP

func GetIP(r *http.Request) string

GetIP extracts the caller IP from request headers or the remote address.

func GetUserID

func GetUserID(ctx context.Context) string

GetUserID extracts user identity from frame auth claims in context.

func RateLimitMiddleware

func RateLimitMiddleware(limiter *IPRateLimiter) func(http.Handler) http.Handler

RateLimitMiddleware applies cache-backed IP rate limiting.

func UserRateLimitMiddleware

func UserRateLimitMiddleware(userLimiter *UserRateLimiter, ipLimiter *IPRateLimiter) func(http.Handler) http.Handler

UserRateLimitMiddleware applies user-based limiting and falls back to IP for unauthenticated requests.

Types

type ConcurrencyLimiter added in v1.80.1

type ConcurrencyLimiter struct {
	// contains filtered or unexported fields
}

ConcurrencyLimiter caps the number of in-flight operations in a single process.

Use this limiter when the protected resource is local and finite. Typical examples include:

  • connector execution slots
  • CPU-heavy transforms
  • datastore-heavy handlers
  • local worker fan-out that would otherwise swamp memory, CPU, or sockets

This limiter protects simultaneous work, not request rate. A service may process 10,000 requests per minute safely if only 20 run at once, yet fail at 200 requests per minute if all 200 execute concurrently. ConcurrencyLimiter is for the latter case.

This limiter does NOT provide cross-process fairness. If ten replicas each have a limit of 100, the fleet-wide limit is effectively about 1,000. That is often exactly what you want for worker-side protection, but it is not suitable as a tenant-fair global quota.

Use TryAcquire when callers should fail fast instead of waiting. Use Acquire when bounded waiting is correct and the caller can provide a context with a deadline or cancellation signal.

func NewConcurrencyLimiter added in v1.80.1

func NewConcurrencyLimiter(limit int) (*ConcurrencyLimiter, error)

NewConcurrencyLimiter creates a limiter with the given in-flight limit.

The limit must reflect the capacity of one process, not the whole fleet. If callers need a fleet-wide cap, pair this limiter with a distributed admission control mechanism.

func (*ConcurrencyLimiter) Acquire added in v1.80.1

func (cl *ConcurrencyLimiter) Acquire(ctx context.Context) (*ConcurrencyPermit, error)

Acquire waits until a permit is available or the context is cancelled.

Callers should almost always provide a context with a deadline. An unbounded wait simply moves overload from "too much concurrent work" to "too many blocked goroutines waiting for work".

func (*ConcurrencyLimiter) Available added in v1.80.1

func (cl *ConcurrencyLimiter) Available() int

Available returns the number of permits that can still be acquired immediately.

func (*ConcurrencyLimiter) Execute added in v1.80.1

func (cl *ConcurrencyLimiter) Execute(ctx context.Context, fn func(context.Context) error) error

Execute acquires a permit, runs fn, and always releases the permit.

This helper is useful when callers want one clear acquisition site and want to avoid forgetting a deferred Release. It is equivalent to Acquire followed by a deferred Release wrapped around fn.

func (*ConcurrencyLimiter) InFlight added in v1.80.1

func (cl *ConcurrencyLimiter) InFlight() int

InFlight returns the current number of acquired permits.

func (*ConcurrencyLimiter) Limit added in v1.80.1

func (cl *ConcurrencyLimiter) Limit() int

Limit returns the configured maximum number of in-flight operations.

func (*ConcurrencyLimiter) TryAcquire added in v1.80.1

func (cl *ConcurrencyLimiter) TryAcquire() (*ConcurrencyPermit, bool)

TryAcquire attempts to take a permit without blocking.

This is the correct API when the caller should return immediately with a rejection, retryable error, or reschedule decision rather than waiting for capacity.

type ConcurrencyPermit added in v1.80.1

type ConcurrencyPermit struct {
	// contains filtered or unexported fields
}

ConcurrencyPermit represents one acquired concurrency slot.

The permit must be released exactly once. Release is idempotent so callers can safely defer it. The usual pattern is:

	permit, err := limiter.Acquire(ctx)
	if err != nil {
		return err
	}
	defer permit.Release()

func (*ConcurrencyPermit) Release added in v1.80.1

func (p *ConcurrencyPermit) Release()

Release frees the acquired permit. It is safe to call more than once.

type IPRateLimiter

type IPRateLimiter struct {
	// contains filtered or unexported fields
}

IPRateLimiter applies cache-backed per-IP window limits.

func NewIPRateLimiter

func NewIPRateLimiter(raw cache.RawCache, config *WindowConfig) (*IPRateLimiter, error)

NewIPRateLimiter creates a new cache-backed IP rate limiter. If raw is nil, an in-memory frame cache is created.

func (*IPRateLimiter) Allow

func (rl *IPRateLimiter) Allow(ctx context.Context, ip string) bool

Allow checks whether a request from the given IP should be allowed.

func (*IPRateLimiter) Close

func (rl *IPRateLimiter) Close() error

Close releases owned resources.

type LeasedWindowLimiter added in v1.80.1

type LeasedWindowLimiter struct {
	// contains filtered or unexported fields
}

LeasedWindowLimiter enforces fixed-window limits using chunk reservations from cache to reduce hot-key pressure at high request volume.

func NewLeasedWindowLimiter added in v1.80.1

func NewLeasedWindowLimiter(raw cache.RawCache, cfg *WindowConfig) (*LeasedWindowLimiter, error)

NewLeasedWindowLimiter creates a cache-backed fixed-window limiter that reserves quota from the backend in chunks instead of one increment per call.
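Chunk reservation can be sketched with an in-memory counter. The sharedCounter and lease types below are illustrative stand-ins for the cache backend and the limiter's local state; a real implementation would also handle window rollover and back off once the window is exhausted.

```go
package main

import "fmt"

// sharedCounter stands in for the cache backend's atomic window counter.
type sharedCounter struct{ used, limit int }

// reserve is one backend round trip: it grants up to chunk units of
// quota, clamped to whatever the window still has available.
func (s *sharedCounter) reserve(chunk int) int {
	if avail := s.limit - s.used; avail < chunk {
		chunk = avail
	}
	s.used += chunk
	return chunk
}

// lease holds locally reserved quota that Allow calls consume without
// touching the backend, which is how chunking reduces hot-key load.
type lease struct{ remaining int }

func (l *lease) allow(s *sharedCounter, chunk int) bool {
	if l.remaining == 0 {
		l.remaining = s.reserve(chunk)
	}
	if l.remaining == 0 {
		return false
	}
	l.remaining--
	return true
}

func main() {
	shared := &sharedCounter{limit: 10}
	local := &lease{}

	allowed := 0
	for i := 0; i < 15; i++ {
		if local.allow(shared, 5) { // chunk size 5
			allowed++
		}
	}
	fmt.Println("allowed:", allowed) // the shared limit of 10 still holds
}
```

The trade-off is granularity: quota reserved by one process is invisible to its peers until the window rolls over, so enforcement is approximate near the limit.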

func (*LeasedWindowLimiter) Allow added in v1.80.1

func (ll *LeasedWindowLimiter) Allow(ctx context.Context, key string) bool

Allow checks whether key is still within the configured window limit.

type QueueDepthConfig added in v1.80.1

type QueueDepthConfig struct {
	RejectAtDepth int64
	ResumeAtDepth int64

	RefreshInterval time.Duration
	FailOpen        bool
}

QueueDepthConfig defines a queue-depth admission controller.

RejectAtDepth is the depth at which new requests start being rejected.

ResumeAtDepth is the depth below which admission resumes after the limiter has entered reject mode. ResumeAtDepth must be strictly lower than RejectAtDepth to provide hysteresis and avoid flapping around a single threshold.

RefreshInterval controls how often the queue depth source is queried. The depth function is commonly backed by Redis, JetStream, SQL, or another external system. Polling that dependency on every request would turn the limiter into a new bottleneck, so the limiter caches the last observation for this interval.

FailOpen decides what happens when backlog cannot be measured:

  • true: allow work to continue on measurement failure
  • false: reject work when backlog cannot be measured

FailOpen is appropriate when temporary blindness is less dangerous than dropping work. FailOpen=false is appropriate when protected systems are so sensitive that inability to measure backlog must conservatively stop admission.

type QueueDepthFunc added in v1.80.1

type QueueDepthFunc func(context.Context) (int64, error)

QueueDepthFunc reports the current backlog depth of the protected queue or work source.

The reported depth must be the backlog whose growth actually matters for the admission decision at the call site. For example, if an HTTP ingest handler is trying to protect an execution worker queue, the depth function should report the execution worker backlog, not an unrelated socket buffer or a different queue.

type QueueDepthLimiter added in v1.80.1

type QueueDepthLimiter struct {
	// contains filtered or unexported fields
}

QueueDepthLimiter rejects new work when downstream backlog is unsafe.

This limiter is an admission controller, not a traffic shaper. It does not smooth requests over time, refill tokens, or grant burst budgets. It answers one question only:

  • "Given the current backlog, should this caller admit more work?"

The intended use is producer-side protection when queue backlog is the dominant overload signal. Typical examples include:

  • event ingest rejecting new events while outbox or worker backlog is too high
  • API producers pausing enqueue while downstream processing is unhealthy
  • scheduler loops refusing to enqueue more work while the work queue is saturated

Do not use QueueDepthLimiter as a substitute for tenant fairness or request quotas. A nearly empty queue does not mean one noisy tenant should be allowed to consume all ingress capacity.

func NewQueueDepthLimiter added in v1.80.1

func NewQueueDepthLimiter(getDepth QueueDepthFunc, cfg QueueDepthConfig) (*QueueDepthLimiter, error)

NewQueueDepthLimiter creates a backlog-based admission controller.

The configuration must define a real hysteresis band: ResumeAtDepth must be lower than RejectAtDepth. If both thresholds are equal, the limiter would flap between admit and reject around a single depth value and would be operationally noisy.

func (*QueueDepthLimiter) Allow added in v1.80.1

func (ql *QueueDepthLimiter) Allow(ctx context.Context) bool

Allow reports whether new work should be admitted.

The limiter refreshes queue depth at most once per RefreshInterval and reuses the cached observation between refreshes. Once rejection starts, the limiter stays in reject mode until the observed depth falls to or below ResumeAtDepth. This hysteresis is deliberate. Without it, a queue oscillating around the reject threshold would flap between allow and reject on nearly every request.
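The hysteresis rule can be sketched as a pure transition function. This is an illustrative model of the behavior described above, not the package's code; the exact boundary comparisons at the thresholds are an assumption.

```go
package main

import "fmt"

// nextRejecting applies the hysteresis rule: once rejecting, stay in
// reject mode until depth falls to or below resumeAt; otherwise start
// rejecting only once depth reaches rejectAt.
func nextRejecting(rejecting bool, depth, rejectAt, resumeAt int64) bool {
	if rejecting {
		return depth > resumeAt
	}
	return depth >= rejectAt
}

func main() {
	const rejectAt, resumeAt = 1000, 200

	rejecting := false
	for _, depth := range []int64{900, 1000, 700, 200, 900} {
		rejecting = nextRejecting(rejecting, depth, rejectAt, resumeAt)
		fmt.Println("depth:", depth, "rejecting:", rejecting)
	}
}
```

Note the last step: after recovering through ResumeAtDepth, a depth of 900 admits again even though it previously kept the limiter in reject mode. That asymmetry is the hysteresis band doing its job.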

func (*QueueDepthLimiter) State added in v1.80.1

func (ql *QueueDepthLimiter) State(ctx context.Context) QueueDepthState

State returns the current queue-depth snapshot, refreshing it when the cached observation is stale.

This method is useful for observability and operator endpoints because it exposes the last observed depth, whether admission is currently closed, and when that observation was taken.

type QueueDepthState added in v1.80.1

type QueueDepthState struct {
	Depth       int64
	Rejecting   bool
	LastUpdated time.Time
}

QueueDepthState is a snapshot of the limiter's current observation and mode.

Rejecting reflects the state after hysteresis is applied. It is therefore not equivalent to simply checking whether Depth is currently above RejectAtDepth.

type UserRateLimiter

type UserRateLimiter struct {
	// contains filtered or unexported fields
}

UserRateLimiter applies cache-backed per-user window limits.

func NewUserRateLimiter

func NewUserRateLimiter(raw cache.RawCache, config *WindowConfig) (*UserRateLimiter, error)

NewUserRateLimiter creates a new cache-backed user rate limiter. If raw is nil, an in-memory frame cache is created.

func (*UserRateLimiter) Allow

func (rl *UserRateLimiter) Allow(ctx context.Context, userID string) bool

Allow checks whether a request from the given user should be allowed.

func (*UserRateLimiter) Close

func (rl *UserRateLimiter) Close() error

Close releases owned resources.

type WindowConfig

type WindowConfig struct {
	WindowDuration  time.Duration
	MaxPerWindow    int
	KeyPrefix       string
	FailOpen        bool
	ReservationSize int
}

WindowConfig defines fixed-window counter limiter settings backed by cache.

func DefaultWindowConfig

func DefaultWindowConfig() *WindowConfig

DefaultWindowConfig returns conservative cache-backed limiter defaults.

type WindowLimiter

type WindowLimiter struct {
	// contains filtered or unexported fields
}

WindowLimiter enforces per-key fixed-window limits using atomic cache increments.

func NewWindowLimiter

func NewWindowLimiter(raw cache.RawCache, cfg *WindowConfig) (*WindowLimiter, error)

NewWindowLimiter creates a cache-backed window limiter.

func (*WindowLimiter) Allow

func (wl *WindowLimiter) Allow(ctx context.Context, key string) bool

Allow checks whether key is still within the configured window limit.
