Back to

Package conduit

v0.0.0 (b32a5b2)
Latest Go to latest
Published: Jan 2, 2020 | License: MIT | Module:



In this example we create a simple pipeline to square a list of generated numbers.


cfg := conduit.Config{
	MaxJobs:      10,
	MaxWorkers:   3,
	RateLimit:    5,
	OutputBuffer: 5,

generatorFunc := conduit.GeneratorFunc(func(out chan<- interface{}) {
	for i := 1; i <= 10; i++ {
		out <- i
numbers := conduit.NewSource(cfg, generatorFunc).Generate()

squareFunc := conduit.ProcessorFunc(func(in interface{}) (out interface{}) {
	x := in.(int)
	time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
	return x * x
squares := conduit.NewPipe(cfg, squareFunc).Process(numbers)

printer := conduit.ReceiverFunc(func(in interface{}) {
done := conduit.NewSink(cfg, printer).Receive(squares)



Package Files

type Config

type Config struct {
	MaxJobs      int
	MaxWorkers   int
	RateLimit    int
	OutputBuffer int

Config configures the batch job worker

type Generator

type Generator interface {
	// Generate generates data onto the given channel
	Generate(out chan<- interface{})

Generator is an interface for wrapping a generator function

type GeneratorFunc

type GeneratorFunc func(out chan<- interface{})

GeneratorFunc is a type adapter allowing regular functions to be Receiver. If f is a function with the appropriate signature, GeneratorFunc(f) is a Generator the calls f.

func (GeneratorFunc) Generate

func (f GeneratorFunc) Generate(out chan<- interface{})

Generate calls f(out)

type Job

type Job struct {
	ID      int
	Payload interface{}

Job is a representation of a batch job

type Pipe

type Pipe interface {
	// Process receives data from an input channel, processes it,
	// and outputs the results onto a channel. The output channel
	// is returned for use.
	Process(in chan interface{}) (out chan interface{})

Pipe represents a process pipe.

func NewPipe

func NewPipe(cfg Config, p Processor) Pipe

NewPipe creates a new pipe with the given config. A Processor argument is given to process the incoming data.

type Processor

type Processor interface {
	// Process receives a given input, processes it, and returns an output
	Process(in interface{}) (out interface{})

Processor is an interface for wrapping a process function

type ProcessorFunc

type ProcessorFunc func(in interface{}) (out interface{})

ProcessorFunc is a type adapter allowing regular functions to be Processor. If f is a function with the appropriate signature, ProcessorFunc(f) is a Processor that calls f.

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(in interface{}) (out interface{})

Process calls f(in) and returns the result.

type Receiver

type Receiver interface {
	// Receive is an input feeder
	Receive(in interface{})

Receiver is an interface for wrapping a receiver function

type ReceiverFunc

type ReceiverFunc func(in interface{})

ReceiverFunc is a type adapter allowing regular functions to be Receiver. If f is a function with the appropriate signature, ReceiverFunc(f) is a Receiver the calls f.

func (ReceiverFunc) Receive

func (f ReceiverFunc) Receive(in interface{})

Receive calls f(in).

type Sink

type Sink interface {
	// Receive receives data from an input channel.
	// Once completed, done will send a finished signal.
	Receive(in chan interface{}) (done chan struct{})

Sink represents a data sink

func NewSink

func NewSink(cfg Config, r Receiver) Sink

NewSink creates a new sink with the given config. A Receiver argument is given to handle incoming data.

type Source

type Source interface {
	// Generate generates a data to an output channel.
	Generate() (out chan interface{})

Source represents a data source

func NewSource

func NewSource(cfg Config, g Generator) Source

NewSource creates a new sink with the given config. A Generator argument is used for generating data.

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier