api

package
v0.0.0-...-7b16d43 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2021 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Closable

type Closable interface {
	Close(ctx StreamContext) error
}

type Collector

type Collector interface {
	GetInput() (chan<- interface{}, string)
}

type DefaultSourceTuple

type DefaultSourceTuple struct {
	// contains filtered or unexported fields
}

func NewDefaultSourceTuple

func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple

func (*DefaultSourceTuple) Message

func (t *DefaultSourceTuple) Message() map[string]interface{}

func (*DefaultSourceTuple) Meta

func (t *DefaultSourceTuple) Meta() map[string]interface{}

type Emitter

type Emitter interface {
	AddOutput(chan<- interface{}, string) error
}

type Function

type Function interface {
	//The argument is a list of xsql.Expr
	Validate(args []interface{}) error
	//Execute the function, return the result and if execution is successful.
	//If execution fails, return the error and false.
	Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
	//If this function is an aggregate function. Each parameter of an aggregate function will be a slice
	IsAggregate() bool
}

type FunctionContext

type FunctionContext interface {
	StreamContext
	GetFuncId() int
}

type Logger

type Logger interface {
	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
	Debugln(args ...interface{})
	Infoln(args ...interface{})
	Warnln(args ...interface{})
	Errorln(args ...interface{})
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

type Operator

type Operator interface {
	Emitter
	Collector
	Exec(StreamContext, chan<- error)
	GetName() string
	GetMetrics() [][]interface{}
}

type Qos

type Qos int
const (
	AtMostOnce Qos = iota
	AtLeastOnce
	ExactlyOnce
)

type Rewindable

type Rewindable interface {
	GetOffset() (interface{}, error)
	Rewind(offset interface{}) error
}

type Rule

type Rule struct {
	Triggered bool                     `json:"triggered"`
	Id        string                   `json:"id"`
	Sql       string                   `json:"sql"`
	Actions   []map[string]interface{} `json:"actions"`
	Options   *RuleOption              `json:"options"`
}

type RuleOption

type RuleOption struct {
	IsEventTime        bool  `json:"isEventTime" yaml:"isEventTime"`
	LateTol            int64 `json:"lateTolerance" yaml:"lateTolerance"`
	Concurrency        int   `json:"concurrency" yaml:"concurrency"`
	BufferLength       int   `json:"bufferLength" yaml:"bufferLength"`
	SendMetaToSink     bool  `json:"sendMetaToSink" yaml:"sendMetaToSink"`
	Qos                Qos   `json:"qos" yaml:"qos"`
	CheckpointInterval int   `json:"checkpointInterval" yaml:"checkpointInterval"`
}

type Sink

type Sink interface {
	//Should be sync function for normal case. The container will run it in go func
	Open(ctx StreamContext) error
	//Called during initialization. Configure the sink with the properties from rule action definition
	Configure(props map[string]interface{}) error
	//Called when each row of data has transferred to this sink
	Collect(ctx StreamContext, data interface{}) error
	Closable
}

type Source

type Source interface {
	//Should be sync function for normal case. The container will run it in go func
	Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
	//Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
	//read from the yaml
	Configure(datasource string, jsonConfig string, props map[string]interface{}) error
	Closable
}

type SourceTuple

type SourceTuple interface {
	Message() map[string]interface{}
	Meta() map[string]interface{}
}

type Store

type Store interface {
	SaveState(checkpointId int64, opId string, state map[string]interface{}) error
	SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage like badger
	GetOpState(opId string) (*sync.Map, error)
}

type StreamContext

type StreamContext interface {
	context.Context
	GetLogger() Logger
	GetRuleId() string
	GetOpId() string
	GetInstanceId() int
	WithMeta(ruleId string, opId string, store Store) StreamContext
	WithInstance(instanceId int) StreamContext
	WithCancel() (StreamContext, context.CancelFunc)
	SetError(e error)
	//State handling
	IncrCounter(key string, amount int) error
	GetCounter(key string) (int, error)
	PutState(key string, value interface{}) error
	GetState(key string) (interface{}, error)
	DeleteState(key string) error
}

type TopNode

type TopNode interface {
	GetName() string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL