Documentation ¶
Overview ¶
Package flow provides streams.Flow implementations.
Index ¶
- func DoStream(outlet streams.Outlet, inlet streams.Inlet)
- func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
- func Merge(outlets ...streams.Flow) streams.Flow
- func SideOutput(outlet streams.Outlet, cond func(interface{}) string, OutputTag ...string) map[string]streams.Flow
- func Split(outlet streams.Outlet, cond func(interface{}) bool) [2]streams.Flow
- type Error
- type Filter
- type FilterFunc
- type FlatMap
- type FlatMapFunc
- type Item
- type Map
- type MapFunc
- type PassThrough
- type PriorityQueue
- func (pq *PriorityQueue) Head() *Item
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq PriorityQueue) Slice(start, end int) PriorityQueue
- func (pq PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
- type SlidingWindow
- type ThrottleMode
- type Throttler
- type TumblingWindow
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DoStream ¶
func DoStream(outlet streams.Outlet, inlet streams.Inlet)
DoStream streams data from the outlet to inlet.
func FanOut ¶
func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.
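The idea of duplicating one outlet into several identical flows can be sketched with plain channels. This is a minimal illustration using only the standard library, not the package's implementation; fanOut, drainAll, and emit are hypothetical helpers, and int is used in place of interface{} for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut copies every value read from in to n new channels and closes
// them once in is closed. Hypothetical helper, not the package's FanOut.
func fanOut(in <-chan int, n int) []chan int {
	outs := make([]chan int, n)
	for i := range outs {
		outs[i] = make(chan int)
	}
	go func() {
		for v := range in {
			for _, out := range outs {
				out <- v // blocks until this branch's consumer is ready
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return outs
}

// drainAll reads every branch concurrently and returns what each one saw.
func drainAll(outs []chan int) [][]int {
	results := make([][]int, len(outs))
	var wg sync.WaitGroup
	for i, out := range outs {
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			for v := range out {
				results[i] = append(results[i], v)
			}
		}(i, out)
	}
	wg.Wait()
	return results
}

// emit returns a closed, pre-filled channel holding vals.
func emit(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	fmt.Println(drainAll(fanOut(emit(1, 2, 3), 2))) // [[1 2 3] [1 2 3]]
}
```

Because the branches in this sketch are unbuffered, every branch must be consumed concurrently; a slow or idle consumer stalls all the others.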
func Merge ¶
func Merge(outlets ...streams.Flow) streams.Flow
Merge merges multiple flows into a single flow.
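Merging can be pictured as several producers writing to one shared channel. The following is a standard-library sketch of that pattern, not the package's Merge; merge and emit are hypothetical names, and the relative order of elements from different inputs is not defined here.

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards every value from all inputs to a single output channel
// and closes it once every input is exhausted. Hypothetical helper.
func merge(ins ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a closed, pre-filled channel holding vals.
func emit(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	count, sum := 0, 0
	for v := range merge(emit(1, 2), emit(3, 4)) {
		count++
		sum += v
	}
	fmt.Println(count, sum) // 4 10
}
```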
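func SideOutput ¶
func SideOutput(outlet streams.Outlet, cond func(interface{}) string, OutputTag ...string) map[string]streams.Flow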
Types ¶
type Filter ¶
type Filter struct {
	FilterF FilterFunc
	ErrChan chan *Error
	// contains filtered or unexported fields
}
Filter filters the incoming elements using a predicate. If the predicate returns true, the element is passed downstream; if it returns false, the element is discarded.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
    [---------- FilterFunc -----------]
       |    |                    |
out -- 1 -- 2 ------------------ 5 --
func NewFilter ¶
func NewFilter(filterFunc FilterFunc, parallelism uint, errChan chan *Error) *Filter
NewFilter returns a new Filter instance. filterFunc is the filter predicate function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.
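The filtering behavior shown in the diagram can be sketched with a single goroutine, which corresponds to the order-preserving case of parallelism = 1. This is an illustration built on the standard library only; filter, collect, and emit are hypothetical helpers, and the predicate here returns a plain bool rather than the package's FilterFunc type.

```go
package main

import "fmt"

// filter forwards only the values for which keep returns true.
// A single goroutine is used, so input order is preserved.
func filter(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect(in <-chan int) []int {
	var vals []int
	for v := range in {
		vals = append(vals, v)
	}
	return vals
}

// emit returns a closed, pre-filled channel holding vals.
func emit(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	// Discard 3 and 4, as in the diagram.
	keep := func(v int) bool { return v != 3 && v != 4 }
	fmt.Println(collect(filter(emit(1, 2, 3, 4, 5), keep))) // [1 2 5]
}
```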
func (*Filter) In ¶
func (f *Filter) In() chan<- interface{}
In returns an input channel for receiving data
type FilterFunc ¶
FilterFunc is a filter predicate function.
type FlatMap ¶
type FlatMap struct {
	FlatMapF FlatMapFunc
	// contains filtered or unexported fields
}
FlatMap takes one element and produces zero, one, or more elements.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
    [---------- FlatMapFunc ----------]
       |    |           |   |    |
out -- 1' - 2' -------- 4'- 4''- 5' -
func NewFlatMap ¶
func NewFlatMap(flatMapFunc FlatMapFunc, parallelism uint, errChan chan *Error) *FlatMap
NewFlatMap returns a new FlatMap instance. flatMapFunc is the FlatMap transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.
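The "zero, one, or more" behavior can be sketched as follows. This is a standard-library illustration with a single worker, not the package's FlatMap; flatMap, repeat, collect, and emit are hypothetical names, and the transformation returns only a slice, without the *Error result that FlatMapFunc carries.

```go
package main

import "fmt"

// flatMap applies fn to each input and forwards every element of the
// returned slice, so one input may yield zero, one, or many outputs.
func flatMap(in <-chan int, fn func(int) []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			for _, r := range fn(v) {
				out <- r
			}
		}
	}()
	return out
}

// repeat returns v copies of v: nothing for 0, one element for 1, and so on.
func repeat(v int) []int {
	var res []int
	for i := 0; i < v; i++ {
		res = append(res, v)
	}
	return res
}

// collect drains a channel into a slice.
func collect(in <-chan int) []int {
	var vals []int
	for v := range in {
		vals = append(vals, v)
	}
	return vals
}

// emit returns a closed, pre-filled channel holding vals.
func emit(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	fmt.Println(collect(flatMap(emit(1, 0, 2), repeat))) // [1 2 2]
}
```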
func (*FlatMap) In ¶
func (fm *FlatMap) In() chan<- interface{}
In returns an input channel for receiving data
type FlatMapFunc ¶
type FlatMapFunc func(interface{}) ([]interface{}, *Error)
FlatMapFunc is a FlatMap transformation function.
type Item ¶
type Item struct {
	Msg interface{}
	// contains filtered or unexported fields
}
Item is the PriorityQueue item.
type Map ¶
type Map struct {
	MapF MapFunc
	// contains filtered or unexported fields
}
Map takes one element and produces one element.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
    [----------- MapFunc -------------]
       |    |      |    |        |
out -- 1' - 2' --- 3' - 4' ----- 5' -
func NewMap ¶
NewMap returns a new Map instance. mapFunc is the Map transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.
func (*Map) In ¶
func (m *Map) In() chan<- interface{}
In returns an input channel for receiving data
type MapFunc ¶
type MapFunc func(interface{}) (interface{}, *Error)
MapFunc is a Map transformation function.
type PassThrough ¶
type PassThrough struct {
// contains filtered or unexported fields
}
PassThrough produces the received element as is.
in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
| | | | |
out -- 1 -- 2 ---- 3 -- 4 ------ 5 --
func NewPassThrough ¶
func NewPassThrough() *PassThrough
NewPassThrough returns a new PassThrough instance.
func (*PassThrough) In ¶
func (pt *PassThrough) In() chan<- interface{}
In returns an input channel for receiving data
func (*PassThrough) Out ¶
func (pt *PassThrough) Out() <-chan interface{}
Out returns an output channel for sending data
func (*PassThrough) To ¶
func (pt *PassThrough) To(sink streams.Sink)
To streams data to the given sink
func (*PassThrough) Via ¶
func (pt *PassThrough) Via(flow streams.Flow) streams.Flow
Via streams data through the given flow
type PriorityQueue ¶
type PriorityQueue []*Item
PriorityQueue implements heap.Interface.
func (*PriorityQueue) Head ¶
func (pq *PriorityQueue) Head() *Item
Head returns the first item of the PriorityQueue without removing it.
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
Less reports whether the item at index i sorts before the item at index j.
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
Pop implements heap.Interface.Pop. It removes and returns the element at index Len() - 1.
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
Push implements heap.Interface.Push. It appends the item to the PriorityQueue.
func (PriorityQueue) Slice ¶
func (pq PriorityQueue) Slice(start, end int) PriorityQueue
Slice returns the sliced PriorityQueue using the given bounds.
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
Swap exchanges the items at indexes i and j.
func (*PriorityQueue) Update ¶
func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
Update sets the item's priority and calls heap.Fix to re-establish the heap ordering.
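The interplay of Push, Pop, and an update followed by heap.Fix can be sketched with the standard container/heap package. The entry and queue types below are hypothetical stand-ins, not the package's Item and PriorityQueue, and the sketch assumes that a smaller epoch means a higher priority.

```go
package main

import (
	"container/heap"
	"fmt"
)

// entry is a hypothetical queue element ordered by epoch.
type entry struct {
	msg   string
	epoch int64 // priority: smaller epochs are popped first (assumption)
	index int   // position in the heap, maintained by Push and Swap
}

type queue []*entry

func (q queue) Len() int           { return len(q) }
func (q queue) Less(i, j int) bool { return q[i].epoch < q[j].epoch }
func (q queue) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
	q[i].index, q[j].index = i, j
}

// Push appends the entry; heap.Push then sifts it into place.
func (q *queue) Push(x interface{}) {
	e := x.(*entry)
	e.index = len(*q)
	*q = append(*q, e)
}

// Pop removes and returns the element at index Len() - 1.
func (q *queue) Pop() interface{} {
	old := *q
	e := old[len(old)-1]
	*q = old[:len(old)-1]
	return e
}

// update changes an entry's epoch and restores the heap ordering.
func (q *queue) update(e *entry, epoch int64) {
	e.epoch = epoch
	heap.Fix(q, e.index)
}

// popOrder pushes three entries, re-prioritizes one, and returns the pop order.
func popOrder() string {
	q := &queue{}
	a := &entry{msg: "a", epoch: 30}
	heap.Push(q, a)
	heap.Push(q, &entry{msg: "b", epoch: 10})
	heap.Push(q, &entry{msg: "c", epoch: 20})
	q.update(a, 5) // "a" moves from last to first
	order := ""
	for q.Len() > 0 {
		order += heap.Pop(q).(*entry).msg
	}
	return order
}

func main() {
	fmt.Println(popOrder()) // abc
}
```

Tracking each entry's index is what lets update call heap.Fix directly instead of searching the slice for the entry.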
type SlidingWindow ¶
SlidingWindow assigns elements to windows of fixed length configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Sliding windows therefore overlap when the slide is smaller than the window size, in which case an element is assigned to multiple windows.
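The overlap rule can be made concrete with a little arithmetic. The sketch below assumes windows of the form [start, start+size) aligned to offset 0 and non-negative timestamps in arbitrary integer units; windowStarts is a hypothetical helper, and the package may align its windows differently.

```go
package main

import "fmt"

// windowStarts returns the start of every window [start, start+size)
// that contains ts, assuming windows begin at multiples of slide.
func windowStarts(ts, size, slide int64) []int64 {
	var starts []int64
	last := ts - ts%slide // latest window start that is not after ts
	for start := last; start > ts-size; start -= slide {
		starts = append(starts, start)
	}
	return starts
}

func main() {
	// slide < size: the element falls into two overlapping windows.
	fmt.Println(windowStarts(12, 10, 5)) // [10 5]
	// slide == size: windows do not overlap, so there is exactly one.
	fmt.Println(windowStarts(12, 10, 10)) // [10]
}
```

With size 10 and slide 5, an element at 12 belongs to [10, 20) and [5, 15), which is the multiple assignment described above.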
func NewSlidingWindow ¶
func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow
NewSlidingWindow returns a new processing time based SlidingWindow. Processing time refers to the system time of the machine that is executing the respective operation. size is the size of the generated windows. slide is the sliding interval of the generated windows.
func NewSlidingWindowWithTSExtractor ¶
func NewSlidingWindowWithTSExtractor(size time.Duration, slide time.Duration, timestampExtractor func(interface{}) int64) *SlidingWindow
NewSlidingWindowWithTSExtractor returns a new event time based SlidingWindow. Event time is the time at which each individual event occurred on its producing device. Event time processing gives correct results for out-of-order events, late events, and replays of data. size is the size of the generated windows. slide is the sliding interval of the generated windows. timestampExtractor extracts the record timestamp in nanoseconds.
func (*SlidingWindow) In ¶
func (sw *SlidingWindow) In() chan<- interface{}
In returns an input channel for receiving data
func (*SlidingWindow) Out ¶
func (sw *SlidingWindow) Out() <-chan interface{}
Out returns an output channel for sending data
func (*SlidingWindow) To ¶
func (sw *SlidingWindow) To(sink streams.Sink)
To streams data to the given sink
func (*SlidingWindow) Via ¶
func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow
Via streams data through the given flow
type ThrottleMode ¶
type ThrottleMode int8
ThrottleMode defines the Throttler behavior on buffer overflow.
const (
	// Backpressure on overflow mode.
	Backpressure ThrottleMode = iota
	// Discard elements on overflow mode.
	Discard
)
type Throttler ¶
Throttler limits the throughput to a specific number of elements per time unit.
func NewThrottler ¶
func NewThrottler(elements uint, period time.Duration, bufferSize uint, mode ThrottleMode) *Throttler
NewThrottler returns a new Throttler instance. elements is the maximum number of elements to be produced per period. bufferSize defines the size of the incoming elements buffer. mode defines the Throttler behavior when the buffer overflows.
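The difference between the two overflow modes can be sketched with a buffered channel. This illustrates only the overflow behavior, not the rate limiting, and it is not the package's Throttler; mode, offer, and countAccepted are hypothetical names.

```go
package main

import "fmt"

// mode mirrors the two overflow behaviors; hypothetical stand-in for ThrottleMode.
type mode int

const (
	backpressure mode = iota
	discard
)

// offer tries to place v in buf and reports whether it was accepted.
func offer(buf chan int, v int, m mode) bool {
	if m == discard {
		select {
		case buf <- v:
			return true
		default:
			return false // buffer full: drop the element
		}
	}
	buf <- v // buffer full: block the producer until there is room
	return true
}

// countAccepted offers n elements to a buffer of the given size and
// returns how many were accepted.
func countAccepted(n, bufferSize int, m mode) int {
	buf := make(chan int, bufferSize)
	if m == backpressure {
		// A consumer must drain the buffer, or blocked sends never finish.
		go func() {
			for range buf {
			}
		}()
	}
	accepted := 0
	for i := 0; i < n; i++ {
		if offer(buf, i, m) {
			accepted++
		}
	}
	close(buf)
	return accepted
}

func main() {
	fmt.Println(countAccepted(5, 2, discard))      // 2
	fmt.Println(countAccepted(5, 2, backpressure)) // 5
}
```

In discard mode nothing consumes the buffer in this sketch, so only the first bufferSize elements fit; in backpressure mode every element is eventually accepted because the producer waits.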
func (*Throttler) In ¶
func (th *Throttler) In() chan<- interface{}
In returns an input channel for receiving data
type TumblingWindow ¶
TumblingWindow assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap.
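Because tumbling windows do not overlap, each timestamp maps to exactly one window start. The sketch below assumes windows [start, start+size) aligned to offset 0 and non-negative integer timestamps in arbitrary units; assign is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// assign groups timestamps by the start of the tumbling window
// [start, start+size) that each one falls into.
func assign(timestamps []int64, size int64) map[int64][]int64 {
	windows := make(map[int64][]int64)
	for _, ts := range timestamps {
		start := ts - ts%size // round down to a multiple of size
		windows[start] = append(windows[start], ts)
	}
	return windows
}

func main() {
	fmt.Println(assign([]int64{1, 4, 11, 12, 25}, 10))
	// map[0:[1 4] 10:[11 12] 20:[25]]
}
```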
func NewTumblingWindow ¶
func NewTumblingWindow(size time.Duration) *TumblingWindow
NewTumblingWindow returns a new TumblingWindow instance. size is the size of the generated windows.
func (*TumblingWindow) In ¶
func (tw *TumblingWindow) In() chan<- interface{}
In returns an input channel for receiving data
func (*TumblingWindow) Out ¶
func (tw *TumblingWindow) Out() <-chan interface{}
Out returns an output channel for sending data
func (*TumblingWindow) To ¶
func (tw *TumblingWindow) To(sink streams.Sink)
To streams data to the given sink
func (*TumblingWindow) Via ¶
func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow
Via streams data through the given flow