parallel

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2019 License: BSD-3-Clause Imports: 3 Imported by: 0

Documentation

Overview

Package parallel provides functions for expressing parallel algorithms.

See https://github.com/ExaScience/pargo/wiki/TaskParallelism for a general overview.

Example (HeatDistributionSimulation)
package main

// This is a simplified version of a heat distribution simulation, based on an
// implementation by Wilfried Verachtert.
//
// See https://en.wikipedia.org/wiki/Heat_equation for some theoretical
// background.

import (
	"fmt"
	"math"

	"gonum.org/v1/gonum/mat"

	"github.com/exascience/pargo/parallel"
)

// ε is the convergence threshold: HeatDistributionSimulation keeps iterating
// for as long as the value returned by maxDiff is at least ε.
const ε = 0.001

// maxDiff returns the largest absolute element-wise difference between m1 and
// m2, looking only at interior cells (the first and last row and column are
// skipped). The dimensions are taken from m1.
//
// Rows are scanned in parallel batches; the per-batch maxima are joined with
// math.Max.
func maxDiff(m1, m2 *mat.Dense) (result float64) {
	nRows, nCols := m1.Dims()
	lastCol := nCols - 1

	// scanRows yields the maximum difference found in rows [low, high).
	scanRows := func(low, high int) float64 {
		var worst float64
		for i := low; i < high; i++ {
			a, b := m1.RawRowView(i), m2.RawRowView(i)
			for j := 1; j < lastCol; j++ {
				worst = math.Max(worst, math.Abs(a[j]-b[j]))
			}
		}
		return worst
	}

	// A batch count of 0 lets the parallel package pick its default.
	return parallel.RangeReduceFloat64(1, nRows-1, 0, scanRows, math.Max)
}

// HeatDistributionStep performs one relaxation step: every interior cell of u
// is overwritten with the average of its four neighbours (above, below, left
// and right) as read from v. The first and last row and column of u are not
// written. Rows are processed in parallel batches.
func HeatDistributionStep(u, v *mat.Dense) {
	nRows, nCols := u.Dims()
	lastCol := nCols - 1

	// relaxRows updates the interior cells of rows [low, high) of u.
	relaxRows := func(low, high int) {
		for i := low; i < high; i++ {
			dst := u.RawRowView(i)
			here, above, below := v.RawRowView(i), v.RawRowView(i-1), v.RawRowView(i+1)
			for j := 1; j < lastCol; j++ {
				dst[j] = (above[j] + below[j] + here[j-1] + here[j+1]) / 4.0
			}
		}
	}

	// A batch count of 0 lets the parallel package pick its default.
	parallel.Range(1, nRows-1, 0, relaxRows)
}

// HeatDistributionSimulation simulates heat distribution on a grid with M×N
// interior cells surrounded by a one-cell border.
//
// All cells start at init; the border is then fixed to t (first row),
// b (last row), l (first column) and r (last column). The simulation runs in
// rounds of 2000 relaxation steps, printing the iteration count, the current
// difference δ between the two working matrices, and the cell at (8, 8) after
// each round. It stops once δ drops below ε.
func HeatDistributionSimulation(M, N int, init, t, r, b, l float64) {
	// Grow the grid by one cell on every side to make room for the border.
	rows, cols := M+2, N+2
	lastRow, lastCol := rows-1, cols-1

	// Start with every cell at the initial value.
	cells := make([]float64, rows*cols)
	for k := 0; k < len(cells); k++ {
		cells[k] = init
	}
	u := mat.NewDense(rows, cols, cells)

	// Write the first and last rows before the first and last columns, so
	// that the four corner cells end up holding the column values (l and r).
	for c := 0; c < cols; c++ {
		u.Set(0, c, t)
		u.Set(lastRow, c, b)
	}
	for row := 0; row < rows; row++ {
		u.Set(row, 0, l)
		u.Set(row, lastCol, r)
	}

	// The second matrix starts as an exact copy; the two are then used
	// alternately as source and destination of each step.
	v := mat.NewDense(rows, cols, nil)
	v.Copy(u)

	// Each pass of the inner loop performs two steps (u→v, then v→u).
	const stepsPerRound = 1000
	iterations := 0
	δ := ε + 1.0 // start above the threshold so at least one round runs
	for δ >= ε {
		for s := 0; s < stepsPerRound; s++ {
			HeatDistributionStep(v, u)
			HeatDistributionStep(u, v)
		}
		iterations += 2 * stepsPerRound
		δ = maxDiff(u, v)
		fmt.Printf("iterations: %6d, δ: %08.6f, u[8][8]: %10.8f\n", iterations, δ, u.At(8, 8))
	}
}

// main runs the heat distribution simulation on a grid with 1024×1024
// interior cells, starting at 75, with the first row held at 0 and the
// remaining three borders held at 100.
func main() {
	const (
		size    = 1024
		initial = 75
		top     = 0
		right   = 100
		bottom  = 100
		left    = 100
	)
	HeatDistributionSimulation(size, size, initial, top, right, bottom, left)
}
Output:

iterations:   2000, δ: 0.009073, u[8][8]: 50.99678108
iterations:   4000, δ: 0.004537, u[8][8]: 50.50380048
iterations:   6000, δ: 0.003025, u[8][8]: 50.33708179
iterations:   8000, δ: 0.002268, u[8][8]: 50.25326869
iterations:  10000, δ: 0.001815, u[8][8]: 50.20283493
iterations:  12000, δ: 0.001512, u[8][8]: 50.16915148
iterations:  14000, δ: 0.001296, u[8][8]: 50.14506197
iterations:  16000, δ: 0.001134, u[8][8]: 50.12697847
iterations:  18000, δ: 0.001008, u[8][8]: 50.11290381
iterations:  20000, δ: 0.000907, u[8][8]: 50.10163797

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func And

func And(predicates ...func() bool) bool

And receives zero or more predicate functions and executes them in parallel.

Each predicate is invoked in its own goroutine, and And returns only when all predicates have terminated, combining all return values with the && operator, with true as the default return value.

If one or more predicates panic, the corresponding goroutines recover the panics, and And eventually panics with the left-most recovered panic value.

func Do

func Do(thunks ...func())

Do receives zero or more thunks and executes them in parallel.

Each thunk is invoked in its own goroutine, and Do returns only when all thunks have terminated.

If one or more thunks panic, the corresponding goroutines recover the panics, and Do eventually panics with the left-most recovered panic value.

Example
package main

import (
	"errors"
	"fmt"

	"github.com/exascience/pargo/parallel"
)

// main defines a sequential and a parallel Fibonacci function, then prints
// the result of calling the parallel one with an invalid (negative) argument.
func main() {
	// seqFib is the plain recursive Fibonacci; negative input is an error.
	var seqFib func(int) (int, error)
	seqFib = func(k int) (int, error) {
		switch {
		case k < 0:
			return 0, errors.New("invalid argument")
		case k < 2:
			return k, nil
		}
		a, err := seqFib(k - 1)
		if err != nil {
			return 0, err
		}
		b, err := seqFib(k - 2)
		return a + b, err
	}

	// outcome bundles a value with an error so that a single variable can be
	// assigned from inside a thunk passed to parallel.Do.
	type outcome struct {
		n   int
		err error
	}

	// parFib computes both recursive branches in parallel for k >= 20 and
	// falls back to seqFib below that.
	var parFib func(int) outcome
	parFib = func(k int) outcome {
		if k < 0 {
			return outcome{err: errors.New("invalid argument")}
		}
		if k < 20 {
			value, err := seqFib(k)
			return outcome{n: value, err: err}
		}
		var left, right outcome
		parallel.Do(
			func() { left = parFib(k - 1) },
			func() { right = parFib(k - 2) },
		)
		// The error of the first branch takes precedence over the second.
		sum := outcome{n: left.n + right.n, err: right.err}
		if left.err != nil {
			sum.err = left.err
		}
		return sum
	}

	res := parFib(-1)
	if res.err == nil {
		fmt.Println(res.n)
		return
	}
	fmt.Println(res.err)
}
Output:

invalid argument

func Or

func Or(predicates ...func() bool) bool

Or receives zero or more predicate functions and executes them in parallel.

Each predicate is invoked in its own goroutine, and Or returns only when all predicates have terminated, combining all return values with the || operator, with false as the default return value.

If one or more predicates panic, the corresponding goroutines recover the panics, and Or eventually panics with the left-most recovered panic value.

func Range

func Range(
	low, high, n int,
	f func(low, high int),
)

Range receives a range, a batch count n, and a range function f, divides the range into batches, and invokes the range function for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range function is invoked for each batch in its own goroutine, with 0 <= low <= high, and Range returns only when all range functions have terminated.

Range panics if high < low, or if n < 0.

If one or more range function invocations panic, the corresponding goroutines recover the panics, and Range eventually panics with the left-most recovered panic value.

func RangeAnd

func RangeAnd(
	low, high, n int,
	f func(low, high int) bool,
) bool

RangeAnd receives a range, a batch count n, and a range predicate function f, divides the range into batches, and invokes the range predicate for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range predicate is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeAnd returns only when all range predicates have terminated, combining all return values with the && operator.

RangeAnd panics if high < low, or if n < 0.

If one or more range predicate invocations panic, the corresponding goroutines recover the panics, and RangeAnd eventually panics with the left-most recovered panic value.

func RangeOr

func RangeOr(
	low, high, n int,
	f func(low, high int) bool,
) bool

RangeOr receives a range, a batch count n, and a range predicate function f, divides the range into batches, and invokes the range predicate for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range predicate is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeOr returns only when all range predicates have terminated, combining all return values with the || operator.

RangeOr panics if high < low, or if n < 0.

If one or more range predicate invocations panic, the corresponding goroutines recover the panics, and RangeOr eventually panics with the left-most recovered panic value.

func RangeReduce

func RangeReduce(
	low, high, n int,
	reduce func(low, high int) interface{},
	join func(x, y interface{}) interface{},
) interface{}

RangeReduce receives a range, a batch count n, a range reducer function, and a join function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then combined by repeated invocations of join.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduce returns only when all range reducers and pair reducers have terminated.

RangeReduce panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduce eventually panics with the left-most recovered panic value.

Example
package main

import (
	"fmt"
	"runtime"

	"github.com/exascience/pargo/parallel"
)

// numDivisors counts the integers in [1, n] that divide n without remainder.
// The candidates are split into runtime.GOMAXPROCS(0) batches that are
// counted in parallel; the per-batch counts are then added together.
func numDivisors(n int) int {
	// countBatch counts the divisors of n among the candidates [low, high).
	countBatch := func(low, high int) int {
		count := 0
		for d := low; d < high; d++ {
			if n%d != 0 {
				continue
			}
			count++
		}
		return count
	}
	return parallel.RangeReduceIntSum(1, n+1, runtime.GOMAXPROCS(0), countBatch)
}

func main() {
	findPrimes := func(n int) []int {
		result := parallel.RangeReduce(
			2, n, 4*runtime.GOMAXPROCS(0),
			func(low, high int) interface{} {
				var slice []int
				for i := low; i < high; i++ {
					if numDivisors(i) == 2 { // see RangeReduceInt example
						slice = append(slice, i)
					}
				}
				return slice
			},
			func(x, y interface{}) interface{} {
				return append(x.([]int), y.([]int)...)
			},
		)
		return result.([]int)
	}

	fmt.Println(findPrimes(20))

}
Output:

[2 3 5 7 11 13 17 19]

func RangeReduceFloat64

func RangeReduceFloat64(
	low, high, n int,
	reduce func(low, high int) float64,
	join func(x, y float64) float64,
) float64

RangeReduceFloat64 receives a range, a batch count n, a range reducer function, and a join function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then combined by repeated invocations of join.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduceFloat64 returns only when all range reducers and pair reducers have terminated.

RangeReduceFloat64 panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduceFloat64 eventually panics with the left-most recovered panic value.

func RangeReduceFloat64Product

func RangeReduceFloat64Product(
	low, high, n int,
	reduce func(low, high int) float64,
) float64

RangeReduceFloat64Product receives a range, a batch count n, and a range reducer function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then multiplied with each other.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduceFloat64Product returns only when all range reducers and pair reducers have terminated.

RangeReduceFloat64Product panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduceFloat64Product eventually panics with the left-most recovered panic value.

func RangeReduceFloat64Sum

func RangeReduceFloat64Sum(
	low, high, n int,
	reduce func(low, high int) float64,
) float64

RangeReduceFloat64Sum receives a range, a batch count n, and a range reducer function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then added together.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduceFloat64Sum returns only when all range reducers and pair reducers have terminated.

RangeReduceFloat64Sum panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduceFloat64Sum eventually panics with the left-most recovered panic value.

Example
package main

import (
	"fmt"
	"runtime"

	"github.com/exascience/pargo/parallel"
)

// main prints the sum of the numbers 1 through 10, computed with
// parallel.RangeReduceFloat64Sum.
func main() {
	// sumFloat64s adds up f by summing runtime.GOMAXPROCS(0) batches of it
	// in parallel.
	sumFloat64s := func(f []float64) float64 {
		return parallel.RangeReduceFloat64Sum(
			0, len(f), runtime.GOMAXPROCS(0),
			func(low, high int) float64 {
				total := 0.0
				for _, x := range f[low:high] {
					total += x
				}
				return total
			},
		)
	}

	values := []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	fmt.Println(sumFloat64s(values))
}
Output:

55

func RangeReduceInt

func RangeReduceInt(
	low, high, n int,
	reduce func(low, high int) int,
	join func(x, y int) int,
) int

RangeReduceInt receives a range, a batch count n, a range reducer function, and a join function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then combined by repeated invocations of join.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduceInt returns only when all range reducers and pair reducers have terminated.

RangeReduceInt panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduceInt eventually panics with the left-most recovered panic value.

func RangeReduceIntProduct

func RangeReduceIntProduct(
	low, high, n int,
	reduce func(low, high int) int,
) int

RangeReduceIntProduct receives a range, a batch count n, and a range reducer function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then multiplied with each other.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduceIntProduct returns only when all range reducers and pair reducers have terminated.

RangeReduceIntProduct panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduceIntProduct eventually panics with the left-most recovered panic value.

func RangeReduceIntSum

func RangeReduceIntSum(
	low, high, n int,
	reduce func(low, high int) int,
) int

RangeReduceIntSum receives a range, a batch count n, and a range reducer function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then added together.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduceIntSum returns only when all range reducers and pair reducers have terminated.

RangeReduceIntSum panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduceIntSum eventually panics with the left-most recovered panic value.

Example
package main

import (
	"fmt"
	"runtime"

	"github.com/exascience/pargo/parallel"
)

// main prints the number of divisors of 12, computed with
// parallel.RangeReduceIntSum.
func main() {
	// numDivisors counts the integers in [1, n] that divide n evenly.
	numDivisors := func(n int) int {
		// countBatch counts the divisors of n among the candidates [low, high).
		countBatch := func(low, high int) int {
			count := 0
			for d := low; d < high; d++ {
				if n%d != 0 {
					continue
				}
				count++
			}
			return count
		}
		return parallel.RangeReduceIntSum(1, n+1, runtime.GOMAXPROCS(0), countBatch)
	}

	fmt.Println(numDivisors(12))
}
Output:

6

func RangeReduceString

func RangeReduceString(
	low, high, n int,
	reduce func(low, high int) string,
	join func(x, y string) string,
) string

RangeReduceString receives a range, a batch count n, a range reducer function, and a join function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then combined by repeated invocations of join.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduceString returns only when all range reducers and pair reducers have terminated.

RangeReduceString panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduceString eventually panics with the left-most recovered panic value.

func RangeReduceStringSum

func RangeReduceStringSum(
	low, high, n int,
	reduce func(low, high int) string,
) string

RangeReduceStringSum receives a range, a batch count n, and a range reducer function, divides the range into batches, and invokes the range reducer for each of these batches in parallel, covering the half-open interval from low to high, including low but excluding high. The results of the range reducer invocations are then concatenated together.

The range is specified by a low and high integer, with low <= high. The batches are determined by dividing up the size of the range (high - low) by n. If n is 0, a reasonable default is used that takes runtime.GOMAXPROCS(0) into account.

The range reducer is invoked for each batch in its own goroutine, with 0 <= low <= high, and RangeReduceStringSum returns only when all range reducers and pair reducers have terminated.

RangeReduceStringSum panics if high < low, or if n < 0.

If one or more reducer invocations panic, the corresponding goroutines recover the panics, and RangeReduceStringSum eventually panics with the left-most recovered panic value.

func Reduce

func Reduce(
	join func(x, y interface{}) interface{},
	firstFunction func() interface{},
	moreFunctions ...func() interface{},
) interface{}

Reduce receives one or more functions, executes them in parallel, and combines their results with the join function in parallel.

Each function is invoked in its own goroutine, and Reduce returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and Reduce eventually panics with the left-most recovered panic value.

func ReduceFloat64

func ReduceFloat64(
	join func(x, y float64) float64,
	firstFunction func() float64,
	moreFunctions ...func() float64,
) float64

ReduceFloat64 receives one or more functions, executes them in parallel, and combines their results with the join function in parallel.

Each function is invoked in its own goroutine, and ReduceFloat64 returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and ReduceFloat64 eventually panics with the left-most recovered panic value.

func ReduceFloat64Product

func ReduceFloat64Product(functions ...func() float64) float64

ReduceFloat64Product receives zero or more functions, executes them in parallel, and multiplies their results in parallel.

Each function is invoked in its own goroutine, and ReduceFloat64Product returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and ReduceFloat64Product eventually panics with the left-most recovered panic value.

func ReduceFloat64Sum

func ReduceFloat64Sum(functions ...func() float64) float64

ReduceFloat64Sum receives zero or more functions, executes them in parallel, and adds their results in parallel.

Each function is invoked in its own goroutine, and ReduceFloat64Sum returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and ReduceFloat64Sum eventually panics with the left-most recovered panic value.

func ReduceInt

func ReduceInt(
	join func(x, y int) int,
	firstFunction func() int,
	moreFunctions ...func() int,
) int

ReduceInt receives one or more functions, executes them in parallel, and combines their results with the join function in parallel.

Each function is invoked in its own goroutine, and ReduceInt returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and ReduceInt eventually panics with the left-most recovered panic value.

func ReduceIntProduct

func ReduceIntProduct(functions ...func() int) int

ReduceIntProduct receives zero or more functions, executes them in parallel, and multiplies their results in parallel.

Each function is invoked in its own goroutine, and ReduceIntProduct returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and ReduceIntProduct eventually panics with the left-most recovered panic value.

func ReduceIntSum

func ReduceIntSum(functions ...func() int) int

ReduceIntSum receives zero or more functions, executes them in parallel, and adds their results in parallel.

Each function is invoked in its own goroutine, and ReduceIntSum returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and ReduceIntSum eventually panics with the left-most recovered panic value.

func ReduceString

func ReduceString(
	join func(x, y string) string,
	firstFunction func() string,
	moreFunctions ...func() string,
) string

ReduceString receives one or more functions, executes them in parallel, and combines their results with the join function in parallel.

Each function is invoked in its own goroutine, and ReduceString returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and ReduceString eventually panics with the left-most recovered panic value.

func ReduceStringSum

func ReduceStringSum(functions ...func() string) string

ReduceStringSum receives zero or more functions, executes them in parallel, and concatenates their results in parallel.

Each function is invoked in its own goroutine, and ReduceStringSum returns only when all functions have terminated.

If one or more functions panic, the corresponding goroutines recover the panics, and ReduceStringSum eventually panics with the left-most recovered panic value.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL