Documentation ¶
Overview ¶
Package pipelines provides a set of utilities for creating and managing concurrent data processing pipelines in Go.
The library uses channels under the hood to pass data between pipeline stages. Each stage runs in its own goroutine, ensuring concurrency and separation of concerns.
Below is an example of an application that uses pipelines to square odd integers while managing shared state counters:
package yourpipeline

import (
	"context"
	"fmt"
	"log/slog"
	"sync"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// PipelineWrapper is an example of a pipelines.Pipeline wrapper implementation.
// It includes shared state via counters.
type PipelineWrapper struct {
	mu          sync.Mutex
	errChan     chan error
	evenCounter int
	oddCounter  int
}

// NewPipelineWrapper creates a new PipelineWrapper with counters set to 0.
func NewPipelineWrapper() *PipelineWrapper {
	// Setup channels and return PipelineWrapper
	errChan := make(chan error, 10)
	return &PipelineWrapper{
		errChan:     errChan,
		evenCounter: 0,
		oddCounter:  0,
	}
}

// Run the PipelineWrapper.
func (pl *PipelineWrapper) Run() {
	defer close(pl.errChan)
	pipeline := pipelines.New[int, int]( // Create a new Pipeline
		context.Background(),
		sources.FromArray(createIntArr(10)), // Create a source to start the pipeline
		pl.errChan,
	).Start(pl.exampleProcess)

	go func(errReceiver <-chan error) { // Handle Pipeline errors
		defer pipeline.Close()
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				// return // if you wanted to close the pipeline during error handling.
			}
		}
	}(pl.errChan)

	for out := range pipeline.Out() { // Read Pipeline output
		slog.Info("received simple pipeline output", slog.Int("out", out))
	}
}

func (pl *PipelineWrapper) squareOdds(v int) (int, error) {
	if v%2 == 0 {
		pl.mu.Lock()
		pl.evenCounter++
		pl.mu.Unlock()
		return v, fmt.Errorf("even number error: %v", v)
	}
	pl.mu.Lock()
	pl.oddCounter++
	pl.mu.Unlock()
	return v * v, nil
}

func (pl *PipelineWrapper) exampleProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	// datastreams.DataStream.OrDone will stop the pipeline if the input channel is closed
	return p.OrDone().FanOut(
		datastreams.Params{Num: 2}, // datastreams.DataStream.FanOut will run subsequent ds.Pipe stages in parallel
	).Run(
		pl.squareOdds, // datastreams.DataStream.Run will execute the ds.Pipe process: "squareOdds"
	) // datastreams.DataStream.Out automatically FanIns to a single output channel if needed
}

func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}
The package includes various functions to create, manipulate, and control the flow of data through channels, allowing for flexible and efficient data processing.
The main components of this package are:

- Pipeline: a struct that defines a generic connection of data streams.
- DataStream (in subpackage "datastreams"): provides the methods to build concurrency stages.
Pipelines work by connecting a "Source" (an upstream data producer) with an optional chain of transformations or filters before optionally "Sinking" (sending the output to a consumer). Under the hood, all data flows through Go channels with concurrency managed by goroutines. Each transformation or filter is effectively run in parallel, communicating via channels.
For more in-depth usage, see the examples below and the doc.go file.
Example (Duplicate) ¶
package main

import (
	"context"
	"log/slog"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// createIntArr creates a simple slice of int values from 0..num-1.
func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}

// duplicateProcess demonstrates a pipeline stage that broadcasts each value to two streams
// and then fans them back in (duplicates).
func duplicateProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	// Broadcasting by 2 then fanning in merges them back, effectively duplicating each item.
	return p.Broadcast(
		datastreams.Params{Num: 2},
	).FanIn()
}

func main() {
	errChan := make(chan error)
	pl := pipelines.New[int, int]( // Create a new Pipeline
		context.Background(),
		sources.FromArray(createIntArr(10)), // Create a new Source from slice
		errChan,
	).Start(duplicateProcess)
	defer func() {
		close(errChan)
		pl.Close()
	}()

	// Handle pipeline errors in a separate goroutine
	go func(errReceiver <-chan error) {
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				return
			}
		}
	}(errChan)

	// Read pipeline output
	for out := range pl.Out() {
		slog.Info("received simple pipeline output", slog.Int("out", out))
	}

	// Output (example):
	// received simple pipeline output out=0
	// received simple pipeline output out=0
	// received simple pipeline output out=1
	// received simple pipeline output out=1
	// ...
	// received simple pipeline output out=9
	// received simple pipeline output out=9
}
Example (Filter) ¶
package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// filter returns true if the input int is even, false otherwise.
func filter(p int) (bool, error) {
	return p%2 == 0, nil
}

// mapFunc transforms an even integer into a descriptive string.
func mapFunc(p int) (string, error) {
	return fmt.Sprintf("I'm an even number: %d", p), nil
}

func main() {
	errChan := make(chan error, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer func() {
		close(errChan)
		cancel()
	}()

	// Build a pipeline that fans out, then filters even numbers, then maps to string
	pl := pipelines.New[int, string](
		ctx,
		sources.FromArray(createIntArr(10)),
		errChan,
	).Start(func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p.FanOut(
				datastreams.Params{Num: 3},
			).Filter(
				filter,
			),
			mapFunc,
		)
	})

	for val := range pl.Out() {
		slog.Info("received simple pipeline output", slog.String("out", val))
	}

	// Output (example):
	// {"out":"I'm an even number: 0"}
	// {"out":"I'm an even number: 2"}
	// {"out":"I'm an even number: 4"}
	// {"out":"I'm an even number: 6"}
	// {"out":"I'm an even number: 8"}
}
Example (Sliding) ¶
package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
	"github.com/elastiflow/pipelines/datastreams/windower"
)

func SlidingWindowFunc(readings []*SensorReading) (*SensorInference, error) {
	if len(readings) == 0 {
		return nil, nil
	}
	var totalTemp, totalHumidity float64
	for _, reading := range readings {
		totalTemp += reading.Temp
		totalHumidity += reading.Humidity
	}
	avgTemp := totalTemp / float64(len(readings))
	avgHumidity := totalHumidity / float64(len(readings))
	return &SensorInference{
		DeviceID:    readings[0].DeviceID,
		AvgTemp:     avgTemp,
		AvgHumidity: avgHumidity,
	}, nil
}

func main() {
	errChan := make(chan error, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	partitionFactory := windower.NewSliding[*SensorReading, string](150*time.Millisecond, 50*time.Millisecond)

	var sensorReadings = []*SensorReading{
		{DeviceID: "device-1", Temp: 22.5, Humidity: 45.0, Timestamp: time.Now().Add(-6 * time.Second)},
		{DeviceID: "device-1", Temp: 22.7, Humidity: 46.0, Timestamp: time.Now().Add(-5 * time.Second)},
		{DeviceID: "device-1", Temp: 22.6, Humidity: 45.5, Timestamp: time.Now().Add(-4 * time.Second)},
		{DeviceID: "device-2", Temp: 19.2, Humidity: 55.0, Timestamp: time.Now().Add(-3 * time.Second)},
		{DeviceID: "device-2", Temp: 19.5, Humidity: 54.8, Timestamp: time.Now().Add(-2 * time.Second)},
		{DeviceID: "device-1", Temp: 22.9, Humidity: 44.9, Timestamp: time.Now().Add(-1 * time.Second)},
		{DeviceID: "device-2", Temp: 19.7, Humidity: 54.5, Timestamp: time.Now()},
	}

	// Create a source from the sensor readings, throttled to one reading per 50ms
	pl := pipelines.New[*SensorReading, *SensorInference](
		ctx,
		sources.FromArray[*SensorReading](sensorReadings, sources.Params{Throttle: 50 * time.Millisecond}),
		errChan,
	).Start(func(p datastreams.DataStream[*SensorReading]) datastreams.DataStream[*SensorInference] {
		keyFunc := func(i *SensorReading) string {
			return i.DeviceID // Key by device ID
		}
		return datastreams.Window[*SensorReading, string, *SensorInference](
			datastreams.KeyBy[*SensorReading, string](p, keyFunc),
			SlidingWindowFunc,
			partitionFactory,
			datastreams.Params{
				BufferSize: 50,
			},
		).OrDone()
	})

	// Handle errors
	go func() {
		defer pl.Close()
		for err := range pl.Errors() {
			select {
			case <-ctx.Done():
				return
			default:
				if err == nil {
					continue
				}
				slog.Error("pipeline error: " + err.Error())
			}
		}
	}()

	// Read from pipeline output
	for v := range pl.Out() {
		select {
		case <-ctx.Done():
			return
		default:
			slog.Info("sliding window output",
				slog.String("device", v.DeviceID),
				slog.Float64("avg_temp", v.AvgTemp),
				slog.Float64("avg_humidity", v.AvgHumidity))
		}
	}

	// Output (example):
	// sliding window output device=device-1 avg_temp=22.5 avg_humidity=45
	// sliding window output device=device-1 avg_temp=22.6 avg_humidity=45.5
	// sliding window output device=device-1 avg_temp=22.65 avg_humidity=45.75
	// sliding window output device=device-1 avg_temp=22.65 avg_humidity=45.75
	// sliding window output device=device-2 avg_temp=19.2 avg_humidity=55
	// sliding window output device=device-1 avg_temp=22.6 avg_humidity=45.5
	// sliding window output device=device-2 avg_temp=19.35 avg_humidity=54.9
	// sliding window output device=device-1 avg_temp=22.9 avg_humidity=44.9
	// sliding window output device=device-2 avg_temp=19.5 avg_humidity=54.8
	// sliding window output device=device-1 avg_temp=22.9 avg_humidity=44.9
	// sliding window output device=device-2 avg_temp=19.6 avg_humidity=54.65
	// sliding window output device=device-1 avg_temp=22.9 avg_humidity=44.9
	// sliding window output device=device-2 avg_temp=19.7 avg_humidity=54.5
	// sliding window output device=device-2 avg_temp=19.7 avg_humidity=54.5
}
Example (SourceSink) ¶
Example_sourceSink constructs and starts the Pipeline.
package main

import (
	"context"
	"fmt"
	"log/slog"
	"sync"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sinks"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// PipelineWrapper is an example struct embedding a pipeline with shared counters.
//
// The pipeline will read from a channel source, process data, and sink into
// another channel. The "evenCounter" and "oddCounter" track how many evens
// or odds we've encountered.
type PipelineWrapper struct {
	mu          sync.Mutex
	errChan     chan error
	evenCounter int
	oddCounter  int
	pipeline    *pipelines.Pipeline[int, int]
}

// NewPipelineWrapper initializes a PipelineWrapper.
func NewPipelineWrapper() *PipelineWrapper {
	errChan := make(chan error, 10)
	return &PipelineWrapper{
		errChan:     errChan,
		evenCounter: 0,
		oddCounter:  0,
	}
}

// squareOdds increments counters, squares odd numbers, and returns an error on even numbers.
func (pl *PipelineWrapper) squareOdds(v int) (int, error) {
	if v%2 == 0 {
		pl.mu.Lock()
		pl.evenCounter++
		pl.mu.Unlock()
		return v, fmt.Errorf("even number error: %v", v)
	}
	pl.mu.Lock()
	pl.oddCounter++
	pl.mu.Unlock()
	return v * v, nil
}

// exampleProcess shows the entire pipeline flow within a single method,
// including how to produce data (source) and consume it (sink).
func (pl *PipelineWrapper) exampleProcess(ctx context.Context) {
	// 1) Create channels for input and output.
	inChan := make(chan int, 10)
	outChan := make(chan int, 10)

	// 2) Build the pipeline: inChan -> (FanOut + squareOdds) -> sink -> outChan
	pl.pipeline = pipelines.New[int, int](
		ctx,
		sources.FromChannel(inChan), // source
		pl.errChan,
	).Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
		return ds.OrDone().FanOut(
			datastreams.Params{Num: 2},
		).Run(
			pl.squareOdds,
		).Sink( // Sink
			sinks.ToChannel(outChan),
		)
	})

	var wg sync.WaitGroup
	wg.Add(10)

	// 3) Feed data into source inChan in a separate goroutine.
	go func() {
		for _, val := range createIntArr(10) {
			inChan <- val
		}
		close(inChan)
	}()

	// 4) Read results from sink outChan
	go func() {
		for out := range outChan {
			slog.Info("received simple pipeline output", slog.Int("out", out))
			wg.Done()
		}
	}()

	wg.Wait()
	pl.pipeline.Close()
}

// Run creates a background error handler, then calls exampleProcess.
func (pl *PipelineWrapper) Run() {
	defer close(pl.errChan)

	// Handle pipeline errors
	go func(errReceiver <-chan error) {
		for err := range errReceiver {
			if err != nil {
				slog.Error("pipeline error: " + err.Error())
			}
		}
	}(pl.errChan)

	// Run the pipeline flow
	pl.exampleProcess(context.Background())

	// Inspect counters after pipeline completes
	slog.Info("pipeline counters",
		slog.Int("evenCounter", pl.evenCounter),
		slog.Int("oddCounter", pl.oddCounter),
	)
}

// Example_sourceSink constructs and starts the Pipeline
func main() {
	pl := NewPipelineWrapper()
	pl.Run()
}
Example (Transformations) ¶
package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// squareOdds is a basic function to demonstrate transformations in the pipeline.
func squareOdds(v int) (int, error) {
	return v * v, nil
}

// mapTransformFunc formats the squared integer as a string.
func mapTransformFunc(p int) (string, error) {
	return fmt.Sprintf("I'm a squared number: %d", p), nil
}

func main() {
	inChan := make(chan int) // Setup channels
	errChan := make(chan error, 10)
	defer func() {
		close(inChan)
		close(errChan)
	}()

	// Create a new Pipeline of int->string
	pl := pipelines.New[int, string](
		context.Background(),
		sources.FromArray(createIntArr(10)),
		errChan,
	).Start(func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		// OrDone -> FanOut(2) -> Run (squareOdds) -> Map to string
		return datastreams.Map(
			p.OrDone().FanOut(
				datastreams.Params{Num: 2},
			).Run(
				squareOdds,
			),
			mapTransformFunc,
		)
	})

	// Handle errors
	go func(errReceiver <-chan error) {
		defer pl.Close()
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				// return // if you wanted to close the pipeline during error handling.
			}
		}
	}(pl.Errors())

	// Read pipeline output
	for out := range pl.Out() {
		slog.Info("received simple pipeline output", slog.String("out", out))
	}

	// Output (example):
	// {"out":"I'm a squared number: 0"}
	// {"out":"I'm a squared number: 1"}
	// {"out":"I'm a squared number: 4"}
	// {"out":"I'm a squared number: 9"}
	// ...
	// {"out":"I'm a squared number: 81"}
}
Example (Window) ¶
package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
	"github.com/elastiflow/pipelines/datastreams/windower"
)

type SensorReading struct {
	DeviceID  string
	Temp      float64
	Humidity  float64
	Timestamp time.Time
}

type SensorInference struct {
	DeviceID    string
	AvgTemp     float64
	AvgHumidity float64
}

func TumblingWindowFunc(readings []*SensorReading) (*SensorInference, error) {
	if len(readings) == 0 {
		return nil, nil
	}
	var totalTemp, totalHumidity float64
	for _, reading := range readings {
		totalTemp += reading.Temp
		totalHumidity += reading.Humidity
	}
	avgTemp := totalTemp / float64(len(readings))
	avgHumidity := totalHumidity / float64(len(readings))
	return &SensorInference{
		DeviceID:    readings[0].DeviceID,
		AvgTemp:     avgTemp,
		AvgHumidity: avgHumidity,
	}, nil
}

func main() {
	errChan := make(chan error, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var sensorReadings = []*SensorReading{
		{DeviceID: "device-1", Temp: 22.5, Humidity: 45.0, Timestamp: time.Now().Add(-6 * time.Second)},
		{DeviceID: "device-1", Temp: 22.7, Humidity: 46.0, Timestamp: time.Now().Add(-5 * time.Second)},
		{DeviceID: "device-1", Temp: 22.6, Humidity: 45.5, Timestamp: time.Now().Add(-4 * time.Second)},
		{DeviceID: "device-2", Temp: 19.2, Humidity: 55.0, Timestamp: time.Now().Add(-3 * time.Second)},
		{DeviceID: "device-2", Temp: 19.5, Humidity: 54.8, Timestamp: time.Now().Add(-2 * time.Second)},
		{DeviceID: "device-1", Temp: 22.9, Humidity: 44.9, Timestamp: time.Now().Add(-1 * time.Second)},
		{DeviceID: "device-2", Temp: 19.7, Humidity: 54.5, Timestamp: time.Now()},
	}

	partitionFactory := windower.NewTumbling[*SensorReading, string](100 * time.Millisecond)

	// Create a source from the sensor readings, throttled to one reading per 50ms
	pl := pipelines.New[*SensorReading, *SensorInference](
		ctx,
		sources.FromArray[*SensorReading](sensorReadings, sources.Params{Throttle: 50 * time.Millisecond}),
		errChan,
	).Start(func(p datastreams.DataStream[*SensorReading]) datastreams.DataStream[*SensorInference] {
		keyFunc := func(i *SensorReading) string {
			return i.DeviceID // Key by device ID
		}
		return datastreams.Window[*SensorReading, string, *SensorInference](
			datastreams.KeyBy[*SensorReading, string](p, keyFunc),
			TumblingWindowFunc,
			partitionFactory,
			datastreams.Params{
				BufferSize: 50,
			},
		).OrDone()
	})

	// Handle errors
	go func() {
		defer pl.Close()
		for err := range pl.Errors() {
			select {
			case <-ctx.Done():
				return
			default:
				if err == nil {
					continue
				}
				slog.Error("pipeline error: " + err.Error())
			}
		}
	}()

	// Read from pipeline output
	for v := range pl.Out() {
		select {
		case <-ctx.Done():
			return
		default:
			slog.Info("tumbling window output",
				slog.String("device", v.DeviceID),
				slog.Float64("avg_temp", v.AvgTemp),
				slog.Float64("avg_humidity", v.AvgHumidity))
		}
	}

	// Output (example):
	// tumbling window output device=device-1 avg_temp=22.600000000000005 avg_humidity=45.5
	// tumbling window output device=device-2 avg_temp=19.35 avg_humidity=54.9
	// tumbling window output device=device-1 avg_temp=22.9 avg_humidity=44.9
	// tumbling window output device=device-2 avg_temp=19.7 avg_humidity=54.5
}
Index ¶
- type Opts
- type Pipeline
- func (p *Pipeline[T, U]) Broadcast(num int, streamFunc StreamFunc[T, U]) Pipelines[T, U]
- func (p *Pipeline[T, U]) Close()
- func (p *Pipeline[T, U]) Copy(num int) Pipelines[T, U]
- func (p *Pipeline[T, U]) Errors() <-chan error
- func (p *Pipeline[T, U]) Expand(expander datastreams.ExpandFunc[T, U], params ...datastreams.Params) datastreams.DataStream[U]
- func (p *Pipeline[T, U]) In() <-chan T
- func (p *Pipeline[T, U]) Map(mapper datastreams.TransformFunc[T, U], params ...datastreams.Params) datastreams.DataStream[U]
- func (p *Pipeline[T, U]) Out() <-chan U
- func (p *Pipeline[T, U]) Process(processor datastreams.ProcessFunc[T], params ...datastreams.Params) *Pipeline[T, U]
- func (p *Pipeline[T, U]) Sink(sinker datastreams.Sinker[U]) error
- func (p *Pipeline[T, U]) Start(streamFunc StreamFunc[T, U]) *Pipeline[T, U]
- func (p *Pipeline[T, U]) Stream(streamFunc StreamFunc[T, U]) datastreams.DataStream[U]
- func (p *Pipeline[T, U]) Tee(params ...datastreams.Params) (datastreams.DataStream[U], datastreams.DataStream[U])
- func (p *Pipeline[T, U]) ToSource() datastreams.Sourcer[U]
- func (p *Pipeline[T, U]) Wait()
- type Pipelines
- func (p Pipelines[T, U]) Close()
- func (p Pipelines[T, U]) Count() int
- func (p Pipelines[T, U]) Get(index int) (*Pipeline[T, U], error)
- func (p Pipelines[T, U]) Process(processor datastreams.ProcessFunc[T], params ...datastreams.Params) Pipelines[T, U]
- func (p Pipelines[T, U]) Start(streamFunc StreamFunc[T, U]) Pipelines[T, U]
- func (p Pipelines[T, U]) Wait()
- type StreamFunc
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Opts ¶
type Opts struct {
BufferSize int
}
Opts defines optional configuration parameters for certain pipeline operations. It is currently unused and reserved for future expansion.
type Pipeline ¶
Pipeline is a struct that defines a generic stream process.
Pipeline[T, U] represents a flow of data type T at the input, ultimately producing data of type U at the output. Under the hood, a Pipeline orchestrates DataStream stages connected by channels.
Usage typically begins by calling New(...) to create a pipeline with a Source, and then applying transformations via Map, Process, or Start/Stream with a StreamFunc. Finally, you can read from the Out() channel, or Sink the output via a Sinker.
func New ¶
func New[T any, U any](
	ctx context.Context,
	sourcer datastreams.Sourcer[T],
	errStream chan error,
) *Pipeline[T, U]
New constructs a new Pipeline of a given type by passing in a datastreams.Sourcer.
- ctx: a context.Context used to cancel or manage the lifetime of the pipeline
- sourcer: a datastreams.Sourcer[T] that provides the initial data stream
- errStream: an error channel to which the pipeline can send errors
Example usage:
p := pipelines.New[int, int](context.Background(), someSource, errChan)
Example ¶
ExampleNew demonstrates a minimal pipeline creation and usage.
errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create a new Pipeline with int -> int
pl := New[int, int](
	context.Background(),
	sources.FromChannel(inChan),
	errChan,
)

// Provide data on inChan
go func() {
	for i := 1; i <= 5; i++ {
		inChan <- i
	}
	close(inChan)
}()

// Process the data: multiply by 2
pl.Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
	return ds.Run(func(v int) (int, error) {
		return v * 2, nil
	})
})

// Read pipeline output until closed
for val := range pl.Out() {
	fmt.Println("Result:", val)
}
Output:

Result: 2
Result: 4
Result: 6
Result: 8
Result: 10
func (*Pipeline[T, U]) Broadcast ¶ added in v0.0.9
func (p *Pipeline[T, U]) Broadcast(num int, streamFunc StreamFunc[T, U]) Pipelines[T, U]
Broadcast creates multiple copies of the current pipeline's source DataStream, allowing the same data to be processed in parallel across multiple pipelines and allowing different sinks or processing logic to be applied to each copy.
Parameters:
- num: the number of copies to create.
- streamFunc: a StreamFunc[T, U] that will be applied to each copy of the source DataStream.
Example ¶
package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sinks"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

type listenerOutput struct {
	Index   int
	Message string
}

func main() {
	log.Println("Starting Listen example...")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stdoutChan := make(chan listenerOutput, 10)
	wg := &sync.WaitGroup{}
	wg.Add(3)
	errs := make(chan error, 10)

	go func() {
		defer wg.Done()
		for msg := range stdoutChan {
			log.Printf("[Listener %d] Received: %s", msg.Index, msg.Message)
		}
	}()

	// 2. Create a simple source from a slice.
	sourcer := sources.FromArray([]int{100, 200, 300})

	// 3. Create the initial DataStream and attach the WaitGroup.
	// The WaitGroup will be passed to all subsequent stages.
	pls := pipelines.New[int, listenerOutput](
		ctx,
		sourcer,
		make(chan error, 1),
	).Copy(3)

	// 4. Start the pipeline with a listener that processes each stream.
	go func() {
		err := pls[0].Start(func(p datastreams.DataStream[int]) datastreams.DataStream[listenerOutput] {
			return datastreams.Map[int, listenerOutput](
				p.OrDone().FanOut(
					datastreams.Params{Num: 2},
				).Run(
					func(i int) (int, error) {
						return i * i, nil // Square the input
					},
				),
				func(i int) (listenerOutput, error) {
					return listenerOutput{
						Index:   0, // This is the index of the listener
						Message: fmt.Sprintf("Processed value: %d", i),
					}, nil
				},
			)
		}).Sink(sinks.ToChannel[listenerOutput](stdoutChan))
		if err != nil {
			errs <- err
		}
	}()

	go func() {
		err := pls[1].Start(func(p datastreams.DataStream[int]) datastreams.DataStream[listenerOutput] {
			return datastreams.Map[int, listenerOutput](
				p.OrDone().FanOut(
					datastreams.Params{Num: 2},
				).Run(
					func(i int) (int, error) {
						return i + 10, nil // Add 10 to the input
					},
				),
				func(i int) (listenerOutput, error) {
					return listenerOutput{
						Index:   1, // This is the index of the listener
						Message: fmt.Sprintf("Processed value: %d", i),
					}, nil
				},
			)
		}).Sink(sinks.ToChannel[listenerOutput](stdoutChan))
		if err != nil {
			errs <- err
		}
	}()

	go func() {
		err := pls[2].Start(func(p datastreams.DataStream[int]) datastreams.DataStream[listenerOutput] {
			return datastreams.Map[int, listenerOutput](
				p.OrDone().FanOut(
					datastreams.Params{Num: 2},
				).Run(
					func(i int) (int, error) {
						return i - 5, nil // Subtract 5 from the input
					},
				),
				func(i int) (listenerOutput, error) {
					return listenerOutput{
						Index:   2, // This is the index of the listener
						Message: fmt.Sprintf("Processed value: %d", i),
					}, nil
				},
			)
		}).Sink(sinks.ToChannel[listenerOutput](stdoutChan))
		if err != nil {
			errs <- err
		}
	}()

	go func() {
		for e := range errs {
			log.Println(fmt.Errorf("error in pipeline: %w", e))
		}
	}()

	wg.Wait()
	log.Println("✅ Listen example finished successfully.")
}
func (*Pipeline[T, U]) Close ¶
func (p *Pipeline[T, U]) Close()
Close gracefully closes a Pipeline by canceling its internal context. This signals all data streams to terminate reading or writing to channels.
func (*Pipeline[T, U]) Copy ¶ added in v0.0.9
Copy creates multiple copies of the current pipeline's source DataStream, allowing the same data to be processed in parallel across multiple pipelines.
Parameters:
- num: the number of copies to create.
func (*Pipeline[T, U]) Errors ¶
Errors returns the error channel of the Pipeline.
This channel receives errors from any stage (e.g. transformations, filters, sinks). The caller should consume from it to handle errors appropriately.
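For instance, a minimal drain loop in a background goroutine (the same pattern used in the examples above); "pl" is assumed to be an already-started Pipeline:

go func() {
	for err := range pl.Errors() {
		if err != nil {
			slog.Error("pipeline error: " + err.Error())
		}
	}
}()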
func (*Pipeline[T, U]) Expand ¶ added in v0.0.7
func (p *Pipeline[T, U]) Expand(
	expander datastreams.ExpandFunc[T, U],
	params ...datastreams.Params,
) datastreams.DataStream[U]
Expand creates a new DataStream by applying an expander function (ExpandFunc) to each message in the pipeline's source. This is a convenience method that directly calls datastreams.Expand on the pipeline's source.
- expander: an ExpandFunc[T, U] that takes an input T and returns a slice of outputs of type U
- params: optional datastreams.Params to configure buffer sizes, skipping errors, etc.
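A minimal sketch of Expand, assuming ExpandFunc[T, U] has the signature func(T) ([]U, error) (consistent with the description above); the sentence data and splitting logic are purely illustrative:

errChan := make(chan error, 1)
defer close(errChan)

// Expand each sentence into its individual words, so one input can
// produce multiple outputs downstream.
words := pipelines.New[string, string](
	context.Background(),
	sources.FromArray([]string{"hello world", "go pipelines"}),
	errChan,
).Expand(
	func(s string) ([]string, error) {
		return strings.Split(s, " "), nil
	},
)

for w := range words.Out() {
	fmt.Println("word:", w)
}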
func (*Pipeline[T, U]) In ¶
func (p *Pipeline[T, U]) In() <-chan T
In returns the input channel of the source datastreams.DataStream of a Pipeline.
This channel can be read from externally if needed, though typically one supplies a Sourcer when constructing a Pipeline. Use with care if manually sending data into the pipeline.
func (*Pipeline[T, U]) Map ¶
func (p *Pipeline[T, U]) Map(
	mapper datastreams.TransformFunc[T, U],
	params ...datastreams.Params,
) datastreams.DataStream[U]
Map creates a new DataStream by applying a mapper function (TransformFunc) to each message in the pipeline's source. This is a convenience method that directly calls datastreams.Map on the pipeline's source.
- mapper: a TransformFunc[T, U] that takes an input T and returns output U
- params: optional datastreams.Params to configure buffer sizes, skipping errors, etc.
Example ¶
ExamplePipeline_Map demonstrates how to create and use a simple Pipeline that maps one type (int) to another (string).
errChan := make(chan error)
defer close(errChan)

pl := New[int, string]( // Create a new Pipeline
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Map(
	func(p int) (string, error) {
		return fmt.Sprintf("Im a string now: %d", p), nil
	},
)

for out := range pl.Out() { // Read Pipeline output
	fmt.Println("out:", out)
}
Output:

out: Im a string now: 1
out: Im a string now: 2
out: Im a string now: 3
out: Im a string now: 4
out: Im a string now: 5
func (*Pipeline[T, U]) Out ¶
func (p *Pipeline[T, U]) Out() <-chan U
Out returns the output channel (sink) of the pipeline.
Reading from this channel lets you consume the final processed data of type U.
func (*Pipeline[T, U]) Process ¶
func (p *Pipeline[T, U]) Process(
	processor datastreams.ProcessFunc[T],
	params ...datastreams.Params,
) *Pipeline[T, U]
Process adds a new processing stage to the pipeline, applying a function to each item. The processing function must take and return an item of the same type, making this method ideal for in-place transformations like data enrichment or filtering.
This method returns a new Pipeline instance that incorporates the new processing stage, allowing multiple Process calls to be fluently chained together. The final output type of the pipeline (U) remains unchanged.
The provided 'processor' function is of type datastreams.ProcessFunc[T], with the signature 'func(T) (T, error)'. If the function returns a non-nil error, processing for that item halts, and the error is sent to the pipeline's error channel.
Example:
// Assume a pipeline processes User objects.
type User struct {
	ID        int
	Name      string
	IsValid   bool
	IsAudited bool
}

// p is an existing pipeline, e.g., p := pipelines.New[User, User](...)
// We can chain multiple Process calls to create a multi-stage workflow.
finalPipeline := p.Process(func(u User) (User, error) {
	// Stage 1: Validate the user.
	if u.Name != "" {
		u.IsValid = true
	}
	return u, nil
}).Process(func(u User) (User, error) {
	// Stage 2: Audit the user.
	if u.IsValid {
		u.IsAudited = true
		fmt.Printf("Audited user %d\n", u.ID)
	}
	return u, nil
})

// The finalPipeline now contains both processing stages.
// You can then consume the results from finalPipeline.Out().
Parameters:
- processor: a datastreams.ProcessFunc[T] to apply to each item.
- params: optional datastreams.Params to configure the processing stage, e.g., for setting concurrency.
func (*Pipeline[T, U]) Sink ¶
func (p *Pipeline[T, U]) Sink(sinker datastreams.Sinker[U]) error
Sink consumes the pipeline's sink DataStream using a specified datastreams.Sinker. Typically used to push output to a custom location or channel.
Parameters:
- sinker: A Sinker that will receive data of type U.
Returns:
- An error if the sink fails; otherwise nil.
Example ¶
ExamplePipeline_Sink demonstrates how to create and use a simple Pipeline that sinks the output to a sinks.ToChannel.
errChan := make(chan error)
outChan := make(chan string)

var wg sync.WaitGroup
wg.Add(5)
go func() {
	for out := range outChan { // Read Pipeline output
		fmt.Println("out:", out)
		wg.Done()
	}
}()

if err := New[int, string]( // Create a new Pipeline
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p,
			func(p int) (string, error) {
				return fmt.Sprintf("Im a string now: %d", p), nil
			},
		)
	},
).Sink(
	sinks.ToChannel(outChan),
); err != nil {
	fmt.Println("error sinking:", err)
}
wg.Wait()
Output:

out: Im a string now: 1
out: Im a string now: 2
out: Im a string now: 3
out: Im a string now: 4
out: Im a string now: 5
func (*Pipeline[T, U]) Start ¶
func (p *Pipeline[T, U]) Start(streamFunc StreamFunc[T, U]) *Pipeline[T, U]
Start applies the given StreamFunc to the pipeline's source and returns the Pipeline itself. This is useful for chaining multiple calls on the pipeline while still returning the Pipeline.
Example:
pipeline.Start(func(ds DataStream[int]) DataStream[int] { ... }).Start(...)
Parameters:
- streamFunc: A function that takes a data stream and returns a new one.
Example ¶
ExamplePipeline_Start demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then returns an output Pipeline.
errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create and open the pipeline
pl := New[int, int](
	context.Background(),
	sources.FromChannel(inChan, sources.Params{BufferSize: 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(
			func(v int) (int, error) {
				return v * 2, nil
			},
			datastreams.Params{BufferSize: 5},
		)
	},
)

// Send values to the pipeline's source channel
go func() {
	for _, val := range []int{1, 2, 3, 4, 5} {
		inChan <- val
	}
	close(inChan)
}()

for out := range pl.Out() {
	fmt.Println("out:", out)
}
Output:

out: 2
out: 4
out: 6
out: 8
out: 10
func (*Pipeline[T, U]) Stream ¶
func (p *Pipeline[T, U]) Stream(streamFunc StreamFunc[T, U]) datastreams.DataStream[U]
Stream applies a StreamFunc to the pipeline's source, storing the resulting DataStream as the pipeline's sink (final stage). It then returns that sink DataStream for further chaining.
Typically used for quickly connecting a pipeline to a processing function.
Example:
pipeline.Stream(func(ds DataStream[int]) DataStream[int] { ... })
Parameters:
- streamFunc: A function that takes a data stream and returns a new one.
Example ¶
ExamplePipeline_Stream demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then returns an output DataStream.
errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create and open the pipeline
ds := New[int, int](
	context.Background(),
	sources.FromChannel(inChan, sources.Params{BufferSize: 5}),
	errChan,
).Stream(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(
			func(v int) (int, error) {
				return v * 2, nil
			},
			datastreams.Params{BufferSize: 5},
		)
	},
)

// Send values to the pipeline's source channel
go func() {
	for _, val := range []int{1, 2, 3, 4, 5} {
		inChan <- val
	}
	close(inChan)
}()

for out := range ds.Out() {
	fmt.Println("out:", out)
}
Output:

out: 2
out: 4
out: 6
out: 8
out: 10
func (*Pipeline[T, U]) Tee ¶
func (p *Pipeline[T, U]) Tee(params ...datastreams.Params) (datastreams.DataStream[U], datastreams.DataStream[U])
Tee creates a fork in the pipeline's sink DataStream, returning two DataStreams that each receive the same data from the sink.
This is useful for sending the same processed output to multiple consumers.
- params: optional datastreams.Params for buffer sizing, etc.
- returns two DataStreams of type U, each receiving the same data from the pipeline sink.
Example ¶
ExamplePipeline_Tee demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then tees the output into two distinct streams.
errChan := make(chan error)
defer close(errChan)

// Create and open the pipeline
out1, out2 := New[int, int](
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(func(v int) (int, error) {
			return v * 2, nil
		})
	},
).Tee(datastreams.Params{BufferSize: 5})

// Collect and print the results from both outputs
for out := range out1.Out() {
	fmt.Println("out1:", out)
}
for out := range out2.Out() {
	fmt.Println("out2:", out)
}
Output:

out1: 2
out1: 4
out1: 6
out1: 8
out1: 10
out2: 2
out2: 4
out2: 6
out2: 8
out2: 10
func (*Pipeline[T, U]) ToSource ¶
func (p *Pipeline[T, U]) ToSource() datastreams.Sourcer[U]
ToSource converts the pipeline's sink into a datastreams.Sourcer[U], allowing it to be used as a source in another pipeline. This is useful when you want to take the output of one pipeline and use it as the input for another pipeline.
Example ¶
ExamplePipeline_ToSource demonstrates how to turn a Pipeline into a source to be used in another Pipeline.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
errChan := make(chan error, 1)
defer close(errChan)

source := New[int, string](
	ctx,
	sources.FromArray([]int{1, 2, 3}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p,
			func(i int) (string, error) {
				return string(rune('A' + i)), nil
			},
		).Run(
			func(s string) (string, error) {
				return "processed " + s, nil
			},
		)
	},
).ToSource()

for v := range source.Source(ctx, errChan).Out() {
	fmt.Println(v)
}
Output:

processed B
processed C
processed D
type Pipelines ¶ added in v0.0.9
Pipelines represents a collection of Pipeline instances, designed to simplify the management of concurrent, parallel data processing workflows. It is typically created when a single data source is split into multiple streams using methods like Broadcast.
By grouping related pipelines, this type allows for collective operations such as starting, stopping, or waiting for all of them with a single method call. This is particularly useful for scaling up processing by distributing work across multiple concurrent workers that share the same processing logic.
Example usage:
pls := mainPipeline.Broadcast(3)
pls.Start(myProcessingFunc)
pls.Wait()
func (Pipelines[T, U]) Close ¶ added in v0.0.9
func (p Pipelines[T, U]) Close()
Close gracefully shuts down all pipelines in the collection by canceling their underlying contexts.
func (Pipelines[T, U]) Count ¶ added in v0.0.9
Count returns the number of pipelines in the collection.
func (Pipelines[T, U]) Get ¶ added in v0.0.9
Get returns the pipeline at the specified index. It returns an error if the index is out of bounds.
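A brief sketch of using Count and Get together to inspect a collection created by Copy; ctx, sourcer, and errChan are assumed to already exist:

pls := pipelines.New[int, int](ctx, sourcer, errChan).Copy(3)

for i := 0; i < pls.Count(); i++ {
	pl, err := pls.Get(i)
	if err != nil {
		// Index out of bounds; with the bounded loop above this should not happen.
		continue
	}
	_ = pl // Start, Process, or Sink each pipeline individually here.
}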
func (Pipelines[T, U]) Process ¶ added in v0.0.9
func (p Pipelines[T, U]) Process(
	processor datastreams.ProcessFunc[T],
	params ...datastreams.Params,
) Pipelines[T, U]
Process applies a processing function to each pipeline in the collection, adding a new processing stage to each one. This is a convenient way to apply the same transformation logic to multiple parallel streams, such as those created by Broadcast.
Each pipeline in the collection is replaced by a new pipeline that includes the additional processing step. The method returns the modified collection to allow for method chaining.
Example:
// p is a single pipeline from which we broadcast.
workers := p.Broadcast(2) // Create 2 worker pipelines.

// Define a processor that doubles an integer.
double := func(i int) (int, error) { return i * 2, nil }

// Apply the processor to all worker pipelines.
workers.Process(double)

// The pipelines in the 'workers' collection now each have an additional
// stage that doubles the numbers passing through them.
Parameters:
- processor: a datastreams.ProcessFunc[T] to apply to each item.
- params: optional datastreams.Params to configure the processing stage, e.g., for setting concurrency.
func (Pipelines[T, U]) Start ¶ added in v0.0.9
func (p Pipelines[T, U]) Start(streamFunc StreamFunc[T, U]) Pipelines[T, U]
Start applies a StreamFunc to each pipeline in the collection, initiating the data processing flow. It is typically used after a Broadcast to run the same logic in parallel across multiple pipelines.
The method returns the collection itself to allow for method chaining.
Example:
// p is a single pipeline from which we broadcast
workers := p.Broadcast(3) // Create 3 worker pipelines

// Define the processing logic for each worker
processingFunc := func(stream datastreams.DataStream[int]) datastreams.DataStream[int] {
	// For example, map values to multiply by 2
	return datastreams.Map(stream, func(i int) (int, error) { return i * 2, nil })
}

// Start all workers with the defined logic
workers.Start(processingFunc)

// Now, results can be gathered from each pipeline in the 'workers' collection.
type StreamFunc ¶
type StreamFunc[T any, U any] func(stream datastreams.DataStream[T]) datastreams.DataStream[U]
StreamFunc is a function that takes a datastreams.DataStream and returns a datastreams.DataStream.
A StreamFunc represents a logical segment of a pipeline, where data can be consumed, transformed, filtered, or otherwise processed. The output of a StreamFunc is another DataStream that can be further chained.
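As a sketch, a StreamFunc is usually written as a closure or named function that chains DataStream operations, for example using the OrDone, Filter, and Run stages shown in the examples above:

// doubleEvens is a StreamFunc[int, int] that keeps even values and doubles them.
var doubleEvens StreamFunc[int, int] = func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
	return ds.OrDone().Filter(
		func(v int) (bool, error) { return v%2 == 0, nil },
	).Run(
		func(v int) (int, error) { return v * 2, nil },
	)
}

// It can then be passed to Pipeline.Start or Pipeline.Stream, e.g.:
// pl := pipelines.New[int, int](ctx, sourcer, errChan).Start(doubleEvens)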