Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncTaskStatus ¶
type AsyncTaskStatus struct { TaskStatus TaskStatus FailedReason error Progress interface{} }
AsyncTaskStatus 异步任务状态
type Config ¶
type Config struct { TaskLimit int32 // 任务并发限制 // ScanInterval 任务调度期会使用任务容器的接口定期获取任务等待队列和执行中的任务,进行调度和更新操作。 // 如果是 db 类的 TaskContainer, 可能涉及到扫 db,可以适当配置大一点。 ScanInterval time.Duration TaskTimeout time.Duration // 任务超时时间 }
Config 配置
type Task ¶
type Task struct { TaskId string // 该任务的唯一标识id TaskPriority int // 任务优先级 TaskItem interface{} TaskStartTime time.Time TaskStatus TaskStatus }
Task 通用的任务结构
type TaskActuator ¶
type TaskActuator interface { // Init 任务在被加入调度系统前的初始化工作 Init(ctx context.Context, task *Task) (newTask *Task, err error) // Start 开始执行任务,不要阻塞该方法,如果是同步任务,在单独的携程执行,执行器在内存中维护任务状态,转成异步任务, // 通过 GetAsyncTaskStatus 返回任务状态 // ignoreErr 是否忽略任务调度的错误,等待恢复,如果 ignoreErr = false, Start 返回 error 任务会失败 Start(ctx context.Context, task *Task) (newTask *Task, ignoreErr bool, err error) // ExportOutput 导出任务输出,自行处理任务结果 ExportOutput(ctx context.Context, task *Task) error // GetOutput 获取任务数据,调度框架不会调用该接口,提供给用户自由选择是否实现 GetOutput(ctx context.Context, task *Task) (data interface{}, err error) // Stop 停止任务 Stop(ctx context.Context, task *Task) error // GetTaskStatus 获取异步执行中的任务的状态 GetAsyncTaskStatus(ctx context.Context, tasks []Task) (status []AsyncTaskStatus, err error) // Delete 删除任务 Delete(ctx context.Context, task *Task) error }
TaskActuator 任务执行器接口
type TaskContainer ¶
type TaskContainer interface { // AddTask 向容器添加任务 AddTask(ctx context.Context, task Task) (err error) // GetRunningTask 获取所有运行中的任务 GetRunningTask(ctx context.Context) (tasks []Task, err error) // GetRunningTaskCount 获取运行中的任务数 GetRunningTaskCount(ctx context.Context) (count int32, err error) // GetWaitingTask 获取等待运行中的任务 GetWaitingTask(ctx context.Context, limit int32) (tasks []Task, err error) // ToRunningStatus 转移到运行中的状态 ToRunningStatus(ctx context.Context, task *Task) (newTask *Task, err error) // ToStopStatus 转移到停止状态 ToStopStatus(ctx context.Context, task *Task) (newTask *Task, err error) // ToDeleteStatus 转移到删除状态 ToDeleteStatus(ctx context.Context, task *Task) (newTask *Task, err error) // ToFailedStatus 转移到失败状态 ToFailedStatus(ctx context.Context, task *Task, reason error) (newTask *Task, err error) // ToExportStatus 转移到结果导出状态 ToExportStatus(ctx context.Context, task *Task) (newTask *Task, err error) // ToSuccessStatus 转移到执行成功状态 ToSuccessStatus(ctx context.Context, task *Task) (newTask *Task, err error) // UpdateRunningTaskStatus 更新执行中的任务执行进度状态 UpdateRunningTaskStatus(ctx context.Context, task *Task, status AsyncTaskStatus) error }
TaskContainer 抽象的任务容器,需要开发者可以选择使用已有的任务容器,也可以根据实际业务实现自己的任务容器接口
type TaskScheduler ¶
type TaskScheduler struct { // Container 配置的任务容器 Container TaskContainer // Actuator 配置的任务执行器 Actuator TaskActuator // contains filtered or unexported fields }
TaskScheduler 任务调度器,通过对任务容器和任务执行器的操作,实现任务调度
func MakeNewScheduler ¶
func MakeNewScheduler( ctx context.Context, container TaskContainer, actuator TaskActuator, config Config) *TaskScheduler
MakeNewScheduler 新建任务调度器
type TaskStatus ¶
type TaskStatus int32
TaskStatus 任务状态
const ( TASK_STATUS_INVALID TaskStatus = 0 TASK_STATUS_UNSTART TaskStatus = 1 TASK_STATUS_WAITING TaskStatus = 2 TASK_STATUS_RUNNING TaskStatus = 3 TASK_STATUS_SUCCESS TaskStatus = 4 TASK_STATUS_FAILED TaskStatus = 5 TASK_STATUS_STOPED TaskStatus = 6 TASK_STATUS_DELETE TaskStatus = 7 TASK_STATUS_EXPORTING TaskStatus = 8 )
Click to show internal directories.
Click to hide internal directories.