worker

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RunnerAliveStatusTTL = 30 // runner存活状态持续时间
	RedisTimeout         = time.Second * 3

	Prefix = "worker:"

	KeyWorkers           = Prefix + "workers" // 存储work数据
	KeyRunnerAlivePrefix = Prefix + "alive#"  // 设置runner存活状态
	KeyWaitingQueue      = Prefix + "waiting" // 等待队列
	ReadyQueueLockTerm   = 60 * time.Second   // 就绪队列锁有效期

	KeyReadyQueueLocker   = Prefix + "readyQueueLocker" // 就绪队列锁
	KeyWaitingQueueLocker = Prefix + "waitingLocker"    // 等待队列锁
	KeyWorkingCheckLocker = Prefix + "workingLocker"    // 工作空间状态检查锁

	KeyWorking                      = Prefix + "working" // 工作空间
	WorkingCheckLockerTerm          = 6 * time.Minute    // 工作空间状态检查锁超时时间
	WaitingQueueCatchMissingWaiting = 30 * time.Second   // 等待队列线程未获取锁时,等待时间
	WaitingQueueCatchEmptyWaiting   = 1 * time.Second    // 等待队列线程
	WaitingQueueLockTerm            = 60 * time.Second   // 等待队列锁有效期
	WaitingQueueCatchBatchSize      = 100                // 等待队列转移批次大小
	WaitingQueueDataIDSeparator     = "#"                // 等待队列内存储队列名称和ID,使用分隔符连接

	ReadyQueuePullBatchSize = 30 // 就绪队列请求批量大小
	NeedPullThresholdRatio  = 3  // 工作空间数量小于NeedPullThresholdRatio * Threads 时,触发请求就绪队列逻辑
)

Variables

View Source
var (
	ErrWorkerNotRegistry = errors.New("unregistry worker")

	KeyReadyQueueHigh = QueueKey(QueueHigh) // 就绪队列高优先级
	KeyReadyQueueLow  = QueueKey(QueueLow)  // 就绪队列低优先级
)
View Source
var (
	ErrLockerAlreadySet = errors.New("locker has been occupied")
)

Functions

func QueueKey

func QueueKey(queue string) string

func RedisLock

func RedisLock(cli *redis.Client, key string, ttl time.Duration) (unlocker func(), err error)

func RedisLockV

func RedisLockV(cli *redis.Client, key string, val string, ttl time.Duration) (unlocker func(), err error)

func RedisLockerE

func RedisLockerE(cli *redis.Client, key, val string, ttl time.Duration, f func() error) (bool, error)

Types

type Context

type Context interface {
	context.Context
	Meta() *Meta
}

func NewContext

func NewContext(ctx context.Context, m *Meta) Context

type ContextKey

type ContextKey string
const (
	ContextKeyMeta ContextKey = "meta"
)

type Logger

type Logger interface {
	Debugf(template string, args ...interface{})
	Infof(template string, args ...interface{})
	Warnf(template string, args ...interface{})
	Errorf(template string, args ...interface{})

	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
}

type Meta

type Meta struct {
	ID        string
	Name      string
	PerformAt *time.Time
	Retry     int
	Queue     Queue

	CreatedAt  time.Time
	RetryCount int
	Success    bool
	Error      string
	Raw        []byte
}

func NewMetaByWorker

func NewMetaByWorker(w Worker, opts ...Option) (*Meta, error)

func (*Meta) String

func (m *Meta) String() string

type Option

type Option func(c *Meta)

func WithPerformAt

func WithPerformAt(performAt time.Time) Option

set worker execute time

func WithQueue

func WithQueue(q Queue) Option

set worker Queue default low

func WithRetry

func WithRetry(retry int) Option

retry < 0 means retry count unlimits default WorkerDefaultRetryCount

type Queue

type Queue = string
const (
	QueueHigh Queue = "High"
	QueueLow  Queue = "Low"

	DefaultRetryCount = 13
)

type RedisRunner

type RedisRunner struct {
	ID string

	RegistryWorkers map[string]reflect.Type
	// contains filtered or unexported fields
}

保证消息不丢失,但是可能出现消息重复消费情况, 有需要可以业务端确保幂等消费逻辑 等待队列:延时执行的worker到等待队列,时间到以后,被转移到就绪队列 就绪队列:待执行的worker,有多个优先级(优先级调度策略,如何处理饥饿情况) 工作空间:为了保证多进程下数据安全,每个worker当前处理任务会分发到工作空间,工作空间即为分配给当前进程的任务,成功执行后才从工作空间删除 支持任务错误重试逻辑,失败任务会根据重试次数来确定重新调度时间,并发布到等待队列 任务失败次数超过重试阈值后,任务丢弃

func NewRunner

func NewRunner(redisCli *redis.Client, threads uint, logger Logger) (*RedisRunner, error)

func (*RedisRunner) Declare

func (r *RedisRunner) Declare(work Worker, opts ...Option) (*Meta, error)

Declare should used before worker Registry

func (*RedisRunner) GetQueueLen

func (r *RedisRunner) GetQueueLen(queue string) (int64, error)

func (*RedisRunner) RegistryWorker

func (r *RedisRunner) RegistryWorker(work Worker) error

worker should registry before worker loop lanch

func (*RedisRunner) Run

func (r *RedisRunner) Run(ctx context.Context) error

type RunnerStatus

type RunnerStatus struct {
	ExecCount int64
	FailCount int64
}

type Worker

type Worker interface {
	WorkerName() string
	Perform(ctx Context) error
}

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL