goalpost

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2020 License: MIT Imports: 10 Imported by: 0

README

goalpost

Build Status

Goalpost is a durable, embeddable, worker queue for Golang

It makes use of boltdb/bbolt to provide durability so messages don't get lost when the process get's killed.

Quickstart

Below is a simple use of goalpost:

package main

import (
	"context"
	"fmt"
	"time"
)
import "github.com/chiefnoah/goalpost"

const eventQueueID = "event-queue"

//Define a type that implements the goalpost.Worker interface
type worker struct {
	id string
}

func (w *worker) ID() string {
	return w.id
}

func (w *worker) DoWork(ctx context.Context, j *goalpost.Job) error {
	//do something cool!
	fmt.Printf("Hello, %s\n", j.Data)
	//Something broke, but we should retry it...
	if j.RetryCount < 9 { //try 10 times
		return goalpost.NewRecoverableWorkerError("Something broke, try again")
	}

	//Something *really* broke, don't retry
	//return errors.New("Something broke, badly")

	//Everything's fine, we're done here
	return nil
}

func main() {

	//Init a queue
	q, _ := goalpost.Init(eventQueueID)
	//remember to handle your errors :)

	//Create a worker with id "worker-id"
	w := &worker{
		id: "worker-1",
	}
	//register the worker, so it can do work
	q.RegisterWorker(w)

	//Let's do some work...
	q.PushBytes([]byte("World"))
	//You should see "Hello, World" printed 10 times

	//Make sure your process doesn't exit before your workers can do their work
	time.Sleep(10 * time.Second)

	//Remember to close your queue when you're done using it
	q.Close()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job struct {
	Status JobStatus
	//Unique identifier for a Job
	ID uint64
	//Data contains the bytes that were pushed using Queue.PushBytes()
	Data []byte
	//RetryCount is the number of times the job has been retried
	//If your work can have a temporary failure state, it is recommended
	//that you check retry count and return a fatal error after a certain
	//number of retries
	RetryCount int
	//Message is primarily used for debugging. It conatains status info
	//about what was last done with the job.
	Message string
}

Job wraps arbitrary data for processing

func DecodeJob

func DecodeJob(b []byte) *Job

DecodeJob decodes a gob encoded byte array into a Job struct and returns a pointer to it

func (*Job) Bytes

func (j *Job) Bytes() []byte

Bytes returns a gob encoded byte array representation of *j

type JobStatus

type JobStatus int

JobStatus is a enumerated int representing the processing status of a Job

const (
	Ack JobStatus = iota + 1
	Uack
	Nack
	Failed
)

JobStatus types

type Queue

type Queue struct {
	//ID is a unique identifier for a Queue
	ID string

	//PollRate the duration to Sleep each worker before checking the queue for jobs again
	//queue for jobs again.
	//Default: 500 milliseconds
	PollRate time.Duration
	// contains filtered or unexported fields
}

Queue represents a queue

func Init

func Init(filepath string) (*Queue, error)

Init creates a connection to the internal database and initializes the Queue type filepath must be a valid path to a file. It cannot be shared between instances of a Queue. If the file cannot be opened r/w, an error is returned.

func (*Queue) Close

func (q *Queue) Close() error

Close attempts to gracefull shutdown all workers in a queue and shutdown the db connection

func (*Queue) GetJobByID

func (q *Queue) GetJobByID(id uint64) (*Job, error)

GetJobByID returns a pointer to a Job based on the primary key identifier id It first checks active jobs, if it doesn't find the bucket for active jobs it searches in the completed jobs bucket.

func (*Queue) PushBytes

func (q *Queue) PushBytes(d []byte) (uint64, error)

PushBytes wraps arbitrary binary data in a job and pushes it onto the queue

func (*Queue) PushJob

func (q *Queue) PushJob(j *Job) (uint64, error)

PushJob pushes a job to the queue and notifies workers Job.ID is always overwritten

func (*Queue) RegisterWorker

func (q *Queue) RegisterWorker(w Worker)

RegisterWorker registers a Worker to handle queued Jobs

type RecoverableWorkerError

type RecoverableWorkerError struct {
	// contains filtered or unexported fields
}

RecoverableWorkerError defines an error that a worker DoWork func can return that indicates the message should be retried

func NewRecoverableWorkerError

func NewRecoverableWorkerError(message string) RecoverableWorkerError

NewRecoverableWorkerError creates a new RecoverableWorkerError

func (RecoverableWorkerError) Error

func (e RecoverableWorkerError) Error() string

type Worker

type Worker interface {
	//DoWork is called when a worker picks up a job from the queue
	//Context can be used for cancelling jobs early when Close
	//is called on the Queue
	DoWork(context.Context, *Job) error
	//ID is a semi-unique identifier for a worker
	//it is primarily used for logging purposes
	ID() string
}

Worker represents a worker for handling Jobs

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL