Pipeline

Purpose
This project was inspired by The Go Blog and Pipeline Patterns in Go by Claudio Fahey. We wanted to build a library that we could use "seed" our pipeline, set up as many "stages" as we wanted, than add a "sink" to manage our results. We wanted a library that would manage linking our stages together and manage the lifecycle of our channels and goroutines.
Features that were important to us are:
- Cancelation - cancel at any stage all routines will stop
- Error handling - if an error is reportag in any pipeline everything will stop
- Splitting
- Merging
Usage
import (
"github.com/kazzcade/pipeline"
)
Create a pipeline
You create a pipeline by calling pipeline.New() passing in a function with the signature
func(context.Context) (<-chan interface{}, func() error)
Simple example pipeline seeded with 10 numbers
seed := func(ctx context.Context, in <-chan interface{}) (<-chan interface{}, func() error) {
out := MakeGenericChannel()
return out, func() error {
defer func() {
close(out)
}()
for i := range in {
value, valueError := intercept(i.(int))
if valueError != nil {
return valueError
}
select {
case <-ctx.Done():
return nil
case out <- value:
}
}
return nil
}
}
pipeline := pipeline.new(seed)
Something a bit more complicated
We read input data and translate it
seedFactory := func(in *os.File) pipeline.Seed {
reader := bufio.NewReader(in)
fmt.Println("Simple Shell")
fmt.Println("---------------------")
return func(ctx context.Context) (<-chan interface{}, func() error) {
out := pipeline.MakeGenericChannel()
return out, func() error {
defer close(out)
for {
fmt.Print("-> ")
select {
// return when canceled
case <-ctx.Done():
default:
text, _ := reader.ReadString('\n')
// convert CRLF to LF
out <- strings.Replace(text, "\n", "", -1)
}
}
return nil
}
}
}
p := pipeline.New(context.Background(), seedFactory(os.Stdin))
translator := func(ctx context.Context, in <-chan interface{}) (<-chan interface{}, func() error) {
out := p.MakeGenericChannel()
return out, func() error {
// clean up when done
defer close(out)
for i := range in {
select {
case <- ctx.Done():
return nil
default:
// example google translate not working code
translation, err := google.translate(i.(string))
if (err != nil) {
// stops our pipeline
return err
}
// send the output to our next stage
out <- translation
}
}
return nil
}
}
p.Stage(translator)
sink := func(ctx context.Context, in <-chan interface{}) (interface{}, error) {
transcript := ""
for i := range in {
select {
case <-ctx.Done():
return transcript, nil
default:
transcript = transcript + i.(string) + "\n"
fmt.Println(i)
}
}
return transcript, nil
}
p.Sink(sink)
Pipeline provided two methods of creating a "generic channel" which is simply chan interface{}
.
- pipeline.MakeGenericChannel - this will create an unbuffered channel
- p.MakeGenericChannel where p is an instance of Pipeline. When using the instance method of MakeGenericChannel we will create a buffered channel with the buffer amount defined by the number of stages. If we have 3 stages we would have seed->{}->{}{}->{}{}{}. Since we have three stages we can seed up to three items before our final stage is full and two items before our second and one before our first. Our pipeline buffer will not block until all buffers in all stages are full, 6 items total, given there are no consumers.
If the production does not need to be controlled by the consumption it is recommended that you use the instance method of MakeGenericChannel as your code will not block for long.
On a 2017 MacBook Pro i7 16GB ram pushing 1000 integers through 1000 stages for 100 pipelines
With instance method MakeGenericChannel
real 0m5.073s
user 0m36.481s
sys 0m0.902s
With static method MakeGenericChannel
real 0m11.651s
user 1m28.406s
sys 0m0.539s