schema

package
v0.14.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
// Type names "Local" and "Kubernetes".
// NOTE(review): presumably the allowed values of Cluster.Type — confirm against callers.
const (
	LocalType      = "Local"
	KubernetesType = "Kubernetes"
)
View Source
// File-system related string constants: placeholder formats, key names,
// mount directories and meta-driver names.
const (
	// FSIDFormat and NameSpaceFormat are "$(...)" placeholder strings.
	// NOTE(review): presumably substituted with a real fs ID / namespace elsewhere — confirm.
	FSIDFormat      = "$(pfs.fs.id)"
	NameSpaceFormat = "$(namespace)"
	// FSID is the key that appears inside the FSIDFormat placeholder.
	FSID            = "pfs.fs.id"
	PFSServer       = "pfs.server"
	PFSUserName     = "pfs.user.name"

	// Mount directory paths.
	// NOTE(review): names suggest host-side vs. pod-side mount points — confirm.
	HostMntDir = "/data/paddleflow-fs/mnt"
	PodMntDir  = "/home/paddleflow/mnt"

	// Meta driver names.
	// NOTE(review): presumably the set accepted by IsValidFsMetaDriver — confirm.
	FsMetaDefault = "default"
	FsMetaMemory  = "mem"
	FsMetaLevelDB = "leveldb"
	FsMetaNutsDB  = "nutsdb"
)
View Source
// Job-related constants: "PF_*" key names, job modes, typed job/task status
// values, member roles, frameworks, priorities, label/annotation keys and
// Paddle Para settings.
// NOTE(review): the Env* names are presumably keys into Conf.Env — confirm against callers.
const (
	// General job keys.
	EnvJobType        = "PF_JOB_TYPE"
	EnvJobQueueName   = "PF_JOB_QUEUE_NAME"
	EnvJobQueueID     = "PF_JOB_QUEUE_ID"
	EnvJobClusterName = "PF_JOB_CLUSTER_NAME"
	EnvJobClusterID   = "PF_JOB_CLUSTER_ID"
	EnvJobNamespace   = "PF_JOB_NAMESPACE"
	EnvJobUserName    = "PF_USER_NAME"
	EnvJobFsID        = "PF_FS_ID"
	EnvJobPVCName     = "PF_JOB_PVC_NAME"
	EnvJobPriority    = "PF_JOB_PRIORITY"
	EnvJobMode        = "PF_JOB_MODE"
	// EnvJobYamlPath Additional configuration for a specific job
	EnvJobYamlPath  = "PF_JOB_YAML_PATH"
	EnvIsCustomYaml = "PF_IS_CUSTOM_YAML"

	// EnvJobModePS is the "PS" job mode value; the keys below carry the
	// pserver/worker settings for that mode.
	EnvJobModePS          = "PS"
	EnvJobPSPort          = "PF_JOB_PS_PORT"
	EnvJobPServerReplicas = "PF_JOB_PSERVER_REPLICAS"
	EnvJobPServerFlavour  = "PF_JOB_PSERVER_FLAVOUR"
	EnvJobPServerCommand  = "PF_JOB_PSERVER_COMMAND"
	EnvJobWorkerReplicas  = "PF_JOB_WORKER_REPLICAS"
	EnvJobWorkerFlavour   = "PF_JOB_WORKER_FLAVOUR"
	EnvJobWorkerCommand   = "PF_JOB_WORKER_COMMAND"

	// EnvJobModeCollective is the "Collective" job mode value, followed by
	// its replicas/flavour keys.
	EnvJobModeCollective = "Collective"
	EnvJobReplicas       = "PF_JOB_REPLICAS"
	EnvJobFlavour        = "PF_JOB_FLAVOUR"

	// EnvJobModePod is the "Pod" job mode value; it reuses EnvJobReplicas
	// and EnvJobFlavour.
	EnvJobModePod = "Pod"

	// Spark job keys.
	EnvJobSparkMainFile    = "PF_JOB_SPARK_MAIN_FILE"
	EnvJobSparkMainClass   = "PF_JOB_SPARK_MAIN_CLASS"
	EnvJobSparkArguments   = "PF_JOB_SPARK_ARGUMENTS"
	EnvJobDriverFlavour    = "PF_JOB_DRIVER_FLAVOUR"
	EnvJobExecutorReplicas = "PF_JOB_EXECUTOR_REPLICAS"
	EnvJobExecutorFlavour  = "PF_JOB_EXECUTOR_FLAVOUR"

	// JobType values.
	// TODO move to framework
	TypeVcJob     JobType = "vcjob"
	TypeSparkJob  JobType = "spark"
	TypePaddleJob JobType = "paddlejob"
	TypePodJob    JobType = "pod"

	// JobStatus values.
	StatusJobInit        JobStatus = "init"
	StatusJobPending     JobStatus = "pending"
	StatusJobRunning     JobStatus = "running"
	StatusJobFailed      JobStatus = "failed"
	StatusJobSucceeded   JobStatus = "succeeded"
	StatusJobTerminating JobStatus = "terminating"
	StatusJobTerminated  JobStatus = "terminated"
	StatusJobCancelled   JobStatus = "cancelled"
	StatusJobSkipped     JobStatus = "skipped"

	// TaskStatus values.
	StatusTaskPending   TaskStatus = "pending"
	StatusTaskRunning   TaskStatus = "running"
	StatusTaskSucceeded TaskStatus = "succeeded"
	StatusTaskFailed    TaskStatus = "failed"

	// MemberRole values.
	RoleMaster   MemberRole = "master"
	RoleWorker   MemberRole = "worker"
	RoleDriver   MemberRole = "driver"
	RoleExecutor MemberRole = "executor"
	RolePServer  MemberRole = "pserver"
	RolePWorker  MemberRole = "pworker"

	// Additional JobType values.
	TypeSingle      JobType = "single"
	TypeDistributed JobType = "distributed"
	TypeWorkflow    JobType = "workflow"

	// Framework values.
	FrameworkSpark      Framework = "spark"
	FrameworkMPI        Framework = "mpi"
	FrameworkTF         Framework = "tensorflow"
	FrameworkPytorch    Framework = "pytorch"
	FrameworkPaddle     Framework = "paddle"
	FrameworkStandalone Framework = "standalone"

	// job priority
	EnvJobVeryLowPriority  = "VERY_LOW"
	EnvJobLowPriority      = "LOW"
	EnvJobNormalPriority   = "NORMAL"
	EnvJobHighPriority     = "HIGH"
	EnvJobVeryHighPriority = "VERY_HIGH"

	// priority class
	PriorityClassVeryLow  = "very-low"
	PriorityClassLow      = "low"
	PriorityClassNormal   = "normal"
	PriorityClassHigh     = "high"
	PriorityClassVeryHigh = "very-high"

	// Job label/annotation keys and values.
	JobOwnerLabel = "owner"
	JobOwnerValue = "paddleflow"
	JobIDLabel    = "paddleflow-job-id"
	// NOTE(review): this value is spelled "padleflow" (single "d"), unlike
	// JobOwnerValue and JobIDLabel. It is a runtime key, so confirm whether
	// existing objects depend on this spelling before changing it.
	JobTTLSeconds = "padleflow/job-ttl-seconds"

	// Label keys in the volcano.sh and sparkoperator.k8s.io namespaces.
	VolcanoJobNameLabel  = "volcano.sh/job-name"
	QueueLabelKey        = "volcano.sh/queue-name"
	SparkAPPJobNameLabel = "sparkoperator.k8s.io/app-name"

	// Defaults.
	JobPrefix            = "job"
	DefaultSchedulerName = "volcano"
	DefaultFSMountPath   = "/home/paddleflow/storage/mnt"

	// EnvPaddleParaJob defines env for Paddle Para Job
	EnvPaddleParaJob            = "PF_PADDLE_PARA_JOB"
	EnvPaddleParaPriority       = "PF_PADDLE_PARA_PRIORITY"
	EnvPaddleParaConfigHostFile = "PF_PADDLE_PARA_CONFIG_FILE"
	// PaddleParaVolumeName defines config for Paddle Para Pod
	PaddleParaVolumeName            = "paddle-para-conf-volume"
	PaddleParaAnnotationKeyJobName  = "paddle-para/job-name"
	PaddleParaAnnotationKeyPriority = "paddle-para/priority"
	PaddleParaEnvJobName            = "FLAGS_job_name"
	PaddleParaEnvGPUConfigFile      = "GPU_CONFIG_FILE"
	PaddleParaGPUConfigFilePath     = "/opt/paddle/para/gpu_config.json"
)
View Source
// Queue status strings and quota type names.
const (
	// Queue status values.
	StatusQueueCreating    = "creating"
	StatusQueueOpen        = "open"
	StatusQueueUpdating    = "updating"
	StatusQueueClosing     = "closing"
	StatusQueueClosed      = "closed"
	StatusQueueUnavailable = "unavailable"

	// Quota type names.
	TypeElasticQuota           = "elasticQuota"
	TypeVolcanoCapabilityQuota = "volcanoCapabilityQuota"
)
View Source
// Workflow-related string constants: artifact types, YAML key names, cache
// attribute names and failure strategies.
const (
	// Artifact type names.
	ArtifactTypeInput  = "input"
	ArtifactTypeOutput = "output"

	// EntryPointsStr equals the yaml key of WorkflowSource.EntryPoints.
	EntryPointsStr = "entry_points"

	// Cache attribute names; they equal the yaml keys of the Cache struct.
	CacheAttributeEnable         = "enable"
	CacheAttributeMaxExpiredTime = "max_expired_time"
	CacheAttributeFsScope        = "fs_scope"

	// Failure strategy names.
	// NOTE(review): presumably the allowed values of FailureOptions.Strategy — confirm.
	FailureStrategyFailFast = "fail_fast"
	FailureStrategyContinue = "continue"

	EnvDockerEnv = "dockerEnv"
)

Variables

This section is empty.

Functions

func CheckCPUResource

func CheckCPUResource(res string) error

func CheckMemoryResource

func CheckMemoryResource(res string) error

func CheckReg

func CheckReg(str, pattern string) bool

CheckReg: TODO move this function to util.

func CheckScalarResource

func CheckScalarResource(res string) error

func DefaultCacheDir

func DefaultCacheDir(fsID string) string

func GetBindSource

func GetBindSource(fsID string) string

func IsEmptyResource

func IsEmptyResource(resourceInfo ResourceInfo) bool

IsEmptyResource returns true when cpu or mem is nil.

func IsImmutableJobStatus

func IsImmutableJobStatus(status JobStatus) bool

func IsValidFsMetaDriver

func IsValidFsMetaDriver(metaDriver string) bool

func ValidateResourceInfo

func ValidateResourceInfo(resourceInfo ResourceInfo, scalarResourcesType []string) error

ValidateResourceInfo validate resource info

func ValidateScalarResourceInfo

func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error

ValidateScalarResourceInfo validate scalar resource info

Types

type ActionType

// ActionType is a string-typed action name; the declared values follow.
type ActionType string
const (
	Create    ActionType = "create"
	Update    ActionType = "update"
	Delete    ActionType = "delete"
	Terminate ActionType = "terminate"
)

type Artifacts

// Artifacts groups input and output artifact entries.
// The "output" key maps to different fields per encoding: OutputList (a
// list) in YAML and Output (a map) in JSON; each field is tagged "-" for
// the other encoding.
type Artifacts struct {
	Input      map[string]string `yaml:"input"       json:"input"`
	Output     map[string]string `yaml:"-"           json:"output"`
	OutputList []string          `yaml:"output"      json:"-"`
}

func (*Artifacts) ValidateOutputMapByList

func (atf *Artifacts) ValidateOutputMapByList() error

type Cache

// Cache holds cache settings; the yaml keys equal the CacheAttribute*
// constants.
type Cache struct {
	Enable         bool   `yaml:"enable"           json:"enable"`
	MaxExpiredTime string `yaml:"max_expired_time" json:"maxExpiredTime"` // seconds
	FsScope        string `yaml:"fs_scope"         json:"fsScope"`        // separated by ","
}

type ClientOptions

// ClientOptions is used to build a rest config.
// NOTE(review): Master/Config look like an API server address and a config
// path, and QPS/Burst like client rate limits — confirm against the builder.
type ClientOptions struct {
	Master string
	Config string
	QPS    float32
	Burst  int
}

ClientOptions used to build rest config.

type Cluster

// Cluster identifies a cluster by ID, name and type, together with the
// client options for it.
type Cluster struct {
	ID   string
	Name string
	// NOTE(review): presumably LocalType or KubernetesType — confirm.
	Type string

	// ClientOpt defines client config for cluster
	ClientOpt ClientOptions
}

type Conf

// Conf is a job configuration: its name, storage, compute resources and
// runtime parameters. *Conf declares the methods listed in PFJobConf.
type Conf struct {
	Name string `json:"name"`
	// Storage resources
	FileSystem      FileSystem   `json:"fileSystem,omitempty"`
	ExtraFileSystem []FileSystem `json:"extraFileSystem,omitempty"`
	// Compute resources
	Flavour  Flavour `json:"flavour,omitempty"`
	Priority string  `json:"priority"`
	QueueID  string  `json:"queueID"`
	// Parameters required at runtime
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	Env         map[string]string `json:"env,omitempty"`
	Command     string            `json:"command,omitempty"`
	Image       string            `json:"image"`
	Port        int               `json:"port,omitempty"`
	Args        []string          `json:"args,omitempty"`
}

func (*Conf) GetClusterID

func (c *Conf) GetClusterID() string

func (*Conf) GetClusterName

func (c *Conf) GetClusterName() string

func (*Conf) GetCommand

func (c *Conf) GetCommand() string

func (*Conf) GetEnv

func (c *Conf) GetEnv() map[string]string

func (*Conf) GetFS

func (c *Conf) GetFS() string

func (*Conf) GetFlavour

func (c *Conf) GetFlavour() string

func (*Conf) GetImage

func (c *Conf) GetImage() string

func (*Conf) GetJobExecutorReplicas

func (c *Conf) GetJobExecutorReplicas() string

func (*Conf) GetJobMode

func (c *Conf) GetJobMode() string

func (*Conf) GetJobReplicas

func (c *Conf) GetJobReplicas() string

func (*Conf) GetName

func (c *Conf) GetName() string

func (*Conf) GetNamespace

func (c *Conf) GetNamespace() string

func (*Conf) GetPSCommand

func (c *Conf) GetPSCommand() string

func (*Conf) GetPSFlavour

func (c *Conf) GetPSFlavour() string

func (*Conf) GetPSReplicas

func (c *Conf) GetPSReplicas() string

func (*Conf) GetPriority

func (c *Conf) GetPriority() string

func (*Conf) GetQueueID

func (c *Conf) GetQueueID() string

func (*Conf) GetQueueName

func (c *Conf) GetQueueName() string

func (*Conf) GetUserName

func (c *Conf) GetUserName() string

func (*Conf) GetWorkerCommand

func (c *Conf) GetWorkerCommand() string

func (*Conf) GetWorkerFlavour

func (c *Conf) GetWorkerFlavour() string

func (*Conf) GetWorkerReplicas

func (c *Conf) GetWorkerReplicas() string

func (*Conf) GetYamlPath

func (c *Conf) GetYamlPath() string

func (*Conf) SetAnnotations

func (c *Conf) SetAnnotations(k, v string)

func (*Conf) SetClusterID

func (c *Conf) SetClusterID(id string)

func (*Conf) SetEnv

func (c *Conf) SetEnv(name, value string)

func (*Conf) SetFS

func (c *Conf) SetFS(fsID string)

SetFS sets the filesystem id

func (*Conf) SetFlavour

func (c *Conf) SetFlavour(flavourKey string)

func (*Conf) SetLabels

func (c *Conf) SetLabels(k, v string)

func (*Conf) SetNamespace

func (c *Conf) SetNamespace(ns string)

func (*Conf) SetPSFlavour

func (c *Conf) SetPSFlavour(flavourKey string)

func (*Conf) SetPriority

func (c *Conf) SetPriority(pc string)

func (*Conf) SetQueueID

func (c *Conf) SetQueueID(id string)

func (*Conf) SetQueueName

func (c *Conf) SetQueueName(queueName string)

SetQueueName set queue name

func (*Conf) SetUserName

func (c *Conf) SetUserName(userName string)

func (*Conf) SetWorkerFlavour

func (c *Conf) SetWorkerFlavour(flavourKey string)

func (*Conf) Type

func (c *Conf) Type() JobType

type FailureOptions

// FailureOptions carries the failure strategy read from the "strategy" yaml key.
// NOTE(review): presumably one of FailureStrategyFailFast / FailureStrategyContinue — confirm.
type FailureOptions struct {
	Strategy string `yaml:"strategy"`
}

type FileSystem

// FileSystem describes a file system reference: its name, mount path,
// sub path and read-only flag. Only Name is always emitted in JSON; the
// other fields are tagged omitempty.
type FileSystem struct {
	Name      string `json:"name"`
	MountPath string `json:"mountPath,omitempty"`
	SubPath   string `json:"subPath,omitempty"`
	ReadOnly  bool   `json:"readOnly,omitempty"`
}

type Flavour

// Flavour is a named set of resources that can be used to run a job.
type Flavour struct {
	// ResourceInfo is embedded; the ",inline" yaml tag places its keys at
	// the same level as "name".
	ResourceInfo `yaml:",inline"`
	Name         string `json:"name" yaml:"name"`
}

Flavour is a set of resources that can be used to run a job.

type Framework

// Framework is a string-typed framework name; see the Framework* constants
// (spark, mpi, tensorflow, pytorch, paddle, standalone).
type Framework string

type JobLogInfo

// JobLogInfo pairs a job ID with the log info of each of its tasks.
type JobLogInfo struct {
	JobID    string        `json:"jobID"`
	TaskList []TaskLogInfo `json:"taskList"`
}

type JobLogRequest

// JobLogRequest carries the parameters of a job log query: which job, and
// the position/paging of the log to read.
// NOTE(review): accepted values of LogFilePosition and whether LogPageNo is
// zero- or one-based are not visible here — confirm against the handler.
type JobLogRequest struct {
	JobID           string `json:"jobID"`
	JobType         string `json:"jobType"`
	Namespace       string `json:"namespace"`
	LogFilePosition string `json:"logFilePosition"`
	LogPageSize     int    `json:"logPageSize"`
	LogPageNo       int    `json:"logPageNo"`
}

type JobStatus

// JobStatus is a string-typed job status; see the StatusJob* constants.
type JobStatus string

type JobType

// JobType is a string-typed job type; see the Type* constants declared
// with the JobType type (e.g. TypeVcJob, TypeSingle).
type JobType string

type JobView

// JobView is the view of job info returned to the user, while Job is for
// the pipeline and job engine to process.
type JobView struct {
	JobID      string            `json:"jobID"`
	JobName    string            `json:"name"`
	Command    string            `json:"command"`
	Parameters map[string]string `json:"parameters"`
	Env        map[string]string `json:"env"`
	// StartTime and EndTime are strings; their format is not visible here.
	StartTime  string            `json:"startTime"`
	EndTime    string            `json:"endTime"`
	Status     JobStatus         `json:"status"`
	Deps       string            `json:"deps"`
	DockerEnv  string            `json:"dockerEnv"`
	Artifacts  Artifacts         `json:"artifacts"`
	Cache      Cache             `json:"cache"`
	JobMessage string            `json:"jobMessage"`
	CacheRunID string            `json:"cacheRunID"`
}

JobView is view of job info responded to user, while Job is for pipeline and job engine to process

type LogInfo

// LogInfo is one page of log content plus flags telling whether a further
// page exists and whether the content was truncated.
type LogInfo struct {
	LogContent  string `json:"logContent"`
	HasNextPage bool   `json:"hasNextPage"`
	Truncated   bool   `json:"truncated"`
}

type LogRunArtifactRequest

// LogRunArtifactRequest is the request payload describing one run artifact.
// Note that the FsName and UserName json keys are all-lowercase ("fsname",
// "username"), unlike the camelCase keys of the other fields.
type LogRunArtifactRequest struct {
	Md5          string `json:"md5"`
	RunID        string `json:"runID"`
	FsID         string `json:"fsID"`
	FsName       string `json:"fsname"`
	UserName     string `json:"username"`
	ArtifactPath string `json:"artifactPath"`
	Step         string `json:"step"`
	// NOTE(review): presumably ArtifactTypeInput or ArtifactTypeOutput — confirm.
	Type         string `json:"type"`
	ArtifactName string `json:"artifactName"`
	Meta         string `json:"meta"`
}

type LogRunCacheRequest

// LogRunCacheRequest is the request payload describing one run cache entry.
// Note that the FsName and UserName json keys are all-lowercase ("fsname",
// "username"), unlike the camelCase keys of the other fields.
type LogRunCacheRequest struct {
	// NOTE(review): "Fp" presumably abbreviates fingerprint — confirm.
	FirstFp     string `json:"firstFp"`
	SecondFp    string `json:"secondFp"`
	Source      string `json:"source"`
	RunID       string `json:"runID"`
	Step        string `json:"step"`
	FsID        string `json:"fsID"`
	FsName      string `json:"fsname"`
	UserName    string `json:"username"`
	ExpiredTime string `json:"expiredTime"`
	Strategy    string `json:"strategy"`
}

type MemberRole

// MemberRole is a string-typed role name; see the Role* constants
// (master, worker, driver, executor, pserver, pworker).
type MemberRole string

type NodeQuotaInfo

// NodeQuotaInfo reports, for one node, its name, whether it is schedulable,
// and its total and idle resources.
type NodeQuotaInfo struct {
	NodeName    string   `json:"nodeName"`
	Schedulable bool     `json:"schedulable"`
	Total       Resource `json:"total"`
	Idle        Resource `json:"idle"`
}

type PFJobConf

// PFJobConf is the accessor interface for a job configuration.
// *Conf in this package declares every method listed here.
type PFJobConf interface {
	// Basic job attributes.
	GetName() string
	GetEnv() map[string]string
	GetCommand() string
	GetImage() string

	// Priority.
	GetPriority() string
	SetPriority(string)

	// Queue, cluster and user.
	GetQueueName() string
	GetQueueID() string
	GetClusterName() string
	GetClusterID() string
	GetUserName() string

	// File system.
	GetFS() string
	SetFS(string)

	GetYamlPath() string
	GetNamespace() string
	GetJobMode() string

	// Flavours.
	GetFlavour() string
	GetPSFlavour() string
	GetWorkerFlavour() string

	// Setters; the two-argument forms take a key/name and a value.
	SetQueueID(string)
	SetClusterID(string)
	SetNamespace(string)
	SetEnv(string, string)
	SetLabels(string, string)
	SetAnnotations(string, string)

	// Type returns the job's JobType.
	Type() JobType
}

type PostProcessView

// PostProcessView has the same underlying type as RuntimeView
// (map[string]JobView).
type PostProcessView RuntimeView

type QuotaSummary

// QuotaSummary holds the total and idle quota, serialized under the JSON
// keys "total" and "idle".
type QuotaSummary struct {
	TotalQuota Resource `json:"total"`
	IdleQuota  Resource `json:"idle"`
}

type Resource

// Resource struct defines all the resource types as float64 quantities.
// NOTE(review): Resource also declares MarshalJSON, so the json tags below
// may not describe the actual JSON output — confirm against that method.
type Resource struct {
	// MilliCPU is tagged "cpu".
	// NOTE(review): the name suggests millicores — confirm the unit.
	MilliCPU float64 `json:"cpu"`
	Memory   float64 `json:"memory"`
	// Storage is tagged "-" (excluded from tag-based JSON encoding).
	Storage  float64 `json:"-"`

	// ScalarResources
	ScalarResources map[ResourceName]float64 `json:"scalarResources"`
}

Resource struct defines all the resource type

func EmptyResource

func EmptyResource() *Resource

EmptyResource creates an empty resource object and returns it.

func NewResource

func NewResource(resourceInfo ResourceInfo) (*Resource, error)

func (*Resource) Add

func (r *Resource) Add(rr *Resource) *Resource

Add adds two resources.

func (*Resource) AddScalar

func (r *Resource) AddScalar(name ResourceName, quantity float64)

AddScalar adds a resource by a scalar value of this resource.

func (*Resource) AddWithoutScalarResources

func (r *Resource) AddWithoutScalarResources(rr *Resource) *Resource

AddWithoutScalarResources does not update ScalarResources.

func (Resource) LessEqual

func (r Resource) LessEqual(rr *Resource) bool

LessEqual: once any field in r is less than rr, it returns false. Note that r is properly less than rr and r.ScalarResources can be nil, e.g. `if !requestResource.lessEqual(staticResource) {}`. TODO: to be removed.

func (Resource) MarshalJSON

func (r Resource) MarshalJSON() ([]byte, error)

MarshalJSON is called automatically when a Resource is serialized to JSON.

func (*Resource) Multi

func (r *Resource) Multi(ratio float64) *Resource

Multi multiplies the resource by the provided ratio.

func (*Resource) SetScalar

func (r *Resource) SetScalar(name ResourceName, quantity float64)

SetScalar sets a resource by a scalar value of this resource.

func (*Resource) String

func (r *Resource) String() string

String returns resource details in string format

func (*Resource) Sub

func (r *Resource) Sub(rr *Resource) *Resource

Sub subtracts two Resource objects.

type ResourceInfo

// ResourceInfo is a struct that contains the information of a resource,
// with every quantity kept as a string.
// NOTE(review): the accepted string formats are presumably those enforced by
// CheckCPUResource / CheckMemoryResource / CheckScalarResource — confirm.
type ResourceInfo struct {
	CPU             string              `json:"cpu" yaml:"cpu"`
	Mem             string              `json:"mem" yaml:"mem"`
	ScalarResources ScalarResourcesType `json:"scalarResources,omitempty" yaml:"scalarResources,omitempty"`
}

ResourceInfo is a struct that contains the information of a resource.

func EmptyResourceInfo

func EmptyResourceInfo() *ResourceInfo

func (*ResourceInfo) Add

Add adds the resource info of the other flavour to the current one.

func (*ResourceInfo) LessEqual

func (r *ResourceInfo) LessEqual(r2 ResourceInfo) bool

LessEqual returns true if the current flavour is less than or equal to the other one.

func (*ResourceInfo) SetScalar

func (r *ResourceInfo) SetScalar(name ResourceName, value string)

SetScalar sets a resource by a scalar value of this resource.

func (*ResourceInfo) Sub

Sub subtracts the other resource from the current one.

type ResourceName

// ResourceName is the name identifying a resource; it is the key type of
// Resource.ScalarResources and of ScalarResourcesType.
type ResourceName string

ResourceName is the name identifying various resources in a ResourceList.

type RuntimeView

// RuntimeView is the view of a run returned to the user: a map from a
// string key to a JobView.
// NOTE(review): the key is presumably the step name — confirm.
type RuntimeView map[string]JobView

RuntimeView is view of run responded to user, while workflowRuntime is for pipeline engine to process

type ScalarResourcesType

// ScalarResourcesType maps a resource name to its quantity kept as a string.
type ScalarResourcesType map[ResourceName]string

ScalarResourcesType is the type of scalar resources

type TaskLogInfo

// TaskLogInfo pairs a task ID with that task's log info.
type TaskLogInfo struct {
	TaskID string  `json:"taskID"`
	Info   LogInfo `json:"logInfo"`
}

type TaskStatus

// TaskStatus is a string-typed task status; see the StatusTask* constants
// (pending, running, succeeded, failed).
type TaskStatus string

type WorkflowSource

// WorkflowSource is the YAML representation of a workflow: its name, default
// docker env, entry-point steps, cache and failure settings, and
// post-process steps. ParseWorkflowSource returns a value of this type.
type WorkflowSource struct {
	Name           string                         `yaml:"name"`
	DockerEnv      string                         `yaml:"docker_env"`
	// EntryPoints maps a string key to a step definition.
	// NOTE(review): the key is presumably the step name — confirm.
	EntryPoints    map[string]*WorkflowSourceStep `yaml:"entry_points"`
	Cache          Cache                          `yaml:"cache"`
	Parallelism    int                            `yaml:"parallelism"`
	// Disabled is a single string while GetDisabled returns []string.
	// NOTE(review): presumably a delimited list of step names — confirm.
	Disabled       string                         `yaml:"disabled"`
	FailureOptions FailureOptions                 `yaml:"failure_options"`
	PostProcess    map[string]*WorkflowSourceStep `yaml:"post_process"`
}

func ParseWorkflowSource

func ParseWorkflowSource(runYaml []byte) (WorkflowSource, error)

func (*WorkflowSource) GetDisabled

func (wfs *WorkflowSource) GetDisabled() []string

func (*WorkflowSource) HasStep

func (wfs *WorkflowSource) HasStep(step string) bool

func (*WorkflowSource) IsDisabled

func (wfs *WorkflowSource) IsDisabled(stepName string) (bool, error)

func (*WorkflowSource) ValidateStepCacheByMap

func (wfs *WorkflowSource) ValidateStepCacheByMap(runMap map[string]interface{}) error

type WorkflowSourceStep

// WorkflowSourceStep is the YAML representation of one workflow step.
type WorkflowSourceStep struct {
	Parameters map[string]interface{} `yaml:"parameters"`
	Command    string                 `yaml:"command"`
	// Deps is a single string while GetDeps returns []string.
	// NOTE(review): presumably a delimited list of step names — confirm.
	Deps       string                 `yaml:"deps"`
	Artifacts  Artifacts              `yaml:"artifacts"`
	Env        map[string]string      `yaml:"env"`
	DockerEnv  string                 `yaml:"docker_env"`
	Cache      Cache                  `yaml:"cache"`
}

func (*WorkflowSourceStep) GetDeps

func (s *WorkflowSourceStep) GetDeps() []string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL