Versions in this module Expand all Collapse all v1 v1.0.0 Jun 15, 2021 Changes in this version + func DoStream(outlet streams.Outlet, inlet streams.Inlet) + func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow + func Merge(outlets ...streams.Flow) streams.Flow + func SideOutput(outlet streams.Outlet, cond func(interface{}) string, OutputTag ...string) map[string]streams.Flow + func Split(outlet streams.Outlet, cond func(interface{}) bool) [2]streams.Flow + type Error struct + Ctx context.Context + FlowErr error + type Filter struct + ErrChan chan *Error + FilterF FilterFunc + func NewFilter(filterFunc FilterFunc, parallelism uint, errChan chan *Error) *Filter + func (f *Filter) In() chan<- interface{} + func (f *Filter) Out() <-chan interface{} + func (f *Filter) To(sink streams.Sink) + func (f *Filter) Via(flow streams.Flow) streams.Flow + type FilterFunc func(interface{}) (bool, *Error) + type FlatMap struct + FlatMapF FlatMapFunc + func NewFlatMap(flatMapFunc FlatMapFunc, parallelism uint, errChan chan *Error) *FlatMap + func (fm *FlatMap) In() chan<- interface{} + func (fm *FlatMap) Out() <-chan interface{} + func (fm *FlatMap) To(sink streams.Sink) + func (fm *FlatMap) Via(flow streams.Flow) streams.Flow + type FlatMapFunc func(interface{}) ([]interface{}, *Error) + type Item struct + Msg interface{} + func NewItem(msg interface{}, epoch int64, index int) *Item + type Map struct + MapF MapFunc + func NewMap(mapFunc MapFunc, parallelism uint, errChan chan *Error) *Map + func (m *Map) In() chan<- interface{} + func (m *Map) Out() <-chan interface{} + func (m *Map) To(sink streams.Sink) + func (m *Map) Via(flow streams.Flow) streams.Flow + type MapFunc func(interface{}) (interface{}, *Error) + type PassThrough struct + func NewPassThrough() *PassThrough + func (pt *PassThrough) In() chan<- interface{} + func (pt *PassThrough) Out() <-chan interface{} + func (pt *PassThrough) To(sink streams.Sink) + func (pt *PassThrough) Via(flow streams.Flow) streams.Flow + type PriorityQueue []*Item + func (pq *PriorityQueue) Head() *Item + func (pq *PriorityQueue) Pop() interface{} + func (pq *PriorityQueue) Push(x interface{}) + func (pq *PriorityQueue) Update(item *Item, newEpoch int64) + func (pq PriorityQueue) Len() int + func (pq PriorityQueue) Less(i, j int) bool + func (pq PriorityQueue) Slice(start, end int) PriorityQueue + func (pq PriorityQueue) Swap(i, j int) + type SlidingWindow struct + func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow + func NewSlidingWindowWithTSExtractor(size time.Duration, slide time.Duration, ...) *SlidingWindow + func (sw *SlidingWindow) In() chan<- interface{} + func (sw *SlidingWindow) Out() <-chan interface{} + func (sw *SlidingWindow) To(sink streams.Sink) + func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow + type ThrottleMode int8 + const Backpressure + const Discard + type Throttler struct + func NewThrottler(elements uint, period time.Duration, bufferSize uint, mode ThrottleMode) *Throttler + func (th *Throttler) In() chan<- interface{} + func (th *Throttler) Out() <-chan interface{} + func (th *Throttler) To(sink streams.Sink) + func (th *Throttler) Via(flow streams.Flow) streams.Flow + type TumblingWindow struct + func NewTumblingWindow(size time.Duration) *TumblingWindow + func (tw *TumblingWindow) In() chan<- interface{} + func (tw *TumblingWindow) Out() <-chan interface{} + func (tw *TumblingWindow) To(sink streams.Sink) + func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow