Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Data ¶
type Data struct {
	Payload interface{}
	// contains filtered or unexported fields
}
Data is the value passed between the pipe segments that compose a pipeline
func (*Data) CreateTempDir ¶
CreateTempDir makes a temporary directory tied to this pipeline data
func (*Data) DeleteTempDirs ¶
func (d *Data) DeleteTempDirs()
DeleteTempDirs removes temporary directories created with CreateTempDir
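The pairing of CreateTempDir and DeleteTempDirs suggests that the directories are tracked so they can be removed together later. The following is a minimal sketch of that idea, not the package's implementation: `tempDirTracker`, `createTempDir`, `deleteTempDirs`, `exists`, the `prefix` parameter, and the return values are all hypothetical, since the page does not show the signature of CreateTempDir or the unexported fields of Data.

```go
package main

import (
	"fmt"
	"os"
)

// tempDirTracker is a hypothetical stand-in for the unexported
// bookkeeping that Data may keep internally.
type tempDirTracker struct {
	dirs []string
}

// createTempDir makes a directory and remembers it for later cleanup.
// The prefix parameter and the return values are assumptions.
func (tr *tempDirTracker) createTempDir(prefix string) (string, error) {
	dir, err := os.MkdirTemp("", prefix)
	if err != nil {
		return "", err
	}
	tr.dirs = append(tr.dirs, dir)
	return dir, nil
}

// deleteTempDirs removes every directory created through createTempDir.
func (tr *tempDirTracker) deleteTempDirs() {
	for _, dir := range tr.dirs {
		os.RemoveAll(dir)
	}
	tr.dirs = nil
}

// exists reports whether path is present on disk.
func exists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

func main() {
	tracker := &tempDirTracker{}
	dir, err := tracker.createTempDir("pipeline-")
	if err != nil {
		fmt.Println("create failed:", err)
		return
	}
	fmt.Println("exists after create:", exists(dir))
	tracker.deleteTempDirs()
	fmt.Println("exists after delete:", exists(dir))
}
```

Tracking the paths on the value itself means a caller only needs a single cleanup call once the data has left the pipeline.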
type PassThroughPipe ¶
type PassThroughPipe struct { }
PassThroughPipe passes the data read from input to output without modification
func (*PassThroughPipe) Process ¶
func (p *PassThroughPipe) Process(in chan Data) chan Data
Process pipe input
type Pipe ¶
Pipe is a segment in a pipeline that can process a given map of job attributes
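The definition of Pipe is not shown on this page. Judging from the Process methods documented on PassThroughPipe and TimingPipe, it is plausibly an interface with a single `Process(in chan Data) chan Data` method. Under that assumption, a custom segment could be sketched as follows; `UpperCasePipe` and `runOne` are hypothetical names, and `Data` is redeclared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"strings"
)

// Data mirrors the documented struct; unexported fields are omitted.
type Data struct {
	Payload interface{}
}

// Pipe is an assumed interface, inferred from the Process methods
// documented on PassThroughPipe and TimingPipe.
type Pipe interface {
	Process(in chan Data) chan Data
}

// UpperCasePipe is a hypothetical segment that upper-cases string payloads.
type UpperCasePipe struct{}

// Process reads every item from in, transforms it, and forwards it on a
// new channel that is closed once in has been drained.
func (u *UpperCasePipe) Process(in chan Data) chan Data {
	out := make(chan Data)
	go func() {
		defer close(out)
		for d := range in {
			if s, ok := d.Payload.(string); ok {
				d.Payload = strings.ToUpper(s)
			}
			out <- d
		}
	}()
	return out
}

// runOne pushes a single payload through a pipe and returns the result.
func runOne(p Pipe, payload interface{}) interface{} {
	in := make(chan Data, 1)
	in <- Data{Payload: payload}
	close(in)
	return (<-p.Process(in)).Payload
}

func main() {
	fmt.Println(runOne(&UpperCasePipe{}, "foo"))
}
```

Non-string payloads pass through unchanged in this sketch, which keeps the segment safe to place in a pipeline that carries mixed payload types.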
func NewTimingPipe ¶
NewTimingPipe creates a new timing pipe
Example ¶
os.Stdout.Sync()
timingCallbackFoo := func(begin time.Time, duration time.Duration) {
	os.Stdout.WriteString("foo")
}
timingCallbackBar := func(begin time.Time, duration time.Duration) {
	os.Stdout.WriteString("bar")
}
pipes := NewPipeline(
	NewTimingPipe(&PassThroughPipe{}, timingCallbackFoo),
	NewTimingPipe(&PassThroughPipe{}, timingCallbackBar),
)
go func() {
	pipes.Enqueue(Data{})
	pipes.Close()
}()
pipes.Dequeue(func(data Data) {
})
Output: foobar
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is composed of channels for its head and tail
func NewPipeline ¶
NewPipeline returns a new pipeline composed of the set of supplied pipes
Example ¶
pipes := NewPipeline(&PassThroughPipe{}, &PassThroughPipe{}, &PassThroughPipe{})
go func() {
	pipes.Enqueue(Data{Payload: "foo"})
	pipes.Close()
}()
var pipeOutput Data
pipes.Dequeue(func(data Data) {
	pipeOutput = data
})
fmt.Println(pipeOutput.Payload.(string))
Output: foo
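One way to read "composed of channels for head & tail" is that each pipe's output channel feeds the next pipe's input, with the first input kept as the head and the last output as the tail. The sketch below illustrates that chaining only; it is not the package's NewPipeline. The `Pipe` interface is assumed from the documented Process methods, and `pipelineSketch`, `newPipelineSketch`, `appendPipe`, and `runChain` are hypothetical names.

```go
package main

import "fmt"

// Data mirrors the documented struct; unexported fields are omitted.
type Data struct {
	Payload interface{}
}

// Pipe is an assumed interface, inferred from the documented Process methods.
type Pipe interface {
	Process(in chan Data) chan Data
}

// appendPipe is a hypothetical segment that appends a suffix to string payloads.
type appendPipe struct {
	suffix string
}

func (a appendPipe) Process(in chan Data) chan Data {
	out := make(chan Data)
	go func() {
		defer close(out)
		for d := range in {
			if s, ok := d.Payload.(string); ok {
				d.Payload = s + a.suffix
			}
			out <- d
		}
	}()
	return out
}

// pipelineSketch keeps only the two ends of the chain.
type pipelineSketch struct {
	head chan Data
	tail chan Data
}

// newPipelineSketch wires the pipes in order: the output channel of each
// pipe becomes the input channel of the next one.
func newPipelineSketch(pipes ...Pipe) pipelineSketch {
	head := make(chan Data)
	tail := head
	for _, p := range pipes {
		tail = p.Process(tail)
	}
	return pipelineSketch{head: head, tail: tail}
}

// runChain sends one payload through the chain and returns what arrives
// at the tail.
func runChain(payload string, pipes ...Pipe) string {
	p := newPipelineSketch(pipes...)
	go func() {
		p.head <- Data{Payload: payload}
		close(p.head)
	}()
	result := ""
	for d := range p.tail {
		result = d.Payload.(string)
	}
	return result
}

func main() {
	fmt.Println(runChain("foo", appendPipe{"-a"}, appendPipe{"-b"}))
}
```

Closing the head channel propagates through every segment in turn, which is why draining the tail with `range` terminates once all data has been processed.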
type TimingPipe ¶
type TimingPipe struct {
// contains filtered or unexported fields
}
TimingPipe invokes a custom callback function with the amount of time required to run a specific Pipe
func (*TimingPipe) Process ¶
func (t *TimingPipe) Process(in chan Data) chan Data
Process pipe input
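A timing wrapper of this kind can be sketched as a Pipe that delegates to an inner Pipe and reports the elapsed time through a callback. The callback signature below matches the one used in the NewTimingPipe example; everything else is an assumption, including the `Pipe` interface, the names `timingSketch`, `passThrough`, and `runTimed`, and the choice to measure from the start of processing until the inner pipe's output is drained. The real TimingPipe may measure a different interval.

```go
package main

import (
	"fmt"
	"time"
)

// Data mirrors the documented struct; unexported fields are omitted.
type Data struct {
	Payload interface{}
}

// Pipe is an assumed interface, inferred from the documented Process methods.
type Pipe interface {
	Process(in chan Data) chan Data
}

// passThrough forwards its input unchanged, like PassThroughPipe is
// documented to do.
type passThrough struct{}

func (passThrough) Process(in chan Data) chan Data {
	out := make(chan Data)
	go func() {
		defer close(out)
		for d := range in {
			out <- d
		}
	}()
	return out
}

// timingSketch is a hypothetical wrapper that times an inner pipe.
type timingSketch struct {
	inner    Pipe
	callback func(begin time.Time, duration time.Duration)
}

// Process forwards the inner pipe's output and invokes the callback once
// that output has been fully drained, before closing its own output.
func (t *timingSketch) Process(in chan Data) chan Data {
	out := make(chan Data)
	go func() {
		defer close(out)
		begin := time.Now()
		for d := range t.inner.Process(in) {
			out <- d
		}
		t.callback(begin, time.Since(begin))
	}()
	return out
}

// runTimed sends one payload through a timed pass-through pipe and
// returns the payload seen at the output plus the number of callback calls.
func runTimed(payload interface{}) (interface{}, int) {
	calls := 0
	timed := &timingSketch{
		inner:    passThrough{},
		callback: func(time.Time, time.Duration) { calls++ },
	}
	in := make(chan Data, 1)
	in <- Data{Payload: payload}
	close(in)
	var last interface{}
	for d := range timed.Process(in) {
		last = d.Payload
	}
	return last, calls
}

func main() {
	payload, calls := runTimed("foo")
	fmt.Println(payload, calls)
}
```

Because the callback runs before the output channel is closed, a consumer that drains the output is guaranteed to observe the callback's effects afterwards.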