nodes

package
v0.0.0-...-7b16d43 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2021 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const BufferLength = "buffer_length"
View Source
const ExceptionsTotal = "exceptions_total"
View Source
const LastInvocation = "last_invocation"
View Source
const MSG_COUNT_KEY = "$$msgCount"
View Source
const OFFSET_KEY = "$$offset"
View Source
const ProcessLatencyUs = "process_latency_us"
View Source
const RecordsInTotal = "records_in_total"
View Source
const RecordsOutTotal = "records_out_total"
View Source
const TRIGGER_TIME_KEY = "$$triggerTime"
View Source
const WATERMARK_KEY = "$$wartermark"
View Source
const WINDOW_INPUTS_KEY = "$$windowInputs"

Variables

Functions

This section is empty.

Types

type Cache

type Cache struct {
	Out      chan *CacheTuple
	Complete chan int
	// contains filtered or unexported fields
}

func NewCheckpointbasedCache

func NewCheckpointbasedCache(in <-chan interface{}, limit int, tch <-chan struct{}, errCh chan<- error, ctx api.StreamContext) *Cache

func NewTimebasedCache

func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache

func (*Cache) Length

func (c *Cache) Length() int

type CacheTuple

type CacheTuple struct {
	// contains filtered or unexported fields
}

type DefaultStatManager

type DefaultStatManager struct {
	// contains filtered or unexported fields
}

The statManager is not thread-safe. Make sure it is used in only one instance.

func (*DefaultStatManager) GetMetrics

func (sm *DefaultStatManager) GetMetrics() []interface{}

func (*DefaultStatManager) IncTotalExceptions

func (sm *DefaultStatManager) IncTotalExceptions()

func (*DefaultStatManager) IncTotalRecordsIn

func (sm *DefaultStatManager) IncTotalRecordsIn()

func (*DefaultStatManager) IncTotalRecordsOut

func (sm *DefaultStatManager) IncTotalRecordsOut()

func (*DefaultStatManager) ProcessTimeEnd

func (sm *DefaultStatManager) ProcessTimeEnd()

func (*DefaultStatManager) ProcessTimeStart

func (sm *DefaultStatManager) ProcessTimeStart()

func (*DefaultStatManager) SetBufferLength

func (sm *DefaultStatManager) SetBufferLength(l int64)

type DynamicChannelBuffer

type DynamicChannelBuffer struct {
	In  chan api.SourceTuple
	Out chan api.SourceTuple
	// contains filtered or unexported fields
}

func NewDynamicChannelBuffer

func NewDynamicChannelBuffer() *DynamicChannelBuffer

func (*DynamicChannelBuffer) Close

func (b *DynamicChannelBuffer) Close()

func (*DynamicChannelBuffer) GetLength

func (b *DynamicChannelBuffer) GetLength() int

func (*DynamicChannelBuffer) SetLimit

func (b *DynamicChannelBuffer) SetLimit(limit int)

type LinkedQueue

type LinkedQueue struct {
	Data map[int]interface{}
	Tail int
}

func (*LinkedQueue) String

func (l *LinkedQueue) String() string

type MetricGroup

type MetricGroup struct {
	TotalRecordsIn  *prometheus.CounterVec
	TotalRecordsOut *prometheus.CounterVec
	TotalExceptions *prometheus.CounterVec
	ProcessLatency  *prometheus.GaugeVec
	BufferLength    *prometheus.GaugeVec
}

type OperatorNode

type OperatorNode interface {
	api.Operator
	Broadcast(data interface{}) error
	GetStreamContext() api.StreamContext
	GetInputCount() int
	AddInputCount()
	SetQos(api.Qos)
	SetBarrierHandler(checkpoints.BarrierHandler)
}

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

func GetPrometheusMetrics

func GetPrometheusMetrics() *PrometheusMetrics

func (*PrometheusMetrics) GetMetricsGroup

func (m *PrometheusMetrics) GetMetricsGroup(opType string) *MetricGroup

type PrometheusStatManager

type PrometheusStatManager struct {
	DefaultStatManager
	// contains filtered or unexported fields
}

func (*PrometheusStatManager) IncTotalExceptions

func (sm *PrometheusStatManager) IncTotalExceptions()

func (*PrometheusStatManager) IncTotalRecordsIn

func (sm *PrometheusStatManager) IncTotalRecordsIn()

func (*PrometheusStatManager) IncTotalRecordsOut

func (sm *PrometheusStatManager) IncTotalRecordsOut()

func (*PrometheusStatManager) ProcessTimeEnd

func (sm *PrometheusStatManager) ProcessTimeEnd()

func (*PrometheusStatManager) SetBufferLength

func (sm *PrometheusStatManager) SetBufferLength(l int64)

type SinkNode

type SinkNode struct {
	// contains filtered or unexported fields
}

func NewSinkNode

func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode

func NewSinkNodeWithSink

func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode

Only for mock sink; do not use it in production.

func (SinkNode) AddInputCount

func (o SinkNode) AddInputCount()

func (*SinkNode) AddOutput

func (m *SinkNode) AddOutput(_ chan<- interface{}, name string) error

Override defaultNode

func (*SinkNode) Broadcast

func (m *SinkNode) Broadcast(_ interface{}) error

Override defaultNode

func (SinkNode) GetInput

func (o SinkNode) GetInput() (chan<- interface{}, string)

func (SinkNode) GetInputCount

func (o SinkNode) GetInputCount() int

func (*SinkNode) Open

func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error)

func (*SinkNode) SaveCache

func (m *SinkNode) SaveCache()

Only called when checkpoint enabled

func (SinkNode) SetBarrierHandler

func (o SinkNode) SetBarrierHandler(bh checkpoints.BarrierHandler)

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

func NewSourceNode

func NewSourceNode(name string, options map[string]string) *SourceNode

func NewSourceNodeWithSource

func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode

Only for mock source, do not use it in production

func (SourceNode) AddOutput

func (o SourceNode) AddOutput(output chan<- interface{}, name string) error

func (SourceNode) Broadcast

func (o SourceNode) Broadcast(val interface{}) error

func (SourceNode) GetMetrics

func (o SourceNode) GetMetrics() (result [][]interface{})

func (SourceNode) GetName

func (o SourceNode) GetName() string

func (SourceNode) GetStreamContext

func (o SourceNode) GetStreamContext() api.StreamContext

func (*SourceNode) Open

func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error)

func (SourceNode) SetConcurrency

func (o SourceNode) SetConcurrency(concurr int)

SetConcurrency sets the concurrency level for the operation

func (SourceNode) SetQos

func (o SourceNode) SetQos(qos api.Qos)

type StatManager

type StatManager interface {
	IncTotalRecordsIn()
	IncTotalRecordsOut()
	IncTotalExceptions()
	ProcessTimeStart()
	ProcessTimeEnd()
	SetBufferLength(l int64)
	GetMetrics() []interface{}
}

func NewStatManager

func NewStatManager(opType string, ctx api.StreamContext) (StatManager, error)

type TupleList

type TupleList struct {
	// contains filtered or unexported fields
}

func NewTupleList

func NewTupleList(tuples []*xsql.Tuple, windowSize int) (TupleList, error)

type UnFunc

type UnFunc func(api.StreamContext, interface{}) interface{}

UnFunc implements UnOperation as type func(api.StreamContext, interface{}) interface{}

func (UnFunc) Apply

func (f UnFunc) Apply(ctx api.StreamContext, data interface{}) interface{}

Apply implements UnOperation.Apply method

type UnOperation

type UnOperation interface {
	Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
}

UnOperation interface represents unary operations (i.e. Map, Filter, etc)

type UnaryOperator

type UnaryOperator struct {
	// contains filtered or unexported fields
}

func New

func New(name string, bufferLength int) *UnaryOperator

New creates a *UnaryOperator value.

func (UnaryOperator) AddInputCount

func (o UnaryOperator) AddInputCount()

func (*UnaryOperator) Exec

func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)

Exec is the entry point for the executor

func (UnaryOperator) GetInput

func (o UnaryOperator) GetInput() (chan<- interface{}, string)

func (UnaryOperator) GetInputCount

func (o UnaryOperator) GetInputCount() int

func (UnaryOperator) SetBarrierHandler

func (o UnaryOperator) SetBarrierHandler(bh checkpoints.BarrierHandler)

func (*UnaryOperator) SetOperation

func (o *UnaryOperator) SetOperation(op UnOperation)

SetOperation sets the executor operation

type WatermarkGenerator

type WatermarkGenerator struct {
	// contains filtered or unexported fields
}

func NewWatermarkGenerator

func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error)

type WatermarkTuple

type WatermarkTuple struct {
	Timestamp int64
}

func (*WatermarkTuple) GetTimestamp

func (t *WatermarkTuple) GetTimestamp() int64

func (*WatermarkTuple) IsWatermark

func (t *WatermarkTuple) IsWatermark() bool

type WindowConfig

type WindowConfig struct {
	Type     xsql.WindowType
	Length   int
	Interval int // If interval is not set, it is equal to Length
}

type WindowOperator

type WindowOperator struct {
	// contains filtered or unexported fields
}

func NewWindowOp

func NewWindowOp(name string, w WindowConfig, isEventTime bool, lateTolerance int64, streams []string, bufferLength int) (*WindowOperator, error)

func (WindowOperator) AddInputCount

func (o WindowOperator) AddInputCount()

func (*WindowOperator) Exec

func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error)

Exec is the entry point for the executor. Input: *xsql.Tuple from the preprocessor. Output: xsql.WindowTuplesSet.

func (WindowOperator) GetInput

func (o WindowOperator) GetInput() (chan<- interface{}, string)

func (WindowOperator) GetInputCount

func (o WindowOperator) GetInputCount() int

func (*WindowOperator) GetMetrics

func (o *WindowOperator) GetMetrics() [][]interface{}

func (WindowOperator) SetBarrierHandler

func (o WindowOperator) SetBarrierHandler(bh checkpoints.BarrierHandler)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL