Version: v0.0.0-...-2ba9e61 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2016 License: BSD-3-Clause Imports: 7 Imported by: 2



Package pqueue provides privitives for processing a simple persistent queue backed by the local filesystem. In many ways, the approach is similar to Maildir. Submitting jobs can be done easily from any language by creating a directory and moving it atomically into a directory.

A pqueue is a directory with several subdirectories:

- New files and directories are created in the `tmp` subdirectory before being moved atomically to elsewhere in the directory structure.

- The `new` subdirectory contains jobs that workers should process (see the `Take` method).

- The `cur` subdirectory contains a subdirectory for each worker (named by its process ID) where jobs are placed while the worker processes them.

- When jobs fail or finish successfully, they are moved to the `failed` or `done` subdirectories, respectively. See the `Fail` and `Finish` methods.

Jobs have state in the form of properties, which are really just files inside the job's directory. The `Get` and `Set` methods read these properties and set them atomically. Properties are set both by the process submitting a job (to specify the work that is to be done) and by workers (to checkpoint its progress so the job can continue if interrupted).



This section is empty.


This section is empty.


This section is empty.


type Job

type Job struct {
	Basename string
	// contains filtered or unexported fields

func (*Job) Fail

func (job *Job) Fail() error

Move the job to the `failed` subdirectory.

func (*Job) Finish

func (job *Job) Finish() error

Move the job ot the `done` subdirectory

func (*Job) Get

func (j *Job) Get(name string) ([]byte, error)

Read a property of the job. This simply reads afile inside the job's directory.

func (*Job) Set

func (j *Job) Set(name string, data []byte) error

Set a property of the job. This simply creates a file inside the job's directory atomically.

func (*Job) Submit

func (job *Job) Submit() error

Move a job (created by `CreateJob` in the `tmp` subdirectory) to the `new` subdirectory so it becomes available to workers.

type Queue

type Queue struct {
	// contains filtered or unexported fields

func OpenQueue

func OpenQueue(dir string) (*Queue, error)

Open a pqueue. The directory `dir` must already exist. The subdirectories (`new`, `cur`, etc.) will be created if they are missing.

func (*Queue) CreateJob

func (q *Queue) CreateJob(prefix string) (*Job, error)

Create a job in the `tmp` directory of the queue. After you finish preparing the job with `Set`, call the `Submit` method to make the job available to workers.

func (*Queue) RescueDeadJobs

func (q *Queue) RescueDeadJobs() error

Go through the `cur` subdirectory, determine which workers are no longer alove, and resubmit the jobs they were processing when they died.

func (*Queue) Take

func (q *Queue) Take() (*Job, error)

Find an available job (in the `new` subdirectory) and move it to the `cur` subdirectory for this worker process. Returns `nil` if there are no available jobs.

Source Files

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL