pipeline

package module
v0.0.0-...-7e236c9
Warning

This package is not in the latest version of its module.

Published: Apr 7, 2026 License: BSD-3-Clause Imports: 5 Imported by: 0

README

pipeline

Simple Go package for implementing pipelines. The pipeline stages must be arranged as a finite directed acyclic graph, with spouts as entry points and sinks as exit points. The package does not check whether the stages actually form an acyclic graph.

Spouts read input via io.Reader and sinks write output via io.Writer. Pipeline stages, including spouts and sinks, run concurrently as goroutines. A stage receives data over an input channel, transforms it and/or updates its state, and sends data over an output channel to the next pipeline stage.
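The spout -> stage -> sink structure described above can be sketched with only the standard library. The sketch below is not this package's implementation: the functions `spout`, `stage`, `sink`, and `run` are hypothetical, and shutdown is simplified so that each goroutine closes its output channel once its input is exhausted (the package itself terminates stages via `Close`).

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// spout reads r one byte at a time and sends each byte downstream.
// It closes its output channel when r is exhausted.
func spout(r io.Reader) <-chan byte {
	out := make(chan byte, 1) // buffered, like a channel size of 1
	go func() {
		defer close(out)
		buf := make([]byte, 1)
		for {
			n, err := r.Read(buf)
			if n > 0 {
				out <- buf[0]
			}
			if err != nil { // io.EOF or a read error ends the spout
				return
			}
		}
	}()
	return out
}

// stage forwards only ASCII letters and drops everything else.
func stage(in <-chan byte) <-chan byte {
	out := make(chan byte, 1)
	go func() {
		defer close(out)
		for b := range in {
			if (b >= 'A' && b <= 'Z') || (b >= 'a' && b <= 'z') {
				out <- b
			}
		}
	}()
	return out
}

// sink writes every received byte to w and marks wg done when in is closed.
func sink(in <-chan byte, w io.Writer, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for b := range in {
			w.Write([]byte{b})
		}
	}()
}

// run wires spout -> stage -> sink and waits for the sink to finish.
func run(input string) string {
	var out bytes.Buffer
	var wg sync.WaitGroup
	sink(stage(spout(strings.NewReader(input))), &out, &wg)
	wg.Wait()
	return out.String()
}

func main() {
	fmt.Println(run("Hello, World!")) // prints HelloWorld
}
```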

Example

package main

import (
    "bytes"
    "fmt"
    "github.com/flxch/pipeline"
)

func main() {
    inbuf := bytes.NewBuffer([]byte("Hello, World!"))
    outbuf := bytes.NewBuffer(nil)

    // Assemble pipeline.  No logging, channel size 1, do not pause when no
    // input at spout.  Since InputSize is 1, the spout reads byte by byte from
    // inbuf.
    p := pipeline.New(nil, 1, 1, 0)
    // Spout: Reads from inbuf and converts it to int.
    inch := pipeline.AddSpout(p, "spout", inbuf,
        func(in []byte) (int, error) { return int(in[0]), nil })
    // Stage: Filters data (only forward upper and lower case letters to the
    // next stage).  The stage's input channel is the output channel of the
    // spout.
    outch := pipeline.AddStage(p, "stage", inch,
        func(n int, out chan<- int) error {
            if (n >= 'A' && n <= 'Z') || (n >= 'a' && n <= 'z') {
                out <- n
            }
            return nil
        })
    // Sink: Converts data to []byte and writes it to outbuf.  The sink's input
    // channel is the output channel of the filter stage.
    pipeline.AddSink(p, "sink", outch, outbuf,
        func(data int) ([]byte, error) { return []byte{byte(data)}, nil })

    // Run pipeline.
    p.Run()
    // Wait until the input buffer is empty.
    for inbuf.Len() > 0 {
    }
    // Close pipeline.
    if err := p.Close(); err != nil {
        panic("failed to close pipeline")
    }

    // Print received data.
    fmt.Printf("%s\n", outbuf.Bytes())
}

Notes

The package is under development. Its API might change. Feedback is welcome.

Alternative Pipeline Packages

Incomplete list of pipeline packages:

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func AddSink

func AddSink[Data any](p P, id string, in <-chan Data, w io.Writer, op func(Data) ([]byte, error))

`AddSink` adds a sink with the identifier `id` to the pipeline `p`. The sink receives its input over the channel `in`. It applies `op` to each received value and writes the resulting bytes via the writer `w`. A value is dropped at the sink if `op` returns a nil slice or an error.
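A minimal sketch of this drop rule, assuming a sink that simply loops over its input channel. `drainSink`, `toDigit`, and `collect` are hypothetical helpers, not part of the package; `toDigit` merely has the same shape as the `op` argument of `AddSink`.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// drainSink is a hypothetical stand-in for a sink's receive loop. It applies
// op to every value from in and writes the result to w. As documented for
// AddSink, a value is dropped when op returns a nil slice or an error.
func drainSink[Data any](in <-chan Data, w io.Writer, op func(Data) ([]byte, error)) {
	for v := range in {
		b, err := op(v)
		if err != nil || b == nil {
			continue // value dropped
		}
		w.Write(b)
	}
}

// toDigit encodes 0..9 as an ASCII digit. Negative values yield an error and
// values above 9 yield a nil slice; both cases cause the value to be dropped.
func toDigit(n int) ([]byte, error) {
	switch {
	case n < 0:
		return nil, errors.New("negative value")
	case n > 9:
		return nil, nil
	default:
		return []byte{byte('0' + n)}, nil
	}
}

// collect runs values through drainSink with toDigit and returns the output.
func collect(values []int) string {
	in := make(chan int, len(values))
	for _, v := range values {
		in <- v
	}
	close(in)
	var out bytes.Buffer
	drainSink[int](in, &out, toDigit)
	return out.String()
}

func main() {
	fmt.Println(collect([]int{1, -2, 3, 42, 5})) // prints 135
}
```

Here `-2` is dropped because `op` returns an error, and `42` is dropped because `op` returns a nil slice.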

func AddSpout

func AddSpout[Data any](p P, id string, r io.Reader, op func([]byte) (Data, error)) <-chan Data

`AddSpout` adds a spout with the identifier `id` to the pipeline `p`. The spout uses the reader `r` to continuously receive data items (as byte slices) and applies the function `op` to each received item. A typical use of `op` is to convert the raw input into the pipeline's internal data type. `AddSpout` returns an output channel from which the next pipeline stage receives its input. Input is dropped at the spout if the reader `r` reads zero bytes or if `op` returns an error.
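The read loop below is a hypothetical sketch of the documented spout behavior, with `insz` playing the role of `InputSize`. For simplicity it stops at `io.EOF`, whereas the spouts in this package appear to keep reading (pausing for `Pause` when no input is available) until the pipeline is closed. `spoutItems`, `digit`, and `digitsFrom` are illustrative names only.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// spoutItems is a hypothetical sketch of a spout's read loop. It reads up to
// insz bytes per Read call, skips reads that return zero bytes, and drops any
// input for which op returns an error. Unlike a real spout, it stops at EOF.
func spoutItems[Data any](r io.Reader, insz int, op func([]byte) (Data, error)) []Data {
	var items []Data
	buf := make([]byte, insz)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			item, opErr := op(buf[:n])
			if opErr == nil {
				items = append(items, item)
			}
			// On opErr != nil the input is dropped.
		}
		if err != nil {
			return items // io.EOF or a read error ends this sketch
		}
	}
}

// digit converts a single ASCII digit to an int and rejects anything else.
func digit(in []byte) (int, error) {
	if in[0] < '0' || in[0] > '9' {
		return 0, errors.New("not a digit")
	}
	return int(in[0] - '0'), nil
}

// digitsFrom reads s byte by byte (InputSize 1) and keeps only the digits.
func digitsFrom(s string) []int {
	return spoutItems(strings.NewReader(s), 1, digit)
}

func main() {
	fmt.Println(digitsFrom("4x2")) // prints [4 2]
}
```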

func AddStage

func AddStage[DataIn, DataOut any](p P, id string, in <-chan DataIn, op func(DataIn, chan<- DataOut) error) <-chan DataOut

`AddStage` adds a pipeline stage with the identifier `id` to the pipeline `p`. The stage receives its input over the channel `in` and calls `op` for each input item, passing it the stage's output channel. `AddStage` returns this output channel, which serves as the input of the next pipeline stage.
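Because `op` receives the output channel instead of returning a value, it can emit zero, one, or several items per input; the filter in the README example emits zero or one. The sketch below uses a hypothetical driver `runStage` in place of a real pipeline to show an `op` that emits several items. `splitWords` has the signature `AddStage` expects for `DataIn = DataOut = string`; `runStage` and `words` are not package functions.

```go
package main

import (
	"fmt"
	"strings"
)

// runStage is a hypothetical driver that mimics a stage: it calls op once per
// input item, passing it the stage's output channel, and closes that channel
// after the input channel is exhausted.
func runStage[In, Out any](in <-chan In, op func(In, chan<- Out) error) <-chan Out {
	out := make(chan Out, 1)
	go func() {
		defer close(out)
		for v := range in {
			_ = op(v, out) // this sketch ignores errors; the package may log them
		}
	}()
	return out
}

// splitWords has the op signature expected by AddStage. Because it receives
// the output channel, it can emit zero, one, or several items per input.
func splitWords(line string, out chan<- string) error {
	for _, w := range strings.Fields(line) {
		out <- w
	}
	return nil
}

// words feeds lines through the stage and collects its output in order.
func words(lines []string) []string {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, l := range lines {
			in <- l
		}
	}()
	var got []string
	for w := range runStage[string, string](in, splitWords) {
		got = append(got, w)
	}
	return got
}

func main() {
	fmt.Println(words([]string{"hello world", "", "go"})) // prints [hello world go]
}
```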

func AddStageNM

func AddStageNM[DataIn, DataOut any](p P, id string, ins []<-chan DataIn, degree int, op func(DataIn, ...chan<- DataOut) error) []<-chan DataOut

Note that the function `op` must be thread-safe, as multiple goroutines may execute it at the same time. This is important if the stage is stateful. For a fixed number of input channels, one could implement functions like AddStage2M that receive input from two input channels in a single goroutine, in which case `op` would not need to be thread-safe. Unfortunately, Go's select statement requires a fixed number of cases, so such a function cannot be implemented directly when N is not fixed. (Such an implementation seems possible with the reflect package and its function Select. However, it would most likely have poor performance.) Consequently, the pipeline stage AddStageNM does not run as a single goroutine. Instead, the stage consists of N+1 goroutines.
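The following sketch illustrates why `op` must be thread-safe when one goroutine per input channel calls it: the stateful `counter` guards its running sum with a mutex. It is simplified to a single output channel (the package's `op` takes `...chan<- DataOut`), and the extra goroutine that closes the output once all inputs are drained is only one plausible reading of "N+1 goroutines". `counter`, `fanIn`, and `total` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a stateful op. Several goroutines may call add at the same
// time, so the mutex guards the shared running sum.
type counter struct {
	mu  sync.Mutex
	sum int
}

func (c *counter) add(n int, out chan<- int) error {
	c.mu.Lock()
	c.sum += n
	s := c.sum
	c.mu.Unlock()
	out <- s // send outside the lock so a slow consumer does not stall other inputs
	return nil
}

// fanIn is a hypothetical N-input stage: one goroutine per input channel
// calls op, and one more goroutine closes the output after all N are done.
func fanIn(ins []<-chan int, op func(int, chan<- int) error) <-chan int {
	out := make(chan int, len(ins))
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				_ = op(v, out)
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// total sends each slice over its own channel and returns the final sum
// together with the number of items the stage emitted.
func total(inputs [][]int) (sum, emitted int) {
	ins := make([]<-chan int, len(inputs))
	for i, vals := range inputs {
		ch := make(chan int)
		ins[i] = ch
		go func(ch chan<- int, vals []int) {
			defer close(ch)
			for _, v := range vals {
				ch <- v
			}
		}(ch, vals)
	}
	c := &counter{}
	for range fanIn(ins, c.add) {
		emitted++
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.sum, emitted
}

func main() {
	fmt.Println(total([][]int{{1, 2, 3}, {10, 20}, {100}})) // prints 136 6
}
```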

Types

type P

type P struct {
	// Options (default values for pipeline stages)
	// Log pipeline errors and events.  No logging if nil.
	Logger *slog.Logger
	// Channel size between stages.
	ChanSize int
	// Size of byte slice for reading input at spout (only for spouts).  Size
	// must be large enough so that any received input can be stored in the slice.
	InputSize int
	// Pause reading input when no input at spout (only for spouts).
	Pause time.Duration
	// contains filtered or unexported fields
}

Pipeline struct, including default parameters for stages.
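Why `InputSize` matters: as the README example notes, the spout reads `InputSize` bytes at a time, so an input longer than the slice is delivered to `op` in pieces. The hypothetical helper `chunks` (standard library only) shows what successive `Read` calls return for a `strings.Reader` read with a slice of size `insz`. Other readers, such as network connections, may return even shorter reads, since `io.Reader` allows `Read` to fill fewer bytes than the slice holds.

```go
package main

import (
	"fmt"
	"strings"
)

// chunks returns the pieces that successive Read calls deliver when input is
// read through a byte slice of size insz.
func chunks(input string, insz int) []string {
	r := strings.NewReader(input)
	buf := make([]byte, insz)
	var out []string
	for {
		n, err := r.Read(buf)
		if n > 0 {
			out = append(out, string(buf[:n]))
		}
		if err != nil { // io.EOF once the input is exhausted
			return out
		}
	}
}

func main() {
	fmt.Printf("%q\n", chunks("Hello, World!", 4)) // prints ["Hell" "o, W" "orld" "!"]
}
```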

Example

`ExampleP` sets up a pipeline with a filtering stage: spout -> filter -> sink.

package main

import (
	"bytes"
	"fmt"
	"github.com/flxch/pipeline"
)

func main() {
	inbuf := bytes.NewBuffer([]byte("Hello, World!"))
	outbuf := bytes.NewBuffer(nil)

	// Assemble pipeline.  No logging, channel size 1, do not pause when no
	// input at spout.  Since InputSize is 1, the spout reads byte by byte from
	// inbuf.
	p := pipeline.New(nil, 1, 1, 0)
	// Spout: Reads from inbuf and converts it to int.
	inch := pipeline.AddSpout(p, "spout", inbuf,
		func(in []byte) (int, error) { return int(in[0]), nil })
	// Stage: Filters data (only forward upper and lower case letters to the
	// next stage).  The stage's input channel is the output channel of the
	// spout.
	outch := pipeline.AddStage(p, "stage", inch,
		func(n int, out chan<- int) error {
			if (n >= 'A' && n <= 'Z') || (n >= 'a' && n <= 'z') {
				out <- n
			}
			return nil
		})
	// Sink: Converts data to []byte and writes it to outbuf.  The sink's input
	// channel is the output channel of the filter stage.
	pipeline.AddSink(p, "sink", outch, outbuf,
		func(data int) ([]byte, error) { return []byte{byte(data)}, nil })

	// Run pipeline.
	p.Run()
	// Wait until the input buffer is empty.
	for inbuf.Len() > 0 {
	}
	// Close pipeline.
	if err := p.Close(); err != nil {
		panic("failed to close pipeline")
	}

	// Print received data.
	fmt.Printf("%s\n", outbuf.Bytes())

}
Output:
HelloWorld

func New

func New(l *slog.Logger, chsz, insz int, dur time.Duration) P

`New` creates a new pipeline. The function's arguments `l` (logger for the stages; no logging if nil), `chsz` (output channel size of a stage), `insz` (input size of a spout), and `dur` (pause between inputs) are default values for the pipeline's option parameters. They can be overridden individually for each stage. Spouts, sinks, and stages are added with the functions `AddSpout`, `AddSink`, and `AddStage`, respectively. After the stages are set up, the method `Run` activates the pipeline.

func (P) Close

func (p P) Close() error

`Close` closes the pipeline `p`. This includes signaling all spouts of the pipeline to terminate. The termination signal is forwarded along the pipeline, e.g., a spout signals its next pipeline stage to terminate. `Close` waits until all stages of the pipeline have terminated.
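One common way to implement this kind of shutdown in Go is sketched below: a `done` channel tells the spout to stop, the spout closes its output channel, the closed channel propagates the signal downstream, and a `sync.WaitGroup` lets `Close` wait for every stage. The `miniPipeline` type and its functions are hypothetical; the package's internals may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// miniPipeline is a hypothetical spout -> sink pipeline with Close semantics.
type miniPipeline struct {
	done chan struct{}
	wg   sync.WaitGroup
	seen []int // values received by the sink; read only after Close
}

func startMini() *miniPipeline {
	p := &miniPipeline{done: make(chan struct{})}
	ch := make(chan int)
	p.wg.Add(2)
	go func() { // spout: emits 0, 1, 2, ... until told to stop
		defer p.wg.Done()
		defer close(ch) // forwards the termination signal to the sink
		for i := 0; ; i++ {
			select {
			case <-p.done:
				return
			case ch <- i:
			}
		}
	}()
	go func() { // sink: runs until its input channel is closed
		defer p.wg.Done()
		for v := range ch {
			p.seen = append(p.seen, v)
		}
	}()
	return p
}

// Close signals the spout and waits until all stages have terminated.
func (p *miniPipeline) Close() error {
	close(p.done)
	p.wg.Wait()
	return nil
}

// ordered reports whether seen is 0, 1, 2, ... without gaps.
func ordered(seen []int) bool {
	for i, v := range seen {
		if v != i {
			return false
		}
	}
	return true
}

func main() {
	p := startMini()
	if err := p.Close(); err != nil {
		panic(err)
	}
	// One spout and one unbuffered channel, so the order is always preserved.
	fmt.Println("in order:", ordered(p.seen))
}
```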

func (P) Run

func (p P) Run()

`Run` signals the spouts of the pipeline `p` that the pipeline is ready and that they can now start receiving input data.
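A one-shot "ready" signal like the one `Run` sends can be implemented by closing a channel, since a closed channel unblocks every receiver at once. The sketch below is a hypothetical illustration (`startSpouts` and `runAll` are not package functions): no spout starts before `close(ready)`, and all of them start afterwards.

```go
package main

import (
	"fmt"
	"sync"
)

// startSpouts launches n hypothetical spouts. Each one blocks until ready is
// closed and then reports its id on started.
func startSpouts(n int, ready <-chan struct{}, started chan<- int) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ready // wait for the Run-style start signal
			started <- id
		}(i)
	}
	return wg
}

// runAll starts n spouts, then releases all of them with a single close.
// It returns how many spouts had started before and after the signal.
func runAll(n int) (before, after int) {
	ready := make(chan struct{})
	started := make(chan int, n)
	wg := startSpouts(n, ready, started)
	before = len(started) // always 0: every spout is blocked on <-ready
	close(ready)          // the one-shot broadcast, analogous to Run
	wg.Wait()
	after = len(started)
	return before, after
}

func main() {
	fmt.Println(runAll(3)) // prints 0 3
}
```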
