flow

package
v0.13.0 Latest
Published: May 10, 2025 License: MIT Imports: 6 Imported by: 38

Documentation

Overview

Package flow provides streams.Flow implementations.

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoStream

func DoStream(outlet streams.Outlet, inlet streams.Inlet)

DoStream streams data from the outlet to inlet.

func FanOut

func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow

FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.

func Flatten added in v0.8.0

func Flatten[T any](parallelism int) streams.Flow

Flatten creates a Flow to flatten the stream of slices. T specifies the outgoing element type, and the incoming element type is []T.

func Merge

func Merge(outlets ...streams.Flow) streams.Flow

Merge merges multiple flows into a single flow. When all specified outlets are closed, the resulting flow will close.

func RoundRobin added in v0.8.0

func RoundRobin(outlet streams.Outlet, magnitude int) []streams.Flow

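RoundRobin creates magnitude flows from the single outlet and balances the outlet's elements across them. Unlike FanOut, each element is delivered to only one of the flows. This can be useful when work can be parallelized across multiple cores.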

func Split

func Split[T any](outlet streams.Outlet, predicate func(T) bool) [2]streams.Flow

Split splits the stream into two flows according to the given boolean predicate. T specifies the incoming and outgoing element type.

func ZipWith added in v0.12.0

func ZipWith[T, R any](combine func([]T) R, outlets ...streams.Outlet) streams.Flow

ZipWith combines elements from multiple input streams using a combiner function. It returns a new Flow with the resulting values. The combiner function is called with a slice of elements, where each element is taken from each input outlet. The elements in the slice will be in the order of outlets. If an outlet is closed, its corresponding element in the slice will be the zero value. The returned Flow will close when all the input outlets are closed.

It will panic if provided with fewer than two outlets, or if any of the outlets has an element type other than T.

Types

type Batch added in v0.10.0

type Batch[T any] struct {
	// contains filtered or unexported fields
}

Batch processor breaks a stream of elements into batches based on size or timing. When the maximum batch size is reached or the batch time has elapsed, and the current buffer is not empty, a new batch is emitted. Note: once a batch is sent downstream, the timer is reset. T indicates the incoming element type, and the outgoing element type is []T.

func NewBatch added in v0.10.0

func NewBatch[T any](maxBatchSize int, timeInterval time.Duration) *Batch[T]

NewBatch returns a new Batch operator using the specified maximum batch size and the time interval. T specifies the incoming element type, and the outgoing element type is []T.

NewBatch will panic if the maxBatchSize argument is not positive.

func (*Batch[T]) In added in v0.10.0

func (b *Batch[T]) In() chan<- any

In returns the input channel of the Batch operator.

func (*Batch[T]) Out added in v0.10.0

func (b *Batch[T]) Out() <-chan any

Out returns the output channel of the Batch operator.

func (*Batch[T]) To added in v0.10.0

func (b *Batch[T]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*Batch[T]) Via added in v0.10.0

func (b *Batch[T]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type Filter

type Filter[T any] struct {
	// contains filtered or unexported fields
}

Filter filters incoming elements using a filter predicate. If an element matches the predicate, the element is passed downstream. If not, the element is discarded.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ -------- FilterPredicate -------- ]

out -- 1 -- 2 ------------------ 5 --

func NewFilter

func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism int) *Filter[T]

NewFilter returns a new Filter operator. T specifies the incoming and the outgoing element type.

filterPredicate is a function that accepts an element of type T and returns true if the element should be included in the output stream, and false if it should be filtered out. parallelism specifies the number of goroutines to use for parallel processing. If the order of elements in the output stream must be preserved, set parallelism to 1.

NewFilter will panic if parallelism is less than 1.

func (*Filter[T]) In

func (f *Filter[T]) In() chan<- any

In returns the input channel of the Filter operator.

func (*Filter[T]) Out

func (f *Filter[T]) Out() <-chan any

Out returns the output channel of the Filter operator.

func (*Filter[T]) To

func (f *Filter[T]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*Filter[T]) Via

func (f *Filter[T]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type FilterPredicate added in v0.7.0

type FilterPredicate[T any] func(T) bool

FilterPredicate represents a filter predicate (boolean-valued function).

type FlatMap

type FlatMap[T, R any] struct {
	// contains filtered or unexported fields
}

FlatMap takes one element and produces zero, one, or more elements.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ -------- FlatMapFunction -------- ]

out -- 1' - 2' -------- 4'- 4" - 5' -

func NewFlatMap

func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism int) *FlatMap[T, R]

NewFlatMap returns a new FlatMap operator. T specifies the incoming element type. flatMapFunction returns a []R for each incoming element, and the elements of that slice are emitted downstream individually.

flatMapFunction is the FlatMap transformation function. parallelism specifies the number of goroutines to use for parallel processing. If the order of elements in the output stream must be preserved, set parallelism to 1.

NewFlatMap will panic if parallelism is less than 1.

func (*FlatMap[T, R]) In

func (fm *FlatMap[T, R]) In() chan<- any

In returns the input channel of the FlatMap operator.

func (*FlatMap[T, R]) Out

func (fm *FlatMap[T, R]) Out() <-chan any

Out returns the output channel of the FlatMap operator.

func (*FlatMap[T, R]) To

func (fm *FlatMap[T, R]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*FlatMap[T, R]) Via

func (fm *FlatMap[T, R]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type FlatMapFunction added in v0.7.0

type FlatMapFunction[T, R any] func(T) []R

FlatMapFunction represents a FlatMap transformation function.

type Fold added in v0.13.0

type Fold[T, R any] struct {
	// contains filtered or unexported fields
}

Fold implements a "rolling" fold transformation on a data stream with an initial value. Combines the current element with the last folded value and emits the new value.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ ---------- FoldFunction ---------- ]

out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewFold added in v0.13.0

func NewFold[T, R any](init R, foldFunction FoldFunction[T, R]) *Fold[T, R]

NewFold returns a new Fold operator. T specifies the incoming element type, and the outgoing element type is R.

init is the initial value for the folding process. foldFunction is the function that performs the fold transformation.

func (*Fold[T, R]) In added in v0.13.0

func (m *Fold[T, R]) In() chan<- any

In returns the input channel of the Fold operator.

func (*Fold[T, R]) Out added in v0.13.0

func (m *Fold[T, R]) Out() <-chan any

Out returns the output channel of the Fold operator.

func (*Fold[T, R]) To added in v0.13.0

func (m *Fold[T, R]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*Fold[T, R]) Via added in v0.13.0

func (m *Fold[T, R]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type FoldFunction added in v0.13.0

type FoldFunction[T, R any] func(T, R) R

FoldFunction represents a Fold transformation function.

type Keyed added in v0.11.0

type Keyed[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Keyed represents a flow where stream elements are partitioned by key using a provided key selector function.

func NewKeyed added in v0.11.0

func NewKeyed[K comparable, V any](
	keySelector func(V) K, operators ...func() streams.Flow,
) *Keyed[K, V]

NewKeyed returns a new Keyed operator. This operator splits an input stream into multiple sub-streams based on keys extracted from the elements using the keySelector function.

Each of these individual streams is then transformed by the provided chain of operators, and the results are sent to the output channel. Due to the concurrent processing of each keyed stream, the order of elements in the output channel is not deterministic and may not reflect the original order of elements in the input stream.

Each operator supplier must return a new instance of the flow to ensure that each keyed stream has its own independent state.

Example:

newSlidingWindow := func() streams.Flow {
	return flow.NewSlidingWindow[event](10*time.Second, time.Second)
}

newMap := func() streams.Flow {
	return flow.NewMap(func(events []event) event {
		return events[len(events)-1]
	}, 1)
}

keyed := flow.NewKeyed(func(e event) string {
	return e.serial
}, newSlidingWindow, newMap)

If no operators are provided, NewKeyed will panic.

func (*Keyed[K, V]) In added in v0.11.0

func (k *Keyed[K, V]) In() chan<- any

In returns the input channel of the Keyed operator.

func (*Keyed[K, V]) Out added in v0.11.0

func (k *Keyed[K, V]) Out() <-chan any

Out returns the output channel of the Keyed operator.

func (*Keyed[K, V]) To added in v0.11.0

func (k *Keyed[K, V]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*Keyed[K, V]) Via added in v0.11.0

func (k *Keyed[K, V]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type Map

type Map[T, R any] struct {
	// contains filtered or unexported fields
}

Map takes one element and produces one element.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ ---------- MapFunction ---------- ]

out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap[T, R any](mapFunction MapFunction[T, R], parallelism int) *Map[T, R]

NewMap returns a new Map operator. T specifies the incoming element type, and the outgoing element type is R.

mapFunction is the Map transformation function. parallelism specifies the number of goroutines to use for parallel processing. If the order of elements in the output stream must be preserved, set parallelism to 1.

NewMap will panic if parallelism is less than 1.

func (*Map[T, R]) In

func (m *Map[T, R]) In() chan<- any

In returns the input channel of the Map operator.

func (*Map[T, R]) Out

func (m *Map[T, R]) Out() <-chan any

Out returns the output channel of the Map operator.

func (*Map[T, R]) To

func (m *Map[T, R]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*Map[T, R]) Via

func (m *Map[T, R]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type MapFunction added in v0.7.0

type MapFunction[T, R any] func(T) R

MapFunction represents a Map transformation function.

type PassThrough

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough retransmits incoming elements downstream as they are.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

out -- 1 -- 2 ---- 3 -- 4 ------ 5 --

func NewPassThrough

func NewPassThrough() *PassThrough

NewPassThrough returns a new PassThrough operator.

func (*PassThrough) In

func (pt *PassThrough) In() chan<- any

In returns the input channel of the PassThrough operator.

func (*PassThrough) Out

func (pt *PassThrough) Out() <-chan any

Out returns the output channel of the PassThrough operator.

func (*PassThrough) To

func (pt *PassThrough) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*PassThrough) Via

func (pt *PassThrough) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type Reduce added in v0.8.0

type Reduce[T any] struct {
	// contains filtered or unexported fields
}

Reduce implements a “rolling” reduce transformation on a data stream. Combines the current element with the last reduced value and emits the new value.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ --------- ReduceFunction --------- ]

out -- 1 -- 2' --- 3' - 4' ----- 5' -

func NewReduce added in v0.8.0

func NewReduce[T any](reduceFunction ReduceFunction[T]) *Reduce[T]

NewReduce returns a new Reduce operator. T specifies the incoming and the outgoing element type.

reduceFunction combines the current element with the last reduced value.

func (*Reduce[T]) In added in v0.8.0

func (r *Reduce[T]) In() chan<- any

In returns the input channel of the Reduce operator.

func (*Reduce[T]) Out added in v0.8.0

func (r *Reduce[T]) Out() <-chan any

Out returns the output channel of the Reduce operator.

func (*Reduce[T]) To added in v0.8.0

func (r *Reduce[T]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*Reduce[T]) Via added in v0.8.0

func (r *Reduce[T]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type ReduceFunction added in v0.8.0

type ReduceFunction[T any] func(T, T) T

ReduceFunction combines the current element with the last reduced value.

type SessionWindow added in v0.7.0

type SessionWindow[T any] struct {
	// contains filtered or unexported fields
}

SessionWindow generates groups of elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time. T indicates the incoming element type, and the outgoing element type is []T.

func NewSessionWindow added in v0.7.0

func NewSessionWindow[T any](inactivityGap time.Duration) *SessionWindow[T]

NewSessionWindow returns a new SessionWindow operator. T specifies the incoming element type, and the outgoing element type is []T.

inactivityGap is the period of inactivity after which the current session window is closed.

func (*SessionWindow[T]) In added in v0.7.0

func (sw *SessionWindow[T]) In() chan<- any

In returns the input channel of the SessionWindow operator.

func (*SessionWindow[T]) Out added in v0.7.0

func (sw *SessionWindow[T]) Out() <-chan any

Out returns the output channel of the SessionWindow operator.

func (*SessionWindow[T]) To added in v0.7.0

func (sw *SessionWindow[T]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*SessionWindow[T]) Via added in v0.7.0

func (sw *SessionWindow[T]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type SlidingWindow

type SlidingWindow[T any] struct {
	// contains filtered or unexported fields
}

SlidingWindow assigns elements to windows of fixed length configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows. T indicates the incoming element type, and the outgoing element type is []T.

func NewSlidingWindow

func NewSlidingWindow[T any](windowSize, slidingInterval time.Duration) *SlidingWindow[T]

NewSlidingWindow returns a new SlidingWindow operator based on processing time. Processing time refers to the system time of the machine that is executing the respective operation. T specifies the incoming element type, and the outgoing element type is []T.

windowSize is the duration of each full window. slidingInterval is the interval at which new windows are created and emitted.

NewSlidingWindow panics if slidingInterval is larger than windowSize.

func NewSlidingWindowWithOpts added in v0.13.0

func NewSlidingWindowWithOpts[T any](
	windowSize, slidingInterval time.Duration, opts SlidingWindowOpts[T]) *SlidingWindow[T]

NewSlidingWindowWithOpts returns a new SlidingWindow operator configured with the provided configuration options. T specifies the incoming element type, and the outgoing element type is []T.

windowSize is the duration of each full window. slidingInterval is the interval at which new windows are created and emitted. opts are the sliding window configuration options.

NewSlidingWindowWithOpts panics if slidingInterval is larger than windowSize, or the allowed lateness is larger than slidingInterval.

func (*SlidingWindow[T]) In

func (sw *SlidingWindow[T]) In() chan<- any

In returns the input channel of the SlidingWindow operator.

func (*SlidingWindow[T]) Out

func (sw *SlidingWindow[T]) Out() <-chan any

Out returns the output channel of the SlidingWindow operator.

func (*SlidingWindow[T]) To

func (sw *SlidingWindow[T]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*SlidingWindow[T]) Via

func (sw *SlidingWindow[T]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type SlidingWindowOpts added in v0.13.0

type SlidingWindowOpts[T any] struct {
	// EventTimeExtractor is a function that extracts the event time from an element.
	// Event time is the time at which the event occurred on its producing device.
	// Using event time enables correct windowing even when events arrive out of order
	// or with delays.
	//
	// If EventTimeExtractor is not specified, processing time is used. Processing time
	// refers to the system time of the machine executing the window operation.
	EventTimeExtractor func(T) time.Time
	// EmitPartialWindow determines whether to emit window elements before the first
	// full window duration has elapsed. If false, the first window will only be
	// emitted after the full window duration.
	EmitPartialWindow bool
	// AllowedLateness provides a grace period after the window closes, during which
	// late data is still processed. This prevents data loss and improves the
	// completeness of results. If AllowedLateness is not specified, records belonging
	// to a closed window that arrive late will be discarded.
	//
	// The specified value must be no larger than the window sliding interval.
	AllowedLateness time.Duration
}

SlidingWindowOpts represents SlidingWindow configuration options.

type ThrottleMode

type ThrottleMode int8

ThrottleMode defines the behavior of the Throttler when its internal buffer is full.

const (
	// Backpressure instructs the Throttler to block upstream ingestion when its internal
	// buffer is full. This effectively slows down the producer, preventing data loss
	// and ensuring all elements are eventually processed, albeit at a reduced rate. This
	// mode can cause upstream operations to block indefinitely if the downstream consumer
	// cannot keep up.
	Backpressure ThrottleMode = iota
	// Discard instructs the Throttler to drop incoming elements when its internal buffer
	// is full. This mode prioritizes maintaining the target throughput rate, even at the
	// cost of data loss. Elements are silently dropped without any indication to the
	// upstream producer. Use this mode when data loss is acceptable.
	Discard
)
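The difference between the two modes comes down to a blocking versus a non-blocking send into a bounded buffer. The sketch below shows that distinction with a buffered channel; it is not the Throttler's implementation, and mode, backpressure, discard, and offer are hypothetical names that mirror the constants above.

```go
package main

import "fmt"

// mode mirrors ThrottleMode for this sketch only.
type mode int8

const (
	backpressure mode = iota // block the producer while the buffer is full
	discard                  // drop the element while the buffer is full
)

// offer is a hypothetical helper, not part of package flow. It tries to place
// v into the bounded buffer buf and reports whether v was accepted.
func offer[T any](buf chan T, v T, m mode) bool {
	if m == discard {
		select {
		case buf <- v:
			return true
		default:
			return false // buffer full: the element is silently dropped
		}
	}
	buf <- v // blocks until a consumer frees a slot
	return true
}

func main() {
	buf := make(chan int, 2)
	for i := 1; i <= 3; i++ {
		fmt.Println(i, offer(buf, i, discard)) // the third offer is rejected
	}

	// With backpressure the producer waits until the consumer reads.
	done := make(chan bool)
	go func() { done <- offer(buf, 4, backpressure) }()
	fmt.Println(<-buf)  // prints 1 and frees a slot
	fmt.Println(<-done) // prints true
}
```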

type Throttler

type Throttler struct {
	// contains filtered or unexported fields
}

Throttler limits the throughput to a specific number of elements per time unit.

func NewThrottler

func NewThrottler(elements int, period time.Duration, bufferSize int, mode ThrottleMode) *Throttler

NewThrottler returns a new Throttler operator.

The Throttler operator limits the rate at which elements are emitted. It lets at most 'elements' elements through in each 'period'.

elements is the maximum number of elements to be produced per period. period is the time interval over which that limit applies. bufferSize is the size of the internal buffer, which temporarily holds incoming elements waiting to be processed. mode specifies the behavior when the internal buffer is full. See ThrottleMode for available options.

If elements or bufferSize are not positive, or if mode is not a supported ThrottleMode, NewThrottler will panic.

func (*Throttler) In

func (th *Throttler) In() chan<- any

In returns the input channel of the Throttler operator.

func (*Throttler) Out

func (th *Throttler) Out() <-chan any

Out returns the output channel of the Throttler operator.

func (*Throttler) To

func (th *Throttler) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*Throttler) Via

func (th *Throttler) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.

type TumblingWindow

type TumblingWindow[T any] struct {
	// contains filtered or unexported fields
}

TumblingWindow assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. T indicates the incoming element type, and the outgoing element type is []T.

func NewTumblingWindow

func NewTumblingWindow[T any](size time.Duration) *TumblingWindow[T]

NewTumblingWindow returns a new TumblingWindow operator. T specifies the incoming element type, and the outgoing element type is []T.

size is the duration of each generated window.

func (*TumblingWindow[T]) In

func (tw *TumblingWindow[T]) In() chan<- any

In returns the input channel of the TumblingWindow operator.

func (*TumblingWindow[T]) Out

func (tw *TumblingWindow[T]) Out() <-chan any

Out returns the output channel of the TumblingWindow operator.

func (*TumblingWindow[T]) To

func (tw *TumblingWindow[T]) To(sink streams.Sink)

To streams data to the given Sink and blocks until the Sink has completed processing all data.

func (*TumblingWindow[T]) Via

func (tw *TumblingWindow[T]) Via(flow streams.Flow) streams.Flow

Via asynchronously streams data to the given Flow and returns it.
