stream

package
v2.8.0-nightly.20230727
Published: Jul 26, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package stream contains interfaces and helper functions for managing iterators that can block.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[T any](ctx context.Context, it Iterator[T], max int) (ret []T, _ error)

Collect reads at most max elements from the iterator into a buffer and returns it.

func CopyMerged

func CopyMerged[T any](dst, src *Merged[T])

CopyMerged efficiently copies a Merged from src to dst.

func EOS

func EOS() error

EOS returns a new end of stream error.

func ForEach

func ForEach[T any](ctx context.Context, it Iterator[T], fn func(t T) error) error

ForEach calls fn with elements from it. The element passed to fn must not be retained after fn has returned.

func IsEOS

func IsEOS(err error) bool

IsEOS returns true if the error is an end of stream error.

func Next

func Next[T any](ctx context.Context, it Iterator[T]) (ret T, _ error)

Next is a convenience function for allocating a T and using the iterator to read into it with it.Next.

func Peek

func Peek[T any](ctx context.Context, it Peekable[T]) (ret T, _ error)

Peek is a convenience function for allocating a T and using the iterator to read into it with it.Peek.

func Read

func Read[T any](ctx context.Context, it Iterator[T], buf []T) (n int, _ error)

Read fills buf with elements from the iterator and returns the number copied into buf. End of iteration is signaled by returning (_, EOS).

func Skip

func Skip[T any](ctx context.Context, it Iterator[T]) error

Skip discards one element from the iterator.

Types

type CompareFunc

type CompareFunc func(Stream, Stream) int

CompareFunc is a comparison function for two streams. DEPRECATED.

type Iterator

type Iterator[T any] interface {
	// Next reads the next element into dst, and advances the iterator.
	// Next returns EOS when the iteration is over, dst will not be affected.
	Next(ctx context.Context, dst *T) error
}

func NewFromForEach

func NewFromForEach[T any](ctx context.Context, cp func(dst, src *T), forEachFunc func(func(T) error) error) Iterator[T]

NewFromForEach creates a new iterator from a forEachFunc. Don't write new code that needs this.

type Merged

type Merged[T any] struct {
	// Values is a slice of values, which are not ordered relative to one another.
	Values []T
	// Indexes is the index of the stream that produced each value.
	Indexes []int
}

Merged is the type of elements emitted by the Merger Iterator.

func (*Merged[T]) First

func (m *Merged[T]) First() (T, int)

func (*Merged[T]) Last

func (m *Merged[T]) Last() (T, int)

type Merger

type Merger[T any] struct {
	// contains filtered or unexported fields
}

func NewMerger

func NewMerger[T any](its []Peekable[T], lt func(a, b T) bool) *Merger[T]

NewMerger creates an iterator which aggregates entries which are equal in value. The entries will come out in ascending order.
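The grouping behavior can be pictured with plain sorted slices. The sketch below is not the Merger implementation: it works on slices rather than Peekable iterators, and merged and mergeSorted are hypothetical names. It assumes each group takes at most one element from each input, which the description above does not confirm.

```go
package main

import "fmt"

// merged mirrors the shape of the documented Merged struct.
type merged[T any] struct {
	Values  []T
	Indexes []int
}

// mergeSorted walks several ascending inputs at once and groups the head
// elements that compare equal under lt, emitting groups in ascending order.
func mergeSorted[T any](inputs [][]T, lt func(a, b T) bool) []merged[T] {
	pos := make([]int, len(inputs))
	var out []merged[T]
	for {
		// Find the input whose current head is smallest.
		minIdx := -1
		for i, src := range inputs {
			if pos[i] >= len(src) {
				continue
			}
			if minIdx == -1 || lt(src[pos[i]], inputs[minIdx][pos[minIdx]]) {
				minIdx = i
			}
		}
		if minIdx == -1 {
			return out // every input is exhausted
		}
		minVal := inputs[minIdx][pos[minIdx]]
		// Collect every head equal to the minimum and advance those inputs.
		var m merged[T]
		for i, src := range inputs {
			if pos[i] >= len(src) {
				continue
			}
			head := src[pos[i]]
			if !lt(minVal, head) && !lt(head, minVal) {
				m.Values = append(m.Values, head)
				m.Indexes = append(m.Indexes, i)
				pos[i]++
			}
		}
		out = append(out, m)
	}
}

func main() {
	inputs := [][]int{{1, 3, 5}, {1, 2, 5}, {2}}
	for _, m := range mergeSorted(inputs, func(a, b int) bool { return a < b }) {
		fmt.Println(m.Values, m.Indexes)
	}
	// [1 1] [0 1]
	// [2 2] [1 2]
	// [3] [0]
	// [5 5] [0 1]
}
```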

func (*Merger[T]) Next

func (m *Merger[T]) Next(ctx context.Context, dst *Merged[T]) error

type Ordered

type Ordered[T any] struct {
	// contains filtered or unexported fields
}

Ordered enforces ascending order or returns an error.

func NewOrdered

func NewOrdered[T any](inner Iterator[T], lt func(a, b T) bool, cp func(dst, src *T)) *Ordered[T]

func (*Ordered[T]) Next

func (o *Ordered[T]) Next(ctx context.Context, dst *T) error

type Peekable

type Peekable[T any] interface {
	Iterator[T]

	// Peek reads the next element into dst, but does not advance the iterator.
	// Peek returns EOS when the iteration is over, dst will not be affected.
	Peek(ctx context.Context, dst *T) error
}

func NewPeekable

func NewPeekable[T any](it Iterator[T], cp func(dst, src *T)) Peekable[T]

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue implements a priority queue that operates on streams. DEPRECATED: use Merger instead.

func NewPriorityQueue

func NewPriorityQueue(ss []Stream, cmp CompareFunc) *PriorityQueue

NewPriorityQueue creates a new priority queue. DEPRECATED: use Merger instead.

func (*PriorityQueue) Iterate

func (pq *PriorityQueue) Iterate(cb func([]Stream) error) error

Iterate iterates through the priority queue.

type Slice

type Slice[T any] struct {
	// contains filtered or unexported fields
}

Slice is an iterator backed by an in-memory slice.

func NewSlice

func NewSlice[T any](xs []T) *Slice[T]

func (*Slice[T]) Next

func (s *Slice[T]) Next(ctx context.Context, dst *T) error

func (*Slice[T]) Peek

func (s *Slice[T]) Peek(ctx context.Context, dst *T) error

func (*Slice[T]) Reset

func (s *Slice[T]) Reset()

Reset resets the iterator to the beginning.
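A slice-backed iterator with Next, Peek and Reset can be sketched in a few lines. The version below is an illustration, not the package's Slice: slice, newSlice, errEOS and trace are hypothetical local names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errEOS is a local stand-in for the package's end-of-stream error.
var errEOS = errors.New("end of stream")

// slice is a hypothetical iterator over an in-memory slice.
type slice[T any] struct {
	xs  []T
	pos int
}

func newSlice[T any](xs []T) *slice[T] { return &slice[T]{xs: xs} }

// Peek copies the current element into dst without advancing.
func (s *slice[T]) Peek(ctx context.Context, dst *T) error {
	if s.pos >= len(s.xs) {
		return errEOS
	}
	*dst = s.xs[s.pos]
	return nil
}

// Next copies the current element into dst and advances.
func (s *slice[T]) Next(ctx context.Context, dst *T) error {
	if err := s.Peek(ctx, dst); err != nil {
		return err
	}
	s.pos++
	return nil
}

// Reset moves the iterator back to the first element.
func (s *slice[T]) Reset() { s.pos = 0 }

// trace records what each call produces, for demonstration.
func trace() []string {
	ctx := context.Background()
	s := newSlice([]string{"x", "y"})
	var v string
	var out []string
	_ = s.Peek(ctx, &v)
	out = append(out, "peek:"+v)
	_ = s.Next(ctx, &v)
	out = append(out, "next:"+v)
	_ = s.Next(ctx, &v)
	out = append(out, "next:"+v)
	if err := s.Next(ctx, &v); errors.Is(err, errEOS) {
		out = append(out, "eos")
	}
	s.Reset()
	_ = s.Next(ctx, &v)
	out = append(out, "reset:"+v)
	return out
}

func main() {
	fmt.Println(trace()) // [peek:x next:x next:y eos reset:x]
}
```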

type Stream

type Stream interface {
	Next() error
}

Stream is a sorted stream that can be iterated. DEPRECATED: use Iterator instead.
