pipeline

v2.1.2
Published: May 7, 2022 License: Apache-2.0 Imports: 3 Imported by: 1

README

pipeline

A multithreaded pipeline implementation with support for Go 1.18 generics.

Installation

The pipeline module can be installed by running the following command in your main Go module:

go get github.com/qqiao/pipeline/v2
Documentation and Usage

API documentation and example code can be found at pkg.go.dev.

Known issues

Documentation

Overview

Package pipeline contains an implementation of the Pipeline concurrency pattern with Go 1.18 generics support.

Background

This Pipeline concurrency pattern is inspired by the blog post from the Go dev team here: https://go.dev/blog/pipelines.

This library takes the overall idea of the blog post and makes it easier to use by implementing it with support for generics, introduced in Go 1.18.

Additional care has also been taken in the design of the API to ensure that both connecting multiple pipelines and creating multi-stage pipelines are as easy as possible.

Basic Concepts

To better understand how a pipeline works, we start by looking at the components that make up a pipeline.

A Producer is a channel from which a pipeline gets its inputs. A pipeline will continue to read from the producer channel until it is closed or the context is cancelled. More on cancellation of the context later, in the Stopping Short section.

A Consumer is anything that consumes a Producer. This library provides the Consumer interface to define one; any type whose Consumes method matches the signature is implicitly a Consumer.

Since a pipeline is itself a consumer of some input, and can also be considered a producer because it returns a channel into which its results are sent, chaining pipelines is both logically natural and has been made easy to do. We will discuss this in more detail in the Chaining Pipelines section.

A ConsumerFunc is a function that takes, as its sole argument, the channel into which the results of the pipeline are sent. Since a channel carrying the pipeline's results is, from the consumer's point of view, a Producer, you can think of a ConsumerFunc as this:

type ConsumerFunc[I any] func(Producer[I])
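
For illustration only, a minimal hand-rolled Consumer might look like the following sketch; the printSink type is made up for this example and is not part of the library:

// printSink is a hypothetical Consumer[int] that prints every value it reads.
type printSink struct{}

func (printSink) Consumes(in pipeline.Producer[int]) {
	for v := range in {
		fmt.Println(v)
	}
}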

Stages

Stages are the heart of the pipeline. While the API of a Stage looks extremely similar to that of a Pipeline, the actual multiplexing of the workers and the final collation of the results are done by the stage. Therefore, the NewStage function requires additional parameters to control the multiplexing behaviours of the stage.

Each stage must have a Worker function. A worker can either be a simple function that takes an input and returns an output or an error; or a StreamWorker that continuously reads from the producer and sends its output into a channel. The Stage will take care of the concurrency of the workers and the combination of the results. Since multiple Worker instances will be created, Worker functions are expected to be thread-safe to prevent any unpredictable results.

The workerPoolSize parameter defines the upper bound of the number of workers.

All worker goroutines are created lazily. A new worker goroutine is created when a new input is read from the producer, until workerPoolSize is reached. Given that goroutines are cheap to create, and to keep the implementation as simple as possible, no worker re-use happens during this growing phase.

Once workerPoolSize is reached, worker goroutines compete for inputs from the producer until the pipeline is done. For simplicity, the pool does not shrink.

The bufferSize parameter defines the buffer size of the output channel. This allows processing to start without blocking on the consumer being ready to read.
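
As a rough sketch of how these parameters fit together (the worker and the sizes below are arbitrary illustrations, not recommendations), a stage with up to four workers and an output buffer of eight entries could be created like this:

input := make(chan int)

// double is a trivial, thread-safe Worker used only for illustration.
double := func(in int) (int, error) {
	return in * 2, nil
}

// workerPoolSize = 4, bufferSize = 8
stage, err := pipeline.NewStage(4, 8, input, double)
if err != nil {
	log.Fatalf("Unable to create stage: %v", err)
}
stage.Start(context.Background())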

Consuming a Pipeline

There are multiple ways of consuming the output of a pipeline.

1. Create a Consumer and have its Consumes method take the channel returned by the pipeline. When chaining pipelines, this is the most effortless way.

2. Create a ConsumerFunc and pass it to the pipeline using the WithConsumer method. The advantage of consuming the pipeline results this way is that the ConsumerFunc can be easily written and unit tested without even needing a pipeline, as the sketch after this list shows.

3. Directly taking the returned channel of the Produces method and reading from it. Although this approach requires the least code to be written, we strongly encourage applications to not use this approach and instead use methods 1 and 2, as those two methods allow modular and unit-testable code to be written.
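
To illustrate approach 2, here is a small hypothetical sketch: the summing ConsumerFunc below is made up for this example and is driven by a plain channel, with no pipeline involved:

// sum is a hypothetical ConsumerFunc[int] that totals everything it reads.
total := 0
sum := func(out pipeline.Producer[int]) {
	for v := range out {
		total += v
	}
}

// In a unit test, feed it an ordinary channel instead of a pipeline's output.
feed := make(chan int, 3)
feed <- 1
feed <- 2
feed <- 3
close(feed)
sum(feed)
fmt.Println(total) // 6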

More advanced uses of the Consumer and Producer pattern will be discussed further in the Chaining Pipelines section.

Chaining Pipelines

A Pipeline also has a set of APIs to make pipeline chaining straightforward. While connecting multiple pipelines produces results identical to a single pipeline containing all the constituent stages, there are advantages to having composable pipelines. The main one is that pipelines from different authors and libraries can easily be re-used.

Without the ability to directly connect pipelines, authors would need to explicitly create a Stage instance for each stage of their Pipeline and make them public, instead of letting the Pipeline's AddStage API do it behind the scenes automatically. Moreover, users would also need to ensure that these stages are added to their own pipeline in the correct order.

With composable pipelines, this is no longer an issue. A Pipeline can be made a producer of another Pipeline by passing the return value of the Produces method. A pipeline can also directly consume the result of another Pipeline with the Consumes method.

Examples of composing pipelines can be found in the example of the Produces method.

Stopping Short

Sometimes it is desirable to stop a pipeline short of its completion. A good example is using a pipeline in a web server, where the producer channel stays open for as long as the server runs and is never closed.

However, we still want a way to force the server to shut down: a switch that, when flicked, immediately stops the server from accepting new requests and lets the pipeline wind down organically.

This is where the context comes in. In the case where the application needs to stop the pipeline immediately, it should cancel the context, and the pipeline and all of its stages will stop processing and terminate gracefully.
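
As a minimal sketch, assuming a pipeline p has already been built as in the examples further down, that switch is simply the cancel function of the context passed to Start:

// cancel is the "switch": calling it tells the pipeline and all of its
// stages to stop processing and terminate gracefully.
ctx, cancel := context.WithCancel(context.Background())
p.Start(ctx)

// Later, e.g. when a shutdown signal is received:
cancel()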

Error Handling

A pipeline is designed to fail fast. That is, if any error should occur at any time at any stage, the pipeline terminates.

The Start method on both Stage and Pipeline returns a channel of errors; as soon as a single error is sent into that channel, the entire pipeline halts and the error can be read from the channel.
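
A small sketch of watching for that first error, again assuming a pipeline p built as in the examples below:

errs := p.Start(context.Background())

// Fail-fast: the first value to arrive on errs means the pipeline has halted.
go func() {
	if err, ok := <-errs; ok {
		log.Printf("pipeline halted: %v", err)
	}
}()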

Performance Tuning

There are three important dials for fine-tuning the performance characteristics of a pipeline: the buffer size of the producer, and the workerPoolSize and bufferSize of each stage.

Making the producer a buffered channel and adjusting its buffer size allows code that sends into the producer to execute without blocking on the pipeline being ready, which in turn might improve overall application performance by allowing more code to actually run concurrently.

Similarly, you can adjust the bufferSize of each stage in the pipeline, which is the buffer size of its output channel. This allows the stage to proceed without blocking on the consumer being ready and, as with a buffered producer, potentially allows more code to run concurrently.

The third dial that can affect the performance of a pipeline is the size of the worker pool. In theory, if the producer can saturate the worker pool, and the consumer can consume all of the output, then the throughput of each stage should scale linearly with the size of the worker pool. However, in reality, this scaling is affected by many factors and will almost never simply be linear.

Applications are recommended to run benchmarks with real workloads and tweak these settings to find the most suitable combination of producer and consumer buffer sizes and worker pool sizes.

The Benchmark* methods in stage_test.go offer a good starting point for writing such benchmark test cases.
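
The following is only a rough, hypothetical sketch of such a benchmark (it is not one of the package's own Benchmark* cases); it sweeps the worker pool size for a deliberately slow worker:

func BenchmarkWorkerPoolSize(b *testing.B) {
	// slow simulates a worker that takes a while per item.
	slow := func(in int) (int, error) {
		time.Sleep(time.Millisecond)
		return in * in, nil
	}

	for _, poolSize := range []int{1, 4, 16} {
		b.Run(fmt.Sprintf("workers=%d", poolSize), func(b *testing.B) {
			input := make(chan int)
			stage, err := pipeline.NewStage(poolSize, poolSize, input, slow)
			if err != nil {
				b.Fatalf("Unable to create stage: %v", err)
			}
			out := stage.Produces()
			stage.Start(context.Background())

			// Feed b.N inputs, then close the input so the stage can finish.
			go func() {
				for i := 0; i < b.N; i++ {
					input <- i
				}
				close(input)
			}()

			b.ResetTimer()
			for range out {
			}
		})
	}
}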

Thread Safety

While it would be extremely rare for a pipeline itself or its stages to be shared across multiple goroutines, effort has been made to ensure that both pipelines and stages are themselves thread-safe. That is, calling the AddStage* methods from different goroutines will not cause a race condition, and the Start methods on both Pipeline and Stage are re-entrant, so even if they are accidentally called multiple times from different goroutines, no ill effects are to be expected.

The more important aspect of thread safety concerns the workers. Workers need to be thread-safe because they will be executed in multiple goroutines whenever workerPoolSize is greater than 1. In the rare case where a worker cannot be made thread-safe, set workerPoolSize to 1; this forces the stage to create only a single worker goroutine, thus maintaining thread safety.

Please note that even with workerPoolSize set to 1, a pipeline is still useful in multiple ways: it retains all the other benefits such as chaining, and you can still tweak the bufferSize of a stage so that the worker can start executing before the next stage is ready, for better throughput.
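
As a sketch of the first option (the worker and stage below are made up for illustration), a worker that touches shared state can guard it with a mutex and still run in a pool larger than one:

input := make(chan int)

var (
	mu   sync.Mutex
	seen []int
)

// record appends to a shared slice, so it locks around the mutation to stay
// thread-safe when several worker goroutines run it concurrently.
record := func(in int) (int, error) {
	mu.Lock()
	seen = append(seen, in)
	mu.Unlock()
	return in, nil
}

// With the mutex, a pool of 8 workers is safe; without it, workerPoolSize
// would have to be 1 to keep the stage single-threaded.
stage, err := pipeline.NewStage(8, 0, input, record)
if err != nil {
	log.Fatalf("Unable to create stage: %v", err)
}
stage.Start(context.Background())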

Index

Examples

Constants

This section is empty.

Variables

var (
	ErrNoProducer             = errors.New("no producer")
	ErrNoStage                = errors.New("no stage")
	ErrModificationAfterStart = errors.New("modification after start")
)

Errors for invalid pipeline states.

var (
	ErrInvalidBufferSize     = errors.New("invalid buffer size")
	ErrInvalidWorkerPoolSize = errors.New("invalid worker pool size")
)

Errors for when the stage is in an invalid state.

Functions

func Merge added in v2.1.0

func Merge[O any](ctx context.Context, out chan<- O, inputs <-chan (<-chan O))

Merge takes a channel of channels, and merges the content that has been sent into those separate channels into a single channel.
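
A small, hypothetical sketch of how Merge can be used (the input values are made up); it fans the contents of several channels into a single output channel:

out := make(chan int)
inputs := make(chan (<-chan int))

// Feed Merge two pre-filled channels, then close the channel of channels.
go func() {
	defer close(inputs)
	for _, vals := range [][]int{{1, 2}, {3}} {
		ch := make(chan int, len(vals))
		for _, v := range vals {
			ch <- v
		}
		close(ch)
		inputs <- ch
	}
}()

go pipeline.Merge(context.Background(), out, inputs)

// 1, 2 and 3 arrive on out, in no particular order.
for i := 0; i < 3; i++ {
	fmt.Println(<-out)
}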

Types

type Consumer

type Consumer[I any] interface {
	Consumes(Producer[I])
}

Consumer is an interface that wraps the Consumes method.

type ConsumerFunc

type ConsumerFunc[I any] func(Producer[I])

ConsumerFunc is a function that consumes values from a Producer.

The Consumes method of a Consumer is a ConsumerFunc.

type Pipeline

type Pipeline[I, O any] struct {
	// contains filtered or unexported fields
}

Pipeline represents a data processing pipeline.

func NewPipeline

func NewPipeline[I, O any]() *Pipeline[I, O]

NewPipeline creates a pipeline.

Please note that pipelines created this way do not have a producer channel, so calling AddStage before calling Consumes will result in AddStage returning ErrNoProducer.

func NewPipelineWithProducer

func NewPipelineWithProducer[I, O any](producer Producer[I]) *Pipeline[I, O]

NewPipelineWithProducer creates a pipeline with the producer given.

Internally, this function simply calls NewPipeline and then the Consumes method on the returned pipeline, so it is exactly equivalent to the following code:

pipeline := NewPipeline[I, O]()
pipeline.Consumes(producer)

func (*Pipeline[I, O]) AddStage

func (p *Pipeline[I, O]) AddStage(workerPoolSize int, bufferSize int,
	worker Worker[any, any]) (*Pipeline[I, O], error)

AddStage adds a stage to the pipeline.

Internally, this method first transforms the worker into a StreamWorker and then calls AddStageStreamWorker, so it is exactly equivalent to the following:

sw := NewStreamWorker(worker)
p.AddStageStreamWorker(workerPoolSize, bufferSize, sw)

Please refer to the documentation of AddStageStreamWorker for more details.

func (*Pipeline[I, O]) AddStageStreamWorker added in v2.1.0

func (p *Pipeline[I, O]) AddStageStreamWorker(workerPoolSize int,
	bufferSize int, worker StreamWorker[any, any]) (*Pipeline[I, O], error)

AddStageStreamWorker adds a stage to the pipeline.

This method first creates a new Stage instance by calling NewStageStreamWorker and then adds it to the end of the list of stages. Please refer to the documentation of NewStageStreamWorker for details on the parameters.

func (*Pipeline[I, O]) Consumes

func (p *Pipeline[I, O]) Consumes(producer Producer[I])

Consumes sets the producer of this pipeline.

Example
producer := make(chan int)
consumer := func(out pipeline.Producer[int]) {
	for v := range out {
		fmt.Println(v)
	}
}

go func() {
	defer close(producer)
	producer <- 2
	producer <- 3
}()

sq := func(in any) (any, error) {
	i := in.(int)
	return i * i, nil
}
p := pipeline.NewPipeline[int, int]()
p.Consumes(producer)
p.WithConsumer(consumer)

_, err := p.AddStage(10, 0, sq)
if err != nil {
	log.Fatalf("Unable to add stage: %v", err)
}
p.Start(context.Background())
Output:

4
9

func (*Pipeline[I, O]) Produces

func (p *Pipeline[I, O]) Produces() (Producer[O], error)

Produces returns a channel into which results of the pipeline will be sent.

This method returns ErrNoStage if the current pipeline does not have any stages.

Example (Chaining)
// In this example, we are going to chain 2 pipelines using the result of
// the first pipeline's Produces call as the Producer of the second.
ctx := context.Background()
producer := make(chan int)
consumer := func(out pipeline.Producer[int]) {
	for v := range out {
		fmt.Println(v)
	}
}

go func() {
	defer close(producer)
	producer <- 2
	producer <- 3
}()

sq := func(in any) (any, error) {
	i := in.(int)
	return i * i, nil
}
p1, err := pipeline.NewPipelineWithProducer[int, int](producer).AddStage(10, 0, sq)
if err != nil {
	log.Fatalf("Unable to add stage: %v", err)
}
p1Producer, err := p1.Produces()
if err != nil {
	log.Fatalf("Unable to get pipeline1's producer")
}

cube := func(in any) (any, error) {
	i := in.(int)
	return i * i * i, nil
}
// We chain the output channel of p1 into p2 by using it as the producer of
// p2
p2, err := pipeline.NewPipelineWithProducer[int, int](p1Producer).AddStage(10, 0, cube)
if err != nil {
	log.Fatalf("Unable to create pipeline 2")
}
p2.WithConsumer(consumer)
p1.Start(ctx)
p2.Start(ctx)
Output:

64
729

func (*Pipeline[I, O]) Start

func (p *Pipeline[I, O]) Start(ctx context.Context) <-chan error

Start method starts the processing of the pipeline.

This method returns a channel of errors; however, because of the fail-fast nature of pipelines, the pipeline will stop execution when it encounters the first error.

Example
producer := make(chan int)
consumer := func(out pipeline.Producer[int]) {
	for v := range out {
		fmt.Println(v)
	}
}

go func() {
	defer close(producer)
	producer <- 2
	producer <- 3
}()

sq := func(in any) (any, error) {
	i := in.(int)
	return i * i, nil
}
p, err := pipeline.NewPipelineWithProducer[int, int](producer).AddStage(10, 0, sq)
if err != nil {
	log.Fatalf("Unable to add stage: %v", err)
}

out, err := p.Produces()
if err != nil {
	log.Fatalf("Unable to get the pipeline's output channel: %v", err)
}
p.Start(context.Background())

consumer(out)
Output:

4
9
Example (Stopping)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
producer := make(chan int)

p, err := pipeline.NewPipelineWithProducer[int,
	int](producer).AddStage(1, 0, func(in any) (any, error) {
	return in, nil
})
if err != nil {
	log.Fatalf("Unable to create pipeline: %v", err)
}

// Simulates an infinite input
go func() {
	for {
		producer <- 1
	}
}()

out, err := p.Produces()
if err != nil {
	log.Fatalf("Unable to execute pipeline: %v", err)
}
p.Start(ctx)

// This part would infinite loop if we didn't cancel the context
for range out {
}
Output:

func (*Pipeline[I, O]) WithConsumer

func (p *Pipeline[I, O]) WithConsumer(consumer ConsumerFunc[O]) *Pipeline[I, O]

WithConsumer sets the consumer of the pipeline.

Please read the package level documentation for different approaches of consuming the result of a pipeline and their comparisons.

type Producer

type Producer[O any] <-chan O

Producer is a channel from which a consumer reads its inputs.

type Stage

type Stage[I, O any] struct {
	// contains filtered or unexported fields
}

Stage represents a single stage in the Pipeline.

Each Stage should be considered a unit of work done to the data in the Pipeline.

func NewStage

func NewStage[I, O any](workerPoolSize int, bufferSize int, in Producer[I],
	worker Worker[I, O]) (*Stage[I, O], error)

NewStage creates a new stage with necessary parameters specified.

Internally, this function is nothing but a shorthand for the following:

sw := NewStreamWorker(worker)
stage := NewStageStreamWorker(workerPoolSize, bufferSize, in, sw)

Please refer to documentation of NewStageStreamWorker for details of all parameters.

func NewStageStreamWorker added in v2.1.0

func NewStageStreamWorker[I, O any](workerPoolSize int, bufferSize int,
	in Producer[I], worker StreamWorker[I, O]) (*Stage[I, O], error)

NewStageStreamWorker creates a new stage with the given StreamWorker and parameters.

Parameters

workerPoolSize: internally, the stage maintains a pool of workers all running concurrently; workerPoolSize specifies the upper bound on the number of workers.

bufferSize: the size of the output buffer. Setting a bufferSize greater than 0 makes the output channel a buffered channel, which allows some work to be done without blocking on the consumer.

in: the channel from which input will be read.

worker: the function that actually processes each unit of data read from the in channel.

This function returns ErrInvalidBufferSize if bufferSize is less than 0.

This function returns ErrInvalidWorkerPoolSize if workerPoolSize is not at least 1.

func (*Stage[I, O]) Produces

func (s *Stage[I, O]) Produces() Producer[O]

Produces returns a Producer into which the results of this stage will be sent.

func (*Stage[I, O]) Start

func (s *Stage[I, O]) Start(ctx context.Context) <-chan error

Start method starts the processing of the stage.

This method is responsible for creating the worker pool, distributing work between the workers and collating the results in the end.

Workers are created lazily by this method. That is, for a stage with workerPoolSize of n, the first n inputs each start a worker.

The worker pool also does not shrink; that is, once n workers are created, they keep serving until either the stage is explicitly terminated by cancelling the context, or the in channel is closed.

Please also note that order is NOT guaranteed by the Stage. That is, results can come out of the output channel in a different order from that in which they were read from the input channel.

This method also returns a channel of errors; due to the fail-fast nature of a pipeline, execution stops on the first error.

Example
input := make(chan int)

// sq takes an integer and returns the square of that integer
sq := func(in int) (int, error) {
	return in * in, nil
}

stage, err := pipeline.NewStage(10, 0, input, sq)
if err != nil {
	log.Panicf("Error creating stage: %v", err)
}

output := stage.Produces()
stage.Start(context.Background())
input <- 2
input <- 3
close(input)

for v := range output {
	fmt.Println(v)
}
Output:

4
9
Example (Ordered)
type OrderedEntry struct {
	Order int
	Value int
}
input := make(chan OrderedEntry)

// sq squares the Value of an OrderedEntry. The Order field is carried
// through unchanged so that the results can be matched back to the
// order of the inputs.
sq := func(in OrderedEntry) (OrderedEntry, error) {
	return OrderedEntry{
		Order: in.Order,
		Value: in.Value * in.Value,
	}, nil
}

stage, err := pipeline.NewStage(10, 5, input, sq)
if err != nil {
	log.Panicf("Error creating stage: %v", err)
}

output := stage.Produces()
stage.Start(context.Background())
input <- OrderedEntry{0, 2}
input <- OrderedEntry{1, 3}
close(input)

var results []OrderedEntry
for v := range output {
	results = append(results, v)
}

// Once we have the results, we sort based on the order
sort.Slice(results, func(i, j int) bool {
	return results[i].Order < results[j].Order
})
fmt.Printf("%v", results)
Output:

[{0 4} {1 9}]

type StreamWorker added in v2.1.0

type StreamWorker[I, O any] func(context.Context, Producer[I]) (<-chan O,
	<-chan error)

StreamWorker is a type of worker that continuously takes input from the producer and continuously produces output into the output channel.

A StreamWorker also returns an error channel; any time anything is sent into this channel, the stage and pipeline will immediately halt execution. This keeps the pipeline consistent with its fail-fast characteristics.

StreamWorkers will be multiplexed based on workerPoolSize. Thus, they need to be thread safe.
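
A hedged sketch of writing a StreamWorker by hand, rather than converting a Worker with NewStreamWorker; sqStream below is made up for illustration:

// sqStream squares integers; nothing is ever sent on its error channel.
sqStream := func(ctx context.Context, in pipeline.Producer[int]) (<-chan int, <-chan error) {
	out := make(chan int)
	errs := make(chan error)
	go func() {
		defer close(out)
		defer close(errs)
		for v := range in {
			select {
			case out <- v * v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, errs
}

// It can then be handed straight to NewStageStreamWorker.
input := make(chan int)
stage, err := pipeline.NewStageStreamWorker[int, int](4, 0, input, sqStream)
if err != nil {
	log.Fatalf("Unable to create stage: %v", err)
}
stage.Start(context.Background())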

func NewStreamWorker added in v2.1.0

func NewStreamWorker[I, O any](worker Worker[I, O]) StreamWorker[I, O]

NewStreamWorker takes any Worker and converts it into a StreamWorker.

This allows the worker to be reused. Under the surface, the stage is simply multiplexing StreamWorkers based on workerPoolSize.

type Worker

type Worker[I, O any] func(I) (O, error)

Worker represents a unit of work.

Workers are simple functions that take an input and return an output. The Stage will take care of the concurrency of the workers and the combination of the results.

Since multiple Worker instances will be created, Worker functions are expected to be thread-safe to prevent any unpredictable results.

Any time an error is returned, the stage and pipeline will immediately halt any execution. This is to stay consistent with the fail-fast characteristics.

Due to the fact that Workers might be multiplexed, they should be thread-safe.

Directories

Path	Synopsis
examples	Package examples contains various example applications using the pipelines.
