Documentation
Overview
Pipeline is a Go library that helps you build pipelines without worrying about channel management and concurrency. It contains common fan-in and fan-out operations as well as useful utility funcs for batch processing and scaling.
If you have another common use case you would like to see covered by this package, please open a feature request: https://github.com/deliveryhero/pipeline/issues.
Cookbook
- How to run a pipeline until the container is killed: https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled
- How to shut down a pipeline when there is an error: https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError
- How to shut down a pipeline after it has finished processing a batch of data: https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed
Example (PipelineShutsDownOnError)
The following example shows how you can shut down a pipeline gracefully when it receives an error.
```go
// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a pipeline that emits 1-10
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// A step that will shut down the pipeline if the number is greater than 1
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
	// Shut down the pipeline by canceling the context
	if i != 1 {
		cancel()
		return i, fmt.Errorf("%d caused the shutdown", i)
	}
	return i, nil
}, func(i int, err error) {
	// The cancel func is called when an error is returned by the process func or the context is canceled
	fmt.Printf("could not process %d: %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting the pipeline after all data is processed")

// Example Output:
// could not process 2: 2 caused the shutdown
// result: 1
// could not process 3: context canceled
// could not process 4: context canceled
// could not process 5: context canceled
// could not process 6: context canceled
// could not process 7: context canceled
// could not process 8: context canceled
// could not process 9: context canceled
// could not process 10: context canceled
// exiting the pipeline after all data is processed
```
Example (PipelineShutsDownWhenContainerIsKilled)
This example demonstrates a pipeline that runs until the OS or container it is running in kills it.
```go
// Gracefully shut down the pipeline when the system is shutting down
// by canceling the context when an os.Interrupt (SIGINT) or SIGTERM signal is received.
// os.Kill (SIGKILL) cannot be caught by a Go program, so listening for it has no effect;
// container runtimes send SIGTERM first and only fall back to SIGKILL after a grace period.
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

// Create a pipeline that keeps emitting numbers sequentially until the context is canceled
var count int
p := pipeline.Emitter(ctx, func() int {
	count++
	return count
})

// Keep only even numbers; odd numbers are passed to the cancel func
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
	if i%2 == 0 {
		return i, nil
	}
	return i, fmt.Errorf("'%d' is an odd number", i)
}, func(i int, err error) {
	fmt.Printf("error processing '%v': %s\n", i, err)
}), p)

// Wait a fraction of a millisecond and then simulate the os.Interrupt signal
go func() {
	time.Sleep(time.Millisecond / 10)
	fmt.Print("\n--- os kills the app ---\n\n")
	syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting after the input channel is closed")

// Example Output:
// error processing '1': '1' is an odd number
// result: 2
//
// --- os kills the app ---
//
// error processing '3': '3' is an odd number
// error processing '4': context canceled
// exiting after the input channel is closed
```
Example (PipelineShutsDownWhenInputChannelIsClosed)
The following example demonstrates a pipeline that naturally finishes its run when the input channel is closed.
```go
// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a pipeline that emits 1-10 and then closes its output channel
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Multiply every number by 2
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
	return i * 2, nil
}, func(i int, err error) {
	fmt.Printf("could not multiply %d: %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting after the input channel is closed")

// Example Output:
// result: 2
// result: 4
// result: 6
// result: 8
// result: 10
// result: 12
// result: 14
// result: 16
// result: 18
// result: 20
// exiting after the input channel is closed
```
Index
- func Buffer[Item any](size int, in <-chan Item) <-chan Item
- func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item
- func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item
- func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item
- func Drain[Item any](in <-chan Item)
- func Emit[Item any](is ...Item) <-chan Item
- func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item
- func Merge[Item any](ins ...<-chan Item) <-chan Item
- func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output
- func ProcessBatch[Input, Output any](ctx context.Context, maxSize int, maxDuration time.Duration, ...) <-chan Output
- func ProcessBatchConcurrently[Input, Output any](ctx context.Context, concurrently, maxSize int, maxDuration time.Duration, ...) <-chan Output
- func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], ...) <-chan Output
- func Split[Item any](in <-chan []Item) <-chan Item
- type Processor
- func Apply[A, B, C any](a Processor[A, []B], b Processor[B, C]) Processor[A, []C]
- func Join[A, B, C any](a Processor[A, B], b Processor[B, C]) Processor[A, C]
- func NewProcessor[Input, Output any](process func(ctx context.Context, i Input) (Output, error), ...) Processor[Input, Output]
- func Sequence[A any](ps ...Processor[A, A]) Processor[A, A]
Constants
This section is empty.
Variables
This section is empty.
Functions
func Buffer
func Buffer[Item any](size int, in <-chan Item) <-chan Item
Buffer creates a buffered channel that will close after the input is closed and the buffer is fully drained.
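To make the closing behavior concrete, here is a minimal stdlib-only sketch. It is not the package's implementation: `buffer` is a hypothetical stand-in that assumes a single goroutine forwards items into a channel of capacity `size`.

```go
package main

import "fmt"

// buffer is a hypothetical stand-in for Buffer: it forwards every item from
// in to a channel with capacity size and closes that channel once in is
// closed. Receivers still get the buffered items before they see the close.
func buffer[Item any](size int, in <-chan Item) <-chan Item {
	out := make(chan Item, size)
	go func() {
		defer close(out)
		for i := range in {
			out <- i
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()

	out := buffer(2, in)
	fmt.Println("capacity:", cap(out))
	for i := range out {
		fmt.Println(i)
	}
}
```

A buffer like this lets a fast producer run up to `size` items ahead of a slow consumer before it blocks.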
func Cancel
func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item
Cancel passes each `Item` from the `in <-chan Item` directly to the out `<-chan Item` until the `Context` is canceled. After the context is canceled, everything from `in <-chan Item` is sent to the `cancel` func instead, together with `ctx.Err()`.
Example
```go
// Create a context that lasts for 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Create a basic pipeline that emits one int every 250ms
p := pipeline.Delay(ctx, time.Second/4,
	pipeline.Emit(1, 2, 3, 4, 5),
)

// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
	fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)

// Otherwise, process the inputs
for out := range p {
	fmt.Printf("process: %+v\n", out)
}
```
Output:
process: 1
process: 2
process: 3
process: 4
5 could not be processed, context deadline exceeded
func Collect
func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item
Collect collects items from its `in <-chan Item` and sends them as `[]Item` batches on its out channel. It collects up to `maxSize` inputs over up to `maxDuration` before sending them as one batch. When `maxSize` is reached before `maxDuration` elapses, a batch of exactly `maxSize` items is sent to the out channel; when `maxDuration` elapses first, a batch of fewer than `maxSize` items is sent instead. When the `context` is canceled, everything in the buffer is flushed to the out channel.
func Delay
func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item
Delay delays reading each input by `duration`. If the context is canceled, the delay is not applied.
func Drain
func Drain[Item any](in <-chan Item)
Drain empties the input and blocks until the channel is closed.
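Draining is useful at the end of a pipeline whose steps only produce side effects. The sketch below is a hypothetical stdlib-only stand-in (`drain`), not the package's code; it shows that the caller is released only after the producer has closed the channel.

```go
package main

import "fmt"

// drain is a hypothetical stand-in for Drain: it receives and discards
// items until in is closed, so the caller blocks until the upstream
// producer has finished.
func drain[Item any](in <-chan Item) {
	for range in {
	}
}

func main() {
	in := make(chan int)
	sent := 0
	go func() {
		defer close(in)
		for i := 0; i < 3; i++ {
			sent++ // written before close, which happens before drain returns
			in <- i
		}
	}()
	drain(in)
	fmt.Println("items sent:", sent)
}
```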
func Emit
func Emit[Item any](is ...Item) <-chan Item
Emit fans `is ...Item` out to a `<-chan Item`.
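As a rough illustration, a variadic emitter can be written with one goroutine that sends each argument in order and then closes the channel. This is a hypothetical sketch (`emit`); whether the package uses a buffered or unbuffered channel is not stated above, and this version assumes an unbuffered one.

```go
package main

import "fmt"

// emit is a hypothetical stand-in for Emit: it sends each argument, in
// order, on an unbuffered channel from its own goroutine and closes the
// channel after the last one.
func emit[Item any](is ...Item) <-chan Item {
	out := make(chan Item)
	go func() {
		defer close(out)
		for _, i := range is {
			out <- i
		}
	}()
	return out
}

func main() {
	for s := range emit("a", "b", "c") {
		fmt.Println(s)
	}
}
```

Because the channel is closed after the last item, a `for range` loop over the result ends on its own, which is what lets the pipelines in the examples above finish naturally.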
func Emitter
func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item
Emitter continuously emits new items generated by the `next` func until the context is canceled.
func Merge
func Merge[Item any](ins ...<-chan Item) <-chan Item
Merge fans in multiple channels to a single channel.
Example
```go
one := pipeline.Emit(1)
two := pipeline.Emit(2, 2)
three := pipeline.Emit(3, 3, 3)

for i := range pipeline.Merge(one, two, three) {
	fmt.Printf("output: %d\n", i)
}

fmt.Println("done")

// Example Output (items from different channels may arrive in any order):
// output: 1
// output: 2
// output: 2
// output: 3
// output: 3
// output: 3
// done
```
func Process
func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output
Process takes each input from the `in <-chan Input` and calls `Processor.Process` on it. When `Processor.Process` returns an `Output`, it will be sent to the output `<-chan Output`. If `Processor.Process` returns an error, `Processor.Cancel` will be called with the corresponding input and error message. Finally, if the `Context` is canceled, all inputs remaining in the `in <-chan Input` will go directly to `Processor.Cancel`.
Example
```go
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

// Multiply each number by 10
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
	return in * 10, nil
}, func(i int, err error) {
	fmt.Printf("error: could not multiply %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

// Example Output:
// result: 10
// result: 20
// result: 30
// result: 40
// result: 50
// error: could not multiply 6, context deadline exceeded
```
func ProcessBatch
func ProcessBatch[Input, Output any](
	ctx context.Context,
	maxSize int,
	maxDuration time.Duration,
	processor Processor[[]Input, []Output],
	in <-chan Input,
) <-chan Output
ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of `Input`s. It passes an `[]Input` to the `Processor.Process` method and expects an `[]Output` back. When processing fails or the context is canceled, it passes the `[]Input` batch to the `Processor.Cancel` method. If the receiver is backed up, ProcessBatch can hold up to 2x maxSize.
Example
```go
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

// Multiply every 2 adjacent numbers together
p = pipeline.ProcessBatch(ctx, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, is []int) ([]int, error) {
	o := 1
	for _, i := range is {
		o *= i
	}
	return []int{o}, nil
}, func(is []int, err error) {
	fmt.Printf("error: could not multiply %v, %s\n", is, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}
```
Output:
result: 2
result: 12
error: could not multiply [5 6], context deadline exceeded
func ProcessBatchConcurrently
func ProcessBatchConcurrently[Input, Output any](
	ctx context.Context,
	concurrently, maxSize int,
	maxDuration time.Duration,
	processor Processor[[]Input, []Output],
	in <-chan Input,
) <-chan Output
ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently, then fans the out channels of the batch Processors back in to a single out channel.
Example
```go
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-9
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9)

// Add a 1 second delay to each number
p = pipeline.Delay(ctx, time.Second, p)

// Batch up to two inputs at a time in each of two concurrent processors
p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, ins []int) ([]int, error) {
	return ins, nil
}, func(i []int, err error) {
	fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

// Example Output:
// result: 1
// result: 2
// result: 3
// result: 5
// error: could not process [7 8], context deadline exceeded
// error: could not process [4 6], context deadline exceeded
// error: could not process [9], context deadline exceeded
```
func ProcessConcurrently
func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output
ProcessConcurrently fans the in channel out to multiple Processors running concurrently, then fans the out channels of the Processors back in to a single out channel.
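The fan-out/fan-in topology can be sketched with the standard library alone. This is a simplified illustration, not the package's implementation: `processConcurrently` and `merge` are hypothetical helpers, a plain `func(Input) Output` stands in for a Processor, and context cancellation and error handling are left out so only the channel wiring remains.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// processConcurrently is a hypothetical sketch: it starts `concurrently`
// workers that all read from in (fan-out), each writing to its own channel,
// and then merges those channels into one (fan-in).
func processConcurrently[Input, Output any](concurrently int, process func(Input) Output, in <-chan Input) <-chan Output {
	outs := make([]<-chan Output, concurrently)
	for w := range outs {
		out := make(chan Output)
		outs[w] = out
		go func() {
			defer close(out)
			for i := range in {
				out <- process(i)
			}
		}()
	}
	return merge(outs...)
}

// merge forwards items from every input channel to a single channel and
// closes it once all inputs are closed.
func merge[Item any](ins ...<-chan Item) <-chan Item {
	out := make(chan Item)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan Item) {
			defer wg.Done()
			for i := range in {
				out <- i
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}()

	var results []int
	for r := range processConcurrently(3, func(i int) int { return i * i }, in) {
		results = append(results, r)
	}
	// Results arrive in no particular order, so sort them before printing.
	sort.Ints(results)
	fmt.Println(results)
}
```

Because the workers compete for items on the shared input channel, output order is not preserved, which matches the interleaved results in the example that follows.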
Example
```go
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-7
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7)

// Add a two second delay to each number
p = pipeline.Delay(ctx, 2*time.Second, p)

// Add two concurrent processors that pass input numbers to the output
p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
	return in, nil
}, func(i int, err error) {
	fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

// Example Output:
// result: 2
// result: 1
// result: 4
// result: 3
// error: could not process 6, process was canceled
// error: could not process 5, process was canceled
// error: could not process 7, context deadline exceeded
```
Types
type Processor
```go
type Processor[Input, Output any] interface {
	// Process processes an input and returns an output or an error, if the output could not be processed.
	// When the context is canceled, process should stop all blocking operations and return the `Context.Err()`.
	Process(ctx context.Context, i Input) (Output, error)

	// Cancel is called if process returns an error or if the context is canceled while there are still items in the `in <-chan Input`.
	Cancel(i Input, err error)
}
```
Processor represents a blocking operation in a pipeline. Implementing `Processor` will allow you to add business logic to your pipelines without directly managing channels. This simplifies your unit tests and eliminates channel management related bugs.
func Apply (added in v2.2.0)
func Apply[A, B, C any](a Processor[A, []B], b Processor[B, C]) Processor[A, []C]
Apply connects two processors, applying the second to each item of the first one's output.
Example
```go
transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
	return strings.Split(s, ","), nil
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
	return s + s, nil
}, nil)

addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
	return "0" + s, nil
}, nil)

apply := pipeline.Apply(
	transform,
	pipeline.Sequence(
		double,
		addLeadingZero,
		double,
	),
)

input := "1,2,3,4,5"

for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
	for j := range out {
		fmt.Printf("process: %s\n", out[j])
	}
}
```
Output:
process: 011011
process: 022022
process: 033033
process: 044044
process: 055055
func Join (added in v2.1.0)
func Join[A, B, C any](a Processor[A, B], b Processor[B, C]) Processor[A, C]
Join connects two processors, where the output of the first is the input of the second.