jobqueue

package
v0.15.0
Warning

This package is not in the latest version of its module.

Published: Sep 3, 2018 License: GPL-3.0 Imports: 54 Imported by: 0

Documentation

Overview

Package jobqueue provides server/client functions to interact with the queue structure provided by the queue package over a network.

It provides a job queue and running system which guarantees:

1. Created jobs are never lost accidentally.
2. The same job will not run more than once simultaneously:
   - Duplicate jobs are not created
   - Each job is handled by only a single client
3. Jobs are handled in the desired order (user priority and fifo, after
   dependencies have been satisfied).
4. Jobs still get run despite crashing clients.
5. Completed jobs are kept forever for historical and "live" dependency
   purposes.
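The ordering guarantee above (higher Priority first, then first-in first-out among jobs of equal Priority) can be illustrated with a stable sort. This is a sketch, not the queue package's implementation: queued and runOrder are hypothetical names, and dependency handling is left out, so it assumes every job passed in is already ready to run.

```go
package main

import (
	"fmt"
	"sort"
)

// queued is a hypothetical stand-in for a job whose dependencies are met.
type queued struct {
	Cmd      string
	Priority uint8
}

// runOrder returns jobs sorted by descending Priority; the stable sort keeps
// jobs of equal Priority in the order they were added (fifo).
func runOrder(jobs []queued) []queued {
	out := append([]queued(nil), jobs...)
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].Priority > out[j].Priority
	})
	return out
}

func main() {
	jobs := []queued{{"a", 0}, {"b", 5}, {"c", 0}, {"d", 5}}
	for _, j := range runOrder(jobs) {
		fmt.Print(j.Cmd, " ") // b d a c
	}
	fmt.Println()
}
```

A stable sort is what preserves fifo order: an unstable sort could swap "a" and "c" even though they share a Priority.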

You bring up the server, then use a client to add commands (jobs) to the queue. The server then interacts with the configured scheduler to start running the necessary number of runner clients on your compute cluster. The runner clients ask the server for a command to run, and then they run the command. Once complete, the server is updated and the runner client requests the next command, or might exit if there are no more left.

As a user you can query the status of the system using client methods or by viewing the real-time updated status web interface.

Server

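    import (
        "log"
        "os"

        "github.com/VertebrateResequencing/wr/jobqueue"
        jqs "github.com/VertebrateResequencing/wr/jobqueue/scheduler"
    )

    // selfExe is the path to this executable, which runners will re-invoke
    selfExe, err := os.Executable()
    if err != nil {
        log.Fatal(err)
    }
    server, msg, token, err := jobqueue.Serve(jobqueue.ServerConfig{
        Port:            "12345",
        WebPort:         "12346",
        SchedulerName:   "local",
        SchedulerConfig: &jqs.ConfigLocal{Shell: "bash"},
        RunnerCmd:       selfExe + " runner -s '%s' --deployment %s --server '%s' --domain %s -r %d -m %d",
        DBFile:          "/home/username/.wr_production/boltdb",
        DBFileBackup:    "/home/username/.wr_production/boltdb.backup",
        TokenFile:       "/home/username/.wr_production/client.token",
        CAFile:          "/home/username/.wr_production/ca.pem",
        CertFile:        "/home/username/.wr_production/cert.pem",
        CertDomain:      "my.internal.domain.com",
        KeyFile:         "/home/username/.wr_production/key.pem",
        Deployment:      "production",
        CIDR:            "",
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Println(msg) // informational message from the server
    _ = token        // clients must supply this token to Connect()
    // block until the server stops
    err = server.Block()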

Client

An example client, for adding commands to the job queue:

    import (
        "log"
        "os"
        "time"

        "github.com/VertebrateResequencing/wr/jobqueue"
        jqs "github.com/VertebrateResequencing/wr/jobqueue/scheduler"
    )

    var jobs []*jobqueue.Job
    other := make(map[string]string)
    // this job will only start once the jobs in DepGroup "step1" are complete
    var deps []*jobqueue.Dependency
    deps = append(deps, jobqueue.NewDepGroupDependency("step1"))
    jobs = append(jobs, &jobqueue.Job{
        RepGroup:     "friendly name",
        Cmd:          "myexe -args",
        Cwd:          "/tmp",
        ReqGroup:     "myexeInArgsMode",
        Requirements: &jqs.Requirements{RAM: 1024, Time: 10 * time.Minute, Cores: 1, Disk: 1, Other: other},
        Override:     uint8(0),
        Priority:     uint8(0),
        Retries:      uint8(3),
        DepGroups:    []string{"step2"},
        Dependencies: deps,
    })

    // token is the authentication token that Serve() returned
    jq, err := jobqueue.Connect(
        "localhost:12345",
        "/home/username/.wr_production/ca.pem",
        "my.internal.domain.com",
        token,
        30*time.Second,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer jq.Disconnect()

    // ignoreComplete=false: previously completed jobs are treated as new
    inserts, dups, err := jq.Add(jobs, os.Environ(), false)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("added %d new jobs; %d already existed", inserts, dups)

Index

Constants

const (
	FailReasonEnv      = "failed to get environment variables"
	FailReasonCwd      = "working directory does not exist"
	FailReasonStart    = "command failed to start"
	FailReasonCPerm    = "command permission problem"
	FailReasonCFound   = "command not found"
	FailReasonCExit    = "command invalid exit code"
	FailReasonExit     = "command exited non-zero"
	FailReasonRAM      = "command used too much RAM"
	FailReasonDisk     = "ran out of disk space"
	FailReasonTime     = "command used too much time"
	FailReasonDocker   = "could not interact with docker"
	FailReasonAbnormal = "command failed to complete normally"
	FailReasonLost     = "lost contact with runner"
	FailReasonSignal   = "runner received a signal to stop"
	FailReasonResource = "resource requirements cannot be met"
	FailReasonMount    = "mounting of remote file system(s) failed"
	FailReasonUpload   = "failed to upload files to remote file system"
	FailReasonKilled   = "killed by user request"
)

FailReason* constants are the reasons for command-line failure stored on Jobs.

const (
	ErrInternalError    = "internal error"
	ErrUnknownCommand   = "unknown command"
	ErrBadRequest       = "bad request (missing arguments?)"
	ErrBadJob           = "bad job (not in queue or correct sub-queue)"
	ErrMissingJob       = "corresponding job not found"
	ErrUnknown          = "unknown error"
	ErrClosedInt        = "queues closed due to SIGINT"
	ErrClosedTerm       = "queues closed due to SIGTERM"
	ErrClosedStop       = "queues closed due to manual Stop()"
	ErrQueueClosed      = "queue closed"
	ErrNoHost           = "could not determine the non-loopback ip address of this host"
	ErrNoServer         = "could not reach the server"
	ErrMustReserve      = "you must Reserve() a Job before passing it to other methods"
	ErrDBError          = "failed to use database"
	ErrPermissionDenied = "bad token: permission denied"
	ServerModeNormal    = "started"
	ServerModeDrain     = "draining"
)

Err* constants are found in the Err field of the Error values we return, so you can type-assert a returned error to Error and check which kind of error it is. ServerMode* constants are used to report on the status of the server, found inside ServerInfo.

Variables

var (
	ClientTouchInterval               = 15 * time.Second
	ClientReleaseDelay                = 30 * time.Second
	ClientPercentMemoryKill           = 90
	RAMIncreaseMin            float64 = 1000
	RAMIncreaseMultLow                = 2.0
	RAMIncreaseMultHigh               = 1.3
	RAMIncreaseMultBreakpoint float64 = 8192
)

These global variables are exported primarily for testing purposes; you probably shouldn't change them. (*** They should probably be refactored as fields of a config struct.)

var (
	RecMBRound  = 100  // when we recommend amount of memory to reserve for a job, we round up to the nearest RecMBRound MBs
	RecSecRound = 1800 // when we recommend time to reserve for a job, we round up to the nearest RecSecRound seconds
)

Rec* variables are only exported for testing purposes (*** though they should probably be user configurable somewhere...).
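The rounding described in these comments is ordinary round-up-to-a-multiple arithmetic. The sketch below illustrates it with the default values; roundUp is a hypothetical helper rather than the package's own code, and how the underlying recommendation is derived is not covered here.

```go
package main

import "fmt"

// Values copied from the documented defaults.
var (
	RecMBRound  = 100
	RecSecRound = 1800
)

// roundUp rounds v up to the nearest multiple of step (step > 0, v >= 0).
func roundUp(v, step int) int {
	if r := v % step; r != 0 {
		return v + step - r
	}
	return v
}

func main() {
	fmt.Println(roundUp(1234, RecMBRound))  // 1300 (MB)
	fmt.Println(roundUp(2000, RecSecRound)) // 3600 (seconds)
}
```

Values that are already exact multiples are left unchanged, so a recommendation of 1300 MB stays 1300 MB.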

var (
	ServerInterruptTime   = 1 * time.Second
	ServerItemTTR         = 60 * time.Second
	ServerReserveTicker   = 1 * time.Second
	ServerCheckRunnerTime = 1 * time.Minute
	ServerLogClientErrors = true
)

These global variables are exported primarily for testing purposes; you probably shouldn't change them. (*** They should probably be refactored as fields of a config struct.)

var AppName = "jobqueue"

AppName gets used in certain places like naming the base directory of created working directories during Client.Execute().

var BsubID uint64

BsubID is used to give added jobs a unique (atomically incremented) id when pretending to be bsub.
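"Atomically incremented" means that concurrent callers can never be handed the same id. The sketch below shows that pattern with sync/atomic, using a local stand-in variable and a hypothetical nextBsubID helper; the package's own increment code is not shown here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// bsubID is a local stand-in for the documented BsubID package variable.
var bsubID uint64

// nextBsubID returns a unique id; safe for concurrent callers.
func nextBsubID() uint64 {
	return atomic.AddUint64(&bsubID, 1)
}

func main() {
	var wg sync.WaitGroup
	var mu sync.Mutex
	seen := make(map[uint64]bool)
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			id := nextBsubID()
			mu.Lock()
			seen[id] = true
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(len(seen)) // 100 distinct ids
}
```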

Functions

This section is empty.

Types

type Behaviour added in v0.8.0

type Behaviour struct {
	When BehaviourTrigger
	Do   BehaviourAction
	Arg  interface{} // the arg needed by your chosen action
}

Behaviour describes something that should happen in response to a Job's Cmd exiting a certain way.

func (*Behaviour) String added in v0.8.0

func (b *Behaviour) String() string

String provides a nice string representation of a Behaviour for user interface display purposes. It is in the form of a JSON string that can be converted back to a Behaviour via a BehaviourViaJSON.

func (*Behaviour) Trigger added in v0.8.0

func (b *Behaviour) Trigger(status BehaviourTrigger, j *Job) error

Trigger will carry out our BehaviourAction if the supplied status matches our BehaviourTrigger.

type BehaviourAction added in v0.8.0

type BehaviourAction uint8

BehaviourAction is supplied to a Behaviour to define what should happen when that behaviour triggers. (It's a uint8 type as opposed to an actual func to save space since we need to store these on every Job; do not treat as a flag and OR multiple actions together!)

const (
	// CleanupAll is a BehaviourAction that will delete any directories that
	// were created by a Job due to CwdMatters being false. Note that if the
	// Job's Cmd created output files within the actual cwd, these would get
	// deleted along with everything else. It takes no arguments.
	CleanupAll BehaviourAction = 1 << iota

	// Cleanup is a BehaviourAction that behaves exactly as CleanupAll in the
	// case that no output files have been specified on the Job. If some have,
	// everything except those files gets deleted. It takes no arguments.
	// (NB: since output file specification has not yet been implemented, this
	// is currently identical to CleanupAll.)
	Cleanup

	// Run is a BehaviourAction that runs a given command (supplied as a single
	// string Arg to the Behaviour) in the Job's actual cwd.
	Run

	// CopyToManager is a BehaviourAction that copies the given files (specified
	// as a slice of string paths Arg to the Behaviour) from the Job's actual
	// cwd to a configured location on the machine that the jobqueue server is
	// running on. *** not yet implemented!
	CopyToManager
)

type BehaviourTrigger added in v0.8.0

type BehaviourTrigger uint8

BehaviourTrigger is supplied to a Behaviour to define under what circumstance that Behaviour will trigger.

const (
	// OnExit is a BehaviourTrigger for Behaviours that should trigger when a
	// Job's Cmd is executed and finishes running. These behaviours will trigger
	// after OnSuccess and OnFailure triggers, which makes OnExit different to
	// specifying OnSuccess|OnFailure.
	OnExit BehaviourTrigger = 1 << iota

	// OnSuccess is a BehaviourTrigger for Behaviours that should trigger when a
	// Job's Cmd is executed and exits 0.
	OnSuccess

	// OnFailure is a BehaviourTrigger for Behaviours that should trigger when a
	// Job's Cmd is executed and exits non-0.
	OnFailure
)
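Both types use 1 << iota, so each constant is a distinct bit. The sketch copies the two declarations locally to show the resulting values: triggers can be combined, and OnSuccess|OnFailure (6) is a different value from OnExit (1), whereas actions, per the note on BehaviourAction, should not be ORed together. How Trigger() matches a combined trigger is not stated here, so the sketch does not model it.

```go
package main

import "fmt"

type BehaviourAction uint8
type BehaviourTrigger uint8

// Local copies of the documented constants (same iota pattern).
const (
	CleanupAll BehaviourAction = 1 << iota
	Cleanup
	Run
	CopyToManager
)

const (
	OnExit BehaviourTrigger = 1 << iota
	OnSuccess
	OnFailure
)

func main() {
	fmt.Println(CleanupAll, Cleanup, Run, CopyToManager) // 1 2 4 8
	fmt.Println(OnExit, OnSuccess, OnFailure)            // 1 2 4
	either := OnSuccess | OnFailure
	fmt.Println(either, either == OnExit) // 6 false
}
```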

type BehaviourViaJSON added in v0.8.0

type BehaviourViaJSON struct {
	Run           string   `json:"run,omitempty"`
	CopyToManager []string `json:"copy_to_manager,omitempty"`
	Cleanup       bool     `json:"cleanup,omitempty"`
	CleanupAll    bool     `json:"cleanup_all,omitempty"`
}

BehaviourViaJSON is the element type of BehavioursViaJSON. Each one should specify only one of its properties.

func (BehaviourViaJSON) Behaviour added in v0.8.0

func (bj BehaviourViaJSON) Behaviour(when BehaviourTrigger) *Behaviour

Behaviour converts the friendly BehaviourViaJSON struct to a real Behaviour.

type Behaviours added in v0.8.0

type Behaviours []*Behaviour

Behaviours is a slice of *Behaviour.

func (Behaviours) String added in v0.8.0

func (bs Behaviours) String() string

String provides a nice string representation of Behaviours for user interface display purposes. It takes the form of a JSON string that can be converted back to Behaviours using a BehavioursViaJSON for each key. The keys are "on_failure", "on_success", "on_failure|success" and "on_exit".

func (Behaviours) Trigger added in v0.8.0

func (bs Behaviours) Trigger(success bool, j *Job) error

Trigger calls Trigger on each constituent Behaviour: first all those for OnSuccess if success is true (or OnFailure otherwise), then those for OnExit.

type BehavioursViaJSON added in v0.8.0

type BehavioursViaJSON []BehaviourViaJSON

BehavioursViaJSON is a slice of BehaviourViaJSON. It is a convenience to allow users to specify behaviours in a more natural way if they're trying to describe them in a JSON string. You'd have one of these per BehaviourTrigger.

func (BehavioursViaJSON) Behaviours added in v0.8.0

func (bjs BehavioursViaJSON) Behaviours(when BehaviourTrigger) Behaviours

Behaviours converts a BehavioursViaJSON to real Behaviours.
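Because the struct carries JSON tags, a user-supplied JSON array decodes straight into a BehavioursViaJSON. The sketch copies the documented struct locally (so it runs without the wr module) and uses a hypothetical parseBehaviours helper and made-up input. With the real package you would then call bjs.Behaviours(jobqueue.OnSuccess), or another trigger, to get Behaviours for a Job.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the documented BehaviourViaJSON struct and its JSON tags.
type BehaviourViaJSON struct {
	Run           string   `json:"run,omitempty"`
	CopyToManager []string `json:"copy_to_manager,omitempty"`
	Cleanup       bool     `json:"cleanup,omitempty"`
	CleanupAll    bool     `json:"cleanup_all,omitempty"`
}

type BehavioursViaJSON []BehaviourViaJSON

// parseBehaviours decodes a JSON array of single-property objects.
func parseBehaviours(s string) (BehavioursViaJSON, error) {
	var bjs BehavioursViaJSON
	err := json.Unmarshal([]byte(s), &bjs)
	return bjs, err
}

func main() {
	// hypothetical user input: one property set per element
	bjs, err := parseBehaviours(`[{"run":"touch done.txt"},{"cleanup":true}]`)
	if err != nil {
		panic(err)
	}
	for _, bj := range bjs {
		fmt.Printf("%+v\n", bj)
	}
}
```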

type Client

type Client struct {
	sync.Mutex

	ServerInfo *ServerInfo
	// contains filtered or unexported fields
}

Client represents the client side of the socket that the jobqueue server is Serve()ing, specific to a particular queue.

func Connect

func Connect(addr, caFile, certDomain string, token []byte, timeout time.Duration) (*Client, error)

Connect creates a connection to the jobqueue server.

addr is the host or IP of the machine running the server, suffixed with a colon and the port it is listening on, e.g. localhost:1234

caFile is a path to the PEM encoded CA certificate that was used to sign the server's certificate. If set as a blank string, or if the file doesn't exist, the server's certificate will be trusted based on the CAs installed in the normal location on the system.

certDomain is a domain that the server's certificate is supposed to be valid for.

token is the authentication token that Serve() returned when the server was started.

timeout determines how long to wait for a response from the server, not only while connecting, but for all subsequent interactions with it using the returned Client.

func (*Client) Add

func (c *Client) Add(jobs []*Job, envVars []string, ignoreComplete bool) (added, existed int, err error)

Add adds new jobs to the job queue, but only if those jobs aren't already in there.

If any were already there, you will not get an error, but the returned 'existed' count will be > 0. Note that no cross-queue checking is done, so you need to be careful not to add the same job to different queues.

Note that if you add jobs to the queue that were previously added, Execute()d and were successfully Archive()d, the existed count will be 0 and the jobs will be treated like new ones, though when Archive()d again, the new Job will replace the old one in the database. To have such jobs skipped as "existed" instead, supply ignoreComplete as true.

The envVars argument is a slice of ("key=value") strings with the environment variables you want to be set when the job's Cmd actually runs. Typically you would pass in os.Environ().

func (*Client) Archive

func (c *Client) Archive(job *Job, jes *JobEndState) error

Archive removes a job from the jobqueue and adds it to the database of complete jobs, for use after you have run the job successfully. You have to have been the one to Reserve() the supplied Job, and the Job must be marked as having successfully run, or you will get an error.

func (*Client) BackupDB added in v0.10.0

func (c *Client) BackupDB(path string) error

BackupDB backs up the server's database to the given path. Note that automatic backups occur to the configured location without calling this.

func (*Client) Bury

func (c *Client) Bury(job *Job, jes *JobEndState, failreason string, stderr ...error) error

Bury marks a job as unrunnable, so it will be ignored (until the user does something to perhaps make it runnable and kicks the job). Note that you must reserve a job before you can bury it. Optionally supply an error that will be displayed as the Job's stderr.

func (*Client) CompressEnv added in v0.5.0

func (c *Client) CompressEnv(envars []string) ([]byte, error)

CompressEnv encodes the given environment variables (slice of "key=value" strings) and then compresses that, so that for Add() the server can store it on disc without holding it in memory, and pass the compressed bytes back to us when we need to know the Env (during Execute()).
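CompressEnv's actual encoding and compression formats are not documented here, so the sketch below uses encoding/json and compress/gzip purely as stand-ins to show the encode-then-compress round trip; compressEnv and decompressEnv are hypothetical names, and bytes produced this way are not interchangeable with the real method's output.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// compressEnv encodes env vars as JSON, then gzips the result.
// NB: JSON and gzip are stand-ins; jobqueue's real formats may differ.
func compressEnv(envars []string) ([]byte, error) {
	encoded, err := json.Marshal(envars)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(encoded); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompressEnv reverses compressEnv.
func decompressEnv(data []byte) ([]string, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	encoded, err := io.ReadAll(zr)
	if err != nil {
		return nil, err
	}
	var envars []string
	err = json.Unmarshal(encoded, &envars)
	return envars, err
}

func main() {
	env := []string{"HOME=/home/username", "PATH=/usr/bin:/bin"}
	blob, err := compressEnv(env)
	if err != nil {
		panic(err)
	}
	back, err := decompressEnv(blob)
	if err != nil {
		panic(err)
	}
	fmt.Println(back) // [HOME=/home/username PATH=/usr/bin:/bin]
}
```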

func (*Client) Delete

func (c *Client) Delete(jes []*JobEssence) (int, error)

Delete removes incomplete, not currently running jobs from the queue completely. For use when jobs were created incorrectly or by accident, or when they can never be fixed. It returns a count of jobs that it actually removed. Errors will only be related to not being able to contact the server.

func (*Client) Disconnect

func (c *Client) Disconnect() error

Disconnect closes the connection to the jobqueue server. It is CRITICAL that you call Disconnect() before calling Connect() again in the same process.

func (*Client) DrainServer

func (c *Client) DrainServer() (running int, etc time.Duration, err error)

DrainServer tells the server to stop spawning new runners, stop letting existing runners reserve new jobs, and exit once existing runners stop running. You get back a count of existing runners and an estimated time until completion for the last of those runners.

func (*Client) Execute

func (c *Client) Execute(job *Job, shell string) error

Execute runs the given Job's Cmd and blocks until it exits. Then any Job Behaviours get triggered as appropriate for the exit status.

The Cmd is run using the environment variables set when the Job was Add()ed, or the current environment is used if none were set.

The Cmd is also run within the Job's Cwd. If CwdMatters is false, a unique subdirectory is created within Cwd, and that is used as the actual working directory. When creating these unique subdirectories, directory hashing is used to allow the safe running of 100s of thousands of Jobs all using the same Cwd (that is, we will not break the directory listing of Cwd). Furthermore, a sister folder will be created in the unique location for this Job, the path to which will become the value of the TMPDIR environment variable. Once the Cmd exits, this temp directory will be deleted and the path to the actual working directory created will be in the Job's ActualCwd property. The unique folder structure itself can be wholly deleted through the Job behaviour "cleanup".

If any remote file system mounts have been configured for the Job, these are mounted prior to running the Cmd, and unmounted afterwards.

Internally, Execute() calls Mount() and Started() and keeps track of peak RAM and disk used. It regularly calls Touch() on the Job so that the server knows we are still alive and handling the Job successfully. It also intercepts SIGTERM, SIGINT, SIGQUIT, SIGUSR1 and SIGUSR2, sending SIGKILL to the running Cmd and returning an Error whose Err field is FailReasonSignal; you should check for this and exit your process. Finally it calls Unmount() and TriggerBehaviours().

If Kill() is called while executing the Cmd, the next internal Touch() call will result in the Cmd being killed and the job being Bury()ied.

If no error is returned, the Cmd will have run OK, exited with status 0, and been Archive()d from the queue while being placed in the permanent store. Otherwise, it will have been Release()d or Bury()ied as appropriate.

The supplied shell is the shell to execute the Cmd under, ideally bash (something that understands the command "set -o pipefail").

You have to have been the one to Reserve() the supplied Job, or this will immediately return an error. NB: the peak RAM tracking assumes we are running on a modern linux system with /proc/*/smaps.

func (*Client) GetByEssence added in v0.8.0

func (c *Client) GetByEssence(je *JobEssence, getstd bool, getenv bool) (*Job, error)

GetByEssence gets a Job given a JobEssence to describe it. With the boolean args set to true, this is the only way to get a Job that StdOut() and StdErr() will work on, and one of 2 ways that Env() will work (the other being Reserve()).

func (*Client) GetByEssences added in v0.8.0

func (c *Client) GetByEssences(jes []*JobEssence) ([]*Job, error)

GetByEssences gets multiple Jobs at once given JobEssences that describe them.

func (*Client) GetByRepGroup

func (c *Client) GetByRepGroup(repgroup string, subStr bool, limit int, state JobState, getStd bool, getEnv bool) ([]*Job, error)

GetByRepGroup gets multiple Jobs at once given their RepGroup (an arbitrary user-supplied identifier for the purpose of grouping related jobs together for reporting purposes).

If 'subStr' is true, gets Jobs in all RepGroups that the supplied repgroup is a substring of.

'limit', if greater than 0, limits the number of jobs returned that have the same State, FailReason and Exitcode, and on the last job of each such group it populates 'Similar' with the number of other excluded jobs there were in that group.

Providing 'state' only returns jobs in that State. 'getStd' and 'getEnv', if true, retrieve the stdout, stderr and environment variables for the Jobs.
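The limit/Similar grouping is done by the server; the sketch below is a hypothetical client-side re-implementation on a cut-down stand-in type (jobSummary, with States as plain strings rather than JobState) that makes the grouping concrete. The first-seen group order is an assumption.

```go
package main

import "fmt"

// jobSummary is a hypothetical cut-down stand-in for *jobqueue.Job.
type jobSummary struct {
	Key        string
	State      string
	FailReason string
	Exitcode   int
	Similar    int
}

type groupKey struct {
	State      string
	FailReason string
	Exitcode   int
}

// limitJobs keeps at most limit jobs per (State, FailReason, Exitcode) group,
// in first-seen group order, and records on the last kept job of each group
// how many jobs of that group were left out. It sets Similar on the inputs.
func limitJobs(jobs []*jobSummary, limit int) []*jobSummary {
	if limit <= 0 {
		return jobs
	}
	kept := make(map[groupKey][]*jobSummary)
	excluded := make(map[groupKey]int)
	var order []groupKey
	for _, j := range jobs {
		k := groupKey{j.State, j.FailReason, j.Exitcode}
		if _, seen := kept[k]; !seen {
			order = append(order, k)
		}
		if len(kept[k]) < limit {
			kept[k] = append(kept[k], j)
		} else {
			excluded[k]++
		}
	}
	var out []*jobSummary
	for _, k := range order {
		group := kept[k]
		group[len(group)-1].Similar = excluded[k]
		out = append(out, group...)
	}
	return out
}

func main() {
	jobs := []*jobSummary{
		{Key: "b1", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
		{Key: "b2", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
		{Key: "c1", State: "complete"},
		{Key: "b3", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
	}
	for _, j := range limitJobs(jobs, 1) {
		fmt.Println(j.Key, j.Similar) // b1 2, then c1 0
	}
}
```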

func (*Client) GetIncomplete

func (c *Client) GetIncomplete(limit int, state JobState, getStd bool, getEnv bool) ([]*Job, error)

GetIncomplete gets all Jobs that are currently in the jobqueue, i.e. excluding those that are complete and have been Archive()d. The args are as in GetByRepGroup().

func (*Client) Kick

func (c *Client) Kick(jes []*JobEssence) (int, error)

Kick makes previously Bury()'d jobs runnable again (they can be Reserve()d in the future). It returns a count of jobs that it actually kicked. Errors will only be related to not being able to contact the server.

func (*Client) Kill added in v0.10.0

func (c *Client) Kill(jes []*JobEssence) (int, error)

Kill will cause the next Touch() call for the job(s) described by the input to return a kill signal. Touches happening as part of an Execute() will respond to this signal by terminating their execution and burying the job. As such you should note that there could be a delay between calling Kill() and execution ceasing; wait until the jobs actually get buried before retrying the jobs if desired.

Kill returns a count of jobs that were eligible to be killed (those still in running state). Errors will only be related to not being able to contact the server.

func (*Client) Ping

func (c *Client) Ping(timeout time.Duration) (*ServerInfo, error)

Ping tells you if your connection to the server is working, returning static information about the server. If err is nil, it works. This is the only command that interacts with the server that works if a blank or invalid token had been supplied to Connect().

func (*Client) Release

func (c *Client) Release(job *Job, jes *JobEndState, failreason string) error

Release places a job back on the jobqueue, for use when you can't handle the job right now (eg. there was a suspected transient error) but maybe someone else can later. Note that you must reserve a job before you can release it. You can only Release() the same job as many times as its Retries value if it has been run and failed; a subsequent call to Release() will instead result in a Bury(). (If the job's Cmd was not run, you can Release() an unlimited number of times.)
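The retry rule amounts to a simple count: with Retries set to 3, the first three Release() calls after failed runs put the job back on the queue, and the fourth buries it, so the Cmd gets up to Retries+1 runs in total. The sketch encodes that rule in a hypothetical afterFailedRun helper; it is not the server's code and ignores Release() calls for jobs whose Cmd never ran.

```go
package main

import "fmt"

// afterFailedRun sketches the documented rule: a job whose Cmd ran and
// failed may be Release()d up to retries times; the next Release() buries it.
// failedReleases counts Release() calls made after failed runs, including
// the current one (1-based).
func afterFailedRun(failedReleases int, retries uint8) string {
	if failedReleases <= int(retries) {
		return "released"
	}
	return "buried"
}

func main() {
	const retries = 3
	for n := 1; n <= 4; n++ {
		fmt.Println(n, afterFailedRun(n, retries))
	}
	// 1 released, 2 released, 3 released, 4 buried:
	// the Cmd gets up to retries+1 = 4 runs in total.
}
```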

func (*Client) Reserve

func (c *Client) Reserve(timeout time.Duration) (*Job, error)

Reserve takes a job off the jobqueue. If you process the job successfully you should Archive() it. If you can't deal with it right now you should Release() it. If you think it can never be dealt with you should Bury() it. If you die unexpectedly, the job will automatically be released back to the queue after some time.

If no job was available in the queue for as long as the timeout argument, nil is returned for both job and error. If your timeout is 0, you will wait indefinitely for a job.

NB: if your jobs have schedulerGroups (and they will if you added them to a server configured with a RunnerCmd), this will most likely not return any jobs; use ReserveScheduled() instead.

func (*Client) ReserveScheduled

func (c *Client) ReserveScheduled(timeout time.Duration, schedulerGroup string) (*Job, error)

ReserveScheduled is like Reserve(), except that it will only return jobs from the specified schedulerGroup.

Based on the scheduler the server was configured with, it will group jobs based on their resource requirements and then submit runners to handle them to your system's job scheduler (such as LSF), possibly in different scheduler queues. These runners are told the group they are a part of, and that same group name is applied internally to the Jobs as the "schedulerGroup", so that the runners can reserve only Jobs that they're supposed to. Therefore, it does not make sense for you to call this yourself; it is only for use by runners spawned by the server.

func (*Client) ShutdownServer

func (c *Client) ShutdownServer() bool

ShutdownServer tells the server to immediately cease all operations. Its last act will be to backup its internal database. Any existing runners will fail. Because the server gets shut down it can't respond with success/failure, so we indirectly report if the server was shut down successfully.

func (*Client) Started

func (c *Client) Started(job *Job, pid int) error

Started updates a Job on the server with information that you've started running the Job's Cmd. Started also figures out the host name, IP and possibly ID (in cloud situations) to associate with the job, so that if something goes wrong the user can go to the host and investigate. Note that HostID will not be set on job after this call; only the server will know about it (use one of the Get methods afterwards to get a new object with the HostID set if necessary).

func (*Client) Touch

func (c *Client) Touch(job *Job) (bool, error)

Touch adds to a job's ttr, allowing you more time to work on it. Note that you must have reserved the job before you can touch it. If the returned bool is true, you should stop doing what you're doing and bury the job, since this means that Kill() has been called for this job.

func (*Client) UploadFile added in v0.12.0

func (c *Client) UploadFile(local, remote string) (string, error)

UploadFile uploads a local file to the machine where the server is running, so you can add cloud jobs that need a script or config file on your local machine to be copied over to created cloud instances.

If the remote path is supplied as a blank string, the remote path will be chosen for you based on the MD5 checksum of your file data, rooted in the server's configured UploadDir.

The remote path can be supplied prefixed with ~/ to upload relative to the remote's home directory. Otherwise it should be an absolute path.

Returns the absolute path of the uploaded file on the server's machine.

NB: This is only suitable for transferring small files!
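The three rules for the remote argument can be summarised in a small function. This is a sketch under stated assumptions: remotePath is a hypothetical name, and naming the file after the bare hex MD5 directly under UploadDir is a guess, since the docs only say the path is "based on" the checksum and "rooted in" UploadDir.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"path/filepath"
	"strings"
)

// remotePath sketches the documented rules for UploadFile's remote argument:
// blank means "derive a path from the MD5 of the data under uploadDir", a
// "~/" prefix is relative to home, otherwise the absolute path is used as is.
// The exact naming under uploadDir is an assumption.
func remotePath(remote, home, uploadDir string, data []byte) string {
	switch {
	case remote == "":
		sum := md5.Sum(data)
		return filepath.Join(uploadDir, hex.EncodeToString(sum[:]))
	case strings.HasPrefix(remote, "~/"):
		return filepath.Join(home, remote[2:])
	default:
		return remote
	}
}

func main() {
	fmt.Println(remotePath("", "/home/user", "/srv/uploads", []byte("hello")))
	// /srv/uploads/5d41402abc4b2a76b9719d911017c592
	fmt.Println(remotePath("~/bin/run.sh", "/home/user", "/srv/uploads", nil))
	// /home/user/bin/run.sh
}
```

Naming by checksum means uploading the same file twice maps to the same remote path.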

type Dependencies added in v0.2.0

type Dependencies []*Dependency

Dependencies is a slice of *Dependency, for use in Job.Dependencies. It describes the jobs that must be complete before the Job you associate this with will start.

func (Dependencies) DepGroups added in v0.3.0

func (d Dependencies) DepGroups() []string

DepGroups returns all the DepGroups of our constituent Dependency structs.

func (Dependencies) Stringify added in v0.3.0

func (d Dependencies) Stringify() []string

Stringify converts our constituent Dependency structs into a slice of strings, each of which could be JobEssence or DepGroup based.

type Dependency added in v0.2.0

type Dependency struct {
	Essence  *JobEssence
	DepGroup string
}

Dependency is a struct that describes a Job purely in terms of a JobEssence, or in terms of a Job's DepGroup, for use in Dependencies. If DepGroup is specified, then Essence is ignored.

func NewDepGroupDependency added in v0.3.0

func NewDepGroupDependency(depgroup string) *Dependency

NewDepGroupDependency makes it a little easier to make a new *Dependency based on a dep group, for use in NewDependencies().

func NewEssenceDependency added in v0.8.0

func NewEssenceDependency(cmd string, cwd string) *Dependency

NewEssenceDependency makes it a little easier to make a new *Dependency based on Cmd+Cwd, for use in NewDependencies(). Leave cwd as an empty string if the job you are describing does not have CwdMatters true.

type Error

type Error struct {
	Op   string // name of the method
	Item string // the item's key
	Err  string // one of our Err* vars
}

Error records an error and the operation and item that caused it.

func (Error) Error

func (e Error) Error() string

type JStatus added in v0.13.0

type JStatus struct {
	Key           string
	RepGroup      string
	DepGroups     []string
	Dependencies  []string
	Cmd           string
	State         JobState
	Cwd           string
	CwdBase       string
	HomeChanged   bool
	Behaviours    string
	Mounts        string
	MonitorDocker string
	// ExpectedRAM is in Megabytes.
	ExpectedRAM int
	// ExpectedTime is in seconds.
	ExpectedTime float64
	// RequestedDisk is in Gigabytes.
	RequestedDisk int
	OtherRequests []string
	Cores         float64
	PeakRAM       int
	PeakDisk      int64 // MBs
	Exited        bool
	Exitcode      int
	FailReason    string
	Pid           int
	Host          string
	HostID        string
	HostIP        string
	Walltime      float64
	CPUtime       float64
	Started       int64
	Ended         int64
	StdErr        string
	StdOut        string
	Env           []string
	Attempts      uint32
	Similar       int
}

JStatus is the job info we send to the status webpage (only real difference to Job is that some of the values are converted to easy-to-display forms).

type Job

type Job struct {
	// Cmd is the actual command line that will be run via the shell.
	Cmd string

	// Cwd determines the command working directory, the directory we cd to
	// before running Cmd. When CwdMatters, Cwd is used exactly, otherwise a
	// unique sub-directory of Cwd is used as the command working directory.
	Cwd string

	// CwdMatters should be made true when Cwd contains input files that you
	// will refer to using relative (from Cwd) paths in Cmd, and when other Jobs
	// have identical Cmds because you have many different directories that
	// contain different but identically named input files. Cwd will become part
	// of what makes the Job unique.
	// When CwdMatters is false (default), Cmd gets run in a unique subfolder of
	// Cwd, enabling features like tracking disk space usage and clean up of the
	// working directory by simply deleting the whole thing. The TMPDIR
	// environment variable is also set to a sister folder of the unique
	// subfolder, and this is always cleaned up after the Cmd exits.
	CwdMatters bool

	// ChangeHome sets the $HOME environment variable to the actual working
	// directory before running Cmd, but only when CwdMatters is false.
	ChangeHome bool

	// RepGroup is a name associated with related Jobs to help group them
	// together when reporting on their status etc.
	RepGroup string

	// ReqGroup is a string that you supply to group together all commands that
	// you expect to have similar resource requirements.
	ReqGroup string

	// Requirements describes the resources this Cmd needs to run, such as RAM,
	// Disk and time. These may be determined for you by the system (depending
	// on Override) based on past experience of running jobs with the same
	// ReqGroup.
	Requirements *scheduler.Requirements

	// RequirementsOrig is like Requirements, but only has the original RAM,
	// Disk and time values set by you, if any.
	RequirementsOrig *scheduler.Requirements

	// Override determines if your own supplied Requirements get used, or if the
	// system's calculated values get used. 0 means prefer the system values. 1
	// means prefer your values if they are higher. 2 means always use your
	// values.
	Override uint8

	// Priority is a number between 0 and 255 inclusive - higher numbered jobs
	// will run before lower numbered ones (the default is 0).
	Priority uint8

	// Retries is the number of times to retry running a Cmd if it fails.
	Retries uint8

	// DepGroups are the dependency groups this job belongs to that other jobs
	// can refer to in their Dependencies.
	DepGroups []string

	// Dependencies describe the jobs that must be complete before this job
	// starts.
	Dependencies Dependencies

	// Behaviours describe what should happen after Cmd is executed, depending
	// on its success.
	Behaviours Behaviours

	// MountConfigs describes remote file systems or object stores that you wish
	// to be fuse mounted prior to running the Cmd. Once Cmd exits, the mounts
	// will be unmounted (with uploads only occurring if it exits with code 0).
	// If you want multiple separate mount points accessed from different local
	// directories, you will supply more than one MountConfig in the slice. If
	// you want multiple remote locations multiplexed and accessible from a
	// single local directory, you will supply a single MountConfig in the
	// slice, configured with multiple MountTargets. Relative paths for your
	// MountConfig.Mount options will be relative to Cwd (or ActualCwd if
	// CwdMatters == false). If a MountConfig.Mount is not specified, it
	// defaults to Cwd/mnt if CwdMatters, otherwise ActualCwd itself will be the
	// mount point. If a MountConfig.CacheBase is not specified, it defaults to
	// Cwd if CwdMatters, otherwise it will be the parent directory of
	// ActualCwd.
	MountConfigs MountConfigs

	// BsubMode set to either Production or Development when Add()ing a job will
	// result in the job being assigned a BsubID. Such jobs, when they run, will
	// see bsub, bjobs and bkill as symlinks to wr, thus if they call bsub, they
	// will actually add jobs to the jobqueue etc. Those jobs will pick up the
	// same Requirements.Other as this job, and the same MountConfigs. If
	// Requirements.Other["cloud_shared"] is "true", the MountConfigs are not
	// reused.
	BsubMode string

	// MonitorDocker turns on monitoring of a docker container identified by its
	// --name or path to its --cidfile, adding its peak RAM and CPU usage to the
	// reported RAM and CPU usage of this job.
	//
	// If the special argument "?" is supplied, monitoring will apply to the
	// first new docker container that appears after the Cmd starts to run.
	// NB: if multiple jobs that run docker containers start running at the same
	// time on the same machine, the reported stats could be wrong for one or
	// more of those jobs.
	//
	// Requires that docker is installed on the machine where the job will run
	// (and that the Cmd uses docker to run a container). NB: does not handle
	// monitoring of multiple docker containers run by a single Cmd.
	MonitorDocker string

	// the actual working directory used, which would have been created with a
	// unique name if CwdMatters = false
	ActualCwd string
	// peak RAM (MB) used.
	PeakRAM int
	// peak disk (MB) used.
	PeakDisk int64
	// true if the Cmd was run and exited.
	Exited bool
	// if the job ran and exited, its exit code is recorded here, but check
	// Exited because when this is not set it could look like exit code 0.
	Exitcode int
	// true if the job was running but we've lost contact with it
	Lost bool
	// if the job failed to complete successfully, this will hold one of the
	// FailReason* strings. Also set if Lost == true.
	FailReason string
	// pid of the running or ran process.
	Pid int
	// host the process is running or did run on.
	Host string
	// host id the process is running or did run on (cloud specific).
	HostID string
	// host ip the process is running or did run on (cloud specific).
	HostIP string
	// time the cmd started running.
	StartTime time.Time
	// time the cmd stopped running.
	EndTime time.Time
	// CPU time used.
	CPUtime time.Duration
	// to read, call job.StdErr() instead; if the job ran, its (truncated)
	// STDERR will be here.
	StdErrC []byte
	// to read, call job.StdOut() instead; if the job ran, its (truncated)
	// STDOUT will be here.
	StdOutC []byte
	// to read, call job.Env() instead, to get the environment variables as a
	// []string, where each string is like "key=value".
	EnvC []byte
	// Since EnvC isn't always populated on job retrieval, this lets job.Env()
	// distinguish between no EnvC and merely not requested.
	EnvCRetrieved bool
	// if set (using output of CompressEnv()), they will be returned in the
	// results of job.Env().
	EnvOverride []byte
	// job's state in the queue: 'delayed', 'ready', 'reserved', 'running',
	// 'buried', 'complete' or 'dependent'.
	State JobState
	// number of times the job had ever entered 'running' state.
	Attempts uint32
	// remaining number of Release()s allowed before being buried instead.
	UntilBuried uint8
	// we note which client reserved this job, for validating if that client has
	// permission to do other stuff to this Job; the server only ever sets this
	// on Reserve(), so clients can't cheat by changing this on their end.
	ReservedBy uuid.UUID
	// on the server we don't store EnvC with the job, but look it up in db via
	// this key.
	EnvKey string
	// when retrieving jobs with a limit, this tells you how many jobs were
	// excluded.
	Similar int
	// name of the queue the Job was added to.
	Queue string
	// unique (for this manager session) id of the job submission, present if
	// BsubMode was set when the job was added.
	BsubID uint64

	sync.RWMutex
	// contains filtered or unexported fields
}

Job is a struct that represents a command that needs to be run and some associated metadata. If you get a Job back from the server (via Reserve() or Get*()), you should treat the properties as read-only: changing them will have no effect.

func (*Job) Env

func (j *Job) Env() ([]string, error)

Env decompresses and decodes job.EnvC (the output of CompressEnv(), which holds the environment variables the Job's Cmd should run under, or ran under). Note that EnvC is only populated if you got the Job from GetByCmd(_, _, true) or Reserve(). If no environment variables were passed in when the job was Add()ed to the queue, returns the current environment variables instead. In both cases, alters the return value to apply any overrides stored in job.EnvOverride.

func (*Job) EnvAddOverride added in v0.11.0

func (j *Job) EnvAddOverride(env []string) error

EnvAddOverride adds additional overrides to the job's existing overrides (if any). These will then be used to determine the final value of Env(). NB: This does not update the job on the server if called from a client, but is suitable for altering a job's environment prior to calling Client.Execute().

func (*Job) Getenv added in v0.13.0

func (j *Job) Getenv(key string) string

Getenv is like os.Getenv(), but for the environment variables stored in the job, including any overrides. Returns blank if Env() would have returned an error.
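Env(), EnvAddOverride() and Getenv() all work on "key=value" strings with overrides layered on top. The following is a minimal sketch of that layering, assuming an override replaces any existing entry for the same key and otherwise is appended; `applyOverrides` and `getenv` are hypothetical helpers, not wr functions.

```go
package main

import (
	"fmt"
	"strings"
)

// applyOverrides returns base with each "key=value" in overrides replacing
// the entry for the same key, or appended if the key is new.
func applyOverrides(base, overrides []string) []string {
	index := make(map[string]int, len(base))
	out := make([]string, 0, len(base)+len(overrides))
	for _, kv := range base {
		key := strings.SplitN(kv, "=", 2)[0]
		index[key] = len(out)
		out = append(out, kv)
	}
	for _, kv := range overrides {
		key := strings.SplitN(kv, "=", 2)[0]
		if i, ok := index[key]; ok {
			out[i] = kv
		} else {
			index[key] = len(out)
			out = append(out, kv)
		}
	}
	return out
}

// getenv behaves like os.Getenv over a "key=value" slice: it returns the
// last value set for key, or "" if the key is absent.
func getenv(env []string, key string) string {
	prefix := key + "="
	for i := len(env) - 1; i >= 0; i-- {
		if strings.HasPrefix(env[i], prefix) {
			return env[i][len(prefix):]
		}
	}
	return ""
}

func main() {
	env := applyOverrides(
		[]string{"PATH=/usr/bin", "HOME=/home/u"},
		[]string{"HOME=/tmp", "DEBUG=1"},
	)
	fmt.Println(env)
	fmt.Println(getenv(env, "HOME"))
}
```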

func (*Job) Key added in v0.13.0

func (j *Job) Key() string

Key calculates a unique key to describe the job.

func (*Job) Mount added in v0.8.0

func (j *Job) Mount(onCwd ...bool) ([]string, []string, error)

Mount uses the Job's MountConfigs to mount the remote file systems at the desired mount points. If a mount point is unspecified, mounts in the sub folder Cwd/mnt if CwdMatters (and unspecified CacheBase becomes Cwd), otherwise the actual working directory is used as the mount point (and the parent of that used for unspecified CacheBase). Relative CacheDir options are treated relative to the CacheBase.

If the optional onCwd argument is supplied true, and ActualCwd is not defined, then instead of mounting at j.Cwd/mnt, it tries to mount at j.Cwd itself. (This will fail if j.Cwd is not empty or already mounted by another process.)

Returns any non-shared cache directories, and any directories in (or at) the job's actual cwd if anything was mounted there, for the purpose of knowing what directories to check and not check for disk usage.
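The defaulting rules for mount points and cache bases described for Mount() and MountConfigs can be summarised in a couple of small functions. This is a sketch under the stated rules only; `defaultMountPoint` and `defaultCacheBase` are hypothetical names and ignore the optional onCwd argument.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// defaultMountPoint: an empty mount becomes Cwd/mnt when CwdMatters,
// otherwise ActualCwd itself; a relative mount is resolved against Cwd (or
// ActualCwd when CwdMatters is false).
func defaultMountPoint(mount, cwd, actualCwd string, cwdMatters bool) string {
	if mount == "" {
		if cwdMatters {
			return filepath.Join(cwd, "mnt")
		}
		return actualCwd
	}
	if filepath.IsAbs(mount) {
		return mount
	}
	if cwdMatters {
		return filepath.Join(cwd, mount)
	}
	return filepath.Join(actualCwd, mount)
}

// defaultCacheBase: an explicit CacheBase wins; otherwise Cwd when
// CwdMatters, else the parent directory of ActualCwd.
func defaultCacheBase(cacheBase, cwd, actualCwd string, cwdMatters bool) string {
	if cacheBase != "" {
		return cacheBase
	}
	if cwdMatters {
		return cwd
	}
	return filepath.Dir(actualCwd)
}

func main() {
	fmt.Println(defaultMountPoint("", "/work", "/tmp/x/cwd", true))
	fmt.Println(defaultMountPoint("", "/work", "/tmp/x/cwd", false))
	fmt.Println(defaultCacheBase("", "/work", "/tmp/x/cwd", false))
}
```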

func (*Job) StdErr

func (j *Job) StdErr() (string, error)

StdErr returns the decompressed job.StdErrC, which is the head and tail of job.Cmd's STDERR when it ran. If the Cmd hasn't run yet, or if it output nothing to STDERR, you will get an empty string. Note that StdErrC is only populated if you got the Job from GetByCmd(_, true), and if the Job's Cmd ran but failed.

func (*Job) StdOut

func (j *Job) StdOut() (string, error)

StdOut returns the decompressed job.StdOutC, which is the head and tail of job.Cmd's STDOUT when it ran. If the Cmd hasn't run yet, or if it output nothing to STDOUT, you will get an empty string. Note that StdOutC is only populated if you got the Job from GetByCmd(_, true), and if the Job's Cmd ran but failed.
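StdErr() and StdOut() return only the head and tail of a Cmd's output. A minimal sketch of that kind of truncation follows; the byte limit and the "..." separator are assumptions for illustration, and `headTail` is a hypothetical helper rather than wr's own code.

```go
package main

import "fmt"

// headTail keeps the first n and last n bytes of output when it is longer
// than 2n, joined by a hypothetical "..." marker line. Byte slicing can
// split a multi-byte UTF-8 character at the cut points.
func headTail(output string, n int) string {
	if len(output) <= 2*n {
		return output
	}
	return output[:n] + "\n...\n" + output[len(output)-n:]
}

func main() {
	fmt.Printf("%q\n", headTail("0123456789abcdef", 4))
}
```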

func (*Job) ToEssense added in v0.12.0

func (j *Job) ToEssense() *JobEssence

ToEssense converts a Job to its matching JobEssence, which takes less space and is required as input for certain methods.

func (*Job) ToStatus added in v0.13.0

func (j *Job) ToStatus() JStatus

ToStatus converts a job to a simplified JStatus, useful for output as JSON.

func (*Job) TriggerBehaviours added in v0.8.0

func (j *Job) TriggerBehaviours(success bool) error

TriggerBehaviours triggers this Job's Behaviours based on whether its Cmd executed successfully. Should only be called as part of, or after, Execute().

func (*Job) Unmount added in v0.8.0

func (j *Job) Unmount(stopUploads ...bool) (logs string, err error)

Unmount unmounts any remote filesystems that were previously mounted with Mount(), returning a string of any log messages generated during the mount. Returns nil error if Mount() had not been called or there were no MountConfigs.

Note that for cached writable mounts, created files will only begin to upload once Unmount() is called, so this may take some time to return. Supply true to disable uploading of files (eg. if you're unmounting following an error). If uploading, error could contain the string "failed to upload", which you may want to check for. On success, triggers the deletion of any empty directories between the mount point(s) and Cwd if not CwdMatters and the mount point was (within) ActualCwd.

func (*Job) WallTime added in v0.8.0

func (j *Job) WallTime() time.Duration

WallTime returns the time the job took to run if it ran to completion, or the time taken so far if it is currently running.

type JobDefaults added in v0.10.0

type JobDefaults struct {
	RepGrp string
	// Cwd defaults to /tmp.
	Cwd        string
	CwdMatters bool
	ChangeHome bool
	ReqGrp     string
	// CPUs is the number of CPU cores each cmd will use.
	CPUs float64
	// Memory is the number of Megabytes each cmd will use. Defaults to 1000.
	Memory int
	// Time is the amount of time each cmd will run for. Defaults to 1 hour.
	Time time.Duration
	// Disk is the number of Gigabytes cmds will use.
	Disk      int
	Override  int
	Priority  int
	Retries   int
	DepGroups []string
	Deps      Dependencies
	// Env is a comma separated list of key=val pairs.
	Env           string
	OnFailure     Behaviours
	OnSuccess     Behaviours
	OnExit        Behaviours
	MountConfigs  MountConfigs
	MonitorDocker string
	CloudOS       string
	CloudUser     string
	CloudFlavor   string
	// CloudScript is the local path to a script.
	CloudScript string
	// CloudConfigFiles is the config files to copy in cloud.Server.CopyOver() format
	CloudConfigFiles string
	// CloudOSRam is the number of Megabytes that CloudOS needs to run. Defaults
	// to 1000.
	CloudOSRam  int
	CloudShared bool
	BsubMode    string
	// contains filtered or unexported fields
}

JobDefaults is supplied to JobViaJSON.Convert() to provide default values for the conversion.

func (*JobDefaults) DefaultCPUs added in v0.10.0

func (jd *JobDefaults) DefaultCPUs() float64

DefaultCPUs returns the CPUs value, but a minimum of 0.

func (*JobDefaults) DefaultCloudOSRam added in v0.10.0

func (jd *JobDefaults) DefaultCloudOSRam() string

DefaultCloudOSRam returns a string version of the CloudOSRam value, which is treated as 1000 if 0.

func (*JobDefaults) DefaultCwd added in v0.10.0

func (jd *JobDefaults) DefaultCwd() string

DefaultCwd returns the Cwd value, defaulting to /tmp.

func (*JobDefaults) DefaultEnv added in v0.10.0

func (jd *JobDefaults) DefaultEnv() ([]byte, error)

DefaultEnv returns an encoded compressed version of the Env value.

func (*JobDefaults) DefaultMemory added in v0.10.0

func (jd *JobDefaults) DefaultMemory() int

DefaultMemory returns the Memory value, but if <1 returns 1000 instead.

func (*JobDefaults) DefaultTime added in v0.10.0

func (jd *JobDefaults) DefaultTime() time.Duration

DefaultTime returns the Time value, but if 0 returns 1 hour instead.

type JobEndState added in v0.11.0

type JobEndState struct {
	Cwd      string
	Exitcode int
	PeakRAM  int
	PeakDisk int64
	CPUtime  time.Duration
	Stdout   []byte
	Stderr   []byte
	Exited   bool
}

JobEndState is used to describe the state of a job after it has (tried to) execute its Cmd. You supply these to Client.Bury(), Release() and Archive(). The Cwd you supply should be the actual working directory used, which may be different from the Job's Cwd property; if not, supply an empty string. Always set Exited to true, and populate all other fields, unless you never actually tried to execute the Cmd, in which case you would just provide a nil JobEndState to the methods that need one.
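To show which fields a caller can fill after running a command, here is a sketch using only os/exec. It uses a local `jobEndState` mirror rather than the real type, runs the command through `sh -c` (an assumption), and leaves PeakRAM and PeakDisk at 0 because os/exec does not report them portably. A nil result stands for "never executed", matching the advice above.

```go
package main

import (
	"bytes"
	"fmt"
	"os/exec"
	"time"
)

// jobEndState mirrors JobEndState's fields; it is a local stand-in.
type jobEndState struct {
	Cwd      string
	Exitcode int
	PeakRAM  int
	PeakDisk int64
	CPUtime  time.Duration
	Stdout   []byte
	Stderr   []byte
	Exited   bool
}

// runAndDescribe runs shellCmd in dir and records what os/exec reports.
// It returns nil if the command could not be started at all.
func runAndDescribe(shellCmd, dir string) *jobEndState {
	cmd := exec.Command("sh", "-c", shellCmd)
	cmd.Dir = dir
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	_ = cmd.Run() // a non-zero exit is read from ProcessState below
	ps := cmd.ProcessState
	if ps == nil {
		return nil // never executed: callers pass a nil end state
	}
	return &jobEndState{
		Cwd:      dir, // treated here as the actual working directory
		Exitcode: ps.ExitCode(),
		CPUtime:  ps.UserTime() + ps.SystemTime(),
		Stdout:   stdout.Bytes(),
		Stderr:   stderr.Bytes(),
		Exited:   true,
	}
}

func main() {
	es := runAndDescribe("echo hi; exit 3", "/tmp")
	fmt.Println(es.Exitcode, es.Exited, string(es.Stdout))
}
```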

type JobEssence added in v0.8.0

type JobEssence struct {
	// JobKey can be set by itself if you already know the "key" of the desired
	// job; you can get these keys when you use GetByRepGroup() or
	// GetIncomplete() with a limit. When this is set, other properties are
	// ignored.
	JobKey string

	// Cmd always forms an essential part of a Job.
	Cmd string

	// Cwd should only be set if the Job was created with CwdMatters = true.
	Cwd string

	// MountConfigs should only be set if the Job was created with MountConfigs.
	MountConfigs MountConfigs
}

JobEssence struct describes the essential aspects of a Job that make it unique, used to describe a Job when eg. you want to search for one.

func (*JobEssence) Key added in v0.8.0

func (j *JobEssence) Key() string

Key returns the same value that Key() on the matching Job would give you.

func (*JobEssence) Stringify added in v0.8.0

func (j *JobEssence) Stringify() string

Stringify returns a nice printable form of a JobEssence.
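JobEssence captures what makes a job unique: its Cmd, its Cwd only when CwdMatters, and its MountConfigs. The sketch below shows one plausible way to turn those into a stable key; the MD5 hash, the field separators and `essenceKey` itself are assumptions for illustration, and wr's actual Key() may be computed differently.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// essenceKey hashes Cmd together with Cwd (only when cwdMatters) and a
// mounts key. Hypothetical; NUL separators keep fields from running together.
func essenceKey(cmd, cwd string, cwdMatters bool, mountsKey string) string {
	if !cwdMatters {
		cwd = "" // Cwd does not contribute to uniqueness
	}
	h := md5.New()
	fmt.Fprintf(h, "%s\x00%s\x00%s", cwd, cmd, mountsKey)
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	fmt.Println(essenceKey("echo hi", "/a", false, ""))
	fmt.Println(essenceKey("echo hi", "/b", false, "")) // same as above
}
```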

type JobState added in v0.9.0

type JobState string

JobState is how we describe the possible job states.

const (
	JobStateNew       JobState = "new"
	JobStateDelayed   JobState = "delayed"
	JobStateReady     JobState = "ready"
	JobStateReserved  JobState = "reserved"
	JobStateRunning   JobState = "running"
	JobStateLost      JobState = "lost"
	JobStateBuried    JobState = "buried"
	JobStateDependent JobState = "dependent"
	JobStateComplete  JobState = "complete"
	JobStateDeleted   JobState = "deleted"
	JobStateDeletable JobState = "deletable"
	JobStateUnknown   JobState = "unknown"
)

JobState* constants represent all the possible job states. The fake "new" and "deleted" states are for the benefit of the web interface (jstateCount). "lost" is also a "fake" state indicating the job was running and we lost contact with it; it may be dead. "unknown" is an error case that shouldn't happen. "deletable" is a meta state that can be used when filtering jobs to mean !(running|complete).
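The "deletable" meta state is only meaningful as a filter. A sketch of applying such a filter follows, using a local copy of a few JobState constants; `matchesFilter` is hypothetical, and whether "lost" jobs should count as running is not covered by the description, so it is treated here like any other non-running state.

```go
package main

import "fmt"

// JobState and these constants mirror the package's values for illustration.
type JobState string

const (
	JobStateRunning   JobState = "running"
	JobStateComplete  JobState = "complete"
	JobStateBuried    JobState = "buried"
	JobStateDeletable JobState = "deletable"
)

// matchesFilter expands "deletable" to !(running|complete); any other
// filter must match the state exactly.
func matchesFilter(state, filter JobState) bool {
	if filter == JobStateDeletable {
		return state != JobStateRunning && state != JobStateComplete
	}
	return state == filter
}

func main() {
	fmt.Println(matchesFilter(JobStateBuried, JobStateDeletable))
	fmt.Println(matchesFilter(JobStateRunning, JobStateDeletable))
}
```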

type JobViaJSON added in v0.10.0

type JobViaJSON struct {
	Cmd          string       `json:"cmd"`
	Cwd          string       `json:"cwd"`
	CwdMatters   bool         `json:"cwd_matters"`
	ChangeHome   bool         `json:"change_home"`
	MountConfigs MountConfigs `json:"mounts"`
	ReqGrp       string       `json:"req_grp"`
	// Memory is a number and unit suffix, eg. 1G for 1 Gigabyte.
	Memory string `json:"memory"`
	// Time is a duration with a unit suffix, eg. 1h for 1 hour.
	Time string   `json:"time"`
	CPUs *float64 `json:"cpus"`
	// Disk is the number of Gigabytes the cmd will use.
	Disk             *int              `json:"disk"`
	Override         *int              `json:"override"`
	Priority         *int              `json:"priority"`
	Retries          *int              `json:"retries"`
	RepGrp           string            `json:"rep_grp"`
	DepGrps          []string          `json:"dep_grps"`
	Deps             []string          `json:"deps"`
	CmdDeps          Dependencies      `json:"cmd_deps"`
	OnFailure        BehavioursViaJSON `json:"on_failure"`
	OnSuccess        BehavioursViaJSON `json:"on_success"`
	OnExit           BehavioursViaJSON `json:"on_exit"`
	Env              []string          `json:"env"`
	MonitorDocker    string            `json:"monitor_docker"`
	CloudOS          string            `json:"cloud_os"`
	CloudUser        string            `json:"cloud_username"`
	CloudScript      string            `json:"cloud_script"`
	CloudConfigFiles string            `json:"cloud_config_files"`
	CloudOSRam       *int              `json:"cloud_ram"`
	CloudFlavor      string            `json:"cloud_flavor"`
	CloudShared      bool              `json:"cloud_shared"`
	BsubMode         string            `json:"bsub_mode"`
}

JobViaJSON describes the properties of a Job that a user wishes to add to the queue, convenient if they are supplying JSON.

func (*JobViaJSON) Convert added in v0.10.0

func (jvj *JobViaJSON) Convert(jd *JobDefaults) (*Job, error)

Convert considers the supplied defaults and returns a *Job based on the properties of this JobViaJSON. The Job will not be in the queue until passed to a method that adds jobs to the queue.

type MountConfig added in v0.8.0

type MountConfig struct {
	// Mount is the local directory on which to mount your Target(s). It can be
	// (in) any directory you're able to write to. If the directory doesn't
	// exist, it will be created first. Otherwise, it must be empty. If not
	// supplied, defaults to the subdirectory "mnt" in the Job's working
	// directory if CwdMatters, otherwise the actual working directory will be
	// used as the mount point.
	Mount string `json:",omitempty"`

	// CacheBase is the parent directory to use for the CacheDir of any Targets
	// configured with Cache on, but CacheDir undefined, or specified with a
	// relative path. If CacheBase is also undefined, the base will be the Job's
	// Cwd if CwdMatters, otherwise it will be the parent of the Job's actual
	// working directory.
	CacheBase string `json:",omitempty"`

	// Retries is the number of retries that should be attempted when
	// encountering errors in trying to access your remote S3 bucket. At least 3
	// is recommended. It defaults to 10 if not provided.
	Retries int `json:",omitempty"`

	// Verbose is a boolean, which if true, causes timing information on all
	// remote S3 calls to appear as lines in the STDERR of every job that uses
	// the mount. Errors always appear there.
	Verbose bool `json:",omitempty"`

	// Targets is a slice of MountTarget which define what you want to access at
	// your Mount. It's a slice to allow you to multiplex different buckets (or
	// different subdirectories of the same bucket) so that it looks like all
	// their data is in the same place, for easier access to files in your
	// mount. You can only have one of these configured to be writeable.
	Targets []MountTarget
}

MountConfig struct is used for setting in a Job to specify that a remote file system or object store should be fuse mounted prior to running the Job's Cmd. Currently only supports S3-like object stores.
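The Targets rules (Path required, at most one writeable target, and Write or CacheDir forcing caching on) can be checked up front. A minimal sketch follows with a hypothetical `mountTarget` mirror and helper names; wr performs its own validation, which this does not reproduce.

```go
package main

import (
	"errors"
	"fmt"
)

// mountTarget mirrors a few MountTarget fields for illustration.
type mountTarget struct {
	Path     string
	Cache    bool
	CacheDir string
	Write    bool
}

// cached: a CacheDir or Write forces caching on.
func cached(t mountTarget) bool {
	return t.Cache || t.CacheDir != "" || t.Write
}

// validateTargets requires a Path on every target and allows at most one
// writeable target.
func validateTargets(targets []mountTarget) error {
	writable := 0
	for _, t := range targets {
		if t.Path == "" {
			return errors.New("a target Path is required")
		}
		if t.Write {
			writable++
		}
	}
	if writable > 1 {
		return fmt.Errorf("%d writeable targets; at most 1 is allowed", writable)
	}
	return nil
}

func main() {
	ts := []mountTarget{{Path: "bucket/in"}, {Path: "bucket/out", Write: true}}
	fmt.Println(validateTargets(ts), cached(ts[1]))
}
```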

type MountConfigs added in v0.8.0

type MountConfigs []MountConfig

MountConfigs is a slice of MountConfig.

func (MountConfigs) Key added in v0.9.0

func (mcs MountConfigs) Key() string

Key returns a string representation of the most critical parts of the config that would make it different from other MountConfigs in practical terms of what files are accessible from where: only Mount, Target.Profile and Target.Path are considered. The order of the Targets within each MountConfig is significant, but the order of the MountConfigs in the slice is not.
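A sketch of a key with the properties just described: only Mount, Profile and Path contribute, target order matters, and the per-config strings are sorted so that MountConfig order does not. The string format and the `configsKey` name are hypothetical; wr's actual Key() output will look different.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type target struct{ Profile, Path string }

type mountConfig struct {
	Mount   string
	Targets []target
}

// configsKey builds one string per config (targets kept in order), then
// sorts those strings so the order of configs is irrelevant.
func configsKey(mcs []mountConfig) string {
	parts := make([]string, 0, len(mcs))
	for _, mc := range mcs {
		var b strings.Builder
		b.WriteString(mc.Mount)
		for _, t := range mc.Targets {
			b.WriteString(":" + t.Profile + "-" + t.Path)
		}
		parts = append(parts, b.String())
	}
	sort.Strings(parts)
	return strings.Join(parts, ";")
}

func main() {
	a := mountConfig{"mnt/a", []target{{"default", "b1"}, {"default", "b2"}}}
	b := mountConfig{"mnt/b", []target{{"default", "b3"}}}
	fmt.Println(configsKey([]mountConfig{a, b}) == configsKey([]mountConfig{b, a}))
}
```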

func (MountConfigs) String added in v0.8.0

func (mcs MountConfigs) String() string

String provides a JSON representation of the MountConfigs.

type MountTarget added in v0.8.0

type MountTarget struct {
	// Profile is the S3 configuration profile name to use. If not supplied, the
	// value of the $AWS_DEFAULT_PROFILE or $AWS_PROFILE environment variables
	// is used, and if those are unset it defaults to "default".
	//
	// We look at a number of standard S3 configuration files and environment
	// variables to determine the scheme, domain, region and authentication
	// details to connect to S3 with. All possible sources are checked to fill
	// in any missing values from more preferred sources.
	//
	// The preferred file is ~/.s3cfg, since this is the only config file type
	// that allows the specification of a custom domain. This file is the
	// s3cmd config file, described here: http://s3tools.org/kb/item14.htm. wr
	// will look at the access_key, secret_key, use_https and host_base options
	// under the section with the given Profile name. If you don't wish to use
	// any other config files or environment variables, you can add the non-
	// standard region option to this file if you need to specify a specific
	// region.
	//
	// The next file checked is the one pointed to by the
	// $AWS_SHARED_CREDENTIALS_FILE environment variable, or ~/.aws/credentials.
	// This file is described here:
	// http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html.
	// wr will look at the aws_access_key_id and aws_secret_access_key options
	// under the section with the given Profile name.
	//
	// wr also checks the file pointed to by the $AWS_CONFIG_FILE environment
	// variable, or ~/.aws/config, described in the previous link. From here the
	// region option is used from the section with the given Profile name. If
	// you don't wish to use a ~/.s3cfg file but do need to specify a custom
	// domain, you can add the non-standard host_base and use_https options to
	// this file instead.
	//
	// As a last resort, ~/.awssecret is checked. This is s3fs's config file,
	// and consists of a single line with your access key and secret key
	// separated by a colon.
	//
	// If set, the environment variables $AWS_ACCESS_KEY_ID,
	// $AWS_SECRET_ACCESS_KEY and $AWS_DEFAULT_REGION override corresponding
	// options found in any config file.
	Profile string `json:",omitempty"`

	// Path (required) is the name of your S3 bucket, optionally followed by
	// sub-directory names separated URL-style with forward slashes. The
	// highest performance is gained by specifying the deepest path under your
	// bucket that holds all the files you wish to access.
	Path string

	// Cache is a boolean, which if true, turns on data caching of any data
	// retrieved, or any data you wish to upload.
	Cache bool `json:",omitempty"`

	// CacheDir is the local directory to store cached data. If this parameter
	// is supplied, Cache is forced true and so doesn't need to be provided. If
	// this parameter is not supplied but Cache is true, the directory will be a
	// unique directory in the containing MountConfig's CacheBase, and will get
	// deleted on unmount. If it's a relative path, it will be relative to the
	// CacheBase.
	CacheDir string `json:",omitempty"`

	// Write is a boolean, which if true, makes the mount point writeable. If
	// you don't intend to write to a mount, just leave this parameter out.
	// Because writing currently requires caching, turning this on forces Cache
	// to be considered true.
	Write bool `json:",omitempty"`
}

MountTarget struct is used for setting in a MountConfig to define what you want to access at your Mount.

type Server

type Server struct {
	ServerInfo *ServerInfo

	sync.Mutex

	log15.Logger
	// contains filtered or unexported fields
}

Server represents the server side of the socket that clients Connect() to.

func Serve

func Serve(config ServerConfig) (s *Server, msg string, token []byte, err error)

Serve is for use by a server executable and makes it start listening on localhost at the configured port for Connect()ions from clients, and then handles those clients.

It returns a *Server that you will typically call Block() on to block until your executable receives a SIGINT or SIGTERM, or you call Stop(), at which point the queues will be safely closed (you'd probably just exit at that point).

If it creates a db file or recreates one from backup, and if it creates TLS certificates, it will say what it did in the returned msg string.

The returned token must be provided by any client to authenticate. The server is a single user system, so there is only 1 token kept for its entire lifetime. If config.TokenFile has been set, the token will also be written to that file, potentially making it easier for any CLI clients to authenticate with this returned Server.

The possible errors from Serve() will be related to not being able to start up at the supplied address; errors encountered while dealing with clients are logged but otherwise ignored.

It also spawns your runner clients as needed, running them via the configured job scheduler, using the configured shell. It determines the command line to execute for your runner client from the configured RunnerCmd string you supplied.

func (*Server) BackupDB added in v0.10.0

func (s *Server) BackupDB(w io.Writer) error

BackupDB lets you do a manual live backup of the server's database to a given writer. Note that automatic backups occur to the configured location without calling this.

func (*Server) Block

func (s *Server) Block() error

Block makes you block while the server does the job of serving clients. This will return with an error indicating why it stopped blocking, which will be due to receiving a signal or because you called Stop().

func (*Server) Drain

func (s *Server) Drain() error

Drain will stop the server spawning new runners and stop Reserve*() from returning any more Jobs. Once all current runners have exited, the server will Stop().

func (*Server) GetServerStats

func (s *Server) GetServerStats() *ServerStats

GetServerStats returns some simple live stats about what's happening in the server's queue.

func (*Server) HasRunners

func (s *Server) HasRunners() bool

HasRunners tells you if there are currently runner clients in the job scheduler (either running or pending).

func (*Server) Stop

func (s *Server) Stop(wait ...bool)

Stop will cause a graceful shut down of the server. Supplying an optional bool of true will cause Stop() to wait until all runners have exited and the server is truly down before returning.

type ServerConfig

type ServerConfig struct {
	// Port for client-server communication.
	Port string

	// Port for the web interface.
	WebPort string

	// Name of the desired scheduler (eg. "local" or "lsf" or "openstack") that
	// jobs will be submitted to.
	SchedulerName string

	// SchedulerConfig should define the config options needed by the chosen
	// scheduler, eg. scheduler.ConfigLocal{Deployment: "production", Shell:
	// "bash"} if using the local scheduler.
	SchedulerConfig interface{}

	// The command line needed to bring up a jobqueue runner client, which
	// should contain 6 format verbs (4 %s followed by 2 %d) which will be
	// replaced with the scheduler group, deployment, ip:port address of the
	// server, domain name that the server's certificate should be valid for,
	// reservation time out and maximum number of minutes allowed, eg.
	// "my_jobqueue_runner_client --group '%s' --deployment %s --server '%s'
	// --domain %s --reserve_timeout %d --max_mins %d". If you supply an empty
	// string (the default), runner clients will not be spawned; for any work
	// to be done you will have to run your runner client yourself manually.
	RunnerCmd string

	// Absolute path to where the database file should be saved. The database is
	// used to ensure no loss of added commands, to keep a permanent history of
	// all jobs completed, and to keep various stats, amongst other things.
	DBFile string

	// Absolute path to where the database file should be backed up to.
	DBFileBackup string

	// Absolute path to where the server will store the authorization token
	// needed by clients to communicate with the server. Storing it in a file
	// could make using any CLI clients more convenient. The file will be
	// read-only by the user starting the server. The default of empty string
	// means the token is not saved to disk.
	TokenFile string

	// Absolute path to where the CA PEM file is that will be used for
	// securing access to the web interface. If the given file does not exist,
	// a certificate will be generated for you at this path.
	CAFile string

	// Absolute path to where the certificate PEM file is that will be used
	// for securing access to the web interface. If the given file does not
	// exist, a certificate will be generated for you at this path.
	CertFile string

	// Absolute path to where the key PEM file is that will be used for
	// securing access to the web interface. If the given file does not exist,
	// a key will be generated for you at this path.
	KeyFile string

	// Domain that a generated CertFile should be valid for. If not supplied,
	// defaults to "localhost".
	//
	// When using your own CertFile, this should be set to a domain that the
	// certificate is valid for, as when the server spawns clients, those clients
	// will validate the server's certificate based on this domain. For the web
	// interface and REST API, it is up to you to ensure that your DNS has an
	// entry for this domain that points to the IP address of the machine
	// running your server.
	CertDomain string

	// Name of the deployment ("development" or "production"); development
	// databases are deleted and recreated on start up by default.
	Deployment string

	// CIDR is the IP address range of your network. When the server needs to
	// know its own IP address, it uses this CIDR to confirm it got it correct
	// (ie. it picked the correct network interface). You can leave this unset,
	// in which case it will do its best to pick correctly. (This is only a
	// possible issue if you have multiple network interfaces.)
	CIDR string

	// UploadDir is the directory where files uploaded to the Server will be
	// stored. They get given unique names based on the MD5 checksum of the file
	// uploaded. Defaults to /tmp.
	UploadDir string

	// Logger is a logger object that will be used to log uncaught errors and
	// debug statements. "Uncaught" errors are all errors generated during
	// operation that either shouldn't affect the success of operations and can
	// be ignored (logged at the Warn level, which is why the errors are not
	// returned by the methods generating them), or errors that could not be
	// returned (logged at the Error level, eg. generated during a go routine,
	// such as errors by the server handling a particular client request).
	// We attempt to recover from panics during server operation and log these
	// at the Crit level.
	//
	// If your logger is levelled and set to the debug level, you will also get
	// information tracking the inner workings of the server.
	//
	// If this is unset, nothing is logged (defaults to a logger using a
	// log15.DiscardHandler()).
	Logger log15.Logger
}

ServerConfig is supplied to Serve() to configure your jobqueue server. All fields are required with no working default unless otherwise noted.
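RunnerCmd is a printf-style template, so its verbs must line up with the values the server substitutes: group, deployment, server address, domain, reservation timeout and maximum minutes. The sketch below fills in the RunnerCmd shown in the package overview using fmt.Sprintf; the binary path and every argument value are hypothetical.

```go
package main

import "fmt"

// runnerCmd follows the overview's example: 4 %s verbs, then 2 %d verbs.
// The wr binary path is a hypothetical placeholder.
const runnerCmd = "/usr/local/bin/wr runner -s '%s' --deployment %s --server '%s' --domain %s -r %d -m %d"

// formatRunner substitutes the six values in template order.
func formatRunner(group, deployment, addr, domain string, timeout, maxMins int) string {
	return fmt.Sprintf(runnerCmd, group, deployment, addr, domain, timeout, maxMins)
}

func main() {
	fmt.Println(formatRunner("group1", "production", "10.0.0.5:12345", "localhost", 1, 60))
}
```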

type ServerInfo

type ServerInfo struct {
	Addr       string // ip:port
	Host       string // hostname
	Port       string // port
	WebPort    string // port of the web interface
	PID        int    // process id of server
	Deployment string // deployment the server is running under
	Scheduler  string // the name of the scheduler that jobs are being submitted to
	Mode       string // ServerModeNormal if the server is running normally, or ServerModeDrain if draining
}

ServerInfo holds basic addressing info about the server.

type ServerStats

type ServerStats struct {
	Delayed int           // how many jobs are waiting following a possibly transient error
	Ready   int           // how many jobs are ready to begin running
	Running int           // how many jobs are currently running
	Buried  int           // how many jobs are no longer being processed because of seemingly permanent errors
	ETC     time.Duration // how long until the slowest of the currently running jobs is expected to complete
}

ServerStats holds information about the jobqueue server for sending to clients.

Directories

Path Synopsis
Package scheduler lets the jobqueue server interact with the configured job scheduler (if any) to submit jobqueue runner clients and have them run on a compute cluster (or local machine).
