Documentation
¶
Overview ¶
Package pipeline provides channel-based functional primitives with persistent worker pools.
Unlike [slice.FanOut] (semaphore-per-call, push model), pipeline functions use persistent worker goroutines that pull from input channels. Blocked workers naturally stop pulling, creating backpressure upstream.
The core primitive is FanOut, which applies a call.Func to each input using N workers while preserving input order. Compose resilience first via call decorators (Retry, CircuitBreaker, Throttle), then execute through FanOut:
fn := fetchOrder.With(call.Retrier(3, call.ExponentialBackoff(time.Second), isRetryable)) results := pipeline.FanOut(ctx, orderIDs, 8, fn)
Supporting primitives (Filter, Batch, Merge, Tee) compose freely with FanOut. They operate on plain T values — when T is rslt.Result, errors pass through naturally.
All functions respect context cancellation and guarantee no goroutine leaks.
Index ¶
- func Batch[T any](ctx context.Context, in <-chan T, size int) <-chan []T
- func FanOut[T, R any](ctx context.Context, in <-chan T, workers int, fn call.Func[T, R]) <-chan rslt.Result[R]
- func Filter[T any](ctx context.Context, in <-chan T, fn func(T) bool) <-chan T
- func FromSlice[T any](ctx context.Context, ts []T) <-chan T
- func Generate[T any](ctx context.Context, fn func() (T, bool)) <-chan T
- func Merge[T any](ctx context.Context, ins ...<-chan T) <-chan T
- func Tee[T any](ctx context.Context, in <-chan T, n int) []<-chan T
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Batch ¶
Batch collects elements into slices of the given size. Emits a partial batch when input closes mid-batch. Each emitted slice is an independent copy. Panics if size <= 0.
func FanOut ¶ added in v0.100.0
func FanOut[T, R any](ctx context.Context, in <-chan T, workers int, fn call.Func[T, R]) <-chan rslt.Result[R]
FanOut applies fn to each input using workers persistent goroutines (pull model). Output order matches input order via a reorder buffer. Workers pull from input — blocked workers create natural backpressure. Panics in fn are recovered as *rslt.PanicError. Panics if workers <= 0. fn must not be nil.
func Filter ¶
Filter sends only elements where fn returns true. Single goroutine — no concurrency needed for a pure predicate. fn must not be nil.
func FromSlice ¶
FromSlice sends each element of ts to the returned channel, then closes it. Respects ctx cancellation.
func Generate ¶
Generate calls fn repeatedly, sending results to the returned channel. fn returns (value, more). When more is false or ctx cancels, the channel closes. fn must not be nil.
Types ¶
This section is empty.