Package workerqueue extends client-go's workqueue functionality into an opinionated queue + worker model that is reusable



func FastRateLimiter added in v1.1.0

func FastRateLimiter(maxDelay time.Duration) workqueue.RateLimiter

FastRateLimiter returns a rate limiter without exponential back-off, with specified maximum per-item retry delay.

func NewDebugError added in v1.13.0

func NewDebugError(err error) error

NewDebugError returns a debugError wrapper around an error.


type Handler

type Handler func(context.Context, string) error

Handler is the handler for processing the work queue This is usually a syncronisation handler for a controller or related

type WorkerQueue

type WorkerQueue struct {

	// SyncHandler is exported to make testing easier (hack)
	SyncHandler Handler
	// contains filtered or unexported fields

WorkerQueue is an opinionated queue + worker for use with controllers and related and processing Kubernetes watched events and synchronising resources

func NewWorkerQueue

func NewWorkerQueue(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string) *WorkerQueue

NewWorkerQueue returns a new worker queue for a given name

func NewWorkerQueueWithRateLimiter added in v0.8.0

func NewWorkerQueueWithRateLimiter(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string, rateLimiter workqueue.RateLimiter) *WorkerQueue

NewWorkerQueueWithRateLimiter returns a new worker queue for a given name and a custom rate limiter.

func (*WorkerQueue) Enqueue

func (wq *WorkerQueue) Enqueue(obj interface{})

Enqueue puts the name of the runtime.Object in the queue to be processed. If you need to send through an explicit key, use an cache.ExplicitKey

func (*WorkerQueue) EnqueueAfter added in v0.11.0

func (wq *WorkerQueue) EnqueueAfter(obj interface{}, duration time.Duration)

EnqueueAfter delays an enqueue operation by duration

func (*WorkerQueue) EnqueueImmediately added in v0.8.0

func (wq *WorkerQueue) EnqueueImmediately(obj interface{})

EnqueueImmediately performs Enqueue but without rate-limiting. This should be used to continue partially completed work after giving other items in the queue a chance of running.

func (*WorkerQueue) Healthy

func (wq *WorkerQueue) Healthy() error

Healthy reports whether all the worker goroutines are running.

func (*WorkerQueue) Run

func (wq *WorkerQueue) Run(ctx context.Context, workers int)

Run the WorkerQueue processing via the Handler. Will block until stop is closed. Runs a certain number workers to process the rate limited queue

func (*WorkerQueue) RunCount

func (wq *WorkerQueue) RunCount() int

RunCount reports the number of running worker goroutines started by Run.

