pipelines

package module
v1.0.2
Published: Aug 22, 2022 License: MIT Imports: 3 Imported by: 0

README

splunk/pipelines

Manages concurrency using Go + Generics. Separate what your code does from how it runs on threads.

Install it locally:

go get github.com/splunk/pipelines

Overview

Pipelines provides a set of primitives for managing concurrent pipelines. A pipeline is a set of processing stages connected by channels. Each stage is run on one or more goroutines. pipelines manages starting, stopping, and scaling up each pipeline stage for you, allowing you to keep concurrency concerns away from your business logic.

Stages

Each pipeline stage is provided as a top-level func which accepts one or more input channels and returns one or more output channels.

  • Map: converts a chan S to a chan T, by converting each S to exactly one T.
  • OptionMap: converts a chan S to a chan T, by converting each S to zero or one Ts.
  • FlatMap: converts a chan S to a chan T, by converting each S to zero or more Ts.
  • Combine: combines two chan T into a single chan T.
  • Flatten: converts a chan []T to a chan T.
  • Tee: converts a chan T into two chan T, each of which receive exactly the same values.
  • ForkMapCtx: converts a chan S into a chan T, by converting each S to zero or more Ts. Unlike FlatMap, a new goroutine is started to convert each value of S.

Any stage which converts a chan S into a chan T requires that the caller pass a conversion func that knows how to turn S into T.

Map, FlatMap, and OptionMap each have variants MapCtx, FlatMapCtx, and OptionMapCtx, which allow the caller to pass a conversion func which accepts a context.Context as its first argument. This allows a conversion func to perform I/O safely.

Each stage creates and manages the closure of its output channel, and listens for shutdown via context.Context.

Combining Stages

Stages are meant to be combined by passing output channels into input channels using the pattern shown in the toy example below:

func main()  {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  
  input := pipelines.Chan([]int{1, 3, 5})
  expanded := pipelines.FlatMap(ctx, input, withAdjacent) // (x) => [x, x+1]:  yields [1,2,3,4,5,6]
  doubled := pipelines.Map(ctx, expanded, double)         // (x) => x*2:       yields [2,4,6,8,10,12]
  exclaimed := pipelines.Map(ctx, doubled, exclaim)       // (x) => "${x}!":   yields [2!,4!,6!,8!,10!,12!]
  
  result, err := pipelines.Reduce(ctx, exclaimed, func(prefix string, str string) string {
    return prefix + " " + str
  })
  if err != nil {
    fmt.Println("context was cancelled!")
  }

  fmt.Print(result)
  
  // Output: 2! 4! 6! 8! 10! 12!
}

func withAdjacent(x int) []int { return []int{x, x+1} }
func double(x int) int { return x*2 }
func exclaim(x int) string { return fmt.Sprintf("%d!", x)}

In real-world applications, the functions used to convert values flowing through the pipeline can be much more complex. The pipelines package provides a way to separate what each stage of the pipeline is doing from the code used to make it concurrent.

Configuring Stages

Each pipeline stage can be configured with a set of options which modify the concurrent behavior of the stage.

  • WithBuffer(n int): buffers the output channel with size n. Output channels are unbuffered by default.
  • WithPool(n int): runs the pipeline stage on a worker pool of size n.

A few options are provided for listening to when a pipeline has halted.

  • WithDone(context.Context): configures the stage to cancel a derived context when the stage has stopped. Can be used to signal when ANY stage in a pipeline has been stopped.
  • WithWaitGroup(*sync.WaitGroup): configures the stage to use the provided WaitGroup to signal when all goroutines started in the stage have stopped. Can be used to signal when ALL stages in a pipeline have been stopped.

Sinks and Helpers

A sink serves as the last stage of a processing pipeline. All sinks are implemented as blocking calls which don't start any new goroutines.

  • Drain: converts a chan T to a []T.
  • Reduce: converts a chan S to a T, by combining multiple values of S into one value of T.

The following helpers are included to make converting standard values, such as slices, to channels simpler.

  • Chan: converts any []T to a chan T.
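The documented behavior of Chan, a closed, buffered channel holding the slice's values, takes only a few lines of stdlib Go to sketch (the func toChan below is a hypothetical illustration, not the package's source):

```go
package main

import "fmt"

// toChan mirrors the documented contract of pipelines.Chan: it returns
// a closed, buffered channel containing exactly the slice's values,
// without starting any goroutines.
func toChan[T any](in []T) <-chan T {
	out := make(chan T, len(in)) // buffer large enough for every value
	for _, v := range in {
		out <- v // never blocks: the buffer holds the whole slice
	}
	close(out) // ranging over a closed channel still yields buffered values
	return out
}

func main() {
	for v := range toChan([]string{"a", "b"}) {
		fmt.Println(v)
	}
}
```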

Error Handling

Fatal and non-fatal errors that occur during a pipeline can be reported via an ErrorSink. To ensure fatal errors shut down pipeline stages, NewErrorSink wraps and returns a context which is cancelled whenever a fatal error is reported. Errors can be retrieved by calling ErrorSink.All(), which returns all errors reported so far.

See the example in the documentation for usage.

Documentation

Overview

Package pipelines provides helper functions for constructing concurrent processing pipelines. Each pipeline stage represents a single stage in a parallel computation, with an input channel and an output channel. Generally, pipeline stages have signatures starting with a context and input channel as their first arguments, and returning a channel, as below:

Stage[S,T any](ctx context.Context, in <-chan S, ...) <-chan T

The return value from a pipeline stage is referred to as the stage's 'output' channel. Each stage is a non-blocking call which starts one or more goroutines which listen on the input channel and send results to the output channel. Each stage responds to context cancellation or closure of its input channel by closing its output channel and cleaning up all goroutines it started. Many pipeline stages take a function as an argument, which transforms the input in some way.

By default, each pipeline stage starts the minimum number of goroutines required for its operation, and returns an unbuffered channel. These defaults can be modified by passing the result of WithBuffer or WithPool as optional arguments.

Example
package main

import (
	"context"
	"fmt"
	"github.com/splunk/pipelines"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	input := pipelines.Chan([]int{1, 3, 5})
	expanded := pipelines.FlatMap(ctx, input, func(x int) []int { return []int{x, x + 1} })       // (x) => [x, x+1]
	doubled := pipelines.Map(ctx, expanded, func(x int) int { return x * 2 })                     // x => x*2
	exclaimed := pipelines.Map(ctx, doubled, func(x int) string { return fmt.Sprintf("%d!", x) }) // x => "${x}!"

	result, err := pipelines.Reduce(ctx, exclaimed, func(prefix string, str string) string {
		return prefix + " " + str
	})
	if err != nil {
		fmt.Println("context was cancelled!")
	}

	fmt.Print(result)

}
Output:

2! 4! 6! 8! 10! 12!

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Chan

func Chan[T any](in []T) <-chan T

Chan converts a slice to a channel. The channel returned is a closed, buffered channel containing exactly the same values. Unlike other funcs in this package, Chan does not start any new goroutines.

func Combine

func Combine[T any](ctx context.Context, t1 <-chan T, t2 <-chan T, opts ...Option) <-chan T

Combine converts two "chan T" into a single "chan T". It sends all values received from both of its input channels to its output channel.

func Drain

func Drain[T any](ctx context.Context, in <-chan T) ([]T, error)

Drain receives all values from the provided channel and returns them in a slice. Drain blocks the caller until the input channel is closed or the provided context is cancelled. An error is returned if and only if the provided context was cancelled before the input channel was closed.

func FlatMap

func FlatMap[S, T any](ctx context.Context, in <-chan S, f func(S) []T, opts ...Option) <-chan T

FlatMap starts a pipeline stage which converts a "chan S" to a "chan T", by converting each S to zero or more Ts. It applies f to every value received from its input channel and sends all values found in the slice returned from f to its output channel.

func FlatMapCtx

func FlatMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) []T, opts ...Option) <-chan T

FlatMapCtx starts a pipeline stage which converts a "chan S" to a "chan T", by converting each S to zero or more Ts. It applies f to every value received from its input channel and sends all values found in the slice returned from f to its output channel.

The context passed to FlatMapCtx is passed as an argument to f, unchanged.

func Flatten

func Flatten[T any](ctx context.Context, in <-chan []T, opts ...Option) <-chan T

Flatten starts a pipeline stage which converts a "chan []T" to a "chan T". It provides a pipeline stage which converts a channel of slices to a channel of scalar values. Each value contained in slices received from the input channel is sent to the output channel.

func ForkMapCtx

func ForkMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S, chan<- T), opts ...Option) <-chan T

ForkMapCtx converts a "chan S" into a "chan T" by converting each S into zero or more Ts. Unlike FlatMap or FlatMapCtx, this func starts a new goroutine for converting each value of S. Each goroutine is responsible for sending its results to the output channel. To avoid resource leaks, f must respect context cancellation when sending to its output channel. The same context passed to ForkMapCtx is passed to f.

ForkMapCtx should be used with caution, as it introduces potentially unbounded parallelism to a pipeline computation.

Variants of ForkMapCtx are intentionally omitted from this package. ForkMap is omitted because the caller cannot listen for context cancellation in some cases. ForkFlatMap is omitted because it is more efficient for the caller to range over the slice and send individual values themselves.

Example
package main

import (
	"context"
	"fmt"
	"github.com/splunk/pipelines"
	"net/http"
)

func main() {
	ctx := context.Background()

	// fetch multiple URLs in parallel, sending the results to an output channel.
	urls := pipelines.Chan([]string{"https://www.google.com", "https://www.splunk.com"})
	responses := pipelines.ForkMapCtx(ctx, urls, func(ctx context.Context, url string, out chan<- *http.Response) {
		req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		resp, err := http.DefaultClient.Do(req)
		if err == nil {
			out <- resp
		}
	})

	for response := range responses {
		fmt.Printf("%s: %d\n", response.Request.URL, response.StatusCode)
	}
}

func Map

func Map[S, T any](ctx context.Context, in <-chan S, f func(S) T, opts ...Option) <-chan T

Map starts a pipeline stage which converts a "chan S" to a "chan T", by converting each S to exactly one T. It applies f to every value received from the input channel and sends the result to the output channel. The output channel is closed whenever the input channel is closed or the provided context is cancelled.

func MapCtx

func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T, opts ...Option) <-chan T

MapCtx starts a pipeline stage which converts a "chan S" to a "chan T", by converting each S to exactly one T. Unlike Map, the function which performs the conversion also accepts a context.Context. MapCtx applies f to every value received from its input channel and sends the result to its output channel. The context passed to MapCtx is passed as an argument to f, unchanged.

func OptionMap

func OptionMap[S, T any](ctx context.Context, in <-chan S, f func(S) *T, opts ...Option) <-chan T

OptionMap converts a "chan S" to a "chan T" by converting each "S" to zero or one "T"s. It applies f to every value received from in and sends any non-nil results to its output channel.

func OptionMapCtx

func OptionMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) *T, opts ...Option) <-chan T

OptionMapCtx converts a "chan S" to a "chan T" by converting each "S" to zero or one "T"s. It applies f to every value received from in and sends all non-nil results to its output channel. The same context passed to OptionMapCtx is passed as an argument to f.

func Reduce

func Reduce[S, T any](ctx context.Context, in <-chan S, f func(T, S) T) (T, error)

Reduce runs a reducer function on every input received from the in chan and returns the output. Reduce blocks the caller until the input channel is closed or the provided context is cancelled. An error is returned if and only if the provided context was cancelled before the input channel was closed.

func Tee

func Tee[T any](ctx context.Context, ch <-chan T, opts ...Option) (<-chan T, <-chan T)

Tee converts a "chan T" into two "chan T", each of which receive exactly the same values, possibly in different orders. Both output channels must be drained concurrently to avoid blocking this pipeline stage.

Types

type ErrorSink

type ErrorSink struct {
	// contains filtered or unexported fields
}

ErrorSink provides an error-handling solution for pipelines created by this package. It manages a pipeline stage which can receive fatal and non-fatal errors that may occur during the course of a pipeline.

Example
package main

import (
	"context"
	"fmt"
	"github.com/splunk/pipelines"
	"net/http"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ctx, errs := pipelines.NewErrorSink(ctx)

	urls := pipelines.Chan([]string{
		"https://httpstat.us/200",
		"https://httpstat.us/410",
		"wrong://not.a.url/test", // malformed URL; triggers a fatal error
		"https://httpstat.us/502",
	})

	// fetch a bunch of URLs, reporting errors along the way.
	responses := pipelines.OptionMapCtx(ctx, urls, func(ctx context.Context, url string) *http.Response {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			errs.Fatal(fmt.Errorf("error forming request context: %w", err))
			return nil
		}
		resp, err := http.DefaultClient.Do(req)

		if err != nil {
			errs.Error(fmt.Errorf("error fetching %s: %w", url, err))
			return nil
		}
		if resp.StatusCode >= 400 {
			errs.Error(fmt.Errorf("unsuccessful %s: %d", url, resp.StatusCode))
			return nil
		}
		return resp
	})

	// retrieve all responses; there should be only one
	for response := range responses {
		fmt.Printf("success: %s: %d\n", response.Request.URL, response.StatusCode)
	}

	// retrieve all errors; the 502 error should be skipped, since the malformed URL triggers
	// a fatal error.
	for _, err := range errs.All() {
		fmt.Printf("error:   %v\n", err.Error())
	}
}

func NewErrorSink

func NewErrorSink(ctx context.Context, opts ...Option) (context.Context, *ErrorSink)

NewErrorSink returns a new ErrorSink, along with a context which is cancelled when a fatal error is sent to the ErrorSink. It starts a new, configurable pipeline stage which collects any errors reported.

func (*ErrorSink) All

func (s *ErrorSink) All() []error

All returns all errors which have been received by this ErrorSink so far. Subsequent calls to All can return strictly more errors, but will never return fewer errors. The only way to be certain that all errors from a pipeline have been reported is to pass WithWaitGroup to every pipeline stage which sends an error to this ErrorSink and wait for all stages to terminate before calling All().

func (*ErrorSink) Error

func (s *ErrorSink) Error(err error)

Error sends a non-fatal error to this ErrorSink, which is reported and included in the results of All().

func (*ErrorSink) Fatal

func (s *ErrorSink) Fatal(err error)

Fatal sends a fatal error to this ErrorSink, cancelling the child context which was created by NewErrorSink, as well as reporting this error.

type Option

type Option func(*config)

An Option is passed to optionally configure a pipeline stage.

func WithBuffer

func WithBuffer(size int) Option

WithBuffer configures a pipeline stage to return a buffered output channel with a buffer of the provided size.

func WithDone

func WithDone(ctx context.Context) (Option, context.Context)

WithDone configures a pipeline stage to cancel the returned context when all goroutines started by the stage have been stopped. This is appropriate for detecting termination of ANY stage in a pipeline. To await termination of ALL stages in a pipeline, use WithWaitGroup.

Example
package main

import (
	"context"
	"fmt"
	"github.com/splunk/pipelines"
)

func main() {
	ctx := context.Background()

	doubleAndPrint := func(x int) int {
		x = x * 2
		fmt.Printf("%d ", x)
		return x
	}
	ints := pipelines.Chan([]int{2, 4, 6, 8, 10})

	opt, done := pipelines.WithDone(ctx)
	out := pipelines.Map(ctx, ints, doubleAndPrint, opt)
	_, err := pipelines.Drain(ctx, out)
	if err != nil {
		fmt.Println("could not drain!")
	}

	<-done.Done() // wait for the Map stage to complete before continuing.

}
Output:

4 8 12 16 20

func WithPool

func WithPool(numWorkers int) Option

WithPool configures a pipeline to run the provided stage on a parallel worker pool of the given size. All workers are kept alive until the input channel is closed or the provided context is cancelled.

func WithWaitGroup

func WithWaitGroup(wg *sync.WaitGroup) Option

WithWaitGroup configures a pipeline stage to add a value to the provided WaitGroup for each goroutine started by the stage, and signal Done when each goroutine has completed. This option is appropriate for termination detection of ALL stages in a pipeline. To detect termination of ANY stage in a pipeline, use WithDone.

Example
package main

import (
	"context"
	"fmt"
	"github.com/splunk/pipelines"
	"sync"
)

func main() {
	ctx := context.Background()

	incAndPrint := func(x int) int {
		x = x + 1
		fmt.Printf("%d ", x)
		return x
	}
	ints := pipelines.Chan([]int{1, 2, 3, 4, 5, 6})

	// WithWaitGroup
	var wg sync.WaitGroup
	out := pipelines.Map(ctx, ints, incAndPrint, pipelines.WithWaitGroup(&wg))
	_, err := pipelines.Drain(ctx, out)
	if err != nil {
		fmt.Println("could not drain!")
	}

	wg.Wait() // wait for the Map stage to complete before continuing.

}
Output:

2 3 4 5 6 7
