base

package
v0.0.0-...-0f6c31a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var LocalRegistry = registry.NewInMemoryRegistry()

Functions

func ArrayStream

func ArrayStream(elems interface{}) api.Stream

func EmptyStream

func EmptyStream(capacity int) api.Stream

--------------------------------------------------------------------------------------------------------------------

Builders

--------------------------------------------------------------------------------------------------------------------

func StreamGenerator

func StreamGenerator(generator func() api.Optional) api.Stream

func StreamGeneratorFeeder

func StreamGeneratorFeeder(s api.Stream, generator func() api.Optional)

func Subscribe

func Subscribe(uri string) (api.Stream, error)

Types

type BusyWait

type BusyWait struct {
}

func NewBusyWait

func NewBusyWait() *BusyWait

func (BusyWait) Loop

func (_ BusyWait) Loop() int64

func (BusyWait) Reset

func (_ BusyWait) Reset()

type FastSpinThenWait

type FastSpinThenWait struct {
	// contains filtered or unexported fields
}

func NewDefaultFastSpinThenWait

func NewDefaultFastSpinThenWait() *FastSpinThenWait

func NewFastSpinThenWait

func NewFastSpinThenWait(spinsBeforeSleep int, minWaitNs, maxWaitNs int64) *FastSpinThenWait

func (*FastSpinThenWait) Loop

func (w *FastSpinThenWait) Loop() (waitedNs int64)

func (*FastSpinThenWait) Reset

func (w *FastSpinThenWait) Reset()

type Int64SumReducer

type Int64SumReducer struct {
	// contains filtered or unexported fields
}

Memory-efficient reducer that adds all the elements of the stream into an int64 (assuming the elements are int64); it can process 175M int64/second/thread.

func (*Int64SumReducer) First

func (r *Int64SumReducer) First(a interface{})

func (*Int64SumReducer) Next

func (r *Int64SumReducer) Next(a interface{})

func (*Int64SumReducer) Result

func (r *Int64SumReducer) Result() interface{}

type IntSumReducer

type IntSumReducer struct {
	// contains filtered or unexported fields
}

Memory efficient int reducer

func (*IntSumReducer) First

func (r *IntSumReducer) First(a interface{})

func (*IntSumReducer) Next

func (r *IntSumReducer) Next(a interface{})

func (*IntSumReducer) Result

func (r *IntSumReducer) Result() interface{}

type StreamImpl

type StreamImpl struct {
	// contains filtered or unexported fields
}

func NewStreamImpl

func NewStreamImpl(provider StreamProvider, pullFn func() (read interface{}, closed bool)) *StreamImpl

func (*StreamImpl) AllMatch

func (s *StreamImpl) AllMatch(op interface{}) bool

func (*StreamImpl) AsArray

func (s *StreamImpl) AsArray() (result []interface{})

func (*StreamImpl) AtLeastOne

func (s *StreamImpl) AtLeastOne(op interface{}) bool

func (*StreamImpl) CSVasMap

func (s *StreamImpl) CSVasMap(firstRowIsHeader bool, asMap bool) api.Stream

Converts a Stream of strings into either a map[string]interface{} or a []interface{}, depending on the asMap value. If firstRowIsHeader is set, the column names are taken from the first row; otherwise the column names are sequential numbers starting from 1.

func (*StreamImpl) Close

func (s *StreamImpl) Close()

func (*StreamImpl) Count

func (s *StreamImpl) Count() int

func (*StreamImpl) CountUint64

func (s *StreamImpl) CountUint64() (c uint64)

func (*StreamImpl) CurrAbsPos

func (s *StreamImpl) CurrAbsPos() uint64

func (*StreamImpl) EnsureType

func (s *StreamImpl) EnsureType(kind reflect.Kind) api.Stream

Ensures the type of the elements, trying to coerce them and dropping the elements that cannot be converted.

func (*StreamImpl) EnsureTypeEx

func (s *StreamImpl) EnsureTypeEx(kind reflect.Kind, coerce bool, dropIfNotPossible bool) api.Stream

Converts the stream elements to the type provided. If the coerce parameter is set to true, it will try to convert the elements to the expected type, e.g. an int64 will be converted into its base-10 string representation. If this conversion is disabled, or the coercion is not possible, the element is either dropped (when dropIfNotPossible is set) or the call panics (when dropping is disabled).

func (*StreamImpl) Feed

func (s *StreamImpl) Feed(elem interface{})

func (*StreamImpl) Filter

func (s *StreamImpl) Filter(op interface{}) api.Stream

func (*StreamImpl) FilterNA

func (s *StreamImpl) FilterNA(op func(interface{}) bool) api.Stream

func (*StreamImpl) First

func (s *StreamImpl) First() api.Optional

func (*StreamImpl) ForEach

func (s *StreamImpl) ForEach(op interface{})

func (*StreamImpl) IsClosed

func (s *StreamImpl) IsClosed() bool

func (*StreamImpl) IsEmpty

func (s *StreamImpl) IsEmpty() bool

func (*StreamImpl) JsonToMap

func (s *StreamImpl) JsonToMap() api.Stream

Unmarshals a string or a []byte into a map[string]interface{} or a []interface{}. If the input cannot be parsed as a valid JSON object, it is mapped to nil.

func (*StreamImpl) Last

func (s *StreamImpl) Last() api.Optional

func (*StreamImpl) Map

func (s *StreamImpl) Map(op interface{}) api.Stream

func (*StreamImpl) MapAsCSV

func (s *StreamImpl) MapAsCSV(firstRowIsHeader bool) api.Stream

Converts a map[string]interface{} or a []interface{} into a comma-separated string (values are escaped when they are strings rather than numbers), adding an extra element at the beginning with the column names, if provided.

func (*StreamImpl) MapInt64

func (s *StreamImpl) MapInt64(op func(int64) int64) api.Stream

MapInt64 requires one allocation per element (2 for the generic Map)

func (*StreamImpl) MapToJson

func (s *StreamImpl) MapToJson() api.Stream

Maps the object into its JSON string representation; if it cannot be encoded, it is mapped to a nil value.

func (*StreamImpl) ModifyNA

func (s *StreamImpl) ModifyNA(fn func(interface{})) api.Stream

Modifies the Stream element in place, as a non-allocating operation. Given that the root pointer cannot be changed, this can only be used with structs or containers (e.g. maps).

func (*StreamImpl) NoneMatch

func (s *StreamImpl) NoneMatch(op interface{}) bool

func (*StreamImpl) Peek

func (s *StreamImpl) Peek(absPos uint64) interface{}

func (*StreamImpl) PeekLimit

func (s *StreamImpl) PeekLimit() uint64

func (*StreamImpl) Publish

func (s *StreamImpl) Publish(uri string) error

Consumes the contents of this Stream and publishes them into the Stream at the provided URI. Consumption will follow the Stream's Wait Time-Out; the output Stream will be left open.

func (*StreamImpl) PublishClose

func (s *StreamImpl) PublishClose(uri string) error

Consumes the contents of this Stream and publishes them into the Stream at the provided URI. Consumption will follow the Stream's Wait Time-Out; the output Stream will be closed when no more elements are available in this Stream.

func (*StreamImpl) Pull

func (s *StreamImpl) Pull() api.Optional

This is not meant to be used as a standard API, but only in specific cases (e.g. to implement binary search). It blocks until an element is in the Stream, or the Stream is closed.

func (*StreamImpl) Reduce

func (s *StreamImpl) Reduce(op interface{}) api.Stream

func (*StreamImpl) ReduceNA

func (s *StreamImpl) ReduceNA(reducer api.Reducer) api.Stream

func (*StreamImpl) Reset

func (s *StreamImpl) Reset() uint64

func (*StreamImpl) Skip

func (s *StreamImpl) Skip(n int) api.Stream

Skips n elements in the stream; this call can block.

func (*StreamImpl) Sum

func (s *StreamImpl) Sum() api.Stream

func (*StreamImpl) SumInt64

func (s *StreamImpl) SumInt64() api.Stream

func (*StreamImpl) TimeOut

func (s *StreamImpl) TimeOut(waitTimeOut api.WaitTimeOut) api.Stream

func (*StreamImpl) WaitDuty

func (s *StreamImpl) WaitDuty(waitDuty api.WaitDuty) api.Stream

type StreamProvider

type StreamProvider interface {
	Feed(elem interface{})
	Close()
	IsClosed() bool
	Pull() (elem interface{}, closed bool)
	Reset() uint64
	CurrAbsPos() uint64
	PeekLimit() uint64
	Peek(absPos uint64) interface{}
	WaitTimeOut(waitTimeOut api.WaitTimeOut)
	WaitDuty(waitDuty api.WaitDuty)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL