Documentation
¶
Overview ¶
Package limiter provides rate limiting functionality
Design philosophy: - Standalone package, depends only on the logger component of yogan - Event-driven, the application layer can subscribe to all events - Metrics exposed, application layer can access real-time data - Optional enablement, does not take effect if not configured - Supports multiple algorithms: token bucket, sliding window, concurrent rate limiting, adaptive - Support multiple storages: memory, Redis
Index ¶
- Variables
- type AdaptiveProvider
- type Algorithm
- type AlgorithmMetrics
- type AlgorithmType
- type AllowedEvent
- type BaseEvent
- type Config
- type Event
- type EventBus
- type EventListener
- type EventListenerFunc
- type EventType
- type LimitChangedEvent
- type Limiter
- type Manager
- func (m *Manager) Allow(ctx context.Context, resource string) (bool, error)
- func (m *Manager) AllowN(ctx context.Context, resource string, n int64) (bool, error)
- func (m *Manager) Close() error
- func (m *Manager) GetConfig() Config
- func (m *Manager) GetEventBus() EventBus
- func (m *Manager) GetMetrics(resource string) *MetricsSnapshot
- func (m *Manager) IsEnabled() bool
- func (m *Manager) Reset(resource string)
- func (m *Manager) SetMetrics(metrics *OTelMetrics)
- func (m *Manager) Shutdown() error
- func (m *Manager) Wait(ctx context.Context, resource string) error
- func (m *Manager) WaitN(ctx context.Context, resource string, n int64) error
- type MetricsCollector
- type MetricsConfig
- type MetricsSnapshot
- type OTelMetrics
- func (m *OTelMetrics) IsMetricsEnabled() bool
- func (m *OTelMetrics) IsRegistered() bool
- func (m *OTelMetrics) MetricsName() string
- func (m *OTelMetrics) RecordAllowed(ctx context.Context, resource, algorithm string)
- func (m *OTelMetrics) RecordRejected(ctx context.Context, resource, algorithm, reason string)
- func (m *OTelMetrics) RegisterMetrics(meter metric.Meter) error
- func (m *OTelMetrics) RegisterTokenCallback(resource string, callback func() int64)
- func (m *OTelMetrics) UnregisterTokenCallback(resource string)
- type RedisInstanceConfig
- type RedisStore
- func (s *RedisStore) Close() error
- func (s *RedisStore) Decr(ctx context.Context, key string) (int64, error)
- func (s *RedisStore) DecrBy(ctx context.Context, key string, delta int64) (int64, error)
- func (s *RedisStore) Del(ctx context.Context, keys ...string) error
- func (s *RedisStore) Eval(ctx context.Context, script string, keys []string, args []interface{}) (interface{}, error)
- func (s *RedisStore) Exists(ctx context.Context, key string) (bool, error)
- func (s *RedisStore) Expire(ctx context.Context, key string, expiration time.Duration) error
- func (s *RedisStore) Get(ctx context.Context, key string) (string, error)
- func (s *RedisStore) GetInt64(ctx context.Context, key string) (int64, error)
- func (s *RedisStore) Incr(ctx context.Context, key string) (int64, error)
- func (s *RedisStore) IncrBy(ctx context.Context, key string, value int64) (int64, error)
- func (s *RedisStore) Set(ctx context.Context, key string, value string, ttl time.Duration) error
- func (s *RedisStore) SetInt64(ctx context.Context, key string, value int64, ttl time.Duration) error
- func (s *RedisStore) TTL(ctx context.Context, key string) (time.Duration, error)
- func (s *RedisStore) ZAdd(ctx context.Context, key string, score float64, member string) error
- func (s *RedisStore) ZCount(ctx context.Context, key string, min, max float64) (int64, error)
- func (s *RedisStore) ZRemRangeByScore(ctx context.Context, key string, min, max float64) error
- type RejectedEvent
- type ResourceConfig
- type Response
- type Store
- type StoreType
- type ValidationError
- type WaitEvent
Constants ¶
This section is empty.
Variables ¶
var ( // ErrLimitExceeded Exceeds rate limiting threshold ErrLimitExceeded = errors.New("rate limit exceeded") // ErrWaitTimeout timeout waiting ErrWaitTimeout = errors.New("wait timeout") // ErrKeyNotFound Key does not exist ErrKeyNotFound = errors.New("key not found") // ErrInvalidConfig Invalid configuration ErrInvalidConfig = errors.New("invalid config") // ErrStoreNotSupported Storage Not Supported ErrStoreNotSupported = errors.New("store operation not supported") // ErrResourceNotFound Resource not found ErrResourceNotFound = errors.New("resource not found") )
Functions ¶
This section is empty.
Types ¶
type AdaptiveProvider ¶
type AdaptiveProvider interface {
// GetCPUUsage Gets CPU usage (0.0-1.0)
GetCPUUsage() float64
// GetMemoryUsage Gets memory usage (0.0-1.0)
GetMemoryUsage() float64
// GetSystemLoad Obtain system load (usually load average per CPU core)
GetSystemLoad() float64
}
AdaptiveProvider adaptive rate limiting data provider (dependency injection)
Usage instructions: - Implement this interface to provide system load data (CPU, memory, system load) - If Provider is not injected, adaptive rate limiting will not be effective, falling back to fixed rate limiting - Specific collection logic can be implemented using libraries such as gopsutil
Example implementation:
type SystemMetricsProvider struct {
// Can integrate libraries like gopsutil
}
func (p *SystemMetricsProvider) GetCPUUsage() float64 {
// Implement CPU usage collection
return 0.65 }
type Algorithm ¶
type Algorithm interface {
// Allow check if the request is permitted
Allow(ctx context.Context, store Store, resource string, n int64, cfg ResourceConfig) (*Response, error)
// Wait for permission (blocking until acquired or timed out)
Wait(ctx context.Context, store Store, resource string, n int64, cfg ResourceConfig, timeout time.Duration) error
// GetMetrics retrieves current metrics
GetMetrics(ctx context.Context, store Store, resource string) (*AlgorithmMetrics, error)
// Reset reset status
Reset(ctx context.Context, store Store, resource string) error
// Name Returns algorithm name
Name() string
}
Rate limiting algorithm interface (strategy pattern)
func GetAlgorithm ¶
func GetAlgorithm(cfg ResourceConfig, provider AdaptiveProvider) Algorithm
GetAlgorithm obtains an algorithm instance according to the configuration
func NewAdaptiveAlgorithm ¶
func NewAdaptiveAlgorithm(provider AdaptiveProvider) Algorithm
Create new adaptive rate limiting algorithm
func NewConcurrencyAlgorithm ¶
func NewConcurrencyAlgorithm() Algorithm
NewConcurrencyAlgorithm creates concurrency rate limiting algorithm
func NewSlidingWindowAlgorithm ¶
func NewSlidingWindowAlgorithm() Algorithm
Create new sliding window algorithm
func NewTokenBucketAlgorithm ¶
func NewTokenBucketAlgorithm() Algorithm
NewTokenBucketAlgorithm Creates the token bucket algorithm
type AlgorithmMetrics ¶
type AlgorithmMetrics struct {
Current int64 // Current value (concurrency count/token usage/request count)
Limit int64 // Limit value
Remaining int64 // remaining quota
ResetAt time.Time // Reset time
}
AlgorithmMetrics algorithm metrics
type AlgorithmType ¶
type AlgorithmType string
AlgorithmType algorithm type
const ( // AlgorithmTokenBucket token bucket algorithm AlgorithmTokenBucket AlgorithmType = "token_bucket" // AlgorithmSlidingWindow Sliding Window Algorithm AlgorithmSlidingWindow AlgorithmType = "sliding_window" // AlgorithmConcurrency concurrency rate limiting algorithm AlgorithmConcurrency AlgorithmType = "concurrency" // Algorithm Adaptive for rate limiting AlgorithmAdaptive AlgorithmType = "adaptive" )
type AllowedEvent ¶
AllowedEvent permitted events
type BaseEvent ¶
type BaseEvent struct {
// contains filtered or unexported fields
}
BaseEvent basic event
func NewBaseEvent ¶
NewBaseEvent creates a base event
type Config ¶
type Config struct {
// Enabled whether to enable rate limiting (false means direct passthrough)
Enabled bool `mapstructure:"enabled"`
// StoreType storage type: memory, redis
StoreType string `mapstructure:"store_type"`
// Redis configuration (required when StoreType is redis)
Redis RedisInstanceConfig `mapstructure:"redis"`
// EventBusBuffer event bus buffer size
EventBusBuffer int `mapstructure:"event_bus_buffer"`
// KeyFunc resource key generation method (for middleware)
// Optional values: path, ip, user, path_ip, api_key (default is path)
KeyFunc string `mapstructure:"key_func"`
// SkipPaths list of paths to bypass rate limiting (for middleware)
SkipPaths []string `mapstructure:"skip_paths"`
// Default resource configuration (if a valid default is set, it will be automatically applied to unconfigured resources)
Default ResourceConfig `mapstructure:"default"`
// Resources configuration level (overrides Default)
Resources map[string]ResourceConfig `mapstructure:"resources"`
}
Rate limiter configuration
func (*Config) GetResourceConfig ¶
func (c *Config) GetResourceConfig(resource string) ResourceConfig
GetResourceConfig Retrieve resource configuration (prioritize resource-level configuration, fallback to default)
type Event ¶
type Event interface {
Type() EventType
Resource() string
Context() context.Context
Timestamp() time.Time
}
Event interface
type EventBus ¶
type EventBus interface {
// Subscribe to event
Subscribe(listener EventListener)
// Publish event
Publish(event Event)
// Close event bus
Close()
}
EventBus event bus interface
type EventListener ¶
type EventListener interface {
OnEvent(event Event)
}
EventListener event listener interface
type EventListenerFunc ¶
type EventListenerFunc func(event Event)
EventListenerFunc event listener function type
func (EventListenerFunc) OnEvent ¶
func (f EventListenerFunc) OnEvent(event Event)
OnEvent implements EventListener interface
type EventType ¶
type EventType string
Event Type
const ( // EventAllowed permission granted EventAllowed EventType = "allowed" // EventRejected reject request EventRejected EventType = "rejected" // EventWaitStart Start waiting EventWaitStart EventType = "wait_start" // EventWaitSuccess wait successful EventWaitSuccess EventType = "wait_success" // EventWaitTimeout wait timeout EventWaitTimeout EventType = "wait_timeout" // EventLimitChanged Threshold change for rate limiting (adaptive) EventLimitChanged EventType = "limit_changed" )
type LimitChangedEvent ¶
type LimitChangedEvent struct {
BaseEvent
OldLimit int64
NewLimit int64
CPUUsage float64
MemoryUsage float64
SystemLoad float64
}
LimitChangedEvent Throttling threshold change event (adaptive)
type Limiter ¶
type Limiter interface {
// Allow check if the request is permitted (quick check)
Allow(ctx context.Context, resource string) (bool, error)
// AllowN checks if N requests are permitted
AllowN(ctx context.Context, resource string, n int64) (bool, error)
// Wait for permission acquisition (blocking wait, supports timeout)
Wait(ctx context.Context, resource string) error
// Wait for N licenses to be available
WaitN(ctx context.Context, resource string, n int64) error
// GetMetrics Retrieve metric snapshot
GetMetrics(resource string) *MetricsSnapshot
// GetEventBus Obtain the event bus (for subscribing to events)
GetEventBus() EventBus
// Reset rate limiter state
Reset(resource string)
// Close the rate limiter (clean up resources)
Close() error
// Check if the rate limiter is enabled
IsEnabled() bool
}
Limiter core interface
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Rate Limiter Manager
func NewManagerWithLogger ¶
func NewManagerWithLogger(config Config, ctxLogger *logger.CtxZapLogger, redisClient *redis.Client, provider AdaptiveProvider) (*Manager, error)
Create a rate limiter manager with logger
func (*Manager) GetMetrics ¶
func (m *Manager) GetMetrics(resource string) *MetricsSnapshot
GetMetrics retrieves throttling metrics
func (*Manager) SetMetrics ¶
func (m *Manager) SetMetrics(metrics *OTelMetrics)
SetMetrics injects the OTel metrics provider. This should be called after the Manager is created when metrics are enabled.
func (*Manager) Shutdown ¶
Implements the samber/do.Shutdownable interface for shutdown functionality
type MetricsCollector ¶
type MetricsCollector interface {
// RecordAllowed records allowed requests
RecordAllowed(remaining int64)
// RecordRejected Request for record rejected
RecordRejected(reason string)
// GetSnapshot Retrieve metric snapshot
GetSnapshot() *MetricsSnapshot
// Reset metrics
Reset()
}
MetricsCollector metric collector interface
func NewMetricsCollector ¶
func NewMetricsCollector(resource string, algorithm string) MetricsCollector
Create metric collector
type MetricsConfig ¶
MetricsConfig holds configuration for limiter metrics
type MetricsSnapshot ¶
type MetricsSnapshot struct {
Resource string
Algorithm string
TotalRequests int64
Allowed int64
Rejected int64
CurrentValue int64 // Current value (concurrency count/token count/request count)
Limit int64 // Limit value
Remaining int64 // remaining quota
RejectRate float64 // rejection rate
LastResetAt time.Time
}
MetricsSnapshot metric snapshot
type OTelMetrics ¶
type OTelMetrics struct {
// contains filtered or unexported fields
}
OTelMetrics implements MetricsProvider for OpenTelemetry integration. This replaces the legacy atomic-based metrics with OTel instrumentation.
func NewOTelMetrics ¶
func NewOTelMetrics(cfg MetricsConfig) *OTelMetrics
NewOTelMetrics creates a new OTel metrics provider for limiter
func (*OTelMetrics) IsMetricsEnabled ¶
func (m *OTelMetrics) IsMetricsEnabled() bool
IsMetricsEnabled returns whether metrics collection is enabled
func (*OTelMetrics) IsRegistered ¶
func (m *OTelMetrics) IsRegistered() bool
IsRegistered returns whether metrics have been registered
func (*OTelMetrics) MetricsName ¶
func (m *OTelMetrics) MetricsName() string
MetricsName returns the metrics group name
func (*OTelMetrics) RecordAllowed ¶
func (m *OTelMetrics) RecordAllowed(ctx context.Context, resource, algorithm string)
RecordAllowed records an allowed request
func (*OTelMetrics) RecordRejected ¶
func (m *OTelMetrics) RecordRejected(ctx context.Context, resource, algorithm, reason string)
RecordRejected records a rejected request
func (*OTelMetrics) RegisterMetrics ¶
func (m *OTelMetrics) RegisterMetrics(meter metric.Meter) error
RegisterMetrics registers all limiter metrics with the provided Meter
func (*OTelMetrics) RegisterTokenCallback ¶
func (m *OTelMetrics) RegisterTokenCallback(resource string, callback func() int64)
RegisterTokenCallback registers a callback for a resource's token count
func (*OTelMetrics) UnregisterTokenCallback ¶
func (m *OTelMetrics) UnregisterTokenCallback(resource string)
UnregisterTokenCallback removes a resource's token callback
type RedisInstanceConfig ¶
type RedisInstanceConfig struct {
Instance string `mapstructure:"instance"` // Redis instance name (configured in redis.instances)
KeyPrefix string `mapstructure:"key_prefix"` // Redis key prefix (default "limiter:")
}
RedisInstanceConfig Redis instance reference configuration (reusing kernel redis component)
type RedisStore ¶
type RedisStore struct {
// contains filtered or unexported fields
}
RedisStore Redis storage implementation
func NewRedisStore ¶
func NewRedisStore(client *redis.Client, keyPrefix string) *RedisStore
NewRedisStore creates Redis storage
func (*RedisStore) Close ¶
func (s *RedisStore) Close() error
Close Close the connection (RedisStore does not own the client, so there is no need to close)
func (*RedisStore) Del ¶
func (s *RedisStore) Del(ctx context.Context, keys ...string) error
Delete key deletion
func (*RedisStore) Eval ¶
func (s *RedisStore) Eval(ctx context.Context, script string, keys []string, args []interface{}) (interface{}, error)
Evaluate execution of Lua script
func (*RedisStore) SetInt64 ¶
func (s *RedisStore) SetInt64(ctx context.Context, key string, value int64, ttl time.Duration) error
SetInt64 set integer value
func (*RedisStore) TTL ¶
Get the remaining time-to-live for the key Returns 0 if key doesn't exist or has no TTL
func (*RedisStore) ZCount ¶
ZCount statistics the number of members in a sorted set within a specified score range
func (*RedisStore) ZRemRangeByScore ¶
Remove members from a sorted set that are within a specified score range
type RejectedEvent ¶
RejectedEvent rejected event
type ResourceConfig ¶
type ResourceConfig struct {
// Algorithm rate_limiting: token_bucket, sliding_window, concurrency, adaptive
Algorithm string `mapstructure:"algorithm"`
// Token bucket configuration
Rate int64 `mapstructure:"rate"` // token generation rate (per second)
Capacity int64 `mapstructure:"capacity"` // bucket capacity
InitTokens int64 `mapstructure:"init_tokens"` // Initial token count
// Sliding window configuration
Limit int64 `mapstructure:"limit"` // maximum request count within window
WindowSize time.Duration `mapstructure:"window_size"` // window size
BucketSize time.Duration `mapstructure:"bucket_size"` // bucket size
// Concurrent rate limiting configuration
MaxConcurrency int64 `mapstructure:"max_concurrency"` // maximum concurrency limit
Timeout time.Duration `mapstructure:"timeout"` // timeout waiting
// Adaptive rate limiting configuration
MinLimit int64 `mapstructure:"min_limit"` // minimum rate limiting value
MaxLimit int64 `mapstructure:"max_limit"` // maximum rate limiting value
TargetCPU float64 `mapstructure:"target_cpu"` // Target CPU utilization rate
TargetMemory float64 `mapstructure:"target_memory"` // Target memory utilization rate
TargetLoad float64 `mapstructure:"target_load"` // target system load
AdjustInterval time.Duration `mapstructure:"adjust_interval"` // Adjust interval
}
ResourceConfig resource-level configuration
func DefaultResourceConfig ¶
func DefaultResourceConfig() ResourceConfig
Returns default resource configuration
func (ResourceConfig) Merge ¶
func (rc ResourceConfig) Merge(override ResourceConfig) ResourceConfig
Merge merge configuration (override default values)
func (*ResourceConfig) Validate ¶
func (rc *ResourceConfig) Validate() error
Validate resource configuration
type Response ¶
type Response struct {
// Allowed 是否允许: Is allowed
Allowed bool
// RetryAfter suggests retry time (valid when Allowed=false)
RetryAfter time.Duration
// Remaining quota (token bucket/sliding window)
Remaining int64
// Limit total quota
Limit int64
// ResetTime quota reset time
ResetAt time.Time
}
Rate limiting response
type Store ¶
type Store interface {
// Get retrieve value
Get(ctx context.Context, key string) (string, error)
// Set value (with expiration time)
Set(ctx context.Context, key string, value string, ttl time.Duration) error
// GetInt64 get integer value
GetInt64(ctx context.Context, key string) (int64, error)
// SetInt64 set integer value
SetInt64(ctx context.Context, key string, value int64, ttl time.Duration) error
// Atomic increment
Incr(ctx context.Context, key string) (int64, error)
// IncrBy atomic increment by specified value
IncrBy(ctx context.Context, key string, delta int64) (int64, error)
// Atomic decrement
Decr(ctx context.Context, key string) (int64, error)
// Atomically decrement by the specified value
DecrBy(ctx context.Context, key string, delta int64) (int64, error)
// Set expiration time
Expire(ctx context.Context, key string, ttl time.Duration) error
// Get remaining TTL (Time To Live) duration
TTL(ctx context.Context, key string) (time.Duration, error)
// Delete delete key
Del(ctx context.Context, keys ...string) error
// Exists Check if key exists
Exists(ctx context.Context, key string) (bool, error)
// Add to sorted set
ZAdd(ctx context.Context, key string, score float64, member string) error
// Remove by score range
ZRemRangeByScore(ctx context.Context, key string, min, max float64) error
// ZCount statistics the number of elements within a score range
ZCount(ctx context.Context, key string, min, max float64) (int64, error)
// Eval executes Lua scripts (specific to Redis, can return unsupported errors for in-memory storage)
Eval(ctx context.Context, script string, keys []string, args []interface{}) (interface{}, error)
// Close connection
Close() error
}
Store interface (Strategy Pattern)
type ValidationError ¶
ValidationError configuration validation error
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string