Documentation ¶
Overview ¶
Package workerpool provides a worker pool that can expand and shrink dynamically.
Jobs are queued with the Queue() method, which also accepts an optional timeout. Queuing gives up when the timeout elapses because all workers are too busy.
To expand the pool, use the Expand() method, which increases the number of workers. If a timeout is provided, the extra workers stop when there are not enough jobs to keep them busy. Extra workers can also be stopped explicitly through a quit channel.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool provides a pool of workers.
Example ¶
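package main

import (
	"sync/atomic"

	"github.com/dc0d/workerpool"
)

func main() {
	pool := workerpool.New(-1)

	var v int64
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 100; i++ {
			pool.Queue(func() {
				atomic.AddInt64(&v, 1)
			})
		}
	}()

	// Wait for the producer to finish queuing before stopping the pool,
	// so that Stop does not race with the Queue calls.
	<-done
	pool.Stop()

	if atomic.LoadInt64(&v) != 100 {
		panic("BOOM!")
	}
}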
func (*WorkerPool) Expand ¶
func (pool *WorkerPool) Expand(n int, timeout time.Duration, quit <-chan struct{}) bool
Expand puts more workers to work. If there is no job to do and a timeout is set, the extra workers simply time out. By default, they time out in a sliding manner. A quit channel can also be used to stop the extra workers explicitly.
One friend noted that there might be a *temporary* goroutine leak when expanding the worker pool with timeouts. Actually it is not a goroutine leak, because it is always bounded by the size of the pool and has deterministic behavior.
Assume we have a worker pool of size 10 and we expand it with 1000 extra workers that time out after 100 milliseconds. After some 100 milliseconds we may see twice the initial size of the pool (10 * 2) goroutines remaining, which of course will time out after doing some extra jobs, and the pool will shrink to its initial size.
The reason for the temporarily extended lives of some extra workers is that the initial workers may fail to register before those extra workers do. So we will have 10 registered extra workers plus 10 unregistered initial workers, and the rest of the extra workers will time out because they fail to register. So we have at most 20 goroutines in the pool in this very specific situation, and they will eventually time out.
That is not a goroutine leak (it was described as *temporary* in the first place), but it was entertaining to find out why and how that happens! A test named `TestTimeoutNoGoroutineLeak(...)` was added to describe this in code.
Example ¶
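package main

import (
	"sync/atomic"
	"time"

	"github.com/dc0d/workerpool"
)

func main() {
	pool := workerpool.New(-1)
	// Add 1000 extra workers that time out after one millisecond of idleness.
	pool.Expand(1000, time.Millisecond, nil)

	var v int64
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 100; i++ {
			pool.Queue(func() {
				atomic.AddInt64(&v, 1)
			})
		}
	}()

	// Wait for the producer to finish queuing before stopping the pool,
	// so that Stop does not race with the Queue calls.
	<-done
	pool.Stop()

	if atomic.LoadInt64(&v) != 100 {
		panic("BOOM!")
	}
}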
func (*WorkerPool) Queue ¶
func (pool *WorkerPool) Queue(job func(), timeout ...time.Duration) bool
Queue queues a job to be run by a worker.
func (*WorkerPool) Stop ¶
func (pool *WorkerPool) Stop()
Stop stops the pool and waits for all workers to return.