github.com/nickylogan/conduit

Package conduit

v0.0.0-...-b32a5b2

Published: Jan 2, 2020 | License: MIT | Module: github.com/nickylogan/conduit

Overview

Example

In this example we create a simple pipeline to square a list of generated numbers.

Code:

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/nickylogan/conduit"
)

func main() {
	// The pipeline runs three workers and holds at most ten jobs in the queue.
	// With the rate limit set to 5, at most five jobs can run per second.
	cfg := conduit.Config{
		MaxJobs:      10,
		MaxWorkers:   3,
		RateLimit:    5,
		OutputBuffer: 5,
	}

	// Create a source to generate 10 numbers onto a channel
	generatorFunc := conduit.GeneratorFunc(func(out chan<- interface{}) {
		for i := 1; i <= 10; i++ {
			out <- i
		}
	})
	numbers := conduit.NewSource(cfg, generatorFunc).Generate()

	// Create a process pipe that squares the incoming input
	squareFunc := conduit.ProcessorFunc(func(in interface{}) (out interface{}) {
		x := in.(int)
		time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
		return x * x
	})
	squares := conduit.NewPipe(cfg, squareFunc).Process(numbers)

	// As a sink, print each incoming input
	printer := conduit.ReceiverFunc(func(in interface{}) {
		fmt.Println(in)
	})
	done := conduit.NewSink(cfg, printer).Receive(squares)
	<-done
}

Output:

1
4
9
16
25
36
49
64
81
100

Index

type Config

type Config struct {
	MaxJobs      int
	MaxWorkers   int
	RateLimit    int
	OutputBuffer int
}

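Config configures the batch job worker. As the example in the Overview describes, MaxWorkers sets the number of workers, MaxJobs caps the number of jobs in the queue, and RateLimit caps how many jobs can run per second.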
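
This page does not say how RateLimit is enforced. As a rough illustration of what "at most N jobs per second" can mean, the following sketch paces job starts with a time.Ticker. It is an assumption about the semantics, not the package's implementation; runLimited is a hypothetical helper introduced only for this example.

```go
package main

import (
	"fmt"
	"time"
)

// runLimited is a hypothetical helper (not part of conduit). It calls job
// n times, starting at most perSecond calls per second, and returns the
// total elapsed time. perSecond must be greater than zero.
func runLimited(n, perSecond int, job func(i int)) time.Duration {
	interval := time.Second / time.Duration(perSecond)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	start := time.Now()
	for i := 0; i < n; i++ {
		<-ticker.C // wait for the next slot before starting a job
		job(i)
	}
	return time.Since(start)
}

func main() {
	// Five jobs at 50 jobs per second: one slot every 20ms.
	elapsed := runLimited(5, 50, func(i int) {})
	fmt.Println("elapsed:", elapsed.Round(time.Millisecond))
}
```

Because the first tick only arrives after one full interval, five jobs at 50 per second take roughly 100ms in this sketch, even though each job itself does nothing.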

type Generator

type Generator interface {
	// Generate generates data onto the given channel
	Generate(out chan<- interface{})
}

Generator is an interface for wrapping a generator function.

type GeneratorFunc

type GeneratorFunc func(out chan<- interface{})

GeneratorFunc is a type adapter allowing regular functions to be used as a Generator. If f is a function with the appropriate signature, GeneratorFunc(f) is a Generator that calls f.

func (GeneratorFunc) Generate

func (f GeneratorFunc) Generate(out chan<- interface{})

Generate calls f(out).
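
The adapter follows the same idea as http.HandlerFunc in the standard library: a function type gets a method so that a plain function satisfies the interface. The sketch below re-declares Generator and GeneratorFunc locally, copied from the declarations above, so that it runs without importing the package. countTo and collect are hypothetical helpers introduced for illustration.

```go
package main

import "fmt"

// Local copies of the documented declarations, so the sketch is standalone.
type Generator interface {
	Generate(out chan<- interface{})
}

type GeneratorFunc func(out chan<- interface{})

// Generate calls f(out).
func (f GeneratorFunc) Generate(out chan<- interface{}) { f(out) }

// countTo is a hypothetical helper: it returns a Generator that emits 1..n.
func countTo(n int) Generator {
	return GeneratorFunc(func(out chan<- interface{}) {
		for i := 1; i <= n; i++ {
			out <- i
		}
	})
}

// collect is a hypothetical helper: it runs g in a goroutine, closes the
// channel once g returns, and gathers every value that was emitted.
func collect(g Generator) []interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		g.Generate(ch)
	}()
	var got []interface{}
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(countTo(3))) // prints [1 2 3]
}
```

Any type with a matching Generate method works in place of the adapter, which is useful when the generator needs to carry state.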

type Job

type Job struct {
	ID      int
	Payload interface{}
}

Job is a representation of a batch job.

type Pipe

type Pipe interface {
	// Process receives data from an input channel, processes it,
	// and outputs the results onto a channel. The output channel
	// is returned for use.
	Process(in chan interface{}) (out chan interface{})
}

Pipe represents a process pipe.

func NewPipe

func NewPipe(cfg Config, p Processor) Pipe

NewPipe creates a new pipe with the given config. A Processor argument is given to process the incoming data.
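
To make the shape of the Pipe interface concrete, here is a minimal sketch of a worker-pool type that satisfies the same Process signature. It is not the implementation behind NewPipe, which this page does not show. poolPipe, its fields, and sumSquares are hypothetical names, and Processor and ProcessorFunc are re-declared locally so the sketch runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the documented declarations.
type Processor interface {
	Process(in interface{}) (out interface{})
}

type ProcessorFunc func(in interface{}) (out interface{})

func (f ProcessorFunc) Process(in interface{}) (out interface{}) { return f(in) }

// poolPipe is a hypothetical worker-pool pipe, for illustration only.
type poolPipe struct {
	workers int       // number of goroutines reading from the input
	buffer  int       // capacity of the output channel
	p       Processor // applied to every incoming value
}

// Process starts the workers and returns the output channel. The output
// is closed once the input is closed and every worker has finished.
func (pp poolPipe) Process(in chan interface{}) (out chan interface{}) {
	out = make(chan interface{}, pp.buffer)
	var wg sync.WaitGroup
	for w := 0; w < pp.workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- pp.p.Process(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumSquares is a hypothetical helper: it feeds 1..n through a squaring
// pipe with three workers and sums whatever comes out.
func sumSquares(n int) int {
	in := make(chan interface{})
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()
	square := ProcessorFunc(func(in interface{}) interface{} {
		x := in.(int)
		return x * x
	})
	pipe := poolPipe{workers: 3, buffer: 5, p: square}
	total := 0
	for v := range pipe.Process(in) {
		total += v.(int)
	}
	return total
}

func main() {
	fmt.Println(sumSquares(10)) // prints 385
}
```

With more than one worker, a pool like this can emit results in a different order from the inputs, which is why the sketch sums the values instead of comparing them position by position.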

type Processor

type Processor interface {
	// Process receives a given input, processes it, and returns an output
	Process(in interface{}) (out interface{})
}

Processor is an interface for wrapping a process function.

type ProcessorFunc

type ProcessorFunc func(in interface{}) (out interface{})

ProcessorFunc is a type adapter allowing regular functions to be used as a Processor. If f is a function with the appropriate signature, ProcessorFunc(f) is a Processor that calls f.

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(in interface{}) (out interface{})

Process calls f(in) and returns the result.
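
Because inputs and outputs are interface{} values, a processor has to assert the concrete type itself, and a bare assertion such as in.(int) panics on any other type. The sketch below uses the two-value form of the assertion instead. Returning an error value as the output is just one option chosen for this example, since the Process signature has no separate error result. square and errNotInt are hypothetical names, and the two types are re-declared locally so the code runs standalone.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type Processor interface {
	Process(in interface{}) (out interface{})
}

type ProcessorFunc func(in interface{}) (out interface{})

// Process calls f(in) and returns the result.
func (f ProcessorFunc) Process(in interface{}) (out interface{}) { return f(in) }

// errNotInt is a hypothetical sentinel value used by this sketch.
var errNotInt = errors.New("input is not an int")

// square is a hypothetical process function. It squares int inputs and
// returns errNotInt as the output for anything else instead of panicking.
func square(in interface{}) interface{} {
	x, ok := in.(int)
	if !ok {
		return errNotInt
	}
	return x * x
}

func main() {
	var p Processor = ProcessorFunc(square)
	fmt.Println(p.Process(7))       // prints 49
	fmt.Println(p.Process("seven")) // prints input is not an int
}
```

A downstream stage can then check whether a value is an error before using it.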

type Receiver

type Receiver interface {
	// Receive handles a single incoming input
	Receive(in interface{})
}

Receiver is an interface for wrapping a receiver function.

type ReceiverFunc

type ReceiverFunc func(in interface{})

ReceiverFunc is a type adapter allowing regular functions to be used as a Receiver. If f is a function with the appropriate signature, ReceiverFunc(f) is a Receiver that calls f.

func (ReceiverFunc) Receive

func (f ReceiverFunc) Receive(in interface{})

Receive calls f(in).

type Sink

type Sink interface {
	// Receive receives data from an input channel.
	// Once completed, done will send a finished signal.
	Receive(in chan interface{}) (done chan struct{})
}

Sink represents a data sink.

func NewSink

func NewSink(cfg Config, r Receiver) Sink

NewSink creates a new sink with the given config. A Receiver argument is given to handle incoming data.
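
The done channel returned by Receive lets the caller block until the sink has consumed everything. The following sketch shows one simple way a type could satisfy the Sink signature, using a single goroutine that closes done when the input channel is closed. It is an illustration under that assumption, not the code behind NewSink. simpleSink and drain are hypothetical names, and Receiver and ReceiverFunc are re-declared locally.

```go
package main

import "fmt"

// Local copies of the documented declarations.
type Receiver interface {
	Receive(in interface{})
}

type ReceiverFunc func(in interface{})

// Receive calls f(in).
func (f ReceiverFunc) Receive(in interface{}) { f(in) }

// simpleSink is a hypothetical single-goroutine sink, for illustration only.
type simpleSink struct {
	r Receiver
}

// Receive hands every value from in to the Receiver and closes done
// after in has been closed and fully drained.
func (s simpleSink) Receive(in chan interface{}) (done chan struct{}) {
	done = make(chan struct{})
	go func() {
		defer close(done)
		for v := range in {
			s.r.Receive(v)
		}
	}()
	return done
}

// drain is a hypothetical helper: it pushes values through a simpleSink
// and returns what the Receiver saw, in order.
func drain(values ...interface{}) []interface{} {
	in := make(chan interface{}, len(values))
	for _, v := range values {
		in <- v
	}
	close(in)

	var got []interface{}
	sink := simpleSink{r: ReceiverFunc(func(v interface{}) {
		got = append(got, v)
	})}
	<-sink.Receive(in) // block until the sink signals completion
	return got
}

func main() {
	fmt.Println(drain("a", "b", "c")) // prints [a b c]
}
```

Reading got only after receiving from done is what makes the slice safe to use without extra locking in this sketch.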

type Source

type Source interface {
	// Generate generates data onto an output channel.
	Generate() (out chan interface{})
}

Source represents a data source.

func NewSource

func NewSource(cfg Config, g Generator) Source

NewSource creates a new source with the given config. A Generator argument is used for generating data.

Documentation was rendered with GOOS=linux and GOARCH=amd64.
