limiters


README

Distributed rate limiters for Golang


Rate limiters for distributed applications in Golang with configurable back-ends and distributed locks. Any back-end or lock that implements the minimalistic interfaces can be used; the most common implementations are already provided. A minimal in-memory example follows the list below.

  • Token bucket

    • in-memory (local)
    • redis
    • etcd
    • dynamodb

    Allows requests at a certain input rate, with bursts configured by the capacity parameter. The output rate equals the input rate. Precise (no over- or under-limiting), but requires a lock (provided).

  • Leaky bucket

    • in-memory (local)
    • redis
    • etcd
    • dynamodb

    Puts requests in a FIFO queue to be processed at a constant rate. There are no restrictions on the input rate except for the capacity of the queue. Requires a lock (provided).

  • Fixed window counter

    • in-memory (local)
    • redis
    • dynamodb

    A simple and resource-efficient algorithm that does not need a lock. Precision may be adjusted by the size of the window. May be lenient when there are many requests around the boundary between two adjacent windows.

  • Sliding window counter

    • in-memory (local)
    • redis
    • dynamodb

    Smoothes out the bursts around the boundary between two adjacent windows. Needs twice as much memory as the Fixed Window algorithm (two windows instead of one at a time). It will disallow all requests when a client is flooding the service. It is the client's responsibility to handle a disallowed request properly: wait before making a new one.

  • Concurrent buffer

    • in-memory (local)
    • redis

    Allows concurrent requests up to the given capacity. Requires a lock (provided).
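
To make the interfaces above concrete, here is a minimal sketch of a purely local token bucket. The module import path github.com/mennanov/limiters and the capacity/rate values are assumptions for illustration:

// A self-contained sketch: a local token bucket allowing bursts of up to
// 4 requests and refilling 1 token every 100ms.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/mennanov/limiters"
)

func main() {
	bucket := limiters.NewTokenBucket(
		4,                    // capacity: the maximum burst size
		time.Millisecond*100, // refill rate: 1 token per 100ms
		limiters.NewLockNoop(),            // in-memory backend needs no distributed lock
		limiters.NewTokenBucketInMemory(), // local state backend
		limiters.NewSystemClock(),
		limiters.NewStdLogger(),
	)
	for i := 0; i < 6; i++ {
		wait, err := bucket.Limit(context.Background())
		switch {
		case err == limiters.ErrLimitExhausted:
			fmt.Printf("request %d rejected, retry in %s\n", i, wait)
		case err != nil:
			fmt.Println("limiter error:", err)
		default:
			fmt.Printf("request %d allowed\n", i)
		}
	}
}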

gRPC example

An example of a global token bucket rate limiter for a gRPC service:

// examples/example_grpc_simple_limiter_test.go
rate := time.Second * 3
limiter := limiters.NewTokenBucket(
    2,
    rate,
    limiters.NewLockEtcd(etcdClient, "/ratelimiter_lock/simple/", limiters.NewStdLogger()),
    limiters.NewTokenBucketRedis(
        redisClient,
        "ratelimiter/simple",
        rate, false),
    limiters.NewSystemClock(), limiters.NewStdLogger(),
)

// Add a unary interceptor middleware to rate limit all requests.
s := grpc.NewServer(grpc.UnaryInterceptor(
    func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        w, err := limiter.Limit(ctx)
        if err == limiters.ErrLimitExhausted {
            return nil, status.Errorf(codes.ResourceExhausted, "try again later in %s", w)
        } else if err != nil {
            // The limiter failed. This error should be logged and examined.
            log.Println(err)
            return nil, status.Error(codes.Internal, "internal error")
        }
        return handler(ctx, req)
    }))

For something closer to a real-world example, see the IP-address-based gRPC global rate limiter in the examples directory.

DynamoDB

Using DynamoDB requires a DynamoDB table to be created prior to use. An existing table can be used or a new one can be created. The required attributes depend on the limiter backend:

  • Partition Key
    • String
    • Required for all Backends
  • Sort Key
    • String
    • Backends:
      • FixedWindow
      • SlidingWindow
  • TTL
    • Number
    • Backends:
      • FixedWindow
      • SlidingWindow
      • LeakyBucket
      • TokenBucket

All DynamoDB backends accept a DynamoDBTableProperties struct as a parameter. It can be created manually, or by calling LoadDynamoDBTableProperties with the table name. When LoadDynamoDBTableProperties is used, the table description is fetched from AWS and verified to confirm that the table can be used by the limiter backends. Results of LoadDynamoDBTableProperties are cached.
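
Below is a hedged sketch of wiring up a DynamoDB-backed Fixed Window limiter; the table name "ratelimiter", the partition key value "my-service", and the newDynamoDBFixedWindow helper are illustrative assumptions:

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/mennanov/limiters"
)

// newDynamoDBFixedWindow is a hypothetical helper, not part of the package.
func newDynamoDBFixedWindow(ctx context.Context) (*limiters.FixedWindow, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}
	client := dynamodb.NewFromConfig(cfg)
	// Fetch and verify the table description (the result is cached).
	props, err := limiters.LoadDynamoDBTableProperties(ctx, client, "ratelimiter")
	if err != nil {
		return nil, err
	}
	backend := limiters.NewFixedWindowDynamoDB(client, "my-service", props)
	// Allow 10 requests per minute.
	return limiters.NewFixedWindow(10, time.Minute, backend, limiters.NewSystemClock()), nil
}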

Distributed locks

Some algorithms require a distributed lock to guarantee consistency during concurrent requests. If only one application instance is running, no distributed lock is needed, as all the algorithms are thread-safe (use LockNoop).

Supported lock backends: Consul, etcd, Redis, and Zookeeper (see the Lock* types in the documentation below).
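
A sketch of choosing between a no-op lock and a distributed one; the etcd endpoint and the newLocker helper are illustrative assumptions:

import (
	"github.com/mennanov/limiters"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// newLocker is a hypothetical helper: LockNoop for a single instance,
// LockEtcd when several instances share the limiter state.
func newLocker(distributed bool) (limiters.DistLocker, error) {
	if !distributed {
		return limiters.NewLockNoop(), nil
	}
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		return nil, err
	}
	return limiters.NewLockEtcd(cli, "/ratelimiter_lock/", limiters.NewStdLogger()), nil
}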

Testing

Run tests locally:

docker-compose up -d  # start etcd, Redis, zookeeper, consul, and localstack
ETCD_ENDPOINTS="127.0.0.1:2379" REDIS_ADDR="127.0.0.1:6379" ZOOKEEPER_ENDPOINTS="127.0.0.1" CONSUL_ADDR="127.0.0.1:8500" AWS_ADDR="127.0.0.1:8000" go test -race -v

Run Drone CI tests locally:

for p in "go1.13" "go1.12" "go1.11" "lint"; do drone exec --pipeline=${p}; done

Documentation

Overview

Package limiters provides general purpose rate limiter implementations.

Constants

This section is empty.

Variables

View Source
var (
	// ErrLimitExhausted is returned by the Limiter in case the number of requests overflows the capacity of a Limiter.
	ErrLimitExhausted = errors.New("requests limit exhausted")

	// ErrRaceCondition is returned when there is a race condition while saving a state of a rate limiter.
	ErrRaceCondition = errors.New("race condition detected")
)

Functions

This section is empty.

Types

type Clock

type Clock interface {
	// Now returns the current system time.
	Now() time.Time
}

Clock encapsulates a system Clock. It is used by the limiters to obtain the current time, which also makes it possible to substitute a mock clock in tests.

type ConcurrentBuffer

type ConcurrentBuffer struct {
	// contains filtered or unexported fields
}

ConcurrentBuffer implements a limiter that allows concurrent requests up to the given capacity.

func NewConcurrentBuffer

func NewConcurrentBuffer(locker DistLocker, concurrentStateBackend ConcurrentBufferBackend, capacity int64, logger Logger) *ConcurrentBuffer

NewConcurrentBuffer creates a new ConcurrentBuffer instance.

func (*ConcurrentBuffer) Done

func (c *ConcurrentBuffer) Done(ctx context.Context, key string) error

Done removes the request identified by the key from the buffer.

func (*ConcurrentBuffer) Limit

func (c *ConcurrentBuffer) Limit(ctx context.Context, key string) error

Limit puts the request identified by the key in a buffer.
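
Limit and Done are intended to be paired for every request. A hedged sketch with the in-memory backend (the capacity of 10, the one-minute TTL, and the handleRequest helper are illustrative assumptions):

buffer := limiters.NewConcurrentBuffer(
	limiters.NewLockNoop(), // the in-memory backend needs no distributed lock
	limiters.NewConcurrentBufferInMemory(limiters.NewRegistry(), time.Minute, limiters.NewSystemClock()),
	10, // at most 10 concurrent requests
	limiters.NewStdLogger(),
)

// handleRequest is a hypothetical per-request wrapper.
func handleRequest(ctx context.Context, buffer *limiters.ConcurrentBuffer, clientKey string) error {
	if err := buffer.Limit(ctx, clientKey); err != nil {
		return err // ErrLimitExhausted when the buffer is over capacity
	}
	// Done must be called when the request completes; otherwise the slot is
	// reclaimed only once its TTL expires.
	defer buffer.Done(ctx, clientKey)
	// ... process the request ...
	return nil
}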

type ConcurrentBufferBackend

type ConcurrentBufferBackend interface {
	// Add adds the request with the given key to the buffer and returns the total number of requests in it.
	Add(ctx context.Context, key string) (int64, error)
	// Remove removes the request from the buffer.
	Remove(ctx context.Context, key string) error
}

ConcurrentBufferBackend wraps the Add and Remove methods.

type ConcurrentBufferInMemory

type ConcurrentBufferInMemory struct {
	// contains filtered or unexported fields
}

ConcurrentBufferInMemory is an in-memory implementation of ConcurrentBufferBackend.

func NewConcurrentBufferInMemory

func NewConcurrentBufferInMemory(registry *Registry, ttl time.Duration, clock Clock) *ConcurrentBufferInMemory

NewConcurrentBufferInMemory creates a new instance of ConcurrentBufferInMemory. When the TTL of a key expires, the key is removed from the buffer. This is needed in case the process that added that key to the buffer did not call Done() for some reason.

func (*ConcurrentBufferInMemory) Add

Add adds the request with the given key to the buffer and returns the total number of requests in it. It also removes the keys with expired TTL.

func (*ConcurrentBufferInMemory) Remove

Remove removes the request from the buffer.

type ConcurrentBufferRedis

type ConcurrentBufferRedis struct {
	// contains filtered or unexported fields
}

ConcurrentBufferRedis implements ConcurrentBufferBackend in Redis.

func NewConcurrentBufferRedis

func NewConcurrentBufferRedis(cli *redis.Client, key string, ttl time.Duration, clock Clock) *ConcurrentBufferRedis

NewConcurrentBufferRedis creates a new instance of ConcurrentBufferRedis. When the TTL of a key expires, the key is removed from the buffer. This is needed in case the process that added that key to the buffer did not call Done() for some reason.

func (*ConcurrentBufferRedis) Add

func (c *ConcurrentBufferRedis) Add(ctx context.Context, key string) (int64, error)

Add adds the request with the given key to the sorted set in Redis and returns the total number of requests in it. It also removes the keys with expired TTL.

func (*ConcurrentBufferRedis) Remove

func (c *ConcurrentBufferRedis) Remove(ctx context.Context, key string) error

Remove removes the request identified by the key from the sorted set in Redis.

type DistLocker

type DistLocker interface {
	// Lock locks the locker.
	Lock(ctx context.Context) error
	// Unlock unlocks the previously successfully locked lock.
	Unlock(ctx context.Context) error
}

DistLocker is a context aware distributed locker (interface is similar to sync.Locker).

type DynamoDBTableProperties

type DynamoDBTableProperties struct {
	//TableName is the name of the table.
	TableName string
	//PartitionKeyName is the name of the PartitionKey attribute.
	PartitionKeyName string
	//SortKeyName is the name of the SortKey attribute.
	SortKeyName string
	//SortKeyUsed indicates if a SortKey is present on the table.
	SortKeyUsed bool
	//TTLFieldName is the name of the attribute configured for TTL.
	TTLFieldName string
}

DynamoDBTableProperties are supplied to DynamoDB limiter backends. This struct informs the backend what the name of the table is and what the names of the key fields are.

func LoadDynamoDBTableProperties

func LoadDynamoDBTableProperties(ctx context.Context, client *dynamodb.Client, tableName string) (DynamoDBTableProperties, error)

LoadDynamoDBTableProperties fetches a table description with the supplied client and returns a DynamoDBTableProperties struct

type FixedWindow

type FixedWindow struct {
	// contains filtered or unexported fields
}

FixedWindow implements a Fixed Window rate limiting algorithm.

Simple and memory efficient algorithm that does not need a distributed lock. However it may be lenient when there are many requests around the boundary between 2 adjacent windows.

func NewFixedWindow

func NewFixedWindow(capacity int64, rate time.Duration, fixedWindowIncrementer FixedWindowIncrementer, clock Clock) *FixedWindow

NewFixedWindow creates a new instance of FixedWindow. Capacity is the maximum amount of requests allowed per window. Rate is the window size.

func (*FixedWindow) Limit

func (f *FixedWindow) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. It returns ErrLimitExhausted if the request overflows the window's capacity.

type FixedWindowDynamoDB

type FixedWindowDynamoDB struct {
	// contains filtered or unexported fields
}

FixedWindowDynamoDB implements FixedWindow in DynamoDB.

func NewFixedWindowDynamoDB

func NewFixedWindowDynamoDB(client *dynamodb.Client, partitionKey string, props DynamoDBTableProperties) *FixedWindowDynamoDB

NewFixedWindowDynamoDB creates a new instance of FixedWindowDynamoDB. PartitionKey is the key used to store all the keys used by this implementation in DynamoDB.

TableProps describes the table that this backend should work with. This backend requires the following on the table:

  • SortKey
  • TTL

func (*FixedWindowDynamoDB) Increment

func (f *FixedWindowDynamoDB) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)

Increment increments the window's counter in DynamoDB.

type FixedWindowInMemory

type FixedWindowInMemory struct {
	// contains filtered or unexported fields
}

FixedWindowInMemory is an in-memory implementation of FixedWindowIncrementer.

func NewFixedWindowInMemory

func NewFixedWindowInMemory() *FixedWindowInMemory

NewFixedWindowInMemory creates a new instance of FixedWindowInMemory.

func (*FixedWindowInMemory) Increment

func (f *FixedWindowInMemory) Increment(ctx context.Context, window time.Time, _ time.Duration) (int64, error)

Increment increments the window's counter.

type FixedWindowIncrementer

type FixedWindowIncrementer interface {
	// Increment increments the request counter for the window and returns the counter value.
	// TTL is the time duration before the next window.
	Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)
}

FixedWindowIncrementer wraps the Increment method.

type FixedWindowRedis

type FixedWindowRedis struct {
	// contains filtered or unexported fields
}

FixedWindowRedis implements FixedWindow in Redis.

func NewFixedWindowRedis

func NewFixedWindowRedis(cli *redis.Client, prefix string) *FixedWindowRedis

NewFixedWindowRedis returns a new instance of FixedWindowRedis. Prefix is the key prefix used to store all the keys used in this implementation in Redis.

func (*FixedWindowRedis) Increment

func (f *FixedWindowRedis) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)

Increment increments the window's counter in Redis.

type LeakyBucket

type LeakyBucket struct {
	// contains filtered or unexported fields
}

LeakyBucket implements the https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue algorithm.

func NewLeakyBucket

func NewLeakyBucket(capacity int64, rate time.Duration, locker DistLocker, leakyBucketStateBackend LeakyBucketStateBackend, clock Clock, logger Logger) *LeakyBucket

NewLeakyBucket creates a new instance of LeakyBucket.

func (*LeakyBucket) Limit

func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. If the last request happened earlier than the rate, this method returns zero duration. It returns ErrLimitExhausted if the request overflows the bucket's capacity; in that case the returned duration is how long it would have taken for the request to be processed had the bucket not overflowed.
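
A sketch of the intended usage with the in-memory backend, sleeping for the returned duration before processing (the capacity and rate values are illustrative):

bucket := limiters.NewLeakyBucket(
	100,                  // queue capacity
	time.Millisecond*500, // output rate: 1 request per 500ms
	limiters.NewLockNoop(),
	limiters.NewLeakyBucketInMemory(),
	limiters.NewSystemClock(),
	limiters.NewStdLogger(),
)
wait, err := bucket.Limit(context.Background())
switch {
case err == limiters.ErrLimitExhausted:
	// The queue is full: reject the request.
case err != nil:
	// Backend failure: log it and decide whether to fail open or closed.
case wait > 0:
	time.Sleep(wait) // process the request once its turn comes
}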

type LeakyBucketDynamoDB

type LeakyBucketDynamoDB struct {
	// contains filtered or unexported fields
}

LeakyBucketDynamoDB is a DynamoDB implementation of a LeakyBucketStateBackend.

func NewLeakyBucketDynamoDB

func NewLeakyBucketDynamoDB(client *dynamodb.Client, partitionKey string, tableProps DynamoDBTableProperties, ttl time.Duration, raceCheck bool) *LeakyBucketDynamoDB

NewLeakyBucketDynamoDB creates a new LeakyBucketDynamoDB instance. PartitionKey is the key used to store all the keys used by this implementation in DynamoDB.

TableProps describes the table that this backend should work with. This backend requires a TTL attribute on the table.

TTL is the TTL of the stored item.

If raceCheck is true and the item in DynamoDB is modified between the State() and SetState() calls, then ErrRaceCondition is returned.

func (*LeakyBucketDynamoDB) SetState

func (t *LeakyBucketDynamoDB) SetState(ctx context.Context, state LeakyBucketState) error

SetState updates the state in DynamoDB.

func (*LeakyBucketDynamoDB) State

State gets the bucket's state from DynamoDB.

type LeakyBucketEtcd

type LeakyBucketEtcd struct {
	// contains filtered or unexported fields
}

LeakyBucketEtcd is an etcd implementation of a LeakyBucketStateBackend. See the TokenBucketEtcd description for the details on etcd usage.

func NewLeakyBucketEtcd

func NewLeakyBucketEtcd(cli *clientv3.Client, prefix string, ttl time.Duration, raceCheck bool) *LeakyBucketEtcd

NewLeakyBucketEtcd creates a new LeakyBucketEtcd instance. Prefix is used as an etcd key prefix for all keys stored in etcd by this algorithm. TTL is a TTL of the etcd lease used to store all the keys.

If raceCheck is true and the keys in etcd are modified between the State() and SetState() calls, then ErrRaceCondition is returned.

func (*LeakyBucketEtcd) SetState

func (l *LeakyBucketEtcd) SetState(ctx context.Context, state LeakyBucketState) error

SetState updates the state of the bucket in etcd.

func (*LeakyBucketEtcd) State

State gets the bucket's current state from etcd. If there is no state available in etcd then the initial bucket's state is returned.

type LeakyBucketInMemory

type LeakyBucketInMemory struct {
	// contains filtered or unexported fields
}

LeakyBucketInMemory is an in-memory implementation of LeakyBucketStateBackend.

func NewLeakyBucketInMemory

func NewLeakyBucketInMemory() *LeakyBucketInMemory

NewLeakyBucketInMemory creates a new instance of LeakyBucketInMemory.

func (*LeakyBucketInMemory) SetState

func (l *LeakyBucketInMemory) SetState(ctx context.Context, state LeakyBucketState) error

SetState sets the current state of the bucket.

func (*LeakyBucketInMemory) State

State gets the current state of the bucket.

type LeakyBucketRedis

type LeakyBucketRedis struct {
	// contains filtered or unexported fields
}

LeakyBucketRedis is a Redis implementation of a LeakyBucketStateBackend.

func NewLeakyBucketRedis

func NewLeakyBucketRedis(cli *redis.Client, prefix string, ttl time.Duration, raceCheck bool) *LeakyBucketRedis

NewLeakyBucketRedis creates a new LeakyBucketRedis instance. Prefix is the key prefix used to store all the keys used in this implementation in Redis. TTL is the TTL of the stored keys.

If raceCheck is true and the keys in Redis are modified between the State() and SetState() calls, then ErrRaceCondition is returned.

func (*LeakyBucketRedis) SetState

func (t *LeakyBucketRedis) SetState(ctx context.Context, state LeakyBucketState) error

SetState updates the state in Redis. The provided fencing token is checked on the Redis side before saving the keys.

func (*LeakyBucketRedis) State

State gets the bucket's state from Redis.

type LeakyBucketState

type LeakyBucketState struct {
	// Last is the Unix timestamp in nanoseconds of the most recent request.
	Last int64
}

LeakyBucketState represents the state of a LeakyBucket.

func (LeakyBucketState) IzZero

func (s LeakyBucketState) IzZero() bool

IzZero returns true if the bucket state is zero valued.

type LeakyBucketStateBackend

type LeakyBucketStateBackend interface {
	// State gets the current state of the LeakyBucket.
	State(ctx context.Context) (LeakyBucketState, error)
	// SetState sets (persists) the current state of the LeakyBucket.
	SetState(ctx context.Context, state LeakyBucketState) error
}

LeakyBucketStateBackend interface encapsulates the logic of retrieving and persisting the state of a LeakyBucket.

type LockConsul

type LockConsul struct {
	// contains filtered or unexported fields
}

LockConsul is a wrapper around github.com/hashicorp/consul/api.Lock that implements the DistLocker interface.

func NewLockConsul

func NewLockConsul(lock *api.Lock) *LockConsul

NewLockConsul creates a new LockConsul instance.

func (*LockConsul) Lock

func (l *LockConsul) Lock(ctx context.Context) error

Lock locks the lock in Consul.

func (*LockConsul) Unlock

func (l *LockConsul) Unlock(_ context.Context) error

Unlock unlocks the lock in Consul.

type LockEtcd

type LockEtcd struct {
	// contains filtered or unexported fields
}

LockEtcd implements the DistLocker interface using etcd.

See https://github.com/etcd-io/etcd/blob/master/Documentation/learning/why.md#using-etcd-for-distributed-coordination

func NewLockEtcd

func NewLockEtcd(cli *clientv3.Client, prefix string, logger Logger) *LockEtcd

NewLockEtcd creates a new instance of LockEtcd.

func (*LockEtcd) Lock

func (l *LockEtcd) Lock(ctx context.Context) error

Lock creates a new session-based lock in etcd and locks it.

func (*LockEtcd) Unlock

func (l *LockEtcd) Unlock(ctx context.Context) error

Unlock unlocks the previously locked lock.

type LockNoop

type LockNoop struct {
}

LockNoop is a no-op implementation of the DistLocker interface. It should only be used with the in-memory backends as they are already thread-safe and don't need distributed locks.

func NewLockNoop

func NewLockNoop() *LockNoop

NewLockNoop creates a new LockNoop.

func (LockNoop) Lock

func (n LockNoop) Lock(ctx context.Context) error

Lock imitates locking.

func (LockNoop) Unlock

func (n LockNoop) Unlock(_ context.Context) error

Unlock does nothing.

type LockRedis

type LockRedis struct {
	// contains filtered or unexported fields
}

LockRedis is a wrapper around github.com/go-redsync/redsync that implements the DistLocker interface.

func NewLockRedis

func NewLockRedis(pool redsyncredis.Pool, mutexName string) *LockRedis

NewLockRedis creates a new instance of LockRedis.

func (*LockRedis) Lock

func (l *LockRedis) Lock(_ context.Context) error

Lock locks the lock in Redis.

func (*LockRedis) Unlock

func (l *LockRedis) Unlock(_ context.Context) error

Unlock unlocks the lock in Redis.

type LockZookeeper

type LockZookeeper struct {
	// contains filtered or unexported fields
}

LockZookeeper is a wrapper around github.com/samuel/go-zookeeper/zk.Lock that implements the DistLocker interface.

func NewLockZookeeper

func NewLockZookeeper(lock *zk.Lock) *LockZookeeper

NewLockZookeeper creates a new instance of LockZookeeper.

func (*LockZookeeper) Lock

func (l *LockZookeeper) Lock(_ context.Context) error

Lock locks the lock in Zookeeper. TODO: add context aware support once https://github.com/samuel/go-zookeeper/pull/168 is merged.

func (*LockZookeeper) Unlock

func (l *LockZookeeper) Unlock(_ context.Context) error

Unlock unlocks the lock in Zookeeper.

type Logger

type Logger interface {
	// Log logs the given arguments.
	Log(v ...interface{})
}

Logger wraps the Log method for logging.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is a garbage-collectable registry of values.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new instance of Registry.

func (*Registry) Delete

func (r *Registry) Delete(key string)

Delete deletes an item from the registry.

func (*Registry) DeleteExpired

func (r *Registry) DeleteExpired(now time.Time) int

DeleteExpired deletes expired items from the registry and returns the number of deleted items.

func (*Registry) Exists

func (r *Registry) Exists(key string) bool

Exists returns true if an item with the given key exists in the registry.

func (*Registry) GetOrCreate

func (r *Registry) GetOrCreate(key string, value func() interface{}, ttl time.Duration, now time.Time) interface{}

GetOrCreate gets an existing value by key and updates its expiration time. If the key lookup fails it creates a new value by calling the provided value closure and puts it on the queue.
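
For instance, a hedged sketch of keeping one in-memory token bucket per client (the bucketForClient closure, the TTL, and the bucket parameters are illustrative assumptions):

registry := limiters.NewRegistry()
clock := limiters.NewSystemClock()

// bucketForClient returns the client's bucket, creating it on first use and
// extending its expiration time on every lookup.
bucketForClient := func(clientID string) *limiters.TokenBucket {
	return registry.GetOrCreate(clientID, func() interface{} {
		return limiters.NewTokenBucket(10, time.Second, limiters.NewLockNoop(),
			limiters.NewTokenBucketInMemory(), clock, limiters.NewStdLogger())
	}, time.Minute, clock.Now()).(*limiters.TokenBucket)
}

// Expired buckets can then be garbage collected periodically:
// registry.DeleteExpired(clock.Now())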

func (*Registry) Len

func (r *Registry) Len() int

Len returns the number of items in the registry.

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

SlidingWindow implements a Sliding Window rate limiting algorithm.

It does not require a distributed lock and uses a minimal amount of memory; however, it will disallow all requests when a client is flooding the service. It is the client's responsibility to handle a disallowed request and wait before retrying.

func NewSlidingWindow

func NewSlidingWindow(capacity int64, rate time.Duration, slidingWindowIncrementer SlidingWindowIncrementer, clock Clock, epsilon float64) *SlidingWindow

NewSlidingWindow creates a new instance of SlidingWindow. Capacity is the maximum number of requests allowed per window. Rate is the window size. Epsilon is the maximum allowed difference when comparing the current weighted number of requests with the capacity.
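
For example, a hedged construction sketch (the values are illustrative; the tiny epsilon merely tolerates rounding in the weighted comparison):

// Allow 100 requests per minute with an in-memory backend.
sw := limiters.NewSlidingWindow(100, time.Minute,
	limiters.NewSlidingWindowInMemory(), limiters.NewSystemClock(), 1e-9)
wait, err := sw.Limit(context.Background())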

func (*SlidingWindow) Limit

func (s *SlidingWindow) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. It returns ErrLimitExhausted if the request overflows the capacity.

type SlidingWindowDynamoDB

type SlidingWindowDynamoDB struct {
	// contains filtered or unexported fields
}

SlidingWindowDynamoDB implements SlidingWindow in DynamoDB.

func NewSlidingWindowDynamoDB

func NewSlidingWindowDynamoDB(client *dynamodb.Client, partitionKey string, props DynamoDBTableProperties) *SlidingWindowDynamoDB

NewSlidingWindowDynamoDB creates a new instance of SlidingWindowDynamoDB. PartitionKey is the key used to store all the keys used by this implementation in DynamoDB.

TableProps describes the table that this backend should work with. This backend requires the following on the table:

  • SortKey
  • TTL

func (*SlidingWindowDynamoDB) Increment

func (s *SlidingWindowDynamoDB) Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (int64, int64, error)

Increment increments the current window's counter in DynamoDB and returns the number of requests in the previous window and the current one.

type SlidingWindowInMemory

type SlidingWindowInMemory struct {
	// contains filtered or unexported fields
}

SlidingWindowInMemory is an in-memory implementation of SlidingWindowIncrementer.

func NewSlidingWindowInMemory

func NewSlidingWindowInMemory() *SlidingWindowInMemory

NewSlidingWindowInMemory creates a new instance of SlidingWindowInMemory.

func (*SlidingWindowInMemory) Increment

func (s *SlidingWindowInMemory) Increment(ctx context.Context, prev, curr time.Time, _ time.Duration) (int64, int64, error)

Increment increments the current window's counter and returns the number of requests in the previous window and the current one.

type SlidingWindowIncrementer

type SlidingWindowIncrementer interface {
	// Increment increments the request counter for the current window and returns the counter values for the previous
	// window and the current one.
	// TTL is the time duration before the next window.
	Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (prevCount, currCount int64, err error)
}

SlidingWindowIncrementer wraps the Increment method.

type SlidingWindowRedis

type SlidingWindowRedis struct {
	// contains filtered or unexported fields
}

SlidingWindowRedis implements SlidingWindow in Redis.

func NewSlidingWindowRedis

func NewSlidingWindowRedis(cli *redis.Client, prefix string) *SlidingWindowRedis

NewSlidingWindowRedis creates a new instance of SlidingWindowRedis.

func (*SlidingWindowRedis) Increment

func (s *SlidingWindowRedis) Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (int64, int64, error)

Increment increments the current window's counter in Redis and returns the number of requests in the previous window and the current one.

type StdLogger

type StdLogger struct{}

StdLogger implements the Logger interface.

func NewStdLogger

func NewStdLogger() *StdLogger

NewStdLogger creates a new instance of StdLogger.

func (*StdLogger) Log

func (l *StdLogger) Log(v ...interface{})

Log delegates the logging to the std logger.

type SystemClock

type SystemClock struct {
}

SystemClock implements the Clock interface by using the real system clock.

func NewSystemClock

func NewSystemClock() *SystemClock

NewSystemClock creates a new instance of SystemClock.

func (*SystemClock) Now

func (c *SystemClock) Now() time.Time

Now returns the current system time.

func (*SystemClock) Sleep

func (c *SystemClock) Sleep(d time.Duration)

Sleep blocks (sleeps) for the given duration.

type TokenBucket

type TokenBucket struct {
	// contains filtered or unexported fields
}

TokenBucket implements the https://en.wikipedia.org/wiki/Token_bucket algorithm.

func NewTokenBucket

func NewTokenBucket(capacity int64, refillRate time.Duration, locker DistLocker, tokenBucketStateBackend TokenBucketStateBackend, clock Clock, logger Logger) *TokenBucket

NewTokenBucket creates a new instance of TokenBucket.

func (*TokenBucket) Limit

func (t *TokenBucket) Limit(ctx context.Context) (time.Duration, error)

Limit takes 1 token from the bucket.

func (*TokenBucket) Take

func (t *TokenBucket) Take(ctx context.Context, tokens int64) (time.Duration, error)

Take takes tokens from the bucket.

It returns a zero duration and a nil error if the bucket has a sufficient amount of tokens.

It returns ErrLimitExhausted if the amount of available tokens is less than requested. In this case the returned duration is the amount of time to wait to retry the request.
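
Given a *TokenBucket such as the in-memory one sketched in the README above, a short illustration of a weighted request (the cost of 3 tokens is an assumption):

wait, err := bucket.Take(ctx, 3) // e.g. a batch operation worth 3 tokens
if err == limiters.ErrLimitExhausted {
	time.Sleep(wait) // wait before retrying the request
} else if err != nil {
	// The backend failed: log and examine the error.
}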

type TokenBucketDynamoDB

type TokenBucketDynamoDB struct {
	// contains filtered or unexported fields
}

TokenBucketDynamoDB is a DynamoDB implementation of a TokenBucketStateBackend.

func NewTokenBucketDynamoDB

func NewTokenBucketDynamoDB(client *dynamodb.Client, partitionKey string, tableProps DynamoDBTableProperties, ttl time.Duration, raceCheck bool) *TokenBucketDynamoDB

NewTokenBucketDynamoDB creates a new TokenBucketDynamoDB instance. PartitionKey is the key used to store all the keys used by this implementation in DynamoDB.

TableProps describes the table that this backend should work with. This backend requires a TTL attribute on the table.

TTL is the TTL of the stored item.

If raceCheck is true and the item in DynamoDB is modified between the State() and SetState() calls, then ErrRaceCondition is returned.

func (*TokenBucketDynamoDB) SetState

func (t *TokenBucketDynamoDB) SetState(ctx context.Context, state TokenBucketState) error

SetState updates the state in DynamoDB.

func (*TokenBucketDynamoDB) State

State gets the bucket's state from DynamoDB.

type TokenBucketEtcd

type TokenBucketEtcd struct {
	// contains filtered or unexported fields
}

TokenBucketEtcd is an etcd implementation of a TokenBucketStateBackend.

See https://github.com/etcd-io/etcd/blob/master/Documentation/learning/data_model.md

etcd is designed to reliably store infrequently updated data, so it should only be used for API endpoints that are accessed less frequently than the rate limiter can process requests.

Aggressive compaction and defragmentation have to be enabled in etcd to prevent the storage from growing indefinitely: every change of the bucket's state (every access) creates a new revision in etcd.

This probably makes it impractical for high-load cases, but it can be used to reliably and precisely rate limit access to business-critical endpoints where each access must be reliably logged.

func NewTokenBucketEtcd

func NewTokenBucketEtcd(cli *clientv3.Client, prefix string, ttl time.Duration, raceCheck bool) *TokenBucketEtcd

NewTokenBucketEtcd creates a new TokenBucketEtcd instance. Prefix is used as an etcd key prefix for all keys stored in etcd by this algorithm. TTL is a TTL of the etcd lease in seconds used to store all the keys: all the keys are automatically deleted after the TTL expires.

If raceCheck is true and the keys in etcd are modified between the State() and SetState() calls, then ErrRaceCondition is returned. This does not add any significant overhead, as it can be trivially checked on the etcd side before updating the keys.

func (*TokenBucketEtcd) SetState

func (t *TokenBucketEtcd) SetState(ctx context.Context, state TokenBucketState) error

SetState updates the state of the bucket.

func (*TokenBucketEtcd) State

State gets the bucket's current state from etcd. If there is no state available in etcd then the initial bucket's state is returned.

type TokenBucketInMemory

type TokenBucketInMemory struct {
	// contains filtered or unexported fields
}

TokenBucketInMemory is an in-memory implementation of TokenBucketStateBackend.

The state is neither shared nor persisted, so it won't survive restarts or failures. Due to the local nature of the state, the rate at which some endpoints are accessed can't be reliably predicted or limited.

It can still serve as an approximate global rate limiter behind a round-robin load balancer.

func NewTokenBucketInMemory

func NewTokenBucketInMemory() *TokenBucketInMemory

NewTokenBucketInMemory creates a new instance of TokenBucketInMemory.

func (*TokenBucketInMemory) SetState

func (t *TokenBucketInMemory) SetState(ctx context.Context, state TokenBucketState) error

SetState sets the current bucket's state.

func (*TokenBucketInMemory) State

State returns the current bucket's state.

type TokenBucketRedis

type TokenBucketRedis struct {
	// contains filtered or unexported fields
}

TokenBucketRedis is a Redis implementation of a TokenBucketStateBackend.

Redis is an in-memory key-value store that also supports persistence. It is a better choice for high-load cases than etcd, as it does not keep old values of the keys and therefore needs no compaction or defragmentation.

However, depending on the persistence and cluster configuration, some data might be lost in case of a failure, resulting in under-limiting of access to the service.

func NewTokenBucketRedis

func NewTokenBucketRedis(cli *redis.Client, prefix string, ttl time.Duration, raceCheck bool) *TokenBucketRedis

NewTokenBucketRedis creates a new TokenBucketRedis instance. Prefix is the key prefix used to store all the keys used in this implementation in Redis. TTL is the TTL of the stored keys.

If raceCheck is true and the keys in Redis are modified between the State() and SetState() calls, then ErrRaceCondition is returned. This adds extra overhead, since a Lua script has to be executed on the Redis side, which locks the entire database.

func (*TokenBucketRedis) SetState

func (t *TokenBucketRedis) SetState(ctx context.Context, state TokenBucketState) error

SetState updates the state in Redis.

func (*TokenBucketRedis) State

State gets the bucket's state from Redis.

type TokenBucketState

type TokenBucketState struct {
	// Last is the last time the state was updated (Unix timestamp in nanoseconds).
	Last int64
	// Available is the number of available tokens in the bucket.
	Available int64
}

TokenBucketState represents a state of a token bucket.

type TokenBucketStateBackend

type TokenBucketStateBackend interface {
	// State gets the current state of the TokenBucket.
	State(ctx context.Context) (TokenBucketState, error)
	// SetState sets (persists) the current state of the TokenBucket.
	SetState(ctx context.Context, state TokenBucketState) error
}

TokenBucketStateBackend interface encapsulates the logic of retrieving and persisting the state of a TokenBucket.
