golimiter

package module
v0.0.0-...-7baf738 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2019 License: BSD-3-Clause Imports: 14 Imported by: 0

README

golimiter

A rate limiter for Go that uses a fixed-window approach to tracking rates. Each incoming request increments the counter of the window. If the counter exceeds the limit, the request is denied.

Counter storage: golimiter's implementation uses an in-memory store (LRU with expiry and a bounded upper limit of entries). It also includes a distributed counter store using Redis. For performance reasons, this distributed store will be eventually consistent with the in-memory store. The centralized store will be updated using a set-then-get approach, relying on atomic Redis operations to eliminate race conditions under high concurrency. The central store is optional: if it's not configured, golimiter keeps working using only its in-memory store.

Middleware

Golimiter includes Gin middleware and an example how-to service.

Installation

$ go get github.com/Bose/golimiter

You'll also want to install go-cache for counter storage.

$ go get github.com/Bose/go-cache

Benchmarks (for the rate limiter itself)

$ ./run-benchmarks.sh 
goos: darwin
goarch: amd64
pkg: github.com/Bose/golimiter
BenchmarkIncrInMemory1-8    	 1000000	      1903 ns/op
BenchmarkIncrInMemory2-8    	 1000000	      1910 ns/op
BenchmarkIncrInMemory3-8    	 1000000	      1929 ns/op
BenchmarkIncrInMemory10-8   	 1000000	      1912 ns/op
BenchmarkIncrInMemory20-8   	 1000000	      1912 ns/op
BenchmarkIncrInMemory40-8   	 1000000	      1909 ns/op
PASS
ok  	github.com/Bose/golimiter	12.620s

Usage (general)

package main

// Create a rate with the given limit (number of requests) for the given
// period (a time.Duration of your choice).
import (
	"context"
	"fmt"
	"os"
	"time"

	goCache "github.com/Bose/go-cache"
	"github.com/Bose/golimiter"
	"github.com/sirupsen/logrus"
)

// Settings used by this example: the in-memory store size and the parameters
// passed to the Redis cache helpers in setupRedisCentralStore.
const (
	maxEntriesInMemory    = 100
	redisTestServer       = "redis://localhost:6379"
	sentinelTestServer    = "redis://localhost:26379"
	redisMasterIdentifier = "mymaster"
	sharedSecret          = "test-secret-must"
	useSentinel           = true
	defExpSeconds         = 0
	//myEnv                 = "local"
	maxConnectionsAllowed = 5
	redisOpsTimeout       = 50  // milliseconds
	redisConnTimeout      = 500 // milliseconds
)

// main demonstrates three limiters in sequence: one built from a Rate literal
// on a plain in-memory store, one built from a formatted rate string on a
// store wired to the Redis central store, and one whose formatted rate also
// carries a delay. Each limiter is queried once for the same key and the
// resulting context is printed.
func main() {
	logrus.SetLevel(logrus.DebugLevel)
	log := logrus.WithFields(logrus.Fields{"method": "main"})

	// Every limiter below is queried with this same key.
	const key = "test-rate-object"

	// Create a rate with the given limit (number of requests) for the given
	// period (a time.Duration of your choice).
	limit := golimiter.Rate{
		Limit:  1000,
		Period: 1 * time.Hour,
	}

	log.Debug("setting envVar TRACE_SYNC= true will turn-on trace logging to the central store for the limiters, you must call this before creating any limiters")
	os.Setenv("TRACE_SYNC", "true")

	ctx := context.Background()

	// Prints a limiter context including its delay.
	printWithDelay := func(c golimiter.Context) {
		fmt.Printf("Limit: %d\nReached: %v\nRemaining: %d\nReset: %d\nDelay: %d\n", c.Limit, c.Reached, c.Remaining, c.Reset, c.Delay)
	}

	// 1) In-memory store only.
	memStore := golimiter.NewInMemoryLimiterStore(maxEntriesInMemory, golimiter.WithLimiterExpiration(limit.Period))
	limiter := golimiter.New(memStore, limit)

	state, err := limiter.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Limit: %d\nReached: %v\nRemaining: %d\nReset: %d\n", state.Limit, state.Reached, state.Remaining, state.Reset)

	// Rates can also be written in the simplified format
	// "<limit>-<duration>-<period>", where <period> is one of:
	//
	// * "S": second
	// * "M": minute
	// * "H": hour
	//
	// An optional delay may be appended: "<limit>-<duration>-<period>-<delay>".
	// "1-1-S-10" represents 1 req in 1 second with 10ms delay.
	//
	// Examples:
	//
	// * 5 reqs in 10 seconds: "5-10-S"
	// * 10 reqs in 5 minutes: "10-5-M"
	// * 1000 reqs in 1 hour: "1000-1-H"
	// * 5 reqs in 10 seconds with 20ms delay: "5-10-S-20"
	//
	limit, err = golimiter.NewRateFromFormatted("1000-1-H")
	if err != nil {
		panic(err)
	}

	// 2) A limiter that uses redis for cross-process coordination.
	sharedStore := golimiter.NewInMemoryLimiterStore(maxEntriesInMemory, golimiter.WithLimiterExpiration(limit.Period))
	sharedStore.DistributedRedisCache = setupRedisCentralStore(log)
	limiter = golimiter.New(sharedStore, limit)

	state, err = limiter.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	printWithDelay(state)
	// The store is eventually consistent with redis, so give it a moment to sync.
	time.Sleep(500 * time.Millisecond)

	// 3) Same store, but with a rate that carries a delay.
	delayed, err := golimiter.NewRateFromFormatted("1000-1-H-1000")
	if err != nil {
		panic(err)
	}
	limiter = golimiter.New(sharedStore, delayed)

	state, err = limiter.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	printWithDelay(state)
	// The store is eventually consistent with redis, so give it a moment to sync.
	time.Sleep(500 * time.Millisecond)
}

// setupRedisCentralStore builds the Redis-backed cache that the example
// attaches to the limiter store as its central store. It creates a write pool
// with goCache.InitRedisCache and a read-only pool with
// goCache.InitReadOnlyRedisCache (pointed at redisTestServer), then combines
// them with goCache.NewCacheWithMultiPools.
//
// It sets NO_REDIS_PASSWORD=true in the process environment. If either pool
// cannot be initialized, the error is logged through logger and the function
// panics with that error.
func setupRedisCentralStore(logger *logrus.Entry) *goCache.GenericCache {
	os.Setenv("NO_REDIS_PASSWORD", "true")
	// os.Setenv("REDIS_READ_ONLY_ADDRESS","redis://localhost:6379")
	// os.Setenv("REDIS_SENTINEL_ADDRESS", "redis://localhost:26379")
	selectDatabase := 3

	// useSentinel is the package-level constant; no local copy is needed.
	cacheWritePool, err := goCache.InitRedisCache(useSentinel, defExpSeconds, nil, defExpSeconds, defExpSeconds, selectDatabase, logger)
	if err != nil {
		// Keep the cause in both the log line and the panic value.
		logger.WithError(err).Errorf("couldn't connect to redis on %s", redisTestServer)
		panic(err)
	}
	logger.Info("cacheWritePool initialized")

	// Use the same constant the error message reports, not a second literal.
	readOnlyPool, err := goCache.InitReadOnlyRedisCache(redisTestServer, "", redisOpsTimeout, redisConnTimeout, defExpSeconds, maxConnectionsAllowed, selectDatabase, logger)
	if err != nil {
		logger.WithError(err).Errorf("couldn't connect to redis on %s", redisTestServer)
		panic(err)
	}
	logger.Info("cacheReadPool initialized")

	return goCache.NewCacheWithMultiPools(cacheWritePool, readOnlyPool, goCache.L2, sharedSecret, defExpSeconds, []byte("test"), false)
}

See also:

Usage with Gin

package main

import (
	"os"

	"github.com/Bose/golimiter"

	ginlimiter "github.com/Bose/golimiter/gin"
	ginprometheus "github.com/zsais/go-gin-prometheus"

	goCache "github.com/Bose/go-cache"

	"github.com/gin-gonic/gin"
	"github.com/sirupsen/logrus"
)

const maxConnectionsAllowed = 50 // max connections allowed to the read-only redis cluster from this service
const maxEntries = 1000          // max entries allowed in the LRU + expiry in memory store

// Redis settings passed to the cache helpers in setupRedisCentralStore.
const (
	redisTestServer       = "redis://localhost:6379"
	sentinelTestServer    = "redis://localhost:26379"
	redisMasterIdentifier = "mymaster"
	sharedSecret          = "test-secret-must"
	useSentinel           = true
	defExpSeconds         = 0
	redisOpsTimeout       = 50  // milliseconds
	redisConnTimeout      = 500 // milliseconds
)

// main runs the example Gin service on :9090. It registers two rate-limited
// routes (/helloworld and /helloworld-delayed, the latter using a rate with a
// 1s delay) and one route without a limiter (/nolimits). Both limiters share
// the Redis central store returned by setupRedisCentralStore, and Prometheus
// instrumentation is attached through go-gin-prometheus.
func main() {
	// use the JSON formatter
	// logrus.SetFormatter(&logrus.JSONFormatter{})
	logrus.SetLevel(logrus.DebugLevel)

	// gin.Default() already attaches the Logger and Recovery middleware, so
	// Recovery is not registered a second time.
	r := gin.Default()
	p := ginprometheus.NewPrometheus("go_limiter_example")
	p.Use(r)

	redisCache := setupRedisCentralStore()

	l, err := ginlimiter.NewLimiter(
		"GET::/helloworld",                     // define the GUID that identifies this rate limiter as GET on the route
		"10-2-M",                               // defines the rate limit based on a std format
		"ip-address",                           // a label that defines the type of rate limiter it is
		ginlimiter.DefaultKeyGetter,            // get the key for this request: IP, request USER identifier, etc
		ginlimiter.DefaultRateExceededReporter, // func that does all the required reporting when a rate is exceeded
		maxEntries,                             // max inmemory entries
		"constant",                             // this hint is sent back to the client: constant, exponential, etc
		1,                                      // primary error code
		2,                                      // error sub code
		"hello-world",                          // prometheus metric label for this route
		ginlimiter.NewDefaultMetric("limiter_example", "helloworld", "count times helloworld is rate limited"), // prometheus.CounterVec for rate limit exceeded
		ginlimiter.DefaultMetricIncrementer, // how-to increment the prometheus.CounterVec with the required labels
		golimiter.WithUnsyncCounterLimit(1), // use this option to limit how far the mem and central store can drift
		golimiter.WithUnsyncTimeLimit(5),    // use this option to update the central store at least every 5ms
	)
	if err != nil {
		panic("failed to create limiter " + err.Error())
	}
	l.Store.DistributedRedisCache = redisCache

	l2, err := ginlimiter.NewLimiter(
		"GET::/helloworld-delayed",             // define the GUID that identifies this rate limiter as GET on the route
		"10-2-M-1000",                          // defines the rate limit based on a std format with a delay of 1s
		"ip-address",                           // a label that defines the type of rate limiter it is
		ginlimiter.DefaultKeyGetter,            // get the key for this request: IP, request USER identifier, etc
		ginlimiter.DefaultRateExceededReporter, // func that does all the required reporting when a rate is exceeded
		maxEntries,                             // max inmemory entries
		"constant",                             // this hint is sent back to the client: constant, exponential, etc
		1,                                      // primary error code
		2,                                      // error sub code
		"hello-world",                          // prometheus metric label for this route
		ginlimiter.NewDefaultMetric("limiter_example", "helloworld_delayed", "count times helloworld is rate limited"), // prometheus.CounterVec for rate limit exceeded
		ginlimiter.DefaultMetricIncrementer) // how-to increment the prometheus.CounterVec with the required labels
	if err != nil {
		panic("failed to create limiter " + err.Error())
	}
	l2.Store.DistributedRedisCache = redisCache

	// add the rate limiter decorator (which uses a slice of rate limiters)
	r.GET("/helloworld", ginlimiter.LimitRoute([]ginlimiter.RateLimiter{l}, func(c *gin.Context) {
		c.JSON(200, gin.H{"msg": "Hello world!\n"})
	}))

	// add the rate limiter decorator (which uses a slice of rate limiters)
	r.GET("/helloworld-delayed", ginlimiter.LimitRoute([]ginlimiter.RateLimiter{l2}, func(c *gin.Context) {
		c.JSON(200, gin.H{"msg": "Hello world!\n"})
	}))

	// route without any rate limiter, for comparison
	r.GET("/nolimits", func(c *gin.Context) {
		c.JSON(200, gin.H{"msg": "Hello world!\n"})
	})

	if err := r.Run(":9090"); err != nil {
		logrus.Error(err)
	}
}

// setupRedisCentralStore builds the Redis-backed cache shared by the example's
// limiters as their central store. It creates a write pool with
// goCache.InitRedisCache and a read-only pool with
// goCache.InitReadOnlyRedisCache (pointed at redisTestServer), then combines
// them with goCache.NewCacheWithMultiPools.
//
// Side effects: sets the logrus level to Debug and sets TRACE_SYNC=true and
// NO_REDIS_PASSWORD=true in the process environment. If either pool cannot be
// initialized, the error is logged and the function panics with that error.
func setupRedisCentralStore() *goCache.GenericCache {
	logrus.SetLevel(logrus.DebugLevel)
	logger := logrus.WithFields(logrus.Fields{"method": "newInMemoryLimiter"})

	logger.Debug("setting envVar TRACE_SYNC= true will turn-on trace logging to the central store for the limiters, you must call this before creating any limiters")
	os.Setenv("TRACE_SYNC", "true")

	os.Setenv("NO_REDIS_PASSWORD", "true")
	// os.Setenv("REDIS_READ_ONLY_ADDRESS","redis://localhost:6379")
	// os.Setenv("REDIS_SENTINEL_ADDRESS", "redis://localhost:26379")
	selectDatabase := 3

	// useSentinel is the package-level constant; no local copy is needed.
	cacheWritePool, err := goCache.InitRedisCache(useSentinel, defExpSeconds, nil, defExpSeconds, defExpSeconds, selectDatabase, logger)
	if err != nil {
		// Keep the cause in both the log line and the panic value.
		logger.WithError(err).Errorf("couldn't connect to redis on %s", redisTestServer)
		panic(err)
	}
	logger.Info("cacheWritePool initialized")

	readOnlyPool, err := goCache.InitReadOnlyRedisCache(redisTestServer, "", redisOpsTimeout, redisConnTimeout, defExpSeconds, maxConnectionsAllowed, selectDatabase, logger)
	if err != nil {
		logger.WithError(err).Errorf("couldn't connect to redis on %s", redisTestServer)
		panic(err)
	}
	logger.Info("cacheReadPool initialized")

	return goCache.NewCacheWithMultiPools(cacheWritePool, readOnlyPool, goCache.L2, sharedSecret, defExpSeconds, []byte("test"), false)
}

See also:

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoCentralStore - no central store for the rate limiter was initialized
	ErrNoCentralStore = errors.New("no central limiter store initialized")
)

Functions

func EntrySyncInfo

func EntrySyncInfo(e goCache.GenericCacheEntry) (currentCount int, lastSyncCount int, delta uint64)

EntrySyncInfo - delta between last sync count

Types

type Context

// Context is the limit context; Limiter.Get and Limiter.Peek return one for a key.
type Context struct {
	Limit     int64 // presumably the Rate's Limit — confirm in GetContextFromState
	Count     int64 // presumably the count passed to GetContextFromState — confirm
	Remaining int64 // presumably what is left of Limit after Count — confirm
	Reset     int64 // NOTE(review): likely derived from the expiration time; representation not shown here
	Reached   bool  // presumably true once the limit has been hit — confirm
	Delay     int64 // presumably the Rate's Delay (milliseconds per the README's format notes) — confirm
}

Context is the limit context.

func GetContextFromState

func GetContextFromState(now time.Time, rate Rate, expiration time.Time, count int64) Context

GetContextFromState generate a new Context from given state.

type Limiter

// Limiter is the limiter instance; New builds one from a Store and a Rate.
type Limiter struct {
	Store Store // the limiter store (see the Store interface)
	Rate  Rate  // the rate; presumably passed to Store.Get/Peek on each call — confirm
}

Limiter is the limiter instance.

func New

func New(store Store, rate Rate) *Limiter

New returns an instance of Limiter.

func (*Limiter) Get

func (l *Limiter) Get(ctx context.Context, key string) (Context, error)

Get returns the limit for given identifier.

func (*Limiter) Peek

func (l *Limiter) Peek(ctx context.Context, key string) (Context, error)

Peek returns the limit for given identifier, without modification on current values.

type LimiterStore

type LimiterStore struct {
	// contains filtered or unexported fields
}

LimiterStore - encapsulate the limiterStore

func NewInMemoryLimiterStore

func NewInMemoryLimiterStore(maxEntries int, opt ...Option) *LimiterStore

NewInMemoryLimiterStore creates a new instance of memory store with defaults.

func (LimiterStore) CentralStorePeek

func (store LimiterStore) CentralStorePeek(key string) (uint64, error)

CentralStorePeek - peek at a value in the distributed central store

func (LimiterStore) CentralStoreSync

func (store LimiterStore) CentralStoreSync() error

CentralStoreSync - orchestrator for keeping the stores in sync that uses concurrency to get stuff done. This syncs every entry, every time; see CentralStoreUpdates(), which is far more efficient.

func (LimiterStore) CentralStoreUpdates

func (store LimiterStore) CentralStoreUpdates() error

CentralStoreUpdates is used by the janitor.Run to "do" the updates via background workers

func (LimiterStore) EntryCentralStoreExpiresIn

func (store LimiterStore) EntryCentralStoreExpiresIn(key string) (int64, error)

EntryCentralStoreExpiresIn retrieves the entries TTL from the central store (if the limiter has one)

func (*LimiterStore) Get

func (store *LimiterStore) Get(ctx context.Context, key string, rate Rate) (lContext Context, err error)

Get returns the limit for the given identifier (and increments the limiter's counter). If it has drifted too far from the central store, it will sync to the central store.

func (*LimiterStore) Peek

func (store *LimiterStore) Peek(ctx context.Context, key string, rate Rate) (lContext Context, err error)

Peek returns the limit for given identifier, without modification on current values.

func (*LimiterStore) StopCentralStoreUpdates

func (store *LimiterStore) StopCentralStoreUpdates() bool

StopCentralStoreUpdates - stop the janitor

func (LimiterStore) SyncEntryWithCentralStore

func (store LimiterStore) SyncEntryWithCentralStore(key string, opt ...Option) (updatedInMemoryCounter interface{})

SyncEntryWithCentralStore - figures out what to sync for one entry and sets the in memory TTL to match the central store TTL

type Option

type Option func(*options)

Option - defines a func interface for passing in options to the NewInMemoryLimiterStore()

func WithIncrement

func WithIncrement(i int) Option

WithIncrement passes an optional increment

func WithLimiterExpiration

func WithLimiterExpiration(exp time.Duration) Option

WithLimiterExpiration sets the limiter's expiry as a time.Duration

func WithRate

func WithRate(r Rate) Option

WithRate passes an optional rate

func WithUnsyncCounterLimit

func WithUnsyncCounterLimit(deltaLimit uint64) Option

WithUnsyncCounterLimit - allows you to override the default limit to how far the in memory counter can drift from the central store

func WithUnsyncTimeLimit

func WithUnsyncTimeLimit(timeLimit uint64) Option

WithUnsyncTimeLimit - allows you to override the default time limit (milliseconds) between syncs to the central store. this cannot be 0 or it will default to defaultUnsyncTimeLimit

func WithWorkerNumber

func WithWorkerNumber(worker int) Option

WithWorkerNumber passes an optional worker number

type Rate

// Rate is the rate, e.g. the formatted rate "1-1-S-50".
type Rate struct {
	Formatted string        // presumably the formatted string the rate was parsed from — confirm
	Period    time.Duration // the period the limit applies to
	Limit     int64         // number of requests allowed in Period
	Delay     int64         // optional delay; the README's format notes give it in milliseconds
}

Rate is the rate == 1-1-S-50

func NewRateFromFormatted

func NewRateFromFormatted(formatted string) (Rate, error)

NewRateFromFormatted - returns the rate parsed from its formatted version ("<limit>-<duration>-<period>" with an optional "-<delay>", e.g. "1000-1-H" or "1-1-S-10")

type Store

// Store is the common interface for limiter stores.
type Store interface {
	// Get increments the limit for a given identifier and returns its context
	Get(ctx context.Context, key string, rate Rate) (Context, error)
	// Peek returns the limit for given identifier, without modification on current values.
	Peek(ctx context.Context, key string, rate Rate) (Context, error)
	// CentralStorePeek - Peek at an entry in the central store
	CentralStorePeek(key string) (uint64, error)
	// CentralStoreSync - syncs all the keys from in memory with the central store
	CentralStoreSync() error
	// CentralStoreUpdates - just sync the updated keys from in memory with the central store
	CentralStoreUpdates() error
	// SyncEntryWithCentralStore - sync just one key from in memory with the central store and return the new entry IF it's updated
	SyncEntryWithCentralStore(key string, opt ...Option) interface{}
	// StopCentralStoreUpdates - stop all janitor syncing to central store and return whether or not the message to stop was successfully sent
	StopCentralStoreUpdates() bool
}

Store is the common interface for limiter stores.

type StoreEntry

// StoreEntry - represent the entry in the store
type StoreEntry struct {
	CurrentCount    int // the entry's current count
	LastSyncedCount int // presumably the count at the last sync with the central store — confirm
}

StoreEntry - represent the entry in the store

type StoreOptions

// StoreOptions are options for store.
type StoreOptions struct {
	// Prefix is the prefix to use for the key.
	Prefix string

	// MaxRetry is the maximum number of retry under race conditions.
	MaxRetry int

	// CleanUpInterval is the interval for cleanup.
	CleanUpInterval time.Duration
}

StoreOptions are options for store.

Directories

Path Synopsis
examples
gin

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL