Documentation ¶
Overview ¶
Package pipelines provides helper functions for constructing concurrent processing pipelines. Each pipeline stage represents a single step in a parallel computation, with an input channel and an output channel. Generally, a pipeline stage takes a context and an input channel as its first arguments and returns a channel, as below:
Stage[S,T any](ctx context.Context, in <-chan S, ...) <-chan T
The return value from a pipeline stage is referred to as the stage's 'output' channel. Each stage is a non-blocking call that starts one or more goroutines, which listen on the input channel and send results to the output channel. When the context is cancelled or the input channel is closed, the stage closes its output channel and cleans up every goroutine it started. Many pipeline stages take a function as an argument, which transforms the input into the output in some way.
By default, each pipeline stage starts the minimum number of goroutines required for its operation and returns an unbuffered channel. These defaults can be modified by passing the result of WithBuffer or WithPool as optional arguments.
Example ¶
package main

import (
	"context"
	"fmt"
	"github.com/splunk/go-genlib/pipelines"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	input := pipelines.Chan([]int{1, 3, 5})
	expanded := pipelines.FlatMap(ctx, input, func(x int) []int { return []int{x, x + 1} })       // (x) => [x, x+1]
	doubled := pipelines.Map(ctx, expanded, func(x int) int { return x * 2 })                     // x => x*2
	exclaimed := pipelines.Map(ctx, doubled, func(x int) string { return fmt.Sprintf("%d!", x) }) // x => "${x}!"

	result, err := pipelines.Reduce(ctx, exclaimed, func(prefix string, str string) string {
		return prefix + " " + str
	})
	if err != nil {
		fmt.Println("context was cancelled!")
	}

	fmt.Print(result)
}
Output: 2! 4! 6! 8! 10! 12!
Index ¶
- func Chan[T any](in []T) <-chan T
- func Combine[T any](ctx context.Context, t1 <-chan T, t2 <-chan T, opts ...Option) <-chan T
- func Drain[T any](ctx context.Context, in <-chan T) ([]T, error)
- func FlatMap[S, T any](ctx context.Context, in <-chan S, f func(S) []T, opts ...Option) <-chan T
- func FlatMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) []T, ...) <-chan T
- func Flatten[T any](ctx context.Context, in <-chan []T, opts ...Option) <-chan T
- func ForkMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S, chan<- T), ...) <-chan T
- func Map[S, T any](ctx context.Context, in <-chan S, f func(S) T, opts ...Option) <-chan T
- func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T, opts ...Option) <-chan T
- func OptionMap[S, T any](ctx context.Context, in <-chan S, f func(S) *T, opts ...Option) <-chan T
- func OptionMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) *T, ...) <-chan T
- func Reduce[S, T any](ctx context.Context, in <-chan S, f func(T, S) T) (T, error)
- func Tee[T any](ctx context.Context, ch <-chan T, opts ...Option) (<-chan T, <-chan T)
- func WithCancel[T any](ctx context.Context, ch <-chan T, opts ...Option) <-chan T
- type Option
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Chan ¶
func Chan[T any](in []T) <-chan T
Chan converts a slice of type T to a buffered channel containing the same values. Unlike other funcs in this package, Chan does not start any new goroutines.
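A buffered channel sized to the slice is what makes it possible to avoid a goroutine here. The following sketch shows the idea; sliceToChan is a hypothetical name, and closing the channel before returning is an assumption about how such a helper would let downstream stages terminate.

```go
package main

import "fmt"

// sliceToChan is a hypothetical equivalent of the behavior described for Chan.
func sliceToChan[T any](in []T) <-chan T {
	out := make(chan T, len(in)) // buffer large enough to hold every value
	for _, v := range in {
		out <- v // never blocks, because the buffer cannot fill up
	}
	close(out) // assumption: closed up front so that readers see the end of input
	return out
}

func main() {
	for v := range sliceToChan([]string{"a", "b", "c"}) {
		fmt.Println(v)
	}
}
```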
func Combine ¶
Combine sends all values received from both of its input channels to its output channel.
func Drain ¶ added in v0.1.1
Drain receives all values from the provided channel and returns them in a slice. Drain blocks the caller until the input channel is closed or the provided context is cancelled. An error is returned if and only if the provided context was cancelled before the input channel was closed.
func FlatMap ¶
FlatMap applies f to every value received from its input channel and sends all values found in the slice returned from f to its output channel.
func FlatMapCtx ¶
func FlatMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) []T, opts ...Option) <-chan T
FlatMapCtx applies f to every value received from its input channel and sends all values found in the slice returned from f to its output channel. The same context passed to FlatMapCtx is passed as an argument to f.
func Flatten ¶
Flatten provides a pipeline stage which converts a channel of slices to a channel of scalar values. Each value contained in slices received from the input channel is sent to the output channel.
func ForkMapCtx ¶
func ForkMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S, chan<- T), opts ...Option) <-chan T
ForkMapCtx forks an invocation of f onto a new goroutine for each value received from in. f is passed the output channel directly, and is responsible for sending its output to this channel. To avoid resource leaks, f must respect context cancellation when sending to its output channel. The same context passed to ForkMapCtx is passed to f.
ForkMapCtx should be used with caution, as it introduces potentially unbounded parallelism to a pipeline computation.
Variants of ForkMapCtx are intentionally omitted from this package. ForkMap is omitted because the caller cannot listen for context cancellation in some cases. ForkFlatMap is omitted because it is more efficient for the caller to range over the slice and send individual values themselves.
Example ¶
package main

import (
	"context"
	"fmt"
	"github.com/splunk/go-genlib/pipelines"
	"net/http"
)

func main() {
	ctx := context.Background()

	// Fetch multiple URLs in parallel, sending the results to an output channel.
	urls := pipelines.Chan([]string{"https://www.google.com", "https://www.splunk.com"})
	responses := pipelines.ForkMapCtx(ctx, urls, func(ctx context.Context, url string, out chan<- *http.Response) {
		req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return
		}
		// Respect context cancellation when sending to the output channel.
		select {
		case out <- resp:
		case <-ctx.Done():
			resp.Body.Close()
		}
	})
	for response := range responses {
		fmt.Printf("%s: %d\n", response.Request.URL, response.StatusCode)
		response.Body.Close()
	}
}
Output:
func Map ¶
Map applies f to every value received from the input channel and sends the result to the output channel. The output channel is closed when the input channel is closed or the provided context is cancelled.
func MapCtx ¶
func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T, opts ...Option) <-chan T
MapCtx applies f to every value received from its input channel and sends the result to its output channel. The same context passed to MapCtx is passed as an argument to f.
func OptionMap ¶ added in v0.2.0
OptionMap applies f to every value received from in and sends all non-nil results to its output channel.
func OptionMapCtx ¶ added in v0.2.0
func OptionMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) *T, opts ...Option) <-chan T
OptionMapCtx applies f to every value received from in and sends all non-nil results to its output channel. The same context passed to OptionMapCtx is passed as an argument to f.
func Reduce ¶ added in v0.2.0
Reduce runs a reducer function on every value received from its input channel and returns the final result. Reduce blocks the caller until the input channel is closed or the provided context is cancelled. An error is returned if and only if the provided context was cancelled before the input channel was closed.
Types ¶
type Option ¶ added in v0.3.0
type Option func(*config)
An Option is passed to optionally configure a pipeline stage.
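The declaration above follows Go's functional options pattern: each option is a function that mutates a private config value. The sketch below illustrates the pattern only. The config fields, the default values, and the lowercase helper names are all hypothetical; the package's real config is unexported and not shown in this documentation.

```go
package main

import "fmt"

// config holds hypothetical per-stage settings.
type config struct {
	bufferSize int // capacity of the output channel
	workers    int // number of goroutines serving the stage
}

// option mirrors the shape of the Option type: a function that edits a config.
type option func(*config)

func withBuffer(n int) option  { return func(c *config) { c.bufferSize = n } }
func withWorkers(n int) option { return func(c *config) { c.workers = n } }

// newConfig applies the options, in order, on top of assumed defaults:
// an unbuffered output channel and a single worker.
func newConfig(opts ...option) config {
	c := config{bufferSize: 0, workers: 1}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())               // prints {bufferSize:0 workers:1}
	fmt.Printf("%+v\n", newConfig(withBuffer(16))) // prints {bufferSize:16 workers:1}
}
```

Because options are applied through a variadic parameter, callers that want the defaults pass nothing, and new settings can be added later without changing any stage signature.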
func WithBuffer ¶ added in v0.2.0
WithBuffer configures a pipeline to return a buffered output channel with a buffer of the provided size.
func WithDone ¶ added in v0.3.0
WithDone configures a pipeline stage to cancel the returned context when all goroutines started by the stage have been stopped. This is appropriate for termination detection in a single pipeline stage. To await termination of multiple pipeline stages, use WithWaitGroup.
func WithPool ¶ added in v0.2.0
WithPool configures a pipeline to run the provided stage on a parallel worker pool of the given size. All workers are kept alive until the input channel is closed or the provided context is cancelled.
func WithWaitGroup ¶ added in v0.3.0
WithWaitGroup configures a pipeline stage to add a value to the provided WaitGroup for each goroutine started by the stage, and signal Done when each goroutine has completed.