workqueue

package
v0.0.0-...-d07672d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2023 License: GPL-3.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const MaxMessageRetryCount = math.MaxInt64 - 1

Variables

View Source
var (
	QueueIsClosedError       = errors.New("queue is closed")
	QueueIsFullError         = errors.New("queue is full")
	SubmitMessageIsNullError = errors.New("submit message is null")
	WorkFuncIsNullError      = errors.New("workFunc is null")
)
View Source
var (
	NeedRetrySubmitMessageCountError = errors.New("need retry submit message count")
)
View Source
var (
	NeedRetrySubmitMessageDelayError = errors.New("need retry submit message delay")
)
View Source
var (
	NeedRetrySubmitMessageError = errors.New("need retry submit message immediately")
)

Functions

This section is empty.

Types

type BucketRateLimiter

type BucketRateLimiter = iwq.BucketRateLimiter

type DelayingInterface

type DelayingInterface = iwq.DelayingInterface

type DelayingQueue

type DelayingQueue struct {
	// contains filtered or unexported fields
}

func NewDelayingQueue

func NewDelayingQueue(queueConf *DelayingQueueConf, featureOpts *FeatureOpts, workFunc DelayingQueueWorkerFunc) (*DelayingQueue, error)

func (*DelayingQueue) Start

func (q *DelayingQueue) Start()

func (*DelayingQueue) Stop

func (q *DelayingQueue) Stop()

Close 安全关闭消息队列

func (*DelayingQueue) Submit

func (q *DelayingQueue) Submit(value interface{}) error

func (*DelayingQueue) SubmitAfter

func (q *DelayingQueue) SubmitAfter(value interface{}, delay time.Duration) error

type DelayingQueueConf

type DelayingQueueConf struct {
	QueueCap   uint32             `json:"queueCap,omitempty" yaml:"queueCap,omitempty"`     // 队列容量
	WorkersNum uint16             `json:"workersNum,omitempty" yaml:"workersNum,omitempty"` // 工作协程数量
	Logger     *zap.SugaredLogger `json:"-" yaml:"-"`
}

func NewDefaultDelayingQueueConfig

func NewDefaultDelayingQueueConfig() *DelayingQueueConf

type DelayingQueueWithPool

type DelayingQueueWithPool struct {
	DelayingQueue
	// contains filtered or unexported fields
}

func NewDelayingQueueWithPool

func NewDelayingQueueWithPool(queueConf *DelayingQueueConf, featureOpts *FeatureOpts, workFunc DelayingQueueWorkerFunc) (*DelayingQueueWithPool, error)

func (*DelayingQueueWithPool) Start

func (q *DelayingQueueWithPool) Start()

func (*DelayingQueueWithPool) Stop

func (q *DelayingQueueWithPool) Stop()

type DelayingQueueWorkerFunc

type DelayingQueueWorkerFunc func(interface{}) (error, int64)

type ExecFuncs

type ExecFuncs interface {
	Submit(interface{}) error
	SubmitAfter(interface{}, time.Duration) error
}

type FeatureOpts

type FeatureOpts struct {
	EnableMetrics bool `json:"enableMetrics,omitempty" yaml:"enableMetrics,omitempty"` // 开启 metrics 记录
}

func NewDefaultFeatureOpts

func NewDefaultFeatureOpts() *FeatureOpts

type Interface

type Interface = iwq.Interface

3种队列的接口

type ItemExponentialFailureRateLimiter

type ItemExponentialFailureRateLimiter = iwq.ItemExponentialFailureRateLimiter

type ItemFastSlowRateLimiter

type ItemFastSlowRateLimiter = iwq.ItemFastSlowRateLimiter

type MaxOfRateLimiter

type MaxOfRateLimiter = iwq.MaxOfRateLimiter

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(queueConf *QueueConf, featureOpts *FeatureOpts, workFunc QueueWorkerFunc) (*Queue, error)

func (*Queue) Start

func (q *Queue) Start()

func (*Queue) Stop

func (q *Queue) Stop()

Close 安全关闭消息队列

func (*Queue) Submit

func (q *Queue) Submit(value interface{}) error

type QueueConf

type QueueConf struct {
	QueueCap   uint32             `json:"queueCap,omitempty" yaml:"queueCap,omitempty"`     // 队列容量
	WorkersNum uint16             `json:"workersNum,omitempty" yaml:"workersNum,omitempty"` // 工作协程数量
	Logger     *zap.SugaredLogger `json:"-" yaml:"-"`
}

func NewDefaultQueueConfig

func NewDefaultQueueConfig() *QueueConf

type QueueWithPool

type QueueWithPool struct {
	Queue
	// contains filtered or unexported fields
}

func NewQueueWithPool

func NewQueueWithPool(queueConf *QueueConf, featureOpts *FeatureOpts, workFunc QueueWorkerFunc) (*QueueWithPool, error)

func (*QueueWithPool) Start

func (q *QueueWithPool) Start()

func (*QueueWithPool) Stop

func (q *QueueWithPool) Stop()

type QueueWorkerFunc

type QueueWorkerFunc func(interface{}) error

type RateLimiter

type RateLimiter = iwq.RateLimiter

ratelimiter 结构体

func DefaultBucketRateLimiter

func DefaultBucketRateLimiter() RateLimiter

func DefaultControllerRateLimiter

func DefaultControllerRateLimiter() RateLimiter

func DefaultItemBasedRateLimiter

func DefaultItemBasedRateLimiter() RateLimiter

func NewBucketRateLimiter

func NewBucketRateLimiter(limit, burst uint32) RateLimiter

func NewItemExponentialFailureRateLimiter

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter

func NewItemFastSlowRateLimiter

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter

func NewMaxOfRateLimiter

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter

func NewWithMaxWaitRateLimiter

func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter

type RateLimitingInterface

type RateLimitingInterface = iwq.RateLimitingInterface

type RateLimitingQueue

type RateLimitingQueue struct {
	// contains filtered or unexported fields
}

func NewRateLimitingQueue

func NewRateLimitingQueue(
	queueConf *RateLimitingQueueConf, featureOpts *FeatureOpts,
	rl RateLimiter, workFunc RateLimitingQueueWorkerFunc,
) (*RateLimitingQueue, error)

func (*RateLimitingQueue) Start

func (q *RateLimitingQueue) Start()

func (*RateLimitingQueue) Stop

func (q *RateLimitingQueue) Stop()

Close 安全关闭消息队列

func (*RateLimitingQueue) Submit

func (q *RateLimitingQueue) Submit(value interface{}) error

func (*RateLimitingQueue) SubmitAfter

func (q *RateLimitingQueue) SubmitAfter(value interface{}, delay time.Duration) error

func (*RateLimitingQueue) SubmitRateLimited

func (q *RateLimitingQueue) SubmitRateLimited(value interface{}) error

type RateLimitingQueueConf

type RateLimitingQueueConf struct {
	QueueCap   uint32             `json:"queueCap,omitempty" yaml:"queueCap,omitempty"`     // 队列容量
	WorkersNum uint16             `json:"workersNum,omitempty" yaml:"workersNum,omitempty"` // 工作协程数量
	Logger     *zap.SugaredLogger `json:"-" yaml:"-"`
}

func NewDefaultRateLimitingQueueConfig

func NewDefaultRateLimitingQueueConfig() *RateLimitingQueueConf

type RateLimitingQueueWithPool

type RateLimitingQueueWithPool struct {
	RateLimitingQueue
	// contains filtered or unexported fields
}

func NewRateLimitingQueueWithPool

func NewRateLimitingQueueWithPool(
	queueConf *RateLimitingQueueConf, featureOpts *FeatureOpts,
	rl RateLimiter, workFunc RateLimitingQueueWorkerFunc,
) (*RateLimitingQueueWithPool, error)

func (*RateLimitingQueueWithPool) Start

func (q *RateLimitingQueueWithPool) Start()

func (*RateLimitingQueueWithPool) Stop

func (q *RateLimitingQueueWithPool) Stop()

type RateLimitingQueueWorkerFunc

type RateLimitingQueueWorkerFunc func(interface{}) (error, int64)

type WithMaxWaitRateLimiter

type WithMaxWaitRateLimiter = iwq.WithMaxWaitRateLimiter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL