streamchan

package module
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2022 License: MIT Imports: 3 Imported by: 0

README

go-streaming-channel

DRY by encapsulating channel draining logic (producing and consuming items on a channel). Thanks to Go 1.18 generics it is type safe. See the tests for examples.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DrainStream

func DrainStream[T any](
	ctx context.Context,
	stream Stream[T],
	errors ...chan error,
) error

DrainStream invokes handler for every entity T read from repository stream.

Set a stream mode in context to change the handling of canceled context.

Errors are sent on the optional channel, otherwise returned as an aggregate error. In infinite stream mode, error channel should be used to avoid aggregating errors (and claiming memory) indefinitely.

func WithStreamMode

func WithStreamMode(ctx context.Context, mode StreamMode) context.Context

WithStreamMode specifies the stream mode to use for draining.

The default is Exhaustible.

Types

type Consumer

type Consumer[T any] func(ctx context.Context, entity T) error

Consumer is invoked on each stream item encountered.

type Finisher

type Finisher func(ctx context.Context) error

Finisher is invoked after all stream items have been processed.

type Producer

type Producer[T any] func(ctx context.Context, stream chan T) error

Producer sends items on the stream channel.

The function is intended to be run as go routine. It must take care to close the channel, which signals the function is done. Closing is best deferred.

type Stream

type Stream[T any] struct {
	Producer Producer[T]
	Consumer Consumer[T]
	Finisher []Finisher
}

func NewStream

func NewStream[T any](
	producer Producer[T],
	consumer Consumer[T],
	finisher ...Finisher,
) Stream[T]

type StreamMode

type StreamMode int
const (
	// Exhaustible streams are eventually closed during program execution.
	//
	// Canceling the context is considered an error.
	Exhaustible StreamMode = iota + 1

	// Infinite streams produce values indefinitely.
	//
	// Canceling the context is not considered an error.
	Infinite
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL