Documentation
¶
Index ¶
- Constants
- type CONDITION_OPERATOR
- type Contiditon
- type DescribeTaskRequest
- type QueryTaskRequest
- func (r *QueryTaskRequest) AddPipelineTaskId(ids ...string) *QueryTaskRequest
- func (r *QueryTaskRequest) SetLabel(key, value string) *QueryTaskRequest
- func (r *QueryTaskRequest) SetScheduledAgentId(agentId string) *QueryTaskRequest
- func (r *QueryTaskRequest) SetStatus(status STATUS) *QueryTaskRequest
- type ReRunTaskRequest
- type STATUS
- type Service
- type TYPE
- type Task
- func (r *Task) AddDependsTask(taskNames ...string) *Task
- func (r *Task) AddWhen(conditions ...*Contiditon) *Task
- func (t *Task) Clone() *Task
- func (e *Task) DependsNodes() []string
- func (r *Task) FixStartAt(pipelineStartAt time.Time)
- func (t *Task) GetIgnoreError() bool
- func (t *Task) GetLogCallback() func(content string)
- func (e *Task) GetName() string
- func (e *Task) GetStatus() dag.NodeStatus
- func (e *Task) InitEmbeddedFields() *Task
- func (e *Task) IsCompleted() bool
- func (e *Task) IsContinue() bool
- func (e *Task) IsFailed() bool
- func (e *Task) IsPendding() bool
- func (t *Task) IsScheduleConfirmTimeout() bool
- func (t *Task) IsScheduleTimeout(seconds int) bool
- func (e *Task) IsSuccess() bool
- func (t *Task) IsTimeout() bool
- func (e *Task) LoadFromEvent(event *bus.Event) error
- func (t *Task) LogDebug(msg string)
- func (t *Task) LogError(msg string)
- func (t *Task) LogInfo(msg string)
- func (t *Task) LogSuccess(msg string)
- func (t *Task) LogWarn(msg string)
- func (e *Task) NodeName() string
- func (t *Task) OutputDir() string
- func (t *Task) ResetForRerun()
- func (t *Task) SetAgentEnv(v string) *Task
- func (e *Task) SetDependsOn(depends ...string) *Task
- func (t *Task) SetDescription(v string) *Task
- func (t *Task) SetId(id string) *Task
- func (t *Task) SetIgnoreError(v bool) *Task
- func (r *Task) SetLabel(key, value string) *Task
- func (t *Task) SetLogCallback(callback func(content string)) *Task
- func (t *Task) SetParam(key, value string) *Task
- func (t *Task) SetRefPipelineTask(refPipelineTaskId string) *Task
- func (e *Task) SetStatus(status dag.NodeStatus)
- func (t *Task) SetTimeoutSecond(v int64) *Task
- func (t *Task) SharedDataDir() string
- func (t *Task) String() string
- func (e *Task) TableName() string
- func (t *Task) TimeoutDuration() time.Duration
- func (t *Task) TimeoutDurationString() string
- func (e *Task) ToBusEvent(topic string) *bus.Event
- func (t *Task) UseTaskLog(ctx context.Context) *Task
- func (e *Task) Validate() error
- func (e *Task) When() string
- func (t *Task) WorkDir() string
- type TaskSpec
- type TaskStatus
- func (t *TaskStatus) Canceledf(format string, a ...any) *TaskStatus
- func (r *TaskStatus) ConfirmScheduled() *TaskStatus
- func (t *TaskStatus) Failedf(format string, a ...any) *TaskStatus
- func (r *TaskStatus) GetScheduledAgentId() string
- func (r *TaskStatus) IsRunning() bool
- func (r *TaskStatus) IsScheduleConfirm() bool
- func (r *TaskStatus) IsScheduleConfirmed() bool
- func (r *TaskStatus) IsScheduled() bool
- func (r *TaskStatus) IsWaitUpdate() bool
- func (r *TaskStatus) MarkScheduled(agentId string) *TaskStatus
- func (r *TaskStatus) MarkedRunning()
- func (r *TaskStatus) SetScheduledAgentId(agentId string) *TaskStatus
- func (r *TaskStatus) SetScheduledConfirmed(confirmed bool) *TaskStatus
- func (t *TaskStatus) Skipf(format string, a ...any) *TaskStatus
- func (r *TaskStatus) String() string
- func (t *TaskStatus) Success(format string, a ...any) *TaskStatus
- func (r *TaskStatus) TableName() string
- func (t *TaskStatus) WaitUpdate(updateId string) *TaskStatus
- func (t *TaskStatus) WithDetail(format string, a ...any) *TaskStatus
- func (t *TaskStatus) WithExtra(key, value string) *TaskStatus
- func (t *TaskStatus) WithRefURL(refURL string) *TaskStatus
- type TaskUpdate
Constants ¶
const (
APP_NAME = "devops_task"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CONDITION_OPERATOR ¶
type CONDITION_OPERATOR string
const (
CONDITION_OPERATOR_IN CONDITION_OPERATOR = "in"
)
type Contiditon ¶
type Contiditon struct {
// 输入参数
InputParam string `json:"input_param"`
// 操作符
Operator CONDITION_OPERATOR `json:"operator"`
// In的值列吧
Values []string `json:"values"`
}
inputa == "a"
type DescribeTaskRequest ¶
type DescribeTaskRequest struct {
TaskId string
}
func NewDescribeTaskRequest ¶
func NewDescribeTaskRequest(taskId string) *DescribeTaskRequest
type QueryTaskRequest ¶
type QueryTaskRequest struct {
// 分页请求
*request.PageRequest
// 流水线任务ID
PipelineTaskId []string
// 是否包含日志和详情
WithDetail bool
// 标签过滤
Label map[string]string
// 调度的Agent ID
ScheduledAgentId string
// 任务状态
Status STATUS
}
func NewQueryTaskRequest ¶
func NewQueryTaskRequest() *QueryTaskRequest
func (*QueryTaskRequest) AddPipelineTaskId ¶
func (r *QueryTaskRequest) AddPipelineTaskId(ids ...string) *QueryTaskRequest
func (*QueryTaskRequest) SetLabel ¶
func (r *QueryTaskRequest) SetLabel(key, value string) *QueryTaskRequest
func (*QueryTaskRequest) SetScheduledAgentId ¶
func (r *QueryTaskRequest) SetScheduledAgentId(agentId string) *QueryTaskRequest
func (*QueryTaskRequest) SetStatus ¶
func (r *QueryTaskRequest) SetStatus(status STATUS) *QueryTaskRequest
type ReRunTaskRequest ¶
type ReRunTaskRequest struct {
// 原任务ID
TaskId string `json:"task_id"`
// 是否克隆为新任务运行
// false: 重新运行原任务(覆盖原任务状态)
// true: 克隆为新任务运行(保留原任务记录)
Clone bool `json:"clone" form:"clone"`
}
func NewReRunTaskRequest ¶
func NewReRunTaskRequest(taskId string) *ReRunTaskRequest
ReRunTaskRequest 重新运行任务请求
type Service ¶
type Service interface {
// 运行任务
RunTask(ctx context.Context, in *Task) error
// 重新运行任务(根据已有任务ID重新执行)
ReRunTask(ctx context.Context, in *ReRunTaskRequest) (*Task, error)
// 更新任务状态
UpdateTaskStatus(ctx context.Context, in *Task) error
// 保存任务
SaveTask(ctx context.Context, in *Task) error
// 查询任务
QueryTask(ctx context.Context, in *QueryTaskRequest) (*types.Set[*Task], error)
// 描述任务
DescribeTask(ctx context.Context, in *DescribeTaskRequest) (*Task, error)
// 检查并处理调度超时的任务
CheckScheduleTimeout(ctx context.Context, timeoutSeconds int) error
// 更新任务的调度确认状态(仅更新 scheduled_confirmed 字段)
UpdateScheduleConfirmed(ctx context.Context, taskId string, confirmed bool) error
}
func GetService ¶
func GetService() Service
type TYPE ¶
type TYPE string
const ( TYPE_IMAGE_BUILD TYPE = "image_build" TYPE_TASK_DEBUG TYPE = "task_debug" // 用户自定义任务类型, 由用户指定镜像进行允许 TYPE_DOCKER_RUNNER TYPE = "docker_runner" // 用户自定义任务类型, 由用户指定镜像进行允许 TYPE_K8S_RUNNER TYPE = "k8s_runner" // 用户自定义任务类型, 允许指定的Jenkins Job, 用于兼容之前的Jenkins任务 TYPE_JENKINS_RUNNER TYPE = "jenkins_runner" // 系统管理任务 TYPE_AGENT_UPGRADE TYPE = "@agent_upgrade" )
type Task ¶
type Task struct {
// 任务定义
*TaskSpec
// 任务状态
*TaskStatus
// contains filtered or unexported fields
}
func (*Task) AddDependsTask ¶
func (*Task) AddWhen ¶
func (r *Task) AddWhen(conditions ...*Contiditon) *Task
func (*Task) FixStartAt ¶
func (*Task) GetIgnoreError ¶
func (*Task) GetLogCallback ¶
GetLogCallback 获取日志回调函数
func (*Task) InitEmbeddedFields ¶
InitEmbeddedFields 初始化所有嵌入的指针字段,避免 nil 指针异常 该方法应该在从数据库查询后立即调用
func (*Task) IsContinue ¶
func (*Task) IsPendding ¶
func (*Task) IsScheduleConfirmTimeout ¶
IsScheduleConfirmTimeout 检查任务调度确认是否超时 用于 Agent 接收到任务时判断是否应该执行该任务 只有当任务已被分配且在确认超时窗口内时才应该执行
func (*Task) IsScheduleTimeout ¶
IsScheduleTimeout 判断任务调度是否超时(任务已开始运行但尚未被Agent确认接收) 用于检测长时间未被Agent确认的任务
func (*Task) OutputDir ¶
OutputDir 返回构建产物输出目录(相对于工作目录的路径) 优先使用用户传入的 build_output_dir 参数 流水线任务默认: ../output (与其他任务共享同一个 pipeline 级别的 output 目录) 独立任务默认: ./output (在任务自己的工作目录下)
func (*Task) ResetForRerun ¶
func (t *Task) ResetForRerun()
ResetForRerun 重置任务状态用于重新运行 保留原任务ID、配置和 PipelineTaskId,只重置执行状态相关字段
func (*Task) SetAgentEnv ¶
func (*Task) SetDependsOn ¶
func (*Task) SetDescription ¶
func (*Task) SetIgnoreError ¶
func (*Task) SetLogCallback ¶
SetLogCallback 设置日志回调函数
func (*Task) SetRefPipelineTask ¶
func (*Task) SetStatus ¶
func (e *Task) SetStatus(status dag.NodeStatus)
SetStatus 实现 DAGNoder 接口:设置节点状态
func (*Task) SetTimeoutSecond ¶
func (*Task) SharedDataDir ¶
SharedDataDir 返回共享数据目录(相对于工作目录或绝对路径) 优先使用用户传入的 shared_data_dir 参数 逻辑与 OutputDir 保持一致: - 若为流水线任务(PipelineTaskId 非空),相对路径提升到上一层目录(../) - 若为独立任务,使用当前层级 - 若用户传入绝对路径(以 / 开头),则直接返回该绝对路径 - 若未传入,默认:流水线任务 ../shared;独立任务 ./shared
func (*Task) UseTaskLog ¶
UseTaskLog 使用tasklog模块保存日志到数据库 这个方法会自动设置 logCallback,将日志内容保存到 task_log 表
type TaskSpec ¶
type TaskSpec struct {
// 任务Id
Id string `json:"id" gorm:"column:id;type:string;primary_key"`
// 名称
Type TYPE `json:"type" gorm:"column:type;type:varchar(100);index"`
// 任务名称
Name string `json:"name" gorm:"column:name;type:varchar(255)"`
// 创建时间
CreateAt time.Time `json:"create_at" gorm:"column:create_at;type:datetime"`
// 任务定义, 比如名称, job定义
Define map[string]string `json:"define" gorm:"column:define;type:json;serializer:json;not null;default:'{}'"`
// 运行参数
InputParams map[string]string `json:"input_params" gorm:"column:input_params;type:json;serializer:json;not null;default:'{}'"`
// 流水线任务Id
PipelineTaskId string `json:"pipeline_task_id" gorm:"column:pipeline_task_id;type:varchar(100);index"`
// 说明
Description string `json:"description" gorm:"column:description;type:text"`
// 任务超时时间, 0表示不超时
TimeoutSecond int64 `json:"timeout_second" gorm:"column:timeout_second;type:bigint"`
// 是否忽略错误
IgnoreError *bool `json:"ignore_error" gorm:"column:ignore_error;type:bool;default:false"`
// 环境变量, 运行该任务的 Agent 所属环境, 任务执行时 动态查询该环境下的 Agent 列表进行调度
AgentEnv string `json:"agent_env" gorm:"column:agent_env;type:varchar(255);index"`
// 依赖的任务节点列表
DependsTasks []string `json:"depends_tasks" gorm:"column:depends_tasks;type:json;serializer:json;not null;default:'[]'"`
// 执行条件(旧版,保留向后兼容)
When []Contiditon `json:"when" gorm:"column:when;type:json;serializer:json;"`
// when 条件表达式(新版 DAG 条件系统)
// 支持表达式如: "always", "never", "params.env == 'prod'", "deps.build.status == 'success'"
WhenCondition string `json:"when_condition" gorm:"column:when_condition;type:varchar(1000)"`
// 额外的其他属性
Extras map[string]string `json:"extras" form:"extras" gorm:"column:extras;type:json;serializer:json;"`
// 标签
Label map[string]string `json:"label" gorm:"column:label;type:json;serializer:json;"`
}
func NewTaskSpec ¶
func (*TaskSpec) SetInputParam ¶
func (*TaskSpec) ShortDisplay ¶
type TaskStatus ¶
type TaskStatus struct {
// 状态
Status STATUS `json:"status" gorm:"column:status;type:varchar(100);index"`
// 关联URL
RefURL string `json:"ref_url" gorm:"column:ref_url;type:varchar(255)"`
// 失败原因
Message string `json:"message" gorm:"column:message;type:text"`
// 异步任务调用时的返回详情
Detail string `json:"detail,omitempty" gorm:"column:detail;type:text"`
// 启动人
RunBy string `json:"run_by" gorm:"column:run_by;type:varchar(100)"`
// 开始时间
StartAt *time.Time `json:"start_at" gorm:"column:start_at;type:datetime"`
// 更新时间
UpdateAt time.Time `json:"update_at" gorm:"column:update_at;type:datetime"`
// 结束时间
EndAt time.Time `json:"end_at" gorm:"column:end_at;type:datetime"`
// 其他信息
Extras map[string]string `json:"extras" gorm:"column:extras;type:json;serializer:json;not null;default:'{}'"`
// 调度时间
ScheduledAt *time.Time `json:"scheduled_at" gorm:"column:scheduled_at;type:datetime"`
// 具体执行任务的AgentId
ScheduledAgentId *string `json:"scheduled_agent" gorm:"column:scheduled_agent;type:varchar(255);index"`
// 确认调度超时, 默认15秒
ScheduledConfirmTTL int64 `json:"scheduled_confirm_ttl" gorm:"column:scheduled_confirm_ttl;type:bigint;default:15"`
// 调度确认, Agent确认接收任务, 下发成功后设置为true
ScheduledConfirmed *bool `json:"scheduled_confirmed" gorm:"column:scheduled_confirmed;type:boolean;default:false"`
// 任务输出参数(供下一个任务使用)
Output map[string]string `json:"output,omitempty" gorm:"column:output;type:json;serializer:json;not null;default:'{}'"`
}
func NewTaskStatus ¶
func NewTaskStatus() *TaskStatus
func (*TaskStatus) Canceledf ¶
func (t *TaskStatus) Canceledf(format string, a ...any) *TaskStatus
func (*TaskStatus) ConfirmScheduled ¶
func (r *TaskStatus) ConfirmScheduled() *TaskStatus
ConfirmScheduled 确认Agent已接收任务分配
func (*TaskStatus) Failedf ¶
func (t *TaskStatus) Failedf(format string, a ...any) *TaskStatus
func (*TaskStatus) GetScheduledAgentId ¶
func (r *TaskStatus) GetScheduledAgentId() string
func (*TaskStatus) IsRunning ¶
func (r *TaskStatus) IsRunning() bool
func (*TaskStatus) IsScheduleConfirm ¶
func (r *TaskStatus) IsScheduleConfirm() bool
IsScheduleConfirm 检查任务是否具有完整的调度信息(调度时间和超时时间都已设置) 用于判断是否需要进行调度超时检查
func (*TaskStatus) IsScheduleConfirmed ¶
func (r *TaskStatus) IsScheduleConfirmed() bool
IsScheduleConfirmed 检查任务分配是否已被Agent确认
func (*TaskStatus) IsScheduled ¶
func (r *TaskStatus) IsScheduled() bool
IsScheduled 检查任务是否已被分配给Agent
func (*TaskStatus) IsWaitUpdate ¶
func (r *TaskStatus) IsWaitUpdate() bool
func (*TaskStatus) MarkScheduled ¶
func (r *TaskStatus) MarkScheduled(agentId string) *TaskStatus
MarkScheduled 标记任务已被分配给指定的Agent 用于Agent模式下的任务分发 同时设置确认超时时间窗口(默认15秒)
func (*TaskStatus) MarkedRunning ¶
func (r *TaskStatus) MarkedRunning()
func (*TaskStatus) SetScheduledAgentId ¶
func (r *TaskStatus) SetScheduledAgentId(agentId string) *TaskStatus
func (*TaskStatus) SetScheduledConfirmed ¶
func (r *TaskStatus) SetScheduledConfirmed(confirmed bool) *TaskStatus
func (*TaskStatus) Skipf ¶
func (t *TaskStatus) Skipf(format string, a ...any) *TaskStatus
func (*TaskStatus) String ¶
func (r *TaskStatus) String() string
func (*TaskStatus) Success ¶
func (t *TaskStatus) Success(format string, a ...any) *TaskStatus
func (*TaskStatus) TableName ¶
func (r *TaskStatus) TableName() string
func (*TaskStatus) WaitUpdate ¶
func (t *TaskStatus) WaitUpdate(updateId string) *TaskStatus
func (*TaskStatus) WithDetail ¶
func (t *TaskStatus) WithDetail(format string, a ...any) *TaskStatus
func (*TaskStatus) WithExtra ¶
func (t *TaskStatus) WithExtra(key, value string) *TaskStatus
func (*TaskStatus) WithRefURL ¶
func (t *TaskStatus) WithRefURL(refURL string) *TaskStatus
type TaskUpdate ¶
type TaskUpdate struct {
// 任务状态
*TaskStatus
// 运行参数, 动态更新合并过来的参数
Params map[string]string `json:"params" gorm:"column:params;type:json;serializer:json;not null;default:'{}'"`
}
存在则更新(同时更新 TaskSpec 和 TaskStatus,因为 Params 在 TaskSpec 中) 使用结构体更新避免遗漏字段
func (*TaskUpdate) TableName ¶
func (r *TaskUpdate) TableName() string