workpool

package module
v0.0.0-...-fbc7d82 Latest Latest
Published: Mar 2, 2021 License: MIT Imports: 1 Imported by: 0

README

Lightweight Workpool

This package provides a lightweight abstraction around a work function to make it easier to create work pools with early termination. This leaves you free to focus on the problem being solved and the data pipeline, while the work pool manages concurrency of execution.

Example

See example_full_test.go.

Documentation

Overview

Package workpool provides a lightweight abstraction around a work function to make it easier to create work pools with early termination. This leaves you free to focus on the problem being solved and the data pipeline, while the work pool manages concurrency of execution.

Types

type WorkHandler

type WorkHandler func(abort <-chan struct{}) bool

WorkHandler is a blocking call which manages the retrieval and processing of work. It should either process all available work, or process a single piece of work and return. If it returns after processing one piece of work, the pool will keep calling the handler.

Return true if the handler should be called again, otherwise return false to indicate work is complete.

The abort signal is triggered if the pool has been cancelled. It indicates that work should terminate immediately.

Where work comes from is implementation dependent, for example: a channel, RabbitMQ, dbus, or any other event system.

Here is a WorkHandler which squares a number. Notice that it is wrapped in a function to pass in the input/output channels. By returning after each item it allows the WorkPool to deal with early exits.

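func sq(input <-chan int, output chan<- int) WorkHandler {
    return func(abort <-chan struct{}) bool {
        select {
        case number, ok := <-input:
            if !ok {
                // Input is closed: there is no more work.
                return false
            }
            output <- number * number
            // Return after one item so the pool can check for early exit.
            return true
        case <-abort:
            // The pool was cancelled: stop immediately.
            return false
        }
    }
}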
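
The send on the output channel in a handler like the one above can itself block if nothing is reading the results. The following is a minimal sketch, assuming the same channel-based setup, of a variant that also watches abort while sending. The name sqGuarded is hypothetical, and WorkHandler is redeclared locally only so the snippet compiles on its own:

```go
package main

import "fmt"

// WorkHandler mirrors the package's handler type so this sketch compiles standalone.
type WorkHandler func(abort <-chan struct{}) bool

// sqGuarded (hypothetical name) squares one number per call and does not stay
// blocked past an abort, on either the receive or the send.
func sqGuarded(input <-chan int, output chan<- int) WorkHandler {
	return func(abort <-chan struct{}) bool {
		select {
		case number, ok := <-input:
			if !ok {
				return false // input closed: no more work
			}
			select {
			case output <- number * number:
				return true // ask to be called again
			case <-abort:
				return false // cancelled while waiting for a reader
			}
		case <-abort:
			return false // cancelled while waiting for work
		}
	}
}

func main() {
	input := make(chan int, 1)
	output := make(chan int, 1)
	abort := make(chan struct{})
	handler := sqGuarded(input, output)

	input <- 7
	fmt.Println(handler(abort), <-output)

	close(abort) // simulate the pool being cancelled
	fmt.Println(handler(abort))
}
```

Nesting the second select costs a few lines, but it means a cancelled pool is not left waiting on a consumer that has already gone away.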

Here is another example which ignores the abort channel. In this case the WorkPool still manages early termination between calls, but it cannot do so while the handler is blocked waiting on the input channel:

func sq(input <-chan int, output chan<- int) WorkHandler {
    return func(abort <-chan struct{}) bool {
        for number := range input {
            output <- number * number
            return true
        }
        return false
    }
}

type WorkPool

type WorkPool struct {
	// Handler is called repeatedly until all work is finished.
	Handler WorkHandler

	// Workers is the number of goroutines used to call the handler.
	Workers int

	// Close is called after all work is finished.
	Close func()
	// contains filtered or unexported fields
}

WorkPool manages running a WorkHandler in some number of goroutines. It also manages a cancel signal to allow for early termination.

Example
package main

import (
	"fmt"
)

// gen returns a channel that emits the nums arguments and is then closed.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// sq connects an input channel to an output channel with a squaring function. It returns false once the input channel
// is closed; otherwise it processes one number and returns true.
func sq(input <-chan int, output chan<- int) WorkHandler {
	return func(abort <-chan struct{}) bool {
		for number := range input {
			output <- number * number
			return true
		}
		return false
	}
}

func main() {
	// Input channel that yields three values and is then closed.
	var input <-chan int = gen(2, 3, 10)

	// Output channel for results.
	output := make(chan int)

	// Close function called when pool exits.
	closer := func() {
		close(output)
	}

	// Create a pool using a squaring function WorkHandler and a single worker.
	pool := NewWithClose(1, sq(input, output), closer)
	go pool.Run()

	// Check results
	for num := range output {
		fmt.Println(num)
	}
}
Output:

4
9
100
Example (Struct)
numWorkers := 2
outputs := make(chan int)

worker := func(abort <-chan struct{}) bool {
	outputs <- 1
	return false
}

closer := func() {
	close(outputs)
}

pool := &WorkPool{
	Handler: worker,
	Workers: numWorkers,
	Close:   closer,
}

go pool.Run()
for out := range outputs {
	fmt.Println(out)
}
Output:

1
1

func New

func New(numWorkers int, handler WorkHandler) *WorkPool

New creates a worker pool with a given handler function.

Example
numWorkers := 3
outputs := make(chan int)

worker := func(abort <-chan struct{}) bool {
	outputs <- 1
	return false
}

pool := New(numWorkers, worker)
go func() {
	pool.Run()
	close(outputs)
}()

for out := range outputs {
	fmt.Println(out)
}
Output:

1
1
1

func NewWithClose

func NewWithClose(numWorkers int, handler WorkHandler, close func()) *WorkPool

NewWithClose creates a worker pool with a given handler function and a function to call when shutting down.

Example
numWorkers := 5
outputs := make(chan int)

worker := func(abort <-chan struct{}) bool {
	outputs <- 1
	return false
}

closer := func() {
	close(outputs)
}

pool := NewWithClose(numWorkers, worker, closer)
go pool.Run()

for out := range outputs {
	fmt.Println(out)
}
Output:

1
1
1
1
1

func (*WorkPool) Cancel

func (p *WorkPool) Cancel()

Cancel may be called asynchronously to signal that the pool should stop processing work and return to the caller. An abort signal will be sent to each WorkHandler to allow for graceful shutdown.

func (*WorkPool) Run

func (p *WorkPool) Run()

Run starts the configured number of workers and calls WorkHandler until all work has been processed, or the execution is cancelled.
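
The package's import path is not shown on this page, so the following is only a minimal sketch built on a hypothetical stand-in type, standInPool, which models just the behaviour documented for Run and Cancel. The real WorkPool may be implemented differently. The helper countUntilCancelled is likewise hypothetical. It shows a handler that always asks for more work, so the pool stops only because Cancel is called:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// WorkHandler mirrors the package's handler type so this sketch compiles standalone.
type WorkHandler func(abort <-chan struct{}) bool

// standInPool is a hypothetical stand-in, NOT the package's WorkPool.
type standInPool struct {
	handler WorkHandler
	workers int
	abort   chan struct{}
	once    sync.Once
}

// Run blocks until every worker's handler returns false or the pool is cancelled.
func (p *standInPool) Run() {
	var wg sync.WaitGroup
	for i := 0; i < p.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-p.abort:
					return // cancelled between handler calls
				default:
				}
				if !p.handler(p.abort) {
					return // handler reported that work is complete
				}
			}
		}()
	}
	wg.Wait()
}

// Cancel closes the abort channel exactly once, so it is safe to call repeatedly.
func (p *standInPool) Cancel() {
	p.once.Do(func() { close(p.abort) })
}

// countUntilCancelled runs an endless handler, cancels the pool once limit
// calls have been made, and reports the total number of handler calls.
func countUntilCancelled(workers int, limit int64) int64 {
	var calls int64
	pool := &standInPool{workers: workers, abort: make(chan struct{})}
	pool.handler = func(abort <-chan struct{}) bool {
		if atomic.AddInt64(&calls, 1) >= limit {
			pool.Cancel() // may be called from any goroutine
		}
		return true // always ask for more; only Cancel stops the pool
	}
	pool.Run() // returns once every worker has seen the abort signal
	return atomic.LoadInt64(&calls)
}

func main() {
	fmt.Println("handler calls before shutdown:", countUntilCancelled(2, 100))
}
```

With more than one worker the final count can slightly exceed the limit, because other workers may already be inside a handler call when the abort signal is sent.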
