Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Closable ¶
// Closable is the closing contract shared by the components of this package;
// both Sink and Source embed it.
type Closable interface {
	// Close receives the StreamContext of the caller and reports the outcome
	// through the returned error.
	Close(ctx StreamContext) error
}
type DefaultSourceTuple ¶
type DefaultSourceTuple struct {
// contains filtered or unexported fields
}
func NewDefaultSourceTuple ¶
func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple
func (*DefaultSourceTuple) Message ¶
func (t *DefaultSourceTuple) Message() map[string]interface{}
func (*DefaultSourceTuple) Meta ¶
func (t *DefaultSourceTuple) Meta() map[string]interface{}
type Function ¶
// Function is the contract a function implementation has to fulfil.
type Function interface {
	// Validate checks the arguments of a call. The argument is a list of
	// xsql.Expr.
	Validate(args []interface{}) error

	// Exec executes the function and returns the result together with a flag
	// telling whether the execution was successful. If execution fails, the
	// error is returned as the first value and the flag is false.
	Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)

	// IsAggregate reports whether this function is an aggregate function.
	// Each parameter of an aggregate function will be a slice.
	IsAggregate() bool
}
type FunctionContext ¶
// FunctionContext is the context handed to Function.Exec. It offers everything
// a StreamContext does and additionally exposes a function id.
type FunctionContext interface {
	StreamContext

	// GetFuncId returns the integer id associated with the function.
	// NOTE(review): what the id identifies (e.g. the position of the call in
	// a rule) is not visible here — confirm against the implementation.
	GetFuncId() int
}
type Logger ¶
// Logger is the logging facade returned by StreamContext.GetLogger. It offers
// four levels (debug, info, warn, error), each in three flavours: a plain
// variadic form, an "ln" form and an "f" form taking a format string.
//
// NOTE(review): the plain/ln/f flavours presumably mirror fmt.Print,
// fmt.Println and fmt.Printf — confirm against the concrete logger in use.
type Logger interface {
	// Debug level.
	Debug(args ...interface{})
	Debugln(args ...interface{})
	Debugf(format string, args ...interface{})

	// Info level.
	Info(args ...interface{})
	Infoln(args ...interface{})
	Infof(format string, args ...interface{})

	// Warn level.
	Warn(args ...interface{})
	Warnln(args ...interface{})
	Warnf(format string, args ...interface{})

	// Error level.
	Error(args ...interface{})
	Errorln(args ...interface{})
	Errorf(format string, args ...interface{})
}
type Operator ¶
// Operator combines the Emitter and Collector contracts with the methods
// needed to run and inspect an operator.
type Operator interface {
	Emitter
	Collector

	// Exec runs the operator with the given StreamContext. The second
	// argument is a send-only error channel.
	// NOTE(review): presumably runtime failures are reported on that channel
	// — confirm against the implementations.
	Exec(StreamContext, chan<- error)

	// GetName returns the name of the operator.
	GetName() string

	// GetMetrics returns the metrics of the operator as a slice of slices.
	// NOTE(review): the meaning of the outer and inner dimensions is not
	// visible here — confirm against the implementations.
	GetMetrics() [][]interface{}
}
type Rewindable ¶
type Rule ¶
// Rule is the JSON-serializable definition of a rule: its id, its SQL text,
// its actions and its options.
type Rule struct {
	// Triggered is serialized as "triggered".
	// NOTE(review): presumably tells whether the rule is started — confirm.
	Triggered bool `json:"triggered"`
	// Id is the identifier of the rule, serialized as "id".
	Id string `json:"id"`
	// Sql is the SQL text of the rule, serialized as "sql".
	Sql string `json:"sql"`
	// Actions holds one free-form map per action, serialized as "actions".
	Actions []map[string]interface{} `json:"actions"`
	// Options points to the rule options, serialized as "options".
	Options *RuleOption `json:"options"`
}
type RuleOption ¶
// RuleOption groups the tunables of a rule. Every field carries the same key
// for JSON and YAML, so the options can be read from either format.
type RuleOption struct {
	// IsEventTime is stored under "isEventTime".
	IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
	// LateTol is stored under "lateTolerance".
	// NOTE(review): the time unit is not visible here — confirm.
	LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"`
	// Concurrency is stored under "concurrency".
	Concurrency int `json:"concurrency" yaml:"concurrency"`
	// BufferLength is stored under "bufferLength".
	BufferLength int `json:"bufferLength" yaml:"bufferLength"`
	// SendMetaToSink is stored under "sendMetaToSink".
	SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
	// Qos is stored under "qos".
	Qos Qos `json:"qos" yaml:"qos"`
	// CheckpointInterval is stored under "checkpointInterval".
	// NOTE(review): the time unit is not visible here — confirm.
	CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
}
type Sink ¶
// Sink is the contract of a rule action target. Besides the methods listed
// here a sink must also be Closable.
type Sink interface {
	// Open should be a synchronous function in the normal case; the container
	// runs it in its own goroutine.
	Open(ctx StreamContext) error

	// Configure is called during initialization. It configures the sink with
	// the properties taken from the rule action definition.
	Configure(props map[string]interface{}) error

	// Collect is called whenever a row of data has been transferred to this
	// sink.
	Collect(ctx StreamContext, data interface{}) error

	Closable
}
type Source ¶
// Source is the contract of a data source. Besides the methods listed here a
// source must also be Closable.
type Source interface {
	// Open should be a synchronous function in the normal case; the container
	// runs it in its own goroutine. It is given a send-only channel of
	// SourceTuple (consumer) and a send-only error channel (errCh).
	Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)

	// Configure is called during initialization. It configures the source
	// with the data source (e.g. the topic for mqtt) and the properties read
	// from the yaml.
	Configure(datasource, jsonConfig string, props map[string]interface{}) error

	Closable
}
type SourceTuple ¶
type StreamContext ¶
// StreamContext is the context passed to sources, sinks, operators and
// functions. It is a context.Context extended with accessors for the logger
// and the rule/operator/instance identifiers, derivation helpers, error
// reporting and keyed state handling.
type StreamContext interface {
	context.Context

	// Accessors.
	GetLogger() Logger
	GetRuleId() string
	GetOpId() string
	GetInstanceId() int

	// Derivation helpers; each returns a StreamContext.
	WithMeta(ruleId, opId string, store Store) StreamContext
	WithInstance(instanceId int) StreamContext
	// WithCancel additionally returns the context.CancelFunc of the derived
	// context.
	WithCancel() (StreamContext, context.CancelFunc)

	// SetError records an error on the context.
	SetError(e error)

	// State handling: an integer counter API and a generic key/value API,
	// both addressed by a string key.
	IncrCounter(key string, amount int) error
	GetCounter(key string) (int, error)
	PutState(key string, value interface{}) error
	GetState(key string) (interface{}, error)
	DeleteState(key string) error
}
Click to show internal directories.
Click to hide internal directories.