simpletracker

package
v0.3.32
Published: Feb 1, 2024 License: Apache-2.0 Imports: 25 Imported by: 4

README

OS Process Tracker

Introduction

OS Process Tracker implements the JobTracker interface used by the Go DRMAA2 implementation. It uses standard OS processes as the backend, so jobs submitted through the DRMAA2 interface are managed as processes.

Basic Usage

A JobTemplate requires at least:

* RemoteCommand -> Path to the executable 

Job arrays are supported, as is limiting the number of jobs that run concurrently.

Job Control Mapping

| DRMAA2 Job Control | OS Process |
|---|---|
| Suspend | SIGTSTP |
| Resume | SIGCONT |
| Terminate | SIGKILL |
| Hold | Unsupported |
| Release | Unsupported |

State Mapping

| DRMAA2 State | Process State |
|---|---|
| Queued | Unsupported |
| Running | PID is found |
| Suspended | |
| Done | |
| Failed | |

DeleteJob

Removes a finished or failed job from the internal DB to free up memory.

Job Template Mapping

A JobTemplate is mapped onto process creation as follows:

| DRMAA2 JobTemplate | OS Process |
|---|---|
| RemoteCommand | Executable to start |
| JobName | |
| Args | Arguments of the executable |
| WorkingDir | Working directory |
| JobEnvironment | Environment variables set |
| InputPath | If set, this file is used as stdin for the job |
| OutputPath | File to print stdout to (like /dev/stdout) |
| ErrorPath | File to print stderr to (like /dev/stderr) |

The JOB_ID environment variable is set for every job; the TASK_ID environment variable is additionally set for the tasks of a job array.
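
The mapping above can be sketched with os/exec from the standard library. The struct template below is a hypothetical, reduced stand-in for the DRMAA2 JobTemplate whose field names follow the table. The function buildCmd is likewise an illustration and not the package's implementation.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
)

// template is a hypothetical, reduced stand-in for a DRMAA2 JobTemplate.
type template struct {
	RemoteCommand  string
	Args           []string
	WorkingDir     string
	JobEnvironment map[string]string
	InputPath      string
	OutputPath     string
}

// buildCmd prepares (but does not start) a process for the template.
// The caller is responsible for closing any files that were opened.
func buildCmd(t template, jobID string) (*exec.Cmd, error) {
	cmd := exec.Command(t.RemoteCommand, t.Args...)
	cmd.Dir = t.WorkingDir

	// Inherit the current environment, then add JOB_ID and the
	// variables requested in the template.
	cmd.Env = append(os.Environ(), "JOB_ID="+jobID)
	for k, v := range t.JobEnvironment {
		cmd.Env = append(cmd.Env, k+"="+v)
	}

	if t.InputPath != "" {
		in, err := os.Open(t.InputPath)
		if err != nil {
			return nil, fmt.Errorf("opening stdin file: %w", err)
		}
		cmd.Stdin = in
	}
	if t.OutputPath != "" {
		out, err := os.OpenFile(t.OutputPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
		if err != nil {
			return nil, fmt.Errorf("opening stdout file: %w", err)
		}
		cmd.Stdout = out
	}
	return cmd, nil
}

func main() {
	cmd, err := buildCmd(template{
		RemoteCommand:  "echo",
		Args:           []string{"hello"},
		JobEnvironment: map[string]string{"MODE": "test"},
	}, "1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cmd.Args)
}
```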

JobInfo

For finished jobs, the following fields may be available:

| JobInfo | OS Process |
|---|---|
| ExitStatus | exit status |
| TerminatingSignal | signal name |
| State | Done or Failed |
| WallclockTime | Duration since start |
| ID | process ID |
| AllocatedMachines | local hostname |
| FinishTime | time termination is recognized |
| SubmissionHost | local hostname |
| JobOwner | user ID (getuid()) |
| ExtensionList[extension.JobInfoDefaultJSessionMaxRSS] | maxRSS |
| ExtensionList[extension.JobInfoDefaultJSessionSwap] | nswap |
| ExtensionList[extension.JobInfoDefaultJSessionInBlock] | inblock |
| ExtensionList[extension.JobInfoDefaultJSessionOutBlock] | oublock |
| ExtensionList[extension.JobInfoDefaultJSessionSystemTime] | system time in ms |
| ExtensionList[extension.JobInfoDefaultJSessionUserTime] | user time in ms |

For jobs tracked through the monitoring session, the following fields may be available:

| JobInfo | OS Process |
|---|---|
| State | Running |
| DispatchTime | Start time of process |
| SubmissionTime | Same as dispatch time |
| WallclockTime | now - dispatch time |
| AllocatedMachines | local hostname |
| SubmissionHost | local hostname |
| JobOwner | user ID (getuid()) |
| ExtensionList[extension.JobInfoDefaultMSessionProcessName] | process name |
| ExtensionList[extension.JobInfoDefaultMSessionCommandLine] | command line command |
| ExtensionList[jobtracker.DRMAA2_MS_JOBINFO_WORKINGDIR] | working directory |
| ExtensionList[extension.JobInfoDefaultMSessionCPUUsage] | percentage of CPU time used |
| ExtensionList[extension.JobInfoDefaultMSessionCPUAffinity] | CPU affinity list (space separated) |
| ExtensionList[extension.JobInfoDefaultMSessionMemoryUsage] | memory usage info |
| ExtensionList[extension.JobInfoDefaultMSessionMemoryUsageRSS] | RSS usage |
| ExtensionList[extension.JobInfoDefaultMSessionMemoryUsageVMS] | VMS usage |

Documentation

Constants

const HighestJobIDStorageKey string = "HighestJobIDStorageKey"
const IsArrayJobStorageKey string = "IsArrayJobStorageKey"
const JobIDsStorageKey string = "JobIDsStorageKey"
const JobInfoStorageKey string = "JobInfoStorageKey"
const JobStorageKey string = "JobStorageKey"
const JobTemplatesStorageKey string = "JobTemplatesStorageKey"

Variables

This section is empty.

Functions

func AddHostInfo added in v0.3.16

func AddHostInfo(machine drmaa2interface.Machine, hostInfo *host.InfoStat) drmaa2interface.Machine

func AddMemory added in v0.3.16

func CollectSocketCoreThreads added in v0.3.16

func CollectSocketCoreThreads(cpuInfo []cpu.InfoStat) (int64, int64, int64, error)

CollectSocketCoreThreads returns the number of sockets, cores per socket, and threads per core.

func GetAllProcesses added in v0.3.16

func GetAllProcesses() ([]string, error)

func GetJobInfo added in v0.3.16

func GetJobInfo(id int32) (drmaa2interface.JobInfo, error)

func GetLocalMachineInfo added in v0.3.16

func GetLocalMachineInfo() (drmaa2interface.Machine, error)

GetLocalMachineInfo collects information about the local machine and returns a current DRMAA2 machine info struct.

func GetNextJobID

func GetNextJobID() string

func IsPidRunning added in v0.3.14

func IsPidRunning(pid int) (bool, error)

IsPidRunning returns true if the process is still alive.

func KillPid

func KillPid(pid int) error

KillPid terminates a process and all processes belonging to the process group.

func NewAllocator added in v0.3.0

func NewAllocator() *allocator

func NewJobID

func NewJobID() *lastJobID

func ProcessToJobInfo added in v0.3.16

func ProcessToJobInfo(proc *process.Process) drmaa2interface.JobInfo

func ResumePid

func ResumePid(pid int) error

ResumePid continues a previously suspended process group.

func SetJobID

func SetJobID(jobid int64)

func StartProcess

func StartProcess(jobid string, task int, t drmaa2interface.JobTemplate, finishedJobChannel chan JobEvent) (int, error)

StartProcess creates a new process based on the JobTemplate. It returns the PID, or 0 and an error if the process could not be created. The given channel is used to report job state changes.

func SuspendPid

func SuspendPid(pid int) error

SuspendPid suspends the execution of a process group. Note that it sends SIGTSTP, which the application can catch and therefore ignore.

func TrackProcess

func TrackProcess(cmd *exec.Cmd, proc *os.Process, jobID string, startTime time.Time,
	finishedJobChannel chan JobEvent, waitForFiles int, waitCh chan bool)

TrackProcess supervises a running process and sends a notification when the process has finished. If the process was started from this process, cmd is given; otherwise, when re-attaching to an already existing process, proc is given.

Types

type InternalJob

type InternalJob struct {
	TaskID int
	State  drmaa2interface.JobState
	PID    int
}

InternalJob represents a process.

type JobEvent

type JobEvent struct {
	JobID    string
	JobState drmaa2interface.JobState
	JobInfo  drmaa2interface.JobInfo
	// contains filtered or unexported fields
}

JobEvent is sent whenever a job's status changes to inform all registered listeners.

type JobStore

type JobStore struct {
	sync.Mutex
	// contains filtered or unexported fields
}

JobStore is an internal storage for jobs and job templates processed by the job tracker. Jobs are stored until Reap(). Locking must be done externally.
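
"Locking must be done externally" means the store's methods do not take the embedded mutex themselves, so the caller has to hold it around each access. The following sketch shows that calling convention with a hypothetical, much smaller store type. It is not the JobStore implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical in-memory job store. Like JobStore it embeds a
// mutex, but its methods do not lock; callers must do that themselves.
type store struct {
	sync.Mutex
	pids map[string]int
}

func newStore() *store { return &store{pids: make(map[string]int)} }

func (s *store) save(jobID string, pid int) { s.pids[jobID] = pid }

func (s *store) remove(jobID string) { delete(s.pids, jobID) }

// pid returns -1 and an error for unknown jobs.
func (s *store) pid(jobID string) (int, error) {
	p, ok := s.pids[jobID]
	if !ok {
		return -1, fmt.Errorf("job %q is not known", jobID)
	}
	return p, nil
}

func main() {
	s := newStore()

	s.Lock() // the caller takes the lock around every access
	s.save("1", 4711)
	p, err := s.pid("1")
	s.Unlock()

	fmt.Println(p, err)
}
```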

func NewJobStore

func NewJobStore() *JobStore

NewJobStore returns a new in-memory job store.

func (*JobStore) GetArrayJobTaskIDs added in v0.2.6

func (js *JobStore) GetArrayJobTaskIDs(arrayjobID string) []string

GetArrayJobTaskIDs returns the IDs of all tasks of a job array.

func (*JobStore) GetJobIDs added in v0.2.6

func (js *JobStore) GetJobIDs() []string

GetJobIDs returns the IDs of all jobs.

func (*JobStore) GetJobInfo added in v0.3.16

func (js *JobStore) GetJobInfo(jobid string) (drmaa2interface.JobInfo, error)

func (*JobStore) GetJobTemplate added in v0.3.14

func (js *JobStore) GetJobTemplate(jobID string) (drmaa2interface.JobTemplate, error)

func (*JobStore) GetPID

func (js *JobStore) GetPID(jobid string) (int, error)

GetPID returns the PID of a job or an array job task. It returns -1 and an error if the job is not known.

func (*JobStore) HasJob added in v0.2.0

func (js *JobStore) HasJob(jobid string) bool

HasJob returns true if the job is saved in the job store.

func (*JobStore) IsArrayJob added in v0.3.14

func (js *JobStore) IsArrayJob(jobid string) bool

func (*JobStore) NewJobID added in v0.3.14

func (js *JobStore) NewJobID() string

func (*JobStore) RemoveJob added in v0.2.0

func (js *JobStore) RemoveJob(jobid string)

RemoveJob deletes all occurrences of a job within the job storage. The jobid can be the identifier of a job or a job array. In case of a job array it removes all tasks which belong to the array job.

func (*JobStore) SaveArrayJob

func (js *JobStore) SaveArrayJob(arrayjobid string, pids []int,
	t drmaa2interface.JobTemplate, begin, end, step int)

SaveArrayJob stores all process IDs of the tasks of an array job.

func (*JobStore) SaveArrayJobPID added in v0.2.0

func (js *JobStore) SaveArrayJobPID(arrayjobid string, taskid, pid int) error

SaveArrayJobPID stores the current PID of the main process of the job array task.

func (*JobStore) SaveJob

func (js *JobStore) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int)

SaveJob stores a job, the job submission template, and the process PID of the job in an internal job store.

func (*JobStore) SaveJobInfo added in v0.3.16

func (js *JobStore) SaveJobInfo(jobid string, jobInfo drmaa2interface.JobInfo) error

type JobStorer added in v0.3.13

type JobStorer interface {
	SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int)
	HasJob(jobid string) bool
	RemoveJob(jobid string)
	IsArrayJob(jobid string) bool
	SaveArrayJob(arrayjobid string, pids []int, t drmaa2interface.JobTemplate, begin, end, step int)
	SaveArrayJobPID(arrayjobid string, taskid, pid int) error
	GetPID(jobid string) (int, error)
	GetJobIDs() []string
	GetArrayJobTaskIDs(arrayjobID string) []string
	// NewJobID returns a new unique job ID
	NewJobID() string
	// Require job template
	GetJobTemplate(jobid string) (drmaa2interface.JobTemplate, error)
	SaveJobInfo(jobid string, jobInfo drmaa2interface.JobInfo) error
	GetJobInfo(jobid string) (drmaa2interface.JobInfo, error)
}

JobStorer has all methods required for storing job related information.

type JobTracker

type JobTracker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

JobTracker implements the JobTracker interface and treats jobs as OS processes.

func EnableCheckpointRestart added in v0.3.16

func EnableCheckpointRestart(jobtracker *JobTracker) *JobTracker

EnableCheckpointRestart turns a job tracker that handles suspend and resume with signals into a job tracker that suspends and resumes with CRIU.

func New

func New(jobsession string) *JobTracker

New creates and initializes a JobTracker.

func NewWithJobStore added in v0.3.14

func NewWithJobStore(jobsession string, jobstore JobStorer, persistent bool) (*JobTracker, error)

func (*JobTracker) AddArrayJob

func (jt *JobTracker) AddArrayJob(t drmaa2interface.JobTemplate, begin, end, step, maxParallel int) (string, error)

AddArrayJob starts end-begin/step processes based on the given JobTemplate. Note that maxParallel is not yet implemented.
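
How begin, end, and step translate into task IDs can be sketched as below. The sketch assumes that the end index is inclusive, as in DRMAA bulk job semantics, which yields (end-begin)/step + 1 tasks. Whether AddArrayJob counts tasks exactly this way is an assumption, and taskIDs is a hypothetical helper.

```go
package main

import "fmt"

// taskIDs enumerates the task IDs of a job array, assuming that both
// begin and end are inclusive bounds.
func taskIDs(begin, end, step int) ([]int, error) {
	if step <= 0 || end < begin {
		return nil, fmt.Errorf("invalid range: begin=%d end=%d step=%d", begin, end, step)
	}
	ids := make([]int, 0, (end-begin)/step+1)
	for i := begin; i <= end; i += step {
		ids = append(ids, i)
	}
	return ids, nil
}

func main() {
	ids, err := taskIDs(1, 10, 3)
	fmt.Println(ids, err) // prints: [1 4 7 10] <nil>
}
```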

func (*JobTracker) AddJob

AddJob creates a process, fills in the internal job state and saves the job internally.

func (*JobTracker) Close added in v0.3.14

func (jt *JobTracker) Close() error

Close implements the jobtracker.Closer interface to disengage from a DB or the DRM when the job session gets closed.

func (*JobTracker) CloseMonitoringSession added in v0.3.16

func (m *JobTracker) CloseMonitoringSession(name string) error

func (*JobTracker) DeleteJob

func (jt *JobTracker) DeleteJob(jobid string) error

DeleteJob removes a job from the internal job storage, but only when the job is in a finished state.

func (*JobTracker) Destroy

func (jt *JobTracker) Destroy() error

Destroy signals the JobTracker to shut down.

func (*JobTracker) GetAllJobIDs added in v0.3.16

func (m *JobTracker) GetAllJobIDs(filter *drmaa2interface.JobInfo) ([]string, error)

func (*JobTracker) GetAllMachines added in v0.3.16

func (m *JobTracker) GetAllMachines(names []string) ([]drmaa2interface.Machine, error)

func (*JobTracker) GetAllQueueNames added in v0.3.16

func (m *JobTracker) GetAllQueueNames(names []string) ([]string, error)

func (*JobTracker) JobControl

func (jt *JobTracker) JobControl(jobid, state string) error

JobControl suspends, resumes, or terminates a job.

func (*JobTracker) JobInfo

func (jt *JobTracker) JobInfo(jobid string) (drmaa2interface.JobInfo, error)

JobInfo returns more detailed information about a job.

func (*JobTracker) JobInfoFromMonitor added in v0.3.16

func (m *JobTracker) JobInfoFromMonitor(id string) (drmaa2interface.JobInfo, error)

func (*JobTracker) JobState

func (jt *JobTracker) JobState(jobid string) (drmaa2interface.JobState, string, error)

JobState returns the current state of the job (running, suspended, done, failed).

func (*JobTracker) JobTemplate added in v0.3.14

func (jt *JobTracker) JobTemplate(jobID string) (drmaa2interface.JobTemplate, error)

JobTemplate returns the stored job template of the job. This job tracker implements the JobTemplater interface in addition to the JobTracker interface.

func (*JobTracker) ListArrayJobs

func (jt *JobTracker) ListArrayJobs(id string) ([]string, error)

ListArrayJobs returns all job IDs the job array ID is associated with.

func (*JobTracker) ListJobCategories

func (jt *JobTracker) ListJobCategories() ([]string, error)

ListJobCategories returns an empty list as JobCategories are currently not defined for OS processes.

func (*JobTracker) ListJobs

func (jt *JobTracker) ListJobs() ([]string, error)

ListJobs returns a list of all job IDs stored in the job store.

func (*JobTracker) OpenMonitoringSession added in v0.3.16

func (m *JobTracker) OpenMonitoringSession(name string) error

func (*JobTracker) Wait

func (jt *JobTracker) Wait(jobid string, d time.Duration, state ...drmaa2interface.JobState) error

Wait blocks until the job with the given job ID is in one of the given states. If the job is still not in any of those states after the given duration, the method returns an error. If the duration is 0, it waits indefinitely.

type PersistentJobStorage added in v0.3.13

type PersistentJobStorage struct {
	// contains filtered or unexported fields
}

PersistentJobStorage is an internal storage for jobs and job templates processed by the job tracker. Jobs are stored until Reap(). Locking must be done externally.

func NewPersistentJobStore added in v0.3.13

func NewPersistentJobStore(path string) (*PersistentJobStorage, error)

NewPersistentJobStore returns a new job store which uses a file-based DB to be persistent across process restarts. The returned PersistentJobStorage implements the JobStorer interface.

func (*PersistentJobStorage) Close added in v0.3.14

func (js *PersistentJobStorage) Close() error

func (*PersistentJobStorage) GetArrayJobTaskIDs added in v0.3.13

func (js *PersistentJobStorage) GetArrayJobTaskIDs(arrayjobID string) []string

GetArrayJobTaskIDs returns the IDs of all tasks of a job array.

func (*PersistentJobStorage) GetJobIDs added in v0.3.13

func (js *PersistentJobStorage) GetJobIDs() []string

GetJobIDs returns the IDs of all jobs.

func (*PersistentJobStorage) GetJobInfo added in v0.3.16

func (js *PersistentJobStorage) GetJobInfo(jobid string) (drmaa2interface.JobInfo, error)

func (*PersistentJobStorage) GetJobTemplate added in v0.3.14

func (js *PersistentJobStorage) GetJobTemplate(jobid string) (drmaa2interface.JobTemplate, error)

func (*PersistentJobStorage) GetPID added in v0.3.13

func (js *PersistentJobStorage) GetPID(jobid string) (int, error)

GetPID returns the PID of a job or an array job task. It returns -1 and an error if the job is not known.

func (*PersistentJobStorage) HasJob added in v0.3.13

func (js *PersistentJobStorage) HasJob(jobid string) bool

HasJob returns true if the job is saved in the job store.

func (*PersistentJobStorage) IsArrayJob added in v0.3.14

func (js *PersistentJobStorage) IsArrayJob(jobid string) bool

func (*PersistentJobStorage) NewJobID added in v0.3.14

func (js *PersistentJobStorage) NewJobID() string

func (*PersistentJobStorage) RemoveJob added in v0.3.13

func (js *PersistentJobStorage) RemoveJob(jobid string)

RemoveJob deletes all occurrences of a job within the job storage. The jobid can be the identifier of a job or a job array. In case of a job array it removes all tasks which belong to the array job.

func (*PersistentJobStorage) SaveArrayJob added in v0.3.13

func (js *PersistentJobStorage) SaveArrayJob(arrayjobid string, pids []int,
	t drmaa2interface.JobTemplate, begin, end, step int)

SaveArrayJob stores all process IDs of the tasks of an array job.

func (*PersistentJobStorage) SaveArrayJobPID added in v0.3.13

func (js *PersistentJobStorage) SaveArrayJobPID(arrayjobid string, taskid, pid int) error

SaveArrayJobPID stores the current PID of the main process of the job array task.

func (*PersistentJobStorage) SaveJob added in v0.3.13

func (js *PersistentJobStorage) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int)

SaveJob stores a job, the job submission template, and the process PID of the job in an internal job store.

func (*PersistentJobStorage) SaveJobInfo added in v0.3.16

func (js *PersistentJobStorage) SaveJobInfo(jobid string, jobinfo drmaa2interface.JobInfo) error

type PubSub

type PubSub struct {
	sync.Mutex
	// contains filtered or unexported fields
}

PubSub distributes job status change events to clients which Register() at PubSub.

func NewPubSub

func NewPubSub(jobstore JobStorer) (*PubSub, chan JobEvent)

NewPubSub returns an initialized PubSub structure and the JobEvent channel which is used by the caller to publish job events (i.e. job state transitions).

func (*PubSub) GetJobInfo added in v0.3.18

func (ps *PubSub) GetJobInfo(jobID string) (drmaa2interface.JobInfo, error)

func (*PubSub) NotifyAndWait added in v0.2.0

func (ps *PubSub) NotifyAndWait(evt JobEvent)

NotifyAndWait sends a job event and waits until it was distributed to all waiting functions.

func (*PubSub) Register

func (ps *PubSub) Register(jobid string, states ...drmaa2interface.JobState) (chan drmaa2interface.JobState, error)

Register returns a channel which emits a job state once the given job transitions into one of the given states. If the job is already in one of the expected states, it returns nil as the channel and nil as the error.
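
The registration contract, a nil channel when the job is already in a requested state and otherwise a channel that fires on the first matching transition, can be sketched with a small publish/subscribe type. Everything below (pubSub, waiter, register, publish, and the use of strings as states) is hypothetical and omits the error handling of the real PubSub.

```go
package main

import (
	"fmt"
	"sync"
)

type waiter struct {
	states []string
	ch     chan string
}

// pubSub is a hypothetical, reduced state distributor.
type pubSub struct {
	mu      sync.Mutex
	current map[string]string   // last known state per job
	waiters map[string][]waiter // pending registrations per job
}

func newPubSub() *pubSub {
	return &pubSub{current: map[string]string{}, waiters: map[string][]waiter{}}
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// register returns nil if the job is already in one of the states.
func (ps *pubSub) register(jobID string, states ...string) chan string {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	if contains(states, ps.current[jobID]) {
		return nil
	}
	ch := make(chan string, 1) // buffered so publish never blocks
	ps.waiters[jobID] = append(ps.waiters[jobID], waiter{states, ch})
	return ch
}

// publish records the new state and notifies matching waiters once.
func (ps *pubSub) publish(jobID, state string) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.current[jobID] = state
	remaining := ps.waiters[jobID][:0]
	for _, w := range ps.waiters[jobID] {
		if contains(w.states, state) {
			w.ch <- state
		} else {
			remaining = append(remaining, w)
		}
	}
	ps.waiters[jobID] = remaining
}

func main() {
	ps := newPubSub()
	ch := ps.register("1", "Done", "Failed")
	ps.publish("1", "Running")
	ps.publish("1", "Done")
	fmt.Println(<-ch)
	fmt.Println(ps.register("1", "Done") == nil)
}
```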

TODO add method for removing specific wait functions.

func (*PubSub) StartBookKeeper

func (ps *PubSub) StartBookKeeper()

StartBookKeeper processes all job state changes from the process trackers and notifies registered wait functions.

func (*PubSub) Unregister

func (ps *PubSub) Unregister(jobid string)

Unregister removes all functions waiting for a specific job and all occurrences of the job itself.

type SimpleTrackerInitParams added in v0.3.14

type SimpleTrackerInitParams struct {
	UsePersistentJobStorage           bool
	DBFilePath                        string
	CheckPointRestartForSuspendResume bool
}

type StoreCloser added in v0.3.14

type StoreCloser interface {
	Close() error
}

StoreCloser closes any DB handle when called so that a new JobStorer can be created.
