Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetProvider ¶
func SetProvider(metricsProvider MetricsProvider)
SetProvider sets the metrics provider for all subsequently created work queues. Only the first call has an effect.
Types ¶
type CounterMetric ¶
type CounterMetric interface {
Inc()
}
CounterMetric represents a single numerical value that only ever goes up.
type GaugeMetric ¶
type GaugeMetric interface {
Inc()
Dec()
}
GaugeMetric represents a single numerical value that can arbitrarily go up and down.
type HistogramMetric ¶
type HistogramMetric interface {
Observe(float64)
}
HistogramMetric counts individual observations.
type MetricsProvider ¶
type MetricsProvider interface {
NewDepthMetric(name string) GaugeMetric
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) HistogramMetric
NewWorkDurationMetric(name string) HistogramMetric
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric
}
MetricsProvider generates various metrics used by the queue.
type Queue ¶
type Queue[T comparable] interface { // Touch can be hooked when an existing item is added again. This may be // useful if the implementation allows priority change for the given item. Touch(item T) // Push adds a new item. Push(item T) // Len tells the total number of items. Len() int // Pop retrieves an item. Pop() (item T) }
Queue is the underlying storage for items. The functions below are always called from the same goroutine.
func DefaultQueue ¶
func DefaultQueue[T comparable]() Queue[T]
DefaultQueue is a slice based FIFO queue.
type SettableGaugeMetric ¶
type SettableGaugeMetric interface {
Set(float64)
}
SettableGaugeMetric represents a single numerical value that can arbitrarily go up and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SummaryMetric ¶
type SummaryMetric interface {
Observe(float64)
}
SummaryMetric captures individual observations.
type Typed ¶
type Typed[t comparable] struct { // contains filtered or unexported fields }
func NewTyped ¶
func NewTyped[T comparable]() *Typed[T]
NewTyped constructs a new work queue (see the package comment).
func NewTypedWithConfig ¶
func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T]
NewTypedWithConfig constructs a new workqueue with ability to customize different properties.
func (*Typed[T]) Done ¶
func (q *Typed[T]) Done(item T)
Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.
func (*Typed[T]) Get ¶
Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.
func (*Typed[T]) Len ¶
Len returns the current queue length, for informational purposes only. You shouldn't e.g. gate a call to Add() or Get() on Len() being a particular value, that can't be synchronized properly.
func (*Typed[T]) ShutDown ¶
func (q *Typed[T]) ShutDown()
ShutDown will cause q to ignore all new items added to it and immediately instruct the worker goroutines to exit.
func (*Typed[T]) ShutDownWithDrain ¶
func (q *Typed[T]) ShutDownWithDrain()
ShutDownWithDrain will cause q to ignore all new items added to it. As soon as the worker goroutines have "drained", i.e: finished processing and called Done on all existing items in the queue; they will be instructed to exit and ShutDownWithDrain will return. Hence: a strict requirement for using this is; your workers must ensure that Done is called on all items in the queue once the shut down has been initiated, if that is not the case: this will block indefinitely. It is, however, safe to call ShutDown after having called ShutDownWithDrain, as to force the queue shut down to terminate immediately without waiting for the drainage.
func (*Typed[T]) ShuttingDown ¶
type TypedInterface ¶
type TypedInterface[T comparable] interface { // check if the item is in the queue Exist(item T) bool // repeated additions will be intercepted. // If it has not been Get, the Queue.Touch method will be called. You can adjust the priority in turn. Add(item T) (exist bool) Len() int Get() (item T, shutdown bool) Done(item T) ShutDown() ShutDownWithDrain() ShuttingDown() bool }
TypedInterface warp a queue to supports advanced queue functions such as concurrency control and Remove duplicates judgment
type TypedQueueConfig ¶
type TypedQueueConfig[T comparable] struct { // Name for the queue. If unnamed, the metrics will not be registered. Name string // MetricsProvider optionally allows specifying a metrics provider to use for the queue // instead of the global provider. MetricsProvider MetricsProvider // Clock ability to inject real or fake clock for testing purposes. Clock clock.WithTicker // Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue. Queue Queue[T] }
type UniKey ¶
type UniKey[T comparable] interface { GetUniKey() T }
UniKeyable is an interface of TypedInterface'Item through which you can customize the comparison logic of elements. Note that this T and TypedInterface T need to be of the same type, and don't use pointer types. for this way, we can handle the situation where some fields in the structure are used as keys. eg.
type MyTask struct {
key string `json:"key"`
name string `json:"name"`
}
func (t MyTask) GetUniKey() MyTask {return MyTask{key: t.key}}