batch

package
v0.1.0-alpha.0
Published: Jul 31, 2019 License: Apache-2.0 Imports: 8 Imported by: 2

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ForAll

func ForAll(f func(ctx context.Context, batch interface{}) map[interface{}][]interface{}) api.UnFunc

func GroupByKeyFunc

func GroupByKeyFunc(key interface{}) api.UnFunc

GroupByKeyFunc generates an api.UnFunc that groups incoming batched items by key value. The batched data is expected to be of the following type:

[]map[K]V - slice of map[K]V

The batched data is grouped into a slice of maps of type

[]map[interface{}][]interface{}

where items with similar K values are assigned the same key in the result map.
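The grouping described above can be sketched in plain Go. This is a minimal illustration of one reading of the description, not the package's implementation: groupByKey is a hypothetical helper, and it assumes the batch is a []map[string]int and that only values stored under the requested key are collected.

```go
package main

import "fmt"

// groupByKey is a hypothetical helper (not part of the batch package).
// It collects every value stored under key across the maps in the batch
// and returns them in the documented []map[interface{}][]interface{} shape.
func groupByKey(batch []map[string]int, key string) []map[interface{}][]interface{} {
	group := make(map[interface{}][]interface{})
	for _, item := range batch {
		if v, ok := item[key]; ok {
			group[key] = append(group[key], v)
		}
	}
	return []map[interface{}][]interface{}{group}
}

func main() {
	batch := []map[string]int{
		{"a": 1, "b": 2},
		{"a": 3},
		{"b": 4},
	}
	fmt.Println(groupByKey(batch, "a")) // prints [map[a:[1 3]]]
}
```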

func GroupByNameFunc

func GroupByNameFunc(name string) api.UnFunc

GroupByNameFunc generates an api.UnFunc that groups incoming batched items by struct field name. The batched data is expected to be of type:

[]struct{T} - where T is the type of the struct field identified by name

The function returns a type

[]map[interface{}][]interface{}

where the map uses the field values as keys to group the items.
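Grouping by field name needs reflection, since the struct type is not known ahead of time. The sketch below shows one way this could work; groupByName and the vehicle type are hypothetical names, and the code assumes the named field is exported and holds a comparable (hashable) value.

```go
package main

import (
	"fmt"
	"reflect"
)

// vehicle is a hypothetical item type used only for this illustration.
type vehicle struct {
	Type string
	Name string
}

// groupByName is a hypothetical helper (not the package's code). It groups
// the structs in batch by the value of the exported field called name.
// Assumption: the field value is comparable, so it can serve as a map key.
func groupByName(batch interface{}, name string) []map[interface{}][]interface{} {
	group := make(map[interface{}][]interface{})
	items := reflect.ValueOf(batch)
	if items.Kind() != reflect.Slice {
		return []map[interface{}][]interface{}{group}
	}
	for i := 0; i < items.Len(); i++ {
		item := items.Index(i)
		if item.Kind() != reflect.Struct {
			continue
		}
		field := item.FieldByName(name)
		if !field.IsValid() || !field.CanInterface() {
			continue // missing or unexported field
		}
		key := field.Interface()
		group[key] = append(group[key], item.Interface())
	}
	return []map[interface{}][]interface{}{group}
}

func main() {
	batch := []vehicle{
		{Type: "car", Name: "sedan"},
		{Type: "truck", Name: "pickup"},
		{Type: "car", Name: "coupe"},
	}
	groups := groupByName(batch, "Type")[0]
	fmt.Println(len(groups["car"]), len(groups["truck"])) // prints 2 1
}
```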

func GroupByPosFunc

func GroupByPosFunc(pos int) api.UnFunc

GroupByPosFunc generates an api.UnFunc that groups incoming batched items by their position in a slice. The batch is expected to be of type:

[][]T - where []T is a slice or array of data items

The function returns type

[]map[interface{}][]interface{}
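As a rough illustration of positional grouping, the sketch below places rows that share the same value at index pos under the same key. groupByPos is a hypothetical helper; whether the package stores whole rows or only some of their values under each key is not stated here, so storing whole rows is an assumption.

```go
package main

import "fmt"

// groupByPos is a hypothetical helper (not the package's code). Rows that
// hold the same value at index pos are collected under that value.
func groupByPos(batch [][]string, pos int) []map[interface{}][]interface{} {
	group := make(map[interface{}][]interface{})
	for _, row := range batch {
		if pos < 0 || pos >= len(row) {
			continue // skip rows that have no element at pos
		}
		group[row[pos]] = append(group[row[pos]], row)
	}
	return []map[interface{}][]interface{}{group}
}

func main() {
	batch := [][]string{
		{"go", "1.12"},
		{"rust", "1.36"},
		{"go", "1.13"},
	}
	groups := groupByPos(batch, 0)[0]
	fmt.Println(len(groups["go"]), len(groups["rust"])) // prints 2 1
}
```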

func SortByKeyFunc

func SortByKeyFunc(key interface{}) api.UnFunc

SortByKeyFunc generates an api.UnFunc operation that sorts batched items from upstream using the key value of maps in the batch. The batched data is of the form:

[]map[K]V - where K is a comparable type

The function returns the sorted []map[K]V.

func SortByNameFunc

func SortByNameFunc(name string) api.UnFunc

SortByNameFunc generates an api.UnFunc operation that sorts batched items from upstream using the field name of items in the batch. The batched data is of type:

[]T - where T is a struct

For each struct s, field s.name must hold a comparable value. The function returns a sorted []T.

func SortByPosFunc

func SortByPosFunc(pos int) api.UnFunc

SortByPosFunc generates an api.UnFunc that sorts batched data from upstream. The batched items are expected to be of the following type:

[][]T - where T is a comparable type

with each iteration i for batch v:

  • Check that v[i][pos] is a string, integer, or float
  • Use package sort and a Less function to compare v[i][pos] and v[i+1][pos]

The function returns the sorted slice
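The steps above map naturally onto the standard sort package. The following is a minimal sketch for rows of strings only; sortByPos is a hypothetical helper, and it assumes every row has an element at index pos.

```go
package main

import (
	"fmt"
	"sort"
)

// sortByPos is a hypothetical helper (not the package's code). It sorts
// the rows in place by the string found at index pos and returns them.
// Assumption: every row is long enough to have an element at pos.
func sortByPos(batch [][]string, pos int) [][]string {
	sort.SliceStable(batch, func(i, j int) bool {
		return batch[i][pos] < batch[j][pos]
	})
	return batch
}

func main() {
	rows := [][]string{
		{"b", "2"},
		{"a", "3"},
		{"c", "1"},
	}
	fmt.Println(sortByPos(rows, 1)) // prints [[c 1] [b 2] [a 3]]
}
```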

func SortFunc

func SortFunc() api.UnFunc

SortFunc generates an api.UnFunc that sorts batched data from upstream. The batched items are expected to be of the following type:

[]T - where T is a comparable type (string, numeric, etc.)

with each iteration i for batch v:

  • Check that v[i] is a string, integer, or float
  • Use package sort and a Less function to compare v[i] and v[i+1]

The function returns the sorted slice

func SortWithFunc

func SortWithFunc(f func(batch interface{}, i, j int) bool) api.UnFunc

SortWithFunc generates an api.UnFunc operation that sorts batched items from upstream using the provided Less function, which is applied through the sort package.

The batched data is expected to be of form:

[]T - where T is a valid Go type

The specified function should follow the Less function convention of the sort package when comparing values from rows i, j.
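A Less function with the signature shown above receives the whole batch as an interface{} and must assert its concrete type before comparing. The sketch below shows such a function; the user type, byAge, and sortWith are hypothetical names, and applying the function through sort.Slice is an assumption about how the operator might use it.

```go
package main

import (
	"fmt"
	"sort"
)

// user is a hypothetical item type used only for this illustration.
type user struct {
	name string
	age  int
}

// byAge has the signature SortWithFunc accepts. It reports whether the
// item at index i should sort before the item at index j.
func byAge(batch interface{}, i, j int) bool {
	users, ok := batch.([]user)
	if !ok {
		return false // unexpected batch type: leave the order unchanged
	}
	return users[i].age < users[j].age
}

// sortWith is a hypothetical stand-in for what the operator might do with
// the Less function: hand it to the sort package along with the batch.
func sortWith(users []user, less func(batch interface{}, i, j int) bool) {
	sort.Slice(users, func(i, j int) bool { return less(users, i, j) })
}

func main() {
	users := []user{{"ann", 41}, {"bob", 29}, {"cy", 35}}
	sortWith(users, byAge)
	fmt.Println(users[0].name, users[1].name, users[2].name) // prints bob cy ann
}
```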

func SumByKeyFunc

func SumByKeyFunc(key interface{}) api.UnFunc

SumByKeyFunc generates an api.UnFunc that sums incoming batched items by key value. The batched data can be of the following types:

[]map[K]V - where V is either an integer or a floating point
[]map[K][]V - where []V is a slice of integers or floating points

The function returns type

[]map[interface{}]float64

For instance:

[]map[interface{}]float64{key: sum}

Where sum is the total calculated sum for a given key. If key == nil, it returns sums for all keys.
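The behavior described above, including the nil key case, can be sketched as follows. sumByKey is a hypothetical helper that handles only []map[string]float64; the real function accepts more input shapes.

```go
package main

import "fmt"

// sumByKey is a hypothetical helper (not the package's code). It sums the
// values stored under key across all maps in the batch. A nil key means
// "sum every key separately".
func sumByKey(batch []map[string]float64, key interface{}) []map[interface{}]float64 {
	sums := make(map[interface{}]float64)
	for _, item := range batch {
		for k, v := range item {
			if key == nil || key == k {
				sums[k] += v
			}
		}
	}
	return []map[interface{}]float64{sums}
}

func main() {
	batch := []map[string]float64{
		{"a": 1.5, "b": 2},
		{"a": 2.5},
	}
	fmt.Println(sumByKey(batch, "a")) // prints [map[a:4]]
	fmt.Println(sumByKey(batch, nil)) // prints [map[a:4 b:2]]
}
```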

func SumByNameFunc

func SumByNameFunc(name string) api.UnFunc

SumByNameFunc generates an api.UnFunc that sums incoming batched items by struct field name. The batched data is expected to be of type:

  • []struct{F} - where field F is either an integer or floating point
  • []struct{V} - where field V is a slice of integers or floating points

The function returns value of type

map[string]float64

For instance

[]map[string]float64{{name:sum}}

where sum is the total calculated sum for the field identified by name.

func SumByPosFunc

func SumByPosFunc(pos int) api.UnFunc

SumByPosFunc generates an api.UnFunc that sums incoming values from upstream batched items based on their position. The batched data is expected to be of type:

[][]T - where []T is a slice of integers or floating points

The function returns a value of type

[]map[int]float64

More specifically a value:

[]map[int]float64{{pos: sum}} where sum is the calculated sum.
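A minimal sketch of the positional sum, restricted to [][]float64 for brevity. sumByPos is a hypothetical helper, and skipping rows that are too short to have the position is an assumption.

```go
package main

import "fmt"

// sumByPos is a hypothetical helper (not the package's code). It adds up
// the values found at index pos in every row and returns the total in the
// documented []map[int]float64{{pos: sum}} shape.
func sumByPos(batch [][]float64, pos int) []map[int]float64 {
	var sum float64
	for _, row := range batch {
		if pos >= 0 && pos < len(row) {
			sum += row[pos]
		}
	}
	return []map[int]float64{{pos: sum}}
}

func main() {
	batch := [][]float64{
		{1, 2},
		{3, 4},
		{5}, // no element at position 1, so it is skipped
	}
	fmt.Println(sumByPos(batch, 1)) // prints [map[1:6]]
}
```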

func SumFunc

func SumFunc() api.UnFunc

SumFunc generates an api.UnFunc that sums batched items from upstream. The data is expected to be of the following types:

[]integers
[]floats
[][]integers
[][]floats

The function returns the sum as a float64
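Handling the four input shapes listed above can be sketched with a type switch. The helper below is hypothetical and covers only int and float64 element types; the real function may accept other numeric widths.

```go
package main

import "fmt"

// sum is a hypothetical helper (not the package's code). It accepts flat
// or nested slices of int or float64 and returns the total as a float64.
// Unsupported types contribute nothing and yield 0.
func sum(batch interface{}) float64 {
	var total float64
	switch v := batch.(type) {
	case []int:
		for _, n := range v {
			total += float64(n)
		}
	case []float64:
		for _, n := range v {
			total += n
		}
	case [][]int:
		for _, row := range v {
			total += sum(row)
		}
	case [][]float64:
		for _, row := range v {
			total += sum(row)
		}
	}
	return total
}

func main() {
	fmt.Println(sum([]int{1, 2, 3}))                 // prints 6
	fmt.Println(sum([][]float64{{1.5, 2.5}, {4.0}})) // prints 8
}
```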

func TriggerAll

func TriggerAll() api.BatchTriggerFunc

TriggerAll forces the batch trigger to always return false, meaning the batch is never done, which causes the batch to exhaust all stream items.

func TriggerBySize

func TriggerBySize(size int64) api.BatchTriggerFunc

TriggerBySize triggers a batch based on the specified size. The batch remains open as long as the current index of the item is less than the specified size.
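The two trigger rules can be illustrated with plain predicates. The signature of api.BatchTriggerFunc is not shown in this document, so triggerFunc below is a hypothetical stand-in, and the zero-based index is also an assumption. A return value of true means the batch is done.

```go
package main

import "fmt"

// triggerFunc is a hypothetical stand-in for api.BatchTriggerFunc. It
// reports whether the batch is done once the given item has been seen.
type triggerFunc func(item interface{}, index int64) bool

// triggerAll never reports done, so a batch using it keeps collecting
// until the stream is exhausted.
func triggerAll() triggerFunc {
	return func(item interface{}, index int64) bool { return false }
}

// triggerBySize keeps the batch open while index < size.
func triggerBySize(size int64) triggerFunc {
	return func(item interface{}, index int64) bool { return index >= size }
}

func main() {
	bySize := triggerBySize(3)
	all := triggerAll()
	for index := int64(0); index < 5; index++ {
		fmt.Println(index, bySize(nil, index), all(nil, index))
	}
}
```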

Types

type BatchOperator

type BatchOperator struct {
	// contains filtered or unexported fields
}

BatchOperator is an executor that batches incoming streamed items based on provided criteria. The batched items are streamed on the output channel for downstream processing.

func New

func New() *BatchOperator

New returns a new BatchOperator.

func (*BatchOperator) Exec

func (op *BatchOperator) Exec(ctx context.Context) (err error)

Exec is the execution starting point for the operator node. The batch operator collects items from upstream into a slice []T. When the slice reaches size N, it is sent downstream for processing.
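The batching loop described above can be sketched with channels alone. batchBySize is a hypothetical function, not the operator's Exec method; flushing a final, partially filled batch when the input closes is an assumption, and size must be greater than zero.

```go
package main

import "fmt"

// batchBySize is a hypothetical sketch of size-based batching. It reads
// items from in, collects them into slices of length size, and sends each
// full slice downstream on the returned channel.
func batchBySize(in <-chan interface{}, size int) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		batch := make([]interface{}, 0, size)
		for item := range in {
			batch = append(batch, item)
			if len(batch) == size {
				out <- batch
				batch = make([]interface{}, 0, size)
			}
		}
		// Assumption: a partially filled batch is flushed at end of input.
		if len(batch) > 0 {
			out <- batch
		}
	}()
	return out
}

func main() {
	in := make(chan interface{}, 5)
	for i := 0; i < 5; i++ {
		in <- i
	}
	close(in)

	for batch := range batchBySize(in, 2) {
		fmt.Println(batch)
	}
}
```

Each batch is sent as a fresh slice, so downstream consumers can hold on to it without seeing later items overwrite its contents.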

func (*BatchOperator) GetOutput

func (op *BatchOperator) GetOutput() <-chan interface{}

GetOutput returns the output channel of the executor node

func (*BatchOperator) SetInput

func (op *BatchOperator) SetInput(in <-chan interface{})

SetInput sets the input channel for the executor node

func (*BatchOperator) SetTrigger

func (op *BatchOperator) SetTrigger(trigger api.BatchTrigger)

SetTrigger sets the batch operation to apply for this operator
