pipe

v0.1.1
Published: Jun 18, 2018 License: MIT Imports: 4 Imported by: 0

README

pipe/s

A piper's bag - generic functions to gain concurrency - batteries included :-)


    go get -u github.com/GoLangsam/pipe

Please feel free and encouraged to suggest, improve, comment or ask - You'll be welcome!

Overview

  • an evolution:

    • sss a naive approach - as seen in popular slides, talks and blogs.
    • ss a better way to code it
  • the essence

  • toys and tests

  • notes and explanations

    • readme contains further background documentation
  • internals

  • extended

    • xxl demand-driven channel - lazy evaluation
    • xxs supply-driven counter-part
    • xxsl the super luxury version has both: demand- and supply-driven channels

sss - simply super stupid small

ss - still simply small

s - smart & useful - batteries included

Batteries

examples

internal


Think deep - code happy - be simple - see clear :-)

Support on Beerpay

Hey dude! Help me out for a couple of 🍻!


Documentation

Index

Constants

const BufferAnyCAP = 10

BufferAnyCAP is the capacity of the buffered proxy channel in `SendAnyProxy`

const BufferAnyQUE = 16

BufferAnyQUE is the allocated size of the circular queue in `SendAnyProxy`

Variables

This section is empty.

Functions

func ChanAny

func ChanAny(inp ...Any) (out <-chan Any)

ChanAny returns a channel to receive all inputs before close.
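
The generator pattern described here can be sketched as follows. This is a minimal illustration, not the package's source: `chanAny`, `collect` and the alias `Any = interface{}` are local stand-ins chosen for this example.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// chanAny is a hypothetical re-implementation of the pattern: a goroutine
// sends every input on an unbuffered channel and closes it afterwards.
func chanAny(inp ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range inp {
			out <- v
		}
	}()
	return out
}

// collect drains a channel into a slice; the loop ends once inp is closed.
func collect(inp <-chan Any) []Any {
	var all []Any
	for v := range inp {
		all = append(all, v)
	}
	return all
}
```

Because the channel is closed after the last input, a plain `for ... range` on the receiving side terminates on its own, as `collect` shows.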

func ChanAnyFuncErr

func ChanAnyFuncErr(gen func() (Any, error)) (out <-chan Any)

ChanAnyFuncErr returns a channel to receive all results of generator `gen` until `err != nil` before close.

func ChanAnyFuncNok

func ChanAnyFuncNok(gen func() (Any, bool)) (out <-chan Any)

ChanAnyFuncNok returns a channel to receive all results of generator `gen` until `!ok` before close.
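
A sketch of how such a generator-driven channel might work. The names `chanFuncNok` and `countTo` are hypothetical, and the sketch assumes that the value returned alongside `ok == false` is discarded.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// chanFuncNok is a hypothetical stand-in: it calls gen repeatedly and
// forwards each result until gen reports ok == false, then closes out.
// Assumption: the value accompanying ok == false is not forwarded.
func chanFuncNok(gen func() (Any, bool)) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for {
			v, ok := gen()
			if !ok {
				return
			}
			out <- v
		}
	}()
	return out
}

// countTo returns a generator yielding 1..n and then ok == false.
func countTo(n int) func() (Any, bool) {
	i := 0
	return func() (Any, bool) {
		if i >= n {
			return nil, false
		}
		i++
		return i, true
	}
}
```

The `error`-returning variant would follow the same shape, stopping at the first non-nil error instead of at `!ok`.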

func ChanAnySlice

func ChanAnySlice(inp ...[]Any) (out <-chan Any)

ChanAnySlice returns a channel to receive all inputs before close.

func DaisyChaiNAny

func DaisyChaiNAny(inp chan Any, somany int,
	procs ...func(into chan<- Any, from <-chan Any),
) (
	out chan Any)

DaisyChaiNAny returns a channel to receive all inp after having passed `somany` times thru the process(es) (`from` right `into` left) before close.

Note: If `somany` is less than 1 or no `procs` are provided, `out` shall receive elements from `inp` unaltered (as a convenience), thus making null values useful.

Note: DaisyChaiNAny(inp, 1, procs) <==> DaisyChainAny(inp, procs)

func DaisyChainAny

func DaisyChainAny(inp chan Any,
	procs ...func(into chan<- Any, from <-chan Any),
) (
	out chan Any)

DaisyChainAny returns a channel to receive all inp after having passed thru the process(es) (`from` right `into` left) before close.

Note: If no `procs` are provided, `out` shall receive elements from `inp` unaltered (as a convenience), thus making a null value useful.
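
To illustrate the chaining idea, here is a rough sketch under stated assumptions. `daisyChain`, `proc` and `addOne` are hypothetical names, and the sketch assumes that the chain (not the process) closes each stage's output once the process returns; the package may handle closing differently.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// proc has the shape of the process signature described above.
type proc = func(into chan<- Any, from <-chan Any)

// daisyChain is a hypothetical sketch: each proc runs in its own goroutine,
// reading from the previous stage and writing into a fresh channel.
// Assumption: the chain closes a stage's output after its proc returns.
func daisyChain(inp chan Any, procs ...proc) chan Any {
	cha := inp
	for _, p := range procs {
		next := make(chan Any)
		go func(p proc, into chan Any, from chan Any) {
			defer close(into)
			p(into, from)
		}(p, next, cha)
		cha = next
	}
	return cha // with no procs this is inp itself, unaltered
}

// addOne forwards every element incremented by one.
func addOne(into chan<- Any, from <-chan Any) {
	for v := range from {
		into <- v.(int) + 1
	}
}
```

Calling `daisyChain(inp, addOne, addOne)` adds two to every element, while `daisyChain(inp)` simply hands back `inp`.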

func DoneAny

func DoneAny(inp <-chan Any) (done <-chan struct{})

DoneAny returns a channel to receive one signal before close after `inp` has been drained.

func DoneAnyFunc

func DoneAnyFunc(inp <-chan Any, act func(a Any)) (done <-chan struct{})

DoneAnyFunc returns a channel to receive one signal after `act` has been applied to every `inp` before close.

func DoneAnySlice

func DoneAnySlice(inp <-chan Any) (done <-chan []Any)

DoneAnySlice returns a channel to receive a slice with every Any received on `inp` before close.

Note: Unlike DoneAny, DoneAnySlice sends the fully accumulated slice, not just an event, once upon close of inp.
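
The difference between a bare done signal and an accumulated result might look like this in practice. `doneSlice` and `feed` are hypothetical helpers written for this sketch only.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// doneSlice is a hypothetical stand-in: it accumulates everything received
// on inp and sends the full slice exactly once after inp has been closed.
func doneSlice(inp <-chan Any) <-chan []Any {
	done := make(chan []Any)
	go func() {
		defer close(done)
		all := []Any{}
		for v := range inp {
			all = append(all, v)
		}
		done <- all
	}()
	return done
}

// feed is a helper that sends vals on a fresh channel and then closes it.
func feed(vals ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}
```

A single receive such as `all := <-doneSlice(feed("a", "b"))` therefore both waits for the end of the stream and hands over its contents.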

func Fan2Any

func Fan2Any(ori <-chan Any, inp ...Any) (out <-chan Any)

Fan2Any returns a channel to receive everything from the given original channel `ori` as well as all inputs before close.

func Fan2AnyChan

func Fan2AnyChan(ori <-chan Any, inp <-chan Any) (out <-chan Any)

Fan2AnyChan returns a channel to receive everything from the given original channel `ori` as well as from the input channel `inp` before close. Note: Fan2AnyChan is nothing but FanIn2Any.

func Fan2AnyFuncErr

func Fan2AnyFuncErr(ori <-chan Any, gen func() (Any, error)) (out <-chan Any)

Fan2AnyFuncErr returns a channel to receive everything from the given original channel `ori` as well as all results of generator `gen` until `err != nil` before close.

func Fan2AnyFuncNok

func Fan2AnyFuncNok(ori <-chan Any, gen func() (Any, bool)) (out <-chan Any)

Fan2AnyFuncNok returns a channel to receive everything from the given original channel `ori` as well as all results of generator `gen` until `!ok` before close.

func Fan2AnySlice

func Fan2AnySlice(ori <-chan Any, inp ...[]Any) (out <-chan Any)

Fan2AnySlice returns a channel to receive everything from the given original channel `ori` as well as all inputs before close.

func FanAnyOut

func FanAnyOut(inp <-chan Any, size int) (outS [](<-chan Any))

FanAnyOut returns a slice (of size = size) of channels each of which shall receive any inp before close.
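
A broadcasting fan-out of this kind could be sketched as below. `fanOut`, `sumEach` and `feed` are hypothetical names; the sketch sends each value to the outputs one after another, so all outputs must be consumed concurrently or the fan-out stalls.

```go
package main

import "sync"

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// fanOut is a hypothetical stand-in: every value from inp is sent to each
// of the size outputs; all outputs are closed once inp is closed.
func fanOut(inp <-chan Any, size int) []<-chan Any {
	chans := make([]chan Any, size)
	outS := make([]<-chan Any, size)
	for i := range chans {
		chans[i] = make(chan Any)
		outS[i] = chans[i]
	}
	go func() {
		for v := range inp {
			for _, c := range chans {
				c <- v // blocks until this consumer has taken v
			}
		}
		for _, c := range chans {
			close(c)
		}
	}()
	return outS
}

// sumEach drains all outputs concurrently and returns one sum per output.
func sumEach(outS []<-chan Any) []int {
	sums := make([]int, len(outS))
	var wg sync.WaitGroup
	for i, out := range outS {
		wg.Add(1)
		go func(i int, out <-chan Any) {
			defer wg.Done()
			for v := range out {
				sums[i] += v.(int)
			}
		}(i, out)
	}
	wg.Wait()
	return sums
}

// feed is a helper that sends vals on a fresh channel and then closes it.
func feed(vals ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}
```

Each consumer sees the complete stream, which is what distinguishes this from a scatter where every value goes to only one output.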

func FanAnysIn

func FanAnysIn(inps ...<-chan Any) (out <-chan Any)

FanAnysIn returns a channel to receive all inputs arriving on variadic inps before close.

Ref: https://blog.golang.org/pipelines
Ref: https://github.com/QuentinPerez/go-stuff/channel/Fan-out-Fan-in/main.go
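
In the spirit of the pipelines article referenced above, a fan-in can be sketched with one forwarding goroutine per input and a `sync.WaitGroup` that decides when to close the output. `fanIn` and `feed` are hypothetical names for this illustration.

```go
package main

import "sync"

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// fanIn is a hypothetical stand-in: one goroutine per input forwards values
// to out, and out is closed only after every input has been drained.
func fanIn(inps ...<-chan Any) <-chan Any {
	out := make(chan Any)
	var wg sync.WaitGroup
	wg.Add(len(inps))
	for _, inp := range inps {
		go func(inp <-chan Any) {
			defer wg.Done()
			for v := range inp {
				out <- v
			}
		}(inp)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// feed is a helper that sends vals on a fresh channel and then closes it.
func feed(vals ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}
```

The order in which values from different inputs arrive on `out` is not defined; only the order within one input is preserved.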

func FanIn2Any

func FanIn2Any(inp1, inp2 <-chan Any) (out <-chan Any)

FanIn2Any returns a channel to receive all from both `inp1` and `inp2` before close.

func FiniAny

func FiniAny() func(inp <-chan Any) (done <-chan struct{})

FiniAny returns a closure around `DoneAny(_)`.

func FiniAnyFunc

func FiniAnyFunc(act func(a Any)) func(inp <-chan Any) (done <-chan struct{})

FiniAnyFunc returns a closure around `DoneAnyFunc(_, act)`.

func FiniAnySlice

func FiniAnySlice() func(inp <-chan Any) (done <-chan []Any)

FiniAnySlice returns a closure around `DoneAnySlice(_)`.

func ForkAny

func ForkAny(inp <-chan Any) (out1, out2 <-chan Any)

ForkAny returns two channels, each of which is to receive every result of inp before close.

func ForkAnySeen

func ForkAnySeen(inp <-chan Any) (new, old <-chan Any)

ForkAnySeen returns two channels, `new` and `old`, where `new` is to receive all `inp` not seen before and `old` all `inp` seen before (internally growing a `sync.Map` to discriminate) until close.

func ForkAnySeenAttr

func ForkAnySeenAttr(inp <-chan Any, attr func(a Any) interface{}) (new, old <-chan Any)

ForkAnySeenAttr returns two channels, `new` and `old`, where `new` is to receive all `inp` whose attribute `attr` has not been seen before and `old` all `inp` seen before (internally growing a `sync.Map` to discriminate) until close.

func JoinAny

func JoinAny(out chan<- Any, inp ...Any) (done <-chan struct{})

JoinAny sends inputs on the given out channel and returns a done channel to receive one signal when inp has been drained.

func JoinAnyChan

func JoinAnyChan(out chan<- Any, inp <-chan Any) (done <-chan struct{})

JoinAnyChan sends inputs on the given out channel and returns a done channel to receive one signal when inp has been drained.

func JoinAnySlice

func JoinAnySlice(out chan<- Any, inp ...[]Any) (done <-chan struct{})

JoinAnySlice sends inputs on the given out channel and returns a done channel to receive one signal when inp has been drained.

func MakeAnyChan

func MakeAnyChan() (out chan Any)

MakeAnyChan returns a new open channel (simply a 'chan Any' that is). Note: No 'Any-producer' is launched here yet! (as is in all the other functions).

This is useful to easily create corresponding variables such as:

    myAnyPipelineStartsHere := MakeAnyChan()
    // ... lots of code to design and build Your favourite "myAnyWorkflowPipeline"
    // ...
    // ... *before* You start pouring data into it, e.g. simply via:
    for drop := range water {
        myAnyPipelineStartsHere <- drop
    }
    close(myAnyPipelineStartsHere)

Hint: especially helpful, if Your piping library operates on some hidden (non-exported) type
(or on a type imported from elsewhere - and You don't want/need or should(!) have to care.)

Note: as always (except for PipeAnyBuffer) the channel is unbuffered.

func MergeAny

func MergeAny(less func(i, j Any) bool, inps ...<-chan Any) (out <-chan Any)

MergeAny returns a channel to receive all inputs sorted and free of duplicates. Each input channel needs to be sorted ascending and free of duplicates. The passed binary boolean function `less` defines the applicable order.

Note: If no inputs are given, a closed channel is returned.
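
One way to realise such a merge is a two-way merge folded over all inputs. The sketch below is an assumption about the approach, not the package's algorithm; `merge2`, `mergeAll`, `lessInt` and `feed` are hypothetical names.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// merge2 merges two ascending, duplicate-free inputs into one ascending,
// duplicate-free output; less defines the order.
func merge2(less func(i, j Any) bool, a, b <-chan Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		va, okA := <-a
		vb, okB := <-b
		for okA && okB {
			switch {
			case less(va, vb):
				out <- va
				va, okA = <-a
			case less(vb, va):
				out <- vb
				vb, okB = <-b
			default: // equal: emit once, advance both sides
				out <- va
				va, okA = <-a
				vb, okB = <-b
			}
		}
		for ; okA; va, okA = <-a {
			out <- va
		}
		for ; okB; vb, okB = <-b {
			out <- vb
		}
	}()
	return out
}

// mergeAll folds merge2 over all inputs; with no inputs it returns a
// channel that is already closed.
func mergeAll(less func(i, j Any) bool, inps ...<-chan Any) <-chan Any {
	if len(inps) == 0 {
		out := make(chan Any)
		close(out)
		return out
	}
	out := inps[0]
	for _, inp := range inps[1:] {
		out = merge2(less, out, inp)
	}
	return out
}

// lessInt orders elements that are known to be ints.
func lessInt(i, j Any) bool { return i.(int) < j.(int) }

// feed is a helper that sends vals on a fresh channel and then closes it.
func feed(vals ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}
```

Two elements count as duplicates when neither is `less` than the other, which is why the precondition on sorted, duplicate-free inputs matters.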

func PairAny

func PairAny(inp <-chan Any) (out1, out2 <-chan Any)

PairAny returns a pair of channels to receive every result of inp before close.

Note: Yes, it is a VERY simple fanout - but sometimes all You need.

func PipeAnyBuffer

func PipeAnyBuffer(inp <-chan Any, cap int) (out <-chan Any)

PipeAnyBuffer returns a buffered channel with capacity `cap` to receive all `inp` before close.

func PipeAnyDone

func PipeAnyDone(inp <-chan Any) (out <-chan Any, done <-chan struct{})

PipeAnyDone returns a channel to receive every `inp` before close and a channel to signal this closing.

func PipeAnyEnter

func PipeAnyEnter(inp <-chan Any, wg AnyWaiter) (out <-chan Any)

PipeAnyEnter returns a channel to receive all `inp` and registers throughput as arrival on the given `sync.WaitGroup` until close.

func PipeAnyFunc

func PipeAnyFunc(inp <-chan Any, act func(a Any) Any) (out <-chan Any)

PipeAnyFunc returns a channel to receive every result of action `act` applied to `inp` before close. Note: it 'could' be PipeAnyMap for functional people, but 'map' has a very different meaning in Go.
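
A pipeline stage of this kind can be sketched in a few lines. `pipeFunc`, `square` and `feed` are hypothetical names used only for this illustration.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// pipeFunc is a hypothetical stand-in: it applies act to every value
// received on inp, forwards the result, and closes out when inp is closed.
func pipeFunc(inp <-chan Any, act func(a Any) Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for v := range inp {
			out <- act(v)
		}
	}()
	return out
}

// square is an example action for elements that are known to be ints.
func square(a Any) Any { return a.(int) * a.(int) }

// feed is a helper that sends vals on a fresh channel and then closes it.
func feed(vals ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}
```

Stages compose by nesting, for example `pipeFunc(pipeFunc(feed(1, 2, 3), square), square)`.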

func PipeAnyLeave

func PipeAnyLeave(inp <-chan Any, wg AnyWaiter) (out <-chan Any)

PipeAnyLeave returns a channel to receive all `inp` and registers throughput as departure on the given `sync.WaitGroup` until close.

func PipeAnySeen

func PipeAnySeen(inp <-chan Any) (out <-chan Any)

PipeAnySeen returns a channel to receive all `inp` not seen before while silently dropping everything seen before (internally growing a `sync.Map` to discriminate) until close. Note: PipeAnyFilterNotSeenYet might be a better name, but is fairly long.

func PipeAnySeenAttr

func PipeAnySeenAttr(inp <-chan Any, attr func(a Any) interface{}) (out <-chan Any)

PipeAnySeenAttr returns a channel to receive all `inp` whose attribute `attr` has not been seen before while silently dropping everything seen before (internally growing a `sync.Map` to discriminate) until close. Note: PipeAnyFilterAttrNotSeenYet might be a better name, but is fairly long.
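
The attribute-based filter might be sketched with `sync.Map.LoadOrStore`, which reports whether a key was already present. `pipeSeenAttr`, `firstLetter` and `feed` are hypothetical names; the sketch assumes that attribute values are comparable, since they serve as map keys.

```go
package main

import "sync"

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// pipeSeenAttr is a hypothetical stand-in: it forwards a value only if its
// attribute has not occurred before. Attribute values must be comparable.
func pipeSeenAttr(inp <-chan Any, attr func(a Any) interface{}) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		var seen sync.Map
		for v := range inp {
			if _, loaded := seen.LoadOrStore(attr(v), struct{}{}); !loaded {
				out <- v
			}
		}
	}()
	return out
}

// firstLetter is an example attribute for elements known to be
// non-empty strings.
func firstLetter(a Any) interface{} { return a.(string)[0] }

// feed is a helper that sends vals on a fresh channel and then closes it.
func feed(vals ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}
```

Note that the map only ever grows, so memory use scales with the number of distinct attribute values.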

func PlugAny

func PlugAny(inp <-chan Any, stop <-chan struct{}) (out <-chan Any, done <-chan struct{})

PlugAny returns a channel to receive every `inp` before close and a channel to signal this closing. Upon receipt of a stop signal, output is immediately closed, and for graceful termination any remaining input is drained before done is signalled.
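
The stop-then-drain behaviour described above could be sketched like this. `plug` is a hypothetical name, and the sketch assumes that a value in flight when the stop signal arrives is discarded.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// plug is a hypothetical stand-in: it forwards inp to out until inp is
// closed or a stop signal arrives, then closes out, drains the rest of
// inp, and finally sends one signal on done.
func plug(inp <-chan Any, stop <-chan struct{}) (<-chan Any, <-chan struct{}) {
	out := make(chan Any)
	done := make(chan struct{})
	go func() {
		defer close(done)
	forward:
		for {
			select {
			case <-stop:
				break forward
			case v, ok := <-inp:
				if !ok {
					break forward
				}
				select {
				case out <- v:
				case <-stop:
					break forward // assumption: v is discarded
				}
			}
		}
		close(out)
		for range inp {
			// drain whatever is left so upstream senders can finish
		}
		done <- struct{}{}
	}()
	return out, done
}
```

Draining matters for graceful termination: without it, an upstream goroutine blocked on a send to `inp` would never return.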

func PlugAnyAfter

func PlugAnyAfter(inp <-chan Any, after <-chan time.Time) (out <-chan Any, done <-chan struct{})

PlugAnyAfter returns a channel to receive every `inp` before close and a channel to signal this closing. Upon receipt of a time signal (e.g. from `time.After(...)`), output is immediately closed, and for graceful termination any remaining input is drained before done is signalled.

func SameAny

func SameAny(same func(a, b Any) bool, inp1, inp2 <-chan Any) (out <-chan bool)

SameAny reads values from two channels in lockstep and iff they have the same contents then `true` is sent on the returned bool channel before close.
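
A lockstep comparison of two streams might look like the sketch below. `same`, `equal` and `feed` are hypothetical names. The sketch stops at the first difference and does not drain the inputs afterwards, which a production version would likely need to handle.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// same is a hypothetical stand-in: it reads both inputs in lockstep and
// reports true only if they deliver equal values and end together.
// Note: on a mismatch the inputs are not drained in this sketch.
func same(eq func(a, b Any) bool, inp1, inp2 <-chan Any) <-chan bool {
	out := make(chan bool)
	go func() {
		defer close(out)
		for {
			v1, ok1 := <-inp1
			v2, ok2 := <-inp2
			if !ok1 || !ok2 {
				out <- ok1 == ok2 // true only if both ended together
				return
			}
			if !eq(v1, v2) {
				out <- false
				return
			}
		}
	}()
	return out
}

// equal compares two elements with Go's built-in equality.
func equal(a, b Any) bool { return a == b }

// feed is a helper that sends vals on a fresh channel and then closes it.
func feed(vals ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}
```

Exactly one boolean is sent before the result channel is closed, so a single receive is enough to get the verdict.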

func ScatterAny

func ScatterAny(inp <-chan Any, size int) (outS [](<-chan Any))

ScatterAny returns a slice (of size = size) of channels one of which shall receive any inp before close.

func SendAnyProxy

func SendAnyProxy(out chan<- Any) chan<- Any

SendAnyProxy returns a channel to serve as a sending proxy to 'out'. Uses a goroutine to receive values sent to the proxy and store them in an expanding buffer, so that sending to the proxy never blocks.

Note: the expanding buffer is implemented via "container/ring"
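
The proxy idea can be illustrated with a goroutine that queues incoming values and forwards them whenever `out` is ready. This sketch uses a plain slice instead of "container/ring" for brevity, and it assumes that closing the proxy eventually closes `out`; `sendProxy` is a hypothetical name.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// sendProxy is a hypothetical stand-in: values sent to the returned channel
// are queued in a growing slice and forwarded to out as it becomes ready.
// Assumption: closing the proxy flushes the queue and then closes out.
func sendProxy(out chan<- Any) chan<- Any {
	proxy := make(chan Any, 10) // capacity mirrors BufferAnyCAP
	go func() {
		defer close(out)
		var queue []Any
		var in <-chan Any = proxy
		for in != nil || len(queue) > 0 {
			var send chan<- Any // nil disables the send case while the queue is empty
			var head Any
			if len(queue) > 0 {
				send, head = out, queue[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // proxy closed: stop receiving, keep flushing
					continue
				}
				queue = append(queue, v)
			case send <- head:
				queue = queue[1:]
			}
		}
	}()
	return proxy
}
```

The sketch relies on nil channels: a `select` case on a nil channel is never chosen, which switches the receive or send side off without extra flags.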

func TubeAnyBuffer

func TubeAnyBuffer(cap int) (tube func(inp <-chan Any) (out <-chan Any))

TubeAnyBuffer returns a closure around PipeAnyBuffer (_, cap).

func TubeAnyEnter

func TubeAnyEnter(wg AnyWaiter) (tube func(inp <-chan Any) (out <-chan Any))

TubeAnyEnter returns a closure around PipeAnyEnter (_, wg) registering throughput on the given `sync.WaitGroup` as arrival.

func TubeAnyFunc

func TubeAnyFunc(act func(a Any) Any) (tube func(inp <-chan Any) (out <-chan Any))

TubeAnyFunc returns a closure around PipeAnyFunc (_, act).
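
Tubes make stages composable because every tube has the same shape. A sketch of that idea follows; `tube`, `tubeFunc`, `through`, `pipeFunc` and `feed` are hypothetical names, and `through` in particular is not something the package is known to provide.

```go
package main

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// tube is the common shape of a closed-over pipeline stage.
type tube = func(inp <-chan Any) <-chan Any

// pipeFunc applies act to every value from inp and closes out afterwards.
func pipeFunc(inp <-chan Any, act func(a Any) Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for v := range inp {
			out <- act(v)
		}
	}()
	return out
}

// tubeFunc is a hypothetical stand-in: it fixes act and leaves inp open.
func tubeFunc(act func(a Any) Any) tube {
	return func(inp <-chan Any) <-chan Any {
		return pipeFunc(inp, act)
	}
}

// through is an illustrative helper that applies tubes left to right.
func through(inp <-chan Any, tubes ...tube) <-chan Any {
	for _, t := range tubes {
		inp = t(inp)
	}
	return inp
}

func double(a Any) Any { return a.(int) * 2 }
func inc(a Any) Any    { return a.(int) + 1 }

// feed is a helper that sends vals on a fresh channel and then closes it.
func feed(vals ...Any) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}
```

With these pieces, `through(feed(1, 2, 3), tubeFunc(double), tubeFunc(inc))` doubles each value and then adds one.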

func TubeAnyLeave

func TubeAnyLeave(wg AnyWaiter) (tube func(inp <-chan Any) (out <-chan Any))

TubeAnyLeave returns a closure around PipeAnyLeave (_, wg) registering throughput on the given `sync.WaitGroup` as departure.

func TubeAnySeen

func TubeAnySeen() (tube func(inp <-chan Any) (out <-chan Any))

TubeAnySeen returns a closure around PipeAnySeen() (silently dropping every Any seen before).

func TubeAnySeenAttr

func TubeAnySeenAttr(attr func(a Any) interface{}) (tube func(inp <-chan Any) (out <-chan Any))

TubeAnySeenAttr returns a closure around PipeAnySeenAttr() (silently dropping every Any whose attribute `attr` was seen before).

Types

type Any

type Any generic.Type

Any is the generic type flowing thru the pipe network.

type AnyWaiter

type AnyWaiter interface {
	Add(delta int)
	Done()
}

AnyWaiter - as implemented by `*sync.WaitGroup` - attends Flapdoors and keeps track of how many enter and how many leave.

Use Your provided `*sync.WaitGroup.Wait()` to know when to close the facilities.

Just make sure to have _all_ entrances and exits attended, and don't `wg.Wait()` before You've flooded the facilities.
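
How entrance and exit stages might cooperate with a `*sync.WaitGroup` is sketched below. `waiter`, `pipeEnter`, `pipeLeave` and `countThrough` are hypothetical names, and the sketch assumes one `Add(1)` per arriving element and one `Done()` per departing element.

```go
package main

import "sync"

// Any is a local stand-in for the package's generic element type.
type Any = interface{}

// waiter mirrors the AnyWaiter interface shown above.
type waiter interface {
	Add(delta int)
	Done()
}

// pipeEnter registers every element as an arrival before forwarding it.
func pipeEnter(inp <-chan Any, wg waiter) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for v := range inp {
			wg.Add(1)
			out <- v
		}
	}()
	return out
}

// pipeLeave registers every element as a departure after forwarding it.
func pipeLeave(inp <-chan Any, wg waiter) <-chan Any {
	out := make(chan Any)
	go func() {
		defer close(out)
		for v := range inp {
			out <- v
			wg.Done()
		}
	}()
	return out
}

// countThrough sends vals through an entrance and an exit and waits until
// everything that entered has left; it returns the number of elements seen.
func countThrough(vals ...Any) int {
	var wg sync.WaitGroup
	inp := make(chan Any)
	go func() {
		defer close(inp)
		for _, v := range vals {
			inp <- v
		}
	}()
	n := 0
	for range pipeLeave(pipeEnter(inp, &wg), &wg) {
		n++
	}
	wg.Wait() // safe here: all arrivals were registered before the drain ended
	return n
}
```

In this sketch `Wait` is only called after the stream has been consumed, in line with the advice above not to wait before the facilities have been flooded.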

type ProcAny

type ProcAny func(into chan<- Any, from <-chan Any)

ProcAny is the signature of the inner process of any linear pipe-network

Example: the identity core:

    samesame := func(into chan<- Any, from <-chan Any) { into <- <-from }

Note: type ProcAny is provided for documentation purpose only. The implementation uses the explicit function signature in order to avoid some genny-related issue.

Note: In https://talks.golang.org/2012/waza.slide#40 Rob Pike uses a ProcAny named `worker`.

Directories

Path Synopsis
examples
httpsyet
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
internal
cmd/bundledotgo command
Bundle creates a single-source-file version of a source package suitable for inclusion in a particular target package.
s
xxl
xxs
