pipeline

package
v0.14.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
// String constants used by the package. The Json* values look like keys
// looked up in a JSON request body (see the BodyMap note on JsonFsOptions);
// NOTE(review): confirm against the callers, which are not visible here.
const (
	JsonFsOptions   = "fs_options" // the key has already been converted to underscore form before FsOptions is read from the BodyMap, hence fs_options here
	JsonUserName    = "username"
	JsonDescription = "description"
	JsonFlavour     = "flavour"
	JsonQueue       = "queue"
	JsonJobType     = "jobType"
	JsonEnv         = "env"

	// Names for the final run status and final run message.
	// NOTE(review): where these names are consumed is not visible here.
	FinalRunStatus = "FINAL_RUN_STATUS"
	FinalRunMsg    = "FINAL_RUN_MSG"
)
View Source
// Operation type values. Presumably these are the opType strings accepted by
// NewOpInfo and SendSingnal — confirm against their implementations.
const (
	OpTypeCreate = "create"
	OpTypeStop   = "stop"
	OpTypeDelete = "delete"
)

Variables

View Source
// Package-level function variables, each initialized to the package function
// with the matching signature. Because they are variables they can be
// reassigned; presumably this indirection exists so callers or tests can
// substitute their own implementations — confirm.
var (
	GetJobFunc        func(jobID string) (schema.JobView, error)                    = GetJobByRun
	UpdateRuntimeFunc func(id string, event interface{}) (int64, bool)              = UpdateRuntimeByWfEvent
	LogCacheFunc      func(req schema.LogRunCacheRequest) (string, error)           = LogCache
	ListCacheFunc     func(firstFp, fsID, source string) ([]models.RunCache, error) = ListCacheByFirstFp
	LogArtifactFunc   func(req schema.LogRunArtifactRequest) error                  = LogArtifactEvent
)

Functions

func CheckFsAndGetID added in v0.14.3

func CheckFsAndGetID(ctx *logger.RequestContext, fsUserName, fsName string) (fsID string, err error)

func CheckPipelinePermission added in v0.14.3

func CheckPipelinePermission(ctx *logger.RequestContext, userName string, pipelineID string) (bool, model.Pipeline, error)

func CheckPipelineVersionPermission added in v0.14.3

func CheckPipelineVersionPermission(ctx *logger.RequestContext, userName string, pipelineID string, pipelineVersionID string) (bool, model.Pipeline, model.PipelineVersion, error)

func DeleteArtifactEvent added in v0.14.3

func DeleteArtifactEvent(ctx *logger.RequestContext, username, fsname, runID, artifactPath string) error

---------------------artifact_event---------------------//

func DeletePipeline

func DeletePipeline(ctx *logger.RequestContext, pipelineID string) error

func DeletePipelineVersion added in v0.14.3

func DeletePipelineVersion(ctx *logger.RequestContext, pipelineID string, pipelineVersionID string) error

func DeleteRun added in v0.14.3

func DeleteRun(ctx *logger.RequestContext, id string, request *DeleteRunRequest) error

func DeleteRunCache added in v0.14.3

func DeleteRunCache(ctx *logger.RequestContext, id string) error

func DeleteSchedule added in v0.14.3

func DeleteSchedule(ctx *logger.RequestContext, scheduleID string) error

TODO: support StopRun.

func GetJobByRun added in v0.14.3

func GetJobByRun(jobID string) (schema.JobView, error)

func GetRunByID added in v0.14.3

func GetRunByID(ctx *logger.RequestContext, userName string, runID string) (models.Run, error)

func GetRunCache added in v0.14.3

func GetRunCache(ctx *logger.RequestContext, id string) (models.RunCache, error)

-------------CRUD-----------------//

func InitAndResumeRuns added in v0.14.3

func InitAndResumeRuns() error

func ListCacheByFirstFp added in v0.14.3

func ListCacheByFirstFp(firstFp, fsID, source string) ([]models.RunCache, error)

func LogArtifactEvent added in v0.14.3

func LogArtifactEvent(req schema.LogRunArtifactRequest) error

func LogCache added in v0.14.3

func LogCache(req schema.LogRunCacheRequest) (string, error)

func ParseJsonGlobalEnv added in v0.14.3

func ParseJsonGlobalEnv(jsonAttrMap map[string]interface{}) (map[string]string, error)

func ProcessJsonAttr added in v0.14.3

func ProcessJsonAttr(bodyMap map[string]interface{}) error

func RestartWf added in v0.14.3

func RestartWf(run models.Run, isResume bool) (string, error)

func RetryRun added in v0.14.3

func RetryRun(ctx *logger.RequestContext, runID string) (string, error)

func SendSingnal added in v0.14.3

func SendSingnal(opType, scheduleID string) error

SendSingnal sends a signal (for example, a create signal) to the scheduler through its channel.

func StartWf added in v0.14.3

func StartWf(ctx *logger.RequestContext, run *models.Run, wfPtr *pipeline.Workflow) error

func StopRun added in v0.14.3

func StopRun(ctx *logger.RequestContext, userName, runID string, request UpdateRunRequest) error

func StopSchedule added in v0.14.3

func StopSchedule(ctx *logger.RequestContext, scheduleID string) error

TODO: support StopRun.

func UpdateRunByWfEvent added in v0.14.3

func UpdateRunByWfEvent(id string, event interface{}) (int64, bool)

func UpdateRuntimeByWfEvent added in v0.14.3

func UpdateRuntimeByWfEvent(id string, event interface{}) (int64, bool)

func UpdateRuntimeDagByWfEvent added in v0.14.3

func UpdateRuntimeDagByWfEvent(id string, event interface{}) (int64, bool)

func UpdateRuntimeJobByWfEvent added in v0.14.3

func UpdateRuntimeJobByWfEvent(id string, event interface{}) (int64, bool)

func ValidateAndCreateRun added in v0.14.3

func ValidateAndCreateRun(ctx *logger.RequestContext, run *models.Run, userName string, req CreateRunRequest) (*pipeline.Workflow, string, error)

Types

type ArtifactsJson added in v0.14.3

// ArtifactsJson is the JSON shape of a step's artifacts: a string-to-string
// map under "input" and a list of strings under "output".
type ArtifactsJson struct {
	Input  map[string]string `json:"input"`
	Output []string          `json:"output"`
}

ArtifactsJson is used by the CreateRunJson API to unmarshal artifacts.

type CreatePipelineRequest

// CreatePipelineRequest is the JSON request body for creating a pipeline.
// The pipeline definition comes from one of two sources: YamlPath or YamlRaw.
// UpdatePipelineRequest is an alias of this type.
type CreatePipelineRequest struct {
	FsName   string `json:"fsName"`
	YamlPath string `json:"yamlPath"` // optional,  use "./run.yaml" if not specified, one of 2 sources of run
	YamlRaw  string `json:"yamlRaw"`  // optional, one of 2 sources of run
	UserName string `json:"username"` // optional, only for root user
	Desc     string `json:"desc"`     // optional
}

type CreatePipelineResponse

// CreatePipelineResponse is the JSON response for pipeline creation. It
// carries the pipeline ID, the pipeline version ID, and the pipeline name.
type CreatePipelineResponse struct {
	PipelineID        string `json:"pipelineID"`
	PipelineVersionID string `json:"pipelineVersionID"`
	Name              string `json:"name"`
}

type CreateRunRequest added in v0.14.3

// CreateRunRequest is the JSON request body accepted by CreateRun,
// ValidateAndCreateRun and ValidateAndStartRun. The workflow definition may
// come from one of three sources, in the priority order given below.
type CreateRunRequest struct {
	FsName         string                 `json:"fsName"`
	UserName       string                 `json:"username,omitempty"`       // optional, only for root user
	Name           string                 `json:"name,omitempty"`           // optional
	Description    string                 `json:"desc,omitempty"`           // optional
	Parameters     map[string]interface{} `json:"parameters,omitempty"`     // optional
	DockerEnv      string                 `json:"dockerEnv,omitempty"`      // optional
	Disabled       string                 `json:"disabled,omitempty"`       // optional
	FailureOptions *schema.FailureOptions `json:"failureOptions,omitempty"` // optional
	// run workflow source. priority: RunYamlRaw > PipelineID + PipelineVersionID > RunYamlPath
	// To keep string handling, or different HTTP clients, from filtering the
	// special characters in run.yaml and corrupting the YAML file,
	// runYamlRaw is transmitted base64-encoded.
	RunYamlRaw        string `json:"runYamlRaw,omitempty"`        // optional. one of 3 sources of run. high priority
	PipelineID        string `json:"pipelineID,omitempty"`        // optional. one of 3 sources of run. medium priority
	PipelineVersionID string `json:"pipelineVersionID,omitempty"` // optional. one of 3 sources of run. medium priority
	RunYamlPath       string `json:"runYamlPath,omitempty"`       // optional. one of 3 sources of run. low priority
	// ScheduleID and ScheduledAt have no omitempty, so they are always
	// present in marshaled output, even when empty.
	ScheduleID        string `json:"scheduleID"`
	ScheduledAt       string `json:"scheduledAt"`
}

type CreateRunResponse added in v0.14.3

// CreateRunResponse is returned by CreateRun, CreateRunByJson and
// ValidateAndStartRun; it carries the run ID.
type CreateRunResponse struct {
	RunID string `json:"runID"`
}

func CreateRun added in v0.14.3

func CreateRun(ctx *logger.RequestContext, request *CreateRunRequest, extra map[string]string) (CreateRunResponse, error)

func CreateRunByJson added in v0.14.3

func CreateRunByJson(ctx *logger.RequestContext, bodyMap map[string]interface{}) (CreateRunResponse, error)

func ValidateAndStartRun added in v0.14.3

func ValidateAndStartRun(ctx *logger.RequestContext, run *models.Run, userName string, req CreateRunRequest) (CreateRunResponse, error)

type CreateScheduleRequest added in v0.14.3

// CreateScheduleRequest is the JSON request body for creating a schedule.
// It names the pipeline and pipeline version to run, the crontab expression,
// and optional time-window and concurrency settings.
type CreateScheduleRequest struct {
	Name              string `json:"name"`
	Desc              string `json:"desc"` // optional
	PipelineID        string `json:"pipelineID"`
	PipelineVersionID string `json:"pipelineVersionID"`
	Crontab           string `json:"crontab"`
	StartTime         string `json:"startTime"`         // optional
	EndTime           string `json:"endTime"`           // optional
	Concurrency       int    `json:"concurrency"`       // optional, defaults to 0, meaning no limit
	ConcurrencyPolicy string `json:"concurrencyPolicy"` // optional, defaults to suspend
	ExpireInterval    int    `json:"expireInterval"`    // optional, defaults to 0, meaning no limit
	Catchup           bool   `json:"catchup"`           // optional, defaults to false
	UserName          string `json:"username"`          // optional, only required when the root user uses another user's fsname, in which case that user's username must be given
}

type CreateScheduleResponse added in v0.14.3

// CreateScheduleResponse is the JSON response for schedule creation; it
// carries the schedule ID.
type CreateScheduleResponse struct {
	ScheduleID string `json:"scheduleID"`
}

func CreateSchedule added in v0.14.3

type DeleteRunRequest added in v0.14.3

// DeleteRunRequest is the request passed to DeleteRun.
type DeleteRunRequest struct {
	// NOTE(review): presumably controls whether the run's cache is checked
	// before deletion — confirm against DeleteRun.
	CheckCache bool `json:"checkCache"`
}

type GetPipelineResponse added in v0.14.3

// GetPipelineResponse is returned by GetPipeline. It pairs a pipeline summary
// with a list of that pipeline's versions, serialized under "pplVersions".
type GetPipelineResponse struct {
	Pipeline         PipelineBrief    `json:"pipeline"`
	PipelineVersions PipelineVersions `json:"pplVersions"`
}

func GetPipeline added in v0.14.3

func GetPipeline(ctx *logger.RequestContext, pipelineID, marker string, maxKeys int, fsFilter []string) (GetPipelineResponse, error)

type GetPipelineVersionResponse added in v0.14.3

// GetPipelineVersionResponse is returned by GetPipelineVersion. It pairs a
// pipeline summary with the summary of a single pipeline version.
type GetPipelineVersionResponse struct {
	Pipeline        PipelineBrief        `json:"pipeline"`
	PipelineVersion PipelineVersionBrief `json:"pipelineVersion"`
}

func GetPipelineVersion added in v0.14.3

func GetPipelineVersion(ctx *logger.RequestContext, pipelineID string, pipelineVersionID string) (GetPipelineVersionResponse, error)

type GetScheduleResponse added in v0.14.3

// GetScheduleResponse is returned by GetSchedule. It embeds ScheduleBrief,
// so the schedule's fields appear at the top level of the JSON object, and
// adds a run listing under "runs".
type GetScheduleResponse struct {
	ScheduleBrief
	ListRunResponse ListRunResponse `json:"runs"`
}

func GetSchedule added in v0.14.3

func GetSchedule(ctx *logger.RequestContext, scheduleID string,
	marker string, maxKeys int, runFilter, statusFilter []string) (GetScheduleResponse, error)

type ListArtifactEventResponse added in v0.14.3

// ListArtifactEventResponse is returned by ListArtifactEvent. It embeds
// common.MarkerInfo (presumably pagination marker data — confirm) and carries
// the artifact events under "artifactEventList".
type ListArtifactEventResponse struct {
	common.MarkerInfo
	ArtifactEventList []model.ArtifactEvent `json:"artifactEventList"`
}

func ListArtifactEvent added in v0.14.3

func ListArtifactEvent(ctx *logger.RequestContext, marker string, maxKeys int, userFilter, fsFilter, runFilter, typeFilter, pathFilter []string) (ListArtifactEventResponse, error)

type ListPipelineResponse

// ListPipelineResponse is returned by ListPipeline. It embeds
// common.MarkerInfo (presumably pagination marker data — confirm) and carries
// the pipeline summaries under "pipelineList".
type ListPipelineResponse struct {
	common.MarkerInfo
	PipelineList []PipelineBrief `json:"pipelineList"`
}

func ListPipeline

func ListPipeline(ctx *logger.RequestContext, marker string, maxKeys int, userFilter, nameFilter []string) (ListPipelineResponse, error)

type ListRunCacheResponse added in v0.14.3

// ListRunCacheResponse is returned by ListRunCache. It embeds
// common.MarkerInfo (presumably pagination marker data — confirm) and carries
// the run cache records under "runCacheList".
type ListRunCacheResponse struct {
	common.MarkerInfo
	RunCacheList []models.RunCache `json:"runCacheList"`
}

func ListRunCache added in v0.14.3

func ListRunCache(ctx *logger.RequestContext, marker string, maxKeys int, userFilter, fsFilter, runFilter []string) (ListRunCacheResponse, error)

type ListRunResponse added in v0.14.3

// ListRunResponse is returned by ListRun and is also nested inside
// GetScheduleResponse. It embeds common.MarkerInfo (presumably pagination
// marker data — confirm) and carries the run summaries under "runList".
type ListRunResponse struct {
	common.MarkerInfo
	RunList []RunBrief `json:"runList"`
}

func ListRun added in v0.14.3

func ListRun(ctx *logger.RequestContext, marker string, maxKeys int, userFilter, fsFilter, runFilter, nameFilter, statusFilter, scheduleIDFilter []string) (ListRunResponse, error)

type ListScheduleResponse added in v0.14.3

// ListScheduleResponse is returned by ListSchedule. It embeds
// common.MarkerInfo (presumably pagination marker data — confirm) and carries
// the schedule summaries under "scheduleList".
type ListScheduleResponse struct {
	common.MarkerInfo
	ScheduleList []ScheduleBrief `json:"scheduleList"`
}

func ListSchedule added in v0.14.3

func ListSchedule(ctx *logger.RequestContext, marker string, maxKeys int, pplFilter, pplVersionFilter, userFilter, scheduleFilter, nameFilter, statusFilter []string) (ListScheduleResponse, error)

type OpInfo added in v0.14.3

// OpInfo describes an operation on a schedule. Its fields are unexported:
// build one with NewOpInfo and read it through GetOpType and GetScheduleID.
// Scheduler.OpsChannel carries values of this type.
type OpInfo struct {
	// contains filtered or unexported fields
}

func NewOpInfo added in v0.14.3

func NewOpInfo(opType string, scheduleID string) (OpInfo, error)

func (OpInfo) GetOpType added in v0.14.3

func (opInfo OpInfo) GetOpType() string

func (OpInfo) GetScheduleID added in v0.14.3

func (opInfo OpInfo) GetScheduleID() string

type PipelineBrief added in v0.14.3

// PipelineBrief is the JSON summary of a pipeline used in the get and list
// pipeline responses. Note that ID is serialized as "pipelineID".
type PipelineBrief struct {
	ID         string `json:"pipelineID"`
	Name       string `json:"name"`
	Desc       string `json:"desc"`
	UserName   string `json:"username"`
	CreateTime string `json:"createTime"`
	UpdateTime string `json:"updateTime"`
}

type PipelineVersionBrief added in v0.14.3

// PipelineVersionBrief is the JSON summary of one pipeline version. Note that
// ID is serialized as "pipelineVersionID".
type PipelineVersionBrief struct {
	ID           string `json:"pipelineVersionID"`
	PipelineID   string `json:"pipelineID"`
	FsName       string `json:"fsName"`
	YamlPath     string `json:"yamlPath"`
	PipelineYaml string `json:"pipelineYaml"`
	UserName     string `json:"username"`
	CreateTime   string `json:"createTime"`
	UpdateTime   string `json:"updateTime"`
}

type PipelineVersions added in v0.14.3

// PipelineVersions is the version listing nested inside GetPipelineResponse.
// It embeds common.MarkerInfo (presumably pagination marker data — confirm)
// and carries the version summaries under "pplVersionList".
type PipelineVersions struct {
	common.MarkerInfo
	PipelineVersionList []PipelineVersionBrief `json:"pplVersionList"`
}

type RunBrief added in v0.14.3

// RunBrief is the JSON summary of a run used in ListRunResponse. Note that ID
// is serialized as "runID" and Message as "runMsg". All time fields are
// strings; their format is not visible here.
type RunBrief struct {
	ID            string `json:"runID"`
	Name          string `json:"name"`
	Source        string `json:"source"` // pipelineID or yamlPath
	UserName      string `json:"username"`
	FsName        string `json:"fsName"`
	Description   string `json:"description"`
	ScheduleID    string `json:"scheduleID"`
	Message       string `json:"runMsg"`
	Status        string `json:"status"`
	ScheduledTime string `json:"scheduledTime"`
	CreateTime    string `json:"createTime"`
	ActivateTime  string `json:"activateTime"`
	UpdateTime    string `json:"updateTime"`
}

type RunStep added in v0.14.3

// RunStep is the JSON shape of a single step. Per the package documentation
// it is used by the CreateRunJson API to unmarshal the steps found in
// entryPoints and postProcess.
type RunStep struct {
	Parameters map[string]interface{} `json:"parameters"`
	Command    string                 `json:"command"`
	Deps       string                 `json:"deps"`
	Artifacts  ArtifactsJson          `json:"artifacts"`
	Env        map[string]string      `json:"env"`
	Queue      string                 `json:"queue"`
	Flavour    string                 `json:"flavour"`
	JobType    string                 `json:"jobType"`
	Cache      schema.Cache           `json:"cache"`
	DockerEnv  string                 `json:"dockerEnv"`
}

RunStep is used by the CreateRunJson API to unmarshal the steps in entryPoints and postProcess.

type ScheduleBrief added in v0.14.3

// ScheduleBrief is the JSON summary of a schedule. It is used in
// ListScheduleResponse and embedded in GetScheduleResponse. Note that ID is
// serialized as "scheduleID" and Message as "scheduleMsg".
type ScheduleBrief struct {
	ID                string                 `json:"scheduleID"`
	Name              string                 `json:"name"`
	Desc              string                 `json:"desc"`
	PipelineID        string                 `json:"pipelineID"`
	PipelineVersionID string                 `json:"pipelineVersionID"`
	UserName          string                 `json:"username"`
	FsConfig          models.FsConfig        `json:"fsConfig"`
	Crontab           string                 `json:"crontab"`
	Options           models.ScheduleOptions `json:"options"`
	StartTime         string                 `json:"startTime"`
	EndTime           string                 `json:"endTime"`
	CreateTime        string                 `json:"createTime"`
	UpdateTime        string                 `json:"updateTime"`
	NextRunTime       string                 `json:"nextRunTime"`
	Message           string                 `json:"scheduleMsg"`
	Status            string                 `json:"status"`
}

type Scheduler added in v0.14.3

// Scheduler holds the two channels the scheduler listens on. Obtain the
// shared instance with GetGlobalScheduler and run it with Start.
type Scheduler struct {
	OpsChannel         chan OpInfo // channel used to listen for user operations
	ConcurrencyChannel chan string // channel used to listen for concurrency changes caused by runs finishing
}

func GetGlobalScheduler added in v0.14.3

func GetGlobalScheduler() *Scheduler

GetGlobalScheduler is a singleton accessor that returns the Scheduler instance.

func (*Scheduler) Start added in v0.14.3

func (s *Scheduler) Start()

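Start starts the scheduler. 1. It queries the database for periodic schedules that are due; if there are any, it launches their runs and updates the sleep time (the time of the next nearest scheduled run). 2. It then enters a for loop, and each iteration listens for three kinds of signals: timeout, user operation, and concurrency-available.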

type UpdatePipelineRequest added in v0.14.3

// UpdatePipelineRequest is an alias of CreatePipelineRequest, so an update
// request carries exactly the same fields as a create request.
type UpdatePipelineRequest = CreatePipelineRequest

type UpdatePipelineResponse added in v0.14.3

// UpdatePipelineResponse is returned by UpdatePipeline; it carries the
// pipeline ID and the pipeline version ID.
type UpdatePipelineResponse struct {
	PipelineID        string `json:"pipelineID"`
	PipelineVersionID string `json:"pipelineVersionID"`
}

func UpdatePipeline added in v0.14.3

func UpdatePipeline(ctx *logger.RequestContext, request UpdatePipelineRequest, pipelineID string) (UpdatePipelineResponse, error)

type UpdateRunRequest added in v0.14.3

// UpdateRunRequest is the request passed to StopRun.
type UpdateRunRequest struct {
	// NOTE(review): presumably requests a forced stop — confirm against
	// StopRun.
	StopForce bool `json:"stopForce"`
}

type UpdateRunResponse added in v0.14.3

// UpdateRunResponse is a JSON response carrying a run ID.
// NOTE(review): no function visible here returns this type.
type UpdateRunResponse struct {
	RunID string `json:"runID"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL