Documentation ¶
Constants ¶
const (
	Lowest     = Priority(0)
	Background = Priority(math.MaxUint8 / 4)
	Default    = Priority(math.MaxUint8 / 2)
	High       = Priority(3 * math.MaxUint8 / 4)
	Maximum    = Priority(math.MaxUint8)
)
Named priority levels
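The constant expressions use integer division, so the numeric values are not obvious at a glance. The sketch below copies the declarations into a standalone program and prints each level; the local Priority type and the levels helper are stand-ins added for illustration, not part of the package.

```go
package main

import (
	"fmt"
	"math"
)

// Priority mirrors the documented type so that this sketch compiles on its own.
type Priority uint8

// The named levels, copied from the const block above.
const (
	Lowest     = Priority(0)
	Background = Priority(math.MaxUint8 / 4)
	Default    = Priority(math.MaxUint8 / 2)
	High       = Priority(3 * math.MaxUint8 / 4)
	Maximum    = Priority(math.MaxUint8)
)

// levels is a hypothetical helper that returns the named levels in
// increasing order of priority.
func levels() []Priority {
	return []Priority{Lowest, Background, Default, High, Maximum}
}

func main() {
	for _, p := range levels() {
		fmt.Println(uint8(p)) // prints 0, 63, 127, 191, 255 on separate lines
	}
}
```

Because math.MaxUint8 is 255 and constant integer division truncates, Background is 63 rather than 64, and Default is 127.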
Variables ¶
var NilID = ID(ulid.Nil)
NilID is the special ID that represents a non-existent job.
var NilTaskID = TaskID(ulid.Nil)
NilTaskID is the special ID that represents a non-existent task.
Functions ¶
This section is empty.
Types ¶
type CompilationError ¶
type CompilationError struct {
Err error
}
CompilationError wraps a compilation error.
func (CompilationError) Error ¶
func (e CompilationError) Error() string
Error returns the message of the wrapped error.
type Info ¶
type Info interface {
	Value() int64        // the value for this task
	State() State        // the state of this task
	AppName() string     // the application name provided by the client to which the task was assigned, or the empty string if there is no such client
	Hostname() string    // the hostname provided by the client to which the task was assigned, or the empty string if there is no such client
	Start() time.Time    // the time the task was assigned to a client, or the zero time if there is no such client
	Deadline() time.Time // the time at which this task will go stale, or the zero time if the task has not been assigned to a client
	Failures() int       // the number of times this task has failed
}
Info describes a task.
type Manifest ¶
type Manifest struct {
// contains filtered or unexported fields
}
Manifest represents a collection of files and directories on a pcas fs server. Paths in a valid Manifest must be unique. A valid Manifest contains at least one file.
func NewManifest ¶
NewManifest constructs a new Manifest containing the given files and directories.
func (Manifest) ContainsFile ¶
ContainsFile returns true if and only if M contains the file with the given path.
func (Manifest) Directories ¶
Directories returns the directories in M.
func (Manifest) IsEquivalentTo ¶
IsEquivalentTo returns true if and only if the paths and hashes in M and N agree after reordering.
type Metadata ¶
type Metadata struct {
	AppName  string // the name of the worker application
	Hostname string // the hostname of the machine on which the worker is running
}
Metadata holds metadata about a worker.
type Priority ¶
type Priority uint8
Priority represents the priority of a job. Higher numbers represent higher priority.
type Reader ¶
type Reader interface {
	io.ReadCloser
	// SetReadDeadline sets the deadline for future Read calls and any
	// currently-blocked Read call. A zero value for t means Read will not
	// time out.
	SetReadDeadline(t time.Time) error
	// SetDeadline is an alias for SetReadDeadline.
	SetDeadline(t time.Time) error
}
Reader is an io.ReadCloser whose Read calls can be given a deadline.
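To illustrate how a read deadline behaves, the sketch below declares a local copy of the interface and uses one end of a net.Pipe as a stand-in implementation, since net.Conn happens to provide the same four methods. This is an assumption for demonstration only: on a net.Conn, SetDeadline also sets the write deadline, and the error returned by the package's own Reader on timeout is not documented here. The helpers readWithTimeout and timesOut are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"time"
)

// Reader is a local copy of the documented interface.
type Reader interface {
	io.ReadCloser
	SetReadDeadline(t time.Time) error
	SetDeadline(t time.Time) error
}

// readWithTimeout sets a deadline d from now and then performs one Read.
func readWithTimeout(r Reader, buf []byte, d time.Duration) (int, error) {
	if err := r.SetReadDeadline(time.Now().Add(d)); err != nil {
		return 0, err
	}
	return r.Read(buf)
}

// timesOut reports whether a read from an idle pipe fails with a deadline error.
func timesOut() bool {
	c1, c2 := net.Pipe() // net.Conn satisfies the local Reader interface
	defer c1.Close()
	defer c2.Close()

	// Nothing is ever written to c2, so the read on c1 can only time out.
	_, err := readWithTimeout(c1, make([]byte, 16), 10*time.Millisecond)
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println(timesOut())
}
```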
type Specification ¶
type Specification struct {
	// Name is the name of the job
	Name string
	// Manifest records the files and directories created on job submission
	Manifest Manifest
	// Script is the file in Manifest that is run for each task in the job
	Script string
	// WorkingDir is a directory in Manifest, either explicitly or implicitly (as a subpath)
	WorkingDir string
	// Metadata records user-provided metadata
	Metadata map[string]string
	// Range is the range of tasks that make up the job
	Range irange.Range
	// Priority is the priority level of the job
	Priority Priority
	// Lifetime is the maximum lifetime of a task
	Lifetime time.Duration
	// MaxRetries is the maximum number of times that a task is retried on error
	MaxRetries int
	// MaxConcurrency is the maximum number of simultaneously-active tasks for this job
	MaxConcurrency int
}
Specification represents a job as submitted by a client.
func (*Specification) IsEquivalentTo ¶
func (S *Specification) IsEquivalentTo(T *Specification) (ok bool, err error)
IsEquivalentTo returns true if and only if S and T are equivalent.
func (*Specification) Validate ¶
func (S *Specification) Validate() error
Validate validates the specification, returning an error if there is a problem.
type Status ¶
type Status interface {
	Pending() irange.Range   // The pending tasks
	Active() irange.Range    // The active tasks
	Succeeded() irange.Range // The tasks that succeeded
	Failed() irange.Range    // The tasks that failed
}
Status describes the status of a job.
type Storage ¶
type Storage interface {
	SubmitterStorage
	WorkerStorage
}
Storage is the interface satisfied by job storage systems.
type SubmitterStorage ¶
type SubmitterStorage interface {
	// Submit submits a job, returning an ID which identifies that job. The job will become active when all files in the specified manifest have been uploaded. The storage system may discard a job if all such files are not uploaded after a certain length of time.
	Submit(context.Context, *Specification) (ID, error)
	// Upload uploads a file with the given path and contents. The path should be present in the manifest for the job with the given ID.
	Upload(ctx context.Context, id ID, path string, contents io.Reader) error
	// Status returns the status of the job with the given ID, or the error errors.ErrUnknownJob if there is no such active job.
	Status(context.Context, ID) (Status, error)
	// Info returns information about the task with the given value for the job with the given ID, or the error errors.ErrUnknownJob if there is no such active job.
	Info(context.Context, ID, int64) (Info, error)
	// List returns a slice containing the IDs of all active jobs.
	List(context.Context) ([]ID, error)
	// ListWithPriority returns a slice containing the IDs of all active jobs with the given priority level.
	ListWithPriority(context.Context, Priority) ([]ID, error)
	// Delete deletes the job with given ID, and returns the error errors.ErrUnknownJob if there is no such job.
	Delete(context.Context, ID) error
	// Describe returns the submission data for the job with the given ID, or the error errors.ErrUnknownJob if there is no such job.
	Describe(context.Context, ID) (*Specification, error)
}
SubmitterStorage is the part of the interface satisfied by a job storage system that is needed by a submitter.
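Several of these methods report a missing job with errors.ErrUnknownJob, which a caller can test for with the standard library's errors.Is. The sketch below shows one possible "delete if present" wrapper. Everything in it is a stand-in: ID is modelled as a string (the real type is ULID-based), ErrUnknownJob is a local sentinel, and memJobs is a hypothetical in-memory store that implements only Delete.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ID and ErrUnknownJob are local stand-ins for the package's ID type and
// for the sentinel error errors.ErrUnknownJob named in the method comments.
type ID string

var ErrUnknownJob = errors.New("unknown job")

// deleter is the single SubmitterStorage method that this sketch needs.
type deleter interface {
	Delete(context.Context, ID) error
}

// deleteIfExists deletes the job and treats "unknown job" as success.
func deleteIfExists(ctx context.Context, d deleter, id ID) error {
	if err := d.Delete(ctx, id); err != nil && !errors.Is(err, ErrUnknownJob) {
		return err
	}
	return nil
}

// memJobs is a hypothetical in-memory store used only for this demo.
type memJobs map[ID]bool

func (m memJobs) Delete(_ context.Context, id ID) error {
	if !m[id] {
		return fmt.Errorf("delete %q: %w", id, ErrUnknownJob)
	}
	delete(m, id)
	return nil
}

// demoDelete deletes the same job twice and returns both results.
func demoDelete() (first, second error) {
	jobs := memJobs{"job-1": true}
	ctx := context.Background()
	first = deleteIfExists(ctx, jobs, "job-1")
	second = deleteIfExists(ctx, jobs, "job-1")
	return first, second
}

func main() {
	fmt.Println(demoDelete()) // <nil> <nil>
}
```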
type Task ¶
type Task interface {
	// Name is the name of the job to which this task belongs.
	Name() string
	// JobID is the ID of the job.
	JobID() ID
	// Manifest records the files and directories uploaded with the job.
	Manifest() Manifest
	// Script is the file in Manifest that is run for each task in the job.
	Script() string
	// WorkingDir is a directory in Manifest, either explicitly or implicitly (as a subpath).
	WorkingDir() string
	// Metadata records user-provided metadata.
	Metadata() map[string]string
	// Priority is the priority level of the job.
	Priority() Priority
	// ID is the ID of the task.
	ID() TaskID
	// Value is the value of the task.
	Value() int64
	// Deadline is the time at which the task will go stale.
	Deadline() time.Time
	// Failures is the number of times that this task has failed.
	Failures() int
}
Task represents a single task belonging to a job.
type UserError ¶
type UserError struct {
Err error
}
UserError wraps an error generated by a task script.
type Worker ¶
type Worker interface {
	log.Logable
	metrics.Metricsable
	// Run starts the worker. It will continue processing tasks until either Stop is called, in which case it finishes processing the current task and then exits, or the context ctx is cancelled, in which case it marks the current task as having errored and exits immediately.
	Run(context.Context) error
	// Stop causes the worker to exit once it finishes processing the current task.
	Stop()
}
Worker is an interface satisfied by a jobdb worker.
func NewWorker ¶
func NewWorker(c WorkerStorage, S fs.Interface, cfg *WorkerConfig) Worker
NewWorker returns a new worker with underlying job storage system c, local file storage S, and configuration cfg.
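The documentation does not show how a worker drives its storage system, but the core of a processing loop over the Next, Success and Error methods of WorkerStorage might look like the sketch below. All types here are cut-down local stand-ins, errNoTask is a hypothetical sentinel (how Next signals "no task available" is not documented), and memStore only records outcomes, whereas a real store would presumably requeue a task reported through Error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task and Metadata are cut-down stand-ins for the documented types.
type Task interface{ Value() int64 }

type Metadata struct{ AppName, Hostname string }

// taskSource is the subset of WorkerStorage that this sketch uses.
type taskSource interface {
	Next(context.Context, Metadata) (Task, error)
	Success(context.Context, Task) error
	Error(context.Context, Task) error
}

// errNoTask is a hypothetical sentinel meaning that the queue is empty.
var errNoTask = errors.New("no task available")

type intTask int64

func (t intTask) Value() int64 { return int64(t) }

// memStore is a hypothetical in-memory task source that records outcomes.
type memStore struct {
	queue             []int64
	succeeded, failed []int64
}

func (m *memStore) Next(context.Context, Metadata) (Task, error) {
	if len(m.queue) == 0 {
		return nil, errNoTask
	}
	t := intTask(m.queue[0])
	m.queue = m.queue[1:]
	return t, nil
}

func (m *memStore) Success(_ context.Context, t Task) error {
	m.succeeded = append(m.succeeded, t.Value())
	return nil
}

func (m *memStore) Error(_ context.Context, t Task) error {
	m.failed = append(m.failed, t.Value())
	return nil
}

// drain fetches tasks until none remain, reporting each outcome to s.
func drain(ctx context.Context, s taskSource, md Metadata, run func(Task) error) error {
	for {
		t, err := s.Next(ctx, md)
		if errors.Is(err, errNoTask) {
			return nil
		} else if err != nil {
			return err
		}
		report := s.Success
		if run(t) != nil {
			report = s.Error
		}
		if err := report(ctx, t); err != nil {
			return err
		}
	}
}

// demoDrain runs tasks 1 to 4, failing the even ones.
func demoDrain() (succeeded, failed []int64) {
	m := &memStore{queue: []int64{1, 2, 3, 4}}
	_ = drain(context.Background(), m, Metadata{AppName: "demo"}, func(t Task) error {
		if t.Value()%2 == 0 {
			return errors.New("even task")
		}
		return nil
	})
	return m.succeeded, m.failed
}

func main() {
	fmt.Println(demoDrain()) // [1 3] [2 4]
}
```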
type WorkerConfig ¶
type WorkerConfig struct {
	Timeout  time.Duration // the maximum length of time to wait for a new task before exiting
	AppName  string        // the application name by which the worker identifies itself to the job storage system
	Hostname string        // the hostname by which the worker identifies itself to the job storage system
}
WorkerConfig holds configuration data for a jobdb worker.
type WorkerStorage ¶
type WorkerStorage interface {
	// Next returns a new task, if one is available. It provides the given metadata to the job server.
	Next(context.Context, Metadata) (Task, error)
	// Download downloads a file with the given path. This should be present in the manifest for the job with the given ID.
	Download(ctx context.Context, id ID, path string) (Reader, error)
	// Success indicates that the given task has succeeded.
	Success(context.Context, Task) error
	// Error indicates that the given task has failed and should be retried.
	Error(context.Context, Task) error
	// Requeue indicates that the given task should be requeued, without incrementing the number of failures.
	Requeue(context.Context, Task) error
	// Fatal indicates that the given task has failed and should not be requeued.
	Fatal(context.Context, Task) error
	// Heartbeat sends a message to the job server indicating that the worker is still alive and is processing the given task. A nil Task indicates that no task is being processed. Heartbeat returns true if task processing should continue and false if the current task should be cancelled.
	Heartbeat(context.Context, Task) (bool, error)
}
WorkerStorage is the part of the interface satisfied by a job storage system that is needed by a worker.
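Heartbeat returns false when the current task should be cancelled, which suggests running it periodically alongside the task and cancelling the task's context on a negative reply. The sketch below is one way to arrange that, not the package's own implementation: heartbeatFunc stands in for Heartbeat bound to a single task, the interval is arbitrary, and treating a heartbeat error as a reason to cancel is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// heartbeatFunc stands in for WorkerStorage.Heartbeat bound to one task.
type heartbeatFunc func(context.Context) (bool, error)

// keepAlive calls hb once per interval until ctx is done. It calls cancel
// and returns as soon as hb reports false or returns an error.
func keepAlive(ctx context.Context, interval time.Duration, hb heartbeatFunc, cancel context.CancelFunc) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if ok, err := hb(ctx); err != nil || !ok {
				cancel()
				return
			}
		}
	}
}

// demoHeartbeat returns the number of heartbeats sent before cancellation.
func demoHeartbeat() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	hb := func(context.Context) (bool, error) {
		calls++
		return calls < 3, nil // made-up server: cancel on the third heartbeat
	}

	go keepAlive(ctx, time.Millisecond, hb, cancel)
	<-ctx.Done() // a real worker would be running the task script here
	return calls
}

func main() {
	fmt.Println(demoHeartbeat()) // 3
}
```

Passing the task's cancel function into the heartbeat goroutine keeps the task code itself unaware of the job server; it only has to honour its context.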