schema

package
v0.14.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
// Type identifiers ("Local", "Kubernetes", "K3S").
// NOTE(review): presumably the values stored in Cluster.Type — TODO confirm.
const (
	LocalType      = "Local"
	KubernetesType = "Kubernetes"
	K3SType        = "K3S"
)
View Source
// PFS / fuse / mount-pod related constants: name templates, dotted keys,
// label and annotation keys, and environment variable names.
const (
	// PFSTypeLocal is the "local" PFS type value.
	PFSTypeLocal = "local"

	// Name templates and the placeholder tokens they contain: FSIDFormat and
	// NameSpaceFormat are exactly the "$(...)" tokens that appear inside
	// PVNameTemplate and PVCNameTemplate.
	// NOTE(review): presumably substituted by ConcatenatePVName /
	// ConcatenatePVCName — TODO confirm.
	PVNameTemplate  = "pfs-$(pfs.fs.id)-$(namespace)-pv"
	PVCNameTemplate = "pfs-$(pfs.fs.id)-pvc"
	FSIDFormat      = "$(pfs.fs.id)"
	NameSpaceFormat = "$(namespace)"

	// Dotted "pfs.*" keys; where they are looked up is not visible here.
	PFSID        = "pfs.fs.id"
	PFSInfo      = "pfs.fs.info"
	PFSCache     = "pfs.fs.cache"
	PFSServer    = "pfs.server"
	PFSClusterID = "pfs.cluster.id"

	// FusePodMntDir is an absolute directory path.
	// NOTE(review): by its name, the mount directory inside the fuse pod — TODO confirm.
	FusePodMntDir = "/home/paddleflow/mnt"

	// Meta driver names.
	// NOTE(review): presumably the values accepted by IsValidFsMetaDriver — TODO confirm.
	FsMetaMemory = "mem"
	FsMetaDisk   = "disk"

	// FuseKeyFsInfo is the "fs-info" key.
	FuseKeyFsInfo = "fs-info"

	// Label and annotation keys. AnnotationKeyMountPrefix ends with "-" and
	// is a key prefix rather than a complete key.
	LabelKeyFsID             = "fsID"
	LabelKeyCacheID          = "cacheID"
	LabelKeyNodeName         = "nodename"
	LabelKeyUsedSize         = "usedSize"
	AnnotationKeyCacheDir    = "cacheDir"
	AnnotationKeyMTime       = "modifiedTime"
	AnnotationKeyMountPrefix = "mount-"

	// Environment variable names.
	EnvKeyMountPodName = "POD_NAME"
	EnvKeyNamespace    = "NAMESPACE"

	// MountPodNamespace is the namespace name "paddleflow".
	// NOTE(review): by its name, the namespace of mount pods — TODO confirm.
	MountPodNamespace = "paddleflow"
)
View Source
// Job related constants: environment variable names (PF_*, RAY_JOB_*), job
// modes, typed enum values (JobType, JobStatus, TaskStatus, MemberRole,
// Framework), listener types, priorities, and label/annotation keys.
const (
	// Environment variable names describing a job.
	EnvJobType        = "PF_JOB_TYPE"
	EnvJobQueueName   = "PF_JOB_QUEUE_NAME"
	EnvJobQueueID     = "PF_JOB_QUEUE_ID"
	EnvJobClusterName = "PF_JOB_CLUSTER_NAME"
	EnvJobClusterID   = "PF_JOB_CLUSTER_ID"
	EnvJobNamespace   = "PF_JOB_NAMESPACE"
	EnvJobUserName    = "PF_USER_NAME"
	EnvJobFsID        = "PF_FS_ID"
	EnvJobPVCName     = "PF_JOB_PVC_NAME"
	EnvJobPriority    = "PF_JOB_PRIORITY"
	EnvJobMode        = "PF_JOB_MODE"
	EnvJobFramework   = "PF_JOB_FRAMEWORK"
	// EnvJobYamlPath Additional configuration for a specific job
	EnvJobYamlPath  = "PF_JOB_YAML_PATH"
	EnvIsCustomYaml = "PF_IS_CUSTOM_YAML"
	// EnvJobWorkDir The working directory of the job, `null` means command without a working directory
	EnvJobWorkDir = "PF_WORK_DIR"
	EnvMountPath  = "PF_MOUNT_PATH"

	EnvJobRestartPolicy = "PF_JOB_RESTART_POLICY"

	EnvEnableJobQueueSync = "PF_JOB_QUEUE_SYNC"

	// EnvJobModePS env
	EnvJobModePS          = "PS"
	EnvJobPSPort          = "PF_JOB_PS_PORT"
	EnvJobPServerReplicas = "PF_JOB_PSERVER_REPLICAS"
	EnvJobPServerFlavour  = "PF_JOB_PSERVER_FLAVOUR"
	EnvJobPServerCommand  = "PF_JOB_PSERVER_COMMAND"
	EnvJobWorkerReplicas  = "PF_JOB_WORKER_REPLICAS"
	EnvJobWorkerFlavour   = "PF_JOB_WORKER_FLAVOUR"
	EnvJobWorkerCommand   = "PF_JOB_WORKER_COMMAND"

	// EnvJobModeCollective env
	EnvJobModeCollective   = "Collective"
	EnvJobReplicas         = "PF_JOB_REPLICAS"
	EnvJobFlavour          = "PF_JOB_FLAVOUR"
	EnvJobLimitFlavour     = "PF_JOB_LIMIT_FLAVOUR"
	EnvJobLimitFlavourNone = "NONE"

	// EnvJobModePod env reuse EnvJobReplicas and EnvJobFlavour
	EnvJobModePod = "Pod"

	// spark job env
	EnvJobSparkMainFile    = "PF_JOB_SPARK_MAIN_FILE"
	EnvJobSparkMainClass   = "PF_JOB_SPARK_MAIN_CLASS"
	EnvJobSparkArguments   = "PF_JOB_SPARK_ARGUMENTS"
	EnvJobDriverFlavour    = "PF_JOB_DRIVER_FLAVOUR"
	EnvJobExecutorReplicas = "PF_JOB_EXECUTOR_REPLICAS"
	EnvJobExecutorFlavour  = "PF_JOB_EXECUTOR_FLAVOUR"

	// TODO move to framework
	TypeVcJob      JobType = "vcjob"
	TypeSparkJob   JobType = "spark"
	TypePaddleJob  JobType = "paddlejob"
	TypePodJob     JobType = "pod"
	TypeDeployment JobType = "deployment"

	// Job statuses.
	StatusJobInit        JobStatus = "init"
	StatusJobPending     JobStatus = "pending"
	StatusJobRunning     JobStatus = "running"
	StatusJobFailed      JobStatus = "failed"
	StatusJobSucceeded   JobStatus = "succeeded"
	StatusJobTerminating JobStatus = "terminating"
	StatusJobTerminated  JobStatus = "terminated"
	StatusJobCancelled   JobStatus = "cancelled"
	StatusJobSkipped     JobStatus = "skipped"

	// Task statuses.
	StatusTaskPending   TaskStatus = "pending"
	StatusTaskRunning   TaskStatus = "running"
	StatusTaskSucceeded TaskStatus = "succeeded"
	StatusTaskFailed    TaskStatus = "failed"

	// Member roles.
	RoleMaster   MemberRole = "master"
	RoleWorker   MemberRole = "worker"
	RoleDriver   MemberRole = "driver"
	RoleExecutor MemberRole = "executor"
	RolePServer  MemberRole = "pserver"
	RolePWorker  MemberRole = "pworker"

	// Additional JobType values.
	TypeSingle      JobType = "single"
	TypeDistributed JobType = "distributed"
	TypeWorkflow    JobType = "workflow"

	// Frameworks.
	FrameworkSpark      Framework = "spark"
	FrameworkMPI        Framework = "mpi"
	FrameworkTF         Framework = "tensorflow"
	FrameworkPytorch    Framework = "pytorch"
	FrameworkPaddle     Framework = "paddle"
	FrameworkMXNet      Framework = "mxnet"
	FrameworkRay        Framework = "ray"
	FrameworkStandalone Framework = "standalone"

	// Listener types.
	ListenerTypeJob      = "job"
	ListenerTypeTask     = "task"
	ListenerTypeQueue    = "queue"
	ListenerTypeNode     = "node"
	ListenerTypeNodeTask = "nodeTask"

	// Node/task label related names.
	EnvPFNodeLabels     = "PF_NODE_LABELS"
	EnvPFTaskLabels     = "PF_TASK_LABELS"
	EnvPFResourceFilter = "PF_NODE_RESOURCES_FILTER"
	PFNodeLabels        = "resource-isolation-type"

	// job priority
	EnvJobVeryLowPriority  = "VERY_LOW"
	EnvJobLowPriority      = "LOW"
	EnvJobNormalPriority   = "NORMAL"
	EnvJobHighPriority     = "HIGH"
	EnvJobVeryHighPriority = "VERY_HIGH"

	// priority class
	PriorityClassVeryLow  = "very-low"
	PriorityClassLow      = "low"
	PriorityClassNormal   = "normal"
	PriorityClassHigh     = "high"
	PriorityClassVeryHigh = "very-high"

	// Job owner / ID / framework label names.
	// NOTE(review): the JobTTLSeconds value is spelled "padleflow" (one "d"),
	// unlike the other paddleflow keys in this group. It is a runtime string,
	// so confirm whether existing objects rely on it before changing it.
	JobOwnerLabel     = "owner"
	JobOwnerValue     = "paddleflow"
	JobIDLabel        = "paddleflow-job-id"
	JobTTLSeconds     = "padleflow/job-ttl-seconds"
	JobLabelFramework = "paddleflow-job-framework"

	// Label keys in the volcano.sh and sparkoperator.k8s.io domains.
	VolcanoJobNameLabel  = "volcano.sh/job-name"
	QueueLabelKey        = "volcano.sh/queue-name"
	SparkAPPJobNameLabel = "sparkoperator.k8s.io/app-name"

	JobPrefix            = "job"
	DefaultSchedulerName = "volcano"
	DefaultFSMountPath   = "/home/paddleflow/storage/mnt"

	// EnvPaddleParaJob defines env for Paddle Para Job
	EnvPaddleParaJob            = "PF_PADDLE_PARA_JOB"
	EnvPaddleParaPriority       = "PF_PADDLE_PARA_PRIORITY"
	EnvPaddleParaConfigHostFile = "PF_PADDLE_PARA_CONFIG_FILE"
	// PaddleParaVolumeName defines config for Paddle Para Pod
	PaddleParaVolumeName            = "paddle-para-conf-volume"
	PaddleParaAnnotationKeyJobName  = "paddle-para/job-name"
	PaddleParaAnnotationKeyPriority = "paddle-para/priority"
	PaddleParaEnvJobName            = "FLAGS_job_name"
	PaddleParaEnvGPUConfigFile      = "GPU_CONFIG_FILE"
	PaddleParaGPUConfigFilePath     = "/opt/paddle/para/gpu_config.json"

	// RayJob keywords
	EnvRayJobEntryPoint              = "RAY_JOB_ENTRY_POINT"
	EnvRayJobRuntimeEnv              = "RAY_JOB_RUNTIME_ENV"
	EnvRayJobEnableAutoScaling       = "RAY_JOB_ENABLE_AUTOSCALING"
	EnvRayJobAutoScalingMode         = "RAY_JOB_AUTOSCALING_MODE"
	EnvRayJobAutoScalingTimeout      = "RAY_JOB_AUTOSCALING_IDLE_TIMEOUT"
	EnvRayJobHeaderFlavour           = "RAY_JOB_HEADER_FLAVOUR"
	EnvRayJobHeaderImage             = "RAY_JOB_HEADER_IMAGE"
	EnvRayJobHeaderPriority          = "RAY_JOB_HEADER_PRIORITY"
	EnvRayJobHeaderPreStop           = "RAY_JOB_HEADER_PRE_STOP"
	EnvRayJobHeaderStartParamsPrefix = "RAY_JOB_HEADER_START_PARAMS_"
	EnvRayJobWorkerGroupName         = "RAY_JOB_WORKER_GROUP_NAME"
	EnvRayJobWorkerFlavour           = "RAY_JOB_WORKER_FLAVOUR"
	EnvRayJobWorkerImage             = "RAY_JOB_WORKER_IMAGE"
	EnvRayJobWorkerPriority          = "RAY_JOB_WORKER_PRIORITY"
	EnvRayJobWorkerReplicas          = "RAY_JOB_WORKER_REPLICAS"
	EnvRayJobWorkerMinReplicas       = "RAY_JOB_WORKER_MIN_REPLICAS"
	EnvRayJobWorkerMaxReplicas       = "RAY_JOB_WORKER_MAX_REPLICAS"
	EnvRayJobWorkerStartParamsPrefix = "RAY_JOB_WORKER_START_PARAMS_"

	// ENVK3SNodeName is the K3S node name env; per the original note it makes
	// sure the job can be scheduled without volcano.
	ENVK3SNodeName = "K3S_NODE_NAME"
)
View Source
// Queue statuses and quota type names.
const (
	// Queue statuses.
	StatusQueueCreating    = "creating"
	StatusQueueOpen        = "open"
	StatusQueueUpdating    = "updating"
	StatusQueueClosing     = "closing"
	StatusQueueClosed      = "closed"
	StatusQueueUnavailable = "unavailable"

	// Quota types.
	TypeElasticQuota           = "elasticQuota"
	TypeVolcanoCapabilityQuota = "volcanoCapabilityQuota"
)
View Source
// Pipeline / workflow related key names and enum-like string values.
const (
	// Artifact types.
	ArtifactTypeInput  = "input"
	ArtifactTypeOutput = "output"

	// EntryPointsStr is the snake_case "entry_points" key (the YAML tag used
	// by WorkflowSource.EntryPoints).
	EntryPointsStr = "entry_points"

	// Cache attribute keys; they match the YAML tags of the Cache struct.
	CacheAttributeEnable         = "enable"
	CacheAttributeMaxExpiredTime = "max_expired_time"
	CacheAttributeFsScope        = "fs_scope"

	// Failure strategies.
	// NOTE(review): presumably the allowed values of FailureOptions.Strategy — TODO confirm.
	FailureStrategyFailFast = "fail_fast"
	FailureStrategyContinue = "continue"

	EnvDockerEnv = "dockerEnv"

	// FsPrefix is the "fs-" prefix.
	FsPrefix = "fs-"

	// Component type names.
	CompTypeComponents  = "components"
	CompTypeEntryPoints = "entryPoints"
	CompTypePostProcess = "postProcess"
)

Variables

This section is empty.

Functions

func CheckReg

func CheckReg(str, pattern string) bool

CheckReg TODO: move this function to util

func CheckScalarResource

func CheckScalarResource(res string) error

func ConcatenatePVCName added in v0.14.3

func ConcatenatePVCName(fsID string) string

func ConcatenatePVName added in v0.14.3

func ConcatenatePVName(namespace, fsID string) string

func GetBindSource

func GetBindSource(fsID string) string

func ID added in v0.14.3

func ID(userName, fsName string) string

func IsEmptyResource

func IsEmptyResource(resourceInfo ResourceInfo) bool

IsEmptyResource returns true when cpu or mem is nil

func IsImmutableJobStatus

func IsImmutableJobStatus(status JobStatus) bool

func IsValidFsMetaDriver

func IsValidFsMetaDriver(metaDriver string) bool

func ProcessStepCacheByMap added in v0.14.3

func ProcessStepCacheByMap(cache *Cache, globalCacheMap map[string]interface{}, componentCacheMap map[string]interface{}) error

func ProcessStepFsMount added in v0.14.3

func ProcessStepFsMount(fsMountList *[]FsMount, globalFsMountList []interface{}) error

func RunYaml2Map added in v0.14.3

func RunYaml2Map(runYaml []byte) (map[string]interface{}, error)

RunYaml2Map parses the YAML into a map.

func ValidateResource added in v0.14.3

func ValidateResource(resourceInfo ResourceInfo, scalarResourcesType []string) error

ValidateResource validate resource info

func ValidateResourceItem added in v0.14.3

func ValidateResourceItem(res string) error

ValidateResourceItem check resource for cpu or memory

func ValidateScalarResourceInfo

func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error

ValidateScalarResourceInfo validate scalar resource info

Types

type ActionType

// ActionType is a string type naming an action.
type ActionType string
// The defined actions: create, update, delete and terminate.
const (
	Create    ActionType = "create"
	Update    ActionType = "update"
	Delete    ActionType = "delete"
	Terminate ActionType = "terminate"
)

type Artifacts

// Artifacts holds the input and output artifacts of a component as two
// string-to-string maps.
// NOTE(review): presumably artifact name -> path (cf. GetArtifactPath) — TODO confirm.
type Artifacts struct {
	Input  map[string]string `yaml:"input"       json:"input"`
	Output map[string]string `yaml:"output"      json:"output"`
}

func (*Artifacts) DeepCopy added in v0.14.3

func (art *Artifacts) DeepCopy() *Artifacts

func (*Artifacts) ValidateOutputMapByList

func (art *Artifacts) ValidateOutputMapByList() error

type Cache

// Cache is the cache configuration. Its YAML keys (enable, max_expired_time,
// fs_scope) match the CacheAttribute* constants.
type Cache struct {
	Enable         bool      `yaml:"enable"           json:"enable"`
	MaxExpiredTime string    `yaml:"max_expired_time" json:"maxExpiredTime"` // seconds
	FsScope        []FsScope `yaml:"fs_scope"         json:"fsScope"`        // separated by ","
}

type ClientOptions

// ClientOptions holds the options used to build a rest config.
// NOTE(review): QPS and Burst are presumably client-side rate limits — TODO confirm.
type ClientOptions struct {
	Master string
	Config string
	QPS    float32
	Burst  int
}

ClientOptions is used to build the rest config.

type Cluster

// Cluster identifies a cluster by ID, name and type, together with the client
// options for it.
// NOTE(review): Type is presumably one of LocalType / KubernetesType / K3SType — TODO confirm.
type Cluster struct {
	ID   string
	Name string
	Type string

	// ClientOpt defines client config for cluster
	ClientOpt ClientOptions
}

type Component added in v0.14.3

// Component is the interface covering both Dag and Step components. It exposes
// read accessors (deps, artifacts, parameters, condition, loop argument, type,
// name), Update* mutators, Init* helpers and DeepCopy. WorkflowSourceDag in
// this package declares every method listed here.
//
// NOTE(review): "UpdateLoopArguemt" is a misspelling of "UpdateLoopArgument";
// renaming it would break every implementer and caller, so it is left as is.
type Component interface {
	GetDeps() []string
	GetArtifacts() Artifacts
	GetArtifactPath(artName string) (string, error)
	GetInputArtifactPath(artName string) (string, error)
	GetOutputArtifactPath(artName string) (string, error)
	GetParameters() map[string]interface{}
	GetParameterValue(paramName string) (interface{}, error)
	GetCondition() string
	GetLoopArgument() interface{}
	GetLoopArgumentLength() int
	GetType() string
	GetName() string

	// The following Update functions are used during template replacement.
	UpdateCondition(string)
	UpdateLoopArguemt(interface{})
	UpdateName(name string)
	UpdateDeps(deps string)

	InitInputArtifacts()
	InitOutputArtifacts()
	InitParameters()

	// Used for deep copying, to avoid problems when a component is reused.
	DeepCopy() Component
}

Component covers both Dag and Step; the structs WorkflowSourceStep and WorkflowSourceDag implement this interface.

type ComponentView added in v0.14.3

// ComponentView is the common view interface for components: read accessors
// for name, parent DAG ID, status, sequence number, deps, message and
// start/end time, plus SetDeps. DagView and JobView declare the getters with
// value receivers and SetDeps with a pointer receiver, so *DagView and
// *JobView carry the full method set.
type ComponentView interface {
	GetComponentName() string
	GetParentDagID() string
	GetStatus() JobStatus
	GetSeq() int
	GetDeps() string
	GetMsg() string
	GetName() string
	GetStartTime() string
	GetEndTime() string

	SetDeps(string)
}

type Conf

// Conf is the job configuration: storage resources, compute resources and the
// parameters needed at runtime. *Conf declares the accessor methods listed in
// the PFJobConf interface.
type Conf struct {
	Name string `json:"name"`
	// Storage resources
	FileSystem      FileSystem   `json:"fs,omitempty"`
	ExtraFileSystem []FileSystem `json:"extraFS,omitempty"`

	// Compute resources
	Flavour      Flavour `json:"flavour,omitempty"`
	LimitFlavour Flavour `json:"limitFlavour,omitempty"`
	Priority     string  `json:"priority"`
	ClusterID    string  `json:"clusterID"`
	QueueID      string  `json:"queueID"`
	QueueName    string  `json:"queueName,omitempty"`
	// Parameters needed at runtime
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	Env         map[string]string `json:"env,omitempty"`
	Command     string            `json:"command,omitempty"`
	Image       string            `json:"image"`
	Port        int               `json:"port,omitempty"`
	Args        []string          `json:"args,omitempty"`
	// contains filtered or unexported fields
}

func (*Conf) Framework added in v0.14.5

func (c *Conf) Framework() Framework

func (*Conf) GetAllFileSystem added in v0.14.3

func (c *Conf) GetAllFileSystem() []FileSystem

GetAllFileSystem combine FileSystem and ExtraFileSystem to a slice

func (*Conf) GetAnnotations added in v0.14.5

func (c *Conf) GetAnnotations() map[string]string

func (*Conf) GetArgs added in v0.14.3

func (c *Conf) GetArgs() []string

func (*Conf) GetClusterID

func (c *Conf) GetClusterID() string

func (*Conf) GetCommand

func (c *Conf) GetCommand() string

func (*Conf) GetEnv

func (c *Conf) GetEnv() map[string]string

func (*Conf) GetEnvSubset added in v0.14.5

func (c *Conf) GetEnvSubset(prefix string) map[string]string

func (*Conf) GetEnvValue added in v0.14.5

func (c *Conf) GetEnvValue(key string) string

func (*Conf) GetExtraFS added in v0.14.3

func (c *Conf) GetExtraFS() []FileSystem

func (*Conf) GetFileSystem added in v0.14.3

func (c *Conf) GetFileSystem() FileSystem

func (*Conf) GetFlavour

func (c *Conf) GetFlavour() string

func (*Conf) GetImage

func (c *Conf) GetImage() string

func (*Conf) GetJobMode

func (c *Conf) GetJobMode() string

func (*Conf) GetLabels added in v0.14.5

func (c *Conf) GetLabels() map[string]string

func (*Conf) GetLimitFlavour added in v0.14.6

func (c *Conf) GetLimitFlavour() string

func (*Conf) GetName

func (c *Conf) GetName() string

func (*Conf) GetNamespace

func (c *Conf) GetNamespace() string

func (*Conf) GetPriority

func (c *Conf) GetPriority() string

func (*Conf) GetProcessedFileSystem added in v0.14.6

func (c *Conf) GetProcessedFileSystem() []FileSystem

func (*Conf) GetQueueID

func (c *Conf) GetQueueID() string

func (*Conf) GetQueueName

func (c *Conf) GetQueueName() string

func (*Conf) GetRestartPolicy added in v0.14.5

func (c *Conf) GetRestartPolicy() string

func (*Conf) GetUserName

func (c *Conf) GetUserName() string

func (*Conf) SetAnnotations

func (c *Conf) SetAnnotations(k, v string)

func (*Conf) SetClusterID

func (c *Conf) SetClusterID(id string)

func (*Conf) SetEnv

func (c *Conf) SetEnv(name, value string)

func (*Conf) SetFlavour

func (c *Conf) SetFlavour(flavourKey string)

func (*Conf) SetLabels

func (c *Conf) SetLabels(k, v string)

func (*Conf) SetNamespace

func (c *Conf) SetNamespace(ns string)

func (*Conf) SetPriority

func (c *Conf) SetPriority(pc string)

func (*Conf) SetProcessedFileSystem added in v0.14.6

func (c *Conf) SetProcessedFileSystem(fs []FileSystem)

func (*Conf) SetQueueID

func (c *Conf) SetQueueID(id string)

func (*Conf) SetQueueName

func (c *Conf) SetQueueName(queueName string)

SetQueueName set queue name

func (*Conf) Type

func (c *Conf) Type() JobType

type DagView added in v0.14.3

// DagView is the view of a DAG. PK and LoopSeq are excluded from JSON
// (json:"-"); EntryPoints maps a string key to a list of ComponentView values.
type DagView struct {
	PK          int64                      `json:"-"`
	DagID       string                     `json:"id"`
	Name        string                     `json:"name"`
	Type        string                     `json:"type"`
	DagName     string                     `json:"dagName"`
	ParentDagID string                     `json:"parentDagID"`
	LoopSeq     int                        `json:"-"`
	Deps        string                     `json:"deps"`
	Parameters  map[string]string          `json:"parameters"`
	Artifacts   Artifacts                  `json:"artifacts"`
	StartTime   string                     `json:"startTime"`
	EndTime     string                     `json:"endTime"`
	Status      JobStatus                  `json:"status"`
	Message     string                     `json:"message"`
	EntryPoints map[string][]ComponentView `json:"entryPoints"`
}

func (DagView) GetComponentName added in v0.14.3

func (d DagView) GetComponentName() string

func (DagView) GetDeps added in v0.14.3

func (d DagView) GetDeps() string

func (DagView) GetEndTime added in v0.14.5

func (d DagView) GetEndTime() string

func (DagView) GetMsg added in v0.14.3

func (d DagView) GetMsg() string

func (DagView) GetName added in v0.14.3

func (d DagView) GetName() string

func (DagView) GetParentDagID added in v0.14.3

func (d DagView) GetParentDagID() string

func (DagView) GetSeq added in v0.14.3

func (d DagView) GetSeq() int

func (DagView) GetStartTime added in v0.14.5

func (d DagView) GetStartTime() string

func (DagView) GetStatus added in v0.14.3

func (d DagView) GetStatus() JobStatus

func (*DagView) SetDeps added in v0.14.3

func (d *DagView) SetDeps(deps string)

type FailureOptions

// FailureOptions carries the failure strategy.
// NOTE(review): Strategy is presumably one of FailureStrategyFailFast /
// FailureStrategyContinue — TODO confirm.
type FailureOptions struct {
	Strategy string `yaml:"strategy"     json:"strategy"`
}

type FileSystem

// FileSystem describes a file system reference: ID, name and type, plus
// optional host path, mount path, sub path and read-only flag (all optional
// fields are omitted from JSON when empty).
type FileSystem struct {
	ID        string `json:"id,omitempty"`
	Name      string `json:"name"`
	Type      string `json:"type"`
	HostPath  string `json:"hostPath,omitempty"`
	MountPath string `json:"mountPath,omitempty"`
	SubPath   string `json:"subPath,omitempty"`
	ReadOnly  bool   `json:"readOnly,omitempty"`
}

FileSystem indicates a PaddleFlow file system.

type Flavour

// Flavour is a named set of resources that can be used to run a job. It embeds
// ResourceInfo (inlined in YAML) and adds a name.
type Flavour struct {
	ResourceInfo `yaml:",inline"`
	Name         string `json:"name" yaml:"name"`
}

Flavour is a set of resources that can be used to run a job.

type Framework

// Framework is a string type naming a job framework; see the Framework*
// constants (spark, mpi, tensorflow, pytorch, paddle, mxnet, ray, standalone).
type Framework string

type FrameworkVersion added in v0.14.5

// FrameworkVersion pairs a framework name with an API version.
type FrameworkVersion struct {
	Framework  string `json:"framework"`
	APIVersion string `json:"apiVersion"`
}

func NewFrameworkVersion added in v0.14.5

func NewFrameworkVersion(framework, apiVersion string) FrameworkVersion

func (*FrameworkVersion) String added in v0.14.5

func (f *FrameworkVersion) String() string

type FsMount added in v0.14.3

// FsMount describes one file system mount: name, mount path, sub path and
// read-only flag. ID is excluded from YAML (yaml:"-") but present in JSON.
type FsMount struct {
	ID        string `yaml:"-"             json:"id"`
	Name      string `yaml:"name"          json:"name"`
	MountPath string `yaml:"mount_path"    json:"mountPath"`
	SubPath   string `yaml:"sub_path"      json:"subPath"`
	ReadOnly  bool   `yaml:"read_only"     json:"readOnly"`
}

type FsOptions added in v0.14.3

// FsOptions groups the main file system mount and an optional list of extra
// mounts (ExtraFS is omitted from JSON when empty).
type FsOptions struct {
	MainFS  FsMount   `yaml:"main_fs"      json:"mainFS"`
	ExtraFS []FsMount `yaml:"extra_fs"     json:"extraFS,omitempty"`
}

type FsScope added in v0.14.3

// FsScope names a file system and a path within the cache fs_scope list. ID is
// excluded from YAML (yaml:"-") but present in JSON.
type FsScope struct {
	Name string `yaml:"name"          json:"name"`
	ID   string `yaml:"-"             json:"id"`
	Path string `yaml:"path"          json:"path"`
}

type JobLogInfo

// JobLogInfo carries the logs of a job: the job ID and per-task log entries,
// plus a resource name, resource type and event list (JSON keys "name", "type"
// and "eventList").
//
// NOTE(review): "Resourcetype" does not follow MixedCaps (cf. ResourceName);
// renaming an exported field would break callers, so it is left unchanged.
type JobLogInfo struct {
	JobID    string        `json:"jobID"`
	TaskList []TaskLogInfo `json:"taskList"`

	ResourceName string   `json:"name"`
	Resourcetype string   `json:"type"`
	Events       []string `json:"eventList"`
}

type JobLogRequest

// JobLogRequest carries the parameters of a job log request: job ID, job type,
// namespace, log file position, and page size / page number.
type JobLogRequest struct {
	JobID           string `json:"jobID"`
	JobType         string `json:"jobType"`
	Namespace       string `json:"namespace"`
	LogFilePosition string `json:"logFilePosition"`
	LogPageSize     int    `json:"logPageSize"`
	LogPageNo       int    `json:"logPageNo"`
}

type JobStatus

// JobStatus is a string type for job statuses; see the StatusJob* constants
// (init, pending, running, failed, succeeded, terminating, terminated,
// cancelled, skipped).
type JobStatus string

type JobType

// JobType is a string type for job types; see the Type* constants declared
// with this type (vcjob, spark, paddlejob, pod, deployment, single,
// distributed, workflow).
type JobType string

type JobView

// JobView is the view of job info responded to the user, while Job is for the
// pipeline and job engine to process. PK and LoopSeq are excluded from JSON
// (json:"-").
type JobView struct {
	PK          int64             `json:"-"`
	JobID       string            `json:"jobID"`
	Name        string            `json:"name"`
	Type        string            `json:"type"`
	StepName    string            `json:"stepName"`
	ParentDagID string            `json:"parentDagID"`
	LoopSeq     int               `json:"-"`
	Command     string            `json:"command"`
	Parameters  map[string]string `json:"parameters"`
	Env         map[string]string `json:"env"`
	ExtraFS     []FsMount         `json:"extraFS"`
	StartTime   string            `json:"startTime"`
	EndTime     string            `json:"endTime"`
	Status      JobStatus         `json:"status"`
	Deps        string            `json:"deps"`
	DockerEnv   string            `json:"dockerEnv"`
	Artifacts   Artifacts         `json:"artifacts"`
	Cache       Cache             `json:"cache"`
	JobMessage  string            `json:"jobMessage"`
	CacheRunID  string            `json:"cacheRunID"`
	CacheJobID  string            `json:"cacheJobID"`
}

JobView is view of job info responded to user, while Job is for pipeline and job engine to process

func (JobView) GetComponentName added in v0.14.3

func (j JobView) GetComponentName() string

func (JobView) GetDeps added in v0.14.3

func (j JobView) GetDeps() string

func (JobView) GetEndTime added in v0.14.5

func (j JobView) GetEndTime() string

func (JobView) GetMsg added in v0.14.3

func (j JobView) GetMsg() string

func (JobView) GetName added in v0.14.3

func (j JobView) GetName() string

func (JobView) GetParentDagID added in v0.14.3

func (j JobView) GetParentDagID() string

func (JobView) GetSeq added in v0.14.3

func (j JobView) GetSeq() int

func (JobView) GetStartTime added in v0.14.5

func (j JobView) GetStartTime() string

func (JobView) GetStatus added in v0.14.3

func (j JobView) GetStatus() JobStatus

func (*JobView) SetDeps added in v0.14.3

func (j *JobView) SetDeps(deps string)

type LogInfo

// LogInfo holds one piece of log content together with two flags: whether a
// next page exists and whether the content was truncated.
type LogInfo struct {
	LogContent  string `json:"logContent"`
	HasNextPage bool   `json:"hasNextPage"`
	Truncated   bool   `json:"truncated"`
}

type LogRunArtifactRequest

// LogRunArtifactRequest carries the fields of a run-artifact log request.
// Note that FsName and UserName use the all-lowercase JSON keys "fsname" and
// "username", unlike the camelCase keys of the other fields.
type LogRunArtifactRequest struct {
	Md5          string `json:"md5"`
	RunID        string `json:"runID"`
	FsID         string `json:"fsID"`
	FsName       string `json:"fsname"`
	UserName     string `json:"username"`
	ArtifactPath string `json:"artifactPath"`
	Step         string `json:"step"`
	JobID        string `json:"jobID"`
	Type         string `json:"type"`
	ArtifactName string `json:"artifactName"`
	Meta         string `json:"meta"`
}

type LogRunCacheRequest

// LogRunCacheRequest carries the fields of a run-cache log request, including
// two fingerprints (FirstFp, SecondFp). Note that FsName and UserName use the
// all-lowercase JSON keys "fsname" and "username".
type LogRunCacheRequest struct {
	FirstFp     string `json:"firstFp"`
	SecondFp    string `json:"secondFp"`
	Source      string `json:"source"`
	RunID       string `json:"runID"`
	Step        string `json:"step"`
	JobID       string `json:"jobID"`
	FsID        string `json:"fsID"`
	FsName      string `json:"fsname"`
	UserName    string `json:"username"`
	ExpiredTime string `json:"expiredTime"`
	Strategy    string `json:"strategy"`
}

type Member added in v0.14.5

// Member is one member of a job: an ID, a replica count and a role, with the
// embedded Conf supplying the rest of its configuration.
//
// NOTE(review): encoding/json defines no "inline" option; with an empty tag
// name the embedded Conf is handled like any untagged embedded struct —
// confirm the tag is intentional.
type Member struct {
	ID       string     `json:"id"`
	Replicas int        `json:"replicas"`
	Role     MemberRole `json:"role"`
	Conf     `json:",inline"`
}

type MemberRole

// MemberRole is a string type for member roles; see the Role* constants
// (master, worker, driver, executor, pserver, pworker).
type MemberRole string

type MixedLogRequest added in v0.14.6

// MixedLogRequest can request a job log or k8s pod/deploy events and log.
// The first group of fields identifies the target; the second group limits the
// output. Note that LineLimit is a string while SizeLimit is an int64.
type MixedLogRequest struct {
	Name         string
	Namespace    string
	ResourceType string
	Framework    string

	LineLimit      string
	SizeLimit      int64
	IsReadFromTail bool
}

MixedLogRequest can request job log or k8s pod/deploy events and log

type NodeQuotaInfo

// NodeQuotaInfo reports, for one node, its name, whether it is schedulable,
// and its total and idle resources.
type NodeQuotaInfo struct {
	NodeName    string             `json:"nodeName"`
	Schedulable bool               `json:"schedulable"`
	Total       resources.Resource `json:"total"`
	Idle        resources.Resource `json:"idle"`
}

type PFJobConf

// PFJobConf is the accessor interface for a job configuration: getters for
// name, env, command, image, file systems, args, priority, queue, cluster,
// user, namespace and flavours; setters for priority, queue ID, cluster ID,
// namespace, env, labels and annotations; and the job Type and Framework.
// *Conf declares every method listed here.
type PFJobConf interface {
	GetName() string
	GetEnv() map[string]string
	GetEnvValue(key string) string
	GetEnvSubset(prefix string) map[string]string
	GetCommand() string
	GetImage() string

	GetFileSystem() FileSystem
	GetExtraFS() []FileSystem
	GetArgs() []string

	GetPriority() string
	SetPriority(string)

	GetQueueName() string
	GetQueueID() string
	GetClusterID() string
	GetUserName() string
	GetNamespace() string
	GetFlavour() string
	GetLimitFlavour() string

	SetQueueID(string)
	SetClusterID(string)
	SetNamespace(string)
	SetEnv(string, string)
	SetLabels(string, string)
	SetAnnotations(string, string)

	Type() JobType
	Framework() Framework
}

type Parser added in v0.14.3

// Parser is a field-less type whose methods parse map[string]interface{}
// input into the workflow structures of this package (WorkflowSource,
// WorkflowSourceDag, WorkflowSourceStep, Cache, FsMount, FsOptions, FsScope).
type Parser struct {
}

func (*Parser) IsDag added in v0.14.3

func (p *Parser) IsDag(comp map[string]interface{}) bool

func (*Parser) ParseCache added in v0.14.3

func (p *Parser) ParseCache(cacheMap map[string]interface{}, cache *Cache) error

func (*Parser) ParseComponents added in v0.14.3

func (p *Parser) ParseComponents(entryPoints map[string]interface{}) (map[string]Component, error)

func (*Parser) ParseDag added in v0.14.3

func (p *Parser) ParseDag(params map[string]interface{}, dagComp *WorkflowSourceDag) error

ParseDag assigns values to the fields of a WorkflowSourceDag; it does not fill in default values and does not replace local parameters with global ones.

func (*Parser) ParseFsMount added in v0.14.3

func (p *Parser) ParseFsMount(fsMap map[string]interface{}, fs *FsMount) error

func (*Parser) ParseFsOptions added in v0.14.3

func (p *Parser) ParseFsOptions(fsMap map[string]interface{}, fs *FsOptions) error

func (*Parser) ParseFsScope added in v0.14.3

func (p *Parser) ParseFsScope(fsMap map[string]interface{}, fs *FsScope) error

func (*Parser) ParseStep added in v0.14.3

func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceStep) error

func (*Parser) ParseWorkflowSource added in v0.14.3

func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *WorkflowSource) error

ParseWorkflowSource parses the request body into a WorkflowSource; it does not perform the global replacement.

func (*Parser) TransJsonMap2Yaml added in v0.14.3

func (p *Parser) TransJsonMap2Yaml(jsonMap map[string]interface{}) error

type PostProcessView

// PostProcessView maps a string key to a *JobView.
// NOTE(review): the key is presumably the post-process step name — TODO confirm.
type PostProcessView map[string]*JobView

type QuotaSummary

// QuotaSummary reports a total quota and an idle quota (JSON keys "total" and
// "idle").
type QuotaSummary struct {
	TotalQuota resources.Resource `json:"total"`
	IdleQuota  resources.Resource `json:"idle"`
}

type Reference added in v0.14.3

// Reference holds a component reference as a single string.
// NOTE(review): presumably the name of the referenced component — TODO confirm.
type Reference struct {
	Component string `yaml:"component" json:"component"`
}

type ResourceInfo

// ResourceInfo is a struct that contains the information of a resource: CPU
// and memory as strings, plus optional scalar resources (omitted from JSON and
// YAML when empty).
type ResourceInfo struct {
	CPU             string              `json:"cpu" yaml:"cpu"`
	Mem             string              `json:"mem" yaml:"mem"`
	ScalarResources ScalarResourcesType `json:"scalarResources,omitempty" yaml:"scalarResources,omitempty"`
}

ResourceInfo is a struct that contains the information of a resource.

func (ResourceInfo) ToMap added in v0.14.3

func (r ResourceInfo) ToMap() map[string]string

type ResourceName

// ResourceName is the name identifying various resources in a ResourceList.
// It is the key type of ScalarResourcesType.
type ResourceName string

ResourceName is the name identifying various resources in a ResourceList.

type RunOptions added in v0.14.3

// RunOptions carries two run options: a file system user name and a
// force-stop flag.
type RunOptions struct {
	FSUsername string
	StopForce  bool
}

type RuntimeView

// RuntimeView is the view of a run responded to the user, while
// workflowRuntime is for the pipeline engine to process. It maps a string key
// to a list of ComponentView values and has a custom UnmarshalJSON method.
type RuntimeView map[string][]ComponentView

RuntimeView is view of run responded to user, while workflowRuntime is for pipeline engine to process

func (*RuntimeView) UnmarshalJSON added in v0.14.5

func (rv *RuntimeView) UnmarshalJSON(data []byte) error

type ScalarResourcesType

// ScalarResourcesType is the type of scalar resources: a map from
// ResourceName to a string value.
type ScalarResourcesType map[ResourceName]string

ScalarResourcesType is the type of scalar resources

type TaskLogInfo

// TaskLogInfo pairs a task ID with the log info for that task.
type TaskLogInfo struct {
	// container name
	TaskID string  `json:"taskID"`
	Info   LogInfo `json:"logInfo"`
}

type TaskStatus

// TaskStatus is a string type for task statuses; see the StatusTask*
// constants (pending, running, succeeded, failed).
type TaskStatus string

type WorkflowSource

// WorkflowSource is the top-level workflow definition. YAML keys are
// snake_case and JSON keys are camelCase; the type also has a custom
// UnmarshalJSON method. Disabled is stored as a single string; GetDisabled
// returns it as a []string.
type WorkflowSource struct {
	Name           string                         `yaml:"name"               json:"name"`
	DockerEnv      string                         `yaml:"docker_env"         json:"dockerEnv"`
	EntryPoints    WorkflowSourceDag              `yaml:"entry_points"       json:"entryPoints"`
	Components     map[string]Component           `yaml:"components"         json:"components"`
	Cache          Cache                          `yaml:"cache"              json:"cache"`
	Parallelism    int                            `yaml:"parallelism"        json:"parallelism"`
	Disabled       string                         `yaml:"disabled"           json:"disabled"`
	FailureOptions FailureOptions                 `yaml:"failure_options"    json:"failureOptions"`
	PostProcess    map[string]*WorkflowSourceStep `yaml:"post_process"       json:"postProcess"`
	FsOptions      FsOptions                      `yaml:"fs_options"         json:"fsOptions"`
}

func GetWorkflowSource added in v0.14.3

func GetWorkflowSource(runYaml []byte) (WorkflowSource, error)

GetWorkflowSource parses the YAML into a WorkflowSource and, in addition, performs the global parameter replacement.

func GetWorkflowSourceByMap added in v0.14.3

func GetWorkflowSourceByMap(yamlMap map[string]interface{}) (WorkflowSource, error)

GetWorkflowSourceByMap builds a WorkflowSource from a map; the keys of the map must be in underscore (snake_case) format.

func (*WorkflowSource) GetComponentByFullName added in v0.14.3

func (wfs *WorkflowSource) GetComponentByFullName(fullName string) (Component, error)

func (*WorkflowSource) GetCompsMapAndRelName added in v0.14.3

func (wfs *WorkflowSource) GetCompsMapAndRelName(components map[string]Component, absoluteName string) (map[string]Component, string, bool)

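GetCompsMapAndRelName recursively checks whether the Component for the given absolute name exists, and returns all components at the same level as that Component together with its relative name.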

func (*WorkflowSource) GetDisabled

func (wfs *WorkflowSource) GetDisabled() []string

func (*WorkflowSource) GetFsMounts added in v0.14.3

func (wfs *WorkflowSource) GetFsMounts() ([]FsMount, error)

func (*WorkflowSource) IsDisabled

func (wfs *WorkflowSource) IsDisabled(componentName string) (bool, error)

func (*WorkflowSource) ProcessRuntimeComponents added in v0.14.3

func (wfs *WorkflowSource) ProcessRuntimeComponents(components map[string]Component, componentType string,
	yamlMap map[string]interface{}, componentsMap map[string]interface{}) error

ProcessRuntimeComponents applies the global replacement to the DockerEnv and Cache of each Step.

func (*WorkflowSource) TransToRunYamlRaw added in v0.14.3

func (wfs *WorkflowSource) TransToRunYamlRaw() (runYamlRaw string, err error)

func (*WorkflowSource) UnmarshalJSON added in v0.14.5

func (wfs *WorkflowSource) UnmarshalJSON(data []byte) error

type WorkflowSourceDag added in v0.14.3

// WorkflowSourceDag is the DAG form of a Component; *WorkflowSourceDag
// declares every method of the Component interface. Name and Type are excluded
// from YAML (yaml:"-") but present in JSON. Deps is stored as a single string;
// GetDeps returns it as a []string.
type WorkflowSourceDag struct {
	Name         string                 `yaml:"-"              json:"name"`
	Type         string                 `yaml:"-"              json:"type"`
	LoopArgument interface{}            `yaml:"loop_argument"  json:"loopArgument"`
	Condition    string                 `yaml:"condition"      json:"condition"`
	Parameters   map[string]interface{} `yaml:"parameters"     json:"parameters"`
	Deps         string                 `yaml:"deps"           json:"deps"`
	Artifacts    Artifacts              `yaml:"artifacts"      json:"artifacts"`
	EntryPoints  map[string]Component   `yaml:"entry_points"   json:"entryPoints"`
}

func (*WorkflowSourceDag) DeepCopy added in v0.14.3

func (d *WorkflowSourceDag) DeepCopy() Component

func (*WorkflowSourceDag) GetArtifactPath added in v0.14.3

func (d *WorkflowSourceDag) GetArtifactPath(artName string) (string, error)

GetArtifactPath returns the path of an artifact.

func (*WorkflowSourceDag) GetArtifacts added in v0.14.3

func (d *WorkflowSourceDag) GetArtifacts() Artifacts

func (*WorkflowSourceDag) GetCondition added in v0.14.3

func (d *WorkflowSourceDag) GetCondition() string

func (*WorkflowSourceDag) GetDeps added in v0.14.3

func (d *WorkflowSourceDag) GetDeps() []string

func (*WorkflowSourceDag) GetInputArtifactPath added in v0.14.3

func (d *WorkflowSourceDag) GetInputArtifactPath(artName string) (string, error)

GetInputArtifactPath returns the storage path of an input artifact.

func (*WorkflowSourceDag) GetLoopArgument added in v0.14.3

func (d *WorkflowSourceDag) GetLoopArgument() interface{}

func (*WorkflowSourceDag) GetLoopArgumentLength added in v0.14.3

func (d *WorkflowSourceDag) GetLoopArgumentLength() int

func (*WorkflowSourceDag) GetName added in v0.14.3

func (d *WorkflowSourceDag) GetName() string

func (*WorkflowSourceDag) GetOutputArtifactPath added in v0.14.3

func (d *WorkflowSourceDag) GetOutputArtifactPath(artName string) (string, error)

GetOutputArtifactPath returns the storage path of the output artifact named artName.

func (*WorkflowSourceDag) GetParameterValue added in v0.14.3

func (d *WorkflowSourceDag) GetParameterValue(paramName string) (interface{}, error)

GetParameterValue returns the value of the parameter named paramName.

func (*WorkflowSourceDag) GetParameters added in v0.14.3

func (d *WorkflowSourceDag) GetParameters() map[string]interface{}

func (*WorkflowSourceDag) GetSubComponet added in v0.14.3

func (d *WorkflowSourceDag) GetSubComponet(subComponentName string) (Component, bool)

func (*WorkflowSourceDag) GetType added in v0.14.3

func (d *WorkflowSourceDag) GetType() string

func (*WorkflowSourceDag) InitInputArtifacts added in v0.14.3

func (d *WorkflowSourceDag) InitInputArtifacts()

func (*WorkflowSourceDag) InitOutputArtifacts added in v0.14.3

func (d *WorkflowSourceDag) InitOutputArtifacts()

func (*WorkflowSourceDag) InitParameters added in v0.14.3

func (d *WorkflowSourceDag) InitParameters()

func (*WorkflowSourceDag) UpdateCondition added in v0.14.3

func (d *WorkflowSourceDag) UpdateCondition(condition string)

func (*WorkflowSourceDag) UpdateDeps added in v0.14.3

func (d *WorkflowSourceDag) UpdateDeps(deps string)

func (*WorkflowSourceDag) UpdateLoopArguemt added in v0.14.3

func (d *WorkflowSourceDag) UpdateLoopArguemt(loopArgument interface{})

func (*WorkflowSourceDag) UpdateName added in v0.14.3

func (d *WorkflowSourceDag) UpdateName(name string)

type WorkflowSourceStep

// WorkflowSourceStep is a workflow node that carries a command together with
// its environment, cache, reference and extra file-system settings. Every
// field carries both a yaml and a json struct tag; the YAML keys are
// snake_case and the JSON keys are camelCase.
//
// NOTE(review): the methods on *WorkflowSourceStep (e.g. DeepCopy returning
// Component) suggest this type implements Component — confirm against the
// interface definition.
type WorkflowSourceStep struct {
	// Name is excluded from YAML (tag "-") and serialized as "name" in JSON.
	// NOTE(review): presumably filled in from the enclosing map key after
	// YAML parsing — confirm against the loader.
	Name         string                 `yaml:"-"                 json:"name"`
	// LoopArgument is an untyped value read from "loop_argument".
	// NOTE(review): accepted concrete types are not visible here; see
	// GetLoopArgumentLength for how a length is derived from it.
	LoopArgument interface{}            `yaml:"loop_argument"     json:"loopArgument"`
	// Condition is kept as a raw string; how it is evaluated is not shown here.
	Condition    string                 `yaml:"condition"         json:"condition"`
	// Parameters maps a string key to an untyped value.
	// NOTE(review): keys are presumably parameter names, as
	// GetParameterValue takes a paramName — confirm.
	Parameters   map[string]interface{} `yaml:"parameters"        json:"parameters"`
	// Command is the step's command, stored as a single string.
	Command      string                 `yaml:"command"           json:"command"`
	// Deps is stored as a single string, while GetDeps returns []string.
	// NOTE(review): the separator used to split it is not visible here.
	Deps         string                 `yaml:"deps"              json:"deps"`
	// Artifacts holds the artifact definitions of this step (type Artifacts
	// is declared elsewhere in the package).
	Artifacts    Artifacts              `yaml:"artifacts"         json:"artifacts"`
	// Env maps string keys to string values.
	// NOTE(review): presumably environment variables for the command — confirm.
	Env          map[string]string      `yaml:"env"               json:"env"`
	// DockerEnv is read from "docker_env".
	// NOTE(review): presumably an image reference for the step — confirm.
	DockerEnv    string                 `yaml:"docker_env"        json:"dockerEnv"`
	// Cache holds the cache settings of this step (type Cache is declared
	// elsewhere in the package).
	Cache        Cache                  `yaml:"cache"             json:"cache"`
	// Reference is of type Reference, declared elsewhere in the package.
	// NOTE(review): presumably points at a reusable component definition — confirm.
	Reference    Reference              `yaml:"reference"         json:"reference"`
	// ExtraFS lists additional FsMount entries, read from "extra_fs".
	ExtraFS      []FsMount              `yaml:"extra_fs"          json:"extraFS"`
}

func (*WorkflowSourceStep) DeepCopy added in v0.14.3

func (s *WorkflowSourceStep) DeepCopy() Component

func (*WorkflowSourceStep) GetArtifactPath added in v0.14.3

func (s *WorkflowSourceStep) GetArtifactPath(artName string) (string, error)

GetArtifactPath returns the path of the artifact named artName.

func (*WorkflowSourceStep) GetArtifacts added in v0.14.3

func (s *WorkflowSourceStep) GetArtifacts() Artifacts

func (*WorkflowSourceStep) GetCondition added in v0.14.3

func (s *WorkflowSourceStep) GetCondition() string

func (*WorkflowSourceStep) GetDeps

func (s *WorkflowSourceStep) GetDeps() []string

func (*WorkflowSourceStep) GetInputArtifactPath added in v0.14.3

func (s *WorkflowSourceStep) GetInputArtifactPath(artName string) (string, error)

GetInputArtifactPath returns the storage path of the input artifact named artName.

func (*WorkflowSourceStep) GetLoopArgument added in v0.14.3

func (s *WorkflowSourceStep) GetLoopArgument() interface{}

func (*WorkflowSourceStep) GetLoopArgumentLength added in v0.14.3

func (s *WorkflowSourceStep) GetLoopArgumentLength() int

func (*WorkflowSourceStep) GetName added in v0.14.3

func (s *WorkflowSourceStep) GetName() string

func (*WorkflowSourceStep) GetOutputArtifactPath added in v0.14.3

func (s *WorkflowSourceStep) GetOutputArtifactPath(artName string) (string, error)

GetOutputArtifactPath returns the storage path of the output artifact named artName.

func (*WorkflowSourceStep) GetParameterValue added in v0.14.3

func (s *WorkflowSourceStep) GetParameterValue(paramName string) (interface{}, error)

GetParameterValue returns the value of the parameter named paramName.

func (*WorkflowSourceStep) GetParameters added in v0.14.3

func (s *WorkflowSourceStep) GetParameters() map[string]interface{}

func (*WorkflowSourceStep) GetType added in v0.14.3

func (s *WorkflowSourceStep) GetType() string

func (*WorkflowSourceStep) InitInputArtifacts added in v0.14.3

func (s *WorkflowSourceStep) InitInputArtifacts()

func (*WorkflowSourceStep) InitOutputArtifacts added in v0.14.3

func (s *WorkflowSourceStep) InitOutputArtifacts()

func (*WorkflowSourceStep) InitParameters added in v0.14.3

func (s *WorkflowSourceStep) InitParameters()

func (*WorkflowSourceStep) UpdateCondition added in v0.14.3

func (s *WorkflowSourceStep) UpdateCondition(condition string)

func (*WorkflowSourceStep) UpdateDeps added in v0.14.3

func (s *WorkflowSourceStep) UpdateDeps(deps string)

func (*WorkflowSourceStep) UpdateLoopArguemt added in v0.14.3

func (s *WorkflowSourceStep) UpdateLoopArguemt(loopArgument interface{})

func (*WorkflowSourceStep) UpdateName added in v0.14.3

func (s *WorkflowSourceStep) UpdateName(name string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL