task

package
v1.2.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrTaskMustFunc signals that a task value is not a function.
// NOTE(review): presumably returned by ValidateTask — confirm against the implementation.
var ErrTaskMustFunc = errors.New("task type must func")

// ErrTaskReturnNoValue signals that a task function declares no return values.
var ErrTaskReturnNoValue = errors.New("task return is no value")

// ErrTaskReturnNoErr signals that the last return value of a task function
// is not an error.
var ErrTaskReturnNoErr = errors.New("task return last values is must be error")
View Source
// ErrDispatching is the sentinel error for a failure while dispatching a task.
var ErrDispatching = errors.New("dispatch task err")

Functions

func AddArgs

func AddArgs(args ...Arg) options.Option

AddArgs 追加参数

func AddCallbackOnError

func AddCallbackOnError(tasks ...*Signature) options.Option

AddCallbackOnError 追加失败后回调

func AddCallbackOnSuccess

func AddCallbackOnSuccess(tasks ...*Signature) options.Option

AddCallbackOnSuccess 追加成功后回调

func ConvertResult

func ConvertResult(result []*Result) ([]reflect.Value, error)

ConvertResult 将Result类型转换成reflect.Value

func FormatResult

func FormatResult(values []reflect.Value) string

FormatResult 将reflect.Value转换为可读答案

func HumanReadableResults

func HumanReadableResults(results []reflect.Value) string

HumanReadableResults formats the given reflect.Value results as a human-readable string.

func NewErrNonsupportType

func NewErrNonsupportType(valueType string) error

func ReflectTaskResults

func ReflectTaskResults(taskResults []*Result) ([]reflect.Value, error)

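ReflectTaskResults converts a slice of task results ([]*Result) into a slice of reflect.Value, returning an error if the conversion fails.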

func ReflectValue

func ReflectValue(valueType string, value interface{}) (reflect.Value, error)

ReflectValue converts interface{} to reflect.Value based on string type

func SetArgs

func SetArgs(args ...Arg) options.Option

SetArgs 设置参数

func SetCallbackOnError

func SetCallbackOnError(tasks ...*Signature) options.Option

SetCallbackOnError 设置失败后回调

func SetCallbackOnSuccess

func SetCallbackOnSuccess(tasks ...*Signature) options.Option

SetCallbackOnSuccess 设置成功后回调

func SetETATime

func SetETATime(after *time.Time) options.Option

SetETATime 延时任务设置执行时间

func SetGroupID

func SetGroupID(id string) options.Option

SetGroupID 设置多群组中id

func SetIgnoreNotRegisteredTask

func SetIgnoreNotRegisteredTask(register bool) options.Option

SetIgnoreNotRegisteredTask 设置任务未注册是否忽略

func SetMeta

func SetMeta(meta *Meta) options.Option

SetMeta 设置Meta

func SetMetaSafe

func SetMetaSafe(safe bool) options.Option

SetMetaSafe 设置是否创建安全的meta

func SetPriority

func SetPriority(priority uint8) options.Option

SetPriority 设置任务优先级

func SetRetryCount

func SetRetryCount(count int) options.Option

SetRetryCount 设置重试次数

func SetRetryInterval

func SetRetryInterval(interval int) options.Option

SetRetryInterval 设置间隔时间

func SetRouter

func SetRouter(router string) options.Option

SetRouter 设置路由

func SetStopTaskDeletionOnError

func SetStopTaskDeletionOnError(deleteOnErr bool) options.Option

SetStopTaskDeletionOnError 设置任务出错后是否删除

func SetTriggerChord

func SetTriggerChord(task *Signature) options.Option

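SetTriggerChord returns an option that sets the chord callback task (presumably Signature.CallbackChord; confirm against the implementation).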

func ValidateTask

func ValidateTask(task interface{}) error

Types

type Arg

// Arg is a single argument carried by a task. It is serialized to JSON and
// BSON under the keys "type", "key" and "value".
type Arg struct {
	// Type names the type of Value as a string.
	// NOTE(review): presumably one of the type names accepted by ReflectValue — confirm.
	Type  string      `json:"type" bson:"type"`
	// Key is a string key associated with the argument.
	Key   string      `json:"key" bson:"key"`
	// Value is the argument payload; its concrete type is not fixed here.
	Value interface{} `json:"value" bson:"value"`
}

Arg task中的参数

type Chain

// Chain describes a set of tasks that are invoked as a chain.
type Chain struct {
	// Name is the name of the chain.
	Name  string
	// Tasks holds the signatures that make up the chain.
	// NOTE(review): presumably run in slice order — confirm against NewChain.
	Tasks []*Signature
}

Chain 创建链式调用的任务

func NewChain

func NewChain(name string, signatures ...*Signature) (*Chain, error)

NewChain 创建链式调用任务

type ErrRetryTaskLater

// ErrRetryTaskLater is a retry error. It provides Error() string and
// RetryIn() time.Duration; values are built with NewErrRetryTaskLater.
type ErrRetryTaskLater struct {
	// contains filtered or unexported fields
}

ErrRetryTaskLater 重试错误

func NewErrRetryTaskLater

func NewErrRetryTaskLater(msg string, retryIn time.Duration) ErrRetryTaskLater

NewErrRetryTaskLater 生成重试错误

func (ErrRetryTaskLater) Error

func (e ErrRetryTaskLater) Error() string

Error 实现标准error接口

func (ErrRetryTaskLater) RetryIn

func (e ErrRetryTaskLater) RetryIn() time.Duration

RetryIn 返回重试时间,从现在开始到执行的间隔

type Group

// Group is a group of tasks that are executed in parallel.
type Group struct {
	// Name is the name of the group.
	Name    string
	// GroupID identifies the group.
	GroupID string
	// Tasks holds the signatures that belong to the group.
	Tasks   []*Signature
}

Group 创建并行执行的任务组

func NewGroup

func NewGroup(groupID string, name string, signatures ...*Signature) (*Group, error)

NewGroup 创建并行执行的任务组

func (*Group) GetTaskIDs

func (g *Group) GetTaskIDs() []string

GetTaskIDs 获取组任务的所有ID

type GroupCallback

// GroupCallback is a task group that has a callback task attached.
type GroupCallback struct {
	// Name is the name of this group-with-callback.
	Name     string
	// Group is the task group.
	Group    *Group
	// Callback is the callback task.
	// NOTE(review): presumably triggered once every task in Group has finished — confirm.
	Callback *Signature
}

GroupCallback 具有回调任务的任务组

func NewGroupCallback

func NewGroupCallback(group *Group, name string, callback *Signature) (*GroupCallback, error)

NewGroupCallback 创建具有回调任务的任务组

type GroupMeta

// GroupMeta holds the details of a task group. It carries JSON, BSON and
// GORM tags.
type GroupMeta struct {
	// ID is the GORM primary key (column "_id"); it is excluded from JSON and BSON.
	ID uint `json:"-" bson:"-" gorm:"column:_id;primarykey;comment:_id"`
	// GroupID is the unique identifier of the group.
	GroupID string `json:"group_id" bson:"_id" gorm:"column:id;index;comment:id"`
	// Name is the group name.
	Name string `json:"name" bson:"name" gorm:"column:name;comment:组名称"`
	// TaskIDs lists the IDs of the tasks managed by the group.
	TaskIDs StringSlice `json:"task_ids" bson:"task_ids" gorm:"column:task_ids;comment:接管的任务id;type:text"`
	// TriggerCompleted reports whether the trigger has completed
	// (serialized as "trigger_chord").
	TriggerCompleted bool `json:"trigger_chord" bson:"trigger_chord" gorm:"column:trigger_chord;comment:是否触发完成"`
	// Lock reports whether the group is locked.
	// NOTE(review): unlike the other fields, Lock has no bson tag — confirm this is intended.
	Lock bool `json:"lock" gorm:"column:lock;comment:锁"`
	// TTL is the validity period.
	// NOTE(review): unit is not visible here — confirm.
	TTL int64 `json:"ttl,omitempty" bson:"ttl,omitempty" gorm:"column:ttl;comment:过期时间"`
	// CreateAt is the creation time.
	CreateAt  time.Time      `json:"create_at" bson:"create_at" gorm:"column:create_at;comment:创建时间"`
	// DeletedAt is GORM's soft-delete marker; it is excluded from JSON and BSON.
	DeletedAt gorm.DeletedAt `json:"-" bson:"-" gorm:"index"`
}

GroupMeta 组详情

func InitGroupMeta

func InitGroupMeta(groupID string, name string, ttl int64, taskIDs ...string) *GroupMeta

type Meta

// Meta is the meta information a task can carry. It embeds sync.RWMutex, so a
// Meta value must not be copied after first use; pass *Meta instead.
type Meta struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Meta task可以携带元信息

func NewMeta

func NewMeta(safe bool) *Meta

NewMeta 生成meta信息

func (*Meta) Get

func (m *Meta) Get(key string) (interface{}, bool)

func (*Meta) Range

func (m *Meta) Range(f func(key string, value interface{}))

func (*Meta) Set

func (m *Meta) Set(key string, value interface{})

Set 如果存在会覆盖

type Processor

// Processor is a task processor.
type Processor interface {
	// Process handles the given task signature and returns any error.
	Process(t *Signature) error
	// ConsumeQueue returns the queue to consume from.
	ConsumeQueue() string
	// PreConsumeHandler reports whether pre-processing should be performed.
	PreConsumeHandler() bool
}

Processor 任务处理器

type Result

// Result is one value returned by a task, stored as a type name plus a value.
type Result struct {
	// Type names the type of the returned value.
	Type string `json:"type" bson:"type"`
	// Value is the returned value; it is decoded according to Type.
	Value interface{} `json:"value" bson:"value"`
}

Result 任务返回携带的kv键值对

type Results

// Results is a list of task results. Its Scan and Value methods have the
// signatures of database/sql.Scanner and database/sql/driver.Valuer.
type Results []*Result

func (*Results) Scan

func (s *Results) Scan(src interface{}) error

func (Results) Value

func (s Results) Value() (driver.Value, error)

type Retrievable

// Retrievable is implemented by values that can report a retry delay.
// ErrRetryTaskLater has a RetryIn method with this signature.
type Retrievable interface {
	// RetryIn returns the interval to wait before retrying.
	RetryIn() time.Duration
}

type Signature

// Signature describes a task: its identity, routing, arguments, retry
// settings and callbacks. It is serialized to JSON and BSON.
type Signature struct {
	// ID is the unique task ID; it must be unique across multiple instances.
	ID string `json:"id" bson:"_id"`
	// Name is the task name.
	Name string `json:"name" bson:"name"`
	// GroupID is the group ID used across clusters.
	// NOTE(review): the bson key is "groupID" while the other keys are snake_case — confirm this is intended.
	GroupID string `json:"group_id" bson:"groupID"`
	// GroupTaskCount is the number of tasks in the group.
	GroupTaskCount int `json:"group_task_count" bson:"group_task_count"`
	// Priority is the task priority.
	Priority uint8 `json:"priority" bson:"priority"`
	// RetryCount is the number of retries.
	RetryCount int `json:"retry_count" bson:"retry_count"`
	// RetryInterval is the interval between retries (serialized as "retry_timeout").
	// NOTE(review): unit is not visible here — confirm.
	RetryInterval int `json:"retry_timeout" bson:"retry_timeout"`
	// StopTaskDeletionOnError controls deletion of the task after it fails.
	// NOTE(review): the original comment read "delete the task after an error",
	// which seems to contradict the field name — confirm the polarity.
	StopTaskDeletionOnError bool `json:"stop_task_deletion_on_error" bson:"stop_task_deletion_on_error"`
	// IgnoreNotRegisteredTask ignores tasks that are not registered
	// (serialized as "not_registered").
	IgnoreNotRegisteredTask bool `json:"not_registered" bson:"not_registered"`
	// Router is the route.
	Router string `json:"router" bson:"router"`
	// Args holds the arguments carried by the task.
	Args []Arg `json:"args" bson:"args"`
	// MetaSafe marks the Meta as "safe".
	// NOTE(review): presumably forwarded to NewMeta(safe) — confirm.
	MetaSafe bool `json:"meta_safe" bson:"meta_safe"`
	// Meta holds the meta information carried by the task.
	Meta *Meta `json:"meta" bson:"meta"`
	// ETA is the execution time for a delayed task.
	ETA *time.Time `json:"eta" bson:"eta"`
	// CallbackChord is the callback for a group of tasks.
	CallbackChord *Signature `json:"callback_chord" bson:"callback_chord"`
	// CallbackOnSuccess holds the callbacks run after the task succeeds.
	CallbackOnSuccess []*Signature `json:"callback_on_success" bson:"callback_on_success"`
	// CallbackOnError holds the callbacks run after the task fails.
	CallbackOnError []*Signature `json:"callback_on_error" bson:"callback_on_error"`
}

func CopySignature

func CopySignature(signature *Signature) *Signature

func CopySignatures

func CopySignatures(signatures ...*Signature) []*Signature

func NewSignature

func NewSignature(id string, name string, options ...options.Option) *Signature

NewSignature 创建Signature

func SignatureFromContext

func SignatureFromContext(ctx context.Context) *Signature

SignatureFromContext 获取上下文任务签名

type State

// State is the lifecycle state of a task, represented as an int.
type State int
const (
	// StatePending is the initial state of a task.
	StatePending State = iota
	// StateReceived means the task has been received.
	StateReceived
	// StateStarted means execution of the task has started.
	StateStarted
	// StateRetry means the task is about to be retried.
	StateRetry
	// StateSuccess means the task succeeded.
	StateSuccess
	// StateFailure means the task failed.
	StateFailure
)

func (State) String

func (s State) String() string

type Status

// Status is the recorded status of a task. It carries JSON, BSON and GORM tags.
type Status struct {
	// ID is the GORM primary key (column "_id"); it is excluded from JSON and BSON.
	ID        uint           `json:"-" bson:"-" gorm:"column:_id;primarykey;comment:_id"`
	// TaskID is the task's ID (BSON "_id", GORM column "id").
	TaskID    string         `json:"task_id" bson:"_id" gorm:"column:id;index;comment:id"`
	// GroupID is the unique identifier of the group.
	GroupID   string         `json:"group_id" bson:"group_id" gorm:"column:group_id;comment:组唯一标识"`
	// Name is a name for this status record.
	// NOTE(review): the gorm column comment says "group name", which looks
	// copied from GroupMeta; presumably this is the task name — confirm.
	Name      string         `json:"name" bson:"name" gorm:"column:name;comment:组名称"`
	// Status is the task state.
	Status    State          `json:"status" bson:"status" gorm:"column:status;comment:任务状态"`
	// TTL is the expiry value for the record.
	// NOTE(review): unit is not visible here — confirm.
	TTL       int64          `json:"ttl" bson:"ttl" gorm:"column:ttl;comment:过期时间"`
	// Error is the error text.
	Error     string         `json:"error" bson:"error" gorm:"column:error;comment:错误"`
	// Results holds the task results (GORM column type text).
	Results   Results        `json:"results" bson:"results" gorm:"column:results;comment:结果;type:text"`
	// CreateAt is the creation time.
	CreateAt  time.Time      `json:"create_at" bson:"create_at" gorm:"column:create_at;comment:创建时间"`
	// DeletedAt is GORM's soft-delete marker; it is excluded from JSON and BSON.
	DeletedAt gorm.DeletedAt `json:"-" bson:"-" gorm:"index"`
}

Status 任务状态

func NewFailureState

func NewFailureState(task *Signature, err string) *Status

NewFailureState 创建Failure状态

func NewPendingState

func NewPendingState(task *Signature) *Status

NewPendingState 创建pending状态

func NewReceivedState

func NewReceivedState(task *Signature) *Status

NewReceivedState 创建Received状态

func NewRetryState

func NewRetryState(task *Signature) *Status

NewRetryState 创建Retry状态

func NewStartedState

func NewStartedState(task *Signature) *Status

NewStartedState 创建Started状态

func NewSuccessState

func NewSuccessState(task *Signature, results ...*Result) *Status

NewSuccessState 创建Success状态

func (*Status) IsCompleted

func (t *Status) IsCompleted() bool

func (*Status) IsFailure

func (t *Status) IsFailure() bool

func (*Status) IsSuccess

func (t *Status) IsSuccess() bool

type StringSlice

// StringSlice is a []string whose Scan and Value methods have the signatures
// of database/sql.Scanner and database/sql/driver.Valuer.
type StringSlice []string

func (*StringSlice) Scan

func (s *StringSlice) Scan(src interface{}) error

func (StringSlice) Value

func (s StringSlice) Value() (driver.Value, error)

type Task

// Task bundles a task function with the reflected arguments and context
// needed to call it.
type Task struct {
	// TaskFunc is the function that executes the task.
	TaskFunc reflect.Value
	// UseContext reports whether a context is used.
	// NOTE(review): presumably true when the task function takes a context.Context — confirm.
	UseContext bool
	// Context is the context information.
	Context context.Context
	// Args holds the arguments needed to execute the task.
	Args []reflect.Value
}

func NewTask

func NewTask(taskFunc interface{}, args []Arg) (*Task, error)

NewTask 初始化Task

func NewTaskWithSignature

func NewTaskWithSignature(taskFunc interface{}, signature *Signature) (*Task, error)

NewTaskWithSignature 初始化Task通过Signature

func (*Task) Call

func (t *Task) Call() (taskResults []*Result, err error)

Call 调用方法

func (*Task) TransformArgs

func (t *Task) TransformArgs(args []Arg) error

TransformArgs 将[]Arg转化为[]reflect.Value

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL