workerpool


README

workerpool


This Go module contains an implementation of a worker pool that can expand and shrink dynamically. Workers can be added when needed and dismissed when they are no longer needed. Of course, this worker pool can also be used as a simple fixed-size pool.

Examples can be found in the documentation below.

Documentation

Overview

Package workerpool provides a worker pool that can expand and shrink dynamically.

Jobs can be queued using the Queue() method, which also accepts an optional timeout; if all workers are too busy, queuing times out.

To expand the pool, the Expand() method can be used, which increases the number of workers. If a timeout is provided, the extra workers stop when there are not enough jobs to do. It is also possible to stop the extra workers explicitly by providing a quit channel.
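As a rough sketch of how these pieces fit together (the worker counts and timeouts are illustrative, and the assumption that a zero Expand timeout means the extra workers only stop via the quit channel is not taken from the package docs):

package main

import (
	"fmt"
	"time"

	"github.com/dc0d/workerpool"
)

func main() {
	// A pool with a fixed number of workers.
	pool := workerpool.New(4)

	// Temporarily add extra workers that are dismissed via the quit channel
	// (assuming a zero timeout means they do not time out on their own).
	quit := make(chan struct{})
	pool.Expand(8, 0, quit)

	// Queue a job, giving up if it cannot be handed off within a second.
	if ok := pool.Queue(func() { fmt.Println("hello from a worker") }, time.Second); !ok {
		fmt.Println("all workers were too busy")
	}

	close(quit) // dismiss the extra workers
	pool.Stop() // wait for all workers to return
}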

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool provides a pool of workers.

Example
package main

import (
	"sync/atomic"

	"github.com/dc0d/workerpool"
)

func main() {
	pool := workerpool.New(-1)

	var v int64
	// Queue the jobs before stopping the pool so none of them get lost.
	for i := 0; i < 100; i++ {
		pool.Queue(func() {
			atomic.AddInt64(&v, 1)
		})
	}

	// Stop waits for all workers to return, so v is safe to read afterwards.
	pool.Stop()

	if atomic.LoadInt64(&v) != 100 {
		panic("BOOM!")
	}
}

func New

func New(workers int, jobQueue ...int) *WorkerPool

New makes a new *WorkerPool.
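A minimal sketch of constructing a pool follows; treating the optional jobQueue argument as the capacity of the internal job queue is an inference from the parameter name, not something stated in these docs:

package main

import "github.com/dc0d/workerpool"

func main() {
	// 4 workers; the second argument presumably sets the size of the
	// internal job queue (an assumption based on the parameter name).
	pool := workerpool.New(4, 16)
	defer pool.Stop()

	pool.Queue(func() {
		// do some work
	})
}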

func (*WorkerPool) Expand

func (pool *WorkerPool) Expand(n int, timeout time.Duration, quit <-chan struct{}) bool

Expand puts more workers into work. If there isn't any job to do and a timeout is set, they will simply time out. The default behaviour is that they time out in a sliding manner. A quit channel can also be used to stop the extra workers explicitly.

One friend noted that there might be a *temporary* goroutine leak when expanding the worker pool using timeouts. It is not actually a goroutine leak, because the number of goroutines is always bounded by the size of the pool and the behaviour is deterministic. Assume we have a worker pool of size 10 and expand it with 1000 extra workers that time out after 100 milliseconds. After roughly 100 milliseconds we may see twice the initial size of the pool (10 * 2) goroutines remaining, which would of course time out after doing some extra jobs, so the pool shrinks back to its initial size. The reason some extra workers live longer than expected is that the initial workers may fail to register before those extra workers. So we end up with 10 registered extra workers plus 10 unregistered initial workers, and the rest of the extra workers time out because they fail to register. In this very specific situation we therefore have at most 20 goroutines in the pool, which will eventually time out. That is not a goroutine leak (it was described as *temporary* in the first place), but it was entertaining to find out why and how it happens. A test named `TestTimeoutNoGoroutineLeak(...)` is included to describe this in code.

Example
package main

import (
	"sync/atomic"
	"time"

	"github.com/dc0d/workerpool"
)

func main() {
	pool := workerpool.New(-1)
	// Add 1000 extra workers that time out after a millisecond without jobs.
	pool.Expand(1000, time.Millisecond, nil)

	var v int64
	// Queue the jobs before stopping the pool so none of them get lost.
	for i := 0; i < 100; i++ {
		pool.Queue(func() {
			atomic.AddInt64(&v, 1)
		})
	}

	// Stop waits for all workers to return, so v is safe to read afterwards.
	pool.Stop()

	if atomic.LoadInt64(&v) != 100 {
		panic("BOOM!")
	}
}
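The example above relies on the timeout to shrink the pool again. A variant that dismisses the extra workers through the quit channel instead might look like this (a sketch, assuming a zero timeout means the extra workers only stop when quit is closed):

package main

import (
	"sync/atomic"

	"github.com/dc0d/workerpool"
)

func main() {
	pool := workerpool.New(2)

	// Extra workers that stay around until quit is closed.
	quit := make(chan struct{})
	pool.Expand(10, 0, quit)

	var v int64
	for i := 0; i < 100; i++ {
		pool.Queue(func() {
			atomic.AddInt64(&v, 1)
		})
	}

	close(quit) // explicitly dismiss the extra workers
	pool.Stop() // wait for all workers to return

	if atomic.LoadInt64(&v) != 100 {
		panic("BOOM!")
	}
}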

func (*WorkerPool) Queue

func (pool *WorkerPool) Queue(job func(), timeout ...time.Duration) bool

Queue queues a job to be run by a worker. An optional timeout limits how long Queue waits when all workers are busy.
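A small sketch of queuing with a timeout (the assumption here is that the returned bool reports whether the job was accepted before the timeout expired):

package main

import (
	"fmt"
	"time"

	"github.com/dc0d/workerpool"
)

func main() {
	pool := workerpool.New(1)
	defer pool.Stop()

	// Keep the only worker busy for a while.
	pool.Queue(func() { time.Sleep(200 * time.Millisecond) })

	// Try to queue another job, giving up after 50 milliseconds
	// (assuming false means the job could not be handed off in time).
	if !pool.Queue(func() { fmt.Println("ran") }, 50*time.Millisecond) {
		fmt.Println("workers too busy, job not queued")
	}
}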

func (*WorkerPool) Stop

func (pool *WorkerPool) Stop()

Stop stops the pool and waits for all workers to return.
