rate

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2023 License: MIT Imports: 1 Imported by: 0

README

Documentation Go workflow CircleCI codecov Go Report Card GitHub tag (latest SemVer)

Rate limiter

Token bucket rate limiter.

Usage

Create limiter

l := rate.NewLimiter(time.Now(),
	1000 / time.Second.Seconds(), // 1000 tokens per second
	2000, // 2000 tokens burst
	)

// smooth 1KB per second with at most 128 bytes at a time
l = rate.NewLimiter(time.Now(),
	1000 / time.Second.Seconds(),
	128)

// 3 MB per minute allowing to spend it all at once
l = rate.NewLimiter(time.Now(),
	3000000 / time.Minute.Seconds(),
	3000000)

Take or drop

func (c *Conn) Write(p []byte) (int, error) {
	if !l.Take(time.Now(), len(p)) {
		return 0, ErrLimited
	}

	return c.Conn.Write(p)
}

Borrow and wait

func (c *Conn) Write(p []byte) (int, error) {
	delay := l.Borrow(time.Now(), len(p))

	if delay != 0 {
		time.Sleep(delay)
	}

	return c.Conn.Write(p)
}

Write as much as we can

func (c *Conn) Write(p []byte) (int, error) {
	now := time.Now()

	val := l.Value(now)

	n := int(val)
	if n > len(p) {
		n = len(p)
	}

	_ = l.Take(now, float64(n)) // must be true

	n, err := c.Conn.Write(p[:n])
	if err != nil {
		return n, err
	}
	if n != len(p) {
		err = ErrLimited
	}

	return n, err
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter uses classic token bucket algorithm. Bucket is full when created.

func NewLimiter

func NewLimiter(now time.Time, rate, cap float64, opts ...Option) *Limiter

NewLimiter creates new token bucket limiter. Bucket is full when created. Use it to limit to at most rate per second tokens with at most cap burst.

Each operaion takes current time and updates the state as it was filled up continuously. If time goes backwards it's ignored as it was already accounted. Thus you can't take unused tokens from the past if bucket is full now.

Limiter is not safe to use in parallel.

func (*Limiter) Borrow

func (l *Limiter) Borrow(now time.Time, v float64) time.Duration

Borrow takes v tokens even if there is not enough of them and returns the time needed to wait before using them. If there was enough tokens the returned time is 0. Borrow can take even more tokens then burst capacity. Callers must check for Capacity on their own.

Example (Full)
package main

import (
	"fmt"
	"time"

	"nikand.dev/go/rate"
)

type WriterFunc func([]byte) (int, error)

func (f WriterFunc) Write(p []byte) (int, error) {
	return f(p)
}

func main() {
	// Mind we want to limit output speed for tcp connection.

	// Out test fake time
	base := time.Now()
	now := base

	// downstream writer
	w := WriterFunc(func(p []byte) (int, error) {
		fmt.Printf("%5d bytes written at %v\n", len(p), now.Sub(base))

		return len(p), nil
	})

	var r *rate.Limiter // will init later

	// the actual usage example of rate.Limiter
	limitedWrite := func(p []byte) (n int, err error) {
		var m int
		lim := len(p)

		if c := r.Capacity(); float64(lim) > c {
			lim = int(c)
		}

		for err == nil && n < len(p) {
			if lim > len(p)-n {
				lim = len(p) - n
			}

			delay := r.Borrow(now, float64(lim))
			if delay > 0 {
				now = now.Add(delay) // time.Sleep(delay)
			}

			m, err = w.Write(p[n : n+lim])
			n += m
		}

		return
	}

	fmt.Printf("Max burst of 515 bytes with rate of 1KiB per second\n")

	tRate := float64(1*1024) / time.Second.Seconds() // 1KiB per second
	burst := float64(512)                            // 128 bytes at once at most

	r = rate.NewLimiter(now, tRate, burst)

	_, _ = limitedWrite(make([]byte, 5*1024))

}
Output:

Max burst of 515 bytes with rate of 1KiB per second
  512 bytes written at 0s
  512 bytes written at 500ms
  512 bytes written at 1s
  512 bytes written at 1.5s
  512 bytes written at 2s
  512 bytes written at 2.5s
  512 bytes written at 3s
  512 bytes written at 3.5s
  512 bytes written at 4s
  512 bytes written at 4.5s

func (*Limiter) Capacity

func (l *Limiter) Capacity() float64

Capacity returns the current capacity.

func (*Limiter) Have

func (l *Limiter) Have(now time.Time, v float64) bool

Have checks if Limiter have at least v tokens but don't take them.

func (*Limiter) Rate

func (l *Limiter) Rate() float64

Rate returns the current rate.

func (*Limiter) Return

func (l *Limiter) Return(now time.Time, v float64)

Return returns borrowed time back. If current value + returned v > Capacity it's truncated.

func (*Limiter) Set

func (l *Limiter) Set(now time.Time, v float64) float64

Set sets current limiter state. It can be used to drain or fill the Limiter. Returns previous value.

func (*Limiter) Take

func (l *Limiter) Take(now time.Time, v float64) bool

Take takes v tokens if there is enough of them. Take returns true if taken and false otherwise.

Example (Full)
package main

import (
	"errors"
	"fmt"
	"io"
	"time"

	"nikand.dev/go/rate"
)

func main() {
	// Mind we want to limit packets bandwidth.

	// Out test fake time
	base := time.Now()
	now := base

	// downstream writer
	w := io.Discard

	var ErrSpeedLimited = errors.New("speed limited")
	var r *rate.Limiter // will init later

	// the actual usage example of rate.Limiter
	limitedWrite := func(p []byte) (n int, err error) {
		if !r.Take(now, float64(len(p))) {
			return 0, ErrSpeedLimited
		}

		return w.Write(p)
	}

	fmt.Printf("Max burst of 2KiB with rate of 1KiB per second\n")

	tRate := float64(1*1024) / time.Second.Seconds() // 1KiB per second
	burst := float64(2 * 1024)                       // 2KiB at once at most

	r = rate.NewLimiter(now, tRate, burst)

	for i := 0; i < 4; i++ {
		n, err := limitedWrite(make([]byte, 1024))
		fmt.Printf("time %5v: %d %v\n", now.Sub(base), n, err)
		now = now.Add(time.Second / 2)
	}

}
Output:

Max burst of 2KiB with rate of 1KiB per second
time    0s: 1024 <nil>
time 500ms: 1024 <nil>
time    1s: 1024 <nil>
time  1.5s: 0 speed limited

func (*Limiter) Update

func (l *Limiter) Update(now time.Time, rate, cap float64)

Update advances Limiter state and sets new rate and cap for the future operations.

func (*Limiter) Value

func (l *Limiter) Value(now time.Time) float64

Value returns the current value. That is how much can we take at most in the moment now.

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithValue

func WithValue(v float64) Option

WithValue is an Option for NewLimiter that sets initial value.

type OptionFunc

type OptionFunc func(l *Limiter, now time.Time)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL