Documentation ¶
Index ¶
- Constants
- type DelayedItem
- type Service
- type ServiceOption
- func WithCustomZapLogger(logger *zap.Logger) ServiceOption
- func WithPollingCount(count int) ServiceOption
- func WithPollingInterval(interval time.Duration) ServiceOption
- func WithRedisKeyDelayedQueue(name string) ServiceOption
- func WithRedisKeyWorkingQueue(name string) ServiceOption
- func WithWorkerCount(workerCount int) ServiceOption
- type WorkingItem
Constants ¶
const (
// DefaultMaxPopCount is the number of pulling items from Redis worker queue on each time
DefaultMaxPopCount = 100
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DelayedItem ¶
type DelayedItem struct { ExecuteAt int64 // ExecuteAt is the time-to-run of this item in UnixNano WorkingItem }
DelayedItem see WorkingItem
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is the delayed queue service
func New ¶
func New(client *redis.Client, options ...ServiceOption) *Service
New initializes the delayed queue
func (*Service) PutDelayedItems ¶
func (s *Service) PutDelayedItems(ctx context.Context, delayedItems ...*DelayedItem) error
PutDelayedItems adds the given DelayedItem the the delayed queue Note: It will return error and will not add all of the data if there is any error happened when marshaling the items.
func (*Service) RegisterWorkingAction ¶
RegisterWorkingAction registers the working actions
func (*Service) RunBackgroundLoop ¶
func (s *Service) RunBackgroundLoop()
RunBackgroundLoop runs goroutines (once) to process the delayed queue and the working queue
type ServiceOption ¶
type ServiceOption func(s *Service)
ServiceOption allows to customize the service's options
func WithCustomZapLogger ¶
func WithCustomZapLogger(logger *zap.Logger) ServiceOption
WithCustomZapLogger allows to use custom zap logger
func WithPollingCount ¶
func WithPollingCount(count int) ServiceOption
WithPollingCount overwrites the polling count of delayed queue
func WithPollingInterval ¶
func WithPollingInterval(interval time.Duration) ServiceOption
WithPollingInterval overwrites the polling interval of delayed queue
func WithRedisKeyDelayedQueue ¶
func WithRedisKeyDelayedQueue(name string) ServiceOption
WithRedisKeyDelayedQueue overwrites the redis key of delayed queue
func WithRedisKeyWorkingQueue ¶
func WithRedisKeyWorkingQueue(name string) ServiceOption
WithRedisKeyWorkingQueue overwrites the redis key of working queue
func WithWorkerCount ¶
func WithWorkerCount(workerCount int) ServiceOption
WithWorkerCount overwrites the worker count (the number of goroutines which poll items from working queue)
type WorkingItem ¶
type WorkingItem struct { QueueName string // QueueName is the name of the working queue which this item will be enqueued FuncName string // FuncName indicates the func name which this item will be executed with ArgsJSONStr string // ArgsJSONStr is the args data in JSON format }
WorkingItem [queueName, funcName, {"arg1":"1"}] Reasons why we use custom json Marshaller:
- Save space: `[queueName, funcName, {"arg1":"1"}]` v.s. `{"queue_name":"working_queue","funcName":"sendData","args":"{\"arg1\":"1"}"}`
- Easy to define the stored string
func (*WorkingItem) MarshalJSON ¶
func (i *WorkingItem) MarshalJSON() ([]byte, error)
MarshalJSON converts WorkingItem to `[queueName, funcName, {"arg1":"1"}]`
func (*WorkingItem) UnmarshalJSON ¶
func (i *WorkingItem) UnmarshalJSON(b []byte) error
UnmarshalJSON coverts `[queueName, funcName, {"arg1":"1"}]` to DelayedItem