task

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: MIT Imports: 18 Imported by: 0

README

Task 模块

Task 模块是 mflow 工作流引擎的核心执行单元,负责任务的生命周期管理、状态跟踪和调度执行。

⚠️ 架构特性: 系统强制使用 Agent 模式,所有任务必须通过 Agent 执行,不支持本地执行。这确保了 Server 端(调度)与 Agent 端(执行)的职责清晰分离。

目录结构

task/
├── README.md           # 模块文档
├── interface.go        # 服务接口定义
├── model.go           # 数据模型定义
├── enum.go            # 枚举类型定义
└── impl/              # 服务实现
    ├── impl.go        # 服务初始化
    └── task.go        # 核心业务逻辑

核心概念

任务模型

Task 由两部分组成:

  • TaskSpec: 任务定义(不可变)

    • 任务ID、名称、类型
    • 任务配置(Define)和输入参数(InputParams)
    • 超时设置、依赖关系、执行条件
    • Agent 环境配置
  • TaskStatus: 任务状态(可变)

    • 运行状态、开始/结束时间
    • 执行结果、日志详情
    • Agent 调度信息(Agent ID、调度时间、确认状态)
    • 输出参数(供下游任务使用)
任务类型
  • image_build: 镜像构建任务
  • task_debug: 任务调试
  • docker_runner: Docker 容器执行
  • k8s_runner: Kubernetes 任务执行
  • jenkins_runner: Jenkins 任务集成
  • @agent_upgrade: Agent 升级(系统任务)
任务状态
STATUS_PENDDING     // 等待处理
STATUS_RUNNING      // 运行中
STATUS_WAIT_UPDATE  // 等待更新(异步任务)
STATUS_SKIP         // 忽略执行
STATUS_SUCCESS      // 成功
STATUS_CANCELED     // 取消
STATUS_FAILED       // 失败

核心功能

1. 任务执行
RunTask - 运行任务
func (i *TaskServiceImpl) RunTask(ctx context.Context, in *Task) error

功能

  • 验证任务配置:检查 AgentEnv 是否存在(强制要求)
  • 调度 Agent 并发送任务
  • 标记任务为运行中状态
  • 记录 Agent 调度信息(Agent ID、调度时间)
  • 设置调度确认状态(等待 Agent 确认接收)

执行流程

  1. 验证 AgentEnv 必须存在,否则返回错误
  2. 根据任务的 AgentEnv 查找可用 Agent
  3. 将任务发送到选中的 Agent
  4. 成功发送后标记任务为 RUNNING
  5. 记录调度信息:ScheduledAgentIdScheduledAt
  6. 设置 ScheduledConfirmed=false,等待 Agent 确认
ReRunTask - 重新运行任务
func (i *TaskServiceImpl) ReRunTask(ctx context.Context, in *ReRunTaskRequest) (*Task, error)

支持两种模式:

  • ReRun 模式 (Clone=false): 重置原任务状态后重新执行
  • Clone 模式 (Clone=true): 克隆为新任务执行(仅限独立任务)

注意:Pipeline 任务不支持 Clone 模式,以保证流水线流程的完整性。

2. 状态管理
UpdateTaskStatus - 更新任务状态
func (i *TaskServiceImpl) UpdateTaskStatus(ctx context.Context, in *Task) error

功能

  • 保存任务状态到数据库
  • 触发 Pipeline 状态更新(如果任务属于 Pipeline)
  • 记录状态变更日志
SaveTask - 保存任务
func (i *TaskServiceImpl) SaveTask(ctx context.Context, in *Task) error

保存策略

  • 任务不存在:创建新记录
  • 任务已存在:更新 TaskStatus 和 InputParams
3. 任务查询
QueryTask - 查询任务列表
func (i *TaskServiceImpl) QueryTask(ctx context.Context, in *QueryTaskRequest) (*types.Set[*Task], error)

查询条件

  • 按 PipelineTaskId 过滤
  • 支持分页
  • 可选择是否包含详细日志(WithDetail)
DescribeTask - 查询任务详情
func (i *TaskServiceImpl) DescribeTask(ctx context.Context, in *DescribeTaskRequest) (*Task, error)
4. 调度超时处理
CheckScheduleTimeout - 检查调度超时
func (i *TaskServiceImpl) CheckScheduleTimeout(ctx context.Context, timeoutSeconds int) error

功能

  • 检查运行中但长时间未被 Agent 确认的任务
  • 将超时任务标记为失败
  • 防止任务长时间悬挂在运行状态

超时判断条件

  • 任务为 Agent 模式(AgentEnv 非空)
  • 任务状态为 RUNNING
  • ScheduledConfirmed=false(未被 Agent 确认)
  • 超过指定的超时时间(从 ScheduledAt 或 StartAt 开始计算)
UpdateScheduleConfirmed - 更新调度确认状态
func (i *TaskServiceImpl) UpdateScheduleConfirmed(ctx context.Context, taskId string, confirmed bool) error

使用场景:Agent 接收到任务后调用,确认任务已成功接收。

重要特性

1. 强制 Agent 模式

系统架构要求:所有任务必须通过 Agent 执行,不支持本地执行模式。

  • AgentEnv 必填: 每个任务必须指定 AgentEnv(Agent 环境标识)
  • 架构清晰: Server 端只负责调度,Agent 负责执行,职责分离明确
  • 创建验证: 任务创建时会自动验证 AgentEnv 是否存在

Agent 调度流程

任务提交 → 验证 AgentEnv → 选择 Agent → 发送任务 → 标记调度 → Agent 确认 → 执行任务
         (必填)           (AgentEnv)    (ScheduledAt)  (ScheduledConfirmed)

调度超时保护

  • 默认超时时间:15秒(ScheduledConfirmTTL)
  • 超时后任务自动标记为失败
  • 防止 Agent 离线或网络故障导致的任务悬挂
2. 任务依赖和条件执行

依赖管理

task.SetDependsOn("task1", "task2")  // 设置依赖任务
task.DependsNodes()                   // 获取依赖列表

条件执行

task.AddWhen(&Contiditon{
    InputParam: "env",
    Operator:   CONDITION_OPERATOR_IN,
    Values:     []string{"prod", "staging"},
})
3. 参数传递

输入参数 (InputParams):

  • 任务启动时设置的参数
  • 从 Pipeline 上下文注入的参数

输出参数 (OutputParams):

  • 任务执行完成后产生的输出
  • 供下游任务使用
  • 由 Pipeline 负责参数传递
4. 工作目录管理
WorkDir() - 工作目录
func (t *Task) WorkDir() string

目录结构

  • Pipeline 任务:{pipeline_id}/{task_number}
  • 独立任务:{task_id}
OutputDir() - 输出目录
func (t *Task) OutputDir() string

默认位置

  • Pipeline 任务:../output (Pipeline 级别共享)
  • 独立任务:./output (任务自己的目录)
  • 支持通过 build_output_dir 参数自定义
SharedDataDir() - 共享数据目录
func (t *Task) SharedDataDir() string

默认位置

  • Pipeline 任务:../shared (Pipeline 级别共享)
  • 独立任务:./shared
  • 支持通过 shared_data_dir 参数自定义
  • 支持绝对路径
5. 任务重试和错误处理

忽略错误

task.SetIgnoreError(true)  // 任务失败时继续执行
task.IsContinue()          // 检查是否忽略错误

超时控制

task.SetTimeoutSecond(3600)         // 设置超时时间(秒)
task.IsTimeout()                     // 检查是否超时
task.TimeoutDurationString()         // 获取超时时间描述
6. Pipeline 集成

关联 Pipeline

task.SetRefPipelineTask(pipelineTaskId)

参数注入

  • Pipeline 运行时自动注入累积的输出参数
  • ReRun 场景重新从数据库查询最新参数

状态联动

  • Task 状态更新触发 Pipeline 状态更新
  • 通过 runtime.UpdatePipelineTaskStatus 实现

数据库表结构

表名:devops_tasks

主要字段

字段 类型 说明
id string 任务ID(主键)
type varchar(100) 任务类型
name varchar(255) 任务名称
status varchar(100) 任务状态
create_at timestamptz 创建时间
start_at timestamptz 开始时间
end_at timestamptz 结束时间
define jsonb 任务定义
input_params jsonb 输入参数
output_params jsonb 输出参数
pipeline_task_id varchar(100) Pipeline任务ID
agent_env varchar(255) Agent环境
depends_tasks jsonb 依赖任务列表
scheduled_at timestamptz 调度时间
scheduled_agent varchar(255) 调度的Agent ID
scheduled_confirmed boolean 调度确认状态
scheduled_confirm_ttl int8 确认超时时间(秒)

使用示例

创建并运行任务

// 必须设置 AgentEnv(强制要求)
task.SetAgentEnv("production")

task.SetParam("image", "nginx:latest")
task.SetTimeoutSecond(3600)
task.SetIgnoreError(false)

// 运行任务(会自动验证 AgentEnv)
service := task.GetService()
err := service.RunTask(ctx, task)
if err != nil {
    // 如果 AgentEnv 为空,这里会返回错误.GetService()
err := service.RunTask(ctx, task)
if err != nil {
    return err
}

// 保存任务状态
err = service.SaveTask(ctx, task)
查询任务
// 查询 Pipeline 的所有任务
req := task.NewQueryTaskRequest()
req.AddPipelineTaskId(pipelineTaskId)
req.WithDetail = true

result, err := service.QueryTask(ctx, req)
if err != nil {
    return err
}

for _, t := range result.Items {
    fmt.Printf("Task: %s, Status: %s\n", t.Name, t.Status)
}
重新运行任务
// ReRun 模式(覆盖原任务)
req := task.NewReRunTaskRequest(taskId)
req.Clone = false

newTask, err := service.ReRunTask(ctx, req)
if err != nil {
    return err
}

// 保存任务状态
err = service.SaveTask(ctx, newTask)
定期检查调度超时
// 在定时任务中运行
func checkScheduleTimeout(ctx context.Context) {
    service := task.GetService()
    
    // 检查超过30秒未确认的任务
    err := service.CheckScheduleTimeout(ctx, 30)
    if err != nil {
        log.Error("检查调度超时失败: %v", err)
    }
}
```强制 Agent 模式**(⚠️ 重要):
   - **所有任务必须设置 `AgentEnv`**,否则无法运行
   - 系统不支持本地执行模式,架构上强制职责分离
   - 创建任务时务必调用 `task.SetAgentEnv(envName)`

2. **任务状态持久化**:
   - `RunTask` 不自动保存状态,需要调用方手动调用 `SaveTask`
   - Pipeline 通过 hook 机制自动保存
   - 独立任务需要自行处理

3. **调度确认机制**:
   - Agent 接收任务后必须调用 `UpdateScheduleConfirmed`
   - 超时未确认的任务会被自动标记为失败
   - 防止 Agent 故障导致的任务悬挂

4. **Pipeline 任务限制**:
   - 不支持 Clone 模式的 ReRun
   - 必须保持任务ID和 PipelineTaskId 的关联
   - 状态更新会触发 Pipeline 状态联动

5. **嵌入字段初始化**:
   - 从数据库查询后必须调用 `InitEmbeddedFields()`
   - 防止 TaskSpec 或 TaskStatus 为 nil 导致的空指针异常

6. **嵌入字段初始化**:
   - 从数据库查询后必须调用 `InitEmbeddedFields()`
   - 防止 TaskSpec 或 TaskStatus 为 nil 导致的空指针异常

5. **工作目录设计**:
   - Pipeline 任务共享上层目录,避免重复产物
   - 独立任务使用自己的工作目录
   - 支持自定义路径配置

## 依赖模块

- `agent`: Agent 管理和任务调度
- `runtime`: Pipeline 运行时和状态管理
- `mcube/ioc`: 依赖注入框架
- `mcube/exception`: 异常处理
- `gorm`: 数据库 ORM

## 相关文档

- [Pipeline 模块](../runtime/README.md)
- [Agent 模块](../agent/README.md)
- [MFlow 架构设计](../README.md)

Documentation

Index

Constants

View Source
const (
	APP_NAME = "devops_task"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CONDITION_OPERATOR

type CONDITION_OPERATOR string
const (
	CONDITION_OPERATOR_IN CONDITION_OPERATOR = "in"
)

type Contiditon

type Contiditon struct {
	// 输入参数
	InputParam string `json:"input_param"`
	// 操作符
	Operator CONDITION_OPERATOR `json:"operator"`
	// In的值列吧
	Values []string `json:"values"`
}

inputa == "a"

type DescribeTaskRequest

type DescribeTaskRequest struct {
	TaskId string
}

func NewDescribeTaskRequest

func NewDescribeTaskRequest(taskId string) *DescribeTaskRequest

type QueryTaskRequest

type QueryTaskRequest struct {
	// 分页请求
	*request.PageRequest
	// 流水线任务ID
	PipelineTaskId []string
	// 是否包含日志和详情
	WithDetail bool
	// 标签过滤
	Label map[string]string
	// 调度的Agent ID
	ScheduledAgentId string
	// 任务状态
	Status STATUS
}

func NewQueryTaskRequest

func NewQueryTaskRequest() *QueryTaskRequest

func (*QueryTaskRequest) AddPipelineTaskId

func (r *QueryTaskRequest) AddPipelineTaskId(ids ...string) *QueryTaskRequest

func (*QueryTaskRequest) SetLabel

func (r *QueryTaskRequest) SetLabel(key, value string) *QueryTaskRequest

func (*QueryTaskRequest) SetScheduledAgentId

func (r *QueryTaskRequest) SetScheduledAgentId(agentId string) *QueryTaskRequest

func (*QueryTaskRequest) SetStatus

func (r *QueryTaskRequest) SetStatus(status STATUS) *QueryTaskRequest

type ReRunTaskRequest

type ReRunTaskRequest struct {
	// 原任务ID
	TaskId string `json:"task_id"`
	// 是否克隆为新任务运行
	// false: 重新运行原任务(覆盖原任务状态)
	// true:  克隆为新任务运行(保留原任务记录)
	Clone bool `json:"clone" form:"clone"`
}

func NewReRunTaskRequest

func NewReRunTaskRequest(taskId string) *ReRunTaskRequest

ReRunTaskRequest 重新运行任务请求

type STATUS

type STATUS string
const (
	STATUS_PENDDING STATUS = "等待处理"
	STATUS_RUNNING  STATUS = "运行中"
	// 该状态 通过PipelineChecker的Crontab 轮训更新状态
	STATUS_WAIT_UPDATE STATUS = "等待更新"
	// 忽略执行, 等同为成功
	STATUS_SKIP     STATUS = "忽略执行"
	STATUS_SUCCESS  STATUS = "成功"
	STATUS_CANCELED STATUS = "取消"
	STATUS_FAILED   STATUS = "失败"
)

func (STATUS) IsComplete

func (s STATUS) IsComplete() bool

func (STATUS) String

func (s STATUS) String() string

type Service

type Service interface {
	// 运行任务
	RunTask(ctx context.Context, in *Task) error
	// 重新运行任务(根据已有任务ID重新执行)
	ReRunTask(ctx context.Context, in *ReRunTaskRequest) (*Task, error)
	// 更新任务状态
	UpdateTaskStatus(ctx context.Context, in *Task) error
	// 保存任务
	SaveTask(ctx context.Context, in *Task) error
	// 查询任务
	QueryTask(ctx context.Context, in *QueryTaskRequest) (*types.Set[*Task], error)
	// 描述任务
	DescribeTask(ctx context.Context, in *DescribeTaskRequest) (*Task, error)
	// 检查并处理调度超时的任务
	CheckScheduleTimeout(ctx context.Context, timeoutSeconds int) error
	// 更新任务的调度确认状态(仅更新 scheduled_confirmed 字段)
	UpdateScheduleConfirmed(ctx context.Context, taskId string, confirmed bool) error
}

func GetService

func GetService() Service

type TYPE

type TYPE string
const (
	TYPE_IMAGE_BUILD TYPE = "image_build"
	TYPE_TASK_DEBUG  TYPE = "task_debug"

	// 用户自定义任务类型, 由用户指定镜像进行允许
	TYPE_DOCKER_RUNNER TYPE = "docker_runner"
	// 用户自定义任务类型, 由用户指定镜像进行允许
	TYPE_K8S_RUNNER TYPE = "k8s_runner"
	// 用户自定义任务类型, 允许指定的Jenkins Job, 用于兼容之前的Jenkins任务
	TYPE_JENKINS_RUNNER TYPE = "jenkins_runner"

	// 系统管理任务
	TYPE_AGENT_UPGRADE TYPE = "@agent_upgrade"
)

func (TYPE) String

func (t TYPE) String() string

type Task

type Task struct {
	// 任务定义
	*TaskSpec
	// 任务状态
	*TaskStatus
	// contains filtered or unexported fields
}

func NewTask

func NewTask(t TYPE, name string) *Task

func (*Task) AddDependsTask

func (r *Task) AddDependsTask(taskNames ...string) *Task

func (*Task) AddWhen

func (r *Task) AddWhen(conditions ...*Contiditon) *Task

func (*Task) Clone

func (t *Task) Clone() *Task

Clone 克隆任务,生成新的任务ID和状态,保留原任务的配置 注意:Clone 仅用于独立任务,Pipeline 任务不支持 Clone

func (*Task) DependsNodes

func (e *Task) DependsNodes() []string

DependsNodes 获取节点依赖的任务

func (*Task) FixStartAt

func (r *Task) FixStartAt(pipelineStartAt time.Time)

func (*Task) GetIgnoreError

func (t *Task) GetIgnoreError() bool

func (*Task) GetLogCallback

func (t *Task) GetLogCallback() func(content string)

GetLogCallback 获取日志回调函数

func (*Task) GetName

func (e *Task) GetName() string

GetName 获取节点名称

func (*Task) GetStatus

func (e *Task) GetStatus() dag.NodeStatus

GetStatus 实现 DAGNoder 接口:获取节点状态

func (*Task) InitEmbeddedFields

func (e *Task) InitEmbeddedFields() *Task

InitEmbeddedFields 初始化所有嵌入的指针字段,避免 nil 指针异常 该方法应该在从数据库查询后立即调用

func (*Task) IsCompleted

func (e *Task) IsCompleted() bool

IsCompleted 实现 DAGNoder 接口:判断节点是否已完成

func (*Task) IsContinue

func (e *Task) IsContinue() bool

func (*Task) IsFailed

func (e *Task) IsFailed() bool

func (*Task) IsPendding

func (e *Task) IsPendding() bool

func (*Task) IsScheduleConfirmTimeout

func (t *Task) IsScheduleConfirmTimeout() bool

IsScheduleConfirmTimeout 检查任务调度确认是否超时 用于 Agent 接收到任务时判断是否应该执行该任务 只有当任务已被分配且在确认超时窗口内时才应该执行

func (*Task) IsScheduleTimeout

func (t *Task) IsScheduleTimeout(seconds int) bool

IsScheduleTimeout 判断任务调度是否超时(任务已开始运行但尚未被Agent确认接收) 用于检测长时间未被Agent确认的任务

func (*Task) IsSuccess

func (e *Task) IsSuccess() bool

IsSuccess 实现 DAGNoder 接口:判断节点是否成功完成

func (*Task) IsTimeout

func (t *Task) IsTimeout() bool

判断是否超时

func (*Task) LoadFromEvent

func (e *Task) LoadFromEvent(event *bus.Event) error

func (*Task) LogDebug

func (t *Task) LogDebug(msg string)

LogDebug 输出 DEBUG 日志

func (*Task) LogError

func (t *Task) LogError(msg string)

LogError 输出 ERROR 日志(红色)

func (*Task) LogInfo

func (t *Task) LogInfo(msg string)

LogInfo 输出 INFO 日志(无颜色)

func (*Task) LogSuccess

func (t *Task) LogSuccess(msg string)

LogSuccess 输出 SUCCESS 日志(绿色)

func (*Task) LogWarn

func (t *Task) LogWarn(msg string)

LogWarn 输出 WARNING 日志(黄色)

func (*Task) NodeName

func (e *Task) NodeName() string

NodeName 实现 DAGNoder 接口:获取节点名称

func (*Task) OutputDir

func (t *Task) OutputDir() string

OutputDir 返回构建产物输出目录(相对于工作目录的路径) 优先使用用户传入的 build_output_dir 参数 流水线任务默认: ../output (与其他任务共享同一个 pipeline 级别的 output 目录) 独立任务默认: ./output (在任务自己的工作目录下)

func (*Task) ResetForRerun

func (t *Task) ResetForRerun()

ResetForRerun 重置任务状态用于重新运行 保留原任务ID、配置和 PipelineTaskId,只重置执行状态相关字段

func (*Task) SetAgentEnv

func (t *Task) SetAgentEnv(v string) *Task

func (*Task) SetDependsOn

func (e *Task) SetDependsOn(depends ...string) *Task

func (*Task) SetDescription

func (t *Task) SetDescription(v string) *Task

func (*Task) SetId

func (t *Task) SetId(id string) *Task

func (*Task) SetIgnoreError

func (t *Task) SetIgnoreError(v bool) *Task

func (*Task) SetLabel

func (r *Task) SetLabel(key, value string) *Task

func (*Task) SetLogCallback

func (t *Task) SetLogCallback(callback func(content string)) *Task

SetLogCallback 设置日志回调函数

func (*Task) SetParam

func (t *Task) SetParam(key, value string) *Task

func (*Task) SetRefPipelineTask

func (t *Task) SetRefPipelineTask(refPipelineTaskId string) *Task

func (*Task) SetStatus

func (e *Task) SetStatus(status dag.NodeStatus)

SetStatus 实现 DAGNoder 接口:设置节点状态

func (*Task) SetTimeoutSecond

func (t *Task) SetTimeoutSecond(v int64) *Task

func (*Task) SharedDataDir

func (t *Task) SharedDataDir() string

SharedDataDir 返回共享数据目录(相对于工作目录或绝对路径) 优先使用用户传入的 shared_data_dir 参数 逻辑与 OutputDir 保持一致: - 若为流水线任务(PipelineTaskId 非空),相对路径提升到上一层目录(../) - 若为独立任务,使用当前层级 - 若用户传入绝对路径(以 / 开头),则直接返回该绝对路径 - 若未传入,默认:流水线任务 ../shared;独立任务 ./shared

func (*Task) String

func (t *Task) String() string

func (*Task) TableName

func (e *Task) TableName() string

func (*Task) TimeoutDuration

func (t *Task) TimeoutDuration() time.Duration

判断是否超时

func (*Task) TimeoutDurationString

func (t *Task) TimeoutDurationString() string

判断是否超时

func (*Task) ToBusEvent

func (e *Task) ToBusEvent(topic string) *bus.Event

func (*Task) UseTaskLog

func (t *Task) UseTaskLog(ctx context.Context) *Task

UseTaskLog 使用tasklog模块保存日志到数据库 这个方法会自动设置 logCallback,将日志内容保存到 task_log 表

func (*Task) Validate

func (e *Task) Validate() error

Validate 验证任务配置,强制要求必须是 Agent 模式

func (*Task) When

func (e *Task) When() string

When 实现 DAGNoder 接口:返回 when 条件表达式

func (*Task) WorkDir

func (t *Task) WorkDir() string

WorkDir 生成工作目录路径 如果任务ID格式为 "pipeline_id-task_number" (例如: 5f4aa9a2-d870-4812-9894-67e6aef9dd28-0) 则返回两级目录: pipeline_id/task_number 否则返回单层目录: task_id

type TaskSpec

type TaskSpec struct {
	// 任务Id
	Id string `json:"id" gorm:"column:id;type:string;primary_key"`
	// 名称
	Type TYPE `json:"type" gorm:"column:type;type:varchar(100);index"`
	// 任务名称
	Name string `json:"name" gorm:"column:name;type:varchar(255)"`
	// 创建时间
	CreateAt time.Time `json:"create_at" gorm:"column:create_at;type:datetime"`
	// 任务定义, 比如名称, job定义
	Define map[string]string `json:"define" gorm:"column:define;type:json;serializer:json;not null;default:'{}'"`
	// 运行参数
	InputParams map[string]string `json:"input_params" gorm:"column:input_params;type:json;serializer:json;not null;default:'{}'"`
	// 流水线任务Id
	PipelineTaskId string `json:"pipeline_task_id" gorm:"column:pipeline_task_id;type:varchar(100);index"`
	// 说明
	Description string `json:"description" gorm:"column:description;type:text"`
	// 任务超时时间, 0表示不超时
	TimeoutSecond int64 `json:"timeout_second" gorm:"column:timeout_second;type:bigint"`
	// 是否忽略错误
	IgnoreError *bool `json:"ignore_error" gorm:"column:ignore_error;type:bool;default:false"`
	// 环境变量, 运行该任务的 Agent 所属环境, 任务执行时 动态查询该环境下的 Agent 列表进行调度
	AgentEnv string `json:"agent_env" gorm:"column:agent_env;type:varchar(255);index"`
	// 依赖的任务节点列表
	DependsTasks []string `json:"depends_tasks" gorm:"column:depends_tasks;type:json;serializer:json;not null;default:'[]'"`
	// 执行条件(旧版,保留向后兼容)
	When []Contiditon `json:"when" gorm:"column:when;type:json;serializer:json;"`
	// when 条件表达式(新版 DAG 条件系统)
	// 支持表达式如: "always", "never", "params.env == 'prod'", "deps.build.status == 'success'"
	WhenCondition string `json:"when_condition" gorm:"column:when_condition;type:varchar(1000)"`
	// 额外的其他属性
	Extras map[string]string `json:"extras" form:"extras" gorm:"column:extras;type:json;serializer:json;"`
	// 标签
	Label map[string]string `json:"label" gorm:"column:label;type:json;serializer:json;"`
}

func NewTaskSpec

func NewTaskSpec(t TYPE, name string) *TaskSpec

func (*TaskSpec) GetParam

func (s *TaskSpec) GetParam(key string) string

func (*TaskSpec) SetDefine

func (s *TaskSpec) SetDefine(key, value string) *TaskSpec

func (*TaskSpec) SetInputParam

func (s *TaskSpec) SetInputParam(key, value string) *TaskSpec

func (*TaskSpec) ShortDisplay

func (r *TaskSpec) ShortDisplay() string

func (*TaskSpec) String

func (r *TaskSpec) String() string

type TaskStatus

type TaskStatus struct {
	// 状态
	Status STATUS `json:"status" gorm:"column:status;type:varchar(100);index"`
	// 关联URL
	RefURL string `json:"ref_url" gorm:"column:ref_url;type:varchar(255)"`
	// 失败原因
	Message string `json:"message" gorm:"column:message;type:text"`
	// 异步任务调用时的返回详情
	Detail string `json:"detail,omitempty" gorm:"column:detail;type:text"`
	// 启动人
	RunBy string `json:"run_by" gorm:"column:run_by;type:varchar(100)"`
	// 开始时间
	StartAt *time.Time `json:"start_at" gorm:"column:start_at;type:datetime"`
	// 更新时间
	UpdateAt time.Time `json:"update_at" gorm:"column:update_at;type:datetime"`
	// 结束时间
	EndAt time.Time `json:"end_at" gorm:"column:end_at;type:datetime"`
	// 其他信息
	Extras map[string]string `json:"extras" gorm:"column:extras;type:json;serializer:json;not null;default:'{}'"`

	// 调度时间
	ScheduledAt *time.Time `json:"scheduled_at" gorm:"column:scheduled_at;type:datetime"`
	// 具体执行任务的AgentId
	ScheduledAgentId *string `json:"scheduled_agent" gorm:"column:scheduled_agent;type:varchar(255);index"`
	// 确认调度超时, 默认15秒
	ScheduledConfirmTTL int64 `json:"scheduled_confirm_ttl" gorm:"column:scheduled_confirm_ttl;type:bigint;default:15"`
	// 调度确认, Agent确认接收任务, 下发成功后设置为true
	ScheduledConfirmed *bool `json:"scheduled_confirmed" gorm:"column:scheduled_confirmed;type:boolean;default:false"`

	// 任务输出参数(供下一个任务使用)
	Output map[string]string `json:"output,omitempty" gorm:"column:output;type:json;serializer:json;not null;default:'{}'"`
}

func NewTaskStatus

func NewTaskStatus() *TaskStatus

func (*TaskStatus) Canceledf

func (t *TaskStatus) Canceledf(format string, a ...any) *TaskStatus

func (*TaskStatus) ConfirmScheduled

func (r *TaskStatus) ConfirmScheduled() *TaskStatus

ConfirmScheduled 确认Agent已接收任务分配

func (*TaskStatus) Failedf

func (t *TaskStatus) Failedf(format string, a ...any) *TaskStatus

func (*TaskStatus) GetScheduledAgentId

func (r *TaskStatus) GetScheduledAgentId() string

func (*TaskStatus) IsRunning

func (r *TaskStatus) IsRunning() bool

func (*TaskStatus) IsScheduleConfirm

func (r *TaskStatus) IsScheduleConfirm() bool

IsScheduleConfirm 检查任务是否具有完整的调度信息(调度时间和超时时间都已设置) 用于判断是否需要进行调度超时检查

func (*TaskStatus) IsScheduleConfirmed

func (r *TaskStatus) IsScheduleConfirmed() bool

IsScheduleConfirmed 检查任务分配是否已被Agent确认

func (*TaskStatus) IsScheduled

func (r *TaskStatus) IsScheduled() bool

IsScheduled 检查任务是否已被分配给Agent

func (*TaskStatus) IsWaitUpdate

func (r *TaskStatus) IsWaitUpdate() bool

func (*TaskStatus) MarkScheduled

func (r *TaskStatus) MarkScheduled(agentId string) *TaskStatus

MarkScheduled 标记任务已被分配给指定的Agent 用于Agent模式下的任务分发 同时设置确认超时时间窗口(默认15秒)

func (*TaskStatus) MarkedRunning

func (r *TaskStatus) MarkedRunning()

func (*TaskStatus) SetScheduledAgentId

func (r *TaskStatus) SetScheduledAgentId(agentId string) *TaskStatus

func (*TaskStatus) SetScheduledConfirmed

func (r *TaskStatus) SetScheduledConfirmed(confirmed bool) *TaskStatus

func (*TaskStatus) Skipf

func (t *TaskStatus) Skipf(format string, a ...any) *TaskStatus

func (*TaskStatus) String

func (r *TaskStatus) String() string

func (*TaskStatus) Success

func (t *TaskStatus) Success(format string, a ...any) *TaskStatus

func (*TaskStatus) TableName

func (r *TaskStatus) TableName() string

func (*TaskStatus) WaitUpdate

func (t *TaskStatus) WaitUpdate(updateId string) *TaskStatus

func (*TaskStatus) WithDetail

func (t *TaskStatus) WithDetail(format string, a ...any) *TaskStatus

func (*TaskStatus) WithExtra

func (t *TaskStatus) WithExtra(key, value string) *TaskStatus

func (*TaskStatus) WithRefURL

func (t *TaskStatus) WithRefURL(refURL string) *TaskStatus

type TaskUpdate

type TaskUpdate struct {
	// 任务状态
	*TaskStatus
	// 运行参数, 动态更新合并过来的参数
	Params map[string]string `json:"params" gorm:"column:params;type:json;serializer:json;not null;default:'{}'"`
}

存在则更新(同时更新 TaskSpec 和 TaskStatus,因为 Params 在 TaskSpec 中) 使用结构体更新避免遗漏字段

func (*TaskUpdate) TableName

func (r *TaskUpdate) TableName() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL