pipeline

package
v0.100.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package pipeline provides channel-based functional primitives with persistent worker pools.

Unlike [slice.FanOut] (semaphore-per-call, push model), pipeline functions use persistent worker goroutines that pull from input channels. Blocked workers naturally stop pulling, creating backpressure upstream.

The core primitive is FanOut, which applies a call.Func to each input using N workers while preserving input order. Compose resilience first via call decorators (Retry, CircuitBreaker, Throttle), then execute through FanOut:

fn := fetchOrder.With(call.Retrier(3, call.ExponentialBackoff(time.Second), isRetryable))
results := pipeline.FanOut(ctx, orderIDs, 8, fn)

FanOutUnordered emits results in completion order for higher throughput.

Supporting primitives (Filter, Batch, Merge, Tee) compose freely with FanOut. They operate on plain T values — when T is rslt.Result, errors pass through naturally.

All functions respect context cancellation and guarantee no goroutine leaks.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Batch

func Batch[T any](ctx context.Context, in <-chan T, size int) <-chan []T

Batch collects elements into slices of the given size. Emits a partial batch when input closes mid-batch. Each emitted slice is an independent copy. Panics if size <= 0.

func FanOut added in v0.100.0

func FanOut[T, R any](ctx context.Context, in <-chan T, workers int, fn call.Func[T, R]) <-chan rslt.Result[R]

Map applies fn to each input using workers persistent goroutines (pull model). Output order matches input order via a reorder buffer. Workers pull from input — blocked workers create natural backpressure. Panics in fn are recovered as *rslt.PanicError. Panics if workers <= 0. fn must not be nil.

func FanOutUnordered added in v0.100.0

func FanOutUnordered[T, R any](ctx context.Context, in <-chan T, workers int, fn call.Func[T, R]) <-chan rslt.Result[R]

FanOutUnordered applies fn to each input using workers persistent goroutines. Results are emitted in completion order for higher throughput when processing times vary. Panics in fn are recovered as *rslt.PanicError. Panics if workers <= 0. fn must not be nil.

func Filter

func Filter[T any](ctx context.Context, in <-chan T, fn func(T) bool) <-chan T

Filter sends only elements where fn returns true. Single goroutine — no concurrency needed for a pure predicate. fn must not be nil.

func FromSlice

func FromSlice[T any](ctx context.Context, ts []T) <-chan T

FromSlice sends each element of ts to the returned channel, then closes it. Respects ctx cancellation.

func Generate

func Generate[T any](ctx context.Context, fn func() (T, bool)) <-chan T

Generate calls fn repeatedly, sending results to the returned channel. fn returns (value, more). When more is false or ctx cancels, the channel closes. fn must not be nil.

func Merge

func Merge[T any](ctx context.Context, ins ...<-chan T) <-chan T

Merge combines multiple input channels into a single output channel. Output order is nondeterministic. Closes when all inputs are closed or ctx cancels.

func Tee

func Tee[T any](ctx context.Context, in <-chan T, n int) []<-chan T

Tee duplicates input to n output channels. All consumers must keep up — the slowest consumer determines throughput. Closes all outputs when input closes or ctx cancels. Panics if n <= 0.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL