flow

package
v1.0.0
Published: Jun 15, 2021 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package flow provides streams.Flow implementations.

Functions

func DoStream

func DoStream(outlet streams.Outlet, inlet streams.Inlet)

DoStream streams data from the outlet to the inlet.

func FanOut

func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow

FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.
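The fan-out idea can be sketched with plain channels. This is a standalone illustration using only the standard library, not this package's implementation; the fanOut and collect helpers are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut copies every value read from in to each of n new channels.
// The output channels are closed once in is closed and drained.
func fanOut(in <-chan interface{}, n int) []<-chan interface{} {
	outs := make([]chan interface{}, n)
	result := make([]<-chan interface{}, n)
	for i := range outs {
		outs[i] = make(chan interface{})
		result[i] = outs[i]
	}
	go func() {
		for v := range in {
			for _, out := range outs {
				out <- v // blocks until that consumer is ready
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return result
}

// collect drains every channel concurrently and returns what each one received.
func collect(chans []<-chan interface{}) [][]interface{} {
	got := make([][]interface{}, len(chans))
	var wg sync.WaitGroup
	for i, ch := range chans {
		wg.Add(1)
		go func(i int, ch <-chan interface{}) {
			defer wg.Done()
			for v := range ch {
				got[i] = append(got[i], v)
			}
		}(i, ch)
	}
	wg.Wait()
	return got
}

func main() {
	in := make(chan interface{})
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()
	for i, vals := range collect(fanOut(in, 2)) {
		fmt.Println("sink", i, vals)
	}
}
```

Because the sketch uses unbuffered channels, every output must be consumed concurrently; a slow consumer holds back the others.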

func Merge

func Merge(outlets ...streams.Flow) streams.Flow

Merge merges multiple flows into a single flow.
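Merging is commonly done with one forwarding goroutine per input. The following is a minimal sketch over plain channels, not this package's implementation; merge and fromSlice are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge forwards values from all inputs onto one output channel.
// The output closes after every input has been closed and drained.
func merge(ins ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice returns a closed, pre-filled channel holding vals.
func fromSlice(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	var got []int
	for v := range merge(fromSlice(1, 2), fromSlice(3, 4)) {
		got = append(got, v)
	}
	// Arrival order across inputs is not deterministic, so sort before printing.
	sort.Ints(got)
	fmt.Println(got) // [1 2 3 4]
}
```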

func SideOutput

func SideOutput(outlet streams.Outlet, cond func(interface{}) string, OutputTag ...string) map[string]streams.Flow

func Split

func Split(outlet streams.Outlet, cond func(interface{}) bool) [2]streams.Flow

Split splits the stream into two flows according to some criterion.
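A predicate-based split can be sketched as follows. The code works on plain int channels and is an illustration only; split and drainBoth are hypothetical helpers, and which branch receives the matching elements is an assumption of this sketch.

```go
package main

import "fmt"

// split routes each value to the first channel when cond is true
// and to the second channel otherwise.
func split(in <-chan int, cond func(int) bool) [2]<-chan int {
	yes := make(chan int)
	no := make(chan int)
	go func() {
		defer close(yes)
		defer close(no)
		for v := range in {
			if cond(v) {
				yes <- v
			} else {
				no <- v
			}
		}
	}()
	return [2]<-chan int{yes, no}
}

// drainBoth reads both branches concurrently so neither one stalls the splitter.
func drainBoth(flows [2]<-chan int) (first, second []int) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range flows[1] {
			second = append(second, v)
		}
	}()
	for v := range flows[0] {
		first = append(first, v)
	}
	<-done
	return first, second
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}()
	even, odd := drainBoth(split(in, func(v int) bool { return v%2 == 0 }))
	fmt.Println(even, odd) // [2 4 6] [1 3 5]
}
```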

Types

type Error

type Error struct {
	Ctx     context.Context
	FlowErr error
}

type Filter

type Filter struct {
	FilterF FilterFunc

	ErrChan chan *Error
	// contains filtered or unexported fields
}

Filter filters the incoming elements using a predicate. If the predicate returns true, the element is passed downstream; if it returns false, the element is discarded.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [---------- FilterFunc -----------]
       |    |                    |
out -- 1 -- 2 ------------------ 5 --
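The diagram can be reproduced with a small channel-based sketch. It uses a plain bool predicate and a single goroutine, so it leaves out the error return of FilterFunc and the parallelism factor; filter is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// filter forwards only the values for which pred returns true.
func filter(in <-chan interface{}, pred func(interface{}) bool) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			if pred(v) {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	in := make(chan interface{}, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	// Keep 1, 2 and 5, as in the diagram above.
	keep := func(v interface{}) bool {
		n := v.(int)
		return n <= 2 || n == 5
	}
	for v := range filter(in, keep) {
		fmt.Println(v)
	}
}
```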

func NewFilter

func NewFilter(filterFunc FilterFunc, parallelism uint, errChan chan *Error) *Filter

NewFilter returns a new Filter instance. filterFunc is the filter predicate function. parallelism is the flow parallelism factor. If event order matters, use parallelism = 1.

func (*Filter) In

func (f *Filter) In() chan<- interface{}

In returns an input channel for receiving data

func (*Filter) Out

func (f *Filter) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Filter) To

func (f *Filter) To(sink streams.Sink)

To streams data to the given sink

func (*Filter) Via

func (f *Filter) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow

type FilterFunc

type FilterFunc func(interface{}) (bool, *Error)

FilterFunc is a filter predicate function.

type FlatMap

type FlatMap struct {
	FlatMapF FlatMapFunc
	// contains filtered or unexported fields
}

FlatMap takes one element and produces zero, one, or more elements.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [---------- FlatMapFunc ----------]
       |    |           |   |    |
out -- 1' - 2' -------- 4'- 4''- 5' -
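A minimal sketch of the one-to-many behavior, using plain channels and a single goroutine. It omits the error return of FlatMapFunc and the parallelism factor; flatMap and expand are hypothetical names for this example.

```go
package main

import "fmt"

// flatMap applies f to each value and emits every element of the slice it returns.
// An empty or nil slice emits nothing for that input.
func flatMap(in <-chan int, f func(int) []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			for _, r := range f(v) {
				out <- r
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	// Mirror the diagram: 3 yields nothing, 4 yields two elements.
	expand := func(n int) []string {
		switch n {
		case 3:
			return nil
		case 4:
			return []string{"4'", "4''"}
		default:
			return []string{fmt.Sprintf("%d'", n)}
		}
	}
	for s := range flatMap(in, expand) {
		fmt.Print(s, " ")
	}
	fmt.Println()
}
```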

func NewFlatMap

func NewFlatMap(flatMapFunc FlatMapFunc, parallelism uint, errChan chan *Error) *FlatMap

NewFlatMap returns a new FlatMap instance. flatMapFunc is the FlatMap transformation function. parallelism is the flow parallelism factor. If event order matters, use parallelism = 1.

func (*FlatMap) In

func (fm *FlatMap) In() chan<- interface{}

In returns an input channel for receiving data

func (*FlatMap) Out

func (fm *FlatMap) Out() <-chan interface{}

Out returns an output channel for sending data

func (*FlatMap) To

func (fm *FlatMap) To(sink streams.Sink)

To streams data to the given sink

func (*FlatMap) Via

func (fm *FlatMap) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow

type FlatMapFunc

type FlatMapFunc func(interface{}) ([]interface{}, *Error)

FlatMapFunc is a FlatMap transformation function.

type Item

type Item struct {
	Msg interface{}
	// contains filtered or unexported fields
}

Item is the PriorityQueue item.

func NewItem

func NewItem(msg interface{}, epoch int64, index int) *Item

NewItem returns a new Item.

type Map

type Map struct {
	MapF MapFunc
	// contains filtered or unexported fields
}

Map takes one element and produces one element.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [----------- MapFunc -------------]
       |    |      |    |        |
out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap(mapFunc MapFunc, parallelism uint, errChan chan *Error) *Map

NewMap returns a new Map instance. mapFunc is the Map transformation function. parallelism is the flow parallelism factor. If event order matters, use parallelism = 1.
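Why parallelism affects ordering can be seen in a worker-pool sketch. This is a standalone illustration, not this package's implementation; parallelMap and the worker-pool design are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelMap applies f to each value using the given number of workers.
// With more than one worker, results may leave in a different order
// than the inputs arrived.
func parallelMap(in <-chan int, f func(int) int, parallelism uint) <-chan int {
	if parallelism == 0 {
		parallelism = 1
	}
	out := make(chan int)
	var wg sync.WaitGroup
	for i := uint(0); i < parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- f(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	// One worker: output order matches input order.
	for v := range parallelMap(in, func(n int) int { return n * 10 }, 1) {
		fmt.Print(v, " ")
	}
	fmt.Println()
}
```

With a single worker, one goroutine reads and writes sequentially, so order is preserved. With several workers, whichever worker finishes first sends first.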

func (*Map) In

func (m *Map) In() chan<- interface{}

In returns an input channel for receiving data

func (*Map) Out

func (m *Map) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Map) To

func (m *Map) To(sink streams.Sink)

To streams data to the given sink

func (*Map) Via

func (m *Map) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow

type MapFunc

type MapFunc func(interface{}) (interface{}, *Error)

MapFunc is a Map transformation function.

type PassThrough

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough produces the received element as is.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
out -- 1 -- 2 ---- 3 -- 4 ------ 5 --

func NewPassThrough

func NewPassThrough() *PassThrough

NewPassThrough returns a new PassThrough instance.

func (*PassThrough) In

func (pt *PassThrough) In() chan<- interface{}

In returns an input channel for receiving data

func (*PassThrough) Out

func (pt *PassThrough) Out() <-chan interface{}

Out returns an output channel for sending data

func (*PassThrough) To

func (pt *PassThrough) To(sink streams.Sink)

To streams data to the given sink

func (*PassThrough) Via

func (pt *PassThrough) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow

type PriorityQueue

type PriorityQueue []*Item

PriorityQueue implements heap.Interface.

func (*PriorityQueue) Head

func (pq *PriorityQueue) Head() *Item

Head returns the first item of the PriorityQueue without removing it.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

Len returns the PriorityQueue length.

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

Less is the less-than comparator for the items at indexes i and j.

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

Pop implements heap.Interface.Pop. It removes and returns the element at index Len() - 1.

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

Push implements heap.Interface.Push. It appends the item to the PriorityQueue.

func (PriorityQueue) Slice

func (pq PriorityQueue) Slice(start, end int) PriorityQueue

Slice returns the sliced PriorityQueue using the given bounds.

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

Swap exchanges the indexes of the items.

func (*PriorityQueue) Update

func (pq *PriorityQueue) Update(item *Item, newEpoch int64)

Update sets the item's priority and calls heap.Fix to re-establish the heap ordering.
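The heap.Interface pattern described by these methods can be sketched with container/heap. The item and queue types below are hypothetical stand-ins written for this example, and ordering by ascending epoch is an assumption; they are not this package's Item and PriorityQueue.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item pairs a message with the epoch used as its priority.
type item struct {
	msg   string
	epoch int64
	index int // position in the heap, maintained by Swap and Push
}

// queue is a min-heap of items ordered by epoch.
type queue []*item

func (q queue) Len() int           { return len(q) }
func (q queue) Less(i, j int) bool { return q[i].epoch < q[j].epoch }
func (q queue) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
	q[i].index = i
	q[j].index = j
}

func (q *queue) Push(x interface{}) {
	it := x.(*item)
	it.index = len(*q)
	*q = append(*q, it)
}

func (q *queue) Pop() interface{} {
	old := *q
	n := len(old)
	it := old[n-1]
	old[n-1] = nil // drop the reference so it can be collected
	*q = old[:n-1]
	return it
}

// newQueue builds a heap from the given items.
func newQueue(items ...*item) *queue {
	q := &queue{}
	for _, it := range items {
		heap.Push(q, it)
	}
	return q
}

// update changes an item's epoch and repairs the heap with heap.Fix.
func (q *queue) update(it *item, epoch int64) {
	it.epoch = epoch
	heap.Fix(q, it.index)
}

// drain pops every item and returns the messages in ascending epoch order.
func (q *queue) drain() []string {
	var msgs []string
	for q.Len() > 0 {
		msgs = append(msgs, heap.Pop(q).(*item).msg)
	}
	return msgs
}

func main() {
	c := &item{msg: "c", epoch: 30}
	q := newQueue(c, &item{msg: "a", epoch: 10}, &item{msg: "b", epoch: 20})
	q.update(c, 5) // "c" now has the smallest epoch
	fmt.Println(q.drain()) // [c a b]
}
```

Push and Pop on the type itself only append and remove the last element; heap.Push, heap.Pop and heap.Fix are what keep the ordering intact.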

type SlidingWindow

type SlidingWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SlidingWindow assigns elements to windows of fixed length configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows.
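The overlap described above can be worked through arithmetically. The sketch below assumes windows start at multiples of the slide and cover the half-open interval [start, start+size); that alignment and the windowStarts helper are assumptions for illustration, not a description of how this package schedules its windows.

```go
package main

import (
	"fmt"
	"time"
)

// windowStarts lists the start offsets of every sliding window that contains ts,
// assuming windows start at non-negative multiples of slide.
func windowStarts(ts, size, slide time.Duration) []time.Duration {
	if slide <= 0 {
		return nil // a non-positive slide has no meaningful windows
	}
	var starts []time.Duration
	last := ts - ts%slide // latest window start at or before ts
	for start := last; start > ts-size && start >= 0; start -= slide {
		starts = append(starts, start)
	}
	return starts
}

func main() {
	// size 10s, slide 5s: an element at 12s falls into the windows starting at 10s and 5s.
	fmt.Println(windowStarts(12*time.Second, 10*time.Second, 5*time.Second)) // [10s 5s]
	// size equal to slide: windows do not overlap, so there is exactly one.
	fmt.Println(windowStarts(12*time.Second, 5*time.Second, 5*time.Second)) // [10s]
}
```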

func NewSlidingWindow

func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow

NewSlidingWindow returns a new processing time based SlidingWindow. Processing time refers to the system time of the machine that is executing the respective operation. size is the size of the generated windows. slide is the sliding interval of the generated windows.

func NewSlidingWindowWithTSExtractor

func NewSlidingWindowWithTSExtractor(size time.Duration, slide time.Duration,
	timestampExtractor func(interface{}) int64) *SlidingWindow

NewSlidingWindowWithTSExtractor returns a new event time based SlidingWindow. Event time is the time at which each individual event occurred on its producing device. Event time processing gives correct results on out-of-order events, late events, and replays of data. size is the size of the generated windows. slide is the sliding interval of the generated windows. timestampExtractor extracts the record timestamp, in nanoseconds.

func (*SlidingWindow) In

func (sw *SlidingWindow) In() chan<- interface{}

In returns an input channel for receiving data

func (*SlidingWindow) Out

func (sw *SlidingWindow) Out() <-chan interface{}

Out returns an output channel for sending data

func (*SlidingWindow) To

func (sw *SlidingWindow) To(sink streams.Sink)

To streams data to the given sink

func (*SlidingWindow) Via

func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow

type ThrottleMode

type ThrottleMode int8

ThrottleMode defines the Throttler behavior on buffer overflow.

const (
	// Backpressure on overflow mode.
	Backpressure ThrottleMode = iota
	// Discard elements on overflow mode.
	Discard
)
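The difference between the two modes can be sketched with a buffered channel. The mode type and the offer helper are hypothetical and written only for this example; how this package implements the modes internally is not shown here.

```go
package main

import "fmt"

type mode int8

const (
	// backpressure blocks the producer until the buffer has room.
	backpressure mode = iota
	// discard drops the element when the buffer is full.
	discard
)

// offer places v into buf according to m and reports whether v was accepted.
func offer(buf chan int, v int, m mode) bool {
	if m == discard {
		select {
		case buf <- v:
			return true
		default:
			return false // buffer full: element dropped
		}
	}
	buf <- v // backpressure: blocks while the buffer is full
	return true
}

func main() {
	buf := make(chan int, 2)
	for i := 1; i <= 4; i++ {
		// The buffer holds two elements, so 3 and 4 are dropped.
		fmt.Println(i, offer(buf, i, discard))
	}
}
```

In backpressure mode the same call would block on the third element until a consumer reads from buf.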

type Throttler

type Throttler struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Throttler limits the throughput to a specific number of elements per time unit.

func NewThrottler

func NewThrottler(elements uint, period time.Duration, bufferSize uint, mode ThrottleMode) *Throttler

NewThrottler returns a new Throttler instance. elements is the maximum number of elements to be produced per period. period is the time unit over which elements are counted. bufferSize is the size of the incoming elements buffer. mode defines the Throttler behavior when the elements buffer overflows.

func (*Throttler) In

func (th *Throttler) In() chan<- interface{}

In returns an input channel for receiving data

func (*Throttler) Out

func (th *Throttler) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Throttler) To

func (th *Throttler) To(sink streams.Sink)

To streams data to the given sink

func (*Throttler) Via

func (th *Throttler) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow

type TumblingWindow

type TumblingWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TumblingWindow assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap.

func NewTumblingWindow

func NewTumblingWindow(size time.Duration) *TumblingWindow

NewTumblingWindow returns a new TumblingWindow instance. size is the size of the generated windows.

func (*TumblingWindow) In

func (tw *TumblingWindow) In() chan<- interface{}

In returns an input channel for receiving data

func (*TumblingWindow) Out

func (tw *TumblingWindow) Out() <-chan interface{}

Out returns an output channel for sending data

func (*TumblingWindow) To

func (tw *TumblingWindow) To(sink streams.Sink)

To streams data to the given sink

func (*TumblingWindow) Via

func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow
