stream

package
v0.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2022 License: BSD-2-Clause Imports: 3 Imported by: 1

Documentation

Overview

Package stream provides a way to construct data processing streams from smaller pieces. The design is inspired by fs2 Scala library.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AppendToSlice

func AppendToSlice[A any](stm Stream[A], start []A) io.IO[[]A]

AppendToSlice executes the stream and appends it's results to the slice.

func Collect added in v0.0.10

func Collect[A any](stm Stream[A], collector func(A) error) io.IO[fun.Unit]

Collect collects all element from the stream and for each element invokes the provided function

func DrainAll

func DrainAll[A any](stm Stream[A]) io.IO[fun.Unit]

DrainAll executes the stream and throws away all values.

func ForEach added in v0.0.10

func ForEach[A any](stm Stream[A], collector func(A)) io.IO[fun.Unit]

ForEach invokes a simple function for each element of the stream.

func Head[A any](stm Stream[A]) io.IO[A]

Head takes the first element and executes it. It'll fail if the stream is empty.

func ToChannel added in v0.0.10

func ToChannel[A any](stm Stream[A], ch chan A) io.IO[fun.Unit]

ToChannel sends all stream elements to the given channel

func ToSlice

func ToSlice[A any](stm Stream[A]) io.IO[[]A]

ToSlice executes the stream and collects all results to a slice.

Types

type Pipe added in v0.0.2

type Pipe[A any, B any] func(Stream[A]) Stream[B]

Pipe is a conversion of one stream to another. Technically it's a function that takes one stream and returns another.

type Sink added in v0.0.2

type Sink[A any] Pipe[A, fun.Unit]

Sink is a pipe that does not return meaningful values.

func NewSink added in v0.0.2

func NewSink[A any](f func(a A)) Sink[A]

NewSink constructs the sink from the provided function.

type StepResult

type StepResult[A any] struct {
	Value        A
	Continuation Stream[A]
	HasValue     bool
	IsFinished   bool
}

StepResult[A] represents the result of a single step in the step machine. It might be one of - empty, new value, or finished. The step result also returns the continuation of the stream.

func NewStepResult added in v0.0.2

func NewStepResult[A any](value A, continuation Stream[A]) StepResult[A]

NewStepResult constructs StepResult that has one value.

func NewStepResultEmpty added in v0.0.2

func NewStepResultEmpty[A any](continuation Stream[A]) StepResult[A]

NewStepResultEmpty constructs an empty StepResult.

func NewStepResultFinished added in v0.0.2

func NewStepResultFinished[A any]() StepResult[A]

NewStepResultFinished constructs a finished StepResult. The continuation will be empty as well.

type Stream

type Stream[A any] io.IO[StepResult[A]]

Stream is modelled as a function that performs a single step in the state machine.

func AddSeparatorAfterEachElement added in v0.0.2

func AddSeparatorAfterEachElement[A any](stm Stream[A], sep A) Stream[A]

AddSeparatorAfterEachElement adds a separator after each stream element

func AndThen

func AndThen[A any](stm1 Stream[A], stm2 Stream[A]) Stream[A]

AndThen appends another stream after the end of the first one.

func AndThenLazy

func AndThenLazy[A any](stm1 Stream[A], stm2 func() Stream[A]) Stream[A]

AndThenLazy appends another stream. The other stream is constructed lazily.

func Drop

func Drop[A any](stm Stream[A], n int) Stream[A]

Drop skips n elements in the stream.

func Empty

func Empty[A any]() Stream[A]

Empty returns an empty stream.

func Eval

func Eval[A any](ioa io.IO[A]) Stream[A]

Eval returns a stream of one value that is the result of IO.

func Filter added in v0.0.2

func Filter[A any](stm Stream[A], predicate func(A) bool) Stream[A]

Filter leaves in the stream only the elements that satisfy the given predicate.

func FlatMap

func FlatMap[A any, B any](stm Stream[A], f func(a A) Stream[B]) Stream[B]

FlatMap constructs a

func Flatten added in v0.0.11

func Flatten[A any](stm Stream[Stream[A]]) Stream[A]

Flatten simplifies a stream of streams to just the stream of values by concatenating all inner streams.

func FromChannel added in v0.0.10

func FromChannel[A any](ch chan A) Stream[A]

FromChannel constructs a stream that reads from the given channel until the channel is open.

func FromSideEffectfulFunction added in v0.0.3

func FromSideEffectfulFunction[A any](f func() (A, error)) Stream[A]

FromSideEffectfulFunction constructs a stream from a Go-style function. It is expected that this function is not pure and can return different results.

func FromSlice

func FromSlice[A any](as []A) Stream[A]

FromSlice constructs a stream from the slice.

func FromStepResult added in v0.0.10

func FromStepResult[A any](iosr io.IO[StepResult[A]]) Stream[A]

FromStepResult constructs a stream from an IO that returns StepResult.

func Generate added in v0.0.2

func Generate[A any, S any](zero S, f func(s S) (S, A)) Stream[A]

Generate constructs an infinite stream of values using the production function.

func Lift

func Lift[A any](a A) Stream[A]

Lift returns a stream of one value.

func LiftMany

func LiftMany[A any](as ...A) Stream[A]

LiftMany returns a stream with all the given values.

func Map added in v0.0.2

func Map[A any, B any](stm Stream[A], f func(a A) B) Stream[B]

Map converts values of the stream.

func MapEval

func MapEval[A any, B any](stm Stream[A], f func(a A) io.IO[B]) Stream[B]

MapEval maps the values of the stream. The provided function returns an IO.

func Repeat

func Repeat[A any](stm Stream[A]) Stream[A]

Repeat appends the same stream infinitely.

func StateFlatMap

func StateFlatMap[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) (S, Stream[B])) Stream[B]

StateFlatMap maintains state along the way.

func StateFlatMapWithFinish added in v0.0.4

func StateFlatMapWithFinish[A any, B any, S any](stm Stream[A], zero S, f func(a A, s S) (S, Stream[B]), onFinish func(s S) Stream[B]) Stream[B]

StateFlatMapWithFinish maintains state along the way. When the source stream finishes, it invokes onFinish with the last state.

func Take

func Take[A any](stm Stream[A], n int) Stream[A]

Take cuts the stream after n elements.

func Through added in v0.0.2

func Through[A any, B any](stm Stream[A], pipe Pipe[A, B]) Stream[B]

Through passes the stream data through the pipe. Technically it applies the pipe function to this stream.

func ToSink added in v0.0.2

func ToSink[A any](stm Stream[A], sink Sink[A]) Stream[fun.Unit]

ToSink streams all data from the stream to the sink.

func Unfold added in v0.0.2

func Unfold[A any](zero A, f func(A) A) Stream[A]

Unfold constructs an infinite stream of values using the production function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL