slidingwindow

package module
v0.0.0-...-535bb99 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2020 License: MIT Imports: 3 Imported by: 29

README

slidingwindow

Golang implementation of Sliding Window Algorithm for distributed rate limiting.

Installation

$ go get -u github.com/RussellLuo/slidingwindow

Design

slidingwindow is an implementation of the scalable rate limiting algorithm used by Kong.

Suppose we have a limiter that permits 100 events per minute, and now the time comes at the "75s" point, then the internal windows will be as below:

slidingwindow

In this situation, the limiter has permitted 12 events during the current window, which started 15 seconds ago, and 86 events during the entire previous window. Then the count approximation during the sliding window can be calculated like this:

count = 86 * ((60-15)/60) + 12
      = 86 * 0.75 + 12
      = 76.5 events

Test Utility

prom_reports

For details, see testutil.

Documentation

For usage and examples see the Godoc.

License

MIT

Documentation

Overview

Example (LocalWindow)
package main

import (
	"fmt"
	"time"

	sw "github.com/RussellLuo/slidingwindow"
)

func main() {
	lim, _ := sw.NewLimiter(time.Second, 10, func() (sw.Window, sw.StopFunc) {
		// NewLocalWindow returns an empty stop function, so it's
		// unnecessary to call it later.
		return sw.NewLocalWindow()
	})

	ok := lim.Allow()
	fmt.Printf("ok: %v\n", ok)

}
Output:

ok: true
Example (SyncWindow)
package main

import (
	"fmt"
	"strconv"
	"time"

	sw "github.com/RussellLuo/slidingwindow"
	"github.com/go-redis/redis"
)

// RedisDatastore is a reference implementation of the Redis-based datastore,
// which can be used directly if you happen to use go-redis.
type RedisDatastore struct {
	client redis.Cmdable
	ttl    time.Duration
}

func NewRedisDatastore(client redis.Cmdable, ttl time.Duration) *RedisDatastore {
	return &RedisDatastore{client: client, ttl: ttl}
}

func (d *RedisDatastore) fullKey(key string, start int64) string {
	return fmt.Sprintf("%s@%d", key, start)
}

func (d *RedisDatastore) Add(key string, start, value int64) (int64, error) {
	k := d.fullKey(key, start)
	c, err := d.client.IncrBy(k, value).Result()
	if err != nil {
		return 0, err
	}
	// Ignore the possible error from EXPIRE command.
	d.client.Expire(k, d.ttl).Result() // nolint:errcheck
	return c, err
}

func (d *RedisDatastore) Get(key string, start int64) (int64, error) {
	k := d.fullKey(key, start)
	value, err := d.client.Get(k).Result()
	if err != nil {
		if err == redis.Nil {
			// redis.Nil is not an error, it only indicates the key does not exist.
			err = nil
		}
		return 0, err
	}
	return strconv.ParseInt(value, 10, 64)
}

func main() {
	size := time.Second
	store := NewRedisDatastore(
		redis.NewClient(&redis.Options{
			Addr: "localhost:6379",
		}),
		2*size, // twice of window-size is just enough.
	)

	lim, stop := sw.NewLimiter(size, 10, func() (sw.Window, sw.StopFunc) {
		return sw.NewSyncWindow("test", sw.NewBlockingSynchronizer(store, 500*time.Millisecond))
	})
	defer stop()

	ok := lim.Allow()
	fmt.Printf("ok: %v\n", ok)

}
Output:

ok: true

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewLimiter

func NewLimiter(size time.Duration, limit int64, newWindow NewWindow) (*Limiter, StopFunc)

NewLimiter creates a new limiter, and returns a function to stop the possible sync behaviour within the current window.

func NewLocalWindow

func NewLocalWindow() (*LocalWindow, StopFunc)

func NewSyncWindow

func NewSyncWindow(key string, syncer Synchronizer) (*SyncWindow, StopFunc)

NewSyncWindow creates an instance of SyncWindow with the given synchronizer.

Types

type BlockingSynchronizer

type BlockingSynchronizer struct {
	// contains filtered or unexported fields
}

BlockingSynchronizer does synchronization in a blocking mode and consumes no extra goroutine.

It's recommended to use BlockingSynchronizer in low-concurrency scenarios, either for higher accuracy, or for less goroutine consumption.

func NewBlockingSynchronizer

func NewBlockingSynchronizer(store Datastore, syncInterval time.Duration) *BlockingSynchronizer

func (*BlockingSynchronizer) Start

func (s *BlockingSynchronizer) Start()

func (*BlockingSynchronizer) Stop

func (s *BlockingSynchronizer) Stop()

func (*BlockingSynchronizer) Sync

func (s *BlockingSynchronizer) Sync(now time.Time, makeReq MakeFunc, handleResp HandleFunc)

Sync sends the window's count to the central datastore, and then update the window's count according to the response from the datastore.

type Datastore

type Datastore interface {
	// Add adds delta to the count of the window represented
	// by start, and returns the new count.
	Add(key string, start, delta int64) (int64, error)

	// Get returns the count of the window represented by start.
	Get(key string, start int64) (int64, error)
}

Datastore represents the central datastore.

type HandleFunc

type HandleFunc func(SyncResponse)

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

func (*Limiter) Allow

func (lim *Limiter) Allow() bool

Allow is shorthand for AllowN(time.Now(), 1).

func (*Limiter) AllowN

func (lim *Limiter) AllowN(now time.Time, n int64) bool

AllowN reports whether n events may happen at time now.

func (*Limiter) Limit

func (lim *Limiter) Limit() int64

Limit returns the maximum events permitted to happen during one window size.

func (*Limiter) SetLimit

func (lim *Limiter) SetLimit(newLimit int64)

SetLimit sets a new Limit for the limiter.

func (*Limiter) Size

func (lim *Limiter) Size() time.Duration

Size returns the time duration of one window size. Note that the size is defined to be read-only, if you need to change the size, create a new limiter with a new size instead.

type LocalWindow

type LocalWindow struct {
	// contains filtered or unexported fields
}

LocalWindow represents a window that ignores sync behavior entirely and only stores counters in memory.

func (*LocalWindow) AddCount

func (w *LocalWindow) AddCount(n int64)

func (*LocalWindow) Count

func (w *LocalWindow) Count() int64

func (*LocalWindow) Reset

func (w *LocalWindow) Reset(s time.Time, c int64)

func (*LocalWindow) Start

func (w *LocalWindow) Start() time.Time

func (*LocalWindow) Sync

func (w *LocalWindow) Sync(now time.Time)

type MakeFunc

type MakeFunc func() SyncRequest

type NewWindow

type NewWindow func() (Window, StopFunc)

NewWindow creates a new window, and returns a function to stop the possible sync behaviour within it.

type NonblockingSynchronizer

type NonblockingSynchronizer struct {
	// contains filtered or unexported fields
}

NonblockingSynchronizer does synchronization in a non-blocking mode. To achieve this, it needs to spawn a goroutine to exchange data with the central datastore.

It's recommended to always use NonblockingSynchronizer in high-concurrency scenarios.

func NewNonblockingSynchronizer

func NewNonblockingSynchronizer(store Datastore, syncInterval time.Duration) *NonblockingSynchronizer

func (*NonblockingSynchronizer) Start

func (s *NonblockingSynchronizer) Start()

func (*NonblockingSynchronizer) Stop

func (s *NonblockingSynchronizer) Stop()

func (*NonblockingSynchronizer) Sync

func (s *NonblockingSynchronizer) Sync(now time.Time, makeReq MakeFunc, handleResp HandleFunc)

Sync tries to send the window's count to the central datastore, or to update the window's count according to the response from the latest synchronization. Since the exchange with the datastore is always slower than the execution of Sync, usually Sync must be called at least twice to update the window's count finally.

type StopFunc

type StopFunc func()

StopFunc stops the window's sync behaviour.

type SyncRequest

type SyncRequest struct {
	Key     string
	Start   int64
	Count   int64
	Changes int64
}

type SyncResponse

type SyncResponse struct {
	// Whether the synchronization succeeds.
	OK    bool
	Start int64
	// The changes accumulated by the local limiter.
	Changes int64
	// The total changes accumulated by all the other limiters.
	OtherChanges int64
}

type SyncWindow

type SyncWindow struct {
	LocalWindow
	// contains filtered or unexported fields
}

SyncWindow represents a window that will sync counter data to the central datastore asynchronously.

Note that for the best coordination between the window and the synchronizer, the synchronization is not automatic but is driven by the call to Sync.

func (*SyncWindow) AddCount

func (w *SyncWindow) AddCount(n int64)

func (*SyncWindow) Reset

func (w *SyncWindow) Reset(s time.Time, c int64)

func (*SyncWindow) Sync

func (w *SyncWindow) Sync(now time.Time)

type Synchronizer

type Synchronizer interface {
	// Start starts the synchronization goroutine, if any.
	Start()

	// Stop stops the synchronization goroutine, if any, and waits for it to exit.
	Stop()

	// Sync sends a synchronization request.
	Sync(time.Time, MakeFunc, HandleFunc)
}

type Window

type Window interface {
	// Start returns the start boundary.
	Start() time.Time

	// Count returns the accumulated count.
	Count() int64

	// AddCount increments the accumulated count by n.
	AddCount(n int64)

	// Reset sets the state of the window with the given settings.
	Reset(s time.Time, c int64)

	// Sync tries to exchange data between the window and the central
	// datastore at time now, to keep the window's count up-to-date.
	Sync(now time.Time)
}

Window represents a fixed-window.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL