flow

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 7, 2024 License: MIT Imports: 2 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoStream

func DoStream(outlet streams.Outlet, inlet streams.Inlet)

DoStream streams data from the outlet to the inlet.
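As a rough illustration of what streaming from an outlet to an inlet involves, the sketch below forwards elements between two plain channels. It does not use the streams.Outlet and streams.Inlet interfaces; the helper name pipe is hypothetical, and closing the destination once the source is drained is an assumption of this sketch.

```go
package main

import "fmt"

// pipe is a hypothetical stand-in for DoStream: it forwards every element
// from src to dst in a background goroutine and closes dst once src is
// exhausted (the closing behaviour is an assumption).
func pipe(src <-chan any, dst chan<- any) {
	go func() {
		defer close(dst)
		for v := range src {
			dst <- v
		}
	}()
}

func main() {
	src := make(chan any)
	dst := make(chan any)
	pipe(src, dst)

	// Produce three elements, then signal completion by closing src.
	go func() {
		defer close(src)
		for _, v := range []any{"a", "b", "c"} {
			src <- v
		}
	}()

	for v := range dst {
		fmt.Println(v) // prints a, b and c on separate lines
	}
}
```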

func FanOut

func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow

FanOut creates magnitude identical flows from a single outlet. This is useful when the same data must be written to multiple sinks.
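The fan-out idea can be sketched with plain channels: every element read from the source is copied to each output. The helpers fanOut and emit are hypothetical names used only for this illustration, and the sketch is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// emit returns a closed, pre-filled channel holding vals.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// fanOut (hypothetical) copies every element of src to each of n outputs
// and closes the outputs once src is drained.
func fanOut[T any](src <-chan T, n int) []<-chan T {
	chans := make([]chan T, n)
	outs := make([]<-chan T, n)
	for i := range chans {
		chans[i] = make(chan T)
		outs[i] = chans[i]
	}
	go func() {
		for v := range src {
			for _, ch := range chans {
				ch <- v // blocks until this consumer has taken the element
			}
		}
		for _, ch := range chans {
			close(ch)
		}
	}()
	return outs
}

func main() {
	outs := fanOut(emit(1, 2, 3), 2)

	// Each output must be drained concurrently, otherwise fanOut blocks.
	results := make([][]int, len(outs))
	var wg sync.WaitGroup
	for i, out := range outs {
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			for v := range out {
				results[i] = append(results[i], v)
			}
		}(i, out)
	}
	wg.Wait()
	fmt.Println(results) // [[1 2 3] [1 2 3]]
}
```

Because the outputs in this sketch are unbuffered, one slow consumer holds back all the others.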

func Flatten

func Flatten(parallelism uint) streams.Flow

Flatten creates a Flow that flattens a stream of slices into a stream of their individual elements.
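Flattening can be illustrated with plain channels: each incoming slice is unpacked and its elements are emitted one at a time. The helpers flatten and emit are hypothetical, and the parallelism parameter of Flatten is not modelled here.

```go
package main

import "fmt"

// emit returns a closed, pre-filled channel holding vals.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// flatten (hypothetical) emits the elements of every incoming slice one by
// one; empty slices contribute nothing to the output.
func flatten[T any](src <-chan []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for batch := range src {
			for _, v := range batch {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	batches := emit([]int{1, 2}, []int{}, []int{3})
	for v := range flatten(batches) {
		fmt.Println(v) // prints 1, 2 and 3 on separate lines
	}
}
```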

func Merge

func Merge(outlets ...streams.Flow) streams.Flow

Merge merges multiple flows into a single flow.
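A common way to merge several streams is one forwarding goroutine per source plus a sync.WaitGroup that closes the output once all sources are done. The sketch below shows that pattern on plain channels; merge and emit are hypothetical helpers and do not reflect how this package implements Merge.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// emit returns a closed, pre-filled channel holding vals.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// merge (hypothetical) forwards the elements of all sources to one output
// channel and closes it after every source has been drained.
func merge[T any](srcs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(srcs))
	for _, src := range srcs {
		go func(src <-chan T) {
			defer wg.Done()
			for v := range src {
				out <- v
			}
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	var got []int
	for v := range merge(emit(1, 2), emit(10, 20)) {
		got = append(got, v)
	}
	sort.Ints(got)   // the interleaving across sources is not deterministic
	fmt.Println(got) // [1 2 10 20]
}
```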

func RoundRobin

func RoundRobin(outlet streams.Outlet, magnitude int) []streams.Flow

RoundRobin creates magnitude flows that share the elements of a single outlet, balancing the load between them. This is useful when work can be parallelized across multiple cores.
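One way to balance a single stream over several outputs is to let a goroutine per output compete for elements of the shared source, so that each element lands on exactly one output. The sketch below assumes this competing-consumer scheme rather than a strict rotation; distribute and emit are hypothetical helpers, not the package's code.

```go
package main

import "fmt"

// emit returns a closed, pre-filled channel holding vals.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// distribute (hypothetical) starts n goroutines that compete for elements
// of src, so each element ends up on exactly one of the n outputs.
func distribute[T any](src <-chan T, n int) []<-chan T {
	outs := make([]<-chan T, n)
	for i := range outs {
		out := make(chan T)
		outs[i] = out
		go func() {
			defer close(out)
			for v := range src {
				out <- v
			}
		}()
	}
	return outs
}

func main() {
	outs := distribute(emit(1, 2, 3, 4, 5, 6), 3)

	// Count how many elements arrive on each output.
	counts := make(chan int)
	for _, out := range outs {
		go func(out <-chan int) {
			n := 0
			for range out {
				n++
			}
			counts <- n
		}(out)
	}
	total := 0
	for range outs {
		total += <-counts
	}
	fmt.Println(total) // 6: every element was delivered exactly once
}
```

How the six elements are divided between the three outputs depends on goroutine scheduling; only the total is fixed.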

func Split

func Split(outlet streams.Outlet, predicate func(interface{}) bool) [2]streams.Flow

Split splits the stream into two flows according to the given boolean predicate.
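Splitting by predicate can be sketched as a single goroutine that routes each element to one of two channels. The helpers split and emit are hypothetical, and placing the matching elements at index 0 is an assumption of this sketch, not something stated above.

```go
package main

import "fmt"

// emit returns a closed, pre-filled channel holding vals.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// split (hypothetical) routes each element of src to one of two channels.
// Index 0 receives elements matching pred, index 1 the rest (an assumption).
func split[T any](src <-chan T, pred func(T) bool) [2]<-chan T {
	matched := make(chan T)
	rest := make(chan T)
	go func() {
		defer close(matched)
		defer close(rest)
		for v := range src {
			if pred(v) {
				matched <- v
			} else {
				rest <- v
			}
		}
	}()
	return [2]<-chan T{matched, rest}
}

func main() {
	isEven := func(v int) bool { return v%2 == 0 }
	outs := split(emit(1, 2, 3, 4, 5), isEven)

	// Both outputs must be drained concurrently to avoid blocking split.
	evens := make(chan []int)
	go func() {
		var got []int
		for v := range outs[0] {
			got = append(got, v)
		}
		evens <- got
	}()
	var odds []int
	for v := range outs[1] {
		odds = append(odds, v)
	}
	fmt.Println(<-evens, odds) // [2 4] [1 3 5]
}
```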

Types

type Filter

type Filter[T any] struct {
	// contains filtered or unexported fields
}

Filter filters incoming elements using a filter predicate. If an element matches the predicate, the element is passed downstream. If not, the element is discarded.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ -------- FilterPredicate -------- ]

out -- 1 -- 2 ------------------ 5 --
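The diagram can be reproduced with a small channel-based sketch in which elements 3 and 4 fail the predicate. The helpers filter and emit are hypothetical, and the sketch runs a single worker, which corresponds to a parallelism of 1.

```go
package main

import "fmt"

// emit returns a closed, pre-filled channel holding vals.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// filter (hypothetical) passes an element downstream only if keep returns
// true for it; all other elements are discarded.
func filter[T any](src <-chan T, keep func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range src {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	keep := func(v int) bool { return v != 3 && v != 4 }
	for v := range filter(emit(1, 2, 3, 4, 5), keep) {
		fmt.Println(v) // prints 1, 2 and 5 on separate lines
	}
}
```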

func NewFilter

func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism uint) *Filter[T]

NewFilter returns a new Filter instance.

filterPredicate is the boolean-valued filter function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.

func (*Filter[T]) In

func (f *Filter[T]) In() chan<- interface{}

In returns an input channel for receiving data.

func (*Filter[T]) Out

func (f *Filter[T]) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*Filter[T]) To

func (f *Filter[T]) To(sink streams.Sink)

To streams data to the given sink.

func (*Filter[T]) Via

func (f *Filter[T]) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type FilterPredicate

type FilterPredicate[T any] func(T) bool

FilterPredicate represents a filter predicate (boolean-valued function).

type FlatMap

type FlatMap[T, R any] struct {
	// contains filtered or unexported fields
}

FlatMap takes one element and produces zero, one, or more elements.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ -------- FlatMapFunction -------- ]

out -- 1' - 2' -------- 4'- 4" - 5' -
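A flat-map step can be sketched on plain channels: the function returns a slice for each input, and every element of that slice is emitted downstream, so one input may yield zero, one, or several outputs. The helpers flatMap and emit are hypothetical and run a single worker.

```go
package main

import (
	"fmt"
	"strings"
)

// emit returns a closed, pre-filled channel holding vals.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// flatMap (hypothetical) applies fn to each element of src and emits every
// element of the returned slice.
func flatMap[T, R any](src <-chan T, fn func(T) []R) <-chan R {
	out := make(chan R)
	go func() {
		defer close(out)
		for v := range src {
			for _, r := range fn(v) {
				out <- r
			}
		}
	}()
	return out
}

func main() {
	// "go streams" yields two words, "" yields none, "flow" yields one.
	lines := emit("go streams", "", "flow")
	for w := range flatMap(lines, strings.Fields) {
		fmt.Println(w) // prints go, streams and flow on separate lines
	}
}
```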

func NewFlatMap

func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism uint) *FlatMap[T, R]

NewFlatMap returns a new FlatMap instance.

flatMapFunction is the FlatMap transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.

func (*FlatMap[T, R]) In

func (fm *FlatMap[T, R]) In() chan<- interface{}

In returns an input channel for receiving data.

func (*FlatMap[T, R]) Out

func (fm *FlatMap[T, R]) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*FlatMap[T, R]) To

func (fm *FlatMap[T, R]) To(sink streams.Sink)

To streams data to the given sink.

func (*FlatMap[T, R]) Via

func (fm *FlatMap[T, R]) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type FlatMapFunction

type FlatMapFunction[T, R any] func(T) []R

FlatMapFunction represents a FlatMap transformation function.

type Map

type Map[T, R any] struct {
	// contains filtered or unexported fields
}

Map takes one element and produces one element.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ ---------- MapFunction ---------- ]

out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap[T, R any](mapFunction MapFunction[T, R], parallelism uint) *Map[T, R]

NewMap returns a new Map instance.

mapFunction is the Map transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.
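To see why a parallelism above 1 can change the order of events, consider a sketch in which several workers read from one source and write to one output: whichever worker finishes first emits first. The helpers mapParallel and emit are hypothetical, and the worker-pool design is an assumption, not a description of how Map is implemented.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// emit returns a closed, pre-filled channel holding vals.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// mapParallel (hypothetical) applies fn to each element of src using the
// given number of workers. With one worker the input order is preserved;
// with more, results arrive in completion order.
func mapParallel[T, R any](src <-chan T, fn func(T) R, parallelism uint) <-chan R {
	out := make(chan R)
	var wg sync.WaitGroup
	for i := uint(0); i < parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range src {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	square := func(v int) int { return v * v }
	var got []int
	for v := range mapParallel(emit(1, 2, 3, 4, 5), square, 4) {
		got = append(got, v)
	}
	sort.Ints(got)   // arrival order is not guaranteed with 4 workers
	fmt.Println(got) // [1 4 9 16 25]
}
```

Calling the same helper with a parallelism of 1 yields the squares in input order without sorting.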

func (*Map[T, R]) In

func (m *Map[T, R]) In() chan<- interface{}

In returns an input channel for receiving data.

func (*Map[T, R]) Out

func (m *Map[T, R]) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*Map[T, R]) To

func (m *Map[T, R]) To(sink streams.Sink)

To streams data to the given sink.

func (*Map[T, R]) Via

func (m *Map[T, R]) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type MapFunction

type MapFunction[T, R any] func(T) R

MapFunction represents a Map transformation function.

type PassThrough

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough retransmits incoming elements as is.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --

out -- 1 -- 2 ---- 3 -- 4 ------ 5 --

func NewPassThrough

func NewPassThrough() *PassThrough

NewPassThrough returns a new PassThrough instance.

func (*PassThrough) In

func (pt *PassThrough) In() chan<- interface{}

In returns an input channel for receiving data.

func (*PassThrough) Out

func (pt *PassThrough) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*PassThrough) To

func (pt *PassThrough) To(sink streams.Sink)

To streams data to the given sink.

func (*PassThrough) Via

func (pt *PassThrough) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.
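The In, Out, and Via methods that recur on every type above suggest a chaining style in which each stage forwards its output into the next stage's input. The sketch below models that with local stand-in interfaces. The names inlet, outlet, flow, and stage are hypothetical, and the interface definitions are inferred from the method signatures on this page rather than copied from the streams package.

```go
package main

import "fmt"

// Local stand-ins for streams.Inlet, streams.Outlet and streams.Flow.
// Their shape is an assumption based on the signatures listed above.
type inlet interface{ In() chan<- any }
type outlet interface{ Out() <-chan any }
type flow interface {
	inlet
	outlet
	Via(flow) flow
}

// stage is a hypothetical pass-through flow backed by a single channel.
type stage struct{ ch chan any }

func newStage() *stage { return &stage{ch: make(chan any)} }

func (s *stage) In() chan<- any  { return s.ch }
func (s *stage) Out() <-chan any { return s.ch }

// Via forwards everything from s into next, closes next's input when s is
// drained, and returns next so that calls can be chained.
func (s *stage) Via(next flow) flow {
	go func() {
		defer close(next.In())
		for v := range s.Out() {
			next.In() <- v
		}
	}()
	return next
}

func main() {
	first, second := newStage(), newStage()

	// Feed the first stage, then close its input to end the stream.
	go func() {
		defer close(first.In())
		for _, v := range []any{1, 2, 3} {
			first.In() <- v
		}
	}()

	for v := range first.Via(second).Out() {
		fmt.Println(v) // prints 1, 2 and 3 on separate lines
	}
}
```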
