Documentation ¶
Overview ¶
Package pipe provides a convenient way to work with Go channels and to build pipelines from a wide range of logic gates.
Processing Strategies ¶
Some functions come in several variants that differ in how they process the channel. For maximum performance, use the base function (the one without a suffix). The other variants help when handlers must not race with each other, or when the output must be in exactly the same order as the input.
- Parallel - Each handler runs in its own goroutine, and the output order is not guaranteed to match the input order. Recommended for best performance.
- Sync - Each handler runs in its own goroutine, but the result of a newer goroutine is held back until the older goroutines have finished, and only then is it passed to the output channel. To prevent memory leaks, the strategy waits when more results are pending than the output channel can hold. Recommended when the output must be in the same order as the input.
- Sequential - Handlers run one after another. The output order equals the input order. Recommended when handlers must not run concurrently with each other.
If the input channel is unbuffered (capacity 0), every strategy behaves like Sequential.
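The Sync strategy described above can be sketched with a queue of per-message result channels. This is a minimal illustration using only the standard library, not the package's actual implementation; the names `mapSync`, `pending`, and `collectSquares` are hypothetical.

```go
package main

import "fmt"

// mapSync is a hypothetical helper (an assumption for illustration only).
// Every handler runs in its own goroutine, but results are emitted in
// the order in which the messages arrived.
func mapSync[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout {
	out := make(chan Tout, cap(in))
	// pending holds one single-use result channel per in-flight message.
	// Its bounded capacity limits how many results can pile up.
	pending := make(chan chan Tout, cap(in))

	// Reader: start one goroutine per message and record the arrival order.
	go func() {
		defer close(pending)
		for v := range in {
			res := make(chan Tout, 1)
			pending <- res
			go func(v Tin) { res <- mapper(v) }(v)
		}
	}()

	// Writer: always wait for the oldest pending result first.
	go func() {
		defer close(out)
		for res := range pending {
			out <- <-res
		}
	}()
	return out
}

// collectSquares feeds 1..4 through mapSync and gathers the output.
func collectSquares() []int {
	in := make(chan int, 4)
	for _, v := range []int{1, 2, 3, 4} {
		in <- v
	}
	close(in)
	var got []int
	for v := range mapSync[int, int](func(v int) int { return v * v }, in) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collectSquares()) // [1 4 9 16]
}
```

With an unbuffered input, `pending` is unbuffered too, so the reader cannot start a second handler until the writer has picked up the first one, which matches the remark above about capacity 0.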
Closing Strategies ¶
Each function uses one of several strategies for closing its output channels. Knowing which strategy applies helps you understand how and when your pipeline closes.
- Single - Suitable only for functions with one input. If the input channel is closed, then the output channels are closed.
- All - If all input channels are closed, then the output channels are closed.
- Any - As soon as one of the input channels is closed, the output channels are closed. The remaining input channels are read to the end in the background.
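The difference between All and Any can be sketched with two small helpers that close a signal channel. This is an illustration under assumptions, not the package's code; `closedWhenAll`, `closedWhenAny`, and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// closedWhenAll (hypothetical) returns a channel that is closed once
// every input channel has been closed: the "All" strategy.
func closedWhenAll[T any](ins ...<-chan T) <-chan struct{} {
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan T) {
			defer wg.Done()
			for range in { // drain until the input is closed
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

// closedWhenAny (hypothetical) returns a channel that is closed as soon
// as the first input is closed: the "Any" strategy. The other inputs
// keep being drained in the background.
func closedWhenAny[T any](ins ...<-chan T) <-chan struct{} {
	done := make(chan struct{})
	var once sync.Once
	for _, in := range ins {
		go func(in <-chan T) {
			for range in {
			}
			once.Do(func() { close(done) })
		}(in)
	}
	return done
}

// demo closes one of two inputs and reports what each strategy sees.
func demo() (anyFired, allStillWaiting bool) {
	a, b := make(chan int), make(chan int)
	anyDone := closedWhenAny[int](a, b)
	allDone := closedWhenAll[int](a, b)

	close(a)
	<-anyDone // Any: one closed input is enough
	anyFired = true
	select {
	case <-allDone:
	default:
		allStillWaiting = true // All: b is still open
	}
	close(b)
	<-allDone // All: fires only after both inputs are closed
	return anyFired, allStillWaiting
}

func main() {
	fmt.Println(demo()) // true true
}
```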
Capacity Strategies ¶
Each function creates new output channels with the capacity corresponding to a specific strategy.
- Same - Suitable only for functions with one input channel. The output channels will have the same capacity as the input channel.
- Mult - Suitable only for functions with one input channel. The output channels will have a capacity equal to the capacity of the input channel multiplied by N.
- Min - The output channels will have a capacity equal to the minimum capacity of the input channels.
- Max - The output channels will have a capacity equal to the maximum capacity of the input channels.
- Sum - The output channels will have a capacity equal to the sum of capacities of the input channels.
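As a rough illustration of the capacity strategies above, the capacities can be computed with the built-in `cap`. The helper `capacities` is a hypothetical name, and the multiplier 3 used for Mult is an arbitrary example value.

```go
package main

import "fmt"

// capacities (hypothetical) returns the minimum, maximum, and summed
// capacity of the given input channels.
func capacities[T any](ins ...<-chan T) (lo, hi, total int) {
	for i, in := range ins {
		c := cap(in)
		if i == 0 || c < lo {
			lo = c
		}
		if c > hi {
			hi = c
		}
		total += c
	}
	return lo, hi, total
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 5)
	c := make(chan int, 3)

	lo, hi, total := capacities[int](a, b, c)
	fmt.Println(lo, hi, total) // 2 5 10

	same := make(chan int, cap(a))   // Same: copy the input capacity
	mult := make(chan int, cap(a)*3) // Mult: input capacity times N (N = 3 here)
	fmt.Println(cap(same), cap(mult)) // 2 6
}
```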
Index ¶
- func Filter[T any](filter func(T) bool, in <-chan T) <-chan T
- func FilterSequential[T any](filter func(T) bool, in <-chan T) <-chan T
- func FilterSync[T any](filter func(T) bool, in <-chan T) <-chan T
- func Map[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout
- func MapSequential[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout
- func MapSync[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout
- func Split[T any](n int, in <-chan T) []<-chan T
- func Split2[T any](in <-chan T) (out1, out2 <-chan T)
- func Split3[T any](in <-chan T) (out1, out2, out3 <-chan T)
- func SplitSequential[T any](n int, in <-chan T) []<-chan T
- func SplitSequential2[T any](in <-chan T) (out1, out2 <-chan T)
- func SplitSequential3[T any](in <-chan T) (out1, out2, out3 <-chan T)
- func SplitSync[T any](n int, in <-chan T) []<-chan T
- func SplitSync2[T any](in <-chan T) (out1, out2 <-chan T)
- func SplitSync3[T any](in <-chan T) (out1, out2, out3 <-chan T)
- func Wait[T any](in <-chan T) <-chan struct{}
- func WaitAll[T any](in ...<-chan T) <-chan struct{}
- func WaitAny[T any](in ...<-chan T) <-chan struct{}
- type Group
- type GroupReaders
- type GroupWriters
Functions ¶
func Filter ¶
func Filter[T any](filter func(T) bool, in <-chan T) <-chan T
Filter takes each message and forwards it if the filter function returns true. When the input channel is closed, the output channel is closed. The output channel is created with the same capacity as the input channel.
Strategies ¶
- Processing: Parallel
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3, 4]

    output := Filter(func(value int) bool {
        fmt.Print(value)
        return value%2 == 0
    }, input)

    // stdout: 4 1 2 3
    // output: [4 2]
func FilterSequential ¶
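func FilterSequential[T any](filter func(T) bool, in <-chan T) <-chan T
FilterSequential takes each message and forwards it if the filter function returns true. When the input channel is closed, the output channel is closed. The output channel is created with the same capacity as the input channel.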
Strategies ¶
- Processing: Sequential
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3, 4]

    output := FilterSequential(func(value int) bool {
        fmt.Print(value)
        return value%2 == 0
    }, input)

    // stdout: 1 2 3 4
    // output: [2 4]
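For readers who want to see how the Sequential, Single, and Same strategies fit together, here is a minimal standard-library sketch of a sequential filter. It is not the package's implementation; `filterSequential` and `evens` are hypothetical names.

```go
package main

import "fmt"

// filterSequential (hypothetical) runs the filter for one message at a
// time, so output order equals input order. The output has the same
// capacity as the input and is closed when the input is closed.
func filterSequential[T any](keep func(T) bool, in <-chan T) <-chan T {
	out := make(chan T, cap(in))
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// evens sends 1..4 through the filter and collects what passes.
func evens() []int {
	in := make(chan int, 4)
	for _, v := range []int{1, 2, 3, 4} {
		in <- v
	}
	close(in)
	var got []int
	for v := range filterSequential[int](func(v int) bool { return v%2 == 0 }, in) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(evens()) // [2 4]
}
```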
func FilterSync ¶
func FilterSync[T any](filter func(T) bool, in <-chan T) <-chan T
FilterSync takes each message and forwards it if the filter function returns true. When the input channel is closed, the output channel is closed. The output channel is created with the same capacity as the input channel.
Strategies ¶
- Processing: Sync
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3, 4]

    output := FilterSync(func(value int) bool {
        fmt.Print(value)
        return value%2 == 0
    }, input)

    // stdout: 4 1 2 3
    // output: [2 4]
func Map ¶
func Map[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout
Map takes each message and converts it to another type using the mapper function. When the input channel is closed, the output channel is closed. The output channel is created with the same capacity as the input channel.
Strategies ¶
- Processing: Parallel
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3]

    output := Map(func(value int) string {
        fmt.Print(value)
        return fmt.Sprintf("val: %d", value)
    }, input)

    // stdout: 2 1 3
    // output: ["val: 2", "val: 1", "val: 3"]
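The Parallel strategy can be sketched as one goroutine per message plus a `sync.WaitGroup` that closes the output once every handler has finished. This is an illustrative sketch, not the package's code; `mapParallel` and `sortedLabels` are hypothetical names, and the results are sorted only because the output order is not deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// mapParallel (hypothetical) starts one goroutine per message. Results
// reach the output in whatever order the handlers finish.
func mapParallel[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout {
	out := make(chan Tout, cap(in))
	go func() {
		var wg sync.WaitGroup
		for v := range in {
			wg.Add(1)
			go func(v Tin) {
				defer wg.Done()
				out <- mapper(v)
			}(v)
		}
		// The input is closed: wait for running handlers, then close.
		wg.Wait()
		close(out)
	}()
	return out
}

// sortedLabels maps 1..3 to strings and sorts the unordered output.
func sortedLabels() []string {
	in := make(chan int, 4)
	for _, v := range []int{1, 2, 3} {
		in <- v
	}
	close(in)
	var got []string
	for s := range mapParallel[int, string](func(v int) string {
		return fmt.Sprintf("val: %d", v)
	}, in) {
		got = append(got, s)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(sortedLabels()) // [val: 1 val: 2 val: 3]
}
```

Note that this sketch places no limit on the number of goroutines it starts; a real implementation may bound them.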
func MapSequential ¶
func MapSequential[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout
MapSequential takes each message and converts it to another type using the mapper function. When the input channel is closed, the output channel is closed. The output channel is created with the same capacity as the input channel.
Strategies ¶
- Processing: Sequential
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3]

    output := MapSequential(func(value int) string {
        fmt.Print(value)
        return fmt.Sprintf("val: %d", value)
    }, input)

    // stdout: 1 2 3
    // output: ["val: 1", "val: 2", "val: 3"]
func MapSync ¶
func MapSync[Tin, Tout any](mapper func(Tin) Tout, in <-chan Tin) <-chan Tout
MapSync takes each message and converts it to another type using the mapper function. When the input channel is closed, the output channel is closed. The output channel is created with the same capacity as the input channel.
Strategies ¶
- Processing: Sync
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3]

    output := MapSync(func(value int) string {
        fmt.Print(value)
        return fmt.Sprintf("val: %d", value)
    }, input)

    // stdout: 2 1 3
    // output: ["val: 1", "val: 2", "val: 3"]
func Split ¶
func Split[T any](n int, in <-chan T) []<-chan T
Split takes the number of output channels and an input channel, and forwards every input message to all output channels. There is no guarantee that a message is sent to the output channels in the order in which those channels are provided. When the input channel is closed, all output channels are closed. The output channels are created with the same capacity as the input channel.
Strategies ¶
- Processing: Parallel
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3]

    outs := Split(2, input)

    // The gaps demonstrate uneven recording in the channels
    // outs[0]: [2, 1, 3 ]
    // outs[1]: [ 1, 3, 2]
func SplitSequential ¶
func SplitSequential[T any](n int, in <-chan T) []<-chan T
SplitSequential takes the number of output channels and an input channel, and forwards every input message to all output channels. Each message is sent to the output channels one after another, in the order in which those channels are provided. When the input channel is closed, all output channels are closed. The output channels are created with the same capacity as the input channel.
Be aware that if one of the output channels is blocked, all other output channels wait.
Strategies ¶
- Processing: Sequential
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3]

    outs := SplitSequential(2, input)

    // The gaps demonstrate uneven recording in the channels
    // outs[0]: [1, 2, 3 ]
    // outs[1]: [ 1, 2, 3]
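A sequential split can be sketched as a single goroutine that writes each message to every output in turn, which also shows why one blocked output stalls the others. This is an assumption-based sketch, not the package's implementation; `splitSequential` and `readBoth` are hypothetical names.

```go
package main

import "fmt"

// splitSequential (hypothetical) copies every input message to all n
// outputs, one output after another. A full output blocks the loop,
// so the remaining outputs have to wait for it.
func splitSequential[T any](n int, in <-chan T) []<-chan T {
	outs := make([]chan T, n)
	result := make([]<-chan T, n)
	for i := range outs {
		outs[i] = make(chan T, cap(in)) // Same capacity strategy
		result[i] = outs[i]
	}
	go func() {
		for v := range in {
			for _, out := range outs {
				out <- v // blocks while this output is full
			}
		}
		// Single closing strategy: input closed, close every output.
		for _, out := range outs {
			close(out)
		}
	}()
	return result
}

// readBoth splits [1 2 3] in two and drains each output in turn.
// This is safe here because the buffers (capacity 4) hold all messages.
func readBoth() (first, second []int) {
	in := make(chan int, 4)
	for _, v := range []int{1, 2, 3} {
		in <- v
	}
	close(in)
	outs := splitSequential[int](2, in)
	for v := range outs[0] {
		first = append(first, v)
	}
	for v := range outs[1] {
		second = append(second, v)
	}
	return first, second
}

func main() {
	fmt.Println(readBoth()) // [1 2 3] [1 2 3]
}
```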
func SplitSequential2 ¶
func SplitSequential2[T any](in <-chan T) (out1, out2 <-chan T)
SplitSequential2 is an alias for SplitSequential that returns two output channels.
func SplitSequential3 ¶
func SplitSequential3[T any](in <-chan T) (out1, out2, out3 <-chan T)
SplitSequential3 is an alias for SplitSequential that returns three output channels.
func SplitSync ¶
func SplitSync[T any](n int, in <-chan T) []<-chan T
SplitSync takes the number of output channels and an input channel, and forwards every input message to all output channels. There is no guarantee that a message is sent to the output channels in the order in which those channels are provided. When the input channel is closed, all output channels are closed. The output channels are created with the same capacity as the input channel.
Strategies ¶
- Processing: Sync
- Closing: Single
- Capacity: Same
Usages ¶
    // input := make(chan int, 4) with random values [1, 2, 3]

    outs := SplitSync(2, input)

    // The gaps demonstrate uneven recording in the channels
    // outs[0]: [1, 2, 3 ]
    // outs[1]: [ 1, 2, 3]
func SplitSync2 ¶
func SplitSync2[T any](in <-chan T) (out1, out2 <-chan T)
SplitSync2 is an alias for SplitSync that returns two output channels.
func SplitSync3 ¶
func SplitSync3[T any](in <-chan T) (out1, out2, out3 <-chan T)
SplitSync3 is an alias for SplitSync that returns three output channels.
func Wait ¶
func Wait[T any](in <-chan T) <-chan struct{}
Wait waits for the input channel to close and sends a signal to the returned channel.
Example ¶
    <-Wait(input1)

    select {
    case <-Wait(input2):
    case <-Wait(input3):
    }

    // Executed after input1 is closed and either input2 or input3 is closed
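One way such a wait helper could work is to drain the input in a goroutine and close a signal channel afterwards. The sketch below rests on two assumptions: that the input is consumed while waiting, and that the signal is delivered by closing the returned channel. `wait` and `remainingAfterWait` are hypothetical names.

```go
package main

import "fmt"

// wait (hypothetical) drains the input until it is closed and then
// closes the returned channel to signal completion.
func wait[T any](in <-chan T) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range in { // discard values until the input is closed
		}
	}()
	return done
}

// remainingAfterWait reports how many buffered values are left once
// the signal has fired.
func remainingAfterWait() int {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)
	<-wait[int](in)
	return len(in)
}

func main() {
	fmt.Println(remainingAfterWait()) // 0
}
```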
func WaitAll ¶
func WaitAll[T any](in ...<-chan T) <-chan struct{}
WaitAll waits for all input channels to close and sends a signal to the returned channel.
Example ¶
    <-WaitAll(input1, input2)
    // Executed after input1 AND input2 are closed

    // Equivalent to:
    <-Wait(input1)
    <-Wait(input2)
func WaitAny ¶
func WaitAny[T any](in ...<-chan T) <-chan struct{}
WaitAny waits for one of the input channels to close and sends a signal to the returned channel. All other channels are read to the end in the background.
Example ¶
    <-WaitAny(input1, input2)
    // Executed after input1 OR input2 is closed

    // Equivalent to:
    select {
    case <-Wait(input1):
    case <-Wait(input2):
    }
Types ¶
type Group ¶
type Group[T any] []chan T
func (Group[T]) Readers ¶
func (g Group[T]) Readers() GroupReaders[T]
func (Group[T]) Writers ¶
func (g Group[T]) Writers() GroupWriters[T]
type GroupReaders ¶
type GroupReaders[T any] []<-chan T
type GroupWriters ¶
type GroupWriters[T any] []chan<- T
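The three group types can be tried out with a small stand-alone sketch. The type declarations and method signatures below mirror the ones listed above, but the method bodies are an assumption: they simply return direction-restricted views of the same channels. `roundTrip` is a hypothetical helper.

```go
package main

import "fmt"

// Local copies of the documented types, for illustration only.
type Group[T any] []chan T
type GroupReaders[T any] []<-chan T
type GroupWriters[T any] []chan<- T

// Readers returns receive-only views of the channels (assumed behavior).
func (g Group[T]) Readers() GroupReaders[T] {
	rs := make(GroupReaders[T], len(g))
	for i, ch := range g {
		rs[i] = ch
	}
	return rs
}

// Writers returns send-only views of the channels (assumed behavior).
func (g Group[T]) Writers() GroupWriters[T] {
	ws := make(GroupWriters[T], len(g))
	for i, ch := range g {
		ws[i] = ch
	}
	return ws
}

// roundTrip writes through the send-only views and reads the values
// back through the receive-only views.
func roundTrip() int {
	g := Group[int]{make(chan int, 1), make(chan int, 1)}
	for i, w := range g.Writers() {
		w <- (i + 1) * 10
	}
	sum := 0
	for _, r := range g.Readers() {
		sum += <-r
	}
	return sum
}

func main() {
	fmt.Println(roundTrip()) // 30
}
```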