Documentation
¶
Overview ¶
Package flow provides streams.Flow implementations.
Index ¶
- func DoStream(outlet streams.Outlet, inlet streams.Inlet)
- func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
- func Flatten[T any](parallelism int) streams.Flow
- func Merge(outlets ...streams.Flow) streams.Flow
- func RoundRobin(outlet streams.Outlet, magnitude int) []streams.Flow
- func Split[T any](outlet streams.Outlet, predicate func(T) bool) [2]streams.Flow
- func ZipWith[T, R any](combine func([]T) R, outlets ...streams.Outlet) streams.Flow
- type Batch
- type Filter
- type FilterPredicate
- type FlatMap
- type FlatMapFunction
- type Fold
- type FoldFunction
- type Keyed
- type Map
- type MapFunction
- type PassThrough
- type Reduce
- type ReduceFunction
- type SessionWindow
- type SlidingWindow
- type SlidingWindowOpts
- type ThrottleMode
- type Throttler
- type TumblingWindow
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DoStream ¶
func DoStream(outlet streams.Outlet, inlet streams.Inlet)
DoStream streams data from the outlet to inlet.
func FanOut ¶
func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.
func Flatten ¶ added in v0.8.0
Flatten creates a Flow to flatten the stream of slices. T specifies the outgoing element type, and the incoming element type is []T.
func Merge ¶
func Merge(outlets ...streams.Flow) streams.Flow
Merge merges multiple flows into a single flow. When all specified outlets are closed, the resulting flow will close.
func RoundRobin ¶ added in v0.8.0
func RoundRobin(outlet streams.Outlet, magnitude int) []streams.Flow
RoundRobin creates a balanced number of flows from the single outlet. This can be useful when work can be parallelized across multiple cores.
func Split ¶
Split splits the stream into two flows according to the given boolean predicate. T specifies the incoming and outgoing element type.
func ZipWith ¶ added in v0.12.0
func ZipWith[T, R any](combine func([]T) R, outlets ...streams.Outlet) streams.Flow
ZipWith combines elements from multiple input streams using a combiner function. It returns a new Flow with the resulting values. The combiner function is called with a slice of elements, where each element is taken from each input outlet. The elements in the slice will be in the order of outlets. If an outlet is closed, its corresponding element in the slice will be the zero value. The returned Flow will close when all the input outlets are closed.
It will panic if provided less than two outlets, or if any of the outlets has an element type other than T.
Types ¶
type Batch ¶ added in v0.10.0
type Batch[T any] struct { // contains filtered or unexported fields }
Batch processor breaks a stream of elements into batches based on size or timing. When the maximum batch size is reached or the batch time is elapsed, and the current buffer is not empty, a new batch will be emitted. Note: once a batch is sent downstream, the timer will be reset. T indicates the incoming element type, and the outgoing element type is []T.
func NewBatch ¶ added in v0.10.0
NewBatch returns a new Batch operator using the specified maximum batch size and the time interval. T specifies the incoming element type, and the outgoing element type is []T.
NewBatch will panic if the maxBatchSize argument is not positive.
type Filter ¶
type Filter[T any] struct { // contains filtered or unexported fields }
Filter filters incoming elements using a filter predicate. If an element matches the predicate, the element is passed downstream. If not, the element is discarded.
in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ -------- FilterPredicate -------- ]
out -- 1 -- 2 ------------------ 5 --
func NewFilter ¶
func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism int) *Filter[T]
NewFilter returns a new Filter operator. T specifies the incoming and the outgoing element type.
filterPredicate is a function that accepts an element of type T and returns true if the element should be included in the output stream, and false if it should be filtered out. parallelism specifies the number of goroutines to use for parallel processing. If the order of elements in the output stream must be preserved, set parallelism to 1.
NewFilter will panic if parallelism is less than 1.
type FilterPredicate ¶ added in v0.7.0
FilterPredicate represents a filter predicate (boolean-valued function).
type FlatMap ¶
type FlatMap[T, R any] struct { // contains filtered or unexported fields }
FlatMap takes one element and produces zero, one, or more elements.
in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ -------- FlatMapFunction -------- ]
out -- 1' - 2' -------- 4'- 4" - 5' -
func NewFlatMap ¶
func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism int) *FlatMap[T, R]
NewFlatMap returns a new FlatMap operator. T specifies the incoming element type, and the outgoing element type is []R.
flatMapFunction is the FlatMap transformation function. parallelism specifies the number of goroutines to use for parallel processing. If the order of elements in the output stream must be preserved, set parallelism to 1.
NewFlatMap will panic if parallelism is less than 1.
type FlatMapFunction ¶ added in v0.7.0
type FlatMapFunction[T, R any] func(T) []R
FlatMapFunction represents a FlatMap transformation function.
type Fold ¶ added in v0.13.0
type Fold[T, R any] struct { // contains filtered or unexported fields }
Fold implements a "rolling" fold transformation on a data stream with an initial value. Combines the current element with the last folded value and emits the new value.
in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ ---------- FoldFunction ---------- ]
out -- 1' - 2' --- 3' - 4' ----- 5' -
func NewFold ¶ added in v0.13.0
func NewFold[T, R any](init R, foldFunction FoldFunction[T, R]) *Fold[T, R]
NewFold returns a new Fold operator. T specifies the incoming element type, and the outgoing element type is R.
init is the initial value for the folding process. foldFunction is the function that performs the fold transformation.
type FoldFunction ¶ added in v0.13.0
type FoldFunction[T, R any] func(T, R) R
FoldFunction represents a Fold transformation function.
type Keyed ¶ added in v0.11.0
type Keyed[K comparable, V any] struct { // contains filtered or unexported fields }
Keyed represents a flow where stream elements are partitioned by key using a provided key selector function.
func NewKeyed ¶ added in v0.11.0
func NewKeyed[K comparable, V any]( keySelector func(V) K, operators ...func() streams.Flow, ) *Keyed[K, V]
NewKeyed returns a new Keyed operator. This operator splits an input stream into multiple sub-streams based on keys extracted from the elements using the keySelector function.
Each of these individual streams is then transformed by the provided chain of operators, and the results are sent to the output channel. Due to the concurrent processing of each keyed stream, the order of elements in the output channel is not deterministic and may not reflect the original order of elements in the input stream.
Each operator supplier must return a new instance of the flow to ensure that each keyed stream has its own independent state.
Example:
newSlidingWindow := func() streams.Flow {
return flow.NewSlidingWindow[event](10*time.Second, time.Second)
}
newMap := func() streams.Flow {
return flow.NewMap(func(events []event) event {
return events[len(events)-1]
}, 1)
}
keyed := flow.NewKeyed(func(e event) string {
return e.serial
}, newSlidingWindow, newMap)
If no operators are provided, NewKeyed will panic.
type Map ¶
type Map[T, R any] struct { // contains filtered or unexported fields }
Map takes one element and produces one element.
in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ ---------- MapFunction ---------- ]
out -- 1' - 2' --- 3' - 4' ----- 5' -
func NewMap ¶
func NewMap[T, R any](mapFunction MapFunction[T, R], parallelism int) *Map[T, R]
NewMap returns a new Map operator. T specifies the incoming element type, and the outgoing element type is R.
mapFunction is the Map transformation function. parallelism specifies the number of goroutines to use for parallel processing. If the order of elements in the output stream must be preserved, set parallelism to 1.
NewMap will panic if parallelism is less than 1.
type MapFunction ¶ added in v0.7.0
type MapFunction[T, R any] func(T) R
MapFunction represents a Map transformation function.
type PassThrough ¶
type PassThrough struct {
// contains filtered or unexported fields
}
PassThrough retransmits incoming elements downstream as they are.
in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
out -- 1 -- 2 ---- 3 -- 4 ------ 5 --
func NewPassThrough ¶
func NewPassThrough() *PassThrough
NewPassThrough returns a new PassThrough operator.
func (*PassThrough) In ¶
func (pt *PassThrough) In() chan<- any
In returns the input channel of the PassThrough operator.
func (*PassThrough) Out ¶
func (pt *PassThrough) Out() <-chan any
Out returns the output channel of the PassThrough operator.
func (*PassThrough) To ¶
func (pt *PassThrough) To(sink streams.Sink)
To streams data to the given Sink and blocks until the Sink has completed processing all data.
func (*PassThrough) Via ¶
func (pt *PassThrough) Via(flow streams.Flow) streams.Flow
Via asynchronously streams data to the given Flow and returns it.
type Reduce ¶ added in v0.8.0
type Reduce[T any] struct { // contains filtered or unexported fields }
Reduce implements a “rolling” reduce transformation on a data stream. Combines the current element with the last reduced value and emits the new value.
in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ --------- ReduceFunction --------- ]
out -- 1 -- 2' --- 3' - 4' ----- 5' -
func NewReduce ¶ added in v0.8.0
func NewReduce[T any](reduceFunction ReduceFunction[T]) *Reduce[T]
NewReduce returns a new Reduce operator. T specifies the incoming and the outgoing element type.
reduceFunction combines the current element with the last reduced value.
type ReduceFunction ¶ added in v0.8.0
type ReduceFunction[T any] func(T, T) T
ReduceFunction combines the current element with the last reduced value.
type SessionWindow ¶ added in v0.7.0
type SessionWindow[T any] struct { // contains filtered or unexported fields }
SessionWindow generates groups of elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time. T indicates the incoming element type, and the outgoing element type is []T.
func NewSessionWindow ¶ added in v0.7.0
func NewSessionWindow[T any](inactivityGap time.Duration) *SessionWindow[T]
NewSessionWindow returns a new SessionWindow operator. T specifies the incoming element type, and the outgoing element type is []T.
inactivityGap is the gap of inactivity that closes a session window when occurred.
func (*SessionWindow[T]) In ¶ added in v0.7.0
func (sw *SessionWindow[T]) In() chan<- any
In returns the input channel of the SessionWindow operator.
func (*SessionWindow[T]) Out ¶ added in v0.7.0
func (sw *SessionWindow[T]) Out() <-chan any
Out returns the output channel of the SessionWindow operator.
func (*SessionWindow[T]) To ¶ added in v0.7.0
func (sw *SessionWindow[T]) To(sink streams.Sink)
To streams data to the given Sink and blocks until the Sink has completed processing all data.
func (*SessionWindow[T]) Via ¶ added in v0.7.0
func (sw *SessionWindow[T]) Via(flow streams.Flow) streams.Flow
Via asynchronously streams data to the given Flow and returns it.
type SlidingWindow ¶
type SlidingWindow[T any] struct { // contains filtered or unexported fields }
SlidingWindow assigns elements to windows of fixed length configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows. T indicates the incoming element type, and the outgoing element type is []T.
func NewSlidingWindow ¶
func NewSlidingWindow[T any](windowSize, slidingInterval time.Duration) *SlidingWindow[T]
NewSlidingWindow returns a new SlidingWindow operator based on processing time. Processing time refers to the system time of the machine that is executing the respective operation. T specifies the incoming element type, and the outgoing element type is []T.
windowSize is the duration of each full window. slidingInterval is the interval at which new windows are created and emitted.
NewSlidingWindow panics if slidingInterval is larger than windowSize.
func NewSlidingWindowWithOpts ¶ added in v0.13.0
func NewSlidingWindowWithOpts[T any]( windowSize, slidingInterval time.Duration, opts SlidingWindowOpts[T]) *SlidingWindow[T]
NewSlidingWindowWithOpts returns a new SlidingWindow operator configured with the provided configuration options. T specifies the incoming element type, and the outgoing element type is []T.
windowSize is the duration of each full window. slidingInterval is the interval at which new windows are created and emitted. opts are the sliding window configuration options.
NewSlidingWindowWithOpts panics if slidingInterval is larger than windowSize, or the allowed lateness is larger than slidingInterval.
func (*SlidingWindow[T]) In ¶
func (sw *SlidingWindow[T]) In() chan<- any
In returns the input channel of the SlidingWindow operator.
func (*SlidingWindow[T]) Out ¶
func (sw *SlidingWindow[T]) Out() <-chan any
Out returns the output channel of the SlidingWindow operator.
func (*SlidingWindow[T]) To ¶
func (sw *SlidingWindow[T]) To(sink streams.Sink)
To streams data to the given Sink and blocks until the Sink has completed processing all data.
func (*SlidingWindow[T]) Via ¶
func (sw *SlidingWindow[T]) Via(flow streams.Flow) streams.Flow
Via asynchronously streams data to the given Flow and returns it.
type SlidingWindowOpts ¶ added in v0.13.0
type SlidingWindowOpts[T any] struct { // EventTimeExtractor is a function that extracts the event time from an element. // Event time is the time at which the event occurred on its producing device. // Using event time enables correct windowing even when events arrive out of order // or with delays. // // If EventTimeExtractor is not specified, processing time is used. Processing time // refers to the system time of the machine executing the window operation. EventTimeExtractor func(T) time.Time // EmitPartialWindow determines whether to emit window elements before the first // full window duration has elapsed. If false, the first window will only be // emitted after the full window duration. EmitPartialWindow bool // AllowedLateness provides a grace period after the window closes, during which // late data is still processed. This prevents data loss and improves the // completeness of results. If AllowedLateness is not specified, records belonging // to a closed window that arrive late will be discarded. // // The specified value must be no larger than the window sliding interval. AllowedLateness time.Duration }
SlidingWindowOpts represents SlidingWindow configuration options.
type ThrottleMode ¶
type ThrottleMode int8
ThrottleMode defines the behavior of the Throttler when its internal buffer is full.
const ( // Backpressure instructs the Throttler to block upstream ingestion when its internal // buffer is full. This effectively slows down the producer, preventing data loss // and ensuring all elements are eventually processed, albeit at a reduced rate. This // mode can cause upstream operations to block indefinitely if the downstream consumer // cannot keep up. Backpressure ThrottleMode = iota // Discard instructs the Throttler to drop incoming elements when its internal buffer // is full. This mode prioritizes maintaining the target throughput rate, even at the // cost of data loss. Elements are silently dropped without any indication to the // upstream producer. Use this mode when data loss is acceptable. Discard )
type Throttler ¶
type Throttler struct {
// contains filtered or unexported fields
}
Throttler limits the throughput to a specific number of elements per time unit.
func NewThrottler ¶
NewThrottler returns a new Throttler operator.
The Throttler operator limits the rate at which elements are produced. It allows a maximum of 'elements' number of elements to be processed within a specified 'period' of time.
elements is the maximum number of elements to be produced per the given period of time. bufferSize is the size of the internal buffer for incoming elements. This buffer temporarily holds elements waiting to be processed. mode specifies the processing behavior when the internal elements buffer is full. See ThrottleMode for available options.
If elements or bufferSize are not positive, or if mode is not a supported ThrottleMode, NewThrottler will panic.
type TumblingWindow ¶
type TumblingWindow[T any] struct { // contains filtered or unexported fields }
TumblingWindow assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. T indicates the incoming element type, and the outgoing element type is []T.
func NewTumblingWindow ¶
func NewTumblingWindow[T any](size time.Duration) *TumblingWindow[T]
NewTumblingWindow returns a new TumblingWindow operator. T specifies the incoming element type, and the outgoing element type is []T.
size is the Duration of generated windows.
func (*TumblingWindow[T]) In ¶
func (tw *TumblingWindow[T]) In() chan<- any
In returns the input channel of the TumblingWindow operator.
func (*TumblingWindow[T]) Out ¶
func (tw *TumblingWindow[T]) Out() <-chan any
Out returns the output channel of the TumblingWindow operator.
func (*TumblingWindow[T]) To ¶
func (tw *TumblingWindow[T]) To(sink streams.Sink)
To streams data to the given Sink and blocks until the Sink has completed processing all data.
func (*TumblingWindow[T]) Via ¶
func (tw *TumblingWindow[T]) Via(flow streams.Flow) streams.Flow
Via asynchronously streams data to the given Flow and returns it.