Documentation
¶
Index ¶
- func AddSink[Data any](p P, id string, in <-chan Data, w io.Writer, op func(Data) ([]byte, error))
- func AddSpout[Data any](p P, id string, r io.Reader, op func([]byte) (Data, error)) <-chan Data
- func AddStage[DataIn, DataOut any](p P, id string, in <-chan DataIn, op func(DataIn, chan<- DataOut) error) <-chan DataOut
- func AddStageNM[DataIn, DataOut any](p P, id string, ins []<-chan DataIn, degree int, ...) []<-chan DataOut
- type P
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddSink ¶
`AddSink` adds a sink with the identifier `id` to the pipeline `p`. The sink receives its input over the channel `in`. For each received value, the sink performs `op` on it before writing the output via the writer `w`. The value is dropped at the sink if the operation `op` returns the nil slice or `op` returns an error.
func AddSpout ¶
`AddSpout` adds a spout to the pipeline `p` with the identifier `id`. The spout uses the reader `r` to continuously receive data items (as byte slices) and uses the function `op` to process the received items. A typical use of `op` is to convert input to the corresponding data items of the pipeline's internal data type. The function `AddSpout` returns an output channel from which the next pipeline stage receives its input. Note that the input is dropped at the spout if the reader `r` reads zero bytes or if an error occurs while processing the received input (i.e., `op` returns an error).
func AddStage ¶
func AddStage[DataIn, DataOut any](p P, id string, in <-chan DataIn, op func(DataIn, chan<- DataOut) error) <-chan DataOut
`AddStage` adds a pipeline stage with identifier `id` to the pipeline `p`. The stage receives input over `in` channel. Each input item is processed by the function `op`. The function returns a channel that is the input of the next pipeline stage. The `op` function is called with the output channel.
func AddStageNM ¶
func AddStageNM[DataIn, DataOut any](p P, id string, ins []<-chan DataIn, degree int, op func(DataIn, ...chan<- DataOut) error) []<-chan DataOut
Note that the function `op` must be thread-safe as multiple goroutines may execute it at the same time. This is important if the stage is stateful. We could implement functions like AddStage2M that receive input from two input channels, where the `op` function does not need to be thread-safe. Unfortunately, Go does not allow us to implement such a function directly, where N is not fixed. (It seems that such an implemention would be possible by using the refelect package and its function Select(). However, such an implementation would most likely have poor performance.) Overall, the pipeline stage AddStageNM does not runs a single goroutine. Instead, the stage consists of N+1 goroutines.
Types ¶
type P ¶
type P struct {
// Options (default values for pipeline stages)
// Log pipeline errors and events. No logging if nil.
Logger *slog.Logger
// Channel size between stages.
ChanSize int
// Size of byte slice for reading input at spout (only for spouts). Size
// must be large enough so that any received input can be stored in the slice.
InputSize int
// Pause reading input when no input at spout (only for spouts).
Pause time.Duration
// contains filtered or unexported fields
}
Pipeline struct, including default parameters for stages.
Example ¶
`ExampleP` setups a pipeline with a filtering stage: spout -> filter -> sink.
package main
import (
"bytes"
"fmt"
"github.com/flxch/pipeline"
)
func main() {
inbuf := bytes.NewBuffer([]byte("Hello, World!"))
outbuf := bytes.NewBuffer(nil)
// Assemble pipeline. No logging, channel size 1, do not pause when no
// input at spout. Since InputSize is 1, the spout reads byte by byte from
// inbuf.
p := pipeline.New(nil, 1, 1, 0)
// Spout: Reads from inbuf and converts it to int.
inch := pipeline.AddSpout(p, "spout", inbuf,
func(in []byte) (int, error) { return int(in[0]), nil })
// Stage: Filters data (only forward upper and lower case letters to the
// next stage). The stage's input channel is the output channel of the
// spout.
outch := pipeline.AddStage(p, "stage", inch,
func(n int, out chan<- int) error {
if n >= int('A') && n <= int('z') {
out <- n
}
return nil
})
// Sink: Converts data to []byte and writes it to outbuf. The sink's input
// channel is the output channel of the filter stage.
pipeline.AddSink(p, "sink", outch, outbuf,
func(data int) ([]byte, error) { return []byte{byte(data)}, nil })
// Run pipeline.
p.Run()
// Wait until the input buffer is empty.
for inbuf.Len() > 0 {
}
// Close pipeline.
if err := p.Close(); err != nil {
panic("failed to close pipeline")
}
// Print received data.
fmt.Printf("%s\n", outbuf.Bytes())
}
Output: HelloWorld
func New ¶
`New` creates a new pipeline. The function's arguments `l` (logger for the stages; no logging if nil), `chsz` (output channel size of a stage), `insz` (input size of a spout), and `dur` (pause between inputs) are default values for the pipeline's option parameters. They can be overwritten individually for each stage. Spouts, sinks, and stages are added by the functions `AddSpout`, `AddSink`, and `AddStage`, respectively. After setting up the stages, the method `Run` activates the pipeline.
func (P) Close ¶
`Close` closes the pipeline `p`. This includes signaling to all spouts of the pipeline to terminate. The terminate signal is forward to the next pipeline stage, e.g., a spout signals its next pipeline stage to terminate. `Close` waits until all stages of the pipeline have terminated.