Documentation
¶
Index ¶
- Variables
- type Clock
- type Config
- type DummyDLQ
- type DummyMetrics
- func (DummyMetrics) QueueDeadline()
- func (DummyMetrics) QueueLeak(_ string)
- func (DummyMetrics) QueueLost()
- func (DummyMetrics) QueuePull()
- func (DummyMetrics) QueuePut()
- func (DummyMetrics) QueueRetry()
- func (DummyMetrics) SubqLeak(_ string)
- func (DummyMetrics) SubqPull(_ string)
- func (DummyMetrics) SubqPut(_ string)
- func (DummyMetrics) WorkerInit(_ uint32)
- func (DummyMetrics) WorkerSetup(_, _, _ uint)
- func (DummyMetrics) WorkerSleep(_ uint32)
- func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ string)
- func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration)
- func (DummyMetrics) WorkerWakeup(_ uint32)
- type Enqueuer
- type Interface
- type Job
- type LeakDirection
- type Logger
- type MetricsWriter
- type Queue
- type Schedule
- func (s *Schedule) AddRange(raw string, params ScheduleParams) (err error)
- func (s *Schedule) Copy() *Schedule
- func (s *Schedule) Get() (params ScheduleParams, schedID int)
- func (s *Schedule) Len() int
- func (s *Schedule) Less(i, j int) bool
- func (s *Schedule) String() string
- func (s *Schedule) Swap(i, j int)
- func (s *Schedule) WorkersMaxDaily() (max uint32)
- type ScheduleParams
- type Status
- type Worker
- type WorkerStatus
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoConfig = errors.New("no config provided") ErrNoCapacity = errors.New("capacity must be greater than zero") ErrNoWorker = errors.New("no worker provided") ErrNoWorkers = errors.New("no workers available") ErrNoQueue = errors.New("no queue provided") ErrQueueClosed = errors.New("queue closed") ErrSchedMinGtMax = errors.New("min workers greater than max") ErrSchedZeroMax = errors.New("max workers must be greater than 0") ErrSchedBadRange = errors.New("schedule range has bad format") ErrSchedBadTime = errors.New("bad time provided") ErrSchedBadHour = errors.New("hour outside range 0..23") ErrSchedBadMin = errors.New("minute outside range 0..59") ErrSchedBadSec = errors.New("second outside range 0..59") ErrSchedBadMsec = errors.New("millisecond outside range 0..999") )
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // Queue capacity. // Mandatory param if QoS config omitted. QoS (if provided) summing capacity will overwrite this field. Capacity uint64 // Streams allows to avoid mutex starvation by sharing items among Streams sub-channels instead of one singe // channel. Streams uint32 // MaxRetries determines the maximum number of item processing retries. // If MaxRetries is exceeded, the item will send to DLQ (if possible). // The initial attempt is not counted as a retry. MaxRetries uint32 // Simultaneous enqueue operation limit to start force calibration. // Works only on balanced queues. // If this param omit defaultForceCalibrationLimit (1000) will use instead. ForceCalibrationLimit uint32 // Workers number. // Setting this param disables balancing feature. If you want to have balancing use params WorkersMin and WorkersMax // instead. Workers uint32 // HeartbeatInterval rate interval. Need to perform service operation like queue calibration, workers handling, etc. // Setting this param too big (greater than 1 second) is counterproductive - the queue will rarely calibrate and // result may be insufficient good. // If this param omit defaultHeartbeatInterval (1 second) will use instead. HeartbeatInterval time.Duration // QoS scheduling settings. // If this param omit FIFO queue will init by default. // See qos/config.go QoS *qos.Config // Minimum workers number. // Setting this param less than WorkersMax enables balancing feature. WorkersMin uint32 // Maximum workers number. // Setting this param greater than WorkersMin enables balancing feature. WorkersMax uint32 // Worker wake up factor in dependency of queue fullness rate. // When queue fullness rate will exceed that factor, then first available slept worker will wake. // WakeupFactor must be in range [0..0.999999]. // If this param omit defaultWakeupFactor (0.75) will use instead. WakeupFactor float32 // Worker sleep factor in dependency of queue fullness rate. // When queue fullness rate will less than that factor, one of active workers will put to sleep. // SleepFactor must be in range [0..0.999999]. // If this param omit defaultSleepFactor (0.5) will use instead. SleepFactor float32 // Limit of workers could send to sleep at once. // If this param omit the half of available workers will send to sleep at calibration. SleepThreshold uint32 // How long slept worker will wait until stop. // If this param omit defaultSleepInterval (5 seconds) will use instead. SleepInterval time.Duration // Schedule contains base params (like workers min/max and factors) for specific time ranges. // See schedule.go for usage examples. Schedule *Schedule // Worker represents queue worker. // Mandatory param. Worker Worker // Dead letter queue to catch leaky items. // Setting this param enables leaky feature. DLQ Enqueuer // Put failed items to DLQ. // Better to use together with MaxRetries. After all processing attempts item will send to DLQ. FailToDLQ bool // Put denied by deadline items to DLQ. DeadlineToDLQ bool // LeakDirection indicates queue side to leak items (rear or front). LeakDirection LeakDirection // FrontLeakAttempts indicates how many times queue may be shifted to free up space for new rear item. // On limit overflow rear direction will use by fallback. // Low values required. // If this param omit defaultFrontLeakAttempts (5) will use instead. FrontLeakAttempts uint32 // DelayInterval between item enqueue and processing. // Settings this param enables delayed execution (DE) feature. // DE guarantees that item will processed by worker after at least DelayInterval time. // The opposite param to DeadlineInterval. DelayInterval time.Duration // DeadlineInterval to skip useless item processing. // Setting this param enables Deadline-Aware Queue (DAQ) feature. // DAQ guarantees that item will not process if time is over when worker takes it from queue. // The opposite param to DelayInterval. DeadlineInterval time.Duration // Clock represents clock keeper. // If this param omit nativeClock will use instead (see clock.go). Clock Clock // Metrics writer handler. MetricsWriter MetricsWriter // Logger handler. Logger Logger }
Config describes queue properties and behavior.
type DummyDLQ ¶
type DummyDLQ struct{}
DummyDLQ is a stub DLQ implementation. It does nothing and need for queues with leak tolerance. It just leaks data to the trash.
type DummyMetrics ¶
type DummyMetrics struct{}
DummyMetrics is a stub metrics writer handler that uses by default and does nothing. Need just to reduce checks in code.
func (DummyMetrics) QueueDeadline ¶ added in v1.1.5
func (DummyMetrics) QueueDeadline()
func (DummyMetrics) QueueLeak ¶
func (DummyMetrics) QueueLeak(_ string)
func (DummyMetrics) QueueLost ¶
func (DummyMetrics) QueueLost()
func (DummyMetrics) QueuePull ¶
func (DummyMetrics) QueuePull()
func (DummyMetrics) QueuePut ¶
func (DummyMetrics) QueuePut()
func (DummyMetrics) QueueRetry ¶
func (DummyMetrics) QueueRetry()
func (DummyMetrics) SubqLeak ¶ added in v1.1.5
func (DummyMetrics) SubqLeak(_ string)
func (DummyMetrics) SubqPull ¶ added in v1.1.5
func (DummyMetrics) SubqPull(_ string)
func (DummyMetrics) SubqPut ¶ added in v1.1.5
func (DummyMetrics) SubqPut(_ string)
func (DummyMetrics) WorkerInit ¶
func (DummyMetrics) WorkerInit(_ uint32)
func (DummyMetrics) WorkerSetup ¶
func (DummyMetrics) WorkerSetup(_, _, _ uint)
func (DummyMetrics) WorkerSleep ¶
func (DummyMetrics) WorkerSleep(_ uint32)
func (DummyMetrics) WorkerStop ¶
func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ string)
func (DummyMetrics) WorkerWait ¶
func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration)
func (DummyMetrics) WorkerWakeup ¶
func (DummyMetrics) WorkerWakeup(_ uint32)
type Interface ¶
type Interface interface { Enqueuer // Size return actual size of the queue. Size() int // Capacity return max size of the queue. Capacity() int // Rate returns size to capacity ratio. Rate() float32 // Close gracefully stops the queue. Close() error }
Interface describes queue interface.
type Job ¶ added in v1.1.5
type Job struct { // Item payload. Payload any // Item weight. Designed to use together with Weighted priority evaluator (see priority/weighted.go). Weight uint64 // Delay time before processing. DelayInterval time.Duration // DeadlineInterval limits maximum reasonable time to process job. DeadlineInterval time.Duration }
Job is a wrapper about queue item. May provide meta info.
type LeakDirection ¶ added in v1.1.3
type LeakDirection uint
LeakDirection indicates the queue side to leak.
const ( // LeakDirectionRear is a default direction that redirects to DLQ new incoming items. LeakDirectionRear LeakDirection = iota // LeakDirectionFront takes old item from queue front and redirects it to DLQ. Thus releases space for the new // incoming item in the queue. LeakDirectionFront )
func (LeakDirection) String ¶ added in v1.1.6
func (ld LeakDirection) String() string
type MetricsWriter ¶
type MetricsWriter interface { // WorkerSetup set initial workers statuses. // Calls twice: on queue init and schedule's time range changes. WorkerSetup(active, sleep, stop uint) // WorkerInit registers worker's start moment. WorkerInit(idx uint32) // WorkerSleep registers when worker puts to sleep. WorkerSleep(idx uint32) // WorkerWakeup registers when slept worker resumes. WorkerWakeup(idx uint32) // WorkerWait registers how many worker waits due to delayed execution. WorkerWait(idx uint32, dur time.Duration) // WorkerStop registers when sleeping worker stops. WorkerStop(idx uint32, force bool, status string) // QueuePut registers income of new item to the queue. QueuePut() // QueuePull registers outgoing of item from the queue. QueuePull() // QueueRetry registers total amount of retries. QueueRetry() // QueueLeak registers item's leak from the full queue. // Param dir indicates leak direction and may be "rear" or "front". QueueLeak(direction string) // QueueDeadline registers amount of skipped processing of items due to deadline. QueueDeadline() // QueueLost registers lost items missed queue and DLQ. QueueLost() // SubqPut registers income of new item to the sub-queue. SubqPut(subq string) // SubqPull registers outgoing of item from the sub-queue. SubqPull(subq string) // SubqLeak registers item's drop from the full queue. SubqLeak(subq string) }
MetricsWriter is an interface of queue metrics handler. See example of implementations https://github.com/koykov/metrics_writers/tree/master/queue.
type Queue ¶
Queue is an implementation of balanced leaky queue.
The queue balances among available workers [Config.WorkersMin...Config.WorkersMax] in realtime. Queue also has leaky feature: when queue is full and new items continue to flow, then leaked items will forward to DLQ (dead letter queue). For queues with variadic daily load exists special scheduler (see schedule.go) that allow to specify variadic queue params for certain time ranges.
func (*Queue) Close ¶
Close gracefully stops the queue.
After receiving of close signal at least workersMin number of workers will work so long as queue has items. Enqueue of new items to queue will forbid.
func (*Queue) ForceClose ¶
ForceClose closes the queue and immediately stops all active and sleeping workers.
Remaining items in the queue will throw to the trash.
type Schedule ¶
type Schedule struct {
// contains filtered or unexported fields
}
Schedule describes time ranges with specific queue params.
func NewSchedule ¶
func NewSchedule() *Schedule
NewSchedule makes new schedule instance. Don't share it among many queues.
func (*Schedule) AddRange ¶
func (s *Schedule) AddRange(raw string, params ScheduleParams) (err error)
AddRange registers specific params for given time range. Raw specifies time range in format `<left time>-<right time>`. Time point may be in three formats: * HH:MM * HH:MM:SS * HH:MM:SS.MSC (msc is a millisecond 0-999). All time ranges outside registered will use default params specified in config (WorkersMin, WorkersMax, WakeupFactor and SleepFactor).
func (*Schedule) Copy ¶
Copy copies schedule instance to protect queue from changing params after start. It means that after starting queue all schedule modifications will have no effect.
func (*Schedule) Get ¶
func (s *Schedule) Get() (params ScheduleParams, schedID int)
Get returns queue params if current time hits to the one of registered ranges. Param schedID indicates which time range hits and contains -1 on miss.
func (*Schedule) WorkersMaxDaily ¶
WorkersMaxDaily returns maximum number of workers from registered time ranges.
type ScheduleParams ¶
type ScheduleParams realtimeParams
ScheduleParams describes queue params for specific time range.
type WorkerStatus ¶
type WorkerStatus uint32
const ( WorkerStatusIdle WorkerStatus = iota WorkerStatusActive WorkerStatusSleep )
func (WorkerStatus) String ¶ added in v1.1.6
func (s WorkerStatus) String() string