Documentation
¶
Overview ¶
Package workpool provides a lightweight abstraction around a work function to make it easier to create work pools with early termination. This leaves you free to focus on the problem being solved and the data pipeline, while the work pool manages concurrency of execution.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type WorkHandler ¶
type WorkHandler func(abort <-chan struct{}) bool
WorkHandler is a blocking call which manages the retrieval and processing of work. It should either process all work, available, or a single piece of work and return. If you return after processing one piece of work pool will keep calling the handler.
Return true if the handler should be called again, otherwise return false to indicate work is complete.
The abort signal is triggered if the pool has been cancelled. It indicates that work should terminate immediately.
Where work comes from is implementation dependant, for example: a channel, RabbitMQ, dbus, or any other event system.
Here is a WorkHandler which squares a number. Notice that it is wrapped in a function to pass in the input/output channels. By returning after each item it allows the WorkPool to deal with early exits.
func sq(input <-chan int, output chan<- int) WorkHandler { return func(abort <-chan struct{}) bool { for true { select { case number := <- input: output <- number * number //return true case <-abort: return false } } } }
Here is another example which ignores the abort channel. In this case the WorkPool will manage early termination, but will not be able to do so if the input channel is blocked:
func sq(input <-chan int, output chan<- int) WorkHandler { return func(abort <-chan struct{}) bool { for number := range input { output <- number * number return true } return false } }
type WorkPool ¶
type WorkPool struct { // Handler is called repeatedly until all work is finished. Handler WorkHandler // Workers is the number of go routines used to call the handler. Workers int // Close is called after all work is finished. Close func() // contains filtered or unexported fields }
WorkPool manages running a WorkHandler in some number of goroutines. It also manages a cancel signal to allow for early termination.
Example ¶
package main import ( "fmt" ) // gen creates a closed channel with the nums arguments. func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } // sq connects an input channel to an output channel with a squaring function. If it detects the channel is closed false // is returned, otherwise it processes one number and returns. func sq(input <-chan int, output chan<- int) WorkHandler { return func(abort <-chan struct{}) bool { for number := range input { output <- number * number return true } return false } } func main() { // Closed input channel with three values. var input <-chan int = gen(2, 3, 10) // Output channel for results. output := make(chan int) // Close function called when pool exits. closer := func() { close(output) } // Create a pool using a squaring function WorkHandler and a single worker. pool := NewWithClose(1, sq(input, output), closer) go pool.Run() // Check results for num := range output { fmt.Println(num) } }
Output: 4 9 100
Example (Struct) ¶
numWorkers := 2 outputs := make(chan int) worker := func(abort <-chan struct{}) bool { outputs <- 1 return false } closer := func() { close(outputs) } pool := &WorkPool{ Handler: worker, Workers: numWorkers, Close: closer, } go pool.Run() for out := range outputs { fmt.Println(out) }
Output: 1 1
func New ¶
func New(numWorkers int, handler WorkHandler) *WorkPool
New creates a worker pool with a given handler function.
Example ¶
numWorkers := 3 outputs := make(chan int) worker := func(abort <-chan struct{}) bool { outputs <- 1 return false } pool := New(numWorkers, worker) go func() { pool.Run() close(outputs) }() for out := range outputs { fmt.Println(out) }
Output: 1 1 1
func NewWithClose ¶
func NewWithClose(numWorkers int, handler WorkHandler, close func()) *WorkPool
NewWithClose creates a worker pool with a given handler function and a function to call when shutting down.
Example ¶
numWorkers := 5 outputs := make(chan int) worker := func(abort <-chan struct{}) bool { outputs <- 1 return false } closer := func() { close(outputs) } pool := NewWithClose(numWorkers, worker, closer) go pool.Run() for out := range outputs { fmt.Println(out) }
Output: 1 1 1 1 1