Documentation
¶
Index ¶
- Constants
- func CheckCPUResource(res string) error
- func CheckMemoryResource(res string) error
- func CheckReg(str, pattern string) bool
- func CheckScalarResource(res string) error
- func DefaultCacheDir(fsID string) string
- func GetBindSource(fsID string) string
- func IsEmptyResource(resourceInfo ResourceInfo) bool
- func IsImmutableJobStatus(status JobStatus) bool
- func IsValidFsMetaDriver(metaDriver string) bool
- func ValidateResourceInfo(resourceInfo ResourceInfo, scalarResourcesType []string) error
- func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error
- type ActionType
- type Artifacts
- type Cache
- type ClientOptions
- type Cluster
- type Conf
- func (c *Conf) GetClusterID() string
- func (c *Conf) GetClusterName() string
- func (c *Conf) GetCommand() string
- func (c *Conf) GetEnv() map[string]string
- func (c *Conf) GetFS() string
- func (c *Conf) GetFlavour() string
- func (c *Conf) GetImage() string
- func (c *Conf) GetJobExecutorReplicas() string
- func (c *Conf) GetJobMode() string
- func (c *Conf) GetJobReplicas() string
- func (c *Conf) GetName() string
- func (c *Conf) GetNamespace() string
- func (c *Conf) GetPSCommand() string
- func (c *Conf) GetPSFlavour() string
- func (c *Conf) GetPSReplicas() string
- func (c *Conf) GetPriority() string
- func (c *Conf) GetQueueID() string
- func (c *Conf) GetQueueName() string
- func (c *Conf) GetUserName() string
- func (c *Conf) GetWorkerCommand() string
- func (c *Conf) GetWorkerFlavour() string
- func (c *Conf) GetWorkerReplicas() string
- func (c *Conf) GetYamlPath() string
- func (c *Conf) SetAnnotations(k, v string)
- func (c *Conf) SetClusterID(id string)
- func (c *Conf) SetEnv(name, value string)
- func (c *Conf) SetFS(fsID string)
- func (c *Conf) SetFlavour(flavourKey string)
- func (c *Conf) SetLabels(k, v string)
- func (c *Conf) SetNamespace(ns string)
- func (c *Conf) SetPSFlavour(flavourKey string)
- func (c *Conf) SetPriority(pc string)
- func (c *Conf) SetQueueID(id string)
- func (c *Conf) SetQueueName(queueName string)
- func (c *Conf) SetUserName(userName string)
- func (c *Conf) SetWorkerFlavour(flavourKey string)
- func (c *Conf) Type() JobType
- type FailureOptions
- type FileSystem
- type Flavour
- type Framework
- type JobLogInfo
- type JobLogRequest
- type JobStatus
- type JobType
- type JobView
- type LogInfo
- type LogRunArtifactRequest
- type LogRunCacheRequest
- type MemberRole
- type NodeQuotaInfo
- type PFJobConf
- type PostProcessView
- type QuotaSummary
- type Resource
- func (r *Resource) Add(rr *Resource) *Resource
- func (r *Resource) AddScalar(name ResourceName, quantity float64)
- func (r *Resource) AddWithoutScalarResources(rr *Resource) *Resource
- func (r Resource) LessEqual(rr *Resource) bool
- func (r Resource) MarshalJSON() ([]byte, error)
- func (r *Resource) Multi(ratio float64) *Resource
- func (r *Resource) SetScalar(name ResourceName, quantity float64)
- func (r *Resource) String() string
- func (r *Resource) Sub(rr *Resource) *Resource
- type ResourceInfo
- type ResourceName
- type RuntimeView
- type ScalarResourcesType
- type TaskLogInfo
- type TaskStatus
- type WorkflowSource
- type WorkflowSourceStep
Constants ¶
const ( LocalType = "Local" KubernetesType = "Kubernetes" )
const ( FSIDFormat = "$(pfs.fs.id)" NameSpaceFormat = "$(namespace)" FSID = "pfs.fs.id" PFSServer = "pfs.server" PFSUserName = "pfs.user.name" HostMntDir = "/data/paddleflow-fs/mnt" PodMntDir = "/home/paddleflow/mnt" FsMetaDefault = "default" FsMetaMemory = "mem" FsMetaLevelDB = "leveldb" FsMetaNutsDB = "nutsdb" )
const ( EnvJobType = "PF_JOB_TYPE" EnvJobQueueName = "PF_JOB_QUEUE_NAME" EnvJobQueueID = "PF_JOB_QUEUE_ID" EnvJobClusterName = "PF_JOB_CLUSTER_NAME" EnvJobClusterID = "PF_JOB_CLUSTER_ID" EnvJobNamespace = "PF_JOB_NAMESPACE" EnvJobUserName = "PF_USER_NAME" EnvJobFsID = "PF_FS_ID" EnvJobPVCName = "PF_JOB_PVC_NAME" EnvJobPriority = "PF_JOB_PRIORITY" EnvJobMode = "PF_JOB_MODE" // EnvJobYamlPath Additional configuration for a specific job EnvJobYamlPath = "PF_JOB_YAML_PATH" EnvIsCustomYaml = "PF_IS_CUSTOM_YAML" // EnvJobModePS env EnvJobModePS = "PS" EnvJobPSPort = "PF_JOB_PS_PORT" EnvJobPServerReplicas = "PF_JOB_PSERVER_REPLICAS" EnvJobPServerFlavour = "PF_JOB_PSERVER_FLAVOUR" EnvJobPServerCommand = "PF_JOB_PSERVER_COMMAND" EnvJobWorkerReplicas = "PF_JOB_WORKER_REPLICAS" EnvJobWorkerFlavour = "PF_JOB_WORKER_FLAVOUR" EnvJobWorkerCommand = "PF_JOB_WORKER_COMMAND" // EnvJobModeCollective env EnvJobModeCollective = "Collective" EnvJobReplicas = "PF_JOB_REPLICAS" EnvJobFlavour = "PF_JOB_FLAVOUR" // EnvJobModePod env reuse EnvJobReplicas and EnvJobFlavour EnvJobModePod = "Pod" // spark job env EnvJobSparkMainFile = "PF_JOB_SPARK_MAIN_FILE" EnvJobSparkMainClass = "PF_JOB_SPARK_MAIN_CLASS" EnvJobSparkArguments = "PF_JOB_SPARK_ARGUMENTS" EnvJobDriverFlavour = "PF_JOB_DRIVER_FLAVOUR" EnvJobExecutorReplicas = "PF_JOB_EXECUTOR_REPLICAS" EnvJobExecutorFlavour = "PF_JOB_EXECUTOR_FLAVOUR" // TODO move to framework TypeVcJob JobType = "vcjob" TypeSparkJob JobType = "spark" TypePaddleJob JobType = "paddlejob" TypePodJob JobType = "pod" StatusJobInit JobStatus = "init" StatusJobPending JobStatus = "pending" StatusJobRunning JobStatus = "running" StatusJobFailed JobStatus = "failed" StatusJobSucceeded JobStatus = "succeeded" StatusJobTerminating JobStatus = "terminating" StatusJobTerminated JobStatus = "terminated" StatusJobCancelled JobStatus = "cancelled" StatusJobSkipped JobStatus = "skipped" StatusTaskPending TaskStatus = "pending" StatusTaskRunning TaskStatus = "running" 
StatusTaskSucceeded TaskStatus = "succeeded" StatusTaskFailed TaskStatus = "failed" RoleMaster MemberRole = "master" RoleWorker MemberRole = "worker" RoleDriver MemberRole = "driver" RoleExecutor MemberRole = "executor" RolePServer MemberRole = "pserver" RolePWorker MemberRole = "pworker" TypeSingle JobType = "single" TypeDistributed JobType = "distributed" TypeWorkflow JobType = "workflow" FrameworkSpark Framework = "spark" FrameworkMPI Framework = "mpi" FrameworkTF Framework = "tensorflow" FrameworkPytorch Framework = "pytorch" FrameworkPaddle Framework = "paddle" FrameworkStandalone Framework = "standalone" // job priority EnvJobVeryLowPriority = "VERY_LOW" EnvJobLowPriority = "LOW" EnvJobNormalPriority = "NORMAL" EnvJobHighPriority = "HIGH" EnvJobVeryHighPriority = "VERY_HIGH" // priority class PriorityClassVeryLow = "very-low" PriorityClassLow = "low" PriorityClassNormal = "normal" PriorityClassHigh = "high" PriorityClassVeryHigh = "very-high" JobOwnerLabel = "owner" JobOwnerValue = "paddleflow" JobIDLabel = "paddleflow-job-id" JobTTLSeconds = "padleflow/job-ttl-seconds" VolcanoJobNameLabel = "volcano.sh/job-name" QueueLabelKey = "volcano.sh/queue-name" SparkAPPJobNameLabel = "sparkoperator.k8s.io/app-name" JobPrefix = "job" DefaultSchedulerName = "volcano" DefaultFSMountPath = "/home/paddleflow/storage/mnt" // EnvPaddleParaJob defines env for Paddle Para Job EnvPaddleParaJob = "PF_PADDLE_PARA_JOB" EnvPaddleParaPriority = "PF_PADDLE_PARA_PRIORITY" EnvPaddleParaConfigHostFile = "PF_PADDLE_PARA_CONFIG_FILE" // PaddleParaVolumeName defines config for Paddle Para Pod PaddleParaVolumeName = "paddle-para-conf-volume" PaddleParaAnnotationKeyJobName = "paddle-para/job-name" PaddleParaAnnotationKeyPriority = "paddle-para/priority" PaddleParaEnvJobName = "FLAGS_job_name" PaddleParaEnvGPUConfigFile = "GPU_CONFIG_FILE" PaddleParaGPUConfigFilePath = "/opt/paddle/para/gpu_config.json" )
const ( StatusQueueCreating = "creating" StatusQueueOpen = "open" StatusQueueUpdating = "updating" StatusQueueClosing = "closing" StatusQueueClosed = "closed" TypeElasticQuota = "elasticQuota" TypeVolcanoCapabilityQuota = "volcanoCapabilityQuota" )
const ( ArtifactTypeInput = "input" ArtifactTypeOutput = "output" EntryPointsStr = "entry_points" CacheAttributeEnable = "enable" CacheAttributeMaxExpiredTime = "max_expired_time" CacheAttributeFsScope = "fs_scope" FailureStrategyFailFast = "fail_fast" FailureStrategyContinue = "continue" EnvDockerEnv = "dockerEnv" )
Variables ¶
This section is empty.
Functions ¶
func CheckCPUResource ¶
func CheckMemoryResource ¶
func CheckScalarResource ¶
func DefaultCacheDir ¶
func GetBindSource ¶
func IsEmptyResource ¶
func IsEmptyResource(resourceInfo ResourceInfo) bool
IsEmptyResource returns true when cpu or mem is nil.
func IsImmutableJobStatus ¶
func IsValidFsMetaDriver ¶
func ValidateResourceInfo ¶
func ValidateResourceInfo(resourceInfo ResourceInfo, scalarResourcesType []string) error
ValidateResourceInfo validates resource info.
func ValidateScalarResourceInfo ¶
func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error
ValidateScalarResourceInfo validates scalar resource info.
Types ¶
type ActionType ¶
type ActionType string
const ( Create ActionType = "create" Update ActionType = "update" Delete ActionType = "delete" Terminate ActionType = "terminate" )
type Artifacts ¶
type Artifacts struct { Input map[string]string `yaml:"input" json:"input"` Output map[string]string `yaml:"-" json:"output"` OutputList []string `yaml:"output" json:"-"` }
func (*Artifacts) ValidateOutputMapByList ¶
type ClientOptions ¶
ClientOptions is used to build the REST config.
type Cluster ¶
type Cluster struct { ID string Name string Type string // ClientOpt defines client config for cluster ClientOpt ClientOptions }
type Conf ¶
type Conf struct { Name string `json:"name"` // storage resources FileSystem FileSystem `json:"fileSystem,omitempty"` ExtraFileSystem []FileSystem `json:"extraFileSystem,omitempty"` // compute resources Flavour Flavour `json:"flavour,omitempty"` Priority string `json:"priority"` QueueID string `json:"queueID"` // parameters required at runtime Labels map[string]string `json:"labels"` Annotations map[string]string `json:"annotations"` Env map[string]string `json:"env,omitempty"` Command string `json:"command,omitempty"` Image string `json:"image"` Port int `json:"port,omitempty"` Args []string `json:"args,omitempty"` }
func (*Conf) GetClusterID ¶
func (*Conf) GetClusterName ¶
func (*Conf) GetCommand ¶
func (*Conf) GetFlavour ¶
func (*Conf) GetJobExecutorReplicas ¶
func (*Conf) GetJobMode ¶
func (*Conf) GetJobReplicas ¶
func (*Conf) GetNamespace ¶
func (*Conf) GetPSCommand ¶
func (*Conf) GetPSFlavour ¶
func (*Conf) GetPSReplicas ¶
func (*Conf) GetPriority ¶
func (*Conf) GetQueueID ¶
func (*Conf) GetQueueName ¶
func (*Conf) GetUserName ¶
func (*Conf) GetWorkerCommand ¶
func (*Conf) GetWorkerFlavour ¶
func (*Conf) GetWorkerReplicas ¶
func (*Conf) GetYamlPath ¶
func (*Conf) SetAnnotations ¶
func (*Conf) SetClusterID ¶
func (*Conf) SetFlavour ¶
func (*Conf) SetNamespace ¶
func (*Conf) SetPSFlavour ¶
func (*Conf) SetPriority ¶
func (*Conf) SetQueueID ¶
func (*Conf) SetQueueName ¶
SetQueueName sets the queue name.
func (*Conf) SetUserName ¶
func (*Conf) SetWorkerFlavour ¶
type FailureOptions ¶
type FailureOptions struct {
Strategy string `yaml:"strategy"`
}
type FileSystem ¶
type Flavour ¶
type Flavour struct { ResourceInfo `yaml:",inline"` Name string `json:"name" yaml:"name"` }
Flavour is a set of resources that can be used to run a job.
type JobLogInfo ¶
type JobLogInfo struct { JobID string `json:"jobID"` TaskList []TaskLogInfo `json:"taskList"` }
type JobLogRequest ¶
type JobView ¶
type JobView struct { JobID string `json:"jobID"` JobName string `json:"name"` Command string `json:"command"` Parameters map[string]string `json:"parameters"` Env map[string]string `json:"env"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Status JobStatus `json:"status"` Deps string `json:"deps"` DockerEnv string `json:"dockerEnv"` Artifacts Artifacts `json:"artifacts"` Cache Cache `json:"cache"` JobMessage string `json:"jobMessage"` CacheRunID string `json:"cacheRunID"` }
JobView is the view of job info returned to the user, while Job is the representation processed by the pipeline and job engine.
type LogRunArtifactRequest ¶
type LogRunArtifactRequest struct { Md5 string `json:"md5"` RunID string `json:"runID"` FsID string `json:"fsID"` FsName string `json:"fsname"` UserName string `json:"username"` ArtifactPath string `json:"artifactPath"` Step string `json:"step"` Type string `json:"type"` ArtifactName string `json:"artifactName"` Meta string `json:"meta"` }
type LogRunCacheRequest ¶
type LogRunCacheRequest struct { FirstFp string `json:"firstFp"` SecondFp string `json:"secondFp"` Source string `json:"source"` RunID string `json:"runID"` Step string `json:"step"` FsID string `json:"fsID"` FsName string `json:"fsname"` UserName string `json:"username"` ExpiredTime string `json:"expiredTime"` Strategy string `json:"strategy"` }
type MemberRole ¶
type MemberRole string
type NodeQuotaInfo ¶
type PFJobConf ¶
type PFJobConf interface { GetName() string GetEnv() map[string]string GetCommand() string GetImage() string GetPriority() string SetPriority(string) GetQueueName() string GetQueueID() string GetClusterName() string GetClusterID() string GetUserName() string GetFS() string SetFS(string) GetYamlPath() string GetNamespace() string GetJobMode() string GetFlavour() string GetPSFlavour() string GetWorkerFlavour() string SetQueueID(string) SetClusterID(string) SetNamespace(string) SetEnv(string, string) SetLabels(string, string) SetAnnotations(string, string) Type() JobType }
type PostProcessView ¶
type PostProcessView RuntimeView
type QuotaSummary ¶
type Resource ¶
type Resource struct { MilliCPU float64 `json:"cpu"` Memory float64 `json:"memory"` Storage float64 `json:"-"` // ScalarResources ScalarResources map[ResourceName]float64 `json:"scalarResources"` }
Resource struct defines all the resource type
func EmptyResource ¶
func EmptyResource() *Resource
EmptyResource creates an empty resource object and returns it.
func NewResource ¶
func NewResource(resourceInfo ResourceInfo) (*Resource, error)
func (*Resource) AddScalar ¶
func (r *Resource) AddScalar(name ResourceName, quantity float64)
AddScalar adds a resource by a scalar value of this resource.
func (*Resource) AddWithoutScalarResources ¶
AddWithoutScalarResources does not update ScalarResources.
func (Resource) LessEqual ¶
LessEqual reports whether r is less than or equal to rr, comparing field by field; it returns false as soon as any field of r exceeds the corresponding field of rr. Note that r.ScalarResources can be nil. Example: `if !requestResource.lessEqual(staticResource) {}`. TODO: to be removed.
func (Resource) MarshalJSON ¶
MarshalJSON is called automatically when a Resource is serialized to JSON.
func (*Resource) SetScalar ¶
func (r *Resource) SetScalar(name ResourceName, quantity float64)
SetScalar sets a resource by a scalar value of this resource.
type ResourceInfo ¶
type ResourceInfo struct { CPU string `json:"cpu" yaml:"cpu"` Mem string `json:"mem" yaml:"mem"` ScalarResources ScalarResourcesType `json:"scalarResources,omitempty" yaml:"scalarResources,omitempty"` }
ResourceInfo is a struct that contains the information of a resource.
func EmptyResourceInfo ¶
func EmptyResourceInfo() *ResourceInfo
func (*ResourceInfo) Add ¶
func (r *ResourceInfo) Add(r2 ResourceInfo) ResourceInfo
Add adds the resource info of the other flavour to the current one.
func (*ResourceInfo) LessEqual ¶
func (r *ResourceInfo) LessEqual(r2 ResourceInfo) bool
LessEqual returns true if the current flavour is less than or equal to the other one.
func (*ResourceInfo) SetScalar ¶
func (r *ResourceInfo) SetScalar(name ResourceName, value string)
SetScalar sets a resource by a scalar value of this resource.
func (*ResourceInfo) Sub ¶
func (r *ResourceInfo) Sub(r2 ResourceInfo) (ResourceInfo, error)
Sub subtracts the other resource from the current one.
type ResourceName ¶
type ResourceName string
ResourceName is the name identifying various resources in a ResourceList.
type RuntimeView ¶
RuntimeView is the view of a run returned to the user, while workflowRuntime is the representation processed by the pipeline engine.
type ScalarResourcesType ¶
type ScalarResourcesType map[ResourceName]string
ScalarResourcesType is the type of scalar resources
type TaskLogInfo ¶
type TaskStatus ¶
type TaskStatus string
type WorkflowSource ¶
type WorkflowSource struct { Name string `yaml:"name"` DockerEnv string `yaml:"docker_env"` EntryPoints map[string]*WorkflowSourceStep `yaml:"entry_points"` Cache Cache `yaml:"cache"` Parallelism int `yaml:"parallelism"` Disabled string `yaml:"disabled"` FailureOptions FailureOptions `yaml:"failure_options"` PostProcess map[string]*WorkflowSourceStep `yaml:"post_process"` }
func ParseWorkflowSource ¶
func ParseWorkflowSource(runYaml []byte) (WorkflowSource, error)
func (*WorkflowSource) GetDisabled ¶
func (wfs *WorkflowSource) GetDisabled() []string
func (*WorkflowSource) HasStep ¶
func (wfs *WorkflowSource) HasStep(step string) bool
func (*WorkflowSource) IsDisabled ¶
func (wfs *WorkflowSource) IsDisabled(stepName string) (bool, error)
func (*WorkflowSource) ValidateStepCacheByMap ¶
func (wfs *WorkflowSource) ValidateStepCacheByMap(runMap map[string]interface{}) error
type WorkflowSourceStep ¶
type WorkflowSourceStep struct { Parameters map[string]interface{} `yaml:"parameters"` Command string `yaml:"command"` Deps string `yaml:"deps"` Artifacts Artifacts `yaml:"artifacts"` Env map[string]string `yaml:"env"` DockerEnv string `yaml:"docker_env"` Cache Cache `yaml:"cache"` }
func (*WorkflowSourceStep) GetDeps ¶
func (s *WorkflowSourceStep) GetDeps() []string