pipelines

package module
v0.0.11

Published: Sep 11, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

README

The pipelines module is a Go library designed to facilitate the creation and management of:

  1. Data processing pipelines
  2. Reactive streaming applications leveraging Go's concurrency primitives

It provides a set of tools for flow control, error handling, and pipeline processes. Under the hood, it uses Go's channels and goroutines to enable concurrency at each stage of the pipeline.

Setup

To get started with the pipelines module, follow these steps:

  1. Install the pipelines module:

    go get github.com/elastiflow/pipelines
    
  2. (Optional) To view local documentation via godoc:

    go install -v golang.org/x/tools/cmd/godoc@latest
    make docs
    
  3. Once godoc is running, visit GoDocs to view the documentation locally.

Real World Applicability

When to Use

  • ETL (Extract, Transform, Load) scenarios where data arrives as a stream and you want to apply transformations or filtering concurrently.
  • Complex concurrency flows: easily fan out, fan in, or broadcast data streams.
  • Reactive streaming applications: a lightweight framework for Go-native reactive streaming.

Channel Management

  • Sources (e.g. FromArray, FromChannel) produce data into a channel.
  • DataStream transformations (e.g. Run, Filter, Map) read from inbound channels and write results to outbound channels.
  • Sinks (e.g. ToChannel) consume data from the final output channels.
  • Each method typically spins up one or more goroutines that connect these channels, allowing stages to process data in parallel (see the sketch below).
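
A minimal end-to-end sketch of this wiring, combining a FromArray source, a Run stage, and a ToChannel sink (the multiplier, buffer sizes, and item count here are arbitrary):

package main

import (
	"context"
	"log/slog"
	"sync"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sinks"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

func main() {
	errChan := make(chan error, 10)
	out := make(chan int, 10)

	var wg sync.WaitGroup
	wg.Add(3) // one per input item
	go func() { // consume the sink channel
		for v := range out {
			slog.Info("sink received", slog.Int("v", v))
			wg.Done()
		}
	}()

	// Source -> Run stage -> Sink: each stage is connected by channels and
	// runs in its own goroutine(s).
	if err := pipelines.New[int, int](
		context.Background(),
		sources.FromArray([]int{1, 2, 3}), // source: slice -> channel
		errChan,
	).Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
		return ds.OrDone().Run(func(v int) (int, error) { return v * 10, nil })
	}).Sink(sinks.ToChannel(out)); err != nil { // sink: write results to out
		slog.Error("sink error: " + err.Error())
	}
	wg.Wait()
}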

High-Level Details

Pipelines

The Pipelines type represents a collection of Pipeline instances. It is designed to simplify managing multiple concurrent pipelines, especially when you split a single data source into several parallel streams (e.g., via Broadcast or Copy). This allows you to control a group of pipelines as a single unit, for example starting or stopping them all at once.
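
A minimal sketch of driving such a group as a single unit, assuming the Copy, Start, Wait, and Close methods documented further down (ctx, errChan, and the drainOutput helper are assumed to exist elsewhere):

    // Two pipelines that each receive the full data set and run the same logic.
    pls := pipelines.New[int, int](ctx, sources.FromArray([]int{1, 2, 3, 4}), errChan).
        Copy(2).
        Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
            return ds.OrDone().Run(func(v int) (int, error) { return v + 1, nil })
        })

    for i := 0; i < pls.Count(); i++ {
        pl, _ := pls.Get(i)
        go drainOutput(pl.Out()) // hypothetical helper: each copy's Out() still needs a reader
    }

    pls.Wait()  // block until every copy has drained the source
    pls.Close() // then stop the whole group at once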

Pipeline

A Pipeline is a series of data processing stages connected by channels. Each stage (datastreams.DataStream) is a function that performs a specific task and passes its output to the next stage. The pipelines module provides a flexible way to define and manage these stages.

DataStream

The datastreams.DataStream struct is the core of the pipelines module. It manages the flow of data through the pipeline stages and handles errors according to the provided parameters.

Key Components

Functions
  • ProcessFunc A user-defined function type used in a given DataStream stage via the DataStream.Run() method. For instance:

    ds = ds.Run(func(v int) (int, error) {
        if v < 0 {
            return 0, fmt.Errorf("negative number: %d", v)
        }
        return v + 1, nil
    })
    
  • TransformFunc A user-defined function type func(T) (U, error) used with the Map() method to convert from type T to a different type U.
    For instance:

    ds = ds.Map(func(i int) (string, error) {
        return fmt.Sprintf("Number: %d", i), nil
    })
    
  • FilterFunc A user-defined function type func(T) (bool, error) used with the Filter() method to decide if an item should pass through (true) or be dropped (false). For instance:

    ds = ds.Filter(func(i int) (bool, error) {
        return i % 2 == 0, nil
    })
    

  • KeyByFunc A user-defined function type used with KeyBy() to partition the data stream into keyed segments. This is useful for grouping data before applying transformations or aggregations. For instance:

    kds := datastreams.KeyBy[int, int](
        ds,
        func(i int) int {
            return i % 2 // key: even or odd
        },
        datastreams.Params{
            BufferSize: 50,
            Num:        1, // only 1 output channel per key
        },
    )

  • WindowFunc A user-defined function type used with Window() to process the batched items in a window. This is useful for aggregating data over time- or count-based windows. For instance:

    ws := datastreams.Window[*SensorReading, string, *SensorInference](
        datastreams.KeyBy[*SensorReading, string](ds, keyFunc),
        TumblingWindowFunc,
        partitionFactory,
        datastreams.Params{
            BufferSize: 50,
        },
    )

Sources
  • FromArray([]T): Convert a Go slice/array into a Sourcer
  • FromChannel(<-chan T): Convert an existing channel into a Sourcer
  • FromDataStream(DataStream[T]): Convert an existing DataStream into a Sourcer

Sinks
  • ToChannel(chan<- T): Write DataStream output into a channel

Windows

Window performs time- or count-based aggregation on a partitioned stream (see the sketch after this list).

  • NewTumblingFactory[T]: Creates fixed-size windows that do not overlap.
  • NewSlidingFactory[T]: Creates overlapping windows.
  • NewIntervalFactory[T]: Creates windows based on a time interval.
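
The godoc examples further down construct these factories from the windower subpackage; a short sketch of that usage (constructor names as they appear in those examples):

    // Tumbling: fixed, non-overlapping 100ms windows, keyed by a string key.
    tumbling := windower.NewTumbling[*SensorReading, string](100 * time.Millisecond)

    // Sliding: 150ms windows that advance every 50ms, so an item can land in several windows.
    sliding := windower.NewSliding[*SensorReading, string](150*time.Millisecond, 50*time.Millisecond)

The resulting factory is passed as the partitionFactory argument to datastreams.Window, as in the WindowFunc fragment above.
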
Methods
  • Run(ProcessFunc[T]) DataStream[T]: Process each item with a user function
  • Filter(FilterFunc[T]) DataStream[T]: Filter items by user-defined condition
  • Map(TransformFunc[T,U]) DataStream[U]: Transform each item from T to U
  • KeyBy(KeyByFunc[T]) DataStream[T]: Partition the stream by a key
  • Window(WindowFunc[T]) DataStream[T]: Apply a window function to the stream
  • Expand(ExpandFunc[T]) DataStream[T]: Explode each item into multiple items
  • FanOut() DataStream[T]: Create multiple parallel output channels
  • FanIn() DataStream[T]: Merge multiple channels into one
  • Broadcast() DataStream[T]: Duplicate each item to multiple outputs
  • Tee() (DataStream[T], DataStream[T]): Split into two DataStreams
  • Take(Params{Num: N}) DataStream[T]: Take only N items
  • OrDone() DataStream[T]: Terminates if upstream is closed
  • Out() <-chan T: Underlying output channel
  • Sink(Sinker[T]) DataStream[T]: Push items to a sink

Method Params
  • Params: Used to pass arguments into DataStream methods (a short sketch follows this list).
    • Options
      • SkipError (bool): If true, any error in ProcessFunc / TransformFunc / FilterFunc causes that item to be skipped rather than stopping the pipeline.
      • Num (int): Used by methods like FanOut, Broadcast, and Take to specify how many parallel channels or how many items to consume.
      • BufferSize (int): Controls the size of the buffered channels created for that stage. Larger buffers can reduce blocking but use more memory.
      • SegmentName (string): Tags a pipeline stage with a name, useful when logging or debugging errors (e.g. “segment: broadcast-2”).
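
A short sketch of passing these options to a single Run stage (combining all three options at once, and the specific values, are illustrative assumptions):

    ds = ds.Run(
        func(v int) (int, error) { return v * v, nil },
        datastreams.Params{
            SkipError:   true,     // drop items whose ProcessFunc returns an error
            BufferSize:  64,       // size of the buffered channel feeding the next stage
            SegmentName: "square", // label reported alongside errors from this stage
        },
    )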

Examples

Below is an example of how to use the pipelines module to create simple pipelines. Additional examples can be found in the godocs.

Squaring Numbers

This example demonstrates how to set up a pipeline that takes a stream of integers, squares each odd integer, and outputs the results.

package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}

func squareOdds(v int) (int, error) {
	if v%2 == 0 {
		return v, fmt.Errorf("even number error: %v", v)
	}
	return v * v, nil
}

func exProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	return p.OrDone().FanOut(
		datastreams.Params{Num: 2},
	).Run(
		squareOdds,
	)
}

func main() {
	errChan := make(chan error, 10)
	defer close(errChan)

	pl := pipelines.New[int, int]( // Create a new Pipeline
		context.Background(),
		sources.FromArray(createIntArr(10)), // Create a source to start the pipeline
		errChan,
	).Start(exProcess)

	go func(errReceiver <-chan error) { // Handle Pipeline errors
		defer pl.Close()
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				// return if you wanted to close the pipeline during error handling.
			}
		}
	}(pl.Errors())
	for out := range pl.Out() { // Read Pipeline output
		slog.Info("received simple pipeline output", slog.Int("out", out))
	}
}

Contributing

We welcome your contributions! Please see our Contributing Guide for details on how to open issues, submit pull requests, and propose new features.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.

Documentation

Overview

Package pipelines provides a set of utilities for creating and managing concurrent data processing pipelines in Go.

The library uses channels under the hood to pass data between pipeline stages. Each stage runs in its own goroutine, ensuring concurrency and separation of concerns.

Below is an example of an application that uses pipelines to square odd integers while maintaining shared-state counters:

	package yourpipeline

	import (
	    "context"
	    "fmt"
	    "log/slog"
	    "sync"

	    "github.com/elastiflow/pipelines"
	    "github.com/elastiflow/pipelines/datastreams"
	    "github.com/elastiflow/pipelines/datastreams/sources"
	)

	// PipelineWrapper is an example of a pipelines.Pipeline wrapper implementation. It includes shared state via counters.
	type PipelineWrapper struct {
	    mu          sync.Mutex
	    errChan     chan error
	    evenCounter int
	    oddCounter  int
	}

	// NewPipelineWrapper creates a new PipelineWrapper with counters set to 0
	func NewPipelineWrapper() *PipelineWrapper {
	    // Setup channels and return PipelineWrapper
	    errChan := make(chan error, 10)
	    return &PipelineWrapper{
	        errChan:     errChan,
	        evenCounter: 0,
	        oddCounter:  0,
	    }
	}

	// Run the PipelineWrapper
	func (pl *PipelineWrapper) Run() {
	    defer close(pl.errChan)

	    pipeline := pipelines.New[int, int]( // Create a new Pipeline
	        context.Background(),
	        sources.FromArray(createIntArr(10)), // Create a source to start the pipeline
	        pl.errChan,
	    ).Start(pl.exampleProcess)

	    go func(errReceiver <-chan error) { // Handle Pipeline errors
	        defer pipeline.Close()
	        for err := range errReceiver {
	            if err != nil {
	                slog.Error("demo error: " + err.Error())
	                // return // if you wanted to close the pipeline during error handling.
	            }
	        }
	    }(pl.errChan)

	    for out := range pipeline.Out() { // Read Pipeline output
	        slog.Info("received simple pipeline output", slog.Int("out", out))
	    }
	}

	func (pl *PipelineWrapper) squareOdds(v int) (int, error) {
	    if v%2 == 0 {
	        pl.mu.Lock()
	        pl.evenCounter++
	        pl.mu.Unlock()
	        return v, fmt.Errorf("even number error: %v", v)
	    }
	    pl.mu.Lock()
	    pl.oddCounter++
	    pl.mu.Unlock()
	    return v * v, nil
	}

	func (pl *PipelineWrapper) exampleProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	    // datastreams.DataStream.OrDone will stop the pipeline if the input channel is closed
	    return p.OrDone().FanOut(
	        datastreams.Params{Num: 2}, // datastreams.DataStream.FanOut will run subsequent ds.Pipe stages in parallel
	    ).Run(
	        pl.squareOdds,  // datastreams.DataStream.Run will execute the ds.Pipe process: "squareOdds"
	    ) // datastreams.DataStream.Out automatically FanIns to a single output channel if needed
	}

	func createIntArr(num int) []int {
	    var arr []int
	    for i := 0; i < num; i++ {
	        arr = append(arr, i)
	    }
	    return arr
	}

Package pipelines provides a set of utilities for creating and managing concurrent data processing pipelines in Go.

The package includes various functions to create, manipulate, and control the flow of data through channels, allowing for flexible and efficient data processing.

The main components of this package are:

  • Pipeline: A struct that defines a generic connection of data streams.
  • DataStream (in subpackage "datastreams"): Provides the methods to build concurrency stages.

Pipelines work by connecting a "Source" (an upstream data producer) with an optional chain of transformations or filters before optionally "Sinking" (sending the output to a consumer). Under the hood, all data flows through Go channels with concurrency managed by goroutines. Each transformation or filter is effectively run in parallel, communicating via channels.

For more in-depth usage, see the examples below and the doc.go file.

Example (Duplicate)
package main

import (
	"context"
	"log/slog"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// createIntArr creates a simple slice of int values from 0..num-1.
func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}

// duplicateProcess demonstrates a pipeline stage that broadcasts each value to two streams
// and then fans them back in (duplicates).
func duplicateProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	// Broadcasting by 2 then fanning in merges them back, effectively duplicating each item.
	return p.Broadcast(
		datastreams.Params{Num: 2},
	).FanIn()
}

func main() {
	errChan := make(chan error)
	pl := pipelines.New[int, int]( // Create a new Pipeline
		context.Background(),
		sources.FromArray(createIntArr(10)), // Create a new Source from slice
		errChan,
	).Start(duplicateProcess)
	defer func() {
		close(errChan)
		pl.Close()
	}()

	// Handle pipeline errors in a separate goroutine
	go func(errReceiver <-chan error) {
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				return
			}
		}
	}(errChan)

	// Read pipeline output
	for out := range pl.Out() {
		slog.Info("received simple pipeline output", slog.Int("out", out))
	}

	// Output (example):
	//   received simple pipeline output out=0
	//   received simple pipeline output out=0
	//   received simple pipeline output out=1
	//   received simple pipeline output out=1
	//   ...
	//   received simple pipeline output out=9
	//   received simple pipeline output out=9
}
Example (Filter)
package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// filter returns true if the input int is even, false otherwise.
func filter(p int) (bool, error) {
	return p%2 == 0, nil
}

// mapFunc transforms an even integer into a descriptive string.
func mapFunc(p int) (string, error) {
	return fmt.Sprintf("I'm an even number: %d", p), nil
}

func main() {
	errChan := make(chan error, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer func() {
		close(errChan)
		cancel()
	}()

	// Build a pipeline that fans out, then filters even numbers, then maps to string
	pl := pipelines.New[int, string](
		ctx,
		sources.FromArray(createIntArr(10)),
		errChan,
	).Start(func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p.FanOut(
				datastreams.Params{Num: 3},
			).Filter(
				filter,
			),
			mapFunc,
		)
	})

	for val := range pl.Out() {
		slog.Info("received simple pipeline output", slog.String("out", val))
	}

	// Output (example):
	// {"out":"I'm an even number: 0"}
	// {"out":"I'm an even number: 2"}
	// {"out":"I'm an even number: 4"}
	// {"out":"I'm an even number: 6"}
	// {"out":"I'm an even number: 8"}
}
Example (Sliding)
package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
	"github.com/elastiflow/pipelines/datastreams/windower"
)

func SlidingWindowFunc(readings []*SensorReading) (*SensorInference, error) {
	if len(readings) == 0 {
		return nil, nil
	}
	var totalTemp, totalHumidity float64
	for _, reading := range readings {
		totalTemp += reading.Temp
		totalHumidity += reading.Humidity
	}
	avgTemp := totalTemp / float64(len(readings))
	avgHumidity := totalHumidity / float64(len(readings))
	return &SensorInference{
		DeviceID:    readings[0].DeviceID,
		AvgTemp:     avgTemp,
		AvgHumidity: avgHumidity,
	}, nil
}

func main() {
	errChan := make(chan error, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	partitionFactory := windower.NewSliding[*SensorReading, string](150*time.Millisecond, 50*time.Millisecond)
	var sensorReadings = []*SensorReading{
		{DeviceID: "device-1", Temp: 22.5, Humidity: 45.0, Timestamp: time.Now().Add(-6 * time.Second)},
		{DeviceID: "device-1", Temp: 22.7, Humidity: 46.0, Timestamp: time.Now().Add(-5 * time.Second)},
		{DeviceID: "device-1", Temp: 22.6, Humidity: 45.5, Timestamp: time.Now().Add(-4 * time.Second)},
		{DeviceID: "device-2", Temp: 19.2, Humidity: 55.0, Timestamp: time.Now().Add(-3 * time.Second)},
		{DeviceID: "device-2", Temp: 19.5, Humidity: 54.8, Timestamp: time.Now().Add(-2 * time.Second)},
		{DeviceID: "device-1", Temp: 22.9, Humidity: 44.9, Timestamp: time.Now().Add(-1 * time.Second)},
		{DeviceID: "device-2", Temp: 19.7, Humidity: 54.5, Timestamp: time.Now()},
	}

	// Create a throttled source from the sensor readings and build the windowed pipeline
	pl := pipelines.New[*SensorReading, *SensorInference](
		ctx,
		sources.FromArray[*SensorReading](sensorReadings, sources.Params{Throttle: 50 * time.Millisecond}),
		errChan,
	).Start(func(p datastreams.DataStream[*SensorReading]) datastreams.DataStream[*SensorInference] {
		keyFunc := func(i *SensorReading) string {
			return i.DeviceID // Key by device ID
		}
		return datastreams.Window[*SensorReading, string, *SensorInference](
			datastreams.KeyBy[*SensorReading, string](p, keyFunc),
			SlidingWindowFunc,
			partitionFactory,
			datastreams.Params{
				BufferSize: 50,
			},
		).OrDone()
	})

	// Handle errors
	go func() {
		defer pl.Close()
		for err := range pl.Errors() {
			select {
			case <-ctx.Done():
				return
			default:
				if err == nil {
					continue
				}
				slog.Error("pipeline error: " + err.Error())
			}
		}
	}()

	// Read from pipeline output
	for v := range pl.Out() {
		select {
		case <-ctx.Done():
			return
		default:
			slog.Info("sliding window output", slog.String("device", v.DeviceID), slog.Float64("avg_temp", v.AvgTemp), slog.Float64("avg_humidity", v.AvgHumidity))
		}
	}

	// Output (example):
	// sliding window output device=device-1 avg_temp=22.5 avg_humidity=45
	// sliding window output device=device-1 avg_temp=22.6 avg_humidity=45.5
	// sliding window output device=device-1 avg_temp=22.65 avg_humidity=45.75
	// sliding window output device=device-1 avg_temp=22.65 avg_humidity=45.75
	// sliding window output device=device-2 avg_temp=19.2 avg_humidity=55
	// sliding window output device=device-1 avg_temp=22.6 avg_humidity=45.5
	// sliding window output device=device-2 avg_temp=19.35 avg_humidity=54.9
	// sliding window output device=device-1 avg_temp=22.9 avg_humidity=44.9
	// sliding window output device=device-2 avg_temp=19.5 avg_humidity=54.8
	// sliding window output device=device-1 avg_temp=22.9 avg_humidity=44.9
	// sliding window output device=device-2 avg_temp=19.6 avg_humidity=54.65
	// sliding window output device=device-1 avg_temp=22.9 avg_humidity=44.9
	// sliding window output device=device-2 avg_temp=19.7 avg_humidity=54.5
	// sliding window output device=device-2 avg_temp=19.7 avg_humidity=54.5
}
Example (SourceSink)

Example_sourceSink constructs and starts the Pipeline

package main

import (
	"context"
	"fmt"
	"log/slog"
	"sync"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sinks"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// PipelineWrapper is an example struct embedding a pipeline with shared counters.
//
// The pipeline will read from a channel source, process data, and sink into
// another channel. The "evenCounter" and "oddCounter" track how many evens
// or odds we've encountered.
type PipelineWrapper struct {
	mu          sync.Mutex
	errChan     chan error
	evenCounter int
	oddCounter  int
	pipeline    *pipelines.Pipeline[int, int]
}

// NewPipelineWrapper initializes a PipelineWrapper.
func NewPipelineWrapper() *PipelineWrapper {
	errChan := make(chan error, 10)
	return &PipelineWrapper{
		errChan:     errChan,
		evenCounter: 0,
		oddCounter:  0,
	}
}

// squareOdds increments counters, squares odd numbers, and returns an error on even numbers.
func (pl *PipelineWrapper) squareOdds(v int) (int, error) {
	if v%2 == 0 {
		pl.mu.Lock()
		pl.evenCounter++
		pl.mu.Unlock()
		return v, fmt.Errorf("even number error: %v", v)
	}
	pl.mu.Lock()
	pl.oddCounter++
	pl.mu.Unlock()
	return v * v, nil
}

// exampleProcess shows the entire pipeline flow within a single method,
// including how to produce data (source) and consume it (sink).
func (pl *PipelineWrapper) exampleProcess(ctx context.Context) {
	// 1) Create channels for input and output.
	inChan := make(chan int, 10)
	outChan := make(chan int, 10)

	// 2) Build the pipeline: inChan -> (FanOut + squareOdds) -> sink -> outChan
	pl.pipeline = pipelines.New[int, int](
		ctx,
		sources.FromChannel(inChan), // source
		pl.errChan,
	).Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
		return ds.OrDone().FanOut(
			datastreams.Params{Num: 2},
		).Run(
			pl.squareOdds,
		).Sink( // Sink
			sinks.ToChannel(outChan),
		)
	})

	var wg sync.WaitGroup
	wg.Add(10)

	// 3) Feed data into source inChan in a separate goroutine.
	go func() {
		for _, val := range createIntArr(10) {
			inChan <- val
		}
		close(inChan)
	}()

	// 4) Read results from sink outChan
	go func() {
		for out := range outChan {
			slog.Info("received simple pipeline output", slog.Int("out", out))
			wg.Done()
		}
	}()
	wg.Wait()
	pl.pipeline.Close()
}

// Run creates a background error handler, then calls exampleProcess.
func (pl *PipelineWrapper) Run() {
	defer close(pl.errChan)

	// Handle pipeline errors
	go func(errReceiver <-chan error) {
		for err := range errReceiver {
			if err != nil {
				slog.Error("pipeline error: " + err.Error())
			}
		}
	}(pl.errChan)

	// Run the pipeline flow
	pl.exampleProcess(context.Background())

	// Inspect counters after pipeline completes
	slog.Info("pipeline counters",
		slog.Int("evenCounter", pl.evenCounter),
		slog.Int("oddCounter", pl.oddCounter),
	)
}

// Example_sourceSink constructs and starts the Pipeline
func main() {
	pl := NewPipelineWrapper()
	pl.Run()
}
Example (Transformations)
package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// squareOdds is a basic function to demonstrate transformations in the pipeline.
func squareOdds(v int) (int, error) {
	return v * v, nil
}

// mapTransformFunc formats the squared integer as a string.
func mapTransformFunc(p int) (string, error) {
	return fmt.Sprintf("I'm a squared number: %d", p), nil
}

func main() {
	inChan := make(chan int) // Setup channels
	errChan := make(chan error, 10)
	defer func() {
		close(inChan)
		close(errChan)
	}()

	// Create a new Pipeline of int->string
	pl := pipelines.New[int, string](
		context.Background(),
		sources.FromArray(createIntArr(10)),
		errChan,
	).Start(func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		// OrDone -> FanOut(2) -> Run (squareOdds) -> Map to string
		return datastreams.Map(
			p.OrDone().FanOut(
				datastreams.Params{Num: 2},
			).Run(
				squareOdds,
			),
			mapTransformFunc,
		)
	})

	// Handle errors
	go func(errReceiver <-chan error) {
		defer pl.Close()
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				// return // if you wanted to close the pipeline during error handling.
			}
		}
	}(pl.Errors())

	// Read pipeline output
	for out := range pl.Out() {
		slog.Info("received simple pipeline output", slog.String("out", out))
	}

	// Output (example):
	// {"out":"I'm a squared number: 0"}
	// {"out":"I'm a squared number: 1"}
	// {"out":"I'm a squared number: 4"}
	// {"out":"I'm a squared number: 9"}
	// ...
	// {"out":"I'm a squared number: 81"}
}
Example (Window)
package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
	"github.com/elastiflow/pipelines/datastreams/windower"
)

type SensorReading struct {
	DeviceID  string
	Temp      float64
	Humidity  float64
	Timestamp time.Time
}

type SensorInference struct {
	DeviceID    string
	AvgTemp     float64
	AvgHumidity float64
}

func TumblingWindowFunc(readings []*SensorReading) (*SensorInference, error) {
	if len(readings) == 0 {
		return nil, nil
	}
	var totalTemp, totalHumidity float64
	for _, reading := range readings {
		totalTemp += reading.Temp
		totalHumidity += reading.Humidity
	}
	avgTemp := totalTemp / float64(len(readings))
	avgHumidity := totalHumidity / float64(len(readings))
	return &SensorInference{
		DeviceID:    readings[0].DeviceID,
		AvgTemp:     avgTemp,
		AvgHumidity: avgHumidity,
	}, nil
}

func main() {
	errChan := make(chan error, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var sensorReadings = []*SensorReading{
		{DeviceID: "device-1", Temp: 22.5, Humidity: 45.0, Timestamp: time.Now().Add(-6 * time.Second)},
		{DeviceID: "device-1", Temp: 22.7, Humidity: 46.0, Timestamp: time.Now().Add(-5 * time.Second)},
		{DeviceID: "device-1", Temp: 22.6, Humidity: 45.5, Timestamp: time.Now().Add(-4 * time.Second)},
		{DeviceID: "device-2", Temp: 19.2, Humidity: 55.0, Timestamp: time.Now().Add(-3 * time.Second)},
		{DeviceID: "device-2", Temp: 19.5, Humidity: 54.8, Timestamp: time.Now().Add(-2 * time.Second)},
		{DeviceID: "device-1", Temp: 22.9, Humidity: 44.9, Timestamp: time.Now().Add(-1 * time.Second)},
		{DeviceID: "device-2", Temp: 19.7, Humidity: 54.5, Timestamp: time.Now()},
	}

	partitionFactory := windower.NewTumbling[*SensorReading, string](100 * time.Millisecond)

	// Create a throttled source from the sensor readings and build the windowed pipeline
	pl := pipelines.New[*SensorReading, *SensorInference](
		ctx,
		sources.FromArray[*SensorReading](sensorReadings, sources.Params{Throttle: 50 * time.Millisecond}),
		errChan,
	).Start(func(p datastreams.DataStream[*SensorReading]) datastreams.DataStream[*SensorInference] {
		keyFunc := func(i *SensorReading) string {
			return i.DeviceID // Key by device ID
		}
		return datastreams.Window[*SensorReading, string, *SensorInference](
			datastreams.KeyBy[*SensorReading, string](p, keyFunc),
			TumblingWindowFunc,
			partitionFactory,
			datastreams.Params{
				BufferSize: 50,
			},
		).OrDone()
	})

	// Handle errors
	go func() {
		defer pl.Close()
		for err := range pl.Errors() {
			select {
			case <-ctx.Done():
				return
			default:
				if err == nil {
					continue
				}
				slog.Error("pipeline error: " + err.Error())
			}
		}
	}()

	// Read from pipeline output
	for v := range pl.Out() {
		select {
		case <-ctx.Done():
			return
		default:
			slog.Info("tumbling window output", slog.String("device", v.DeviceID), slog.Float64("avg_temp", v.AvgTemp), slog.Float64("avg_humidity", v.AvgHumidity))
		}
	}

	// Output (example):
	// tumbling window output device=device-1 avg_temp=22.600000000000005 avg_humidity=45.5
	// tumbling window output device=device-2 avg_temp=19.35 avg_humidity=54.9
	// tumbling window output device=device-1 avg_temp=22.9 avg_humidity=44.9
	// tumbling window output device=device-2 avg_temp=19.7 avg_humidity=54.5
}


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Opts

type Opts struct {
	BufferSize int
}

Opts defines optional configuration parameters for certain pipeline operations. It is currently unused and reserved for future expansion.

type Pipeline

type Pipeline[T any, U any] struct {
	// contains filtered or unexported fields
}

Pipeline is a struct that defines a generic stream process.

Pipeline[T, U] represents a flow of data type T at the input, ultimately producing data of type U at the output. Under the hood, a Pipeline orchestrates DataStream stages connected by channels.

Usage typically begins by calling New(...) to create a pipeline with a Source, and then applying transformations via Map, Process, or Start/Stream with a StreamFunc. Finally, you can read from the Out() channel, or Sink the output via a Sinker.

func New

func New[T any, U any](
	ctx context.Context,
	sourcer datastreams.Sourcer[T],
	errStream chan error,
) *Pipeline[T, U]

New constructs a new Pipeline of a given type by passing in a datastreams.Sourcer.

  • ctx: a context.Context used to cancel or manage the lifetime of the pipeline
  • sourcer: a datastreams.Sourcer[T] that provides the initial data stream
  • errStream: an error channel to which the pipeline can send errors

Example usage:

p := pipelines.New[int, int](context.Background(), someSource, errChan)
Example

ExampleNew demonstrates a minimal pipeline creation and usage.

errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create a new Pipeline with int -> int
pl := New[int, int](
	context.Background(),
	sources.FromChannel(inChan),
	errChan,
)

// Provide data on inChan
go func() {
	for i := 1; i <= 5; i++ {
		inChan <- i
	}
	close(inChan)
}()

// Process the data: multiply by 2
pl.Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
	return ds.Run(func(v int) (int, error) {
		return v * 2, nil
	})
})

// Read pipeline output until closed
for val := range pl.Out() {
	fmt.Println("Result:", val)
}
Output:

Result: 2
Result: 4
Result: 6
Result: 8
Result: 10

func (*Pipeline[T, U]) Broadcast added in v0.0.9

func (p *Pipeline[T, U]) Broadcast(num int, streamFunc StreamFunc[T, U]) Pipelines[T, U]

Broadcast creates multiple copies of the current pipeline's source DataStream, allowing the same data to be processed in parallel across multiple pipelines, with different sinks or processing logic applied to each copy.

Parameters:

  • num: the number of copies to create.
  • streamFunc: a StreamFunc[T, U] that will be applied to each copy of the source DataStream.
Example
package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams/sinks"

	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

type listenerOutput struct {
	Index   int
	Message string
}

func main() {
	log.Println("Starting Listen example...")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stdoutChan := make(chan listenerOutput, 10)
	wg := &sync.WaitGroup{}
	wg.Add(3) // one per pipeline goroutine started below
	errs := make(chan error, 10)
	// 1. Log everything the pipeline copies sink into stdoutChan.
	go func() {
		for msg := range stdoutChan {
			log.Printf("[Listener %d] Received: %s", msg.Index, msg.Message)
		}
	}()

	// 2. Create a simple source from a slice.
	sourcer := sources.FromArray([]int{100, 200, 300})

	// 3. Create the pipeline and split it into three copies that share the source.
	pls := pipelines.New[int, listenerOutput](
		ctx,
		sourcer,
		make(chan error, 1),
	).Copy(3)

	// 4. Start each copy with its own processing logic, all sinking into stdoutChan.
	go func() {
		defer wg.Done()
		err := pls[0].Start(func(p datastreams.DataStream[int]) datastreams.DataStream[listenerOutput] {
			return datastreams.Map[int, listenerOutput](
				p.OrDone().FanOut(
					datastreams.Params{Num: 2},
				).Run(
					func(i int) (int, error) {
						return i * i, nil // Square the input
					},
				),
				func(i int) (listenerOutput, error) {
					return listenerOutput{
						Index:   0, // This is the index of the listener
						Message: fmt.Sprintf("Processed value: %d", i),
					}, nil
				},
			)

		}).Sink(sinks.ToChannel[listenerOutput](stdoutChan))
		if err != nil {
			errs <- err
		}
	}()

	go func() {
		defer wg.Done()
		err := pls[1].Start(func(p datastreams.DataStream[int]) datastreams.DataStream[listenerOutput] {
			return datastreams.Map[int, listenerOutput](
				p.OrDone().FanOut(
					datastreams.Params{Num: 2},
				).Run(
					func(i int) (int, error) {
						return i + 10, nil // Add 10 to the input
					},
				),
				func(i int) (listenerOutput, error) {
					return listenerOutput{
						Index:   1, // This is the index of the listener
						Message: fmt.Sprintf("Processed value: %d", i),
					}, nil
				},
			)

		}).Sink(sinks.ToChannel[listenerOutput](stdoutChan))
		if err != nil {
			errs <- err
		}
	}()

	go func() {
		defer wg.Done()
		err := pls[2].Start(func(p datastreams.DataStream[int]) datastreams.DataStream[listenerOutput] {
			return datastreams.Map[int, listenerOutput](
				p.OrDone().FanOut(
					datastreams.Params{Num: 2},
				).Run(
					func(i int) (int, error) {
						return i - 5, nil // Subtract 5 from the input
					},
				),
				func(i int) (listenerOutput, error) {
					return listenerOutput{
						Index:   2, // This is the index of the listener
						Message: fmt.Sprintf("Processed value: %d", i),
					}, nil
				},
			)

		}).Sink(sinks.ToChannel[listenerOutput](stdoutChan))
		if err != nil {
			errs <- err
		}
	}()

	go func() {
		for e := range errs {
			log.Println(fmt.Errorf("error in pipeline: %w", e))
		}
	}()

	wg.Wait()
	log.Println("✅ Listen example finished successfully.")
}

func (*Pipeline[T, U]) Close

func (p *Pipeline[T, U]) Close()

Close gracefully closes a Pipeline by canceling its internal context. This signals all data streams to terminate reading or writing to channels.

func (*Pipeline[T, U]) Copy added in v0.0.9

func (p *Pipeline[T, U]) Copy(num int) Pipelines[T, U]

Copy creates multiple copies of the current pipeline's source DataStream, allowing the same data to be processed in parallel across multiple pipelines.

Parameters:

  • num: the number of copies to create.

func (*Pipeline[T, U]) Errors

func (p *Pipeline[T, U]) Errors() <-chan error

Errors returns the error channel of the Pipeline.

This channel receives errors from any stage (e.g. transformations, filters, sinks). The caller should consume from it to handle errors appropriately.
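
A typical consumption pattern, following the examples elsewhere on this page (a sketch rather than a generated example):

errChan := make(chan error, 10)
pl := New[int, int](
	context.Background(),
	sources.FromArray([]int{1, 2, 3}),
	errChan,
).Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
	return ds.Run(func(v int) (int, error) { return v * 2, nil })
})

// Drain Errors() concurrently so stages are never blocked reporting an error.
go func() {
	for err := range pl.Errors() {
		if err != nil {
			slog.Error("pipeline error: " + err.Error())
		}
	}
}()

for v := range pl.Out() {
	slog.Info("result", slog.Int("v", v))
}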

func (*Pipeline[T, U]) Expand added in v0.0.7

func (p *Pipeline[T, U]) Expand(
	expander datastreams.ExpandFunc[T, U],
	params ...datastreams.Params,
) datastreams.DataStream[U]

Expand creates a new DataStream by applying an expander function (ExpandFunc) to each message in the pipeline's source. This is a convenience method that directly calls datastreams.Expand on the pipeline's source.

  • expander: an ExpandFunc[T, U] that takes an input T and returns a slice of outputs []U
  • params: optional datastreams.Params to configure buffer sizes, skipping errors, etc.
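
There is no generated example for Expand; below is a minimal sketch, assuming ExpandFunc has the func(T) ([]U, error) shape implied by the description above:

errChan := make(chan error, 10)

// Expand each line into its individual words (one input item -> many output items).
words := New[string, string](
	context.Background(),
	sources.FromArray([]string{"hello world", "go pipelines"}),
	errChan,
).Expand(func(line string) ([]string, error) {
	return strings.Split(line, " "), nil
})

for w := range words.Out() {
	fmt.Println(w)
}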

func (*Pipeline[T, U]) In

func (p *Pipeline[T, U]) In() <-chan T

In returns the input channel of the source datastreams.DataStream of a Pipeline.

This channel can be read from externally if needed, though typically one supplies a Sourcer when constructing a Pipeline. Use with care if manually sending data into the pipeline.

func (*Pipeline[T, U]) Map

func (p *Pipeline[T, U]) Map(
	mapper datastreams.TransformFunc[T, U],
	params ...datastreams.Params,
) datastreams.DataStream[U]

Map creates a new DataStream by applying a mapper function (TransformFunc) to each message in the pipeline's source. This is a convenience method that directly calls datastreams.Map on the pipeline's source.

  • mapper: a TransformFunc[T, U] that takes an input T and returns output U
  • params: optional datastreams.Params to configure buffer sizes, skipping errors, etc.
Example

ExamplePipeline_Map demonstrates how to create and use a simple Pipeline that maps one type (int) to another (string).

errChan := make(chan error)
defer close(errChan)

pl := New[int, string]( // Create a new Pipeline
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Map(
	func(p int) (string, error) {
		return fmt.Sprintf("Im a string now: %d", p), nil
	},
)

for out := range pl.Out() { // Read Pipeline output
	fmt.Println("out:", out)
}
Output:

out: Im a string now: 1
out: Im a string now: 2
out: Im a string now: 3
out: Im a string now: 4
out: Im a string now: 5

func (*Pipeline[T, U]) Out

func (p *Pipeline[T, U]) Out() <-chan U

Out returns the output channel (sink) of the pipeline.

Reading from this channel lets you consume the final processed data of type U.

func (*Pipeline[T, U]) Process

func (p *Pipeline[T, U]) Process(
	processor datastreams.ProcessFunc[T],
	params ...datastreams.Params,
) *Pipeline[T, U]

Process adds a new processing stage to the pipeline, applying a function to each item. The processing function must take and return an item of the same type, making this method ideal for in-place transformations like data enrichment or filtering.

This method returns a new Pipeline instance that incorporates the new processing stage, allowing multiple Process calls to be fluently chained together. The final output type of the pipeline (U) remains unchanged.

The provided 'processor' function is of type datastreams.ProcessFunc[T], with the signature 'func(T) (T, error)'. If the function returns a non-nil error, processing for that item halts, and the error is sent to the pipeline's error channel.

Example:

// Assume a pipeline processes User objects.
type User struct {
	ID        int
	Name      string
	IsValid   bool
	IsAudited bool
}

// p is an existing pipeline, e.g., p := pipelines.New[User, User](...)

// We can chain multiple Process calls to create a multi-stage workflow.
finalPipeline := p.Process(func(u User) (User, error) {
	// Stage 1: Validate the user.
	if u.Name != "" {
		u.IsValid = true
	}
	return u, nil
}).Process(func(u User) (User, error) {
	// Stage 2: Audit the user.
	if u.IsValid {
		u.IsAudited = true
		fmt.Printf("Audited user %d\n", u.ID)
	}
	return u, nil
})

// The finalPipeline now contains both processing stages.
// You can then consume the results from finalPipeline.Out().

Parameters:

  • processor: a datastreams.ProcessFunc[T] to apply to each item.
  • params: optional datastreams.Params to configure the processing stage, i.e., for setting concurrency.

func (*Pipeline[T, U]) Sink

func (p *Pipeline[T, U]) Sink(sinker datastreams.Sinker[U]) error

Sink consumes the pipeline's sink DataStream using a specified datastreams.Sinker. Typically used to push output to a custom location or channel.

Parameters:

  • sinker: A Sinker that will receive data of type U.

Returns:

  • An error if the sink fails; otherwise nil.
Example

ExamplePipeline_Sink demonstrates how to create and use a simple Pipeline that sinks the output to a sinks.ToChannel.

errChan := make(chan error)
outChan := make(chan string)
var wg sync.WaitGroup
wg.Add(5)
go func() {
	for out := range outChan { // Read Pipeline output
		fmt.Println("out:", out)
		wg.Done()
	}
}()

if err := New[int, string]( // Create a new Pipeline
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p,
			func(p int) (string, error) { return fmt.Sprintf("Im a string now: %d", p), nil },
		)
	},
).Sink(
	sinks.ToChannel(outChan),
); err != nil {
	fmt.Println("error sinking:", err)
}
wg.Wait()
Output:

out: Im a string now: 1
out: Im a string now: 2
out: Im a string now: 3
out: Im a string now: 4
out: Im a string now: 5

func (*Pipeline[T, U]) Start

func (p *Pipeline[T, U]) Start(streamFunc StreamFunc[T, U]) *Pipeline[T, U]

Start applies the given StreamFunc to the pipeline's source and returns the Pipeline itself. This is useful for chaining multiple calls on the pipeline while still returning the Pipeline.

Example:

pipeline.Start(func(ds DataStream[int]) DataStream[int] { ... }).Start(...)

Parameters:

  • streamFunc: A function that takes a data stream and returns a new one.
Example

ExamplePipeline_Start demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then returns an output Pipeline.

errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create and open the pipeline
pl := New[int, int](
	context.Background(),
	sources.FromChannel(inChan, sources.Params{BufferSize: 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(
			func(v int) (int, error) {
				return v * 2, nil
			},
			datastreams.Params{BufferSize: 5},
		)
	},
)

// Send values to the pipeline's source channel
go func() {
	for _, val := range []int{1, 2, 3, 4, 5} {
		inChan <- val
	}
	close(inChan)
}()

for out := range pl.Out() {
	fmt.Println("out:", out)
}
Output:

out: 2
out: 4
out: 6
out: 8
out: 10

func (*Pipeline[T, U]) Stream

func (p *Pipeline[T, U]) Stream(streamFunc StreamFunc[T, U]) datastreams.DataStream[U]

Stream applies a StreamFunc to the pipeline's source, storing the resulting DataStream as the pipeline's sink (final stage). It then returns that sink DataStream for further chaining.

Typically used for quickly connecting a pipeline to a processing function.

Example:

pipeline.Stream(func(ds DataStream[int]) DataStream[int] { ... })

Parameters:

  • streamFunc: A function that takes a data stream and returns a new one.
Example

ExamplePipeline_Stream demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then returns an output DataStream.

errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create and open the pipeline
ds := New[int, int](
	context.Background(),
	sources.FromChannel(inChan, sources.Params{BufferSize: 5}),
	errChan,
).Stream(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(
			func(v int) (int, error) {
				return v * 2, nil
			},
			datastreams.Params{BufferSize: 5},
		)
	},
)

// Send values to the pipeline's source channel
go func() {
	for _, val := range []int{1, 2, 3, 4, 5} {
		inChan <- val
	}
	close(inChan)
}()

for out := range ds.Out() {
	fmt.Println("out:", out)
}
Output:

out: 2
out: 4
out: 6
out: 8
out: 10

func (*Pipeline[T, U]) Tee

func (p *Pipeline[T, U]) Tee(params ...datastreams.Params) (datastreams.DataStream[U], datastreams.DataStream[U])

Tee creates a fork in the pipeline's sink DataStream, returning two DataStreams that each receive the same data from the sink.

This is useful for sending the same processed output to multiple consumers.

  • params: optional datastreams.Params for buffer sizing, etc.
  • returns two DataStreams of type U, each receiving the same data from the pipeline sink.
Example

ExamplePipeline_Tee demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then tees the output into two distinct streams.

errChan := make(chan error)
defer close(errChan)

// Create and open the pipeline
out1, out2 := New[int, int](
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(func(v int) (int, error) {
			return v * 2, nil
		})
	},
).Tee(datastreams.Params{BufferSize: 5})

// Collect and print the results from both outputs
for out := range out1.Out() {
	fmt.Println("out1:", out)
}
for out := range out2.Out() {
	fmt.Println("out2:", out)
}
Output:

out1: 2
out1: 4
out1: 6
out1: 8
out1: 10
out2: 2
out2: 4
out2: 6
out2: 8
out2: 10

func (*Pipeline[T, U]) ToSource

func (p *Pipeline[T, U]) ToSource() datastreams.Sourcer[U]

ToSource converts the pipeline's sink into a datastreams.Sourcer[U], allowing it to be used as a source in another pipeline. This is useful when you want to take the output of one pipeline and use it as the input for another pipeline.

Example

ExamplePipeline_ToSource demonstrates how to turn a Pipeline into a source to be used in another Pipeline.

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
errChan := make(chan error, 1)
defer close(errChan)

source := New[int, string](
	ctx,
	sources.FromArray([]int{1, 2, 3}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p,
			func(i int) (string, error) { return string(rune('A' + i)), nil },
		).Run(
			func(s string) (string, error) {
				return "processed " + s, nil
			},
		)
	},
).ToSource()

for v := range source.Source(ctx, errChan).Out() {
	fmt.Println(v)
}
Output:

processed B
processed C
processed D

func (*Pipeline[T, U]) Wait

func (p *Pipeline[T, U]) Wait()

Wait blocks until the pipeline's source has consumed all messages or the context is canceled.

This method helps ensure you process all items before shutting down. Once Wait returns, the Pipeline is effectively drained.
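
A sketch of using Wait for shutdown, consuming output in a separate goroutine so the pipeline can drain:

errChan := make(chan error, 10)
pl := New[int, int](
	context.Background(),
	sources.FromArray([]int{1, 2, 3}),
	errChan,
).Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
	return ds.OrDone().Run(func(v int) (int, error) { return v * v, nil })
})

go func() {
	for v := range pl.Out() {
		slog.Info("result", slog.Int("v", v))
	}
}()

pl.Wait()  // returns once the source has been fully consumed
pl.Close() // then cancel the pipeline's context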

type Pipelines added in v0.0.9

type Pipelines[T any, U any] []*Pipeline[T, U]

Pipelines represents a collection of Pipeline instances, designed to simplify the management of concurrent, parallel data processing workflows. It is typically created when a single data source is split into multiple streams using methods like Broadcast.

By grouping related pipelines, this type allows for collective operations such as starting, stopping, or waiting for all of them with a single method call. This is particularly useful for scaling up processing by distributing work across multiple concurrent workers that share the same processing logic.

Example usage:

pls := mainPipeline.Copy(3)
pls.Start(myProcessingFunc)
pls.Wait()

func (Pipelines[T, U]) Close added in v0.0.9

func (p Pipelines[T, U]) Close()

Close gracefully shuts down all pipelines in the collection by canceling their underlying contexts.

func (Pipelines[T, U]) Count added in v0.0.9

func (p Pipelines[T, U]) Count() int

Count returns the number of pipelines in the collection.

func (Pipelines[T, U]) Get added in v0.0.9

func (p Pipelines[T, U]) Get(index int) (*Pipeline[T, U], error)

Get returns the pipeline at the specified index. It returns an error if the index is out of bounds.

func (Pipelines[T, U]) Process added in v0.0.9

func (p Pipelines[T, U]) Process(
	processor datastreams.ProcessFunc[T],
	params ...datastreams.Params,
) Pipelines[T, U]

Process applies a processing function to each pipeline in the collection, adding a new processing stage to each one. This is a convenient way to apply the same transformation logic to multiple parallel streams, such as those created by Broadcast.

Each pipeline in the collection is replaced by a new pipeline that includes the additional processing step. The method returns the modified collection to allow for method chaining.

Example:

// p is a single pipeline that we split into parallel copies.
workers := p.Copy(2) // Create 2 worker pipelines.

// Define a processor that doubles an integer.
double := func(i int) (int, error) {
	return i * 2, nil
}

// Apply the processor to all worker pipelines.
workers.Process(double)

// The pipelines in the 'workers' collection now each have an additional
// stage that doubles the numbers passing through them.

Parameters:

  • processor: a datastreams.ProcessFunc[T] to apply to each item.
  • params: optional datastreams.Params to configure the processing stage, i.e., for setting concurrency.

func (Pipelines[T, U]) Start added in v0.0.9

func (p Pipelines[T, U]) Start(streamFunc StreamFunc[T, U]) Pipelines[T, U]

Start applies a StreamFunc to each pipeline in the collection, initiating the data processing flow. It is typically used after a Broadcast to run the same logic in parallel across multiple pipelines.

The method returns the collection itself to allow for method chaining.

Example:

// p is a single pipeline that we split into parallel copies
workers := p.Copy(3) // Create 3 worker pipelines

// Define the processing logic for each worker
processingFunc := func(stream datastreams.DataStream[int]) datastreams.DataStream[int] {
	// For example, map values to multiply by 2
	return datastreams.Map(stream, func(i int) (int, error) {
		return i * 2, nil
	})
}

// Start all workers with the defined logic
workers.Start(processingFunc)

// Now, results can be gathered from each pipeline in the 'workers' collection.

func (Pipelines[T, U]) Wait added in v0.0.9

func (p Pipelines[T, U]) Wait()

Wait blocks until all pipelines in the collection have finished processing. This is useful for ensuring that all data has been consumed before the application exits.

type StreamFunc

type StreamFunc[T any, U any] func(stream datastreams.DataStream[T]) datastreams.DataStream[U]

StreamFunc is a function that takes a datastreams.DataStream and returns a datastreams.DataStream.

A StreamFunc represents a logical segment of a pipeline, where data can be consumed, transformed, filtered, or otherwise processed. The output of a StreamFunc is another DataStream that can be further chained.
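
For instance, a reusable StreamFunc that keeps even numbers and doubles them, built from the DataStream methods shown elsewhere on this page (a sketch):

var doubleEvens StreamFunc[int, int] = func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
	return ds.OrDone().
		Filter(func(v int) (bool, error) { return v%2 == 0, nil }).
		Run(func(v int) (int, error) { return v * 2, nil })
}

// doubleEvens can then be passed to Pipeline.Start, Pipeline.Stream, or Pipelines.Start:
// pl := New[int, int](ctx, sources.FromArray([]int{1, 2, 3, 4}), errChan).Start(doubleEvens)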
