rate

package
v1.2.117
Published: May 13, 2024 License: MIT Imports: 5 Imported by: 1

Documentation

Overview

Package rate: the key observation and some code are borrowed from golang.org/x/time/rate/rate.go

Types

type BurstLimiter

type BurstLimiter struct {
	// contains filtered or unexported fields
}

Informally, in any large enough time interval, a BurstLimiter limits the tokens handed out in a burst to a maximum burst size of b events. As a special case, if r == Inf (the infinite rate), b is ignored. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.

Reorder Buffer

A BurstLimiter can also act as a reorder buffer, which allows instructions to be committed in-order.

- Allocate: `Reserve` or `ReserveN` makes a reservation that is taken into account when allowing future events.
- Wait: `Wait` or `WaitN` blocks until lim permits n events to happen.
- Complete (after Allow or Wait): `PutToken` or `PutTokenN`.
- Complete (after Reserve): `Cancel` on the Reservation itself; Cancel on garbage collection is also supported.

See https://en.wikipedia.org/wiki/Re-order_buffer and https://web.archive.org/web/20040724215416/http://lgjohn.okstate.edu/6253/lectures/reorder.pdf for more about reorder buffers.

The zero value is a valid BurstLimiter, but it will reject all events. Use NewFullBurstLimiter to create non-zero Limiters.

BurstLimiter has three main methods: Allow, Reserve, and Wait. When using it as a token bucket, most callers should use Wait. When using it as a reorder buffer, most callers should use Reserve.

Each of the three methods consumes a single token. They differ in their behavior when no token is available. If no token is available, Allow returns false. If no token is available, Reserve returns a reservation for a future token and the amount of time the caller must wait before using it. If no token is available, Wait blocks until one can be obtained or its associated context.Context is canceled.
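The difference between failing fast and blocking can be sketched with a buffered channel. This is only an illustration under stated assumptions, not the package's implementation: the `bucket` type and its `allow`, `wait`, and `put` methods are hypothetical names, and the Reserve path is left out for brevity.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// bucket is a hypothetical stand-in for a burst limiter: a buffered channel
// whose capacity is the burst size and whose contents are the available tokens.
type bucket chan struct{}

// newFullBucket returns a bucket that starts with b tokens.
func newFullBucket(b int) bucket {
	bk := make(bucket, b)
	for i := 0; i < b; i++ {
		bk <- struct{}{}
	}
	return bk
}

// allow takes a token if one is available and reports false otherwise.
func (bk bucket) allow() bool {
	select {
	case <-bk:
		return true
	default:
		return false
	}
}

// wait blocks until a token is available or ctx is done.
func (bk bucket) wait(ctx context.Context) error {
	select {
	case <-bk:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// put returns a token; the token is dropped if the bucket is already full.
func (bk bucket) put() {
	select {
	case bk <- struct{}{}:
	default:
	}
}

func main() {
	bk := newFullBucket(1)
	fmt.Println(bk.allow()) // true: the only token is taken
	fmt.Println(bk.allow()) // false: the bucket is empty, allow does not block

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(bk.wait(ctx)) // context deadline exceeded: nobody refilled in time

	bk.put()
	fmt.Println(bk.wait(context.Background())) // <nil>: a token was available
}
```

In this sketch a `put` on a full bucket is silently dropped, which mirrors the "expect dropped" comments in the examples further down.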

The methods AllowN, ReserveN, and WaitN consume n tokens.

func NewEmptyBurstLimiter

func NewEmptyBurstLimiter(b int) *BurstLimiter

NewEmptyBurstLimiter returns a new BurstLimiter that starts with zero tokens and permits bursts of at most b tokens.

Example
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/searKing/golang/go/time/rate"
)

func main() {
	const (
		burst       = 3
		concurrency = 2
	)
	limiter := rate.NewEmptyBurstLimiter(burst)
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	fmt.Printf("tokens left: %d\n", limiter.Tokens())

	// expect not allowed, as limiter is initialized with empty tokens(0)
	if limiter.Allow() {
		fmt.Printf("allow passed\n")
	} else {
		fmt.Printf("allow refused\n")
	}
	// fill one token
	limiter.PutToken()
	fmt.Printf("tokens left: %d\n", limiter.Tokens())

	// expect allowed, as limiter is filled with one token(1)
	if limiter.Allow() {
		fmt.Printf("allow passed\n")
	} else {
		fmt.Printf("allow refused\n")
	}
	fmt.Printf("tokens left: %d\n", limiter.Tokens())

	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
			mu.Lock()
			fmt.Printf("Wait 1 Token, tokens left: %d\n", limiter.Tokens())
			mu.Unlock()
			err := limiter.Wait(ctx)
			if err != nil {
				mu.Lock()
				fmt.Printf("err: %s\n", err.Error())
				mu.Unlock()
				return
			}

			mu.Lock()
			fmt.Printf("Got 1 Token, tokens left: %d\n", limiter.Tokens())
			mu.Unlock()
		}()
	}

	time.Sleep(10 * time.Millisecond)
	for i := 0; i < concurrency; i++ {
		time.Sleep(10 * time.Millisecond)
		mu.Lock()
		fmt.Printf("PutToken #%d: before tokens left: %d\n", i, limiter.Tokens())
		// fill one token
		limiter.PutToken()
		fmt.Printf("PutToken #%d: after tokens left: %d\n", i, limiter.Tokens())
		mu.Unlock()
	}
	wg.Wait()
	fmt.Printf("tokens left: %d\n", limiter.Tokens())

	// expect refused, as both waiters consumed the tokens put back above (0 left)
	if limiter.Allow() {
		fmt.Printf("allow passed\n")
	} else {
		fmt.Printf("allow refused\n")
	}
	fmt.Printf("tokens left: %d\n", limiter.Tokens())

	// expect refused again, as no token has been put back since (0 left)
	if limiter.Allow() {
		fmt.Printf("allow passed\n")
	} else {
		fmt.Printf("allow refused\n")
	}
}
Output:

tokens left: 0
allow refused
tokens left: 1
allow passed
tokens left: 0
Wait 1 Token, tokens left: 0
Wait 1 Token, tokens left: 0
PutToken #0: before tokens left: 0
PutToken #0: after tokens left: 0
Got 1 Token, tokens left: 0
PutToken #1: before tokens left: 0
PutToken #1: after tokens left: 0
Got 1 Token, tokens left: 0
tokens left: 0
allow refused
tokens left: 0
allow refused

func NewFullBurstLimiter

func NewFullBurstLimiter(b int) *BurstLimiter

NewFullBurstLimiter returns a new BurstLimiter that starts with b tokens and permits bursts of at most b tokens.

Example
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"

	"github.com/searKing/golang/go/time/rate"
)

func main() {
	const (
		burst = 3
	)
	limiter := rate.NewFullBurstLimiter(burst)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// expect dropped, as limiter is initialized with full tokens(3)
	limiter.PutToken()

	for i := 0; ; i++ {
		// fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
		fmt.Printf("Wait %03d, tokens left: %d\n", i, limiter.Tokens())
		err := limiter.Wait(ctx)
		if err != nil {
			fmt.Printf("err: %s\n", err.Error())
			return
		}
		fmt.Printf("Got %03d, tokens left: %d\n", i, limiter.Tokens())

		// actor mocked by gc
		runtime.GC()

		if i == 0 {
			// refill one token
			limiter.PutToken()
		}
	}
}
Output:

Wait 000, tokens left: 3
Got 000, tokens left: 2
Wait 001, tokens left: 3
Got 001, tokens left: 2
Wait 002, tokens left: 2
Got 002, tokens left: 1
Wait 003, tokens left: 1
Got 003, tokens left: 0
Wait 004, tokens left: 0
err: context deadline exceeded

func NewReorderBuffer added in v1.2.116

func NewReorderBuffer() *BurstLimiter

NewReorderBuffer returns a new BurstLimiter with exactly one token, which allows instructions to be committed in-order.

- Allocate: `Reserve` makes a reservation that is taken into account when allowing future events.
- Wait: `Wait` blocks until lim permits the event to happen.
- Complete (after Allow or Wait): `PutToken`.
- Complete (after Reserve): `Cancel` on the Reservation itself; Cancel on garbage collection is also supported.

See https://en.wikipedia.org/wiki/Re-order_buffer and https://web.archive.org/web/20040724215416/http://lgjohn.okstate.edu/6253/lectures/reorder.pdf for more about reorder buffers.

Example
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"

	"github.com/searKing/golang/go/time/rate"
)

func main() {
	const n = 10
	// See https://web.archive.org/web/20040724215416/http://lgjohn.okstate.edu/6253/lectures/reorder.pdf for more about Reorder buffer.
	limiter := rate.NewReorderBuffer()
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		i := i

		// Allocate: The dispatch stage reserves space in the reorder buffer for instructions in program order.
		r := limiter.Reserve(ctx)

		wg.Add(1)
		go func() {
			defer wg.Done()
			// Execute out of order
			runtime.Gosched() // Increase probability of a race for out-of-order mock
			//fmt.Printf("%03d Execute out of order\n", i)

			defer r.PutToken()
			//fmt.Printf("%03d Wait until in order\n", i)
			// Wait: The complete stage must wait for instructions to finish execution.
			err := r.Wait(ctx) // Commit in order
			if err != nil {
				fmt.Printf("err: %s\n", err.Error())
				return
			}
			// Complete: Finished instructions are allowed to write results in order into the architected registers.
			fmt.Printf("%03d Complete in order\n", i)
		}()
	}
	wg.Wait()
}
Output:

000 Complete in order
001 Complete in order
002 Complete in order
003 Complete in order
004 Complete in order
005 Complete in order
006 Complete in order
007 Complete in order
008 Complete in order
009 Complete in order
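The in-order commit shown by the example above can be pictured as a ticket queue: tickets are issued in program order, and a holder may commit only after every earlier ticket has completed. The following is a minimal sketch of that idea using only the standard library. The `reorder` type and its methods are hypothetical and are not how the package is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// reorder is a hypothetical ticket queue used to illustrate in-order commit.
type reorder struct {
	mu   sync.Mutex
	cond *sync.Cond
	next int // next ticket number to issue
	head int // ticket number currently allowed to commit
}

func newReorder() *reorder {
	r := &reorder{}
	r.cond = sync.NewCond(&r.mu)
	return r
}

// reserve issues the next ticket (the "allocate" step).
func (r *reorder) reserve() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	t := r.next
	r.next++
	return t
}

// wait blocks until ticket t is at the head of the queue.
func (r *reorder) wait(t int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for r.head != t {
		r.cond.Wait()
	}
}

// complete retires the head ticket and wakes all waiters.
func (r *reorder) complete() {
	r.mu.Lock()
	r.head++
	r.mu.Unlock()
	r.cond.Broadcast()
}

// commitInOrder starts n goroutines and returns the order in which they committed.
func commitInOrder(n int) []int {
	r := newReorder()
	var wg sync.WaitGroup
	var order []int
	for i := 0; i < n; i++ {
		t := r.reserve() // tickets are issued in loop order
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.wait(t)                // block until all earlier tickets have completed
			order = append(order, t) // only the head ticket holder reaches this line
			r.complete()
		}()
	}
	wg.Wait()
	return order
}

func main() {
	fmt.Println(commitInOrder(5)) // [0 1 2 3 4]
}
```

However the goroutines are scheduled, the mutex hand-off between `complete` and `wait` serializes the commit step, so the recorded order matches the reservation order.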

func (*BurstLimiter) Allow

func (lim *BurstLimiter) Allow() bool

Allow is shorthand for AllowN(1). It returns false when no events, or not enough events, are available.

func (*BurstLimiter) AllowN

func (lim *BurstLimiter) AllowN(n int) bool

AllowN reports whether n events may happen now. AllowN is shorthand for GetTokenN. Use this method if you intend to drop / skip events that exceed the rate limit. Otherwise, use Reserve or Wait. It returns false when no events, or not enough events, are available.
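As an illustration of the drop / skip pattern, the sketch below admits each event only if its whole cost fits the remaining tokens and otherwise takes nothing. It assumes a hypothetical `pool` type with an all-or-nothing `allowN` method; these names are stand-ins and do not come from the package.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical token pool, not the package's BurstLimiter.
type pool struct {
	mu     sync.Mutex
	tokens int
}

// allowN takes n tokens if at least n are available; otherwise it takes
// nothing and reports false.
func (p *pool) allowN(n int) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n > p.tokens {
		return false
	}
	p.tokens -= n
	return true
}

// process admits each event whose cost fits the remaining tokens and drops the rest.
func process(tokens int, costs []int) (accepted, dropped []int) {
	p := &pool{tokens: tokens}
	for _, c := range costs {
		if p.allowN(c) {
			accepted = append(accepted, c)
		} else {
			dropped = append(dropped, c)
		}
	}
	return accepted, dropped
}

func main() {
	accepted, dropped := process(5, []int{2, 4, 3, 1})
	fmt.Println(accepted, dropped) // [2 3] [4 1]
}
```

Note that the event of cost 4 is dropped even though 3 tokens remain at that point: a refused call consumes nothing, so the later event of cost 3 is still admitted.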

func (*BurstLimiter) Burst

func (lim *BurstLimiter) Burst() int

Burst returns the maximum burst size. Burst is the maximum number of tokens that can be consumed in a single call to Allow, Reserve, or Wait, so higher Burst values allow more events to happen at once. A zero Burst allows no events, unless limit == Inf.

func (*BurstLimiter) GetToken

func (lim *BurstLimiter) GetToken() (ok bool)

GetToken is shorthand for GetTokenN(1).

func (*BurstLimiter) GetTokenN

func (lim *BurstLimiter) GetTokenN(n int) (ok bool)

GetTokenN returns true if the n tokens were obtained.

func (*BurstLimiter) PutToken

func (lim *BurstLimiter) PutToken()

PutToken is shorthand for PutTokenN(1).

func (*BurstLimiter) PutTokenN

func (lim *BurstLimiter) PutTokenN(n int)

func (*BurstLimiter) Reserve

func (lim *BurstLimiter) Reserve(ctx context.Context) *Reservation

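Reserve is shorthand for ReserveN(ctx, 1). When no events, or not enough events, are available, it returns a Reservation together with how long the caller must wait to obtain enough events.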

Example
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/searKing/golang/go/time/rate"
)

func main() {
	const (
		burst = 1
		n     = 10
	)
	limiter := rate.NewFullBurstLimiter(burst)
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	// expect dropped, as limiter is initialized with full tokens(1)
	limiter.PutToken()

	type Reservation struct {
		index int
		r     *rate.Reservation
	}

	var mu sync.Mutex
	var wg sync.WaitGroup
	var rs []*Reservation

	for i := 0; i < n; i++ {
		// fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
		// fmt.Printf("Reserve %03d\n", i)
		r := &Reservation{
			index: i,
			r:     limiter.Reserve(ctx),
		}
		if i%2 == rand.Intn(2)%2 {
			rs = append(rs, r)
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// fmt.Printf("%03d %s\n", r.index, time.Now().Format(time.RFC3339))
			// fmt.Printf("Wait %03d\n", r.index)
			err := r.r.Wait(ctx)
			if err != nil {
				mu.Lock()
				fmt.Printf("err: %s\n", err.Error())
				mu.Unlock()
			}

			mu.Lock()
			fmt.Printf("%03d Got 1 Token, tokens left: %d\n", r.index, limiter.Tokens())
			mu.Unlock()
			r.r.PutToken()
		}()
	}

	for i := 0; i < len(rs); i++ {
		r := rs[i]
		wg.Add(1)
		go func() {
			defer wg.Done()
			// fmt.Printf("%03d %s\n", r.index, time.Now().Format(time.RFC3339))
			// fmt.Printf("Wait %03d\n", r.index)
			err := r.r.Wait(ctx)
			if err != nil {
				mu.Lock()
				fmt.Printf("err: %s\n", err.Error())
				mu.Unlock()
			}

			mu.Lock()
			fmt.Printf("%03d Got 1 Token, tokens left: %d\n", r.index, limiter.Tokens())
			mu.Unlock()
			r.r.PutToken()
		}()
	}
	wg.Wait()
}
Output:

000 Got 1 Token, tokens left: 0
001 Got 1 Token, tokens left: 0
002 Got 1 Token, tokens left: 0
003 Got 1 Token, tokens left: 0
004 Got 1 Token, tokens left: 0
005 Got 1 Token, tokens left: 0
006 Got 1 Token, tokens left: 0
007 Got 1 Token, tokens left: 0
008 Got 1 Token, tokens left: 0
009 Got 1 Token, tokens left: 0

func (*BurstLimiter) ReserveN

func (lim *BurstLimiter) ReserveN(ctx context.Context, n int) *Reservation

ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. The BurstLimiter takes this Reservation into account when allowing future events. The returned Reservation's OK() method returns false if n exceeds the BurstLimiter's burst size. Usage example:

	// Allocate: The dispatch stage reserves space in the reorder buffer for instructions in program order.
	ctx := context.Background()
	r := lim.ReserveN(ctx, 1)
	if !r.OK() {
		// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
		return
	}

	// Execute: out-of-order execution
	Act()

	// Wait: The complete stage must wait for instructions to finish execution.
	if err := r.Wait(ctx); err != nil {
		// Not allowed to act! Reservation or context canceled ?
		return
	}
	// Complete: Finished instructions are allowed to write results in order into the architected registers.
	// It allows instructions to be committed in-order.
	defer r.PutToken()

	// Execute: in-order execution
	Act()

Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. If you need to respect a deadline or cancel the delay, use Wait instead. To drop or skip events exceeding the rate limit, use Allow instead. When no events, or not enough events, are available, ReserveN returns a Reservation together with how long the caller must wait to obtain enough events. See https://en.wikipedia.org/wiki/Re-order_buffer and https://web.archive.org/web/20040724215416/http://lgjohn.okstate.edu/6253/lectures/reorder.pdf for more about reorder buffers.

func (*BurstLimiter) SetBurst

func (lim *BurstLimiter) SetBurst(newBurst int)

SetBurst sets a new burst size for the limiter.

func (*BurstLimiter) Tokens

func (lim *BurstLimiter) Tokens() int

Tokens returns the number of unconsumed tokens.

func (*BurstLimiter) Wait

func (lim *BurstLimiter) Wait(ctx context.Context) (err error)

Wait is shorthand for WaitN(ctx, 1).

func (*BurstLimiter) WaitN

func (lim *BurstLimiter) WaitN(ctx context.Context, n int) (err error)

WaitN blocks until lim permits n events to happen. It returns an error if n exceeds the BurstLimiter's burst size, the Context is canceled, or the expected wait time exceeds the Context's Deadline. The burst limit is ignored if the rate limit is Inf.
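The error cases can be illustrated with a small counter-based sketch. It is an assumption-laden illustration rather than the package's code: the `counter` type, `putN`, `waitN`, and `demo` are hypothetical names, and unlike the description above this sketch simply waits for the deadline instead of estimating the wait time up front.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// counter is a hypothetical limiter that tracks tokens with a plain integer.
type counter struct {
	mu     sync.Mutex
	tokens int
	burst  int
	notify chan struct{} // closed and replaced whenever tokens are put back
}

func newEmptyCounter(burst int) *counter {
	return &counter{burst: burst, notify: make(chan struct{})}
}

// putN adds n tokens, capped at the burst size, and wakes all waiters.
func (c *counter) putN(n int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tokens += n
	if c.tokens > c.burst {
		c.tokens = c.burst
	}
	close(c.notify)
	c.notify = make(chan struct{})
}

// waitN blocks until n tokens can be taken at once or ctx is done.
func (c *counter) waitN(ctx context.Context, n int) error {
	for {
		c.mu.Lock()
		if n > c.burst {
			c.mu.Unlock()
			return errors.New("n exceeds burst size")
		}
		if c.tokens >= n {
			c.tokens -= n
			c.mu.Unlock()
			return nil
		}
		ch := c.notify // wait for the next putN
		c.mu.Unlock()
		select {
		case <-ch:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demo exercises the three outcomes: too large, satisfied after a refill, timed out.
func demo() (tooBig, got, timedOut error) {
	c := newEmptyCounter(2)
	ctx := context.Background()
	tooBig = c.waitN(ctx, 3) // 3 > burst: fails immediately
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.putN(2)
	}()
	got = c.waitN(ctx, 2) // unblocks once the goroutine refills
	short, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	timedOut = c.waitN(short, 1) // no refill arrives before the deadline
	return tooBig, got, timedOut
}

func main() {
	tooBig, got, timedOut := demo()
	fmt.Println(tooBig)
	fmt.Println(got)
	fmt.Println(timedOut)
}
```

Closing and replacing the `notify` channel on every `putN` wakes every waiter, and each one then rechecks the token count under the lock, so a waiter that needs more tokens than were returned simply goes back to waiting.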

type Reservation

type Reservation struct {
	// contains filtered or unexported fields
}

A Reservation holds information about events that are permitted by a BurstLimiter to happen after a delay. A Reservation may be canceled, which may enable the BurstLimiter to permit additional events.

func (*Reservation) Cancel

func (r *Reservation) Cancel()

Cancel indicates that the reservation holder will not perform the reserved action and reverses the effects of this Reservation on the rate limit as much as possible, considering that other reservations may have already been made.

func (*Reservation) OK

func (r *Reservation) OK() bool

OK returns whether the limiter can provide the requested number of tokens within the maximum wait time. If OK is false, Delay returns InfDuration, and Cancel does nothing.

func (*Reservation) PutToken added in v1.2.56

func (r *Reservation) PutToken()

PutToken (the Complete step) returns all tokens taken by the Reservation to the BurstLimiter. PutToken is shorthand for Cancel().

func (*Reservation) Ready added in v1.2.56

func (r *Reservation) Ready() bool

Ready returns whether the limiter can provide the requested number of tokens within the maximum wait time. If Ready is true, Wait returns nil directly, and Cancel or GC puts back the tokens reserved in the Reservation. If Ready is false, Wait blocks until lim permits the n events to happen.

func (*Reservation) Wait

func (r *Reservation) Wait(ctx context.Context) error

Wait blocks before taking the reserved action. It blocks when no events, or not enough events, are available.
