Documentation
¶
Index ¶
- func AllMatchCtx[T any](ctx context.Context, s Stream[T], pred func(T) bool) (bool, error)
- func AnyMatchCtx[T any](ctx context.Context, s Stream[T], pred func(T) bool) (bool, error)
- func Associate[T any, K comparable, V any](s Stream[T], fn func(T) (K, V)) map[K]V
- func AssociateBy[T any, K comparable](s Stream[T], keyFn func(T) K) map[K]T
- func BottomK[T any](s Stream[T], k int, less func(T, T) bool) []T
- func CollectCtx[T any](ctx context.Context, s Stream[T]) ([]T, error)
- func CollectResults[T any](s Stream[Result[T]]) ([]T, error)
- func CollectResultsAll[T any](s Stream[Result[T]]) ([]T, []error)
- func CollectTo[T, A, R any](s Stream[T], c Collector[T, A, R]) R
- func CollectToList[T any](seq iter.Seq[T]) collections.List[T]
- func CollectToMap[K comparable, V any](seq iter.Seq2[K, V]) collections.Map[K, V]
- func CollectToSet[T comparable](seq iter.Seq[T]) collections.Set[T]
- func Contains[T comparable](s Stream[T], target T) bool
- func CountBy[T any, K comparable](s Stream[T], keyFn func(T) K) map[K]int
- func CountCtx[T any](ctx context.Context, s Stream[T]) (int, error)
- func FoldTo[T, R any](s Stream[T], identity R, fn func(R, T) R) R
- func ForEachCtx[T any, A ~func(T) | ~func(T) error](ctx context.Context, s Stream[T], action A) error
- func Frequencies[T comparable](s Stream[T]) map[T]int
- func Frequency[T comparable](s Stream[T]) map[T]int
- func FrequencyToHashMap[T comparable](s Stream[T]) collections.Map[T, int]
- func GroupBy[T any, K comparable](s Stream[T], keyFn func(T) K) map[K][]T
- func GroupByTo[T any, K comparable, V any](s Stream[T], keyFn func(T) K, valFn func(T) V) map[K][]V
- func GroupByToHashMap[T any, K comparable](s Stream[T], keyFn func(T) K) collections.Map[K, []T]
- func GroupByToTreeMap[T any, K any](s Stream[T], keyFn func(T) K, keyCmp collections.Comparator[K]) collections.SortedMap[K, []T]
- func GroupValues[K comparable, V any](s Stream2[K, V]) map[K][]V
- func GroupValuesToHashMap[K comparable, V any](s Stream2[K, V]) collections.Map[K, []V]
- func HistogramToHashMap[T any, K comparable](s Stream[T], keyFn func(T) K) collections.Map[K, []T]
- func IndexBy[T any, K comparable](s Stream[T], keyFn func(T) K) map[K]T
- func Joining(s Stream[string], sep string) string
- func JoiningWithPrefixSuffix(s Stream[string], sep, prefix, suffix string) string
- func OptionalEquals[T comparable](o1, o2 Optional[T]) bool
- func ParallelCollect[T any](s Stream[T], opts ...ParallelOption) []T
- func ParallelForEach[T any](s Stream[T], action func(T), opts ...ParallelOption)
- func ParallelForEachCtx[T any](ctx context.Context, s Stream[T], action func(context.Context, T), ...) error
- func ParallelReduce[T any](s Stream[T], identity T, op func(T, T) T, opts ...ParallelOption) T
- func PartitionBy[T any](s Stream[T], pred func(T) bool) ([]T, []T)
- func PartitionResults[T any](s Stream[Result[T]]) ([]T, []error)
- func Product[T Numeric](s Stream[T]) T
- func ReduceByKey[K comparable, V any](s Stream2[K, V], merge func(V, V) V) map[K]V
- func ReduceByKeyWithInit[K comparable, V, R any](s Stream2[K, V], init func() R, merge func(R, V) R) map[K]R
- func ReduceCtx[T any, F ~func(T, T) T | ~func(T, T) (T, error)](ctx context.Context, s Stream[T], identity T, fn F) (T, error)
- func Sum[T Numeric](s Stream[T]) T
- func SumBy[T any, N Numeric](s Stream[T], fn func(T) N) N
- func ToArrayList[T any](s Stream[T]) collections.List[T]
- func ToCSV(s Stream[[]string], w io.Writer) error
- func ToCSVFile(s Stream[[]string], path string) error
- func ToFile[T any](s Stream[T], path string, format func(T) string) error
- func ToHashMap2C[K comparable, V any](s Stream2[K, V]) collections.Map[K, V]
- func ToHashMapC[T any, K comparable, V any](s Stream[T], keyFn func(T) K, valFn func(T) V) collections.Map[K, V]
- func ToHashSet[T comparable](s Stream[T]) collections.Set[T]
- func ToLinkedList[T any](s Stream[T]) collections.List[T]
- func ToMap[T any, K comparable, V any](s Stream[T], keyFn func(T) K, valFn func(T) V) map[K]V
- func ToMap2[K comparable, V any](s Stream2[K, V]) map[K]V
- func ToSet[T comparable](s Stream[T]) map[T]struct{}
- func ToTreeMap2C[K any, V any](s Stream2[K, V], keyCmp collections.Comparator[K]) collections.SortedMap[K, V]
- func ToTreeMapC[T any, K any, V any](s Stream[T], keyFn func(T) K, valFn func(T) V, ...) collections.SortedMap[K, V]
- func ToTreeSet[T any](s Stream[T], cmp collections.Comparator[T]) collections.SortedSet[T]
- func ToWriter[T any](s Stream[T], w io.Writer, format func(T) string) error
- func TopK[T any](s Stream[T], k int, less func(T, T) bool) []T
- func Unzip[T, U any](s Stream[Pair[T, U]]) ([]T, []U)
- func Using[T interface{ ... }, R any](resource T, fn func(T) R) R
- type CSVRecord
- type CSVStream
- type CoGrouped
- type Collector
- func AveragingCollector[T Numeric]() Collector[T, *averagingState, Optional[float64]]
- func BottomKCollector[T any](k int, less func(T, T) bool) Collector[T, *bottomKState[T], []T]
- func CountingCollector[T any]() Collector[T, *countingState, int]
- func FilteringCollector[T, A, R any](pred func(T) bool, downstream Collector[T, A, R]) Collector[T, A, R]
- func FirstCollector[T any]() Collector[T, *firstState[T], Optional[T]]
- func FlatMappingCollector[T, U, A, R any](mapper func(T) Stream[U], downstream Collector[U, A, R]) Collector[T, A, R]
- func FrequencyCollector[T comparable]() Collector[T, map[T]int, map[T]int]
- func GroupingByCollector[T any, K comparable](keyFn func(T) K) Collector[T, map[K][]T, map[K][]T]
- func HistogramCollector[T any, K comparable](keyFn func(T) K) Collector[T, *histogramState[T, K], map[K][]T]
- func JoiningCollector(sep string) Collector[string, *strings.Builder, string]
- func JoiningCollectorFull(sep, prefix, suffix string) Collector[string, *strings.Builder, string]
- func LastCollector[T any]() Collector[T, *lastState[T], Optional[T]]
- func MappingCollector[T, U, A, R any](mapper func(T) U, downstream Collector[U, A, R]) Collector[T, A, R]
- func MaxByCollector[T any](cmp func(T, T) int) Collector[T, *maxState[T], Optional[T]]
- func MinByCollector[T any](cmp func(T, T) int) Collector[T, *minState[T], Optional[T]]
- func PartitioningByCollector[T any](pred func(T) bool) Collector[T, *partitionState[T], map[bool][]T]
- func QuantileCollector[T any](q float64, less func(T, T) bool) Collector[T, *quantileState[T], Optional[T]]
- func ReducingCollector[T any](identity T, fn func(T, T) T) Collector[T, *T, T]
- func SummingCollector[T Numeric]() Collector[T, *summingState[T], T]
- func TeeingCollector[T, A1, R1, A2, R2, R any](c1 Collector[T, A1, R1], c2 Collector[T, A2, R2], merger func(R1, R2) R) Collector[T, *teeingState[A1, A2], R]
- func ToArrayListCollector[T any]() Collector[T, collections.List[T], collections.List[T]]
- func ToHashMapCollector[T any, K comparable, V any](keyFn func(T) K, valFn func(T) V) Collector[T, collections.Map[K, V], collections.Map[K, V]]
- func ToHashSetCollector[T comparable]() Collector[T, collections.Set[T], collections.Set[T]]
- func ToMapCollector[T any, K comparable, V any](keyFn func(T) K, valFn func(T) V) Collector[T, map[K]V, map[K]V]
- func ToMapCollectorMerging[T any, K comparable, V any](keyFn func(T) K, valFn func(T) V, merge func(V, V) V) Collector[T, map[K]V, map[K]V]
- func ToSetCollector[T comparable]() Collector[T, map[T]struct{}, map[T]struct{}]
- func ToSliceCollector[T any]() Collector[T, []T, []T]
- func ToTreeMapCollector[T any, K any, V any](keyFn func(T) K, valFn func(T) V, keyCmp collections.Comparator[K]) Collector[T, collections.SortedMap[K, V], collections.SortedMap[K, V]]
- func ToTreeSetCollector[T any](cmp collections.Comparator[T]) Collector[T, collections.SortedSet[T], collections.SortedSet[T]]
- func TopKCollector[T any](k int, less func(T, T) bool) Collector[T, *topKState[T], []T]
- type ContextError
- type FileLineStream
- type Float
- type Integer
- type JoinResult
- type JoinResultOptional
- type Numeric
- type Optional
- func Average[T Numeric](s Stream[T]) Optional[float64]
- func AverageBy[T any, N Numeric](s Stream[T], fn func(T) N) Optional[float64]
- func FindFirstCtx[T any](ctx context.Context, s Stream[T], pred func(T) bool) (Optional[T], error)
- func GetStatistics[T Numeric](s Stream[T]) Optional[Statistics[T]]
- func MaxBy[T any, K cmp.Ordered](s Stream[T], fn func(T) K) Optional[T]
- func MaxValue[T cmp.Ordered](s Stream[T]) Optional[T]
- func Median[T any](s Stream[T], less func(T, T) bool) Optional[T]
- func MinBy[T any, K cmp.Ordered](s Stream[T], fn func(T) K) Optional[T]
- func MinMax[T cmp.Ordered](s Stream[T]) Optional[Pair[T, T]]
- func MinValue[T cmp.Ordered](s Stream[T]) Optional[T]
- func None[T any]() Optional[T]
- func OptionalFlatMap[T, U any](o Optional[T], fn func(T) Optional[U]) Optional[U]
- func OptionalFromCondition[T any](condition bool, value T) Optional[T]
- func OptionalMap[T, U any](o Optional[T], fn func(T) U) Optional[U]
- func OptionalOf[T any](ptr *T) Optional[T]
- func OptionalZip[T, U any](o1 Optional[T], o2 Optional[U]) Optional[Pair[T, U]]
- func Percentile[T any](s Stream[T], p float64, less func(T, T) bool) Optional[T]
- func Quantile[T any](s Stream[T], q float64, less func(T, T) bool) Optional[T]
- func Some[T any](value T) Optional[T]
- func (o Optional[T]) Filter(pred func(T) bool) Optional[T]
- func (o Optional[T]) Get() T
- func (o Optional[T]) GetOrElse(defaultVal T) T
- func (o Optional[T]) GetOrElseGet(supplier func() T) T
- func (o Optional[T]) GetOrZero() T
- func (o Optional[T]) IfPresent(action func(T))
- func (o Optional[T]) IfPresentOrElse(action func(T), emptyAction func())
- func (o Optional[T]) IsEmpty() bool
- func (o Optional[T]) IsPresent() bool
- func (o Optional[T]) Map(fn func(T) T) Optional[T]
- func (o Optional[T]) OrElse(other Optional[T]) Optional[T]
- func (o Optional[T]) OrElseGet(supplier func() Optional[T]) Optional[T]
- func (o Optional[T]) String() string
- func (o Optional[T]) ToPointer() *T
- func (o Optional[T]) ToSlice() []T
- func (o Optional[T]) ToStream() Stream[T]
- type Pair
- type ParallelConfig
- type ParallelOption
- type Quad
- type Result
- func (r Result[T]) And(other Result[T]) Result[T]
- func (r Result[T]) Error() error
- func (r Result[T]) Get() (T, error)
- func (r Result[T]) IsErr() bool
- func (r Result[T]) IsOk() bool
- func (r Result[T]) Map(fn func(T) T) Result[T]
- func (r Result[T]) MapErr(fn func(error) error) Result[T]
- func (r Result[T]) Or(other Result[T]) Result[T]
- func (r Result[T]) ToOptional() Optional[T]
- func (r Result[T]) Unwrap() T
- func (r Result[T]) UnwrapErr() error
- func (r Result[T]) UnwrapOr(defaultVal T) T
- func (r Result[T]) UnwrapOrElse(fn func(error) T) T
- func (r Result[T]) Value() T
- type Signed
- type Statistics
- type Stream
- func Abs[T Signed](s Stream[T]) Stream[T]
- func AbsFloat[T Float](s Stream[T]) Stream[T]
- func AntiJoinBy[T, U any, K comparable](s1 Stream[T], s2 Stream[U], keyT func(T) K, keyU func(U) K) Stream[T]
- func Cartesian[T, U any](s1 Stream[T], s2 Stream[U]) Stream[Pair[T, U]]
- func CartesianSelf[T any](s Stream[T]) Stream[Pair[T, T]]
- func Chunk[T any](s Stream[T], size int) Stream[[]T]
- func Clamp[T cmp.Ordered](s Stream[T], minVal, maxVal T) Stream[T]
- func CoGroup[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[CoGrouped[K, V1, V2]]
- func Combinations[T any](s Stream[T], k int) Stream[[]T]
- func Concat[T any](streams ...Stream[T]) Stream[T]
- func CrossProduct[T any](streams ...Stream[T]) Stream[[]T]
- func Cycle[T any](values ...T) Stream[T]
- func Debounce[T any](ctx context.Context, s Stream[T], quiet time.Duration) Stream[T]
- func Delay[T any](s Stream[T], duration time.Duration) Stream[T]
- func DelayCtx[T any](ctx context.Context, s Stream[T], duration time.Duration) Stream[T]
- func Differences[T Numeric](s Stream[T]) Stream[T]
- func Distinct[T comparable](s Stream[T]) Stream[T]
- func DistinctBy[T any, K comparable](s Stream[T], keyFn func(T) K) Stream[T]
- func DistinctUntilChanged[T comparable](s Stream[T]) Stream[T]
- func DistinctUntilChangedBy[T any](s Stream[T], eq func(a, b T) bool) Stream[T]
- func Empty[T any]() Stream[T]
- func FilterCtx[T any](ctx context.Context, s Stream[T], pred func(T) bool) Stream[T]
- func FilterErr[T any](s Stream[T], pred func(T) (bool, error)) Stream[Result[T]]
- func FilterErrs[T any](s Stream[Result[T]]) Stream[error]
- func FilterOk[T any](s Stream[Result[T]]) Stream[T]
- func FlatMap[T, U any](s Stream[T], fn func(T) Stream[U]) Stream[U]
- func FlatMapErr[T, U any](s Stream[T], fn func(T) (Stream[U], error)) Stream[Result[U]]
- func FlatMapSeq[T, U any](s Stream[T], fn func(T) iter.Seq[U]) Stream[U]
- func Flatten[T any](s Stream[[]T]) Stream[T]
- func FlattenSeq[T any](s Stream[iter.Seq[T]]) Stream[T]
- func From[T any](seq iter.Seq[T]) Stream[T]
- func FromBytes(data []byte) Stream[byte]
- func FromCSV(r io.Reader) Stream[[]string]
- func FromCSVErr(r io.Reader) Stream[Result[[]string]]
- func FromCSVWithHeader(r io.Reader) Stream[CSVRecord]
- func FromCSVWithHeaderErr(r io.Reader) Stream[Result[CSVRecord]]
- func FromChannel[T any](ch <-chan T) Stream[T]
- func FromChannelCtx[T any](ctx context.Context, ch <-chan T) Stream[T]
- func FromDeque[T any](d collections.Deque[T]) Stream[T]
- func FromDequeReversed[T any](d collections.Deque[T]) Stream[T]
- func FromList[T any](list collections.List[T]) Stream[T]
- func FromPriorityQueue[T any](pq collections.PriorityQueue[T]) Stream[T]
- func FromPriorityQueueSorted[T any](pq collections.PriorityQueue[T]) Stream[T]
- func FromQueue[T any](q collections.Queue[T]) Stream[T]
- func FromReaderLines(r io.Reader) Stream[string]
- func FromReaderLinesCtx(ctx context.Context, r io.Reader) Stream[string]
- func FromReaderLinesErr(r io.Reader) Stream[Result[string]]
- func FromResults[T any](results ...Result[T]) Stream[Result[T]]
- func FromRunes(s string) Stream[rune]
- func FromScanner(scanner *bufio.Scanner) Stream[string]
- func FromScannerErr(scanner *bufio.Scanner) Stream[Result[string]]
- func FromSet[T any](set collections.Set[T]) Stream[T]
- func FromSlice[T any](s []T) Stream[T]
- func FromSortedSet[T any](set collections.SortedSet[T]) Stream[T]
- func FromSortedSetDescending[T any](set collections.SortedSet[T]) Stream[T]
- func FromStack[T any](s collections.Stack[T]) Stream[T]
- func FromStringLines(s string) Stream[string]
- func FromTSV(r io.Reader) Stream[[]string]
- func FromTSVErr(r io.Reader) Stream[Result[[]string]]
- func FullJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[JoinResultOptional[K, V1, V2]]
- func Generate[T any](supplier func() T) Stream[T]
- func GenerateCtx[T any](ctx context.Context, supplier func() T) Stream[T]
- func InnerJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[JoinResult[K, V1, V2]]
- func Interleave[T any](s1, s2 Stream[T]) Stream[T]
- func Interval(ctx context.Context, interval time.Duration) Stream[int]
- func Iterate[T any](seed T, fn func(T) T) Stream[T]
- func IterateCtx[T any](ctx context.Context, seed T, fn func(T) T) Stream[T]
- func JoinBy[T, U, K comparable](s1 Stream[T], s2 Stream[U], keyT func(T) K, keyU func(U) K) Stream[Pair[T, U]]
- func LeftJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[JoinResultOptional[K, V1, V2]]
- func LeftJoinBy[T, U any, K comparable](s1 Stream[T], s2 Stream[U], keyT func(T) K, keyU func(U) K) Stream[Pair[T, Optional[U]]]
- func LeftJoinWith[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2], defaultV2 V2) Stream[JoinResult[K, V1, V2]]
- func MapCtx[T any](ctx context.Context, s Stream[T], fn func(T) T) Stream[T]
- func MapErrTo[T, U any](s Stream[T], fn func(T) (U, error)) Stream[Result[U]]
- func MapTo[T, U any](s Stream[T], fn func(T) U) Stream[U]
- func MapToCtx[T, U any](ctx context.Context, s Stream[T], fn func(T) U) Stream[U]
- func MergeSorted[T any](s1, s2 Stream[T], cmp func(a, b T) int) Stream[T]
- func MergeSortedN[T any](cmp func(a, b T) int, streams ...Stream[T]) Stream[T]
- func MergeSortedNHeap[T any](cmp func(a, b T) int, streams ...Stream[T]) Stream[T]
- func Negative[T Signed](s Stream[T]) Stream[T]
- func NonZero[T Numeric](s Stream[T]) Stream[T]
- func Of[T any](values ...T) Stream[T]
- func Offset[T Numeric](s Stream[T], offset T) Stream[T]
- func Pairwise[T any](s Stream[T]) Stream[Pair[T, T]]
- func ParallelFilter[T any](s Stream[T], pred func(T) bool, opts ...ParallelOption) Stream[T]
- func ParallelFilterCtx[T any](ctx context.Context, s Stream[T], pred func(context.Context, T) bool, ...) Stream[T]
- func ParallelFlatMap[T, U any](s Stream[T], fn func(T) Stream[U], opts ...ParallelOption) Stream[U]
- func ParallelFlatMapCtx[T, U any](ctx context.Context, s Stream[T], fn func(context.Context, T) Stream[U], ...) Stream[U]
- func ParallelMap[T, U any](s Stream[T], fn func(T) U, opts ...ParallelOption) Stream[U]
- func ParallelMapCtx[T, U any](ctx context.Context, s Stream[T], fn func(context.Context, T) U, ...) Stream[U]
- func Permutations[T any](s Stream[T]) Stream[[]T]
- func Positive[T Numeric](s Stream[T]) Stream[T]
- func Prefetch[T any](s Stream[T], n int) Stream[T]
- func Range(start, end int) Stream[int]
- func RangeClosed(start, end int) Stream[int]
- func RangeCtx(ctx context.Context, start, end int) Stream[int]
- func RateLimit[T any](s Stream[T], n int, per time.Duration) Stream[T]
- func RateLimitCtx[T any](ctx context.Context, s Stream[T], n int, per time.Duration) Stream[T]
- func Repeat[T any](value T, n int) Stream[T]
- func RepeatForever[T any](value T) Stream[T]
- func RightJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[JoinResultOptional[K, V1, V2]]
- func RightJoinWith[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2], defaultV1 V1) Stream[JoinResult[K, V1, V2]]
- func RunningProduct[T Numeric](s Stream[T]) Stream[T]
- func RunningSum[T Numeric](s Stream[T]) Stream[T]
- func Sample[T any](ctx context.Context, s Stream[T], interval time.Duration) Stream[T]
- func Scale[T Numeric](s Stream[T], factor T) Stream[T]
- func Scan[T any, A any](s Stream[T], init A, fn func(A, T) A) Stream[A]
- func SemiJoinBy[T, U any, K comparable](s1 Stream[T], s2 Stream[U], keyT func(T) K, keyU func(U) K) Stream[T]
- func SessionWindow[T any](ctx context.Context, s Stream[T], gap time.Duration) Stream[[]T]
- func SlidingTimeWindow[T any](ctx context.Context, s Stream[T], windowSize, slideInterval time.Duration) Stream[[]T]
- func SortedBy[T any, K cmp.Ordered](s Stream[T], keyFn func(T) K) Stream[T]
- func SortedStableBy[T any, K cmp.Ordered](s Stream[T], keyFn func(T) K) Stream[T]
- func TakeUntilErr[T any](s Stream[Result[T]]) Stream[T]
- func Throttle[T any](s Stream[T], interval time.Duration) Stream[T]
- func ThrottleCtx[T any](ctx context.Context, s Stream[T], interval time.Duration) Stream[T]
- func Timeout[T any](ctx context.Context, s Stream[T], timeout time.Duration) Stream[Result[T]]
- func Timer[T any](ctx context.Context, duration time.Duration, value T) Stream[T]
- func Triples[T any](s Stream[T]) Stream[Triple[T, T, T]]
- func TumblingTimeWindow[T any](ctx context.Context, s Stream[T], windowSize time.Duration) Stream[[]T]
- func UnwrapOrDefault[T any](s Stream[Result[T]], defaultVal T) Stream[T]
- func UnwrapResults[T any](s Stream[Result[T]]) Stream[T]
- func Window[T any](s Stream[T], size int) Stream[[]T]
- func WindowWithStep[T any](s Stream[T], size, step int, allowPartial bool) Stream[[]T]
- func WithContext[T any](ctx context.Context, s Stream[T]) Stream[T]
- func WithTimestamp[T any](s Stream[T]) Stream[TimestampedValue[T]]
- func Zip[T, U any](s1 Stream[T], s2 Stream[U]) Stream[Pair[T, U]]
- func Zip3[A, B, C any](s1 Stream[A], s2 Stream[B], s3 Stream[C]) Stream[Triple[A, B, C]]
- func ZipLongest[T, U any](s1 Stream[T], s2 Stream[U]) Stream[Pair[Optional[T], Optional[U]]]
- func ZipLongestWith[T, U any](s1 Stream[T], s2 Stream[U], defaultT T, defaultU U) Stream[Pair[T, U]]
- func (s Stream[T]) AllMatch(pred func(T) bool) bool
- func (s Stream[T]) AnyMatch(pred func(T) bool) bool
- func (s Stream[T]) At(index int) Optional[T]
- func (s Stream[T]) Collect() []T
- func (s Stream[T]) Count() int
- func (s Stream[T]) DropLast(n int) Stream[T]
- func (s Stream[T]) DropWhile(pred func(T) bool) Stream[T]
- func (s Stream[T]) Filter(pred func(T) bool) Stream[T]
- func (s Stream[T]) FindFirst(pred func(T) bool) Optional[T]
- func (s Stream[T]) FindLast(pred func(T) bool) Optional[T]
- func (s Stream[T]) First() Optional[T]
- func (s Stream[T]) Fold(identity T, fn func(T, T) T) T
- func (s Stream[T]) ForEach(action func(T))
- func (s Stream[T]) ForEachErr(action func(T) error) error
- func (s Stream[T]) ForEachIndexed(action func(int, T))
- func (s Stream[T]) ForEachIndexedErr(action func(int, T) error) error
- func (s Stream[T]) Intersperse(sep T) Stream[T]
- func (s Stream[T]) IsEmpty() bool
- func (s Stream[T]) IsNotEmpty() bool
- func (s Stream[T]) Last() Optional[T]
- func (s Stream[T]) Limit(n int) Stream[T]
- func (s Stream[T]) Map(fn func(T) T) Stream[T]
- func (s Stream[T]) Max(cmp func(T, T) int) Optional[T]
- func (s Stream[T]) Min(cmp func(T, T) int) Optional[T]
- func (s Stream[T]) NoneMatch(pred func(T) bool) bool
- func (s Stream[T]) Nth(index int) Optional[T]
- func (s Stream[T]) Peek(action func(T)) Stream[T]
- func (s Stream[T]) Reduce(identity T, fn func(T, T) T) T
- func (s Stream[T]) ReduceOptional(fn func(T, T) T) Optional[T]
- func (s Stream[T]) Reverse() Stream[T]
- func (s Stream[T]) Seq() iter.Seq[T]
- func (s Stream[T]) Single() Optional[T]
- func (s Stream[T]) Skip(n int) Stream[T]
- func (s Stream[T]) Sorted(cmp func(a, b T) int) Stream[T]
- func (s Stream[T]) SortedStable(cmp func(a, b T) int) Stream[T]
- func (s Stream[T]) Step(n int) Stream[T]
- func (s Stream[T]) TakeLast(n int) Stream[T]
- func (s Stream[T]) TakeWhile(pred func(T) bool) Stream[T]
- type Stream2
- func AntiJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream2[K, V1]
- func DistinctKeys[K comparable, V any](s Stream2[K, V]) Stream2[K, V]
- func DistinctValues[K any, V comparable](s Stream2[K, V]) Stream2[K, V]
- func Empty2[K, V any]() Stream2[K, V]
- func From2[K, V any](seq iter.Seq2[K, V]) Stream2[K, V]
- func FromMap[K comparable, V any](m map[K]V) Stream2[K, V]
- func FromMapC[K, V any](m collections.Map[K, V]) Stream2[K, V]
- func FromSortedMapC[K, V any](m collections.SortedMap[K, V]) Stream2[K, V]
- func FromSortedMapCDescending[K, V any](m collections.SortedMap[K, V]) Stream2[K, V]
- func MapKeysTo[K, V, K2 any](s Stream2[K, V], fn func(K) K2) Stream2[K2, V]
- func MapPairs[K, V, K2, V2 any](s Stream2[K, V], fn func(K, V) (K2, V2)) Stream2[K2, V2]
- func MapValuesTo[K, V, V2 any](s Stream2[K, V], fn func(V) V2) Stream2[K, V2]
- func PairsOf[K, V any](pairs ...Pair[K, V]) Stream2[K, V]
- func SemiJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream2[K, V1]
- func SwapKeyValue[K, V any](s Stream2[K, V]) Stream2[V, K]
- func WithContext2[K, V any](ctx context.Context, s Stream2[K, V]) Stream2[K, V]
- func ZipWithIndex[T any](s Stream[T]) Stream2[int, T]
- func (s Stream2[K, V]) AllMatch(pred func(K, V) bool) bool
- func (s Stream2[K, V]) AnyMatch(pred func(K, V) bool) bool
- func (s Stream2[K, V]) CollectPairs() []Pair[K, V]
- func (s Stream2[K, V]) Count() int
- func (s Stream2[K, V]) DropWhile(pred func(K, V) bool) Stream2[K, V]
- func (s Stream2[K, V]) Filter(pred func(K, V) bool) Stream2[K, V]
- func (s Stream2[K, V]) First() Optional[Pair[K, V]]
- func (s Stream2[K, V]) ForEach(action func(K, V))
- func (s Stream2[K, V]) Keys() Stream[K]
- func (s Stream2[K, V]) Limit(n int) Stream2[K, V]
- func (s Stream2[K, V]) MapKeys(fn func(K) K) Stream2[K, V]
- func (s Stream2[K, V]) MapValues(fn func(V) V) Stream2[K, V]
- func (s Stream2[K, V]) NoneMatch(pred func(K, V) bool) bool
- func (s Stream2[K, V]) ParallelFilter(pred func(K, V) bool, opts ...ParallelOption) Stream2[K, V]
- func (s Stream2[K, V]) ParallelMapValues(fn func(V) V, opts ...ParallelOption) Stream2[K, V]
- func (s Stream2[K, V]) Peek(action func(K, V)) Stream2[K, V]
- func (s Stream2[K, V]) Reduce(identity Pair[K, V], fn func(Pair[K, V], K, V) Pair[K, V]) Pair[K, V]
- func (s Stream2[K, V]) Seq2() iter.Seq2[K, V]
- func (s Stream2[K, V]) Skip(n int) Stream2[K, V]
- func (s Stream2[K, V]) TakeWhile(pred func(K, V) bool) Stream2[K, V]
- func (s Stream2[K, V]) ToPairs() Stream[Pair[K, V]]
- func (s Stream2[K, V]) Values() Stream[V]
- type TimestampedValue
- type Triple
- type Unsigned
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AllMatchCtx ¶
AllMatchCtx checks if all elements match the predicate with context support.
func AnyMatchCtx ¶
AnyMatchCtx checks if any element matches the predicate with context support.
func Associate ¶
func Associate[T any, K comparable, V any](s Stream[T], fn func(T) (K, V)) map[K]V
Associate creates a map from elements using a function that returns key-value pairs.
func AssociateBy ¶
func AssociateBy[T any, K comparable](s Stream[T], keyFn func(T) K) map[K]T
AssociateBy creates a map in which each element is stored as the value under the key returned by keyFn for that element.
func CollectCtx ¶
CollectCtx collects all elements into a slice with context support. Returns the elements collected so far and the context error if cancelled.
func CollectResults ¶
CollectResults collects a stream of Results into a slice of values and an error. Returns the first error encountered, or nil if all succeeded.
func CollectResultsAll ¶
CollectResultsAll collects all Results, continuing even after errors. Returns all successful values and all errors encountered.
func CollectToList ¶
func CollectToList[T any](seq iter.Seq[T]) collections.List[T]
CollectToList is a convenience function that collects an iter.Seq into a collections.List.
func CollectToMap ¶
func CollectToMap[K comparable, V any](seq iter.Seq2[K, V]) collections.Map[K, V]
CollectToMap is a convenience function that collects an iter.Seq2 into a collections.Map.
func CollectToSet ¶
func CollectToSet[T comparable](seq iter.Seq[T]) collections.Set[T]
CollectToSet is a convenience function that collects an iter.Seq into a collections.Set.
func Contains ¶
func Contains[T comparable](s Stream[T], target T) bool
Contains returns true if the stream contains the target element. Elements must be comparable.
func CountBy ¶
func CountBy[T any, K comparable](s Stream[T], keyFn func(T) K) map[K]int
CountBy counts elements by a key function.
func ForEachCtx ¶
func ForEachCtx[T any, A ~func(T) | ~func(T) error](ctx context.Context, s Stream[T], action A) error
ForEachCtx executes an action on each element with context support. The action can be either func(T) or func(T) error. If action is func(T) error and returns an error, iteration stops and the error is returned. Returns the context error if cancelled, or the first error from action.
Examples:
err := ForEachCtx(ctx, s, func(v int) { fmt.Println(v) })
err := ForEachCtx(ctx, s, func(v int) error { return process(v) })
func Frequencies ¶
func Frequencies[T comparable](s Stream[T]) map[T]int
Frequencies counts occurrences of each element.
func Frequency ¶
func Frequency[T comparable](s Stream[T]) map[T]int
Frequency returns a map of element frequencies.
func FrequencyToHashMap ¶
func FrequencyToHashMap[T comparable](s Stream[T]) collections.Map[T, int]
FrequencyToHashMap returns element frequencies as a collections.Map[T, int].
func GroupBy ¶
func GroupBy[T any, K comparable](s Stream[T], keyFn func(T) K) map[K][]T
GroupBy groups elements by a key function.
func GroupByTo ¶
func GroupByTo[T any, K comparable, V any](s Stream[T], keyFn func(T) K, valFn func(T) V) map[K][]V
GroupByTo groups elements by a key function and transforms values.
func GroupByToHashMap ¶
func GroupByToHashMap[T any, K comparable](s Stream[T], keyFn func(T) K) collections.Map[K, []T]
GroupByToHashMap groups elements by key into a collections.Map[K, []T].
func GroupByToTreeMap ¶
func GroupByToTreeMap[T any, K any](s Stream[T], keyFn func(T) K, keyCmp collections.Comparator[K]) collections.SortedMap[K, []T]
GroupByToTreeMap groups elements by key into a collections.SortedMap[K, []T].
func GroupValues ¶
func GroupValues[K comparable, V any](s Stream2[K, V]) map[K][]V
GroupValues groups all values by their keys into slices. Returns a map where each key maps to a slice of all values with that key.
func GroupValuesToHashMap ¶
func GroupValuesToHashMap[K comparable, V any](s Stream2[K, V]) collections.Map[K, []V]
GroupValuesToHashMap groups Stream2 values by key into a collections.Map[K, []V].
func HistogramToHashMap ¶
func HistogramToHashMap[T any, K comparable](s Stream[T], keyFn func(T) K) collections.Map[K, []T]
HistogramToHashMap buckets elements by key into a collections.Map[K, []T].
func IndexBy ¶
func IndexBy[T any, K comparable](s Stream[T], keyFn func(T) K) map[K]T
IndexBy is an alias for AssociateBy.
func Joining ¶
Joining concatenates string elements with a separator. Uses strings.Builder for O(n) performance.
func JoiningWithPrefixSuffix ¶
JoiningWithPrefixSuffix concatenates string elements with separator, prefix, and suffix.
func OptionalEquals ¶
func OptionalEquals[T comparable](o1, o2 Optional[T]) bool
OptionalEquals checks if two Optionals are equal. Two Optionals are equal if both are empty, or both are present with equal values.
func ParallelCollect ¶
func ParallelCollect[T any](s Stream[T], opts ...ParallelOption) []T
ParallelCollect collects elements in parallel into a slice. Note: Order is not guaranteed unless the source stream has been ordered.
func ParallelForEach ¶
func ParallelForEach[T any](s Stream[T], action func(T), opts ...ParallelOption)
ParallelForEach executes an action on each element in parallel. This is a terminal operation that blocks until all elements are processed.
func ParallelForEachCtx ¶
func ParallelForEachCtx[T any](ctx context.Context, s Stream[T], action func(context.Context, T), opts ...ParallelOption) error
ParallelForEachCtx executes an action on each element in parallel with context support.
func ParallelReduce ¶
func ParallelReduce[T any](s Stream[T], identity T, op func(T, T) T, opts ...ParallelOption) T
ParallelReduce reduces elements in parallel using an associative operation. The operation must be associative for correct results.
func PartitionBy ¶
PartitionBy splits elements into two groups based on a predicate. Returns (matching, notMatching).
func PartitionResults ¶
PartitionResults separates a stream of Results into successes and failures.
func ReduceByKey ¶
func ReduceByKey[K comparable, V any](s Stream2[K, V], merge func(V, V) V) map[K]V
ReduceByKey groups values by key and reduces each group using the merge function. Returns a map where each key maps to the reduced value of all values with that key.
func ReduceByKeyWithInit ¶
func ReduceByKeyWithInit[K comparable, V, R any](s Stream2[K, V], init func() R, merge func(R, V) R) map[K]R
ReduceByKeyWithInit groups values by key and reduces each group using the merge function. The value returned by init is used as the initial value for each key's reduction.
func ReduceCtx ¶
func ReduceCtx[T any, F ~func(T, T) T | ~func(T, T) (T, error)](ctx context.Context, s Stream[T], identity T, fn F) (T, error)
ReduceCtx reduces the stream with context support. The reducer function can be either func(T, T) T or func(T, T) (T, error). If reducer is func(T, T) (T, error) and returns an error, reduction stops and the error is returned. Returns the accumulated result and context error if cancelled, or the first error from reducer.
Examples:
result, err := ReduceCtx(ctx, s, 0, func(a, b int) int { return a + b })
result, err := ReduceCtx(ctx, s, 0, func(a, b int) (int, error) { return compute(a, b) })
func ToArrayList ¶
func ToArrayList[T any](s Stream[T]) collections.List[T]
ToArrayList collects stream elements into a collections.List[T].
func ToHashMap2C ¶
func ToHashMap2C[K comparable, V any](s Stream2[K, V]) collections.Map[K, V]
ToHashMap2C converts a Stream2[K, V] into a collections.Map[K, V].
func ToHashMapC ¶
func ToHashMapC[T any, K comparable, V any](s Stream[T], keyFn func(T) K, valFn func(T) V) collections.Map[K, V]
ToHashMapC collects stream elements into a collections.Map[K, V]. The "C" suffix distinguishes it from ToMap which returns a Go map.
func ToHashSet ¶
func ToHashSet[T comparable](s Stream[T]) collections.Set[T]
ToHashSet collects stream elements into a collections.Set[T].
func ToLinkedList ¶
func ToLinkedList[T any](s Stream[T]) collections.List[T]
ToLinkedList collects stream elements into a collections.List[T] (linked list implementation).
func ToMap ¶
func ToMap[T any, K comparable, V any](s Stream[T], keyFn func(T) K, valFn func(T) V) map[K]V
ToMap collects Stream into a map using key and value functions.
func ToMap2 ¶
func ToMap2[K comparable, V any](s Stream2[K, V]) map[K]V
ToMap2 collects Stream2 into a map. Keys must be comparable.
func ToSet ¶
func ToSet[T comparable](s Stream[T]) map[T]struct{}
ToSet collects Stream into a set (map with struct{} values).
func ToTreeMap2C ¶
func ToTreeMap2C[K any, V any](s Stream2[K, V], keyCmp collections.Comparator[K]) collections.SortedMap[K, V]
ToTreeMap2C converts a Stream2[K, V] into a collections.SortedMap[K, V].
func ToTreeMapC ¶
func ToTreeMapC[T any, K any, V any](s Stream[T], keyFn func(T) K, valFn func(T) V, keyCmp collections.Comparator[K]) collections.SortedMap[K, V]
ToTreeMapC collects stream elements into a collections.SortedMap[K, V]. Keys are maintained in sorted order according to the comparator.
func ToTreeSet ¶
func ToTreeSet[T any](s Stream[T], cmp collections.Comparator[T]) collections.SortedSet[T]
ToTreeSet collects stream elements into a collections.SortedSet[T]. Elements are maintained in sorted order according to the comparator.
func ToWriter ¶
ToWriter writes stream elements to an io.Writer, one per line. Note: A newline is automatically appended after each formatted element. The provided format function should NOT include a trailing newline.
Types ¶
type CSVRecord ¶
CSVRecord represents a single CSV record with named fields.
type CSVStream ¶
CSVStream represents a stream of CSV records with resource management.
func FromCSVFile ¶
FromCSVFile opens a CSV file and creates a stream of records. Parse errors terminate the stream silently. For error handling, open the file manually and use FromCSVErr.
func FromTSVFile ¶
FromTSVFile opens a TSV file and creates a stream of records. Parse errors terminate the stream silently. For error handling, open the file manually and use FromTSVErr.
func MustFromCSVFile ¶
MustFromCSVFile opens a CSV file and creates a stream of records. Panics if the file cannot be opened.
func MustFromTSVFile ¶
MustFromTSVFile opens a TSV file and creates a stream of records. Panics if the file cannot be opened.
type CoGrouped ¶
type CoGrouped[K, V1, V2 any] struct { Key K Left []V1 Right []V2 }
CoGrouped holds grouped values from two streams with the same key.
type Collector ¶
type Collector[T, A, R any] struct { // Supplier creates a new accumulator. Supplier func() A // Accumulator adds an element to the accumulator. Accumulator func(A, T) A // Finisher transforms the accumulator to the final result. Finisher func(A) R }
Collector defines how to accumulate elements into a result. T is the element type, A is the accumulator type, R is the result type.
func AveragingCollector ¶
AveragingCollector returns a Collector that computes the average of numeric elements.
func BottomKCollector ¶
BottomKCollector returns a Collector that finds the k smallest elements. Uses a max-heap to maintain O(n log k) complexity.
func CountingCollector ¶
CountingCollector returns a Collector that counts elements.
func FilteringCollector ¶
func FilteringCollector[T, A, R any](pred func(T) bool, downstream Collector[T, A, R]) Collector[T, A, R]
FilteringCollector filters elements before collecting.
func FirstCollector ¶
FirstCollector returns a Collector that returns the first element.
func FlatMappingCollector ¶
func FlatMappingCollector[T, U, A, R any](mapper func(T) Stream[U], downstream Collector[U, A, R]) Collector[T, A, R]
FlatMappingCollector flat-maps elements before collecting.
func FrequencyCollector ¶
func FrequencyCollector[T comparable]() Collector[T, map[T]int, map[T]int]
FrequencyCollector returns a Collector that counts occurrences of each element.
func GroupingByCollector ¶
func GroupingByCollector[T any, K comparable](keyFn func(T) K) Collector[T, map[K][]T, map[K][]T]
GroupingByCollector returns a Collector that groups elements by a key function.
func HistogramCollector ¶
func HistogramCollector[T any, K comparable](keyFn func(T) K) Collector[T, *histogramState[T, K], map[K][]T]
HistogramCollector groups elements into buckets based on a key function.
func JoiningCollector ¶
JoiningCollector returns a Collector that joins strings with a separator.
func JoiningCollectorFull ¶
JoiningCollectorFull returns a Collector that joins strings with separator, prefix, and suffix.
func LastCollector ¶
LastCollector returns a Collector that returns the last element.
func MappingCollector ¶
func MappingCollector[T, U, A, R any](mapper func(T) U, downstream Collector[U, A, R]) Collector[T, A, R]
MappingCollector applies a transformation before collecting.
func MaxByCollector ¶
MaxByCollector returns a Collector that finds the maximum element.
func MinByCollector ¶
MinByCollector returns a Collector that finds the minimum element.
func PartitioningByCollector ¶
func PartitioningByCollector[T any](pred func(T) bool) Collector[T, *partitionState[T], map[bool][]T]
PartitioningByCollector returns a Collector that partitions elements by a predicate.
func QuantileCollector ¶
func QuantileCollector[T any](q float64, less func(T, T) bool) Collector[T, *quantileState[T], Optional[T]]
QuantileCollector returns a Collector that computes a quantile. The quantile q should be between 0 and 1 (e.g., 0.5 for median). Note: This collector stores all elements in memory.
func ReducingCollector ¶
ReducingCollector returns a Collector that reduces elements using an identity and function.
func SummingCollector ¶
SummingCollector returns a Collector that sums numeric elements.
func TeeingCollector ¶
func TeeingCollector[T, A1, R1, A2, R2, R any]( c1 Collector[T, A1, R1], c2 Collector[T, A2, R2], merger func(R1, R2) R, ) Collector[T, *teeingState[A1, A2], R]
TeeingCollector combines the results of two collectors.
func ToArrayListCollector ¶
func ToArrayListCollector[T any]() Collector[T, collections.List[T], collections.List[T]]
ToArrayListCollector returns a Collector that accumulates elements into a collections.List.
func ToHashMapCollector ¶
func ToHashMapCollector[T any, K comparable, V any](keyFn func(T) K, valFn func(T) V) Collector[T, collections.Map[K, V], collections.Map[K, V]]
ToHashMapCollector returns a Collector that accumulates elements into a collections.Map.
func ToHashSetCollector ¶
func ToHashSetCollector[T comparable]() Collector[T, collections.Set[T], collections.Set[T]]
ToHashSetCollector returns a Collector that accumulates elements into a collections.Set.
func ToMapCollector ¶
func ToMapCollector[T any, K comparable, V any](keyFn func(T) K, valFn func(T) V) Collector[T, map[K]V, map[K]V]
ToMapCollector returns a Collector that creates a map from elements.
func ToMapCollectorMerging ¶
func ToMapCollectorMerging[T any, K comparable, V any]( keyFn func(T) K, valFn func(T) V, merge func(V, V) V, ) Collector[T, map[K]V, map[K]V]
ToMapCollectorMerging returns a Collector that creates a map with a merge function for duplicate keys.
func ToSetCollector ¶
func ToSetCollector[T comparable]() Collector[T, map[T]struct{}, map[T]struct{}]
ToSetCollector returns a Collector that accumulates into a set.
func ToSliceCollector ¶
ToSliceCollector returns a Collector that accumulates into a slice.
func ToTreeMapCollector ¶
func ToTreeMapCollector[T any, K any, V any](keyFn func(T) K, valFn func(T) V, keyCmp collections.Comparator[K]) Collector[T, collections.SortedMap[K, V], collections.SortedMap[K, V]]
ToTreeMapCollector returns a Collector that accumulates elements into a collections.SortedMap.
func ToTreeSetCollector ¶
func ToTreeSetCollector[T any](cmp collections.Comparator[T]) Collector[T, collections.SortedSet[T], collections.SortedSet[T]]
ToTreeSetCollector returns a Collector that accumulates elements into a collections.SortedSet.
type ContextError ¶
type ContextError struct {
Err error
Partial bool // true if some results were collected before the error
}
ContextError represents an error that occurred during context-aware operations.
func (*ContextError) Error ¶
func (e *ContextError) Error() string
func (*ContextError) Unwrap ¶
func (e *ContextError) Unwrap() error
type FileLineStream ¶
FileLineStream represents a stream of lines from a file with resource management.
func FromFileLines ¶
func FromFileLines(path string) (*FileLineStream, error)
FromFileLines opens a file and creates a Stream of its lines. Returns a FileLineStream whose Close method must be called when done. Usage:
stream, err := FromFileLines("file.txt")
if err != nil { ... }
defer stream.Close()
for line := range stream.Seq() { ... }
func MustFromFileLines ¶
func MustFromFileLines(path string) *FileLineStream
MustFromFileLines opens a file and creates a Stream of its lines. Panics if the file cannot be opened.
func (*FileLineStream) Close ¶
func (f *FileLineStream) Close() error
Close closes the underlying file.
type JoinResult ¶
type JoinResult[K, V1, V2 any] struct { Key K Left V1 Right V2 }
JoinResult holds the result of a join operation.
type JoinResultOptional ¶
JoinResultOptional holds the result of an outer join operation.
type Numeric ¶
type Numeric interface {
~int | ~int8 | ~int16 | ~int32 | ~int64 |
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr |
~float32 | ~float64
}
Numeric is a constraint that includes all integer and floating-point types. Complex types are not included.
type Optional ¶
type Optional[T any] struct { // contains filtered or unexported fields }
Optional represents a value that may or may not exist. It provides a type-safe alternative to using nil pointers or zero values.
func Average ¶
Average returns the average of all numeric elements. Returns None for an empty stream.
func AverageBy ¶
AverageBy computes the average of the results of applying a function to each element.
func FindFirstCtx ¶
FindFirstCtx finds the first element matching the predicate with context support.
func GetStatistics ¶
func GetStatistics[T Numeric](s Stream[T]) Optional[Statistics[T]]
GetStatistics computes basic statistics for a numeric stream. Returns None for an empty stream.
func MaxBy ¶
MaxBy returns the element that produces the maximum value when the function is applied.
func MaxValue ¶
MaxValue returns the maximum value from a stream of ordered elements. Returns None for an empty stream.
func MinBy ¶
MinBy returns the element that produces the minimum value when the function is applied.
func MinValue ¶
MinValue returns the minimum value from a stream of ordered elements. Returns None for an empty stream.
func OptionalFlatMap ¶
OptionalFlatMap transforms Optional[T] to Optional[U] where the function returns an Optional.
func OptionalFromCondition ¶
OptionalFromCondition creates an Optional based on a condition. If the condition is true, returns Some(value); otherwise returns None.
func OptionalMap ¶
OptionalMap transforms Optional[T] to Optional[U]. Use this when the transformation changes the type.
func OptionalOf ¶
OptionalOf creates an Optional from a pointer. If the pointer is nil, returns None; otherwise returns Some(*ptr).
func OptionalZip ¶
OptionalZip combines two Optionals into an Optional of Pair. Returns None if either Optional is empty.
func Percentile ¶
Percentile returns the p-th percentile (p in 0-100) from a stream.
func (Optional[T]) Filter ¶
Filter returns the Optional if present and the predicate returns true; otherwise None.
func (Optional[T]) Get ¶
func (o Optional[T]) Get() T
Get returns the value if present, or panics if empty. Use GetOrElse or GetOrElseGet for safe access.
func (Optional[T]) GetOrElse ¶
func (o Optional[T]) GetOrElse(defaultVal T) T
GetOrElse returns the value if present, or the given default value.
func (Optional[T]) GetOrElseGet ¶
func (o Optional[T]) GetOrElseGet(supplier func() T) T
GetOrElseGet returns the value if present, or computes a default using the supplier.
func (Optional[T]) GetOrZero ¶
func (o Optional[T]) GetOrZero() T
GetOrZero returns the value if present, or the zero value of T.
func (Optional[T]) IfPresent ¶
func (o Optional[T]) IfPresent(action func(T))
IfPresent calls the action with the value if present.
func (Optional[T]) IfPresentOrElse ¶
func (o Optional[T]) IfPresentOrElse(action func(T), emptyAction func())
IfPresentOrElse calls the action with the value if present, or calls emptyAction.
func (Optional[T]) Map ¶
Map transforms the value if present. For type-changing transformations, use OptionalMap function instead.
func (Optional[T]) OrElseGet ¶
OrElseGet returns this Optional if present, or computes another Optional using the supplier.
func (Optional[T]) ToPointer ¶
func (o Optional[T]) ToPointer() *T
ToPointer returns a pointer to the value if present, or nil.
type Pair ¶
type Pair[T, U any] struct { First T Second U }
Pair represents a tuple of two values.
func MostCommon ¶
func MostCommon[T comparable](s Stream[T], n int) []Pair[T, int]
MostCommon returns the n most common elements with their counts.
type ParallelConfig ¶
type ParallelConfig struct {
Concurrency int // Number of concurrent workers
Ordered bool // Whether to preserve input order
BufferSize int // Size of output buffer
ChunkSize int // Chunk size for chunked reordering (0 = disabled, uses streaming mode)
}
ParallelConfig holds configuration for parallel operations.
func DefaultParallelConfig ¶
func DefaultParallelConfig() ParallelConfig
DefaultParallelConfig returns the default parallel configuration.
type ParallelOption ¶
type ParallelOption func(*ParallelConfig)
ParallelOption is a function that modifies ParallelConfig.
func WithBufferSize ¶
func WithBufferSize(size int) ParallelOption
WithBufferSize sets the output buffer size.
func WithChunkSize ¶
func WithChunkSize(size int) ParallelOption
WithChunkSize sets the chunk size for chunked reordering in ordered parallel operations. When set to a value > 0, ordered operations will process elements in chunks, limiting memory usage by only buffering up to ChunkSize results at a time. Set to 0 (default) to use streaming mode which may buffer all out-of-order results.
Trade-off: Smaller chunk sizes reduce memory but may underutilize parallelism. WithChunkSize(1) provides minimum memory usage but processes sequentially within each chunk. A good starting point is 2-4x the concurrency level.
func WithConcurrency ¶
func WithConcurrency(n int) ParallelOption
WithConcurrency sets the number of concurrent workers.
func WithOrdered ¶
func WithOrdered(ordered bool) ParallelOption
WithOrdered sets whether to preserve input order.
type Quad ¶
type Quad[A, B, C, D any] struct { First A Second B Third C Fourth D }
Quad represents a tuple of four values.
func (Quad[A, B, C, D]) ToPair ¶
ToPair converts Quad to Pair by dropping the third and fourth elements.
type Result ¶
type Result[T any] struct { // contains filtered or unexported fields }
Result represents a value that may be either a success (Ok) or a failure (Err). It's useful for error propagation in stream pipelines.
func FlatMapResult ¶
FlatMapResult transforms Result[T] to Result[U], allowing the function to fail.
func MapResultTo ¶
MapResultTo transforms Result[T] to Result[U] using the given function.
func TryCollect ¶
TryCollect attempts to collect a stream, wrapping any panic as an error. This is useful when the stream's source might panic.
func (Result[T]) ToOptional ¶
ToOptional converts Result to Optional, discarding the error.
func (Result[T]) Unwrap ¶
func (r Result[T]) Unwrap() T
Unwrap returns the value if Ok, or panics if Err.
func (Result[T]) UnwrapOr ¶
func (r Result[T]) UnwrapOr(defaultVal T) T
UnwrapOr returns the value if Ok, or the default value if Err.
func (Result[T]) UnwrapOrElse ¶
UnwrapOrElse returns the value if Ok, or calls the function if Err.
type Statistics ¶
Statistics holds basic statistics about a numeric stream.
type Stream ¶
type Stream[T any] struct { // contains filtered or unexported fields }
Stream is a lazy sequence of elements. It wraps iter.Seq[T] and provides fluent functional programming operations.
func AntiJoinBy ¶
func AntiJoinBy[T, U any, K comparable](s1 Stream[T], s2 Stream[U], keyT func(T) K, keyU func(U) K) Stream[T]
AntiJoinBy returns elements from s1 that don't have matching keys in s2 (using key extractors).
func Cartesian ¶
Cartesian returns the Cartesian product of two streams. Note: The second stream is collected into memory as it needs to be iterated multiple times.
func CartesianSelf ¶
CartesianSelf returns the Cartesian product of a stream with itself. Note: The stream is collected into memory.
func Chunk ¶
Chunk returns a Stream of slices, each containing up to size elements. The last chunk may contain fewer elements. Note: This is a free function due to a Go generics limitation with method return types.
func CoGroup ¶
func CoGroup[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[CoGrouped[K, V1, V2]]
CoGroup groups values from two streams by their keys. Similar to SQL's FULL OUTER JOIN but groups all matching values together.
func Combinations ¶
Combinations returns all k-combinations of elements from the stream. Note: The stream is collected into memory.
func Concat ¶
Concat concatenates multiple Streams into one. Elements are produced in order from each stream.
func CrossProduct ¶
CrossProduct returns the Cartesian product of multiple streams of the same type. Note: All streams are collected into memory.
func Cycle ¶
Cycle creates an infinite Stream that cycles through the given values. Returns an empty stream if no values are provided. Be sure to use Limit() or TakeWhile() to bound the stream.
func Debounce ¶
Debounce emits an element only after a quiet period with no new elements. Useful for coalescing rapid updates into a single emission.
func Differences ¶
Differences returns a Stream of differences between consecutive elements. The first difference is between the second and first elements.
func Distinct ¶
func Distinct[T comparable](s Stream[T]) Stream[T]
Distinct returns a Stream with duplicate elements removed. Elements must be comparable.
func DistinctBy ¶
func DistinctBy[T any, K comparable](s Stream[T], keyFn func(T) K) Stream[T]
DistinctBy returns a Stream with duplicates removed based on a key function. Elements are considered duplicates if they produce the same key.
func DistinctUntilChanged ¶
func DistinctUntilChanged[T comparable](s Stream[T]) Stream[T]
DistinctUntilChanged returns a Stream that removes consecutive duplicate elements. Only adjacent duplicates are removed; the same value appearing later is kept. Elements must be comparable.
func DistinctUntilChangedBy ¶
DistinctUntilChangedBy returns a Stream that removes consecutive elements that produce the same key. Uses the provided equality function for comparison.
func FilterErr ¶
FilterErr filters elements using a predicate that may return an error. Elements that pass the predicate are wrapped in Ok, errors are wrapped in Err.
func FilterErrs ¶
FilterErrs filters a stream of Results to only include errors.
func FlatMapErr ¶
FlatMapErr maps each element to a stream using a function that may return an error.
func FlatMapSeq ¶
FlatMapSeq maps each element to an iter.Seq and flattens the result.
func FlattenSeq ¶
FlattenSeq flattens a Stream of iter.Seq into a single Stream.
func From ¶
From creates a Stream from an iter.Seq. This provides interoperability with the standard library.
func FromCSV ¶
FromCSV creates a Stream of CSV records (each record is a []string). Parse errors terminate the stream silently. Use FromCSVErr for explicit error handling. The caller is responsible for closing the reader.
func FromCSVErr ¶
FromCSVErr creates a Stream of CSV records with error handling. Parse errors are yielded as Err results, and parsing continues with the next record. This allows handling malformed records without terminating the stream.
func FromCSVWithHeader ¶
FromCSVWithHeader creates a Stream of CSVRecords using the first row as headers. Parse errors terminate the stream silently. Use FromCSVWithHeaderErr for explicit error handling.
func FromCSVWithHeaderErr ¶
FromCSVWithHeaderErr creates a Stream of CSVRecords with error handling. Parse errors are yielded as Err results, and parsing continues with the next record.
func FromChannel ¶
FromChannel creates a Stream from a receive-only channel. The stream will consume all values from the channel until it's closed.
func FromChannelCtx ¶
FromChannelCtx creates a Stream from a channel with context support.
func FromDeque ¶
func FromDeque[T any](d collections.Deque[T]) Stream[T]
FromDeque creates a Stream from a collections.Deque (front to back).
func FromDequeReversed ¶
func FromDequeReversed[T any](d collections.Deque[T]) Stream[T]
FromDequeReversed creates a Stream from a collections.Deque (back to front).
func FromList ¶
func FromList[T any](list collections.List[T]) Stream[T]
FromList creates a Stream from a collections.List.
func FromPriorityQueue ¶
func FromPriorityQueue[T any](pq collections.PriorityQueue[T]) Stream[T]
FromPriorityQueue creates a Stream from a collections.PriorityQueue. Elements are yielded in heap order (not priority-sorted order). Use FromPriorityQueueSorted for priority-sorted iteration.
func FromPriorityQueueSorted ¶
func FromPriorityQueueSorted[T any](pq collections.PriorityQueue[T]) Stream[T]
FromPriorityQueueSorted creates a Stream from a collections.PriorityQueue. Elements are yielded in priority order (sorted). Note: This collects all elements first.
func FromQueue ¶
func FromQueue[T any](q collections.Queue[T]) Stream[T]
FromQueue creates a Stream from a collections.Queue (FIFO order).
func FromReaderLines ¶
FromReaderLines creates a Stream of lines from an io.Reader. Each line excludes the trailing newline character. The caller is responsible for closing the reader.
func FromReaderLinesCtx ¶
FromReaderLinesCtx creates a Stream of lines from an io.Reader with context support.
func FromReaderLinesErr ¶
FromReaderLinesErr creates a Stream of Results from an io.Reader. Reader errors are yielded as Err results.
func FromResults ¶
FromResults creates a Stream of Results from variadic Results.
func FromScanner ¶
FromScanner creates a Stream from a bufio.Scanner. Each call to the scanner's Scan method yields one element. The caller is responsible for the scanner's lifecycle.
func FromScannerErr ¶
FromScannerErr creates a Stream of Results from a bufio.Scanner. Scanner errors are yielded as Err results.
func FromSet ¶
func FromSet[T any](set collections.Set[T]) Stream[T]
FromSet creates a Stream from a collections.Set.
func FromSortedSet ¶
func FromSortedSet[T any](set collections.SortedSet[T]) Stream[T]
FromSortedSet creates a Stream from a collections.SortedSet in ascending order.
func FromSortedSetDescending ¶
func FromSortedSetDescending[T any](set collections.SortedSet[T]) Stream[T]
FromSortedSetDescending creates a Stream from a collections.SortedSet in descending order.
func FromStack ¶
func FromStack[T any](s collections.Stack[T]) Stream[T]
FromStack creates a Stream from a collections.Stack (LIFO order).
func FromStringLines ¶
FromStringLines creates a Stream of lines from a string.
func FromTSV ¶
FromTSV creates a Stream of TSV (tab-separated) records. Parse errors terminate the stream silently. Use FromTSVErr for explicit error handling.
func FromTSVErr ¶
FromTSVErr creates a Stream of TSV records with error handling. Parse errors are yielded as Err results, and parsing continues with the next record.
func FullJoin ¶
func FullJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[JoinResultOptional[K, V1, V2]]
FullJoin performs a full outer join between two Stream2s. All pairs from both streams are included; missing values are None. Note: Both streams are collected into memory for the join.
func Generate ¶
Generate creates an infinite Stream using a supplier function. Each call to the supplier generates the next element. Be sure to use Limit() or TakeWhile() to bound the stream.
func GenerateCtx ¶
GenerateCtx creates an infinite Stream using a supplier function with context support.
func InnerJoin ¶
func InnerJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[JoinResult[K, V1, V2]]
InnerJoin performs an inner join between two Stream2s on their keys. Only pairs with matching keys in both streams are included. Note: The second stream is collected into memory for the join.
func Interleave ¶
Interleave combines two streams by alternating their elements. Elements are taken one at a time from each stream. When one stream is exhausted, remaining elements from the other stream are included.
func Interval ¶
Interval creates a Stream that emits sequential integers at regular intervals. Starts from 0 and increments by 1 each interval.
func Iterate ¶
Iterate creates an infinite Stream: seed, f(seed), f(f(seed)), ... Be sure to use Limit() or TakeWhile() to bound the stream.
func IterateCtx ¶
IterateCtx creates an infinite Stream with context support.
func JoinBy ¶
func JoinBy[T, U, K comparable](s1 Stream[T], s2 Stream[U], keyT func(T) K, keyU func(U) K) Stream[Pair[T, U]]
JoinBy performs an inner join on two streams using key extraction functions.
func LeftJoin ¶
func LeftJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[JoinResultOptional[K, V1, V2]]
LeftJoin performs a left outer join between two Stream2s. All pairs from the left stream are included; right values are None if no match. Note: The second stream is collected into memory for the join.
func LeftJoinBy ¶
func LeftJoinBy[T, U any, K comparable](s1 Stream[T], s2 Stream[U], keyT func(T) K, keyU func(U) K) Stream[Pair[T, Optional[U]]]
LeftJoinBy performs a left join on two streams using key extraction functions.
func LeftJoinWith ¶
func LeftJoinWith[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2], defaultV2 V2) Stream[JoinResult[K, V1, V2]]
LeftJoinWith performs a left join with a default value for missing right values.
func MapErrTo ¶
MapErrTo transforms each element using a function that may return an error. The resulting stream contains Result values.
func MapTo ¶
MapTo transforms Stream[T] to Stream[U]. Use this when the transformation changes the element type.
func MergeSorted ¶
MergeSorted merges two sorted streams into one sorted stream. Both input streams must be sorted according to the same comparison function. The comparison function should return negative if a < b, zero if a == b, positive if a > b.
func MergeSortedN ¶
MergeSortedN merges multiple sorted streams into one sorted stream. All input streams must be sorted according to the same comparison function.
Complexity: Uses pairwise merge with O(n * k) comparisons where n is total elements and k is number of streams. For large k, use MergeSortedNHeap for O(n log k) complexity.
func MergeSortedNHeap ¶
MergeSortedNHeap merges multiple sorted streams using a heap-based k-way merge. All input streams must be sorted according to the same comparison function.
Complexity: O(n log k) where n is total elements and k is number of streams. Preferred over MergeSortedN when k is large (e.g., k > 8).
func Pairwise ¶
Pairwise returns a Stream of consecutive pairs (sliding window of size 2). For input [a, b, c, d], yields [(a,b), (b,c), (c,d)].
func ParallelFilter ¶
func ParallelFilter[T any](s Stream[T], pred func(T) bool, opts ...ParallelOption) Stream[T]
ParallelFilter filters elements using the given predicate in parallel. By default, it preserves the input order.
func ParallelFilterCtx ¶
func ParallelFilterCtx[T any](ctx context.Context, s Stream[T], pred func(context.Context, T) bool, opts ...ParallelOption) Stream[T]
ParallelFilterCtx filters elements using the given predicate in parallel with context support.
func ParallelFlatMap ¶
func ParallelFlatMap[T, U any](s Stream[T], fn func(T) Stream[U], opts ...ParallelOption) Stream[U]
ParallelFlatMap maps each element to a stream and flattens the results in parallel.
func ParallelFlatMapCtx ¶
func ParallelFlatMapCtx[T, U any](ctx context.Context, s Stream[T], fn func(context.Context, T) Stream[U], opts ...ParallelOption) Stream[U]
ParallelFlatMapCtx maps each element to a stream and flattens the results in parallel with context support.
func ParallelMap ¶
func ParallelMap[T, U any](s Stream[T], fn func(T) U, opts ...ParallelOption) Stream[U]
ParallelMap transforms each element using the given function in parallel. By default, it preserves the input order.
func ParallelMapCtx ¶
func ParallelMapCtx[T, U any](ctx context.Context, s Stream[T], fn func(context.Context, T) U, opts ...ParallelOption) Stream[U]
ParallelMapCtx transforms each element using the given function in parallel with context support. The context passed to fn is the same as the ctx parameter, allowing for cancellation checks.
func Permutations ¶
Permutations returns all permutations of elements from the stream. Note: The stream is collected into memory.
func Prefetch ¶
Prefetch creates a Stream that prefetches n elements ahead in a goroutine. This decouples the producer from the consumer, allowing them to run concurrently.
func Range ¶
Range creates a Stream of integers [start, end). Returns an empty stream if start >= end.
func RangeClosed ¶
RangeClosed creates a Stream of integers [start, end]. Returns an empty stream if start > end.
func RateLimitCtx ¶
RateLimitCtx is like RateLimit but respects context cancellation.
func Repeat ¶
Repeat creates a Stream that repeats the given value n times. If n <= 0, returns an empty stream.
func RepeatForever ¶
RepeatForever creates an infinite Stream that repeatedly yields the given value. Be sure to use Limit() or TakeWhile() to bound the stream.
func RightJoin ¶
func RightJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream[JoinResultOptional[K, V1, V2]]
RightJoin performs a right outer join between two Stream2s. All pairs from the right stream are included; left values are None if no match. Note: Both streams are collected into memory for the join.
func RightJoinWith ¶
func RightJoinWith[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2], defaultV1 V1) Stream[JoinResult[K, V1, V2]]
RightJoinWith performs a right join with a default value for missing left values.
func RunningProduct ¶
RunningProduct returns a Stream of cumulative products.
func RunningSum ¶
RunningSum returns a Stream of cumulative sums.
func Sample ¶
Sample emits the most recent element at regular intervals. Elements arriving between samples are dropped.
func Scan ¶
Scan applies an accumulator function over the stream and yields each intermediate result. This is a generalized version of RunningSum/RunningProduct that works with any accumulator. The first yielded value is fn(init, first_element).
func SemiJoinBy ¶
func SemiJoinBy[T, U any, K comparable](s1 Stream[T], s2 Stream[U], keyT func(T) K, keyU func(U) K) Stream[T]
SemiJoinBy returns elements from s1 that have matching keys in s2 (using key extractors).
func SessionWindow ¶
SessionWindow groups elements into sessions separated by gaps of inactivity. A new session starts when no elements arrive within the gap duration.
func SlidingTimeWindow ¶
func SlidingTimeWindow[T any](ctx context.Context, s Stream[T], windowSize, slideInterval time.Duration) Stream[[]T]
SlidingTimeWindow groups elements into overlapping windows based on time. windowSize is the duration of each window, slideInterval is how often a new window starts.
func SortedBy ¶
SortedBy returns a Stream sorted by a key extracted from each element. Note: This is an eager operation that collects all elements into memory.
func SortedStableBy ¶
SortedStableBy returns a Stream sorted by a key extracted from each element. Unlike SortedBy, this maintains the relative order of elements with equal keys (stable sort). Note: This is an eager operation that collects all elements into memory.
func TakeUntilErr ¶
TakeUntilErr takes elements until the first error is encountered. The error is not yielded; use CollectResults if you need the error.
func Throttle ¶
Throttle ensures elements are emitted at most once per interval. Elements arriving faster are delayed; no elements are dropped.
func ThrottleCtx ¶
ThrottleCtx is like Throttle but respects context cancellation.
func Triples ¶
Triples returns a Stream of consecutive triples (sliding window of size 3). For input [a, b, c, d, e], yields [(a,b,c), (b,c,d), (c,d,e)].
func TumblingTimeWindow ¶
func TumblingTimeWindow[T any](ctx context.Context, s Stream[T], windowSize time.Duration) Stream[[]T]
TumblingTimeWindow groups elements into fixed-duration, non-overlapping windows. Elements are collected based on their arrival time (wall clock). This is a blocking operation that runs until the context is cancelled or times out.
func UnwrapOrDefault ¶
UnwrapOrDefault unwraps Results, using a default value for errors.
func UnwrapResults ¶
UnwrapResults unwraps all Results, panicking on the first error.
func Window ¶
Window returns a Stream of sliding windows of size n. Each window is a slice containing exactly n elements. The last few elements that don't form a complete window are not yielded. Note: This is a free function due to a Go generics limitation with method return types.
func WindowWithStep ¶
WindowWithStep returns a Stream of sliding windows with configurable step size.
Parameters:
- size: the number of elements in each window
- step: how many elements to advance between windows
- allowPartial: if true, yields partial windows at the end; if false, only full windows
Behavior:
- step < size: overlapping windows (sliding window)
- step == size: non-overlapping chunks (same as Chunk)
- step > size: windows with gaps (elements between windows are skipped)
Each yielded window is an independent copy (safe to retain). Note: This is a free function due to a Go generics limitation with method return types.
func WithContext ¶
WithContext wraps a Stream to respect context cancellation. When the context is cancelled, the stream will stop yielding elements.
func WithTimestamp ¶
func WithTimestamp[T any](s Stream[T]) Stream[TimestampedValue[T]]
WithTimestamp adds the current timestamp to each element.
func Zip ¶
Zip combines two Streams into a Stream of Pairs. The resulting stream ends when either input stream ends.
func Zip3 ¶
Zip3 combines three Streams into a Stream of Triples. The resulting stream ends when any input stream ends.
func ZipLongest ¶
ZipLongest combines two streams, continuing until both are exhausted. Missing elements are represented as None in the Optional.
func ZipLongestWith ¶
func ZipLongestWith[T, U any](s1 Stream[T], s2 Stream[U], defaultT T, defaultU U) Stream[Pair[T, U]]
ZipLongestWith combines two streams with default values for missing elements.
func (Stream[T]) AllMatch ¶
AllMatch returns true if all elements match the predicate. Returns true for an empty stream.
func (Stream[T]) Collect ¶
func (s Stream[T]) Collect() []T
Collect gathers all elements into a slice.
func (Stream[T]) DropLast ¶
DropLast returns a Stream with the last n elements removed. Uses a ring buffer for O(L) time complexity where L is the input length. If n <= 0, returns the original stream unchanged.
func (Stream[T]) DropWhile ¶
DropWhile returns a Stream that skips elements while the predicate is true. Once the predicate returns false, all remaining elements are yielded.
func (Stream[T]) Filter ¶
Filter returns a Stream containing only elements that match the predicate.
func (Stream[T]) Fold ¶
func (s Stream[T]) Fold(identity T, fn func(T, T) T) T
Fold is an alias for Reduce.
func (Stream[T]) ForEach ¶
func (s Stream[T]) ForEach(action func(T))
ForEach executes the action on each element.
func (Stream[T]) ForEachErr ¶
ForEachErr executes the action on each element, returning the first error encountered. If the action returns an error, iteration stops immediately and the error is returned.
func (Stream[T]) ForEachIndexed ¶
ForEachIndexed executes the action on each element with its index.
func (Stream[T]) ForEachIndexedErr ¶
ForEachIndexedErr executes the action on each element with its index, returning the first error encountered. If the action returns an error, iteration stops immediately and the error is returned.
func (Stream[T]) Intersperse ¶
Intersperse inserts a separator element between each element of the stream. For input [a, b, c] with separator x, yields [a, x, b, x, c].
func (Stream[T]) IsNotEmpty ¶
IsNotEmpty returns true if the stream has at least one element.
func (Stream[T]) Map ¶
Map transforms each element using the given function. For type-changing transformations, use the MapTo function instead.
func (Stream[T]) NoneMatch ¶
NoneMatch returns true if no elements match the predicate. Returns true for an empty stream.
func (Stream[T]) Peek ¶
Peek performs the given action on each element as it passes through. Useful for debugging or side effects.
func (Stream[T]) Reduce ¶
func (s Stream[T]) Reduce(identity T, fn func(T, T) T) T
Reduce combines all elements using the given function. Returns identity if the stream is empty.
func (Stream[T]) ReduceOptional ¶
ReduceOptional combines all elements using the given function. Returns None if the stream is empty.
func (Stream[T]) Reverse ¶
Reverse returns a Stream with elements in reverse order. Note: This is an eager operation that collects all elements into memory.
func (Stream[T]) Seq ¶
Seq returns the underlying iter.Seq for stdlib interop. This is the escape hatch to use the stream with for-range loops and other iter.Seq-based APIs.
func (Stream[T]) Sorted ¶
Sorted returns a Stream with elements sorted using the given comparison function. The comparison function should return:
- negative if a < b
- zero if a == b
- positive if a > b
Note: This is an eager operation that collects all elements into memory.
func (Stream[T]) SortedStable ¶
SortedStable returns a Stream with elements sorted using the given comparison function. Unlike Sorted, this maintains the relative order of equal elements (stable sort). Note: This is an eager operation that collects all elements into memory.
func (Stream[T]) Step ¶
Step returns a Stream that yields every nth element (starting from the first). If n <= 1, returns the original stream unchanged.
type Stream2 ¶
type Stream2[K, V any] struct { // contains filtered or unexported fields }
Stream2 is a lazy sequence of key-value pairs. It wraps iter.Seq2[K, V] and provides fluent functional programming operations.
func AntiJoin ¶
func AntiJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream2[K, V1]
AntiJoin returns elements from s1 that don't have matching keys in s2.
func DistinctKeys ¶
func DistinctKeys[K comparable, V any](s Stream2[K, V]) Stream2[K, V]
DistinctKeys returns a Stream2 with duplicate keys removed. Only the first occurrence of each key is kept.
func DistinctValues ¶
func DistinctValues[K any, V comparable](s Stream2[K, V]) Stream2[K, V]
DistinctValues returns a Stream2 with duplicate values removed. Only the first occurrence of each value is kept.
func From2 ¶
From2 creates a Stream2 from an iter.Seq2. This provides interoperability with the standard library.
func FromMap ¶
func FromMap[K comparable, V any](m map[K]V) Stream2[K, V]
FromMap creates a Stream2 from a map.
func FromMapC ¶
func FromMapC[K, V any](m collections.Map[K, V]) Stream2[K, V]
FromMapC creates a Stream2 from a collections.Map. The "C" suffix distinguishes it from FromMap which takes a Go map.
func FromSortedMapC ¶
func FromSortedMapC[K, V any](m collections.SortedMap[K, V]) Stream2[K, V]
FromSortedMapC creates a Stream2 from a collections.SortedMap in ascending key order.
func FromSortedMapCDescending ¶
func FromSortedMapCDescending[K, V any](m collections.SortedMap[K, V]) Stream2[K, V]
FromSortedMapCDescending creates a Stream2 from a collections.SortedMap in descending key order.
func MapValuesTo ¶
MapValuesTo transforms Stream2[K, V] to Stream2[K, V2].
func SemiJoin ¶
func SemiJoin[K comparable, V1, V2 any](s1 Stream2[K, V1], s2 Stream2[K, V2]) Stream2[K, V1]
SemiJoin returns elements from s1 that have matching keys in s2. Unlike an inner join, the result doesn't include the matching elements from s2.
func SwapKeyValue ¶
SwapKeyValue swaps keys and values in a Stream2.
func WithContext2 ¶
WithContext2 wraps a Stream2 to respect context cancellation.
func ZipWithIndex ¶
ZipWithIndex adds an index to each element. Returns a Stream2[int, T] where the key is the index.
func (Stream2[K, V]) AllMatch ¶
AllMatch returns true if all pairs match the predicate. Returns true for an empty stream.
func (Stream2[K, V]) CollectPairs ¶
CollectPairs collects all pairs into a slice of Pairs.
func (Stream2[K, V]) DropWhile ¶
DropWhile returns a Stream2 that skips pairs while the predicate is true.
func (Stream2[K, V]) Filter ¶
Filter returns a Stream2 containing only pairs that match the predicate.
func (Stream2[K, V]) ForEach ¶
func (s Stream2[K, V]) ForEach(action func(K, V))
ForEach executes the action on each key-value pair.
func (Stream2[K, V]) NoneMatch ¶
NoneMatch returns true if no pairs match the predicate. Returns true for an empty stream.
func (Stream2[K, V]) ParallelFilter ¶
func (s Stream2[K, V]) ParallelFilter(pred func(K, V) bool, opts ...ParallelOption) Stream2[K, V]
ParallelFilter evaluates the predicate in parallel and returns a Stream2 containing only the pairs that match it.
func (Stream2[K, V]) ParallelMapValues ¶
func (s Stream2[K, V]) ParallelMapValues(fn func(V) V, opts ...ParallelOption) Stream2[K, V]
ParallelMapValues transforms the values using the given function in parallel.
func (Stream2[K, V]) TakeWhile ¶
TakeWhile returns a Stream2 that yields pairs while the predicate is true.
type TimestampedValue ¶
TimestampedValue holds a value with its timestamp.
func NewTimestamped ¶
func NewTimestamped[T any](value T) TimestampedValue[T]
NewTimestamped creates a TimestampedValue with the given value and current time.
func NewTimestampedAt ¶
func NewTimestampedAt[T any](value T, ts time.Time) TimestampedValue[T]
NewTimestampedAt creates a TimestampedValue with the given value and timestamp.
type Triple ¶
type Triple[A, B, C any] struct { First A; Second B; Third C }
Triple represents a tuple of three values.