Documentation
¶
Overview ¶
Package streamsql 是一个轻量级的、基于 SQL 的物联网边缘流处理引擎。
StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种窗口类型、聚合函数、 自定义函数,以及与 RuleGo 生态的无缝集成。
核心特性 ¶
• 轻量级设计 - 纯内存操作,无外部依赖 • SQL语法支持 - 使用熟悉的SQL语法处理流数据 • 多种窗口类型 - 滑动窗口、滚动窗口、计数窗口、会话窗口 • 丰富的聚合函数 - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE等 • 插件式自定义函数 - 运行时动态注册,支持8种函数类型 • RuleGo生态集成 - 利用RuleGo组件扩展输入输出源
入门示例 ¶
基本的流数据处理:
package main import ( "fmt" "math/rand" "time" "github.com/rulego/streamsql" ) func main() { // 创建StreamSQL实例 ssql := streamsql.New() // 定义SQL查询 - 每5秒按设备ID分组计算温度平均值 sql := `SELECT deviceId, AVG(temperature) as avg_temp, MIN(humidity) as min_humidity, window_start() as start, window_end() as end FROM stream WHERE deviceId != 'device3' GROUP BY deviceId, TumblingWindow('5s')` // 执行SQL,创建流处理任务 err := ssql.Execute(sql) if err != nil { panic(err) } // 添加结果处理回调 ssql.Stream().AddSink(func(result interface{}) { fmt.Printf("聚合结果: %v\n", result) }) // 模拟发送流数据 go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: // 生成随机设备数据 data := map[string]interface{}{ "deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1), "temperature": 20.0 + rand.Float64()*10, "humidity": 50.0 + rand.Float64()*20, } ssql.AddData(data) } } }() // 运行30秒 time.Sleep(30 * time.Second) }
窗口函数 ¶
StreamSQL 支持多种窗口类型:
// 滚动窗口 - 每5秒一个独立窗口 SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s') // 滑动窗口 - 窗口大小30秒,每10秒滑动一次 SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s') // 计数窗口 - 每100条记录一个窗口 SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100) // 会话窗口 - 超时5分钟自动关闭会话 SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
自定义函数 ¶
StreamSQL 支持插件式自定义函数,运行时动态注册:
// 注册温度转换函数 functions.RegisterCustomFunction( "fahrenheit_to_celsius", functions.TypeConversion, "温度转换", "华氏度转摄氏度", 1, 1, func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) { f, _ := functions.ConvertToFloat64(args[0]) return (f - 32) * 5 / 9, nil }, ) // 立即在SQL中使用 sql := `SELECT deviceId, AVG(fahrenheit_to_celsius(temperature)) as avg_celsius FROM stream GROUP BY deviceId, TumblingWindow('5s')`
支持的自定义函数类型: • TypeMath - 数学计算函数 • TypeString - 字符串处理函数 • TypeConversion - 类型转换函数 • TypeDateTime - 时间日期函数 • TypeAggregation - 聚合函数 • TypeAnalytical - 分析函数 • TypeWindow - 窗口函数 • TypeCustom - 通用自定义函数
日志配置 ¶
StreamSQL 提供灵活的日志配置选项:
// 设置日志级别 ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG)) // 输出到文件 logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO)) // 禁用日志(生产环境) ssql := streamsql.New(streamsql.WithDiscardLog())
性能配置 ¶
对于生产环境,建议进行以下配置:
ssql := streamsql.New( streamsql.WithDiscardLog(), // 禁用日志提升性能 // 其他配置选项... )
与RuleGo集成 ¶
StreamSQL可以与RuleGo规则引擎无缝集成,利用RuleGo丰富的组件生态:
// TODO: 提供RuleGo集成示例
更多详细信息和高级用法,请参阅: • 自定义函数开发指南: docs/CUSTOM_FUNCTIONS_GUIDE.md • 快速入门指南: docs/FUNCTION_QUICK_START.md • 完整示例: examples/
Index ¶
- type Option
- func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option
- func WithBuffers(dataBufSize, resultBufSize, sinkPoolSize int) Option
- func WithCustomPerformance(config types.PerformanceConfig) Option
- func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option
- func WithDiscardLog() Option
- func WithHighPerf() Option
- func WithHighPerformance() Option
- func WithLogLevel(level logger.Level) Option
- func WithLowLatency() Option
- func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option
- func WithOverflowPolicy(strategy string, timeout time.Duration) Option
- func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option
- func WithPersistence() Option
- func WithPersistenceConfig(dataDir string, maxFileSize int64, flushInterval time.Duration) Option
- func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option
- func WithZeroDataLoss() Option
- type Streamsql
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(*Streamsql)
Option 定义StreamSQL的配置选项类型
func WithBufferSizes ¶
WithBufferSizes 设置自定义缓冲区大小
func WithBuffers ¶
WithBuffers 设置缓冲区大小 (已弃用,使用WithBufferSizes) Deprecated: 使用WithBufferSizes替代
func WithCustomPerformance ¶
func WithCustomPerformance(config types.PerformanceConfig) Option
WithCustomPerformance 使用自定义性能配置
func WithCustomPersistence ¶
WithCustomPersistence 使用自定义持久化配置
func WithHighPerf ¶
func WithHighPerf() Option
WithHighPerf 启用高性能模式 (已弃用,使用WithHighPerformance) Deprecated: 使用WithHighPerformance替代
func WithHighPerformance ¶
func WithHighPerformance() Option
WithHighPerformance 使用高性能配置 适用于需要最大吞吐量的场景
func WithMonitoring ¶
WithMonitoring 启用详细监控
func WithOverflowPolicy ¶
WithOverflowPolicy 设置溢出策略 (已弃用,使用WithOverflowStrategy) Deprecated: 使用WithOverflowStrategy替代
func WithOverflowStrategy ¶
WithOverflowStrategy 设置溢出策略
func WithPersistenceConfig ¶
WithPersistenceConfig 设置持久化配置 (已弃用,使用WithCustomPersistence) Deprecated: 使用WithCustomPersistence替代
func WithWorkerConfig ¶
WithWorkerConfig 设置工作池配置
type Streamsql ¶
type Streamsql struct {
// contains filtered or unexported fields
}
Streamsql 是StreamSQL流处理引擎的主要接口。 它封装了SQL解析、流处理、窗口管理等核心功能。
使用示例:
ssql := streamsql.New() err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')") ssql.AddData(map[string]interface{}{"temperature": 25.5})
func New ¶
New 创建一个新的StreamSQL实例。 支持通过可选的Option参数进行配置。
参数:
- options: 可变长度的配置选项,用于自定义StreamSQL行为
返回值:
- *Streamsql: 新创建的StreamSQL实例
示例:
// 创建默认实例 ssql := streamsql.New() // 创建高性能实例 ssql := streamsql.New(streamsql.WithHighPerformance()) // 创建零数据丢失实例 ssql := streamsql.New(streamsql.WithZeroDataLoss())
func (*Streamsql) AddData ¶
func (s *Streamsql) AddData(data interface{})
AddData 向流中添加一条数据记录。 数据会根据已配置的SQL查询进行处理和聚合。
支持的数据格式:
- map[string]interface{}: 最常用的键值对格式
- 结构体: 会自动转换为map格式处理
参数:
- data: 要添加的数据,通常是map[string]interface{}或结构体
示例:
// 添加设备数据 ssql.AddData(map[string]interface{}{ "deviceId": "sensor001", "temperature": 25.5, "humidity": 60.0, "timestamp": time.Now(), }) // 添加用户行为数据 ssql.AddData(map[string]interface{}{ "userId": "user123", "action": "click", "page": "/home", })
func (*Streamsql) Execute ¶
Execute 解析并执行SQL查询,创建对应的流处理管道。 这是StreamSQL的核心方法,负责将SQL转换为实际的流处理逻辑。
支持的SQL语法:
- SELECT 子句: 选择字段和聚合函数
- FROM 子句: 指定数据源(通常为'stream')
- WHERE 子句: 数据过滤条件
- GROUP BY 子句: 分组字段和窗口函数
- HAVING 子句: 聚合结果过滤
- LIMIT 子句: 限制结果数量
- DISTINCT: 结果去重
窗口函数:
- TumblingWindow('5s'): 滚动窗口
- SlidingWindow('30s', '10s'): 滑动窗口
- CountingWindow(100): 计数窗口
- SessionWindow('5m'): 会话窗口
参数:
- sql: 要执行的SQL查询语句
返回值:
- error: 如果SQL解析或执行失败,返回相应错误
示例:
// 基本聚合查询 err := ssql.Execute("SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')") // 带过滤条件的查询 err := ssql.Execute("SELECT * FROM stream WHERE temperature > 30") // 复杂的窗口聚合 err := ssql.Execute(` SELECT deviceId, AVG(temperature) as avg_temp, MAX(humidity) as max_humidity FROM stream WHERE deviceId != 'test' GROUP BY deviceId, SlidingWindow('1m', '30s') HAVING avg_temp > 25 LIMIT 100 `)
func (*Streamsql) GetDetailedStats ¶
GetDetailedStats 获取详细的性能统计信息
func (*Streamsql) Stop ¶
func (s *Streamsql) Stop()
Stop 停止流处理器,释放相关资源。 调用此方法后,流处理器将停止接收和处理新数据。
建议在应用程序退出前调用此方法进行清理:
defer ssql.Stop()
注意: 停止后的StreamSQL实例不能重新启动,需要创建新实例。
func (*Streamsql) Stream ¶
Stream 返回底层的流处理器实例。 通过此方法可以访问更底层的流处理功能。
返回值:
- *stream.Stream: 底层流处理器实例,如果未执行SQL则返回nil
常用场景:
- 添加结果处理回调
- 获取结果通道
- 手动控制流处理生命周期
示例:
// 添加结果处理回调 ssql.Stream().AddSink(func(result interface{}) { fmt.Printf("处理结果: %v\n", result) }) // 获取结果通道 resultChan := ssql.Stream().GetResultsChan() go func() { for result := range resultChan { // 处理结果 } }()
Directories
¶
Path | Synopsis |
---|---|
examples
|
|
Package logger 提供StreamSQL的日志记录功能。
|
Package logger 提供StreamSQL的日志记录功能。 |
utils
|
|
Package window 提供了窗口操作的实现,包括滚动窗口(Tumbling Window)。
|
Package window 提供了窗口操作的实现,包括滚动窗口(Tumbling Window)。 |