stream

package
Version: v0.42.0
Published: Mar 17, 2026 License: MIT Imports: 4 Imported by: 0

README

stream

Lazy, memoized, persistent sequences. Zero goroutines, zero channels.

Use stream when you need lazy evaluation — infinite sequences, early termination, or deferred computation over expensive elements. For finite in-memory collections, use slice. For lightweight iter.Seq chaining without caching, use seq.

// Before: channel-based Fibonacci leaks a goroutine when you stop reading
func fib() <-chan int {
    ch := make(chan int)
    go func() {
        a, b := 0, 1
        for { ch <- a; a, b = b, a+b }
    }()
    return ch
}
// goroutine runs forever after consumer stops

// After: lazy stream — no goroutine, no channel, no leak
type pair struct{ a, b int }
fib := stream.Unfold(pair{0, 1}, func(p pair) (int, pair, bool) {
    return p.a, pair{p.b, p.a + p.b}, true
})
first10 := fib.Take(10).Collect()

What It Looks Like

// Infinite sequence of natural numbers
naturals := stream.Generate(0, func(n int) int { return n + 1 })
// First 10 primes — lazy filter over infinite sequence
primes := stream.Generate(2, inc).KeepIf(isPrime).Take(10).Collect()
// Cross-type lazy map (standalone — Go methods can't introduce type params)
names := stream.Map(users, User.Name).Collect()
// Bridge to Go's range protocol
for v := range stream.Of(1, 2, 3).Seq() {
    fmt.Println(v)
}
// Cursor-based pagination — each step returns a page and optional next cursor
pages := stream.Paginate(firstCursor, fetchPage)
allItems := stream.Map(pages, Page.Items).Collect()
// FlatMap — expand each element and flatten, head-eager
allChildren := stream.FlatMap(nodes, Node.Children).Collect()
// Concat — chain two streams end-to-end
combined := stream.Concat(priorityItems, regularItems)
// Zip — pair corresponding elements, truncates to shorter
pairs := stream.Zip(keys, values).Collect()
// Scan — running accumulation as a lazy stream
balances := stream.Scan(transactions, startingBalance, applyTransaction)

Head-Eager, Tail-Lazy

When a stream cell exists, its head value is already computed. Only the tail is deferred. This means:

  • KeepIf eagerly scans forward until it finds a match (which may not terminate on infinite streams — see Caveats)
  • Take(n) returns a stream capped at n elements — the current cell is available immediately, the remainder is produced lazily as tails are forced
  • Collect() forces all remaining thunks and materializes to a slice

The zero value is an empty stream. First() returns a not-ok option on empty. Tail() returns empty on empty. Collect() returns nil on empty.
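
The head-eager, tail-lazy shape can be sketched with the standard library alone. The code below is a hypothetical illustration, not the package's implementation: cell, rest, generate, take, and steps are invented names, and sync.Once is a simplification that, unlike the package, does not retry after a panic.

```go
package main

import (
	"fmt"
	"sync"
)

// cell is a hypothetical stream node: the head is stored eagerly,
// the tail is a thunk that sync.Once forces at most once.
type cell struct {
	head int
	once sync.Once
	next func() *cell // deferred tail computation
	tail *cell        // cached result of next
}

// rest forces the tail on first use and returns the cached cell afterwards.
func (c *cell) rest() *cell {
	c.once.Do(func() {
		c.tail = c.next()
		c.next = nil // release the thunk once it has run
	})
	return c.tail
}

// steps counts how many tail thunks have run.
var steps int

// generate builds an infinite sequence: seed, fn(seed), fn(fn(seed)), ...
func generate(seed int, fn func(int) int) *cell {
	return &cell{head: seed, next: func() *cell {
		steps++
		return generate(fn(seed), fn)
	}}
}

// take copies the first n heads into a slice, forcing n-1 tails.
func take(c *cell, n int) []int {
	out := make([]int, 0, n)
	for i := 0; i < n && c != nil; i++ {
		out = append(out, c.head)
		if i < n-1 {
			c = c.rest()
		}
	}
	return out
}

func main() {
	naturals := generate(0, func(n int) int { return n + 1 })
	fmt.Println(steps)             // 0: only the head exists so far
	fmt.Println(take(naturals, 5)) // [0 1 2 3 4]
	fmt.Println(steps)             // 4: four tails were forced
	fmt.Println(take(naturals, 5)) // [0 1 2 3 4]
	fmt.Println(steps)             // 4: cached tails are reused
}
```

The second traversal does no new work because every forced tail is stored in its cell, which is the same property the Memoization and Persistence section describes.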

Memoization and Persistence

Each tail thunk runs at most once on success. After evaluation, the result is cached — calling .Collect() twice on the same stream returns the same elements without re-computing them:

s := stream.Generate(0, expensiveStep).Take(1000)
a := s.Collect()  // computes all 1000 steps
b := s.Collect()  // returns cached results — no recomputation

Multiple references to the same stream share the cache. This is what makes streams persistent — operations return new streams, but shared prefixes are computed once. Note that downstream operations (predicates, transforms) in derived streams are not shared — only the upstream forcing is deduplicated:

s := stream.Generate(0, expensiveStep).Take(1000)
evens := s.KeepIf(isEven)   // s is the shared source
odds  := s.KeepIf(isOdd)    // same s — steps are computed once

Retention cost: Holding a reference to an early cell pins all forced suffix cells in memory. Release references when you're done to allow GC.

Thread-safe: concurrent forcing of the same stream is coordinated via state machine transitions — thunks execute outside internal locks.

Caveats

Retry-on-panic. If a tail thunk panics, the cell resets to pending and a later Tail() call retries it. Avoid side effects in deferred computations unless retry is acceptable. "At most once" means at most once on success.
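
One way to get "at most once on success" with retry after a panic is a small state machine guarded by a mutex, with the thunk itself running outside the lock. This is a sketch under assumptions, not the package's code: lazy, newLazy, and force are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// lazy is a hypothetical memoized thunk with three states:
// pending (neither flag set), running, and done.
type lazy[T any] struct {
	mu      sync.Mutex
	cond    *sync.Cond
	running bool
	done    bool
	value   T
	thunk   func() T
}

func newLazy[T any](thunk func() T) *lazy[T] {
	l := &lazy[T]{thunk: thunk}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// force returns the cached value, running the thunk if needed.
// The thunk runs outside the lock; a panic resets the state to pending.
func (l *lazy[T]) force() T {
	l.mu.Lock()
	for l.running {
		l.cond.Wait() // another goroutine is evaluating; wait for it
	}
	if l.done {
		v := l.value
		l.mu.Unlock()
		return v
	}
	l.running = true
	l.mu.Unlock()

	var v T
	succeeded := false
	defer func() {
		l.mu.Lock()
		l.running = false
		if succeeded {
			l.value, l.done, l.thunk = v, true, nil
		}
		l.mu.Unlock()
		l.cond.Broadcast() // wake waiters; they see done or retry
	}()
	v = l.thunk()
	succeeded = true
	return v
}

func main() {
	attempts := 0
	l := newLazy(func() int {
		attempts++
		if attempts == 1 {
			panic("transient failure")
		}
		return 42
	})

	// The first force panics; recover so the demo can continue.
	func() {
		defer func() { fmt.Println("recovered:", recover()) }()
		l.force()
	}()

	fmt.Println(l.force(), l.force())  // 42 42
	fmt.Println("attempts:", attempts) // attempts: 2
}
```

The thunk ran twice in total: once for the panic and once for the successful retry. That is why side effects in deferred computations need to tolerate re-execution.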

Eager first step. Unfold and Paginate compute the first element eagerly at construction time. Panics during that first step are not memoized or retried.

Non-termination. Some operations may not terminate on infinite streams:

  • KeepIf, Find, Any, None: if no element matches
  • Every: if the predicate never returns false
  • FlatMap: if all inner streams are empty (scans forward indefinitely)
  • DropWhile: if the predicate never becomes false
  • Collect, Each, Fold: on any infinite stream

Backing-array retention. From captures subslice views of the input slice. The backing array may be retained until those stream nodes are forced or become unreachable. For very large slices, this can retain more memory than expected.
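
The retention effect comes from ordinary Go slice semantics rather than anything specific to this package. The sketch below uses only built-in slice operations; detach is a hypothetical helper showing one way to copy a small view so it no longer references the large array.

```go
package main

import "fmt"

// detach copies s into a fresh backing array (hypothetical helper).
func detach[T any](s []T) []T {
	return append([]T(nil), s...)
}

func main() {
	big := make([]int, 1_000_000)
	big[999_999] = 7

	// A subslice is a view: it shares big's backing array rather than copying it.
	view := big[999_999:]
	view[0] = 8
	fmt.Println(big[999_999]) // 8: same memory

	// Copying the elements that are still needed drops the link to the
	// large array, so it can be collected once nothing else references it.
	small := detach(view)
	small[0] = 9
	fmt.Println(big[999_999]) // 8: small has its own backing array
}
```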

Reentrancy. Callbacks must not force the same cell being evaluated (directly or indirectly). This is inherent to memoized lazy evaluation.

All functions that take a callback panic if the callback is nil.

When to Use Stream vs Seq vs Slice

| Use stream when...                  | Use seq when...                    | Use slice when...                  |
|-------------------------------------|------------------------------------|------------------------------------|
| Sequence is infinite                | You have an iter.Seq to chain      | Collection is finite and in memory |
| Multiple consumers share evaluation | Pipeline can re-evaluate each call | All elements will be consumed      |
| Elements are expensive to compute   | Lightweight, no caching needed     | Elements are cheap or pre-computed |
| Memoization and persistence matter  | No retention cost needed           | Eager execution is fine            |

Operations

Create: From, Of, Generate, Repeat, Unfold, Paginate, Prepend, PrependLazy

Lazy (return Stream): KeepIf, RemoveIf, Convert, Take, TakeWhile, Drop, DropWhile, Map (standalone), FlatMap (standalone), Concat (standalone), Zip (standalone), Scan (standalone)

Terminal (force evaluation): Each, Collect, Find, Any, Every, None, Seq, Fold (standalone)

Access: IsEmpty, First, Tail

See pkg.go.dev for complete API documentation, the main README for installation, and the showcase for real-world comparisons.

Documentation

Overview

Package stream provides lazy, memoized, persistent sequences.

A Stream is a linked list where each cell's head is eager and tail is lazy, evaluated at most once. Streams are value types externally with shared memoization via internal pointers. If a tail thunk panics, the cell remains unevaluated and future accesses will retry (re-invoking the thunk).

This package supports pure/in-memory sources only. Effectful sources (channels, iterators) are deferred to a future version.


Functions

func Fold

func Fold[T, R any](s Stream[T], initial R, fn func(R, T) R) R

Fold reduces a stream to a single value: starting from initial, it applies fn to the accumulator and each element in order. Standalone because Go methods cannot introduce the extra type parameter R. Requires a finite stream. Panics if fn is nil.
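
The role of the extra type parameter R is easiest to see on a plain slice. The following is a slice-based analogue with the same parameter order, written as an assumption-laden sketch; fold here is a hypothetical function, not the package's Fold.

```go
package main

import "fmt"

// fold is a slice analogue of a left fold: the accumulator type R
// is independent of the element type T.
func fold[T, R any](xs []T, initial R, fn func(R, T) R) R {
	acc := initial
	for _, x := range xs {
		acc = fn(acc, x)
	}
	return acc
}

func main() {
	words := []string{"go", "stream", "lazy"}
	// T is string, R is int: sum the lengths of all words.
	total := fold(words, 0, func(acc int, w string) int { return acc + len(w) })
	fmt.Println(total) // 12
}
```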

Types

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream is a lazy, memoized, persistent sequence. The zero value is an empty stream.

Head-eager, tail-lazy: when a cell exists, its head is known. Only the tail is deferred and evaluated at most once. Operations like KeepIf eagerly scan to the first match; Map and Convert eagerly transform the current head, and TakeWhile eagerly tests it against the predicate. Tail computation is always deferred into a thunk.

Concurrent tail forcing is synchronized and memoized. Multiple goroutines can safely traverse the same stream; each cell's tail thunk executes at most once (on success). Thunk execution occurs outside internal locks — other goroutines block on internal state, not on user callback execution.

Streams are persistent and memoized: multiple references to the same stream share forced cells. Holding a reference to an early cell pins all forced suffix cells reachable from it. From([]T) closures capture subslice views, which can pin the original backing array.

Retry-on-panic: if a tail thunk panics, the cell stays unevaluated and future Tail() calls re-invoke the thunk. Head computation (at construction) is eager and not retryable. Callback purity is assumed for deterministic retry behavior.

Reentrancy constraint: callbacks must not force the same cell being evaluated. This includes indirect paths (e.g., a Map callback that forces the Map result stream). This constraint is inherent to memoized lazy evaluation.

Stream is a value type externally (like Option, Either, Result). An internal pointer enables shared memoization across multiple references.

func Concat

func Concat[T any](a, b Stream[T]) Stream[T]

Concat returns a stream that yields all elements of a followed by all elements of b.

func FlatMap

func FlatMap[T, R any](s Stream[T], fn func(T) Stream[R]) Stream[R]

FlatMap applies fn to each element of s and concatenates the resulting streams. Head-eager: scans forward to find the first non-empty inner stream. Standalone because Go methods cannot introduce the extra type parameter R. Panics if fn is nil.

func From

func From[T any](ts []T) Stream[T]

From creates a lazy stream from a slice. Each tail closure captures a subslice view of the original — the backing array may be retained until those closures are evaluated or become unreachable.

func Generate

func Generate[T any](seed T, fn func(T) T) Stream[T]

Generate creates an infinite stream: seed, fn(seed), fn(fn(seed)), ... The seed is the first element (eager); subsequent elements apply fn lazily. Panics if fn is nil.

func Map

func Map[T, R any](s Stream[T], fn func(T) R) Stream[R]

Map applies fn to each element, returning a stream of a different type. Eagerly transforms the current head; tail transforms are deferred. Standalone because Go methods cannot introduce the extra type parameter R. Panics if fn is nil.

func Of

func Of[T any](vs ...T) Stream[T]

Of creates a lazy stream from variadic arguments. Delegates to From.

func Paginate

func Paginate[T, S any](seed S, fn func(S) (T, option.Option[S])) Stream[T]

Paginate creates a stream by repeatedly applying a step function to a seed. Each call to fn returns (element, nextSeed). When nextSeed is not-ok, the stream ends after emitting the element. Unlike Unfold, every call to fn produces an element — the Option[S] controls only whether to continue.

Designed for cursor-based pagination where the continuation token is already an option: the step function returns the page and the optional next cursor.

The first step is evaluated eagerly — a panic on the first call fails at construction and is not retryable. Subsequent steps are lazy and retryable.
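
The step contract (every call yields a page, and the cursor only decides whether to continue) can be sketched without the package. Everything below is hypothetical: page, fetchPage, and paginate are invented for illustration, a bool field stands in for the Option, and this version is fully lazy and unmemoized, unlike Paginate with its eager first step.

```go
package main

import "fmt"

// page is a hypothetical API response: some items plus an optional next cursor.
type page struct {
	items []string
	next  string // meaningful only when more is true
	more  bool
}

// fetchPage stands in for a remote call; it serves three fixed pages.
func fetchPage(cursor string) page {
	switch cursor {
	case "":
		return page{items: []string{"a", "b"}, next: "p2", more: true}
	case "p2":
		return page{items: []string{"c", "d"}, next: "p3", more: true}
	default:
		return page{items: []string{"e"}} // last page: no next cursor
	}
}

// paginate returns a pull function. Every step emits a page; the cursor
// only decides whether another step follows.
func paginate(first string, fetch func(string) page) func() (page, bool) {
	cursor, more := first, true
	return func() (page, bool) {
		if !more {
			return page{}, false
		}
		p := fetch(cursor)
		cursor, more = p.next, p.more
		return p, true
	}
}

func main() {
	next := paginate("", fetchPage)
	var all []string
	for p, ok := next(); ok; p, ok = next() {
		all = append(all, p.items...)
	}
	fmt.Println(all) // [a b c d e]
}
```

The last page is still emitted even though it carries no next cursor, which is the difference from Unfold that the description above points out.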

func Prepend

func Prepend[T any](v T, s Stream[T]) Stream[T]

Prepend returns a stream with v as the head and s as the tail. The tail reference is captured at construction — s itself may contain unevaluated lazy cells, but the link from this cell to s is immediate.

func PrependLazy

func PrependLazy[T any](v T, tail func() Stream[T]) Stream[T]

PrependLazy returns a stream with v as the head and a lazily-evaluated tail. The tail function is called at most once when the tail is first accessed, consistent with the package's memoized forcing semantics (see Stream type doc). Panics if tail is nil.

func Repeat

func Repeat[T any](v T) Stream[T]

Repeat creates an infinite stream where every element is v. Uses a self-referencing cell — O(1) memory regardless of traversal length.
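
A self-referencing cell can be pictured as a one-node cycle. The sketch below is a hypothetical model (node and repeat are invented names, and the tail is a plain pointer rather than a memoized thunk) showing why traversal length does not affect memory use.

```go
package main

import "fmt"

// node is a hypothetical, already-forced stream cell.
type node[T any] struct {
	head T
	tail *node[T]
}

// repeat builds one cell whose tail points back at itself.
func repeat[T any](v T) *node[T] {
	n := &node[T]{head: v}
	n.tail = n
	return n
}

func main() {
	r := repeat("x")
	cur := r
	for i := 0; i < 1000; i++ {
		cur = cur.tail // no allocation: every step lands on the same cell
	}
	fmt.Println(cur == r, cur.head) // true x
}
```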

func Scan

func Scan[T, R any](s Stream[T], initial R, fn func(R, T) R) Stream[R]

Scan reduces a stream like Fold, but returns a stream of all intermediate accumulator values. Includes the initial value as the first element (scanl semantics). Standalone because Go methods cannot introduce the extra type parameter R. Panics if fn is nil.
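
The scanl convention, where the output starts with the initial value and therefore has one more element than the input, is shown below on a slice. This is an eager, hypothetical analogue (scan is an invented helper); the package's Scan returns a lazy stream instead.

```go
package main

import "fmt"

// scan returns every intermediate accumulator, starting with initial,
// so the result has len(xs)+1 elements.
func scan[T, R any](xs []T, initial R, fn func(R, T) R) []R {
	out := make([]R, 0, len(xs)+1)
	acc := initial
	out = append(out, acc)
	for _, x := range xs {
		acc = fn(acc, x)
		out = append(out, acc)
	}
	return out
}

func main() {
	transactions := []int{50, -30, 20}
	balances := scan(transactions, 100, func(bal, tx int) int { return bal + tx })
	fmt.Println(balances) // [100 150 120 140]
}
```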

func Unfold

func Unfold[T, S any](seed S, fn func(S) (T, S, bool)) Stream[T]

Unfold creates a stream by repeatedly applying a step function to a seed. Each call to fn returns (element, nextSeed, ok). When ok is false, the stream ends. Unfold is the dual of Fold: Fold consumes a stream to a value, Unfold produces a stream from a value.

The first step is evaluated eagerly — a panic on the first call fails at construction and is not retryable. Subsequent steps are lazy and retryable.
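
The (element, nextSeed, ok) contract can be sketched as a pull function. This is an illustration under assumptions: unfold below is a hypothetical stand-in that is fully lazy and unmemoized, and it assumes the element returned alongside ok == false is discarded.

```go
package main

import "fmt"

// unfold returns a pull function driven by a step function.
// When the step reports ok == false, the sequence ends and the
// element from that call is discarded.
func unfold[T, S any](seed S, fn func(S) (T, S, bool)) func() (T, bool) {
	state, done := seed, false
	return func() (T, bool) {
		var zero T
		if done {
			return zero, false
		}
		v, next, ok := fn(state)
		if !ok {
			done = true
			return zero, false
		}
		state = next
		return v, true
	}
}

func main() {
	countdown := unfold(3, func(n int) (int, int, bool) {
		return n, n - 1, n > 0 // stop once the state reaches 0
	})
	for v, ok := countdown(); ok; v, ok = countdown() {
		fmt.Println(v) // prints 3, 2, 1 on separate lines
	}
}
```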

func Zip

func Zip[A, B any](a Stream[A], b Stream[B]) Stream[pair.Pair[A, B]]

Zip returns a stream of pairs from corresponding elements of a and b. Truncates to the shorter stream.
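
Truncation to the shorter input is illustrated below on slices. The sketch is a hypothetical eager analogue: pair and zip are local definitions, not the package's pair.Pair or Zip.

```go
package main

import "fmt"

// pair is a local stand-in for a two-element tuple.
type pair[A, B any] struct {
	first  A
	second B
}

// zip pairs corresponding elements and stops at the shorter input.
func zip[A, B any](as []A, bs []B) []pair[A, B] {
	n := len(as)
	if len(bs) < n {
		n = len(bs)
	}
	out := make([]pair[A, B], 0, n)
	for i := 0; i < n; i++ {
		out = append(out, pair[A, B]{as[i], bs[i]})
	}
	return out
}

func main() {
	keys := []string{"a", "b", "c"}
	values := []int{1, 2}
	fmt.Println(zip(keys, values)) // [{a 1} {b 2}]
}
```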

func (Stream[T]) Any

func (s Stream[T]) Any(fn func(T) bool) bool

Any returns true if fn returns true for any element. Short-circuits on first match. On an infinite stream with no matching element, this will not terminate. Panics if fn is nil.

func (Stream[T]) Collect

func (s Stream[T]) Collect() []T

Collect materializes the stream into a slice. Requires a finite stream. Returns nil for an empty stream.

func (Stream[T]) Convert

func (s Stream[T]) Convert(fn func(T) T) Stream[T]

Convert applies fn to each element, returning a stream of results. Eagerly transforms the current head; tail transforms are deferred. Same-type transform — use standalone Map for cross-type mapping. Panics if fn is nil.

func (Stream[T]) Drop

func (s Stream[T]) Drop(n int) Stream[T]

Drop skips the first n elements. Forces skipped cells eagerly. Negative n returns the stream unchanged.

func (Stream[T]) DropWhile

func (s Stream[T]) DropWhile(fn func(T) bool) Stream[T]

DropWhile skips elements while fn returns true. Forces skipped cells eagerly. On an infinite stream where fn always returns true, this will not terminate. Panics if fn is nil.

func (Stream[T]) Each

func (s Stream[T]) Each(fn func(T))

Each calls fn for every element. Requires a finite stream. Panics if fn is nil.

func (Stream[T]) Every

func (s Stream[T]) Every(fn func(T) bool) bool

Every returns true if fn returns true for every element. Returns true for an empty stream (vacuous truth). Short-circuits on first false. On an infinite stream where fn always returns true, this will not terminate. Panics if fn is nil.

func (Stream[T]) Find

func (s Stream[T]) Find(fn func(T) bool) option.Option[T]

Find returns the first element where fn returns true. Short-circuits on match. On an infinite stream with no matching element, this will not terminate. Panics if fn is nil.

func (Stream[T]) First

func (s Stream[T]) First() option.Option[T]

First returns the head element, or a not-ok option if empty.

func (Stream[T]) IsEmpty

func (s Stream[T]) IsEmpty() bool

IsEmpty returns true if the stream has no elements.

func (Stream[T]) KeepIf

func (s Stream[T]) KeepIf(fn func(T) bool) Stream[T]

KeepIf returns a stream containing only elements where fn returns true. Eagerly scans to the first match (head-eager constraint); the rest is lazy. On an infinite stream with no matching element, this will not terminate. Panics if fn is nil.

func (Stream[T]) None

func (s Stream[T]) None(fn func(T) bool) bool

None returns true if fn returns false for every element. Returns true for an empty stream (vacuous truth). Short-circuits on first true. On an infinite stream where fn always returns false, this will not terminate. Panics if fn is nil.

func (Stream[T]) RemoveIf

func (s Stream[T]) RemoveIf(fn func(T) bool) Stream[T]

RemoveIf returns a stream containing only elements where fn returns false. It is the complement of KeepIf. Panics if fn is nil.

func (Stream[T]) Seq

func (s Stream[T]) Seq() iter.Seq[T]

Seq returns an iter.Seq that yields each element. Bridges to Go's range protocol. The returned closure captures the original stream head. During iteration, the loop variable advances without accumulating additional references, but the closure itself retains the stream root while it exists.

func (Stream[T]) Tail

func (s Stream[T]) Tail() Stream[T]

Tail returns the rest of the stream after the first element. Forces one cell's tail thunk (mutex-guarded, memoized). Returns an empty stream if the stream is empty.

func (Stream[T]) Take

func (s Stream[T]) Take(n int) Stream[T]

Take returns a stream of at most n elements. A negative n returns an empty stream.

func (Stream[T]) TakeWhile

func (s Stream[T]) TakeWhile(fn func(T) bool) Stream[T]

TakeWhile returns elements while fn returns true. Stops at the first false. Evaluates the predicate on the current head immediately; tail evaluation is deferred. Panics if fn is nil.
