Documentation
¶
Index ¶
- Variables
- func AddArgs(args ...Arg) options.Option
- func AddCallbackOnError(tasks ...*Signature) options.Option
- func AddCallbackOnSuccess(tasks ...*Signature) options.Option
- func ConvertResult(result []*Result) ([]reflect.Value, error)
- func FormatResult(values []reflect.Value) string
- func HumanReadableResults(results []reflect.Value) string
- func NewErrNonsupportType(valueType string) error
- func ReflectTaskResults(taskResults []*Result) ([]reflect.Value, error)
- func ReflectValue(valueType string, value interface{}) (reflect.Value, error)
- func SetArgs(args ...Arg) options.Option
- func SetCallbackOnError(tasks ...*Signature) options.Option
- func SetCallbackOnSuccess(tasks ...*Signature) options.Option
- func SetETATime(after *time.Time) options.Option
- func SetGroupID(id string) options.Option
- func SetIgnoreNotRegisteredTask(register bool) options.Option
- func SetMeta(meta *Meta) options.Option
- func SetMetaSafe(safe bool) options.Option
- func SetPriority(priority uint8) options.Option
- func SetRetryCount(count int) options.Option
- func SetRetryInterval(interval int) options.Option
- func SetRouter(router string) options.Option
- func SetStopTaskDeletionOnError(deleteOnErr bool) options.Option
- func SetTriggerChord(task *Signature) options.Option
- func ValidateTask(task interface{}) error
- type Arg
- type Chain
- type ErrRetryTaskLater
- type Group
- type GroupCallback
- type GroupMeta
- type Meta
- type Processor
- type Result
- type Results
- type Retrievable
- type Signature
- type State
- type Status
- func NewFailureState(task *Signature, err string) *Status
- func NewPendingState(task *Signature) *Status
- func NewReceivedState(task *Signature) *Status
- func NewRetryState(task *Signature) *Status
- func NewStartedState(task *Signature) *Status
- func NewSuccessState(task *Signature, results ...*Result) *Status
- type StringSlice
- type Task
Constants ¶
This section is empty.
Variables ¶
var ( ErrTaskMustFunc = errors.New("task type must func") ErrTaskReturnNoValue = errors.New("task return is no value") ErrTaskReturnNoErr = errors.New("task return last values is must be error") )
var (
ErrDispatching = errors.New("dispatch task err")
)
Functions ¶
func AddCallbackOnError ¶
AddCallbackOnError 追加失败后回调
func AddCallbackOnSuccess ¶
AddCallbackOnSuccess 追加成功后回调
func ConvertResult ¶
ConvertResult 将Result类型转换成reflect.Value
func FormatResult ¶
FormatResult 将reflect.Value转换为可读答案
func HumanReadableResults ¶
HumanReadableResults ...
func NewErrNonsupportType ¶
func ReflectTaskResults ¶
ReflectTaskResults ...
func ReflectValue ¶
ReflectValue converts interface{} to reflect.Value based on string type
func SetCallbackOnError ¶
SetCallbackOnError 设置失败后回调
func SetCallbackOnSuccess ¶
SetCallbackOnSuccess 设置成功后回调
func SetIgnoreNotRegisteredTask ¶
SetIgnoreNotRegisteredTask 设置任务未注册是否忽略
func SetStopTaskDeletionOnError ¶
SetStopTaskDeletionOnError 设置任务出错后是否删除
func ValidateTask ¶
func ValidateTask(task interface{}) error
Types ¶
type Arg ¶
type Arg struct {
Type string `json:"type" bson:"type"`
Key string `json:"key" bson:"key"`
Value interface{} `json:"value" bson:"value"`
}
Arg task中的参数
type ErrRetryTaskLater ¶
type ErrRetryTaskLater struct {
// contains filtered or unexported fields
}
ErrRetryTaskLater 重试错误
func NewErrRetryTaskLater ¶
func NewErrRetryTaskLater(msg string, retryIn time.Duration) ErrRetryTaskLater
NewErrRetryTaskLater 生成重试错误
func (ErrRetryTaskLater) RetryIn ¶
func (e ErrRetryTaskLater) RetryIn() time.Duration
RetryIn 返回重试时间,从现在开始到执行的间隔
type GroupCallback ¶
GroupCallback 具有回调任务的任务组
func NewGroupCallback ¶
func NewGroupCallback(group *Group, name string, callback *Signature) (*GroupCallback, error)
NewGroupCallback 创建具有回调任务的个任务组
type GroupMeta ¶
type GroupMeta struct {
ID uint `json:"-" bson:"-" gorm:"column:_id;primarykey;comment:_id"`
// GroupID 组的唯一标识
GroupID string `json:"group_id" bson:"_id" gorm:"column:id;index;comment:id"`
// 组名称
Name string `json:"name" bson:"name" gorm:"column:name;comment:组名称"`
// TaskIDs 接管的任务id
TaskIDs StringSlice `json:"task_ids" bson:"task_ids" gorm:"column:task_ids;comment:接管的任务id;type:text"`
// TriggerCompleted 是否触发完成
TriggerCompleted bool `json:"trigger_chord" bson:"trigger_chord" gorm:"column:trigger_chord;comment:是否触发完成"`
// Lock 是否锁定
Lock bool `json:"lock" gorm:"column:lock;comment:锁"`
// TTL 有效时间
TTL int64 `json:"ttl,omitempty" bson:"ttl,omitempty" gorm:"column:ttl;comment:过期时间"`
// CreateAt 创建时间
CreateAt time.Time `json:"create_at" bson:"create_at" gorm:"column:create_at;comment:创建时间"`
DeletedAt gorm.DeletedAt `json:"-" bson:"-" gorm:"index"`
}
GroupMeta 组详情
type Processor ¶
type Processor interface {
// Process 处理程序
Process(t *Signature) error
// ConsumeQueue 消费队列
ConsumeQueue() string
// PreConsumeHandler 是否进行预处理
PreConsumeHandler() bool
}
Processor 任务处理器
type Result ¶
type Result struct {
// Type 标注返回的类型
Type string `json:"type" bson:"type"`
// Value 根据type解压value
Value interface{} `json:"value" bson:"value"`
}
Result 任务返回携带的kv键值对
type Retrievable ¶
type Signature ¶
type Signature struct {
// ID 任务唯一id,要保证多实例中id唯一
ID string `json:"id" bson:"_id"`
// Name 任务名称
Name string `json:"name" bson:"name"`
// GroupID 多集群中组id
GroupID string `json:"group_id" bson:"groupID"`
// GroupTaskCount 组中任务计数
GroupTaskCount int `json:"group_task_count" bson:"group_task_count"`
// Priority 任务优先级
Priority uint8 `json:"priority" bson:"priority"`
// RetryCount 重试次数
RetryCount int `json:"retry_count" bson:"retry_count"`
// RetryInterval 重试间隔时间
RetryInterval int `json:"retry_timeout" bson:"retry_timeout"`
// StopTaskDeletionOnError 任务出错后删除
StopTaskDeletionOnError bool `json:"stop_task_deletion_on_error" bson:"stop_task_deletion_on_error"`
// IgnoreNotRegisteredTask 忽略未注册的任务
IgnoreNotRegisteredTask bool `json:"not_registered" bson:"not_registered"`
// Router 路由
Router string `json:"router" bson:"router"`
// Args 携带参数
Args []Arg `json:"args" bson:"args"`
// MetaSafe 安全的Meta
MetaSafe bool `json:"meta_safe" bson:"meta_safe"`
// Meta 携带原信息
Meta *Meta `json:"meta" bson:"meta"`
// ETA 延时任务
ETA *time.Time `json:"eta" bson:"eta"`
// CallbackChord 组任务回调
CallbackChord *Signature `json:"callback_chord" bson:"callback_chord"`
// CallbackOnSuccess 任务成功后回调
CallbackOnSuccess []*Signature `json:"callback_on_success" bson:"callback_on_success"`
// CallbackOnError 任务失败后回调
CallbackOnError []*Signature `json:"callback_on_error" bson:"callback_on_error"`
}
func CopySignature ¶
func CopySignatures ¶
func NewSignature ¶
NewSignature 创建Signature
func SignatureFromContext ¶
SignatureFromContext 获取上下文任务签名
type Status ¶
type Status struct {
ID uint `json:"-" bson:"-" gorm:"column:_id;primarykey;comment:_id"`
TaskID string `json:"task_id" bson:"_id" gorm:"column:id;index;comment:id"`
GroupID string `json:"group_id" bson:"group_id" gorm:"column:group_id;comment:组唯一标识"`
Name string `json:"name" bson:"name" gorm:"column:name;comment:组名称"`
Status State `json:"status" bson:"status" gorm:"column:status;comment:任务状态"`
TTL int64 `json:"ttl" bson:"ttl" gorm:"column:ttl;comment:过期时间"`
Error string `json:"error" bson:"error" gorm:"column:error;comment:错误"`
Results Results `json:"results" bson:"results" gorm:"column:results;comment:结果;type:text"`
CreateAt time.Time `json:"create_at" bson:"create_at" gorm:"column:create_at;comment:创建时间"`
DeletedAt gorm.DeletedAt `json:"-" bson:"-" gorm:"index"`
}
Status 任务状态
func NewFailureState ¶
NewFailureState 创建Failure状态
func NewReceivedState ¶
NewReceivedState 创建Received状态
func NewSuccessState ¶
NewSuccessState 创建Success状态
func (*Status) IsCompleted ¶
type StringSlice ¶
type StringSlice []string
func (*StringSlice) Scan ¶
func (s *StringSlice) Scan(src interface{}) error
type Task ¶
type Task struct {
// TaskFunc 执行任务的函数
TaskFunc reflect.Value
// UseContext 是否使用上下文
UseContext bool
// Context 上下文信息
Context context.Context
// Args 执行任务需要的参数
Args []reflect.Value
}
func NewTaskWithSignature ¶
NewTaskWithSignature 初始化Task通过Signature
func (*Task) TransformArgs ¶
TransformArgs 将[]Arg转化为[]reflect.Value