queue

package
Version: v1.16.109
Warning

This package is not in the latest version of its module.

Published: Aug 14, 2023 License: MPL-2.0 Imports: 5 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DeferQueue

type DeferQueue[T ItemType] interface {
	// Defer defers processing a Request until a given time. When
	// that time is reached, the request will be processed by the
	// callback passed to Process. If the given context is
	// canceled, the item is not deferred.
	Defer(ctx context.Context, item T, until time.Time)
	// Process processes all items in the defer queue with the
	// given callback, blocking until the given context is canceled.
	// Callers should only ever call Process once, likely in a
	// long-lived goroutine.
	Process(ctx context.Context, callback func(item T))
}

DeferQueue is a generic priority queue that defers Requests until a given time and then hands them to a processing callback.

func NewDeferQueue

func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T]

NewDeferQueue returns a priority queue for deferred Requests.
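To illustrate the Defer/Process contract, here is a minimal sketch built on a hypothetical stand-in, `pollingQueue`, rather than the package's own implementation. It assumes that `tick` is a polling interval and that a later Defer for the same key replaces the earlier one; neither is confirmed by the documentation above. The interfaces are redeclared locally so the snippet compiles on its own, and `key`, `newPollingQueue`, and `runDemo` are illustrative names.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ItemType and DeferQueue mirror the documented interfaces, redeclared so
// this sketch compiles without importing the package.
type ItemType interface{ Key() string }

type DeferQueue[T ItemType] interface {
	Defer(ctx context.Context, item T, until time.Time)
	Process(ctx context.Context, callback func(item T))
}

type entry[T ItemType] struct {
	item  T
	until time.Time
}

// pollingQueue is a hypothetical stand-in, not the package's implementation.
type pollingQueue[T ItemType] struct {
	mu      sync.Mutex
	tick    time.Duration
	entries map[string]entry[T]
}

func newPollingQueue[T ItemType](tick time.Duration) DeferQueue[T] {
	return &pollingQueue[T]{tick: tick, entries: make(map[string]entry[T])}
}

func (q *pollingQueue[T]) Defer(ctx context.Context, item T, until time.Time) {
	if ctx.Err() != nil {
		return // canceled context: the item is not deferred
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	// Assumption: a later Defer for the same key replaces the earlier one.
	q.entries[item.Key()] = entry[T]{item: item, until: until}
}

// Process polls on every tick and blocks until ctx is canceled.
func (q *pollingQueue[T]) Process(ctx context.Context, callback func(item T)) {
	ticker := time.NewTicker(q.tick)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			for _, e := range q.popDue(now) {
				callback(e.item)
			}
		}
	}
}

// popDue removes and returns every entry whose deadline has passed,
// ordered by deadline.
func (q *pollingQueue[T]) popDue(now time.Time) []entry[T] {
	q.mu.Lock()
	defer q.mu.Unlock()
	var due []entry[T]
	for k, e := range q.entries {
		if !e.until.After(now) {
			due = append(due, e)
			delete(q.entries, k)
		}
	}
	sort.Slice(due, func(i, j int) bool { return due[i].until.Before(due[j].until) })
	return due
}

type key string

func (k key) Key() string { return string(k) }

// runDemo defers two items and returns their keys in processing order.
func runDemo() []string {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	q := newPollingQueue[key](5 * time.Millisecond)
	now := time.Now()
	q.Defer(ctx, "late", now.Add(30*time.Millisecond))
	q.Defer(ctx, "early", now.Add(10*time.Millisecond))
	var processed []string
	q.Process(ctx, func(item key) {
		processed = append(processed, item.Key())
		if len(processed) == 2 {
			cancel() // stop Process once both items have been handled
		}
	})
	return processed
}

func main() { fmt.Println(runDemo()) }
```

In a long-running service, Process would more likely run in its own goroutine with a context tied to the service lifetime; the demo calls it inline only so that it can return the result.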

type ItemType

type ItemType interface {
	// Key returns a string that will be used to de-duplicate items in the queue.
	Key() string
}

ItemType is the type constraint for items in the WorkQueue, DeferQueue, and Limiter.
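As a minimal sketch of satisfying this constraint, the hypothetical `Request` type below (not part of the package) derives its key from two fields, so two values naming the same object de-duplicate to one queue entry. `ItemType` is redeclared locally so the snippet compiles on its own.

```go
package main

import "fmt"

// ItemType mirrors the documented constraint, redeclared so this sketch
// compiles without importing the package.
type ItemType interface {
	Key() string
}

// Request is a hypothetical item type used only for illustration.
type Request struct {
	Namespace string
	Name      string
}

// Key joins the fields so that two Requests naming the same object
// produce the same de-duplication key.
func (r Request) Key() string {
	return r.Namespace + "/" + r.Name
}

// Compile-time check that Request satisfies ItemType.
var _ ItemType = Request{}

func main() {
	a := Request{Namespace: "default", Name: "web"}
	b := Request{Namespace: "default", Name: "web"}
	fmt.Println(a.Key(), a.Key() == b.Key())
}
```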

type Limiter

type Limiter[T ItemType] interface {
	// NextRetry returns the remaining time until the queue should
	// reprocess a Request.
	NextRetry(request T) time.Duration
	// Forget causes the Limiter to reset the backoff for the Request.
	Forget(request T)
}

Limiter is an interface for a rate limiter that controls how soon the work queue retries a Request.

func NewRateLimiter

func NewRateLimiter[T ItemType](base, max time.Duration) Limiter[T]

NewRateLimiter returns a Limiter that does per-item exponential backoff.
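The documentation does not spell out the backoff formula. The sketch below shows one common form of per-item exponential backoff, assuming the delay starts at `base`, doubles on each consecutive NextRetry for the same key, and is capped at the maximum. `backoffLimiter`, `newBackoffLimiter`, and `key` are hypothetical names, and this is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ItemType and Limiter mirror the documented interfaces, redeclared so
// this sketch compiles without importing the package.
type ItemType interface{ Key() string }

type Limiter[T ItemType] interface {
	NextRetry(request T) time.Duration
	Forget(request T)
}

// backoffLimiter is a hypothetical stand-in that tracks how many times
// each key has been retried.
type backoffLimiter[T ItemType] struct {
	mu       sync.Mutex
	base     time.Duration
	maxDelay time.Duration
	failures map[string]int
}

func newBackoffLimiter[T ItemType](base, maxDelay time.Duration) Limiter[T] {
	return &backoffLimiter[T]{base: base, maxDelay: maxDelay, failures: make(map[string]int)}
}

// NextRetry returns base * 2^n, capped at maxDelay, where n is the number
// of earlier NextRetry calls for this key since the last Forget.
func (l *backoffLimiter[T]) NextRetry(request T) time.Duration {
	l.mu.Lock()
	defer l.mu.Unlock()
	n := l.failures[request.Key()]
	l.failures[request.Key()] = n + 1

	delay := l.base
	for i := 0; i < n && delay < l.maxDelay; i++ {
		delay *= 2
	}
	if delay > l.maxDelay {
		delay = l.maxDelay
	}
	return delay
}

// Forget resets the backoff for the key.
func (l *backoffLimiter[T]) Forget(request T) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.failures, request.Key())
}

type key string

func (k key) Key() string { return string(k) }

func main() {
	l := newBackoffLimiter[key](time.Second, 8*time.Second)
	for i := 0; i < 5; i++ {
		fmt.Println(l.NextRetry("a")) // 1s, 2s, 4s, 8s, 8s
	}
	l.Forget("a")
	fmt.Println(l.NextRetry("a")) // 1s again after Forget
}
```

Keying the failure count on `Key()` keeps the backoff independent per item, so one repeatedly failing Request does not slow down retries for others.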

type WorkQueue

type WorkQueue[T ItemType] interface {
	// Get retrieves the next Request in the queue, blocking until a Request is
	// available. If shutdown is true, the queue is shutting down and should
	// no longer be used by the caller.
	Get() (item T, shutdown bool)
	// Add immediately adds a Request to the work queue.
	Add(item T)
	// AddAfter adds a Request to the work queue after a given amount of time.
	AddAfter(item T, duration time.Duration)
	// AddRateLimited adds a Request to the work queue after the amount of time
	// specified by applying the queue's rate limiter.
	AddRateLimited(item T)
	// Forget signals the queue to reset the rate-limiting for the given Request.
	Forget(item T)
	// Done tells the work queue that the Request has been successfully processed
	// and can be deleted from the queue.
	Done(item T)
}

WorkQueue is an interface for a work queue with semantics to help with retries and rate limiting.

func RunWorkQueue

func RunWorkQueue[T ItemType](ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue[T]

RunWorkQueue returns a started WorkQueue with per-item exponential backoff rate limiting. When the passed-in context is canceled, the queue shuts down.
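A consumer of such a queue typically runs a loop around Get. The sketch below shows one plausible shape for that loop: retry failures through AddRateLimited, reset the backoff with Forget on success, and call Done in both cases. This ordering follows the common work-queue pattern and is an assumption rather than something the documentation prescribes. `runWorker`, `fakeQueue`, `key`, and `errTransient` are hypothetical; `fakeQueue` is a non-blocking test double that ignores delays, used here in place of the queue returned by RunWorkQueue.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ItemType and WorkQueue mirror the documented interfaces, redeclared so
// this sketch compiles without importing the package.
type ItemType interface{ Key() string }

type WorkQueue[T ItemType] interface {
	Get() (item T, shutdown bool)
	Add(item T)
	AddAfter(item T, duration time.Duration)
	AddRateLimited(item T)
	Forget(item T)
	Done(item T)
}

// runWorker is a hypothetical consumer loop: retry failures with backoff,
// reset the backoff on success, and always mark the item as done.
func runWorker[T ItemType](q WorkQueue[T], handle func(T) error) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		if err := handle(item); err != nil {
			q.AddRateLimited(item)
		} else {
			q.Forget(item)
		}
		q.Done(item)
	}
}

// fakeQueue is a non-blocking test double (an assumption for this sketch):
// Get reports shutdown once the queue is empty, and delays are ignored.
type fakeQueue[T ItemType] struct {
	items   []T
	retried int
	forgot  int
	done    int
}

func (q *fakeQueue[T]) Get() (T, bool) {
	var zero T
	if len(q.items) == 0 {
		return zero, true
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, false
}
func (q *fakeQueue[T]) Add(item T)                       { q.items = append(q.items, item) }
func (q *fakeQueue[T]) AddAfter(item T, _ time.Duration) { q.Add(item) }
func (q *fakeQueue[T]) AddRateLimited(item T)            { q.retried++; q.Add(item) }
func (q *fakeQueue[T]) Forget(T)                         { q.forgot++ }
func (q *fakeQueue[T]) Done(T)                           { q.done++ }

type key string

func (k key) Key() string { return string(k) }

var errTransient = errors.New("transient failure")

func main() {
	q := &fakeQueue[key]{}
	q.Add("a")
	q.Add("b")

	attempts := map[key]int{}
	runWorker[key](q, func(k key) error {
		attempts[k]++
		if k == "b" && attempts[k] == 1 {
			return errTransient // fail "b" once to trigger a retry
		}
		return nil
	})
	// attempts for a and b, then retried, forgot, done counters
	fmt.Println(attempts["a"], attempts["b"], q.retried, q.forgot, q.done)
}
```

With a real queue, Get blocks instead of reporting shutdown on empty, so this loop would run in its own goroutine until the context passed to RunWorkQueue is canceled.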
