concurrency

package
v0.1.38
Published: Apr 30, 2024 License: MPL-2.0 Imports: 9 Imported by: 0

Documentation

Overview

concurrency/const.go

concurrency/handler.go

concurrency/metrics.go

concurrency/resize.go

concurrency/scale.go

concurrency/semaphore.go

Package concurrency provides utilities for managing concurrency control. The Concurrency Manager ensures that no more than a configured number of concurrent requests (e.g., 5 for Jamf Pro) are sent at the same time. This is managed using a semaphore.
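As background, the buffered-channel idiom commonly used to implement such a semaphore in Go can be sketched as follows. This is a minimal, self-contained illustration of the technique, not this package's actual internals:

```go
package main

import "fmt"

// Semaphore illustrates the buffered-channel idiom for concurrency limiting:
// the channel's capacity is the permit count, sends acquire, receives release.
type Semaphore chan struct{}

// Acquire blocks until a permit is available.
func (s Semaphore) Acquire() { s <- struct{}{} }

// Release returns a permit to the pool.
func (s Semaphore) Release() { <-s }

// InUse reports how many permits are currently held.
func (s Semaphore) InUse() int { return len(s) }

func main() {
	sem := make(Semaphore, 5) // e.g. a limit of 5 concurrent requests
	sem.Acquire()
	sem.Acquire()
	fmt.Println(sem.InUse()) // 2
	sem.Release()
	fmt.Println(sem.InUse()) // 1
}
```

Because a full channel blocks further sends, callers naturally queue once the limit is reached, which is the behavior the Concurrency Manager relies on.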

Index

Constants

View Source
const (

	// MaxConcurrency represents the maximum number of concurrent requests the system is designed to handle safely.
	MaxConcurrency = 10

	// MinConcurrency is the minimum number of concurrent requests that the system will maintain,
	// even under low traffic conditions or when scaling down due to low resource utilization.
	MinConcurrency = 1

	// EvaluationInterval specifies the frequency at which the system evaluates its performance metrics
	// to make decisions about scaling concurrency up or down.
	EvaluationInterval = 1 * time.Minute

	// MaxAcceptableTTFB (Time to First Byte) is the threshold for the longest acceptable delay
	// between making a request and receiving the first byte of data in the response. If response
	// times exceed this threshold, it indicates potential performance issues, and the system may
	// scale down concurrency to reduce load on the server.
	MaxAcceptableTTFB = 300 * time.Millisecond

	// MaxAcceptableThroughput is the threshold for the maximum network data transfer rate. If the
	// system's throughput exceeds this value, it may be an indicator of high traffic demanding
	// significant bandwidth, which could warrant a scale-up in concurrency to maintain performance.
	MaxAcceptableThroughput = 5 * 1024 * 1024 // 5 MBps

	// MaxAcceptableResponseTimeVariability is the threshold for the maximum allowed variability or
	// fluctuations in response times. A high variability often indicates an unstable system, which
	// could trigger a scale-down to allow the system to stabilize.
	MaxAcceptableResponseTimeVariability = 500 * time.Millisecond

	// ErrorRateThreshold is the maximum acceptable rate of error responses (such as rate-limit errors
	// and 5xx server errors) compared to the total number of requests. Exceeding this threshold suggests
	// the system is encountering issues that may be alleviated by scaling down concurrency.
	ErrorRateThreshold = 0.1 // 10% error rate

	// RateLimitCriticalThreshold defines the number of available rate limit slots considered critical.
	// Falling at or below this threshold suggests the system is close to hitting the rate limit enforced
	// by the external service, and it should scale down to prevent rate-limiting errors.
	RateLimitCriticalThreshold = 5

	// ErrorResponseThreshold is the threshold for the error rate that, once exceeded, indicates the system
	// should consider scaling down. It is a ratio of the number of error responses to the total number of
	// requests, reflecting the health of the interaction with the external system.
	ErrorResponseThreshold = 0.2 // 20% error rate

	// ResponseTimeCriticalThreshold is the duration beyond which the response time is considered critically
	// high. If response times exceed this threshold, it could signal that the system or the external service
	// is under heavy load and may benefit from scaling down concurrency to alleviate pressure.
	ResponseTimeCriticalThreshold = 2 * time.Second

	// AcceptableAverageResponseTime is the target average response time for requests
	// under normal operating conditions; sustained averages above this value suggest
	// the system is under load.
	AcceptableAverageResponseTime = 100 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConcurrencyHandler

type ConcurrencyHandler struct {
	AcquisitionTimes []time.Duration

	Metrics *ConcurrencyMetrics
	// contains filtered or unexported fields
}

ConcurrencyHandler controls the number of concurrent HTTP requests.

func NewConcurrencyHandler

func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler

NewConcurrencyHandler initializes a new ConcurrencyHandler with the given concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.

func (*ConcurrencyHandler) AcquireConcurrencyPermit added in v0.1.33

func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (context.Context, uuid.UUID, error)

AcquireConcurrencyPermit acquires a concurrency permit to manage the number of simultaneous operations within predefined limits. This method ensures system stability and compliance with concurrency policies by regulating the execution of concurrent operations.

Parameters:

  • ctx: A parent context which is used as the basis for permit acquisition. This allows for proper handling of timeouts and cancellation in line with best practices.

Returns:

  • context.Context: A new context derived from the original, including a unique request ID. This context is used to trace and manage operations under the acquired concurrency permit.
  • uuid.UUID: The unique request ID generated during the permit acquisition process.
  • error: An error object that indicates failure to acquire a permit within the allotted timeout, or other system-related issues.

Usage: This function should be used before initiating any operation that requires concurrency control. The returned context should be passed to subsequent operations to maintain consistency in concurrency tracking.

func (*ConcurrencyHandler) EvaluateAndAdjustConcurrency added in v0.1.32

func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)

EvaluateAndAdjustConcurrency assesses the current state of system metrics and decides whether to scale up or down the number of concurrent operations allowed. It employs a combination of strategies: a weighted scoring system, threshold-based direct actions, and cumulative impact assessment.

A weighted scoring system is used to prioritize the importance of different system metrics. Each metric can influence the scaling decision based on its assigned weight, reflecting its relative impact on system performance.

Threshold-based scaling provides a fast-track decision path for critical metrics that have exceeded predefined limits. If a critical metric, such as the rate limit remaining slots or server error rates, crosses a specified threshold, immediate action is taken to scale down the concurrency to prevent system overload.

Cumulative impact assessment calculates a cumulative score from all monitored metrics, taking into account their respective weights. This score determines the overall tendency of the system to either scale up or down. If the score indicates a negative trend (i.e., below zero), the system will scale down to reduce load. Conversely, a positive score suggests that there is capacity to handle more concurrent operations, leading to a scale-up decision.

Parameters:

  • resp: The HTTP response received from the server, providing status codes and headers for rate limiting.
  • responseTime: The time duration between sending the request and receiving the response, indicating the server's responsiveness.

The function logs the decision process at each step, providing traceability and insight into the scaling mechanism. The method should be called after each significant interaction with the external system (e.g., an HTTP request) to ensure concurrency levels are adapted to current conditions.

Returns: None. The function performs actions based on its internal assessments, directly calling the ScaleUp or ScaleDown methods as needed, and logs the outcomes.
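The cumulative impact assessment described above can be sketched as a weighted vote. The metric names and weights here are illustrative, not the package's actual values:

```go
package main

import "fmt"

// weightedScore combines per-metric votes (-1, 0, or +1) using weights that
// reflect each metric's relative importance. A negative total suggests
// scaling down; a positive total suggests scaling up.
func weightedScore(votes map[string]int, weights map[string]float64) float64 {
	var score float64
	for metric, vote := range votes {
		score += float64(vote) * weights[metric]
	}
	return score
}

func main() {
	weights := map[string]float64{"rateLimit": 3, "responseCodes": 2, "variability": 1}
	votes := map[string]int{"rateLimit": -1, "responseCodes": 0, "variability": 1}
	fmt.Println(weightedScore(votes, weights)) // -2: net negative, so scale down
}
```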

func (*ConcurrencyHandler) MonitorRateLimitHeaders added in v0.1.31

func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int

MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.

func (*ConcurrencyHandler) MonitorResponseTimeVariability added in v0.1.31

func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int

MonitorResponseTimeVariability assesses the response time variability from a series of HTTP requests and decides whether to adjust the concurrency level of outgoing requests. This function is integral to maintaining optimal system performance under varying load conditions.

The function first appends the latest response time to a sliding window of the last 10 response times to maintain a recent history. It then calculates the standard deviation and the average of these times. The standard deviation helps determine the variability or consistency of response times, while the average gives a central tendency.

Based on these calculated metrics, the function employs a multi-factor decision mechanism:

  • If the standard deviation exceeds a pre-defined threshold and the average response time is greater than an acceptable maximum, a debounce counter is incremented. This counter must reach a predefined threshold (debounceScaleDownThreshold) before a decision to decrease concurrency is made, ensuring that only sustained negative trends lead to a scale-down.
  • If the standard deviation is below or equal to the threshold, suggesting stable response times, and the system is currently operating below its concurrency capacity, the function may suggest an increase in concurrency to improve throughput.

This approach aims to prevent transient spikes in response times from causing undue scaling actions, thus stabilizing the overall performance and responsiveness of the system.

Returns:

  • -1 to suggest a decrease in concurrency
  • 1 to suggest an increase in concurrency
  • 0 to indicate no change needed

func (*ConcurrencyHandler) MonitorServerResponseCodes added in v0.1.31

func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int

MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.

func (*ConcurrencyHandler) ReleaseConcurrencyPermit added in v0.1.35

func (ch *ConcurrencyHandler) ReleaseConcurrencyPermit(requestID uuid.UUID)

ReleaseConcurrencyPermit releases a concurrency permit back to the semaphore, making it available for other operations. This function is essential for maintaining the health and efficiency of the application's concurrency control system by ensuring that resources are properly recycled and available for use by subsequent operations.

Parameters:

  • requestID: The unique identifier for the request associated with the permit being released. This ID is used for structured logging to aid in tracking and debugging permit lifecycle events.

Usage: This method should be called as soon as a request or operation that required a concurrency permit is completed. It ensures that concurrency limits are adhered to and helps prevent issues such as permit leakage or semaphore saturation, which could lead to degraded performance or deadlock conditions.

Example:

	defer concurrencyHandler.ReleaseConcurrencyPermit(requestID)

This deferred call ensures that the permit is released at the end of the operation, regardless of how the operation exits (normal completion or error path).

func (*ConcurrencyHandler) ResizeSemaphore added in v0.1.31

func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int)

ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new semaphore with the specified new size and closes the old semaphore to ensure that no further tokens can be acquired from it. This approach helps manage the transition from the old concurrency level to the new one without affecting ongoing operations significantly.

Parameters:

  • newSize: The new size for the semaphore, representing the updated limit on concurrent requests.

This function should be called from within synchronization contexts, such as AdjustConcurrency, to avoid race conditions and ensure that changes to the semaphore are consistent with the observed metrics.
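In outline, the channel-swap approach described above might look like the following. This is a sketch under the assumption that held permits are tracked as buffered values; it is not the package's actual code:

```go
package main

import "fmt"

// resize builds a new buffered channel at the new capacity, carries over
// currently held permits so in-flight operations remain accounted for, and
// closes the old channel so no further permits can be acquired from it.
func resize(old chan struct{}, newSize int) chan struct{} {
	newSem := make(chan struct{}, newSize)
	held := len(old) // permits currently in use
	if held > newSize {
		held = newSize
	}
	for i := 0; i < held; i++ {
		newSem <- struct{}{}
	}
	close(old)
	return newSem
}

func main() {
	sem := make(chan struct{}, 3)
	sem <- struct{}{} // one permit in use
	sem = resize(sem, 5)
	fmt.Println(cap(sem), len(sem)) // 5 1
}
```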

func (*ConcurrencyHandler) ScaleDown added in v0.1.32

func (ch *ConcurrencyHandler) ScaleDown()

ScaleDown reduces the concurrency level by one, down to the minimum limit.

func (*ConcurrencyHandler) ScaleUp added in v0.1.32

func (ch *ConcurrencyHandler) ScaleUp()

ScaleUp increases the concurrency level by one, up to the maximum limit.
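Both methods amount to a one-step adjustment clamped to the configured bounds, which can be sketched as follows. The clampStep helper is hypothetical; the real methods also resize the semaphore and log the change:

```go
package main

import "fmt"

const (
	MaxConcurrency = 10
	MinConcurrency = 1
)

// clampStep moves the concurrency limit by delta (+1 for ScaleUp, -1 for
// ScaleDown) but never outside [MinConcurrency, MaxConcurrency].
func clampStep(current, delta int) int {
	next := current + delta
	if next > MaxConcurrency {
		return MaxConcurrency
	}
	if next < MinConcurrency {
		return MinConcurrency
	}
	return next
}

func main() {
	fmt.Println(clampStep(10, 1)) // 10: already at the maximum
	fmt.Println(clampStep(1, -1)) // 1: already at the minimum
	fmt.Println(clampStep(4, 1))  // 5
}
```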

type ConcurrencyMetrics

type ConcurrencyMetrics struct {
	TotalRequests        int64         // Total number of requests made
	TotalRetries         int64         // Total number of retry attempts
	TotalRateLimitErrors int64         // Total number of rate limit errors encountered
	PermitWaitTime       time.Duration // Total time spent waiting for tokens
	TTFB                 struct {
		Total time.Duration // Total Time to First Byte (TTFB) for all requests
		Count int64         // Count of requests used for calculating TTFB
		Lock  sync.Mutex    // Lock for TTFB metrics
	}
	Throughput struct {
		Total float64    // Total network throughput for all requests
		Count int64      // Count of requests used for calculating throughput
		Lock  sync.Mutex // Lock for throughput metrics
	}
	ResponseTimeVariability struct {
		Total                  time.Duration // Total response time for all requests
		Average                time.Duration // Average response time across all requests
		Variance               float64       // Variance of response times
		Count                  int64         // Count of responses used for calculating response time variability
		Lock                   sync.Mutex    // Lock for response time variability metrics
		StdDevThreshold        float64       // Maximum acceptable standard deviation for adjusting concurrency
		DebounceScaleDownCount int           // Counter to manage scale down actions after consecutive triggers
	}
	ResponseCodeMetrics struct {
		ErrorRate float64    // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
		Lock      sync.Mutex // Lock for response code metrics
	}
	Lock sync.Mutex // Lock for overall metrics fields
}

ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
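As an illustration of how the ErrorRate field might be maintained under its lock, consider this sketch. The recordOutcome helper is hypothetical; the field names loosely mirror the struct above:

```go
package main

import (
	"fmt"
	"sync"
)

// metrics holds per-request counters; errorRate is recomputed on each
// outcome as (rate-limit errors + 5xx errors) / total requests.
type metrics struct {
	mu                   sync.Mutex
	totalRequests        int64
	totalRateLimitErrors int64
	serverErrors         int64
	errorRate            float64
}

func (m *metrics) recordOutcome(statusCode int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.totalRequests++
	switch {
	case statusCode == 429:
		m.totalRateLimitErrors++
	case statusCode >= 500:
		m.serverErrors++
	}
	m.errorRate = float64(m.totalRateLimitErrors+m.serverErrors) / float64(m.totalRequests)
}

func main() {
	var m metrics
	for _, code := range []int{200, 200, 500, 429, 200} {
		m.recordOutcome(code)
	}
	fmt.Println(m.errorRate) // 0.4
}
```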

type RequestIDKey

type RequestIDKey struct{}

RequestIDKey is a type used as a key for storing and retrieving request-specific identifiers from a context.Context object. Using a dedicated key type ensures that the key is distinct and prevents accidental value retrieval or conflicts with other context keys. The value associated with this key in a context is typically a UUID that uniquely identifies a request being processed by the ConcurrencyManager, allowing for fine-grained control and tracking of concurrent HTTP requests.
