package module
v0.0.0-...-4e49541 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2018 License: MIT Imports: 1 Imported by: 2


GoDoc Build Status cover.run Go Report Card


This package provides a simplistic implementation of Go pipelines as outlined in Go Concurrency Patterns: Pipelines and cancellation.


GoDoc available here.

Example Usage

import "github.com/hyfather/pipeline"

p := pipeline.New()
p.AddStageWithFanOut(myStage, 10)
p.AddStageWithFanOut(anotherStage, 100)
doneChan := p.Run(inChan)

<- doneChan

More comprehensive examples can be found here.



Package pipeline provides a simplistic implementation of pipelines as outlined in https://blog.golang.org/pipelines

package main

import (

func printStage(inObj interface{}) interface{} {
	return inObj

func squareStage(inObj interface{}) interface{} {
	if v, ok := inObj.(int); ok {
		return v * v
	return nil

var pipelineChan chan interface{}

func main() {
	p := pipeline.New()
	p.AddStageWithFanOut(squareStage, 1)

	pipelineChan = make(chan interface{}, 10)
	pipelineChan <- 2
	pipelineChan <- 3






This section is empty.


This section is empty.


func MergeChannels

func MergeChannels(inChans []chan interface{}) (outChan chan interface{})

MergeChannels merges an array of channels into a single channel. This utility function can also be used independently outside of a pipeline.

package main

import (

func main() {
	inChan1 := make(chan interface{}, 10)
	inChan2 := make(chan interface{}, 10)

	inChan1 <- 1
	inChan1 <- 3
	inChan1 <- 5
	inChan1 <- 7
	inChan1 <- 9

	inChan2 <- 2
	inChan2 <- 4
	inChan2 <- 6
	inChan2 <- 8
	inChan2 <- 10

	outChan := pipeline.MergeChannels([]chan interface{}{inChan1, inChan2})

	var ints []int
	for e := range outChan {
		ints = append(ints, e.(int))


[1 2 3 4 5 6 7 8 9 10]


type Pipeline

type Pipeline []StageFn

Pipeline type defines a pipeline to which processing "stages" can be added and configured to fan-out. Pipelines are meant to be long running as they continuously process data as it comes in.

A pipeline can be simultaneously run multiple times with different input channels by invoking the Run() method multiple times. A running pipeline shouldn't be copied.

func New

func New() Pipeline

New is a convenience method that creates a new Pipeline

func (*Pipeline) AddRawStage

func (p *Pipeline) AddRawStage(inFunc StageFn)

AddRawStage simply adds a StageFn type to the pipeline without any further processing or parsing. This is meant for extensibility and customizations.

func (*Pipeline) AddStage

func (p *Pipeline) AddStage(inFunc ProcessFn)

AddStage is a convenience method for adding a stage with fanSize = 1. See AddStageWithFanOut for more information.

func (*Pipeline) AddStageWithFanOut

func (p *Pipeline) AddStageWithFanOut(inFunc ProcessFn, fanSize uint64)

AddStageWithFanOut adds a parallel fan-out ProcessFn to the pipeline. The fanSize number indicates how many instances of this stage will read from the previous stage and process the data flowing through simultaneously to take advantage of parallel CPU scheduling.

Most pipelines will have multiple stages, and the order in which AddStage() and AddStageWithFanOut() is invoked matters -- the first invocation indicates the first stage and so forth.

Since discrete goroutines process the inChan for FanOut > 1, the order of objects flowing through the FanOut stages can't be guaranteed.

func (*Pipeline) Run

func (p *Pipeline) Run(inChan <-chan interface{}) (doneChan chan struct{})

Run starts the pipeline with all the stages that have been added. Run is not a blocking function and will return immediately with a doneChan. Consumers can wait on the doneChan for an indication of when the pipeline has completed processing.

The pipeline runs until its `inChan` channel is open. Once the `inChan` is closed, the pipeline stages will sequentially complete from the first stage to the last. Once all stages are complete, the last outChan is drained and the doneChan is closed.

Run() can be invoked multiple times to start multiple instances of a pipeline that will typically process different incoming channels.

type ProcessFn

type ProcessFn func(inObj interface{}) (outObj interface{})

ProcessFn are the primary function types defined by users of this package and passed in to instantiate a meaningful pipeline.

type StageFn

type StageFn func(inChan <-chan interface{}) (outChan chan interface{})

StageFn is a lower level function type that chains together multiple stages using channels.


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL