Documentation ¶
Overview ¶
Package stream contain implementation of stream executors and operators
Index ¶
- type Drain
- type Stream
- func (s *Stream) Batch() *Stream
- func (s *Stream) BatchBySize(size int64) *Stream
- func (s *Stream) Filter(f interface{}) *Stream
- func (s *Stream) FlatMap(f interface{}) *Stream
- func (s *Stream) GroupByKey(key interface{}) *Stream
- func (s *Stream) GroupByName(name string) *Stream
- func (s *Stream) GroupByPos(pos int) *Stream
- func (s *Stream) Into(snk interface{}) *Stream
- func (s *Stream) Map(f interface{}) *Stream
- func (s *Stream) Open() <-chan error
- func (s *Stream) Process(f interface{}) *Stream
- func (s *Stream) ReStream() *Stream
- func (s *Stream) Reduce(seed, f interface{}) *Stream
- func (s *Stream) Sort() *Stream
- func (s *Stream) SortByKey(key interface{}) *Stream
- func (s *Stream) SortByName(name string) *Stream
- func (s *Stream) SortByPos(pos int) *Stream
- func (s *Stream) SortWith(f func(batch interface{}, i, j int) bool) *Stream
- func (s *Stream) Sum() *Stream
- func (s *Stream) SumAllKeys() *Stream
- func (s *Stream) SumByKey(key interface{}) *Stream
- func (s *Stream) SumByName(name string) *Stream
- func (s *Stream) SumByPos(pos int) *Stream
- func (s *Stream) Transform(op api.UnOperation) *Stream
- func (s *Stream) WithContext(ctx context.Context) *Stream
- func (s *Stream) WithErrorFunc(fn api.ErrorFunc) *Stream
- func (s *Stream) WithLogFunc(fn api.LogFunc) *Stream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Drain ¶
type Drain struct {
// contains filtered or unexported fields
}
Drain is a generic sink that terminates streamed data
func (*Drain) GetOutput ¶
func (s *Drain) GetOutput() <-chan interface{}
GetOutput returns output channel for stream node
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a stream unto which executor nodes can be attached to operate on the streamed data
func (*Stream) BatchBySize ¶
func (*Stream) Filter ¶
Filter takes a predicate user-defined func that filters the stream. The specified function must be of type:
func (T) bool
If the func returns true, current item continues downstream.
func (*Stream) FlatMap ¶
FlatMap similar to Map, however, the user-defined function is expected to return a slice of values (instead of just one mapped value) for downstream operators. The FlatMap function flatten the slice, returned by the user-defined function, into items that are individually streamed. The user-defined function must have the the following type:
func(T) []R - where T is the incoming item and []R is a slice to be flattened
func (*Stream) GroupByKey ¶
GroupByKey groups incoming items that are batched as type []map[K]V where parameter key is used to group the items when K=key. Items with same key values are grouped in a new map and returned as []map[G]V.
See Also ¶
See batch operator function GroupByKey in
"github.com/vladimirvivien/automi/operators/batch/"#GroupByKeyFunc
func (*Stream) GroupByName ¶
GroupByName groups incoming items that are batched as type []T where T is a struct. Parameter name is used to select T.name as key to group items with the same value into a map map[key][]T that is sent downstream.
See Also ¶
See batch operator function GroupByName in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) GroupByPos ¶
GroupByPos groups incoming items that are batched as [][]T. For each i in dimension 1, [i][pos] is selected as key and grouped in a map, map[key][]T, that is returned downstream.
See Also ¶
See the batch operator function GroupByPosFunc in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) Map ¶
Map uses the user-defined function to take the value of an incoming item and returns a new value that is said to be mapped to the intial item. The user-defined function must be of type:
func(T) R - where T is the type of the incoming item and R the type of the returned item.
func (*Stream) Open ¶
Open opens the Stream which executes all operators nodes. If there's an issue prior to execution, an error is returned in the error channel.
func (*Stream) Process ¶
Process applies the user-defined function for general processing of incoming streamed elements. The user-defined function must be of type:
func(T) R - where T is the incoming item from upstream, R is the type of the processed value
See Also
"github.com/vladimirvivien/automi/operators/unary"#ProcessFunc
func (*Stream) ReStream ¶
ReStream takes upstream items of types []slice []array, map[T] and emmits their elements as individual channel items to downstream operations. Items of other types are ignored.
func (*Stream) Reduce ¶
Reduce accumulates and reduces items from upstream into a single value using the initial seed value and the reduction binary function. The provided function must be of type:
func(S, T) R where S is the type of the partial result T is the incoming item from the stream R is the type of the result, to be used in the next call
If reductive operations are called after open-ended emitters (i.e. network service), they may never end.
func (*Stream) Sort ¶
Sort sorts incoming items that are batched as []T where value T is comparable. The operator returns sorted slice []T.
See Also ¶
See also the operator function SortFunc in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) SortByKey ¶
SortByKey sorts incoming items that are batched as type []map[K]V where K is a comparable type specified by param key and used to sort the slice. The opertor returns a sorted []map[K]V.
See Also ¶
See also the operator function SortByKeyFunc in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) SortByName ¶
SortByName sorts incoming items that are batched as []T where T struct with fields identified by param name. Value struct.<name> is used to sort the slice. The operator returns stored slice []T.
See Also ¶
See also the operator function SortByNameFunc in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) SortByPos ¶
SortByPos sorts incoming items that are batched as [][]T where value at [][[pos]T is used to sort the slice. The operator returns sorted slice [][]T.
See Also ¶
See also the operator function SortByPosFunc in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) SortWith ¶
SortWith sorts incoming items that are batched as []T using the provided Less function for applicaiton with the sort package.
See Also ¶
See also the operator function SortWithFunc in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) Sum ¶
Sum sums up numeric items that are batched as []T or [][]T where T is an integer or a floating point value. The operator returns a single value of type float64.
See Also ¶
See also the operator function SumFunc in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) SumByKey ¶
SumByKey sums up numeric items that are batched as []map[K]V or []map[K][]V where key specifies a K value that returns a V or a []V that is a numeric (or a slice of) value of type integer or floating point. If key == nil, the grand total sum of all values for all keys will be calculated.
This operator returns map[interface{}]float64{key:sum} where sum is the calculated sum.
See Also ¶
See also the operator function SumByKeyFunc in
"github.com/vladimirvivien/automi/operators/batch"
func (*Stream) SumByName ¶
SumByName sums up items that are batched as []T where T is a struct. The name parameter sums up fields with name identifier and are of integer of floating point types. The operator returns a float64 value.
See Also ¶
See also the operator function SumByNameFunc in
"github.com/vladimirvivien/automi/operator/batch"
func (*Stream) SumByPos ¶
SumByPos sums up items that are batched as []T or [][]T where T is an integer or floating point. Values [pos]T or [][pos]T are added and returned as a float64 value.
See Also ¶
See also the operator function SumByPosFunc in
"github.com/vladimirvivien/automi/operator/batch"
func (*Stream) Transform ¶
func (s *Stream) Transform(op api.UnOperation) *Stream
Transform is the base method used to apply transfomrmative unary operations to streamed elements (i.e. filter, map, etc) It is exposed here for completeness, use the other more specific methods.
func (*Stream) WithContext ¶
WithContext sets a context.Context to use.
func (*Stream) WithErrorFunc ¶
WithErrorFunc sets a function of type func(StreamError) that will be invoked when an operator indicates it wants to signal an error by defining an operator function of the form func(data)error.