pipeline

package module
v0.0.0-...-999e789 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2016 License: MIT Imports: 4 Imported by: 0

README

pipeline

Go Pipeline for chained operations

Circle CI GoDoc Go Report Card

go get github.com/skidder/pipeline

Overview

Have you ever needed to perform a sequence of distinct operations, possiby in different combinations? Those operations, or pipe segments, could be arranged in a pipeline, with the pipe segments being reusable and possibly ordered differently to compose different pipelines. This library provides a structure for creating Pipe segments and forming them into a reusable Pipeline.

Usage

Pipes

Your pipe must implement the Pipe interface with its Process function whose input is a Data channel and returns a Data channel representing the Pipe segment output:

-> channel Data -> [Process] -> channel Data ->
Pipelines

A Pipeline is constructed from one or more Pipe values:

-> channel Data -> |                                                                             |-> channel Data ->
                   |                                                                             |
                   -> [Process A] -> channel Data -> [Process B] -> channel Data -> [Process C] ->

Examples

See the Pass-through Pipe and Timing Pipe examples in the Go-Docs.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Data

type Data struct {
	Payload interface{}
	// contains filtered or unexported fields
}

Data to be passed between pipe segments composing a pipeline

func (*Data) CreateTempDir

func (d *Data) CreateTempDir(tmpDir string, prefix string) (string, error)

CreateTempDir makes a temporary directory associated tied to this pipeline data

func (*Data) DeleteTempDirs

func (d *Data) DeleteTempDirs()

DeleteTempDirs removes temporary directories created with CreateTempDir

type PassThroughPipe

type PassThroughPipe struct {
}

PassThroughPipe passes the data read from input to output without modification

func (*PassThroughPipe) Process

func (p *PassThroughPipe) Process(in chan Data) chan Data

Process pipe input

type Pipe

type Pipe interface {
	Process(in chan Data) chan Data
}

Pipe is a segment in a pipeline that can process a given map of job attributes

func NewTimingPipe

func NewTimingPipe(timedPipe Pipe, callback func(begin time.Time, duration time.Duration)) Pipe

NewTimingPipe creates a new timing pipe

Example
os.Stdout.Sync()
timingCallbackFoo := func(begin time.Time, duration time.Duration) {
	os.Stdout.WriteString("foo")
}
timingCallbackBar := func(begin time.Time, duration time.Duration) {
	os.Stdout.WriteString("bar")
}

pipes := NewPipeline(
	NewTimingPipe(&PassThroughPipe{}, timingCallbackFoo),
	NewTimingPipe(&PassThroughPipe{}, timingCallbackBar),
)
go func() {
	pipes.Enqueue(Data{})
	pipes.Close()
}()

pipes.Dequeue(func(data Data) {
})
Output:

foobar

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline composed of channels for head & tail

func NewPipeline

func NewPipeline(pipes ...Pipe) Pipeline

NewPipeline returns a new pipeline composed of the set of supplied pipes

Example
pipes := NewPipeline(&PassThroughPipe{}, &PassThroughPipe{}, &PassThroughPipe{})

go func() {
	pipes.Enqueue(Data{Payload: "foo"})
	pipes.Close()
}()

var pipeOutput Data
pipes.Dequeue(func(data Data) {
	pipeOutput = data
})
fmt.Println(pipeOutput.Payload.(string))
Output:

foo

func (*Pipeline) Close

func (p *Pipeline) Close()

Close the pipeline

func (*Pipeline) Dequeue

func (p *Pipeline) Dequeue(handler func(Data))

Dequeue an item from the pipeline

func (*Pipeline) Enqueue

func (p *Pipeline) Enqueue(item Data)

Enqueue an item in the pipeline

type TimingPipe

type TimingPipe struct {
	// contains filtered or unexported fields
}

TimingPipe invokes a custom callback function with the amount of time required to run a specific Pipe

func (*TimingPipe) Process

func (t *TimingPipe) Process(in chan Data) chan Data

Process pipe input

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL