runnerpool

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2020 License: Apache-2.0 Imports: 4 Imported by: 1

README

runnerpool

Build Status Coverage Status

Library that offers a bounded pool of goroutines to perform deserved jobs.

Usage

First of all, you need to create a pool with some configuration and a runner

    cfg := runnerpool.Config{
        Workers: workers,
    }
    
    pool := runnerpool.New(cfg, runner)

A runner is a function that starts goroutines. Why do we need one? Because you probably want to handle panics in those goroutines in your favourite way.

Could you just defer a recovery from panics in every function you pass to the workers? Yes, that would be another solution, but we chose this one.

We'll use a runner that just launches goroutines:

    runner := func(f func()) {
        go f()
    }

Now we need to start the runnerpool, so it will create the goroutines. Notice that it will create them all from the beginning. This process doesn't take much time, it takes around 3ms to create 5000 goroutines, which should not worry you in your app startup. Anyway, you can check runner_pool_bench_test.go for more benchmarks.

Once you have the pool started, you can acquire a worker. You'll need a provide a context for that, because usually you'll want to desist from waiting at some point.

    worker, err := pool.Worker(ctx)
    if err != nil {
        return err
    }
    defer worker.Release()

If error is returned then it may wrap one of the errors returned by ctx.Err() (or not, if the pool was just stopped).

Once you've successfully acquired a worker, you should make sure you'll return it back to the pool when you leave, regardless you're going to execute code on it or not, just defer worker.Release(). This is a no-op once you've run some code.

Now, you can run code on the worker:

    worker.Run(func(_ context.Context) { fmt.Println("I'm running!") })

Notice that a worker can't execute code twice, and it obviously can't execute code once it has been released.

The context provided to the worker will be canceled if pool's Stop(ctx) method is called, and the Stop(ctx) will wait until all workers are stopped or the provided context is done. Once pool's Stop() method is called, no more workers can be acquired and Worker(ctx) will return an error.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Workers int `default:"100"`
}

Config is the configuration for the pool implementation

type ErrCantAcquireWorker

type ErrCantAcquireWorker struct {
	// contains filtered or unexported fields
}

ErrCantAcquireWorker is returned by Pool.Worker() function when worker can't be acquired

func (ErrCantAcquireWorker) Error

func (e ErrCantAcquireWorker) Error() string

func (ErrCantAcquireWorker) Unwrap

func (e ErrCantAcquireWorker) Unwrap() error

Unwrap implements the errors unwrapping

type Pool

type Pool interface {
	// Worker returns a Worker if can be acquired before context is canceled, otherwise returns the
	// ctx.Err() result
	Worker(ctx context.Context) (Worker, error)

	// Stats provides statistics about pool's size, status & config
	Stats() Stats
}

Pool represents a pool of runners, understanding runners as func(func()) functions like func(f func()) { go f() }

Example
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/cabify/runnerpool"
)

func main() {
	const workers = 2
	const tasks = workers + 1
	const timeout = 250 * time.Millisecond

	cfg := runnerpool.Config{
		Workers: workers,
	}

	runner := func(f func()) {
		go f()
	}

	pool := runnerpool.New(cfg, runner)
	err := pool.Start()
	if err != nil {
		log.Fatal(err)
	}

	wg := sync.WaitGroup{}
	wg.Add(tasks)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for i := 1; i <= tasks; i++ {
		go func(i int) {
			time.Sleep(time.Duration(i) * timeout / 10)

			worker, err := pool.Worker(ctx)
			if err != nil {
				fmt.Printf("Can't acquire worker %d\n", i)
				wg.Done()
				return
			}

			defer worker.Release()
			worker.Run(func(ctx context.Context) {
				time.Sleep(time.Duration(i) * timeout * 2)
				fmt.Printf("Worker %d done\n", i)
				wg.Done()
			})
		}(i)
	}

	wg.Wait()
}
Output:

Can't acquire worker 3
Worker 1 done
Worker 2 done

type Stats

type Stats struct {
	MaxWorkers int32
	Workers    int32
	Acquired   int32
	Running    int32
}

Stats provides statistics about pool's size, status & config

type Worker

type Worker interface {
	// Run runs the given function.
	// The context provided is will be canceled if WorkerPool.Stop is called.
	Run(func(context.Context))

	// Release should be called at least once every time a worker is acquired
	// It is safe to call Release twice and calling Release on a worker that is already running has no effect,
	// so the safest way is to defer worker.Release() once the worker has been acquired.
	Release()
}

Worker represents a pool worker that offers a one-time usable runner

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool implements Pool using a runner & goroutines pool

func New

func New(cfg Config, runner func(func())) *WorkerPool

New returns a new WorkerPool, this should be started after being used Run should be provided to choose whether to recover or report panics, notice that if you recover from panics, the workers won't be restablished so you'll eventually exhaust them

func (*WorkerPool) Start

func (p *WorkerPool) Start() error

Start starts creating the workers

func (*WorkerPool) Stats

func (p *WorkerPool) Stats() Stats

Stats provides thread-safe statistics about pool's size, status & config

func (*WorkerPool) Stop

func (p *WorkerPool) Stop(ctx context.Context) error

Stop cancel the pool's context which is provided to each worker and will wait until all workers are stopped or until context is done, in which case it will return a wrapped ctx.Err()

func (*WorkerPool) Worker

func (p *WorkerPool) Worker(ctx context.Context) (Worker, error)

Worker returns an error, if possible within the provided context, otherwise it will return ctx.Err()

Directories

Path Synopsis
Package runnerpooltest provides an implementation of runnerpool.Pool to be used from tests of third party libraries where we just need to provide a dependency that will execute the code synchronously.
Package runnerpooltest provides an implementation of runnerpool.Pool to be used from tests of third party libraries where we just need to provide a dependency that will execute the code synchronously.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL