Documentation
Overview
Package rate contains rate limiting strategies for asynq.Handler(s).
Types
type Semaphore
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore is a distributed counting semaphore that can be used to enforce a shared limit of maxTokens concurrently held tokens across multiple asynq servers.
func NewSemaphore
func NewSemaphore(rco asynq.RedisConnOpt, scope string, maxTokens int) *Semaphore
NewSemaphore creates a counting Semaphore for the given scope with the given number of tokens.
Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/awanganddong/asynq"
	"github.com/awanganddong/asynq/x/rate"
)

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("rate limited (retry in %v)", e.RetryIn)
}

func main() {
	redisConnOpt := asynq.RedisClientOpt{Addr: ":6379"}
	sema := rate.NewSemaphore(redisConnOpt, "my_queue", 10)
	// call sema.Close() when appropriate

	_ = asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
		ok, err := sema.Acquire(ctx)
		if err != nil {
			return err
		}
		if !ok {
			return &RateLimitError{RetryIn: 30 * time.Second}
		}

		// Make sure to release the token once we're done.
		defer sema.Release(ctx)

		// Process task
		return nil
	})
}
func (*Semaphore) Acquire
Acquire attempts to acquire a token from the semaphore.
- Returns (true, nil) if and only if the semaphore key exists and its current value is less than maxTokens
- Returns (false, nil) when a token cannot be acquired
- Returns (false, error) otherwise
The context.Context passed to Acquire must have a deadline set. The deadline ensures that the token is released if the job goroutine crashes and does not call Release.