flow

package
v0.0.0-...-195e12e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2020 License: MIT Imports: 4 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoStream

func DoStream(outlet streams.Outlet, inlet streams.Inlet)

DoStream from inlet to outlet

func FanOut

func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow

FanOut the stream to magntude number of Flows

func Merge

func Merge(outlets ...streams.Flow) streams.Flow

Merge multiple flows

func Split

func Split(outlet streams.Outlet, cond func(interface{}) bool) [2]streams.Flow

Split stream to two flows first - satisfies the condition second - doesn't satisfy the condition

Types

type Filter

type Filter struct {
	FilterF FilterFunc
	// contains filtered or unexported fields
}

Filter stream flow

func NewFilter

func NewFilter(f FilterFunc, parallelism uint) *Filter

NewFilter returns new Filter instance FilterFunc - resolver function parallelism - parallelism factor, in case events order matters use parallelism = 1

func (*Filter) In

func (f *Filter) In() chan<- interface{}

In returns channel for receiving data

func (*Filter) Out

func (f *Filter) Out() <-chan interface{}

Out returns channel for sending data

func (*Filter) To

func (f *Filter) To(sink streams.Sink)

To streams data to given sink

func (*Filter) Via

func (f *Filter) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type FilterFunc

type FilterFunc func(interface{}) bool

FilterFunc resolver

type FlatMap

type FlatMap struct {
	FlatMapF FlatMapFunc
	// contains filtered or unexported fields
}

FlatMap function transformation flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

    |    |      |    |        |
[---------- FlatMapFunc ----------]
    |    |           |   |    |

out -- 1' - 2' -------- 4'- 4”- 5' -

func NewFlatMap

func NewFlatMap(f FlatMapFunc, parallelism uint) *FlatMap

NewFlatMap returns new FlatMap instance FlatMapFunc - transformation function parallelism - parallelism factor, in case events order matters use parallelism = 1

func (*FlatMap) In

func (fm *FlatMap) In() chan<- interface{}

In returns channel for receiving data

func (*FlatMap) Out

func (fm *FlatMap) Out() <-chan interface{}

Out returns channel for sending data

func (*FlatMap) To

func (fm *FlatMap) To(sink streams.Sink)

To streams data to given sink

func (*FlatMap) Via

func (fm *FlatMap) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type FlatMapFunc

type FlatMapFunc func(interface{}) []interface{}

FlatMapFunc transformer

type Item

type Item struct {
	Msg interface{}
	// contains filtered or unexported fields
}

Item of PriorityQueue

func NewItem

func NewItem(msg interface{}, epoch int64, index int) *Item

NewItem constructor

type Map

type Map struct {
	MapF MapFunc
	// contains filtered or unexported fields
}

Map function transformation flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

    |    |      |    |        |
[----------- MapFunc -------------]
    |    |      |    |        |

out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap(f MapFunc, parallelism uint) *Map

NewMap returns new Map instance MapFunc - transformation function parallelism - parallelism factor, in case events order matters use parallelism = 1

func (*Map) In

func (m *Map) In() chan<- interface{}

In returns channel for receiving data

func (*Map) Out

func (m *Map) Out() <-chan interface{}

Out returns channel for sending data

func (*Map) To

func (m *Map) To(sink streams.Sink)

To streams data to given sink

func (*Map) Via

func (m *Map) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type MapFunc

type MapFunc func(interface{}) interface{}

MapFunc transformer

type PassThrough

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

|    |      |    |        |

out -- 1 -- 2 ---- 3 -- 4 ------ 5 --

func NewPassThrough

func NewPassThrough() *PassThrough

NewPassThrough returns new PassThrough instance

func (*PassThrough) In

func (pt *PassThrough) In() chan<- interface{}

In returns channel for receiving data

func (*PassThrough) Out

func (pt *PassThrough) Out() <-chan interface{}

Out returns channel for sending data

func (*PassThrough) To

func (pt *PassThrough) To(sink streams.Sink)

To streams data to given sink

func (*PassThrough) Via

func (pt *PassThrough) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type PriorityQueue

type PriorityQueue []*Item

PriorityQueue implements heap.Interface

func (*PriorityQueue) Head

func (pq *PriorityQueue) Head() *Item

Head returns Queue head item

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

Len returns PriorityQueue length

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

Less comparator

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

Pop item from the Queue

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

Push item to the Queue

func (PriorityQueue) Slice

func (pq PriorityQueue) Slice(start, end int) PriorityQueue

Slice Queue

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

Swap items by indexes

func (*PriorityQueue) Update

func (pq *PriorityQueue) Update(item *Item, newEpoch int64)

Update item epoch

type SlidingWindow

type SlidingWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

定义每隔‘silide’时间,统计当前时间过去‘size’时间内的数据的窗口

func NewSlidingWindow

func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow

NewSlidingWindow 返回一个新的滑块处理窗口, size - 生成窗口的时间跨度 slide - 每次滑动的时间

func NewSlidingWindowWithTsExtractor

func NewSlidingWindowWithTsExtractor(size time.Duration, slide time.Duration,
	timestampExtractor func(interface{}) int64) *SlidingWindow

生成一个新的时间滑动窗口,可以包容无序事件、延迟事件或者重复出现的事件,并给出正确的过滤结果 size - 生成窗口的事件跨度 slide - 每次滑动的时间 timestampExtractor 时间戳(一纳秒为单位,对应flink时间水印)

func (*SlidingWindow) In

func (sw *SlidingWindow) In() chan<- interface{}

把接收到的数据放入接收通道

func (*SlidingWindow) Out

func (sw *SlidingWindow) Out() <-chan interface{}

把发送的数据输出到指定输出通道

func (*SlidingWindow) To

func (sw *SlidingWindow) To(sink streams.Sink)

把指定的数据流输出到接收器

func (*SlidingWindow) Via

func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow

操作指定的数据流

type ThrottleMode

type ThrottleMode int8

ThrottleMode defines Throttler behavior on buffer overflow

const (
	// Backpressure on overflow mode
	Backpressure ThrottleMode = iota
	// Discard on overflow mode
	Discard
)

type Throttler

type Throttler struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Throttler limits the throughput to a specific number of elements per time unit

func NewThrottler

func NewThrottler(elements uint, per time.Duration, buffer uint, mode ThrottleMode) *Throttler

NewThrottler returns new Throttler instance elements - number of elements per - time unit buffer - buffer channel size mode - flow behavior on buffer overflow

func (*Throttler) In

func (th *Throttler) In() chan<- interface{}

In returns channel for receiving data

func (*Throttler) Out

func (th *Throttler) Out() <-chan interface{}

Out returns channel for sending data

func (*Throttler) To

func (th *Throttler) To(sink streams.Sink)

To streams data to given sink

func (*Throttler) Via

func (th *Throttler) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type TumblingWindow

type TumblingWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

每隔‘size’时间统计一次过去‘size’时间内的数据,窗口数据不重叠

func NewTumblingWindow

func NewTumblingWindow(size time.Duration) *TumblingWindow

返回一个‘TumblingWindow’窗口实例

func (*TumblingWindow) In

func (tw *TumblingWindow) In() chan<- interface{}

把接收到的数据放入接收通道

func (*TumblingWindow) Out

func (tw *TumblingWindow) Out() <-chan interface{}

把发送的数据输出到指定输出通道

func (*TumblingWindow) To

func (tw *TumblingWindow) To(sink streams.Sink)

把指定的数据流输出到接收器

func (*TumblingWindow) Via

func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow

操作指定的数据流

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL