queue

v1.1.6
Published: Jun 28, 2024 License: MIT Imports: 15 Imported by: 3

README

Queue

A queue is a wrapper over Go channels that has the following features:

  • balanced
  • leaky
  • retryable
  • scheduled
  • delayed execution
  • deadline-aware
  • prioritizable
  • covered with metrics
  • logged

queue was developed in response to a need to create many queues with the same structure and functionality. Creating identical channels that differ only in their workers, and covering each of them with metrics, was too tedious, and this solution was born as a result.

queue lets you abstract away from the implementation of the channel and the producers and focus only on the worker implementation. It is enough to write a worker that implements Worker and bind it to the queue config; the queue does the rest of the work itself.

queue isn't a classic queue with Enqueue/Dequeue methods. This implementation has no Dequeue method because the queue extracts items itself and forwards them to any of the active workers. The implementation is similar to the thread pool pattern, but queue goes beyond the limits of that pattern.

A queue is initialized using Config. It has only two mandatory settings: Capacity and Worker. Worker must implement the Worker interface. A worker can only process the item and return an error if one occurred. The number of workers can be set using the Workers param. If you want a dynamic number of workers, i.e. a balanced queue, set it using the WorkersMin/WorkersMax params instead (see the Balanced queue chapter).

With only Capacity, Worker and Workers set, the queue works as a classic thread pool with a static number of workers.

Let's see how the other features of the queue are enabled and how they work.
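To picture the static mode, here is a minimal sketch of a fixed-size worker pool over a buffered channel. It is not the package's code: the Worker interface mirrors the documented one, while counter, runPool and sumWithPool are hypothetical names invented for the illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Worker mirrors the documented interface: process one item, return an error.
type Worker interface {
	Do(x any) error
}

// counter is a hypothetical worker that sums integer items.
type counter struct{ sum int64 }

func (c *counter) Do(x any) error {
	n, ok := x.(int)
	if !ok {
		return fmt.Errorf("unexpected item type %T", x)
	}
	atomic.AddInt64(&c.sum, int64(n))
	return nil
}

// runPool starts a fixed number of goroutines that pull items from ch and
// hand them to w. It returns once ch is closed and drained.
func runPool(ch <-chan any, workers int, w Worker) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for x := range ch {
				_ = w.Do(x) // errors are ignored in this sketch
			}
		}()
	}
	wg.Wait()
}

// sumWithPool pushes 1..n through the pool and returns the total.
func sumWithPool(n, capacity, workers int) int64 {
	ch := make(chan any, capacity)
	c := &counter{}
	done := make(chan struct{})
	go func() { runPool(ch, workers, c); close(done) }()
	for i := 1; i <= n; i++ {
		ch <- i // blocks while the buffer is full
	}
	close(ch)
	<-done
	return atomic.LoadInt64(&c.sum)
}

func main() {
	fmt.Println(sumWithPool(100, 16, 4))
}
```

The producer only sends into the channel and never dequeues; pulling items is the pool's job, which is the same division of labor the queue offers.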

Balanced queue

Usually every queue has a variable load: maximum during the day, minimum at night. With a static configuration, the maximum number of workers must stay active regardless of the current load. That isn't a big problem because goroutines are cheap, but balancing the number of workers depending on load was too interesting a problem to leave unsolved, so this feature was implemented.

Balancing is enabled by setting the params WorkersMin and WorkersMax (WorkersMin must be less than WorkersMax). Setting these params disables the Workers param, i.e. the balancing feature has higher priority than a static number of workers.

WorkersMin is the lower limit on the number of active workers. Regardless of conditions, the queue keeps that number of workers active.

WorkersMax is the upper limit on the number of active workers. The queue will not run more workers than WorkersMax. If even WorkersMax workers aren't enough, the leaky feature may help (see chapter Leaky queue).

All workers in the range WorkersMin - WorkersMax may be in one of three states:

  • active - the worker runs and processes items.
  • sleep - the worker's goroutine is still running, but does nothing because the queue doesn't have enough items to process. This state isn't permanent: after waiting SleepInterval the worker becomes idle.
  • idle - the worker (goroutine) stops and releases resources. When needed, the queue may make an idle worker active again (start a goroutine).

The queue decides to run a new worker when the rate (the ratio of size to capacity) becomes greater than WakeupFactor [0..0.999999].

E.g.: imagine a queue with capacity 100 and WakeupFactor 0.5. If the queue size becomes greater than 50, the queue will run a new worker. If the new worker doesn't help to reduce the size, the queue will start another one, and so on until the rate becomes less than WakeupFactor or the WorkersMax limit is reached.

Now imagine that the queue's load decreases and the number of active workers becomes redundant. In that case the queue checks SleepFactor [0..0.999999]. If the queue's rate becomes less than SleepFactor, one of the active workers is forced into the sleep state. On the next check another one is put to sleep if the condition (rate < SleepFactor) still holds, and so on until the rate becomes greater than SleepFactor or the WorkersMin limit is reached. A sleeping worker will not sleep forever. After waiting SleepInterval its goroutine stops and its status becomes idle. The sleep state is needed for queues with frequently varying load. Constantly starting and stopping goroutines triggers the runtime.findrunnable function. SleepInterval helps amortize that load.

A queue in balancing mode constantly balances the number of workers so that the queue's rate stays between SleepFactor and WakeupFactor.
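The balancing rule can be condensed into one decision function. The sketch below is an illustration of the rule as described in this chapter, not the package's calibration code; the names action and decide are hypothetical, and one worker is assumed to change state per check.

```go
package main

import "fmt"

// action is what a single calibration step decides to do.
type action int

const (
	keep   action = iota // leave the number of active workers as is
	wakeup               // start (or wake) one more worker
	sleep                // put one active worker to sleep
)

func (a action) String() string {
	return [...]string{"keep", "wakeup", "sleep"}[a]
}

// decide compares the fullness rate (size/capacity) with both factors and
// respects the [min, max] limits on active workers.
func decide(size, capacity int, active, min, max uint32, wakeupFactor, sleepFactor float32) action {
	rate := float32(size) / float32(capacity)
	switch {
	case rate > wakeupFactor && active < max:
		return wakeup
	case rate < sleepFactor && active > min:
		return sleep
	default:
		return keep
	}
}

func main() {
	// Capacity 100, WakeupFactor 0.5, SleepFactor 0.25, workers in [1, 4].
	for _, c := range []struct {
		size   int
		active uint32
	}{{51, 1}, {90, 4}, {40, 3}, {10, 3}, {10, 1}} {
		fmt.Printf("size=%d active=%d -> %s\n", c.size, c.active,
			decide(c.size, 100, c.active, 1, 4, 0.5, 0.25))
	}
}
```

Repeating this decision on every check keeps the rate between the two factors for as long as the worker limits allow it.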

Leaky queue

Imagine a queue under such a huge load that even WorkersMax active workers can't process the items in time. The queue blocks all threads calling Enqueue, which may produce a deadlock or reduce performance.

To solve this problem, the DLQ (dead letter queue) was implemented: an auxiliary component that implements the Enqueuer interface. Thus, you may forward leaked items to another queue or even build a chain of queues.

Setting the DLQ param in the config enables the "leaky" feature of the queue. It is based on the "leaky bucket algorithm", which is described in Effective Go as a "leaky buffer".

The package contains a builtin DummyDLQ implementation. It just throws leaked items in the trash: you will lose some items, but the queue and the application stay alive. There is also the dlqdump solution, which can dump leaked items to some storage (e.g. disk). See its package description for details.

A final note on the leaky queue: there is a config flag FailToDLQ. If a worker reports that item processing failed, the item is forwarded to the DLQ, even if the queue isn't leaking at the moment. This may be helpful for implementing a fallback method of item processing.
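The leaky idea boils down to a non-blocking send. The following is a minimal sketch of that technique, not the package's implementation: Enqueuer mirrors the documented interface, while sliceDLQ and leakyEnqueue are hypothetical names. It corresponds to leaking at the rear, where the new incoming item is the one redirected.

```go
package main

import "fmt"

// Enqueuer mirrors the documented interface.
type Enqueuer interface {
	Enqueue(x any) error
}

// sliceDLQ is a hypothetical DLQ that simply collects leaked items.
// It is not safe for concurrent use; it exists only for the demo.
type sliceDLQ struct{ items []any }

func (d *sliceDLQ) Enqueue(x any) error {
	d.items = append(d.items, x)
	return nil
}

// leakyEnqueue tries a non-blocking send. If the buffer is full, the item
// is redirected to the DLQ instead of blocking the caller.
func leakyEnqueue(ch chan any, dlq Enqueuer, x any) (leaked bool, err error) {
	select {
	case ch <- x:
		return false, nil
	default:
		return true, dlq.Enqueue(x)
	}
}

func main() {
	ch := make(chan any, 2) // tiny capacity and no workers, to force leaks
	dlq := &sliceDLQ{}
	for i := 0; i < 5; i++ {
		leaked, _ := leakyEnqueue(ch, dlq, i)
		fmt.Println("item", i, "leaked:", leaked)
	}
	fmt.Println("in queue:", len(ch), "in DLQ:", len(dlq.items))
}
```

Because the DLQ is itself just an Enqueuer, another queue can take its place, which is how a chain of queues can be built.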

Retryable

One attempt at processing an item may not be enough. For example, a worker must send an HTTP request, the request fails due to a network problem, and it makes sense to try again. The MaxRetries param indicates how many repeated attempts a worker can make. The first attempt at processing isn't a retry. All subsequent attempts are counted as retries.

This param may work together with the FailToDLQ param. The item is sent to the DLQ if all repeated attempts fail.
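The interplay of MaxRetries and FailToDLQ can be sketched as a small loop. This is an illustration under assumptions, not the package's code: Worker and Enqueuer mirror the documented interfaces, and flaky, sliceDLQ and process are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Worker and Enqueuer mirror the documented interfaces.
type Worker interface{ Do(x any) error }
type Enqueuer interface{ Enqueue(x any) error }

// flaky is a hypothetical worker that fails its first `failures` calls.
type flaky struct{ failures, calls int }

func (f *flaky) Do(any) error {
	f.calls++
	if f.calls <= f.failures {
		return errors.New("temporary failure")
	}
	return nil
}

// sliceDLQ is a hypothetical DLQ that collects items.
type sliceDLQ struct{ items []any }

func (d *sliceDLQ) Enqueue(x any) error { d.items = append(d.items, x); return nil }

// process makes one initial attempt plus at most maxRetries retries.
// If every attempt fails and failToDLQ is set, the item goes to the DLQ.
func process(w Worker, x any, maxRetries uint32, failToDLQ bool, dlq Enqueuer) error {
	var err error
	for attempt := uint64(0); attempt <= uint64(maxRetries); attempt++ {
		if err = w.Do(x); err == nil {
			return nil
		}
	}
	if failToDLQ && dlq != nil {
		_ = dlq.Enqueue(x)
	}
	return err
}

func main() {
	dlq := &sliceDLQ{}
	ok := &flaky{failures: 2}
	fmt.Println(process(ok, "a", 2, true, dlq), ok.calls, len(dlq.items))
	bad := &flaky{failures: 5}
	fmt.Println(process(bad, "b", 2, true, dlq), bad.calls, len(dlq.items))
}
```

With MaxRetries 2 a worker is called at most three times per item: the initial attempt and two retries.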

Scheduled queue

Imagine we know the periodicity of growth and reduction of the queue load. For example, from 8:00 AM till 12:00 PM and from 04:00 PM till 06:00 PM the load is moderate and 5 workers are enough. From 12:00 PM till 04:00 PM the load is at its maximum and at least 10 workers must be active. And at night the load is lowest and one worker is enough. For such cases a scheduler of queue params was implemented. It allows setting more suitable values of the following params for certain periods of time:

  • WorkersMin
  • WorkersMax
  • WakeupFactor
  • SleepFactor

These params replace the corresponding config values in the given period.

For the above example, the scheduler initialization looks like this:

sched := NewSchedule()
sched.AddRange("08:00-12:00", ScheduleParams{WorkersMin: 5, WorkersMax: 10})
sched.AddRange("12:00-16:00", ScheduleParams{WorkersMin: 10, WorkersMax: 20})
sched.AddRange("16:00-18:00", ScheduleParams{WorkersMin: 5, WorkersMax: 10})
config := Config{
	...
	WorkersMin: 1,
	WorkersMax: 4,
	Schedule: sched,
	...
}

This config will balance the queue as follows:

  • from 5 to 10 active workers in period 8:00 AM - 12:00 PM
  • from 10 to 20 active workers in period 12:00 PM - 04:00 PM
  • from 5 to 10 active workers in period 04:00 PM - 06:00 PM
  • from 1 to 4 active workers the rest of the time

This feature was developed to simplify balancing in hot periods.
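To show how a time range maps to params, here is a self-contained sketch of a range lookup. It is not the package's schedule.go: params, timeRange, parseRange, lookup and paramsAt are hypothetical, only the HH:MM format is handled, and ranges are assumed to be half-open (the right bound is excluded).

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// params is a hypothetical stand-in for the schedulable settings.
type params struct{ WorkersMin, WorkersMax uint32 }

// timeRange is a half-open interval [from, to) measured from midnight.
type timeRange struct {
	from, to time.Duration
	p        params
}

// sinceMidnight converts a clock reading to an offset with minute precision.
func sinceMidnight(t time.Time) time.Duration {
	return time.Duration(t.Hour())*time.Hour + time.Duration(t.Minute())*time.Minute
}

// parseRange parses "HH:MM-HH:MM".
func parseRange(raw string, p params) (timeRange, error) {
	left, right, ok := strings.Cut(raw, "-")
	if !ok {
		return timeRange{}, fmt.Errorf("bad range %q", raw)
	}
	from, err := time.Parse("15:04", left)
	if err != nil {
		return timeRange{}, err
	}
	to, err := time.Parse("15:04", right)
	if err != nil {
		return timeRange{}, err
	}
	r := timeRange{from: sinceMidnight(from), to: sinceMidnight(to), p: p}
	if r.from >= r.to {
		return timeRange{}, fmt.Errorf("empty range %q", raw)
	}
	return r, nil
}

// lookup returns the params of the first range containing now, or def.
func lookup(ranges []timeRange, now time.Time, def params) params {
	off := sinceMidnight(now)
	for _, r := range ranges {
		if off >= r.from && off < r.to {
			return r.p
		}
	}
	return def
}

// paramsAt evaluates the example schedule of this chapter at hh:mm.
func paramsAt(hh, mm int) params {
	specs := []struct {
		raw string
		p   params
	}{
		{"08:00-12:00", params{5, 10}},
		{"12:00-16:00", params{10, 20}},
		{"16:00-18:00", params{5, 10}},
	}
	var ranges []timeRange
	for _, s := range specs {
		r, err := parseRange(s.raw, s.p)
		if err != nil {
			panic(err)
		}
		ranges = append(ranges, r)
	}
	now := time.Date(2024, time.June, 28, hh, mm, 0, 0, time.UTC)
	return lookup(ranges, now, params{1, 4}) // config defaults outside ranges
}

func main() {
	for _, hh := range []int{9, 13, 17, 23} {
		fmt.Printf("%02d:00 -> %+v\n", hh, paramsAt(hh, 0))
	}
}
```

Any moment that hits no range falls back to the defaults, which matches the "rest of the time" row of the list above.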

Delayed execution queue (DEQ)

If the queue must process an item not immediately after enqueue but after some period, you may use the DelayInterval param. Setting this param enables the DEQ feature and guarantees that an item will be processed no earlier than DelayInterval after it was enqueued.

This param is the opposite of DeadlineInterval.

Deadline-aware queue (DAQ)

In highly loaded queues the delivery of an item to a worker may take so much time that processing loses its meaning. The DeadlineInterval param may help in that case. If an item is acquired by a worker but DeadlineInterval (counted since enqueue) has already passed, the item will not be processed.

This param may work together with the DeadlineToDLQ flag.

This param is the opposite of DelayInterval.
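Both intervals are measured from the moment of enqueue, so they can be shown side by side. The sketch below is an illustration only, not the package's logic: verdict and classify are hypothetical names, and a zero interval is assumed to mean that the feature is disabled.

```go
package main

import (
	"fmt"
	"time"
)

// verdict is what a worker does with an item it has just taken.
type verdict int

const (
	process verdict = iota // handle the item right away
	wait                   // too early: wait for the remaining delay first
	skip                   // too late: the deadline has passed
)

// classify decides by the item's age, i.e. the time elapsed since enqueue.
// It also returns the remaining delay when the verdict is wait.
func classify(age, delayInterval, deadlineInterval time.Duration) (verdict, time.Duration) {
	if deadlineInterval > 0 && age > deadlineInterval {
		return skip, 0
	}
	if delayInterval > 0 && age < delayInterval {
		return wait, delayInterval - age
	}
	return process, 0
}

func main() {
	v, rest := classify(time.Second, 5*time.Second, 0)
	fmt.Println(v == wait, rest) // delayed item taken too early
	v, _ = classify(3*time.Second, 0, 2*time.Second)
	fmt.Println(v == skip) // item taken after its deadline
}
```

A skipped item is a natural candidate for the DLQ, which is the case the DeadlineToDLQ flag covers.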

Prioritizable queue

By default, the queue works as a FIFO queue. That works well while the queue receives items with the same priority. But if the queue receives items of both types, priority and non-priority, sooner or later there will be a case when the queue has non-priority elements at the head and priority ones at the tail. Priority items may become outdated by the time their turn comes, and their processing loses its meaning. In computer networks this problem was solved a long time ago, and the solution is called Quality of Service (QoS).

The config has a QoS param of type qos.Config. Setting this param makes the queue prioritizable. See configuration details in the readme.
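The head-of-line problem and its cure can be shown with two channels. This sketch demonstrates plain strict priority and says nothing about how the qos package is actually configured or which algorithms it offers; next and drainOrder are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// next returns the next item without blocking, always preferring the
// high-priority channel over the low-priority one.
func next(high, low <-chan string) (string, bool) {
	select {
	case x := <-high:
		return x, true
	default:
	}
	select {
	case x := <-low:
		return x, true
	default:
		return "", false
	}
}

// drainOrder enqueues two low-priority items before one high-priority item
// and reports the order in which they come out.
func drainOrder() string {
	high := make(chan string, 4)
	low := make(chan string, 4)
	low <- "low-1"
	low <- "low-2"
	high <- "high-1" // arrives last, but is served first
	var order []string
	for {
		x, ok := next(high, low)
		if !ok {
			break
		}
		order = append(order, x)
	}
	return strings.Join(order, ",")
}

func main() {
	fmt.Println(drainOrder())
}
```

In a single FIFO channel the same three items would come out in arrival order, with the priority item last.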

Metrics coverage

Config has a param called MetricsWriter that must implement the MetricsWriter interface.

There are two implementations of the interface, listed under Directories as the log and prometheus modules:

  • log
  • prometheus

The first is useless in production and is needed only for debugging purposes. The second is thoroughly tested and works well. You may write your own implementation of MetricsWriter for any required TSDB.

Builtin workers

queue has three helper workers:

  • transit just forwards the item to another queue.
  • chain joins several workers into one. The item is processed synchronously by all "child" workers. You may, for example, build a chain of workers and finish it with a transit worker.
  • async_chain also joins workers into one, but the item is processed asynchronously by the "child" workers.
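The idea behind chain is ordinary composition of workers. Here is a minimal sketch of it; it is not the builtin chain worker. Worker mirrors the documented interface, WorkerFunc, chain and runChain are hypothetical, and stopping at the first error is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Worker mirrors the documented interface.
type Worker interface{ Do(x any) error }

// WorkerFunc is a hypothetical adapter that turns a function into a Worker.
type WorkerFunc func(x any) error

func (f WorkerFunc) Do(x any) error { return f(x) }

// chain runs its child workers one after another on the same item and
// stops at the first error.
type chain []Worker

func (c chain) Do(x any) error {
	for _, w := range c {
		if err := w.Do(x); err != nil {
			return err
		}
	}
	return nil
}

// runChain builds a chain of three steps, makes step failAt fail (use -1
// for no failure) and reports how many steps were called.
func runChain(failAt int) (calls int, err error) {
	var c chain
	for i := 0; i < 3; i++ {
		i := i
		c = append(c, WorkerFunc(func(any) error {
			calls++
			if i == failAt {
				return errors.New("step failed")
			}
			return nil
		}))
	}
	err = c.Do("item")
	return calls, err
}

func main() {
	fmt.Println(runChain(-1))
	fmt.Println(runChain(1))
}
```

Since a chain is itself a Worker, it can be bound to a queue config like any other worker.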

Logging

queue may report internal events (calibration (balancing), closing, worker signals, ...) for debugging purposes. There is a Logger param in the config that must implement the Logger interface.
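The three methods of the Logger interface (Printf, Print, Println) have the same signatures as the methods of the standard library's *log.Logger, so a standard logger should fit without an adapter. The sketch checks this against a local copy of the interface; logLine and the "queue: " prefix are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger is a local copy of the documented interface.
type Logger interface {
	Printf(format string, v ...any)
	Print(v ...any)
	Println(v ...any)
}

// Compile-time check that *log.Logger satisfies the interface.
var _ Logger = (*log.Logger)(nil)

// logLine writes one message through the interface and returns the output.
func logLine() string {
	var buf bytes.Buffer
	var l Logger = log.New(&buf, "queue: ", 0) // flags 0: no timestamp
	l.Printf("worker #%d wakeup", 3)
	return buf.String()
}

func main() {
	fmt.Print(logLine())
}
```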

Showcase

During development the biggest problem was test coverage. Because unit testing was impractical, a demo showcase project was developed, in which different scenarios of queue configs were tested. The project has a Docker container that includes Grafana, Prometheus and the queue daemon. The project is controlled using HTTP requests, see the readme.

Typical sets of requests: https://github.com/koykov/demo/tree/master/queue/request.

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrNoConfig    = errors.New("no config provided")
	ErrNoCapacity  = errors.New("capacity must be greater than zero")
	ErrNoWorker    = errors.New("no worker provided")
	ErrNoWorkers   = errors.New("no workers available")
	ErrNoQueue     = errors.New("no queue provided")
	ErrQueueClosed = errors.New("queue closed")

	ErrSchedMinGtMax = errors.New("min workers greater than max")
	ErrSchedZeroMax  = errors.New("max workers must be greater than 0")
	ErrSchedBadRange = errors.New("schedule range has bad format")
	ErrSchedBadTime  = errors.New("bad time provided")
	ErrSchedBadHour  = errors.New("hour outside range 0..23")
	ErrSchedBadMin   = errors.New("minute outside range 0..59")
	ErrSchedBadSec   = errors.New("second outside range 0..59")
	ErrSchedBadMsec  = errors.New("millisecond outside range 0..999")
)

Functions

This section is empty.

Types

type Clock

type Clock interface {
	Now() time.Time
}

Clock is an interface for getting the current time.
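One likely use of a pluggable clock is testing time-dependent behavior such as schedules, delays and deadlines. Here is a sketch of a manually advanced clock; Clock is a local copy of the interface, fixedClock is hypothetical, and it is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// Clock is a local copy of the documented interface.
type Clock interface {
	Now() time.Time
}

// fixedClock is a hypothetical clock that moves only when told to.
type fixedClock struct{ t time.Time }

func (c *fixedClock) Now() time.Time          { return c.t }
func (c *fixedClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

// Compile-time check that fixedClock satisfies the interface.
var _ Clock = (*fixedClock)(nil)

func main() {
	c := &fixedClock{t: time.Date(2024, time.June, 28, 11, 59, 0, 0, time.UTC)}
	start := c.Now()
	c.Advance(2 * time.Minute)
	fmt.Println(c.Now().Sub(start), c.Now().Format("15:04"))
}
```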

type Config

type Config struct {
	// Queue capacity.
	// Mandatory param if the QoS config is omitted. If QoS is provided, its summed capacity overwrites this field.
	Capacity uint64
	// Streams allows avoiding mutex starvation by sharing items among Streams sub-channels instead of one single
	// channel.
	Streams uint32
	// MaxRetries determines the maximum number of item processing retries.
	// If MaxRetries is exceeded, the item is sent to the DLQ (if possible).
	// The initial attempt is not counted as a retry.
	MaxRetries uint32
	// Limit of simultaneous enqueue operations that triggers a forced calibration.
	// Works only on balanced queues.
	// If this param is omitted, defaultForceCalibrationLimit (1000) is used instead.
	ForceCalibrationLimit uint32
	// Workers number.
	// Setting this param disables the balancing feature. If you want balancing, use the params WorkersMin and
	// WorkersMax instead.
	Workers uint32
	// HeartbeatInterval is the interval between service operations such as queue calibration, workers handling, etc.
	// Setting this param too high (greater than 1 second) is counterproductive: the queue will calibrate rarely and
	// the result may not be good enough.
	// If this param is omitted, defaultHeartbeatInterval (1 second) is used instead.
	HeartbeatInterval time.Duration

	// QoS scheduling settings.
	// If this param is omitted, a FIFO queue is initialized by default.
	// See qos/config.go
	QoS *qos.Config

	// Minimum workers number.
	// Setting this param less than WorkersMax enables the balancing feature.
	WorkersMin uint32
	// Maximum workers number.
	// Setting this param greater than WorkersMin enables the balancing feature.
	WorkersMax uint32
	// Worker wakeup factor, relative to the queue fullness rate.
	// When the queue fullness rate exceeds this factor, the first available sleeping worker is woken up.
	// WakeupFactor must be in range [0..0.999999].
	// If this param is omitted, defaultWakeupFactor (0.75) is used instead.
	WakeupFactor float32
	// Worker sleep factor, relative to the queue fullness rate.
	// When the queue fullness rate drops below this factor, one of the active workers is put to sleep.
	// SleepFactor must be in range [0..0.999999].
	// If this param is omitted, defaultSleepFactor (0.5) is used instead.
	SleepFactor float32
	// Limit of workers that can be sent to sleep at once.
	// If this param is omitted, half of the available workers are sent to sleep at calibration.
	SleepThreshold uint32
	// How long a sleeping worker waits before it stops.
	// If this param is omitted, defaultSleepInterval (5 seconds) is used instead.
	SleepInterval time.Duration

	// Schedule contains base params (like workers min/max and factors) for specific time ranges.
	// See schedule.go for usage examples.
	Schedule *Schedule

	// Worker represents queue worker.
	// Mandatory param.
	Worker Worker
	// Dead letter queue to catch leaky items.
	// Setting this param enables the leaky feature.
	DLQ Enqueuer
	// Put failed items to the DLQ.
	// Best used together with MaxRetries: after all processing attempts fail, the item is sent to the DLQ.
	FailToDLQ bool
	// Put items denied by deadline to the DLQ.
	DeadlineToDLQ bool
	// LeakDirection indicates the queue side to leak items from (rear or front).
	LeakDirection LeakDirection
	// FrontLeakAttempts indicates how many times the queue may be shifted to free up space for a new rear item.
	// When the limit is exceeded, the rear direction is used as a fallback.
	// Low values are required.
	// If this param is omitted, defaultFrontLeakAttempts (5) is used instead.
	FrontLeakAttempts uint32

	// DelayInterval is the interval between item enqueue and processing.
	// Setting this param enables the delayed execution (DE) feature.
	// DE guarantees that an item is processed by a worker no earlier than DelayInterval after enqueue.
	// The opposite param to DeadlineInterval.
	DelayInterval time.Duration
	// DeadlineInterval allows skipping useless item processing.
	// Setting this param enables the Deadline-Aware Queue (DAQ) feature.
	// DAQ guarantees that an item is not processed if its time is over when a worker takes it from the queue.
	// The opposite param to DelayInterval.
	DeadlineInterval time.Duration

	// Clock represents the clock keeper.
	// If this param is omitted, nativeClock is used instead (see clock.go).
	Clock Clock

	// Metrics writer handler.
	MetricsWriter MetricsWriter

	// Logger handler.
	Logger Logger
}

Config describes queue properties and behavior.

func (*Config) Copy

func (c *Config) Copy() *Config

Copy copies the config instance to protect the queue from param changes after start. This means that once the queue has started, config modifications have no effect.

type DummyDLQ

type DummyDLQ struct{}

DummyDLQ is a stub DLQ implementation. It does nothing and is needed for queues with leak tolerance. It just leaks data to the trash.

func (DummyDLQ) Capacity

func (DummyDLQ) Capacity() int

func (DummyDLQ) Close

func (DummyDLQ) Close() error

func (DummyDLQ) Enqueue

func (DummyDLQ) Enqueue(_ any) error

func (DummyDLQ) Rate

func (DummyDLQ) Rate() float32

func (DummyDLQ) Size

func (DummyDLQ) Size() int

type DummyMetrics

type DummyMetrics struct{}

DummyMetrics is a stub metrics writer that is used by default and does nothing. It is needed just to reduce checks in code.

func (DummyMetrics) QueueDeadline added in v1.1.5

func (DummyMetrics) QueueDeadline()

func (DummyMetrics) QueueLeak

func (DummyMetrics) QueueLeak(_ string)

func (DummyMetrics) QueueLost

func (DummyMetrics) QueueLost()

func (DummyMetrics) QueuePull

func (DummyMetrics) QueuePull()

func (DummyMetrics) QueuePut

func (DummyMetrics) QueuePut()

func (DummyMetrics) QueueRetry

func (DummyMetrics) QueueRetry()

func (DummyMetrics) SubqLeak added in v1.1.5

func (DummyMetrics) SubqLeak(_ string)

func (DummyMetrics) SubqPull added in v1.1.5

func (DummyMetrics) SubqPull(_ string)

func (DummyMetrics) SubqPut added in v1.1.5

func (DummyMetrics) SubqPut(_ string)

func (DummyMetrics) WorkerInit

func (DummyMetrics) WorkerInit(_ uint32)

func (DummyMetrics) WorkerSetup

func (DummyMetrics) WorkerSetup(_, _, _ uint)

func (DummyMetrics) WorkerSleep

func (DummyMetrics) WorkerSleep(_ uint32)

func (DummyMetrics) WorkerStop

func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ string)

func (DummyMetrics) WorkerWait

func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration)

func (DummyMetrics) WorkerWakeup

func (DummyMetrics) WorkerWakeup(_ uint32)

type Enqueuer added in v1.1.5

type Enqueuer interface {
	// Enqueue puts item to the queue.
	Enqueue(x any) error
}

Enqueuer describes a component that can enqueue items.

type Interface

type Interface interface {
	Enqueuer
	// Size returns the actual size of the queue.
	Size() int
	// Capacity returns the max size of the queue.
	Capacity() int
	// Rate returns size to capacity ratio.
	Rate() float32
	// Close gracefully stops the queue.
	Close() error
}

Interface describes queue interface.

type Job added in v1.1.5

type Job struct {
	// Item payload.
	Payload any
	// Item weight. Designed to use together with Weighted priority evaluator (see priority/weighted.go).
	Weight uint64
	// Delay time before processing.
	DelayInterval time.Duration
	// DeadlineInterval limits maximum reasonable time to process job.
	DeadlineInterval time.Duration
}

Job is a wrapper around a queue item. It may provide meta info.

type LeakDirection added in v1.1.3

type LeakDirection uint

LeakDirection indicates the queue side to leak.

const (
	// LeakDirectionRear is the default direction: new incoming items are redirected to the DLQ.
	LeakDirectionRear LeakDirection = iota
	// LeakDirectionFront takes an old item from the queue front and redirects it to the DLQ, thus releasing
	// space for the new incoming item in the queue.
	LeakDirectionFront
)

func (LeakDirection) String added in v1.1.6

func (ld LeakDirection) String() string

type Logger

type Logger interface {
	Printf(format string, v ...any)
	Print(v ...any)
	Println(v ...any)
}

Logger is a logger interface. It prints verbose messages.

type MetricsWriter

type MetricsWriter interface {
	// WorkerSetup sets initial workers statuses.
	// Called in two cases: on queue init and when the schedule's time range changes.
	WorkerSetup(active, sleep, stop uint)
	// WorkerInit registers worker's start moment.
	WorkerInit(idx uint32)
	// WorkerSleep registers when a worker is put to sleep.
	WorkerSleep(idx uint32)
	// WorkerWakeup registers when a sleeping worker resumes.
	WorkerWakeup(idx uint32)
	// WorkerWait registers how long a worker waits due to delayed execution.
	WorkerWait(idx uint32, dur time.Duration)
	// WorkerStop registers when a sleeping worker stops.
	WorkerStop(idx uint32, force bool, status string)
	// QueuePut registers the arrival of a new item in the queue.
	QueuePut()
	// QueuePull registers the departure of an item from the queue.
	QueuePull()
	// QueueRetry registers total amount of retries.
	QueueRetry()
	// QueueLeak registers an item's leak from the full queue.
	// Param direction indicates the leak direction and may be "rear" or "front".
	QueueLeak(direction string)
	// QueueDeadline registers the amount of items skipped due to deadline.
	QueueDeadline()
	// QueueLost registers lost items that missed both the queue and the DLQ.
	QueueLost()

	// SubqPut registers the arrival of a new item in the sub-queue.
	SubqPut(subq string)
	// SubqPull registers the departure of an item from the sub-queue.
	SubqPull(subq string)
	// SubqLeak registers an item's drop from the full sub-queue.
	SubqLeak(subq string)
}

MetricsWriter is an interface of a queue metrics handler. See example implementations at https://github.com/koykov/metrics_writers/tree/master/queue.

type Queue

type Queue struct {
	bitset.Bitset
	// contains filtered or unexported fields
}

Queue is an implementation of a balanced leaky queue.

The queue balances among available workers [Config.WorkersMin...Config.WorkersMax] in real time. The queue also has a leaky feature: when the queue is full and new items continue to flow, leaked items are forwarded to the DLQ (dead letter queue). For queues with a variable daily load there is a special scheduler (see schedule.go) that allows specifying different queue params for certain time ranges.

func New

func New(config *Config) (*Queue, error)

New makes a new queue instance and initializes it according to the config params.

func (*Queue) Capacity

func (q *Queue) Capacity() int

Capacity returns the max size of the queue.

func (*Queue) Close

func (q *Queue) Close() error

Close gracefully stops the queue.

After receiving the close signal, at least workersMin workers keep working as long as the queue has items. Enqueueing new items to the queue is forbidden.

func (*Queue) Enqueue

func (q *Queue) Enqueue(x any) error

Enqueue puts x to the queue.

func (*Queue) Error added in v1.1.5

func (q *Queue) Error() error

func (*Queue) ForceClose

func (q *Queue) ForceClose() error

ForceClose closes the queue and immediately stops all active and sleeping workers.

Remaining items in the queue will be thrown in the trash.

func (*Queue) Rate

func (q *Queue) Rate() float32

Rate returns size to capacity ratio.

func (*Queue) Size

func (q *Queue) Size() int

Size returns the actual size of the queue.

func (*Queue) String

func (q *Queue) String() string

type Schedule

type Schedule struct {
	// contains filtered or unexported fields
}

Schedule describes time ranges with specific queue params.

func NewSchedule

func NewSchedule() *Schedule

NewSchedule makes a new schedule instance. Don't share it among many queues.

func (*Schedule) AddRange

func (s *Schedule) AddRange(raw string, params ScheduleParams) (err error)

AddRange registers specific params for the given time range. Raw specifies the time range in the format `<left time>-<right time>`. A time point may be in one of three formats:

  • HH:MM
  • HH:MM:SS
  • HH:MM:SS.MSC (MSC is a millisecond, 0-999)

Time outside all registered ranges uses the default params specified in the config (WorkersMin, WorkersMax, WakeupFactor and SleepFactor).

func (*Schedule) Copy

func (s *Schedule) Copy() *Schedule

Copy copies the schedule instance to protect the queue from param changes after start. This means that once the queue has started, schedule modifications have no effect.

func (*Schedule) Get

func (s *Schedule) Get() (params ScheduleParams, schedID int)

Get returns queue params if the current time hits one of the registered ranges. Param schedID indicates which time range was hit and contains -1 on a miss.

func (*Schedule) Len

func (s *Schedule) Len() int

func (*Schedule) Less

func (s *Schedule) Less(i, j int) bool

func (*Schedule) String

func (s *Schedule) String() string

func (*Schedule) Swap

func (s *Schedule) Swap(i, j int)

func (*Schedule) WorkersMaxDaily

func (s *Schedule) WorkersMaxDaily() (max uint32)

WorkersMaxDaily returns the maximum number of workers among the registered time ranges.

type ScheduleParams

type ScheduleParams realtimeParams

ScheduleParams describes queue params for a specific time range.

type Status

type Status uint32
const (
	StatusNil Status = iota
	StatusFail
	StatusActive
	StatusThrottle
	StatusClose
)

type Worker

type Worker interface {
	// Do processes the item.
	Do(x any) error
}

Worker describes queue worker interface.

type WorkerStatus

type WorkerStatus uint32
const (
	WorkerStatusIdle WorkerStatus = iota
	WorkerStatusActive
	WorkerStatusSleep
)

func (WorkerStatus) String added in v1.1.6

func (s WorkerStatus) String() string

Directories

Path Synopsis
metrics
log module
prometheus module
