pipeline

package module
v2.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 24, 2024 License: MIT Imports: 4 Imported by: 1

README

pipeline


Pipeline is a Go library that helps you build pipelines without worrying about channel management and concurrency. It contains common fan-in and fan-out operations as well as useful utility funcs for batch processing and scaling.

If you have another common use case you would like to see covered by this package, please open a feature request.

Cookbook

* How to run a pipeline until the container is killed: https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled
* How to shut down a pipeline when there is an error: https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError
* How to shut down a pipeline after it has finished processing a batch of data: https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed

Functions

func Apply

func Apply[A, B, C any](a Processor[A, []B], b Processor[B, C]) Processor[A, []C]

Apply connects two processors, applying the second to each item of the first processor's output

transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
    return strings.Split(s, ","), nil
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
    return s + s, nil
}, nil)

addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
    return "0" + s, nil
}, nil)

apply := pipeline.Apply(
    transform,
    pipeline.Sequence(
        double,
        addLeadingZero,
        double,
    ),
)

input := "1,2,3,4,5"

for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
    for j := range out {
        fmt.Printf("process: %s\n", out[j])
    }
}

Output:

process: 011011
process: 022022
process: 033033
process: 044044
process: 055055
func Buffer

func Buffer[Item any](size int, in <-chan Item) <-chan Item

Buffer creates a buffered channel that will close after the input is closed and the buffer is fully drained

func Cancel

func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item

Cancel passes an Item any from the in <-chan Item directly to the out <-chan Item until the Context is canceled. After the context is canceled, everything from in <-chan Item is sent to the cancel func instead with the ctx.Err().

// Create a context that lasts for 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Create a basic pipeline that emits one int every 250ms
p := pipeline.Delay(ctx, time.Second/4,
    pipeline.Emit(1, 2, 3, 4, 5),
)

// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
    fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)

// Otherwise, process the inputs
for out := range p {
    fmt.Printf("process: %+v\n", out)
}

Output:

process: 1
process: 2
process: 3
process: 4
5 could not be processed, context deadline exceeded
func Collect

func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item

Collect collects [Item any]s from its in channel and returns []Item from its out channel. It will collect up to maxSize inputs from the in <-chan Item over up to maxDuration before returning them as []Item. That means when maxSize is reached before maxDuration, [maxSize]Item will be passed to the out channel. But if maxDuration is reached before maxSize inputs are collected, [< maxSize]Item will be passed to the out channel. When the context is canceled, everything in the buffer will be flushed to the out channel.

func Delay

func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item

Delay delays reading each input by duration. If the context is canceled, the delay will not be applied.

func Drain

func Drain[Item any](in <-chan Item)

Drain empties the input and blocks until the channel is closed

func Emit

func Emit[Item any](is ...Item) <-chan Item

Emit fans is ...Item out to a <-chan Item

func Emitter

func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item

Emitter continuously emits new items generated by the next func until the context is canceled

func Merge

func Merge[Item any](ins ...<-chan Item) <-chan Item

Merge fans multiple channels into a single channel

one := pipeline.Emit(1)
two := pipeline.Emit(2, 2)
three := pipeline.Emit(3, 3, 3)

for i := range pipeline.Merge(one, two, three) {
    fmt.Printf("output: %d\n", i)
}

fmt.Println("done")

Output:

output: 1
output: 3
output: 2
output: 2
output: 3
output: 3
done
func Process

func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output

Process takes each input from the in <-chan Input and calls Processor.Process on it. When Processor.Process returns an Output, it will be sent to the output <-chan Output. If Processor.Process returns an error, Processor.Cancel will be called with the corresponding input and error message. Finally, if the Context is canceled, all inputs remaining in the in <-chan Input will go directly to Processor.Cancel.

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

// Multiply each number by 10
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
    return in * 10, nil
}, func(i int, err error) {
    fmt.Printf("error: could not multiply %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

Output:

result: 10
result: 20
result: 30
result: 40
result: 50
error: could not multiply 6, context deadline exceeded
func ProcessBatch

func ProcessBatch[Input, Output any](
	ctx context.Context,
	maxSize int,
	maxDuration time.Duration,
	processor Processor[[]Input, []Output],
	in <-chan Input,
) <-chan Output

ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of Inputs. It passes a []Input to the Processor.Process method and expects a []Output back. It passes []Input batches of inputs to the Processor.Cancel method. If the receiver is backed up, ProcessBatch can hold up to 2x maxSize.

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

// Multiply every 2 adjacent numbers together
p = pipeline.ProcessBatch(ctx, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, is []int) ([]int, error) {
    o := 1
    for _, i := range is {
        o *= i
    }
    return []int{o}, nil
}, func(is []int, err error) {
    fmt.Printf("error: could not multiply %v, %s\n", is, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

Output:

result: 2
result: 12
error: could not multiply [5 6], context deadline exceeded
func ProcessBatchConcurrently

func ProcessBatchConcurrently[Input, Output any](
	ctx context.Context,
	concurrently,
	maxSize int,
	maxDuration time.Duration,
	processor Processor[[]Input, []Output],
	in <-chan Input,
) <-chan Output

ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently, then it fans the out channels of the batch Processors back into a single out chan

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-9
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9)

// Add a 1 second delay to each number
p = pipeline.Delay(ctx, time.Second, p)

// Group two inputs at a time
p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, ins []int) ([]int, error) {
    return ins, nil
}, func(i []int, err error) {
    fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

Output:

result: 1
result: 2
result: 3
result: 5
error: could not process [7 8], context deadline exceeded
error: could not process [4 6], context deadline exceeded
error: could not process [9], context deadline exceeded
func ProcessConcurrently

func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output

ProcessConcurrently fans the in channel out to multiple Processors running concurrently, then it fans the out channels of the Processors back into a single out chan

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-7
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7)

// Add a two second delay to each number
p = pipeline.Delay(ctx, 2*time.Second, p)

// Add two concurrent processors that pass input numbers to the output
p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
    return in, nil
}, func(i int, err error) {
    fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    log.Printf("result: %d\n", result)
}

Output:

result: 2
result: 1
result: 4
result: 3
error: could not process 6, process was canceled
error: could not process 5, process was canceled
error: could not process 7, context deadline exceeded
func Split

func Split[Item any](in <-chan []Item) <-chan Item

Split takes a slice of items (such as the output of Collect) and splits it back out into individual elements

Examples

PipelineShutsDownOnError

The following example shows how you can shut down a pipeline gracefully when it receives an error message

// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a pipeline that emits 1-10
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// A step that will shut down the pipeline if the number is greater than 1
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
    // Shut down the pipeline by canceling the context
    if i != 1 {
        cancel()
        return i, fmt.Errorf("%d caused the shutdown", i)
    }
    return i, nil
}, func(i int, err error) {
    // The cancel func is called when an error is returned by the process func or the context is canceled
    fmt.Printf("could not process %d: %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting the pipeline after all data is processed")

Output:

could not process 2: 2 caused the shutdown
result: 1
could not process 3: context canceled
could not process 4: context canceled
could not process 5: context canceled
could not process 6: context canceled
could not process 7: context canceled
could not process 8: context canceled
could not process 9: context canceled
could not process 10: context canceled
exiting the pipeline after all data is processed
PipelineShutsDownWhenContainerIsKilled

This example demonstrates a pipeline that runs until the OS / container the pipeline is running in kills it

// Gracefully shut down the pipeline when the system is shutting down
// by canceling the context when an os.Kill or os.Interrupt signal is sent
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer cancel()

// Create a pipeline that keeps emitting numbers sequentially until the context is canceled
var count int
p := pipeline.Emitter(ctx, func() int {
    count++
    return count
})

// Pass through only the even numbers
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
    if i%2 == 0 {
        return i, nil
    }
    return i, fmt.Errorf("'%d' is an odd number", i)
}, func(i int, err error) {
    fmt.Printf("error processing '%v': %s\n", i, err)
}), p)

// Wait a fraction of a millisecond and simulate the os.Interrupt signal
go func() {
    time.Sleep(time.Millisecond / 10)
    fmt.Print("\n--- os kills the app ---\n\n")
    syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting after the input channel is closed")

Output:

error processing '1': '1' is an odd number
result: 2

--- os kills the app ---

error processing '3': '3' is an odd number
error processing '4': context canceled
exiting after the input channel is closed
PipelineShutsDownWhenInputChannelIsClosed

The following example demonstrates a pipeline that naturally finishes its run when the input channel is closed

// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a pipeline that emits 1-10 and then closes its output channel
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Multiply every number by 2
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
    return i * 2, nil
}, func(i int, err error) {
    fmt.Printf("could not multiply %d: %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting after the input channel is closed")

Output:

result: 2
result: 4
result: 6
result: 8
result: 10
result: 12
result: 14
result: 16
result: 18
result: 20
exiting after the input channel is closed

Documentation

Overview

Pipeline is a Go library that helps you build pipelines without worrying about channel management and concurrency. It contains common fan-in and fan-out operations as well as useful utility funcs for batch processing and scaling.

If you have another common use case you would like to see covered by this package, please open a feature request: https://github.com/deliveryhero/pipeline/issues.

Cookbook

* How to run a pipeline until the container is killed: https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled
* How to shut down a pipeline when there is an error: https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError
* How to shut down a pipeline after it has finished processing a batch of data: https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed

Example (PipelineShutsDownOnError)

The following example shows how you can shut down a pipeline gracefully when it receives an error message

// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a pipeline that emits 1-10
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// A step that will shut down the pipeline if the number is greater than 1
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
	// Shut down the pipeline by canceling the context
	if i != 1 {
		cancel()
		return i, fmt.Errorf("%d caused the shutdown", i)
	}
	return i, nil
}, func(i int, err error) {
	// The cancel func is called when an error is returned by the process func or the context is canceled
	fmt.Printf("could not process %d: %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting the pipeline after all data is processed")

// Example Output:
// could not process 2: 2 caused the shutdown
// result: 1
// could not process 3: context canceled
// could not process 4: context canceled
// could not process 5: context canceled
// could not process 6: context canceled
// could not process 7: context canceled
// could not process 8: context canceled
// could not process 9: context canceled
// could not process 10: context canceled
// exiting the pipeline after all data is processed

Example (PipelineShutsDownWhenContainerIsKilled)

This example demonstrates a pipeline that runs until the OS / container the pipeline is running in kills it

// Gracefully shut down the pipeline when the system is shutting down
// by canceling the context when an os.Kill or os.Interrupt signal is sent
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer cancel()

// Create a pipeline that keeps emitting numbers sequentially until the context is canceled
var count int
p := pipeline.Emitter(ctx, func() int {
	count++
	return count
})

// Pass through only the even numbers
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
	if i%2 == 0 {
		return i, nil
	}
	return i, fmt.Errorf("'%d' is an odd number", i)
}, func(i int, err error) {
	fmt.Printf("error processing '%v': %s\n", i, err)
}), p)

// Wait a fraction of a millisecond and simulate the os.Interrupt signal
go func() {
	time.Sleep(time.Millisecond / 10)
	fmt.Print("\n--- os kills the app ---\n\n")
	syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting after the input channel is closed")

// Example Output:
// error processing '1': '1' is an odd number
// result: 2
//
// --- os kills the app ---
//
// error processing '3': '3' is an odd number
// error processing '4': context canceled
// exiting after the input channel is closed

Example (PipelineShutsDownWhenInputChannelIsClosed)

The following example demonstrates a pipeline that naturally finishes its run when the input channel is closed

// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a pipeline that emits 1-10 and then closes its output channel
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Multiply every number by 2
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
	return i * 2, nil
}, func(i int, err error) {
	fmt.Printf("could not multiply %d: %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting after the input channel is closed")

// Example Output:
// result: 2
// result: 4
// result: 6
// result: 8
// result: 10
// result: 12
// result: 14
// result: 16
// result: 18
// result: 20
// exiting after the input channel is closed


Constants

This section is empty.

Variables

This section is empty.

Functions

func Buffer

func Buffer[Item any](size int, in <-chan Item) <-chan Item

Buffer creates a buffered channel that will close after the input is closed and the buffer is fully drained
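
A minimal sketch of how Buffer might be used (the size, values, and sleep below are illustrative assumptions, not from the package docs):

// Park the emitted ints in a buffer of size 5 so the emitting
// goroutine is not blocked by the slow consumer below.
buffered := pipeline.Buffer(5, pipeline.Emit(1, 2, 3, 4, 5))

// Buffer closes its output once the input channel is closed and
// every buffered item has been read.
for i := range buffered {
	fmt.Printf("consumed: %d\n", i)
	time.Sleep(100 * time.Millisecond)
}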

func Cancel

func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item

Cancel passes an `Item any` from the `in <-chan Item` directly to the out `<-chan Item` until the `Context` is canceled. After the context is canceled, everything from `in <-chan Item` is sent to the `cancel` func instead with the `ctx.Err()`.

Example
// Create a context that lasts for 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Create a basic pipeline that emits one int every 250ms
p := pipeline.Delay(ctx, time.Second/4,
	pipeline.Emit(1, 2, 3, 4, 5),
)

// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
	fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)

// Otherwise, process the inputs
for out := range p {
	fmt.Printf("process: %+v\n", out)
}
Output:

process: 1
process: 2
process: 3
process: 4
5 could not be processed, context deadline exceeded

func Collect

func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item

Collect collects `[Item any]`s from its in channel and returns `[]Item` from its out channel. It will collect up to `maxSize` inputs from the `in <-chan Item` over up to `maxDuration` before returning them as `[]Item`. That means when `maxSize` is reached before `maxDuration`, `[maxSize]Item` will be passed to the out channel. But if `maxDuration` is reached before `maxSize` inputs are collected, `[< maxSize]Item` will be passed to the out channel. When the `context` is canceled, everything in the buffer will be flushed to the out channel.
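
As an illustration (an assumed sketch, not one of the package's own examples), Collect can batch items coming from Emit:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Group up to 3 items at a time, waiting at most one second per batch.
batches := pipeline.Collect(ctx, 3, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6, 7))

// With 7 inputs and maxSize 3 this is expected to yield [1 2 3], [4 5 6],
// and finally the partial batch [7] (at the latest, once maxDuration elapses).
for batch := range batches {
	fmt.Printf("batch: %v\n", batch)
}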

func Delay

func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item

Delay delays reading each input by `duration`. If the context is canceled, the delay will not be applied.

func Drain

func Drain[Item any](in <-chan Item)

Drain empties the input and blocks until the channel is closed
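
For instance (a sketch under the assumption that only the Processor's side effects matter), Drain can terminate a pipeline without writing a consumer loop:

// Print each number as a side effect and pass it through unchanged.
out := pipeline.Process(context.Background(), pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
	fmt.Printf("saw: %d\n", i)
	return i, nil
}, nil), pipeline.Emit(1, 2, 3))

// Drain discards the output values and blocks until out is closed,
// so the whole pipeline runs to completion.
pipeline.Drain(out)
fmt.Println("done")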

func Emit

func Emit[Item any](is ...Item) <-chan Item

Emit fans `is ...Item` out to a `<-chan Item`

func Emitter

func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item

Emitter continuously emits new items generated by the next func until the context is canceled

func Merge

func Merge[Item any](ins ...<-chan Item) <-chan Item

Merge fans multiple channels into a single channel

Example
one := pipeline.Emit(1)
two := pipeline.Emit(2, 2)
three := pipeline.Emit(3, 3, 3)

for i := range pipeline.Merge(one, two, three) {
	fmt.Printf("output: %d\n", i)
}

fmt.Println("done")

// Example Output:
// output: 1
// output: 2
// output: 2
// output: 3
// output: 3
// output: 3
// done

func Process

func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output

Process takes each input from the `in <-chan Input` and calls `Processor.Process` on it. When `Processor.Process` returns an `Output`, it will be sent to the output `<-chan Output`. If `Processor.Process` returns an error, `Processor.Cancel` will be called with the corresponding input and error message. Finally, if the `Context` is canceled, all inputs remaining in the `in <-chan Input` will go directly to `Processor.Cancel`.

Example
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

// Multiply each number by 10
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
	return in * 10, nil
}, func(i int, err error) {
	fmt.Printf("error: could not multiply %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

// Example Output:
// result: 10
// result: 20
// result: 30
// result: 40
// result: 50
// error: could not multiply 6, context deadline exceeded

func ProcessBatch

func ProcessBatch[Input, Output any](
	ctx context.Context,
	maxSize int,
	maxDuration time.Duration,
	processor Processor[[]Input, []Output],
	in <-chan Input,
) <-chan Output

ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of `Input`s. It passes a []Input to the `Processor.Process` method and expects a []Output back. It passes []Input batches of inputs to the `Processor.Cancel` method. If the receiver is backed up, ProcessBatch can hold up to 2x maxSize.

Example
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

// Multiply every 2 adjacent numbers together
p = pipeline.ProcessBatch(ctx, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, is []int) ([]int, error) {
	o := 1
	for _, i := range is {
		o *= i
	}
	return []int{o}, nil
}, func(is []int, err error) {
	fmt.Printf("error: could not multiply %v, %s\n", is, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}
Output:

result: 2
result: 12
error: could not multiply [5 6], context deadline exceeded

func ProcessBatchConcurrently

func ProcessBatchConcurrently[Input, Output any](
	ctx context.Context,
	concurrently,
	maxSize int,
	maxDuration time.Duration,
	processor Processor[[]Input, []Output],
	in <-chan Input,
) <-chan Output

ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently, then it fans the out channels of the batch Processors back into a single out chan

Example
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-9
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9)

// Add a 1 second delay to each number
p = pipeline.Delay(ctx, time.Second, p)

// Group two inputs at a time
p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, ins []int) ([]int, error) {
	return ins, nil
}, func(i []int, err error) {
	fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	fmt.Printf("result: %d\n", result)
}

// Example Output
// result: 1
// result: 2
// result: 3
// result: 5
// error: could not process [7 8], context deadline exceeded
// error: could not process [4 6], context deadline exceeded
// error: could not process [9], context deadline exceeded

func ProcessConcurrently

func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output

ProcessConcurrently fans the in channel out to multiple Processors running concurrently, then it fans the out channels of the Processors back into a single out chan

Example
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-7
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7)

// Add a two second delay to each number
p = pipeline.Delay(ctx, 2*time.Second, p)

// Add two concurrent processors that pass input numbers to the output
p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
	return in, nil
}, func(i int, err error) {
	fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
	log.Printf("result: %d\n", result)
}

// Example Output:
// result: 2
// result: 1
// result: 4
// result: 3
// error: could not process 6, process was canceled
// error: could not process 5, process was canceled
// error: could not process 7, context deadline exceeded

func Split

func Split[Item any](in <-chan []Item) <-chan Item

Split takes a slice of items (such as the output of Collect) and splits it back out into individual elements
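
A hedged sketch (values and sizes are assumptions for illustration) of Collect and Split used together: batch items upstream, then flatten the batches back into single items downstream.

// Batch up to 2 strings at a time...
batches := pipeline.Collect(context.Background(), 2, time.Second, pipeline.Emit("a", "b", "c"))

// ...then split each []string batch back into individual strings.
for s := range pipeline.Split(batches) {
	fmt.Printf("item: %s\n", s)
}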

Types

type Processor

type Processor[Input, Output any] interface {
	// Process processes an input and returns an output or an error, if the output could not be processed.
	// When the context is canceled, process should stop all blocking operations and return the `Context.Err()`.
	Process(ctx context.Context, i Input) (Output, error)

	// Cancel is called if process returns an error or if the context is canceled while there are still items in the `in <-chan Input`.
	Cancel(i Input, err error)
}

Processor represents a blocking operation in a pipeline. Implementing `Processor` will allow you to add business logic to your pipelines without directly managing channels. This simplifies your unit tests and eliminates channel management related bugs.
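
As an illustration of implementing the interface directly, here is a hypothetical Processor[int, int] (the squarer type is not part of the package):

// squarer is a hypothetical Processor[int, int] implementation.
type squarer struct{}

// Process squares the input; it would return an error if the work failed.
func (squarer) Process(ctx context.Context, i int) (int, error) {
	return i * i, nil
}

// Cancel receives inputs that could not be processed, for example after
// the context is canceled, together with the reason.
func (squarer) Cancel(i int, err error) {
	fmt.Printf("dropped %d: %s\n", i, err)
}

A value of this type can then be passed anywhere a Processor[int, int] is expected, such as Process or ProcessConcurrently.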

func Apply added in v2.2.0

func Apply[A, B, C any](
	a Processor[A, []B],
	b Processor[B, C],
) Processor[A, []C]

Apply connects two processors, applying the second to each item of the first processor's output

Example
transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
	return strings.Split(s, ","), nil
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
	return s + s, nil
}, nil)

addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
	return "0" + s, nil
}, nil)

apply := pipeline.Apply(
	transform,
	pipeline.Sequence(
		double,
		addLeadingZero,
		double,
	),
)

input := "1,2,3,4,5"

for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
	for j := range out {
		fmt.Printf("process: %s\n", out[j])
	}
}
Output:

process: 011011
process: 022022
process: 033033
process: 044044
process: 055055

func Join added in v2.1.0

func Join[A, B, C any](a Processor[A, B], b Processor[B, C]) Processor[A, C]

Join connects two processors where the output of the first is the input of the second
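
A minimal sketch (assumed, not from the package docs): joining a string-to-int parser with an int doubler yields a single Processor[string, int].

parse := pipeline.NewProcessor(func(_ context.Context, s string) (int, error) {
	return strconv.Atoi(s)
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, i int) (int, error) {
	return i * 2, nil
}, nil)

// parseAndDouble feeds parse's output into double's input.
parseAndDouble := pipeline.Join(parse, double)

for out := range pipeline.Process(context.Background(), parseAndDouble, pipeline.Emit("10", "21")) {
	fmt.Printf("joined: %d\n", out)
}
// Expected output under these assumptions: 20, then 42.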

func NewProcessor

func NewProcessor[Input, Output any](
	process func(ctx context.Context, i Input) (Output, error),
	cancel func(i Input, err error),
) Processor[Input, Output]

NewProcessor creates a Processor from a process func and a cancel func

func Sequence added in v2.1.0

func Sequence[A any](ps ...Processor[A, A]) Processor[A, A]

Sequence connects many processors of the same input and output type sequentially, so the output of one feeds the input of the next
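
A short sketch (assumed for illustration) chaining the same increment processor three times:

increment := pipeline.NewProcessor(func(_ context.Context, i int) (int, error) {
	return i + 1, nil
}, nil)

// addThree applies increment three times to each input: 1 -> 4, 2 -> 5.
addThree := pipeline.Sequence(increment, increment, increment)

for out := range pipeline.Process(context.Background(), addThree, pipeline.Emit(1, 2)) {
	fmt.Printf("sequenced: %d\n", out)
}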

Directories

Path Synopsis
semaphore	package semaphore is like a sync.WaitGroup with an upper limit.
