task

package
v0.0.0-...-bc04f27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EXECUTE_AGENTS           = "execute_agents"
	FAILURE_EXIT_MAINTENANCE = "failure_exit_maintenance"
)
View Source
const (
	NOT_BOOTSTRAP = iota
	NOT_UNDER_MAINTENANCE
	GLOBAL_MAINTENANCE
	TENANT_MAINTENANCE
	OBPROXY_MAINTENACE
)
View Source
const (
	NORMAL   = "normal"
	PARALLEL = "parallel"
)
View Source
const (
	PENDING = iota + 1
	READY
	RUNNING
	FAILED
	SUCCEED

	PENDING_STR = "PENDING"
	READY_STR   = "READY"
	RUNNING_STR = "RUNNING"
	FAILED_STR  = "FAILED"
	SUCCEED_STR = "SUCCEED"
)

State

View Source
const (
	RUN = iota + 1
	RETRY
	ROLLBACK
	CANCEL
	PASS

	RUN_STR      = "RUN"
	RETRY_STR    = "RETRY"
	ROLLBACK_STR = "ROLLBACK"
	CANCEL_STR   = "CANCEL"
	PASS_STR     = "PASS"
)

Operator

View Source
const DEFAULT_TIMEOUT = 3600 * time.Second
View Source
const TIMEOUT_KEY = "timeout"

Variables

View Source
var (
	DAG_TYPE_MAP = map[DagType]string{
		DAG_OB:      "ob",
		DAG_OBPROXY: "obproxy",
	}
)
View Source
var (
	ERR_WAIT_OPERATOR = errors.New("wait operator to advance")
)
View Source
var TASK_TYPE = make(map[string]reflect.Type)

Functions

func ConvertGenericID

func ConvertGenericID(genericID string) (id int64, agent meta.AgentInfoInterface, err error)

ConvertGenericID will convert a DTO id to an instance id.

func ConvertIDToGenericID

func ConvertIDToGenericID(dagID int64, isLocal bool, dagType string) string

func ConvertLocalIDToGenericID

func ConvertLocalIDToGenericID(id int64, dagType string) string

ConvertLocalIDToGenericID will convert the id of a local task to a generic id.

func ConvertObproxyIDToGenericID

func ConvertObproxyIDToGenericID(id int64) string

func ConvertToGenericID

func ConvertToGenericID(instance TaskInfoInterface, dagType string) string

ConvertToGenericID will convert a task instance id to a generic DTO id.

func IsObproxyTask

func IsObproxyTask(genericID string) bool

func RegisterTaskType

func RegisterTaskType(typedNil interface{})

Types

type AdditionalData

type AdditionalData interface {
	GetAdditionalData() map[string]interface{}
}

type AdditionalDataDTO

type AdditionalDataDTO struct {
	AdditionalData *map[string]any `json:"additional_data"`
	// contains filtered or unexported fields
}

func (*AdditionalDataDTO) SetVisible

func (a *AdditionalDataDTO) SetVisible(visible bool)

type CancelableTask

type CancelableTask interface {
	IsCancel() bool
	CanCancel() bool
	Cancel()
	SetCancelFunc(cancel context.CancelFunc)
}

type ContinuableTask

type ContinuableTask interface {
	CanContinue() bool
	IsContinue() bool
	SetIsContinue()
}

type Dag

type Dag struct {
	TaskInfo
	// contains filtered or unexported fields
}

func NewDag

func NewDag(dagId int64, dagName string, dagType string, state int, stage int, maxStage int, operator int, maintenance Maintainer, ctx *TaskContext, isLocalTask bool, startTime time.Time, endTime time.Time) *Dag

func (*Dag) GetContext

func (dag *Dag) GetContext() *TaskContext

func (*Dag) GetDagType

func (dag *Dag) GetDagType() string

func (*Dag) GetMaintenanceKey

func (dag *Dag) GetMaintenanceKey() string

func (*Dag) GetMaintenanceType

func (dag *Dag) GetMaintenanceType() int

func (*Dag) GetMaxStage

func (dag *Dag) GetMaxStage() int

func (*Dag) GetStage

func (dag *Dag) GetStage() int

func (*Dag) IsMaintenance

func (dag *Dag) IsMaintenance() bool

func (*Dag) MergeContext

func (dag *Dag) MergeContext(ctx *TaskContext)

func (*Dag) SetStage

func (dag *Dag) SetStage(stage int)

type DagDetail

type DagDetail struct {
	DagID           int64  `json:"dag_id" uri:"dag_id"`
	Name            string `json:"name"`
	Stage           int    `json:"stage"`
	MaxStage        int    `json:"max_stage"`
	MaintenanceType int    `json:"maintenance_type"`
	MaintenanceKey  string `json:"maintenance_key"`
	TaskStatusDTO
	AdditionalDataDTO
	Nodes []*NodeDetailDTO `json:"nodes"`
}

func NewDagDetail

func NewDagDetail(dag *Dag) *DagDetail

type DagDetailDTO

type DagDetailDTO struct {
	*GenericDTO
	*DagDetail
}

func NewDagDetailDTO

func NewDagDetailDTO(dag *Dag) *DagDetailDTO

func (*DagDetailDTO) SetVisible

func (a *DagDetailDTO) SetVisible(visible bool)

type DagOperator

type DagOperator struct {
	DagDetailDTO
	Operator string `json:"operator" binding:"required"`
}

type DagType

type DagType uint8
const (
	DAG_OB DagType = iota
	DAG_OBPROXY
)

type Executable

type Executable interface {
	IsRun() bool
	Execute() error
	SetLogChannel(logChan chan<- TaskExecuteLogDTO)
	GetTimeout() time.Duration
	TimeoutCheck()
	CanPass() bool
	GetResult() TaskResult
	GetContext() *TaskContext
	SetContext(context *TaskContext)
	GetLocalData(key string) interface{}
	GetLocalDataWithValue(key string, value interface{}) error
	SetLocalData(key string, data interface{})
	GetExecuteAgent() meta.AgentInfo
	GetExecuteTimes() int
	AddExecuteTimes()
	Finish(err error)
}

type ExecutableTask

func CreateSubTaskInstance

func CreateSubTaskInstance(
	taskType string, id int64, taskName string, ctx *TaskContext, state int, operator int,
	canCancel bool, canContinue bool, canPass bool, canRetry bool, canRollback bool, executeTimes int,
	executerAgent meta.AgentInfo, isLocalTask bool, startTime time.Time, endTime time.Time) (ExecutableTask, error)

type GenericDTO

type GenericDTO struct {
	GenericID string `json:"id" uri:"id" binding:"required"`
}

type Maintainer

type Maintainer interface {
	IsMaintenance() bool
	GetMaintenanceType() int
	GetMaintenanceKey() string
}

func GlobalMaintenance

func GlobalMaintenance() Maintainer

func NewMaintenance

func NewMaintenance(maintenanceType int, maintenanceKey string) Maintainer

func ObproxyMaintenance

func ObproxyMaintenance() Maintainer

func TenantMaintenance

func TenantMaintenance(tenantName string) Maintainer

func UnMaintenance

func UnMaintenance() Maintainer

type Node

type Node struct {
	TaskInfo
	// contains filtered or unexported fields
}

func NewNode

func NewNode(task ExecutableTask, paralle bool) *Node

func NewNodeWithContext

func NewNodeWithContext(task ExecutableTask, paralle bool, ctx *TaskContext) *Node

func NewNodeWithId

func NewNodeWithId(id int64, name string, dagId int, nodeType string, state int, operator int, structName string, ctx *TaskContext, isLocalTask bool, startTime time.Time, endTime time.Time) *Node

func (*Node) AddDownstream

func (node *Node) AddDownstream(downstream *Node)

func (*Node) AddSubTask

func (node *Node) AddSubTask(task ExecutableTask) error

func (*Node) AddUpstream

func (node *Node) AddUpstream(upstream *Node)

func (*Node) CanCancel

func (node *Node) CanCancel() bool

func (*Node) CanContinue

func (node *Node) CanContinue() bool

func (*Node) CanPass

func (node *Node) CanPass() bool

func (*Node) CanRetry

func (node *Node) CanRetry() bool

func (*Node) CanRollback

func (node *Node) CanRollback() bool

func (*Node) GetContext

func (node *Node) GetContext() *TaskContext

func (*Node) GetDagId

func (node *Node) GetDagId() int

func (*Node) GetDownstream

func (node *Node) GetDownstream() *Node

func (*Node) GetNodeType

func (node *Node) GetNodeType() string

func (*Node) GetSubTasks

func (node *Node) GetSubTasks() []ExecutableTask

func (*Node) GetTaskType

func (node *Node) GetTaskType() reflect.Type

func (*Node) GetUpstream

func (node *Node) GetUpstream() *Node

func (*Node) IsParallel

func (node *Node) IsParallel() bool

func (*Node) MergeContext

func (node *Node) MergeContext(ctx *TaskContext)

func (*Node) SetContext

func (node *Node) SetContext(ctx *TaskContext)

type NodeDetail

type NodeDetail struct {
	NodeID int64  `json:"node_id" uri:"node_id"`
	Name   string `json:"name"`
	TaskStatusDTO
	AdditionalDataDTO
	SubTasks []*TaskDetailDTO `json:"sub_tasks"`
}

func NewNodeDetail

func NewNodeDetail(node *Node) *NodeDetail

type NodeDetailDTO

type NodeDetailDTO struct {
	*GenericDTO
	*NodeDetail
}

func NewNodeDetailDTO

func NewNodeDetailDTO(node *Node, dagType string) *NodeDetailDTO

func (*NodeDetailDTO) SetVisible

func (a *NodeDetailDTO) SetVisible(visible bool)

type NodeOperator

type NodeOperator struct {
	NodeDetailDTO
	Operator string `json:"operator" binding:"required"`
}

type RemoteTask

type RemoteTask struct {
	TaskID        int64          `json:"task_id" binding:"required"`
	Name          string         `json:"name"`
	StructName    string         `json:"struct_name"`
	State         int            `json:"state" binding:"required"`
	Operator      int            `json:"operator" binding:"required"`
	CanCancel     bool           `json:"can_cancel"`
	CanContinue   bool           `json:"can_continue"`
	CanPass       bool           `json:"can_pass"`
	CanRetry      bool           `json:"can_retry"`
	CanRollback   bool           `json:"can_rollback"`
	Context       TaskContext    `json:"context" binding:"required"`
	ExecuteTimes  int            `json:"execute_times" binding:"required"`
	ExecuterAgent meta.AgentInfo `json:"executer_agent" binding:"required"`
	StartTime     time.Time      `json:"start_time"`
	EndTime       time.Time      `json:"end_time"`
}

func NewRemoteTask

func NewRemoteTask(
	structName string, taskID int64, taskName string, ctx *TaskContext, state int, operator int,
	canCancel bool, canContinue bool, canPass bool, canRetry bool, canRollback bool, executeTimes int,
	executerAgent meta.AgentInfo, startTime time.Time, endTime time.Time) *RemoteTask

func (*RemoteTask) Execute

func (t *RemoteTask) Execute() error

func (*RemoteTask) GetStructName

func (t *RemoteTask) GetStructName() string

type Retryable

type Retryable interface {
	IsRetry() bool
	CanRetry() bool
}

type RollableTask

type RollableTask interface {
	IsRollback() bool
	CanRollback() bool
	Rollback() error
}

type Task

type Task struct {
	TaskInfo
	// contains filtered or unexported fields
}

func NewSubTask

func NewSubTask(name string) *Task

func (*Task) AddExecuteTimes

func (task *Task) AddExecuteTimes()

func (*Task) Cancel

func (task *Task) Cancel()

func (*Task) ExecuteErrorLog

func (task *Task) ExecuteErrorLog(err error)

func (*Task) ExecuteErrorLogf

func (task *Task) ExecuteErrorLogf(format string, args ...interface{})

func (*Task) ExecuteInfoLog

func (task *Task) ExecuteInfoLog(text string)

func (*Task) ExecuteInfoLogf

func (task *Task) ExecuteInfoLogf(format string, args ...interface{})

func (*Task) ExecuteLog

func (task *Task) ExecuteLog(text string)

func (*Task) ExecuteLogf

func (task *Task) ExecuteLogf(format string, args ...interface{})

func (*Task) ExecuteWarnLog

func (task *Task) ExecuteWarnLog(err error)

func (*Task) ExecuteWarnLogf

func (task *Task) ExecuteWarnLogf(format string, args ...interface{})

func (*Task) Finish

func (task *Task) Finish(err error)

func (*Task) GetAdditionalData

func (task *Task) GetAdditionalData() map[string]interface{}

func (*Task) GetContext

func (task *Task) GetContext() *TaskContext

func (*Task) GetExecuteAgent

func (task *Task) GetExecuteAgent() meta.AgentInfo

func (*Task) GetExecuteTimes

func (task *Task) GetExecuteTimes() int

func (*Task) GetLocalData

func (task *Task) GetLocalData(key string) interface{}

func (*Task) GetLocalDataWithValue

func (task *Task) GetLocalDataWithValue(key string, value interface{}) error

func (*Task) GetResult

func (task *Task) GetResult() TaskResult

func (*Task) GetTimeout

func (task *Task) GetTimeout() time.Duration

func (*Task) IsContinue

func (task *Task) IsContinue() bool

func (*Task) Rollback

func (task *Task) Rollback() error

func (*Task) SetCanCancel

func (task *Task) SetCanCancel() *Task

func (*Task) SetCanContinue

func (task *Task) SetCanContinue() *Task

func (*Task) SetCanPass

func (task *Task) SetCanPass() *Task

func (*Task) SetCanRetry

func (task *Task) SetCanRetry() *Task

SetCanRetry marks the task as retryable. A retryable task must also be rollbackable.

func (*Task) SetCanRollback

func (task *Task) SetCanRollback() *Task

func (*Task) SetCancelFunc

func (task *Task) SetCancelFunc(cancel context.CancelFunc)

func (*Task) SetContext

func (task *Task) SetContext(taskContext *TaskContext)

func (*Task) SetExecuteAgent

func (task *Task) SetExecuteAgent(agent meta.AgentInfo)

func (*Task) SetIsContinue

func (task *Task) SetIsContinue()

func (*Task) SetLocalData

func (task *Task) SetLocalData(key string, data interface{})

func (*Task) SetLogChannel

func (task *Task) SetLogChannel(logChan chan<- TaskExecuteLogDTO)

func (*Task) TimeoutCheck

func (task *Task) TimeoutCheck()

type TaskContext

type TaskContext struct {
	Params               map[string]interface{} // params can not be rewritten when merge context
	Data                 map[string]interface{} // global data will be rewritten when merge context
	AgentData            map[string]map[string]interface{}
	AgentDataUpdateCount map[string]int
}

func NewTaskContext

func NewTaskContext() *TaskContext

func (*TaskContext) GetAgentData

func (ctx *TaskContext) GetAgentData(agent meta.AgentInfoInterface, key string) interface{}

func (*TaskContext) GetAgentDataByAgentKey

func (ctx *TaskContext) GetAgentDataByAgentKey(agentKey string, key string) interface{}

func (*TaskContext) GetAgentDataByAgentKeyWithValue

func (ctx *TaskContext) GetAgentDataByAgentKeyWithValue(agentKey string, key string, value interface{}) error

func (*TaskContext) GetAgentDataWithValue

func (ctx *TaskContext) GetAgentDataWithValue(agent meta.AgentInfoInterface, key string, value interface{}) error

func (*TaskContext) GetData

func (ctx *TaskContext) GetData(key string) interface{}

func (*TaskContext) GetDataWithValue

func (ctx *TaskContext) GetDataWithValue(key string, value interface{}) error

func (*TaskContext) GetParam

func (ctx *TaskContext) GetParam(key string) interface{}

func (*TaskContext) GetParamWithValue

func (ctx *TaskContext) GetParamWithValue(key string, value interface{}) error

func (*TaskContext) MergeContext

func (ctx *TaskContext) MergeContext(other *TaskContext)

func (*TaskContext) MergeContextWithoutExecAgents

func (ctx *TaskContext) MergeContextWithoutExecAgents(other *TaskContext)

func (*TaskContext) MergeContextWithoutFailureExitMaintenance

func (ctx *TaskContext) MergeContextWithoutFailureExitMaintenance(other *TaskContext)

func (*TaskContext) MergeContextWithoutKeyords

func (ctx *TaskContext) MergeContextWithoutKeyords(other *TaskContext)

func (*TaskContext) SetAgentData

func (ctx *TaskContext) SetAgentData(agent meta.AgentInfoInterface, key string, value interface{}) *TaskContext

func (*TaskContext) SetAgentDataByAgentKey

func (ctx *TaskContext) SetAgentDataByAgentKey(agentKey string, key string, value interface{}) *TaskContext

func (*TaskContext) SetData

func (ctx *TaskContext) SetData(key string, value interface{}) *TaskContext

func (*TaskContext) SetParam

func (ctx *TaskContext) SetParam(key string, value interface{}) *TaskContext

type TaskDetail

type TaskDetail struct {
	TaskID int64  `json:"task_id" uri:"task_id"`
	Name   string `json:"name"`
	TaskStatusDTO
	AdditionalDataDTO
	ExecuteTimes int            `json:"execute_times"`
	ExecuteAgent meta.AgentInfo `json:"execute_agent"`
	TaskLogs     []string       `json:"task_logs"`
}

func NewTaskDetail

func NewTaskDetail(task ExecutableTask) *TaskDetail

type TaskDetailDTO

type TaskDetailDTO struct {
	*GenericDTO
	*TaskDetail
}

func NewTaskDetailDTO

func NewTaskDetailDTO(task ExecutableTask, dagType string) *TaskDetailDTO

type TaskExecuteLogDTO

type TaskExecuteLogDTO struct {
	TaskId       int64  `json:"task_id" binding:"required,min=1"`
	ExecuteTimes int    `json:"execute_times" binding:"required,min=1"`
	LogContent   string `json:"log_content" binding:"required"`
	IsSync       bool   `json:"is_sync"`
}

type TaskInfo

type TaskInfo struct {
	// contains filtered or unexported fields
}

func (*TaskInfo) CanCancel

func (task *TaskInfo) CanCancel() bool

func (*TaskInfo) CanContinue

func (task *TaskInfo) CanContinue() bool

func (*TaskInfo) CanPass

func (task *TaskInfo) CanPass() bool

func (*TaskInfo) CanRetry

func (task *TaskInfo) CanRetry() bool

func (*TaskInfo) CanRollback

func (task *TaskInfo) CanRollback() bool

func (*TaskInfo) GetEndTime

func (task *TaskInfo) GetEndTime() time.Time

func (*TaskInfo) GetID

func (task *TaskInfo) GetID() int64

func (*TaskInfo) GetName

func (task *TaskInfo) GetName() string

func (*TaskInfo) GetOperator

func (task *TaskInfo) GetOperator() int

func (*TaskInfo) GetStartTime

func (task *TaskInfo) GetStartTime() time.Time

func (*TaskInfo) GetState

func (task *TaskInfo) GetState() int

func (*TaskInfo) IsCancel

func (task *TaskInfo) IsCancel() bool

func (*TaskInfo) IsFail

func (task *TaskInfo) IsFail() bool

func (*TaskInfo) IsFinished

func (task *TaskInfo) IsFinished() bool

func (*TaskInfo) IsLocalTask

func (task *TaskInfo) IsLocalTask() bool

func (*TaskInfo) IsPending

func (task *TaskInfo) IsPending() bool

func (*TaskInfo) IsReady

func (task *TaskInfo) IsReady() bool

func (*TaskInfo) IsRetry

func (task *TaskInfo) IsRetry() bool

func (*TaskInfo) IsRollback

func (task *TaskInfo) IsRollback() bool

func (*TaskInfo) IsRun

func (task *TaskInfo) IsRun() bool

func (*TaskInfo) IsRunning

func (task *TaskInfo) IsRunning() bool

func (*TaskInfo) IsSuccess

func (task *TaskInfo) IsSuccess() bool

func (*TaskInfo) SetEndTime

func (task *TaskInfo) SetEndTime(endTime time.Time)

func (*TaskInfo) SetOperator

func (task *TaskInfo) SetOperator(operator int)

func (*TaskInfo) SetStartTime

func (task *TaskInfo) SetStartTime(startTime time.Time)

func (*TaskInfo) SetState

func (task *TaskInfo) SetState(state int)

type TaskInfoInterface

type TaskInfoInterface interface {
	GetID() int64
	GetName() string
	SetState(state int)
	GetState() int
	IsLocalTask() bool
	TaskStatusInterface
}

type TaskLogInterface

type TaskLogInterface interface {
	ExecuteLog(text string)
	ExecuteInfoLog(text string)
	ExecuteWarnLog(err error)
	ExecuteErrorLog(err error)
	ExecuteLogf(format string, args ...interface{})
	ExecuteInfoLogf(format string, args ...interface{})
	ExecuteWarnLogf(format string, args ...interface{})
	ExecuteErrorLogf(format string, args ...interface{})
}

type TaskResult

type TaskResult struct {
	Finished    bool
	Ok          bool
	LogContents []string
}

type TaskStatusDTO

type TaskStatusDTO struct {
	State     string    `json:"state"`
	Operator  string    `json:"operator"`
	StartTime time.Time `json:"start_time"`
	EndTime   time.Time `json:"end_time"`
}

func NewTaskStatusDTO

func NewTaskStatusDTO(task *TaskInfo) *TaskStatusDTO

func (*TaskStatusDTO) IsFailed

func (t *TaskStatusDTO) IsFailed() bool

func (*TaskStatusDTO) IsFinished

func (t *TaskStatusDTO) IsFinished() bool

func (*TaskStatusDTO) IsPending

func (t *TaskStatusDTO) IsPending() bool

func (*TaskStatusDTO) IsReady

func (t *TaskStatusDTO) IsReady() bool

func (*TaskStatusDTO) IsRunning

func (t *TaskStatusDTO) IsRunning() bool

func (*TaskStatusDTO) IsSucceed

func (t *TaskStatusDTO) IsSucceed() bool

type TaskStatusInterface

type TaskStatusInterface interface {
	IsSuccess() bool
	IsFail() bool
	IsRunning() bool
	IsPending() bool
	IsReady() bool
	IsFinished() bool
	IsRollback() bool
	SetOperator(operator int)
	GetOperator() int
	GetStartTime() time.Time
	GetEndTime() time.Time
	SetStartTime(startTime time.Time)
	SetEndTime(endTime time.Time)
}

type Template

type Template struct {
	Name string

	Type string
	// contains filtered or unexported fields
}

func (*Template) AddNode

func (template *Template) AddNode(node *Node)

func (*Template) GetMaintenanceKey

func (template *Template) GetMaintenanceKey() string

func (*Template) GetMaintenanceType

func (template *Template) GetMaintenanceType() int

func (*Template) GetNodes

func (template *Template) GetNodes() []*Node

func (*Template) IsEmpty

func (template *Template) IsEmpty() bool

func (*Template) IsMaintenance

func (template *Template) IsMaintenance() bool

type TemplateBuilder

type TemplateBuilder struct {
	Template *Template
}

func NewTemplateBuilder

func NewTemplateBuilder(name string) *TemplateBuilder

func (*TemplateBuilder) AddNode

func (builder *TemplateBuilder) AddNode(node *Node) *TemplateBuilder

func (*TemplateBuilder) AddTask

func (builder *TemplateBuilder) AddTask(task ExecutableTask, parallel bool) *TemplateBuilder

func (*TemplateBuilder) AddTemplate

func (builder *TemplateBuilder) AddTemplate(template *Template) *TemplateBuilder

func (*TemplateBuilder) Build

func (builder *TemplateBuilder) Build() *Template

func (*TemplateBuilder) SetMaintenance

func (builder *TemplateBuilder) SetMaintenance(maintenanceType Maintainer) *TemplateBuilder

func (*TemplateBuilder) SetType

func (builder *TemplateBuilder) SetType(dagType DagType) *TemplateBuilder

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL