pipeline

package module
v0.0.0-...-4e49541 Latest
Published: Aug 31, 2018 License: MIT Imports: 1 Imported by: 2

README

pipeline

This package provides a simplistic implementation of Go pipelines, as outlined in the Go blog post "Go Concurrency Patterns: Pipelines and cancellation".

Docs

GoDoc available here.

Example Usage

import "github.com/hyfather/pipeline"

p := pipeline.New()
p.AddStageWithFanOut(myStage, 10)
p.AddStageWithFanOut(anotherStage, 100)
doneChan := p.Run(inChan)

<-doneChan

More comprehensive examples can be found here.

Documentation

Overview

Package pipeline provides a simplistic implementation of pipelines as outlined in https://blog.golang.org/pipelines

Example
package main

import (
	"fmt"
	"github.com/hyfather/pipeline"
)

func printStage(inObj interface{}) interface{} {
	fmt.Println(inObj)
	return inObj
}

func squareStage(inObj interface{}) interface{} {
	if v, ok := inObj.(int); ok {
		return v * v
	}
	return nil
}

var pipelineChan chan interface{}

func main() {
	p := pipeline.New()
	p.AddStageWithFanOut(squareStage, 1)
	p.AddStage(printStage)

	pipelineChan = make(chan interface{}, 10)
	pipelineChan <- 2
	pipelineChan <- 3
	close(pipelineChan)

	<-p.Run(pipelineChan)
}
Output:

4
9

Functions

func MergeChannels

func MergeChannels(inChans []chan interface{}) (outChan chan interface{})

MergeChannels merges a slice of channels into a single channel. This utility function can also be used on its own, outside of a pipeline.

Example
package main

import (
	"fmt"
	"github.com/hyfather/pipeline"
	"sort"
)

func main() {
	inChan1 := make(chan interface{}, 10)
	inChan2 := make(chan interface{}, 10)

	inChan1 <- 1
	inChan1 <- 3
	inChan1 <- 5
	inChan1 <- 7
	inChan1 <- 9
	close(inChan1)

	inChan2 <- 2
	inChan2 <- 4
	inChan2 <- 6
	inChan2 <- 8
	inChan2 <- 10
	close(inChan2)

	outChan := pipeline.MergeChannels([]chan interface{}{inChan1, inChan2})

	var ints []int
	for e := range outChan {
		ints = append(ints, e.(int))
	}
	sort.Ints(ints)
	fmt.Println(ints)

}
Output:

[1 2 3 4 5 6 7 8 9 10]
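
For readers curious how a fan-in merge of this kind can be built, the following is a minimal sketch in the style of the Go blog post on pipelines. It is not the package's source: mergeSketch is a hypothetical name, and the unbuffered output channel is an assumption made for the illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// mergeSketch is a hypothetical stand-in for MergeChannels. It forwards every
// value from each input channel to one output channel, and closes the output
// once all inputs have been closed and drained.
func mergeSketch(inChans []chan interface{}) chan interface{} {
	outChan := make(chan interface{})
	var wg sync.WaitGroup
	wg.Add(len(inChans))
	for _, c := range inChans {
		// One forwarding goroutine per input channel.
		go func(c <-chan interface{}) {
			defer wg.Done()
			for v := range c {
				outChan <- v
			}
		}(c)
	}
	// Close the output only after every forwarder has finished.
	go func() {
		wg.Wait()
		close(outChan)
	}()
	return outChan
}

func main() {
	a := make(chan interface{}, 2)
	b := make(chan interface{}, 2)
	a <- 1
	a <- 3
	close(a)
	b <- 2
	b <- 4
	close(b)

	var ints []int
	for v := range mergeSketch([]chan interface{}{a, b}) {
		ints = append(ints, v.(int))
	}
	sort.Ints(ints) // arrival order is not deterministic, so sort before printing
	fmt.Println(ints)
}
```

The merged channel closes only after every input has been closed, so a range loop over it terminates cleanly.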

Types

type Pipeline

type Pipeline []StageFn

Pipeline type defines a pipeline to which processing "stages" can be added and configured to fan-out. Pipelines are meant to be long running as they continuously process data as it comes in.

A pipeline can be simultaneously run multiple times with different input channels by invoking the Run() method multiple times. A running pipeline shouldn't be copied.

func New

func New() Pipeline

New is a convenience function that creates a new Pipeline.

func (*Pipeline) AddRawStage

func (p *Pipeline) AddRawStage(inFunc StageFn)

AddRawStage simply adds a StageFn type to the pipeline without any further processing or parsing. This is meant for extensibility and customizations.
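
As an illustration of what a raw stage might look like, here is a minimal sketch of a StageFn that filters its input. The StageFn type is redeclared locally so the sketch compiles without the package, evenOnly is a hypothetical name, and the requirement that a stage closes its output channel once its input is drained is an assumption rather than something this documentation states.

```go
package main

import "fmt"

// StageFn mirrors the signature documented for pipeline.StageFn so that this
// sketch compiles without the package.
type StageFn func(inChan <-chan interface{}) (outChan chan interface{})

// evenOnly is a hypothetical raw stage that forwards even ints and drops
// everything else.
func evenOnly(inChan <-chan interface{}) chan interface{} {
	outChan := make(chan interface{})
	go func() {
		// Assumption: the pipeline expects a stage to close its output once
		// its input is closed and drained.
		defer close(outChan)
		for obj := range inChan {
			if v, ok := obj.(int); ok && v%2 == 0 {
				outChan <- v
			}
		}
	}()
	return outChan
}

func main() {
	// With the real package this would presumably be p.AddRawStage(evenOnly).
	var stage StageFn = evenOnly

	in := make(chan interface{}, 4)
	for _, v := range []int{1, 2, 3, 4} {
		in <- v
	}
	close(in)

	for v := range stage(in) {
		fmt.Println(v) // prints 2, then 4
	}
}
```

Because a raw stage owns its output channel, it can emit fewer (or more) values than it receives, which the one-in, one-out ProcessFn signature does not express directly.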

func (*Pipeline) AddStage

func (p *Pipeline) AddStage(inFunc ProcessFn)

AddStage is a convenience method for adding a stage with fanSize = 1. See AddStageWithFanOut for more information.

func (*Pipeline) AddStageWithFanOut

func (p *Pipeline) AddStageWithFanOut(inFunc ProcessFn, fanSize uint64)

AddStageWithFanOut adds a parallel fan-out ProcessFn to the pipeline. fanSize sets how many instances of this stage read from the previous stage and process data concurrently, which lets the stage take advantage of parallel CPU scheduling.

Most pipelines will have multiple stages, and the order in which AddStage() and AddStageWithFanOut() are invoked matters: the first invocation defines the first stage, the second invocation the second stage, and so forth.

When fanSize > 1, separate goroutines read from the same inChan, so the order of objects leaving a fan-out stage is not guaranteed to match the order in which they entered.
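
The ordering caveat is easier to see in code. The sketch below shows one way a fan-out stage could be built from a ProcessFn; it is an illustration under assumptions, not the package's implementation. fanOutSketch is a hypothetical helper, and ProcessFn is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// ProcessFn mirrors the signature documented for pipeline.ProcessFn.
type ProcessFn func(inObj interface{}) (outObj interface{})

// fanOutSketch is a hypothetical helper: it starts fanSize goroutines that all
// read from inChan, apply fn, and write to a shared output channel.
func fanOutSketch(fn ProcessFn, fanSize int, inChan <-chan interface{}) chan interface{} {
	outChan := make(chan interface{})
	var wg sync.WaitGroup
	wg.Add(fanSize)
	for i := 0; i < fanSize; i++ {
		go func() {
			defer wg.Done()
			// Whichever worker is free takes the next object, so results
			// can leave in a different order than they arrived.
			for obj := range inChan {
				outChan <- fn(obj)
			}
		}()
	}
	// Close the output once every worker has seen inChan close.
	go func() {
		wg.Wait()
		close(outChan)
	}()
	return outChan
}

func main() {
	in := make(chan interface{}, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	square := func(inObj interface{}) interface{} { return inObj.(int) * inObj.(int) }

	var results []int
	for v := range fanOutSketch(square, 3, in) {
		results = append(results, v.(int))
	}
	sort.Ints(results) // workers finish in no fixed order, so sort for a stable print
	fmt.Println(results)
}
```

If downstream code depends on input order, keeping fanSize at 1 for that stage (or carrying a sequence number inside each object) avoids the problem.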

func (*Pipeline) Run

func (p *Pipeline) Run(inChan <-chan interface{}) (doneChan chan struct{})

Run starts the pipeline with all the stages that have been added. Run is not a blocking function and will return immediately with a doneChan. Consumers can wait on the doneChan for an indication of when the pipeline has completed processing.

The pipeline runs for as long as its `inChan` channel is open. Once `inChan` is closed, the pipeline stages complete sequentially, from the first stage to the last. Once all stages are complete, the last outChan is drained and the doneChan is closed.

Run() can be invoked multiple times to start multiple instances of a pipeline that will typically process different incoming channels.
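
The lifecycle described above can be sketched as follows: stages are chained by feeding each stage's output into the next, the final output is drained, and doneChan is closed afterwards. This is a simplified model of the documented behavior, not the package's code. runSketch and mapStage are hypothetical helpers, and StageFn is redeclared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// StageFn mirrors the signature documented for pipeline.StageFn.
type StageFn func(inChan <-chan interface{}) (outChan chan interface{})

// mapStage is a hypothetical helper that turns a per-object function into a
// stage with a single worker goroutine.
func mapStage(fn func(interface{}) interface{}) StageFn {
	return func(inChan <-chan interface{}) chan interface{} {
		outChan := make(chan interface{})
		go func() {
			defer close(outChan)
			for obj := range inChan {
				outChan <- fn(obj)
			}
		}()
		return outChan
	}
}

// runSketch models Run: it wires the stages together, returns immediately,
// and closes doneChan after the last stage's output has been drained.
func runSketch(stages []StageFn, inChan <-chan interface{}) chan struct{} {
	doneChan := make(chan struct{})
	cur := inChan
	for _, stage := range stages {
		cur = stage(cur)
	}
	go func() {
		defer close(doneChan)
		for range cur {
			// Discard whatever the last stage emits.
		}
	}()
	return doneChan
}

func main() {
	var processed int64
	count := mapStage(func(obj interface{}) interface{} {
		atomic.AddInt64(&processed, 1)
		return obj
	})
	stages := []StageFn{count}

	// The same stage list is started twice, each run with its own input.
	inA := make(chan interface{}, 2)
	inB := make(chan interface{}, 3)
	doneA := runSketch(stages, inA)
	doneB := runSketch(stages, inB)

	inA <- "a1"
	inA <- "a2"
	inB <- "b1"
	inB <- "b2"
	inB <- "b3"
	close(inA)
	close(inB)

	<-doneA
	<-doneB
	fmt.Println(atomic.LoadInt64(&processed)) // prints 5
}
```

Each run only finishes when its own input channel is closed, so forgetting to close an input leaves the corresponding doneChan open indefinitely.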

type ProcessFn

type ProcessFn func(inObj interface{}) (outObj interface{})

ProcessFn is the primary function type that users of this package define and pass in to build a meaningful pipeline.

type StageFn

type StageFn func(inChan <-chan interface{}) (outChan chan interface{})

StageFn is a lower level function type that chains together multiple stages using channels.
