stream

package
v0.3.6
Published: Oct 16, 2022 License: MIT Imports: 12 Imported by: 1

Documentation

Overview

Package stream provides a way to construct data processing streams from smaller pieces. The design is inspired by the fs2 Scala library.

Constants

This section is empty.

Variables

var ErrHeadOfEmptyStream = errors.New("head of empty stream")

ErrHeadOfEmptyStream - an error that is returned when someone attempts to retrieve the head of an empty stream.

Functions

func AppendToSlice

func AppendToSlice[A any](stm Stream[A], start []A) io.IO[[]A]

AppendToSlice executes the stream and appends its results to the slice.

func ChunkN added in v0.1.5

func ChunkN[A any](n int) func(sa Stream[A]) Stream[[]A]

ChunkN groups elements by n and produces a stream of slices.
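
To illustrate the grouping that ChunkN describes, here is a minimal standalone sketch on plain slices. It does not use the package's Stream type. The helper name `chunkSlice` is hypothetical, and the behaviour for leftover elements (a final, shorter chunk) is an assumption that the documentation above does not confirm.

```go
package main

import "fmt"

// chunkSlice is a hypothetical helper that groups elements by n.
// Assumption: leftover elements form a final, shorter chunk.
func chunkSlice[A any](as []A, n int) [][]A {
	if n <= 0 {
		return nil
	}
	var chunks [][]A
	for len(as) > n {
		// The full slice expression caps capacity so chunks do not overlap on append.
		chunks = append(chunks, as[:n:n])
		as = as[n:]
	}
	if len(as) > 0 {
		chunks = append(chunks, as)
	}
	return chunks
}

func main() {
	fmt.Println(chunkSlice([]int{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}
```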

func ChunksResize added in v0.2.9

func ChunksResize[A any](newSize int) func(stm Stream[[]A]) Stream[[]A]

ChunksResize rebuffers chunks to the given size.

func Collect added in v0.0.10

func Collect[A any](stm Stream[A], collector func(A) error) io.IO[fun.Unit]

Collect collects all elements from the stream and invokes the provided function for each element.

func DrainAll

func DrainAll[A any](stm Stream[A]) io.IO[fun.Unit]

DrainAll executes the stream and throws away all values.

func FanOut added in v0.2.6

func FanOut[A any, B any](stm Stream[A], handlers ...func(Stream[A]) io.IO[B]) io.IO[[]B]

FanOut distributes the same element to all handlers. Stream failure is also distributed to all handlers.

func FanOutOld added in v0.3.0

func FanOutOld[A any, B any](stm Stream[A], handlers ...func(Stream[A]) io.IO[B]) io.IO[[]B]

FanOutOld distributes the same element to all handlers. Stream failure is also distributed to all handlers.

func FoldLeft added in v0.2.8

func FoldLeft[A any, B any](stm Stream[A], zero B, combine func(B, A) B) io.IO[B]

FoldLeft aggregates the stream in a simpler way than StateFlatMap.

func FoldLeftEval added in v0.2.8

func FoldLeftEval[A any, B any](stm Stream[A], zero B, combine func(B, A) io.IO[B]) io.IO[B]

FoldLeftEval aggregates the stream in a simpler way than StateFlatMap. It takes `zero` as the initial accumulator value and then combines one element from the stream with the accumulator. It continues to do so until there are no more elements in the stream. Finally, it yields the accumulator value. (If the stream was empty, `zero` is yielded.)
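
The accumulation described above can be sketched on a plain slice. This is only an analogue, not the package's implementation: the name `foldLeftErr` is hypothetical, and a `combine` function returning `(B, error)` stands in for one returning `io.IO[B]`.

```go
package main

import (
	"errors"
	"fmt"
)

// foldLeftErr is a hypothetical slice-based analogue of FoldLeftEval.
// combine may fail, which stands in for an effectful io.IO[B].
func foldLeftErr[A any, B any](as []A, zero B, combine func(B, A) (B, error)) (B, error) {
	acc := zero
	for _, a := range as {
		next, err := combine(acc, a)
		if err != nil {
			return acc, err
		}
		acc = next
	}
	return acc, nil
}

// sumNonNegative adds a to acc and rejects negative elements.
func sumNonNegative(acc int, a int) (int, error) {
	if a < 0 {
		return acc, errors.New("negative element")
	}
	return acc + a, nil
}

func main() {
	total, err := foldLeftErr([]int{1, 2, 3}, 0, sumNonNegative)
	fmt.Println(total, err) // 6 <nil>

	// An empty input yields the zero value unchanged.
	empty, err := foldLeftErr([]int{}, 10, sumNonNegative)
	fmt.Println(empty, err) // 10 <nil>
}
```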

func ForEach added in v0.0.10

func ForEach[A any](stm Stream[A], collector func(A)) io.IO[fun.Unit]

ForEach invokes a simple function for each element of the stream.

func FromBackpressureChannel added in v0.3.0

func FromBackpressureChannel[A any, B any](bc BackpressureChannel[A], f func(Stream[A]) io.IO[B]) io.IO[B]

FromBackpressureChannel forms a Stream[A] that will be consumed by `f`. The result of `f` will be used to report back failures and finish signals. This is intended to be run in

func Head

func Head[A any](stm Stream[A]) io.IO[A]

Head returns the first element of the stream. It'll fail if the stream is empty.

func HeadAndTail added in v0.3.5

func HeadAndTail[A any](stm Stream[A]) io.IO[fun.Pair[A, Stream[A]]]

HeadAndTail returns the very first element of the stream and the rest of the stream.

func JoinManyFibers added in v0.3.4

func JoinManyFibers[A any](capacity int) io.IO[Pipe[io.Fiber[A], io.GoResult[A]]]

JoinManyFibers starts a separate goroutine for each incoming Fiber. As soon as a result is ready, it is sent to the output. At any point in time, at most `capacity` fibers are being waited for.

func Last added in v0.3.0

func Last[A any](stm Stream[A]) io.IO[A]

Last keeps track of the current element of the stream and returns it when the stream completes.

func LazyFinishedStepResult added in v0.3.2

func LazyFinishedStepResult[A any]() io.IO[StepResult[A]]

LazyFinishedStepResult returns

func NewPool added in v0.1.4

func NewPool[A any](size int) io.IO[Pipe[io.IO[A], io.GoResult[A]]]

NewPool creates an execution pool that will execute tasks concurrently. At most `size` tasks are executed simultaneously.

func NewPoolFromExecutionContext added in v0.2.5

func NewPoolFromExecutionContext[A any](ec io.ExecutionContext, capacity int) io.IO[Pipe[io.IO[A], io.GoResult[A]]]

NewPoolFromExecutionContext creates an execution pool that will execute tasks concurrently. After the execution context a buffer is created to allow as many as `capacity` parallel tasks to be executed. This pool won't change the order of elements. NB! As work starts in parallel, in case of failure some future elements could be evaluated even after the failed element. Hence we use GoResult to represent evaluation results.
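
One common way to get concurrent execution with order-preserving results is a bounded queue of per-task result slots. The sketch below uses only goroutines and channels. It is not the package's implementation: `runOrdered` and `result` are hypothetical names, and `result` merely stands in for `io.GoResult[A]`. It also shows the NB above: a task after the failed one is still evaluated.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical stand-in for io.GoResult[A].
type result[A any] struct {
	value A
	err   error
}

// runOrdered starts tasks concurrently but yields results in task order.
// The buffered queue limits how far task starts can run ahead of the consumer.
func runOrdered[A any](tasks []func() (A, error), capacity int) []result[A] {
	if capacity < 1 {
		capacity = 1
	}
	queue := make(chan chan result[A], capacity)
	go func() {
		defer close(queue)
		for _, task := range tasks {
			task := task
			slot := make(chan result[A], 1)
			queue <- slot // blocks while the queue is full
			go func() {
				v, err := task()
				slot <- result[A]{value: v, err: err}
			}()
		}
	}()
	var out []result[A]
	for slot := range queue {
		out = append(out, <-slot) // slots are read in submission order
	}
	return out
}

func main() {
	tasks := []func() (int, error){
		func() (int, error) { return 1, nil },
		func() (int, error) { return 0, errors.New("task 2 failed") },
		func() (int, error) { return 3, nil },
	}
	for _, r := range runOrdered(tasks, 2) {
		fmt.Println(r.value, r.err)
	}
}
```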

func NewUnorderedPoolFromExecutionContext added in v0.3.4

func NewUnorderedPoolFromExecutionContext[A any](ec io.ExecutionContext, capacity int) io.IO[Pipe[io.IO[A], io.GoResult[A]]]

NewUnorderedPoolFromExecutionContext creates an execution pool that will execute tasks concurrently. Each task's result will be passed to a channel as soon as it completes. Hence, the order of results may differ from the order of tasks.

func Partition added in v0.2.6

func Partition[A any, C any, D any](stm Stream[A],
	predicate func(A) bool,
	trueHandler Collector[A, C],
	falseHandler Collector[A, D],
) io.IO[fun.Pair[C, D]]

Partition divides the stream into two that are handled independently.

func PipeToPairOfChannels added in v0.1.3

func PipeToPairOfChannels[A any, B any](pipe Pipe[A, B]) io.IO[fun.Pair[chan<- A, <-chan B]]

PipeToPairOfChannels converts a streaming pipe to a pair of channels that could be used to interact with external systems.

func StreamFold added in v0.3.2

func StreamFold[A any, B any](
	stm Stream[A],
	onFinish func() io.IO[B],
	onValue func(a A, tail Stream[A]) io.IO[B],
	onEmpty func(tail Stream[A]) io.IO[B],
	onError func(err error) io.IO[B],
) io.IO[B]

StreamFold performs arbitrary processing of a stream's single step result.

func TakeAndTail added in v0.3.5

func TakeAndTail[A any](stm Stream[A], n int, prefix []A) io.IO[fun.Pair[[]A, Stream[A]]]

TakeAndTail collects n leading elements of the stream and returns them along with the tail of the stream. If the stream is shorter than n, only the available elements are returned, together with an empty stream.
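
The split into leading elements and a tail can be pictured on a plain slice. This is a sketch under assumptions: `takeAndTail` is a hypothetical helper, it works on slices rather than streams, and it ignores the `prefix` parameter of the real function, whose exact role is not described above.

```go
package main

import "fmt"

// takeAndTail is a hypothetical slice-based analogue: it returns up to n
// leading elements and whatever remains.
func takeAndTail[A any](as []A, n int) (head []A, tail []A) {
	if n < 0 {
		n = 0
	}
	if n > len(as) {
		n = len(as)
	}
	return as[:n], as[n:]
}

func main() {
	head, tail := takeAndTail([]int{1, 2, 3}, 2)
	fmt.Println(head, tail) // [1 2] [3]

	// A shorter input yields all available elements and an empty tail.
	head, tail = takeAndTail([]int{1, 2, 3}, 5)
	fmt.Println(head, tail) // [1 2 3] []
}
```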

func ToBackPressureChannels added in v0.3.0

func ToBackPressureChannels[A any](stm Stream[A], channels ...BackpressureChannel[A]) io.IO[fun.Unit]

ToBackPressureChannels sends each element to all channels.

func ToChannel added in v0.0.10

func ToChannel[A any](stm Stream[A], ch chan<- A) io.IO[fun.Unit]

ToChannel sends all stream elements to the given channel. When the stream is completed, the channel is closed. The IO blocks until the stream is exhausted. If the stream fails, the channel is closed anyway. NB! The failure cannot be communicated via a channel of type A. Hence, on the reading side there is no way to see whether it was a successful completion or a failed one.
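
The NB above can be demonstrated without the package. In this sketch, `source`, `toChannel`, `failingAfter` and `drain` are all hypothetical names, and a pull-style function stands in for the stream. The reader's loop simply ends when the channel is closed. The error is only visible to whoever ran the sending side.

```go
package main

import (
	"errors"
	"fmt"
)

// source is a hypothetical pull-based producer: ok is false at the end.
type source[A any] func() (a A, ok bool, err error)

// toChannel forwards values to ch and closes it on completion and on failure alike.
func toChannel[A any](next source[A], ch chan<- A) error {
	defer close(ch)
	for {
		a, ok, err := next()
		if err != nil {
			return err
		}
		if !ok {
			return nil
		}
		ch <- a
	}
}

// failingAfter yields 0..n-1 and then fails.
func failingAfter(n int) source[int] {
	i := 0
	return func() (int, bool, error) {
		if i == n {
			return 0, false, errors.New("source failed")
		}
		i++
		return i - 1, true, nil
	}
}

// drain runs toChannel in the background and collects what the reader sees.
func drain(next source[int]) (seen []int, err error) {
	ch := make(chan int)
	errc := make(chan error, 1)
	go func() { errc <- toChannel[int](next, ch) }()
	for v := range ch { // ends on close; the reader cannot tell why
		seen = append(seen, v)
	}
	return seen, <-errc
}

func main() {
	seen, err := drain(failingAfter(2))
	fmt.Println(seen, err) // [0 1] source failed
}
```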

func ToChannels added in v0.2.6

func ToChannels[A any](stm Stream[A], channels ...chan<- A) io.IO[fun.Unit]

ToChannels sends each stream element to every given channel. Failure or completion of the stream leads to closure of all channels. TODO: Send failure to the channels.

func ToChunks added in v0.2.9

func ToChunks[A any](size int) func(stm Stream[A]) Stream[[]A]

ToChunks collects incoming elements in chunks of the given size.

func ToSlice

func ToSlice[A any](stm Stream[A]) io.IO[[]A]

ToSlice executes the stream and collects all results to a slice.

Types

type BackpressureChannel added in v0.3.0

type BackpressureChannel[A any] struct {
	// contains filtered or unexported fields
}

BackpressureChannel has a control mechanism that allows the consumer to influence the producer: alongside the data channel there is a back (backpressure) channel. Protocol:

sender              |  receiver
--------------------+------------------------------------
                    |   send "Ready to receive" to back channel
 read back          |   immediately start listening data.
 if ready           |
 send data          |   read data
                    |    start processing
                    |     the result of processing (ready-to-receive/finished/error)
 loop               |   LOOP.
                    |
                    |    on error after processing
                    |     send error to back
                    |    on processing complete
                    |     send finished to back
when finishing:     |
 send finish signal |   on receiving finish signal, stop the loop.
 and read back      |
                    |
when error:         |
 send error         |   on receiving error, stop the loop.
 and read back      |
                    |
if not ready,       |
 don't send data    |
on back error - fail all
on back finish - unsubscribe
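
The happy path of this protocol, plus the "unsubscribe" case, can be sketched with two unbuffered Go channels. This is a simplified illustration, not the package's implementation: all names (`signal`, `event`, `send`, `receive`, `run`) are hypothetical, the error path is omitted, and the sender does not read back after sending the finish signal.

```go
package main

import "fmt"

// signal is what the receiver reports on the back channel.
type signal int

const (
	readyToReceive signal = iota
	finished
)

// event is what the sender puts on the data channel.
type event struct {
	value      int
	isFinished bool
}

// send waits for a readiness signal before each value.
// It returns the number of values actually sent.
func send(values []int, data chan<- event, back <-chan signal) int {
	sent := 0
	for _, v := range values {
		if <-back != readyToReceive {
			return sent // receiver finished: unsubscribe
		}
		data <- event{value: v}
		sent++
	}
	if <-back == readyToReceive {
		data <- event{isFinished: true} // finish signal
	}
	return sent
}

// receive requests items one at a time and stops after limit items
// or when the finish signal arrives.
func receive(limit int, data <-chan event, back chan<- signal) []int {
	var got []int
	for len(got) < limit {
		back <- readyToReceive
		e := <-data
		if e.isFinished {
			return got
		}
		got = append(got, e.value)
	}
	back <- finished
	return got
}

// run wires a sender and a receiver together.
func run(values []int, limit int) (sent int, got []int) {
	data := make(chan event)
	back := make(chan signal)
	done := make(chan int)
	go func() { done <- send(values, data, back) }()
	got = receive(limit, data, back)
	return <-done, got
}

func main() {
	sent, got := run([]int{1, 2, 3, 4}, 2)
	fmt.Println(sent, got) // 2 [1 2]
}
```

Because the sender only writes after reading a readiness signal, the receiver's `limit` directly controls how many values are ever produced.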

func NewBackpressureChannel added in v0.3.0

func NewBackpressureChannel[A any]() BackpressureChannel[A]

func (BackpressureChannel[A]) Close added in v0.3.0

func (bc BackpressureChannel[A]) Close() (err error)

func (BackpressureChannel[A]) CloseReceiverNormally added in v0.3.0

func (bc BackpressureChannel[A]) CloseReceiverNormally()

func (BackpressureChannel[A]) CloseReceiverWithError added in v0.3.0

func (bc BackpressureChannel[A]) CloseReceiverWithError(err error)

func (BackpressureChannel[A]) HappyPathReceive added in v0.3.0

func (bc BackpressureChannel[A]) HappyPathReceive() Stream[A]

HappyPathReceive forms a stream of a happy path.

func (BackpressureChannel[A]) RequestOneItem added in v0.3.0

func (bc BackpressureChannel[A]) RequestOneItem() StreamEvent[A]

RequestOneItem sends a notification to the backpressure channel and receives one item from the data channel.

func (BackpressureChannel[A]) Send added in v0.3.0

func (bc BackpressureChannel[A]) Send(sea StreamEvent[A]) (isFinished bool, err error)

Send receives readiness signal from `back`. If ready, sends data to `data`.

func (BackpressureChannel[A]) SendError added in v0.3.0

func (bc BackpressureChannel[A]) SendError(err error) (bool, error)

func (BackpressureChannel[A]) SendValue added in v0.3.0

func (bc BackpressureChannel[A]) SendValue(a A) (bool, error)

type Collector added in v0.2.9

type Collector[A any, B any] func(Stream[A]) io.IO[B]

Collector reads the stream and produces some value.

type Pipe added in v0.0.2

type Pipe[A any, B any] func(Stream[A]) Stream[B]

Pipe is a conversion of one stream to another. Technically it's a function that takes one stream and returns another.

func ChannelBufferPipe added in v0.3.0

func ChannelBufferPipe[A any](size int) Pipe[A, A]

ChannelBufferPipe puts incoming values into a buffer of the given size and then reads from that same buffer. This buffer decouples the producer and the consumer to some extent.

func ConcatPipes added in v0.2.9

func ConcatPipes[A any, B any, C any](pipe1 Pipe[A, B], pipe2 Pipe[B, C]) Pipe[A, C]

ConcatPipes connects two pipes into one.
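
Since a pipe is just a function from one stream to another, connecting two pipes is function composition. The sketch below shows the idea with slices in place of streams. `pipe`, `mapPipe` and `concatPipes` are hypothetical lowercase analogues, not the package's own definitions.

```go
package main

import "fmt"

// pipe is a hypothetical slice-based stand-in for Pipe[A, B].
type pipe[A any, B any] func([]A) []B

// mapPipe lifts an element function into a pipe.
func mapPipe[A any, B any](f func(A) B) pipe[A, B] {
	return func(as []A) []B {
		bs := make([]B, 0, len(as))
		for _, a := range as {
			bs = append(bs, f(a))
		}
		return bs
	}
}

// concatPipes feeds the output of p1 into p2.
func concatPipes[A any, B any, C any](p1 pipe[A, B], p2 pipe[B, C]) pipe[A, C] {
	return func(as []A) []C { return p2(p1(as)) }
}

func double(n int) int { return n * 2 }

func label(n int) string { return fmt.Sprintf("#%d", n) }

func main() {
	p := concatPipes(mapPipe(double), mapPipe(label))
	fmt.Println(p([]int{1, 2, 3})) // [#2 #4 #6]
}
```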

func FlatMapPipe added in v0.1.3

func FlatMapPipe[A any, B any](f func(a A) Stream[B]) Pipe[A, B]

FlatMapPipe creates a pipe that flatmaps one stream through the provided function.

func MapPipe added in v0.1.3

func MapPipe[A any, B any](f func(a A) B) Pipe[A, B]

MapPipe creates a pipe that maps one stream through the provided function.

func PairOfChannelsToPipe added in v0.1.0

func PairOfChannelsToPipe[A any, B any](input chan A, output chan B) Pipe[A, B]

PairOfChannelsToPipe takes two channels that are being used to talk to some external process and converts them into a single pipe. It first starts a separate goroutine that will continuously run the input stream and send all its contents to the `input` channel. The current thread is left with reading from the output channel.

type Sink added in v0.0.2

type Sink[A any] Pipe[A, fun.Unit]

Sink is a pipe that does not return meaningful values.

func NewSink added in v0.0.2

func NewSink[A any](f func(a A)) Sink[A]

NewSink constructs the sink from the provided function.

func PrependPipeToSink added in v0.3.1

func PrependPipeToSink[A any, B any](pipe1 Pipe[A, B], sink Sink[B]) Sink[A]

PrependPipeToSink changes the input of a sink.

type StepResult

type StepResult[A any] struct {
	Value        A
	HasValue     bool // models "Option[A]"
	Continuation Stream[A]
	IsFinished   bool // true when stream has completed
}

StepResult[A] represents the result of a single step in the step machine. It might be one of - empty, new value, or finished. The step result also returns the continuation of the stream.

func NewStepResult added in v0.0.2

func NewStepResult[A any](value A, continuation Stream[A]) StepResult[A]

NewStepResult constructs StepResult that has one value.

func NewStepResultEmpty added in v0.0.2

func NewStepResultEmpty[A any](continuation Stream[A]) StepResult[A]

NewStepResultEmpty constructs an empty StepResult.

func NewStepResultFinished added in v0.0.2

func NewStepResultFinished[A any]() StepResult[A]

NewStepResultFinished constructs a finished StepResult. The continuation will be empty as well.

type Stream

type Stream[A any] io.IO[StepResult[A]]

Stream is modelled as a function that performs a single step in the state machine.
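
To make the step-machine model concrete, here is a small standalone sketch. It is an assumption-laden analogue, not the package's code: `step` is a plain function returning `(stepResult, error)` in place of `io.IO[StepResult[A]]`, and `fromSlice`, `filter` and `toSlice` are hypothetical reimplementations. Note how a rejected element becomes an "empty" step that carries only a continuation.

```go
package main

import "fmt"

// stepResult mirrors the shape of StepResult[A] for illustration only.
type stepResult[A any] struct {
	value        A
	hasValue     bool
	continuation step[A]
	isFinished   bool
}

// step is a hypothetical stand-in for Stream[A]: calling it performs one step.
type step[A any] func() (stepResult[A], error)

// fromSlice emits the elements of as one step at a time.
func fromSlice[A any](as []A) step[A] {
	return func() (stepResult[A], error) {
		if len(as) == 0 {
			return stepResult[A]{isFinished: true}, nil
		}
		return stepResult[A]{value: as[0], hasValue: true, continuation: fromSlice(as[1:])}, nil
	}
}

// filter turns rejected elements into empty steps (no value, only a continuation).
func filter[A any](s step[A], keep func(A) bool) step[A] {
	return func() (stepResult[A], error) {
		r, err := s()
		if err != nil || r.isFinished {
			return r, err
		}
		r.continuation = filter(r.continuation, keep)
		if r.hasValue && !keep(r.value) {
			r.hasValue = false
		}
		return r, nil
	}
}

// toSlice drives the step machine until it finishes or fails.
func toSlice[A any](s step[A]) ([]A, error) {
	var out []A
	for {
		r, err := s()
		if err != nil {
			return out, err
		}
		if r.isFinished {
			return out, nil
		}
		if r.hasValue {
			out = append(out, r.value)
		}
		s = r.continuation
	}
}

func isEven(n int) bool { return n%2 == 0 }

func main() {
	evens, err := toSlice(filter(fromSlice([]int{1, 2, 3, 4}), isEven))
	fmt.Println(evens, err) // [2 4] <nil>
}
```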

func AddSeparatorAfterEachElement added in v0.0.2

func AddSeparatorAfterEachElement[A any](stm Stream[A], sep A) Stream[A]

AddSeparatorAfterEachElement adds a separator after each stream element.

func AndThen

func AndThen[A any](stm1 Stream[A], stm2 Stream[A]) Stream[A]

AndThen appends another stream after the end of the first one.

func AndThenLazy

func AndThenLazy[A any](stm1 Stream[A], stm2 func() Stream[A]) Stream[A]

AndThenLazy appends another stream. The other stream is constructed lazily.

func Drop

func Drop[A any](stm Stream[A], n int) Stream[A]

Drop skips n elements in the stream.

func DropWhile added in v0.3.2

func DropWhile[A any](stm Stream[A], predicate func(A) bool) Stream[A]

DropWhile removes the beginning of the stream so that the new stream starts with an element that falsifies the predicate.

func Emit added in v0.3.1

func Emit[A any](a A) Stream[A]

Emit returns a stream of a single element.

func EmitMany added in v0.3.1

func EmitMany[A any](as ...A) Stream[A]

EmitMany returns a stream with all the given values.

func Empty

func Empty[A any]() Stream[A]

Empty returns an empty stream.

func EmptyUnit added in v0.1.0

func EmptyUnit() Stream[fun.Unit]

EmptyUnit returns an empty stream of units. It's more performant because the same instance is being used.

func Eval

func Eval[A any](ioa io.IO[A]) Stream[A]

Eval returns a stream of one value that is the result of IO.

func Fail added in v0.1.5

func Fail[A any](err error) Stream[A]

Fail returns a stream that fails immediately.

func Filter added in v0.0.2

func Filter[A any](stm Stream[A], predicate func(A) bool) Stream[A]

Filter leaves in the stream only the elements that satisfy the given predicate.

func FilterNot added in v0.2.6

func FilterNot[A any](stm Stream[A], predicate func(A) bool) Stream[A]

FilterNot leaves in the stream only the elements that do not satisfy the given predicate.

func FlatMap

func FlatMap[A any, B any](stm Stream[A], f func(a A) Stream[B]) Stream[B]

FlatMap constructs a new stream by concatenating all substreams, produced by f from elements of the original stream.

func Flatten added in v0.0.11

func Flatten[A any](stm Stream[Stream[A]]) Stream[A]

Flatten simplifies a stream of streams to just the stream of values by concatenating all inner streams.

func FoldToGoResult added in v0.2.7

func FoldToGoResult[A any](stm Stream[A]) Stream[io.GoResult[A]]

FoldToGoResult converts a stream into a stream of go results. All go results will be non-error except possibly the last one.

func FromChannel added in v0.0.10

func FromChannel[A any](ch <-chan A) Stream[A]

FromChannel constructs a stream that reads from the given channel for as long as the channel is open. When the channel is closed, the stream is also closed.

func FromSideEffectfulFunction added in v0.0.3

func FromSideEffectfulFunction[A any](f func() (A, error)) Stream[A]

FromSideEffectfulFunction constructs a stream from a Go-style function. It is expected that this function is not pure and can return different results.

func FromSlice

func FromSlice[A any](as []A) Stream[A]

FromSlice constructs a stream from the slice.

func FromStepResult added in v0.0.10

func FromStepResult[A any](iosr io.IO[StepResult[A]]) Stream[A]

FromStepResult constructs a stream from an IO that returns StepResult.

func Generate added in v0.0.2

func Generate[A any, S any](zero S, f func(s S) (S, A)) Stream[A]

Generate constructs an infinite stream of values using the production function.

func GroupBy added in v0.2.3

func GroupBy[A any, K comparable](stm Stream[A], key func(A) K) Stream[fun.Pair[K, []A]]

GroupBy collects elements into groups by a user-provided key. Whenever a new key is encountered, the previous group is emitted. When the original stream finishes, the last group is emitted.
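
The description suggests that only consecutive elements with the same key end up in one group, so a key that reappears later starts a new group. The following slice-based sketch makes that reading explicit. It is an interpretation, not the package's implementation: `group` and `groupConsecutive` are hypothetical names, with `group` standing in for `fun.Pair[K, []A]`.

```go
package main

import "fmt"

// group is a hypothetical stand-in for fun.Pair[K, []A].
type group[K comparable, A any] struct {
	key   K
	items []A
}

// groupConsecutive starts a new group each time the key changes,
// and keeps the last group when the input ends.
func groupConsecutive[A any, K comparable](as []A, key func(A) K) []group[K, A] {
	var groups []group[K, A]
	for _, a := range as {
		k := key(a)
		if n := len(groups); n > 0 && groups[n-1].key == k {
			groups[n-1].items = append(groups[n-1].items, a)
			continue
		}
		groups = append(groups, group[K, A]{key: k, items: []A{a}})
	}
	return groups
}

// firstLetter assumes a non-empty string.
func firstLetter(s string) byte { return s[0] }

func main() {
	words := []string{"apple", "avocado", "banana", "apricot"}
	for _, g := range groupConsecutive(words, firstLetter) {
		fmt.Printf("%c %v\n", g.key, g.items)
	}
	// a [apple avocado]
	// b [banana]
	// a [apricot]
}
```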

func GroupByEval added in v0.2.4

func GroupByEval[A any, K comparable](stm Stream[A], keyIO func(A) io.IO[K]) Stream[fun.Pair[K, []A]]

GroupByEval collects elements into groups by a user-provided key (which is evaluated as IO). Whenever a new key is encountered, the previous group is emitted. When the original stream finishes, the last group is emitted.

func Len added in v0.1.3

func Len[A any](sa Stream[A]) Stream[int]

Len is a pipe that returns a stream of 1 element that is the count of elements of the original stream.

func Lift

func Lift[A any](a A) Stream[A]

Lift returns a stream of one value.

func LiftMany

func LiftMany[A any](as ...A) Stream[A]

LiftMany returns a stream with all the given values.

func Map added in v0.0.2

func Map[A any, B any](stm Stream[A], f func(a A) B) Stream[B]

Map converts values of the stream.

func MapEval

func MapEval[A any, B any](stm Stream[A], f func(a A) io.IO[B]) Stream[B]

MapEval maps the values of the stream. The provided function returns an IO.

func Nats added in v0.3.6

func Nats() Stream[int]

Nats returns an infinite stream of ints starting from 1.

func Repeat

func Repeat[A any](stm Stream[A]) Stream[A]

Repeat appends the same stream infinitely.

func SideEval added in v0.3.3

func SideEval[A any](stm Stream[A], iounit func(A) io.IOUnit) Stream[A]

SideEval executes a computation for each element for its side effect. Could be used for logging, for instance.

func StateFlatMap

func StateFlatMap[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]]) Stream[B]

StateFlatMap maintains state along the way.

func StateFlatMapWithFinish added in v0.0.4

func StateFlatMapWithFinish[A any, B any, S any](stm Stream[A],
	zero S,
	f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]],
	onFinish func(s S) Stream[B]) Stream[B]

StateFlatMapWithFinish maintains state along the way. When the source stream finishes, it invokes onFinish with the last state.

func StateFlatMapWithFinishAndFailureHandling added in v0.3.1

func StateFlatMapWithFinishAndFailureHandling[A any, B any, S any](stm Stream[A],
	zero S,
	f func(a A, s S) io.IO[fun.Pair[S, Stream[B]]],
	onFinish func(s S) Stream[B],
	onFailure func(s S, err error) Stream[B]) Stream[B]

StateFlatMapWithFinishAndFailureHandling maintains state along the way. When the source stream finishes, it invokes onFinish with the last state. If there is an error during stream evaluation, onFailure is invoked. NB! onFinish is not invoked in case of failure.

func Sum added in v0.1.3

func Sum[A fun.Number](sa Stream[A]) Stream[A]

Sum is a pipe that returns a stream of 1 element that is sum of all elements of the original stream.

func Take

func Take[A any](stm Stream[A], n int) Stream[A]

Take cuts the stream after n elements.

func TakeWhile added in v0.3.2

func TakeWhile[A any](stm Stream[A], predicate func(A) bool) Stream[A]

TakeWhile returns the beginning of the stream such that all elements satisfy the predicate.

func Through added in v0.0.2

func Through[A any, B any](stm Stream[A], pipe Pipe[A, B]) Stream[B]

Through passes the stream data through the pipe. Technically it applies the pipe function to this stream.

func ThroughExecutionContext added in v0.2.5

func ThroughExecutionContext[A any](sa Stream[io.IO[A]], ec io.ExecutionContext, capacity int) Stream[A]

ThroughExecutionContext runs a stream of tasks through an ExecutionContext. NB! This operation recovers GoResults. This leads to the loss of good elements after the one that failed: at most `capacity - 1` elements are lost.

func ThroughExecutionContextUnordered added in v0.3.4

func ThroughExecutionContextUnordered[A any](sa Stream[io.IO[A]], ec io.ExecutionContext, capacity int) Stream[A]

ThroughExecutionContextUnordered runs a stream of tasks through an ExecutionContext. The order of results is not preserved! This operation recovers GoResults. This leads to the loss of good elements after the one that failed: at most `capacity - 1` elements are lost.

func ThroughPipeEval added in v0.2.7

func ThroughPipeEval[A any, B any](stm Stream[A], pipeIO io.IO[Pipe[A, B]]) Stream[B]

ThroughPipeEval runs the given stream through pipe that is returned by the provided pipeIO.

func ToSink added in v0.0.2

func ToSink[A any](stm Stream[A], sink Sink[A]) Stream[fun.Unit]

ToSink streams all data from the stream to the sink.

func ToStreamEvent added in v0.3.0

func ToStreamEvent[A any](stm Stream[A]) Stream[StreamEvent[A]]

ToStreamEvent converts the given stream to a stream of StreamEvents. Each normal element will become a StreamEvent with data. On a failure or finish a single element is returned before the end of the stream.

func Unfold added in v0.0.2

func Unfold[A any](zero A, f func(A) A) Stream[A]

Unfold constructs an infinite stream of values using the production function.

func UnfoldGoResult added in v0.2.7

func UnfoldGoResult[A any](stm Stream[io.GoResult[A]], onFailure func(err error) Stream[A]) Stream[A]

UnfoldGoResult converts a stream of GoResults back to a normal stream. On the first encounter of an Error, the stream fails. The default value for `onFailure` is `Fail[A]`.

func Wrapf added in v0.3.1

func Wrapf[A any](stm Stream[A], format string, args ...interface{}) Stream[A]

Wrapf wraps errors produced by this stream with additional context info.

func ZipWithIndex added in v0.3.6

func ZipWithIndex[A any](as Stream[A]) Stream[fun.Pair[int, A]]

ZipWithIndex prepends the index to each element.

type StreamEvent added in v0.3.0

type StreamEvent[A any] struct {
	Error      error
	IsFinished bool // true when stream has completed
	Value      A
}

Fields should be checked in order: first Error; if Error == nil, then IsFinished; if !IsFinished, then Value.
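
The checking order can be written as a switch whose cases are tried top to bottom. The struct below only mirrors the fields of StreamEvent so that the sketch is self-contained. `streamEvent` and `describe` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// streamEvent mirrors the fields of StreamEvent[A] for illustration.
type streamEvent[A any] struct {
	Error      error
	IsFinished bool
	Value      A
}

// describe checks the fields in the documented order:
// Error first, then IsFinished, and only then Value.
func describe[A any](e streamEvent[A]) string {
	switch {
	case e.Error != nil:
		return "error: " + e.Error.Error()
	case e.IsFinished:
		return "finished"
	default:
		return fmt.Sprintf("value: %v", e.Value)
	}
}

func main() {
	fmt.Println(describe(streamEvent[int]{Value: 42}))                 // value: 42
	fmt.Println(describe(streamEvent[int]{IsFinished: true}))          // finished
	fmt.Println(describe(streamEvent[int]{Error: errors.New("boom")})) // error: boom
}
```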

func NewStreamEvent added in v0.3.0

func NewStreamEvent[A any](value A) StreamEvent[A]

func NewStreamEventError added in v0.3.0

func NewStreamEventError[A any](err error) StreamEvent[A]

func NewStreamEventFinished added in v0.3.0

func NewStreamEventFinished[A any]() StreamEvent[A]
