gopool

v3.0.0+incompatible
Warning

This package is not in the latest version of its module.

Published: Aug 7, 2018 License: MIT Imports: 4 Imported by: 0

README

gopool

Go Pool makes it easy to set up and manage pools of background workers. Pools can be started and stopped, and they can also be resized dynamically, so that the number of running workers matches your current workload.

Pool Types

Long running, always alive workers

To start a pool of long running workers whose size will be managed by gopool, you should use the Pool.Start func.

A good use case for this is a message queue that jobs get published to. Let's assume you always want at least 1 worker consuming jobs from the queue, but you also want to scale the number of consumers according to the current lag.

To do this, just pass in a WorkerCountFunc that returns the number of workers you would want to run at any point in time, and pass in a SleepTimeFunc to customise how often gopool should modify the number of running workers.

For example, your WorkerCountFunc could return {number of unread items in the queue} / 100, and the SleepTimeFunc could return time.Second * 120. Every 2 minutes, gopool would then resize the pool so that X / 100 workers are running, where X is the number of unread items in the queue.

Temporary workers, which will die once their designated work has been finished

To start a pool of temporary workers who will die when their work input channel is closed, you should use the Pool.StartOnce func.

To wait for the pool to finish its work you can use the Pool.Done func, as the channel it returns is closed when the pool's work is done.

Installation

go get -u github.com/tomwright/gopool

Documentation

Documentation can be found at https://godoc.org/github.com/tomwright/gopool.

Documentation

Constants

const (
	ErrPoolAlreadyRunning      = Error("pool already running")
	ErrPoolHasNilSleepTimeFunc = Error("pool has a nil sleep time func")
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Error

type Error string

func (Error) Error

func (e Error) Error() string

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a group of workers performing the same task simultaneously

Example
startedWorkers := safeCounter{}
finishedWorkers := safeCounter{}

workInputChan := make(chan string, 5)

var work WorkFunc = func(ctx context.Context) error {
	startedWorkers.Inc()
	fmt.Println("Starting worker")
	for {
		select {
		case <-ctx.Done():
			finishedWorkers.Inc()
			fmt.Println("Finishing worker")
			return ctx.Err()
		case x, ok := <-workInputChan:
			if !ok {
				return fmt.Errorf("unexpected closed input chan")
			}
			fmt.Printf("Doing work: %s\n", x)
		}
	}
}

var workerCount WorkerCountFunc = func() uint64 {
	return 3
}

var sleepTime SleepTimeFunc = func() time.Duration {
	return time.Millisecond * 100
}

safeCounterHasVal := func(started *safeCounter, startedVal int, finished *safeCounter, finishedVal int) func() error {
	return func() error {
		got := started.Val()
		if got != startedVal {
			return fmt.Errorf("expected %d workers to be started, got %d", startedVal, got)
		}
		got = finished.Val()
		if got != finishedVal {
			return fmt.Errorf("expected %d workers to be finished, got %d", finishedVal, got)
		}
		return nil
	}
}

p := NewPool("test", work, workerCount, sleepTime, context.Background())
if err := timeoutAfter(time.Second, safeCounterHasVal(&startedWorkers, 0, &finishedWorkers, 0)); err != nil {
	panic(err)
}
fmt.Printf("%d started - %d finished\n", startedWorkers.Val(), finishedWorkers.Val())

cancel, err := p.Start()
if err != nil {
	panic(err)
}
defer cancel()
if err := timeoutAfter(time.Second, safeCounterHasVal(&startedWorkers, 3, &finishedWorkers, 0)); err != nil {
	panic(err)
}
fmt.Printf("%d started - %d finished\n", startedWorkers.Val(), finishedWorkers.Val())

// Note: sleeps are used to ensure output order
workInputChan <- "One"
time.Sleep(time.Millisecond * 20)
workInputChan <- "Two"
time.Sleep(time.Millisecond * 20)
workInputChan <- "Three"

time.Sleep(time.Millisecond * 20)

cancel()
if err := timeoutAfter(time.Second, safeCounterHasVal(&startedWorkers, 3, &finishedWorkers, 3)); err != nil {
	panic(err)
}
time.Sleep(time.Millisecond * 50)
fmt.Printf("%d started - %d finished\n", startedWorkers.Val(), finishedWorkers.Val())
Output:

0 started - 0 finished
Starting worker
Starting worker
Starting worker
3 started - 0 finished
Doing work: One
Doing work: Two
Doing work: Three
Finishing worker
Finishing worker
Finishing worker
3 started - 3 finished

func NewPool

func NewPool(id string, work WorkFunc, desiredWorkerCount WorkerCountFunc, sleepTime SleepTimeFunc, ctx context.Context) *Pool

NewPool returns a new pool

func (*Pool) Context

func (p *Pool) Context() context.Context

Context returns the pool's context

func (*Pool) Done

func (p *Pool) Done() <-chan struct{}

Done returns a channel that is closed when the pool is no longer running. Done returns nil if called when the pool hasn't started running yet.

func (*Pool) ID

func (p *Pool) ID() string

ID returns the pool's unique ID

func (*Pool) Running

func (p *Pool) Running() bool

Running returns whether or not the pool is running

func (*Pool) Start

func (p *Pool) Start() (context.CancelFunc, error)

Start initiates the pool monitor

func (*Pool) StartOnce

func (p *Pool) StartOnce() (context.CancelFunc, error)

StartOnce will start a pool with the desired worker count, and will close the done channel when all workers have finished.

Example

ctx := context.TODO()

// outputSlice is just a concurrently safe slice
outputSlice := &safeIntSlice{}
// notice that it initially contains 10 values of 0
outputSlice.Init([]int{0, 0, 0, 0, 0, 0, 0, 0, 0, 0})

workerInput := make(chan int)

var work WorkFunc = func(ctx context.Context) error {
	fmt.Println("worker started")
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case key, ok := <-workerInput:
			if !ok {
				return nil
			}
			// when we are given a key, the worker will set it to a value of 1
			outputSlice.Set(key, 1)
		}
	}
}

// in this case we will have 3 workers
var workerCount WorkerCountFunc = func() uint64 {
	return 3
}

p := NewPool("test", work, workerCount, nil, ctx)

fmt.Println("check #1")
for i := 0; i < 10; i++ {
	fmt.Printf("%d = %d\n", i, outputSlice.Get(i))
}

fmt.Println("starting workers")

cancel, err := p.StartOnce()
if err != nil {
	panic(err)
}
defer cancel()
<-time.After(time.Millisecond * 100)

workerInput <- 0
workerInput <- 2
workerInput <- 4
workerInput <- 6
workerInput <- 8

<-time.After(time.Millisecond * 100)
fmt.Println("check #2")
for i := 0; i < 10; i++ {
	fmt.Printf("%d = %d\n", i, outputSlice.Get(i))
}

workerInput <- 1
workerInput <- 3
workerInput <- 5
workerInput <- 7

fmt.Println("closing worker input chan")
close(workerInput)

select {
case <-p.Done():
	fmt.Println("done")
case <-time.After(time.Second):
	panic("timeout")
}

fmt.Println("check #3")
for i := 0; i < 10; i++ {
	fmt.Printf("%d = %d\n", i, outputSlice.Get(i))
}
Output:

check #1
0 = 0
1 = 0
2 = 0
3 = 0
4 = 0
5 = 0
6 = 0
7 = 0
8 = 0
9 = 0
starting workers
worker started
worker started
worker started
check #2
0 = 1
1 = 0
2 = 1
3 = 0
4 = 1
5 = 0
6 = 1
7 = 0
8 = 1
9 = 0
closing worker input chan
done
check #3
0 = 1
1 = 1
2 = 1
3 = 1
4 = 1
5 = 1
6 = 1
7 = 1
8 = 1
9 = 0

type SleepTimeFunc

type SleepTimeFunc func() time.Duration

SleepTimeFunc should return the time.Duration that the pool sleeps between checks of its worker count

type WorkFunc

type WorkFunc func(ctx context.Context) error

WorkFunc defines a piece of work to be processed

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is the definition for a single worker

Example
ctx := context.TODO()

inputChan := make(chan string, 5)
var work WorkFunc = func(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case in, ok := <-inputChan:
			if !ok {
				return nil
			}
			fmt.Print(in)
		}
	}
}

w := NewWorker("name printer", work, ctx)

inputChan <- "Tom"
inputChan <- "Jim"
inputChan <- "Frank"
inputChan <- "John"
inputChan <- "Tony"

close(inputChan)

fmt.Print("Start")

cancel := w.Start()
defer cancel()

timer := time.NewTimer(time.Second)
defer timer.Stop()

select {
case <-timer.C:
	panic("worker did not finish in time")
case <-w.Done():
	fmt.Print("Done")
}
Output:

StartTomJimFrankJohnTonyDone

func NewWorker

func NewWorker(id string, work WorkFunc, ctx context.Context) *Worker

NewWorker returns a worker instance

func (*Worker) Context

func (w *Worker) Context() context.Context

Context returns the worker's context

func (*Worker) Done

func (w *Worker) Done() chan struct{}

Done returns a channel you can use to pick up on when a worker has finished

func (*Worker) Err

func (w *Worker) Err() error

Err returns the error, if any, that occurred in the worker

func (*Worker) ID

func (w *Worker) ID() string

ID returns the worker's unique ID

func (*Worker) Start

func (w *Worker) Start() context.CancelFunc

Start initiates a goroutine for the worker and returns the context.CancelFunc used to stop it

type WorkerCountFunc

type WorkerCountFunc func() uint64

WorkerCountFunc should return the number of workers you want to be running in the pool
