README

WorkerGo

WorkerGo is a worker pool implementation that can be used in any Go program to handle tasks with workers. Workers created by WorkerGo call a method of the struct that is sent to them as a job. Any struct with a method that needs to be called in parallel can therefore be sent to WorkerGo's job queue.

WorkerGo is heavily influenced by Marcio Castilho's post: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

I was trying to write a worker pool implementation for a program in which several different parts require parallel processing. I found his post while researching and created a new package in my project using portions of his samples. Since the package can call any struct with a method, I thought it would be good to share it so that it can be used in any program that needs a worker pool.

Installation

go get github.com/yasarix/workergo

Usage

First, a Dispatcher needs to be created.

maxWorkers := 5 // Maximum number of workers
queueBufferSize := 20 // Buffer size for job queue
d := workergo.NewDispatcher(maxWorkers, queueBufferSize)
d.Run()

Then, a job can be created and sent to the job queue:

work := NewLengthyWork(123, "Hello")
job := workergo.NewJob(workergo.TASK, work, "DoLengthyWork")
d.SubmitJob(job)

Here, work is a struct with a DoLengthyWork() method:

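// LengthyWork is the payload; its method is called by a worker.
type LengthyWork struct {
	number  int
	message string
}

// NewLengthyWork returns a pointer, so the pointer-receiver method below is reachable.
func NewLengthyWork(number int, message string) *LengthyWork {
	return &LengthyWork{
		number:  number,
		message: message,
	}
}

// DoLengthyWork is the method named in the targetFunc argument of NewJob.
func (w *LengthyWork) DoLengthyWork() {
	fmt.Println("Doing some lengthy work for", w.number, "- message:", w.message)
}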
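The README does not show how a worker turns the method name string into a call. One plausible mechanism is reflection; the following is a minimal sketch under that assumption, not WorkerGo's actual implementation. The helper callByName and the done field are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"reflect"
)

// LengthyWork mirrors the example payload; the done field is added here
// only so the effect of the call can be observed.
type LengthyWork struct {
	number  int
	message string
	done    bool
}

// DoLengthyWork is the method that is looked up by name.
func (w *LengthyWork) DoLengthyWork() {
	fmt.Println("Doing some lengthy work for", w.number, "- message:", w.message)
	w.done = true
}

// callByName is a hypothetical helper (not part of WorkerGo's API). It looks
// up an exported, zero-argument method on payload and calls it.
func callByName(payload interface{}, method string) error {
	v := reflect.ValueOf(payload)
	if !v.IsValid() {
		return fmt.Errorf("nil payload")
	}
	m := v.MethodByName(method)
	if !m.IsValid() {
		return fmt.Errorf("method %q not found on %T", method, payload)
	}
	if m.Type().NumIn() != 0 {
		return fmt.Errorf("method %q takes arguments", method)
	}
	m.Call(nil)
	return nil
}

func main() {
	work := &LengthyWork{number: 123, message: "Hello"}
	if err := callByName(work, "DoLengthyWork"); err != nil {
		fmt.Println("error:", err)
	}
	// An unexported or misspelled name is reported instead of panicking.
	if err := callByName(work, "doLengthyWork"); err != nil {
		fmt.Println("error:", err)
	}
}
```

With this approach the method must be exported, and a method with a pointer receiver is only found when the payload itself is a pointer, which is why NewLengthyWork returning *LengthyWork matters.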

You can also pass a pointer to an existing sync.WaitGroup to the dispatcher to wait for the workers to finish their jobs. Instead of calling NewDispatcher(), call NewDispatcherWG():

var wg sync.WaitGroup
d := workergo.NewDispatcherWG(maxWorkers, queueBufferSize, &wg)

Whenever a job is submitted to the job queue, the dispatcher calls wg.Add(1); once a worker finishes that job, it calls wg.Done().

Setting a rate limiter

You can also run the dispatcher with a rate limiter. Instead of calling Dispatcher.Run(), call Dispatcher.RunWithLimiter() and pass the desired time.Duration value.

For example, to dispatch jobs 0.5 seconds apart:

d.RunWithLimiter(500 * time.Millisecond)

Each submitted job is now dispatched with a 0.5-second delay.

More Documentation

https://godoc.org/github.com/yasarix/workergo

Documentation

Types

type Dispatcher

type Dispatcher struct {
	JobQueue chan Job
	// contains filtered or unexported fields
}

Dispatcher is the main component that starts the workers. It is responsible for managing the workers and dispatching jobs to them when needed.

func NewDispatcher

func NewDispatcher(maxWorkers int, queueBufferSize int) *Dispatcher

NewDispatcher creates a new Dispatcher instance with the given maximum number of workers.

func NewDispatcherWG

func NewDispatcherWG(maxWorkers int, queueBufferSize int, exWg *sync.WaitGroup) *Dispatcher

NewDispatcherWG creates a new Dispatcher instance with the given maximum number of workers and uses the given wait group to wait for the workers to finish their jobs.

func (*Dispatcher) GetQueueSize

func (d *Dispatcher) GetQueueSize() int

GetQueueSize returns the current size of the job queue.
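The documentation does not say how the size is computed. If it reports the number of jobs waiting in the buffered JobQueue channel, the built-in len would give that number, as in this sketch. The helper pending is hypothetical and the string channel stands in for chan Job.

```go
package main

import "fmt"

// pending is a hypothetical helper: for a buffered channel, len reports the
// number of queued elements and cap reports the buffer size.
func pending(queue chan string) int {
	return len(queue)
}

func main() {
	queue := make(chan string, 20) // buffer size, like queueBufferSize
	queue <- "job-1"
	queue <- "job-2"
	fmt.Println("queued:", pending(queue), "capacity:", cap(queue))

	<-queue // a worker takes one job
	fmt.Println("queued:", pending(queue))
}
```

A value read this way is only a snapshot; with workers receiving concurrently it can be stale by the time it is used.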

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run starts the dispatcher.

func (*Dispatcher) RunWithLimiter

func (d *Dispatcher) RunWithLimiter(limiterGap time.Duration)

RunWithLimiter starts the dispatcher and adds a delay of limiterGap between jobs.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop stops the dispatcher.

func (*Dispatcher) SubmitJob

func (d *Dispatcher) SubmitJob(job Job)

SubmitJob submits the given Job to the job queue.

type Job

type Job struct {
	ID string

	Type JobType

	// Payload contains the pointer to the structures with methods that can
	// be called by workers
	Payload *interface{}

	// TargetFunc is the function that receives the payload when Type is
	// MESSAGE, or the method of the payload to call when Type is TASK
	TargetFunc string
}

Job is the structure that is passed to workers.

func NewJob

func NewJob(jobType JobType, payload interface{}, targetFunc string) Job

NewJob creates a new Job instance with the given payload.

type JobType

type JobType int

JobType has two possible values:

1. MESSAGE - a dummy message to be delivered to a given function
2. TASK - a struct with a method to be called

const (
	// MESSAGE Dummy message - interface{}
	MESSAGE JobType = iota
	// TASK A struct with a method to be called
	TASK
)

type Worker

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	// contains filtered or unexported fields
}

Worker is the structure that represents a single worker.

func NewWorker

func NewWorker(workerPool chan chan Job) Worker

NewWorker creates a new Worker instance.

func NewWorkerWG

func NewWorkerWG(workerPool chan chan Job, wg *sync.WaitGroup) Worker

NewWorkerWG creates a new Worker instance that holds a pointer to a sync.WaitGroup, which it uses to signal finished jobs.

func (*Worker) Run

func (w *Worker) Run()

Run starts the worker.

func (*Worker) Stop

func (w *Worker) Stop() bool

Stop stops the worker instance if it is idle.