Back to

Package conduit

Latest Go to latest

The latest major version is .

Published: Jan 2, 2020 | License: MIT | Module:



In this example we create a simple pipeline to square a list of generated numbers.


package main

import (

func main() {
	// The pipeline is configured to create three workers, where there can be at most ten
	// jobs in queue. As the rate limit is set to 5, only 5 jobs at most can run in one second.
	cfg := conduit.Config{
		MaxJobs:      10,
		MaxWorkers:   3,
		RateLimit:    5,
		OutputBuffer: 5,

	// Create a source to generate 10 numbers onto a channel
	generatorFunc := conduit.GeneratorFunc(func(out chan<- interface{}) {
		for i := 1; i <= 10; i++ {
			out <- i
	numbers := conduit.NewSource(cfg, generatorFunc).Generate()

	// Create a process pipe that squares the incoming input
	squareFunc := conduit.ProcessorFunc(func(in interface{}) (out interface{}) {
		x := in.(int)
		time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
		return x * x
	squares := conduit.NewPipe(cfg, squareFunc).Process(numbers)

	// As a sink, print each incoming input
	printer := conduit.ReceiverFunc(func(in interface{}) {
	done := conduit.NewSink(cfg, printer).Receive(squares)




type Config

type Config struct {
	MaxJobs      int
	MaxWorkers   int
	RateLimit    int
	OutputBuffer int

Config configures the batch job worker

type Generator

type Generator interface {
	// Generate generates data onto the given channel
	Generate(out chan<- interface{})

Generator is an interface for wrapping a generator function

type GeneratorFunc

type GeneratorFunc func(out chan<- interface{})

GeneratorFunc is a type adapter allowing regular functions to be used as a Generator. If f is a function with the appropriate signature, GeneratorFunc(f) is a Generator that calls f.

func (GeneratorFunc) Generate

func (f GeneratorFunc) Generate(out chan<- interface{})

Generate calls f(out)

type Job

type Job struct {
	ID      int
	Payload interface{}

Job is a representation of a batch job

type Pipe

type Pipe interface {
	// Process receives data from an input channel, processes it,
	// and outputs the results onto a channel. The output channel
	// is returned for use.
	Process(in chan interface{}) (out chan interface{})

Pipe represents a process pipe.

func NewPipe

func NewPipe(cfg Config, p Processor) Pipe

NewPipe creates a new pipe with the given config. A Processor argument is given to process the incoming data.

type Processor

type Processor interface {
	// Process receives a given input, processes it, and returns an output
	Process(in interface{}) (out interface{})

Processor is an interface for wrapping a process function

type ProcessorFunc

type ProcessorFunc func(in interface{}) (out interface{})

ProcessorFunc is a type adapter allowing regular functions to be used as a Processor. If f is a function with the appropriate signature, ProcessorFunc(f) is a Processor that calls f.

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(in interface{}) (out interface{})

Process calls f(in) and returns the result.

type Receiver

type Receiver interface {
	// Receive is an input feeder
	Receive(in interface{})

Receiver is an interface for wrapping a receiver function

type ReceiverFunc

type ReceiverFunc func(in interface{})

ReceiverFunc is a type adapter allowing regular functions to be used as a Receiver. If f is a function with the appropriate signature, ReceiverFunc(f) is a Receiver that calls f.

func (ReceiverFunc) Receive

func (f ReceiverFunc) Receive(in interface{})

Receive calls f(in).

type Sink

type Sink interface {
	// Receive receives data from an input channel.
	// Once completed, done will send a finished signal.
	Receive(in chan interface{}) (done chan struct{})

Sink represents a data sink

func NewSink

func NewSink(cfg Config, r Receiver) Sink

NewSink creates a new sink with the given config. A Receiver argument is given to handle incoming data.

type Source

type Source interface {
	// Generate generates data onto an output channel, which is returned for use.
	Generate() (out chan interface{})

Source represents a data source

func NewSource

func NewSource(cfg Config, g Generator) Source

NewSource creates a new source with the given config. A Generator argument is used for generating data.

Package Files

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier