package module
v0.3.1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2016 License: GPL-3.0 Imports: 4 Imported by: 0



Build Status Godoc

WorkerGo is a worker pool implementation that can be used in any Go program to handle tasks with workers. Workers created by WorkerGo calls the method of the structs sent them as a job. So, any struct with a method needs to be called in parallel can be sent to WorkerGo's job queue.

WorkerGo is heavily influenced by the Marcio Catilho's post here:

I was trying to write a worker pool implementation that I could use in a program with different portions of it will require parallel processing. I found his post while researching, created a new package using portions of his samples in my project. Since the package I created can be used for calling any struct with a method, I thought it would be good to share, so, it can be used in any program that needs a worker pool implementation.


go get


First, a Dispatcher needs to be created.

maxWorkers := 5 // Maximum number of workers
queueBufferSize := 20 // Buffer size for job queue
d := workergo.Dispatcher(maxWorkers, queueBufferSize)

Then, a job can be created and sent to job queue:

work := NewLengthyWork(123, "Hello")
job := workergo.NewJob(workergo.TASK, work, "DoLengthyWork")

The work here should be a struct with DoLengthyWork() method:

type LengthyWork struct {
	number int
	message string

func NewLengthyWork(number int, message string) *LengthyWork {
	return &LengthyWork{
		number: number,
		message: message,

func (w *LengthyWork) DoLengthyWork() {
	fmt.Println("Doing some lengthy work for", w.number, " - message:", w.message)

You can also pass the pointer of your existing sync.waitGroup into dispatcher to wait for workers to finish the jobs. Instead of calling NewDispatcher(), call NewDispatcherWG():

var wg sync.WorkGroup
d := NewDispatcherWG(maxWorkers, queueBufferSize, &wg)

Whenever a new job has been submitted into the job queue, dispatcher will call wg.Add(1), and once a worker finished that job, it will call wg.Done().

Setting a rate limiter

You can also run the dispatcher with a rate limiter value. Simply, call Dispatcher.RunWithLimiter() method and pass a desired time.Duration value instead of calling Dispatcher.Run()

Let's say, you want each job to be executed with 0.5 seconds delays:

d.RunWithLimiter(time.Millisecond * time.Duration(500))

Now, each job that you have submitted will be dispatched with 0.5 seconds delay.

More Documentation




This section is empty.


This section is empty.


This section is empty.


type Dispatcher

type Dispatcher struct {
	JobQueue chan Job
	// contains filtered or unexported fields

Dispatcher is the main code that runs and starts workers. Dispatcher is resposible of managing workers, dispatching them when needed

func NewDispatcher

func NewDispatcher(maxWorkers int, queueBufferSize int) *Dispatcher

NewDispatcher Creates a new dispatcher instance with given maximum number of workers

func NewDispatcherWG

func NewDispatcherWG(maxWorkers int, queueBufferSize int, exWg *sync.WaitGroup) *Dispatcher

NewDispatcherWG Creates a new Dispatcher instance with given maximum number of workers and uses the given wait group to wait for workers to finish their jobs

func (*Dispatcher) GetQueueSize

func (d *Dispatcher) GetQueueSize() int

GetQueueSize returns current size of the job queue

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run Starts the dispatcher

func (*Dispatcher) RunWithLimiter

func (d *Dispatcher) RunWithLimiter(limiterGap time.Duration)

RunWithLimiter Adds delays between jobs

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop Stops dispatcher

func (*Dispatcher) SubmitJob

func (d *Dispatcher) SubmitJob(job Job)

SubmitJob Submits given Job into job queue

type Job

type Job struct {
	ID string

	Type JobType

	// Payload contains the pointer to the structures with methods that can
	// be called by workers
	Payload *interface{}

	// TargetFunc function to call to send payload when Type is MESSAGE.
	// method of the payload to call when Type is TASK
	TargetFunc string

Job is the sturture that will be passed into workers

func NewJob

func NewJob(jobType JobType, payload interface{}, targetFunc string) Job

NewJob Creates a new job instance with given payload

type JobType

type JobType int

JobType has 2 possible values: 1- MESSAGE - Dummy message to be delivered to a given function 2- TASK - A struct with a method to be called

const (
	// MESSAGE Dummy message - interface{}
	MESSAGE JobType = iota
	// TASK A struct with a method to be called

type Worker

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	// contains filtered or unexported fields

Worker Structure of worker

func NewWorker

func NewWorker(workerPool chan chan Job) Worker

NewWorker Creates a new Worker instance

func NewWorkerWG

func NewWorkerWG(workerPool chan chan Job, wg *sync.WaitGroup) Worker

NewWorkerWG Creates a new Worker instance with pointer to a sync.WaitGroup instance to handle wait groups

func (*Worker) Run

func (w *Worker) Run()

Run Starts worker

func (*Worker) Stop

func (w *Worker) Stop() bool

Stop Stops worker instance if it is idle

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL