channels

package
v1.10.2 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 17, 2023 License: MIT Imports: 5 Imported by: 0

README

genesis/channels

Package channels provides generic functions for channels.

Documentation

Overview

📺 Package channels provides generic functions for channels.

☎️ Naming

Most of the functions come in two versions: a regular one and one with the suffix C. The latter also accepts a context.Context value as the first argument, and you should always prefer it over the regular function. The reason is that if a function accepts a channel and returns another one, closing the input channel is not always enough to stop it: the internal goroutine that the function starts might be blocked trying to write into the output channel, and if you never read values from it, you'll have a goroutine leak.

Generator functions that produce an infinite sequence of values (like Counter) have only one version and always require a context because that's the only way to cancel them.

⏹️ Function termination

Most of the functions in the package accept a channel, return a channel, and create a goroutine to read messages from the input channel and propagate them (usually with some changes) into the output one. Some (like TakeC) do all three of these, some (like ToSliceC) only accept a channel, some (like Counter) only start a goroutine and return a channel, etc. Which of these apply to the function is described in the function's documentation.

Here is what you need to know:

  • If a function starts a goroutine, it returns immediately, and all termination rules described below apply to the goroutine instead of the original function.
  • If the function creates and returns a channel, this channel is closed when the goroutine terminates.
  • If the function accepts a context.Context, the goroutine is terminated when the context is cancelled.
  • If the input channel is closed and the goroutine tries to read from it, the goroutine is terminated.
  • ⚠️ IMPORTANT: If the goroutine is blocked trying to write into the output channel and the input channel is closed, the goroutine will not terminate until another goroutine reads from the output channel. This is why you should always either use a context and cancel the goroutine through it or make sure to read everything from the output channel.
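
The rules above can be sketched with the standard library alone. The doubleC function below is a hypothetical stage written for illustration; it is not the package's implementation, only one way a goroutine could follow these rules, assuming both the read and the write are guarded by the context.

```go
package main

import (
	"context"
	"fmt"
)

// doubleC is a hypothetical stage (not part of the package).
// It returns immediately and does its work in a goroutine.
func doubleC(ctx context.Context, in <-chan int) chan int {
	out := make(chan int)
	go func() {
		// The output channel is closed when the goroutine terminates.
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return // the context is cancelled
			case v, ok := <-in:
				if !ok {
					return // the input channel is closed
				}
				// The write is guarded by ctx as well, so a reader
				// that walks away cannot block this goroutine forever.
				select {
				case out <- v * 2:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// demoCancel reads one value, cancels the context, and waits until
// the output channel is closed.
func demoCancel() (first int, closed bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	out := doubleC(ctx, in)
	first = <-out
	cancel()
	// This loop ends only when out is closed.
	for range out {
	}
	return first, true
}

func main() {
	first, closed := demoCancel()
	fmt.Println(first, closed)
}
```

Without the inner select, the goroutine in this sketch would stay blocked on the write whenever nobody reads from out, which is the leak described in the last rule.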

🍾 Buffered channels

All functions in the package (except WithBuffer) return an unbuffered channel. That means they won't do anything and won't read values from the input channel if you don't read values from their output channel. If you want to make the output channel buffered, use WithBuffer.
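
To see why an unbuffered output stalls the producer, here is a small standard-library sketch. The trySend helper is hypothetical and exists only to make "would this write block?" observable without starting goroutines.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a non-blocking write
// and reports whether the value was accepted.
func trySend(c chan<- int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// demoBuffering counts how many of 5 writes succeed when nobody reads.
func demoBuffering() (unbuffered int, buffered int) {
	u := make(chan int)    // unbuffered: every write needs a ready reader
	b := make(chan int, 2) // buffered: two writes fit without a reader
	for i := 0; i < 5; i++ {
		if trySend(u, i) {
			unbuffered++
		}
		if trySend(b, i) {
			buffered++
		}
	}
	return unbuffered, buffered
}

func main() {
	fmt.Println(demoBuffering())
}
```

With no reader, no write into the unbuffered channel goes through, while the buffered one accepts writes until its capacity is reached.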

😱 Error handling

The only functions that return an error on cancellation (either context.Canceled or the cause reported by context.Cause) are FirstC, MaxC, and MinC. All other functions simply terminate on cancellation, and it's up to you to check the context for errors (using context.Context.Err).

The reason for this is that most of the functions start a goroutine producing values, and the output channel is the only communication channel the goroutine has. So, the only way for the goroutine to return an error would be to emit it in the output channel, and handling such errors would be tedious.
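
A minimal sketch of the "check the context yourself" pattern follows. The toSliceC function is a hypothetical stand-in written with the standard library; it only assumes the behavior described above (terminate silently on cancellation).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// toSliceC is a hypothetical collector (not the package's code).
// On cancellation it just returns what it has, without an error.
func toSliceC(ctx context.Context, in <-chan int) []int {
	var res []int
	for {
		select {
		case <-ctx.Done():
			return res
		case v, ok := <-in:
			if !ok {
				return res
			}
			res = append(res, v)
		}
	}
}

// demoErrCheck cancels the context up front and then inspects ctx.Err()
// to tell "cancelled" apart from "channel was simply empty".
func demoErrCheck() (n int, cancelled bool) {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan int) // never written to and never closed
	cancel()

	got := toSliceC(ctx, in)
	return len(got), errors.Is(ctx.Err(), context.Canceled)
}

func main() {
	fmt.Println(demoErrCheck())
}
```

The result alone (an empty slice) cannot tell you whether the input was empty or the work was cancelled; ctx.Err() can.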

Available errors:

  • ErrEmpty is returned when a channel is closed without any elements being sent.
  • ErrClosed is returned by FirstC (and First) when one of the passed channels is closed.

🖨 Sequence generators

These are the functions that make a channel and emit values into it until canceled. In other languages this is done using iterators, but in Go the only infinite iterator we have is a channel.

If you want to stop a generator (terminate the goroutine it started and close the channel), cancel the context you have passed to it.
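
As an illustration, a generator of this kind can be sketched as below. The counter function is hypothetical and only mirrors the documented shape of Counter (a context, a start, and a step); the real implementation may differ.

```go
package main

import (
	"context"
	"fmt"
)

// counter is a hypothetical infinite generator (not the package's code).
// It emits start, start+step, ... until ctx is cancelled.
func counter(ctx context.Context, start, step int) chan int {
	out := make(chan int)
	go func() {
		defer close(out) // closed once the goroutine stops
		for v := start; ; v += step {
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// demoCounter takes three values and then stops the generator.
func demoCounter() []int {
	ctx, cancel := context.WithCancel(context.Background())
	c := counter(ctx, 10, 5)
	got := []int{<-c, <-c, <-c}
	cancel()
	// Wait for the goroutine to notice the cancellation and close c.
	for range c {
	}
	return got
}

func main() {
	fmt.Println(demoCounter())
}
```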

Available functions:

📥 Working with channels

All these functions accept a channel (or multiple) as an input.

🏃 Functions that return a channel and start an internal goroutine producing values into the channel:

✋ Functions that block until they produce a result (or until they are canceled):

🛟 Helpers

These are small functions that help you work with channels. They don't iterate over values in the channel.

Available functions:

  • BufferSize returns the channel's buffer size.
  • Close closes the channel and never panics.
  • IsEmpty tells if there are no messages in the channel.
  • IsFull tells if the channel has reached its capacity.
  • Pop is a blocking read from the channel that can be canceled.
  • Push is a blocking write into a channel that can be canceled.

Constants

This section is empty.

Variables

var ErrClosed = errors.New("channel is closed")

ErrClosed is an error for when a channel is closed. Currently, it is used only by First to distinguish a closed channel from a zero value returned, without adding a third `bool` return value.

var ErrEmpty = errors.New("container is empty")

ErrEmpty is an error for when a channel is closed without any elements being sent.

Functions

func All

func All[T any](c <-chan T, f func(el T) bool) bool

All is an alias for AllC without a context.

func AllC added in v1.7.0

func AllC[T any](ctx context.Context, c <-chan T, f func(el T) bool) bool

AllC returns true if f returns true for all elements in the channel.

func Any

func Any[T any](c <-chan T, f func(el T) bool) bool

Any returns true if f returns true for any element in channel.

func AnyC added in v1.7.0

func AnyC[T any](ctx context.Context, c <-chan T, f func(el T) bool) bool

AnyC returns true if f returns true for any element in the channel.

func BufferSize added in v1.6.0

func BufferSize[T any](c <-chan T) int

BufferSize returns how many messages a channel can hold before being blocked.

When you push that many messages in the channel, the next attempt to write in it will block until another goroutine reads a message.

For unbuffered channels the result is 0.

func ChunkEvery

func ChunkEvery[T any](c <-chan T, count int) chan []T

ChunkEvery is an alias for ChunkEveryC without a context.

func ChunkEveryC added in v1.7.0

func ChunkEveryC[T any](ctx context.Context, c <-chan T, count int) chan []T

ChunkEveryC returns a channel of slices containing count elements each.

func Close added in v1.6.0

func Close[T any](c chan<- T) bool

Close safely closes the given channel.

This is a safer version of built-in close. The built-in close function will panic if the given channel is already closed or nil. This function in both cases returns false.

There is a reason why the built-in function might panic. The best practice of working with channels is to close the channel only within the context ("owner") that created it. In such cases, you can be 100% sure that the channel is non-nil and closed only once. However, if you found yourself in a different situation and no refactoring can guarantee these rules, the safer Close function is here to help.
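
One way such a function could be written is with recover around the built-in close. The safeClose function below is a hypothetical sketch under that assumption; the package may implement Close differently.

```go
package main

import "fmt"

// safeClose is a hypothetical sketch (not the package's code).
// It returns false instead of panicking when c is nil or already closed.
func safeClose[T any](c chan<- T) (ok bool) {
	if c == nil {
		return false
	}
	defer func() {
		// close panics on an already closed channel; turn that into false.
		if recover() != nil {
			ok = false
		}
	}()
	close(c)
	return true
}

// demoClose closes a channel twice and then tries a nil channel.
func demoClose() (first, second, onNil bool) {
	c := make(chan int)
	first = safeClose(c)
	second = safeClose(c)

	var n chan int
	onNil = safeClose(n)
	return first, second, onNil
}

func main() {
	fmt.Println(demoClose())
}
```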

func Count

func Count[T comparable](c <-chan T, el T) int

Count is an alias for CountC without a context.

func CountC added in v1.7.0

func CountC[T comparable](ctx context.Context, c <-chan T, el T) int

CountC returns the number of occurrences of el in the channel.

func Counter

func Counter[T constraints.Integer](ctx context.Context, start T, step T) chan T

Counter is like Range, but infinite.

func Drop

func Drop[T any](c <-chan T, n int) chan T

Drop is an alias for DropC without a context.

func DropC added in v1.7.0

func DropC[T any](ctx context.Context, c <-chan T, n int) chan T

DropC drops the first n elements from channel c and returns a new channel with the rest. It returns a channel so that the call is non-blocking. If you want a slice instead, wrap the result in ToSlice.

func Each

func Each[T any](c <-chan T, f func(el T))

Each is an alias for EachC without a context.

func EachC added in v1.7.0

func EachC[T any](ctx context.Context, c <-chan T, f func(el T))

EachC calls f for every element in the channel.

func Echo added in v1.7.0

func Echo[T any](from <-chan T, to chan<- T)

Echo is an alias for EchoC without a context.

func EchoC added in v1.7.0

func EchoC[T any](ctx context.Context, from <-chan T, to chan<- T)

EchoC moves messages from one channel to the other.

If you want to move messages from multiple channels into one, use MergeC instead.

If you want to move messages from one channel into multiple, use TeeC instead.

func Exponential

func Exponential[T constraints.Integer](ctx context.Context, start T, factor T) chan T

Exponential generates elements starting from start, multiplying the value by factor on every step.

func Filter

func Filter[T any](c <-chan T, f func(el T) bool) chan T

Filter is an alias for FilterC without a context.

func FilterC added in v1.7.0

func FilterC[T any](ctx context.Context, c <-chan T, f func(el T) bool) chan T

FilterC returns a new channel with the elements from the input channel for which f returns true.

func First added in v1.6.0

func First[T any](cs ...<-chan T) (T, error)

First is an alias for FirstC without a context.

func FirstC added in v1.6.0

func FirstC[T any](ctx context.Context, cs ...<-chan T) (T, error)

FirstC selects the first available element from the given channels.

The function returns in one of the following cases:

  1. One of the given channels is closed. In this case, ErrClosed is returned.
  2. The ctx context is canceled. In this case, the cancelation reason is returned as an error.
  3. One of the given channels returns a value. In this case, the error is nil.

If all channels are non-closed and empty and ctx is not canceled, the function will block and wait for one of the above to occur.

If a message is available in multiple channels, only one is chosen via a uniform pseudo-random selection to avoid starvation.

😱 Errors

  • ErrEmpty: no channels are passed into the function.
  • ErrClosed: a channel was closed.
  • Another: cancelation cause returned by ctx.Err().
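
The three outcomes and the error list can be illustrated with reflect.Select, which performs a select over a dynamic set of channels. This is only a sketch: firstC, errClosed, and errEmpty below are hypothetical stand-ins, and nothing here claims that the package is built on reflect.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// Hypothetical stand-ins for the package's ErrClosed and ErrEmpty.
var (
	errClosed = errors.New("channel is closed")
	errEmpty  = errors.New("container is empty")
)

// firstC is a hypothetical sketch (not the package's code).
func firstC[T any](ctx context.Context, cs ...<-chan T) (T, error) {
	var zero T
	if len(cs) == 0 {
		return zero, errEmpty
	}
	// Case 0 watches the context; the rest watch the given channels.
	cases := make([]reflect.SelectCase, 0, len(cs)+1)
	cases = append(cases, reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(ctx.Done()),
	})
	for _, c := range cs {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(c),
		})
	}
	chosen, v, ok := reflect.Select(cases)
	switch {
	case chosen == 0:
		return zero, ctx.Err()
	case !ok:
		return zero, errClosed
	default:
		return v.Interface().(T), nil
	}
}

// demoFirst exercises each outcome with exactly one ready case.
func demoFirst() (val int, sawClosed bool, sawCancel bool) {
	ctx, cancel := context.WithCancel(context.Background())

	a := make(chan int)
	b := make(chan int, 1)
	b <- 7
	val, _ = firstC(ctx, a, b)

	closed := make(chan int)
	close(closed)
	_, err := firstC(ctx, closed)
	sawClosed = errors.Is(err, errClosed)

	cancel()
	_, err = firstC(ctx, a)
	sawCancel = errors.Is(err, context.Canceled)
	return val, sawClosed, sawCancel
}

func main() {
	fmt.Println(demoFirst())
}
```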

func Flatten added in v1.6.0

func Flatten[T any](c <-chan <-chan T) chan T

Flatten is an alias for FlattenC without a context.

func FlattenC added in v1.7.0

func FlattenC[T any](ctx context.Context, c <-chan <-chan T) chan T

FlattenC takes a channel of channels of values and returns a channel of values.

This pattern is described in the "Concurrency in Go" book as "the Bridge channel" pattern. It might be useful when you design your system as a pipeline consisting of goroutines connected through channels and you have steps producing new steps.
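
A rough sketch of the bridge pattern, using only the standard library, is shown below. The flattenC and forward functions are hypothetical, and the sketch assumes the inner channels are drained one after another; whether the package does the same is not stated here.

```go
package main

import (
	"context"
	"fmt"
)

// forward copies src into dst. It returns true when src is closed
// and false when ctx is cancelled. Hypothetical helper.
func forward(ctx context.Context, src <-chan int, dst chan<- int) bool {
	for {
		select {
		case <-ctx.Done():
			return false
		case v, ok := <-src:
			if !ok {
				return true
			}
			select {
			case dst <- v:
			case <-ctx.Done():
				return false
			}
		}
	}
}

// flattenC is a hypothetical bridge (not the package's code): it reads
// channels from cs and forwards their values into a single channel.
func flattenC(ctx context.Context, cs <-chan (<-chan int)) chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case inner, ok := <-cs:
				if !ok {
					return
				}
				if !forward(ctx, inner, out) {
					return
				}
			}
		}
	}()
	return out
}

// demoFlatten bridges two already filled and closed channels.
func demoFlatten() []int {
	a := make(chan int, 2)
	a <- 1
	a <- 2
	close(a)
	b := make(chan int, 1)
	b <- 3
	close(b)

	streams := make(chan (<-chan int), 2)
	streams <- a
	streams <- b
	close(streams)

	var got []int
	for v := range flattenC(context.Background(), streams) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoFlatten())
}
```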

func IsEmpty added in v1.6.0

func IsEmpty[T any](c <-chan T) bool

IsEmpty returns true if there are no messages in the channel.

For unbuffered channels, the result is always true.

func IsFull added in v1.6.0

func IsFull[T any](c <-chan T) bool

IsFull returns true if the channel's buffer is full.

Attempts to write into a full channel will block until another goroutine reads a message from it.

For unbuffered channels, the result is always true.
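
The statements about unbuffered channels follow from how the built-in len and cap behave on channels. The sketch below assumes that emptiness means len(c) == 0 and fullness means len(c) == cap(c); it uses only built-ins, not the package.

```go
package main

import "fmt"

// demoLenCap checks emptiness and fullness with the built-ins len and cap.
func demoLenCap() (emptyAtStart, fullAfterTwo, unbufEmpty, unbufFull bool) {
	c := make(chan int, 2)
	emptyAtStart = len(c) == 0 // nothing queued yet

	c <- 1
	c <- 2
	fullAfterTwo = len(c) == cap(c) // the buffer of 2 is used up

	u := make(chan int)
	// An unbuffered channel has len 0 and cap 0, so it is
	// both "empty" and "full" at the same time.
	unbufEmpty = len(u) == 0
	unbufFull = len(u) == cap(u)
	return
}

func main() {
	fmt.Println(demoLenCap())
}
```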

func Iterate

func Iterate[T constraints.Integer](ctx context.Context, val T, f func(val T) T) chan T

Iterate returns an infinite list of repeated applications of f to val.

func Map

func Map[T any, G any](c <-chan T, f func(el T) G) chan G

Map is an alias for MapC without a context.

func MapC added in v1.7.0

func MapC[T any, G any](ctx context.Context, c <-chan T, f func(el T) G) chan G

MapC applies f to all elements from the channel and returns a channel with the results.

func Max

func Max[T constraints.Ordered](c <-chan T) (T, error)

Max is an alias for MaxC without a context.

func MaxC added in v1.7.0

func MaxC[T constraints.Ordered](ctx context.Context, c <-chan T) (T, error)

MaxC returns the maximal element from the channel.

func Merge added in v1.6.0

func Merge[T any](cs ...<-chan T) chan T

Merge is an alias for MergeC without a context.

func MergeC added in v1.6.0

func MergeC[T any](ctx context.Context, cs ...<-chan T) chan T

MergeC merges multiple channels into one.

The order in which elements are merged from different channels is not guaranteed.

func Min

func Min[T constraints.Ordered](c <-chan T) (T, error)

Min is an alias for MinC without a context.

func MinC added in v1.7.0

func MinC[T constraints.Ordered](ctx context.Context, c <-chan T) (T, error)

MinC returns the minimal element from the channel.

func Pop added in v1.6.0

func Pop[T any](ctx context.Context, c <-chan T) (T, bool)

Pop reads a value from the channel (with context).

The function is blocking. It will wait and return in one of the following conditions:

  1. ⏹️ The context is canceled.
  2. ⏹️ The channel is closed.
  3. There is a value pushed into the channel.

In the first two cases, the second return value (called "more" or "ok") is "false". Otherwise, if a value is successfully pulled from the channel, it is "true".

Reads from nil channels block forever. So, if a nil channel is passed, the function will exit only when the context is canceled.
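
The three conditions map naturally onto a two-case select. The pop function below is a hypothetical sketch of that idea, not the package's source.

```go
package main

import (
	"context"
	"fmt"
)

// pop is a hypothetical sketch (not the package's code): a blocking
// read that gives up when ctx is cancelled.
func pop[T any](ctx context.Context, c <-chan T) (T, bool) {
	select {
	case v, ok := <-c:
		// ok is false when the channel is closed.
		return v, ok
	case <-ctx.Done():
		var zero T
		return zero, false
	}
}

// demoPop exercises a ready value, a closed channel, and a nil channel.
func demoPop() (val int, ok, okClosed, okNil bool) {
	ctx, cancel := context.WithCancel(context.Background())

	ready := make(chan int, 1)
	ready <- 5
	val, ok = pop(ctx, ready)

	closed := make(chan int)
	close(closed)
	_, okClosed = pop(ctx, closed)

	// A nil channel never becomes ready, so only the cancelled
	// context lets this call return.
	cancel()
	var nilChan <-chan int
	_, okNil = pop(ctx, nilChan)
	return val, ok, okClosed, okNil
}

func main() {
	fmt.Println(demoPop())
}
```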

func Push added in v1.6.0

func Push[T any](ctx context.Context, c chan<- T, v T)

Push writes the value into the channel (with context).

⚠️ Experimental! The behavior of this function might change in the future, or the function might be removed altogether. It's not clear yet what the best approach is for when the target channel is closed. By default, Go panics in this case, which might not be desirable in some situations. Also, using this function might cause situations when the canceled context is ignored by the target function instead of making it exit.

func Range

func Range[T constraints.Integer](ctx context.Context, start T, end T, step T) chan T

Range generates elements from start to end with given step.

func Reduce

func Reduce[T any, G any](c <-chan T, acc G, f func(el T, acc G) G) G

Reduce is an alias for ReduceC without a context.

func ReduceC added in v1.7.0

func ReduceC[T any, G any](ctx context.Context, c <-chan T, acc G, f func(el T, acc G) G) G

ReduceC applies f to acc and every element from the channel and returns acc.

func Repeat

func Repeat[T constraints.Integer](ctx context.Context, val T) chan T

Repeat returns a channel that produces val indefinitely.

func Replicate

func Replicate[T constraints.Integer](ctx context.Context, val T, n int) chan T

Replicate returns a channel that produces val n times.

Use Repeat if you need to generate the value forever.

func Scan

func Scan[T any, G any](c <-chan T, acc G, f func(el T, acc G) G) chan G

Scan is an alias for ScanC without a context.

func ScanC added in v1.7.0

func ScanC[T any, G any](ctx context.Context, c <-chan T, acc G, f func(el T, acc G) G) chan G

ScanC is like ReduceC, but returns a channel of the f results.

func Sum

func Sum[T constraints.Ordered](c <-chan T) T

Sum is an alias for SumC without a context.

func SumC added in v1.7.0

func SumC[T constraints.Ordered](ctx context.Context, c <-chan T) T

SumC returns the sum of all elements from the channel.

func Take

func Take[T any](c <-chan T, count int) chan T

Take is an alias for TakeC without a context.

func TakeC added in v1.7.0

func TakeC[T any](ctx context.Context, c <-chan T, count int) chan T

TakeC takes the first count elements from the channel.

func Tee

func Tee[T any](c <-chan T, count int) []chan T

Tee is an alias for TeeC without a context.

func TeeC added in v1.7.0

func TeeC[T any](ctx context.Context, c <-chan T, count int) []chan T

TeeC returns "count" number of channels with elements from the input channel.

func ToSlice

func ToSlice[T any](c <-chan T) []T

ToSlice is an alias for ToSliceC without a context.

func ToSliceC added in v1.7.0

func ToSliceC[T any](ctx context.Context, c <-chan T) []T

ToSliceC returns a slice with all elements from the channel.

func WithBuffer added in v1.5.0

func WithBuffer[T any](c <-chan T, bufSize int) chan T

WithBuffer is an alias for WithBufferC without a context.

func WithBufferC added in v1.7.0

func WithBufferC[T any](ctx context.Context, c <-chan T, bufSize int) chan T

WithBufferC creates an echo channel of the given one with the given buffer size.

This function effectively makes writes into the given channel non-blocking until the buffer size of pending messages is reached, assuming that all reads will be done only from the channel that the function returns.
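
Here is a small sketch of the effect, assuming an echo goroutine that copies the input into a buffered channel. The withBuffer function is hypothetical and written with the standard library only.

```go
package main

import (
	"context"
	"fmt"
)

// withBuffer is a hypothetical sketch (not the package's code): it
// echoes in into a new channel that has a buffer of the given size.
func withBuffer(ctx context.Context, in <-chan int, size int) chan int {
	out := make(chan int, size)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// demoWithBuffer writes into an unbuffered channel before anything
// reads from the buffered echo of it.
func demoWithBuffer() []int {
	in := make(chan int)
	out := withBuffer(context.Background(), in, 2)

	// These writes complete even though nobody reads from out yet,
	// because the echo goroutine parks the values in the buffer.
	in <- 1
	in <- 2
	close(in)

	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoWithBuffer())
}
```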

func WithContext added in v1.5.0

func WithContext[T any](c <-chan T, ctx context.Context) chan T

WithContext creates an echo channel of the given one that can be canceled with ctx.

This can be useful in 2 scenarios:

  1. To be able to cancel any function in this package without closing the original channel.
  2. For simpler iteration through channels with support for cancellation. This pattern is described in the "Concurrency in Go" book in "The or-done-channel" chapter (page 119).

Types

This section is empty.
