streams

package
v0.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2023 License: Apache-2.0 Imports: 3 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func All added in v0.2.0

func All[T any](in <-chan T, errs ...<-chan error) ([]T, error)

All drains the given channel and returns its elements. All is an alias for Drain(context.Background(), in, errs...).

func Await

func Await[T any](ctx context.Context, in <-chan T, errs <-chan error) (T, error)

Await awaits the next element or error (whatever happens first) from the provided channels. Await returns either an element OR an error, never both. If ctx is canceled before an element is received, ctx.Err() is returned.

func Before

func Before[Value any](in <-chan Value, fn func(Value) []Value) <-chan Value

Before returns a new channel that is filled with the elements from the input channel. Before sending an element into the returned channel, fn(el) is called. The values returned by fn are first sent into the returned channel, then the element from the input channel.

If the input channel or fn is nil, the input channel is returned directly.

func BeforeContext

func BeforeContext[Value any](ctx context.Context, in <-chan Value, fn func(Value) []Value) <-chan Value

BeforeContext returns a new channel that is filled with the elements from the input channel. The returned channel is closed when the input channel is closed, or when ctx is canceled. Before sending an element into the returned channel, fn(el) is called. The values returned by fn are first sent into the returned channel, then the element from the input channel.

If the input channel or fn is nil, the input channel is returned directly.

func Concurrent added in v0.2.0

func Concurrent[T any](c chan T) func(context.Context, ...T) error

Concurrent returns a `push` function for the provided channel. The `push` function tries to push values into the channel and accepts a Context that can cancel the push operation.

func ConcurrentContext added in v0.2.0

func ConcurrentContext[T any](ctx context.Context, c chan T) func(...T) error

ConcurrentContext does the same as Concurrent, but uses the provided Context for every push call.

func Drain

func Drain[T any](ctx context.Context, in <-chan T, errs ...<-chan error) ([]T, error)

Drain drains the given channel and returns its elements.

Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error, the already drained elements and the error are returned. Similarly, when ctx is canceled, the drained elements and ctx.Err() are returned.

Drain returns when the input channel is closed or if it encounters an error and does not wait for the error channels to be closed.

func FanIn

func FanIn[T any](in ...<-chan T) (_ <-chan T, stop func())

FanIn returns a single receive-only channel from multiple receive-only channels. When the returned stop function is called or every input channel is closed, the returned channel is closed.

If len(in) == 0, FanIn returns a closed channel.

Multiple calls to stop have no effect.

func FanInAll

func FanInAll[T any](in ...<-chan T) <-chan T

FanInAll returns FanIn(in...) but without the stop function.

func FanInContext

func FanInContext[T any](ctx context.Context, in ...<-chan T) <-chan T

FanInContext returns a single receive-only channel from multiple receive-only channels. When the provided ctx is canceled or every input channel is closed, the returned channel is closed.

If len(in) == 0, FanInContext returns a closed channel.

func Filter

func Filter[T any](in <-chan T, filters ...func(T) bool) <-chan T

Filter returns a new channel with the same type as the input channel and fills it with the elements from the input channel. The provided filter functions are called for every element. If any of the filters returns false for an element, that element is not pushed into the returned channel.

func ForEach

func ForEach[T any](
	ctx context.Context,
	fn func(el T),
	errFn func(error),
	in <-chan T,
	errs ...<-chan error,
)

ForEach iterates over the provided channels and for every element e calls calls fn(e) and for every error e calls errFn(e) until all channels are closed or ctx is canceled.

func Map

func Map[To, From any](ctx context.Context, in <-chan From, mapper func(From) To) <-chan To

Map maps the elements from the provided `in` channel using the provided `mapper` and sends the mapped values to the returned channel. The returned channel is closed when the input channel is closed or ctx is canceled.

func New

func New[T any](in []T) <-chan T

New returns a channel that is filled with the given values. The channel is closed after all elements have been pushed into the channel.

func NewConcurrent added in v0.2.0

func NewConcurrent[T any](vals ...T) (_ <-chan T, _push func(context.Context, ...T) error, _close func())

NewConcurrent creates a channel of the given type and returns the channel and a `push` function. The `push` function tries to push a value into the channel and accepts a Context to cancel the push operation. The returned `close` function closes the channel when called. The `close` function is thread-safe and may be called multiple times. If values are provided to NewConcurrent, the buffer of the channel is set to the number of values and the values are pushed into the channel before returning.

 str, push, close := NewConcurrent(1, 2, 3)
 push(context.TODO(), 4, 5, 6)
	vals, err := All(str)
	// handle err
	// vals == []int{1, 2, 3, 4, 5, 6}

Use the Concurrent function to create a `push` function for an existing channel.

func NewConcurrentContext added in v0.2.0

func NewConcurrentContext[T any](ctx context.Context, vals ...T) (_ <-chan T, _push func(...T) error, _close func())

NewConcurrentContext does the same as NewConcurrent, but uses the provided Context for every push call.

func Take added in v0.3.7

func Take[T any](ctx context.Context, n int, in <-chan T, errs ...<-chan error) ([]T, error)

Take receives elements from the input channel until it has received n elements or the input channel is closed. It returns a slice containing the received elements. If any error occurs during the process, it is returned as the second return value.

func Walk

func Walk[T any](
	ctx context.Context,
	walkFn func(T) error,
	in <-chan T,
	errs ...<-chan error,
) error

Walk receives from the given channel until it and and all provided error channels are closed, ctx is closed or any of the provided error channels receives an error. For every element e that is received from the input channel, walkFn(e) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.

Example:

var bus event.Bus
in, errs, err := bus.Subscribe(context.TODO(), "foo", "bar", "baz")
// handle err
err := Walk(context.TODO(), func(e event) {
	log.Println(fmt.Sprintf("Received %q event: %v", e.Name(), e))
}, in, errs)
// handle err

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL