pipelines

package
v0.3.0
Warning

This package is not in the latest version of its module.

Published: Jul 23, 2022 License: MIT Imports: 2 Imported by: 0

Documentation

Overview

Package pipelines provides helper functions for constructing concurrent processing pipelines. Each pipeline stage represents a single stage in a parallel computation, with an input channel and an output channel. Generally, pipeline stages have signatures starting with a context and input channel as their first arguments, and returning a channel, as below:

Stage[S,T any](ctx context.Context, in <-chan S, ...) <-chan T

The return value from a pipeline stage is referred to as the stage's 'output' channel. Each stage is a non-blocking call which starts one or more goroutines that listen on the input channel and send results to the output channel. When the context is cancelled or the input channel is closed, the stage closes its output channel and cleans up every goroutine it started. Many pipeline stages take a function as an argument, which transforms the input into the output in some way.

By default, each pipeline stage starts the minimum number of goroutines required for its operation and returns an unbuffered channel. These defaults can be modified by passing the result of WithBuffer or WithPool as optional arguments.

Example
package main

import (
	"context"
	"fmt"
	"github.com/splunk/go-genlib/pipelines"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	input := pipelines.Chan([]int{1, 3, 5})
	expanded := pipelines.FlatMap(ctx, input, func(x int) []int { return []int{x, x + 1} })       // x => [x, x+1]
	doubled := pipelines.Map(ctx, expanded, func(x int) int { return x * 2 })                     // x => x*2
	exclaimed := pipelines.Map(ctx, doubled, func(x int) string { return fmt.Sprintf("%d!", x) }) // x => "${x}!"

	result, err := pipelines.Reduce(ctx, exclaimed, func(prefix string, str string) string {
		return prefix + " " + str
	})
	if err != nil {
		fmt.Println("context was cancelled!")
		return
	}

	fmt.Print(result)
}
Output:

2! 4! 6! 8! 10! 12!

Constants

This section is empty.

Variables

This section is empty.

Functions

func Chan

func Chan[T any](in []T) <-chan T

Chan converts a slice of type T to a buffered channel containing the same values. Unlike other funcs in this package, Chan does not start any new goroutines.

func Combine

func Combine[T any](ctx context.Context, t1 <-chan T, t2 <-chan T, opts ...Option) <-chan T

Combine sends all values received from both of its input channels to its output channel.

func Drain added in v0.1.1

func Drain[T any](ctx context.Context, in <-chan T) ([]T, error)

Drain receives all values from the provided channel and returns them in a slice. Drain blocks the caller until the input channel is closed or the provided context is cancelled. An error is returned if and only if the provided context was cancelled before the input channel was closed.

func FlatMap

func FlatMap[S, T any](ctx context.Context, in <-chan S, f func(S) []T, opts ...Option) <-chan T

FlatMap applies f to every value received from its input channel and sends all values found in the slice returned from f to its output channel.

func FlatMapCtx

func FlatMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) []T, opts ...Option) <-chan T

FlatMapCtx applies f to every value received from its input channel and sends all values found in the slice returned from f to its output channel. The same context passed to FlatMapCtx is passed as an argument to f.

func Flatten

func Flatten[T any](ctx context.Context, in <-chan []T, opts ...Option) <-chan T

Flatten provides a pipeline stage which converts a channel of slices to a channel of scalar values. Each value contained in slices received from the input channel is sent to the output channel.

func ForkMapCtx

func ForkMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S, chan<- T), opts ...Option) <-chan T

ForkMapCtx forks an invocation of f onto a new goroutine for each value received from in. f is passed the output channel directly, and is responsible for sending its output to this channel. To avoid resource leaks, f must respect context cancellation when sending to its output channel. The same context passed to ForkMapCtx is passed to f.

ForkMapCtx should be used with caution, as it introduces potentially unbounded parallelism to a pipeline computation.

Variants of ForkMapCtx are intentionally omitted from this package. ForkMap is omitted because the caller cannot listen for context cancellation in some cases. ForkFlatMap is omitted because it is more efficient for the caller to range over the slice and send individual values themselves.

Example
package main

import (
	"context"
	"fmt"
	"github.com/splunk/go-genlib/pipelines"
	"net/http"
)

func main() {
	ctx := context.Background()

	// fetch multiple URLs in parallel, sending the results to an output channel.
	urls := pipelines.Chan([]string{"https://www.google.com", "https://www.splunk.com"})
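	responses := pipelines.ForkMapCtx(ctx, urls, func(ctx context.Context, url string, out chan<- *http.Response) {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return // skip malformed URLs
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return // skip failed requests
		}
		// respect context cancellation when sending, as required of f.
		select {
		case out <- resp:
		case <-ctx.Done():
			resp.Body.Close() // nobody will receive the response; release it
		}
	})

	for response := range responses {
		fmt.Printf("%s: %d\n", response.Request.URL, response.StatusCode)
		response.Body.Close()
	}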
}
Output:

func Map

func Map[S, T any](ctx context.Context, in <-chan S, f func(S) T, opts ...Option) <-chan T

Map applies f to every value received from the input channel and sends the result to the output channel. The output channel is closed when the input channel is closed or the provided context is cancelled.

func MapCtx

func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T, opts ...Option) <-chan T

MapCtx applies f to every value received from its input channel and sends the result to its output channel. The same context passed to MapCtx is passed as an argument to f.

func OptionMap added in v0.2.0

func OptionMap[S, T any](ctx context.Context, in <-chan S, f func(S) *T, opts ...Option) <-chan T

OptionMap applies f to every value received from in and sends all non-nil results to its output channel.

func OptionMapCtx added in v0.2.0

func OptionMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) *T, opts ...Option) <-chan T

OptionMapCtx applies f to every value received from in and sends all non-nil results to its output channel. The same context passed to OptionMapCtx is passed as an argument to f.

func Reduce added in v0.2.0

func Reduce[S, T string](ctx context.Context, in <-chan S, f func(T, S) T) (T, error)

Reduce applies the reducer function f to every value received from the in channel, accumulating a single result, and returns that result. Reduce blocks the caller until the input channel is closed or the provided context is cancelled. An error is returned if and only if the provided context was cancelled before the input channel was closed.

func Tee added in v0.2.0

func Tee[T any](ctx context.Context, ch <-chan T, opts ...Option) (<-chan T, <-chan T)

Tee sends all values received from the input channel into 2 output channels. Both output channels must be drained simultaneously to avoid blocking this pipeline stage.

func WithCancel

func WithCancel[T any](ctx context.Context, ch <-chan T, opts ...Option) <-chan T

WithCancel passes each value received from its input channel to its output channel. If the provided context is cancelled or the input channel is closed, the output channel is also closed.

Types

type Option added in v0.3.0

type Option func(*config)

An Option is passed to optionally configure a pipeline stage.

func WithBuffer added in v0.2.0

func WithBuffer(size int) Option

WithBuffer configures a pipeline to return a buffered output channel with a buffer of the provided size.

func WithDone added in v0.3.0

func WithDone(ctx context.Context) (Option, context.Context)

WithDone configures a pipeline stage to cancel the returned context when all goroutines started by the stage have been stopped. This is appropriate for termination detection in a single pipeline stage. To await termination of multiple pipeline stages, use WithWaitGroup.

func WithPool added in v0.2.0

func WithPool(numWorkers int) Option

WithPool configures a pipeline to run the provided stage on a parallel worker pool of the given size. All workers are kept alive until the input channel is closed or the provided context is cancelled.

func WithWaitGroup added in v0.3.0

func WithWaitGroup(wg *sync.WaitGroup) Option

WithWaitGroup configures a pipeline stage to add a value to the provided WaitGroup for each goroutine started by the stage, and signal Done when each goroutine has completed.
