parallel

package
v0.10.0
Warning

This package is not in the latest version of its module.

Published: Dec 20, 2021 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Overview

Package parallel extends the command-pipeline core with concurrency steps.

Functions

func NewFanOutStep

func NewFanOutStep(name string, pipelineSupplier PipelineSupplier, handler ResultHandler) pipeline.Step

NewFanOutStep creates a pipeline step that runs each nested pipeline in its own goroutine. The function provided as PipelineSupplier is expected to close the given channel when no more pipelines should be executed; otherwise this step blocks forever. The step waits until all pipelines have finished. If the given ResultHandler is non-nil, it is called after all pipelines have run; otherwise the step is considered successful. The given pipelines have to define their own pipeline.Context; it is not passed "down" from the parent pipeline. However, the pipeline.Context for the ResultHandler is the one from the parent pipeline.

Example
p := pipeline.NewPipeline()
fanout := NewFanOutStep("fanout", func(pipelines chan *pipeline.Pipeline) {
	defer close(pipelines)
	// create some pipelines
	for i := 0; i < 3; i++ {
		n := i
		pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep(fmt.Sprintf("i = %d", n), func(_ pipeline.Context) pipeline.Result {
			time.Sleep(time.Duration(n) * 10 * time.Millisecond) // fake some load
			fmt.Printf("I am worker %d\n", n)
			return pipeline.Result{}
		}))
	}
}, func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result {
	for worker, result := range results {
		if result.IsFailed() {
			fmt.Printf("Worker %d failed: %v\n", worker, result.Err)
		}
	}
	return pipeline.Result{}
})
p.AddStep(fanout)
p.Run()
Output:

I am worker 0
I am worker 1
I am worker 2
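
To illustrate the mechanics described above, here is a minimal stdlib-only sketch of the fan-out pattern. It is not the package's implementation: `job`, `fanOut` and `exampleSupplier` are hypothetical names, and a plain `func() error` stands in for a nested pipeline.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical stand-in for a nested pipeline.
type job func() error

// fanOut starts one goroutine per supplied job and waits for all of them.
// The result map is keyed by the zero-based order in which jobs were supplied.
func fanOut(supplier func(chan job)) map[uint64]error {
	jobs := make(chan job)
	go supplier(jobs) // the supplier must close(jobs), or the range below never ends

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[uint64]error)
		index   uint64
	)
	for j := range jobs {
		wg.Add(1)
		go func(i uint64, run job) {
			defer wg.Done()
			err := run()
			mu.Lock()
			results[i] = err
			mu.Unlock()
		}(index, j)
		index++
	}
	wg.Wait() // all jobs have finished once this returns
	return results
}

// exampleSupplier sends three jobs; the second one fails.
func exampleSupplier(jobs chan job) {
	defer close(jobs)
	for i := 0; i < 3; i++ {
		n := i
		jobs <- func() error {
			if n == 1 {
				return fmt.Errorf("job %d failed", n)
			}
			return nil
		}
	}
}

func main() {
	results := fanOut(exampleSupplier)
	for i := uint64(0); i < uint64(len(results)); i++ {
		fmt.Printf("job %d: err=%v\n", i, results[i])
	}
}
```

If the supplier in this sketch never closed the channel, the `range` loop would wait forever, which matches the blocking behaviour the description warns about.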

func NewWorkerPoolStep

func NewWorkerPoolStep(name string, size int, pipelineSupplier PipelineSupplier, handler ResultHandler) pipeline.Step

NewWorkerPoolStep creates a pipeline step that runs nested pipelines in a pool of worker goroutines. The function provided as PipelineSupplier is expected to close the given channel when no more pipelines should be executed; otherwise this step blocks forever. The step waits until all pipelines have finished.

  • If the given ResultHandler is non-nil, it is called after all pipelines have run; otherwise the step is considered successful.
  • The pipelines are executed in a pool of goroutines whose number is given by size.
  • If size is 1, the pipelines are effectively run in sequence.
  • If size is 0 or less, the function panics.

The given pipelines have to define their own pipeline.Context; it is not passed "down" from the parent pipeline. However, the pipeline.Context for the ResultHandler is the one from the parent pipeline.

Example
p := pipeline.NewPipeline()
pool := NewWorkerPoolStep("pool", 2, func(pipelines chan *pipeline.Pipeline) {
	defer close(pipelines)
	// create some pipelines
	for i := 0; i < 3; i++ {
		n := i
		pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep(fmt.Sprintf("i = %d", n), func(_ pipeline.Context) pipeline.Result {
			time.Sleep(time.Duration(n) * 100 * time.Millisecond) // fake some load
			fmt.Printf("This is job item %d\n", n)
			return pipeline.Result{}
		}))
	}
}, func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result {
	for jobIndex, result := range results {
		if result.IsFailed() {
			fmt.Printf("Job %d failed: %v\n", jobIndex, result.Err)
		}
	}
	return pipeline.Result{}
})
p.AddStep(pool)
p.Run()
Output:

This is job item 0
This is job item 1
This is job item 2
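
The pool behaviour can be sketched with the standard library alone. The code below is an assumption-laden illustration, not the package's source: `job`, `indexedJob`, `workerPool` and `exampleSupplier` are hypothetical names, and the panic message is made up. It shows a fixed number of goroutines draining a queue, with a size below 1 rejected by a panic.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical stand-in for a nested pipeline.
type job func() error

// indexedJob remembers the zero-based position at which a job was supplied.
type indexedJob struct {
	index uint64
	run   job
}

// workerPool runs the supplied jobs on exactly size goroutines.
func workerPool(size int, supplier func(chan job)) map[uint64]error {
	if size < 1 {
		panic("pool size must be at least 1") // hypothetical message
	}
	jobs := make(chan job)
	go supplier(jobs) // the supplier must close(jobs)

	queue := make(chan indexedJob)
	results := make(map[uint64]error)
	var mu sync.Mutex
	var wg sync.WaitGroup
	for w := 0; w < size; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range queue { // each worker takes one job at a time
				err := item.run()
				mu.Lock()
				results[item.index] = err
				mu.Unlock()
			}
		}()
	}
	var index uint64
	for j := range jobs {
		queue <- indexedJob{index: index, run: j}
		index++
	}
	close(queue) // no more work: lets the workers exit
	wg.Wait()
	return results
}

// exampleSupplier sends three jobs; the second one fails.
func exampleSupplier(jobs chan job) {
	defer close(jobs)
	for i := 0; i < 3; i++ {
		n := i
		jobs <- func() error {
			if n == 1 {
				return fmt.Errorf("job %d failed", n)
			}
			return nil
		}
	}
}

func main() {
	results := workerPool(2, exampleSupplier)
	for i := uint64(0); i < uint64(len(results)); i++ {
		fmt.Printf("job %d: err=%v\n", i, results[i])
	}
}
```

With `size` set to 1 there is a single worker goroutine, so the jobs in this sketch run one after another in the order they were supplied.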

Types

type PipelineSupplier

type PipelineSupplier func(chan *pipeline.Pipeline)

PipelineSupplier is a function that sends pipeline.Pipeline instances to the given channel for consumption. The function must close the channel once all pipelines have been sent (`defer close()` recommended).
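
One reason `defer close()` is a sensible default is that it also covers early returns. The following stdlib-only sketch assumes a hypothetical `task` type in place of `*pipeline.Pipeline`; `supplyFrom` and `drain` are illustrative helpers, not part of the package.

```go
package main

import "fmt"

// task is a hypothetical stand-in for *pipeline.Pipeline.
type task func() error

// supplyFrom returns a supplier that sends one task per name
// and stops at the first empty name.
func supplyFrom(names []string) func(chan task) {
	return func(tasks chan task) {
		defer close(tasks) // runs on every return path, including the early one below
		for _, name := range names {
			if name == "" {
				return // stop supplying; the deferred close still unblocks the consumer
			}
			name := name
			tasks <- func() error {
				fmt.Println("processing", name)
				return nil
			}
		}
	}
}

// drain runs tasks until the channel is closed and reports how many it ran.
func drain(supplier func(chan task)) int {
	tasks := make(chan task)
	go supplier(tasks)
	count := 0
	for t := range tasks {
		_ = t()
		count++
	}
	return count
}

func main() {
	n := drain(supplyFrom([]string{"a", "b", "", "c"}))
	fmt.Println("tasks run:", n)
}
```

Without the deferred close, the early `return` would leave the consumer's `range` loop waiting indefinitely.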

type ResultHandler

type ResultHandler func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result

ResultHandler is a callback that receives a result map and is expected to return a single, combined pipeline.Result object. The map key is the zero-based index of the n-th pipeline.Pipeline supplied, e.g. pipeline number 3 has index 2. Context may be nil. Return an empty pipeline.Result if you want to ignore errors, or reduce multiple errors into a single one to make the parent pipeline fail.
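
Reducing several errors into one might look like the sketch below. It works on a plain `map[uint64]error` instead of `map[uint64]pipeline.Result` so that it runs with the standard library only; `combine` and `sampleResults` are hypothetical helpers. Keys are sorted because Go map iteration order is not deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// combine reduces per-pipeline errors into one error, or nil if none failed.
func combine(results map[uint64]error) error {
	failed := make([]uint64, 0, len(results))
	for i, err := range results {
		if err != nil {
			failed = append(failed, i)
		}
	}
	if len(failed) == 0 {
		return nil
	}
	// Sort the indexes so the combined message is stable across runs.
	sort.Slice(failed, func(a, b int) bool { return failed[a] < failed[b] })
	msgs := make([]string, 0, len(failed))
	for _, i := range failed {
		msgs = append(msgs, fmt.Sprintf("pipeline %d: %v", i, results[i]))
	}
	return fmt.Errorf("%d of %d pipelines failed: %s",
		len(msgs), len(results), strings.Join(msgs, "; "))
}

// sampleResults returns made-up results for illustration.
func sampleResults() map[uint64]error {
	return map[uint64]error{
		0: nil,
		1: errors.New("disk full"),
		2: nil,
		3: errors.New("timeout"),
	}
}

func main() {
	fmt.Println(combine(sampleResults()))
	// prints: 2 of 4 pipelines failed: pipeline 1: disk full; pipeline 3: timeout
}
```

In a real ResultHandler, the combined error would be placed into the returned pipeline.Result so that the parent pipeline fails.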
