delayedqueue

v1.0.1
Published: Aug 17, 2021 License: MIT Imports: 8 Imported by: 0

README

Go DelayedQueue

Delayed queue with Redis implementation

ref: https://redislabs.com/ebook/part-2-core-concepts/chapter-6-application-components-in-redis/6-4-task-queues/6-4-2-delayed-tasks/

Basic concept:

  1. Use a Redis sorted set as the delayed queue.
  2. When enqueuing data to the delayed queue, use the execution timestamp as the score (ZSet), e.g. 1620199248.
  3. When dequeuing data, use ZRangeByScore and ZRem with the current timestamp to check whether any data is due.
  4. When there is nothing in the queue, sleep for a period.
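
The steps above can be sketched without a Redis server. The following is a minimal illustration only, not this package's implementation: `memZSet`, `Add`, and `PopDue` are hypothetical names for an in-memory stand-in for the sorted set, and `SendMail` is a made-up function name. Unlike a real ZADD, the stand-in does not de-duplicate members.

```go
package main

import (
	"fmt"
	"sort"
)

// entry pairs a member with its score (the execution timestamp).
type entry struct {
	score  int64
	member string
}

// memZSet is a hypothetical in-memory stand-in for a Redis sorted set.
type memZSet struct {
	entries []entry
}

// Add mimics ZADD: insert the member and keep entries ordered by score.
// Unlike Redis, it does not de-duplicate members.
func (z *memZSet) Add(score int64, member string) {
	z.entries = append(z.entries, entry{score: score, member: member})
	sort.SliceStable(z.entries, func(i, j int) bool {
		return z.entries[i].score < z.entries[j].score
	})
}

// PopDue mimics ZRangeByScore (up to now, with a limit) followed by ZRem:
// it removes and returns at most max members whose score is <= now.
func (z *memZSet) PopDue(now int64, max int) []string {
	var due []string
	n := 0
	for n < len(z.entries) && n < max && z.entries[n].score <= now {
		due = append(due, z.entries[n].member)
		n++
	}
	z.entries = z.entries[n:]
	return due
}

func main() {
	q := &memZSet{}
	q.Add(1620299103, `["work_q","CallAPI","{\"arg1\":\"1\"}"]`)
	q.Add(1620100100, `["work_q","SendMail","{}"]`)

	// Only the item scheduled at or before "now" is returned.
	fmt.Println(q.PopDue(1620199248, 100))

	// Nothing else is due yet, so a consumer would sleep and poll again.
	fmt.Println(len(q.PopDue(1620199248, 100)))
}
```

Against a real Redis server, the read and the removal are separate commands, so a consumer also has to handle the case where another consumer removes the same member first.
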
Workflow Chart:
+----------+           +--------------+                   +--------------+
|          |           |              |                   |              |
|  Caller  |           | DelayedQueue |                   | WorkerQueue  |
|          |           | Consumer     +----------------+  | Consumers    |
|          |           |              |                |  |              |
+----+-----+           +---+-^--------+                |  +-----+-^------+
     |                     | |                         |        | |
     |1.Add jobs with      | |2.Consume and add jobs   |        | |3.Consume and process the jobs
     | a delayed time      | | to working queue        |        | |
     |                     | |                         |        | |
+----v---------------------v-+-----------------------+ |  +-----v-+-----------------------------------+
|                                                    | |  |                                           |
|  DelayedQueue(Redis Sorted Set)                    | +-->  WorkingQueue(Redis List)[work_q]         |
| +-----------+------------------------------------+ |    | +------------------------------------+    |
| | score     | member                             | |    | | member                             |    |
| |(timestamp)|[queueName,funcName,{"arg1":"1"}]   | |    | |[queueName,funcName,{"arg1":"1"}]   |    |
| +-----------+------------------------------------+ |    | +------------------------------------+    |
| |1620299103 |["work_q","CallAPI","{"arg1":"1"}"] | |    | |["work_q","CallAPI","{"arg1":"1"}"] |    |
| +-----------+------------------------------------+ |    | +------------------------------------+    |
| |1620100100 | ...                                | |    | | ...                                |    |
| +-----------+------------------------------------+ |    | +------------------------------------+    |
|                                                    |    |                                           |
+----------------------------------------------------+    +-------------------------------------------+

Chart created by: https://asciiflow.com/

An example is in the example folder.

Documentation

Constants

const (
	// DefaultMaxPopCount is the maximum number of items pulled from the Redis worker queue on each poll
	DefaultMaxPopCount = 100
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DelayedItem

type DelayedItem struct {
	ExecuteAt int64 // ExecuteAt is the time-to-run of this item in UnixNano
	WorkingItem
}

DelayedItem is a WorkingItem with a scheduled execution time; see WorkingItem.
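
As a rough sketch of how ExecuteAt might be filled in, the snippet below mirrors the listed fields in local types (`workingItem`, `delayedItem`) so that it compiles without the package. `scheduleAt` is a hypothetical helper, not part of the API; it assumes the UnixNano unit stated in the field comment.

```go
package main

import (
	"fmt"
	"time"
)

// workingItem and delayedItem mirror the fields listed in this documentation.
// They are local stand-ins, not the package's own types.
type workingItem struct {
	QueueName   string
	FuncName    string
	ArgsJSONStr string
}

type delayedItem struct {
	ExecuteAt int64 // time-to-run in UnixNano
	workingItem
}

// scheduleAt is a hypothetical helper: nowNano is the current time in
// UnixNano, and the returned item becomes due delay later.
func scheduleAt(nowNano int64, delay time.Duration, w workingItem) delayedItem {
	return delayedItem{ExecuteAt: nowNano + delay.Nanoseconds(), workingItem: w}
}

func main() {
	now := time.Now()
	item := scheduleAt(now.UnixNano(), 30*time.Second, workingItem{
		QueueName:   "work_q",
		FuncName:    "CallAPI",
		ArgsJSONStr: `{"arg1":"1"}`,
	})

	// Fields of the embedded struct are promoted, so item.FuncName works directly.
	fmt.Println(item.FuncName, time.Unix(0, item.ExecuteAt).Sub(now))
}
```
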

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the delayed queue service

func New

func New(client *redis.Client, options ...ServiceOption) *Service

New initializes the delayed queue

func (*Service) Close

func (s *Service) Close() error

Close closes Service gracefully

func (*Service) PutDelayedItems

func (s *Service) PutDelayedItems(ctx context.Context, delayedItems ...*DelayedItem) error

PutDelayedItems adds the given DelayedItems to the delayed queue. Note: if an error occurs while marshaling any item, it returns the error and adds none of the items.

func (*Service) RegisterWorkingAction

func (s *Service) RegisterWorkingAction(name string, action func(string) error)

RegisterWorkingAction registers the working actions
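
How the service dispatches registered actions is not shown in this documentation. One plausible shape, given the `func(string) error` signature and the `FuncName` field of WorkingItem, is a lookup table keyed by name. The `registry` type and `dispatch` method below are hypothetical and only illustrate that idea.

```go
package main

import "fmt"

// registry maps a function name to its action, using the same
// func(string) error signature that RegisterWorkingAction accepts.
type registry map[string]func(argsJSON string) error

// dispatch looks up funcName and calls the action with the JSON argument string.
func (r registry) dispatch(funcName, argsJSON string) error {
	action, ok := r[funcName]
	if !ok {
		return fmt.Errorf("no action registered for %q", funcName)
	}
	return action(argsJSON)
}

func main() {
	r := registry{
		"CallAPI": func(argsJSON string) error {
			fmt.Println("CallAPI called with", argsJSON)
			return nil
		},
	}

	// A known name runs its action; an unknown name yields an error.
	fmt.Println(r.dispatch("CallAPI", `{"arg1":"1"}`))
	fmt.Println(r.dispatch("Missing", `{}`))
}
```
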

func (*Service) RunBackgroundLoop

func (s *Service) RunBackgroundLoop()

RunBackgroundLoop runs goroutines (once) to process the delayed queue and the working queue
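
The "(once)" behaviour can be illustrated with `sync.Once`, a common Go idiom for start-at-most-once logic. Whether the package uses it internally is not stated here, so treat the `looper` type and its `run` method as hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// looper is a hypothetical stand-in for a service that starts its
// background goroutines at most once.
type looper struct {
	once    sync.Once
	wg      sync.WaitGroup
	started int // number of goroutines launched, kept for illustration
}

// run launches the workers on the first call; later calls do nothing.
func (l *looper) run(workers int) {
	l.once.Do(func() {
		for i := 0; i < workers; i++ {
			l.wg.Add(1)
			l.started++
			go func() {
				defer l.wg.Done()
				// A real worker would poll its queue here.
			}()
		}
	})
}

func main() {
	l := &looper{}
	l.run(3)
	l.run(3) // no additional goroutines are started
	l.wg.Wait()
	fmt.Println("goroutines started:", l.started)
}
```
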

type ServiceOption

type ServiceOption func(s *Service)

ServiceOption customizes the service's options
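
ServiceOption follows Go's functional options pattern. The sketch below shows the pattern on a local `config` type; the type, the lower-case option constructors, and the default values are all assumptions made for illustration and do not reflect the package's real defaults.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the settings a service holds.
type config struct {
	pollingInterval time.Duration
	workerCount     int
}

// option mutates a config, in the same way ServiceOption mutates a Service.
type option func(c *config)

func withPollingInterval(d time.Duration) option {
	return func(c *config) { c.pollingInterval = d }
}

func withWorkerCount(n int) option {
	return func(c *config) { c.workerCount = n }
}

// newConfig applies the options in order on top of made-up defaults.
func newConfig(opts ...option) *config {
	c := &config{pollingInterval: time.Second, workerCount: 1}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withWorkerCount(4), withPollingInterval(500*time.Millisecond))
	fmt.Println(c.workerCount, c.pollingInterval)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.
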

func WithCustomZapLogger

func WithCustomZapLogger(logger *zap.Logger) ServiceOption

WithCustomZapLogger sets a custom zap logger

func WithPollingCount

func WithPollingCount(count int) ServiceOption

WithPollingCount overrides the polling count of the delayed queue

func WithPollingInterval

func WithPollingInterval(interval time.Duration) ServiceOption

WithPollingInterval overrides the polling interval of the delayed queue

func WithRedisKeyDelayedQueue

func WithRedisKeyDelayedQueue(name string) ServiceOption

WithRedisKeyDelayedQueue overrides the Redis key of the delayed queue

func WithRedisKeyWorkingQueue

func WithRedisKeyWorkingQueue(name string) ServiceOption

WithRedisKeyWorkingQueue overrides the Redis key of the working queue

func WithWorkerCount

func WithWorkerCount(workerCount int) ServiceOption

WithWorkerCount overrides the worker count (the number of goroutines that poll items from the working queue)

type WorkingItem

type WorkingItem struct {
	QueueName   string // QueueName is the name of the working queue to which this item will be enqueued
	FuncName    string // FuncName is the name of the func that will be executed for this item
	ArgsJSONStr string // ArgsJSONStr is the args data in JSON format
}

WorkingItem is stored as `[queueName, funcName, {"arg1":"1"}]`.

Reasons why we use a custom JSON marshaller:

  1. Save space: `[queueName, funcName, {"arg1":"1"}]` vs. `{"queue_name":"working_queue","funcName":"sendData","args":"{\"arg1\":\"1\"}"}`
  2. Easy to define the stored string

func (*WorkingItem) MarshalJSON

func (i *WorkingItem) MarshalJSON() ([]byte, error)

MarshalJSON converts WorkingItem to `[queueName, funcName, {"arg1":"1"}]`

func (*WorkingItem) UnmarshalJSON

func (i *WorkingItem) UnmarshalJSON(b []byte) error

UnmarshalJSON converts `[queueName, funcName, {"arg1":"1"}]` to a WorkingItem
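
A compact array encoding like this can be built on `encoding/json` by implementing the `json.Marshaler` and `json.Unmarshaler` interfaces. The sketch below uses a local `item` type and assumes the arguments are stored as a JSON string element, as the workflow chart suggests; the package's actual encoding may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// item is a local stand-in with the same fields as WorkingItem.
type item struct {
	QueueName   string
	FuncName    string
	ArgsJSONStr string
}

// MarshalJSON encodes the item as a three-element JSON array of strings.
func (i *item) MarshalJSON() ([]byte, error) {
	return json.Marshal([3]string{i.QueueName, i.FuncName, i.ArgsJSONStr})
}

// UnmarshalJSON decodes a three-element JSON array back into the item.
func (i *item) UnmarshalJSON(b []byte) error {
	var parts []string
	if err := json.Unmarshal(b, &parts); err != nil {
		return err
	}
	if len(parts) != 3 {
		return fmt.Errorf("expected 3 elements, got %d", len(parts))
	}
	i.QueueName, i.FuncName, i.ArgsJSONStr = parts[0], parts[1], parts[2]
	return nil
}

func main() {
	in := &item{QueueName: "work_q", FuncName: "CallAPI", ArgsJSONStr: `{"arg1":"1"}`}

	// Pass a pointer so the pointer-receiver MarshalJSON is used.
	b, err := json.Marshal(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))

	var out item
	if err := json.Unmarshal(b, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.QueueName, out.FuncName, out.ArgsJSONStr)
}
```
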
