pipe

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2023 License: MIT Imports: 1 Imported by: 0

README

Go Reference GitHub go.mod Go version License MIT GitHub tag (latest SemVer) Go Report codecov

pipe

Note Please, support this repository with a ⭐️ if you like the concept and would like to support the developer's motivation

Warning
This module under rapid development

Power of Go channels with io.Pipe usability. Build multithread tools easily.

  • Thread safe
  • io and lo like syntax (Tee, Reduce, Map, etc) but concurrently

TODO

Function Impl Tests Doc Comments Doc Readme
Map
Filter
Split
ForEach
Spread
Join
Merge
Route
Replicate
Reduce
Wait

🔽 Installation

This module powered by GO111MODULE and generics feature. So it supports Go 1.18 and upper.

go get -u github.com/msacore/pipe

🕹 Examples

Welcome example

Open in playground

package main

import (
  "fmt"
  "github.com/msacore/pipe"
)

func main() {
  // Initial inputs
  nums := make(chan int, 4)

  // Generator
  go func() {
    for i := 0; i < 8; i++ {
      nums <- i
    }
    close(nums)
  }()

  // Processor
  filtered := pipe.Filter(func (value int) bool {
    return value % 2 == 0
  }, nums)
  strs := pipe.Map(func(value int) string {
    return fmt.Sprintf("%d", value)
  }, filtered)

  // Consumer
  for str := range strs {
    fmt.Println(str)
  }
}

🏗 Methods

Map

Map

Parallel Sync Sequential Single Same

Take message and convert it into another type by map function. If input channel is closed then output channel is closed. Creates a new channel with the same capacity as input.

Usage examples
// input := make(chan int, 4) with random values.
// Say, the input contains [1, 2, 3]

// Parallel strategy
// Best performance (Multiple goroutines)

output := Map(func(value int) string { 
    fmt.Print(value)
    return fmt.Sprintf("val: %d", value) 
}, input)
// stdout: 2 1 3
// output: ["val: 2", "val: 1", "val: 3"] 

// Sync strategy
// Consistent ordering (Multiple goroutines with sequential output)

output := MapSync(func(value int) string { 
    fmt.Print(value)
    return fmt.Sprintf("val: %d", value) 
}, input)
// stdout: 2 1 3
// output: ["val: 1", "val: 2", "val: 3"] 

// Sequential strategy
// Preventing thread race (Single goroutine)

output := MapSequential(func(value int) string { 
    fmt.Print(value)
    return fmt.Sprintf("val: %d", value) 
}, input)
// stdout: 1 2 3
// output: ["val: 1", "val: 2", "val: 3"] 
Filter

Filter

Parallel Sync Sequential Single Same

Take message and forward it if filter function return positive. If input channel is closed then output channel is closed. Creates a new channel with the same capacity as input.

Usage examples
// input := make(chan int, 4) with random values.
// Say, the input contains [1, 2, 3, 4]

// Parallel strategy
// Best performance (Multiple goroutines)

output := Filter(func(value int) bool {
  fmt.Print(value)
    return value % 2 == 0
}, input)
// stdout: 4 1 2 3
// output: [4 2]

// Sync strategy
// Consistent ordering (Multiple goroutines with sequential output)

output := FilterSync(func(value int) bool {
  fmt.Print(value)
    return value % 2 == 0
}, input)
// stdout: 4 1 2 3
// output: [2 4]

// Sequential strategy
// Preventing thread race (Single goroutine)

output := FilterSequential(func(value int) bool {
  fmt.Print(value)
    return value % 2 == 0
}, input)
// stdout: 1 2 3 4
// output: [2 4]
Split

Split

Parallel Sync Sequential Single Same

Split takes a number of output channels and input channel, and forwards the input messages to all output channels. There is no guarantee that the message will be sent to the output channels in the sequence in which they are provided. If input channel is closed then all output channels are closed. Creates new channels with the same capacity as input.

Usage examples
// input := make(chan int, 4) with random values.
// Say, the input contains [1, 2, 3, 4]

// Parallel strategy
// Best performance (Multiple goroutines)

outs := Split(2, input)
// The gaps demonstrate uneven recording in the channels
// outs[0]: [2,    1, 3   ]
// outs[1]: [   1, 3,    2]

// Sync strategy
// Consistent ordering (Multiple goroutines with sequential output)

outs := SplitSync(2, input)
// The gaps demonstrate uneven recording in the channels
// outs[0]: [1,    2, 3   ]
// outs[1]: [   1, 2,    3]

// Sequential strategy
// Preventing thread race (Single goroutine)

outs := SplitSequential(2, input)
// The gaps demonstrate uneven recording in the channels
// outs[0]: [1,    2,    3   ]
// outs[1]: [   1,    2,    3]

// Also we have several shortcut functions like:

out1, out2 := Split2(input)
out1, out2, out3 := Split3(input)
Wait

Here are 3 helper functions that are waiting for the channels to close. Each function blocks current goroutine until channels closing condition won't be done.

Wait(in chan T) chan struct{} - Waits for the input channel is closed and sends a signal to the returned channel.

Usage examples
<-Wait(input1)
select {
  case <-Wait(input2):
  case <-Wait(input3):
}
// Will executed after input1 closed and input2 or input3 closed

WaitAll(in ...chan T) chan struct{} - Waits for all input channels are closed, and sends a signal to the returned channel.

Usage examples
<-WaitAll(input1, input2)
// Will executed after input1 AND input2 closed

// It's equal:
<-Wait(input1)
<-Wait(input2)

WaitAny(in ...chan T) chan struct{} - Waits for one of the input channels are closes, and sends a signal to the returned channel. All other channels are read to the end in the background.

Usage examples
<-WaitAny(input1, input2)
// Will executed after input1 OR input2 closed

// It's equal:
select {
  case <-Wait(input1):
  case <-Wait(input2):
}

⚙ Strategies

Each function has own set of strategies from all categories. It describes how your channel data processing, when channels closing, and how calculate capacity of output channels.

🔄 Processing

Some functions have different channel processing algorithms. To ensure maximum performance, it is recommended to use the original function. However, specific algorithms can help in cases where you are faced with a race of threads or you need to output data strictly in the same order in which you received them.

Parallel

Parallel
Each handler is executed in its own goroutine and there is no guarantee that the output order will be consistent. Recommended for best performance.

Sync

Sync
Each handler executes in its own goroutine, but the result of the youngest goroutine waits for the oldest goroutine to finish before being passed to the output stream. To prevent memory leaks, the strategy will wait if there is more waiting data than the capacity of the output channel. Recommended if you want to get the output data in the same order as the input data.

Sequential

Sequential
Each handler is executed sequentially, one after the other. Keeps the order of the output data equal to the order of the input data. Recommended if it is necessary to exclude the race of threads between handlers.

🔒 Closing

Each function has one of several strategies for closing output channels. Understanding will help you understand how and when your pipeline closes.

Single

Single
Suitable only for functions with one input. If the input channel is closed, then the output channels are closed.

All

All
If all input channels are closed, then the output channels are closed.

Any

Any
If one of the input channels is closed, the output channels are closed. All other channels will be read to the end in the background.

📦 Capacity

Each function creates new output channels with the capacity corresponding to a specific strategy.

Same

Same
Suitable only for functions with one input channel. The output channels will have a capacity equal to the input channel.

Mult

Mult Suitable only for functions with one input channel. The output channels will have a capacity equal to the input channel multiplied by N.

Min

Min
The output channels will have a capacity equal to the minimum capacity of the input channels.

Max

Max
The output channels will have a capacity equal to the maximum capacity of the input channels.

Sum

Sum
The output channels will have a capacity equal to the sum of capacities of the input channels.

=== DRAFT ===

Under Construction
Spread

Warning
This function under construction

Spread

Sequential Single Same

Take next message and forward it to next output channel. If input channel is closed then all output channels are closed. Randomization algorithm is Round Robin or random. Creates new channels with the same capacity as input.

Join

Warning
This function under construction

Join

Sequential All Sum

Take next available message from any input and forward it to output. If all input channels are closed then output channel is closed. Creates new channel with sum of capacities of input channels.

Merge

Warning
This function under construction

Merge

Parallel Sync Sequential Any Min

Take next message from all channels (wait for data) and send new message into output. If one of input channels is closed then output channel is closed. All other input channels will be read till end in background. Creates new channel with minimal capacity of input channels.

Route

Warning
This function under construction

Route

Parallel Sync Sequential Single Same

Take next message from input and forward it to one of output channels by route function. If input channel is closed then all output channels are closed. Creates new channels with the same capacity as input.

Replicate

Warning
This function under construction

Replicate

Sequential Single Mult

Take next message from input and forward copies to output. If input channel is closed then all output channels are closed. Creates new channel with the same capacity as input multiplied by N.

Reduce

Warning
This function under construction

Reduce

Sequential Single Same

Take several next messages from input and send new message to output. If input channel is closed then all output channels are closed. Creates new channel with the same capacity as input.

Documentation

Overview

Package pipe provides a convenient way to work with Go channels and simple construction of pipelines with a wide range of logic gates.

Processing Strategies

Some functions have different channel processing algorithms. To ensure maximum performance, it is recommended to use the original function. However, specific algorithms can help in cases where you are faced with a race of threads or you need to output data strictly in the same order in which you received them.

  • Parallel - Each handler is executed in its own goroutine and there is no guarantee that the output order will be consistent. Recommended for best performance.
  • Sync - Each handler executes in its own goroutine, but the result of the youngest goroutine waits for the oldest goroutine to finish before being passed to the output stream. To prevent memory leaks, the strategy will wait if there is more waiting data than the capacity of the output channel. Recommended if you want to get the output data in the same order as the input data.
  • Sequential - Each handler is executed sequentially, one after the other. Keeps the order of the output data equal to the order of the input data. Recommended if it is necessary to exclude the race of threads between handlers.

If the input channel capacity is 0 (no bandwidth), then any strategy will act as Sequential behavior.

Closing Strategies

Each function has one of several strategies for closing output channels. Understanding will help you understand how and when your pipeline closes.

  • Single - Suitable only for functions with one input. If the input channel is closed, then the output channels are closed.
  • All - If all input channels are closed, then the output channels are closed.
  • Any - If one of the input channels is closed, the output channels are closed. All other channels will be read to the end in the background.

Capacity Strategies

Each function creates new output channels with the capacity corresponding to a specific strategy.

  • Same - Suitable only for functions with one input channel. The output channels will have a capacity equal to the input channel.
  • Mult - Suitable only for functions with one input channel. The output channels will have a capacity equal to the input channel multiplied by N.
  • Min - The output channels will have a capacity equal to the minimum capacity of the input channels.
  • Max - The output channels will have a capacity equal to the maximum capacity of the input channels.
  • Sum - The output channels will have a capacity equal to the sum of capacities of the input channels.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Filter

func Filter[T any](filter func(T) bool, in <-chan T) <-chan T

Filter takes message and forwards it if filter function return positive. If input channel is closed then output channel is closed. Creates a new channel with the same capacity as input.

Strategies

  • Processing: Parallel
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3, 4]

output := Filter(func(value int) bool {
	fmt.Print(value)
    return value % 2 == 0
}, input)

// stdout: 4 1 2 3
// output: [4 2]

func FilterSequential

func FilterSequential[T any](filter func(T) bool, in <-chan T) <-chan T

FilterSequential takes message and forwards it if filter function return positive. If input channel is closed then output channel is closed. Creates a new channel with the same capacity as input.

Strategies

  • Processing: Sequential
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3, 4]

output := FilterSequential(func(value int) bool {
	fmt.Print(value)
    return value % 2 == 0
}, input)

// stdout: 1 2 3 4
// output: [2 4]

func FilterSync

func FilterSync[T any](filter func(T) bool, in <-chan T) <-chan T

FilterSync takes message and forwards it if filter function return positive. If input channel is closed then output channel is closed. Creates a new channel with the same capacity as input.

Strategies

  • Processing: Sync
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3, 4]

output := FilterSync(func(value int) bool {
	fmt.Print(value)
    return value % 2 == 0
}, input)

// stdout: 4 1 2 3
// output: [2 4]

func Map

func Map[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout

Map takes message and converts it into another type by map function. If input channel is closed then output channel is closed. Creates a new channel with the same capacity as input.

Strategies

  • Processing: Parallel
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3]

output := Map(func(value int) string {
    fmt.Print(value)
    return fmt.Sprintf("val: %d", value)
}, input)

// stdout: 2 1 3
// output: ["val: 2", "val: 1", "val: 3"]

func MapSequential

func MapSequential[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout

MapSequential takes message and converts it into another type by map function. If input channel is closed then output channel is closed. Creates a new channel with the same capacity as input.

Strategies

  • Processing: Sequential
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3]

output := MapSequential(func(value int) string {
    fmt.Print(value)
    return fmt.Sprintf("val: %d", value)
}, input)

// stdout: 1 2 3
// output: ["val: 1", "val: 2", "val: 3"]

func MapSync

func MapSync[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout

MapSync takes message and converts it into another type by map function. If input channel is closed then output channel is closed. Creates a new channel with the same capacity as input.

Strategies

  • Processing: Sync
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3]

output := MapSync(func(value int) string {
    fmt.Print(value)
    return fmt.Sprintf("val: %d", value)
}, input)

// stdout: 2 1 3
// output: ["val: 1", "val: 2", "val: 3"]

func Split

func Split[T any](n int, in <-chan T) []<-chan T

Split takes a number of output channels and input channel, and forwards the input messages to all output channels. There is no guarantee that the message will be sent to the output channels in the sequence in which they are provided. If input channel is closed then all output channels are closed. Creates new channels with the same capacity as input.

Strategies

  • Processing: Parallel
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3]

outs := Split(2, input)

// The gaps demonstrate uneven recording in the channels
// outs[0]: [2,    1, 3   ]
// outs[1]: [   1, 3,    2]

func Split2

func Split2[T any](in <-chan T) (out1, out2 <-chan T)

Split2 - alias for Split

func Split3

func Split3[T any](in <-chan T) (out1, out2, out3 <-chan T)

Split3 - alias for Split

func SplitSequential

func SplitSequential[T any](n int, in <-chan T) []<-chan T

SplitSequential takes a number of output channels and input channel, and forwards the input messages to all output channels. The message will be sent to the output channels in the following sequence. If input channel is closed then all output channels are closed. Creates new channels with the same capacity as input.

Be aware, if one of the output channels is blocked, then all other output channels will wait.

Strategies

  • Processing: Sequential
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3]

outs := SplitSequential(2, input)

// The gaps demonstrate uneven recording in the channels
// outs[0]: [1,    2,    3   ]
// outs[1]: [   1,    2,    3]

func SplitSequential2

func SplitSequential2[T any](in <-chan T) (out1, out2 <-chan T)

SplitSequential2 - alias for SplitSequential

func SplitSequential3

func SplitSequential3[T any](in <-chan T) (out1, out2, out3 <-chan T)

SplitSequential3 - alias for SplitSequential

func SplitSync

func SplitSync[T any](n int, in <-chan T) []<-chan T

SplitSync takes a number of output channels and input channel, and forwards the input messages to all output channels. There is no guarantee that the message will be sent to the output channels in the sequence in which they are provided. If input channel is closed then all output channels are closed. Creates new channels with the same capacity as input.

Strategies

  • Processing: Sync
  • Closing: Single
  • Capacity: Same

Usages

// input := make(chan int, 4) with random values [1, 2, 3]

outs := SplitSync(2, input)

// The gaps demonstrate uneven recording in the channels
// outs[0]: [1,    2, 3   ]
// outs[1]: [   1, 2,    3]

func SplitSync2

func SplitSync2[T any](in <-chan T) (out1, out2 <-chan T)

SplitSync2 - alias for SplitSync

func SplitSync3

func SplitSync3[T any](in <-chan T) (out1, out2, out3 <-chan T)

SplitSync3 - alias for SplitSync

func Wait

func Wait[T any](in <-chan T) <-chan struct{}

Wait waits for the input channel to close and sends a signal to the returned channel.

Example

<-Wait(input1)
select {
	case <-Wait(input2):
	case <-Wait(input3):
}
// Will executed after input1 closed and input2 or input3 closed

func WaitAll

func WaitAll[T any](in ...<-chan T) <-chan struct{}

WaitAll waits for all input channels to close and sends a signal to the returned channel.

Example

<-WaitAll(input1, input2)
// Will executed after input1 AND input2 closed

// It's equal:
<-Wait(input1)
<-Wait(input2)

func WaitAny

func WaitAny[T any](in ...<-chan T) <-chan struct{}

WaitAny waits for one of the input channels to close and sends a signal to the returned channel. All other channels are read to the end in the background.

Example

<-WaitAny(input1, input2)
// Will executed after input1 OR input2 closed

// It's equal:
select {
	case <-Wait(input1):
	case <-Wait(input2):
}

Types

type Group

type Group[T any] []chan T

func (Group[T]) As1

func (g Group[T]) As1() (ch1 chan T)

func (Group[T]) As2

func (g Group[T]) As2() (ch1, ch2 chan T)

func (Group[T]) As3

func (g Group[T]) As3() (ch1, ch2, ch3 chan T)

func (Group[T]) Readers

func (g Group[T]) Readers() GroupReaders[T]

func (Group[T]) Writers

func (g Group[T]) Writers() GroupWriters[T]

type GroupReaders

type GroupReaders[T any] []<-chan T

type GroupWriters

type GroupWriters[T any] []chan<- T

Directories

Path Synopsis
Package tests provides helpers to test the module functions.
Package tests provides helpers to test the module functions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL