Documentation ¶
Index ¶
- func ForAll(f func(ctx context.Context, batch interface{}) map[interface{}][]interface{}) api.UnFunc
- func GroupByKeyFunc(key interface{}) api.UnFunc
- func GroupByNameFunc(name string) api.UnFunc
- func GroupByPosFunc(pos int) api.UnFunc
- func SortByKeyFunc(key interface{}) api.UnFunc
- func SortByNameFunc(name string) api.UnFunc
- func SortByPosFunc(pos int) api.UnFunc
- func SortFunc() api.UnFunc
- func SortWithFunc(f func(batch interface{}, i, j int) bool) api.UnFunc
- func SumByKeyFunc(key interface{}) api.UnFunc
- func SumByNameFunc(name string) api.UnFunc
- func SumByPosFunc(pos int) api.UnFunc
- func SumFunc() api.UnFunc
- func TriggerAll() api.BatchTriggerFunc
- func TriggerBySize(size int64) api.BatchTriggerFunc
- type BatchOperator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GroupByKeyFunc ¶
GroupByKeyFunc generates an api.UnFunc that groups incoming batched items by key value. The batched data is expected to be in the following type:
[]map[K]V - slice of map[K]V
The batched data is grouped in a slice of map of type
[]map[interface{}][]interface{}
Where items with simlar K values are assigned the same key in the result map.
func GroupByNameFunc ¶
GroupByNameFunc generates an api.UnFunc that groups incoming batched items by struct field name. The batched data is expected to be of type:
[]struct{T} - where T is the type of a struct fields identified by name
The function returns a type
[]map[interface{}][]interface{}
Where the map that uses the field values as key to group the items.
func GroupByPosFunc ¶
GroupByPosFunc generates an api.Unary function that groups incoming batched items by their position in a slice. The batch is expected to be of type:
[][]T - where []T is a slice or array of data items
The function returns type
[]map[interface{}][]interface{}
func SortByKeyFunc ¶
SortByKeyFunc generates a api.UnFunc operation that sorts batched items from upsteram using the key value of maps in the batch. The batched data is of the form:
[]map[K]V - where K is a comparable type
The function returns sorted []map[K]
func SortByNameFunc ¶
SortByNameFunc generates a api.UnFunc operation that sorts batched items from upstream using the field name of items in the batch. The batched data is of type:
[]T - where T is a struct
For each struct s, field s.name must be of comparable values. The function returns a sorted []T
func SortByPosFunc ¶
SortByPosFunc generates a api.UnFunc that sorts batched data from upstream. The batched items are expected to be in the following type:
[][]T - where T is comparable type
with each iteration i for batch v:
- check v[i][pos] to be of type string, integers, float
- Use package sort and a Less function to compare v[i][pos] and v[i+1][pos]
The function returns the sorted slice
func SortFunc ¶
SortFunc generates an api.UnFunc that sorts batched data from upstream. The batched items are expected to be in the following type:
[]T - where T is comparable type (string, numeric, etc)
with each iteration i for batch v:
- check v[i] to be of type string, integers, float
- Use package sort and a Less function to compare v[i] and v[i+1]
The function returns the sorted slice
func SortWithFunc ¶
SortWithFunc generates an api.UnFunc operation that is intended to sort batched items from upstream using the provided Less function to be used with the sort package.
The batched data is expected to be of form:
[]T - where T is a valid Go type
The specified function should follow the Less function convention of the sort package when compairing values from rows i, j.
func SumByKeyFunc ¶
SumByKeyFunc generates an api.UnFunc that sums incoming batched items by key value. The batched data can be of the following types:
[]map[K]V - where V is either an integer or a floating point []map[K][]V - where []V is a slice of integers or floating points
The function returns type
[]map[interface{}]
For instance:
[]map[interface{}]float64{key: sum}
Where sum is the total calculated sum for a given key. If key == nil, it returns sums for all keys.
func SumByNameFunc ¶
SumByNameFunc generates an api.UnFunc that sums incoming batched items by sturct field name. The batched data is expected to be of type:
- []struct{F} - where field F is either an integer or floating point
- []struct{V} - where field V is a slice of integers or floating points
The function returns value of type
map[string]float64
For instance
[]map[string]float64{{name:sum}}
Where sum is the total calculated sum for fields name.
func SumByPosFunc ¶
SumByPosFunc generates an api.UnFunc that sums incoming values from upstream batched items based on their position. The stream data is expected to be of types:
[][]T - where []T is a slice of integers or floating points
The function returns a value of type
[]map[int]float64
More specifically a value:
[]map[int]float64{{pos: sum}} where sum is the calculated sum.
func SumFunc ¶
Sum generates an api.UnFunc that sums batched items from upstream. The data is expected to be of the following types:
[]integers []floats [][]integers [][]floats
The function returns the sum as a float64
func TriggerAll ¶
func TriggerAll() api.BatchTriggerFunc
TriggerAll forces the batch trigger to always return false meaning it's never done, causing the batch to exhaust all stream items.
func TriggerBySize ¶
func TriggerBySize(size int64) api.BatchTriggerFunc
TriggerBySize triggers batch based on the specified size. The batch remains open as long as the current index of the item is less then the specified size.
Types ¶
type BatchOperator ¶
type BatchOperator struct {
// contains filtered or unexported fields
}
BatchOperator is an executor that batches incoming streamed items based on provided criteria. The batched items are streamed on the ouptut channel for downstream processing.
func (*BatchOperator) Exec ¶
func (op *BatchOperator) Exec(ctx context.Context) (err error)
Exec is the execution starting point for the operator node. The batch operator batches N size items from upstream into a slice []T. When the slice reaches size N, the slice is sent downstream for processing.
func (*BatchOperator) GetOutput ¶
func (op *BatchOperator) GetOutput() <-chan interface{}
GetOutput returns the output channel of the executer node
func (*BatchOperator) SetInput ¶
func (op *BatchOperator) SetInput(in <-chan interface{})
SetInput sets the input channel for the executor node
func (*BatchOperator) SetTrigger ¶
func (op *BatchOperator) SetTrigger(trigger api.BatchTrigger)
SetTrigger sets the batch operation to apply for this operator