Published: Feb 28, 2016



Package pqueue provides primitives for processing a simple persistent queue backed by the local filesystem. In many ways, the approach is similar to Maildir. Jobs can be submitted from any language by creating a directory and moving it atomically into the queue's `new` subdirectory.

A pqueue is a directory with several subdirectories:

- New files and directories are created in the `tmp` subdirectory before being moved atomically to elsewhere in the directory structure.

- The `new` subdirectory contains jobs that workers should process (see the `Take` method).

- The `cur` subdirectory contains a subdirectory for each worker (named by its process ID) where jobs are placed while the worker processes them.

- When jobs fail or finish successfully, they are moved to the `failed` or `done` subdirectories, respectively. See the `Fail` and `Finish` methods.

Jobs have state in the form of properties, which are simply files inside the job's directory. The `Get` and `Set` methods read these properties and set them atomically. Properties are set both by the process submitting a job (to specify the work to be done) and by workers (to checkpoint their progress so the job can continue if interrupted).
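The layout described above means a producer needs only directory creation, file writes, and a rename. The following is a minimal sketch using the Go standard library, not the package's own code. The helper name `submitByHand`, the `job-` prefix, and the `input` property are hypothetical, and the sketch assumes `tmp` and `new` live on the same filesystem, since a rename is only atomic within one filesystem.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// submitByHand is a hypothetical helper (not part of pqueue). It builds a job
// directory under tmp, writes each property as a file, and then renames the
// directory into new so that workers only ever see complete jobs.
func submitByHand(queueDir, prefix string, props map[string][]byte) (string, error) {
	for _, sub := range []string{"tmp", "new"} {
		if err := os.MkdirAll(filepath.Join(queueDir, sub), 0o755); err != nil {
			return "", err
		}
	}
	// MkdirTemp picks a unique directory name that starts with prefix.
	jobDir, err := os.MkdirTemp(filepath.Join(queueDir, "tmp"), prefix)
	if err != nil {
		return "", err
	}
	for name, data := range props {
		if err := os.WriteFile(filepath.Join(jobDir, name), data, 0o644); err != nil {
			return "", err
		}
	}
	base := filepath.Base(jobDir)
	// The rename publishes the finished job in a single step.
	if err := os.Rename(jobDir, filepath.Join(queueDir, "new", base)); err != nil {
		return "", err
	}
	return base, nil
}

// demoSubmit submits one job into a scratch queue and reads its property back.
func demoSubmit() (string, error) {
	dir, err := os.MkdirTemp("", "pqueue-sketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	base, err := submitByHand(dir, "job-", map[string][]byte{"input": []byte("hello")})
	if err != nil {
		return "", err
	}
	data, err := os.ReadFile(filepath.Join(dir, "new", base, "input"))
	return string(data), err
}

func main() {
	got, err := demoSubmit()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(got)
}
```

The same three steps (create under `tmp`, write files, rename into `new`) carry over to a shell script or any other language with access to the filesystem.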



type Job

type Job struct {
	Basename string
	// contains filtered or unexported fields
}

func (*Job) Fail

func (job *Job) Fail() error

Move the job to the `failed` subdirectory.

func (*Job) Finish

func (job *Job) Finish() error

Move the job to the `done` subdirectory.

func (*Job) Get

func (j *Job) Get(name string) ([]byte, error)

Read a property of the job. This simply reads a file inside the job's directory.

func (*Job) Set

func (j *Job) Set(name string, data []byte) error

Set a property of the job. This simply creates a file inside the job's directory atomically.
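One common way to create a file atomically is to write it under a temporary name and rename it into place. The sketch below illustrates that technique under stated assumptions; it is not taken from the package source. The helper `setAtomically` and the `progress` property are hypothetical, and the temporary file is placed in the queue's `tmp` subdirectory, which is assumed to share a filesystem with the job directory.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// setAtomically is a hypothetical stand-in for an atomic property write.
// The data is written to a temporary file in tmpDir and then renamed to
// jobDir/name, so a reader sees either the old contents or the new ones,
// never a partially written file.
func setAtomically(tmpDir, jobDir, name string, data []byte) error {
	f, err := os.CreateTemp(tmpDir, name+".*")
	if err != nil {
		return err
	}
	// Remove the temporary file if a step fails before the rename. After a
	// successful rename the path no longer exists and Remove's error is ignored.
	defer os.Remove(f.Name())
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(f.Name(), filepath.Join(jobDir, name))
}

// demoSet writes the same property twice and returns the value read back.
func demoSet() (string, error) {
	dir, err := os.MkdirTemp("", "pqueue-sketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	tmpDir := filepath.Join(dir, "tmp")
	jobDir := filepath.Join(dir, "new", "job-1")
	for _, d := range []string{tmpDir, jobDir} {
		if err := os.MkdirAll(d, 0o755); err != nil {
			return "", err
		}
	}
	for _, v := range []string{"step 1", "step 2"} {
		if err := setAtomically(tmpDir, jobDir, "progress", []byte(v)); err != nil {
			return "", err
		}
	}
	data, err := os.ReadFile(filepath.Join(jobDir, "progress"))
	return string(data), err
}

func main() {
	got, err := demoSet()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(got)
}
```

Because the rename replaces any existing file of the same name, the second write overwrites the first without a window in which the property is missing.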

func (*Job) Submit

func (job *Job) Submit() error

Move a job (created by `CreateJob` in the `tmp` subdirectory) to the `new` subdirectory so it becomes available to workers.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func OpenQueue

func OpenQueue(dir string) (*Queue, error)

Open a pqueue. The directory `dir` must already exist. The subdirectories (`new`, `cur`, etc.) will be created if they are missing.

func (*Queue) CreateJob

func (q *Queue) CreateJob(prefix string) (*Job, error)

Create a job in the `tmp` directory of the queue. After you finish preparing the job with `Set`, call the `Submit` method to make the job available to workers.

func (*Queue) RescueDeadJobs

func (q *Queue) RescueDeadJobs() error

Go through the `cur` subdirectory, determine which workers are no longer alive, and resubmit the jobs they were processing when they died.
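Since worker directories are named by process ID, a rescue pass can be sketched as a liveness check per directory followed by renames back into `new`. The code below is an illustration under assumptions, not the package's implementation: `pidAlive` and `rescueDead` are hypothetical names, the check relies on the Unix convention that signal 0 tests for a process without delivering anything, and a recycled PID would make a dead worker look alive.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"syscall"
)

// pidAlive reports whether a process with this ID appears to exist (Unix only).
// Signal 0 runs the kernel's existence and permission checks without
// delivering a signal.
func pidAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	err := syscall.Kill(pid, 0)
	// EPERM means the process exists but belongs to another user.
	return err == nil || errors.Is(err, syscall.EPERM)
}

// rescueDead is a hypothetical sketch: for every cur/<pid> directory whose
// worker is gone, move its jobs back to new. It returns the number moved.
func rescueDead(queueDir string) (int, error) {
	cur := filepath.Join(queueDir, "cur")
	workers, err := os.ReadDir(cur)
	if err != nil {
		return 0, err
	}
	rescued := 0
	for _, w := range workers {
		pid, err := strconv.Atoi(w.Name())
		if err != nil || !w.IsDir() || pidAlive(pid) {
			continue // not a worker directory, or the worker is still running
		}
		workerDir := filepath.Join(cur, w.Name())
		jobs, err := os.ReadDir(workerDir)
		if err != nil {
			return rescued, err
		}
		for _, j := range jobs {
			dst := filepath.Join(queueDir, "new", j.Name())
			if err := os.Rename(filepath.Join(workerDir, j.Name()), dst); err != nil {
				return rescued, err
			}
			rescued++
		}
	}
	return rescued, nil
}

// demoRescue builds a scratch queue with one job held by this process and one
// held by a PID assumed to be unused, then runs rescueDead.
func demoRescue() (int, error) {
	dir, err := os.MkdirTemp("", "pqueue-sketch")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	const deadPID = 1 << 30 // assumed to exceed the system's PID limit
	for _, p := range []string{
		filepath.Join("cur", strconv.Itoa(os.Getpid()), "job-live"),
		filepath.Join("cur", strconv.Itoa(deadPID), "job-dead"),
		"new",
	} {
		if err := os.MkdirAll(filepath.Join(dir, p), 0o755); err != nil {
			return 0, err
		}
	}
	n, err := rescueDead(dir)
	if err != nil {
		return n, err
	}
	// The dead worker's job should now be back in new.
	if _, err := os.Stat(filepath.Join(dir, "new", "job-dead")); err != nil {
		return n, err
	}
	return n, nil
}

func main() {
	n, err := demoRescue()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("rescued jobs:", n)
}
```

In the demo, only the job under the unused PID is moved; the job claimed by the running process stays in `cur`.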

func (*Queue) Take

func (q *Queue) Take() (*Job, error)

Find an available job (in the `new` subdirectory) and move it to the `cur` subdirectory for this worker process. Returns `nil` if there are no available jobs.
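Claiming a job by rename lets several workers share one queue without a lock: when two workers race for the same entry, only one rename succeeds and the other finds the source gone. The sketch below shows that idea with the standard library only. The helper `takeOne` is hypothetical, returns an empty name rather than a `nil` job, and may differ from how the package's `Take` chooses among available jobs.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
)

// takeOne is a hypothetical sketch of claiming a job. It tries to rename each
// entry in new into cur/<pid> and returns the name of the first job it wins.
// An empty name with a nil error means no job was available.
func takeOne(queueDir string) (string, error) {
	mine := filepath.Join(queueDir, "cur", strconv.Itoa(os.Getpid()))
	if err := os.MkdirAll(mine, 0o755); err != nil {
		return "", err
	}
	entries, err := os.ReadDir(filepath.Join(queueDir, "new"))
	if err != nil {
		return "", err
	}
	for _, e := range entries {
		src := filepath.Join(queueDir, "new", e.Name())
		err := os.Rename(src, filepath.Join(mine, e.Name()))
		if err == nil {
			return e.Name(), nil
		}
		if errors.Is(err, os.ErrNotExist) {
			continue // another worker claimed this job first
		}
		return "", err
	}
	return "", nil
}

// demoTake queues a single job and calls takeOne twice.
func demoTake() (string, string, error) {
	dir, err := os.MkdirTemp("", "pqueue-sketch")
	if err != nil {
		return "", "", err
	}
	defer os.RemoveAll(dir)
	if err := os.MkdirAll(filepath.Join(dir, "new", "job-a"), 0o755); err != nil {
		return "", "", err
	}
	first, err := takeOne(dir)
	if err != nil {
		return "", "", err
	}
	second, err := takeOne(dir)
	return first, second, err
}

func main() {
	first, second, err := demoTake()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("first=%q second=%q\n", first, second)
}
```

The first call claims `job-a`; the second finds `new` empty and returns an empty name.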

