Documentation
Overview
Package parallel extends the command-pipeline core with concurrency steps.
Functions
func NewFanOutStep
func NewFanOutStep(name string, pipelineSupplier PipelineSupplier, handler ResultHandler) pipeline.Step
NewFanOutStep creates a pipeline step that runs each nested pipeline in its own goroutine. The function provided as PipelineSupplier must close the given channel when no more pipelines should be executed; otherwise this step blocks forever. The step waits until all pipelines have finished. If the given ResultHandler is non-nil, it is called after all pipelines have run; otherwise the step is considered successful. The nested pipelines have to define their own pipeline.Context, because the context is not passed down from the parent pipeline. However, the pipeline.Context passed to the ResultHandler is the one from the parent pipeline.
Example
	// Requires "fmt", "time" and the pipeline core package.
	p := pipeline.NewPipeline()
	fanout := NewFanOutStep("fanout", func(pipelines chan *pipeline.Pipeline) {
		// Closing the channel signals that no more pipelines will follow.
		defer close(pipelines)
		// create some pipelines
		for i := 0; i < 3; i++ {
			n := i // capture the loop variable for the closure
			pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep(fmt.Sprintf("i = %d", n), func(_ pipeline.Context) pipeline.Result {
				time.Sleep(time.Duration(n*10) * time.Millisecond) // fake some load
				fmt.Printf("I am worker %d\n", n)
				return pipeline.Result{}
			}))
		}
	}, func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result {
		// Called once after all nested pipelines have finished.
		for worker, result := range results {
			if result.IsFailed() {
				fmt.Printf("Worker %d failed: %v\n", worker, result.Err)
			}
		}
		return pipeline.Result{}
	})
	p.AddStep(fanout)
	p.Run()
Output:
	I am worker 0
	I am worker 1
	I am worker 2
func NewWorkerPoolStep
func NewWorkerPoolStep(name string, size int, pipelineSupplier PipelineSupplier, handler ResultHandler) pipeline.Step
NewWorkerPoolStep creates a pipeline step that runs nested pipelines in a pool of goroutines. The function provided as PipelineSupplier must close the given channel when no more pipelines should be executed; otherwise this step blocks forever. The step waits until all pipelines have finished.
- If the given ResultHandler is non-nil, it is called after all pipelines have run; otherwise the step is considered successful.
- The pipelines are executed in a pool whose number of goroutines is given by size.
- If size is 1, the pipelines are effectively run in sequence.
- If size is 0 or less, the function panics.
The nested pipelines have to define their own pipeline.Context, because the context is not passed down from the parent pipeline. However, the pipeline.Context passed to the ResultHandler is the one from the parent pipeline.
Example
	// Requires "fmt", "time" and the pipeline core package.
	p := pipeline.NewPipeline()
	pool := NewWorkerPoolStep("pool", 2, func(pipelines chan *pipeline.Pipeline) {
		// Closing the channel signals that no more pipelines will follow.
		defer close(pipelines)
		// create some pipelines
		for i := 0; i < 3; i++ {
			n := i // capture the loop variable for the closure
			pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep(fmt.Sprintf("i = %d", n), func(_ pipeline.Context) pipeline.Result {
				time.Sleep(time.Duration(n*100) * time.Millisecond) // fake some load
				fmt.Printf("This is job item %d\n", n)
				return pipeline.Result{}
			}))
		}
	}, func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result {
		// Called once after all nested pipelines have finished.
		for jobIndex, result := range results {
			if result.IsFailed() {
				fmt.Printf("Job %d failed: %v\n", jobIndex, result.Err)
			}
		}
		return pipeline.Result{}
	})
	p.AddStep(pool)
	p.Run()
Output:
	This is job item 0
	This is job item 1
	This is job item 2
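The effect of the size parameter can be sketched with the standard library alone. The code below is an illustration under assumptions, not the package's implementation: job and runPool are hypothetical names, and errors stand in for pipeline.Result. A fixed number of goroutines drain a shared queue, so a size of 1 runs the jobs one after another in arrival order, and a size below 1 panics.

```go
package main

import (
	"fmt"
	"sync"
)

// job stands in for a nested pipeline (hypothetical type, not part of the package).
type job func() error

// runPool executes the supplied jobs on a fixed number of goroutines and
// returns each job's error keyed by its zero-based arrival index.
func runPool(size int, supply func(jobs chan job)) map[uint64]error {
	if size < 1 {
		panic("pool size must be at least 1")
	}
	type item struct {
		index uint64
		run   job
	}
	jobs := make(chan job)
	queue := make(chan item)
	go supply(jobs)
	go func() {
		defer close(queue)
		var index uint64
		for j := range jobs { // ends when the supplier closes the channel
			queue <- item{index, j}
			index++
		}
	}()

	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	results := make(map[uint64]error)
	for w := 0; w < size; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range queue {
				err := it.run()
				mu.Lock()
				results[it.index] = err
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // wait until every worker has drained the queue
	return results
}

func main() {
	var order []int
	results := runPool(1, func(jobs chan job) {
		defer close(jobs)
		for i := 0; i < 3; i++ {
			n := i
			jobs <- func() error {
				order = append(order, n) // safe: a single worker runs all jobs
				return nil
			}
		}
	})
	fmt.Println(len(results), order) // prints "3 [0 1 2]"
}
```

With a larger size the same jobs may finish in any order, which is why the results are keyed by arrival index rather than by completion order.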
Types
type PipelineSupplier
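PipelineSupplier is a function that supplies pipeline.Pipeline values for consumption by sending them on the given channel; in the examples above it has the form func(pipelines chan *pipeline.Pipeline). The function must close the channel once all pipelines have been sent (`defer close()` is recommended).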
type ResultHandler
ResultHandler is a callback that receives a map of results and is expected to return a single, combined pipeline.Result. The map key is the zero-based index of the n-th pipeline.Pipeline spawned; for example, the third pipeline has index 2. The context may be nil. Return an empty pipeline.Result to ignore errors, or reduce multiple errors into a single one to make the parent pipeline fail.