rsql

package
v0.10.3 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 29, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package rsql provides SQL parsing and analysis capabilities for StreamSQL.

This package implements a comprehensive SQL parser specifically designed for stream processing, supporting standard SQL syntax with extensions for window functions and streaming operations. It transforms SQL queries into executable stream processing configurations.

Core Features

• Complete SQL Parser - Support for SELECT, FROM, WHERE, GROUP BY, HAVING, ORDER BY, and LIMIT
• Window Function Support - Native parsing of tumbling, sliding, counting, and session windows
• Expression Analysis - Parsing of complex expressions, function calls, and field references
• Error Recovery - Error detection and recovery with detailed error reporting
• Function Validation - Integration with the function registry for syntactic and semantic validation
• AST Generation - Abstract Syntax Tree generation for query optimization
• Stream-Specific Extensions - Custom syntax for streaming operations and window management

Supported SQL Syntax

Standard SQL clauses with streaming extensions:

-- Basic SELECT statement
SELECT field1, field2, AGG_FUNC(field3)
FROM stream
WHERE condition
GROUP BY field1, WindowFunction('params')
HAVING aggregate_condition
ORDER BY field1 ASC, field2 DESC
LIMIT 100

-- Window functions
TumblingWindow('5s')           -- Non-overlapping time windows
SlidingWindow('30s', '10s')    -- Overlapping time windows
CountingWindow(100)            -- Count-based windows
SessionWindow('5m')            -- Session-based windows

Lexical Analysis

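Tokenization uses the TokenType constants listed in the index:

// Token types
TokenSELECT, TokenFROM, TokenWHERE, TokenGROUP, TokenBY  - SQL keywords
TokenIdent, TokenQuotedIdent                             - Identifiers (plain and backtick-quoted)
TokenNumber, TokenString                                 - Numeric and string literals
TokenPlus, TokenMinus, TokenAsterisk, TokenSlash         - Arithmetic operators
TokenEQ, TokenNE, TokenGT, TokenLT, TokenGE, TokenLE     - Comparison operators
TokenAND, TokenOR, TokenNOT                              - Logical operators
TokenTumbling, TokenSliding, TokenCounting, TokenSession - Window functions
TokenLParen, TokenRParen, TokenLBracket, TokenRBracket   - Parentheses and brackets
TokenComma, TokenEOF                                     - Delimiter and end of input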
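To illustrate what a lexer does, here is a much-simplified tokenizer that turns a query into typed tokens. It is a sketch, not rsql's Lexer: it reuses a few TokenType names from the index for readability but defines its own local constants, handles only a handful of token kinds, and silently skips characters it does not recognize instead of reporting lexical errors.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// Local stand-ins for a few of rsql's TokenType constants (values differ).
type TokenType int

const (
	TokenEOF TokenType = iota
	TokenIdent
	TokenNumber
	TokenString
	TokenComma
	TokenLParen
	TokenRParen
	TokenEQ
	TokenSELECT
	TokenFROM
	TokenWHERE
)

type Token struct {
	Type  TokenType
	Value string
	Pos   int // byte offset of the token in the input
}

// Keywords are matched case-insensitively; any other word is an identifier.
var keywords = map[string]TokenType{"SELECT": TokenSELECT, "FROM": TokenFROM, "WHERE": TokenWHERE}

// Single-character punctuation recognized by this sketch.
var punct = map[byte]TokenType{',': TokenComma, '(': TokenLParen, ')': TokenRParen, '=': TokenEQ}

func isIdentByte(b byte) bool {
	return b == '_' || unicode.IsLetter(rune(b)) || unicode.IsDigit(rune(b))
}

// tokenize scans ASCII input left to right; the result always ends with TokenEOF.
func tokenize(input string) []Token {
	var toks []Token
	for i := 0; i < len(input); {
		c := input[i]
		start := i
		switch {
		case c == ' ' || c == '\t' || c == '\n':
			i++
		case c == '_' || unicode.IsLetter(rune(c)):
			for i < len(input) && isIdentByte(input[i]) {
				i++
			}
			typ, ok := keywords[strings.ToUpper(input[start:i])]
			if !ok {
				typ = TokenIdent
			}
			toks = append(toks, Token{typ, input[start:i], start})
		case unicode.IsDigit(rune(c)):
			for i < len(input) && (unicode.IsDigit(rune(input[i])) || input[i] == '.') {
				i++
			}
			toks = append(toks, Token{TokenNumber, input[start:i], start})
		case c == '\'':
			i++ // skip the opening quote
			for i < len(input) && input[i] != '\'' {
				i++
			}
			toks = append(toks, Token{TokenString, input[start+1 : i], start})
			i++ // skip the closing quote
		default:
			if typ, ok := punct[c]; ok {
				toks = append(toks, Token{typ, string(c), start})
			}
			i++ // unknown characters are skipped in this sketch
		}
	}
	return append(toks, Token{Type: TokenEOF, Pos: len(input)})
}

func main() {
	for _, t := range tokenize("SELECT device_id FROM stream WHERE device_id = 'sensor1'") {
		fmt.Printf("%d %q @%d\n", t.Type, t.Value, t.Pos)
	}
}
```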

Parser Architecture

Recursive descent parser with error recovery:

type Parser struct {
	lexer         *Lexer
	errorRecovery *ErrorRecovery
	currentToken  Token
	input         string
}

// Main parsing entry point
func (p *Parser) Parse() (*SelectStatement, error)

// Clause-specific parsers
func (p *Parser) parseSelect(stmt *SelectStatement) error
func (p *Parser) parseFrom(stmt *SelectStatement) error
func (p *Parser) parseWhere(stmt *SelectStatement) error
func (p *Parser) parseGroupBy(stmt *SelectStatement) error
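A recursive descent parser has one function per grammar rule, and each function calls the functions for the rules nested inside it. The sketch below applies that idea to a WHERE-style condition, with one function per precedence level so that AND binds tighter than OR. It is illustrative only: the grammar, the whitespace-separated input, and the names condParser, parseOr, parseAnd, and parseComparison are assumptions, not rsql's unexported parser methods.

```go
package main

import (
	"fmt"
	"strings"
)

// condParser walks a pre-split token slice; each method handles one rule:
//
//	or         := and { "OR" and }
//	and        := comparison { "AND" comparison }
//	comparison := operand op operand
type condParser struct {
	toks []string
	pos  int
}

func (p *condParser) peek() string {
	if p.pos < len(p.toks) {
		return p.toks[p.pos]
	}
	return "" // end of input
}

func (p *condParser) next() string {
	t := p.peek()
	p.pos++
	return t
}

func (p *condParser) parseOr() (string, error) {
	left, err := p.parseAnd()
	if err != nil {
		return "", err
	}
	for strings.EqualFold(p.peek(), "OR") {
		p.next()
		right, err := p.parseAnd()
		if err != nil {
			return "", err
		}
		left = "(" + left + " OR " + right + ")"
	}
	return left, nil
}

func (p *condParser) parseAnd() (string, error) {
	left, err := p.parseComparison()
	if err != nil {
		return "", err
	}
	for strings.EqualFold(p.peek(), "AND") {
		p.next()
		right, err := p.parseComparison()
		if err != nil {
			return "", err
		}
		left = "(" + left + " AND " + right + ")"
	}
	return left, nil
}

var comparisonOps = map[string]bool{"=": true, "!=": true, ">": true, "<": true, ">=": true, "<=": true}

func (p *condParser) parseComparison() (string, error) {
	lhs, op, rhs := p.next(), p.next(), p.next()
	if lhs == "" || !comparisonOps[op] || rhs == "" {
		return "", fmt.Errorf("expected comparison near token %d", p.pos-3)
	}
	return lhs + " " + op + " " + rhs, nil
}

// parseCondition parses a complete condition and rejects trailing tokens.
func parseCondition(cond string) (string, error) {
	p := &condParser{toks: strings.Fields(cond)}
	out, err := p.parseOr()
	if err == nil && p.pos < len(p.toks) {
		err = fmt.Errorf("unexpected token %q", p.peek())
	}
	return out, err
}

func main() {
	// Parenthesized output makes the precedence visible.
	fmt.Println(parseCondition("a = 1 OR b > 2 AND c < 3"))
}
```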

Error Handling

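Parse errors carry position and diagnostic details, and an ErrorRecovery instance collects them so that parsing can continue past recoverable problems:

// Error type
type ParseError struct {
	Type        ErrorType
	Message     string
	Position    int
	Line        int
	Column      int
	Token       string
	Expected    []string
	Suggestions []string
	Context     string
	Recoverable bool
}

// Error recovery (fields are unexported)
func NewErrorRecovery(parser *Parser) *ErrorRecovery
func (er *ErrorRecovery) AddError(err *ParseError)
func (er *ErrorRecovery) GetErrors() []*ParseError
func (er *ErrorRecovery) HasErrors() bool
func (er *ErrorRecovery) RecoverFromError(errorType ErrorType) bool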
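Error recovery exists so that parsing can keep going after a problem and report several errors at once, each with a line and column. A minimal sketch of that bookkeeping follows. The parseError type, the lineCol helper, and the collector type are hypothetical simplifications, not rsql's API; rsql's ParseError has more fields, as listed in the index.

```go
package main

import (
	"fmt"
	"strings"
)

// parseError is a hypothetical, cut-down stand-in for rsql's ParseError.
type parseError struct {
	Message      string
	Position     int // byte offset into the input
	Line, Column int // 1-based
	Recoverable  bool
}

func (e *parseError) Error() string {
	return fmt.Sprintf("line %d, column %d: %s", e.Line, e.Column, e.Message)
}

// lineCol converts a byte offset into 1-based line and column numbers.
func lineCol(input string, pos int) (line, col int) {
	if pos < 0 {
		pos = 0
	}
	if pos > len(input) {
		pos = len(input)
	}
	before := input[:pos]
	line = strings.Count(before, "\n") + 1
	col = pos - (strings.LastIndex(before, "\n") + 1) + 1
	return line, col
}

// collector accumulates errors instead of aborting on the first one.
type collector struct {
	input  string
	errors []*parseError
}

func (c *collector) add(msg string, pos int, recoverable bool) {
	line, col := lineCol(c.input, pos)
	c.errors = append(c.errors, &parseError{msg, pos, line, col, recoverable})
}

func (c *collector) hasErrors() bool { return len(c.errors) > 0 }

// canContinue reports whether every error recorded so far is recoverable.
func (c *collector) canContinue() bool {
	for _, e := range c.errors {
		if !e.Recoverable {
			return false
		}
	}
	return true
}

func main() {
	input := "SELECT temp,\nFROM stream\nWHERE"
	c := &collector{input: input}
	c.add("expected field after ','", strings.Index(input, "FROM"), true)
	c.add("expected condition after WHERE", len(input), false)
	for _, e := range c.errors {
		fmt.Println(e)
	}
	fmt.Println("continue parsing:", c.canContinue())
}
```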

Function Validation

Functions referenced in expressions are checked by a FunctionValidator, which reports problems such as unknown functions through an ErrorRecovery:

// Function validator (fields are unexported)
type FunctionValidator struct {
	// contains filtered or unexported fields
}

// Construction and validation
func NewFunctionValidator(errorRecovery *ErrorRecovery) *FunctionValidator
func (fv *FunctionValidator) ValidateExpression(expression string, position int)
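One way to validate the functions inside an expression string is to scan for identifiers directly followed by "(" and look each name up in a registry, recording the name and position much as FunctionCall does. The sketch below assumes a plain map as the registry and uses hypothetical helpers findCalls and unknownFunctions; rsql's validator integrates with the StreamSQL functions package instead, and its matching rules may differ.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// FunctionCall mirrors the fields of rsql's exported FunctionCall type.
type FunctionCall struct {
	Name     string
	Position int // byte offset of the name within the expression
}

func isIdent(b byte) bool {
	return b == '_' || unicode.IsLetter(rune(b)) || unicode.IsDigit(rune(b))
}

// findCalls returns every identifier directly followed (after optional
// spaces) by '(' and skips single-quoted string literals.
func findCalls(expr string) []FunctionCall {
	var calls []FunctionCall
	for i := 0; i < len(expr); {
		c := expr[i]
		switch {
		case c == '\'':
			// Skip the literal so text like 'f(x)' is not treated as a call.
			end := strings.IndexByte(expr[i+1:], '\'')
			if end < 0 {
				return calls
			}
			i += end + 2
		case c == '_' || unicode.IsLetter(rune(c)):
			start := i
			for i < len(expr) && isIdent(expr[i]) {
				i++
			}
			j := i
			for j < len(expr) && expr[j] == ' ' {
				j++
			}
			if j < len(expr) && expr[j] == '(' {
				calls = append(calls, FunctionCall{expr[start:i], start})
			}
		default:
			i++
		}
	}
	return calls
}

// unknownFunctions returns the calls whose lower-cased name is not registered.
func unknownFunctions(expr string, registry map[string]bool) []FunctionCall {
	var unknown []FunctionCall
	for _, call := range findCalls(expr) {
		if !registry[strings.ToLower(call.Name)] {
			unknown = append(unknown, call)
		}
	}
	return unknown
}

func main() {
	registry := map[string]bool{"avg": true, "max": true, "upper": true}
	expr := "AVG(temperature) + foo(x) * 2 + UPPER('bar(1)')"
	for _, call := range unknownFunctions(expr, registry) {
		fmt.Printf("unknown function %s at offset %d\n", call.Name, call.Position)
	}
}
```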

AST Structure

StreamSQL AST representation:

type SelectStatement struct {
	Fields    []Field
	Distinct  bool
	SelectAll bool
	Source    string
	Condition string
	Window    WindowDefinition
	GroupBy   []string
	Limit     int
	Having    string
}

type Field struct {
	Expression string
	Alias      string
	AggType    string
}

type WindowDefinition struct {
	Type     string
	Params   []interface{}
	TsProp   string
	TimeUnit time.Duration
}
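Because SelectStatement carries a separate Window field next to GroupBy, a window call written in GROUP BY presumably ends up in Window rather than among the grouping keys. The sketch below illustrates that separation under stated assumptions: splitGroupBy is hypothetical, it takes GROUP BY items that have already been split (splitting on commas naively would break window arguments), it keeps all parameters as strings, and the lower-case Type names are a guess. rsql's real parser works on tokens and may produce different values.

```go
package main

import (
	"fmt"
	"strings"
)

// WindowDefinition copies the two fields of rsql's type that this sketch fills.
type WindowDefinition struct {
	Type   string
	Params []interface{}
}

// Assumed mapping from window function names to window types.
var windowFuncs = map[string]string{
	"tumblingwindow": "tumbling",
	"slidingwindow":  "sliding",
	"countingwindow": "counting",
	"sessionwindow":  "session",
}

// splitGroupBy separates plain grouping keys from at most one window call.
func splitGroupBy(items []string) (groupBy []string, win WindowDefinition, err error) {
	for _, item := range items {
		item = strings.TrimSpace(item)
		open := strings.IndexByte(item, '(')
		if open < 0 || !strings.HasSuffix(item, ")") {
			groupBy = append(groupBy, item)
			continue
		}
		typ, ok := windowFuncs[strings.ToLower(item[:open])]
		if !ok {
			groupBy = append(groupBy, item) // some other function expression
			continue
		}
		if win.Type != "" {
			return nil, WindowDefinition{}, fmt.Errorf("more than one window in GROUP BY")
		}
		win.Type = typ
		for _, p := range strings.Split(item[open+1:len(item)-1], ",") {
			win.Params = append(win.Params, strings.Trim(strings.TrimSpace(p), "'"))
		}
	}
	return groupBy, win, nil
}

func main() {
	fmt.Println(splitGroupBy([]string{"device_id", "TumblingWindow('5s')"}))
}
```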

Usage Examples

Basic SQL parsing:

parser := NewParser("SELECT AVG(temperature) FROM stream WHERE device_id = 'sensor1'")
stmt, err := parser.Parse()
if err != nil {
	log.Fatal(err)
}

// Convert to stream configuration
config, condition, err := stmt.ToStreamConfig()
if err != nil {
	log.Fatal(err)
}

Window function parsing:

sql := `SELECT device_id, AVG(temperature)
        FROM stream
        GROUP BY device_id, TumblingWindow('5s')`
config, condition, err := Parse(sql)

Complex query with multiple clauses:

sql := `SELECT device_id,
               AVG(temperature) as avg_temp,
               MAX(humidity) as max_humidity
        FROM stream
        WHERE device_id LIKE 'sensor%'
        GROUP BY device_id, SlidingWindow('1m', '30s')
        HAVING avg_temp > 25
        ORDER BY avg_temp DESC
        LIMIT 10`
config, condition, err := Parse(sql)

Configuration Generation

ToStreamConfig and the package-level Parse transform the AST into a stream processing configuration (types.Config):

type Config struct {
	WindowConfig     WindowConfig
	GroupFields      []string
	SelectFields     map[string]AggregateType
	FieldAlias       map[string]string
	SimpleFields     []string
	FieldExpressions map[string]FieldExpression
	FieldOrder       []string
	Where            string
	Having           string
	NeedWindow       bool
	Distinct         bool
	Limit            int
}
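As a rough illustration of the AST-to-configuration step, the sketch below fills a few Config-like fields from simplified field descriptors. Everything here is an assumption for illustration: miniConfig and Field are cut-down local types, the keying of SelectFields and FieldAlias by output name (the alias when present) is a guess, and the real ToStreamConfig also handles expressions, DISTINCT, HAVING, and LIMIT.

```go
package main

import "fmt"

// Field is a local stand-in; AggType is empty for a non-aggregated field.
type Field struct {
	Expression string
	Alias      string
	AggType    string
}

// miniConfig mimics a few of types.Config's fields with simplified types.
type miniConfig struct {
	SelectFields map[string]string // output name -> aggregate type (assumed keying)
	FieldAlias   map[string]string // output name -> source expression (assumed keying)
	SimpleFields []string
	NeedWindow   bool
}

// toConfig shows one plausible mapping from parsed fields to configuration.
func toConfig(fields []Field, hasWindow bool) miniConfig {
	cfg := miniConfig{
		SelectFields: map[string]string{},
		FieldAlias:   map[string]string{},
		NeedWindow:   hasWindow,
	}
	for _, f := range fields {
		name := f.Expression
		if f.Alias != "" {
			name = f.Alias
			cfg.FieldAlias[name] = f.Expression
		}
		if f.AggType != "" {
			cfg.SelectFields[name] = f.AggType
		} else {
			cfg.SimpleFields = append(cfg.SimpleFields, name)
		}
	}
	return cfg
}

func main() {
	cfg := toConfig([]Field{
		{Expression: "device_id"},
		{Expression: "temperature", Alias: "avg_temp", AggType: "avg"},
	}, true)
	fmt.Printf("%+v\n", cfg)
}
```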

Integration

rsql works together with the other StreamSQL components:

• Functions package - Function validation and registry integration
• Expr package - Expression parsing and evaluation
• Types package - Configuration and data type definitions
• Stream package - Configuration application and execution
• Window package - Window function parsing and configuration

Index

Constants

const (
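	// MaxRecursionDepth defines the maximum recursion depth of the
	// expectTokenWithDepth method, to prevent infinite recursion.
	MaxRecursionDepth = 30

	// MaxSelectFields defines the maximum number of fields allowed in the SELECT clause.
	MaxSelectFields = 300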
)

Parser configuration constants.

Variables

This section is empty.

Functions

func FormatErrorContext

func FormatErrorContext(input string, position int, contextLength int) string

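FormatErrorContext formats the part of input surrounding position, with contextLength controlling how much surrounding text is included, for use in error messages.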
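Formatting error context usually means taking a few characters on either side of the error position and marking the position with a caret. The sketch below is a hypothetical reimplementation for illustration (formatContext is not rsql's function), and the exact output format of FormatErrorContext may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// formatContext returns up to contextLength bytes of input on each side of
// position, with newlines flattened to spaces, followed by a caret line.
func formatContext(input string, position, contextLength int) string {
	// Clamp arguments so slicing cannot panic.
	if contextLength < 0 {
		contextLength = 0
	}
	if position < 0 {
		position = 0
	}
	if position > len(input) {
		position = len(input)
	}
	start := position - contextLength
	if start < 0 {
		start = 0
	}
	end := position + contextLength
	if end > len(input) {
		end = len(input)
	}
	snippet := strings.ReplaceAll(input[start:end], "\n", " ")
	caret := strings.Repeat(" ", position-start) + "^"
	return snippet + "\n" + caret
}

func main() {
	// The misspelled keyword FORM starts at byte offset 9.
	fmt.Println(formatContext("SELECT * FORM stream", 9, 5))
}
```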

func Parse

func Parse(sql string) (*types.Config, string, error)

Parse is the package-level Parse function; it parses a SQL string and returns the stream configuration and the condition.

func ParseAggregateTypeWithExpression

func ParseAggregateTypeWithExpression(exprStr string) (aggType aggregator.AggregateType, name string, expression string, allFields []string, err error)

ParseAggregateTypeWithExpression parses an aggregate function call and returns its aggregate type, name, expression, and the fields the expression references.
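Decomposing an aggregate call such as AVG(temperature * 1.8 + humidity) comes down to finding the function name, taking the text between the outer parentheses as the argument expression, and collecting the field names referenced in it. The sketch below shows that decomposition with a hypothetical splitAggregate helper and a small hard-coded aggregate list; the real function returns an aggregator.AggregateType, and its handling of names and fields may differ.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// A small, assumed set of aggregate names for this sketch.
var aggregates = map[string]bool{"avg": true, "sum": true, "min": true, "max": true, "count": true}

// splitAggregate decomposes "FUNC(expr)" into a lower-cased function name,
// the inner expression, and the distinct identifiers the expression uses.
func splitAggregate(s string) (aggType, expr string, fields []string, err error) {
	s = strings.TrimSpace(s)
	open := strings.IndexByte(s, '(')
	if open <= 0 || !strings.HasSuffix(s, ")") {
		return "", "", nil, fmt.Errorf("not a function call: %q", s)
	}
	aggType = strings.ToLower(strings.TrimSpace(s[:open]))
	if !aggregates[aggType] {
		return "", "", nil, fmt.Errorf("unknown aggregate %q", aggType)
	}
	expr = strings.TrimSpace(s[open+1 : len(s)-1])
	seen := map[string]bool{}
	// Split on anything that cannot be part of an identifier or number.
	words := strings.FieldsFunc(expr, func(r rune) bool {
		return !(r == '_' || unicode.IsLetter(r) || unicode.IsDigit(r))
	})
	for _, w := range words {
		first := rune(w[0])
		// Words starting with a digit are numeric literals, not fields.
		if (first == '_' || unicode.IsLetter(first)) && !seen[w] {
			seen[w] = true
			fields = append(fields, w)
		}
	}
	return aggType, expr, fields, nil
}

func main() {
	fmt.Println(splitAggregate("AVG(temperature * 1.8 + 32)"))
}
```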

Types

type ErrorRecovery

type ErrorRecovery struct {
	// contains filtered or unexported fields
}

ErrorRecovery collects parse errors and implements the parser's error recovery strategy.

func NewErrorRecovery

func NewErrorRecovery(parser *Parser) *ErrorRecovery

NewErrorRecovery creates an error recovery instance for the given parser.

func (*ErrorRecovery) AddError

func (er *ErrorRecovery) AddError(err *ParseError)

AddError records a parse error.

func (*ErrorRecovery) GetErrors

func (er *ErrorRecovery) GetErrors() []*ParseError

GetErrors returns all recorded errors.

func (*ErrorRecovery) HasErrors

func (er *ErrorRecovery) HasErrors() bool

HasErrors reports whether any errors have been recorded.

func (*ErrorRecovery) RecoverFromError

func (er *ErrorRecovery) RecoverFromError(errorType ErrorType) bool

RecoverFromError attempts to recover from an error of the given type.

type ErrorType

type ErrorType int

ErrorType classifies parse errors.

const (
	ErrorTypeSyntax ErrorType = iota
	ErrorTypeLexical
	ErrorTypeSemantics
	ErrorTypeUnexpectedToken
	ErrorTypeMissingToken
	ErrorTypeInvalidExpression
	ErrorTypeUnknownKeyword
	ErrorTypeInvalidNumber
	ErrorTypeUnterminatedString
	ErrorTypeMaxIterations
	ErrorTypeUnknownFunction
)

type Field

type Field struct {
	Expression string
	Alias      string
	AggType    string
}

type FunctionCall

type FunctionCall struct {
	Name     string
	Position int
}

FunctionCall records a function call's name and its position in the input.

type FunctionValidator

type FunctionValidator struct {
	// contains filtered or unexported fields
}

FunctionValidator validates SQL functions in expressions

func NewFunctionValidator

func NewFunctionValidator(errorRecovery *ErrorRecovery) *FunctionValidator

NewFunctionValidator creates a new function validator

func (*FunctionValidator) ValidateExpression

func (fv *FunctionValidator) ValidateExpression(expression string, position int)

ValidateExpression validates the function calls within an expression.

type Lexer

type Lexer struct {
	// contains filtered or unexported fields
}

func NewLexer

func NewLexer(input string) *Lexer

func (*Lexer) GetPosition

func (l *Lexer) GetPosition() (int, int, int)

GetPosition returns the current position information.

func (*Lexer) NextToken

func (l *Lexer) NextToken() Token

func (*Lexer) SetErrorRecovery

func (l *Lexer) SetErrorRecovery(er *ErrorRecovery)

SetErrorRecovery sets the error recovery instance.

type ParseError

type ParseError struct {
	Type        ErrorType
	Message     string
	Position    int
	Line        int
	Column      int
	Token       string
	Expected    []string
	Suggestions []string
	Context     string
	Recoverable bool
}

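ParseError is an enhanced parse error that records where the error occurred, the offending token, the expected tokens, suggestions, surrounding context, and whether the error is recoverable.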

func CreateLexicalError

func CreateLexicalError(message string, position int, char byte) *ParseError

CreateLexicalError creates a lexical error.

func CreateLexicalErrorWithPosition

func CreateLexicalErrorWithPosition(message string, position int, line int, column int, char byte) *ParseError

CreateLexicalErrorWithPosition creates a lexical error with an accurate position.

func CreateMissingTokenError

func CreateMissingTokenError(expected string, position int) *ParseError

CreateMissingTokenError creates a missing-token error.

func CreateSemanticError added in v0.10.2

func CreateSemanticError(message string, position int) *ParseError

CreateSemanticError creates a semantic error.

func CreateSyntaxError

func CreateSyntaxError(message string, position int, token string, expected []string) *ParseError

CreateSyntaxError creates a syntax error.

func CreateUnexpectedTokenError

func CreateUnexpectedTokenError(found string, expected []string, position int) *ParseError

CreateUnexpectedTokenError creates an unexpected-token error.

func CreateUnknownFunctionError

func CreateUnknownFunctionError(functionName string, position int) *ParseError

CreateUnknownFunctionError creates an unknown-function error.

func (*ParseError) Error

func (e *ParseError) Error() string

Error implements the error interface

func (*ParseError) IsRecoverable

func (e *ParseError) IsRecoverable() bool

IsRecoverable reports whether the error is recoverable.

type Parser

type Parser struct {
	// contains filtered or unexported fields
}

func NewParser

func NewParser(input string) *Parser

func (*Parser) GetErrors

func (p *Parser) GetErrors() []*ParseError

GetErrors returns all errors encountered during parsing.

func (*Parser) HasErrors

func (p *Parser) HasErrors() bool

HasErrors reports whether any errors occurred during parsing.

func (*Parser) Parse

func (p *Parser) Parse() (*SelectStatement, error)

type SelectStatement

type SelectStatement struct {
	Fields    []Field
	Distinct  bool
	SelectAll bool // Flag to indicate if this is a SELECT * query
	Source    string
	Condition string
	Window    WindowDefinition
	GroupBy   []string
	Limit     int
	Having    string
}

func (*SelectStatement) ToStreamConfig

func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error)

ToStreamConfig converts the AST into a stream configuration and returns it together with the condition.

type Token

type Token struct {
	Type   TokenType
	Value  string
	Pos    int
	Line   int
	Column int
}

type TokenType

type TokenType int
const (
	TokenEOF TokenType = iota
	TokenIdent
	TokenNumber
	TokenString
	TokenQuotedIdent // backtick-quoted identifier
	TokenComma
	TokenLParen
	TokenRParen
	TokenPlus
	TokenMinus
	TokenAsterisk
	TokenSlash
	TokenEQ
	TokenNE
	TokenGT
	TokenLT
	TokenGE
	TokenLE
	TokenAND
	TokenOR
	TokenSELECT
	TokenFROM
	TokenWHERE
	TokenGROUP
	TokenBY
	TokenAS
	TokenTumbling
	TokenSliding
	TokenCounting
	TokenSession
	TokenWITH
	TokenTimestamp
	TokenTimeUnit
	TokenOrder
	TokenDISTINCT
	TokenLIMIT
	TokenHAVING
	TokenLIKE
	TokenIS
	TokenNULL
	TokenNOT
	// CASE expression tokens
	TokenCASE
	TokenWHEN
	TokenTHEN
	TokenELSE
	TokenEND
	// Array index tokens
	TokenLBracket
	TokenRBracket
)

type WindowDefinition

type WindowDefinition struct {
	Type     string
	Params   []interface{}
	TsProp   string
	TimeUnit time.Duration
}
