goroutinepool

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2018 License: MIT Imports: 5 Imported by: 0

README

Goroutinepool

GoDoc Build Status Go Report Card

Package goroutinepool provides handy functions for running goroutines concurrently similar to multiprocess.Pool in python or concurrency.ThreadPoolExecutor in java.

Example:

ctx := context.Background()

var counter int32

// a simple job to increment a number
increment := func(delta int32) func(context.Context) {
    return func(ctx context.Context) {
        atomic.AddInt32(&counter, delta)
    }
}

// jobs
fns := []func(context.Context){
    increment(1),
    increment(10),
    increment(100),
}

// use two workers
goroutinepool.RunInPool(ctx, 2, fns)

fmt.Println(atomic.LoadInt32(&counter))

// Output:
// 111

Documentation

Overview

Package goroutinepool provides handy functions for running goroutines concurrently similar to `multiprocess.Pool` in python or `concurrency.ThreadPoolExecutor` in java.

This package is safe for parallel execution.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func DisableDebugLog

func DisableDebugLog()

DisableDebugLog makes further calls to RunInPool or RunInPoolWithChan not log trace messages.

func EnableDebugLog

func EnableDebugLog()

EnableDebugLog makes further calls to RunInPool or RunInPoolWithChan to log trace messages.

func NonBufferedChan

func NonBufferedChan(ctx context.Context, fns []func(context.Context)) chan func(context.Context)

NonBufferedChan creates a non-buffered channel out of provided functions.

Channel is auto-closed once all its elements consumed or context canceled.

Example
package main

import (
	"context"
	"fmt"
	"github.com/ykhrustalev/goroutinepool"
	"sync/atomic"
)

func main() {
	ctx := context.Background()

	var counter int32

	// a simple job to increment a number
	increment := func(delta int32) func(context.Context) {
		return func(ctx context.Context) {
			atomic.AddInt32(&counter, delta)
		}
	}

	// jobs
	fns := []func(context.Context){
		increment(1),   // executed and waited
		increment(10),  // executed and waited
		increment(100), // will be cancelled by timeout
	}

	// use two workers
	ch := goroutinepool.NonBufferedChan(ctx, fns)

	for fn := range ch {
		fn(ctx)
	}

	fmt.Println(atomic.LoadInt32(&counter))

}
Output:

111

func PopulateAndCloseChan

func PopulateAndCloseChan(ctx context.Context, ch chan func(context.Context), fns []func(context.Context)) chan func(context.Context)

PopulateAndCloseChan passes all provided elements into the given channel. This approach allows providing own buffered channel. The provided channel must not be populated anywhere else otherwise there will be a data race.

See RunInPoolWithChan examples for the use cases.

Channel is auto-closed once all its elements consumed or context canceled.

func RunInPool

func RunInPool(ctx context.Context, poolSize int, fns []func(context.Context))

RunInPool concurrently runs provided functions with supplied concurrency level unless the context is triggered. Uses at least one worker.

Example (Basic)
package main

import (
	"context"
	"fmt"
	"github.com/ykhrustalev/goroutinepool"
	"sync/atomic"
)

func main() {
	ctx := context.Background()

	var counter int32

	// a simple job to increment a number
	increment := func(delta int32) func(context.Context) {
		return func(ctx context.Context) {
			atomic.AddInt32(&counter, delta)
		}
	}

	// jobs
	fns := []func(context.Context){
		increment(1),
		increment(10),
		increment(100),
	}

	// use two workers
	goroutinepool.RunInPool(ctx, 2, fns)

	fmt.Println(atomic.LoadInt32(&counter))

}
Output:

111
Example (Cancellation)
package main

import (
	"context"
	"fmt"
	"github.com/ykhrustalev/goroutinepool"
	"sync/atomic"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	var counter int32

	// a simple job to increment a number
	increment := func(delta int32) func(context.Context) {
		return func(ctx context.Context) {
			atomic.AddInt32(&counter, delta)

			select {
			case <-ctx.Done():
				break // won't exit unless cancelled
			}

			time.Sleep(200 * time.Millisecond)
		}
	}

	// jobs
	fns := []func(context.Context){
		increment(1),   // executed and waited
		increment(10),  // executed and waited
		increment(100), // will be cancelled by timeout
	}

	// use two workers
	goroutinepool.RunInPool(ctx, 2, fns)

	fmt.Println(atomic.LoadInt32(&counter))

}
Output:

11

func RunInPoolWithChan

func RunInPoolWithChan(ctx context.Context, poolSize int, ch chan func(context.Context))

RunInPoolWithChan concurrently runs items from a provided channel unless till the context is triggered or channel is closed. Uses at least one worker.

Example (BufferedChannel)
package main

import (
	"context"
	"fmt"
	"github.com/ykhrustalev/goroutinepool"
	"sync/atomic"
)

func main() {
	ctx := context.Background()

	var counter int32

	// a simple job to increment a number
	increment := func(delta int32) func(context.Context) {
		return func(ctx context.Context) {
			atomic.AddInt32(&counter, delta)
		}
	}

	// create a buffered channel
	ch := make(chan func(context.Context), 2)
	// extend channel with jobs
	// note, that channel gets closed using this function
	goroutinepool.PopulateAndCloseChan(ctx, ch, []func(context.Context){
		increment(1),
		increment(10),
		increment(100),
	})

	// use two workers
	goroutinepool.RunInPoolWithChan(ctx, 2, ch)

	fmt.Println(atomic.LoadInt32(&counter))

}
Output:

111

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL