Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncTaskStatus ¶
type AsyncTaskStatus struct { TaskStatus TaskStatus FailedReason error Progress interface{} }
AsyncTaskStatus 异步任务状态
type CallbackReceiver ¶ added in v1.1.1
CallbackReceiver 任务状态回调接收器
type Config ¶
type Config struct { // 任务执行超时时间,超过该时间后,任务将被强制结束,并且视为任务失败 TaskTimeout time.Duration // 任务并发限制 TaskLimit int32 // 任务失败最大尝试次数 MaxFailedAttempts int32 // 任务调度固定使用轮询,会定期使用任务容器的接口获取执行中的任务数和任务等待队列中的任务 // SchedulingPollInterval 用来配置该定期轮询的时间周期 // 根据任务容器配置合理的值,比如 db 任务容器,配置合理的轮询间隔,避免对 db 压力过大 SchedulingPollInterval time.Duration // 任务状态维护默认使用轮询,定期通过执行器接口获取执行中的任务状态,然后对任务状态进行更新 // 如果 DisableStatePoll 设置为 true,将关闭轮询状态维护, // 但是必须要开启 EnableStateCallback 通过回调的方式通知任务状态 DisableStatePoll bool // 在 DisableStatePoll 为 false 的情况下生效,任务状态维护轮询的时间周期 // 根据任务容器配置合理的值,比如 db 任务容器,配置合理的轮询间隔,避免对 db 压力过大 StatePollInterval time.Duration // 是否开启任务状态回调,回调比轮询对任务状态维护具有更地的延时,能够及时更新完成的任务 // 如果有条件,建议同时开始轮询和回调,回调可以更早的感知任务结束, // 轮询可以为任务回调失败或者丢失兜底,保证任务状态一定可以更新 // 单回调模式,无法对任务进行超时感知处理 EnableStateCallback bool // CallbackReceiver 任务回调接收器 // 如果 EnableStateCallback 为 true 开启任务状态回调,必须要要配置任务回调接收器 CallbackReceiver CallbackReceiver // 是否需要调度器返回已完成的任务 // 如果为 true,需要通过 // for finishedTask := range TaskScheduler.FinshedTasks() { // ... // } // 及时取走 channel 中的数据,否则可能造成 channel 满了,任务调度阻塞 // 默认不开启,为 false EnableFinshedTaskList bool }
Config 配置
type Task ¶
type Task struct { // 该任务的唯一标识id,创建任务的时候赋予 TaskId string // 任务优先级, 创建任务的时候可选 TaskPriority int // 任务对象,创建任务的时候赋予 TaskItem interface{} TaskStartTime time.Time // 框架赋予值 TaskEnbTime time.Time // 框架赋予值 // 任务状态,任务容器负责赋予值 TaskStatus TaskStatus // 任务容器负责赋予值 FailedReason string // 任务已经重试的次数,任务容器负责赋予值 TaskAttemptsTime int32 }
Task 通用的任务结构
type TaskActuator ¶
type TaskActuator interface { // Init 任务在被加入调度系统前的初始化工作 Init(ctx context.Context, task *Task) (newTask *Task, err error) // Start 开始执行任务,不要阻塞该方法,如果是同步任务,在单独的线程执行,执行器在内存中维护任务状态,转成异步任务, // 通过 GetAsyncTaskStatus 返回任务状态 // ignoreErr 是否忽略任务调度的错误,等待恢复,如果 ignoreErr = false, Start 返回 error 任务会失败 Start(ctx context.Context, task *Task) (newTask *Task, ignoreErr bool, err error) // GetOutput 从任务执行器获取任务执行的结果 GetOutput(ctx context.Context, task *Task) (data interface{}, err error) // Stop 停止任务 Stop(ctx context.Context, task *Task) error // GetAsyncTaskStatus 获取异步执行中的任务的状态 GetAsyncTaskStatus(ctx context.Context, tasks []Task) (status []AsyncTaskStatus, err error) }
TaskActuator 任务执行器接口
type TaskContainer ¶
type TaskContainer interface { // AddTask 向容器添加任务 AddTask(ctx context.Context, task Task) (err error) // GetRunningTask 获取所有运行中的任务 GetRunningTask(ctx context.Context) (tasks []Task, err error) // GetRunningTaskCount 获取运行中的任务数 GetRunningTaskCount(ctx context.Context) (count int32, err error) // GetWaitingTask 获取等待运行中的任务 GetWaitingTask(ctx context.Context, limit int32) (tasks []Task, err error) // ToRunningStatus 转移到运行中的状态 ToRunningStatus(ctx context.Context, task *Task) (newTask *Task, err error) // ToStopStatus 转移到停止状态 ToStopStatus(ctx context.Context, task *Task) (newTask *Task, err error) // ToDeleteStatus 转移到删除状态 ToDeleteStatus(ctx context.Context, task *Task) (newTask *Task, err error) // ToFailedStatus 转移到失败状态 ToFailedStatus(ctx context.Context, task *Task, reason error) (newTask *Task, err error) // ToExportStatus 转移到结果导出状态 ToExportStatus(ctx context.Context, task *Task) (newTask *Task, err error) // ToSuccessStatus 转移到执行成功状态 ToSuccessStatus(ctx context.Context, task *Task) (newTask *Task, err error) // UpdateRunningTaskStatus 更新执行中的任务执行进度状态 UpdateRunningTaskStatus(ctx context.Context, task *Task, status AsyncTaskStatus) error }
TaskContainer 抽象的任务容器,需要开发者可以选择使用已有的任务容器,也可以根据实际业务实现自己的任务容器接口
type TaskScheduler ¶
type TaskScheduler struct { // Container 配置的任务容器 Container TaskContainer // Actuator 配置的任务执行器 Actuator TaskActuator // Persistencer 数据持久化 Persistencer TaskdataPersistencer // contains filtered or unexported fields }
TaskScheduler 任务调度器,通过对任务容器和任务执行器的操作,实现任务调度
func MakeScheduler ¶ added in v1.1.1
func MakeScheduler( container TaskContainer, actuator TaskActuator, persistencer TaskdataPersistencer, config Config) (*TaskScheduler, error)
MakeScheduler 新建任务调度器 如果不需要对任务数据此久化,persistencer 可以设置为 nil 调度器构建以后,自动开始任务调度
func (*TaskScheduler) AddTask ¶
func (s *TaskScheduler) AddTask(ctx context.Context, task Task) error
AddTask 添加一个任务,需要把任务转换成 lighttaskscheduler.Task 的通用形式 注意一定要配置一个唯一的任务 id 标识
func (*TaskScheduler) FinshedTasks ¶ added in v1.1.1
func (s *TaskScheduler) FinshedTasks() chan *Task
FinshedTasks 返回的完成的任务的 channel
type TaskStatus ¶
type TaskStatus int32
TaskStatus 任务状态
const ( TASK_STATUS_INVALID TaskStatus = 0 TASK_STATUS_UNSTART TaskStatus = 1 TASK_STATUS_WAITING TaskStatus = 2 TASK_STATUS_RUNNING TaskStatus = 3 TASK_STATUS_SUCCESS TaskStatus = 4 TASK_STATUS_FAILED TaskStatus = 5 TASK_STATUS_STOPED TaskStatus = 6 TASK_STATUS_DELETE TaskStatus = 7 TASK_STATUS_EXPORTING TaskStatus = 8 )
type TaskdataPersistencer ¶ added in v1.1.1
type TaskdataPersistencer interface { // DataPersistence 定义如何把通过执行器 GetOutput 的数据进行此久化存储 // data 的协议保持和 TaskActuator.GetOutput 一样 DataPersistence(ctx context.Context, task *Task, data interface{}) (err error) // GetPersistenceData 查询任务持久化结果 GetPersistenceData(ctx context.Context, task *Task) (data interface{}, err error) // DeletePersistenceData 删除任务的此久化结果 DeletePersistenceData(ctx context.Context, task *Task) (err error) }
TaskdataPersistencer 任务数据的可持久化接口
Source Files
¶
Click to show internal directories.
Click to hide internal directories.