parallel

package module
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2025 License: Apache-2.0 Imports: 3 Imported by: 0

README

go-parallel

parallelism in Go using generics

Parallel functions

Higher level

  • MapBatches - a batched parallel map
  • BatchWorkers - spawn parallel workers to work batches
  • ArrayWorkers1 - unbatched parallel map

Lower Level

  • QueueWorkers - spawn parallel workers
  • BatchedChannel - for batching data into a channel

Error handling

This library relies on go-recovery to trap panics that occur in user supplied work functions. This library does have unhandled panics, but only in places where panics should never occur. Errors and panics are written to an error channel for maximum flexibility. There are helpers for common patterns for dealing with errors:

  • CollectErrors (wait and convert to a slice)
  • CancelAfterFirstError (cancel and wait and convert to a slice)

Concurrency helpers

See go-concurrent, which this library uses.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ArrayWorkers1

func ArrayWorkers1[T any](nParallel int, objects []T, cancel <-chan struct{}, fn func(int, T) error) <-chan error

Just operate on one object at a time Processing continues until completion or a value is read from the cancel channel Returns a channel of errors Uses QueueWorkers under the hood

func BatchWorkers added in v0.3.0

func BatchWorkers[T any](bw BatchWork, objects []T, worker func([]T) error) []error

BatchWorkers combines BatchedChannel, QueueWorkers, and CancelAfterFirstError The given objects are batched up and worked in parallel

func BatchedChannel

func BatchedChannel[T any](bw BatchWork, objects []T) (<-chan []T, <-chan error)

BatchedChannel sends slices of Batchwork.Size objects to the resulting channel. The error channel should not have an error but is there in case there is a panic in the batching go routine.

func CancelAfterFirstError

func CancelAfterFirstError(cancel chan struct{}, errChannel <-chan error) []error

For functions that take a cancel channel and return an error channel. Attempt to cancel all processing but wait for it to finish.

This helper will trigger the cancel channel after the first error. It then waits for the error channel to be closed.

errors are returned as []error, a slice of errors If there are no errors, the slice will be nil To combine the errors as a single error, use errors.Join

func CollectErrors

func CollectErrors(errChannel <-chan error) []error

Wait for all errors from a channel of errors. errors are returned as []error, a slice of errors If there are no errors, the slice will be nil To combine the errors as a single error, use errors.Join

func MapBatches

func MapBatches[T any, U any](nParallel int, objects []T, fn func(T) (U, error)) ([]U, []error)

func QueueWorkers

func QueueWorkers[T any](n int, queue <-chan T, fn func(T) error) <-chan error

spawn N parallel workers that apply fn to T. Any panics in the given fn are recovered and returned as errors. Work is taken out of the given queue and given to the first available worker. Once all workers are full, reflect.Select is used to select the next worker. Close the given queue to shutdown the workers.

Workers will continue to work after encountering an error. Errors are sent to the returned error channel. When the given queue is closed and the work is processed, the returned error channel will be closed.

Types

type BatchWork

type BatchWork struct {
	Size        int
	Parallelism int
	Cancel      chan struct{}
}

func (*BatchWork) AdjustForSmallLength

func (bw *BatchWork) AdjustForSmallLength(total int) int

If the length is too small, decrease the batch size

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL