Documentation ¶
Overview ¶
Example ¶
In this example we create a simple pipeline to square a list of generated numbers.
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/nickylogan/conduit"
)

func main() {
	// The pipeline is configured to create three workers, where there can be at most ten
	// jobs in queue. As the rate limit is set to 5, only 5 jobs at most can run in one second.
	cfg := conduit.Config{
		MaxJobs:      10,
		MaxWorkers:   3,
		RateLimit:    5,
		OutputBuffer: 5,
	}

	// Create a source to generate 10 numbers onto a channel
	generatorFunc := conduit.GeneratorFunc(func(out chan<- interface{}) {
		for i := 1; i <= 10; i++ {
			out <- i
		}
	})
	numbers := conduit.NewSource(cfg, generatorFunc).Generate()

	// Create a process pipe that squares the incoming input
	squareFunc := conduit.ProcessorFunc(func(in interface{}) (out interface{}) {
		x := in.(int)
		time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
		return x * x
	})
	squares := conduit.NewPipe(cfg, squareFunc).Process(numbers)

	// As a sink, print each incoming input
	printer := conduit.ReceiverFunc(func(in interface{}) {
		fmt.Println(in)
	})
	done := conduit.NewSink(cfg, printer).Receive(squares)
	<-done
}
Output:
1
4
9
16
25
36
49
64
81
100
Types ¶
type Generator ¶
type Generator interface {
// Generate generates data onto the given channel
Generate(out chan<- interface{})
}
Generator is the interface that wraps a generator function.
type GeneratorFunc ¶
type GeneratorFunc func(out chan<- interface{})
GeneratorFunc is a type adapter that allows a regular function to be used as a Generator. If f is a function with the appropriate signature, GeneratorFunc(f) is a Generator that calls f.
func (GeneratorFunc) Generate ¶
func (f GeneratorFunc) Generate(out chan<- interface{})
Generate calls f(out)
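The adapter can be illustrated with a minimal, standalone sketch. It redeclares Generator and GeneratorFunc locally (mirroring the declarations above) so that it runs without importing the package. The collect helper is hypothetical and not part of the library; it only shows one way a caller might drive a Generator.

```go
package main

import "fmt"

// Local copies of the declarations documented above, so the sketch is
// self-contained.
type Generator interface {
	Generate(out chan<- interface{})
}

type GeneratorFunc func(out chan<- interface{})

// Generate calls f(out).
func (f GeneratorFunc) Generate(out chan<- interface{}) { f(out) }

// collect is a hypothetical helper (not part of the library): it runs g in a
// goroutine, closes the channel once g returns, and gathers every value sent.
func collect(g Generator) []interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		g.Generate(ch)
	}()
	var got []interface{}
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	// A plain function literal becomes a Generator through the adapter.
	countToThree := GeneratorFunc(func(out chan<- interface{}) {
		for i := 1; i <= 3; i++ {
			out <- i
		}
	})
	fmt.Println(collect(countToThree)) // prints [1 2 3]
}
```

Note that in this sketch the generator function never closes the channel itself; the caller does so after Generate returns. Whether the library follows the same convention is an assumption here.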
type Pipe ¶
type Pipe interface {
	// Process receives data from an input channel, processes it,
	// and outputs the results onto a channel. The output channel
	// is returned for use.
	Process(in chan interface{}) (out chan interface{})
}
Pipe represents a process pipe.
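To make the channel-in, channel-out contract concrete, here is a minimal sketch of a type that satisfies Pipe. The mapPipe type and the feed helper are hypothetical names introduced for illustration. The sketch uses a single goroutine and ignores workers, queue limits, and rate limiting, so it should not be read as how the library implements its pipes.

```go
package main

import "fmt"

// Local copy of the Pipe interface documented above.
type Pipe interface {
	Process(in chan interface{}) (out chan interface{})
}

// mapPipe is a hypothetical Pipe that applies fn to every incoming value.
type mapPipe struct {
	fn func(interface{}) interface{}
}

// Process starts one goroutine that reads in until it is closed, writes each
// transformed value to the returned channel, and then closes that channel.
func (p mapPipe) Process(in chan interface{}) chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			out <- p.fn(v)
		}
	}()
	return out
}

// feed is a hypothetical helper that sends vals on a new channel and closes it.
func feed(vals ...interface{}) chan interface{} {
	in := make(chan interface{})
	go func() {
		defer close(in)
		for _, v := range vals {
			in <- v
		}
	}()
	return in
}

func main() {
	var double Pipe = mapPipe{fn: func(v interface{}) interface{} {
		return v.(int) * 2
	}}
	for v := range double.Process(feed(1, 2, 3)) {
		fmt.Println(v) // prints 2, 4, 6 on separate lines
	}
}
```

Because Process both accepts and returns a channel, pipes of this shape can be chained by passing one pipe's output directly as the next pipe's input.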
type Processor ¶
type Processor interface {
// Process receives a given input, processes it, and returns an output
Process(in interface{}) (out interface{})
}
Processor is the interface that wraps a process function.
type ProcessorFunc ¶
type ProcessorFunc func(in interface{}) (out interface{})
ProcessorFunc is a type adapter allowing regular functions to be Processor. If f is a function with the appropriate signature, ProcessorFunc(f) is a Processor that calls f.
func (ProcessorFunc) Process ¶
func (f ProcessorFunc) Process(in interface{}) (out interface{})
Process calls f(in) and returns the result.
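The sketch below shows why the adapter is convenient: a function literal wrapped in ProcessorFunc and a struct that implements Processor directly can be used interchangeably. The types are redeclared locally so the code runs on its own. The prefixer type and the apply helper are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the declarations documented above.
type Processor interface {
	Process(in interface{}) (out interface{})
}

type ProcessorFunc func(in interface{}) (out interface{})

// Process calls f(in) and returns the result.
func (f ProcessorFunc) Process(in interface{}) (out interface{}) { return f(in) }

// prefixer is a hypothetical struct-based Processor that carries its own state.
type prefixer struct{ prefix string }

func (p prefixer) Process(in interface{}) interface{} {
	return p.prefix + in.(string)
}

// apply is a hypothetical helper that runs the steps in order, feeding each
// step's output to the next.
func apply(in interface{}, steps ...Processor) interface{} {
	for _, s := range steps {
		in = s.Process(in)
	}
	return in
}

func main() {
	// Stateless logic fits naturally in a function literal.
	upper := ProcessorFunc(func(in interface{}) interface{} {
		return strings.ToUpper(in.(string))
	})
	fmt.Println(apply("job", upper, prefixer{prefix: "done: "})) // prints done: JOB
}
```

As a rule of thumb, the function adapter suits stateless transformations, while a struct is the better fit when the processor needs configuration or other state.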
type Receiver ¶
type Receiver interface {
// Receive is an input feeder
Receive(in interface{})
}
Receiver is the interface that wraps a receiver function.
type ReceiverFunc ¶
type ReceiverFunc func(in interface{})
ReceiverFunc is a type adapter that allows a regular function to be used as a Receiver. If f is a function with the appropriate signature, ReceiverFunc(f) is a Receiver that calls f.
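A short sketch of the adapter in use follows. The Receive method on ReceiverFunc is not listed in this section, so the version below is an assumption made by analogy with GeneratorFunc and ProcessorFunc. The sumInto helper is hypothetical.

```go
package main

import "fmt"

// Local copies of the declarations documented above.
type Receiver interface {
	Receive(in interface{})
}

type ReceiverFunc func(in interface{})

// Receive calls f(in). This method is assumed by analogy with the other
// adapters; it is not shown in the documentation above.
func (f ReceiverFunc) Receive(in interface{}) { f(in) }

// sumInto is a hypothetical helper that returns a Receiver which adds every
// incoming int to *total.
func sumInto(total *int) Receiver {
	return ReceiverFunc(func(in interface{}) {
		*total += in.(int)
	})
}

func main() {
	total := 0
	r := sumInto(&total)
	for _, v := range []int{1, 4, 9} {
		r.Receive(v)
	}
	fmt.Println(total) // prints 14
}
```

The closure here is called from a single goroutine. If a sink invoked the Receiver from several workers at once, the shared total would need a mutex or an atomic counter.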
type Sink ¶
type Sink interface {
	// Receive receives data from an input channel.
	// Once completed, done will send a finished signal.
	Receive(in chan interface{}) (done chan struct{})
}
Sink represents a data sink.
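The done-channel contract can be sketched with a minimal type that satisfies Sink. The funcSink type and the drain helper are hypothetical. This sketch signals completion by closing done, which unblocks any waiting receive; whether the library closes the channel or sends a value on it is not stated above, so treat that detail as an assumption.

```go
package main

import "fmt"

// Local copy of the Sink interface documented above.
type Sink interface {
	Receive(in chan interface{}) (done chan struct{})
}

// funcSink is a hypothetical Sink that hands every incoming value to fn.
type funcSink struct {
	fn func(interface{})
}

// Receive consumes in on a separate goroutine and closes done once in has
// been closed and fully drained.
func (s funcSink) Receive(in chan interface{}) chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range in {
			s.fn(v)
		}
	}()
	return done
}

// drain is a hypothetical helper: it queues vals on a buffered channel,
// closes it, and blocks until the sink reports completion.
func drain(s Sink, vals ...interface{}) {
	in := make(chan interface{}, len(vals))
	for _, v := range vals {
		in <- v
	}
	close(in)
	<-s.Receive(in)
}

func main() {
	printer := funcSink{fn: func(v interface{}) { fmt.Println(v) }}
	drain(printer, 1, 4, 9) // prints 1, 4, 9 on separate lines
}
```

Waiting on done is what lets a caller block until every value has been handled, the same role `<-done` plays at the end of the package example.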