Documentation ¶
Overview ¶
Package pipelines provides helper functions for constructing concurrent processing pipelines. Each pipeline stage represents a single step in a parallel computation, with an input channel and an output channel. Generally, pipeline stages take a context and an input channel as their first arguments and return an output channel, as below:
Stage[S,T any](ctx context.Context, in <-chan S, ...) <-chan T
The return value from a pipeline stage is referred to as the stage's 'output' channel. Each stage is a non-blocking call which starts one or more goroutines that listen on the input channel and send results to the output channel. Each stage responds to context cancellation or closure of its input channel by closing its output channel and stopping every goroutine it started. Many pipeline stages take a function argument which transforms input values into output values.
By default, each pipeline stage starts the minimum number of goroutines required for its operation and returns an unbuffered channel. These defaults can be modified by passing the result of WithBuffer or WithPool as optional arguments.
Example ¶
package main import ( "context" "fmt" "github.com/splunk/pipelines" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() input := pipelines.Chan([]int{1, 3, 5}) doubled := pipelines.FlatMap(ctx, input, func(x int) []int { return []int{x, x + 1} }) // (x) => [x, x+1] expanded := pipelines.Map(ctx, doubled, func(x int) int { return x * 2 }) // x => x*2 exclaimed := pipelines.Map(ctx, expanded, func(x int) string { return fmt.Sprintf("%d!", x) }) // x => "${x}!" result, err := pipelines.Reduce(ctx, exclaimed, func(prefix string, str string) string { return prefix + " " + str }) if err != nil { fmt.Println("context was cancelled!") } fmt.Print(result) }
Output: 2! 4! 6! 8! 10! 12!
Index ¶
- func Chan[T any](in []T) <-chan T
- func Combine[T any](ctx context.Context, t1 <-chan T, t2 <-chan T, opts ...Option) <-chan T
- func Drain[T any](ctx context.Context, in <-chan T) ([]T, error)
- func FlatMap[S, T any](ctx context.Context, in <-chan S, f func(S) []T, opts ...Option) <-chan T
- func FlatMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) []T, ...) <-chan T
- func Flatten[T any](ctx context.Context, in <-chan []T, opts ...Option) <-chan T
- func ForkMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S, chan<- T), ...) <-chan T
- func Map[S, T any](ctx context.Context, in <-chan S, f func(S) T, opts ...Option) <-chan T
- func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T, opts ...Option) <-chan T
- func OptionMap[S, T any](ctx context.Context, in <-chan S, f func(S) *T, opts ...Option) <-chan T
- func OptionMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) *T, ...) <-chan T
- func Reduce[S, T any](ctx context.Context, in <-chan S, f func(T, S) T) (T, error)
- func Tee[T any](ctx context.Context, ch <-chan T, opts ...Option) (<-chan T, <-chan T)
- type ErrorSink
- type Option
Examples ¶
- Package
- ForkMapCtx
- ErrorSink
- WithDone
- WithWaitGroup
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Chan ¶
func Chan[T any](in []T) <-chan T
Chan converts a slice to a channel. The channel returned is a closed, buffered channel containing exactly the same values. Unlike other funcs in this package, Chan does not start any new goroutines.
func Combine ¶
Combine converts two "chan T" into a single "chan T". It sends all values received from both of its input channels to its output channel.
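For illustration, a minimal sketch (not one of the package's own examples) that merges two channels using only the signatures shown in the index:

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    odds := pipelines.Chan([]int{1, 3, 5})
    evens := pipelines.Chan([]int{2, 4, 6})

    // merged receives every value from both input channels; the interleaving
    // between the two inputs is not specified.
    merged := pipelines.Combine(ctx, odds, evens)
    values, err := pipelines.Drain(ctx, merged)
    if err != nil {
        fmt.Println("context was cancelled!")
    }
    fmt.Println(len(values)) // 6
}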
func Drain ¶
Drain receives all values from the provided channel and returns them in a slice. Drain blocks the caller until the input channel is closed or the provided context is cancelled. An error is returned if and only if the provided context was cancelled before the input channel was closed.
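A rough sketch of typical usage, collecting a finite pipeline's output into a slice (assuming only the signatures shown in the index):

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    in := pipelines.Chan([]string{"a", "b", "c"})

    // Drain blocks until the input channel is closed; Chan returns an
    // already-closed channel, so this returns immediately with all three values.
    out, err := pipelines.Drain(ctx, in)
    if err != nil {
        fmt.Println("context was cancelled before the channel closed")
        return
    }
    fmt.Println(out) // [a b c]
}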
func FlatMap ¶
FlatMap starts a pipeline stage which converts a "chan S" to a "chan T", by converting each S to zero or more Ts. It applies f to every value received from its input channel and sends all values found in the slice returned from f to its output channel.
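As a minimal sketch (not from the package's examples), expanding each input value into zero or more outputs:

package main

import (
    "context"
    "fmt"
    "strings"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    lines := pipelines.Chan([]string{"a b", "c"})

    // Split each string into fields; each field is sent individually
    // to the output channel.
    fields := pipelines.FlatMap(ctx, lines, func(s string) []string {
        return strings.Fields(s)
    })
    out, _ := pipelines.Drain(ctx, fields)
    fmt.Println(out) // [a b c]
}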
func FlatMapCtx ¶
func FlatMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) []T, opts ...Option) <-chan T
FlatMapCtx starts a pipeline stage which converts a "chan S" to a "chan T", by converting each S to zero or more Ts. It applies f to every value received from its input channel and sends all values found in the slice returned from f to its output channel.
The context passed to FlatMapCtx is passed as an argument to f, unchanged.
func Flatten ¶
Flatten starts a pipeline stage which converts a "chan []T" to a "chan T". Each value contained in the slices received from the input channel is sent, individually, to the output channel.
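A minimal sketch (not from the package's examples), using only the signatures shown in the index:

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    batches := pipelines.Chan([][]int{{1, 2}, {3}, {}})

    // Flatten sends 1, 2, then 3 to its output channel; the empty slice
    // contributes nothing.
    flat := pipelines.Flatten(ctx, batches)
    out, _ := pipelines.Drain(ctx, flat)
    fmt.Println(out) // [1 2 3]
}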
func ForkMapCtx ¶
func ForkMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S, chan<- T), opts ...Option) <-chan T
ForkMapCtx converts a "chan S" into a "chan T" by converting each S into zero or more Ts. Unlike FlatMap or FlatMapCtx, this func starts a new goroutine to convert each value of S. Each goroutine is responsible for sending its own output to the stage's output channel via the chan<- T passed to f. To avoid resource leaks, f must respect context cancellation when sending to this channel. The same context passed to ForkMapCtx is passed to f.
ForkMapCtx should be used with caution, as it introduces potentially unbounded parallelism to a pipeline computation.
Variants of ForkMapCtx are intentionally omitted from this package. ForkMap is omitted because the caller could not listen for context cancellation in some cases. ForkFlatMap is omitted because it is more efficient for the caller to range over the slice and send individual values itself.
Example ¶
package main

import (
    "context"
    "fmt"
    "net/http"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()

    // fetch multiple URLs in parallel, sending the results to an output channel.
    urls := pipelines.Chan([]string{"https://www.google.com", "https://www.splunk.com"})
    responses := pipelines.ForkMapCtx(ctx, urls, func(ctx context.Context, url string, out chan<- *http.Response) {
        req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
        resp, err := http.DefaultClient.Do(req)
        if err == nil {
            out <- resp
        }
    })

    for response := range responses {
        fmt.Printf("%s: %d\n", response.Request.URL, response.StatusCode)
    }
}
func Map ¶
Map starts a pipeline stage which converts a "chan S" to a "chan T", by converting each S to exactly one T. It applies f to every value received from the input channel and sends the result to the output channel. The output channel is closed whenever the input channel is closed or the provided context is cancelled.
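A minimal sketch (not from the package's examples), converting each input to exactly one output:

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    ints := pipelines.Chan([]int{1, 2, 3})

    // Each int is converted to exactly one string.
    strs := pipelines.Map(ctx, ints, func(x int) string {
        return fmt.Sprintf("#%d", x)
    })
    out, _ := pipelines.Drain(ctx, strs)
    fmt.Println(out) // [#1 #2 #3]
}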
func MapCtx ¶
func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T, opts ...Option) <-chan T
MapCtx starts a pipeline stage which converts a "chan S" to a "chan T", by converting each S to exactly one T. Unlike Map, the function which performs the conversion also accepts a context.Context. MapCtx applies f to every value received from its input channel and sends the result to its output channel. The context passed to MapCtx is passed as an argument to f, unchanged.
func OptionMap ¶
OptionMap converts a "chan S" to a "chan T" by converting each S to zero or one T. It applies f to every value received from in and sends any non-nil results to its output channel.
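As a minimal sketch (not from the package's examples), OptionMap can be used to filter a channel: returning nil drops an input, returning a non-nil pointer passes its value downstream.

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    ints := pipelines.Chan([]int{1, 2, 3, 4})

    // Keep only the even values.
    evens := pipelines.OptionMap(ctx, ints, func(x int) *int {
        if x%2 != 0 {
            return nil // dropped
        }
        return &x // sent downstream
    })
    out, _ := pipelines.Drain(ctx, evens)
    fmt.Println(out) // [2 4]
}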
func OptionMapCtx ¶
func OptionMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) *T, opts ...Option) <-chan T
OptionMapCtx converts a "chan S" to a "chan T" by converting each S to zero or one T. It applies f to every value received from in and sends all non-nil results to its output channel. The same context passed to OptionMapCtx is passed as an argument to f.
func Reduce ¶
Reduce runs a reducer function on every input received from the in chan and returns the output. Reduce blocks the caller until the input channel is closed or the provided context is cancelled. An error is returned if and only if the provided context was cancelled before the input channel was closed.
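A minimal sketch of summing a channel (an assumption here, consistent with the package example above, is that the accumulator starts at the zero value of T):

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    ints := pipelines.Chan([]int{1, 2, 3, 4})

    // The accumulator is assumed to start at 0, the zero value of int.
    sum, err := pipelines.Reduce(ctx, ints, func(acc int, x int) int {
        return acc + x
    })
    if err != nil {
        fmt.Println("context was cancelled!")
        return
    }
    fmt.Println(sum) // 10
}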
Types ¶
type ErrorSink ¶
type ErrorSink struct {
// contains filtered or unexported fields
}
ErrorSink provides an error-handling solution for pipelines created by this package. It manages a pipeline stage which can receive fatal and non-fatal errors that may occur during the course of a pipeline.
Example ¶
package main

import (
    "context"
    "fmt"
    "net/http"

    "github.com/splunk/pipelines"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    ctx, errs := pipelines.NewErrorSink(ctx)

    urls := pipelines.Chan([]string{
        "https://httpstat.us/200",
        "https://httpstat.us/410",
        "wrong://not.a.url/test", // malformed URL; triggers a fatal error
        "https://httpstat.us/502",
    })

    // fetch a bunch of URLs, reporting errors along the way.
    responses := pipelines.OptionMapCtx(ctx, urls, func(ctx context.Context, url string) *http.Response {
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
        if err != nil {
            errs.Fatal(fmt.Errorf("error forming request context: %w", err))
            return nil
        }
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            errs.Error(fmt.Errorf("error fetching %s: %w", url, err))
            return nil
        }
        if resp.StatusCode >= 400 {
            errs.Error(fmt.Errorf("unsuccessful %s: %d", url, resp.StatusCode))
            return nil
        }
        return resp
    })

    // retrieve all responses; there should be only one
    for response := range responses {
        fmt.Printf("success: %s: %d\n", response.Request.URL, response.StatusCode)
    }

    // retrieve all errors; the 502 error should be skipped, since the malformed URL triggers
    // a fatal error.
    for _, err := range errs.All() {
        fmt.Printf("error: %v\n", err.Error())
    }
}
func NewErrorSink ¶
NewErrorSink returns a new ErrorSink, along with a context which is cancelled when a fatal error is sent to the ErrorSink. It starts a new, configurable pipeline stage which catches any errors reported.
func (*ErrorSink) All ¶
All returns all errors which have been received by this ErrorSink so far. Subsequent calls to All may return additional errors, but will never return fewer. The only way to be certain that all errors from a pipeline have been reported is to pass WithWaitGroup to every pipeline stage which sends an error to this ErrorSink and wait for all stages to terminate before calling All().
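A rough sketch of the pattern described above (the Error method is assumed from the ErrorSink example; the WaitGroup guarantees the stage has stopped before All is called):

package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/splunk/pipelines"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    ctx, errs := pipelines.NewErrorSink(ctx)

    var wg sync.WaitGroup
    ints := pipelines.Chan([]int{1, -2, 3})
    out := pipelines.OptionMap(ctx, ints, func(x int) *int {
        if x < 0 {
            errs.Error(fmt.Errorf("negative value: %d", x))
            return nil
        }
        return &x
    }, pipelines.WithWaitGroup(&wg))

    _, _ = pipelines.Drain(ctx, out)
    wg.Wait() // the stage's goroutines have stopped...
    for _, err := range errs.All() {
        fmt.Println(err) // ...so All reports every error sent by this stage.
    }
}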
type Option ¶
type Option func(*config)
An Option is passed to optionally configure a pipeline stage.
func WithBuffer ¶
WithBuffer configures a pipeline stage to return a buffered output channel with a buffer of the provided size.
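A minimal sketch, assuming WithBuffer takes the buffer size as its only argument:

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    ints := pipelines.Chan([]int{1, 2, 3, 4})

    // The stage can send up to 4 results before a downstream receiver is
    // ready, because its output channel is buffered.
    out := pipelines.Map(ctx, ints, func(x int) int { return x * x },
        pipelines.WithBuffer(4))
    values, _ := pipelines.Drain(ctx, out)
    fmt.Println(values) // [1 4 9 16]
}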
func WithDone ¶
WithDone configures a pipeline stage to cancel the returned context when all goroutines started by the stage have stopped. This is appropriate for detecting termination of ANY stage in a pipeline. To await termination of ALL stages in a pipeline, use WithWaitGroup.
Example ¶
package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    doubleAndPrint := func(x int) int {
        x = x * 2
        fmt.Printf("%d ", x)
        return x
    }

    ints := pipelines.Chan([]int{2, 4, 6, 8, 10})
    opt, done := pipelines.WithDone(ctx)
    out := pipelines.Map(ctx, ints, doubleAndPrint, opt)
    _, err := pipelines.Drain(ctx, out)
    if err != nil {
        fmt.Println("could not drain!")
    }
    <-done.Done() // wait for the Map stage to complete before continuing.
}
Output: 4 8 12 16 20
func WithPool ¶
WithPool configures a pipeline to run the provided stage on a parallel worker pool of the given size. All workers are kept alive until the input channel is closed or the provided context is cancelled.
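A minimal sketch, assuming WithPool takes the worker-pool size as its only argument:

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    ints := pipelines.Chan([]int{1, 2, 3, 4, 5, 6})

    // Four workers apply f concurrently, so output ordering is not
    // guaranteed to match input ordering.
    out := pipelines.Map(ctx, ints, func(x int) int { return x * 10 },
        pipelines.WithPool(4))
    values, _ := pipelines.Drain(ctx, out)
    fmt.Println(len(values)) // 6
}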
func WithWaitGroup ¶
WithWaitGroup configures a pipeline stage to add a value to the provided WaitGroup for each goroutine started by the stage, and signal Done when each goroutine has completed. This option is appropriate for termination detection of ALL stages in a pipeline. To detect termination of ANY stage in a pipeline, use WithDone.
Example ¶
package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    incAndPrint := func(x int) int {
        x = x + 1
        fmt.Printf("%d ", x)
        return x
    }

    ints := pipelines.Chan([]int{1, 2, 3, 4, 5, 6})

    // WithWaitGroup
    var wg sync.WaitGroup
    out := pipelines.Map(ctx, ints, incAndPrint, pipelines.WithWaitGroup(&wg))
    _, err := pipelines.Drain(ctx, out)
    if err != nil {
        fmt.Println("could not drain!")
    }
    wg.Wait() // wait for the Map stage to complete before continuing.
}
Output: 2 3 4 5 6 7