jobqueue

package
v0.8.0

Warning: This package is not in the latest version of its module.
Published: Jun 7, 2017 License: GPL-3.0 Imports: 40 Imported by: 7

Documentation

Overview

Package jobqueue provides server/client functions to interact with the queue structure provided by the queue package over a network.

It provides a job queue and running system which guarantees:

1. Created jobs are never lost accidentally.
2. The same job will not run more than once simultaneously:
   - Duplicate jobs are not created
   - Each job is handled by only a single client
3. Jobs are handled in the desired order (user priority and FIFO, after
   dependencies have been satisfied).
4. Jobs still get run despite crashing clients.
5. Completed jobs are kept forever for historical and "live" dependency
   purposes.
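The ordering guarantee (user priority first, then FIFO) can be pictured with the minimal sketch below. The `queuedJob` type, its `seq` field and the `runOrder` function are hypothetical stand-ins, not part of the jobqueue API, and the sketch ignores dependencies, which the real queue resolves before a job becomes eligible to run.

```go
package main

import (
	"fmt"
	"sort"
)

// queuedJob is a hypothetical, minimal stand-in for a ready job: only the
// fields needed to decide run order are modelled.
type queuedJob struct {
	Cmd      string
	Priority uint8 // 0-255; higher runs first
	seq      int   // insertion order, used as the FIFO tie-breaker
}

// runOrder sorts jobs by descending Priority, falling back to the order in
// which they were added (FIFO) when priorities are equal.
func runOrder(jobs []queuedJob) []queuedJob {
	out := append([]queuedJob(nil), jobs...)
	sort.SliceStable(out, func(i, j int) bool {
		if out[i].Priority != out[j].Priority {
			return out[i].Priority > out[j].Priority
		}
		return out[i].seq < out[j].seq
	})
	return out
}

func main() {
	jobs := []queuedJob{
		{Cmd: "a", Priority: 0, seq: 0},
		{Cmd: "b", Priority: 5, seq: 1},
		{Cmd: "c", Priority: 0, seq: 2},
		{Cmd: "d", Priority: 5, seq: 3},
	}
	for _, j := range runOrder(jobs) {
		fmt.Print(j.Cmd, " ")
	}
	fmt.Println() // prints: b d a c
}
```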

You bring up the server, then use a client to add commands (jobs) to the queue. The server then interacts with the configured scheduler to start running the necessary number of runner clients on your compute cluster. The runner clients ask the server for a command to run, and then they run the command. Once complete, the server is updated and the runner client requests the next command, or might exit if there are no more left.

As a user you can query the status of the system using client methods or by viewing the real-time updated status web interface.

Server

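An example of bringing up the server:

import (
    "log"
    "os"

    "github.com/VertebrateResequencing/wr/jobqueue"
    jqs "github.com/VertebrateResequencing/wr/jobqueue/scheduler"
)

selfExe := os.Args[0] // this executable must also implement the "runner" sub-command
server, msg, err := jobqueue.Serve(jobqueue.ServerConfig{
    Port:            "12345",
    WebPort:         "12346",
    SchedulerName:   "local",
    SchedulerConfig: &jqs.ConfigLocal{Shell: "bash"},
    RunnerCmd:       selfExe + " runner -q %s -s '%s' --deployment %s --server '%s' -r %d -m %d",
    DBFile:          "/home/username/.wr_production/boltdb",
    DBFileBackup:    "/home/username/.wr_production/boltdb.backup",
    Deployment:      "production",
    CIDR:            "",
})
if err != nil {
    log.Fatal(err)
}
log.Println(msg)
err = server.Block() // blocks until the server stops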

Client

An example client, one for adding commands to the job queue:

import (
    "log"
    "os"
    "time"

    "github.com/VertebrateResequencing/wr/jobqueue"
    jqs "github.com/VertebrateResequencing/wr/jobqueue/scheduler"
)

var jobs []*jobqueue.Job
other := make(map[string]string)
var deps []*jobqueue.Dependency
// this job will not start until every job in DepGroup "step1" is complete
deps = append(deps, jobqueue.NewDepGroupDependency("step1"))
jobs = append(jobs, &jobqueue.Job{
    RepGroup:     "friendly name",
    Cmd:          "myexe -args",
    Cwd:          "/tmp",
    ReqGroup:     "myexeInArgsMode",
    Requirements: &jqs.Requirements{RAM: 1024, Time: 10 * time.Minute, Cores: 1, Disk: 1, Other: other},
    Override:     uint8(0),
    Priority:     uint8(0),
    Retries:      uint8(3),
    DepGroups:    []string{"step2"},
    Dependencies: deps,
})

jq, err := jobqueue.Connect("localhost:12345", "cmds", 30*time.Second)
if err != nil {
    log.Fatal(err)
}
defer jq.Disconnect()
inserts, dups, err := jq.Add(jobs, os.Environ())
if err != nil {
    log.Fatal(err)
}
log.Printf("%d jobs added, %d already in the queue", inserts, dups)


Constants

const (
	FailReasonEnv      = "failed to get environment variables"
	FailReasonCwd      = "working directory does not exist"
	FailReasonStart    = "command failed to start"
	FailReasonCPerm    = "command permission problem"
	FailReasonCFound   = "command not found"
	FailReasonCExit    = "command invalid exit code"
	FailReasonExit     = "command exited non-zero"
	FailReasonRAM      = "command used too much RAM"
	FailReasonTime     = "command used too much time"
	FailReasonAbnormal = "command failed to complete normally"
	FailReasonSignal   = "runner received a signal to stop"
	FailReasonResource = "resource requirements cannot be met"
	FailReasonMount    = "mounting of remote file system(s) failed"
	FailReasonUpload   = "failed to upload files to remote file system"
)

FailReason* constants are the reasons for command failure stored on Jobs (in Job.FailReason).
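As an illustration of how an exit code might be turned into one of these strings, here is a hypothetical classifier. It assumes the usual POSIX shell conventions (126: found but not executable, 127: not found); it is not necessarily how the package itself decides, and `failReasonForExit` is an invented name.

```go
package main

import "fmt"

// Local copies of three of the documented FailReason* strings.
const (
	FailReasonCPerm  = "command permission problem"
	FailReasonCFound = "command not found"
	FailReasonExit   = "command exited non-zero"
)

// failReasonForExit maps a shell exit code to a FailReason string, assuming
// POSIX shell conventions: 126 means the command was found but could not be
// executed, 127 means it was not found. 0 means success (no reason).
func failReasonForExit(code int) string {
	switch code {
	case 0:
		return ""
	case 126:
		return FailReasonCPerm
	case 127:
		return FailReasonCFound
	default:
		return FailReasonExit
	}
}

func main() {
	for _, c := range []int{0, 1, 126, 127} {
		fmt.Printf("%d: %q\n", c, failReasonForExit(c))
	}
}
```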

const (
	ErrInternalError  = "internal error"
	ErrUnknownCommand = "unknown command"
	ErrBadRequest     = "bad request (missing arguments?)"
	ErrBadJob         = "bad job (not in queue or correct sub-queue)"
	ErrMissingJob     = "corresponding job not found"
	ErrUnknown        = "unknown error"
	ErrClosedInt      = "queues closed due to SIGINT"
	ErrClosedTerm     = "queues closed due to SIGTERM"
	ErrClosedStop     = "queues closed due to manual Stop()"
	ErrQueueClosed    = "queue closed"
	ErrNoHost         = "could not determine the non-loopback ip address of this host"
	ErrNoServer       = "could not reach the server"
	ErrMustReserve    = "you must Reserve() a Job before passing it to other methods"
	ErrDBError        = "failed to use database"
	ServerModeNormal  = "started"
	ServerModeDrain   = "draining"
)

Err* constants are found in our returned Errors under err.Err, so you can type-assert a returned error to Error and check whether it is a certain type of error. ServerMode* constants are used to report on the status of the server, found inside ServerInfo.

Variables

var (
	ClientTouchInterval               = 15 * time.Second
	ClientReleaseDelay                = 30 * time.Second
	RAMIncreaseMin            float64 = 1000
	RAMIncreaseMultLow                = 2.0
	RAMIncreaseMultHigh               = 1.3
	RAMIncreaseMultBreakpoint float64 = 8192
)

These global variables are primarily exported for testing purposes; you probably shouldn't change them (they should probably be refactored as fields of a config struct).

var (
	RecMBRound  = 100  // when we recommend amount of memory to reserve for a job, we round up to the nearest RecMBRound MBs
	RecSecRound = 1800 // when we recommend time to reserve for a job, we round up to the nearest RecSecRound seconds
)

Rec* variables are only exported for testing purposes (though they should probably be user-configurable somewhere).
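The "round up to the nearest RecMBRound MBs / RecSecRound seconds" rule in the comments above is ordinary ceiling-to-a-multiple rounding. A minimal sketch for positive values; `roundUp` is a hypothetical helper, and the package's own code may treat edge cases such as zero differently.

```go
package main

import "fmt"

// roundUp rounds a positive v up to the nearest multiple of step (step > 0).
func roundUp(v, step int) int {
	if r := v % step; r != 0 {
		return v + step - r
	}
	return v
}

func main() {
	const RecMBRound, RecSecRound = 100, 1800
	fmt.Println(roundUp(1234, RecMBRound))  // 1300
	fmt.Println(roundUp(1200, RecMBRound))  // 1200
	fmt.Println(roundUp(2000, RecSecRound)) // 3600
}
```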

var (
	ServerInterruptTime   = 1 * time.Second
	ServerItemTTR         = 60 * time.Second
	ServerReserveTicker   = 1 * time.Second
	ServerLogClientErrors = true
)

These global variables are primarily exported for testing purposes; you probably shouldn't change them (they should probably be refactored as fields of a config struct).

var AppName = "jobqueue"

AppName gets used in certain places like naming the base directory of created working directories during Client.Execute().

Functions

func CurrentIP

func CurrentIP(cidr string) (ip string)

CurrentIP returns the IP address of the machine we're running on right now. The cidr argument can be an empty string, but if it is set to the CIDR of the machine's primary network, it helps ensure the correct IP address is returned when the machine has multiple network interfaces.

Types

type Behaviour added in v0.8.0

type Behaviour struct {
	When BehaviourTrigger
	Do   BehaviourAction
	Arg  interface{} // the arg needed by your chosen action
}

Behaviour describes something that should happen in response to a Job's Cmd exiting a certain way.

func (*Behaviour) String added in v0.8.0

func (b *Behaviour) String() string

String provides a nice string representation of a Behaviour for user interface display purposes. It is in the form of a JSON string that can be converted back to a Behaviour via a BehaviourViaJSON.

func (*Behaviour) Trigger added in v0.8.0

func (b *Behaviour) Trigger(status BehaviourTrigger, j *Job) error

Trigger will carry out our BehaviourAction if the supplied status matches our BehaviourTrigger.

type BehaviourAction added in v0.8.0

type BehaviourAction uint8

BehaviourAction is supplied to a Behaviour to define what should happen when that behaviour triggers. (It's a uint8 type as opposed to an actual func to save space since we need to store these on every Job; do not treat as a flag and OR multiple actions together!)

const (
	// CleanupAll is a BehaviourAction that will delete any directories that
	// were created by a Job due to CwdMatters being false. Note that if the
	// Job's Cmd created output files within the actual cwd, these would get
	// deleted along with everything else. It takes no arguments.
	CleanupAll BehaviourAction = 1 << iota

	// Cleanup is a BehaviourAction that behaves exactly as CleanupAll in the
	// case that no output files have been specified on the Job. If some have,
	// everything except those files gets deleted. It takes no arguments.
	// (NB: since output file specification has not yet been implemented, this
	// is currently identical to CleanupAll.)
	Cleanup

	// Run is a BehaviourAction that runs a given command (supplied as a single
	// string Arg to the Behaviour) in the Job's actual cwd.
	Run

	// CopyToManager is a BehaviourAction that copies the given files (specified
	// as a slice of string paths Arg to the Behaviour) from the Job's actual
	// cwd to a configured location on the machine that the jobqueue server is
	// running on. *** not yet implemented!
	CopyToManager
)

type BehaviourTrigger added in v0.8.0

type BehaviourTrigger uint8

BehaviourTrigger is supplied to a Behaviour to define under what circumstance that Behaviour will trigger.

const (
	// OnExit is a BehaviourTrigger for Behaviours that should trigger when a
	// Job's Cmd is executed and finishes running. These behaviours will trigger
	// after OnSuccess and OnFailure triggers, which makes OnExit different to
	// specifying OnSuccess|OnFailure.
	OnExit BehaviourTrigger = 1 << iota

	// OnSuccess is a BehaviourTrigger for Behaviours that should trigger when a
	// Job's Cmd is executed and exits 0.
	OnSuccess

	// OnFailure is a BehaviourTrigger for Behaviours that should trigger when a
	// Job's Cmd is executed and exits non-0.
	OnFailure
)

type BehaviourViaJSON added in v0.8.0

type BehaviourViaJSON struct {
	Run           string   `json:"run,omitempty"`
	CopyToManager []string `json:"copy_to_manager,omitempty"`
	Cleanup       bool     `json:"cleanup,omitempty"`
	CleanupAll    bool     `json:"cleanup_all,omitempty"`
}

BehaviourViaJSON is the element type of BehavioursViaJSON. Each one should specify only one of its properties.

func (BehaviourViaJSON) Behaviour added in v0.8.0

func (bj BehaviourViaJSON) Behaviour(when BehaviourTrigger) *Behaviour

Behaviour converts the friendly BehaviourViaJSON struct to a real Behaviour that triggers on the supplied BehaviourTrigger.
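A sketch of the conversion idea: exactly one property of a BehaviourViaJSON selects the BehaviourAction, and its value (if any) becomes the Arg. `toAction` is hypothetical; in particular, returning an error when zero or several properties are set is an assumption, since the real `Behaviour(when)` method returns a `*Behaviour` without an error.

```go
package main

import (
	"errors"
	"fmt"
)

type BehaviourAction uint8

// Same declarations as the package (1 << iota).
const (
	CleanupAll BehaviourAction = 1 << iota
	Cleanup
	Run
	CopyToManager
)

// BehaviourViaJSON mirrors the documented struct (JSON tags omitted).
type BehaviourViaJSON struct {
	Run           string
	CopyToManager []string
	Cleanup       bool
	CleanupAll    bool
}

// toAction returns the single action bj specifies plus its Arg, or an error
// if bj sets zero or several properties.
func toAction(bj BehaviourViaJSON) (BehaviourAction, interface{}, error) {
	var (
		act BehaviourAction
		arg interface{}
		n   int
	)
	if bj.Run != "" {
		act, arg, n = Run, bj.Run, n+1
	}
	if len(bj.CopyToManager) > 0 {
		act, arg, n = CopyToManager, bj.CopyToManager, n+1
	}
	if bj.Cleanup {
		act, arg, n = Cleanup, nil, n+1
	}
	if bj.CleanupAll {
		act, arg, n = CleanupAll, nil, n+1
	}
	if n != 1 {
		return 0, nil, errors.New("exactly one property must be set")
	}
	return act, arg, nil
}

func main() {
	act, arg, err := toAction(BehaviourViaJSON{Run: "touch done"})
	fmt.Println(act == Run, arg, err) // true touch done <nil>
	_, _, err = toAction(BehaviourViaJSON{Cleanup: true, CleanupAll: true})
	fmt.Println(err) // exactly one property must be set
}
```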

type Behaviours added in v0.8.0

type Behaviours []*Behaviour

Behaviours is a slice of *Behaviour.

func (Behaviours) String added in v0.8.0

func (bs Behaviours) String() string

String provides a nice string representation of Behaviours for user interface display purposes. It takes the form of a JSON string that can be converted back to Behaviours using a BehavioursViaJSON for each key. The keys are "on_failure", "on_success", "on_failure|success" and "on_exit".

func (Behaviours) Trigger added in v0.8.0

func (bs Behaviours) Trigger(success bool, j *Job) error

Trigger calls Trigger on each constituent Behaviour: first all those for OnSuccess if success is true (or OnFailure otherwise), then those for OnExit.

type BehavioursViaJSON added in v0.8.0

type BehavioursViaJSON []BehaviourViaJSON

BehavioursViaJSON is a slice of BehaviourViaJSON. It is a convenience to allow users to specify behaviours in a more natural way if they're trying to describe them in a JSON string. You'd have one of these per BehaviourTrigger.
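A user-facing JSON description might therefore be an object keyed by trigger name (the keys listed for Behaviours.String) whose values are BehavioursViaJSON. This sketch only decodes such a string with `encoding/json` into local copies of the documented types; `parseBehaviours` is hypothetical, and with the real package each value would then be converted with `BehavioursViaJSON.Behaviours(when)`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented JSON shapes.
type BehaviourViaJSON struct {
	Run           string   `json:"run,omitempty"`
	CopyToManager []string `json:"copy_to_manager,omitempty"`
	Cleanup       bool     `json:"cleanup,omitempty"`
	CleanupAll    bool     `json:"cleanup_all,omitempty"`
}

type BehavioursViaJSON []BehaviourViaJSON

// parseBehaviours decodes a JSON object keyed by trigger name, e.g.
// "on_success", "on_failure", "on_failure|success" or "on_exit".
func parseBehaviours(s string) (map[string]BehavioursViaJSON, error) {
	var m map[string]BehavioursViaJSON
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	m, err := parseBehaviours(`{"on_success":[{"cleanup":true}],"on_failure":[{"run":"echo failed"}]}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(m["on_success"][0].Cleanup, m["on_failure"][0].Run) // true echo failed
}
```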

func (BehavioursViaJSON) Behaviours added in v0.8.0

func (bjs BehavioursViaJSON) Behaviours(when BehaviourTrigger) (bs Behaviours)

Behaviours converts a BehavioursViaJSON to real Behaviours.

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Client represents the client side of the socket that the jobqueue server is Serve()ing, specific to a particular queue.

func Connect

func Connect(addr string, queue string, timeout time.Duration) (c *Client, err error)

Connect creates a connection to the jobqueue server, specific to a single queue. Timeout determines how long to wait for a response from the server, not only while connecting, but for all subsequent interactions with it using the returned Client.

func (*Client) Add

func (c *Client) Add(jobs []*Job, envVars []string) (added int, existed int, err error)

Add adds new jobs to the job queue, but only if those jobs aren't already in there.

If any were already there, you will not get an error, but the returned 'existed' count will be > 0. Note that no cross-queue checking is done, so you need to be careful not to add the same job to different queues.

Note that if you add jobs to the queue that were previously added, Execute()d and were successfully Archive()d, the existed count will be 0 and the jobs will be treated like new ones, though when Archive()d again, the new Job will replace the old one in the database.

The envVars argument is a slice of ("key=value") strings with the environment variables you want to be set when the job's Cmd actually runs. Typically you would pass in os.Environ().
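The duplicate check behind the `added`/`existed` counts can be modelled with an in-memory map. This is a hypothetical sketch: `job`, `memQueue` and the string key are invented, whereas the real server uses its own key (see JobEssence.Key) and persistent storage. It does follow the documented rule that Cwd only contributes to a job's identity when CwdMatters is true.

```go
package main

import "fmt"

// job is a hypothetical stand-in holding only what makes a Job unique.
type job struct {
	Cmd        string
	Cwd        string
	CwdMatters bool
}

// key treats Cwd as part of a job's identity only when CwdMatters is true.
func (j job) key() string {
	if j.CwdMatters {
		return j.Cwd + "\x00" + j.Cmd
	}
	return j.Cmd
}

// memQueue stores new jobs and only counts jobs whose key is already queued.
type memQueue struct{ items map[string]job }

func (q *memQueue) add(jobs []job) (added, existed int) {
	if q.items == nil {
		q.items = make(map[string]job)
	}
	for _, j := range jobs {
		k := j.key()
		if _, ok := q.items[k]; ok {
			existed++
			continue
		}
		q.items[k] = j
		added++
	}
	return added, existed
}

func main() {
	var q memQueue
	fmt.Println(q.add([]job{{Cmd: "echo a"}, {Cmd: "echo b"}})) // 2 0
	fmt.Println(q.add([]job{{Cmd: "echo a", Cwd: "/tmp"}}))     // 0 1
}
```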

func (*Client) Archive

func (c *Client) Archive(job *Job) (err error)

Archive removes a job from the jobqueue and adds it to the database of complete jobs, for use after you have run the job successfully. You have to have been the one to Reserve() the supplied Job, and the Job must be marked as having successfully run, or you will get an error.

func (*Client) Bury

func (c *Client) Bury(job *Job, failreason string, stderr ...error) (err error)

Bury marks a job as unrunnable, so it will be ignored (until the user does something to perhaps make it runnable and kicks the job). Note that you must reserve a job before you can bury it. Optionally supply an error that will be displayed as the Job's stderr.

func (*Client) CompressEnv added in v0.5.0

func (c *Client) CompressEnv(envars []string) []byte

CompressEnv encodes the given environment variables (slice of "key=value" strings) and then compresses that, so that for Add() the server can store it on disc without holding it in memory, and pass the compressed bytes back to us when we need to know the Env (during Execute()).
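The encode-then-compress round trip can be sketched with the standard library. The choice of `encoding/gob` plus `compress/zlib` is an assumption (the package may use other formats), and unlike the real CompressEnv, which returns only `[]byte`, these hypothetical helpers also return an error.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/gob"
	"fmt"
)

// compressEnv gob-encodes the "key=value" strings and zlib-compresses them.
func compressEnv(envars []string) ([]byte, error) {
	var buf bytes.Buffer
	zw := zlib.NewWriter(&buf)
	if err := gob.NewEncoder(zw).Encode(envars); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // flush remaining compressed data
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompressEnv reverses compressEnv, as Job.Env() needs to.
func decompressEnv(b []byte) ([]string, error) {
	zr, err := zlib.NewReader(bytes.NewReader(b))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	var envars []string
	err = gob.NewDecoder(zr).Decode(&envars)
	return envars, err
}

func main() {
	b, err := compressEnv([]string{"HOME=/home/user", "PATH=/usr/bin:/bin"})
	if err != nil {
		panic(err)
	}
	env, err := decompressEnv(b)
	fmt.Println(env, err) // [HOME=/home/user PATH=/usr/bin:/bin] <nil>
}
```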

func (*Client) Delete

func (c *Client) Delete(jes []*JobEssence) (deleted int, err error)

Delete removes previously Bury()'d jobs from the queue completely, for use when jobs were created incorrectly or by accident, or can never be fixed. It returns a count of the jobs it actually removed. Errors will only be related to not being able to contact the server.

func (*Client) Disconnect

func (c *Client) Disconnect()

Disconnect closes the connection to the jobqueue server. It is CRITICAL that you call Disconnect() before calling Connect() again in the same process.

func (*Client) DrainServer

func (c *Client) DrainServer() (running int, etc time.Duration, err error)

DrainServer tells the server to stop spawning new runners, stop letting existing runners reserve new jobs, and exit once existing runners stop running. You get back a count of existing runners and an estimated time until completion for the last of those runners.

func (*Client) Ended

func (c *Client) Ended(job *Job, cwd string, exitcode int, peakram int, cputime time.Duration, stdout []byte, stderr []byte) (err error)

Ended updates a Job on the server with information that you've finished running the Job's Cmd. Peakram should be in MB. The cwd you supply should be the actual working directory used, which may be different to the Job's Cwd property; if it is not different, supply an empty string.

func (*Client) Execute

func (c *Client) Execute(job *Job, shell string) error

Execute runs the given Job's Cmd and blocks until it exits. Then any Job Behaviours get triggered as appropriate for the exit status.

The Cmd is run using the environment variables set when the Job was Add()ed, or the current environment is used if none were set.

The Cmd is also run within the Job's Cwd. If CwdMatters is false, a unique subdirectory is created within Cwd, and that is used as the actual working directory. When creating these unique subdirectories, directory hashing is used to allow the safe running of 100s of thousands of Jobs all using the same Cwd (that is, we will not break the directory listing of Cwd). Furthermore, a sister folder will be created in the unique location for this Job, the path to which will become the value of the TMPDIR environment variable. Once the Cmd exits, this temp directory will be deleted and the path to the actual working directory created will be in the Job's ActualCwd property. The unique folder structure itself can be wholly deleted through the Job behaviour "cleanup".

If any remote file system mounts have been configured for the Job, these are mounted prior to running the Cmd, and unmounted afterwards.

Internally, Execute() calls Mount(), Started() and Ended() and keeps track of peak RAM used. It regularly calls Touch() on the Job so that the server knows we are still alive and handling the Job successfully. It also intercepts SIGTERM, SIGINT, SIGQUIT, SIGUSR1 and SIGUSR2, sending SIGKILL to the running Cmd and returning an Error whose Err is FailReasonSignal; you should check for this and exit your process. Finally, it calls Unmount() and TriggerBehaviours().
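The signal handling described above can be sketched with the standard library alone. This is not the package's code: `runWithSignalGuard` and `errSignal` are hypothetical, peak-RAM tracking, Touch() and mounting are left out, and `Process.Kill` (SIGKILL on Unix) only kills the shell process itself, so a real implementation may need to kill the whole process group to reach the Cmd's children.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/exec"
	"os/signal"
	"syscall"
)

// errSignal stands in for an Error whose Err is FailReasonSignal.
var errSignal = errors.New("runner received a signal to stop")

// runWithSignalGuard runs cmdLine under shell; if one of the listed signals
// arrives before the command exits, the child is killed and errSignal is
// returned. Otherwise the command's own exit error (or nil) is returned.
func runWithSignalGuard(shell, cmdLine string) error {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT,
		syscall.SIGUSR1, syscall.SIGUSR2)
	defer signal.Stop(sigs)

	cmd := exec.Command(shell, "-c", cmdLine)
	if err := cmd.Start(); err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }()

	select {
	case err := <-done:
		return err
	case <-sigs:
		_ = cmd.Process.Kill() // sends SIGKILL on Unix
		<-done                 // reap the killed process
		return errSignal
	}
}

func main() {
	fmt.Println(runWithSignalGuard("sh", "echo hi"))
}
```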

If no error is returned, the Cmd will have run OK, exited with status 0, and been Archive()d from the queue while being placed in the permanent store. Otherwise, it will have been Release()d or Bury()'d as appropriate.

The supplied shell is the shell to execute the Cmd under, ideally bash (something that understands the command "set -o pipefail"). You must have been the one to Reserve() the supplied Job, or this will immediately return an error. NB: the peak RAM tracking assumes we are running on a modern Linux system with /proc/*/smaps.

func (*Client) GetByEssence added in v0.8.0

func (c *Client) GetByEssence(je *JobEssence, getstd bool, getenv bool) (j *Job, err error)

GetByEssence gets a Job given a JobEssence to describe it. With the boolean args set to true, this is the only way to get a Job that StdOut() and StdErr() will work on, and one of 2 ways that Env() will work (the other being Reserve()).

func (*Client) GetByEssences added in v0.8.0

func (c *Client) GetByEssences(jes []*JobEssence) (out []*Job, err error)

GetByEssences gets multiple Jobs at once given JobEssences that describe them.

func (*Client) GetByRepGroup

func (c *Client) GetByRepGroup(repgroup string, limit int, state string, getStd bool, getEnv bool) (jobs []*Job, err error)

GetByRepGroup gets multiple Jobs at once given their RepGroup (an arbitrary user-supplied identifier for the purpose of grouping related jobs together for reporting purposes). 'limit', if greater than 0, limits the number of jobs returned that have the same State, FailReason and Exitcode, and on the last job of each such group it populates 'Similar' with the number of other excluded jobs there were in that group. Providing 'state' only returns jobs in that State. 'getStd' and 'getEnv', if true, retrieve the stdout, stderr and environment variables for the Jobs.
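The 'limit' behaviour can be sketched client-side: keep at most `limit` jobs per State+FailReason+Exitcode group and count the rest on the last kept job of that group. `jobInfo` and `limitSimilar` are hypothetical; the server does this itself before returning results.

```go
package main

import "fmt"

// jobInfo is a hypothetical stand-in carrying only the grouping fields.
type jobInfo struct {
	Key        string
	State      string
	FailReason string
	Exitcode   int
	Similar    int
}

// limitSimilar keeps at most limit jobs per State+FailReason+Exitcode group
// (limit <= 0 means no limit); excluded jobs are counted in Similar on the
// last kept job of their group.
func limitSimilar(jobs []jobInfo, limit int) []jobInfo {
	if limit <= 0 {
		return jobs
	}
	type groupKey struct {
		state, reason string
		code          int
	}
	var out []jobInfo
	kept := make(map[groupKey]int) // jobs kept so far per group
	last := make(map[groupKey]int) // index in out of the last kept job
	for _, j := range jobs {
		k := groupKey{j.State, j.FailReason, j.Exitcode}
		if kept[k] < limit {
			kept[k]++
			last[k] = len(out)
			out = append(out, j)
			continue
		}
		out[last[k]].Similar++
	}
	return out
}

func main() {
	jobs := []jobInfo{
		{Key: "a", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
		{Key: "b", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
		{Key: "c", State: "complete"},
		{Key: "d", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
	}
	for _, j := range limitSimilar(jobs, 1) {
		fmt.Println(j.Key, j.State, j.Similar) // "a buried 2", then "c complete 0"
	}
}
```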

func (*Client) GetIncomplete

func (c *Client) GetIncomplete(limit int, state string, getStd bool, getEnv bool) (jobs []*Job, err error)

GetIncomplete gets all Jobs that are currently in the jobqueue, i.e. excluding those that are complete and have been Archive()d. The args are as in GetByRepGroup().

func (*Client) Kick

func (c *Client) Kick(jes []*JobEssence) (kicked int, err error)

Kick makes previously Bury()'d jobs runnable again (they can be Reserve()d in the future). It returns a count of jobs that it actually kicked. Errors will only be related to not being able to contact the server.

func (*Client) Ping

func (c *Client) Ping(timeout time.Duration) bool

Ping tells you if your connection to the server is working.

func (*Client) Release

func (c *Client) Release(job *Job, failreason string, delay time.Duration) (err error)

Release places a job back on the jobqueue, for use when you can't handle the job right now (eg. there was a suspected transient error) but maybe someone else can later. Note that you must reserve a job before you can release it. The delay arg is the duration to wait after your call to Release() before anyone else can Reserve() this job again - could help you stop immediately Reserve()ing the job again yourself. You can only Release() the same job as many times as its Retries value if it has been run and failed; a subsequent call to Release() will instead result in a Bury(). (If the job's Cmd was not run, you can Release() an unlimited number of times.)
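The Retries/UntilBuried bookkeeping described above (and its interaction with Bury() and Kick()) can be modelled as a tiny state machine. Everything here is hypothetical: the `jobState` type, the simplified state names, and in particular resetting the allowance on Kick() are assumptions. It does reproduce the documented rule that a job with Retries 3 can be released three times after failing, and the fourth Release() buries it.

```go
package main

import "fmt"

// jobState is a hypothetical model of the Release/Bury/Kick bookkeeping.
type jobState struct {
	State       string // "ready", "delayed" or "buried" in this sketch
	Retries     uint8
	UntilBuried uint8
}

func newJobState(retries uint8) *jobState {
	return &jobState{State: "ready", Retries: retries, UntilBuried: retries}
}

// release models Release(): if the Cmd ran and failed, each call uses up one
// retry; once none remain, the job is buried instead. Releases of a job whose
// Cmd did not run are unlimited.
func (j *jobState) release(cmdRanAndFailed bool) {
	if cmdRanAndFailed {
		if j.UntilBuried == 0 {
			j.State = "buried"
			return
		}
		j.UntilBuried--
	}
	j.State = "delayed" // becomes ready again after the release delay
}

// kick models Kick(): a buried job becomes runnable again (the reset of the
// retry allowance is an assumption).
func (j *jobState) kick() {
	if j.State == "buried" {
		j.State = "ready"
		j.UntilBuried = j.Retries
	}
}

func main() {
	j := newJobState(3)
	for i := 1; i <= 4; i++ {
		j.release(true)
		fmt.Println(i, j.State, j.UntilBuried)
	}
	j.kick()
	fmt.Println(j.State, j.UntilBuried)
}
```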

func (*Client) Reserve

func (c *Client) Reserve(timeout time.Duration) (j *Job, err error)

Reserve takes a job off the jobqueue. If you process the job successfully you should Archive() it. If you can't deal with it right now you should Release() it. If you think it can never be dealt with you should Bury() it. If you die unexpectedly, the job will automatically be released back to the queue after some time.

If no job was available in the queue for as long as the timeout argument, nil is returned for both job and error. If your timeout is 0, you will wait indefinitely for a job.

NB: if your jobs have schedulerGroups (and they will if you added them to a server configured with a RunnerCmd), this will most likely not return any jobs; use ReserveScheduled() instead.

func (*Client) ReserveScheduled

func (c *Client) ReserveScheduled(timeout time.Duration, schedulerGroup string) (j *Job, err error)

ReserveScheduled is like Reserve(), except that it will only return jobs from the specified schedulerGroup.

Based on the scheduler the server was configured with, it will group jobs based on their resource requirements and then submit runners to handle them to your system's job scheduler (such as LSF), possibly in different scheduler queues. These runners are told the group they are a part of, and that same group name is applied internally to the Jobs as the "schedulerGroup", so that the runners can reserve only Jobs that they're supposed to. Therefore, it does not make sense for you to call this yourself; it is only for use by runners spawned by the server.

func (*Client) ServerStats

func (c *Client) ServerStats() (s *ServerStats, err error)

ServerStats returns stats of the jobqueue server itself.

func (*Client) ShutdownServer

func (c *Client) ShutdownServer() bool

ShutdownServer tells the server to immediately cease all operations. Its last act will be to backup its internal database. Any existing runners will fail. Because the server gets shut down it can't respond with success/failure, so we indirectly report if the server was shut down successfully.

func (*Client) Started

func (c *Client) Started(job *Job, pid int, host string) (err error)

Started updates a Job on the server with information that you've started running the Job's Cmd.

func (*Client) Touch

func (c *Client) Touch(job *Job) (err error)

Touch adds to a job's ttr, allowing you more time to work on it. Note that you must have reserved the job before you can touch it.

type Dependencies added in v0.2.0

type Dependencies []*Dependency

Dependencies is a slice of *Dependency, for use in Job.Dependencies. It describes the jobs that must be complete before the Job you associate this with will start.

func (Dependencies) DepGroups added in v0.3.0

func (d Dependencies) DepGroups() (depGroups []string)

DepGroups returns all the DepGroups of our constituent Dependency structs.

func (Dependencies) Stringify added in v0.3.0

func (d Dependencies) Stringify() (strings []string)

Stringify converts our constituent Dependency structs into a slice of strings, each of which could be JobEssence or DepGroup based.

type Dependency added in v0.2.0

type Dependency struct {
	Essence  *JobEssence
	DepGroup string
}

Dependency is a struct that describes a Job purely in terms of a JobEssence, or in terms of a Job's DepGroup, for use in Dependencies. If DepGroup is specified, then Essence is ignored.
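One way to picture when a Dependency is met: a DepGroup dependency (which takes precedence over Essence, as stated above) needs every job in that group to be complete, while an Essence dependency needs the matching Cmd+Cwd job to be complete. The `knownJob` record and `satisfied` function are hypothetical, and treating a DepGroup that contains no jobs as unmet is an assumption about the server's behaviour.

```go
package main

import "fmt"

// Simplified local mirrors of the documented types.
type JobEssence struct{ Cmd, Cwd string }

type Dependency struct {
	Essence  *JobEssence
	DepGroup string
}

// knownJob is a hypothetical record of a job the server knows about; Cwd is
// stored as "" for jobs that do not have CwdMatters true.
type knownJob struct {
	Cmd, Cwd  string
	DepGroups []string
	Complete  bool
}

// satisfied reports whether dep is met by the given jobs.
func satisfied(dep Dependency, jobs []knownJob) bool {
	if dep.DepGroup != "" {
		found := false
		for _, j := range jobs {
			for _, g := range j.DepGroups {
				if g == dep.DepGroup {
					if !j.Complete {
						return false
					}
					found = true
				}
			}
		}
		return found
	}
	for _, j := range jobs {
		if dep.Essence != nil && j.Cmd == dep.Essence.Cmd && j.Cwd == dep.Essence.Cwd {
			return j.Complete
		}
	}
	return false
}

func main() {
	jobs := []knownJob{
		{Cmd: "step1 a", DepGroups: []string{"step1"}, Complete: true},
		{Cmd: "step1 b", DepGroups: []string{"step1"}, Complete: false},
	}
	fmt.Println(satisfied(Dependency{DepGroup: "step1"}, jobs))                     // false
	fmt.Println(satisfied(Dependency{Essence: &JobEssence{Cmd: "step1 a"}}, jobs)) // true
}
```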

func NewDepGroupDependency added in v0.3.0

func NewDepGroupDependency(depgroup string) *Dependency

NewDepGroupDependency makes it a little easier to make a new *Dependency based on a dep group, for use in NewDependencies().

func NewEssenceDependency added in v0.8.0

func NewEssenceDependency(cmd string, cwd string) *Dependency

NewEssenceDependency makes it a little easier to make a new *Dependency based on Cmd+Cwd, for use in NewDependencies(). Leave cwd as an empty string if the job you are describing does not have CwdMatters true.

type Error

type Error struct {
	Queue string // the queue's Name
	Op    string // name of the method
	Item  string // the item's key
	Err   string // one of our Err* vars
}

Error records an error and the operation, item and queue that caused it.

func (Error) Error

func (e Error) Error() string

type Job

type Job struct {
	// Cmd is the actual command line that will be run via the shell.
	Cmd string

	// Cwd determines the command working directory, the directory we cd to
	// before running Cmd. When CwdMatters, Cwd is used exactly, otherwise a
	// unique sub-directory of Cwd is used as the command working directory.
	Cwd string

	// CwdMatters should be made true when Cwd contains input files that you
	// will refer to using relative (from Cwd) paths in Cmd, and when other Jobs
	// have identical Cmds because you have many different directories that
	// contain different but identically named input files. Cwd will become part
	// of what makes the Job unique.
	// When CwdMatters is false (default), Cmd gets run in a unique subfolder of
	// Cwd, enabling features like tracking disk space usage and clean up of the
	// working directory by simply deleting the whole thing. The TMPDIR
	// environment variable is also set to a sister folder of the unique
	// subfolder, and this is always cleaned up after the Cmd exits.
	CwdMatters bool

	// ChangeHome sets the $HOME environment variable to the actual working
	// directory before running Cmd, but only when CwdMatters is false.
	ChangeHome bool

	// RepGroup is a name associated with related Jobs to help group them
	// together when reporting on their status etc.
	RepGroup string

	// ReqGroup is a string that you supply to group together all commands that
	// you expect to have similar resource requirements.
	ReqGroup string

	// Requirements describes the resources this Cmd needs to run, such as RAM,
	// Disk and time. These may be determined for you by the system (depending
	// on Override) based on past experience of running jobs with the same
	// ReqGroup.
	Requirements *scheduler.Requirements

	// Override determines if your own supplied Requirements get used, or if the
	// system's calculated values get used. 0 means prefer the system values. 1
	// means prefer your values if they are higher. 2 means always use your
	// values.
	Override uint8

	// Priority is a number between 0 and 255 inclusive - higher numbered jobs
	// will run before lower numbered ones (the default is 0).
	Priority uint8

	// Retries is the number of times to retry running a Cmd if it fails.
	Retries uint8

	// DepGroups are the dependency groups this job belongs to that other jobs
	// can refer to in their Dependencies.
	DepGroups []string

	// Dependencies describe the jobs that must be complete before this job
	// starts.
	Dependencies Dependencies

	// Behaviours describe what should happen after Cmd is executed, depending
	// on its success.
	Behaviours Behaviours

	// MountConfigs describes remote file systems or object stores that you wish
	// to be fuse mounted prior to running the Cmd. Once Cmd exits, the mounts
	// will be unmounted (with uploads only occurring if it exits with code 0).
	// If you want multiple separate mount points accessed from different local
	// directories, you will supply more than one MountConfig in the slice. If
	// you want multiple remote locations multiplexed and accessible from a
	// single local directory, you will supply a single MountConfig in the
	// slice, configured with multiple MountTargets. Relative paths for your
	// MountConfig.Mount options will be relative to Cwd (or ActualCwd if
	// CwdMatters == false). If a MountConfig.Mount is not specified, it
	// defaults to Cwd/mnt if CwdMatters, otherwise ActualCwd itself will be the
	// mount point. If a MountConfig.CacheBase is not specified, it defaults to
	// Cwd if CwdMatters, otherwise it will be a sister directory of
	// ActualCwd.
	MountConfigs MountConfigs

	// the actual working directory used, which would have been created with a
	// unique name if CwdMatters = false
	ActualCwd string
	// peak RAM (MB) used.
	PeakRAM int
	// true if the Cmd was run and exited.
	Exited bool
	// if the job ran and exited, its exit code is recorded here, but check
	// Exited because when this is not set it could look like exit code 0.
	Exitcode int
	// if the job failed to complete successfully, this will hold one of the
	// FailReason* strings.
	FailReason string
	// pid of the running or ran process.
	Pid int
	// host the process is running or did run on.
	Host string
	// time the cmd started running.
	StartTime time.Time
	// time the cmd stopped running.
	EndTime time.Time
	// CPU time used.
	CPUtime time.Duration
	// to read, call job.StdErr() instead; if the job ran, its (truncated)
	// STDERR will be here.
	StdErrC []byte
	// to read, call job.StdOut() instead; if the job ran, its (truncated)
	// STDOUT will be here.
	StdOutC []byte
	// to read, call job.Env() instead, to get the environment variables as a
	// []string, where each string is like "key=value".
	EnvC []byte
	// if set (using output of CompressEnv()), they will be returned in the
	// results of job.Env().
	EnvOverride []byte
	// job's state in the queue: 'delayed', 'ready', 'reserved', 'running',
	// 'buried', 'complete' or 'dependent'.
	State string
	// number of times the job had ever entered 'running' state.
	Attempts uint32
	// remaining number of Release()s allowed before being buried instead.
	UntilBuried uint8
	// we note which client reserved this job, for validating if that client has
	// permission to do other stuff to this Job; the server only ever sets this
	// on Reserve(), so clients can't cheat by changing this on their end.
	ReservedBy uuid.UUID
	// on the server we don't store EnvC with the job, but look it up in db via
	// this key.
	EnvKey string
	// when retrieving jobs with a limit, this tells you how many jobs were
	// excluded.
	Similar int
	// name of the queue the Job was added to.
	Queue string
	// contains filtered or unexported fields
}

Job is a struct that represents a command that needs to be run and some associated metadata. If you get a Job back from the server (via Reserve() or Get*()), you should treat the properties as read-only: changing them will have no effect.
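The three Override levels documented on the Job struct can be shown for a single resource such as RAM. `chooseRAM` is a hypothetical helper; falling back to the user's value when the system has not learned one yet (sysMB <= 0) is an assumption.

```go
package main

import "fmt"

// chooseRAM applies the documented Override levels: 0 prefers the system's
// value, 1 uses the user's value only if it is higher, 2 always uses the
// user's value.
func chooseRAM(override uint8, userMB, sysMB int) int {
	if sysMB <= 0 { // no learned value yet (assumed fallback)
		return userMB
	}
	switch override {
	case 2:
		return userMB
	case 1:
		if userMB > sysMB {
			return userMB
		}
		return sysMB
	default:
		return sysMB
	}
}

func main() {
	fmt.Println(chooseRAM(0, 1024, 300)) // 300
	fmt.Println(chooseRAM(1, 1024, 300)) // 1024
	fmt.Println(chooseRAM(1, 200, 300))  // 300
	fmt.Println(chooseRAM(2, 200, 300))  // 200
}
```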

func (*Job) Env

func (j *Job) Env() (env []string, err error)

Env decompresses and decodes job.EnvC (the output of CompressEnv(), which holds the environment variables the Job's Cmd should run/ran under). Note that EnvC is only populated if you got the Job from GetByEssence(_, _, true) or Reserve(). If no environment variables were passed in when the job was Add()ed to the queue, it returns the current environment variables instead. In both cases, the return value is altered to apply any overrides stored in job.EnvOverride.

func (*Job) Mount added in v0.8.0

func (j *Job) Mount() error

Mount uses the Job's MountConfigs to mount the remote file systems at the desired mount points. If a mount point is unspecified, mounts in the sub folder Cwd/mnt if CwdMatters (and unspecified CacheBase becomes Cwd), otherwise the actual working directory is used as the mount point (and the parent of that used for unspecified CacheBase). Relative CacheDir options are treated relative to the CacheBase.
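The default-path rules above can be expressed as a small resolver. `mountPaths` and its parameters are hypothetical; it follows this paragraph in using the parent of the actual working directory as the default CacheBase when CwdMatters is false, and leaves out CacheDir resolution.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// mountPaths resolves the mount point and cache base for one mount config.
// Relative mount paths are taken relative to Cwd (CwdMatters) or ActualCwd.
func mountPaths(mount, cacheBase, cwd, actualCwd string, cwdMatters bool) (mountPoint, cache string) {
	base := actualCwd
	if cwdMatters {
		base = cwd
	}
	switch {
	case mount == "" && cwdMatters:
		mountPoint = filepath.Join(cwd, "mnt")
	case mount == "":
		mountPoint = actualCwd
	case filepath.IsAbs(mount):
		mountPoint = mount
	default:
		mountPoint = filepath.Join(base, mount)
	}
	switch {
	case cacheBase != "":
		cache = cacheBase
	case cwdMatters:
		cache = cwd
	default:
		cache = filepath.Dir(actualCwd)
	}
	return mountPoint, cache
}

func main() {
	fmt.Println(mountPaths("", "", "/work", "/work/x/cwd", false)) // /work/x/cwd /work/x
	fmt.Println(mountPaths("", "", "/work", "/work", true))        // /work/mnt /work
}
```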

func (*Job) StdErr

func (j *Job) StdErr() (stderr string, err error)

StdErr returns the decompressed job.StdErrC, which is the head and tail of job.Cmd's STDERR when it ran. If the Cmd hasn't run yet, or if it output nothing to STDERR, you will get an empty string. Note that StdErrC is only populated if you got the Job from GetByEssence(_, true, _), and if the Job's Cmd ran but failed.

func (*Job) StdOut

func (j *Job) StdOut() (stdout string, err error)

StdOut returns the decompressed job.StdOutC, which is the head and tail of job.Cmd's STDOUT when it ran. If the Cmd hasn't run yet, or if it output nothing to STDOUT, you will get an empty string. Note that StdOutC is only populated if you got the Job from GetByEssence(_, true, _), and if the Job's Cmd ran but failed.

func (*Job) TriggerBehaviours added in v0.8.0

func (j *Job) TriggerBehaviours(success bool) error

TriggerBehaviours triggers this Job's Behaviours based on whether its Cmd executed successfully. It should only be called as part of, or after, Execute().

func (*Job) Unmount added in v0.8.0

func (j *Job) Unmount(stopUploads ...bool) (logs string, err error)

Unmount unmounts any remote file systems that were previously mounted with Mount(). It returns a nil error if Mount() had not been called or there were no MountConfigs. Note that for cached writable mounts, created files only begin to upload once Unmount() is called, so this may take some time to return. Supply true to disable uploading of files (eg. if you're unmounting following an error). If uploading, the error could contain the string "failed to upload", which you may want to check for. On success, if CwdMatters is false and the mount point was (within) ActualCwd, any empty directories between the mount point(s) and Cwd are deleted.
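A caller might check for upload failures like this. The sketch defines a hypothetical unmounter interface with the same method signature as (*Job).Unmount, plus a fake implementation, so it runs without a real mount; cleanUp and fakeJob are illustrative names only.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// unmounter has the same method signature as (*Job).Unmount.
type unmounter interface {
	Unmount(stopUploads ...bool) (logs string, err error)
}

// fakeJob is a hypothetical stand-in whose uploads can be made to fail.
type fakeJob struct{ uploadFails bool }

func (f fakeJob) Unmount(stopUploads ...bool) (string, error) {
	skipUploads := len(stopUploads) > 0 && stopUploads[0]
	if f.uploadFails && !skipUploads {
		return "upload log", errors.New("failed to upload some files")
	}
	return "", nil
}

// cleanUp unmounts, skipping uploads if the command failed, and reports
// whether the error indicates an upload failure.
func cleanUp(j unmounter, cmdFailed bool) (uploadFailed bool, err error) {
	_, err = j.Unmount(cmdFailed)
	if err != nil && strings.Contains(err.Error(), "failed to upload") {
		return true, err
	}
	return false, err
}

func main() {
	failed, err := cleanUp(fakeJob{uploadFails: true}, false)
	fmt.Println(failed, err)
	// prints true failed to upload some files
}
```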

func (*Job) WallTime added in v0.8.0

func (j *Job) WallTime() (d time.Duration)

WallTime returns the time the job took to run if it ran to completion, or the time taken so far if it is currently running.

type JobEssence added in v0.8.0

type JobEssence struct {
	// JobKey can be set by itself if you already know the "key" of the desired
	// job; you can get these keys when you use GetByRepGroup() or
	// GetIncomplete() with a limit. When this is set, other properties are
	// ignored.
	JobKey string

	// Cmd always forms an essential part of a Job.
	Cmd string

	// Cwd should only be set if the Job was created with CwdMatters = true.
	Cwd string
}

JobEssence describes the essential aspects of a Job that make it unique; use it to identify a Job when, eg., you want to search for one.

func (*JobEssence) Key added in v0.8.0

func (j *JobEssence) Key() string

Key returns the same value that key() on the matching Job would give you.

func (*JobEssence) Stringify added in v0.8.0

func (j *JobEssence) Stringify() string

Stringify returns a nice printable form of a JobEssence.

type MountConfig added in v0.8.0

type MountConfig struct {
	// Mount is the local directory on which to mount your Target(s). It can be
	// (in) any directory you're able to write to. If the directory doesn't
	// exist, it will be created first. Otherwise, it must be empty. If not
	// supplied, defaults to the subdirectory "mnt" in the Job's working
	// directory if CwdMatters, otherwise the actual working directory will be
	// used as the mount point.
	Mount string `json:",omitempty"`

	// CacheBase is the parent directory to use for the CacheDir of any Targets
	// configured with Cache on, but CacheDir undefined, or specified with a
	// relative path. If CacheBase is also undefined, the base will be the Job's
	// Cwd if CwdMatters, otherwise it will be the parent of the Job's actual
	// working directory.
	CacheBase string `json:",omitempty"`

	// Retries is the number of retries that should be attempted when
	// encountering errors in trying to access your remote S3 bucket. At least 3
	// is recommended. It defaults to 10 if not provided.
	Retries int `json:",omitempty"`

	// Verbose is a boolean, which if true, causes timing information on all
	// remote S3 calls to appear as lines in the STDERR of every job that uses
	// the mount. Errors always appear there.
	Verbose bool `json:",omitempty"`

	// Targets is a slice of MountTarget which define what you want to access at
	// your Mount. It's a slice to allow you to multiplex different buckets (or
	// different subdirectories of the same bucket) so that it looks like all
	// their data is in the same place, for easier access to files in your
	// mount. You can only have one of these configured to be writeable.
	Targets []MountTarget
}

MountConfig is used in a Job to specify that a remote file system or object store should be fuse mounted before the Job's Cmd is run. Currently only S3-like object stores are supported.
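The constraints stated in the field comments can be checked up front. This is a sketch using trimmed-down local copies of the types; normalise is a hypothetical helper, not a package function, and it only applies the documented rules: Retries defaults to 10, every Target needs a Path, and at most one Target may be writeable.

```go
package main

import "fmt"

// Trimmed-down local copies of the documented types (illustration only).
type MountTarget struct {
	Path  string
	Write bool
}

type MountConfig struct {
	Retries int
	Targets []MountTarget
}

// normalise is a hypothetical helper applying the documented rules.
func normalise(mc *MountConfig) error {
	if mc.Retries == 0 {
		mc.Retries = 10 // documented default
	}
	writeable := 0
	for i, t := range mc.Targets {
		if t.Path == "" {
			return fmt.Errorf("target %d has no Path", i)
		}
		if t.Write {
			writeable++
		}
	}
	if writeable > 1 {
		return fmt.Errorf("%d writeable targets; only one is allowed", writeable)
	}
	return nil
}

func main() {
	mc := MountConfig{Targets: []MountTarget{{Path: "bucket/in"}, {Path: "bucket/out", Write: true}}}
	err := normalise(&mc)
	fmt.Println(mc.Retries, err)
	// prints 10 <nil>
}
```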

type MountConfigs added in v0.8.0

type MountConfigs []MountConfig

MountConfigs is a slice of MountConfig.

func (MountConfigs) String added in v0.8.0

func (mcs MountConfigs) String() string

String provides a JSON representation of the MountConfigs.
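Assuming String() simply marshals the slice with encoding/json, the omitempty tags in the struct definitions mean unset optional fields are left out. The types below copy the documented fields and tags so the sketch runs on its own; the exact output of the real method may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented fields and JSON tags.
type MountTarget struct {
	Profile  string `json:",omitempty"`
	Path     string
	Cache    bool   `json:",omitempty"`
	CacheDir string `json:",omitempty"`
	Write    bool   `json:",omitempty"`
}

type MountConfig struct {
	Mount     string `json:",omitempty"`
	CacheBase string `json:",omitempty"`
	Retries   int    `json:",omitempty"`
	Verbose   bool   `json:",omitempty"`
	Targets   []MountTarget
}

type MountConfigs []MountConfig

// String returns a JSON representation, or "" if marshalling fails.
func (mcs MountConfigs) String() string {
	b, err := json.Marshal(mcs)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	mcs := MountConfigs{{
		Mount:   "/tmp/mnt",
		Targets: []MountTarget{{Path: "mybucket/data", Cache: true}},
	}}
	fmt.Println(mcs)
	// prints [{"Mount":"/tmp/mnt","Targets":[{"Path":"mybucket/data","Cache":true}]}]
}
```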

type MountTarget added in v0.8.0

type MountTarget struct {
	// Profile is the S3 configuration profile name to use. If not supplied, the
	// value of the $AWS_DEFAULT_PROFILE or $AWS_PROFILE environment variables
	// is used, and if those are unset it defaults to "default".
	//
	// We look at a number of standard S3 configuration files and environment
	// variables to determine the scheme, domain, region and authentication
	// details to connect to S3 with. All possible sources are checked to fill
	// in any missing values from more preferred sources.
	//
	// The preferred file is ~/.s3cfg, since this is the only config file type
	// that allows the specification of a custom domain. This file is Amazon's
	// s3cmd config file, described here: http://s3tools.org/kb/item14.htm. wr
	// will look at the access_key, secret_key, use_https and host_base options
	// under the section with the given Profile name. If you don't wish to use
	// any other config files or environment variables, you can add the non-
	// standard region option to this file if you need to specify a specific
	// region.
	//
	// The next file checked is the one pointed to by the
	// $AWS_SHARED_CREDENTIALS_FILE environment variable, or ~/.aws/credentials.
	// This file is described here:
	// http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html.
	// wr will look at the aws_access_key_id and
	// aws_secret_access_key options under the section with the given Profile
	// name.
	//
	// wr also checks the file pointed to by the $AWS_CONFIG_FILE environment
	// variable, or ~/.aws/config, described in the previous link. From here the
	// region option is used from the section with the given Profile name. If
	// you don't wish to use a ~/.s3cfg file but do need to specify a custom
	// domain, you can add the non-standard host_base and use_https options to
	// this file instead.
	//
	// As a last resort, ~/.awssecret is checked. This is s3fs's config file,
	// and consists of a single line with your access key and secret key
	// separated by a colon.
	//
	// If set, the environment variables $AWS_ACCESS_KEY_ID,
	// $AWS_SECRET_ACCESS_KEY and $AWS_DEFAULT_REGION override corresponding
	// options found in any config file.
	Profile string `json:",omitempty"`

	// Path (required) is the name of your S3 bucket, optionally followed URL-
	// style (separated with forward slashes) by sub-directory names. The
	// highest performance is gained by specifying the deepest path under your
	// bucket that holds all the files you wish to access.
	Path string

	// Cache is a boolean, which if true, turns on data caching of any data
	// retrieved, or any data you wish to upload.
	Cache bool `json:",omitempty"`

	// CacheDir is the local directory to store cached data. If this parameter
	// is supplied, Cache is forced true and so doesn't need to be provided. If
	// this parameter is not supplied but Cache is true, the directory will be a
	// unique directory in the containing MountConfig's CacheBase, and will get
	// deleted on unmount. If it's a relative path, it will be relative to the
	// CacheBase.
	CacheDir string `json:",omitempty"`

	// Write is a boolean, which if true, makes the mount point writeable. If
	// you don't intend to write to a mount, just leave this parameter out.
	// Because writing currently requires caching, turning this on forces Cache
	// to be considered true.
	Write bool `json:",omitempty"`
}

MountTarget is used in a MountConfig to define what you want to access at your Mount.
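Two of the simpler rules above, profile selection and when caching is effectively on, can be summarised in code. This is a minimal sketch: profileName and cacheEnabled are hypothetical helpers, and giving $AWS_DEFAULT_PROFILE precedence over $AWS_PROFILE when both are set is an assumption, since the text does not say which wins.

```go
package main

import (
	"fmt"
	"os"
)

// profileName picks the S3 profile: an explicit Profile wins, then the
// environment (AWS_DEFAULT_PROFILE assumed to take precedence), then "default".
func profileName(profile string, getenv func(string) string) string {
	if profile != "" {
		return profile
	}
	for _, v := range []string{"AWS_DEFAULT_PROFILE", "AWS_PROFILE"} {
		if p := getenv(v); p != "" {
			return p
		}
	}
	return "default"
}

// cacheEnabled reports whether caching is effectively on: supplying CacheDir
// or setting Write forces Cache to be treated as true.
func cacheEnabled(cache bool, cacheDir string, write bool) bool {
	return cache || cacheDir != "" || write
}

func main() {
	fmt.Println(profileName("", os.Getenv)) // depends on your environment
	fmt.Println(cacheEnabled(false, "", true))
	// prints true
}
```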

type Server

type Server struct {
	ServerInfo *ServerInfo

	sync.Mutex
	// contains filtered or unexported fields
}

Server represents the server side of the socket that clients Connect() to.

func Serve

func Serve(config ServerConfig) (s *Server, msg string, err error)

Serve is for use by a server executable and makes it start listening on localhost at the configured port for Connect()ions from clients, and then handles those clients. It returns a *Server that you will typically call Block() on to block until your executable receives a SIGINT or SIGTERM, or you call Stop(), at which point the queues will be safely closed (you'd probably just exit at that point). The possible errors from Serve() will be related to not being able to start up at the supplied address; errors encountered while dealing with clients are logged but otherwise ignored. If it creates a db file or recreates one from backup, it will say what it did in the returned msg string. It also spawns your runner clients as needed, running them via the configured job scheduler, using the configured shell. It determines the command line to execute for your runner client from the configured RunnerCmd string you supplied.

func (*Server) Block

func (s *Server) Block() (err error)

Block makes you block while the server does the job of serving clients. It returns an error indicating why it stopped blocking, which will be due to receiving a signal or because you called Stop().
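The blocking behaviour described above follows a common Go pattern: wait on either an OS signal or an explicit stop request. This sketch uses a hypothetical miniServer type, not the package's Server, and only illustrates the control flow; its error messages are invented.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

// miniServer is a hypothetical stand-in showing Block/Stop control flow.
type miniServer struct {
	stop     chan struct{}
	stopOnce sync.Once
}

func newMiniServer() *miniServer {
	return &miniServer{stop: make(chan struct{})}
}

// Stop requests shutdown; it is safe to call more than once.
func (s *miniServer) Stop() {
	s.stopOnce.Do(func() { close(s.stop) })
}

// Block waits until SIGINT/SIGTERM arrives or Stop is called, then returns an
// error saying why it stopped blocking.
func (s *miniServer) Block() error {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigs)
	select {
	case sig := <-sigs:
		// A real server would close its queues safely here.
		return fmt.Errorf("received signal %v", sig)
	case <-s.stop:
		return errors.New("stopped by Stop()")
	}
}

func main() {
	s := newMiniServer()
	go s.Stop()
	fmt.Println(s.Block())
	// prints stopped by Stop()
}
```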

func (*Server) Drain

func (s *Server) Drain() (err error)

Drain will stop the server spawning new runners and stop Reserve*() from returning any more Jobs. Once all current runners exit, we Stop().

func (*Server) GetServerStats

func (s *Server) GetServerStats() *ServerStats

GetServerStats returns basic info about the server along with some simple live stats about what's happening in the server's queues.

func (*Server) HasRunners

func (s *Server) HasRunners() bool

HasRunners tells you if there are currently runner clients in the job scheduler (either running or pending).

func (*Server) Stop

func (s *Server) Stop() (err error)

Stop will cause a graceful shut down of the server.

type ServerConfig

type ServerConfig struct {
	// Port for client-server communication.
	Port string

	// Port for the web interface.
	WebPort string

	// Name of the desired scheduler (eg. "local" or "lsf" or "openstack") that
	// jobs will be submitted to.
	SchedulerName string

	// SchedulerConfig should define the config options needed by the chosen
	// scheduler, eg. scheduler.ConfigLocal{Deployment: "production", Shell:
	// "bash"} if using the local scheduler.
	SchedulerConfig interface{}

	// The command line needed to bring up a jobqueue runner client, which
	// should contain 6 format verbs (4 %s and 2 %d) that will be replaced with
	// the queue name, scheduler group, deployment, ip:port address of the
	// server, reservation timeout and maximum number of minutes allowed, eg.
	// "my_jobqueue_runner_client --queue %s --group '%s' --deployment %s
	// --server '%s' --reserve_timeout %d --max_mins %d". If you supply an empty
	// string (the default), runner clients will not be spawned; for any work to
	// be done you will have to run your runner client yourself manually.
	RunnerCmd string

	// Absolute path to where the database file should be saved. The database is
	// used to ensure no loss of added commands, to keep a permanent history of
	// all jobs completed, and to keep various stats, amongst other things.
	DBFile string

	// Absolute path to where the database file should be backed up to.
	DBFileBackup string

	// Name of the deployment ("development" or "production"); development
	// databases are deleted and recreated on start up by default.
	Deployment string

	// CIDR is the IP address range of your network. When the server needs to
	// know its own IP address, it uses this CIDR to confirm it got it correct
	// (ie. it picked the correct network interface). You can leave this unset,
	// in which case it will do its best to pick correctly. (This is only a
	// possible issue if you have multiple network interfaces.)
	CIDR string
}

ServerConfig is supplied to Serve() to configure your jobqueue server. All fields are required with no working default unless otherwise noted.
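To see how the placeholders in the example RunnerCmd line up, here it is filled in with fmt.Sprintf. The argument values are made up, buildRunnerCmd is a hypothetical helper, and whether the server formats the string exactly this way is an assumption.

```go
package main

import "fmt"

// runnerCmd is the example template from the RunnerCmd comment:
// four %s verbs followed by two %d verbs.
const runnerCmd = "my_jobqueue_runner_client --queue %s --group '%s' --deployment %s " +
	"--server '%s' --reserve_timeout %d --max_mins %d"

// buildRunnerCmd is a hypothetical helper filling the template in order:
// queue name, scheduler group, deployment, server address, reservation
// timeout and maximum number of minutes.
func buildRunnerCmd(queue, group, deployment, server string, reserveTimeout, maxMins int) string {
	return fmt.Sprintf(runnerCmd, queue, group, deployment, server, reserveTimeout, maxMins)
}

func main() {
	fmt.Println(buildRunnerCmd("cmds", "group1", "production", "10.0.0.5:12345", 25, 60))
	// prints my_jobqueue_runner_client --queue cmds --group 'group1' --deployment production --server '10.0.0.5:12345' --reserve_timeout 25 --max_mins 60
}
```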

type ServerInfo

type ServerInfo struct {
	Addr       string // ip:port
	Host       string // hostname
	Port       string // port
	WebPort    string // port of the web interface
	PID        int    // process id of server
	Deployment string // deployment the server is running under
	Scheduler  string // the name of the scheduler that jobs are being submitted to
	Mode       string // ServerModeNormal if the server is running normally, or ServerModeDrain if draining
}

ServerInfo holds basic addressing info about the server.

type ServerStats

type ServerStats struct {
	ServerInfo *ServerInfo
	Delayed    int           // how many jobs are waiting following a possibly transient error
	Ready      int           // how many jobs are ready to begin running
	Running    int           // how many jobs are currently running
	Buried     int           // how many jobs are no longer being processed because of seemingly permanent errors
	ETC        time.Duration // how long until the slowest of the currently running jobs is expected to complete
}

ServerStats holds information about the jobqueue server for sending to clients.

Directories

Path Synopsis
Package scheduler lets the jobqueue server interact with the configured job scheduler (if any) to submit jobqueue runner clients and have them run on a compute cluster (or local machine).