limiters

Version: v1.5.0
Published: Mar 19, 2024 License: MIT Imports: 31 Imported by: 2

README

Distributed rate limiters for Golang

Rate limiters for distributed applications in Golang with configurable back-ends and distributed locks.
Any back-end or lock that implements the corresponding minimal interface can be used. The most common implementations are already provided.

  • Token bucket

    • in-memory (local)
    • redis
    • memcached
    • etcd
    • dynamodb

    Allows requests at a certain input rate with possible bursts configured by the capacity parameter.
    The output rate equals the input rate.
    Precise (no over- or under-limiting), but requires a lock (provided).

  • Leaky bucket

    • in-memory (local)
    • redis
    • memcached
    • etcd
    • dynamodb

    Puts requests in a FIFO queue to be processed at a constant rate.
    There are no restrictions on the input rate except for the capacity of the queue.
    Requires a lock (provided).

  • Fixed window counter

    • in-memory (local)
    • redis
    • memcached
    • dynamodb

    Simple and resource-efficient algorithm that does not need a lock.
    Precision may be adjusted by the size of the window.
    May be lenient when there are many requests around the boundary between 2 adjacent windows.

  • Sliding window counter

    • in-memory (local)
    • redis
    • memcached
    • dynamodb

    Smooths out the bursts around the boundary between 2 adjacent windows.
    Needs twice as much memory as the Fixed Window algorithm (2 windows instead of 1 at a time).
    It disallows all requests when a client is flooding the service. It is the client's responsibility to handle a disallowed request properly by waiting before making a new one.

  • Concurrent buffer

    • in-memory (local)
    • redis
    • memcached

    Allows concurrent requests up to the given capacity.
    Requires a lock (provided).

gRPC example

Example of a global token bucket rate limiter for a gRPC service:

// examples/example_grpc_simple_limiter_test.go
rate := time.Second * 3
limiter := limiters.NewTokenBucket(
    2,
    rate,
    limiters.NewLockEtcd(etcdClient, "/ratelimiter_lock/simple/", limiters.NewStdLogger()),
    limiters.NewTokenBucketRedis(
        redisClient,
        "ratelimiter/simple",
        rate, false),
    limiters.NewSystemClock(), limiters.NewStdLogger(),
)

// Add a unary interceptor middleware to rate limit all requests.
s := grpc.NewServer(grpc.UnaryInterceptor(
    func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        w, err := limiter.Limit(ctx)
        if err == limiters.ErrLimitExhausted {
            return nil, status.Errorf(codes.ResourceExhausted, "try again later in %s", w)
        } else if err != nil {
            // The limiter failed. This error should be logged and examined.
            log.Println(err)
            return nil, status.Error(codes.Internal, "internal error")
        }
        return handler(ctx, req)
    }))

For something closer to a real-world example, see the IP-address-based global gRPC rate limiter in the examples directory.

DynamoDB

Using DynamoDB requires a DynamoDB table to exist before the limiter is used. An existing table can be reused or a new one can be created. The table needs the following attributes, depending on the limiter backend:

  • Partition Key
    • String
    • Required for all Backends
  • Sort Key
    • String
    • Backends:
      • FixedWindow
      • SlidingWindow
  • TTL
    • Number
    • Backends:
      • FixedWindow
      • SlidingWindow
      • LeakyBucket
      • TokenBucket

All DynamoDB backends accept a DynamoDBTableProperties struct as a parameter. It can be created manually or loaded by calling LoadDynamoDBTableProperties with the table name. LoadDynamoDBTableProperties fetches the table description from AWS and verifies that the table can be used for limiter backends. Its results are cached.

Distributed locks

Some algorithms require a distributed lock to guarantee consistency during concurrent requests.
If only one application instance is running, no distributed lock is needed because all the algorithms are thread-safe (use LockNoop).

Supported backends:

  • etcd
  • Consul
  • Zookeeper
  • Redis
  • Memcached
  • PostgreSQL

Memcached

It's important to understand that memcached is not ideal for implementing reliable locks or data persistence due to its inherent limitations:

  • No guaranteed data retention: Memcached can evict data at any point due to memory pressure, even if it appears to have space available. This can lead to unexpected lock releases or data loss.
  • Lack of distributed locking features: Memcached doesn't offer functionalities like distributed coordination required for consistent locking across multiple servers.

If Memcached is already deployed and bursts of traffic caused by unexpectedly evicted data are acceptable, the Memcached-based implementations are convenient. Otherwise, the Redis-based implementations are a better choice.

Testing

Run tests locally:

make

Documentation

Overview

Package limiters provides general purpose rate limiter implementations.

Index

Constants

This section is empty.

Variables

var (
	// ErrLimitExhausted is returned by the Limiter in case the number of requests overflows the capacity of a Limiter.
	ErrLimitExhausted = errors.New("requests limit exhausted")

	// ErrRaceCondition is returned when there is a race condition while saving a state of a rate limiter.
	ErrRaceCondition = errors.New("race condition detected")
)
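A minimal sketch of how a caller might tell these two errors apart from other failures. The sentinels are redeclared locally so that the snippet compiles on its own; in real code they come from the limiters package. The classify helper and the action names it returns are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package-level sentinels documented above.
var (
	ErrLimitExhausted = errors.New("requests limit exhausted")
	ErrRaceCondition  = errors.New("race condition detected")
)

// classify is a hypothetical helper that maps a limiter error to an action.
// errors.Is is used so that wrapped sentinels are recognized as well.
func classify(err error) string {
	switch {
	case err == nil:
		return "allow"
	case errors.Is(err, ErrLimitExhausted):
		return "reject"
	case errors.Is(err, ErrRaceCondition):
		return "retry"
	default:
		return "fail"
	}
}

func main() {
	wrapped := fmt.Errorf("limit: %w", ErrLimitExhausted)
	fmt.Println(classify(nil), classify(wrapped), classify(ErrRaceCondition))
}
```

Whether a race condition should be retried or surfaced is an application decision; the mapping above is only one option.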

Functions

This section is empty.

Types

type Clock

type Clock interface {
	// Now returns the current system time.
	Now() time.Time
}

Clock encapsulates a system Clock.
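Because Clock is a one-method interface, a deterministic test double is easy to write. The sketch below redeclares the interface locally; fakeClock, Advance and elapsedSeconds are hypothetical names and are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Clock mirrors the interface documented above.
type Clock interface {
	Now() time.Time
}

// fakeClock is a hypothetical test double: its time moves only when
// Advance is called.
type fakeClock struct {
	mu  sync.Mutex
	now time.Time
}

func (c *fakeClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

// Advance moves the fake time forward by d.
func (c *fakeClock) Advance(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.now.Add(d)
}

// elapsedSeconds advances a fake clock and reports the elapsed time as
// seen through the Clock interface.
func elapsedSeconds(seconds int) float64 {
	fc := &fakeClock{now: time.Unix(0, 0)}
	var clock Clock = fc
	start := clock.Now()
	fc.Advance(time.Duration(seconds) * time.Second)
	return clock.Now().Sub(start).Seconds()
}

func main() {
	fmt.Println(elapsedSeconds(3))
}
```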

type ConcurrentBuffer added in v0.0.2

type ConcurrentBuffer struct {
	// contains filtered or unexported fields
}

ConcurrentBuffer implements a limiter that allows concurrent requests up to the given capacity.

func NewConcurrentBuffer added in v0.0.2

func NewConcurrentBuffer(locker DistLocker, concurrentStateBackend ConcurrentBufferBackend, capacity int64, logger Logger) *ConcurrentBuffer

NewConcurrentBuffer creates a new ConcurrentBuffer instance.

func (*ConcurrentBuffer) Done added in v0.0.2

func (c *ConcurrentBuffer) Done(ctx context.Context, key string) error

Done removes the request identified by the key from the buffer.

func (*ConcurrentBuffer) Limit added in v0.0.2

func (c *ConcurrentBuffer) Limit(ctx context.Context, key string) error

Limit puts the request identified by the key in a buffer.

type ConcurrentBufferBackend added in v0.0.2

type ConcurrentBufferBackend interface {
	// Add adds the request with the given key to the buffer and returns the total number of requests in it.
	Add(ctx context.Context, key string) (int64, error)
	// Remove removes the request from the buffer.
	Remove(ctx context.Context, key string) error
}

ConcurrentBufferBackend wraps the Add and Remove methods.
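To illustrate the contract, here is a sketch of a set-based backend and the admission check that can be built on top of it. The interface is redeclared locally; setBackend, tryAcquire and demo are hypothetical, the sketch has no TTL handling, and it does not claim to match how ConcurrentBuffer is implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ConcurrentBufferBackend mirrors the interface documented above.
type ConcurrentBufferBackend interface {
	Add(ctx context.Context, key string) (int64, error)
	Remove(ctx context.Context, key string) error
}

// setBackend is a hypothetical backend that keeps request keys in a set.
type setBackend struct {
	mu   sync.Mutex
	keys map[string]struct{}
}

func newSetBackend() *setBackend {
	return &setBackend{keys: make(map[string]struct{})}
}

func (b *setBackend) Add(_ context.Context, key string) (int64, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.keys[key] = struct{}{}
	return int64(len(b.keys)), nil
}

func (b *setBackend) Remove(_ context.Context, key string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.keys, key)
	return nil
}

// tryAcquire admits the request only if the buffer stays within capacity.
func tryAcquire(b ConcurrentBufferBackend, key string, capacity int64) bool {
	ctx := context.Background()
	n, err := b.Add(ctx, key)
	if err != nil {
		return false
	}
	if n > capacity {
		// Over capacity: roll the request back out of the buffer.
		_ = b.Remove(ctx, key)
		return false
	}
	return true
}

// demo tries three concurrent requests against a capacity of 2, then
// retries the rejected one after the first request has finished.
func demo() []bool {
	b := newSetBackend()
	out := []bool{tryAcquire(b, "a", 2), tryAcquire(b, "b", 2), tryAcquire(b, "c", 2)}
	_ = b.Remove(context.Background(), "a")
	return append(out, tryAcquire(b, "c", 2))
}

func main() {
	fmt.Println(demo())
}
```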

type ConcurrentBufferInMemory added in v0.0.2

type ConcurrentBufferInMemory struct {
	// contains filtered or unexported fields
}

ConcurrentBufferInMemory is an in-memory implementation of ConcurrentBufferBackend.

func NewConcurrentBufferInMemory added in v0.0.2

func NewConcurrentBufferInMemory(registry *Registry, ttl time.Duration, clock Clock) *ConcurrentBufferInMemory

NewConcurrentBufferInMemory creates a new instance of ConcurrentBufferInMemory. When the TTL of a key expires, the key is removed from the buffer. This is needed in case the process that added the key to the buffer did not call Done() for some reason.

func (*ConcurrentBufferInMemory) Add added in v0.0.2

Add adds the request with the given key to the buffer and returns the total number of requests in it. It also removes the keys with expired TTL.

func (*ConcurrentBufferInMemory) Remove added in v0.0.2

Remove removes the request from the buffer.

type ConcurrentBufferMemcached added in v1.3.0

type ConcurrentBufferMemcached struct {
	// contains filtered or unexported fields
}

ConcurrentBufferMemcached implements ConcurrentBufferBackend in Memcached.

func NewConcurrentBufferMemcached added in v1.3.0

func NewConcurrentBufferMemcached(cli *memcache.Client, key string, ttl time.Duration, clock Clock) *ConcurrentBufferMemcached

NewConcurrentBufferMemcached creates a new instance of ConcurrentBufferMemcached. When the TTL of a key expires, the key is removed from the buffer. This is needed in case the process that added the key to the buffer did not call Done() for some reason.

func (*ConcurrentBufferMemcached) Add added in v1.3.0

func (c *ConcurrentBufferMemcached) Add(ctx context.Context, element string) (int64, error)

Add adds the request with the given key to the slice in Memcached and returns the total number of requests in it. It also removes the keys with expired TTL.

func (*ConcurrentBufferMemcached) Remove added in v1.3.0

Remove removes the request identified by the key from the slice in Memcached.

type ConcurrentBufferRedis added in v0.0.2

type ConcurrentBufferRedis struct {
	// contains filtered or unexported fields
}

ConcurrentBufferRedis implements ConcurrentBufferBackend in Redis.

func NewConcurrentBufferRedis added in v0.0.2

func NewConcurrentBufferRedis(cli redis.UniversalClient, key string, ttl time.Duration, clock Clock) *ConcurrentBufferRedis

NewConcurrentBufferRedis creates a new instance of ConcurrentBufferRedis. When the TTL of a key expires, the key is removed from the buffer. This is needed in case the process that added the key to the buffer did not call Done() for some reason.

func (*ConcurrentBufferRedis) Add added in v0.0.2

func (c *ConcurrentBufferRedis) Add(ctx context.Context, key string) (int64, error)

Add adds the request with the given key to the sorted set in Redis and returns the total number of requests in it. It also removes the keys with expired TTL.

func (*ConcurrentBufferRedis) Remove added in v0.0.2

func (c *ConcurrentBufferRedis) Remove(ctx context.Context, key string) error

Remove removes the request identified by the key from the sorted set in Redis.

type DistLocker added in v0.0.2

type DistLocker interface {
	// Lock locks the locker.
	Lock(ctx context.Context) error
	// Unlock unlocks the previously successfully locked lock.
	Unlock(ctx context.Context) error
}

DistLocker is a context-aware distributed locker (the interface is similar to sync.Locker).
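The difference from sync.Locker is that Lock can give up when the context is done. The sketch below shows that behaviour with a single-process lock built on a buffered channel. The interface is redeclared locally; chanLock and contended are hypothetical and are only meant to illustrate the contract, not to replace a real distributed lock.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// DistLocker mirrors the interface documented above.
type DistLocker interface {
	Lock(ctx context.Context) error
	Unlock(ctx context.Context) error
}

// chanLock is a hypothetical single-process lock: the one-slot channel is
// full while the lock is held.
type chanLock struct {
	slot chan struct{}
}

func newChanLock() *chanLock {
	return &chanLock{slot: make(chan struct{}, 1)}
}

// Lock blocks until the lock is acquired or the context is done.
func (l *chanLock) Lock(ctx context.Context) error {
	select {
	case l.slot <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Unlock releases the lock and reports an error if it was not held.
func (l *chanLock) Unlock(_ context.Context) error {
	select {
	case <-l.slot:
		return nil
	default:
		return errors.New("unlock of an unlocked lock")
	}
}

// contended reports whether a second Lock call times out while the lock
// is still held by the first caller.
func contended() bool {
	var l DistLocker = newChanLock()
	if err := l.Lock(context.Background()); err != nil {
		return false
	}
	defer l.Unlock(context.Background())

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	return errors.Is(l.Lock(ctx), context.DeadlineExceeded)
}

func main() {
	fmt.Println(contended())
}
```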

type DynamoDBTableProperties added in v1.1.0

type DynamoDBTableProperties struct {
	//TableName is the name of the table.
	TableName string
	//PartitionKeyName is the name of the PartitionKey attribute.
	PartitionKeyName string
	//SortKeyName is the name of the SortKey attribute.
	SortKeyName string
	//SortKeyUsed indicates if a SortKey is present on the table.
	SortKeyUsed bool
	//TTLFieldName is the name of the attribute configured for TTL.
	TTLFieldName string
}

DynamoDBTableProperties are supplied to DynamoDB limiter backends. This struct informs the backend what the name of the table is and what the names of the key fields are.

func LoadDynamoDBTableProperties added in v1.1.0

func LoadDynamoDBTableProperties(ctx context.Context, client *dynamodb.Client, tableName string) (DynamoDBTableProperties, error)

LoadDynamoDBTableProperties fetches a table description with the supplied client and returns a DynamoDBTableProperties struct.

type FixedWindow

type FixedWindow struct {
	// contains filtered or unexported fields
}

FixedWindow implements a Fixed Window rate limiting algorithm.

A simple and memory-efficient algorithm that does not need a distributed lock. However, it may be lenient when there are many requests around the boundary between 2 adjacent windows.

func NewFixedWindow

func NewFixedWindow(capacity int64, rate time.Duration, fixedWindowIncrementer FixedWindowIncrementer, clock Clock) *FixedWindow

NewFixedWindow creates a new instance of FixedWindow. Capacity is the maximum number of requests allowed per window. Rate is the window size.

func (*FixedWindow) Limit

func (f *FixedWindow) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. It returns ErrLimitExhausted if the request overflows the window's capacity.
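The idea can be sketched with a single in-process counter. This is an illustration of the fixed window technique under simple assumptions (one process, no backend, windows aligned with time.Truncate), not the package's implementation; fixedWindow, limit and demo are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errLimitExhausted is a local stand-in for the package sentinel.
var errLimitExhausted = errors.New("requests limit exhausted")

// fixedWindow is a hypothetical single-process counter.
type fixedWindow struct {
	capacity int64
	rate     time.Duration // window size
	window   time.Time     // start of the window being counted
	count    int64
}

// limit counts a request made at now. When the window is full it returns
// the time left until the next window starts together with an error.
func (f *fixedWindow) limit(now time.Time) (time.Duration, error) {
	window := now.Truncate(f.rate)
	if !window.Equal(f.window) {
		// A new window has started: reset the counter.
		f.window, f.count = window, 0
	}
	f.count++
	if f.count > f.capacity {
		return window.Add(f.rate).Sub(now), errLimitExhausted
	}
	return 0, nil
}

// demo sends four requests through a limit of 2 requests per second and
// returns the wait in milliseconds reported for each of them.
func demo() []int64 {
	f := &fixedWindow{capacity: 2, rate: time.Second}
	start := time.Unix(100, 0)
	offsets := []time.Duration{0, 100 * time.Millisecond, 200 * time.Millisecond, time.Second}
	waits := make([]int64, 0, len(offsets))
	for _, off := range offsets {
		wait, _ := f.limit(start.Add(off))
		waits = append(waits, wait.Milliseconds())
	}
	return waits
}

func main() {
	fmt.Println(demo())
}
```

The third request arrives 200ms into a full window, so it is asked to wait the remaining 800ms; the fourth falls into the next window and is allowed.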

type FixedWindowDynamoDB added in v1.1.0

type FixedWindowDynamoDB struct {
	// contains filtered or unexported fields
}

FixedWindowDynamoDB implements FixedWindow in DynamoDB.

func NewFixedWindowDynamoDB added in v1.1.0

func NewFixedWindowDynamoDB(client *dynamodb.Client, partitionKey string, props DynamoDBTableProperties) *FixedWindowDynamoDB

NewFixedWindowDynamoDB creates a new instance of FixedWindowDynamoDB. PartitionKey is the partition key under which this implementation stores its data in DynamoDB.

TableProps describes the table that this backend works with. The table must have the following:

  • SortKey
  • TTL

func (*FixedWindowDynamoDB) Increment added in v1.1.0

func (f *FixedWindowDynamoDB) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)

Increment increments the window's counter in DynamoDB.

type FixedWindowInMemory

type FixedWindowInMemory struct {
	// contains filtered or unexported fields
}

FixedWindowInMemory is an in-memory implementation of FixedWindowIncrementer.

func NewFixedWindowInMemory

func NewFixedWindowInMemory() *FixedWindowInMemory

NewFixedWindowInMemory creates a new instance of FixedWindowInMemory.

func (*FixedWindowInMemory) Increment

func (f *FixedWindowInMemory) Increment(ctx context.Context, window time.Time, _ time.Duration) (int64, error)

Increment increments the window's counter.

type FixedWindowIncrementer

type FixedWindowIncrementer interface {
	// Increment increments the request counter for the window and returns the counter value.
	// TTL is the time duration before the next window.
	Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)
}

FixedWindowIncrementer wraps the Increment method.

type FixedWindowMemcached added in v1.3.0

type FixedWindowMemcached struct {
	// contains filtered or unexported fields
}

FixedWindowMemcached implements FixedWindow in Memcached.

func NewFixedWindowMemcached added in v1.3.0

func NewFixedWindowMemcached(cli *memcache.Client, prefix string) *FixedWindowMemcached

NewFixedWindowMemcached returns a new instance of FixedWindowMemcached. Prefix is the key prefix used to store all the keys used in this implementation in Memcached.

func (*FixedWindowMemcached) Increment added in v1.3.0

func (f *FixedWindowMemcached) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)

Increment increments the window's counter in Memcached.

type FixedWindowRedis

type FixedWindowRedis struct {
	// contains filtered or unexported fields
}

FixedWindowRedis implements FixedWindow in Redis.

func NewFixedWindowRedis

func NewFixedWindowRedis(cli redis.UniversalClient, prefix string) *FixedWindowRedis

NewFixedWindowRedis returns a new instance of FixedWindowRedis. Prefix is the key prefix used to store all the keys used in this implementation in Redis.

func (*FixedWindowRedis) Increment

func (f *FixedWindowRedis) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)

Increment increments the window's counter in Redis.

type LeakyBucket

type LeakyBucket struct {
	// contains filtered or unexported fields
}

LeakyBucket implements the https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue algorithm.

func NewLeakyBucket

func NewLeakyBucket(capacity int64, rate time.Duration, locker DistLocker, leakyBucketStateBackend LeakyBucketStateBackend, clock Clock, logger Logger) *LeakyBucket

NewLeakyBucket creates a new instance of LeakyBucket.

func (*LeakyBucket) Limit

func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. If more time than the rate has passed since the last request, it returns a zero duration. It returns ErrLimitExhausted if the request overflows the bucket's capacity. In this case the returned duration is how long the request would have had to wait if the bucket had not overflowed.
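The queue behaviour can be sketched by tracking only the time at which the most recently queued request is processed. This is a single-process illustration of the leaky-bucket-as-a-queue technique, not the package's implementation; leakyBucket, limit and demo are hypothetical names, and the overflow rule (reject when the wait spans at least capacity intervals) is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errLimitExhausted is a local stand-in for the package sentinel.
var errLimitExhausted = errors.New("requests limit exhausted")

// leakyBucket is a hypothetical single-process queue model.
type leakyBucket struct {
	capacity int64
	rate     time.Duration // interval between two processed requests
	last     time.Time     // when the most recently queued request is processed
}

// limit returns how long a request arriving at now has to wait.
func (b *leakyBucket) limit(now time.Time) (time.Duration, error) {
	// The next free slot is one interval after the last queued request,
	// or right now if that moment has already passed.
	slot := b.last.Add(b.rate)
	if slot.Before(now) {
		slot = now
	}
	wait := slot.Sub(now)
	if int64(wait/b.rate) >= b.capacity {
		// The queue is full: report the wait but do not enqueue.
		return wait, errLimitExhausted
	}
	b.last = slot
	return wait, nil
}

// demo sends three requests at the same instant and a fourth one two
// seconds later; it returns the waits in milliseconds and the rejections.
func demo() ([]int64, []bool) {
	b := &leakyBucket{capacity: 2, rate: time.Second}
	start := time.Unix(1000, 0)
	offsets := []time.Duration{0, 0, 0, 2 * time.Second}
	var waits []int64
	var rejected []bool
	for _, off := range offsets {
		wait, err := b.limit(start.Add(off))
		waits = append(waits, wait.Milliseconds())
		rejected = append(rejected, errors.Is(err, errLimitExhausted))
	}
	return waits, rejected
}

func main() {
	fmt.Println(demo())
}
```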

type LeakyBucketDynamoDB added in v1.1.0

type LeakyBucketDynamoDB struct {
	// contains filtered or unexported fields
}

LeakyBucketDynamoDB is a DynamoDB implementation of a LeakyBucketStateBackend.

func NewLeakyBucketDynamoDB added in v1.1.0

func NewLeakyBucketDynamoDB(client *dynamodb.Client, partitionKey string, tableProps DynamoDBTableProperties, ttl time.Duration, raceCheck bool) *LeakyBucketDynamoDB

NewLeakyBucketDynamoDB creates a new LeakyBucketDynamoDB instance. PartitionKey is the partition key under which this implementation stores its data in DynamoDB.

TableProps describes the table that this backend works with. The table must have TTL configured.

TTL is the TTL of the stored item.

If raceCheck is true and the item in DynamoDB is modified in between State() and SetState() calls then ErrRaceCondition is returned.

func (*LeakyBucketDynamoDB) SetState added in v1.1.0

func (t *LeakyBucketDynamoDB) SetState(ctx context.Context, state LeakyBucketState) error

SetState updates the state in DynamoDB.

func (*LeakyBucketDynamoDB) State added in v1.1.0

State gets the bucket's state from DynamoDB.

type LeakyBucketEtcd

type LeakyBucketEtcd struct {
	// contains filtered or unexported fields
}

LeakyBucketEtcd is an etcd implementation of a LeakyBucketStateBackend. See the TokenBucketEtcd description for the details on etcd usage.

func NewLeakyBucketEtcd

func NewLeakyBucketEtcd(cli *clientv3.Client, prefix string, ttl time.Duration, raceCheck bool) *LeakyBucketEtcd

NewLeakyBucketEtcd creates a new LeakyBucketEtcd instance. Prefix is used as an etcd key prefix for all keys stored in etcd by this algorithm. TTL is a TTL of the etcd lease used to store all the keys.

If raceCheck is true and the keys in etcd are modified in between State() and SetState() calls then ErrRaceCondition is returned.

func (*LeakyBucketEtcd) SetState

func (l *LeakyBucketEtcd) SetState(ctx context.Context, state LeakyBucketState) error

SetState updates the state of the bucket in etcd.

func (*LeakyBucketEtcd) State

State gets the bucket's current state from etcd. If there is no state available in etcd then the initial bucket's state is returned.

type LeakyBucketInMemory

type LeakyBucketInMemory struct {
	// contains filtered or unexported fields
}

LeakyBucketInMemory is an in-memory implementation of LeakyBucketStateBackend.

func NewLeakyBucketInMemory

func NewLeakyBucketInMemory() *LeakyBucketInMemory

NewLeakyBucketInMemory creates a new instance of LeakyBucketInMemory.

func (*LeakyBucketInMemory) SetState

func (l *LeakyBucketInMemory) SetState(ctx context.Context, state LeakyBucketState) error

SetState sets the current state of the bucket.

func (*LeakyBucketInMemory) State

State gets the current state of the bucket.

type LeakyBucketMemcached added in v1.3.0

type LeakyBucketMemcached struct {
	// contains filtered or unexported fields
}

LeakyBucketMemcached is a Memcached implementation of a LeakyBucketStateBackend.

func NewLeakyBucketMemcached added in v1.3.0

func NewLeakyBucketMemcached(cli *memcache.Client, key string, ttl time.Duration, raceCheck bool) *LeakyBucketMemcached

NewLeakyBucketMemcached creates a new LeakyBucketMemcached instance. Key is the key used to store all the keys used in this implementation in Memcached. TTL is the TTL of the stored keys.

If raceCheck is true and the keys in Memcached are modified in between State() and SetState() calls then ErrRaceCondition is returned.

func (*LeakyBucketMemcached) SetState added in v1.3.0

func (t *LeakyBucketMemcached) SetState(ctx context.Context, state LeakyBucketState) error

SetState updates the state in Memcached. The provided fencing token is checked on the Memcached side before saving the keys.

func (*LeakyBucketMemcached) State added in v1.3.0

State gets the bucket's state from Memcached.

type LeakyBucketRedis

type LeakyBucketRedis struct {
	// contains filtered or unexported fields
}

LeakyBucketRedis is a Redis implementation of a LeakyBucketStateBackend.

func NewLeakyBucketRedis

func NewLeakyBucketRedis(cli redis.UniversalClient, prefix string, ttl time.Duration, raceCheck bool) *LeakyBucketRedis

NewLeakyBucketRedis creates a new LeakyBucketRedis instance. Prefix is the key prefix used to store all the keys used in this implementation in Redis. TTL is the TTL of the stored keys.

If raceCheck is true and the keys in Redis are modified in between State() and SetState() calls then ErrRaceCondition is returned.

func (*LeakyBucketRedis) SetState

func (t *LeakyBucketRedis) SetState(ctx context.Context, state LeakyBucketState) error

SetState updates the state in Redis. The provided fencing token is checked on the Redis side before saving the keys.

func (*LeakyBucketRedis) State

State gets the bucket's state from Redis.

type LeakyBucketState

type LeakyBucketState struct {
	// Last is the Unix timestamp in nanoseconds of the most recent request.
	Last int64
}

LeakyBucketState represents the state of a LeakyBucket.

func (LeakyBucketState) IzZero

func (s LeakyBucketState) IzZero() bool

IzZero returns true if the bucket state is zero valued.

type LeakyBucketStateBackend

type LeakyBucketStateBackend interface {
	// State gets the current state of the LeakyBucket.
	State(ctx context.Context) (LeakyBucketState, error)
	// SetState sets (persists) the current state of the LeakyBucket.
	SetState(ctx context.Context, state LeakyBucketState) error
}

LeakyBucketStateBackend interface encapsulates the logic of retrieving and persisting the state of a LeakyBucket.

type LockConsul added in v0.0.2

type LockConsul struct {
	// contains filtered or unexported fields
}

LockConsul is a wrapper around github.com/hashicorp/consul/api.Lock that implements the DistLocker interface.

func NewLockConsul added in v0.0.2

func NewLockConsul(lock *api.Lock) *LockConsul

NewLockConsul creates a new LockConsul instance.

func (*LockConsul) Lock added in v0.0.2

func (l *LockConsul) Lock(ctx context.Context) error

Lock locks the lock in Consul.

func (*LockConsul) Unlock added in v0.0.2

func (l *LockConsul) Unlock(_ context.Context) error

Unlock unlocks the lock in Consul.

type LockEtcd added in v0.0.2

type LockEtcd struct {
	// contains filtered or unexported fields
}

LockEtcd implements the DistLocker interface using etcd.

See https://github.com/etcd-io/etcd/blob/master/Documentation/learning/why.md#using-etcd-for-distributed-coordination

func NewLockEtcd added in v0.0.2

func NewLockEtcd(cli *clientv3.Client, prefix string, logger Logger) *LockEtcd

NewLockEtcd creates a new instance of LockEtcd.

func (*LockEtcd) Lock added in v0.0.2

func (l *LockEtcd) Lock(ctx context.Context) error

Lock creates a new session-based lock in etcd and locks it.

func (*LockEtcd) Unlock added in v0.0.2

func (l *LockEtcd) Unlock(ctx context.Context) error

Unlock unlocks the previously locked lock.

type LockMemcached added in v1.3.0

type LockMemcached struct {
	// contains filtered or unexported fields
}

LockMemcached is a wrapper around github.com/alessandro-c/gomemcached-lock that implements the DistLocker interface. It is the caller's responsibility to ensure that mutexName is unique and that the same key is not used by multiple Memcached-based implementations.

func NewLockMemcached added in v1.3.0

func NewLockMemcached(client *memcache.Client, mutexName string) *LockMemcached

NewLockMemcached creates a new instance of LockMemcached. The default backoff retries every 100ms, up to 100 times (10 seconds in total).

func (*LockMemcached) Lock added in v1.3.0

func (l *LockMemcached) Lock(ctx context.Context) error

Lock locks the lock in Memcached.

func (*LockMemcached) Unlock added in v1.3.0

func (l *LockMemcached) Unlock(ctx context.Context) error

Unlock unlocks the lock in Memcached.

func (*LockMemcached) WithLockAcquireBackoff added in v1.3.0

func (l *LockMemcached) WithLockAcquireBackoff(b backoff.BackOff) *LockMemcached

WithLockAcquireBackoff sets the backoff policy for retrying an operation.

type LockNoop added in v0.0.2

type LockNoop struct {
}

LockNoop is a no-op implementation of the DistLocker interface. It should only be used with the in-memory backends as they are already thread-safe and don't need distributed locks.

func NewLockNoop added in v0.0.2

func NewLockNoop() *LockNoop

NewLockNoop creates a new LockNoop.

func (LockNoop) Lock added in v0.0.2

func (n LockNoop) Lock(ctx context.Context) error

Lock imitates locking.

func (LockNoop) Unlock added in v0.0.2

func (n LockNoop) Unlock(_ context.Context) error

Unlock does nothing.

type LockPostgreSQL added in v1.5.0

type LockPostgreSQL struct {
	// contains filtered or unexported fields
}

LockPostgreSQL is an implementation of the DistLocker interface using PostgreSQL's advisory lock.

func NewLockPostgreSQL added in v1.5.0

func NewLockPostgreSQL(db *sql.DB, id uint32) *LockPostgreSQL

NewLockPostgreSQL creates a new LockPostgreSQL.

func (*LockPostgreSQL) Lock added in v1.5.0

func (l *LockPostgreSQL) Lock(ctx context.Context) error

Lock acquires an advisory lock in PostgreSQL.

func (*LockPostgreSQL) Unlock added in v1.5.0

func (l *LockPostgreSQL) Unlock(ctx context.Context) error

Unlock releases an advisory lock in PostgreSQL.

type LockRedis added in v1.2.0

type LockRedis struct {
	// contains filtered or unexported fields
}

LockRedis is a wrapper around github.com/go-redsync/redsync that implements the DistLocker interface.

func NewLockRedis added in v1.2.0

func NewLockRedis(pool redsyncredis.Pool, mutexName string, options ...redsync.Option) *LockRedis

NewLockRedis creates a new instance of LockRedis.

func (*LockRedis) Lock added in v1.2.0

func (l *LockRedis) Lock(ctx context.Context) error

Lock locks the lock in Redis.

func (*LockRedis) Unlock added in v1.2.0

func (l *LockRedis) Unlock(ctx context.Context) error

Unlock unlocks the lock in Redis.

type LockZookeeper added in v0.0.2

type LockZookeeper struct {
	// contains filtered or unexported fields
}

LockZookeeper is a wrapper around github.com/samuel/go-zookeeper/zk.Lock that implements the DistLocker interface.

func NewLockZookeeper added in v0.0.2

func NewLockZookeeper(lock *zk.Lock) *LockZookeeper

NewLockZookeeper creates a new instance of LockZookeeper.

func (*LockZookeeper) Lock added in v0.0.2

func (l *LockZookeeper) Lock(_ context.Context) error

Lock locks the lock in Zookeeper. TODO: add context aware support once https://github.com/samuel/go-zookeeper/pull/168 is merged.

func (*LockZookeeper) Unlock added in v0.0.2

func (l *LockZookeeper) Unlock(_ context.Context) error

Unlock unlocks the lock in Zookeeper.

type Logger

type Logger interface {
	// Log logs the given arguments.
	Log(v ...interface{})
}

Logger wraps the Log method for logging.
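Any type with a matching Log method satisfies the interface, so an existing logger can be adapted in a few lines. The sketch below forwards to a standard library *log.Logger; the interface is redeclared locally, and stdAdapter and logged are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface documented above.
type Logger interface {
	Log(v ...interface{})
}

// stdAdapter is a hypothetical adapter that forwards messages to a
// standard library *log.Logger.
type stdAdapter struct {
	out *log.Logger
}

func (a stdAdapter) Log(v ...interface{}) {
	a.out.Println(v...)
}

// logged writes one message through the adapter into a buffer and returns
// what was written.
func logged() string {
	var buf bytes.Buffer
	var logger Logger = stdAdapter{out: log.New(&buf, "limiter: ", 0)}
	logger.Log("backend error:", 42)
	return buf.String()
}

func main() {
	fmt.Print(logged())
}
```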

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is a garbage-collectable registry of values.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new instance of Registry.

func (*Registry) Delete added in v0.0.2

func (r *Registry) Delete(key string)

Delete deletes an item from the registry.

func (*Registry) DeleteExpired

func (r *Registry) DeleteExpired(now time.Time) int

DeleteExpired deletes expired items from the registry and returns the number of deleted items.

func (*Registry) Exists

func (r *Registry) Exists(key string) bool

Exists returns true if an item with the given key exists in the registry.

func (*Registry) GetOrCreate

func (r *Registry) GetOrCreate(key string, value func() interface{}, ttl time.Duration, now time.Time) interface{}

GetOrCreate gets an existing value by key and updates its expiration time. If the key lookup fails it creates a new value by calling the provided value closure and puts it on the queue.

func (*Registry) Len added in v0.0.2

func (r *Registry) Len() int

Len returns the number of items in the registry.
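The semantics of GetOrCreate and DeleteExpired can be sketched with a plain map. This is an illustration only: ttlRegistry is a hypothetical type, it is not safe for concurrent use, it treats an item as expired once its deadline is not after now, and it makes no claim about the data structure the real Registry uses.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a stored value together with its expiration time.
type entry struct {
	value interface{}
	exp   time.Time
}

// ttlRegistry is a hypothetical map-based registry.
type ttlRegistry struct {
	items map[string]*entry
}

// GetOrCreate returns the value stored under key and extends its
// expiration, or creates it with the closure on a miss.
func (r *ttlRegistry) GetOrCreate(key string, value func() interface{}, ttl time.Duration, now time.Time) interface{} {
	if e, ok := r.items[key]; ok {
		e.exp = now.Add(ttl)
		return e.value
	}
	e := &entry{value: value(), exp: now.Add(ttl)}
	r.items[key] = e
	return e.value
}

// DeleteExpired removes the items whose expiration is not after now and
// returns how many were removed.
func (r *ttlRegistry) DeleteExpired(now time.Time) int {
	n := 0
	for k, e := range r.items {
		if !e.exp.After(now) {
			delete(r.items, k)
			n++
		}
	}
	return n
}

// demo returns the number of closure calls, deleted items and remaining
// items for a small scenario with two keys.
func demo() [3]int {
	r := &ttlRegistry{items: make(map[string]*entry)}
	calls := 0
	create := func() interface{} { calls++; return calls }
	t0 := time.Unix(0, 0)
	r.GetOrCreate("a", create, time.Minute, t0)
	r.GetOrCreate("b", create, time.Minute, t0)
	// Touching "a" after 30s extends its lifetime without calling create.
	r.GetOrCreate("a", create, time.Minute, t0.Add(30*time.Second))
	deleted := r.DeleteExpired(t0.Add(70 * time.Second))
	return [3]int{calls, deleted, len(r.items)}
}

func main() {
	fmt.Println(demo())
}
```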

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

SlidingWindow implements a Sliding Window rate limiting algorithm.

It does not require a distributed lock and uses a minimal amount of memory. However, it disallows all requests when a client is flooding the service. It is the client's responsibility to handle the disallowed request and wait before making a new one.

func NewSlidingWindow

func NewSlidingWindow(capacity int64, rate time.Duration, slidingWindowIncrementer SlidingWindowIncrementer, clock Clock, epsilon float64) *SlidingWindow

NewSlidingWindow creates a new instance of SlidingWindow. Capacity is the maximum number of requests allowed per window. Rate is the window size. Epsilon is the maximum allowed difference when comparing the current weighted number of requests with the capacity.

func (*SlidingWindow) Limit

func (s *SlidingWindow) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. It returns ErrLimitExhausted if the request overflows the capacity.
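A sliding window counter is commonly computed by weighting the previous window's count by the share of it that still overlaps the sliding window. The sketch below shows that arithmetic, assuming curr already includes the request being checked and that a request is rejected once the weighted total exceeds the capacity by at least epsilon. Both functions are hypothetical and times are plain milliseconds to keep the example small.

```go
package main

import "fmt"

// weightedCount estimates the number of requests in the sliding window.
// elapsedMs is the time since the current fixed window started and rateMs
// is the window size, both in milliseconds.
func weightedCount(prev, curr, elapsedMs, rateMs int64) float64 {
	// Share of the previous window still covered by the sliding window.
	overlap := 1 - float64(elapsedMs)/float64(rateMs)
	return float64(prev)*overlap + float64(curr)
}

// allowed reports whether the weighted total stays within capacity.
func allowed(prev, curr, capacity, elapsedMs, rateMs int64, epsilon float64) bool {
	return weightedCount(prev, curr, elapsedMs, rateMs)-float64(capacity) < epsilon
}

func main() {
	// 10 requests in the previous window, 3 so far in the current one,
	// window size 1s, capacity 10.
	fmt.Println(weightedCount(10, 3, 250, 1000), allowed(10, 3, 10, 250, 1000, 1e-9))
	fmt.Println(weightedCount(10, 3, 500, 1000), allowed(10, 3, 10, 500, 1000, 1e-9))
}
```

A quarter of the way into the current window, 75% of the previous window still counts (7.5 + 3 = 10.5), so the request is rejected; halfway in, the total drops to 8 and the request is allowed.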

type SlidingWindowDynamoDB added in v1.1.0

type SlidingWindowDynamoDB struct {
	// contains filtered or unexported fields
}

SlidingWindowDynamoDB implements SlidingWindow in DynamoDB.

func NewSlidingWindowDynamoDB added in v1.1.0

func NewSlidingWindowDynamoDB(client *dynamodb.Client, partitionKey string, props DynamoDBTableProperties) *SlidingWindowDynamoDB

NewSlidingWindowDynamoDB creates a new instance of SlidingWindowDynamoDB. PartitionKey is the partition key under which this implementation stores its data in DynamoDB.

TableProps describes the table that this backend works with. The table must have the following:

  • SortKey
  • TTL

func (*SlidingWindowDynamoDB) Increment added in v1.1.0

func (s *SlidingWindowDynamoDB) Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (int64, int64, error)

Increment increments the current window's counter in DynamoDB and returns the number of requests in the previous window and the current one.

type SlidingWindowInMemory

type SlidingWindowInMemory struct {
	// contains filtered or unexported fields
}

SlidingWindowInMemory is an in-memory implementation of SlidingWindowIncrementer.

func NewSlidingWindowInMemory

func NewSlidingWindowInMemory() *SlidingWindowInMemory

NewSlidingWindowInMemory creates a new instance of SlidingWindowInMemory.

func (*SlidingWindowInMemory) Increment

func (s *SlidingWindowInMemory) Increment(ctx context.Context, prev, curr time.Time, _ time.Duration) (int64, int64, error)

Increment increments the current window's counter and returns the number of requests in the previous window and the current one.

type SlidingWindowIncrementer

type SlidingWindowIncrementer interface {
	// Increment increments the request counter for the current window and returns the counter values for the previous
	// window and the current one.
	// TTL is the time duration before the next window.
	Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (prevCount, currCount int64, err error)
}

SlidingWindowIncrementer wraps the Increment method.

type SlidingWindowMemcached added in v1.3.0

type SlidingWindowMemcached struct {
	// contains filtered or unexported fields
}

SlidingWindowMemcached implements SlidingWindow in Memcached.

func NewSlidingWindowMemcached added in v1.3.0

func NewSlidingWindowMemcached(cli *memcache.Client, prefix string) *SlidingWindowMemcached

NewSlidingWindowMemcached creates a new instance of SlidingWindowMemcached.

func (*SlidingWindowMemcached) Increment added in v1.3.0

func (s *SlidingWindowMemcached) Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (int64, int64, error)

Increment increments the current window's counter in Memcached and returns the number of requests in the previous window and the current one.

type SlidingWindowRedis

type SlidingWindowRedis struct {
	// contains filtered or unexported fields
}

SlidingWindowRedis implements SlidingWindow in Redis.

func NewSlidingWindowRedis

func NewSlidingWindowRedis(cli redis.UniversalClient, prefix string) *SlidingWindowRedis

NewSlidingWindowRedis creates a new instance of SlidingWindowRedis.

func (*SlidingWindowRedis) Increment

func (s *SlidingWindowRedis) Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (int64, int64, error)

Increment increments the current window's counter in Redis and returns the number of requests in the previous window and the current one.

type SortedSetNode added in v1.3.0

type SortedSetNode struct {
	CreatedAt int64
	Value     string
}

type StdLogger

type StdLogger struct{}

StdLogger implements the Logger interface.

func NewStdLogger

func NewStdLogger() *StdLogger

NewStdLogger creates a new instance of StdLogger.

func (*StdLogger) Log

func (l *StdLogger) Log(v ...interface{})

Log delegates the logging to the std logger.

type SystemClock

type SystemClock struct {
}

SystemClock implements the Clock interface by using the real system clock.

func NewSystemClock

func NewSystemClock() *SystemClock

NewSystemClock creates a new instance of SystemClock.

func (*SystemClock) Now

func (c *SystemClock) Now() time.Time

Now returns the current system time.

func (*SystemClock) Sleep

func (c *SystemClock) Sleep(d time.Duration)

Sleep blocks (sleeps) for the given duration.

type TokenBucket

type TokenBucket struct {
	// contains filtered or unexported fields
}

TokenBucket implements the https://en.wikipedia.org/wiki/Token_bucket algorithm.

func NewTokenBucket

func NewTokenBucket(capacity int64, refillRate time.Duration, locker DistLocker, tokenBucketStateBackend TokenBucketStateBackend, clock Clock, logger Logger) *TokenBucket

NewTokenBucket creates a new instance of TokenBucket.

func (*TokenBucket) Limit

func (t *TokenBucket) Limit(ctx context.Context) (time.Duration, error)

Limit takes 1 token from the bucket.

func (*TokenBucket) Take

func (t *TokenBucket) Take(ctx context.Context, tokens int64) (time.Duration, error)

Take takes tokens from the bucket.

It returns a zero duration and a nil error if the bucket has a sufficient number of tokens.

It returns ErrLimitExhausted if the number of available tokens is less than requested. In this case the returned duration is the amount of time to wait before retrying the request.

type TokenBucketDynamoDB added in v1.1.0

type TokenBucketDynamoDB struct {
	// contains filtered or unexported fields
}

TokenBucketDynamoDB is a DynamoDB implementation of a TokenBucketStateBackend.

func NewTokenBucketDynamoDB added in v1.1.0

func NewTokenBucketDynamoDB(client *dynamodb.Client, partitionKey string, tableProps DynamoDBTableProperties, ttl time.Duration, raceCheck bool) *TokenBucketDynamoDB

NewTokenBucketDynamoDB creates a new TokenBucketDynamoDB instance. PartitionKey is the key under which this implementation stores its data in DynamoDB.

TableProps describes the table that this backend should work with. This backend requires the following on the table: a TTL.

TTL is the TTL of the stored item.

If raceCheck is true and the item in DynamoDB is modified in between State() and SetState() calls then ErrRaceCondition is returned.

func (*TokenBucketDynamoDB) SetState added in v1.1.0

func (t *TokenBucketDynamoDB) SetState(ctx context.Context, state TokenBucketState) error

SetState updates the state in DynamoDB.

func (*TokenBucketDynamoDB) State added in v1.1.0

func (t *TokenBucketDynamoDB) State(ctx context.Context) (TokenBucketState, error)

State gets the bucket's state from DynamoDB.

type TokenBucketEtcd

type TokenBucketEtcd struct {
	// contains filtered or unexported fields
}

TokenBucketEtcd is an etcd implementation of a TokenBucketStateBackend.

See https://github.com/etcd-io/etcd/blob/master/Documentation/learning/data_model.md

etcd is designed to reliably store infrequently updated data, so it should only be used for API endpoints that are accessed at a lower rate than the rate limiter can process.

Aggressive compaction and defragmentation have to be enabled in etcd to prevent the storage from growing indefinitely: every change of the bucket's state (every access) creates a new revision in etcd.

This probably makes it impractical for high-load cases, but it can be used to reliably and precisely rate limit access to business-critical endpoints where each access must be reliably logged.

func NewTokenBucketEtcd

func NewTokenBucketEtcd(cli *clientv3.Client, prefix string, ttl time.Duration, raceCheck bool) *TokenBucketEtcd

NewTokenBucketEtcd creates a new TokenBucketEtcd instance. Prefix is used as an etcd key prefix for all keys stored in etcd by this algorithm. TTL is the TTL, in seconds, of the etcd lease that all the keys are attached to: the keys are deleted automatically after the TTL expires.

If raceCheck is true and the keys in etcd are modified in between State() and SetState() calls then ErrRaceCondition is returned. It does not add any significant overhead as it can be trivially checked on etcd side before updating the keys.

func (*TokenBucketEtcd) SetState

func (t *TokenBucketEtcd) SetState(ctx context.Context, state TokenBucketState) error

SetState updates the state of the bucket.

func (*TokenBucketEtcd) State

func (t *TokenBucketEtcd) State(ctx context.Context) (TokenBucketState, error)

State gets the bucket's current state from etcd. If there is no state available in etcd then the initial bucket's state is returned.

type TokenBucketInMemory

type TokenBucketInMemory struct {
	// contains filtered or unexported fields
}

TokenBucketInMemory is an in-memory implementation of TokenBucketStateBackend.

The state is neither shared nor persisted, so it won't survive restarts or failures. Due to the local nature of the state, the rate at which some endpoints are accessed can't be reliably predicted or limited.

It can nevertheless be used as a global rate limiter behind a round-robin load balancer, since requests are then spread evenly across the instances.

func NewTokenBucketInMemory

func NewTokenBucketInMemory() *TokenBucketInMemory

NewTokenBucketInMemory creates a new instance of TokenBucketInMemory.

func (*TokenBucketInMemory) SetState

func (t *TokenBucketInMemory) SetState(ctx context.Context, state TokenBucketState) error

SetState sets the current bucket's state.

func (*TokenBucketInMemory) State

func (t *TokenBucketInMemory) State(ctx context.Context) (TokenBucketState, error)

State returns the current bucket's state.

type TokenBucketMemcached added in v1.3.0

type TokenBucketMemcached struct {
	// contains filtered or unexported fields
}

TokenBucketMemcached is a Memcached implementation of a TokenBucketStateBackend.

Memcached is a distributed memory object caching system.

func NewTokenBucketMemcached added in v1.3.0

func NewTokenBucketMemcached(cli *memcache.Client, key string, ttl time.Duration, raceCheck bool) *TokenBucketMemcached

NewTokenBucketMemcached creates a new TokenBucketMemcached instance. Key is the key under which this implementation stores its state in Memcached. TTL is the TTL of the stored keys.

If raceCheck is true and the keys in Memcached are modified in between State() and SetState() calls then ErrRaceCondition is returned. Memcached does not run Lua scripts, so the check presumably relies on Memcached's compare-and-swap (CAS) operation rather than on a server-side script.

func (*TokenBucketMemcached) SetState added in v1.3.0

func (t *TokenBucketMemcached) SetState(ctx context.Context, state TokenBucketState) error

SetState updates the state in Memcached.

func (*TokenBucketMemcached) State added in v1.3.0

func (t *TokenBucketMemcached) State(ctx context.Context) (TokenBucketState, error)

State gets the bucket's state from Memcached.

type TokenBucketRedis

type TokenBucketRedis struct {
	// contains filtered or unexported fields
}

TokenBucketRedis is a Redis implementation of a TokenBucketStateBackend.

Redis is an in-memory key-value data store which also supports persistence. It is a better choice for high-load cases than etcd because it does not keep old values of the keys and therefore does not need compaction or defragmentation.

However, depending on the persistence and cluster configuration, some data might be lost in case of a failure, resulting in under-limiting of access to the service.

func NewTokenBucketRedis

func NewTokenBucketRedis(cli redis.UniversalClient, prefix string, ttl time.Duration, raceCheck bool) *TokenBucketRedis

NewTokenBucketRedis creates a new TokenBucketRedis instance. Prefix is the key prefix for all the keys this implementation stores in Redis. TTL is the TTL of the stored keys.

If raceCheck is true and the keys in Redis are modified in between State() and SetState() calls then ErrRaceCondition is returned. This adds extra overhead because a Lua script has to be executed on the Redis side, which locks the entire database while it runs.

func (*TokenBucketRedis) SetState

func (t *TokenBucketRedis) SetState(ctx context.Context, state TokenBucketState) error

SetState updates the state in Redis.

func (*TokenBucketRedis) State

func (t *TokenBucketRedis) State(ctx context.Context) (TokenBucketState, error)

State gets the bucket's state from Redis.

type TokenBucketState

type TokenBucketState struct {
	// Last is the last time the state was updated (Unix timestamp in nanoseconds).
	Last int64
	// Available is the number of available tokens in the bucket.
	Available int64
}

TokenBucketState represents a state of a token bucket.

type TokenBucketStateBackend

type TokenBucketStateBackend interface {
	// State gets the current state of the TokenBucket.
	State(ctx context.Context) (TokenBucketState, error)
	// SetState sets (persists) the current state of the TokenBucket.
	SetState(ctx context.Context, state TokenBucketState) error
}

TokenBucketStateBackend interface encapsulates the logic of retrieving and persisting the state of a TokenBucket.
