chanz

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: MIT Imports: 3 Imported by: 4

README

chanz

Generic channel pipelines and utilities for Go

The chanz package provides 50+ functions for building channel pipelines, fan-in/fan-out patterns, and concurrent data processing. All functions support functional options for configuration.

Quick Reference

By Category:

Installation

go get github.com/modfin/henry/chanz

Usage

import "github.com/modfin/henry/chanz"

// Create a pipeline
input := chanz.Generate(1, 2, 3, 4, 5)
doubled := chanz.Map(input, func(n int) int { return n * 2 })
result := chanz.Collect(doubled)
// result = []int{2, 4, 6, 8, 10}

Configuration Options

Most functions accept options:

// Buffer size
ch := chanz.GenerateWith[int](chanz.OpBuffer(100))(1, 2, 3, 4, 5)

// Context cancellation
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ch := chanz.Map(input, transform, chanz.OpContext(ctx))

// Done channel
done := make(chan struct{})
ch := chanz.Map(input, transform, chanz.OpDone(done))

Function Categories

Generation

Create channels from data or generators.

Generate

Create channel from elements.

ch := chanz.Generate(1, 2, 3, 4, 5)
result := chanz.Collect(ch)
// result = []int{1, 2, 3, 4, 5}
GenerateWith

Configured generator.

// With 10-element buffer
gen := chanz.GenerateWith[int](chanz.OpBuffer(10))
ch := gen(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Generator

Create channel from generator function.

// Generate Fibonacci numbers
fib := chanz.Generator(func(yield func(int)) {
    a, b := 0, 1
    for i := 0; i < 10; i++ {
        yield(a)
        a, b = b, a+b
    }
})
result := chanz.Collect(fib)
// result = []int{0, 1, 1, 2, 3, 5, 8, 13, 21, 34}
GeneratorWith

Configured generator function.

fibGen := chanz.GeneratorWith[int](chanz.OpBuffer(5))
fib := fibGen(func(yield func(int)) {
    yield(0)
    yield(1)
    yield(1)
})
Transformation

Transform channel data.

Map

Transform each element.

input := chanz.Generate(1, 2, 3, 4, 5)
doubled := chanz.Map(input, func(n int) int {
    return n * 2
})
result := chanz.Collect(doubled)
// result = []int{2, 4, 6, 8, 10}
MapWith

Configured mapper.

doubler := chanz.MapWith[int, int](chanz.OpBuffer(10))
input := chanz.Generate(1, 2, 3, 4, 5)
result := chanz.Collect(doubler(input, func(n int) int {
    return n * 2
}))
Peek

Side-effect without transformation.

input := chanz.Generate(1, 2, 3)
logged := chanz.Peek(input, func(n int) {
    fmt.Printf("Processing: %d\n", n)
})
result := chanz.Collect(logged)
// Prints: Processing: 1, Processing: 2, Processing: 3
// result = []int{1, 2, 3}
Flatten

Flatten channel of slices.

input := chanz.Generate([]int{1, 2, 3}, []int{4, 5, 6})
flat := chanz.Flatten(input)
result := chanz.Collect(flat)
// result = []int{1, 2, 3, 4, 5, 6}
Zip

Combine two channels.

nums := chanz.Generate(1, 2, 3)
strs := chanz.Generate("a", "b", "c")
zipped := chanz.Zip(nums, strs, func(n int, s string) string {
    return fmt.Sprintf("%d:%s", n, s)
})
result := chanz.Collect(zipped)
// result = []string{"1:a", "2:b", "3:c"}
Unzip

Split channel into two.

input := chanz.Generate(Pair{1, 10}, Pair{2, 20}, Pair{3, 30})
xs, ys := chanz.Unzip(input, func(p Pair) (int, int) {
    return p.X, p.Y
})
// xs receives 1, 2, 3
// ys receives 10, 20, 30
Filtering

Select or skip elements.

Filter

Keep matching elements.

input := chanz.Generate(1, 2, 3, 4, 5, 6)
evens := chanz.Filter(input, func(n int) bool {
    return n%2 == 0
})
result := chanz.Collect(evens)
// result = []int{2, 4, 6}
Compact

Remove consecutive duplicates.

input := chanz.Generate(1, 1, 2, 2, 2, 3, 3)
compacted := chanz.Compact(input, func(a, b int) bool {
    return a == b
})
result := chanz.Collect(compacted)
// result = []int{1, 2, 3}
Take

Take first N elements.

input := chanz.Generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
first3 := chanz.Take(input, 3)
result := chanz.Collect(first3)
// result = []int{1, 2, 3}
TakeWhile

Take while predicate true.

input := chanz.Generate(1, 2, 3, 4, 5, 1, 2)
ascending := chanz.TakeWhile(input, func(n int) bool {
    return n < 4
})
result := chanz.Collect(ascending)
// result = []int{1, 2, 3}
Drop

Drop first N elements.

input := chanz.Generate(1, 2, 3, 4, 5, 6)
rest := chanz.Drop(input, 3)
result := chanz.Collect(rest)
// result = []int{4, 5, 6}
DropWhile

Drop while predicate true.

input := chanz.Generate(1, 2, 3, 4, 5)
from4 := chanz.DropWhile(input, func(n int) bool {
    return n < 4
})
result := chanz.Collect(from4)
// result = []int{4, 5}
Partition

Split into two channels.

input := chanz.Generate(1, 2, 3, 4, 5, 6)
evens, odds := chanz.Partition(input, func(n int) bool {
    return n%2 == 0
})
// evens receives 2, 4, 6
// odds receives 1, 3, 5
Aggregation

Combine multiple channels.

FanIn

Merge channels concurrently.

ch1 := chanz.Generate(1, 2, 3)
ch2 := chanz.Generate(4, 5, 6)
ch3 := chanz.Generate(7, 8, 9)

merged := chanz.FanIn(ch1, ch2, ch3)
result := chanz.Collect(merged)
// result contains all 9 numbers (order non-deterministic)
Concat

Concatenate channels sequentially.

ch1 := chanz.Generate(1, 2, 3)
ch2 := chanz.Generate(4, 5, 6)
ch3 := chanz.Generate(7, 8, 9)

combined := chanz.Concat(ch1, ch2, ch3)
result := chanz.Collect(combined)
// result = []int{1, 2, 3, 4, 5, 6, 7, 8, 9} (order preserved)
Collect

Read all elements into slice.

input := chanz.Generate(1, 2, 3, 4, 5)
result := chanz.Collect(input)
// result = []int{1, 2, 3, 4, 5}
Fan-Out

Distribute to multiple channels.

FanOut

Broadcast to multiple channels.

input := chanz.Generate(1, 2, 3, 4, 5)
outputs := chanz.FanOut(input, 3, chanz.OpBuffer(1))

// outputs[0], outputs[1], outputs[2] all receive: 1, 2, 3, 4, 5

// Process in parallel
for i, ch := range outputs {
    go func(id int, c <-chan int) {
        for n := range c {
            fmt.Printf("Worker %d got %d\n", id, n)
        }
    }(i, ch)
}
Control Flow

Signal coordination and cancellation.

Done

Convert any channel to done signal.

work := make(chan int)
done := chanz.Done(work)

// Close work channel to signal done
close(work)
<-done // Unblocks when work is closed
SomeDone

Close when ANY input closes.

done1 := make(chan struct{})
done2 := make(chan struct{})

done := chanz.SomeDone(done1, done2)

// Close either channel
close(done1)
<-done // Unblocks immediately
EveryDone

Close when ALL inputs close.

done1 := make(chan struct{})
done2 := make(chan struct{})

done := chanz.EveryDone(done1, done2)

// Must close both
close(done1)
close(done2)
<-done // Unblocks now
Buffering

Buffer management utilities.

Buffer

Collect N elements with done support.

input := chanz.Generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
batch, more := chanz.Buffer(3, input)
// batch = []int{1, 2, 3}, more = true (channel still open)

batch, more = chanz.Buffer(100, input)
// batch = []int{4, 5, 6, 7, 8, 9, 10}, more = false (channel closed)
TakeBuffer

Non-blocking buffer read.

ch := make(chan int, 10)
ch <- 1
ch <- 2
ch <- 3

buffered := chanz.TakeBuffer(ch)
// buffered = []int{1, 2, 3}
// ch still has space for 7 more
DropBuffer

Non-blocking buffer clear.

ch := make(chan int, 10)
ch <- 1
ch <- 2
ch <- 3

chanz.DropBuffer(ch, false)
// ch is now empty (0 buffered items)
DropAll

Consume until closed.

input := chanz.Generate(1, 2, 3, 4, 5)

// Synchronous - blocks until channel closed
chanz.DropAll(input, false)

// Asynchronous - returns immediately, drains in background
chanz.DropAll(input, true)
Channel Types

Type conversions for safety.

Readers

Convert to read-only channels.

chans := []chan int{make(chan int), make(chan int)}
readers := chanz.Readers(chans...)
// readers is []<-chan int
Writers

Convert to write-only channels.

chans := []chan int{make(chan int), make(chan int)}
writers := chanz.Writers(chans...)
// writers is []chan<- int
Write/Read Modes

Flexible I/O operations.

WriteTo

Create writer with mode.

ch := make(chan int, 1)

// Synchronous (blocks)
writeSync := chanz.WriteTo[int](ch, chanz.WriteSync)
writeSync(42) // Blocks until written

// Asynchronous (goroutine)
writeAsync := chanz.WriteTo[int](ch, chanz.WriteAync)
writeAsync(42) // Returns immediately

// Non-blocking (only if space)
writeIfFree := chanz.WriteTo[int](ch, chanz.WriteIfFree)
writeIfFree(42) // Only writes if buffer has space
ReadFrom

Create reader with mode.

ch := make(chan int)
ch <- 42

// Synchronous (blocks)
readSync := chanz.ReadFrom[int](ch, chanz.ReadWait)
val, ok := readSync() // Blocks, returns (42, true)

// Non-blocking
readNow := chanz.ReadFrom[int](ch, chanz.ReadIfWaiting)
val, ok := readNow() // Returns immediately

Common Patterns

Pipeline Pattern
// Build a processing pipeline
input := chanz.Generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

processed := chanz.Map(input, func(n int) int {
    return n * n
})

filtered := chanz.Filter(processed, func(n int) bool {
    return n > 10
})

result := chanz.Collect(filtered)
// result = []int{16, 25, 36, 49, 64, 81, 100}
Worker Pool
// Generate work
work := chanz.Generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Fan out to 3 workers
workers := chanz.FanOut(work, 3)

// Process and collect results
var wg sync.WaitGroup
results := make(chan int, 10)

for i, worker := range workers {
    wg.Add(1)
    go func(id int, ch <-chan int) {
        defer wg.Done()
        for n := range ch {
            // Simulate work
            time.Sleep(100 * time.Millisecond)
            results <- n * 2
            fmt.Printf("Worker %d processed %d\n", id, n)
        }
    }(i, worker)
}

// Close results when done
go func() {
    wg.Wait()
    close(results)
}()

// Collect all results
final := chanz.Collect(results)
Timeout Handling
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

input := chanz.Generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Slow transformation
slow := chanz.Map(input, func(n int) int {
    time.Sleep(1 * time.Second)
    return n * 2
}, chanz.OpContext(ctx))

result := chanz.Collect(slow)
// Will stop early if timeout exceeded

Performance Notes

  • Goroutine per channel: Map, Filter, etc. spawn goroutines
  • Buffered channels: Use OpBuffer(n) to improve throughput
  • Backpressure: FanOut waits for all outputs to consume
  • Clean shutdown: Always close input channels to signal completion
  • Context cancellation: Use OpContext() for graceful shutdown

See Also

  • pipez - Fluent API for synchronous operations
  • slicez - Batch operations on collected data

Documentation

Overview

Package chanz provides utility functions for working with Go channels.

The package offers functional-style operations on channels including:

  • Transformation: Map, Flatten, Zip/Unzip
  • Filtering: Filter, Compact, Take/Drop variants
  • Aggregation: FanIn, FanOut, Concat
  • Generation: Generate, Generator
  • Utilities: Collect, Partition, Done signal handling

Most functions support functional options for configuration:

  • OpBuffer(n): Set channel buffer size (default 0)
  • OpContext(ctx): Stop when context is cancelled
  • OpDone(ch): Stop when done channel is closed

Functions ending in "With" (e.g., MapWith) return closures that can be reused with the same options, useful for pipeline building.

Example pipeline:

input := chanz.Generate(1, 2, 3, 4, 5)
doubled := chanz.Map(input, func(n int) int { return n * 2 })
evens := chanz.Filter(doubled, func(n int) bool { return n%2 == 0 })
result := chanz.Collect(evens)
// result = []int{2, 4, 6, 8, 10}

Index

Constants

View Source
const (
	// WriteSync blocks until the write completes (standard channel send).
	WriteSync = iota
	// WriteAync performs the write in a goroutine (non-blocking).
	WriteAync
	// WriteIfFree writes only if the channel buffer has space (non-blocking).
	WriteIfFree
)
View Source
const (
	// ReadWait blocks until a value is received (standard channel receive).
	ReadWait = iota
	// ReadIfWaiting receives only if a value is immediately available (non-blocking).
	ReadIfWaiting
)

Variables

This section is empty.

Functions

func Buffer

func Buffer[A any](size int, in <-chan A, options ...Option) ([]A, bool)

Buffer collects up to size elements from a channel into a slice. Returns the slice and a boolean indicating if buffer was filled (true) or channel closed (false). Stops early if done signal received.

Example:

input := chanz.Generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
batch, filled := chanz.Buffer(3, input)
// batch = []int{1, 2, 3}, filled = true
batch, filled = chanz.Buffer(100, input)
// batch = []int{4, 5, 6, 7, 8, 9, 10}, filled = false (channel closed)

func Collect

func Collect[A any](c <-chan A, options ...Option) []A

Collect will collect all enteries in a channel into a slice and return it. It stops and returns when done or c is closed It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func Compact

func Compact[A any](c <-chan A, equal func(a, b A) bool, options ...Option) <-chan A

Compact takes a chan and applies the "equal" func to every item and its predecessor. If it returns true, the current item being the same as the previous item, the current one will not be includes on the output chan The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func CompactWith

func CompactWith[A any](options ...Option) func(c <-chan A, equal func(a, b A) bool) <-chan A

CompactWith returns a configured Compact function closure. Allows creating reusable compacters with preset options.

Example:

dedup := chanz.CompactWith[int](OpBuffer(5))
input := chanz.Generate(1, 1, 2, 2, 2, 3, 3)
result := chanz.Collect(dedup(input, func(a, b int) bool { return a == b }))
// result = []int{1, 2, 3}

func Concat

func Concat[A any](cs ...<-chan A) <-chan A

Concat concatenates multiple channels sequentially. Reads from channels in order: waits for first channel to close, then starts reading from next. Unlike FanIn, this preserves order. Output closes when all input channels are closed.

Example:

ch1 := chanz.Generate(1, 2, 3)
ch2 := chanz.Generate(4, 5, 6)
combined := chanz.Concat(ch1, ch2)
result := chanz.Collect(combined)
// result = []int{1, 2, 3, 4, 5, 6} (order preserved)

func ConcatWith

func ConcatWith[A any](options ...Option) func(cs ...<-chan A) <-chan A

ConcatWith takes a slice of chans and put all items received on the returning chan The input chans are read until closed in order. The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func Done

func Done[T any](c chan T) <-chan struct{}

Done takes a channel, c, that is ment to indicate that something is done and returns a chan struct{} that closes once c does It is ment to convert a channel of any type to a channel that aligns with context.Context.Done() if data is passed on c, Done will drain it

func Drop

func Drop[A any](c <-chan A, i int, options ...Option) <-chan A

Drop takes a chan and returns a chan. It will drop the first "i" items read from the in chan and write the remaining items onto the return chan. The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func DropAll

func DropAll[A any](c <-chan A, async bool)

DropAll consumes and discards all values from a channel until it closes. If async is false, blocks until the channel is closed. If async is true, returns immediately and consumes in background.

Example:

ch := make(chan int)
go func() { ch <- 1; ch <- 2; close(ch) }()
chanz.DropAll(ch, false) // Blocks until ch is closed

func DropBuffer

func DropBuffer[A any](c <-chan A, async bool)

DropBuffer discards all values currently in a buffered channel's buffer. Non-blocking - only drops what's already buffered, doesn't wait for more. If async is true, runs in background.

Example:

ch := make(chan int, 10)
ch <- 1; ch <- 2; ch <- 3
chanz.DropBuffer(ch, false) // ch is now empty

func DropWhile

func DropWhile[A any](c <-chan A, drop func(a A) bool, options ...Option) <-chan A

DropWhile takes a chan and returns a chan. It will drop items until the drop function returns false, and will then write the remaining items onto the return chan. The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func DropWhileWith

func DropWhileWith[A any](options ...Option) func(c <-chan A, drop func(a A) bool) <-chan A

DropWhileWith returns a configured DropWhile function closure. Allows creating reusable droppers with preset options.

Example:

dropSmall := chanz.DropWhileWith[int](OpBuffer(5))
input := chanz.Generate(1, 2, 3, 5, 8, 4, 3, 2, 1)
result := chanz.Collect(dropSmall(input, func(n int) bool { return n < 5 }))
// result = []int{5, 8, 4, 3, 2, 1} (drops while n < 5)

func DropWith

func DropWith[A any](options ...Option) func(c <-chan A, i int) <-chan A

DropWith returns a configured Drop function closure. Allows creating reusable droppers with preset options.

Example:

drop3 := chanz.DropWith[int](OpBuffer(5))
input := chanz.Generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
result := chanz.Collect(drop3(input, 3))
// result = []int{4, 5, 6, 7, 8, 9, 10}

func EveryDone

func EveryDone[T any](done ...<-chan T) <-chan T

EveryDone returns a channel that closes when all channels from the input arguments are closed

func FanIn

func FanIn[A any](cs ...<-chan A) <-chan A

FanIn merges multiple channels into one output channel. Reads from all input channels concurrently, so order of output is non-deterministic. Output closes when all input channels are closed.

Example:

ch1 := chanz.Generate(1, 2, 3)
ch2 := chanz.Generate(4, 5, 6)
merged := chanz.FanIn(ch1, ch2)
result := chanz.Collect(merged)
// result contains {1,2,3,4,5,6} in some order

func FanInWith

func FanInWith[A any](options ...Option) func(cs ...<-chan A) <-chan A

FanInWith will merge all input from input channels into one output channel. It differs from FlattenUntil in that it reads from all channels concurrently instead of synchronized The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func FanOut

func FanOut[A any](c <-chan A, size int, options ...Option) []<-chan A

FanOut splits one input channel into multiple output channels. Each value from the input is sent to all output channels (broadcast pattern). A value won't be read from input until all outputs have consumed the previous value (if buffers are full). Output channels are closed when input closes.

Example:

input := chanz.Generate(1, 2, 3)
outputs := chanz.FanOut(input, 2, OpBuffer(1))
ch1, ch2 := outputs[0], outputs[1]
// Both ch1 and ch2 receive: 1, 2, 3

func FanOutWith

func FanOutWith[A any](options ...Option) func(c <-chan A, size int) []<-chan A

FanOutWith returns a configured FanOut function closure. Allows creating reusable broadcasters with preset options.

Example:

broadcaster := chanz.FanOutWith[int](OpBuffer(5))
input := chanz.Generate(1, 2, 3)
outputs := broadcaster(input, 3) // Split into 3 channels

func Filter

func Filter[A any](c <-chan A, include func(a A) bool, options ...Option) <-chan A

Filter takes a chan and applies the "include" func to every item. If it returns true, the item is out on the output chan The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func FilterWith

func FilterWith[A any](options ...Option) func(c <-chan A, include func(a A) bool) <-chan A

FilterWith returns a configured Filter function closure. Allows creating reusable filters with preset options.

Example:

evens := chanz.FilterWith[int](OpBuffer(10))
input := chanz.Generate(1, 2, 3, 4, 5)
result := chanz.Collect(evens(input, func(n int) bool { return n%2 == 0 }))
// result = []int{2, 4}

func Flatten

func Flatten[A any](in <-chan []A, options ...Option) <-chan A

Flatten takes a chan of a slice put all items received on the returning chan, one by one The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func FlattenWith

func FlattenWith[A any](options ...Option) func(in <-chan []A) <-chan A

FlattenWith returns a configured Flatten function closure. Allows creating reusable flatteners with preset options.

Example:

flattener := chanz.FlattenWith[int](OpBuffer(20))
input := make(chan []int)
go func() { input <- []int{1, 2}; input <- []int{3, 4}; close(input) }()
result := chanz.Collect(flattener(input))
// result = []int{1, 2, 3, 4}

func Generate

func Generate[A any](elements ...A) <-chan A

Generate takes a slice of elements, returns a channel and writes the elements to the channel. It closes once all elements in the slice are written The return chan has a buffer of 0

func GenerateWith

func GenerateWith[A any](options ...Option) func(elements ...A) <-chan A

GenerateWith returns a configured Generate function closure. Allows creating reusable element generators with preset options.

Example:

bufferedGen := chanz.GenerateWith[int](OpBuffer(10))
ch := bufferedGen(1, 2, 3, 4, 5)
result := chanz.Collect(ch)
// result = []int{1, 2, 3, 4, 5}

func Generator

func Generator[A any](gen func(func(A)), options ...Option) <-chan A

Generator creates a channel that yields values from a generator function. The generator receives a yield function to emit values. Stops when done signal received. Useful for creating channels from iterative algorithms.

Example:

// Generate Fibonacci numbers
fib := chanz.Generator(func(yield func(int)) {
    a, b := 0, 1
    for i := 0; i < 10; i++ {
        yield(a)
        a, b = b, a+b
    }
})
result := chanz.Collect(fib)
// result = []int{0, 1, 1, 2, 3, 5, 8, 13, 21, 34}

func GeneratorWith

func GeneratorWith[A any](options ...Option) func(gen func(func(A))) <-chan A

GeneratorWith returns a configured Generator function closure. Allows creating reusable generators with preset options.

Example:

rangeGen := chanz.GeneratorWith[int](OpBuffer(5))
oneto5 := rangeGen(func(yield func(int)) {
    for i := 1; i <= 5; i++ {
        yield(i)
    }
})
result := chanz.Collect(oneto5)
// result = []int{1, 2, 3, 4, 5}

func Map

func Map[A any, B any](in <-chan A, mapper func(a A) B, options ...Option) <-chan B

Map will take a chan, in, and executes mapper and put the resulting on to the return chan. The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func MapWith

func MapWith[A any, B any](options ...Option) func(in <-chan A, mapper func(a A) B) <-chan B

MapWith returns a configured Map function closure. Allows creating reusable mappers with preset options.

Example:

doubler := chanz.MapWith[int, int](OpBuffer(10))
input := chanz.Generate(1, 2, 3, 4, 5)
result := chanz.Collect(doubler(input, func(n int) int { return n * 2 }))
// result = []int{2, 4, 6, 8, 10}

func Partition

func Partition[A any](c <-chan A, predicate func(a A) bool, options ...Option) (satisfied, notSatisfied <-chan A)

Partition takes a chan and returns two chans. For every item consumed it is passed through the predicate func. If it returns true, the item is put on the satisfied chan otherwise it is put on the notSatisfied chan The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func PartitionWith

func PartitionWith[A any](options ...Option) func(c <-chan A, predicate func(a A) bool) (satisfied, notSatisfied <-chan A)

PartitionWith returns a configured Partition function closure. Allows creating reusable partitioners with preset options.

Example:

splitter := chanz.PartitionWith[int](OpBuffer(5))
input := chanz.Generate(1, 2, 3, 4, 5)
evens, odds := splitter(input, func(n int) bool { return n%2 == 0 })
// evens receives 2, 4; odds receives 1, 3, 5

func Peek

func Peek[A any](in <-chan A, apply func(a A), options ...Option) <-chan A

Peek will take a chan, in, and executes apply on every element and then writes the element to the return chan. The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func PeekWith

func PeekWith[A any](options ...Option) func(in <-chan A, apply func(a A)) <-chan A

PeekWith returns a configured Peek function closure. Allows creating reusable peekers with preset options.

Example:

logger := chanz.PeekWith[int](OpBuffer(5))
input := chanz.Generate(1, 2, 3)
logged := logger(input, func(n int) { fmt.Println("Processing:", n) })
chanz.Collect(logged) // Prints each number as it's processed

func ReadFrom

func ReadFrom[A any](c chan A, mode int) func() (m A, ok bool)

ReadFrom returns a function that reads from a channel with specified mode. Modes: ReadWait (block), ReadIfWaiting (non-blocking). Returns value and ok (false if channel closed or no value available in non-blocking mode).

Example:

ch := make(chan int, 1)
ch <- 42

readWait := chanz.ReadFrom[int](ch, chanz.ReadWait)
val, ok := readWait() // Blocks, returns (42, true)

readIfWaiting := chanz.ReadFrom[int](ch, chanz.ReadIfWaiting)
val, ok := readIfWaiting() // Returns immediately, (0, false) if empty

func Readers

func Readers[A any](chans ...chan A) []<-chan A

Readers converts a slice of bidirectional channels to read-only channels. Useful for type safety when passing channels to consumers.

Example:

chans := []chan int{make(chan int), make(chan int)}
readers := chanz.Readers(chans...)
// readers is []<-chan int, can only receive

func SomeDone

func SomeDone[T any](done ...<-chan T) <-chan T

SomeDone returns a channel that closes as soon as any channels from the input arguments are closed

func Take

func Take[A any](c <-chan A, i int, options ...Option) <-chan A

Take takes a chan and returns a chan. It will write the first "i" items read from the in chan onto the return chan and then close the read chan. The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func TakeBuffer

func TakeBuffer[A any](c <-chan A) []A

TakeBuffer reads all values currently in a buffered channel's buffer. Non-blocking - only takes what's already buffered, doesn't wait for more. Returns values already received in the buffer.

Example:

ch := make(chan int, 10)
ch <- 1; ch <- 2; ch <- 3
buff := chanz.TakeBuffer(ch)
// buff = []int{1, 2, 3}, ch is now empty

func TakeWhile

func TakeWhile[A any](c <-chan A, take func(a A) bool, options ...Option) <-chan A

TakeWhile takes a chan and returns a chan. It will write all items read from the in chan onto the return chan until the take function returns false, then the out chan will close The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

func TakeWhileWith

func TakeWhileWith[A any](options ...Option) func(c <-chan A, take func(a A) bool) <-chan A

TakeWhileWith returns a configured TakeWhile function closure. Allows creating reusable takers with preset options.

Example:

takeUnder10 := chanz.TakeWhileWith[int](OpBuffer(5))
input := chanz.Generate(1, 5, 8, 12, 3, 4)
result := chanz.Collect(takeUnder10(input, func(n int) bool { return n < 10 }))
// result = []int{1, 5, 8} (stops at 12)

func TakeWith

func TakeWith[A any](option ...Option) func(c <-chan A, i int) <-chan A

TakeWith returns a configured Take function closure. Allows creating reusable takers with preset options.

Example:

take5 := chanz.TakeWith[int](OpBuffer(3))
input := chanz.Generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
result := chanz.Collect(take5(input, 5))
// result = []int{1, 2, 3, 4, 5}

func Unzip

func Unzip[A any, B any, C any](zipped <-chan C, unzipper func(c C) (A, B), options ...Option) (<-chan A, <-chan B)

Unzip takes one chan and returns a chan. It will read a C item from the input chan, apply the unzipper to the two resulting items on the output chans The return chan has a buffer of buffer size supplied in input args. It will stop once the any in chan are closed, done is closed

func WriteTo

func WriteTo[A any](c chan<- A, mode int) func(m A)

WriteTo returns a function that writes to a channel with specified mode. Modes: WriteSync (block), WriteAync (goroutine), WriteIfFree (non-blocking).

Example:

ch := make(chan int, 1)
writeSync := chanz.WriteTo[int](ch, chanz.WriteSync)
writeSync(42) // Blocks until written

writeAsync := chanz.WriteTo[int](ch, chanz.WriteAync)
writeAsync(42) // Returns immediately, writes in background

writeIfFree := chanz.WriteTo[int](ch, chanz.WriteIfFree)
writeIfFree(42) // Only writes if buffer has space

func Writers

func Writers[A any](chans ...chan A) []chan<- A

Writers converts a slice of bidirectional channels to write-only channels. Useful for type safety when passing channels to producers.

Example:

chans := []chan int{make(chan int), make(chan int)}
writers := chanz.Writers(chans...)
// writers is []chan<- int, can only send

func Zip

func Zip[A any, B any, C any](ac <-chan A, bc <-chan B, zipper func(a A, b B) C, options ...Option) <-chan C

Zip takes two chans and returns a chan. it will read a A item and a B item. Apply the zipper to these and output the result on the returning chan The return chan has a buffer of buffer size supplied in input Option, default is 0. It will stop once "in", "done" channel is closed or the context.Done is closed, which is supplied in Option

Types

type Option

type Option func(s settings) settings

Option is a functional option for configuring channel operations. Use OpBuffer, OpContext, or OpDone to create options.

func OpBuffer

func OpBuffer(size int) Option

OpBuffer creates an option that sets the output channel buffer size. Default buffer size is 0 (unbuffered).

func OpContext

func OpContext(ctx context.Context) Option

OpContext creates an option that stops processing when the context is cancelled. Combines with existing done signals using SomeDone.

func OpDone

func OpDone(done <-chan struct{}) Option

OpDone creates an option that stops processing when the done channel is closed. Combines with existing done signals using SomeDone.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL