Documentation ¶
Overview ¶
Package pipelines provides a set of utilities for creating and managing concurrent data processing pipelines in Go.
The library uses channels under the hood to pass data between pipeline stages. Each stage runs in its own goroutine, ensuring concurrency and separation of concerns.
Below is an example application that uses pipelines to square odd integers while tracking shared-state counters:
package yourpipeline
import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)
// PipelineWrapper is an example of a pipelines.Pipeline wrapper implementation. It includes shared state via counters.
type PipelineWrapper struct {
	mu          sync.Mutex
	errChan     chan error
	evenCounter int
	oddCounter  int
}
// NewPipelineWrapper creates a new PipelineWrapper with counters set to 0
func NewPipelineWrapper() *PipelineWrapper {
	// Setup channels and return PipelineWrapper
	errChan := make(chan error, 10)
	return &PipelineWrapper{
		errChan:     errChan,
		evenCounter: 0,
		oddCounter:  0,
	}
}
// Run the PipelineWrapper
func (pl *PipelineWrapper) Run() {
	defer close(pl.errChan)
	pipeline := pipelines.New[int, int]( // Create a new Pipeline
		context.Background(),
		sources.FromArray(createIntArr(10)), // Create a source to start the pipeline
		pl.errChan,
	).Start(pl.exampleProcess)
	go func(errReceiver <-chan error) { // Handle Pipeline errors
		defer pipeline.Close()
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				// return // if you wanted to close the pipeline during error handling.
			}
		}
	}(pl.errChan)
	for out := range pipeline.Out() { // Read Pipeline output
		slog.Info("received simple pipeline output", slog.Int("out", out))
	}
}
func (pl *PipelineWrapper) squareOdds(v int) (int, error) {
	if v%2 == 0 {
		pl.mu.Lock()
		pl.evenCounter++
		pl.mu.Unlock()
		return v, fmt.Errorf("even number error: %v", v)
	}
	pl.mu.Lock()
	pl.oddCounter++
	pl.mu.Unlock()
	return v * v, nil
}
func (pl *PipelineWrapper) exampleProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	// datastreams.DataStream.OrDone will stop the pipeline if the input channel is closed
	return p.OrDone().FanOut(
		datastreams.Params{Num: 2}, // datastreams.DataStream.FanOut will run subsequent ds.Pipe stages in parallel
	).Run(
		pl.squareOdds, // datastreams.DataStream.Run will execute the ds.Pipe process: "squareOdds"
	) // datastreams.DataStream.Out automatically FanIns to a single output channel if needed
}
func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}
The package includes various functions to create, manipulate, and control the flow of data through channels, allowing for flexible and efficient data processing.
The main components of this package are:
- Pipeline: A struct that defines a generic connection of data streams.
- DataStream (in subpackage "datastreams"): Provides the methods to build concurrency stages.
Pipelines work by connecting a "Source" (an upstream data producer) with an optional chain of transformations or filters before optionally "Sinking" (sending the output to a consumer). Under the hood, all data flows through Go channels with concurrency managed by goroutines. Each transformation or filter is effectively run in parallel, communicating via channels.
For more in-depth usage, see the examples below and the doc.go file.
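The source, transformation, and sink flow described above can be sketched with plain channels and goroutines. This is a minimal illustration using only the standard library, not the package's actual implementation; the helper names source, filterStage, mapStage, and sink are hypothetical.

```go
package main

import "fmt"

// source emits each value on a channel and closes it when done.
// (Hypothetical helper, not part of the pipelines package.)
func source(vals []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// filterStage forwards only the values for which keep returns true.
func filterStage(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// mapStage applies fn to every value in its own goroutine.
func mapStage(in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// sink drains the channel into a slice.
func sink(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	evens := filterStage(source([]int{0, 1, 2, 3, 4}), func(v int) bool { return v%2 == 0 })
	squared := mapStage(evens, func(v int) int { return v * v })
	fmt.Println(sink(squared)) // [0 4 16]
}
```

Each stage owns its output channel and closes it when its input is exhausted, so closing the source propagates shutdown down the chain.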
Example (Duplicate) ¶
package main
import (
	"context"
	"log/slog"
	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)
// createIntArr creates a simple slice of int values from 0..num-1.
func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}
// duplicateProcess demonstrates a pipeline stage that broadcasts each value to two streams
// and then fans them back in (duplicates).
func duplicateProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	// Broadcasting by 2 then fanning in merges them back, effectively duplicating each item.
	return p.Broadcast(
		datastreams.Params{Num: 2},
	).FanIn()
}
func main() {
	errChan := make(chan error)
	pl := pipelines.New[int, int]( // Create a new Pipeline
		context.Background(),
		sources.FromArray(createIntArr(10)), // Create a new Source from slice
		errChan,
	).Start(duplicateProcess)
	defer func() {
		close(errChan)
		pl.Close()
	}()
	// Handle pipeline errors in a separate goroutine
	go func(errReceiver <-chan error) {
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				return
			}
		}
	}(errChan)
	// Read pipeline output
	for out := range pl.Out() {
		slog.Info("received simple pipeline output", slog.Int("out", out))
	}
	// Output (example):
	// received simple pipeline output out=0
	// received simple pipeline output out=0
	// received simple pipeline output out=1
	// received simple pipeline output out=1
	// ...
	// received simple pipeline output out=9
	// received simple pipeline output out=9
}
Example (Filter) ¶
package main
import (
	"context"
	"fmt"
	"log/slog"
	"time"
	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)
// filter returns true if the input int is even, false otherwise.
func filter(p int) (bool, error) {
	return p%2 == 0, nil
}
// mapFunc transforms an even integer into a descriptive string.
func mapFunc(p int) (string, error) {
	return fmt.Sprintf("I'm an even number: %d", p), nil
}
func main() {
	errChan := make(chan error, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer func() {
		close(errChan)
		cancel()
	}()
	// Build a pipeline that fans out, then filters even numbers, then maps to string
	pl := pipelines.New[int, string](
		ctx,
		sources.FromArray(createIntArr(10)),
		errChan,
	).Start(func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p.FanOut(
				datastreams.Params{Num: 3},
			).Filter(
				filter,
			),
			mapFunc,
		)
	})
	for val := range pl.Out() {
		slog.Info("received simple pipeline output", slog.String("out", val))
	}
	// Output (example):
	// {"out":"I'm an even number: 0"}
	// {"out":"I'm an even number: 2"}
	// {"out":"I'm an even number: 4"}
	// {"out":"I'm an even number: 6"}
	// {"out":"I'm an even number: 8"}
}
Example (SourceSink) ¶
Example_sourceSink constructs and starts the Pipeline
package main
import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sinks"
	"github.com/elastiflow/pipelines/datastreams/sources"
)
// PipelineWrapper is an example struct embedding a pipeline with shared counters.
//
// The pipeline will read from a channel source, process data, and sink into
// another channel. The "evenCounter" and "oddCounter" track how many evens
// or odds we've encountered.
type PipelineWrapper struct {
	mu          sync.Mutex
	errChan     chan error
	evenCounter int
	oddCounter  int
	pipeline    *pipelines.Pipeline[int, int]
}
// NewPipelineWrapper initializes a PipelineWrapper.
func NewPipelineWrapper() *PipelineWrapper {
	errChan := make(chan error, 10)
	return &PipelineWrapper{
		errChan:     errChan,
		evenCounter: 0,
		oddCounter:  0,
	}
}
// squareOdds increments counters, squares odd numbers, and returns an error on even numbers.
func (pl *PipelineWrapper) squareOdds(v int) (int, error) {
	if v%2 == 0 {
		pl.mu.Lock()
		pl.evenCounter++
		pl.mu.Unlock()
		return v, fmt.Errorf("even number error: %v", v)
	}
	pl.mu.Lock()
	pl.oddCounter++
	pl.mu.Unlock()
	return v * v, nil
}
// exampleProcess shows the entire pipeline flow within a single method,
// including how to produce data (source) and consume it (sink).
func (pl *PipelineWrapper) exampleProcess(ctx context.Context) {
	// 1) Create channels for input and output.
	inChan := make(chan int, 10)
	outChan := make(chan int, 10)
	// 2) Build the pipeline: inChan -> (FanOut + squareOdds) -> sink -> outChan
	pl.pipeline = pipelines.New[int, int](
		ctx,
		sources.FromChannel(inChan), // source
		pl.errChan,
	).Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
		return ds.OrDone().FanOut(
			datastreams.Params{Num: 2},
		).Run(
			pl.squareOdds,
		).Sink( // Sink
			sinks.ToChannel(outChan),
		)
	})
	var wg sync.WaitGroup
	wg.Add(10)
	// 3) Feed data into source inChan in a separate goroutine.
	go func() {
		for _, val := range createIntArr(10) {
			inChan <- val
		}
		close(inChan)
	}()
	// 4) Read results from sink outChan
	go func() {
		for out := range outChan {
			slog.Info("received simple pipeline output", slog.Int("out", out))
			wg.Done()
		}
	}()
	wg.Wait()
	pl.pipeline.Close()
}
// Run creates a background error handler, then calls exampleProcess.
func (pl *PipelineWrapper) Run() {
	defer close(pl.errChan)
	// Handle pipeline errors
	go func(errReceiver <-chan error) {
		for err := range errReceiver {
			if err != nil {
				slog.Error("pipeline error: " + err.Error())
			}
		}
	}(pl.errChan)
	// Run the pipeline flow
	pl.exampleProcess(context.Background())
	// Inspect counters after pipeline completes
	slog.Info("pipeline counters",
		slog.Int("evenCounter", pl.evenCounter),
		slog.Int("oddCounter", pl.oddCounter),
	)
}
// Example_sourceSink constructs and starts the Pipeline
func main() {
	pl := NewPipelineWrapper()
	pl.Run()
}
Example (Transformations) ¶
package main
import (
	"context"
	"fmt"
	"log/slog"
	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)
// squareOdds is a basic function to demonstrate transformations in the pipeline.
// Despite its name, it squares every input value, not only odd ones.
func squareOdds(v int) (int, error) {
	return v * v, nil
}
// mapTransformFunc formats the squared integer as a string.
func mapTransformFunc(p int) (string, error) {
	return fmt.Sprintf("I'm a squared number: %d", p), nil
}
func main() {
	// Setup the error channel; the source below is a slice, so no input channel is needed.
	errChan := make(chan error, 10)
	defer close(errChan)
	// Create a new Pipeline of int->string
	pl := pipelines.New[int, string](
		context.Background(),
		sources.FromArray(createIntArr(10)),
		errChan,
	).Start(func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		// OrDone -> FanOut(2) -> Run (squareOdds) -> Map to string
		return datastreams.Map(
			p.OrDone().FanOut(
				datastreams.Params{Num: 2},
			).Run(
				squareOdds,
			),
			mapTransformFunc,
		)
	})
	// Handle errors
	go func(errReceiver <-chan error) {
		defer pl.Close()
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				// return // if you wanted to close the pipeline during error handling.
			}
		}
	}(pl.Errors())
	// Read pipeline output
	for out := range pl.Out() {
		slog.Info("received simple pipeline output", slog.String("out", out))
	}
	// Output (example):
	// {"out":"I'm a squared number: 0"}
	// {"out":"I'm a squared number: 1"}
	// {"out":"I'm a squared number: 4"}
	// {"out":"I'm a squared number: 9"}
	// ...
	// {"out":"I'm a squared number: 81"}
}
Index ¶
- type Opts
- type Pipeline
- func (p *Pipeline[T, U]) Close()
- func (p *Pipeline[T, U]) Errors() <-chan error
- func (p *Pipeline[T, U]) Expand(expander datastreams.ExpandFunc[T, U], params ...datastreams.Params) datastreams.DataStream[U]
- func (p *Pipeline[T, U]) In() <-chan T
- func (p *Pipeline[T, U]) Map(mapper datastreams.TransformFunc[T, U], params ...datastreams.Params) datastreams.DataStream[U]
- func (p *Pipeline[T, U]) Out() <-chan U
- func (p *Pipeline[T, U]) Process(processor datastreams.ProcessFunc[T], params ...datastreams.Params) *Pipeline[T, U]
- func (p *Pipeline[T, U]) Sink(sinker datastreams.Sinker[U]) error
- func (p *Pipeline[T, U]) Start(streamFunc StreamFunc[T, U]) *Pipeline[T, U]
- func (p *Pipeline[T, U]) Stream(streamFunc StreamFunc[T, U]) datastreams.DataStream[U]
- func (p *Pipeline[T, U]) Tee(params ...datastreams.Params) (datastreams.DataStream[U], datastreams.DataStream[U])
- func (p *Pipeline[T, U]) ToSource() datastreams.Sourcer[U]
- func (p *Pipeline[T, U]) Wait()
- type StreamFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Opts ¶
type Opts struct {
BufferSize int
}
Opts defines optional configuration parameters for certain pipeline operations. It is currently unused, but reserved for future expansion.
type Pipeline ¶
Pipeline is a struct that defines a generic stream process.
Pipeline[T, U] represents a flow of data of type T at the input, ultimately producing data of type U at the output. Under the hood, a Pipeline orchestrates DataStream stages connected by channels.
Usage typically begins by calling New(...) to create a pipeline with a Source, and then applying transformations via Map, Process, or Start/Stream with a StreamFunc. Finally, you can read from the Out() channel, or Sink the output via a Sinker.
func New ¶
func New[T any, U any](
	ctx context.Context,
	sourcer datastreams.Sourcer[T],
	errStream chan error,
) *Pipeline[T, U]
New constructs a new Pipeline of a given type by passing in a datastreams.Sourcer.
- ctx: a context.Context used to cancel or manage the lifetime of the pipeline
- sourcer: a datastreams.Sourcer[T] that provides the initial data stream
- errStream: an error channel to which the pipeline can send errors
Example usage:
p := pipelines.New[int, int](context.Background(), someSource, errChan)
Example ¶
ExampleNew demonstrates a minimal pipeline creation and usage.
errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)
// Create a new Pipeline with int -> int
pl := New[int, int](
	context.Background(),
	sources.FromChannel(inChan),
	errChan,
)
// Provide data on inChan
go func() {
	for i := 1; i <= 5; i++ {
		inChan <- i
	}
	close(inChan)
}()
// Process the data: multiply by 2
pl.Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
	return ds.Run(func(v int) (int, error) {
		return v * 2, nil
	})
})
// Read pipeline output until closed
for val := range pl.Out() {
	fmt.Println("Result:", val)
}
Output:
Result: 2
Result: 4
Result: 6
Result: 8
Result: 10
func (*Pipeline[T, U]) Close ¶
func (p *Pipeline[T, U]) Close()
Close gracefully closes a Pipeline by canceling its internal context. This signals all data streams to terminate reading or writing to channels.
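Cancellation through a context, as Close is described as doing, can be illustrated with a standard-library sketch. This is not the package's code: counter and takeThenCancel are hypothetical names, and the sketch only shows how a stage that selects on ctx.Done() stops and closes its channel once the context is canceled.

```go
package main

import (
	"context"
	"fmt"
)

// counter emits 0, 1, 2, ... until ctx is canceled, then closes its output.
// (Hypothetical stage, standing in for a data stream.)
func counter(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return // cancellation observed: stop writing
			case out <- i:
			}
		}
	}()
	return out
}

// takeThenCancel reads n values, cancels the context, and drains the stream
// until the stage closes it. It returns the first n values.
func takeThenCancel(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stream := counter(ctx)
	var got []int
	for v := range stream {
		got = append(got, v)
		if len(got) == n {
			cancel()
			break
		}
	}
	// The stage exits once it observes cancellation, so this drain terminates.
	for range stream {
	}
	return got
}

func main() {
	fmt.Println(takeThenCancel(3)) // [0 1 2]
}
```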
func (*Pipeline[T, U]) Errors ¶
func (p *Pipeline[T, U]) Errors() <-chan error
Errors returns the error channel of the Pipeline.
This channel receives errors from any stage (e.g. transformations, filters, sinks). The caller should consume from it to handle errors appropriately.
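One way to consume an error channel alongside the output is sketched below with the standard library only. The names process and run are hypothetical, and dropping a failed value instead of forwarding it is an assumption of this sketch, not a statement about how the package behaves.

```go
package main

import (
	"fmt"
	"sync"
)

// process squares odd inputs and reports even inputs on errs.
// Assumption: a failed value is dropped rather than forwarded.
func process(in []int, errs chan<- error) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range in {
			if v%2 == 0 {
				errs <- fmt.Errorf("even number error: %d", v)
				continue
			}
			out <- v * v
		}
	}()
	return out
}

// run drains errors in a background goroutine while reading the output,
// and returns the outputs together with the number of errors seen.
func run(in []int) ([]int, int) {
	errs := make(chan error)
	var wg sync.WaitGroup
	errCount := 0

	wg.Add(1)
	go func() {
		defer wg.Done()
		for range errs {
			errCount++
		}
	}()

	var got []int
	for v := range process(in, errs) {
		got = append(got, v)
	}
	close(errs) // safe here: the only sender has already finished
	wg.Wait()
	return got, errCount
}

func main() {
	got, n := run([]int{1, 2, 3, 4, 5})
	fmt.Println(got, n) // [1 9 25] 2
}
```

Because errs is unbuffered in this sketch, the stage would block on its first error if nothing were reading the channel, which is why the consumer goroutine is started before the output loop.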
func (*Pipeline[T, U]) Expand ¶ added in v0.0.7
func (p *Pipeline[T, U]) Expand(
	expander datastreams.ExpandFunc[T, U],
	params ...datastreams.Params,
) datastreams.DataStream[U]
Expand creates a new DataStream by applying an expander function (ExpandFunc) to each message in the pipeline's source. This is a convenience method that directly calls datastreams.Expand on the pipeline's source.
- expander: an ExpandFunc[T, U] that takes an input T and returns an array of output U
- params: optional datastreams.Params to configure buffer sizes, skipping errors, etc.
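The one-to-many behaviour described for Expand can be pictured as a flat-map over a channel. The sketch below uses only the standard library; fromSlice, expand, and collect are hypothetical helpers and do not reflect the actual ExpandFunc signature, which is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// fromSlice emits every element of vals and then closes the channel.
func fromSlice[T any](vals []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// expand applies fn to each input and emits every element of the result,
// so one input may produce zero, one, or many outputs.
func expand[T, U any](in <-chan T, fn func(T) []U) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			for _, u := range fn(v) {
				out <- u
			}
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect[T any](in <-chan T) []T {
	var got []T
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	words := expand(fromSlice([]string{"a b", "c"}), func(s string) []string {
		return strings.Fields(s)
	})
	fmt.Println(collect(words)) // [a b c]
}
```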
func (*Pipeline[T, U]) In ¶
func (p *Pipeline[T, U]) In() <-chan T
In returns the input channel of the source datastreams.DataStream of a Pipeline.
This channel can be read from externally if needed, though typically one supplies a Sourcer when constructing a Pipeline. Because the returned channel is receive-only (<-chan T), it cannot be used to send data into the pipeline; supply input through the Sourcer instead.
func (*Pipeline[T, U]) Map ¶
func (p *Pipeline[T, U]) Map(
	mapper datastreams.TransformFunc[T, U],
	params ...datastreams.Params,
) datastreams.DataStream[U]
Map creates a new DataStream by applying a mapper function (TransformFunc) to each message in the pipeline's source. This is a convenience method that directly calls datastreams.Map on the pipeline's source.
- mapper: a TransformFunc[T, U] that takes an input T and returns output U
- params: optional datastreams.Params to configure buffer sizes, skipping errors, etc.
Example ¶
ExamplePipeline_Map demonstrates how to create and use a simple Pipeline that maps one type (int) to another (string).
errChan := make(chan error)
defer close(errChan)
pl := New[int, string]( // Create a new Pipeline
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Map(
	func(p int) (string, error) {
		return fmt.Sprintf("Im a string now: %d", p), nil
	},
)
for out := range pl.Out() { // Read Pipeline output
	fmt.Println("out:", out)
}
Output:
out: Im a string now: 1
out: Im a string now: 2
out: Im a string now: 3
out: Im a string now: 4
out: Im a string now: 5
func (*Pipeline[T, U]) Out ¶
func (p *Pipeline[T, U]) Out() <-chan U
Out returns the output channel (sink) of the pipeline.
Reading from this channel lets you consume the final processed data of type U.
func (*Pipeline[T, U]) Process ¶
func (p *Pipeline[T, U]) Process(
	processor datastreams.ProcessFunc[T],
	params ...datastreams.Params,
) *Pipeline[T, U]
Process creates a new pipeline by applying a ProcessFunc to each message in the current pipeline's source. It internally starts a new pipeline with the processed DataStream. This is a convenience for quickly chaining transformations.
- processor: a ProcessFunc[T] that may transform T in place
- params: optional datastreams.Params
func (*Pipeline[T, U]) Sink ¶
func (p *Pipeline[T, U]) Sink(sinker datastreams.Sinker[U]) error
Sink consumes the pipeline's sink DataStream using a specified datastreams.Sinker.
- sinker: a Sinker that will receive data of type U
- returns an error if the sink fails; otherwise nil.
Typically used to push output to a custom location or channel.
Example ¶
ExamplePipeline_Sink demonstrates how to create and use a simple Pipeline that sinks the output to a sinks.ToChannel.
errChan := make(chan error)
outChan := make(chan string)
var wg sync.WaitGroup
wg.Add(5)
go func() {
	for out := range outChan { // Read Pipeline output
		fmt.Println("out:", out)
		wg.Done()
	}
}()
if err := New[int, string]( // Create a new Pipeline
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p,
			func(p int) (string, error) { return fmt.Sprintf("Im a string now: %d", p), nil },
		)
	},
).Sink(
	sinks.ToChannel(outChan),
); err != nil {
	fmt.Println("error sinking:", err)
}
wg.Wait()
Output:
out: Im a string now: 1
out: Im a string now: 2
out: Im a string now: 3
out: Im a string now: 4
out: Im a string now: 5
func (*Pipeline[T, U]) Start ¶
func (p *Pipeline[T, U]) Start(streamFunc StreamFunc[T, U]) *Pipeline[T, U]
Start applies the given StreamFunc to the pipeline's source and returns the Pipeline itself. This is useful for chaining multiple calls on the pipeline while still returning the Pipeline.
Example:
pipeline.Start(func(ds DataStream[int]) DataStream[int] { ... }).Start(...)
Example ¶
ExamplePipeline_Start demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then returns an output Pipeline.
errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)
// Create and open the pipeline
pl := New[int, int](
	context.Background(),
	sources.FromChannel(inChan, sources.Params{BufferSize: 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(
			func(v int) (int, error) {
				return v * 2, nil
			},
			datastreams.Params{BufferSize: 5},
		)
	},
)
// Send values to the pipeline's source channel
go func() {
	for _, val := range []int{1, 2, 3, 4, 5} {
		inChan <- val
	}
	close(inChan)
}()
for out := range pl.Out() {
	fmt.Println("out:", out)
}
Output:
out: 2
out: 4
out: 6
out: 8
out: 10
func (*Pipeline[T, U]) Stream ¶
func (p *Pipeline[T, U]) Stream(streamFunc StreamFunc[T, U]) datastreams.DataStream[U]
Stream applies a StreamFunc to the pipeline's source, storing the resulting DataStream as the pipeline's sink (final stage). It then returns that sink DataStream for further chaining.
Typically used for quickly connecting a pipeline to a processing function.
Example:
pipeline.Stream(func(ds DataStream[int]) DataStream[int] { ... })
Example ¶
ExamplePipeline_Stream demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then returns an output DataStream.
errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)
// Create and open the pipeline
ds := New[int, int](
	context.Background(),
	sources.FromChannel(inChan, sources.Params{BufferSize: 5}),
	errChan,
).Stream(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(
			func(v int) (int, error) {
				return v * 2, nil
			},
			datastreams.Params{BufferSize: 5},
		)
	},
)
// Send values to the pipeline's source channel
go func() {
	for _, val := range []int{1, 2, 3, 4, 5} {
		inChan <- val
	}
	close(inChan)
}()
for out := range ds.Out() {
	fmt.Println("out:", out)
}
Output:
out: 2
out: 4
out: 6
out: 8
out: 10
func (*Pipeline[T, U]) Tee ¶
func (p *Pipeline[T, U]) Tee(params ...datastreams.Params) (datastreams.DataStream[U], datastreams.DataStream[U])
Tee creates a fork in the pipeline's sink DataStream, returning two DataStreams that each receive the same data from the sink.
This is useful for sending the same processed output to multiple consumers.
- params: optional datastreams.Params for buffer sizing, etc.
- returns two DataStreams of type U, each receiving the same data from the pipeline sink.
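The forking behaviour described for Tee can be sketched over plain channels. The code below is a standard-library illustration under stated assumptions, not the package's implementation: fromSlice, tee, and collect are hypothetical helpers, and buf plays the role of a buffer-size parameter.

```go
package main

import "fmt"

// fromSlice emits every element of vals and then closes the channel.
func fromSlice[T any](vals []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// tee copies every value from in to two buffered outputs and closes both
// once in is exhausted.
func tee[T any](in <-chan T, buf int) (<-chan T, <-chan T) {
	a := make(chan T, buf)
	b := make(chan T, buf)
	go func() {
		defer close(a)
		defer close(b)
		for v := range in {
			a <- v
			b <- v
		}
	}()
	return a, b
}

// collect drains a channel into a slice.
func collect[T any](in <-chan T) []T {
	var got []T
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	a, b := tee(fromSlice([]int{2, 4, 6}), 3)
	// Reading a to completion before b works only because buf covers the
	// whole stream; otherwise the two outputs must be read concurrently.
	fmt.Println(collect(a), collect(b)) // [2 4 6] [2 4 6]
}
```

In this sketch a slow or absent reader on one branch eventually blocks the other, so the buffer size bounds how far the two consumers may drift apart.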
Example ¶
ExamplePipeline_Tee demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then tees the output into two distinct streams.
errChan := make(chan error)
defer close(errChan)
// Create and open the pipeline
out1, out2 := New[int, int](
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(func(v int) (int, error) {
			return v * 2, nil
		})
	},
).Tee(datastreams.Params{BufferSize: 5})
// Collect and print the results from both outputs
for out := range out1.Out() {
	fmt.Println("out1:", out)
}
for out := range out2.Out() {
	fmt.Println("out2:", out)
}
Output:
out1: 2
out1: 4
out1: 6
out1: 8
out1: 10
out2: 2
out2: 4
out2: 6
out2: 8
out2: 10
func (*Pipeline[T, U]) ToSource ¶
func (p *Pipeline[T, U]) ToSource() datastreams.Sourcer[U]
ToSource converts the pipeline's sink into a datastreams.Sourcer[U], allowing it to be used as a source in another pipeline.
Example ¶
ExamplePipeline_ToSource demonstrates how to turn a Pipeline into a source to be used in another Pipeline.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
errChan := make(chan error, 1)
defer close(errChan)
source := New[int, string](
	ctx,
	sources.FromArray([]int{1, 2, 3}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p,
			func(i int) (string, error) { return string(rune('A' + i)), nil },
		).Run(
			func(s string) (string, error) {
				return "processed " + s, nil
			},
		)
	},
).ToSource()
for v := range source.Source(ctx, errChan).Out() {
	fmt.Println(v)
}
Output:
processed B
processed C
processed D
type StreamFunc ¶
type StreamFunc[T any, U any] func(stream datastreams.DataStream[T]) datastreams.DataStream[U]
StreamFunc is a function that takes a datastreams.DataStream and returns a datastreams.DataStream.
A StreamFunc represents a logical segment of a pipeline, where data can be consumed, transformed, filtered, or otherwise processed. The output of a StreamFunc is another DataStream that can be further chained.
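The idea of a chainable pipeline segment can be sketched with a generic function type over channels. This is an illustration using only the standard library; Stage, mapStage, compose, and apply are hypothetical names and are not part of the pipelines or datastreams API.

```go
package main

import "fmt"

// Stage is a hypothetical stand-in for a stream function: it consumes one
// channel and returns another.
type Stage[T, U any] func(<-chan T) <-chan U

// mapStage builds a Stage that applies fn to every value in its own goroutine.
func mapStage[T, U any](fn func(T) U) Stage[T, U] {
	return func(in <-chan T) <-chan U {
		out := make(chan U)
		go func() {
			defer close(out)
			for v := range in {
				out <- fn(v)
			}
		}()
		return out
	}
}

// compose chains two stages so that the output of f feeds g.
func compose[T, U, V any](f Stage[T, U], g Stage[U, V]) Stage[T, V] {
	return func(in <-chan T) <-chan V {
		return g(f(in))
	}
}

// apply feeds vals through stage and collects the results in order.
func apply[T, U any](stage Stage[T, U], vals []T) []U {
	in := make(chan T, len(vals))
	for _, v := range vals {
		in <- v
	}
	close(in)
	var got []U
	for v := range stage(in) {
		got = append(got, v)
	}
	return got
}

func main() {
	double := mapStage(func(v int) int { return v * 2 })
	label := mapStage(func(v int) string { return fmt.Sprintf("value=%d", v) })
	fmt.Println(apply(compose(double, label), []int{1, 2, 3})) // [value=2 value=4 value=6]
}
```

Because each segment has the same shape (stream in, stream out), segments can be written and tested separately and then joined, which is the property the StreamFunc description relies on.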