job

package
v0.14.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2023 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const EnvSkipResourceValidate = "PF_SKIP_RESOURCE_VALIDATE"

Variables

View Source
var (
	WSManager = WebsocketManager{
		Connections:   make(map[string]*Connection),
		BroadcastChan: make(chan GetJobResponse, 1000),
	}
	UpdateTime = time.Now()

	LogURLFormat = "http://%s:%s/v1/containers/%s/log?jobID=%s&token=%s"
)
View Source
var IsSkipResourceValidate bool

Functions

func CreatePPLJob added in v0.14.3

func CreatePPLJob(conf schema.PFJobConf) (string, error)

CreatePPLJob create a run job, used by pipeline

func DeleteJob

func DeleteJob(ctx *logger.RequestContext, jobID string) error

func GenerateLogURL added in v0.14.6

func GenerateLogURL(task model.JobTask) string

func StopJob

func StopJob(ctx *logger.RequestContext, jobID string) error

func UpdateJob

func UpdateJob(ctx *logger.RequestContext, request *UpdateJobRequest) error

func ValidatePPLJob added in v0.14.3

func ValidatePPLJob(conf schema.PFJobConf) error

Types

type CommonJobInfo

type CommonJobInfo struct {
	ID               string            `json:"id"`
	Name             string            `json:"name"`
	Labels           map[string]string `json:"labels"`
	Annotations      map[string]string `json:"annotations"`
	SchedulingPolicy SchedulingPolicy  `json:"schedulingPolicy"`
	UserName         string            `json:",omitempty"`
}

CommonJobInfo the common fields for jobs

type Connection

type Connection struct {
	ID            string
	WsConnect     *websocket.Conn
	IsClosed      bool
	InformChan    chan []byte
	CloseChan     chan byte
	MuxClose      sync.Mutex
	MuxWrite      sync.Mutex
	Ctx           *logger.RequestContext
	SetupTime     time.Time
	HeartbeatChan chan []byte
}

func InitConnection

func InitConnection(wsConn *websocket.Conn, ctx *logger.RequestContext) (*Connection, error)

func (*Connection) Close

func (conn *Connection) Close()

func (*Connection) WriteMessage

func (conn *Connection) WriteMessage(data string, msgType MsgType) error

type CreateDisJobRequest

type CreateDisJobRequest struct {
	CommonJobInfo     `json:",inline"`
	Framework         schema.Framework       `json:"framework"`
	Members           []MemberSpec           `json:"members"`
	ExtensionTemplate map[string]interface{} `json:"extensionTemplate"`
}

CreateDisJobRequest convey request for create distributed job

func (CreateDisJobRequest) ToJobInfo added in v0.14.3

func (ds CreateDisJobRequest) ToJobInfo() *CreateJobInfo

type CreateJobInfo added in v0.14.3

type CreateJobInfo struct {
	CommonJobInfo     `json:",inline"`
	Framework         schema.Framework       `json:"framework"`
	Type              schema.JobType         `json:"type"`
	Mode              string                 `json:"mode,omitempty"`
	Members           []MemberSpec           `json:"members"`
	ExtensionTemplate map[string]interface{} `json:"extensionTemplate,omitempty"`
}

CreateJobInfo defines

type CreateJobResponse

type CreateJobResponse struct {
	ID string `json:"id"`
}

CreateJobResponse convey response for create job

func CreatePFJob added in v0.14.3

func CreatePFJob(ctx *logger.RequestContext, request *CreateJobInfo) (*CreateJobResponse, error)

CreatePFJob handler for creating job

func CreateWorkflowJob

func CreateWorkflowJob(ctx *logger.RequestContext, request *CreateWfJobRequest) (*CreateJobResponse, error)

CreateWorkflowJob handler for creating job

type CreateSingleJobRequest

type CreateSingleJobRequest struct {
	CommonJobInfo `json:",inline"`
	JobSpec       `json:",inline"`
}

CreateSingleJobRequest convey request for create job

func (CreateSingleJobRequest) ToJobInfo added in v0.14.3

func (sj CreateSingleJobRequest) ToJobInfo() *CreateJobInfo

type CreateWfJobRequest

type CreateWfJobRequest struct {
	CommonJobInfo     `json:",inline"`
	Framework         schema.Framework       `json:"framework"`
	Members           []MemberSpec           `json:"members"`
	ExtensionTemplate map[string]interface{} `json:"extensionTemplate"`
}

CreateWfJobRequest convey request for create workflow job

type DistributedJobSpec

type DistributedJobSpec struct {
	Framework schema.Framework `json:"framework,omitempty"`
	Members   []schema.Member  `json:"members,omitempty"`
}

type DistributedRuntimeInfo

type DistributedRuntimeInfo struct {
	Name      string        `json:"name,omitempty"`
	Namespace string        `json:"namespace,omitempty"`
	ID        string        `json:"id,omitempty"`
	Status    string        `json:"status,omitempty"`
	Runtimes  []RuntimeInfo `json:"runtimes,omitempty"`
}

type GetJobResponse

type GetJobResponse struct {
	CreateSingleJobRequest `json:",inline"`
	DistributedJobSpec     `json:",inline"`
	Status                 string                  `json:"status"`
	Message                string                  `json:"message"`
	AcceptTime             string                  `json:"acceptTime"`
	StartTime              string                  `json:"startTime"`
	FinishTime             string                  `json:"finishTime"`
	Runtime                *RuntimeInfo            `json:"runtime,omitempty"`
	DistributedRuntime     *DistributedRuntimeInfo `json:"distributedRuntime,omitempty"`
	WorkflowRuntime        *WorkflowRuntimeInfo    `json:"workflowRuntime,omitempty"`
	UpdateTime             time.Time               `json:"-"`
}

func GetJob

func GetJob(ctx *logger.RequestContext, jobID string) (*GetJobResponse, error)

type InformJob

type InformJob struct {
	JobID      string `json:"jobId"`
	UpdateTime string `json:"updateTime"`
}

type JobSpec

type JobSpec struct {
	Flavour           schema.Flavour         `json:"flavour"`
	LimitFlavour      schema.Flavour         `json:"limitFlavour"`
	FileSystem        schema.FileSystem      `json:"fs"`
	ExtraFileSystems  []schema.FileSystem    `json:"extraFS"`
	Image             string                 `json:"image"`
	Env               map[string]string      `json:"env"`
	Command           string                 `json:"command"`
	Args              []string               `json:"args"`
	Port              int                    `json:"port"`
	ExtensionTemplate map[string]interface{} `json:"extensionTemplate"`
}

JobSpec the spec fields for jobs

type ListJobRequest

type ListJobRequest struct {
	Queue     string            `json:"queue,omitempty"`
	Status    string            `json:"status,omitempty"`
	Timestamp int64             `json:"timestamp,omitempty"`
	StartTime string            `json:"startTime,omitempty"`
	Labels    map[string]string `json:"labels,omitempty"`
	Marker    string            `json:"marker"`
	MaxKeys   int               `json:"maxKeys"`
}

type ListJobResponse

type ListJobResponse struct {
	common.MarkerInfo
	JobList []*GetJobResponse `json:"jobList"`
}

func ListJob

func ListJob(ctx *logger.RequestContext, request ListJobRequest) (*ListJobResponse, error)

type MemberSpec

type MemberSpec struct {
	CommonJobInfo `json:",inline"`
	JobSpec       `json:",inline"`
	Role          string `json:"role"`
	Replicas      int    `json:"replicas"`
}

type MsgType

type MsgType string
const (
	HeartbeatMsg MsgType = "HeartbeatMsg"
	DataMsg      MsgType = "DataMsg"
)

type RuntimeInfo

type RuntimeInfo struct {
	Name      string `json:"name,omitempty"`
	Namespace string `json:"namespace,omitempty"`
	ID        string `json:"id,omitempty"`
	Status    string `json:"status,omitempty"`
	NodeName  string `json:"nodeName"`
	LogURL    string `json:"logURL,omitempty"`
}

type SchedulingPolicy

type SchedulingPolicy struct {
	Queue        string              `json:"queue"`
	QueueID      string              `json:"-"`
	MaxResources *resources.Resource `json:"-"`
	QueueType    string              `json:"-"`
	ClusterId    string              `json:"-"`
	Namespace    string              `json:"-"`
	Priority     string              `json:"priority,omitempty"`
}

SchedulingPolicy indicate queueID/priority

type UpdateJobRequest

type UpdateJobRequest struct {
	JobID       string            `json:"-"`
	Priority    string            `json:"priority"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
}

type WebsocketManager

type WebsocketManager struct {
	Connections   map[string]*Connection
	BroadcastChan chan GetJobResponse
}

func (*WebsocketManager) Exit

func (manager *WebsocketManager) Exit(id string)

func (*WebsocketManager) GetGroupData

func (manager *WebsocketManager) GetGroupData()

func (*WebsocketManager) Register

func (manager *WebsocketManager) Register(connection *Connection, clientID string)

func (*WebsocketManager) SendGroupData

func (manager *WebsocketManager) SendGroupData()

type WorkflowRuntimeInfo

type WorkflowRuntimeInfo struct {
	Name      string                   `json:"name,omitempty"`
	Namespace string                   `json:"namespace,omitempty"`
	ID        string                   `json:"id,omitempty"`
	Status    string                   `json:"status,omitempty"`
	Nodes     []DistributedRuntimeInfo `json:"nodes,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL