pipelines

package module
v0.0.6

Warning: This package is not in the latest version of its module.
Published: Mar 27, 2025 License: Apache-2.0 Imports: 4 Imported by: 0

README

Pipelines

The pipelines module is a Go library designed to facilitate the creation and management of:

  1. Data processing pipelines
  2. Reactive streaming applications leveraging Go's concurrency primitives

It provides tools for flow control, error handling, and composing pipeline stages. Under the hood, it uses Go channels and goroutines to run each stage of the pipeline concurrently.

Setup

To get started with the pipelines module, follow these steps:

  1. Install the pipelines module:

    go get github.com/elastiflow/pipelines
    
  2. (Optional) To view local documentation via godoc:

    go install -v golang.org/x/tools/cmd/godoc@latest
    make docs
    
  3. Once the local godoc server is running, visit GoDocs in your browser to view the latest documentation.

Real World Applicability

When to Use
  • ETL (Extract, Transform, Load) style scenarios where data arrives in a stream, and you want to apply transformations or filtering in a concurrent manner.
  • Complex concurrency flows: easily fan out, fan in, or broadcast data streams.
  • Reactive streaming applications: serves as a lightweight framework for Go-native reactive streaming applications.
Channel Management
  • Sources (e.g. FromArray, FromChannel) produce data into a channel.
  • DataStream transformations (e.g. Run, Filter, Map) read from inbound channels and write results to outbound channels.
  • Sinks (e.g. ToChannel) consume data from the final output channels.
  • Each method typically spins up one or more goroutines that connect these channels, allowing stages to run concurrently.
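The source, stage, and sink roles above can be illustrated with plain channels. This is a minimal sketch using only the standard library, not the module's implementation: fromSlice, square, and collect are hypothetical helpers standing in for a source such as FromArray, a Run stage, and a sink. Each stage owns its output channel, starts one goroutine, and closes that channel when its input is exhausted, which is what lets the "no more data" signal propagate down the chain.

```go
package main

import "fmt"

// fromSlice is a hypothetical source: it emits each element of vals from its
// own goroutine and closes the channel when every element has been sent.
func fromSlice[T any](vals []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// square is a hypothetical transformation stage: it writes v*v for every
// input and closes its output once the input channel is closed.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// collect is a hypothetical sink that drains the final channel into a slice.
func collect[T any](in <-chan T) []T {
	var res []T
	for v := range in {
		res = append(res, v)
	}
	return res
}

func main() {
	fmt.Println(collect(square(fromSlice([]int{1, 2, 3, 4})))) // [1 4 9 16]
}
```

Because every stage closes only the channel it created, the range loop in collect ends exactly when the source runs out of data.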

High-Level Details

Pipeline

A pipeline is a series of data processing stages connected by channels. Each stage, represented by a datastreams.DataStream, performs a specific task and passes its output to the next stage. The pipelines module provides a flexible way to define and manage these stages.

DataStream

The datastreams.DataStream struct is the core of the pipelines module. It manages the flow of data through the pipeline stages and handles errors according to the provided parameters.

Key Components
Functions
  • ProcessFunc: A user-defined function type func(T) (T, error) used in a DataStream stage via the DataStream.Run() method. For instance:

    ds = ds.Run(func(v int) (int, error) {
        if v < 0 {
            return 0, fmt.Errorf("negative number: %d", v)
        }
        return v + 1, nil
    })
    
  • TransformFunc: A user-defined function type func(T) (U, error) used with the datastreams.Map() function to convert from type T to a different type U. For instance:

    strs := datastreams.Map(ds, func(i int) (string, error) {
        return fmt.Sprintf("Number: %d", i), nil
    })
    
  • FilterFunc: A user-defined function type func(T) (bool, error) used with the Filter() method to decide if an item should pass through (true) or be dropped (false). For instance:

    ds = ds.Filter(func(i int) (bool, error) {
        return i%2 == 0, nil
    })
    
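The examples in this document call Map as datastreams.Map(stream, fn) rather than as a DataStream method. That matches a Go language rule: methods cannot declare their own type parameters, so a method on a DataStream[T] could not introduce a new output type U. The sketch below illustrates the transform and filter function shapes over plain channels. TransformFunc and FilterFunc here are hypothetical stand-ins for the library's types, and this sketch simply drops items whose function returns an error.

```go
package main

import (
	"fmt"
	"strconv"
)

// Hypothetical stand-ins for the documented function types; the real
// definitions live in the library's datastreams package.
type TransformFunc[T, U any] func(T) (U, error)
type FilterFunc[T any] func(T) (bool, error)

// Filter forwards only the items for which keep returns true.
// Items whose predicate returns an error are dropped in this sketch.
func Filter[T any](in <-chan T, keep FilterFunc[T]) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			if ok, err := keep(v); err == nil && ok {
				out <- v
			}
		}
	}()
	return out
}

// Map converts each T into a U. It is a plain function rather than a method
// because Go methods cannot declare type parameters of their own.
func Map[T, U any](in <-chan T, fn TransformFunc[T, U]) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			if u, err := fn(v); err == nil { // errors drop the item here
				out <- u
			}
		}
	}()
	return out
}

// evensAsStrings keeps the even numbers from nums and formats them as strings.
func evensAsStrings(nums []int) []string {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()
	evens := Filter[int](in, func(i int) (bool, error) { return i%2 == 0, nil })
	strs := Map[int, string](evens, func(i int) (string, error) {
		return "Number: " + strconv.Itoa(i), nil
	})
	var res []string
	for s := range strs {
		res = append(res, s)
	}
	return res
}

func main() {
	fmt.Println(evensAsStrings([]int{1, 2, 3, 4})) // [Number: 2 Number: 4]
}
```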
Sources
  • FromArray([]T): Convert a Go slice/array into a Sourcer
  • FromChannel(<-chan T): Convert an existing channel into a Sourcer
  • FromDataStream(DataStream[T]): Convert an existing DataStream into a Sourcer
Sinks
  • ToChannel(chan<- T): Write DataStream output into a channel
Methods
  • Run(ProcessFunc[T]) DataStream[T]: Process each item with a user function
  • Filter(FilterFunc[T]) DataStream[T]: Filter items by user-defined condition
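  • Map(DataStream[T], TransformFunc[T,U]) DataStream[U]: Transform each item from T to U; provided as the package-level function datastreams.Map because Go methods cannot declare type parameters
  • FanOut(Params{Num: N}) DataStream[T]: Create N parallel output channels so subsequent stages run in parallel
  • FanIn() DataStream[T]: Merge multiple channels into one
  • Broadcast(Params{Num: N}) DataStream[T]: Duplicate each item to N outputs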
  • Tee() (DataStream[T], DataStream[T]): Split into two DataStreams
  • Take(Params{Num: N}) DataStream[T]: Take only N items
  • OrDone() DataStream[T]: Terminates if upstream is closed
  • Out() <-chan T: Underlying output channel
  • Sink(Sinker[T]) DataStream[T]: Push items to a sink
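FanOut and FanIn can be pictured as the classic Go worker-pool pattern: N goroutines compete for items from one input channel, and a merge step combines their outputs. The sketch below assumes this model and is not taken from the library; fanOut, fanIn, and squareConcurrently are hypothetical names. One consequence worth noting is that fan-out does not preserve input order, so the helper sorts its results before returning them.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanOut starts n workers that all read from the same input channel and
// apply fn, so up to n items are processed at once. Each worker gets its
// own output channel, closed when the shared input is exhausted.
func fanOut(in <-chan int, n int, fn func(int) int) []<-chan int {
	outs := make([]<-chan int, n)
	for i := 0; i < n; i++ {
		out := make(chan int)
		outs[i] = out
		go func() {
			defer close(out)
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	return outs
}

// fanIn merges several channels into one and closes the merged channel
// only after every input has been drained.
func fanIn(ins ...<-chan int) <-chan int {
	merged := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, c := range ins {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				merged <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// squareConcurrently squares nums with n workers; results are sorted because
// fan-out does not preserve input order.
func squareConcurrently(nums []int, n int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range nums {
			in <- v
		}
	}()
	var res []int
	for v := range fanIn(fanOut(in, n, func(v int) int { return v * v })...) {
		res = append(res, v)
	}
	sort.Ints(res)
	return res
}

func main() {
	fmt.Println(squareConcurrently([]int{1, 2, 3, 4, 5}, 2)) // [1 4 9 16 25]
}
```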
Method Params
  • Params: Used to pass arguments into DataStream methods.
    • Options
      • SkipError (bool): If true, any error in ProcessFunc / TransformFunc / FilterFunc causes that item to be skipped rather than stopping the pipeline.
      • Num (int): Used by methods like FanOut, Broadcast, and Take to specify how many parallel channels or how many items to consume.
      • BufferSize (int): Controls the size of the buffered channels created for that stage. Larger buffers can reduce blocking but use more memory.
      • SegmentName (string): Tag a pipeline stage name, useful for logging or debugging errors (e.g. “segment: broadcast-2”).
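The SkipError option can be illustrated with a single stage. This sketch assumes that without SkipError a stage stops at its first error and closes its output, which is one reading of "stopping the pipeline" above; params, run, and process are hypothetical names, and the real datastreams.Params has more fields. The processing function mirrors the negative-number ProcessFunc example earlier in this README.

```go
package main

import "fmt"

// params is a hypothetical stand-in for datastreams.Params, reduced to the
// two options this sketch models.
type params struct {
	SkipError  bool
	BufferSize int
}

// failOnNegative mirrors the ProcessFunc example: negatives are errors,
// everything else is incremented.
func failOnNegative(v int) (int, error) {
	if v < 0 {
		return 0, fmt.Errorf("negative number: %d", v)
	}
	return v + 1, nil
}

// run applies fn to each input and reports every error on errs. With
// SkipError the failing item is dropped and processing continues; without
// it (an assumption about the default) the stage stops and closes its output.
// BufferSize sets the capacity of the output channel.
func run(in <-chan int, fn func(int) (int, error), errs chan<- error, p params) <-chan int {
	out := make(chan int, p.BufferSize)
	go func() {
		defer close(out)
		for v := range in {
			res, err := fn(v)
			if err != nil {
				errs <- err
				if p.SkipError {
					continue
				}
				return
			}
			out <- res
		}
	}()
	return out
}

// process runs nums through one stage and returns the outputs plus the
// number of reported errors.
func process(nums []int, p params) ([]int, int) {
	in := make(chan int, len(nums)) // pre-filled, so an early stop leaks no sender
	for _, n := range nums {
		in <- n
	}
	close(in)
	errs := make(chan error, len(nums)) // buffered so the stage never blocks on errors
	var out []int
	for v := range run(in, failOnNegative, errs, p) {
		out = append(out, v)
	}
	return out, len(errs) // every send on errs happens before out is closed
}

func main() {
	fmt.Println(process([]int{1, -2, 3}, params{SkipError: true})) // [2 4] 1
	fmt.Println(process([]int{1, -2, 3}, params{}))                // [2] 1
}
```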
Examples

Below is an example of how to use the pipelines module to create simple pipelines. Additional examples can be found in the godocs.

Squaring Numbers

This example demonstrates how to set up a pipeline that takes a stream of integers, squares each odd integer, reports each even integer as an error, and outputs the results.

package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}

func squareOdds(v int) (int, error) {
	if v%2 == 0 {
		return v, fmt.Errorf("even number error: %v", v)
	}
	return v * v, nil
}

func exProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	return p.OrDone().FanOut(
		datastreams.Params{Num: 2},
	).Run(
		squareOdds,
	)
}

func main() {
	errChan := make(chan error, 10)
	defer close(errChan)

	pl := pipelines.New[int, int]( // Create a new Pipeline
		context.Background(),
		sources.FromArray(createIntArr(10)), // Create a source to start the pipeline
		errChan,
	).Start(exProcess)

	go func(errReceiver <-chan error) { // Handle Pipeline errors
		defer pl.Close()
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				// return if you wanted to close the pipeline during error handling.
			}
		}
	}(pl.Errors())
	for out := range pl.Out() { // Read Pipeline output
		slog.Info("received simple pipeline output", slog.Int("out", out))
	}
}

Contributing

We welcome your contributions! Please see our Contributing Guide for details on how to open issues, submit pull requests, and propose new features.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.

Documentation

Overview

Package pipelines provides a set of utilities for creating and managing concurrent data processing pipelines in Go.

The library uses channels under the hood to pass data between pipeline stages. Each stage runs in its own goroutine, ensuring concurrency and separation of concerns.

Below is an example of an application that uses pipelines to square odd integers while maintaining shared-state counters:

	package yourpipeline

	import (
	    "context"
	    "fmt"
	    "log/slog"
	    "sync"

	    "github.com/elastiflow/pipelines"
	    "github.com/elastiflow/pipelines/datastreams"
	    "github.com/elastiflow/pipelines/datastreams/sources"
	)

	// PipelineWrapper is an example of a pipelines.Pipeline wrapper implementation. It includes shared state via counters.
	type PipelineWrapper struct {
	    mu          sync.Mutex
	    errChan     chan error
	    evenCounter int
	    oddCounter  int
	}

	// NewPipelineWrapper creates a new PipelineWrapper with counters set to 0
	func NewPipelineWrapper() *PipelineWrapper {
	    // Setup channels and return PipelineWrapper
	    errChan := make(chan error, 10)
	    return &PipelineWrapper{
	        errChan:     errChan,
	        evenCounter: 0,
	        oddCounter:  0,
	    }
	}

	// Run the PipelineWrapper
	func (pl *PipelineWrapper) Run() {
	    defer close(pl.errChan)

	    pipeline := pipelines.New[int, int]( // Create a new Pipeline
	        context.Background(),
	        sources.FromArray(createIntArr(10)), // Create a source to start the pipeline
	        pl.errChan,
	    ).Start(pl.exampleProcess)

	    go func(errReceiver <-chan error) { // Handle Pipeline errors
	        defer pipeline.Close()
	        for err := range errReceiver {
	            if err != nil {
	                slog.Error("demo error: " + err.Error())
	                // return // if you wanted to close the pipeline during error handling.
	            }
	        }
	    }(pl.errChan)

	    for out := range pipeline.Out() { // Read Pipeline output
	        slog.Info("received simple pipeline output", slog.Int("out", out))
	    }
	}

	func (pl *PipelineWrapper) squareOdds(v int) (int, error) {
	    if v%2 == 0 {
	        pl.mu.Lock()
	        pl.evenCounter++
	        pl.mu.Unlock()
	        return v, fmt.Errorf("even number error: %v", v)
	    }
	    pl.mu.Lock()
	    pl.oddCounter++
	    pl.mu.Unlock()
	    return v * v, nil
	}

	func (pl *PipelineWrapper) exampleProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	    // datastreams.DataStream.OrDone stops forwarding once the input channel is closed
	    return p.OrDone().FanOut(
	        datastreams.Params{Num: 2}, // datastreams.DataStream.FanOut runs the subsequent stages in parallel
	    ).Run(
	        pl.squareOdds, // datastreams.DataStream.Run executes the squareOdds process on each item
	    ) // datastreams.DataStream.Out automatically fans in to a single output channel if needed
	}

	func createIntArr(num int) []int {
	    var arr []int
	    for i := 0; i < num; i++ {
	        arr = append(arr, i)
	    }
	    return arr
	}

The package includes various functions to create, manipulate, and control the flow of data through channels, allowing for flexible and efficient data processing.

The main components of this package are:
  • Pipeline: A struct that defines a generic connection of data streams.
  • DataStream (in subpackage "datastreams"): Provides the methods to build concurrency stages.

Pipelines work by connecting a "Source" (an upstream data producer) with an optional chain of transformations or filters before optionally "Sinking" (sending the output to a consumer). Under the hood, all data flows through Go channels with concurrency managed by goroutines. Each transformation or filter runs concurrently in its own goroutine(s), communicating via channels.

For more in-depth usage, see the examples below and the doc.go file.

Example (Duplicate)
package main

import (
	"context"
	"log/slog"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// createIntArr creates a simple slice of int values from 0..num-1.
func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}

// duplicateProcess demonstrates a pipeline stage that broadcasts each value to two streams
// and then fans them back in (duplicates).
func duplicateProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
	// Broadcasting by 2 then fanning in merges them back, effectively duplicating each item.
	return p.Broadcast(
		datastreams.Params{Num: 2},
	).FanIn()
}

func main() {
	errChan := make(chan error)
	pl := pipelines.New[int, int]( // Create a new Pipeline
		context.Background(),
		sources.FromArray(createIntArr(10)), // Create a new Source from slice
		errChan,
	).Start(duplicateProcess)
	defer func() {
		close(errChan)
		pl.Close()
	}()

	// Handle pipeline errors in a separate goroutine
	go func(errReceiver <-chan error) {
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				return
			}
		}
	}(errChan)

	// Read pipeline output
	for out := range pl.Out() {
		slog.Info("received simple pipeline output", slog.Int("out", out))
	}

	// Output (example):
	//   received simple pipeline output out=0
	//   received simple pipeline output out=0
	//   received simple pipeline output out=1
	//   received simple pipeline output out=1
	//   ...
	//   received simple pipeline output out=9
	//   received simple pipeline output out=9
}
Example (Filter)
package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

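// createIntArr creates a simple slice of int values from 0..num-1.
func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}

// filter returns true if the input int is even, false otherwise.
func filter(p int) (bool, error) {
	return p%2 == 0, nil
}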

// mapFunc transforms an even integer into a descriptive string.
func mapFunc(p int) (string, error) {
	return fmt.Sprintf("I'm an even number: %d", p), nil
}

func main() {
	errChan := make(chan error, 10)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer func() {
		close(errChan)
		cancel()
	}()

	// Build a pipeline that fans out, keeps only even numbers, then maps them to strings
	pl := pipelines.New[int, string](
		ctx,
		sources.FromArray(createIntArr(10)),
		errChan,
	).Start(func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p.FanOut(
				datastreams.Params{Num: 3},
			).Filter(
				filter,
			),
			mapFunc,
		)
	})

	for val := range pl.Out() {
		slog.Info("received simple pipeline output", slog.String("out", val))
	}

	// Output (example):
	// {"out":"I'm an even number: 0"}
	// {"out":"I'm an even number: 2"}
	// {"out":"I'm an even number: 4"}
	// {"out":"I'm an even number: 6"}
	// {"out":"I'm an even number: 8"}
}
Example (SourceSink)

Example_sourceSink constructs and starts the Pipeline

package main

import (
	"context"
	"fmt"
	"log/slog"
	"sync"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sinks"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// PipelineWrapper is an example struct embedding a pipeline with shared counters.
//
// The pipeline will read from a channel source, process data, and sink into
// another channel. The "evenCounter" and "oddCounter" track how many evens
// or odds we've encountered.
type PipelineWrapper struct {
	mu          sync.Mutex
	errChan     chan error
	evenCounter int
	oddCounter  int
	pipeline    *pipelines.Pipeline[int, int]
}

// NewPipelineWrapper initializes a PipelineWrapper.
func NewPipelineWrapper() *PipelineWrapper {
	errChan := make(chan error, 10)
	return &PipelineWrapper{
		errChan:     errChan,
		evenCounter: 0,
		oddCounter:  0,
	}
}

// squareOdds increments counters, squares odd numbers, and returns an error on even numbers.
func (pl *PipelineWrapper) squareOdds(v int) (int, error) {
	if v%2 == 0 {
		pl.mu.Lock()
		pl.evenCounter++
		pl.mu.Unlock()
		return v, fmt.Errorf("even number error: %v", v)
	}
	pl.mu.Lock()
	pl.oddCounter++
	pl.mu.Unlock()
	return v * v, nil
}

// exampleProcess shows the entire pipeline flow within a single method,
// including how to produce data (source) and consume it (sink).
func (pl *PipelineWrapper) exampleProcess(ctx context.Context) {
	// 1) Create channels for input and output.
	inChan := make(chan int, 10)
	outChan := make(chan int, 10)

	// 2) Build the pipeline: inChan -> (FanOut + squareOdds) -> sink -> outChan
	pl.pipeline = pipelines.New[int, int](
		ctx,
		sources.FromChannel(inChan), // source
		pl.errChan,
	).Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
		return ds.OrDone().FanOut(
			datastreams.Params{Num: 2},
		).Run(
			pl.squareOdds,
		).Sink( // Sink
			sinks.ToChannel(outChan),
		)
	})

	var wg sync.WaitGroup
	wg.Add(10)

	// 3) Feed data into source inChan in a separate goroutine.
	go func() {
		for _, val := range createIntArr(10) {
			inChan <- val
		}
		close(inChan)
	}()

	// 4) Read results from sink outChan
	go func() {
		for out := range outChan {
			slog.Info("received simple pipeline output", slog.Int("out", out))
			wg.Done()
		}
	}()
	wg.Wait()
	pl.pipeline.Close()
}

// Run creates a background error handler, then calls exampleProcess.
func (pl *PipelineWrapper) Run() {
	defer close(pl.errChan)

	// Handle pipeline errors
	go func(errReceiver <-chan error) {
		for err := range errReceiver {
			if err != nil {
				slog.Error("pipeline error: " + err.Error())
			}
		}
	}(pl.errChan)

	// Run the pipeline flow
	pl.exampleProcess(context.Background())

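	// Inspect counters after the pipeline completes
	pl.mu.Lock()
	defer pl.mu.Unlock()
	slog.Info("pipeline counters",
		slog.Int("evenCounter", pl.evenCounter),
		slog.Int("oddCounter", pl.oddCounter),
	)
}

// createIntArr creates a simple slice of int values from 0..num-1.
func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}

// Example_sourceSink constructs and starts the Pipeline
func main() {
	pl := NewPipelineWrapper()
	pl.Run()
}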
Example (Transformations)
package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/elastiflow/pipelines"
	"github.com/elastiflow/pipelines/datastreams"
	"github.com/elastiflow/pipelines/datastreams/sources"
)

// square is a basic function to demonstrate transformations in the pipeline.
func square(v int) (int, error) {
	return v * v, nil
}

// mapTransformFunc formats the squared integer as a string.
func mapTransformFunc(p int) (string, error) {
	return fmt.Sprintf("I'm a squared number: %d", p), nil
}

// createIntArr creates a simple slice of int values from 0..num-1.
func createIntArr(num int) []int {
	var arr []int
	for i := 0; i < num; i++ {
		arr = append(arr, i)
	}
	return arr
}

func main() {
	errChan := make(chan error, 10) // Setup the error channel
	defer close(errChan)

	// Create a new Pipeline of int->string
	pl := pipelines.New[int, string](
		context.Background(),
		sources.FromArray(createIntArr(10)),
		errChan,
	).Start(func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		// OrDone -> FanOut(2) -> Run (square) -> Map to string
		return datastreams.Map(
			p.OrDone().FanOut(
				datastreams.Params{Num: 2},
			).Run(
				square,
			),
			mapTransformFunc,
		)
	})

	// Handle errors
	go func(errReceiver <-chan error) {
		defer pl.Close()
		for err := range errReceiver {
			if err != nil {
				slog.Error("demo error: " + err.Error())
				// return // if you wanted to close the pipeline during error handling.
			}
		}
	}(pl.Errors())

	// Read pipeline output
	for out := range pl.Out() {
		slog.Info("received simple pipeline output", slog.String("out", out))
	}

	// Output (example):
	// {"out":"I'm a squared number: 0"}
	// {"out":"I'm a squared number: 1"}
	// {"out":"I'm a squared number: 4"}
	// {"out":"I'm a squared number: 9"}
	// ...
	// {"out":"I'm a squared number: 81"}
}

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Opts

type Opts struct {
	BufferSize int
}

Opts defines optional configuration parameters for certain pipeline operations. It is currently unused and reserved for future expansion.

type Pipeline

type Pipeline[T any, U any] struct {
	// contains filtered or unexported fields
}

Pipeline is a struct that defines a generic stream process.

Pipeline[T, U] represents a flow of data of type T at the input, ultimately producing data of type U at the output. Under the hood, a Pipeline orchestrates DataStream stages connected by channels.

Usage typically begins by calling New(...) to create a pipeline with a Source, and then applying transformations via Map, Process, or Start/Stream with a StreamFunc. Finally, you can read from the Out() channel, or Sink the output via a Sinker.

func New

func New[T any, U any](
	ctx context.Context,
	sourcer datastreams.Sourcer[T],
	errStream chan error,
) *Pipeline[T, U]

New constructs a new Pipeline of a given type by passing in a datastreams.Sourcer.

  • ctx: a context.Context used to cancel or manage the lifetime of the pipeline
  • sourcer: a datastreams.Sourcer[T] that provides the initial data stream
  • errStream: an error channel to which the pipeline can send errors

Example usage:

p := pipelines.New[int, int](context.Background(), someSource, errChan)
Example

ExampleNew demonstrates a minimal pipeline creation and usage.

errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create a new Pipeline with int -> int
pl := New[int, int](
	context.Background(),
	sources.FromChannel(inChan),
	errChan,
)

// Provide data on inChan
go func() {
	for i := 1; i <= 5; i++ {
		inChan <- i
	}
	close(inChan)
}()

// Process the data: multiply by 2
pl.Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
	return ds.Run(func(v int) (int, error) {
		return v * 2, nil
	})
})

// Read pipeline output until closed
for val := range pl.Out() {
	fmt.Println("Result:", val)
}
Output:
Result: 2
Result: 4
Result: 6
Result: 8
Result: 10

func (*Pipeline[T, U]) Close

func (p *Pipeline[T, U]) Close()

Close gracefully closes a Pipeline by canceling its internal context. This signals all data streams to terminate reading or writing to channels.
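Cancellation-driven shutdown of this kind usually relies on every goroutine selecting on ctx.Done() alongside its channel operations, the pattern behind an OrDone stage. The sketch below shows that pattern with the standard library only; orDone, naturals, and firstN are hypothetical helpers, and calling cancel stands in for Pipeline.Close.

```go
package main

import (
	"context"
	"fmt"
)

// orDone forwards values from in until in is closed or ctx is canceled,
// whichever happens first. Both the receive and the send also watch
// ctx.Done(), so the goroutine cannot stay blocked after cancellation.
func orDone[T any](ctx context.Context, in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// naturals is an endless source that stops only when ctx is canceled.
func naturals(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstN reads n values (n must be positive), then cancels the context.
func firstN(n int) []int {
	if n <= 0 {
		return nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := orDone(ctx, naturals(ctx))
	var res []int
	for v := range out {
		res = append(res, v)
		if len(res) == n {
			break
		}
	}
	cancel() // like Pipeline.Close: cancel the shared context
	for range out {
		// Discard any in-flight value; the loop ends once orDone sees the
		// cancellation and closes out.
	}
	return res
}

func main() {
	fmt.Println(firstN(3)) // [0 1 2]
}
```

Without the ctx.Done() cases, the endless source would stay blocked on its send after the reader stops, leaking a goroutine.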

func (*Pipeline[T, U]) Errors

func (p *Pipeline[T, U]) Errors() <-chan error

Errors returns the error channel of the Pipeline.

This channel receives errors from any stage (e.g. transformations, filters, sinks). The caller should consume from it to handle errors appropriately.
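The examples in this document drain Errors() in a separate goroutine while the main goroutine reads Out(). The sketch below shows why that matters when the error channel is unbuffered: a stage that reports an error blocks until someone receives it, so reading only the output would deadlock. squareOdds and runAndCollect are hypothetical helpers modeled on the README's stage, and unlike that stage this sketch drops the even values rather than forwarding them.

```go
package main

import (
	"fmt"
	"sync"
)

// squareOdds is a hypothetical stage modeled on the README: odd values are
// squared, even values are reported on errs and dropped. Sends on an
// unbuffered errs block until someone receives them.
func squareOdds(in []int, errs chan<- error) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		defer close(errs) // deferred last, so it runs first: errs closes before out
		for _, v := range in {
			if v%2 == 0 {
				errs <- fmt.Errorf("even number error: %d", v)
				continue
			}
			out <- v * v
		}
	}()
	return out
}

// runAndCollect drains errors in a separate goroutine while the caller reads
// the output; ranging over out alone would deadlock at the first even value.
func runAndCollect(in []int) (results []int, errCount int) {
	errs := make(chan error) // unbuffered on purpose
	out := squareOdds(in, errs)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range errs {
			errCount++
		}
	}()

	for v := range out {
		results = append(results, v)
	}
	wg.Wait() // errCount is safe to read once the error reader has finished
	return results, errCount
}

func main() {
	fmt.Println(runAndCollect([]int{1, 2, 3, 4, 5})) // [1 9 25] 2
}
```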

func (*Pipeline[T, U]) In

func (p *Pipeline[T, U]) In() <-chan T

In returns the input channel of the source datastreams.DataStream of a Pipeline.

This channel can be read from externally if needed, though typically one supplies a Sourcer when constructing a Pipeline. Because it is receive-only, it cannot be used to send data into the pipeline, and reading from it may take items away from the downstream stages, so use it with care.

func (*Pipeline[T, U]) Map

func (p *Pipeline[T, U]) Map(
	mapper datastreams.TransformFunc[T, U],
	params ...datastreams.Params,
) datastreams.DataStream[U]

Map creates a new DataStream by applying a mapper function (TransformFunc) to each message in the pipeline's source. This is a convenience method that directly calls datastreams.Map on the pipeline's source.

  • mapper: a TransformFunc[T, U] that takes an input T and returns output U
  • params: optional datastreams.Params to configure buffer sizes, skipping errors, etc.
Example

ExamplePipeline_Map demonstrates how to create and use a simple Pipeline that maps one type (int) to another (string).

errChan := make(chan error)
defer close(errChan)

pl := New[int, string]( // Create a new Pipeline
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Map(
	func(p int) (string, error) {
		return fmt.Sprintf("Im a string now: %d", p), nil
	},
)

for out := range pl.Out() { // Read Pipeline output
	fmt.Println("out:", out)
}
Output:
out: Im a string now: 1
out: Im a string now: 2
out: Im a string now: 3
out: Im a string now: 4
out: Im a string now: 5

func (*Pipeline[T, U]) Out

func (p *Pipeline[T, U]) Out() <-chan U

Out returns the output channel (sink) of the pipeline.

Reading from this channel lets you consume the final processed data of type U.

func (*Pipeline[T, U]) Process

func (p *Pipeline[T, U]) Process(
	processor datastreams.ProcessFunc[T],
	params ...datastreams.Params,
) *Pipeline[T, U]

Process creates a new pipeline by applying a ProcessFunc to each message in the current pipeline's source. It internally starts a new pipeline with the processed DataStream. This is a convenience for quickly chaining transformations.

  • processor: a ProcessFunc[T] that maps each input T to a (possibly modified) T
  • params: optional datastreams.Params

func (*Pipeline[T, U]) Sink

func (p *Pipeline[T, U]) Sink(sinker datastreams.Sinker[U]) error

Sink consumes the pipeline's sink DataStream using a specified datastreams.Sinker.

  • sinker: a Sinker that will receive data of type U
  • returns an error if the sink fails; otherwise nil.

Typically used to push output to a custom location or channel.

Example

ExamplePipeline_Sink demonstrates how to create and use a simple Pipeline that sinks the output to a sinks.ToChannel.

errChan := make(chan error)
outChan := make(chan string)
var wg sync.WaitGroup
wg.Add(5)
go func() {
	for out := range outChan { // Read Pipeline output
		fmt.Println("out:", out)
		wg.Done()
	}
}()

if err := New[int, string]( // Create a new Pipeline
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p,
			func(p int) (string, error) { return fmt.Sprintf("Im a string now: %d", p), nil },
		)
	},
).Sink(
	sinks.ToChannel(outChan),
); err != nil {
	fmt.Println("error sinking:", err)
}
wg.Wait()
Output:
out: Im a string now: 1
out: Im a string now: 2
out: Im a string now: 3
out: Im a string now: 4
out: Im a string now: 5

func (*Pipeline[T, U]) Start

func (p *Pipeline[T, U]) Start(streamFunc StreamFunc[T, U]) *Pipeline[T, U]

Start applies the given StreamFunc to the pipeline's source and returns the Pipeline itself, so further calls such as Sink or Tee can be chained directly onto it.

Example:

pipeline.Start(func(ds DataStream[int]) DataStream[int] { ... }).Start(...)
Example

ExamplePipeline_Start demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then returns an output Pipeline.

errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create and open the pipeline
pl := New[int, int](
	context.Background(),
	sources.FromChannel(inChan, sources.Params{BufferSize: 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(
			func(v int) (int, error) {
				return v * 2, nil
			},
			datastreams.Params{BufferSize: 5},
		)
	},
)

// Send values to the pipeline's source channel
go func() {
	for _, val := range []int{1, 2, 3, 4, 5} {
		inChan <- val
	}
	close(inChan)
}()

for out := range pl.Out() {
	fmt.Println("out:", out)
}
Output:
out: 2
out: 4
out: 6
out: 8
out: 10

func (*Pipeline[T, U]) Stream

func (p *Pipeline[T, U]) Stream(streamFunc StreamFunc[T, U]) datastreams.DataStream[U]

Stream applies a StreamFunc to the pipeline's source, storing the resulting DataStream as the pipeline's sink (final stage). It then returns that sink DataStream for further chaining.

Typically used for quickly connecting a pipeline to a processing function.

Example:

pipeline.Stream(func(ds DataStream[int]) DataStream[int] { ... })
Example

ExamplePipeline_Stream demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then returns an output DataStream.

errChan := make(chan error, 3)
defer close(errChan)
inChan := make(chan int, 5)

// Create and open the pipeline
ds := New[int, int](
	context.Background(),
	sources.FromChannel(inChan, sources.Params{BufferSize: 5}),
	errChan,
).Stream(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(
			func(v int) (int, error) {
				return v * 2, nil
			},
			datastreams.Params{BufferSize: 5},
		)
	},
)

// Send values to the pipeline's source channel
go func() {
	for _, val := range []int{1, 2, 3, 4, 5} {
		inChan <- val
	}
	close(inChan)
}()

for out := range ds.Out() {
	fmt.Println("out:", out)
}
Output:
out: 2
out: 4
out: 6
out: 8
out: 10

func (*Pipeline[T, U]) Tee

func (p *Pipeline[T, U]) Tee(params ...datastreams.Params) (datastreams.DataStream[U], datastreams.DataStream[U])

Tee creates a fork in the pipeline's sink DataStream, returning two DataStreams that each receive the same data from the sink.

This is useful for sending the same processed output to multiple consumers.

  • params: optional datastreams.Params for buffer sizing, etc.
  • returns two DataStreams of type U, each receiving the same data from the pipeline sink.
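The Tee example that follows drains out1 completely before reading out2. Assuming Tee delivers each value to both branches before reading the next one, that likely works only because BufferSize: 5 leaves room for all five values on the second branch. The sketch below models that delivery rule with the standard library and reads both branches concurrently, which avoids depending on the buffer size; tee and teeSum are hypothetical names, not library functions.

```go
package main

import (
	"fmt"
	"sync"
)

// tee copies every value from in to both returned channels. Each value is
// delivered to both outputs before the next one is read, so a slow reader on
// either side stalls the other; a buffer of size buf absorbs the difference.
func tee[T any](in <-chan T, buf int) (<-chan T, <-chan T) {
	out1, out2 := make(chan T, buf), make(chan T, buf)
	go func() {
		defer close(out1)
		defer close(out2)
		for v := range in {
			// Nil out each local copy after sending, so the select sends to
			// both exactly once, in whichever order they become ready.
			o1, o2 := out1, out2
			for i := 0; i < 2; i++ {
				select {
				case o1 <- v:
					o1 = nil
				case o2 <- v:
					o2 = nil
				}
			}
		}
	}()
	return out1, out2
}

// teeSum feeds nums through tee and sums each branch in its own goroutine.
func teeSum(nums []int) (int, int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()
	a, b := tee[int](in, 0) // unbuffered: works because both branches are read concurrently

	var sumA, sumB int
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v := range a {
			sumA += v
		}
	}()
	go func() {
		defer wg.Done()
		for v := range b {
			sumB += v
		}
	}()
	wg.Wait()
	return sumA, sumB
}

func main() {
	fmt.Println(teeSum([]int{2, 4, 6, 8, 10})) // 30 30
}
```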
Example

ExamplePipeline_Tee demonstrates how to create and use a simple Pipeline that processes integer inputs by doubling their values and then tees the output into two distinct streams.

errChan := make(chan error)
defer close(errChan)

// Create and open the pipeline
out1, out2 := New[int, int](
	context.Background(),
	sources.FromArray([]int{1, 2, 3, 4, 5}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[int] {
		return p.Run(func(v int) (int, error) {
			return v * 2, nil
		})
	},
).Tee(datastreams.Params{BufferSize: 5})

// Collect and print the results from both outputs
for out := range out1.Out() {
	fmt.Println("out1:", out)
}
for out := range out2.Out() {
	fmt.Println("out2:", out)
}
Output:
out1: 2
out1: 4
out1: 6
out1: 8
out1: 10
out2: 2
out2: 4
out2: 6
out2: 8
out2: 10

func (*Pipeline[T, U]) ToSource

func (p *Pipeline[T, U]) ToSource() datastreams.Sourcer[U]

ToSource converts the pipeline's sink into a datastreams.Sourcer[U], allowing it to be used as a source in another pipeline.
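The idea behind ToSource can be sketched with a small interface: if a processing stage satisfies the same "source" interface as a plain slice source, one stage's output can start another. sourcer, sliceSource, stage, and chained are hypothetical simplifications; the real datastreams.Sourcer's Source method also takes an error channel and returns a DataStream. The sketch reproduces the B, C, D output of the ToSource example.

```go
package main

import (
	"context"
	"fmt"
)

// sourcer is a hypothetical, simplified stand-in for datastreams.Sourcer.
type sourcer[T any] interface {
	Source(ctx context.Context) <-chan T
}

// sliceSource produces the elements of a slice until ctx is canceled.
type sliceSource[T any] []T

func (s sliceSource[T]) Source(ctx context.Context) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range s {
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// stage applies fn to every value from src. Because it also implements
// sourcer, a stage can feed another stage.
type stage[T, U any] struct {
	src sourcer[T]
	fn  func(T) U
}

func (s stage[T, U]) Source(ctx context.Context) <-chan U {
	in := s.src.Source(ctx)
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- s.fn(v):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// chained maps ints to letters, then prefixes each letter in a second stage.
func chained(nums []int) []string {
	letters := stage[int, string]{
		src: sliceSource[int](nums),
		fn:  func(i int) string { return string(rune('A' + i)) },
	}
	processed := stage[string, string]{
		src: letters,
		fn:  func(s string) string { return "processed " + s },
	}
	var res []string
	for v := range processed.Source(context.Background()) {
		res = append(res, v)
	}
	return res
}

func main() {
	fmt.Println(chained([]int{1, 2, 3})) // [processed B processed C processed D]
}
```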

Example

ExamplePipeline_ToSource demonstrates how to turn a Pipeline into a source to be used in another Pipeline.

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
errChan := make(chan error, 1)
defer close(errChan)

source := New[int, string](
	ctx,
	sources.FromArray([]int{1, 2, 3}),
	errChan,
).Start(
	func(p datastreams.DataStream[int]) datastreams.DataStream[string] {
		return datastreams.Map(
			p,
			func(i int) (string, error) { return string(rune('A' + i)), nil },
		).Run(
			func(s string) (string, error) {
				return "processed " + s, nil
			},
		)
	},
).ToSource()

for v := range source.Source(ctx, errChan).Out() {
	fmt.Println(v)
}
Output:
processed B
processed C
processed D

func (*Pipeline[T, U]) Wait

func (p *Pipeline[T, U]) Wait()

Wait blocks until the pipeline's source has consumed all messages or the context is canceled.

This method helps ensure you process all items before shutting down. Unless the context was canceled, the Pipeline is drained once Wait returns.

type StreamFunc

type StreamFunc[T any, U any] func(stream datastreams.DataStream[T]) datastreams.DataStream[U]

StreamFunc is a function that takes a datastreams.DataStream and returns a datastreams.DataStream.

A StreamFunc represents a logical segment of a pipeline, where data can be consumed, transformed, filtered, or otherwise processed. The output of a StreamFunc is another DataStream that can be further chained.
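Because a StreamFunc maps one stream to another, segments compose like ordinary functions. The sketch below swaps DataStream for plain receive-only channels to stay self-contained; streamFunc, then, mapStage, and runSegment are hypothetical names, not part of the library.

```go
package main

import "fmt"

// streamFunc is a hypothetical stand-in for pipelines.StreamFunc that uses
// plain receive-only channels instead of datastreams.DataStream.
type streamFunc[T, U any] func(<-chan T) <-chan U

// then composes two segments into one: f's output becomes g's input.
func then[T, U, V any](f streamFunc[T, U], g streamFunc[U, V]) streamFunc[T, V] {
	return func(in <-chan T) <-chan V { return g(f(in)) }
}

// mapStage builds a segment that applies fn to every item in its own goroutine.
func mapStage[T, U any](fn func(T) U) streamFunc[T, U] {
	return func(in <-chan T) <-chan U {
		out := make(chan U)
		go func() {
			defer close(out)
			for v := range in {
				out <- fn(v)
			}
		}()
		return out
	}
}

// runSegment doubles each number, then labels it, using one composed segment.
func runSegment(nums []int) []string {
	double := mapStage[int, int](func(v int) int { return v * 2 })
	label := mapStage[int, string](func(v int) string { return fmt.Sprintf("out: %d", v) })
	segment := then[int, int, string](double, label)

	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()
	var res []string
	for s := range segment(in) {
		res = append(res, s)
	}
	return res
}

func main() {
	fmt.Println(runSegment([]int{1, 2, 3})) // [out: 2 out: 4 out: 6]
}
```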
