go-do-work

gdw builds on eapache's delightfully clever channels package to provide dynamically resizable pools of goroutines that can queue an unbounded number of jobs.

Installation

go get github.com/c3mb0/go-do-work

Types of Pools

There are currently 2 types of pools in gdw: Worker and Rebel. Their internal mechanics are the same, except that jobs queued in a WorkerPool are waitable. This allows for separation of concerns: use a WorkerPool for jobs whose results and/or execution you care about, and a RebelPool for fire-and-forget jobs. You can safely mix the two without one affecting the other.
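
The examples that follow all use a WorkerPool; a RebelPool is driven the same way, minus the waiting. A minimal sketch, assuming only the RebelPool methods listed in the documentation below (NewRebelPool, Add, Close); the notifier type is illustrative:

type notifier struct {
	msg string
}

func (n notifier) DoWork() {
	fmt.Println("sending:", n.msg)
}

func main() {
	pool := gdw.NewRebelPool(2)
	defer pool.Close()
	pool.Add(notifier{msg: "hello"}, 3) // queue the same fire-and-forget job 3 times
	// A RebelPool has no Wait(); sleep briefly so the jobs get a chance
	// to run before main returns.
	time.Sleep(1 * time.Second)
}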

Usage

Any object that implements the Job interface is eligible to be queued and executed by gdw. There is, however, a big difference between queuing an object and queuing a pointer to an object.

Consider the following example:

type adder struct {
	count int
}

func (a adder) DoWork() {
	a.count++
	fmt.Print(a.count, " ")
}

func main() {
	test := adder{count: 0}
	pool := gdw.NewWorkerPool(2)
	defer pool.Close()
	pool.Add(test, 5)
	pool.Wait()
	fmt.Println()
}

Here we create a WorkerPool with a pool size of 2 and then queue the test job 5 times. The resulting output is:

1 1 1 1 1

Let's queue an object pointer instead of an object:

type adder struct {
	count int
}

func (a *adder) DoWork() {
	a.count++
	fmt.Print(a.count, " ")
}

func main() {
	test := &adder{count: 0}
	pool := gdw.NewWorkerPool(2)
	defer pool.Close()
	pool.Add(test, 5)
	pool.Wait()
	fmt.Println()
}

The resulting output is:

1 2 3 4 5

When you queue an object, each goroutine in the pool works on its own copy of the object. When you queue an object pointer, all goroutines work on the same object. Both approaches have their use cases, but keep in mind that the latter requires the job to be thread-safe. Thus, the correct implementation would be:

type adder struct {
	count uint32
}

func (a *adder) DoWork() {
	atomic.AddUint32(&a.count, 1)
	fmt.Print(atomic.LoadUint32(&a.count), " ")
}

func main() {
	test := &adder{count: 0}
	pool := gdw.NewWorkerPool(2)
	defer pool.Close()
	pool.Add(test, 5)
	pool.Wait()
	fmt.Println()
}
Pool Size and Safety

You can safely increase or decrease a pool's size at runtime without losing queued jobs or shutting down goroutines that are already running. The only caveat is that the pool size cannot be set to 0.

The following example demonstrates pool resizing in action:

type adder struct {
	count int
}

func (a adder) DoWork() {
	a.count++
	fmt.Print(a.count, " ")
	time.Sleep(2 * time.Second)
}

func main() {
	test := adder{count: 0}
	pool := gdw.NewWorkerPool(3)
	defer pool.Close()
	pool.Add(test, 5)
	time.Sleep(1 * time.Second)
	pool.SetPoolSize(1)
	fmt.Printf("\n%d\n", pool.GetQueueDepth())
	pool.Wait()
	fmt.Println()
}

Check the output for some magic: the first three jobs start immediately, GetQueueDepth then reports the two jobs still waiting after the resize, and those two are executed one at a time by the single remaining worker.

Batching

Instead of waiting for the entire pool to finish, you can wait for a specific group of jobs. This is done via "batching":

type adder struct {
	count uint32
}

func (a adder) DoWork() {
	a.count++
	time.Sleep(1 * time.Second)
}

func main() {
	test := adder{count: 0}
	pool := gdw.NewWorkerPool(3)
	defer pool.Close()
	batch1 := pool.NewTempBatch()
	batch2 := pool.NewTempBatch()
	pool.NewBatch("my batch")
	defer batch1.Clean()
	defer batch2.Clean()
	defer pool.CleanBatch("my batch")
	batch1.Add(test, 5)
	batch2.Add(test, 10)
	batch3, _ := pool.LoadBatch("my batch")
	batch3.Add(test, 4)
	batch1.Wait()
	fmt.Println("batch 1 done")
	batch2.Wait()
	fmt.Println("batch 2 done")
	fmt.Println(pool.GetQueueDepth())
	pool.Wait() // includes jobs added through batches
}

Keep in mind that even though batches are separately waitable, jobs queued through them contribute to the job count in the pool.
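
Named batches can also be waited on and cleaned up by name alone. A short sketch, assuming the WaitBatch and CleanBatch signatures listed in the documentation below (the sleeper type and the "uploads" name are illustrative; error returns are ignored for brevity):

type sleeper struct{}

func (s sleeper) DoWork() {
	time.Sleep(1 * time.Second)
}

func main() {
	pool := gdw.NewWorkerPool(3)
	defer pool.Close()
	batch, _ := pool.NewBatch("uploads")
	defer pool.CleanBatch("uploads")
	batch.Add(sleeper{}, 5)
	pool.WaitBatch("uploads") // wait only for the jobs queued through the "uploads" batch
	fmt.Println("uploads done")
}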

Collecting Results

If you would like to get results back from your jobs, the most practical approach is to add a channel to the job object:

type adder struct {
	count  int
	result chan int
}

func (a adder) DoWork() {
	a.count++
	a.result <- a.count
}

func main() {
	result := make(chan int)
	test := adder{
		count:  0,
		result: result,
	}
	pool := gdw.NewWorkerPool(3)
	defer pool.Close()
	pool.Add(test, 5)
	go func() {
		for res := range result {
			fmt.Print(res, " ")
		}
	}()
	pool.Wait()
	close(result) // close the result channel after the pool has completed
	fmt.Println()
}

This works for both object and object pointer jobs.
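
For the pointer variant, where a single counter is shared across goroutines, the increment should stay atomic. A sketch assuming the same WorkerPool API, with the result channel buffered to the number of queued jobs so the workers never block on send and the results can simply be drained after Wait:

type adder struct {
	count  uint32
	result chan uint32
}

func (a *adder) DoWork() {
	a.result <- atomic.AddUint32(&a.count, 1)
}

func main() {
	result := make(chan uint32, 5) // buffered to match the number of queued jobs
	test := &adder{count: 0, result: result}
	pool := gdw.NewWorkerPool(3)
	defer pool.Close()
	pool.Add(test, 5)
	pool.Wait()
	close(result) // safe: all sends have completed once Wait returns
	for res := range result {
		fmt.Print(res, " ")
	}
	fmt.Println()
}

Because the counter is shared, the drained values are 1 through 5, though not necessarily in that order.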

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

func (*Batch) Add

func (b *Batch) Add(job Job, amount int)

func (*Batch) AddOne

func (b *Batch) AddOne(job Job)

func (*Batch) Clean

func (b *Batch) Clean() error

func (*Batch) Wait

func (b *Batch) Wait() error

type Job

type Job interface {
	DoWork()
}

Job interface defines a method through which gdw can execute requested jobs.

type RebelPool

type RebelPool struct {
	// contains filtered or unexported fields
}

Rebel is used to define a goroutine pool whose purpose is to execute fire-and-forget jobs.

func NewRebelPool

func NewRebelPool(size int) *RebelPool

func (*RebelPool) Add

func (r *RebelPool) Add(job Job, amount int)

func (*RebelPool) AddOne

func (r *RebelPool) AddOne(job Job)

func (*RebelPool) Close

func (r *RebelPool) Close()

func (*RebelPool) GetPoolSize

func (r *RebelPool) GetPoolSize() int

func (*RebelPool) GetQueueDepth

func (r *RebelPool) GetQueueDepth() int

func (*RebelPool) SetPoolSize

func (r *RebelPool) SetPoolSize(size int)

type WorkerPool

type WorkerPool struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Worker is used to define a goroutine pool whose results and/or execution are of interest, thus awaitable through WaitGroup.

func NewWorkerPool

func NewWorkerPool(size int) *WorkerPool

func (*WorkerPool) Add

func (w *WorkerPool) Add(job Job, amount int)

func (*WorkerPool) AddOne

func (w *WorkerPool) AddOne(job Job)

func (*WorkerPool) CleanBatch

func (w *WorkerPool) CleanBatch(batch string) error

func (*WorkerPool) Close

func (w *WorkerPool) Close()

func (*WorkerPool) GetPoolSize

func (w *WorkerPool) GetPoolSize() int

func (*WorkerPool) GetQueueDepth

func (w *WorkerPool) GetQueueDepth() int

func (*WorkerPool) LoadBatch

func (w *WorkerPool) LoadBatch(name string) (*Batch, error)

func (*WorkerPool) NewBatch

func (w *WorkerPool) NewBatch(name string) (*Batch, error)

func (*WorkerPool) NewTempBatch

func (w *WorkerPool) NewTempBatch() *Batch

func (*WorkerPool) SetPoolSize

func (w *WorkerPool) SetPoolSize(size int)

func (*WorkerPool) Wait

func (w *WorkerPool) Wait()

func (*WorkerPool) WaitBatch

func (w *WorkerPool) WaitBatch(batch string) error
