jobserver

package
v0.0.0-...-6f34b33 Latest
Published: Feb 28, 2023 | License: MIT | Imports: 10 | Imported by: 0

Documentation

Overview

Package jobserver provides a CBOR-RPC interface compatible with the Python coordinate module. This defines what is served from github.com/diffeo/go-coordinate/cmd/coordinated, and probably should be merged in some form with that package.

The Python coordinated operates with an extremely irregular RPC-like interface. Many methods, but not all, take a dictionary of additional options. Many methods, but not all, return an in-band string error message, plus the underlying RPC layer allows an exception string to be returned. Some methods specifically require a Python tuple return, even though the only way to achieve this across the wire is through an extension tag in CBOR.

As such, JobServer provides an interface that can be made compatible with the Python coordinate library, but it is unlikely to be useful to native Go code or other client interfaces.

Types

type ControlWorkSpecOptions

type ControlWorkSpecOptions struct {
	// Continuous can change the "continuous" flag on a work spec.
	// If a work spec declares itself as "continuous", the
	// scheduler can generate empty work units for it if there is
	// no other work to do.  This flag can pause and resume the
	// generation of these artificial work units for work specs
	// that declare themselves continuous.  Trying to set this
	// flag for a work spec that does not declare itself
	// continuous is an error.
	Continuous bool

	// Status indicates whether or not the work spec is paused.
	Status WorkSpecStatus

	// Weight controls the relative scheduling weight of this work
	// spec.
	Weight int

	// Interval specifies the minimum time, in seconds, between
	// generating continuous work units.
	Interval float64

	// MaxRunning specifies the maximum number of work units that
	// can concurrently run for this work spec, across the entire
	// system.
	MaxRunning int `mapstructure:"max_running"`
}

ControlWorkSpecOptions defines the list of actions ControlWorkSpec can take.

type DelWorkUnitsOptions

type DelWorkUnitsOptions struct {
	// All, if set to true, directs DelWorkUnits to delete
	// all of the work units in its work spec.  If this is
	// provided, all other options are ignored.
	All bool

	// WorkUnitKeys, if provided, is a list of specific work unit
	// keys to delete.  If this is given and All is false, then
	// these specific work units are deleted; if State is also
	// given, then each work unit must be in that state to be
	// deleted.
	WorkUnitKeys []string `mapstructure:"work_unit_keys"`

	// State, if provided, is one of the external Coordinate work
	// unit statuses, and all work units in this state are deleted.
	// If WorkUnitKeys is also provided then only those work units
	// will be deleted, and then only if in this state.
	State WorkUnitStatus
}

DelWorkUnitsOptions specifies the options for DelWorkUnits. The first of All, WorkUnitKeys, or State given defines the operation to perform. If none of these are given, the zero value for this structure tells DelWorkUnits to do nothing.
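The precedence among All, WorkUnitKeys, and State can be sketched as a single switch. This is only an illustration of the rules described in the field comments, not the package's implementation: the types are local stand-ins that mirror the documented ones, describeDelete is a hypothetical helper, and treating an empty key list or a zero State as "not provided" is an assumption.

```go
package main

import "fmt"

// WorkUnitStatus and DelWorkUnitsOptions are local stand-ins that mirror
// the documented types so this sketch compiles without the real package.
type WorkUnitStatus int

type DelWorkUnitsOptions struct {
	All          bool
	WorkUnitKeys []string
	State        WorkUnitStatus
}

// describeDelete is a hypothetical helper that reports which operation the
// documented precedence rules select. It assumes that an empty key list and
// a zero State both mean "not provided".
func describeDelete(opts DelWorkUnitsOptions) string {
	switch {
	case opts.All:
		// All wins; every other option is ignored.
		return "delete all work units"
	case len(opts.WorkUnitKeys) > 0 && opts.State != 0:
		return "delete listed keys that are in the given state"
	case len(opts.WorkUnitKeys) > 0:
		return "delete listed keys"
	case opts.State != 0:
		return "delete all work units in the given state"
	default:
		// The zero value asks for nothing to be deleted.
		return "do nothing"
	}
}

func main() {
	fmt.Println(describeDelete(DelWorkUnitsOptions{}))
	fmt.Println(describeDelete(DelWorkUnitsOptions{All: true, State: 4}))
	fmt.Println(describeDelete(DelWorkUnitsOptions{WorkUnitKeys: []string{"a"}, State: 4}))
}
```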

type GetWorkOptions

type GetWorkOptions struct {
	// AvailableGb contains the amount of memory the worker
	// advertises.  In classic rejester, this limited work specs
	// to needing not more than this min_gb; Python Coordinate
	// ignores this constraint.  If zero, do not enforce this
	// constraint.
	AvailableGb float64 `mapstructure:"available_gb"`

	// LeaseTime specifies the number of seconds to complete the
	// work.  If zero, use a default value of 5 minutes.  Cannot
	// be less than 1 second or more than 1 day.
	LeaseTime int `mapstructure:"lease_time"`

	// MaxJobs indicates the number of jobs requested.  If zero,
	// use 1 instead.  The response to GetWork() is different if
	// this is 1 vs. a greater number.  Fewer jobs may be returned
	// if fewer are available.  All returned work units belong to
	// the same work spec.
	MaxJobs int `mapstructure:"max_jobs"`

	// WorkSpecNames gives a list of work specs to consider.  If
	// not nil, only work specs in this list will be considered.
	// The list may be further filtered by resource constraints
	// and work unit availability.
	WorkSpecNames []string `mapstructure:"work_spec_names"`
}

GetWorkOptions contains mapped options for the GetWork() call.
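The defaults and limits described for LeaseTime and MaxJobs might be applied roughly as follows. This is a minimal sketch under stated assumptions: the struct is a local stand-in, normalizeGetWork is a hypothetical helper, and rejecting an out-of-range lease time with an error (rather than clamping it) is a guess, since the field comment only says the value cannot be outside the range.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// GetWorkOptions is a local stand-in mirroring the documented struct.
type GetWorkOptions struct {
	AvailableGb   float64
	LeaseTime     int
	MaxJobs       int
	WorkSpecNames []string
}

// normalizeGetWork is a hypothetical helper that applies the documented
// defaults: a zero LeaseTime becomes 5 minutes and a zero MaxJobs becomes 1.
// Returning an error for an out-of-range lease is an assumption.
func normalizeGetWork(opts GetWorkOptions) (lease time.Duration, maxJobs int, err error) {
	lease = time.Duration(opts.LeaseTime) * time.Second
	if opts.LeaseTime == 0 {
		lease = 5 * time.Minute
	}
	if lease < time.Second || lease > 24*time.Hour {
		return 0, 0, errors.New("lease_time must be between 1 second and 1 day")
	}
	maxJobs = opts.MaxJobs
	if maxJobs == 0 {
		maxJobs = 1
	}
	return lease, maxJobs, nil
}

func main() {
	lease, maxJobs, err := normalizeGetWork(GetWorkOptions{})
	fmt.Println(lease, maxJobs, err)

	_, _, err = normalizeGetWork(GetWorkOptions{LeaseTime: 100000})
	fmt.Println(err)
}
```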

type GetWorkUnitsOptions

type GetWorkUnitsOptions struct {
	// WorkUnitKeys contains a list of work unit keys to retrieve.
	// If this option is supplied, all other options are ignored.
	WorkUnitKeys []string `mapstructure:"work_unit_keys"`

	// State provides a list of states to query on.  If this is
	// provided then only work units in one of the specified states
	// will be returned.
	State []WorkUnitStatus

	// Start gives a starting point to iterate through the list of
	// work units.  It is the name of the last work unit returned
	// in the previous call to GetWorkUnits().  No work unit whose
	// name is lexicographically less than this will be returned.
	Start string

	// Limit specifies the maximum number of work units to return.
	// Defaults to 1000.
	Limit int
}

GetWorkUnitsOptions contains unmarshaled options for GetWorkUnits().

type JobServer

type JobServer struct {
	// Namespace is the Coordinate Namespace interface this works
	// against.
	Namespace coordinate.Namespace

	// GlobalConfig is the configuration that is returned by the
	// GetConfig RPC call.
	GlobalConfig map[string]interface{}

	// Clock is the system time source.  This should agree with the
	// time source for the Coordinate backend, if it was created with
	// an alternate time source.
	Clock clock.Clock
	// contains filtered or unexported fields
}

JobServer is a network-accessible interface to Coordinate. Its methods are the Python coordinated RPC methods, with more normalized parameters and Go-style CamelCase names.

func (*JobServer) AddWorkUnits

func (jobs *JobServer) AddWorkUnits(workSpecName string, workUnitKvp []interface{}) (bool, string, error)

AddWorkUnits adds any number of work units to a work spec. Each of the work units is a cborrpc.PythonTuple or slice containing a string with the work unit key, a dictionary with the work unit data, and an optional dictionary with additional metadata.

func (*JobServer) Archive

func (jobs *JobServer) Archive(options map[string]interface{}) (interface{}, error)

Archive causes the system to clean up completed work units. The system will keep up to a pre-specified limit of work units that have completed successfully, and will also remove work units that have completed successfully but are beyond a pre-specified age. The work units are deleted as in DelWorkUnits(). The return value is always nil.

TODO(dmaze): Actually implement this. This probably involves triggering a background task the system would need to do on its own in any case. The observable effects of this are minimal, especially in a default/test configuration.

func (*JobServer) Clear

func (jobs *JobServer) Clear() (count int, err error)

Clear deletes every work spec. Call this with caution. Returns the number of work specs deleted.

func (*JobServer) ControlWorkSpec

func (jobs *JobServer) ControlWorkSpec(workSpecName string, options map[string]interface{}) (bool, string, error)

ControlWorkSpec makes changes to a work spec that are not directly reflected in the work spec definition. This allows work specs to be paused or to stop generating new continuous jobs. ControlWorkSpecOptions has a complete listing of what can be done.

func (*JobServer) CountWorkUnits

func (jobs *JobServer) CountWorkUnits(workSpecName string) (map[WorkUnitStatus]int, string, error)

CountWorkUnits returns the number of work units in each status for a given work spec.

func (*JobServer) DelWorkSpec

func (jobs *JobServer) DelWorkSpec(workSpecName string) (bool, string, error)

DelWorkSpec deletes a single work spec.

func (*JobServer) DelWorkUnits

func (jobs *JobServer) DelWorkUnits(workSpecName string, options map[string]interface{}) (int, string, error)

DelWorkUnits deletes work units from an existing work spec. If options is empty, this does nothing. On success, returns the number of work units deleted.

func (*JobServer) DeleteNamespace

func (jobs *JobServer) DeleteNamespace(prefix interface{}) (int, error)

DeleteNamespace deletes all locks in the lock system whose first key part is prefix.

func (*JobServer) GetChildWorkUnits

func (jobs *JobServer) GetChildWorkUnits(workerID string) (map[string][]map[string]interface{}, string, error)

GetChildWorkUnits collects a list of work units being performed by immediate children of workerID. The return value is a map of child worker IDs to lists of work unit maps. Each of the individual work unit maps in turn has keys "work_spec_name", "work_unit_key", "work_unit_data", "worker_id", and "expires", with the obvious meanings.

Note that there is an important layer of indirection here: the returned metadata for work units reflects those work units' active attempts, which may not be the attempts the worker believes it is performing. That is, a work unit may be listed under a worker even though some other worker currently owns its active attempt; in that case the "worker_id" key in the work unit map names that other worker.

func (*JobServer) GetConfig

func (jobs *JobServer) GetConfig() (map[string]interface{}, string, error)

GetConfig returns a global configuration map. Sadly, the RPC interface has no way to set a global configuration map and this implementation doesn't have one on its own, so this method fails.

func (*JobServer) GetWork

func (jobs *JobServer) GetWork(workerID string, options map[string]interface{}) (interface{}, string, error)

GetWork requests one or more work units to perform. The work unit attempts are associated with workerID, which need not have been previously registered. If there is no work to do, it may return neither work nor an error.

Each work unit is returned as a cborrpc.PythonTuple holding the work spec name, work unit key as a byte slice, and work unit data dictionary. If options does not contain "max_jobs" or if that value is 1, returns a single tuple or nil; otherwise returns a slice of tuples, which may hold one tuple or none.
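Because the result shape depends on "max_jobs", a caller may want to flatten it into one slice before use. The following sketch shows one way to do that with a type switch. PythonTuple here is a local stand-in for cborrpc.PythonTuple, and its Items field is an assumption of this sketch; asTuples is a hypothetical helper, and accepting both []PythonTuple and []interface{} reflects uncertainty about the concrete slice type the method returns.

```go
package main

import "fmt"

// PythonTuple is a local stand-in for cborrpc.PythonTuple. The Items field
// name is an assumption made for this sketch.
type PythonTuple struct {
	Items []interface{}
}

// asTuples is a hypothetical helper that flattens the possible GetWork
// result shapes (nil, one tuple, or a slice of tuples) into a single slice.
func asTuples(result interface{}) ([]PythonTuple, error) {
	switch r := result.(type) {
	case nil:
		// No work was available.
		return nil, nil
	case PythonTuple:
		// The single-job form.
		return []PythonTuple{r}, nil
	case []PythonTuple:
		return r, nil
	case []interface{}:
		tuples := make([]PythonTuple, 0, len(r))
		for _, item := range r {
			t, ok := item.(PythonTuple)
			if !ok {
				return nil, fmt.Errorf("unexpected item type %T", item)
			}
			tuples = append(tuples, t)
		}
		return tuples, nil
	default:
		return nil, fmt.Errorf("unexpected GetWork result type %T", result)
	}
}

func main() {
	single := PythonTuple{Items: []interface{}{"spec", []byte("unit-1"), map[string]interface{}{}}}
	for _, result := range []interface{}{nil, single, []interface{}{single, single}} {
		tuples, err := asTuples(result)
		fmt.Println(len(tuples), err)
	}
}
```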

func (*JobServer) GetWorkSpec

func (jobs *JobServer) GetWorkSpec(workSpecName string) (data map[string]interface{}, err error)

GetWorkSpec retrieves the definition of a work spec. If the named work spec does not exist, returns nil (not an error).

func (*JobServer) GetWorkSpecMeta

func (jobs *JobServer) GetWorkSpecMeta(workSpecName string) (result map[string]interface{}, _ string, err error)

GetWorkSpecMeta returns a set of control options for a given work spec. The returned map has the full set of keys that ControlWorkSpec() will accept.

func (*JobServer) GetWorkUnitStatus

func (jobs *JobServer) GetWorkUnitStatus(workSpecName string, workUnitKeys []string) ([]map[string]interface{}, string, error)

GetWorkUnitStatus returns a summary status of zero or more work units in a single work spec. On success, the returned list of dictionaries corresponds one-to-one with workUnitKeys. If there is no such work unit, nil is in the list; otherwise each map contains keys "status", "expiration", "worker_id", and "traceback".

func (*JobServer) GetWorkUnits

func (jobs *JobServer) GetWorkUnits(workSpecName string, options map[string]interface{}) ([]interface{}, string, error)

GetWorkUnits retrieves the keys and data dictionaries for some number of work units. If options contains "work_unit_keys", those specific work units are retrieved; otherwise the work units are based on which of GetWorkUnitsOptions are present.

On success, the return value is a slice of cborrpc.PythonTuple objects where each contains the work unit key as a byte slice and the data dictionary.

func (*JobServer) GetWorkerInfo

func (jobs *JobServer) GetWorkerInfo(workerID string) (map[string]interface{}, string, error)

GetWorkerInfo returns the data dictionary sent with the last WorkerHeartbeat call for this worker, plus the key "age_seconds" as the time since that last heartbeat.

func (*JobServer) ListWorkSpecs

func (jobs *JobServer) ListWorkSpecs(options map[string]interface{}) (result []map[string]interface{}, next string, err error)

ListWorkSpecs retrieves a list of all work specs in the server namespace. It does not expect to return an error.

func (*JobServer) ListWorkerModes

func (jobs *JobServer) ListWorkerModes() (map[string]string, error)

ListWorkerModes returns a map where the keys are worker IDs and the values are the mode value passed to the most recent call to WorkerHeartbeat for that worker.

func (*JobServer) Lock

func (jobs *JobServer) Lock(lockerID string, timeout int, keys interface{}) (ok bool, msg string, err error)

Lock claims a lock on a set of hierarchical keys. Locks are global to this specific instance of the job server, and will not survive a restart. The key space is hierarchical, so the keys parameter is a list of key paths, e.g. [][]interface{}{{"usr", "bin"}, {"usr", "share"}}. If the timeout is 0, a default of 60 seconds is used instead.

This tries to lock all of the keys. Returns true if all could be locked, or false if nothing is locked at all.

func (*JobServer) Locksome

func (jobs *JobServer) Locksome(lockerID string, timeout int, keys interface{}) (result [][]interface{}, msg string, err error)

Locksome claims locks on as many of a set of hierarchical keys as possible. The actual semantics of locking are the same as Lock(). The return value is an ordered list of key paths, in the same order as the keys parameter, where each item is either the matching key path if locked or nil if unsuccessful.
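Since the Locksome result lines up index-for-index with the keys parameter, a caller can separate the paths that were locked from those that were not. The sketch below illustrates that pairing. splitLocksome is a hypothetical helper and the result in main is made up for the example; nothing here calls the real job server.

```go
package main

import "fmt"

// splitLocksome is a hypothetical helper. It pairs each requested key path
// with the entry at the same index in a Locksome-style result and separates
// the paths that were locked from those that were not.
func splitLocksome(keys, result [][]interface{}) (locked, failed [][]interface{}, err error) {
	if len(keys) != len(result) {
		return nil, nil, fmt.Errorf("got %d results for %d keys", len(result), len(keys))
	}
	for i, path := range keys {
		if result[i] == nil {
			// A nil entry means this path could not be locked.
			failed = append(failed, path)
		} else {
			locked = append(locked, path)
		}
	}
	return locked, failed, nil
}

func main() {
	keys := [][]interface{}{{"usr", "bin"}, {"usr", "share"}, {"etc"}}
	// A made-up result in which the second path could not be locked.
	result := [][]interface{}{{"usr", "bin"}, nil, {"etc"}}

	locked, failed, err := splitLocksome(keys, result)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("locked:", locked)
	fmt.Println("failed:", failed)
}
```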

func (*JobServer) ModeCounts

func (jobs *JobServer) ModeCounts() (map[string]int, error)

ModeCounts counts the number of workers in each reported mode. It is the same as aggregating the result of ListWorkerModes by their mode value, and producing a map from mode value to worker count.
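The aggregation described above amounts to a few lines. This sketch works on a plain map of the shape ListWorkerModes returns; countModes is a hypothetical name and the worker IDs and modes are invented for the example.

```go
package main

import "fmt"

// countModes is a hypothetical helper that aggregates a map from worker ID
// to mode into a map from mode to worker count.
func countModes(modes map[string]string) map[string]int {
	counts := make(map[string]int)
	for _, mode := range modes {
		counts[mode]++
	}
	return counts
}

func main() {
	// Invented worker IDs and modes, shaped like a ListWorkerModes result.
	modes := map[string]string{
		"worker-1": "run",
		"worker-2": "idle",
		"worker-3": "run",
	}
	fmt.Println(countModes(modes))
}
```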

func (*JobServer) Now

func (jobs *JobServer) Now() (int64, error)

Now returns the current Unix time as seen by the server. It can be used as a simple aliveness test for the server.

func (*JobServer) PrioritizeWorkUnits

func (jobs *JobServer) PrioritizeWorkUnits(workSpecName string, options map[string]interface{}) (bool, string, error)

PrioritizeWorkUnits changes the priorities of some number of work units. The actual work units are in options["work_unit_keys"]. A higher priority results in the work units being scheduled sooner.

func (*JobServer) Readlock

func (jobs *JobServer) Readlock(keys interface{}) (result []interface{}, err error)

Readlock determines the current owner, if any, of a list of key paths. The return value is a list of either string owner names or nil.

func (*JobServer) Renew

func (jobs *JobServer) Renew(lockerID string, timeout int, keys interface{}) (ok bool, msg string, err error)

Renew attempts to renew all of a list of key paths. If any are not locked by the given locker ID, fails; otherwise extends their locks by the new timeout.

func (*JobServer) SetWorkSpec

func (jobs *JobServer) SetWorkSpec(specMap map[string]interface{}) (bool, string, error)

SetWorkSpec creates or updates a work spec from a map. The map must contain a key "name" with a string value which indicates which work spec is being modified. Other keys may have meaning and specific format requirements as well.

func (*JobServer) Unlock

func (jobs *JobServer) Unlock(lockerID string, keys interface{}) (ok bool, msg string, err error)

Unlock releases locks on all of a set of key paths. The nodes must be currently locked by lockerID.

func (*JobServer) UpdateWorkUnit

func (jobs *JobServer) UpdateWorkUnit(
	workSpecName string,
	workUnitKey string,
	options map[string]interface{},
) (bool, string, error)

UpdateWorkUnit causes some state change in a work unit. If the work unit is pending, this is the principal interface to complete or renew it; if it is already complete this can cause it to be retried.

func (*JobServer) WorkerHeartbeat

func (jobs *JobServer) WorkerHeartbeat(
	workerID string,
	mode string,
	expireSeconds float64,
	data map[string]interface{},
	parent string,
) (bool, string, error)

WorkerHeartbeat "checks in" with the job server. It creates a new worker record, if required, and marks the worker as active. It will remain active until the next call to WorkerUnregister or until expireSeconds have passed.

func (*JobServer) WorkerStats

func (jobs *JobServer) WorkerStats() (map[string]int, error)

WorkerStats retrieves basic statistics on the workers in the system. The returned map has keys "num_workers", the total number of active workers; "num_children", the total number of workers with parents; and "num_expirable", the same as "num_workers".

func (*JobServer) WorkerUnregister

func (jobs *JobServer) WorkerUnregister(workerID string) (bool, string, error)

WorkerUnregister deactivates a specific worker.

type ListWorkSpecsOptions

type ListWorkSpecsOptions struct {
	// Start indicates the name of the first work spec to emit.
	// Empty string means start at the beginning.
	Start string

	// Limit indicates the maximum number of work specs to emit.
	Limit int
}

ListWorkSpecsOptions contains options to the ListWorkSpecs call.

type PrioritizeWorkUnitsOptions

type PrioritizeWorkUnitsOptions struct {
	// WorkUnitKeys gives the names of the work units to reprioritize.
	// If not present, does nothing.
	WorkUnitKeys []string `mapstructure:"work_unit_keys"`

	// Priority sets an absolute priority.  If a NaN value, make a
	// change specified by Adjustment instead.
	Priority float64

	// Adjustment is added to the priorities of each of the work
	// units, if Priority is NaN.  If also a NaN value, do nothing.
	Adjustment float64
}

PrioritizeWorkUnitsOptions specifies which work units PrioritizeWorkUnits should adjust and how.
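The NaN convention for Priority and Adjustment can be sketched as follows. The struct is a local stand-in, and newPriority and unset are hypothetical names introduced for illustration; the sketch only restates the rules in the field comments and does not show how the package applies them.

```go
package main

import (
	"fmt"
	"math"
)

// PrioritizeWorkUnitsOptions is a local stand-in mirroring the documented
// struct.
type PrioritizeWorkUnitsOptions struct {
	WorkUnitKeys []string
	Priority     float64
	Adjustment   float64
}

// unset is a hypothetical name for the NaN value that marks a field as
// "not provided".
var unset = math.NaN()

// newPriority is a hypothetical helper that computes a work unit's new
// priority from its current one: an absolute Priority wins, otherwise
// Adjustment is added, and if both are NaN nothing changes.
func newPriority(current float64, opts PrioritizeWorkUnitsOptions) float64 {
	switch {
	case !math.IsNaN(opts.Priority):
		return opts.Priority
	case !math.IsNaN(opts.Adjustment):
		return current + opts.Adjustment
	default:
		return current
	}
}

func main() {
	fmt.Println(newPriority(5, PrioritizeWorkUnitsOptions{Priority: 10, Adjustment: unset}))
	fmt.Println(newPriority(5, PrioritizeWorkUnitsOptions{Priority: unset, Adjustment: 2}))
	fmt.Println(newPriority(5, PrioritizeWorkUnitsOptions{Priority: unset, Adjustment: unset}))
}
```

One consequence worth noting: the Go zero value of this struct has Priority 0, which is not NaN, so code that builds the struct directly would need to set unused fields to NaN explicitly.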

type UpdateWorkUnitOptions

type UpdateWorkUnitOptions struct {
	// LeaseTime specifies the number of additional seconds required
	// to complete the work unit.
	LeaseTime int `mapstructure:"lease_time"`

	// Status specifies the new status of the work unit.
	// Depending on the current status of the work unit, this may
	// start a new attempt or complete an existing attempt.  If
	// zero, make no change in the work unit status, only update
	// the data dictionary and extend an existing attempt's
	// deadline.
	Status WorkUnitStatus

	// Data, if provided, specifies the new data dictionary for
	// the work unit.
	Data map[string]interface{}

	// WorkerID identifies the worker making the request.
	WorkerID string `mapstructure:"worker_id"`
}

UpdateWorkUnitOptions holds the possible options to the UpdateWorkUnit call.

func (UpdateWorkUnitOptions) LeaseDuration

func (opts UpdateWorkUnitOptions) LeaseDuration() time.Duration

LeaseDuration converts the requested LeaseTime to a duration.
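Given that LeaseTime is expressed in seconds, the conversion is presumably a multiplication by time.Second. The sketch below shows that conversion on a local stand-in type; leaseDuration is a hypothetical free function, and the real method may handle defaults or limits that this version does not.

```go
package main

import (
	"fmt"
	"time"
)

// UpdateWorkUnitOptions is a local stand-in holding only the field this
// sketch needs.
type UpdateWorkUnitOptions struct {
	LeaseTime int
}

// leaseDuration is a hypothetical helper that converts a LeaseTime given in
// whole seconds to a time.Duration.
func leaseDuration(opts UpdateWorkUnitOptions) time.Duration {
	return time.Duration(opts.LeaseTime) * time.Second
}

func main() {
	fmt.Println(leaseDuration(UpdateWorkUnitOptions{LeaseTime: 90}))
}
```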

type WorkSpecStatus

type WorkSpecStatus int

WorkSpecStatus is a value passed as a "status" option to ControlWorkSpec().

const (
	// Runnable indicates a work spec is not paused.
	Runnable WorkSpecStatus = 1

	// Paused indicates a work spec is paused.
	Paused WorkSpecStatus = 2
)

type WorkUnitStatus

type WorkUnitStatus int

WorkUnitStatus is one of the possible work unit states supported by the Python Coordinate server.

const (
	// Available work units can be returned by the get_work call.
	Available WorkUnitStatus = 1

	// Pending work units have been returned by the get_work call,
	// and have not yet been completed.
	Pending WorkUnitStatus = 3

	// Finished work units have completed successfully.
	Finished WorkUnitStatus = 4

	// Failed work units have completed unsuccessfully.
	Failed WorkUnitStatus = 5
)
