Documentation
Overview ¶
Package rsql provides SQL parsing and analysis capabilities for StreamSQL.
This package implements a SQL parser designed for stream processing. It supports standard SQL syntax with extensions for window functions and streaming operations, and it transforms SQL queries into executable stream processing configurations.
Core Features ¶
• Complete SQL Parser - Full support for SELECT, FROM, WHERE, GROUP BY, HAVING, ORDER BY, LIMIT
• Window Function Support - Native parsing of tumbling, sliding, counting, and session windows
• Expression Analysis - Deep parsing of complex expressions, functions, and field references
• Error Recovery - Advanced error detection and recovery with detailed error reporting
• Function Validation - Integration with function registry for syntax and semantic validation
• AST Generation - Abstract Syntax Tree generation for query optimization
• Stream-Specific Extensions - Custom syntax for streaming operations and window management
Supported SQL Syntax ¶
Standard SQL clauses with streaming extensions:
// Basic SELECT statement
SELECT field1, field2, AGG_FUNC(field3)
FROM stream
WHERE condition
GROUP BY field1, WindowFunction('params')
HAVING aggregate_condition
ORDER BY field1 ASC, field2 DESC
LIMIT 100

// Window functions
TumblingWindow('5s')         - Non-overlapping time windows
SlidingWindow('30s', '10s')  - Overlapping time windows
CountingWindow(100)          - Count-based windows
SessionWindow('5m')          - Session-based windows
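The difference between non-overlapping (tumbling) and overlapping (sliding) time windows can be illustrated with a little arithmetic. The following is a minimal sketch, not the package's implementation: the helper names `tumblingStart` and `slidingStarts` are hypothetical, timestamps are plain millisecond counts, windows are assumed to be aligned to zero, and the two SlidingWindow parameters are assumed to mean window size followed by slide interval.

```go
package main

import "fmt"

// tumblingStart returns the start (in milliseconds) of the non-overlapping
// window of length size that contains ts. Windows are assumed to be aligned
// to zero; ts must be non-negative and size positive.
func tumblingStart(ts, size int64) int64 {
	return ts - ts%size
}

// slidingStarts returns the start of every window of length size, advancing
// by slide, that contains ts. The newest window comes first.
func slidingStarts(ts, size, slide int64) []int64 {
	var starts []int64
	for s := ts - ts%slide; s > ts-size && s >= 0; s -= slide {
		starts = append(starts, s)
	}
	return starts
}

func main() {
	// An event at 12.345s falls into the 5s tumbling window starting at 10s.
	fmt.Println(tumblingStart(12345, 5000)) // 10000

	// An event at 45s belongs to three 30s windows that slide every 10s.
	fmt.Println(slidingStarts(45000, 30000, 10000)) // [40000 30000 20000]
}
```

With tumbling windows each event belongs to exactly one window, while with sliding windows an event belongs to roughly size/slide windows.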
Lexical Analysis ¶
Advanced tokenization with comprehensive token types:
// Token types
TOKEN_SELECT, TOKEN_FROM, TOKEN_WHERE  - SQL keywords
TOKEN_IDENTIFIER, TOKEN_STRING         - Identifiers and literals
TOKEN_NUMBER, TOKEN_FLOAT              - Numeric literals
TOKEN_OPERATOR, TOKEN_COMPARISON       - Operators
TOKEN_FUNCTION, TOKEN_WINDOW           - Function calls
TOKEN_LPAREN, TOKEN_RPAREN             - Parentheses
TOKEN_COMMA, TOKEN_SEMICOLON           - Delimiters
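To make the tokenization step concrete, here is a minimal, self-contained sketch of a hand-written scanner. It is not the package's Lexer: the `token` type, the `kind` constants, and the `tokenize` function are hypothetical names, and the token categories are reduced to identifiers, numbers, single-quoted strings, and one-character symbols.

```go
package main

import (
	"fmt"
	"unicode"
)

type kind int

const (
	kindEOF kind = iota
	kindIdent
	kindNumber
	kindString
	kindSymbol
)

// token records what was scanned and the rune offset where it starts.
type token struct {
	Kind  kind
	Value string
	Pos   int
}

// tokenize splits input into tokens and always ends with a kindEOF token.
func tokenize(input string) ([]token, error) {
	var toks []token
	rs := []rune(input)
	i := 0
	for i < len(rs) {
		c := rs[i]
		switch {
		case unicode.IsSpace(c):
			i++
		case unicode.IsLetter(c) || c == '_':
			start := i
			for i < len(rs) && (unicode.IsLetter(rs[i]) || unicode.IsDigit(rs[i]) || rs[i] == '_') {
				i++
			}
			toks = append(toks, token{kindIdent, string(rs[start:i]), start})
		case unicode.IsDigit(c):
			start := i
			for i < len(rs) && (unicode.IsDigit(rs[i]) || rs[i] == '.') {
				i++
			}
			toks = append(toks, token{kindNumber, string(rs[start:i]), start})
		case c == '\'':
			start := i
			i++ // skip the opening quote
			for i < len(rs) && rs[i] != '\'' {
				i++
			}
			if i >= len(rs) {
				return nil, fmt.Errorf("unterminated string at position %d", start)
			}
			toks = append(toks, token{kindString, string(rs[start+1 : i]), start})
			i++ // skip the closing quote
		default:
			toks = append(toks, token{kindSymbol, string(c), i})
			i++
		}
	}
	toks = append(toks, token{kindEOF, "", len(rs)})
	return toks, nil
}

func main() {
	toks, err := tokenize("SELECT a FROM s WHERE x = 'y'")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, t := range toks {
		fmt.Printf("%d %q at %d\n", t.Kind, t.Value, t.Pos)
	}
}
```

Keeping the start position on every token is what later allows error messages to point at an exact location in the query.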
Parser Architecture ¶
Recursive descent parser with error recovery:
type Parser struct {
    lexer         *Lexer
    errorRecovery *ErrorRecovery
    currentToken  Token
    input         string
}

// Main parsing entry point
func (p *Parser) Parse() (*SelectStatement, error)

// Clause-specific parsers
func (p *Parser) parseSelect(stmt *SelectStatement) error
func (p *Parser) parseFrom(stmt *SelectStatement) error
func (p *Parser) parseWhere(stmt *SelectStatement) error
func (p *Parser) parseGroupBy(stmt *SelectStatement) error
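The one-function-per-clause layout above can be sketched in a few lines. The following is an illustration of the recursive descent style only, under strong simplifying assumptions: `miniParser` and `selectStmt` are hypothetical types, tokens are produced by whitespace splitting instead of a real lexer, and only `SELECT f1, f2 FROM source` is accepted.

```go
package main

import (
	"fmt"
	"strings"
)

// selectStmt is a tiny stand-in for an AST node.
type selectStmt struct {
	Fields []string
	Source string
}

type miniParser struct {
	toks []string
	pos  int
}

// newMiniParser splits on whitespace and treats "," as its own token.
func newMiniParser(sql string) *miniParser {
	return &miniParser{toks: strings.Fields(strings.ReplaceAll(sql, ",", " , "))}
}

// peek returns the current token, or "" at end of input.
func (p *miniParser) peek() string {
	if p.pos < len(p.toks) {
		return p.toks[p.pos]
	}
	return ""
}

// next returns the current token and advances.
func (p *miniParser) next() string {
	t := p.peek()
	if p.pos < len(p.toks) {
		p.pos++
	}
	return t
}

func (p *miniParser) expectKeyword(kw string) error {
	if t := p.next(); !strings.EqualFold(t, kw) {
		return fmt.Errorf("expected %s, found %q", kw, t)
	}
	return nil
}

// parse is the entry point; each clause has its own method.
func (p *miniParser) parse() (*selectStmt, error) {
	stmt := &selectStmt{}
	if err := p.parseSelect(stmt); err != nil {
		return nil, err
	}
	if err := p.parseFrom(stmt); err != nil {
		return nil, err
	}
	if t := p.peek(); t != "" {
		return nil, fmt.Errorf("unexpected token %q", t)
	}
	return stmt, nil
}

func (p *miniParser) parseSelect(stmt *selectStmt) error {
	if err := p.expectKeyword("SELECT"); err != nil {
		return err
	}
	for {
		f := p.next()
		if f == "" || f == "," || strings.EqualFold(f, "FROM") {
			return fmt.Errorf("expected field name, found %q", f)
		}
		stmt.Fields = append(stmt.Fields, f)
		if p.peek() != "," {
			return nil
		}
		p.next() // consume the comma and read another field
	}
}

func (p *miniParser) parseFrom(stmt *selectStmt) error {
	if err := p.expectKeyword("FROM"); err != nil {
		return err
	}
	src := p.next()
	if src == "" {
		return fmt.Errorf("expected source name after FROM")
	}
	stmt.Source = src
	return nil
}

func main() {
	stmt, err := newMiniParser("SELECT a, b FROM stream").parse()
	fmt.Println(stmt, err)
}
```

Each clause method fills in its part of the statement and returns an error as soon as the input stops matching the grammar, which keeps the control flow easy to follow and easy to extend with further clauses.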
Error Handling ¶
Comprehensive error detection and recovery:
// Error types
type ParseError struct {
    Message   string
    Position  int
    Line      int
    Column    int
    Context   string
    ErrorType ErrorType
}

// Error recovery strategies
type ErrorRecovery struct {
    errors     []*ParseError
    parser     *Parser
    strategies []RecoveryStrategy
}
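An error that carries Position, Line, and Column needs some way to derive the last two from the first. A minimal sketch of that derivation follows; `lineColumn` is a hypothetical helper, not part of this package, and it assumes a byte offset and 1-based line and column numbers.

```go
package main

import "fmt"

// lineColumn converts a byte offset into 1-based line and column numbers.
// Offsets outside the input are clamped to its bounds.
func lineColumn(input string, pos int) (line, col int) {
	if pos < 0 {
		pos = 0
	}
	if pos > len(input) {
		pos = len(input)
	}
	line, col = 1, 1
	for i := 0; i < pos; i++ {
		if input[i] == '\n' {
			line++
			col = 1
		} else {
			col++
		}
	}
	return line, col
}

func main() {
	sql := "SELECT a\nFROM s"
	line, col := lineColumn(sql, 9) // offset 9 is the 'F' of FROM
	fmt.Printf("line %d, column %d\n", line, col)
}
```

Because the column is counted in bytes, queries containing multi-byte characters would need a rune-aware variant.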
Function Validation ¶
Integration with function registry for validation:
// Function validator
type FunctionValidator struct {
    functionRegistry map[string]FunctionInfo
}

// Validation methods
func (fv *FunctionValidator) ValidateFunction(name string, args []Expression) error
func (fv *FunctionValidator) ValidateAggregateFunction(name string, context AggregateContext) error
func (fv *FunctionValidator) ValidateWindowFunction(name string, params []Parameter) error
AST Structure ¶
StreamSQL AST representation:
type SelectStatement struct {
    Fields    []Field
    Distinct  bool
    SelectAll bool
    Source    string
    Condition string
    Window    WindowDefinition
    GroupBy   []string
    Limit     int
    Having    string
}

type Field struct {
    Expression string
    Alias      string
    AggType    string
}

type WindowDefinition struct {
    Type     string
    Params   []interface{}
    TsProp   string
    TimeUnit time.Duration
}
Usage Examples ¶
Basic SQL parsing:
parser := NewParser("SELECT AVG(temperature) FROM stream WHERE device_id = 'sensor1'")
stmt, err := parser.Parse()
if err != nil {
    log.Fatal(err)
}

// Convert to stream configuration
config, condition, err := stmt.ToStreamConfig()
Window function parsing:
sql := `SELECT device_id, AVG(temperature)
        FROM stream
        GROUP BY device_id, TumblingWindow('5s')`
config, condition, err := Parse(sql)
Complex query with multiple clauses:
sql := `SELECT device_id,
               AVG(temperature) as avg_temp,
               MAX(humidity) as max_humidity
        FROM stream
        WHERE device_id LIKE 'sensor%'
        GROUP BY device_id, SlidingWindow('1m', '30s')
        HAVING avg_temp > 25
        ORDER BY avg_temp DESC
        LIMIT 10`
config, condition, err := Parse(sql)
Configuration Generation ¶
Transformation from AST to stream processing configuration:
type Config struct {
    WindowConfig     WindowConfig
    GroupFields      []string
    SelectFields     map[string]AggregateType
    FieldAlias       map[string]string
    SimpleFields     []string
    FieldExpressions map[string]FieldExpression
    FieldOrder       []string
    Where            string
    Having           string
    NeedWindow       bool
    Distinct         bool
    Limit            int
}
Integration ¶
Seamless integration with other StreamSQL components:
• Functions package - Function validation and registry integration
• Expr package - Expression parsing and evaluation
• Types package - Configuration and data type definitions
• Stream package - Configuration application and execution
• Window package - Window function parsing and configuration
Index ¶
- Constants
- func FormatErrorContext(input string, position int, contextLength int) string
- func Parse(sql string) (*types.Config, string, error)
- func ParseAggregateTypeWithExpression(exprStr string) (aggType aggregator.AggregateType, name string, expression string, ...)
- type ErrorRecovery
- type ErrorType
- type Field
- type FunctionCall
- type FunctionValidator
- type Lexer
- type ParseError
- func CreateLexicalError(message string, position int, char byte) *ParseError
- func CreateLexicalErrorWithPosition(message string, position int, line int, column int, char byte) *ParseError
- func CreateMissingTokenError(expected string, position int) *ParseError
- func CreateSemanticError(message string, position int) *ParseError
- func CreateSyntaxError(message string, position int, token string, expected []string) *ParseError
- func CreateUnexpectedTokenError(found string, expected []string, position int) *ParseError
- func CreateUnknownFunctionError(functionName string, position int) *ParseError
- type Parser
- type SelectStatement
- type Token
- type TokenType
- type WindowDefinition
Constants ¶
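const (
    // MaxRecursionDepth defines the maximum recursion depth of the expectTokenWithDepth method.
    // It is used to prevent infinite recursion.
    MaxRecursionDepth = 30
    // MaxSelectFields defines the maximum number of fields allowed in a SELECT clause.
    MaxSelectFields = 300
)
Parser configuration constants.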
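A depth limit such as MaxRecursionDepth is typically enforced by threading a depth counter through the recursive calls. The sketch below shows that general pattern on nested parentheses. It does not reproduce expectTokenWithDepth: `skipParens`, `nested`, `maxDepth`, and `errTooDeep` are hypothetical names chosen for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// maxDepth mirrors the documented value of MaxRecursionDepth.
const maxDepth = 30

var errTooDeep = errors.New("maximum recursion depth exceeded")

// skipParens consumes a balanced parenthesized group that starts at s[i]
// (which must be '(') and returns the index just past the matching ')'.
// depth is the nesting level of the group being consumed.
func skipParens(s string, i, depth int) (int, error) {
	if depth > maxDepth {
		return i, errTooDeep
	}
	i++ // consume '('
	for i < len(s) {
		switch s[i] {
		case '(':
			var err error
			if i, err = skipParens(s, i, depth+1); err != nil {
				return i, err
			}
		case ')':
			return i + 1, nil
		default:
			i++
		}
	}
	return i, errors.New("missing closing parenthesis")
}

// nested builds n opening parentheses followed by n closing ones.
func nested(n int) string {
	return strings.Repeat("(", n) + strings.Repeat(")", n)
}

func main() {
	end, err := skipParens("(a(b)c)", 0, 1)
	fmt.Println(end, err)

	_, err = skipParens(nested(40), 0, 1)
	fmt.Println(err)
}
```

Returning an error at the limit turns a pathological or malicious query into an ordinary parse failure instead of a stack overflow.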
Variables ¶
This section is empty.
Functions ¶
func FormatErrorContext ¶
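func FormatErrorContext(input string, position int, contextLength int) string
FormatErrorContext formats the context around an error position.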
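The exact output of FormatErrorContext is not described here. As a rough illustration of what a function with the parameters (input, position, contextLength) might do, the sketch below extracts a snippet around the position and marks it with a caret. `formatContext` is a hypothetical stand-in, and its two-line output format is an assumption, not the package's behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// formatContext returns up to contextLength bytes on either side of position,
// followed by a second line with a caret under the offending byte.
// It assumes single-line, single-byte-per-character input.
func formatContext(input string, position, contextLength int) string {
	if position < 0 {
		position = 0
	}
	if position > len(input) {
		position = len(input)
	}
	start := position - contextLength
	if start < 0 {
		start = 0
	}
	end := position + contextLength
	if end > len(input) {
		end = len(input)
	}
	caret := strings.Repeat(" ", position-start) + "^"
	return input[start:end] + "\n" + caret
}

func main() {
	// Position 7 is where a field list was expected but FROM was found.
	fmt.Println(formatContext("SELECT FROM stream", 7, 5))
}
```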
func ParseAggregateTypeWithExpression ¶
func ParseAggregateTypeWithExpression(exprStr string) (aggType aggregator.AggregateType, name string, expression string, allFields []string, err error)
ParseAggregateTypeWithExpression parses an aggregate function and returns its expression information.
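The core of this kind of parsing is separating the function name from its argument expression. The sketch below shows one way to do that for a single outer call such as `AVG(temperature * 1.8 + 32)`. It is not the package's algorithm: `splitCall` is a hypothetical helper, and it returns plain strings where the real function returns an aggregator.AggregateType and a field list.

```go
package main

import (
	"fmt"
	"strings"
)

// splitCall splits "NAME(expr)" into a lower-cased name and the inner
// expression. It reports ok=false when the input is not a single call whose
// closing parenthesis is the last character, for example "SUM(a) + SUM(b)".
func splitCall(expr string) (name, inner string, ok bool) {
	expr = strings.TrimSpace(expr)
	open := strings.Index(expr, "(")
	if open <= 0 || !strings.HasSuffix(expr, ")") {
		return "", "", false
	}
	// Verify that the first "(" is matched by the final ")".
	depth := 0
	for i := open; i < len(expr); i++ {
		switch expr[i] {
		case '(':
			depth++
		case ')':
			depth--
			if depth == 0 && i != len(expr)-1 {
				return "", "", false
			}
		}
	}
	if depth != 0 {
		return "", "", false
	}
	name = strings.ToLower(strings.TrimSpace(expr[:open]))
	inner = strings.TrimSpace(expr[open+1 : len(expr)-1])
	return name, inner, true
}

func main() {
	name, inner, ok := splitCall("AVG(temperature * 1.8 + 32)")
	fmt.Println(name, "|", inner, "|", ok)
}
```

The sketch ignores parentheses inside string literals, which a token-based implementation would handle correctly.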
Types ¶
type ErrorRecovery ¶
type ErrorRecovery struct {
// contains filtered or unexported fields
}
ErrorRecovery implements the error recovery strategy.
func NewErrorRecovery ¶
func NewErrorRecovery(parser *Parser) *ErrorRecovery
NewErrorRecovery creates an error recovery instance.
func (*ErrorRecovery) AddError ¶
func (er *ErrorRecovery) AddError(err *ParseError)
AddError adds an error
func (*ErrorRecovery) GetErrors ¶
func (er *ErrorRecovery) GetErrors() []*ParseError
GetErrors gets all errors
func (*ErrorRecovery) HasErrors ¶
func (er *ErrorRecovery) HasErrors() bool
HasErrors checks if there are errors
func (*ErrorRecovery) RecoverFromError ¶
func (er *ErrorRecovery) RecoverFromError(errorType ErrorType) bool
RecoverFromError recovers from error
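The AddError, GetErrors, HasErrors, and RecoverFromError methods suggest a collect-and-continue pattern: errors are recorded as they occur, and parsing continues only when the error is recoverable. The sketch below illustrates that general pattern. `collector` and `parseError` are hypothetical types, and the rule that recoverability is a per-error flag is an assumption made for this example.

```go
package main

import "fmt"

// parseError is a reduced stand-in for a parse error record.
type parseError struct {
	Message     string
	Position    int
	Recoverable bool
}

// collector accumulates errors so that several can be reported at once.
type collector struct {
	errs []*parseError
}

func (c *collector) add(e *parseError) { c.errs = append(c.errs, e) }

func (c *collector) hasErrors() bool { return len(c.errs) > 0 }

// record stores e and reports whether parsing may continue.
func (c *collector) record(e *parseError) bool {
	c.add(e)
	return e.Recoverable
}

func main() {
	c := &collector{}
	problems := []*parseError{
		{"unknown function FOO", 7, true},
		{"unterminated string", 21, false},
		{"never reached", 30, true},
	}
	for _, p := range problems {
		if !c.record(p) {
			break // a non-recoverable error stops the parse
		}
	}
	fmt.Println(c.hasErrors(), len(c.errs))
}
```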
type FunctionCall ¶
FunctionCall contains function call information
type FunctionValidator ¶
type FunctionValidator struct {
// contains filtered or unexported fields
}
FunctionValidator validates SQL functions in expressions
func NewFunctionValidator ¶
func NewFunctionValidator(errorRecovery *ErrorRecovery) *FunctionValidator
NewFunctionValidator creates a new function validator
func (*FunctionValidator) ValidateExpression ¶
func (fv *FunctionValidator) ValidateExpression(expression string, position int)
ValidateExpression validates functions within expressions
type Lexer ¶
type Lexer struct {
// contains filtered or unexported fields
}
func (*Lexer) SetErrorRecovery ¶
func (l *Lexer) SetErrorRecovery(er *ErrorRecovery)
SetErrorRecovery sets the error recovery instance.
type ParseError ¶
type ParseError struct {
    Type        ErrorType
    Message     string
    Position    int
    Line        int
    Column      int
    Token       string
    Expected    []string
    Suggestions []string
    Context     string
    Recoverable bool
}
ParseError is the enhanced parsing error structure.
func CreateLexicalError ¶
func CreateLexicalError(message string, position int, char byte) *ParseError
CreateLexicalError creates a lexical error.
func CreateLexicalErrorWithPosition ¶
func CreateLexicalErrorWithPosition(message string, position int, line int, column int, char byte) *ParseError
CreateLexicalErrorWithPosition creates a lexical error with an accurate line and column position.
func CreateMissingTokenError ¶
func CreateMissingTokenError(expected string, position int) *ParseError
CreateMissingTokenError creates a missing token error.
func CreateSemanticError ¶ added in v0.10.2
func CreateSemanticError(message string, position int) *ParseError
CreateSemanticError creates a semantic error.
func CreateSyntaxError ¶
func CreateSyntaxError(message string, position int, token string, expected []string) *ParseError
CreateSyntaxError creates a syntax error.
func CreateUnexpectedTokenError ¶
func CreateUnexpectedTokenError(found string, expected []string, position int) *ParseError
CreateUnexpectedTokenError creates an unexpected token error.
func CreateUnknownFunctionError ¶
func CreateUnknownFunctionError(functionName string, position int) *ParseError
CreateUnknownFunctionError creates an unknown function error.
func (*ParseError) IsRecoverable ¶
func (e *ParseError) IsRecoverable() bool
IsRecoverable reports whether the error is recoverable.
type Parser ¶
type Parser struct {
// contains filtered or unexported fields
}
func (*Parser) Parse ¶
func (p *Parser) Parse() (*SelectStatement, error)
type SelectStatement ¶
type SelectStatement struct {
    Fields    []Field
    Distinct  bool
    SelectAll bool // Flag to indicate if this is a SELECT * query
    Source    string
    Condition string
    Window    WindowDefinition
    GroupBy   []string
    Limit     int
    Having    string
}
func (*SelectStatement) ToStreamConfig ¶
func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error)
ToStreamConfig converts the AST to a Stream configuration.
type TokenType ¶
type TokenType int
const (
    TokenEOF TokenType = iota
    TokenIdent
    TokenNumber
    TokenString
    TokenQuotedIdent // Backtick-quoted identifier
    TokenComma
    TokenLParen
    TokenRParen
    TokenPlus
    TokenMinus
    TokenAsterisk
    TokenSlash
    TokenEQ
    TokenNE
    TokenGT
    TokenLT
    TokenGE
    TokenLE
    TokenAND
    TokenOR
    TokenSELECT
    TokenFROM
    TokenWHERE
    TokenGROUP
    TokenBY
    TokenAS
    TokenTumbling
    TokenSliding
    TokenCounting
    TokenSession
    TokenWITH
    TokenTimestamp
    TokenTimeUnit
    TokenOrder
    TokenDISTINCT
    TokenLIMIT
    TokenHAVING
    TokenLIKE
    TokenIS
    TokenNULL
    TokenNOT
    // Tokens related to CASE expressions
    TokenCASE
    TokenWHEN
    TokenTHEN
    TokenELSE
    TokenEND
    // Tokens related to array indexing
    TokenLBracket
    TokenRBracket
)
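An iota-based token enumeration like the one above is commonly paired with a keyword table that decides whether a scanned word is a keyword or a plain identifier. The sketch below shows that pairing on a reduced set of tokens. The lower-case names (`tokenType`, `keywords`, `classify`) are hypothetical, and case-insensitive keyword matching is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

type tokenType int

const (
	tokenIdent tokenType = iota
	tokenSELECT
	tokenFROM
	tokenWHERE
)

// keywords maps upper-cased keyword text to its token type.
var keywords = map[string]tokenType{
	"SELECT": tokenSELECT,
	"FROM":   tokenFROM,
	"WHERE":  tokenWHERE,
}

// classify returns the keyword token for word, or tokenIdent when word is
// not a keyword.
func classify(word string) tokenType {
	if t, ok := keywords[strings.ToUpper(word)]; ok {
		return t
	}
	return tokenIdent
}

func main() {
	for _, w := range []string{"select", "temperature", "FROM"} {
		fmt.Println(w, classify(w))
	}
}
```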