throttle

package module
v0.0.0-...-588217f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2023 License: MIT Imports: 6 Imported by: 0

README

What

Simple module for throttling execution by string key in-memory. Example use case: You want to notify a user whenever an event of a certain type happens. But, if that event fires multiple times in a succession, you don't want to send them multiple notifications. Instead, you want to send one notification for the first event, and then another one for multiple remaining events in a provided time span. The throttler works on multiple keys and throttles separately for each key. It does grab a mutex so you may encounter limitations in very high contention environments. It obviously does not hold it for asynchronous callback invocations - only for synchronous updates.

Install

go get github.com/wit221/go/throttle

Examples

See examples/**/main.go. Basic examples:

Throttle:

callback := func(ctx context.Context, key string, items []*Pair, instantCallback bool) error {
    values := make([]string, 0, len(items))
    for _, item := range items {
        values = append(values, item.value)
    }

    fmt.Printf("key[%v] values[%s]\n", key, strings.Join(values, ", "))
    return nil
}

ctx := context.Background()
throttler := throttle.New(callback, 10*time.Second)
reader := bufio.NewReader(os.Stdin)
go func() {
    for {
        select {
        case <-ctx.Done():
            return
        case <-time.After(5 * time.Second):
            fmt.Printf("num_goroutines [%v]\n", runtime.NumGoroutine())
        }
    }
}()
for {
    select {
    case <-ctx.Done():
        return
    default:
        text, err := reader.ReadString('\n')
        if err != nil {
            return
        }
        text = strings.Replace(text, "\n", "", -1)
        parts := strings.Split(text, ",")
        go func() {
            pair := &Pair{key: parts[0], value: parts[1]}
            throttler.Submit(ctx, pair.key, pair)
        }()
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ThrottleCallback

type ThrottleCallback[T any] func(ctx context.Context, key string, items []T, instantCallback bool) error

type ThrottleItem

type ThrottleItem[T any] struct {
	// contains filtered or unexported fields
}

func (*ThrottleItem[T]) Submit

func (i *ThrottleItem[T]) Submit(ctx context.Context, item T)

type Throttler

type Throttler[T any] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func New

func New[T any](callback ThrottleCallback[T], releaseInterval time.Duration) *Throttler[T]

func (*Throttler[T]) Submit

func (q *Throttler[T]) Submit(ctx context.Context, key string, item T)

Submit will perform the callback immediately if no items of the given key have been processed in recent time. Otherwise, it will throttle the item and only execute it after releaseInterval has passed since the previous item. If new items come in with that key before releaseInterval, then the time at which they will all be released will be postponed accordingly.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL