stream

package
v0.1.0-alpha.0
Warning

This package is not in the latest version of its module.

Published: Jul 31, 2019 License: Apache-2.0 Imports: 14 Imported by: 74

Documentation

Overview

Package stream contains implementations of stream executors and operators.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Drain

type Drain struct {
	// contains filtered or unexported fields
}

Drain is a generic sink that terminates streamed data

func NewDrain

func NewDrain() *Drain

NewDrain creates a new Drain

func (*Drain) GetOutput

func (s *Drain) GetOutput() <-chan interface{}

GetOutput returns output channel for stream node

func (*Drain) Open

func (s *Drain) Open(ctx context.Context) <-chan error

Open opens the sink node to start consuming streaming data

func (*Drain) SetInput

func (s *Drain) SetInput(in <-chan interface{})

SetInput sets input channel for executor node

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a stream onto which executor nodes can be attached to operate on the streamed data.

func New

func New(src interface{}) *Stream

New creates a new *Stream value

func (*Stream) Batch

func (s *Stream) Batch() *Stream

func (*Stream) BatchBySize

func (s *Stream) BatchBySize(size int64) *Stream

func (*Stream) Filter

func (s *Stream) Filter(f interface{}) *Stream

Filter takes a user-defined predicate function that filters the stream. The function must be of type:

func(T) bool

If the function returns true, the current item continues downstream; otherwise it is dropped.
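The predicate shape can be illustrated with a minimal sketch that does not call this package. The helper keepIf and the predicate isEven are hypothetical names; the helper only mirrors the documented behavior on a plain slice.

```go
package main

import "fmt"

// isEven is a user-defined predicate of the form func(T) bool, with T = int.
func isEven(n int) bool { return n%2 == 0 }

// keepIf is a hypothetical stand-in for the Filter operator: it keeps
// only the items for which the predicate returns true.
func keepIf(items []int, pred func(int) bool) []int {
	var out []int
	for _, item := range items {
		if pred(item) {
			out = append(out, item)
		}
	}
	return out
}

func main() {
	fmt.Println(keepIf([]int{1, 2, 3, 4, 5, 6}, isEven)) // [2 4 6]
}
```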

func (*Stream) FlatMap

func (s *Stream) FlatMap(f interface{}) *Stream

FlatMap is similar to Map; however, the user-defined function is expected to return a slice of values (instead of just one mapped value) for downstream operators. FlatMap flattens the slice returned by the user-defined function into items that are individually streamed. The user-defined function must have the following type:

func(T) []R - where T is the incoming item and []R is a slice to be flattened
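As a rough illustration of the flattening step, the sketch below applies a func(string) []string to each item and appends the results into one sequence. The names splitWords and flatMap are hypothetical and the code does not use this package.

```go
package main

import (
	"fmt"
	"strings"
)

// splitWords has the form func(T) []R, with T = string and R = string.
func splitWords(line string) []string { return strings.Fields(line) }

// flatMap is a hypothetical stand-in for the FlatMap operator: each
// returned slice is flattened so its elements become individual items.
func flatMap(items []string, f func(string) []string) []string {
	var out []string
	for _, item := range items {
		out = append(out, f(item)...)
	}
	return out
}

func main() {
	lines := []string{"hello stream", "flat map operator"}
	fmt.Println(flatMap(lines, splitWords)) // [hello stream flat map operator]
}
```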

func (*Stream) GroupByKey

func (s *Stream) GroupByKey(key interface{}) *Stream

GroupByKey groups incoming items that are batched as type []map[K]V, where parameter key is used to group the items when K=key. Items with the same key value are grouped in a new map and returned as []map[G]V.

See Also

See the batch operator function GroupByKeyFunc in

"github.com/vladimirvivien/automi/operators/batch/"#GroupByKeyFunc

func (*Stream) GroupByName

func (s *Stream) GroupByName(name string) *Stream

GroupByName groups incoming items that are batched as type []T where T is a struct. Parameter name is used to select T.name as the key, and items with the same key value are grouped into a map, map[key][]T, that is sent downstream.
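Grouping by field name could be sketched with the reflect package as follows. The vehicle type and the groupByName helper are hypothetical, the sketch assumes the named field is exported, and it is not the operator's actual implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

// vehicle is a hypothetical item type used only for illustration.
type vehicle struct {
	Name string
	Kind string
}

// groupByName groups items by the value of the struct field called name.
// Items that lack the field are skipped (an assumption of this sketch).
func groupByName(items []vehicle, name string) map[interface{}][]vehicle {
	groups := make(map[interface{}][]vehicle)
	for _, item := range items {
		field := reflect.ValueOf(item).FieldByName(name)
		if !field.IsValid() {
			continue
		}
		key := field.Interface()
		groups[key] = append(groups[key], item)
	}
	return groups
}

func main() {
	fleet := []vehicle{{"civic", "car"}, {"f150", "truck"}, {"golf", "car"}}
	groups := groupByName(fleet, "Kind")
	fmt.Println(len(groups["car"]), len(groups["truck"])) // 2 1
}
```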

See Also

See the batch operator function GroupByNameFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) GroupByPos

func (s *Stream) GroupByPos(pos int) *Stream

GroupByPos groups incoming items that are batched as [][]T. For each i in dimension 1, [i][pos] is selected as key and grouped in a map, map[key][]T, that is returned downstream.
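A minimal sketch of positional grouping on a [][]string batch is shown here. The groupByPos helper is hypothetical, and keeping whole rows under each key is an assumption about how the grouped result is shaped.

```go
package main

import "fmt"

// groupByPos uses the element at index pos of each row as the grouping key.
// Rows too short to have that index are skipped (an assumption of this sketch).
func groupByPos(rows [][]string, pos int) map[string][][]string {
	groups := make(map[string][][]string)
	for _, row := range rows {
		if pos < 0 || pos >= len(row) {
			continue
		}
		groups[row[pos]] = append(groups[row[pos]], row)
	}
	return groups
}

func main() {
	rows := [][]string{
		{"aircraft", "boeing"},
		{"vehicle", "toyota"},
		{"aircraft", "airbus"},
	}
	groups := groupByPos(rows, 0)
	fmt.Println(len(groups["aircraft"]), len(groups["vehicle"])) // 2 1
}
```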

See Also

See the batch operator function GroupByPosFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) Into

func (s *Stream) Into(snk interface{}) *Stream

Into sets the terminal stream sink to use

func (*Stream) Map

func (s *Stream) Map(f interface{}) *Stream

Map applies the user-defined function to the value of each incoming item and returns a new value that is said to be mapped to the initial item. The user-defined function must be of type:

func(T) R - where T is the type of the incoming item and R the type of the returned item.
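For illustration, the sketch below maps strings to their lengths, so T = string and R = int. The names wordLength and mapEach are hypothetical and the code runs on a plain slice rather than on a Stream.

```go
package main

import "fmt"

// wordLength is a user-defined function of the form func(T) R.
func wordLength(word string) int { return len(word) }

// mapEach is a hypothetical stand-in for the Map operator: it emits
// one mapped value for every incoming item.
func mapEach(items []string, f func(string) int) []int {
	out := make([]int, 0, len(items))
	for _, item := range items {
		out = append(out, f(item))
	}
	return out
}

func main() {
	fmt.Println(mapEach([]string{"go", "stream", "map"}, wordLength)) // [2 6 3]
}
```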

func (*Stream) Open

func (s *Stream) Open() <-chan error

Open opens the Stream, which executes all operator nodes. If there's an issue prior to execution, an error is returned in the error channel.
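Consuming a <-chan error result typically means blocking on a receive, optionally with a timeout. The sketch below shows that pattern with hypothetical helpers (open, wait, succeed, fail). It assumes the channel is closed when the run finishes without an error, which this documentation does not confirm for Stream.Open.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errSetup = errors.New("setup failed")

const waitLimit = time.Second

// open runs the work in a goroutine and reports any error on the channel.
// Closing the channel on success is an assumption of this sketch.
func open(run func() error) <-chan error {
	errs := make(chan error, 1)
	go func() {
		defer close(errs)
		if err := run(); err != nil {
			errs <- err
		}
	}()
	return errs
}

// wait blocks until an error arrives, the channel closes, or limit elapses.
func wait(errs <-chan error, limit time.Duration) error {
	select {
	case err := <-errs:
		return err // nil when the channel was closed without an error
	case <-time.After(limit):
		return errors.New("timed out waiting for stream")
	}
}

func succeed() error { return nil }
func fail() error    { return errSetup }

func main() {
	fmt.Println(wait(open(succeed), waitLimit)) // <nil>
	fmt.Println(wait(open(fail), waitLimit))    // setup failed
}
```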

func (*Stream) Process

func (s *Stream) Process(f interface{}) *Stream

Process applies the user-defined function for general processing of incoming streamed elements. The user-defined function must be of type:

func(T) R - where T is the incoming item from upstream,
            R is the type of the processed value

See Also

"github.com/vladimirvivien/automi/operators/unary"#ProcessFunc

func (*Stream) ReStream

func (s *Stream) ReStream() *Stream

ReStream takes upstream items of slice, array, or map types and emits their elements as individual channel items to downstream operations. Items of other types are ignored.

func (*Stream) Reduce

func (s *Stream) Reduce(seed, f interface{}) *Stream

Reduce accumulates and reduces items from upstream into a single value using the initial seed value and the reduction binary function. The provided function must be of type:

func(S, T) R
  where S is the type of the partial result
  T is the incoming item from the stream
  R is the type of the result, to be used in the next call

If reductive operations are called after open-ended emitters (e.g. a network service), they may never end.
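The accumulation can be sketched on a finite slice, where S, T, and R are all int. The names add and reduce are hypothetical; the loop only mirrors the documented seed-and-fold behavior and terminates because the input is finite.

```go
package main

import "fmt"

// add has the form func(S, T) R: it combines the partial result with
// the incoming item and returns the next partial result.
func add(partial, item int) int { return partial + item }

// reduce is a hypothetical stand-in for the Reduce operator: it starts
// from seed and folds every item into a single value.
func reduce(seed int, items []int, f func(int, int) int) int {
	acc := seed
	for _, item := range items {
		acc = f(acc, item)
	}
	return acc
}

func main() {
	fmt.Println(reduce(0, []int{1, 2, 3, 4}, add)) // 10
}
```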

func (*Stream) Sort

func (s *Stream) Sort() *Stream

Sort sorts incoming items that are batched as []T where value T is comparable. The operator returns sorted slice []T.

See Also

See also the operator function SortFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) SortByKey

func (s *Stream) SortByKey(key interface{}) *Stream

SortByKey sorts incoming items that are batched as type []map[K]V where K is a comparable type specified by param key and used to sort the slice. The operator returns a sorted []map[K]V.

See Also

See also the operator function SortByKeyFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) SortByName

func (s *Stream) SortByName(name string) *Stream

SortByName sorts incoming items that are batched as []T where T is a struct with a field identified by param name. Value struct.<name> is used to sort the slice. The operator returns sorted slice []T.

See Also

See also the operator function SortByNameFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) SortByPos

func (s *Stream) SortByPos(pos int) *Stream

SortByPos sorts incoming items that are batched as [][]T where value at [][pos]T is used to sort the slice. The operator returns sorted slice [][]T.

See Also

See also the operator function SortByPosFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) SortWith

func (s *Stream) SortWith(f func(batch interface{}, i, j int) bool) *Stream

SortWith sorts incoming items that are batched as []T using the provided Less function for application with the sort package.
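A Less function with the signature func(batch interface{}, i, j int) bool has to type-assert the batch before comparing elements. The sketch below pairs such a function with sort.SliceStable from the standard library; byLength and sortWith are hypothetical names, and this is not necessarily how the operator invokes the function internally.

```go
package main

import (
	"fmt"
	"sort"
)

// byLength matches the Less signature: it asserts the batch type and
// compares the elements at positions i and j.
func byLength(batch interface{}, i, j int) bool {
	words := batch.([]string)
	return len(words[i]) < len(words[j])
}

// sortWith sorts the batch in place by delegating each comparison to less.
func sortWith(batch []string, less func(batch interface{}, i, j int) bool) {
	sort.SliceStable(batch, func(i, j int) bool { return less(batch, i, j) })
}

func main() {
	words := []string{"ccc", "a", "bb"}
	sortWith(words, byLength)
	fmt.Println(words) // [a bb ccc]
}
```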

See Also

See also the operator function SortWithFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) Sum

func (s *Stream) Sum() *Stream

Sum sums up numeric items that are batched as []T or [][]T where T is an integer or a floating point value. The operator returns a single value of type float64.

See Also

See also the operator function SumFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) SumAllKeys

func (s *Stream) SumAllKeys() *Stream

SumAllKeys returns a grand total of all keys by calling

SumByKey(nil)

func (*Stream) SumByKey

func (s *Stream) SumByKey(key interface{}) *Stream

SumByKey sums up numeric items that are batched as []map[K]V or []map[K][]V, where key specifies a K value that maps to a V or a []V holding integer or floating point values. If key == nil, the grand total of all values for all keys is calculated.

This operator returns map[interface{}]float64{key:sum} where sum is the calculated sum.
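The keyed sum, including the nil-key grand total, might look like the following sketch for a []map[string]int batch. The sumByKey helper is hypothetical, and storing the grand total under a nil key is an assumption made only to keep the result shape uniform.

```go
package main

import "fmt"

// sumByKey adds the values stored under key across all maps in the batch.
// When key is nil, values for every key are added (the grand total).
func sumByKey(batch []map[string]int, key interface{}) map[interface{}]float64 {
	var sum float64
	for _, m := range batch {
		for k, v := range m {
			if key == nil || k == key {
				sum += float64(v)
			}
		}
	}
	return map[interface{}]float64{key: sum}
}

func main() {
	batch := []map[string]int{{"a": 1, "b": 2}, {"a": 3}}
	fmt.Println(sumByKey(batch, "a")["a"]) // 4
	fmt.Println(sumByKey(batch, nil)[nil]) // 6
}
```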

See Also

See also the operator function SumByKeyFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) SumByName

func (s *Stream) SumByName(name string) *Stream

SumByName sums up items that are batched as []T where T is a struct. The name parameter identifies the field to sum, which must be of an integer or floating point type. The operator returns a float64 value.

See Also

See also the operator function SumByNameFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) SumByPos

func (s *Stream) SumByPos(pos int) *Stream

SumByPos sums up items that are batched as []T or [][]T where T is an integer or floating point. Values [pos]T or [][pos]T are added and returned as a float64 value.

See Also

See also the operator function SumByPosFunc in

"github.com/vladimirvivien/automi/operators/batch"

func (*Stream) Transform

func (s *Stream) Transform(op api.UnOperation) *Stream

Transform is the base method used to apply transformative unary operations to streamed elements (e.g. filter, map, etc.). It is exposed here for completeness; use the other, more specific methods instead.

func (*Stream) WithContext

func (s *Stream) WithContext(ctx context.Context) *Stream

WithContext sets a context.Context to use.

func (*Stream) WithErrorFunc

func (s *Stream) WithErrorFunc(fn api.ErrorFunc) *Stream

WithErrorFunc sets a function of type func(StreamError) that will be invoked when an operator indicates it wants to signal an error by defining an operator function of the form func(data)error.

func (*Stream) WithLogFunc

func (s *Stream) WithLogFunc(fn api.LogFunc) *Stream

WithLogFunc sets a function that will receive internal log events at runtime. Supported log function type: func(interface{})
