Documentation ¶
Index ¶
- Variables
- func ArrayStream(elems interface{}) api.Stream
- func EmptyStream(capacity int) api.Stream
- func StreamGenerator(generator func() api.Optional) api.Stream
- func StreamGeneratorFeeder(s api.Stream, generator func() api.Optional)
- func Subscribe(uri string) (api.Stream, error)
- type BusyWait
- type FastSpinThenWait
- type Int64SumReducer
- type IntSumReducer
- type StreamImpl
- func (s *StreamImpl) AllMatch(op interface{}) bool
- func (s *StreamImpl) AsArray() (result []interface{})
- func (s *StreamImpl) AtLeastOne(op interface{}) bool
- func (s *StreamImpl) CSVasMap(firstRowIsHeader bool, asMap bool) api.Stream
- func (s *StreamImpl) Close()
- func (s *StreamImpl) Count() int
- func (s *StreamImpl) CountUint64() (c uint64)
- func (s *StreamImpl) CurrAbsPos() uint64
- func (s *StreamImpl) EnsureType(kind reflect.Kind) api.Stream
- func (s *StreamImpl) EnsureTypeEx(kind reflect.Kind, coerce bool, dropIfNotPossible bool) api.Stream
- func (s *StreamImpl) Feed(elem interface{})
- func (s *StreamImpl) Filter(op interface{}) api.Stream
- func (s *StreamImpl) FilterNA(op func(interface{}) bool) api.Stream
- func (s *StreamImpl) First() api.Optional
- func (s *StreamImpl) ForEach(op interface{})
- func (s *StreamImpl) IsClosed() bool
- func (s *StreamImpl) IsEmpty() bool
- func (s *StreamImpl) JsonToMap() api.Stream
- func (s *StreamImpl) Last() api.Optional
- func (s *StreamImpl) Map(op interface{}) api.Stream
- func (s *StreamImpl) MapAsCSV(firstRowIsHeader bool) api.Stream
- func (s *StreamImpl) MapInt64(op func(int64) int64) api.Stream
- func (s *StreamImpl) MapToJson() api.Stream
- func (s *StreamImpl) ModifyNA(fn func(interface{})) api.Stream
- func (s *StreamImpl) NoneMatch(op interface{}) bool
- func (s *StreamImpl) Peek(absPos uint64) interface{}
- func (s *StreamImpl) PeekLimit() uint64
- func (s *StreamImpl) Publish(uri string) error
- func (s *StreamImpl) PublishClose(uri string) error
- func (s *StreamImpl) Pull() api.Optional
- func (s *StreamImpl) Reduce(op interface{}) api.Stream
- func (s *StreamImpl) ReduceNA(reducer api.Reducer) api.Stream
- func (s *StreamImpl) Reset() uint64
- func (s *StreamImpl) Skip(n int) api.Stream
- func (s *StreamImpl) Sum() api.Stream
- func (s *StreamImpl) SumInt64() api.Stream
- func (s *StreamImpl) TimeOut(waitTimeOut api.WaitTimeOut) api.Stream
- func (s *StreamImpl) WaitDuty(waitDuty api.WaitDuty) api.Stream
- type StreamProvider
Constants ¶
This section is empty.
Variables ¶
var LocalRegistry = registry.NewInMemoryRegistry()
Functions ¶
func ArrayStream ¶
func EmptyStream ¶
--------------------------------------------------------------------------------------------------------------------
Builders ¶
--------------------------------------------------------------------------------------------------------------------
func StreamGeneratorFeeder ¶
Types ¶
type FastSpinThenWait ¶
type FastSpinThenWait struct {
// contains filtered or unexported fields
}
func NewDefaultFastSpinThenWait ¶
func NewDefaultFastSpinThenWait() *FastSpinThenWait
func NewFastSpinThenWait ¶
func NewFastSpinThenWait(spinsBeforeSleep int, minWaitNs, maxWaitNs int64) *FastSpinThenWait
func (*FastSpinThenWait) Loop ¶
func (w *FastSpinThenWait) Loop() (waitedNs int64)
func (*FastSpinThenWait) Reset ¶
func (w *FastSpinThenWait) Reset()
type Int64SumReducer ¶
type Int64SumReducer struct {
// contains filtered or unexported fields
}
Memory efficient reducer adding all the elements of the stream into a int64 (assuming elements are int64) can process 175M int64/second/thread
func (*Int64SumReducer) First ¶
func (r *Int64SumReducer) First(a interface{})
func (*Int64SumReducer) Next ¶
func (r *Int64SumReducer) Next(a interface{})
func (*Int64SumReducer) Result ¶
func (r *Int64SumReducer) Result() interface{}
type IntSumReducer ¶
type IntSumReducer struct {
// contains filtered or unexported fields
}
Memory efficient int reducer
func (*IntSumReducer) First ¶
func (r *IntSumReducer) First(a interface{})
func (*IntSumReducer) Next ¶
func (r *IntSumReducer) Next(a interface{})
func (*IntSumReducer) Result ¶
func (r *IntSumReducer) Result() interface{}
type StreamImpl ¶
type StreamImpl struct {
// contains filtered or unexported fields
}
func NewStreamImpl ¶
func NewStreamImpl(provider StreamProvider, pullFn func() (read interface{}, closed bool)) *StreamImpl
func (*StreamImpl) AllMatch ¶
func (s *StreamImpl) AllMatch(op interface{}) bool
func (*StreamImpl) AsArray ¶
func (s *StreamImpl) AsArray() (result []interface{})
func (*StreamImpl) AtLeastOne ¶
func (s *StreamImpl) AtLeastOne(op interface{}) bool
func (*StreamImpl) CSVasMap ¶
func (s *StreamImpl) CSVasMap(firstRowIsHeader bool, asMap bool) api.Stream
converts a Stream of strings into either a Map[string]interface{} or a []interface{}, depending on asMap value. If firstRowIsHeader is set, it will pick the column name from the first row, otherwise the column names will be sequential numbers starting from 1.
func (*StreamImpl) Close ¶
func (s *StreamImpl) Close()
func (*StreamImpl) Count ¶
func (s *StreamImpl) Count() int
func (*StreamImpl) CountUint64 ¶
func (s *StreamImpl) CountUint64() (c uint64)
func (*StreamImpl) CurrAbsPos ¶
func (s *StreamImpl) CurrAbsPos() uint64
func (*StreamImpl) EnsureType ¶
func (s *StreamImpl) EnsureType(kind reflect.Kind) api.Stream
Ensure the types trying to coerce and dropping elements that can not be converted
func (*StreamImpl) EnsureTypeEx ¶
func (s *StreamImpl) EnsureTypeEx(kind reflect.Kind, coerce bool, dropIfNotPossible bool) api.Stream
converts the stream elements to the type provided; if the coerce parameter is set to true, it will try to convert the elements to the expected type i.e. a int64 will be converted into a string representation in base10, if this conversion is disabled, or the coercion is not possible, it can be either drop (setting the parameter dropIfNotPossible) or panic when dropping is disable.
func (*StreamImpl) Feed ¶
func (s *StreamImpl) Feed(elem interface{})
func (*StreamImpl) Filter ¶
func (s *StreamImpl) Filter(op interface{}) api.Stream
func (*StreamImpl) First ¶
func (s *StreamImpl) First() api.Optional
func (*StreamImpl) ForEach ¶
func (s *StreamImpl) ForEach(op interface{})
func (*StreamImpl) IsClosed ¶
func (s *StreamImpl) IsClosed() bool
func (*StreamImpl) IsEmpty ¶
func (s *StreamImpl) IsEmpty() bool
func (*StreamImpl) JsonToMap ¶
func (s *StreamImpl) JsonToMap() api.Stream
Unmarshalls a string or an []byte into a map[string]interface{} or an []interface{}. If it can not be parsed as a valid JSON object, it will map into a nil.
func (*StreamImpl) Last ¶
func (s *StreamImpl) Last() api.Optional
func (*StreamImpl) Map ¶
func (s *StreamImpl) Map(op interface{}) api.Stream
func (*StreamImpl) MapAsCSV ¶
func (s *StreamImpl) MapAsCSV(firstRowIsHeader bool) api.Stream
converts a Map[string]interface{} or []interface{} into a comma separated (and escaped if string vs number), adding an extra element at the beginning with the column names, if provided
func (*StreamImpl) MapInt64 ¶
func (s *StreamImpl) MapInt64(op func(int64) int64) api.Stream
MapInt64 requires one allocation per element (2 for the generic Map)
func (*StreamImpl) MapToJson ¶
func (s *StreamImpl) MapToJson() api.Stream
Maps the object into a json string representation, if it can not be encoded, it will be encoded into a nil value
func (*StreamImpl) ModifyNA ¶
func (s *StreamImpl) ModifyNA(fn func(interface{})) api.Stream
Modifies the Stream element in-place, avoids non-allocating operation. Given the root pointer can not be changed, this can only be used with struct or containers i.e. maps, etc.
func (*StreamImpl) NoneMatch ¶
func (s *StreamImpl) NoneMatch(op interface{}) bool
func (*StreamImpl) Peek ¶
func (s *StreamImpl) Peek(absPos uint64) interface{}
func (*StreamImpl) PeekLimit ¶
func (s *StreamImpl) PeekLimit() uint64
func (*StreamImpl) Publish ¶
func (s *StreamImpl) Publish(uri string) error
Consumes the contents of this Stream and publishes it into the provided URI Steam. Consumption will follow the Stream' Wait Time-Out; the output Stream will be left open.
func (*StreamImpl) PublishClose ¶
func (s *StreamImpl) PublishClose(uri string) error
Consumes the contents of this Stream and publishes it into the provided URI Steam. Consumption will follow the Stream' Wait Time-Out; the output Stream will be Closed when no more elements are available in the Stream.
func (*StreamImpl) Pull ¶
func (s *StreamImpl) Pull() api.Optional
This is not mean to be used as standard API but on specific cases (i.e. to implement Binary Search). Blocks until an element is in the Stream, or the Stream is Closed()
func (*StreamImpl) Reduce ¶
func (s *StreamImpl) Reduce(op interface{}) api.Stream
func (*StreamImpl) Reset ¶
func (s *StreamImpl) Reset() uint64
func (*StreamImpl) Skip ¶
func (s *StreamImpl) Skip(n int) api.Stream
skips N elements in the stream, it can block
func (*StreamImpl) Sum ¶
func (s *StreamImpl) Sum() api.Stream
func (*StreamImpl) SumInt64 ¶
func (s *StreamImpl) SumInt64() api.Stream
func (*StreamImpl) TimeOut ¶
func (s *StreamImpl) TimeOut(waitTimeOut api.WaitTimeOut) api.Stream