api

package
v0.0.0-...-0f6c31a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2021 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Optional

type Optional struct {
	// contains filtered or unexported fields
}

func EmptyOptional

func EmptyOptional() Optional

func OptionalOf

func OptionalOf(value interface{}) Optional

func (Optional) Get

func (o Optional) Get() interface{}

func (Optional) IfPresent

func (o Optional) IfPresent(op interface{})

func (Optional) IsEmpty

func (o Optional) IsEmpty() bool

func (Optional) IsPresent

func (o Optional) IsPresent() bool

func (Optional) Map

func (o Optional) Map(op interface{}) Optional

func (Optional) OrElse

func (o Optional) OrElse(other interface{}) interface{}

func (Optional) String

func (o Optional) String() string

type PersistentStream

type PersistentStream interface {

	// Lifecycle
	Feed(elem interface{})
	Close()
	IsClosed() bool
	CloseFile() error
	Delete() error

	Publish(uri string)
	// Subscribing, wait time-out is UntilNoMoreData
	Consume(subscriberName string) Stream
}

multi-consumer, multi-producer persistent Stream

type Reducer

type Reducer interface {
	First(interface{})
	Next(interface{})
	Result() interface{}
}

allocation free reducer

type Stream

type Stream interface {

	// Lifecycle
	Feed(elem interface{})
	Close()
	IsClosed() bool
	TimeOut(waitTimeOut WaitTimeOut) Stream
	WaitDuty(duty WaitDuty) Stream

	// Positioning operations
	Reset() uint64
	Skip(int) Stream

	CurrAbsPos() uint64
	PeekLimit() uint64
	Peek(absPos uint64) interface{}
	Pull() Optional

	Map(op interface{}) Stream
	MapInt64(func(int64) int64) Stream
	ModifyNA(func(interface{})) Stream
	Reduce(op interface{}) Stream
	ReduceNA(reducer Reducer) Stream
	Filter(op interface{}) Stream
	FilterNA(func(interface{}) bool) Stream

	Sum() Stream
	SumInt64() Stream

	EnsureTypeEx(t reflect.Kind, coerce bool, dropIfNotPossible bool) Stream
	EnsureType(t reflect.Kind) Stream

	JsonToMap() Stream
	MapToJson() Stream
	CSVasMap(firstRowIsHeader bool, asMap bool) Stream
	MapAsCSV(firstRowIsHeader bool) Stream

	// Terminating methods
	First() Optional
	Last() Optional
	IsEmpty() bool
	AsArray() []interface{}
	Count() int          //LENGTH?
	CountUint64() uint64 //LENGTH?
	AllMatch(op interface{}) bool
	NoneMatch(interface{}) bool
	AtLeastOne(interface{}) bool
	ForEach(op interface{})

	Publish(uri string) error
	PublishClose(uri string) error
}

type WaitDuty

type WaitDuty interface {
	// callback notifying a loop during wait has occurred, if something has been processed since the last call the
	// parameter `hasProcessed` should be set to true; the call might put the processor to wait or the goroutine to
	// yield, depending on its configured behaviour.
	Loop() (waitedNs int64)
	Reset()
}

Strategy specifying how to wait and process for new data. Different strategies are a trade-off between latency and CPU usage. i.e. busy wait will have the lowest latency, at the expense of putting one CPU at 100% usage. Always-wait, will impose a few ms latency due to clock-checking, wait and context-switching. An hybrid solution it usually a good trade-off, fast-spin for a few thousand times, and then if no activity is detected the thread is put to sleep and the context is switched to another process/goroutine; each time waiting a bit longer until certain upper limit.

type WaitTimeOut

type WaitTimeOut int64
const (
	UntilClosed       WaitTimeOut = -1
	UntilNoMoreData   WaitTimeOut = 0
	WaitingUpto1000ns WaitTimeOut = 1_000
	WaitingUpto10ms   WaitTimeOut = 10_000_000
	WaitingUpto1s     WaitTimeOut = 1_000_000_000
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL