queue

package
v1.0.1 Latest
Warning

This package is not in the latest version of its module.
Published: Feb 6, 2025 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetProvider

func SetProvider(metricsProvider MetricsProvider)

SetProvider sets the metrics provider for all subsequently created work queues. Only the first call has an effect.
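The "only the first call has an effect" behaviour can be pictured with a `sync.Once` guard. The following is a minimal sketch under that assumption, not the package's actual code; `provider`, `setOnce`, `global`, and `setProvider` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// provider is a hypothetical stand-in for MetricsProvider; a name is enough
// to show which registration wins.
type provider struct{ name string }

var (
	setOnce sync.Once // guards the one-time assignment
	global  provider  // provider seen by queues created afterwards
)

// setProvider keeps the first provider it receives and ignores later calls.
func setProvider(p provider) {
	setOnce.Do(func() { global = p })
}

func main() {
	setProvider(provider{name: "first"})
	setProvider(provider{name: "second"}) // no effect
	fmt.Println(global.name)              // prints "first"
}
```

In practice this means the provider should be registered early, before any queue is constructed.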

Types

type CounterMetric

type CounterMetric interface {
	Inc()
}

CounterMetric represents a single numerical value that only ever goes up.

type GaugeMetric

type GaugeMetric interface {
	Inc()
	Dec()
}

GaugeMetric represents a single numerical value that can arbitrarily go up and down.

type HistogramMetric

type HistogramMetric interface {
	Observe(float64)
}

HistogramMetric counts individual observations.

type MetricsProvider

type MetricsProvider interface {
	NewDepthMetric(name string) GaugeMetric
	NewAddsMetric(name string) CounterMetric
	NewLatencyMetric(name string) HistogramMetric
	NewWorkDurationMetric(name string) HistogramMetric
	NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
	NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
	NewRetriesMetric(name string) CounterMetric
}

MetricsProvider generates various metrics used by the queue.
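As an illustration of what a provider might look like, here is a minimal in-memory sketch. The interfaces are redeclared locally so the example compiles on its own; `counter`, `noop`, `memProvider`, and the `_depth`/`_adds`/`_retries` key suffixes are hypothetical, and the map is not safe for concurrent registration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Local copies of the documented metric interfaces.
type (
	CounterMetric       interface{ Inc() }
	GaugeMetric         interface{ Inc(); Dec() }
	SettableGaugeMetric interface{ Set(float64) }
	HistogramMetric     interface{ Observe(float64) }
)

type MetricsProvider interface {
	NewDepthMetric(name string) GaugeMetric
	NewAddsMetric(name string) CounterMetric
	NewLatencyMetric(name string) HistogramMetric
	NewWorkDurationMetric(name string) HistogramMetric
	NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
	NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
	NewRetriesMetric(name string) CounterMetric
}

// counter is a hypothetical metric that works as both counter and gauge.
type counter struct{ n atomic.Int64 }

func (c *counter) Inc() { c.n.Add(1) }
func (c *counter) Dec() { c.n.Add(-1) }

// noop discards every value it is given.
type noop struct{}

func (noop) Set(float64)     {}
func (noop) Observe(float64) {}

// memProvider keeps depth, adds, and retries in memory and drops the rest.
type memProvider struct{ counters map[string]*counter }

// get returns the counter stored under key, creating it on first use.
func (p *memProvider) get(key string) *counter {
	c, ok := p.counters[key]
	if !ok {
		c = &counter{}
		p.counters[key] = c
	}
	return c
}

func (p *memProvider) NewDepthMetric(name string) GaugeMetric     { return p.get(name + "_depth") }
func (p *memProvider) NewAddsMetric(name string) CounterMetric    { return p.get(name + "_adds") }
func (p *memProvider) NewRetriesMetric(name string) CounterMetric { return p.get(name + "_retries") }
func (p *memProvider) NewLatencyMetric(string) HistogramMetric    { return noop{} }
func (p *memProvider) NewWorkDurationMetric(string) HistogramMetric {
	return noop{}
}
func (p *memProvider) NewUnfinishedWorkSecondsMetric(string) SettableGaugeMetric {
	return noop{}
}
func (p *memProvider) NewLongestRunningProcessorSecondsMetric(string) SettableGaugeMetric {
	return noop{}
}

// Compile-time check that memProvider satisfies the interface.
var _ MetricsProvider = (*memProvider)(nil)

func main() {
	p := &memProvider{counters: map[string]*counter{}}
	depth := p.NewDepthMetric("demo")
	depth.Inc()
	depth.Inc()
	depth.Dec()
	fmt.Println(p.get("demo_depth").n.Load()) // prints 1
}
```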

type Queue

type Queue[T comparable] interface {
	// Touch can be hooked when an existing item is added again. This may be
	// useful if the implementation allows priority change for the given item.
	Touch(item T)
	// Push adds a new item.
	Push(item T)
	// Len tells the total number of items.
	Len() int
	// Pop retrieves an item.
	Pop() (item T)
}

Queue is the underlying storage for items. The functions below are always called from the same goroutine.

func DefaultQueue

func DefaultQueue[T comparable]() Queue[T]

DefaultQueue is a slice-based FIFO queue.
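To show how Touch could be used for a priority change, the sketch below implements the Queue interface with a FIFO in which a re-added item jumps to the front. `bumpQueue` is a hypothetical type, the interface is redeclared locally so the example compiles on its own, and it assumes Pop is only called when Len is greater than zero.

```go
package main

import "fmt"

// Queue mirrors the interface documented above.
type Queue[T comparable] interface {
	Touch(item T)
	Push(item T)
	Len() int
	Pop() (item T)
}

// bumpQueue is a hypothetical FIFO in which a touched item moves to the head.
// It does no locking because, per the documentation, all methods are called
// from the same goroutine.
type bumpQueue[T comparable] struct{ items []T }

func (q *bumpQueue[T]) Push(item T) { q.items = append(q.items, item) }
func (q *bumpQueue[T]) Len() int    { return len(q.items) }

// Pop removes and returns the head. Assumption: the caller checks Len first.
func (q *bumpQueue[T]) Pop() (item T) {
	item = q.items[0]
	q.items = q.items[1:]
	return item
}

// Touch moves item to the head if it is present, and does nothing otherwise.
func (q *bumpQueue[T]) Touch(item T) {
	for i, it := range q.items {
		if it == item {
			copy(q.items[1:i+1], q.items[:i]) // shift earlier items right
			q.items[0] = item
			return
		}
	}
}

func main() {
	var q Queue[string] = &bumpQueue[string]{}
	q.Push("a")
	q.Push("b")
	q.Push("c")
	q.Touch("c") // "c" was added again: move it to the front
	for q.Len() > 0 {
		fmt.Println(q.Pop())
	}
	// prints c, a, b on separate lines
}
```

A value like this would be supplied through the Queue field of TypedQueueConfig.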

type SettableGaugeMetric

type SettableGaugeMetric interface {
	Set(float64)
}

SettableGaugeMetric represents a single numerical value that can arbitrarily go up and down. (Separate from GaugeMetric to preserve backwards compatibility.)

type SummaryMetric

type SummaryMetric interface {
	Observe(float64)
}

SummaryMetric captures individual observations.

type Type

type Type = Typed[any]

Type is a work queue (see the package comment). Deprecated: Use Typed instead.

type Typed

type Typed[t comparable] struct {
	// contains filtered or unexported fields
}

func NewTyped

func NewTyped[T comparable]() *Typed[T]

NewTyped constructs a new work queue (see the package comment).

func NewTypedWithConfig

func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T]

NewTypedWithConfig constructs a new work queue and allows its properties to be customized through config.

func (*Typed[T]) Add

func (q *Typed[T]) Add(item T) (exist bool)

Add marks item as needing processing.

func (*Typed[T]) Done

func (q *Typed[T]) Done(item T)

Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.

func (*Typed[T]) Exist

func (q *Typed[T]) Exist(item T) bool

Exist checks if the item is in the queue.

func (*Typed[T]) Get

func (q *Typed[T]) Get() (item T, shutdown bool)

Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.
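A typical worker loop built on this contract might look like the sketch below. `workQueue` is a locally declared subset of TypedInterface, `runWorker` is a hypothetical helper, and `fakeQueue` is a test double that does not block; none of them are part of the package.

```go
package main

import "fmt"

// workQueue is the subset of TypedInterface that a worker needs.
type workQueue[T comparable] interface {
	Get() (item T, shutdown bool)
	Done(item T)
}

// runWorker processes items until the queue reports shutdown.
func runWorker[T comparable](q workQueue[T], handle func(T)) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return // the queue is shut down: end this goroutine
		}
		func() {
			defer q.Done(item) // Done runs even if handle panics
			handle(item)
		}()
	}
}

// fakeQueue is a hypothetical test double: it hands out a fixed list of
// items and then reports shutdown.
type fakeQueue struct {
	pending []string
	done    []string
}

func (f *fakeQueue) Get() (string, bool) {
	if len(f.pending) == 0 {
		return "", true
	}
	item := f.pending[0]
	f.pending = f.pending[1:]
	return item, false
}

func (f *fakeQueue) Done(item string) { f.done = append(f.done, item) }

func main() {
	q := &fakeQueue{pending: []string{"a", "b"}}
	runWorker[string](q, func(s string) { fmt.Println("processing", s) })
	fmt.Println("done:", q.done)
}
```

Wrapping each item in a closure with `defer` is one way to guarantee the Done call; a plain call after `handle` works equally well when `handle` cannot panic.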

func (*Typed[T]) Len

func (q *Typed[T]) Len() int

Len returns the current queue length, for informational purposes only. Do not, for example, gate a call to Add() or Get() on Len() returning a particular value; that cannot be synchronized properly.

func (*Typed[T]) ShutDown

func (q *Typed[T]) ShutDown()

ShutDown will cause q to ignore all new items added to it and immediately instruct the worker goroutines to exit.

func (*Typed[T]) ShutDownWithDrain

func (q *Typed[T]) ShutDownWithDrain()

ShutDownWithDrain causes q to ignore all new items added to it. Once the worker goroutines have "drained" the queue, i.e. finished processing and called Done on every existing item, they are instructed to exit and ShutDownWithDrain returns. This imposes a strict requirement: workers must call Done on every item in the queue once shutdown has been initiated; otherwise ShutDownWithDrain blocks indefinitely. It is, however, safe to call ShutDown after ShutDownWithDrain to force the queue to shut down immediately without waiting for the drain.

func (*Typed[T]) ShuttingDown

func (q *Typed[T]) ShuttingDown() bool

type TypedInterface

type TypedInterface[T comparable] interface {
	// Exist reports whether the item is in the queue.
	Exist(item T) bool
	// Add intercepts repeated additions of the same item. If the item has not
	// yet been retrieved by Get, Queue.Touch is called instead, which lets the
	// underlying queue adjust the item's priority.
	Add(item T) (exist bool)
	Len() int
	Get() (item T, shutdown bool)
	Done(item T)
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}

TypedInterface wraps a queue to support advanced features such as concurrency control and duplicate detection.

type TypedQueueConfig

type TypedQueueConfig[T comparable] struct {
	// Name for the queue. If unnamed, the metrics will not be registered.
	Name string

	// MetricsProvider optionally allows specifying a metrics provider to use for the queue
	// instead of the global provider.
	MetricsProvider MetricsProvider

	// Clock allows a real or fake clock to be injected for testing purposes.
	Clock clock.WithTicker

	// Queue provides the underlying queue to use. It is optional and defaults to a slice-based FIFO queue.
	Queue Queue[T]
}

type UniKey

type UniKey[T comparable] interface {
	GetUniKey() T
}

UniKey is an interface that the items of a TypedInterface can implement to customize how elements are compared. Note that the T here and the T of TypedInterface must be the same type, and it must not be a pointer type. This makes it possible to use only some fields of a struct as the key, e.g.

type MyTask struct {
	key  string `json:"key"`
	name string `json:"name"`
}

func (t MyTask) GetUniKey() MyTask { return MyTask{key: t.key} }
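The effect of GetUniKey can be checked in isolation. In the sketch below, `sameItem` is a hypothetical helper showing the comparison a queue could make; how the package actually uses the key is not shown in this documentation. The struct tags are omitted because they do not affect comparison.

```go
package main

import "fmt"

type MyTask struct {
	key  string
	name string
}

// GetUniKey returns a copy with only the key field set, so two tasks that
// share a key yield equal values.
func (t MyTask) GetUniKey() MyTask { return MyTask{key: t.key} }

// sameItem is a hypothetical helper: it compares two tasks by their keys.
func sameItem(a, b MyTask) bool { return a.GetUniKey() == b.GetUniKey() }

func main() {
	a := MyTask{key: "job-1", name: "first attempt"}
	b := MyTask{key: "job-1", name: "retry"}
	fmt.Println(a == b)         // prints false: the name fields differ
	fmt.Println(sameItem(a, b)) // prints true: the keys match
}
```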
