proxymw

package
v0.1.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2025 License: MIT Imports: 23 Imported by: 1

Documentation

Overview

Package proxymw holds interfaces and configuration to safeguard backend services from dynamic load

Index

Constants

View Source
const (
	BackpressureProxyType     = "backpressure"
	BackpressureUpdateCadence = 30 * time.Second
	MonitorQueryTimeout       = 15 * time.Second
	DefaultThrottleCurve      = 4.0
)
View Source
const (
	// https://sre.google/sre-book/handling-overload/
	CriticalityCriticalPlus = "CRITICAL_PLUS"
	CriticalityCritical     = "CRITICAL"
	// CriticalityDefault is used when the client does not set the X-Request-Criticality header.
	CriticalityDefault = CriticalityCritical
)
View Source
const (
	BlockerProxyType = "blocker"
)
View Source
const DefaultRangeStep = time.Second * 30
View Source
const (
	InstantQueryEndpoint = "/api/v1/query"
)
View Source
const (
	NoJitter time.Duration = 0
)
View Source
const ObjectStorageThreshold = 100

Variables

View Source
var (
	ErrJitterDelayRequired       = errors.New("delay must be non-empty when jitter is enabled")
	ErrBackpressureQueryRequired = errors.New(
		"must provide at least one backpressure query when backpressure is enabled",
	)
	ErrCongestionWindowMinBelowOne = errors.New("backpressure min window < 1")
	ErrCongestionWindowMaxBelowMin = errors.New("backpressure max window <= min window")
	ErrNegativeThrottleCurve       = errors.New("throttle curve cannot be negative")
	ErrNegativeQueryThresholds     = errors.New("backpressure query thresholds cannot be negative")
	ErrEmergencyBelowWarnThreshold = errors.New("emergency threshold must be > warn threshold")
	ErrExtraQueryQuotes            = errors.New("backpressure PromQL cannot be wrapped in quotes")

	ErrBackpressureBackoff = BlockErr(
		BackpressureProxyType,
		"congestion window closed, backoff from backpressure",
	)

	ErrNilRequest        = errors.New("nil *http.Request")
	ErrNilResponseWriter = errors.New("nil http.ResponseWriter")
	ErrNilResponse       = errors.New("nil *http.Response")
)

Functions

func BlockErr

func BlockErr(t string, format string, a ...any) error

func DupRequest added in v0.1.25

func DupRequest(req *http.Request) (*http.Request, error)

func LowCostRequest added in v0.1.23

func LowCostRequest(rr Request) (bool, error)

func NewServeFuncFromConfig added in v0.1.17

func NewServeFuncFromConfig(cfg Config, next http.HandlerFunc) http.HandlerFunc

func ParseHeaderKey added in v0.1.15

func ParseHeaderKey(rr Request, key HeaderKey) string

func QueryCost added in v0.1.23

func QueryCost(rr Request) (int, error)

func ValidateBlockPatterns added in v0.1.20

func ValidateBlockPatterns(patterns []string) error

func ValueFromPromQL added in v0.1.20

func ValueFromPromQL(
	ctx context.Context, client *http.Client, endpoint, query string,
) (float64, error)

ValueFromPromQL queries the prometheus instant API for the prometheus query. Throws an error if the response is not a single value.

Types

type APIErrorResponse added in v0.0.3

type APIErrorResponse struct {
	Status    string `json:"status"`
	ErrorType string `json:"errorType"`
	Error     string `json:"error"`
}

APIErrorResponse represents the standard error response format

type Backpressure

type Backpressure struct {
	// contains filtered or unexported fields
}

Backpressure uses Additive Increase Multiplicative Decrease which is a congestion control algorithm to back off of expensive queries and is modeled after TCP's https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease. Backpressure signals are derived from PromQL metric signals and the system will never let less than a minimum number of queries through at one time. How does it work? 1. Start a background thread to keep backpressure metrics updated 2. On each request, set the "window" for how many concurrent requests are allowed 3. If we are within bounds, allow the request 4. If backpressure is not spiking, widen the window by one (additive) 5. if backpressure signals fire, cut the window in proportion to signal strength (multiplicative)

func NewBackpressure

func NewBackpressure(client ProxyClient, cfg BackpressureConfig) *Backpressure

func (*Backpressure) Init

func (bp *Backpressure) Init(ctx context.Context)

func (*Backpressure) Next added in v0.1.0

func (bp *Backpressure) Next(rr Request) error

type BackpressureConfig

type BackpressureConfig struct {
	EnableBackpressure        bool                `yaml:"enable_backpressure"`
	BackpressureMonitoringURL string              `yaml:"backpressure_monitoring_url"`
	BackpressureQueries       []BackpressureQuery `yaml:"backpressure_queries"`
	CongestionWindowMin       int                 `yaml:"congestion_window_min"`
	CongestionWindowMax       int                 `yaml:"congestion_window_max"`
	// EnableLowCostBypass assumes proxy requests are Prometheus queries.
	// If the promQL will query data more than 2 hours ago, the query is considered high cost.
	// When enabled, low cost queries bypass the backpressure congestion control queue.
	EnableLowCostBypass bool `yaml:"enable_low_cost_bypass"`
}

func (BackpressureConfig) Validate

func (c BackpressureConfig) Validate() error

type BackpressureQuery added in v0.1.0

type BackpressureQuery struct {
	// Name is an optional human readable field used to emit tagged metrics.
	// When unset, operational metrics are omitted.
	// When set, read warn_threshold as proxymw_bp_warn_threshold{query_name="<name>"}
	Name string `yaml:"name,omitempty"`
	// Query is the PromQL to monitor system load or usage
	Query string `yaml:"query"`
	// WarningThreshold is the load value at which throttling begins (e.g., 80% capacity)
	WarningThreshold float64 `yaml:"warning_threshold"`
	// EmergencyThreshold is the load value at which the max num of requests are blocked (e.g., 100% capacity). Still lets through CongestionWindowMin
	EmergencyThreshold float64 `yaml:"emergency_threshold"`
	// ThrottlingCurve is a constant controlling the aggressiveness of throttling (e.g., default 4.0 for steep growth)
	ThrottlingCurve float64 `yaml:"throttling_curve"`
}

func ParseBackpressureQueries added in v0.1.20

func ParseBackpressureQueries(
	bpQueries, bpQueryNames []string, bpWarnThresholds, bpEmergencyThresholds []float64,
) ([]BackpressureQuery, error)

func (BackpressureQuery) Validate added in v0.1.0

func (q BackpressureQuery) Validate() error

type Blocker added in v0.1.20

type Blocker struct {
	// contains filtered or unexported fields
}

func NewBlocker added in v0.1.20

func NewBlocker(client ProxyClient, cfg BlockerConfig) *Blocker

func (*Blocker) Init added in v0.1.20

func (b *Blocker) Init(ctx context.Context)

func (*Blocker) Next added in v0.1.20

func (b *Blocker) Next(rr Request) error

type BlockerConfig added in v0.1.20

type BlockerConfig struct {
	EnableBlocker bool `yaml:"enable_blocker"`
	// BlockPatterns is a list of header values to block and looks like `<header>=<pattern>`.
	// Ex. `X-user-agent=service-to-block.*`
	BlockPatterns []string `yaml:"block_patterns"`
}

func (BlockerConfig) Validate added in v0.1.20

func (q BlockerConfig) Validate() error

type Config

type Config struct {
	BackpressureConfig `yaml:"backpressure_config"`
	BlockerConfig      `yaml:"blocker_config"`
	EnableJitter       bool          `yaml:"enable_jitter"`
	JitterDelay        time.Duration `yaml:"jitter_delay"`
	EnableObserver     bool          `yaml:"enable_observer"`
	ClientTimeout      time.Duration `yaml:"client_timeout"`
	EnableCriticality  bool          `yaml:"enable_criticality"`
}

Config holds all middleware configuration options

func (Config) Validate

func (c Config) Validate() error

Validate ensures all enabled features have proper configuration

type HeaderKey added in v0.1.15

type HeaderKey string
const (
	HeaderCriticality HeaderKey = "X-Request-Criticality"
	HeaderCanWait     HeaderKey = "X-Can-Wait"
)

type Jitterer

type Jitterer struct {
	// contains filtered or unexported fields
}

Jitterer sleeps for a random amount of jitter before passing the request through. When EnableCriticality is set

1. CRITICAL_PLUS requests do not get jittered

2. Use max(X-Can-Wait, default) jitter if header is set

func NewJitterer

func NewJitterer(client ProxyClient, delay time.Duration, criticality bool) *Jitterer

func (*Jitterer) Init

func (j *Jitterer) Init(ctx context.Context)

func (*Jitterer) Next added in v0.1.0

func (j *Jitterer) Next(rr Request) error

type Mocker

type Mocker struct {
	ServeHTTPFunc func(w http.ResponseWriter, r *http.Request)
	RoundTripFunc func(r *http.Request) (*http.Response, error)
	InitFunc      func(context.Context)
	NextFunc      func(Request) error
	RequestFunc   func() *http.Request
}

Mocker simply mocks the main interfaces for unit testing

func (*Mocker) Init added in v0.1.0

func (m *Mocker) Init(ctx context.Context)

func (*Mocker) Next added in v0.1.0

func (m *Mocker) Next(rr Request) error

func (*Mocker) Request added in v0.1.11

func (m *Mocker) Request() *http.Request

func (*Mocker) RoundTrip added in v0.1.0

func (m *Mocker) RoundTrip(r *http.Request) (*http.Response, error)

func (*Mocker) ServeHTTP

func (m *Mocker) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

Observer wraps a ProxyClient to emit metrics such as error rate and blocked requests. Each client that blocks requests should tag their errors with a client type to filter metrics.

func NewObserver

func NewObserver(client ProxyClient) *Observer

NewObserver creates a new Observer wrapping the provided ProxyClient.

func (*Observer) Init

func (o *Observer) Init(ctx context.Context)

Init initializes the underlying ProxyClient.

func (*Observer) Next added in v0.1.0

func (o *Observer) Next(rr Request) error

Next processes the request and records relevant metrics.

type PrometheusResponse added in v0.0.3

type PrometheusResponse struct {
	Data struct {
		Result model.Vector `json:"result"`
	} `json:"data"`
}

type ProxyClient

type ProxyClient interface {
	// Init initializes the middleware component with a context.
	// It should be called before the middleware starts processing requests.
	Init(context.Context)

	// Next processes the incoming request through the middleware chain.
	// It returns an error if the request cannot be processed.
	Next(Request) error
}

ProxyClient defines the interface for middleware components in the chain. Each middleware component must implement Init for setup and Next for request processing.

func NewFromConfig

func NewFromConfig(cfg Config, client ProxyClient) ProxyClient

type Request added in v0.1.0

type Request interface {
	Request() *http.Request
}

Request represents an HTTP request in the middleware chain. It provides access to the underlying http.Request.

type RequestBlockedError

type RequestBlockedError struct {
	Err  error
	Type string
}

func (*RequestBlockedError) Error

func (e *RequestBlockedError) Error() string

type RequestResponseWrapper added in v0.1.0

type RequestResponseWrapper struct {
	// contains filtered or unexported fields
}

RequestResponseWrapper implements Request, Response, and ResponseWriter interfaces to wrap HTTP request/response pairs as they flow through the middleware chain.

func (*RequestResponseWrapper) Request added in v0.1.0

func (c *RequestResponseWrapper) Request() *http.Request

func (*RequestResponseWrapper) Response added in v0.1.0

func (c *RequestResponseWrapper) Response() *http.Response

func (*RequestResponseWrapper) ResponseWriter added in v0.1.0

func (c *RequestResponseWrapper) ResponseWriter() http.ResponseWriter

func (*RequestResponseWrapper) SetResponse added in v0.1.0

func (c *RequestResponseWrapper) SetResponse(res *http.Response)

type Response added in v0.1.0

type Response interface {
	Response() *http.Response
	SetResponse(*http.Response)
}

Response represents an HTTP response in the middleware chain. It provides access to and modification of the underlying http.Response.

type ResponseWriter added in v0.1.0

type ResponseWriter interface {
	ResponseWriter() http.ResponseWriter
}

ResponseWriter represents the HTTP response writer in the middleware chain. It provides access to the underlying http.ResponseWriter.

type RoundTripperEntry added in v0.1.0

type RoundTripperEntry struct {
	// contains filtered or unexported fields
}

func NewRoundTripperFromConfig added in v0.1.0

func NewRoundTripperFromConfig(cfg Config, rt http.RoundTripper) *RoundTripperEntry

func (*RoundTripperEntry) Init added in v0.1.0

func (rte *RoundTripperEntry) Init(ctx context.Context)

func (*RoundTripperEntry) RoundTrip added in v0.1.0

func (rte *RoundTripperEntry) RoundTrip(req *http.Request) (*http.Response, error)

type RoundTripperExit added in v0.1.0

type RoundTripperExit struct {
	// contains filtered or unexported fields
}

RoundTripperExit represents the final handler in the middleware chain for http.RoundTripper

func (*RoundTripperExit) Init added in v0.1.0

func (rte *RoundTripperExit) Init(_ context.Context)

func (*RoundTripperExit) Next added in v0.1.0

func (rte *RoundTripperExit) Next(r Request) error

type ServeEntry added in v0.1.0

type ServeEntry struct {
	// contains filtered or unexported fields
}

ServeEntry represents the entry point of the middleware chain

func NewServeFromConfig added in v0.1.0

func NewServeFromConfig(cfg Config, next http.HandlerFunc) *ServeEntry

NewServeFromConfig constructs a middleware chain based on configuration. The middleware chain is constructed in the following order: 1. Request wrapping (Entry) 2. Metrics collection (Observer) 3. Request spreading (Jitter) 4. Adaptive rate limiting (Backpressure) 6. Final handler (Exit)

func (*ServeEntry) Init added in v0.1.0

func (se *ServeEntry) Init(ctx context.Context)

Init initializes the middleware chain

func (*ServeEntry) ServeHTTP added in v0.1.17

func (se *ServeEntry) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP processes requests through the middleware chain

type ServeExit added in v0.1.0

type ServeExit struct {
	// contains filtered or unexported fields
}

ServeExit represents the final handler in the middleware chain for http.HandlerFunc

func (*ServeExit) Init added in v0.1.0

func (se *ServeExit) Init(_ context.Context)

func (*ServeExit) Next added in v0.1.0

func (se *ServeExit) Next(rr Request) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL