Documentation

Overview

    Package workqueue provides a simple queue that supports the following features:

    * Fair: items are processed in the order in which they are added.
    * Stingy: a single item will not be processed multiple times concurrently,
        and if an item is added multiple times before it can be processed, it
        will only be processed once.
    * Multiple consumers and producers. In particular, it is allowed for an
        item to be reenqueued while it is being processed.
    * Shutdown notifications.
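
The "fair" and "stingy" properties listed above can be illustrated with a small standard-library sketch. This is not the package's implementation: the type `miniQueue` and its fields are hypothetical, and locking and blocking are left out, so it is only safe from a single goroutine.

```go
package main

import "fmt"

// miniQueue is a hypothetical, single-goroutine illustration of the
// "fair" and "stingy" properties. It is not the workqueue implementation.
type miniQueue struct {
	order      []string            // FIFO order of items waiting to be processed
	dirty      map[string]struct{} // items that need processing
	processing map[string]struct{} // items currently being processed
}

func newMiniQueue() *miniQueue {
	return &miniQueue{
		dirty:      map[string]struct{}{},
		processing: map[string]struct{}{},
	}
}

// Add marks item as needing processing; duplicates are collapsed.
func (q *miniQueue) Add(item string) {
	if _, ok := q.dirty[item]; ok {
		return // already waiting: it will be processed only once
	}
	q.dirty[item] = struct{}{}
	if _, ok := q.processing[item]; ok {
		return // re-added while in flight: Done will queue it again
	}
	q.order = append(q.order, item)
}

// Get returns the oldest waiting item, or ok=false if nothing is waiting.
func (q *miniQueue) Get() (item string, ok bool) {
	if len(q.order) == 0 {
		return "", false
	}
	item, q.order = q.order[0], q.order[1:]
	q.processing[item] = struct{}{}
	delete(q.dirty, item)
	return item, true
}

// Done finishes processing and requeues item if it was re-added meanwhile.
func (q *miniQueue) Done(item string) {
	delete(q.processing, item)
	if _, ok := q.dirty[item]; ok {
		q.order = append(q.order, item)
	}
}

// Len reports how many items are waiting.
func (q *miniQueue) Len() int { return len(q.order) }

func main() {
	q := newMiniQueue()
	q.Add("a")
	q.Add("a") // collapsed with the first Add
	q.Add("b")
	item, _ := q.Get()
	q.Add(item) // re-added while it is being processed
	fmt.Println("processing:", item, "waiting:", q.Len())
	q.Done(item)
	fmt.Println("waiting after Done:", q.Len())
}
```

The two sets are what keep a single item from being handed to two consumers at once: an item re-added while in flight is only remembered as dirty and goes back on the FIFO when Done is called.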
    

    Constants

    This section is empty.

    Variables

    This section is empty.

    Functions

    func ParallelizeUntil

    func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options)

      ParallelizeUntil is a framework that allows for parallelizing N independent pieces of work until done or the context is canceled.
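
As a rough illustration of the behavior described for ParallelizeUntil, the following standard-library sketch runs a fixed number of pieces on a bounded set of workers and stops picking up new pieces once the context is canceled. The function `parallelizeUntil` and the helper `countProcessed` are hypothetical stand-ins written for this example, not the package's code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// parallelizeUntil is a hypothetical stand-in: it calls doWorkPiece for the
// piece indices 0..pieces-1 on up to `workers` goroutines and stops picking
// up new pieces once ctx is canceled.
func parallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece func(piece int)) {
	if pieces <= 0 {
		return
	}
	// Queue every piece index up front so workers can pull from one channel.
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	if workers > pieces {
		workers = pieces
	}
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				select {
				case <-ctx.Done():
					return // canceled: leave the remaining pieces untouched
				default:
					doWorkPiece(piece)
				}
			}
		}()
	}
	wg.Wait()
}

// countProcessed runs the sketch and reports how many pieces were handled.
// When cancelFirst is true the context is canceled before any work starts.
func countProcessed(cancelFirst bool, workers, pieces int) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	var n int64
	parallelizeUntil(ctx, workers, pieces, func(int) { atomic.AddInt64(&n, 1) })
	return n
}

func main() {
	fmt.Println(countProcessed(false, 4, 100))
	fmt.Println(countProcessed(true, 4, 100))
}
```

Because the pieces run concurrently, the callback has to synchronize any shared state itself; the sketch uses an atomic counter for that reason.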

      func SetProvider

      func SetProvider(metricsProvider MetricsProvider)

        SetProvider sets the metrics provider for all subsequently created work queues. Only the first call has an effect.
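
"Only the first call has an effect" is the behavior `sync.Once` provides, so a provider has to be registered before any queue that should use it is created. The sketch below shows that pattern in isolation; `providerRegistry` is a hypothetical type, and whether the package uses `sync.Once` internally is an assumption here.

```go
package main

import (
	"fmt"
	"sync"
)

// providerRegistry is a hypothetical holder for a process-wide setting.
type providerRegistry struct {
	once sync.Once
	name string
}

// set stores name on the first call and ignores every later call.
func (r *providerRegistry) set(name string) {
	r.once.Do(func() { r.name = name })
}

func main() {
	var r providerRegistry
	r.set("first")
	r.set("second") // ignored: a provider is already registered
	fmt.Println(r.name)
}
```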

        func WithChunkSize

        func WithChunkSize(c int) func(*options)

          WithChunkSize allows work items to be handed to the workers in chunks rather than one at a time. It is recommended to use this option if the number of pieces is significantly higher than the number of workers and the work done for each item is small.
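
To make the effect of a chunk size concrete, this sketch computes the index ranges a worker would pick up when pieces are grouped into chunks. `chunkBounds` is a hypothetical helper, and the exact way the package splits pieces is an assumption.

```go
package main

import "fmt"

// chunkBounds is a hypothetical helper that groups `pieces` work items into
// chunks of at most `chunkSize`, so a worker takes a whole chunk at a time
// instead of a single piece. Each entry is a half-open range [start, end).
func chunkBounds(pieces, chunkSize int) [][2]int {
	if chunkSize < 1 {
		chunkSize = 1
	}
	var bounds [][2]int
	for start := 0; start < pieces; start += chunkSize {
		end := start + chunkSize
		if end > pieces {
			end = pieces // the last chunk may be shorter
		}
		bounds = append(bounds, [2]int{start, end})
	}
	return bounds
}

func main() {
	fmt.Println(chunkBounds(10, 4))
}
```

With many small pieces, larger chunks reduce how often workers coordinate over the shared work list, which is the case the recommendation above describes.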

          Types

          type BucketRateLimiter

          type BucketRateLimiter struct {
          	*rate.Limiter
          }

            BucketRateLimiter adapts a standard token bucket (rate.Limiter) to the workqueue rate limiter API.

            func (*BucketRateLimiter) Forget

            func (r *BucketRateLimiter) Forget(item interface{})

            func (*BucketRateLimiter) NumRequeues

            func (r *BucketRateLimiter) NumRequeues(item interface{}) int

            func (*BucketRateLimiter) When

            func (r *BucketRateLimiter) When(item interface{}) time.Duration
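
BucketRateLimiter embeds `*rate.Limiter`, a token bucket from an external module. To show the idea without that dependency, here is a minimal token bucket built on the standard library. The type `tokenBucket` and its `reserve` method are hypothetical; the sketch only assumes that a reservation consumes a token and reports how long the caller should wait.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket is a hypothetical, single-goroutine token bucket. Tokens refill
// at `rate` per second up to `burst`; each reservation consumes one token and
// may drive the balance negative, which translates into a wait.
type tokenBucket struct {
	rate   float64   // tokens added per second
	burst  float64   // maximum number of stored tokens
	tokens float64   // current balance (negative means reservations are queued)
	last   time.Time // time of the last update
}

func newTokenBucket(rate float64, burst int, now time.Time) *tokenBucket {
	return &tokenBucket{rate: rate, burst: float64(burst), tokens: float64(burst), last: now}
}

// reserve takes one token at time now and returns how long the caller
// should wait before acting on it.
func (b *tokenBucket) reserve(now time.Time) time.Duration {
	b.tokens += now.Sub(b.last).Seconds() * b.rate
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	b.last = now
	b.tokens--
	if b.tokens >= 0 {
		return 0
	}
	return time.Duration(-b.tokens / b.rate * float64(time.Second))
}

// demoDelays reserves four tokens at the same instant from a bucket that
// allows 4 items per second with a burst of 2.
func demoDelays() []time.Duration {
	start := time.Unix(0, 0)
	b := newTokenBucket(4, 2, start)
	delays := make([]time.Duration, 0, 4)
	for i := 0; i < 4; i++ {
		delays = append(delays, b.reserve(start))
	}
	return delays
}

func main() {
	fmt.Println(demoDelays())
}
```

The first two reservations fit in the burst and need no wait; later ones are spaced out at the refill rate. The limit is shared by all items rather than tracked per item.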

            type CounterMetric

            type CounterMetric interface {
            	Inc()
            }

              CounterMetric represents a single numerical value that only ever goes up.

              type DelayingInterface

              type DelayingInterface interface {
              	Interface
              	// AddAfter adds an item to the workqueue after the indicated duration has passed
              	AddAfter(item interface{}, duration time.Duration)
              }

                DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
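
One way to picture delayed adds is a holding area ordered by ready time, from which items are released once their delay has elapsed. The sketch below uses `container/heap` and an explicit clock value so that it is deterministic. `delayer`, `waiting`, and `waitHeap` are hypothetical names, and a real queue would move ready items into the work queue from a background goroutine.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// waiting pairs an item with the time at which it becomes ready.
type waiting struct {
	item    string
	readyAt time.Time
}

// waitHeap orders waiting entries by readyAt, earliest first.
type waitHeap []waiting

func (h waitHeap) Len() int            { return len(h) }
func (h waitHeap) Less(i, j int) bool  { return h[i].readyAt.Before(h[j].readyAt) }
func (h waitHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *waitHeap) Push(x interface{}) { *h = append(*h, x.(waiting)) }
func (h *waitHeap) Pop() interface{} {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// delayer is a hypothetical holding area for items added with a delay.
type delayer struct{ pending waitHeap }

// addAfter schedules item to become ready at now+delay.
func (d *delayer) addAfter(item string, delay time.Duration, now time.Time) {
	heap.Push(&d.pending, waiting{item: item, readyAt: now.Add(delay)})
}

// ready removes and returns every item whose delay has elapsed at time now.
func (d *delayer) ready(now time.Time) []string {
	var out []string
	for d.pending.Len() > 0 && !d.pending[0].readyAt.After(now) {
		out = append(out, heap.Pop(&d.pending).(waiting).item)
	}
	return out
}

// demo schedules two items and checks what is ready after 2s and after 10s.
func demo() (early, late []string) {
	start := time.Unix(0, 0)
	d := &delayer{}
	d.addAfter("slow", 5*time.Second, start)
	d.addAfter("fast", 1*time.Second, start)
	early = d.ready(start.Add(2 * time.Second))
	late = d.ready(start.Add(10 * time.Second))
	return early, late
}

func main() {
	early, late := demo()
	fmt.Println(early, late)
}
```

Requeuing a failed item with a delay, rather than adding it back immediately, is what avoids the hot-loop mentioned above.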

                func NewDelayingQueue

                func NewDelayingQueue() DelayingInterface

                  NewDelayingQueue constructs a new workqueue with delayed queuing ability

                  func NewDelayingQueueWithCustomClock

                  func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface

                    NewDelayingQueueWithCustomClock constructs a new named workqueue with the ability to inject a real or fake clock for testing purposes.

                    func NewDelayingQueueWithCustomQueue

                    func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface

                      NewDelayingQueueWithCustomQueue constructs a new workqueue with the ability to inject a custom queue Interface instead of the default one.

                      func NewNamedDelayingQueue

                      func NewNamedDelayingQueue(name string) DelayingInterface

                        NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability

                        type DoWorkPieceFunc

                        type DoWorkPieceFunc func(piece int)

                        type GaugeMetric

                        type GaugeMetric interface {
                        	Inc()
                        	Dec()
                        }

                          GaugeMetric represents a single numerical value that can arbitrarily go up and down.

                          type HistogramMetric

                          type HistogramMetric interface {
                          	Observe(float64)
                          }

                            HistogramMetric counts individual observations.

                            type Interface

                            type Interface interface {
                            	Add(item interface{})
                            	Len() int
                            	Get() (item interface{}, shutdown bool)
                            	Done(item interface{})
                            	ShutDown()
                            	ShuttingDown() bool
                            }

                            type ItemExponentialFailureRateLimiter

                            type ItemExponentialFailureRateLimiter struct {
                            	// contains filtered or unexported fields
                            }

                              ItemExponentialFailureRateLimiter applies a simple baseDelay*2^<num-failures> limit. Dealing with max failures and expiration is up to the caller.

                              func (*ItemExponentialFailureRateLimiter) Forget

                              func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})

                              func (*ItemExponentialFailureRateLimiter) NumRequeues

                              func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int

                              func (*ItemExponentialFailureRateLimiter) When

                              func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
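
The baseDelay*2^<num-failures> rule can be worked through with a small sketch. `expBackoff` and its methods are hypothetical names. The cap at `maxDelay` and the detail that the first failure yields exactly `baseDelay` are assumptions, taken from the constructor's parameter names rather than from documented behavior.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// expBackoff is a hypothetical per-item exponential backoff tracker.
type expBackoff struct {
	failures  map[string]int
	baseDelay time.Duration
	maxDelay  time.Duration
}

func newExpBackoff(baseDelay, maxDelay time.Duration) *expBackoff {
	return &expBackoff{failures: map[string]int{}, baseDelay: baseDelay, maxDelay: maxDelay}
}

// when records one more failure for item and returns baseDelay*2^n capped at
// maxDelay, where n is the number of failures recorded before this call.
func (b *expBackoff) when(item string) time.Duration {
	n := b.failures[item]
	b.failures[item]++
	backoff := float64(b.baseDelay) * math.Pow(2, float64(n))
	if backoff > float64(b.maxDelay) {
		return b.maxDelay
	}
	return time.Duration(backoff)
}

// forget stops tracking item, so its next failure starts from baseDelay again.
func (b *expBackoff) forget(item string) { delete(b.failures, item) }

// numRequeues reports how many failures are currently recorded for item.
func (b *expBackoff) numRequeues(item string) int { return b.failures[item] }

func main() {
	b := newExpBackoff(time.Millisecond, 10*time.Millisecond)
	for i := 0; i < 5; i++ {
		fmt.Println(b.when("item"))
	}
	b.forget("item")
	fmt.Println(b.when("item"))
}
```

Nothing in the sketch ever gives up on an item or expires old entries, which mirrors the note that max failures and expiration are the caller's responsibility.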

                              type ItemFastSlowRateLimiter

                              type ItemFastSlowRateLimiter struct {
                              	// contains filtered or unexported fields
                              }

                                ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that

                                func (*ItemFastSlowRateLimiter) Forget

                                func (r *ItemFastSlowRateLimiter) Forget(item interface{})

                                func (*ItemFastSlowRateLimiter) NumRequeues

                                func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int

                                func (*ItemFastSlowRateLimiter) When

                                func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration
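
The fast-then-slow policy amounts to a per-item attempt counter compared against a threshold. The sketch below is a hypothetical illustration (`fastSlow` is not the package type) and assumes the first `maxFastAttempts` attempts get the fast delay.

```go
package main

import (
	"fmt"
	"time"
)

// fastSlow is a hypothetical per-item limiter: quick retries for the first
// maxFastAttempts attempts, slow retries afterwards.
type fastSlow struct {
	attempts        map[string]int
	fastDelay       time.Duration
	slowDelay       time.Duration
	maxFastAttempts int
}

func newFastSlow(fastDelay, slowDelay time.Duration, maxFastAttempts int) *fastSlow {
	return &fastSlow{
		attempts:        map[string]int{},
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
	}
}

// when counts one more attempt for item and picks the matching delay.
func (r *fastSlow) when(item string) time.Duration {
	r.attempts[item]++
	if r.attempts[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	return r.slowDelay
}

// forget resets the attempt counter for item.
func (r *fastSlow) forget(item string) { delete(r.attempts, item) }

func main() {
	r := newFastSlow(5*time.Millisecond, 10*time.Second, 3)
	for i := 0; i < 5; i++ {
		fmt.Println(r.when("item"))
	}
}
```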

                                type MaxOfRateLimiter

                                type MaxOfRateLimiter struct {
                                	// contains filtered or unexported fields
                                }

                                  MaxOfRateLimiter calls every RateLimiter and returns the worst case response. When used with a token bucket limiter, the burst could apparently be exceeded in cases where particular items were separately delayed for a longer time.

                                  func (*MaxOfRateLimiter) Forget

                                  func (r *MaxOfRateLimiter) Forget(item interface{})

                                  func (*MaxOfRateLimiter) NumRequeues

                                  func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int

                                  func (*MaxOfRateLimiter) When

                                  func (r *MaxOfRateLimiter) When(item interface{}) time.Duration
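
Combining limiters by taking the worst case can be sketched as follows. The `limiter` interface, `fixedDelay`, `counting`, and `maxOf` are hypothetical names for this example. Note that every limiter is consulted on each call, even though only the longest delay is returned, so each one gets to update its own state.

```go
package main

import (
	"fmt"
	"time"
)

// limiter is a hypothetical, reduced form of a rate limiter: it only answers
// how long an item should wait.
type limiter interface {
	when(item string) time.Duration
}

// fixedDelay always answers with the same delay.
type fixedDelay time.Duration

func (f fixedDelay) when(string) time.Duration { return time.Duration(f) }

// counting wraps a limiter and records how often it was consulted.
type counting struct {
	inner limiter
	calls int
}

func (c *counting) when(item string) time.Duration {
	c.calls++
	return c.inner.when(item)
}

// maxOf consults every limiter and returns the longest delay.
type maxOf struct{ limiters []limiter }

func (m maxOf) when(item string) time.Duration {
	var worst time.Duration
	for _, l := range m.limiters {
		if d := l.when(item); d > worst {
			worst = d
		}
	}
	return worst
}

func main() {
	a := &counting{inner: fixedDelay(5 * time.Millisecond)}
	b := &counting{inner: fixedDelay(20 * time.Millisecond)}
	m := maxOf{limiters: []limiter{a, b}}
	fmt.Println(m.when("item"), a.calls, b.calls)
}
```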

                                  type MetricsProvider

                                  type MetricsProvider interface {
                                  	NewDepthMetric(name string) GaugeMetric
                                  	NewAddsMetric(name string) CounterMetric
                                  	NewLatencyMetric(name string) HistogramMetric
                                  	NewWorkDurationMetric(name string) HistogramMetric
                                  	NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
                                  	NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
                                  	NewRetriesMetric(name string) CounterMetric
                                  }

                                    MetricsProvider generates various metrics used by the queue.

                                    type Options

                                    type Options func(*options)

                                    type RateLimiter

                                    type RateLimiter interface {
                                    	// When gets an item and gets to decide how long that item should wait
                                    	When(item interface{}) time.Duration
                                    	// Forget indicates that an item is finished being retried. It doesn't matter whether it's for permanent failure
                                    	// or for success; we'll stop tracking it.
                                    	Forget(item interface{})
                                    	// NumRequeues returns back how many failures the item has had
                                    	NumRequeues(item interface{}) int
                                    }

                                    func DefaultControllerRateLimiter

                                    func DefaultControllerRateLimiter() RateLimiter

                                      DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting. The overall limiter is a token bucket and the per-item limiter is exponential.

                                      func DefaultItemBasedRateLimiter

                                      func DefaultItemBasedRateLimiter() RateLimiter

                                      func NewItemExponentialFailureRateLimiter

                                      func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter

                                      func NewItemFastSlowRateLimiter

                                      func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter

                                      func NewMaxOfRateLimiter

                                      func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter

                                      type RateLimitingInterface

                                      type RateLimitingInterface interface {
                                      	DelayingInterface
                                      
                                      	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
                                      	AddRateLimited(item interface{})
                                      
                                      	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
                                      	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
                                      	// still have to call `Done` on the queue.
                                      	Forget(item interface{})
                                      
                                      	// NumRequeues returns back how many times the item was requeued
                                      	NumRequeues(item interface{}) int
                                      }

                                        RateLimitingInterface is an interface that rate limits items being added to the queue.

                                        func NewNamedRateLimitingQueue

                                        func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface

                                        func NewRateLimitingQueue

                                        func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface

                                          NewRateLimitingQueue constructs a new workqueue with rate-limited queuing ability. Remember to call Forget! If you don't, you may end up tracking failures forever.
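
The reminder about Forget suggests a common shape for the end of a worker's processing step: requeue with rate limiting on failure, Forget on success, and Done in both cases. The sketch below is one possible arrangement, not a prescribed pattern. `retryQueue` is a local, hypothetical subset of the methods listed for RateLimitingInterface, and `recorder` is a fake used so the example runs without the package.

```go
package main

import (
	"errors"
	"fmt"
)

// retryQueue is a local, hypothetical subset of the methods listed for
// RateLimitingInterface; it exists only so the sketch compiles on its own.
type retryQueue interface {
	AddRateLimited(item interface{})
	Forget(item interface{})
	Done(item interface{})
}

// errBoom is a stand-in processing error.
var errBoom = errors.New("boom")

// finish reports the outcome of processing item back to the queue.
func finish(q retryQueue, item interface{}, err error) {
	defer q.Done(item) // Done is needed whether or not processing succeeded
	if err != nil {
		q.AddRateLimited(item) // retry later, after the rate limiter's delay
		return
	}
	q.Forget(item) // success: stop tracking failures for this item
}

// recorder is a fake queue that logs the calls it receives.
type recorder struct{ calls []string }

func (r *recorder) AddRateLimited(item interface{}) {
	r.calls = append(r.calls, fmt.Sprintf("AddRateLimited(%v)", item))
}

func (r *recorder) Forget(item interface{}) {
	r.calls = append(r.calls, fmt.Sprintf("Forget(%v)", item))
}

func (r *recorder) Done(item interface{}) {
	r.calls = append(r.calls, fmt.Sprintf("Done(%v)", item))
}

func main() {
	r := &recorder{}
	finish(r, "a", nil)
	finish(r, "b", errBoom)
	fmt.Println(r.calls)
}
```

Forget only clears the rate limiter's bookkeeping, so it does not replace the call to Done.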

                                          type SettableGaugeMetric

                                          type SettableGaugeMetric interface {
                                          	Set(float64)
                                          }

                                            SettableGaugeMetric represents a single numerical value that can arbitrarily go up and down. (Separate from GaugeMetric to preserve backwards compatibility.)

                                            type SummaryMetric

                                            type SummaryMetric interface {
                                            	Observe(float64)
                                            }

                                              SummaryMetric captures individual observations.

                                              type Type

                                              type Type struct {
                                              	// contains filtered or unexported fields
                                              }

                                                Type is a work queue (see the package comment).

                                                func New

                                                func New() *Type

                                                  New constructs a new work queue (see the package comment).

                                                  func NewNamed

                                                  func NewNamed(name string) *Type

                                                  func (*Type) Add

                                                  func (q *Type) Add(item interface{})

                                                    Add marks item as needing processing.

                                                    func (*Type) Done

                                                    func (q *Type) Done(item interface{})

                                                      Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.

                                                      func (*Type) Get

                                                      func (q *Type) Get() (item interface{}, shutdown bool)

                                                        Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.

                                                        func (*Type) Len

                                                        func (q *Type) Len() int

                                                          Len returns the current queue length, for informational purposes only. You shouldn't, for example, gate a call to Add() or Get() on Len() being a particular value, because that can't be synchronized properly.

                                                          func (*Type) ShutDown

                                                          func (q *Type) ShutDown()

                                                            ShutDown will cause q to ignore all new items added to it. As soon as the worker goroutines have drained the existing items in the queue, they will be instructed to exit.
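
The shutdown behavior described here (new items ignored, existing items drained, then workers told to exit) can be sketched with a mutex and a condition variable. `blockingQueue` and `drain` are hypothetical names, and the sketch omits Done and duplicate handling to stay focused on shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue is a hypothetical queue whose Get blocks until an item is
// available or the queue has been shut down and drained.
type blockingQueue struct {
	mu           sync.Mutex
	cond         *sync.Cond
	items        []string
	shuttingDown bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends item unless the queue is shutting down.
func (q *blockingQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.shuttingDown {
		return // new items are ignored after ShutDown
	}
	q.items = append(q.items, item)
	q.cond.Signal()
}

// Get blocks until an item is available. It reports shutdown=true only once
// the queue is shutting down and nothing is left to hand out.
func (q *blockingQueue) Get() (item string, shutdown bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true
	}
	item, q.items = q.items[0], q.items[1:]
	return item, false
}

// ShutDown stops accepting items and wakes every blocked Get.
func (q *blockingQueue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
}

// drain runs a single worker loop until the queue reports shutdown.
func drain(q *blockingQueue) []string {
	var processed []string
	for {
		item, shutdown := q.Get()
		if shutdown {
			return processed
		}
		processed = append(processed, item)
	}
}

func main() {
	q := newBlockingQueue()
	q.Add("a")
	q.Add("b")
	q.ShutDown()
	q.Add("c") // ignored: the queue is shutting down
	fmt.Println(drain(q))
}
```

The worker loop in `drain` is the shape a consumer typically takes: call Get, exit when shutdown is reported, otherwise process the item.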

                                                            func (*Type) ShuttingDown

                                                            func (q *Type) ShuttingDown() bool