conduit

package module
v0.0.0-...-b32a5b2
Published: Jan 2, 2020 License: MIT Imports: 2 Imported by: 0

README

conduit

Package conduit helps build pipelines whose stages run on rate-limited worker pools.

Installation

go get github.com/nickylogan/conduit

Example

// The pipeline is configured with three workers and a queue
// that holds at most ten jobs. Because the rate limit is
// set to 5, at most 5 jobs can run in one second.
cfg := conduit.Config{
    MaxJobs:      10,
    MaxWorkers:   3,
    RateLimit:    5,
    OutputBuffer: 5,
}

// Create a source that sends the numbers 1 through 10 on a channel
generatorFunc := conduit.GeneratorFunc(func(out chan<- interface{}) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
})
numbers := conduit.NewSource(cfg, generatorFunc).Generate()

// Create a process pipe that squares the incoming input
squareFunc := conduit.ProcessorFunc(func(in interface{}) (out interface{}) {
    x := in.(int)
    time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
    return x * x
})
squares := conduit.NewPipe(cfg, squareFunc).Process(numbers)

// As a sink, print each incoming input
printer := conduit.ReceiverFunc(func(in interface{}) {
    fmt.Println(in)
})
done := conduit.NewSink(cfg, printer).Receive(squares)
<-done

// Output:
// 1
// 4
// 9
// 16
// 25
// 36
// 49
// 64
// 81
// 100

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Overview

Example

This example builds a simple pipeline that squares a sequence of generated numbers.

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/nickylogan/conduit"
)

func main() {
	// The pipeline is configured with three workers and a queue that holds at most ten
	// jobs. Because the rate limit is set to 5, at most 5 jobs can run in one second.
	cfg := conduit.Config{
		MaxJobs:      10,
		MaxWorkers:   3,
		RateLimit:    5,
		OutputBuffer: 5,
	}

	// Create a source that sends the numbers 1 through 10 on a channel
	generatorFunc := conduit.GeneratorFunc(func(out chan<- interface{}) {
		for i := 1; i <= 10; i++ {
			out <- i
		}
	})
	numbers := conduit.NewSource(cfg, generatorFunc).Generate()

	// Create a process pipe that squares the incoming input
	squareFunc := conduit.ProcessorFunc(func(in interface{}) (out interface{}) {
		x := in.(int)
		time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
		return x * x
	})
	squares := conduit.NewPipe(cfg, squareFunc).Process(numbers)

	// As a sink, print each incoming input
	printer := conduit.ReceiverFunc(func(in interface{}) {
		fmt.Println(in)
	})
	done := conduit.NewSink(cfg, printer).Receive(squares)
	<-done
}

Output:

1
4
9
16
25
36
49
64
81
100

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	MaxJobs      int
	MaxWorkers   int
	RateLimit    int
	OutputBuffer int
}

Config configures the batch job worker.
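
The individual fields are not documented here. Going by the comments in the example, one plausible reading is that MaxJobs bounds the job queue, MaxWorkers sets the number of worker goroutines, and RateLimit caps how many jobs run per second. The sketch below illustrates that reading using only the standard library. It is not the package's implementation, and runPool is a hypothetical helper introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runPool is a hypothetical helper, not part of conduit. It starts `workers`
// goroutines that read from a queue holding at most `maxJobs` pending values,
// and lets at most `ratePerSecond` jobs start per second across all workers.
// ratePerSecond must be greater than zero.
func runPool(inputs []int, maxJobs, workers, ratePerSecond int, work func(int) int) []int {
	jobs := make(chan int, maxJobs)        // bounded job queue
	results := make(chan int, len(inputs)) // large enough that workers never block

	// One tick per permitted job start, shared by every worker.
	ticker := time.NewTicker(time.Second / time.Duration(ratePerSecond))
	defer ticker.Stop()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				<-ticker.C // wait for a rate-limit slot
				results <- work(j)
			}
		}()
	}

	// Enqueue the inputs; this blocks while the queue is full.
	for _, in := range inputs {
		jobs <- in
	}
	close(jobs)
	wg.Wait()
	close(results)

	var out []int
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	squares := runPool([]int{1, 2, 3, 4}, 10, 3, 50, func(x int) int { return x * x })
	fmt.Println(len(squares)) // results may arrive in any order
}
```

In this sketch, results are collected in completion order, so a caller that needs input order would have to carry an index alongside each value.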

type Generator

type Generator interface {
	// Generate sends data on the given channel
	Generate(out chan<- interface{})
}

Generator is an interface for wrapping a generator function.

type GeneratorFunc

type GeneratorFunc func(out chan<- interface{})

GeneratorFunc is a type adapter that allows regular functions to be used as a Generator. If f is a function with the appropriate signature, GeneratorFunc(f) is a Generator that calls f.

func (GeneratorFunc) Generate

func (f GeneratorFunc) Generate(out chan<- interface{})

Generate calls f(out).
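
The adapter follows the same idea as http.HandlerFunc in the standard library: a function type gains a method so that it satisfies an interface. The sketch below redeclares Generator and GeneratorFunc locally so that it compiles without importing conduit. The countTo type and the collect helper are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// Local copies of the two declarations documented above, so the sketch
// compiles on its own.
type Generator interface {
	Generate(out chan<- interface{})
}

type GeneratorFunc func(out chan<- interface{})

// Generate calls f(out), which is what makes a plain function a Generator.
func (f GeneratorFunc) Generate(out chan<- interface{}) { f(out) }

// countTo is a hypothetical struct-based Generator, shown for contrast.
type countTo struct{ n int }

func (c countTo) Generate(out chan<- interface{}) {
	for i := 1; i <= c.n; i++ {
		out <- i
	}
}

// collect is a hypothetical helper. It runs g in a goroutine, closes the
// channel once g returns, and gathers everything g sent.
func collect(g Generator) []interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		g.Generate(ch)
	}()
	var got []interface{}
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fromFunc := GeneratorFunc(func(out chan<- interface{}) {
		out <- "a"
		out <- "b"
	})
	fmt.Println(collect(fromFunc))   // [a b]
	fmt.Println(collect(countTo{3})) // [1 2 3]
}
```

Both values are accepted wherever a Generator is expected. The function form suits one-off generators, while a struct is convenient when the generator carries configuration.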

type Job

type Job struct {
	ID      int
	Payload interface{}
}

Job is a representation of a batch job.

type Pipe

type Pipe interface {
	// Process receives data from an input channel, processes it,
	// and outputs the results onto a channel. The output channel
	// is returned for use.
	Process(in chan interface{}) (out chan interface{})
}

Pipe represents a process pipe.

func NewPipe

func NewPipe(cfg Config, p Processor) Pipe

NewPipe creates a new pipe with the given config. A Processor argument is given to process the incoming data.
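
Because Process takes a channel and returns a channel, stages can be chained by passing one stage's output to the next. The sketch below shows that shape with a hypothetical single-worker stage function built on the standard library. It does not use conduit, and it assumes that the upstream channel is closed when the data ends.

```go
package main

import "fmt"

// stage is a hypothetical single-worker stand-in for a pipe. It applies f to
// each value from in and closes the returned channel when in is exhausted.
func stage(in <-chan interface{}, f func(interface{}) interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

// runChain feeds nums through a squaring stage and then an increment stage.
func runChain(nums []int) []int {
	src := make(chan interface{})
	go func() {
		defer close(src)
		for _, n := range nums {
			src <- n
		}
	}()

	squared := stage(src, func(v interface{}) interface{} { return v.(int) * v.(int) })
	plusOne := stage(squared, func(v interface{}) interface{} { return v.(int) + 1 })

	var got []int
	for v := range plusOne {
		got = append(got, v.(int))
	}
	return got
}

func main() {
	fmt.Println(runChain([]int{1, 2, 3})) // [2 5 10]
}
```

With one goroutine per stage, values keep their input order. A stage backed by several workers would not give that guarantee by itself.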

type Processor

type Processor interface {
	// Process receives a given input, processes it, and returns an output
	Process(in interface{}) (out interface{})
}

Processor is an interface for wrapping a process function.

type ProcessorFunc

type ProcessorFunc func(in interface{}) (out interface{})

ProcessorFunc is a type adapter that allows regular functions to be used as a Processor. If f is a function with the appropriate signature, ProcessorFunc(f) is a Processor that calls f.

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(in interface{}) (out interface{})

Process calls f(in) and returns the result.
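
Since inputs arrive as interface{}, a processor has to assert the concrete type itself. The squaring example uses in.(int), which panics if a value of another type reaches the stage. The sketch below uses the two-value form of the assertion instead. ProcessorFunc is redeclared locally so the code compiles on its own, and returning an error value as the output is only one possible convention, assumed here for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented type, so the sketch compiles without conduit.
type ProcessorFunc func(in interface{}) (out interface{})

// Process calls f(in) and returns the result.
func (f ProcessorFunc) Process(in interface{}) (out interface{}) { return f(in) }

// errNotInt is a hypothetical sentinel returned for inputs of the wrong type.
var errNotInt = errors.New("input is not an int")

// safeSquare squares ints and reports any other input as errNotInt
// instead of panicking.
var safeSquare = ProcessorFunc(func(in interface{}) interface{} {
	x, ok := in.(int)
	if !ok {
		return errNotInt
	}
	return x * x
})

func main() {
	fmt.Println(safeSquare.Process(4))      // 16
	fmt.Println(safeSquare.Process("four")) // input is not an int
}
```

A downstream stage can then check whether a value is an error before using it.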

type Receiver

type Receiver interface {
	// Receive handles a single incoming value
	Receive(in interface{})
}

Receiver is an interface for wrapping a receiver function.

type ReceiverFunc

type ReceiverFunc func(in interface{})

ReceiverFunc is a type adapter that allows regular functions to be used as a Receiver. If f is a function with the appropriate signature, ReceiverFunc(f) is a Receiver that calls f.

func (ReceiverFunc) Receive

func (f ReceiverFunc) Receive(in interface{})

Receive calls f(in).

type Sink

type Sink interface {
	// Receive receives data from an input channel.
	// Once completed, done will send a finished signal.
	Receive(in chan interface{}) (done chan struct{})
}

Sink represents a data sink.

func NewSink

func NewSink(cfg Config, r Receiver) Sink

NewSink creates a new sink with the given config. A Receiver argument is given to handle incoming data.
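
The done channel returned by Receive lets the caller block until the sink has consumed everything, as the example does with <-done. The sketch below shows the general pattern with a hypothetical drain function. It is not conduit's code. It signals completion by closing done rather than sending on it, and it assumes the input channel is closed upstream.

```go
package main

import "fmt"

// drain is a hypothetical stand-in for a sink. It calls handle for every
// value read from in and closes done once in has been closed and emptied.
func drain(in <-chan interface{}, handle func(interface{})) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range in {
			handle(v)
		}
	}()
	return done
}

func main() {
	in := make(chan interface{}, 3)
	in <- 1
	in <- 4
	in <- 9
	close(in)

	sum := 0
	done := drain(in, func(v interface{}) { sum += v.(int) })
	<-done // wait until every value has been handled

	fmt.Println(sum) // 14
}
```

Reading sum after <-done is safe because closing done happens only after the last call to handle has returned.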

type Source

type Source interface {
	// Generate sends data to an output channel.
	Generate() (out chan interface{})
}

Source represents a data source.

func NewSource

func NewSource(cfg Config, g Generator) Source

NewSource creates a new source with the given config. A Generator argument is used for generating data.
