lighttaskscheduler

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2025 License: MIT Imports: 8 Imported by: 0

README

轻量级任务调度框架

框架设计

框架对整个任务调度的过程进行4种抽象。

  1. 任务容器——定义任务如何进行存取和状态流转。
  2. 任务执行器——定义真正执行任务的逻辑。
  3. 任务数据持久化——定义任务数据如果持久化的存储。
  4. 任务调度器——定义实现任务调度的流程。

其中,任务容器、任务执行器、数据持久化可能和业务相关、所以用一系列的接口(interface)来抽象,开发者根据自己的业务实现接口。任务调度流程比较固定,由框架实现。

image

任务调度架构如下:

image

主要分成三个主线程

  1. 调度线程:负责对等待中的任务进行限频调度。

  2. 轮询维护任务状态线程:主要负责感知运行中的任务执行情况,包含超时控制、重试、状态转移,以及执行成功后数据持久化过程。

  3. 回调处理线程:任务状态更新支持回调模式。

其中轮询维护任务状态和回调处理状态可以选择其一、或者同时打开。

整体的设计思想是通过抽象实现调度流程的共享,开发者只需要业务关注的部分。

任务容器分类

任务可以分成临时任务和持久化任务。

  1. 临时任务——比如存储在内存队列里面的任务,执行完成以后,或者服务宕机、重启以后,任务相关的数据消失。
  2. 可持久化任务——任务记录持久化存储,支持查询、修改和删除。

根据是否可持久化,继续对任务容器抽象,分成两类任务容器:

  • MemeoryContainer——内存型任务容器,优点:可以快读快写,缺点:不可持久化。MemeoryContainer 实际上是可以和业务无关的,所以框架预置了三种MemeoryContainer——queueContainer,orderedMapContainer,redisContainer

    • queueContainer:queueContainer 队列型容器,任务无状态,无优先级,先进先出,任务数据,多进程数据无法共享数据

    • orderedMapContainerOrderedMap 作为容器,支持任务优先级,多进程数据无法共享数据

    • redisContainer:redis 作为容器,支持任务优先级,并且可以多进程,多副本共享数据

  • PersistContainer——可持久化任务容器,优点:可持久化存储,缺点:依赖db、需要扫描表,对 db 压力比较大。开发者可以参考exampleSQLContainer 实现自己的 SQLContainer,修改数据表的结构。

由于 MemeoryContainer 和 PersistContainer 各有优缺点,如果可以组合两种容器,生成一种新的任务容器combinationContainer,既能够通过内存实现快写快读,又能够通过DB实现可持久化。

image

Usage

go get -u github.com/memory-overflow/light-task-scheduler
构建任务调度器
func MakeScheduler(
	container TaskContainer,
	actuator TaskActuator,
	persistencer TaskdataPersistencer,
	config Config) (*TaskScheduler, error)

通过任务容器、执行器、持久化器和任务配置构建一个任务调度器,构建完后自动开始调度。其中持久化器可以为空, 任务配置 可以配置是否需要回调,如果需要回调,需要配置一个自定义的 回调器

函数执行器

框架预制了函数执行器,借助函数执行器,可以轻松实现函数调度。

使用框架预制的队列容器和函数执行器可以轻松实现一个函数的调度。参考 a+b example

docker 执行器

框架预制了docker 执行器,借助函数执行器,可以轻松实现函数调度。

使用框架预制的队列容器和函数执行器可以轻松实现 docker 的调度。参考 docker example

Example: 使用内存容器实现视频裁剪异步任务调度

本例子演示如何用本框架实现一个持久化的任务调度系统。包含一个简单的 web 管理界面。

首先实现一个异步裁剪的微服务,一共四个接口,需要先安装ffmpeg命令

  1. /VideoCut, 输入原视频, 裁剪开始时间,结束时间,返回 taskId。
  2. /Status,输入 taskId, 返回该任务的状态,是否已经完成,或者失败。
  3. /GetOutputVideo, 如果任务已经完成,输入 TaskId,返回已经完成的视频路径结果。
  4. /Stop, 如果任务执行时间过长,可以支持停止。

服务代码参考 video_cut_service.go

现在我们通过本任务调度框架实现一个对裁剪任务进行调度系统,可以控制任务并发数,和任务超时时间。并且按照队列依次调度。

定义视频裁剪任务
// VideoCutTask 视频裁剪任务结构
type VideoCutTask struct {
	TaskId                   string
	CutStartTime, CutEndTime float32
	InputVideo               string
}
实现视频裁剪任务执行器

实现一个视频裁剪任务的执行器,执行器实际上就是调用视频裁剪微服务的 API 接口,执行器的实现参考video_cut_actuator.go

实现视频裁剪任务容器

首先实现一个 sql 任务容器,从数据库存取任务,然后和队列容器组合一起成为一个组合容器作为裁剪任务的容器

实现数据持久化

这里为了连接 db 方便,把任务容器和持久化合并到了一起。 参考代码DataPersistence

构建调度器

参考代码main.go

实现简单管理接口的 web 页面

参考代码 web.go

启动服务

执行 go run example/videocut_example/main_web/main.go 启动服务,然后在浏览器输入http://127.0.0.1:8000/html即可进入管理界面。

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncTaskStatus

type AsyncTaskStatus struct {
	TaskStatus   TaskStatus
	FailedReason error
	Progress     interface{}
}

AsyncTaskStatus 异步任务状态

type CallbackReceiver added in v1.1.1

type CallbackReceiver interface {
	GetCallbackChannel(ctx context.Context) (taskChannel chan Task)
}

CallbackReceiver 任务状态回调接收器

type Config

type Config struct {
	// 任务执行超时时间,超过该时间后,任务将被强制结束,并且视为任务失败
	TaskTimeout time.Duration

	// 任务并发限制
	TaskLimit int32

	// 任务失败最大尝试次数
	MaxFailedAttempts int32

	// 任务调度固定使用轮询,会定期使用任务容器的接口获取执行中的任务数和任务等待队列中的任务
	// SchedulingPollInterval 用来配置该定期轮询的时间周期
	// 根据任务容器配置合理的值,比如 db 任务容器,配置合理的轮询间隔,避免对 db 压力过大
	SchedulingPollInterval time.Duration

	// 任务状态维护默认使用轮询,定期通过执行器接口获取执行中的任务状态,然后对任务状态进行更新
	// 如果 DisableStatePoll 设置为 true,将关闭轮询状态维护,
	// 但是必须要开启 EnableStateCallback 通过回调的方式通知任务状态
	DisableStatePoll bool

	// 在 DisableStatePoll 为 false 的情况下生效,任务状态维护轮询的时间周期
	// 根据任务容器配置合理的值,比如 db 任务容器,配置合理的轮询间隔,避免对 db 压力过大
	StatePollInterval time.Duration

	// 是否开启任务状态回调,回调比轮询对任务状态维护具有更地的延时,能够及时更新完成的任务
	// 如果有条件,建议同时开始轮询和回调,回调可以更早的感知任务结束,
	// 轮询可以为任务回调失败或者丢失兜底,保证任务状态一定可以更新
	// 单回调模式,无法对任务进行超时感知处理
	EnableStateCallback bool

	// CallbackReceiver 任务回调接收器
	// 如果 EnableStateCallback 为 true 开启任务状态回调,必须要要配置任务回调接收器
	CallbackReceiver CallbackReceiver
	// 是否需要调度器返回已完成的任务
	// 如果为 true,需要通过
	// for finishedTask := range TaskScheduler.FinshedTasks() {
	//   ...
	// }
	// 及时取走 channel 中的数据,否则可能造成 channel 满了,任务调度阻塞
	// 默认不开启,为 false
	EnableFinshedTaskList bool
}

Config 配置

type Task

type Task struct {
	// 该任务的唯一标识id,创建任务的时候赋予
	TaskId string
	// 任务优先级, 创建任务的时候可选
	TaskPriority int
	// 任务对象,创建任务的时候赋予
	TaskItem interface{}

	TaskStartTime time.Time // 框架赋予值
	TaskEnbTime   time.Time // 框架赋予值
	// 任务状态,任务容器负责赋予值
	TaskStatus TaskStatus
	// 任务容器负责赋予值
	FailedReason string
	// 任务已经重试的次数,任务容器负责赋予值
	TaskAttemptsTime int32
}

Task 通用的任务结构

type TaskActuator

type TaskActuator interface {

	// Init 任务在被加入调度系统前的初始化工作
	Init(ctx context.Context, task *Task) (newTask *Task, err error)

	// Start 开始执行任务,不要阻塞该方法,如果是同步任务,在单独的线程执行,执行器在内存中维护任务状态,转成异步任务,
	// 通过 GetAsyncTaskStatus 返回任务状态
	// ignoreErr 是否忽略任务调度的错误,等待恢复,如果 ignoreErr = false, Start 返回 error 任务会失败
	Start(ctx context.Context, task *Task) (newTask *Task, ignoreErr bool, err error)

	// GetOutput 从任务执行器获取任务执行的结果
	GetOutput(ctx context.Context, task *Task) (data interface{}, err error)

	// Stop 停止任务
	Stop(ctx context.Context, task *Task) error

	// GetAsyncTaskStatus 获取异步执行中的任务的状态
	GetAsyncTaskStatus(ctx context.Context, tasks []Task) (status []AsyncTaskStatus, err error)
}

TaskActuator 任务执行器接口

type TaskContainer

type TaskContainer interface {

	// AddTask 向容器添加任务
	AddTask(ctx context.Context, task Task) (err error)

	// GetRunningTask 获取所有运行中的任务
	GetRunningTask(ctx context.Context) (tasks []Task, err error)

	// GetRunningTaskCount 获取运行中的任务数
	GetRunningTaskCount(ctx context.Context) (count int32, err error)

	// GetWaitingTask 获取等待运行中的任务
	GetWaitingTask(ctx context.Context, limit int32) (tasks []Task, err error)

	// ToRunningStatus 转移到运行中的状态
	ToRunningStatus(ctx context.Context, task *Task) (newTask *Task, err error)

	// ToStopStatus 转移到停止状态
	ToStopStatus(ctx context.Context, task *Task) (newTask *Task, err error)

	// ToDeleteStatus 转移到删除状态
	ToDeleteStatus(ctx context.Context, task *Task) (newTask *Task, err error)

	// ToFailedStatus 转移到失败状态
	ToFailedStatus(ctx context.Context, task *Task, reason error) (newTask *Task, err error)

	// ToExportStatus 转移到结果导出状态
	ToExportStatus(ctx context.Context, task *Task) (newTask *Task, err error)

	// ToSuccessStatus 转移到执行成功状态
	ToSuccessStatus(ctx context.Context, task *Task) (newTask *Task, err error)

	// UpdateRunningTaskStatus 更新执行中的任务执行进度状态
	UpdateRunningTaskStatus(ctx context.Context, task *Task, status AsyncTaskStatus) error
}

TaskContainer 抽象的任务容器,需要开发者可以选择使用已有的任务容器,也可以根据实际业务实现自己的任务容器接口

type TaskScheduler

type TaskScheduler struct {
	// Container 配置的任务容器
	Container TaskContainer
	// Actuator 配置的任务执行器
	Actuator TaskActuator
	// Persistencer 数据持久化
	Persistencer TaskdataPersistencer
	// contains filtered or unexported fields
}

TaskScheduler 任务调度器,通过对任务容器和任务执行器的操作,实现任务调度

func MakeScheduler added in v1.1.1

func MakeScheduler(
	container TaskContainer,
	actuator TaskActuator,
	persistencer TaskdataPersistencer,
	config Config) (*TaskScheduler, error)

MakeScheduler 新建任务调度器 如果不需要对任务数据此久化,persistencer 可以设置为 nil 调度器构建以后,自动开始任务调度

func (*TaskScheduler) AddTask

func (s *TaskScheduler) AddTask(ctx context.Context, task Task) error

AddTask 添加一个任务,需要把任务转换成 lighttaskscheduler.Task 的通用形式 注意一定要配置一个唯一的任务 id 标识

func (*TaskScheduler) Close

func (s *TaskScheduler) Close()

Close 停止调度

func (*TaskScheduler) FinshedTasks added in v1.1.1

func (s *TaskScheduler) FinshedTasks() chan *Task

FinshedTasks 返回的完成的任务的 channel

func (*TaskScheduler) StopTask added in v1.1.1

func (s *TaskScheduler) StopTask(ctx context.Context, ftask *Task) error

StopTask 停止一个任务

type TaskStatus

type TaskStatus int32

TaskStatus 任务状态

const (
	TASK_STATUS_INVALID   TaskStatus = 0
	TASK_STATUS_UNSTART   TaskStatus = 1
	TASK_STATUS_WAITING   TaskStatus = 2
	TASK_STATUS_RUNNING   TaskStatus = 3
	TASK_STATUS_SUCCESS   TaskStatus = 4
	TASK_STATUS_FAILED    TaskStatus = 5
	TASK_STATUS_STOPED    TaskStatus = 6
	TASK_STATUS_DELETE    TaskStatus = 7
	TASK_STATUS_EXPORTING TaskStatus = 8
)

type TaskdataPersistencer added in v1.1.1

type TaskdataPersistencer interface {
	// DataPersistence 定义如何把通过执行器 GetOutput 的数据进行此久化存储
	// data 的协议保持和 TaskActuator.GetOutput 一样
	DataPersistence(ctx context.Context, task *Task, data interface{}) (err error)

	// GetPersistenceData 查询任务持久化结果
	GetPersistenceData(ctx context.Context, task *Task) (data interface{}, err error)

	// DeletePersistenceData 删除任务的此久化结果
	DeletePersistenceData(ctx context.Context, task *Task) (err error)
}

TaskdataPersistencer 任务数据的可持久化接口

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL