streamsql

package module
v0.0.0-...-662b5c6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

README

StreamSQL

GoDoc Go Report CI RELEASE

English| 简体中文

StreamSQL is a lightweight, SQL-based stream processing engine for IoT edge, enabling efficient data processing and analysis on unbounded streams.

Similar to: Apache Flink and ekuiper

Features

  • Lightweight
    • Pure in-memory operations
    • No dependencies
  • Data processing with SQL syntax
    • Nested field access: Support dot notation syntax (device.info.name) for accessing nested structured data
  • Data analysis
    • Built-in multiple window types: sliding window, tumbling window, counting window
    • Built-in aggregate functions: MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
    • Support for group-by aggregation
    • Support for filtering conditions
  • High extensibility
    • Flexible function extension provided
    • Integration with the RuleGo ecosystem to expand input and output sources using RuleGo components
  • Integration with RuleGo
    • Utilize the rich and flexible input, output, and processing components of RuleGo to achieve data source access and integration with third-party systems

Installation

go get github.com/rulego/streamsql

Usage

StreamSQL supports two main processing modes for different business scenarios:

Non-Aggregation Mode - Real-time Data Transformation and Filtering

Suitable for scenarios requiring real-time response and low latency, where each data record is processed and output immediately.

Typical Use Cases:

  • Data Cleaning: Clean and standardize dirty data from IoT devices
  • Real-time Alerting: Monitor key metrics and alert immediately when thresholds are exceeded
  • Data Enrichment: Add calculated fields and business labels to raw data
  • Format Conversion: Convert data to formats required by downstream systems
  • Data Routing: Route data to different processing channels based on content
package main

import (
	"fmt"
	"time"
	"github.com/rulego/streamsql"
)

func main() {
	// Create StreamSQL instance
	ssql := streamsql.New()
	defer ssql.Stop()

	// Non-aggregation SQL: Real-time data transformation and filtering
	// Feature: Each input data is processed immediately, no need to wait for windows
	rsql := `SELECT deviceId, 
	                UPPER(deviceType) as device_type,
	                temperature * 1.8 + 32 as temp_fahrenheit,
	                CASE WHEN temperature > 30 THEN 'hot'
	                     WHEN temperature < 15 THEN 'cold'
	                     ELSE 'normal' END as temp_category,
	                CONCAT(location, '-', deviceId) as full_identifier,
	                NOW() as processed_time
	         FROM stream 
	         WHERE temperature > 0 AND STARTSWITH(deviceId, 'sensor')`

	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}

	// Handle real-time transformation results
	ssql.Stream().AddSink(func(result interface{}) {
		fmt.Printf("Real-time result: %+v\n", result)
	})

	// Simulate sensor data input
	sensorData := []map[string]interface{}{
		{
			"deviceId":     "sensor001",
			"deviceType":   "temperature", 
			"temperature":  25.0,
			"location":     "warehouse-A",
		},
		{
			"deviceId":     "sensor002",
			"deviceType":   "humidity",
			"temperature":  32.5,
			"location":     "warehouse-B", 
		},
		{
			"deviceId":     "pump001",  // Will be filtered out
			"deviceType":   "actuator",
			"temperature":  20.0,
			"location":     "factory",
		},
	}

	// Process data one by one, each will output results immediately
	for _, data := range sensorData {
		ssql.Stream().AddData(data)
		time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
	}

	time.Sleep(500 * time.Millisecond) // Wait for processing completion
}
Aggregation Mode - Windowed Statistical Analysis

Suitable for scenarios requiring statistical analysis and batch processing, collecting data over a period of time for aggregated computation.

Typical Use Cases:

  • Monitoring Dashboard: Display real-time statistical charts of device operational status
  • Performance Analysis: Analyze key metrics like QPS, latency, etc.
  • Anomaly Detection: Detect data anomalies based on statistical models
  • Report Generation: Generate various business reports periodically
  • Trend Analysis: Analyze data trends and patterns
package main

import (
	"context"
	"fmt"
	"time"

	"math/rand"
	"sync"
	"github.com/rulego/streamsql"
)

// StreamSQL Usage Example
// This example demonstrates the complete workflow of StreamSQL: from instance creation to data processing and result handling
func main() {
	// Step 1: Create StreamSQL Instance
	// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
	ssql := streamsql.New()
	
	// Step 2: Define Stream SQL Query Statement
	// This SQL statement showcases StreamSQL's core capabilities:
	// - SELECT: Choose output fields and aggregation functions
	// - FROM stream: Specify the data source as stream data
	// - WHERE: Filter condition, excluding device3 data
	// - GROUP BY: Group by deviceId, combined with tumbling window for aggregation
	// - TumblingWindow('5s'): 5-second tumbling window, triggers computation every 5 seconds
	// - avg(), min(): Aggregation functions for calculating average and minimum values
	// - window_start(), window_end(): Window functions to get window start and end times
	rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," +
		"window_start() as start,window_end() as end FROM  stream  where deviceId!='device3' group by deviceId,TumblingWindow('5s')"
	
	// Step 3: Execute SQL Statement and Start Stream Analysis Task
	// The Execute method parses SQL, builds execution plan, initializes window manager and aggregators
	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}
	
	// Step 4: Setup Test Environment and Concurrency Control
	var wg sync.WaitGroup
	wg.Add(1)
	// Set 30-second test timeout to prevent infinite running
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	
	// Step 5: Start Data Producer Goroutine
	// Simulate real-time data stream, continuously feeding data into StreamSQL
	go func() {
		defer wg.Done()
		// Create ticker to trigger data generation every second
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Generate 10 random test data points per second, simulating high-frequency data stream
				// This data density tests StreamSQL's real-time processing capability
				for i := 0; i < 10; i++ {
					// Construct device data containing deviceId, temperature, and humidity
					randomData := map[string]interface{}{
						"deviceId":    fmt.Sprintf("device%d", rand.Intn(2)+1), // Randomly select device1 or device2
						"temperature": 20.0 + rand.Float64()*10,                // Temperature range: 20-30 degrees
						"humidity":    50.0 + rand.Float64()*20,                // Humidity range: 50-70%
					}
					// Add data to stream, triggering StreamSQL's real-time processing
					// AddData distributes data to corresponding windows and aggregators
					ssql.stream.AddData(randomData)
				}

			case <-ctx.Done():
				// Timeout or cancellation signal, stop data generation
				return
			}
		}
	}()

	// Step 6: Setup Result Processing Pipeline
	resultChan := make(chan interface{})
	// Add computation result callback function (Sink)
	// When window triggers computation, results are output through this callback
	ssql.stream.AddSink(func(result interface{}) {
		resultChan <- result
	})
	
	// Step 7: Start Result Consumer Goroutine
	// Count received results for effect verification
	resultCount := 0
	go func() {
		for result := range resultChan {
			// Print results when window computation is triggered (every 5 seconds)
			// This demonstrates StreamSQL's window-based aggregation results
			fmt.Printf("Window Result [%s]: %v\n", time.Now().Format("15:04:05.000"), result)
			resultCount++
		}
	}()
	
	// Step 8: Wait for Processing Completion
	// Wait for data producer goroutine to finish (30-second timeout or manual cancellation)
	wg.Wait()
	
	// Step 9: Display Final Statistics
	// Show total number of window results received during the test period
	fmt.Printf("\nTotal window results received: %d\n", resultCount)
	fmt.Println("StreamSQL processing completed successfully!")
}
Nested Field Access

StreamSQL supports querying nested structured data using dot notation (.) syntax to access nested fields:

// Nested field access example
package main

import (
	"fmt"
	"time"
	"github.com/rulego/streamsql"
)

func main() {
	ssql := streamsql.New()
	defer ssql.Stop()

	// SQL query using nested fields - supports dot notation syntax for accessing nested structures
	rsql := `SELECT device.info.name as device_name, 
	                device.location,
	                AVG(sensor.temperature) as avg_temp,
	                COUNT(*) as sensor_count,
	                window_start() as start,
	                window_end() as end
	         FROM stream 
	         WHERE device.info.type = 'temperature'
	         GROUP BY device.location, TumblingWindow('5s')
	         WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`

	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}

	// Handle aggregation results
	ssql.Stream().AddSink(func(result interface{}) {
		fmt.Printf("Aggregation result: %+v\n", result)
	})

	// Add nested structured data
	nestedData := map[string]interface{}{
		"device": map[string]interface{}{
			"info": map[string]interface{}{
				"name": "temperature-sensor-001",
				"type": "temperature",
			},
			"location": "smart-greenhouse-A",
		},
		"sensor": map[string]interface{}{
			"temperature": 25.5,
			"humidity":    60.2,
		},
		"timestamp": time.Now().Unix(),
	}

	ssql.Stream().AddData(nestedData)
}

Functions

StreamSQL supports a variety of function types, including mathematical, string, conversion, aggregate, analytic, window, and more. Documentation

Concepts

Processing Modes

StreamSQL supports two main processing modes:

Aggregation Mode (Windowed Processing)

Used when the SQL query contains aggregate functions (SUM, AVG, COUNT, etc.) or GROUP BY clauses. Data is collected in windows and aggregated results are output when windows are triggered.

Non-Aggregation Mode (Real-time Processing)

Used for immediate data transformation and filtering without aggregation operations. Each input record is processed and output immediately, providing ultra-low latency for real-time scenarios like data cleaning, enrichment, and filtering.

Windows

Since stream data is unbounded, it cannot be processed as a whole. Windows provide a mechanism to divide unbounded data into a series of bounded data segments for computation. StreamSQL includes the following types of windows:

  • Sliding Window

    • Definition: A time-based window that slides forward at fixed time intervals. For example, it slides every 10 seconds.
    • Characteristics: The size of the window is fixed, but the starting point of the window is continuously updated over time. It is suitable for real-time statistical analysis of data within continuous time periods.
    • Application Scenario: In intelligent transportation systems, the vehicle traffic is counted every 10 seconds over the past 1 minute.
  • Tumbling Window

    • Definition: A time-based window that does not overlap and is completely independent. For example, a window is generated every 1 minute.
    • Characteristics: The size of the window is fixed, and the windows do not overlap with each other. It is suitable for overall analysis of data within fixed time periods.
    • Application Scenario: In smart agriculture monitoring systems, the temperature and humidity of the farmland are counted every hour within that hour.
  • Count Window

    • Definition: A window based on the number of data records, where the window size is determined by the number of data records. For example, a window is generated every 100 data records.
    • Characteristics: The size of the window is not related to time but is divided based on the volume of data. It is suitable for segmenting data based on the amount of data.
    • Application Scenario: In industrial IoT, an aggregation calculation is performed every time 100 device status data records are processed.
Stream
  • Definition: A continuous sequence of data that is generated in an unbounded manner, typically from sensors, log systems, user behaviors, etc.
  • Characteristics: Stream data is real-time, dynamic, and unbounded, requiring timely processing and analysis.
  • Application Scenario: Real-time data streams generated by IoT devices, such as temperature sensor data and device status data.
Time Semantics
  • Event Time

    • Definition: The actual time when the data occurred, usually represented by a timestamp generated by the data source.
  • Processing Time

    • Definition: The time when the data arrives at the processing system.
  • Window Start Time

    • Definition: The starting time point of the window based on event time. For example, for a sliding window based on event time, the window start time is the timestamp of the earliest event within the window.
  • Window End Time

    • Definition: The ending time point of the window based on event time. Typically, the window end time is the window start time plus the duration of the window. For example, if the duration of a sliding window is 1 minute, then the window end time is the window start time plus 1 minute.

Contribution Guidelines

Pull requests and issues are welcome. Please ensure that the code conforms to Go standards and include relevant test cases.

License

Apache License 2.0

Documentation

Overview

Package streamsql 是一个轻量级的、基于 SQL 的物联网边缘流处理引擎。

StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种窗口类型、聚合函数、 自定义函数,以及与 RuleGo 生态的无缝集成。

核心特性

• 轻量级设计 - 纯内存操作,无外部依赖 • SQL语法支持 - 使用熟悉的SQL语法处理流数据 • 多种窗口类型 - 滑动窗口、滚动窗口、计数窗口、会话窗口 • 丰富的聚合函数 - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE等 • 插件式自定义函数 - 运行时动态注册,支持8种函数类型 • RuleGo生态集成 - 利用RuleGo组件扩展输入输出源

入门示例

基本的流数据处理:

package main

import (
	"fmt"
	"math/rand"
	"time"
	"github.com/rulego/streamsql"
)

func main() {
	// 创建StreamSQL实例
	ssql := streamsql.New()

	// 定义SQL查询 - 每5秒按设备ID分组计算温度平均值
	sql := `SELECT deviceId,
		AVG(temperature) as avg_temp,
		MIN(humidity) as min_humidity,
		window_start() as start,
		window_end() as end
	FROM stream
	WHERE deviceId != 'device3'
	GROUP BY deviceId, TumblingWindow('5s')`

	// 执行SQL,创建流处理任务
	err := ssql.Execute(sql)
	if err != nil {
		panic(err)
	}

	// 添加结果处理回调
	ssql.Stream().AddSink(func(result interface{}) {
		fmt.Printf("聚合结果: %v\n", result)
	})

	// 模拟发送流数据
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// 生成随机设备数据
				data := map[string]interface{}{
					"deviceId":    fmt.Sprintf("device%d", rand.Intn(3)+1),
					"temperature": 20.0 + rand.Float64()*10,
					"humidity":    50.0 + rand.Float64()*20,
				}
				ssql.AddData(data)
			}
		}
	}()

	// 运行30秒
	time.Sleep(30 * time.Second)
}

窗口函数

StreamSQL 支持多种窗口类型:

// 滚动窗口 - 每5秒一个独立窗口
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')

// 滑动窗口 - 窗口大小30秒,每10秒滑动一次
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')

// 计数窗口 - 每100条记录一个窗口
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)

// 会话窗口 - 超时5分钟自动关闭会话
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')

自定义函数

StreamSQL 支持插件式自定义函数,运行时动态注册:

// 注册温度转换函数
functions.RegisterCustomFunction(
	"fahrenheit_to_celsius",
	functions.TypeConversion,
	"温度转换",
	"华氏度转摄氏度",
	1, 1,
	func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
		f, _ := functions.ConvertToFloat64(args[0])
		return (f - 32) * 5 / 9, nil
	},
)

// 立即在SQL中使用
sql := `SELECT deviceId,
	AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`

支持的自定义函数类型: • TypeMath - 数学计算函数 • TypeString - 字符串处理函数 • TypeConversion - 类型转换函数 • TypeDateTime - 时间日期函数 • TypeAggregation - 聚合函数 • TypeAnalytical - 分析函数 • TypeWindow - 窗口函数 • TypeCustom - 通用自定义函数

日志配置

StreamSQL 提供灵活的日志配置选项:

// 设置日志级别
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))

// 输出到文件
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))

// 禁用日志(生产环境)
ssql := streamsql.New(streamsql.WithDiscardLog())

性能配置

对于生产环境,建议进行以下配置:

ssql := streamsql.New(
	streamsql.WithDiscardLog(),          // 禁用日志提升性能
	// 其他配置选项...
)

与RuleGo集成

StreamSQL可以与RuleGo规则引擎无缝集成,利用RuleGo丰富的组件生态:

// TODO: 提供RuleGo集成示例

更多详细信息和高级用法,请参阅: • 自定义函数开发指南: docs/CUSTOM_FUNCTIONS_GUIDE.md • 快速入门指南: docs/FUNCTION_QUICK_START.md • 完整示例: examples/

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*Streamsql)

Option 定义StreamSQL的配置选项类型

func WithBufferSizes

func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option

WithBufferSizes 设置自定义缓冲区大小

func WithBuffers

func WithBuffers(dataBufSize, resultBufSize, sinkPoolSize int) Option

WithBuffers 设置缓冲区大小 (已弃用,使用WithBufferSizes) Deprecated: 使用WithBufferSizes替代

func WithCustomPerformance

func WithCustomPerformance(config types.PerformanceConfig) Option

WithCustomPerformance 使用自定义性能配置

func WithCustomPersistence

func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option

WithCustomPersistence 使用自定义持久化配置

func WithDiscardLog

func WithDiscardLog() Option

WithDiscardLog 禁用日志输出

func WithHighPerf

func WithHighPerf() Option

WithHighPerf 启用高性能模式 (已弃用,使用WithHighPerformance) Deprecated: 使用WithHighPerformance替代

func WithHighPerformance

func WithHighPerformance() Option

WithHighPerformance 使用高性能配置 适用于需要最大吞吐量的场景

func WithLogLevel

func WithLogLevel(level logger.Level) Option

WithLogLevel 设置日志级别

func WithLowLatency

func WithLowLatency() Option

WithLowLatency 使用低延迟配置 适用于实时交互应用,最小化延迟

func WithMonitoring

func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option

WithMonitoring 启用详细监控

func WithOverflowPolicy

func WithOverflowPolicy(strategy string, timeout time.Duration) Option

WithOverflowPolicy 设置溢出策略 (已弃用,使用WithOverflowStrategy) Deprecated: 使用WithOverflowStrategy替代

func WithOverflowStrategy

func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option

WithOverflowStrategy 设置溢出策略

func WithPersistence

func WithPersistence() Option

WithPersistence 使用持久化配置预设

func WithPersistenceConfig

func WithPersistenceConfig(dataDir string, maxFileSize int64, flushInterval time.Duration) Option

WithPersistenceConfig 设置持久化配置 (已弃用,使用WithCustomPersistence) Deprecated: 使用WithCustomPersistence替代

func WithWorkerConfig

func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option

WithWorkerConfig 设置工作池配置

func WithZeroDataLoss

func WithZeroDataLoss() Option

WithZeroDataLoss 使用零数据丢失配置 适用于关键业务数据,保证数据不丢失

type Streamsql

type Streamsql struct {
	// contains filtered or unexported fields
}

Streamsql 是StreamSQL流处理引擎的主要接口。 它封装了SQL解析、流处理、窗口管理等核心功能。

使用示例:

ssql := streamsql.New()
err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')")
ssql.AddData(map[string]interface{}{"temperature": 25.5})

func New

func New(options ...Option) *Streamsql

New 创建一个新的StreamSQL实例。 支持通过可选的Option参数进行配置。

参数:

  • options: 可变长度的配置选项,用于自定义StreamSQL行为

返回值:

  • *Streamsql: 新创建的StreamSQL实例

示例:

// 创建默认实例
ssql := streamsql.New()

// 创建高性能实例
ssql := streamsql.New(streamsql.WithHighPerformance())

// 创建零数据丢失实例
ssql := streamsql.New(streamsql.WithZeroDataLoss())

func (*Streamsql) AddData

func (s *Streamsql) AddData(data interface{})

AddData 向流中添加一条数据记录。 数据会根据已配置的SQL查询进行处理和聚合。

支持的数据格式:

  • map[string]interface{}: 最常用的键值对格式
  • 结构体: 会自动转换为map格式处理

参数:

  • data: 要添加的数据,通常是map[string]interface{}或结构体

示例:

// 添加设备数据
ssql.AddData(map[string]interface{}{
    "deviceId": "sensor001",
    "temperature": 25.5,
    "humidity": 60.0,
    "timestamp": time.Now(),
})

// 添加用户行为数据
ssql.AddData(map[string]interface{}{
    "userId": "user123",
    "action": "click",
    "page": "/home",
})

func (*Streamsql) Execute

func (s *Streamsql) Execute(sql string) error

Execute 解析并执行SQL查询,创建对应的流处理管道。 这是StreamSQL的核心方法,负责将SQL转换为实际的流处理逻辑。

支持的SQL语法:

  • SELECT 子句: 选择字段和聚合函数
  • FROM 子句: 指定数据源(通常为'stream')
  • WHERE 子句: 数据过滤条件
  • GROUP BY 子句: 分组字段和窗口函数
  • HAVING 子句: 聚合结果过滤
  • LIMIT 子句: 限制结果数量
  • DISTINCT: 结果去重

窗口函数:

  • TumblingWindow('5s'): 滚动窗口
  • SlidingWindow('30s', '10s'): 滑动窗口
  • CountingWindow(100): 计数窗口
  • SessionWindow('5m'): 会话窗口

参数:

  • sql: 要执行的SQL查询语句

返回值:

  • error: 如果SQL解析或执行失败,返回相应错误

示例:

// 基本聚合查询
err := ssql.Execute("SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')")

// 带过滤条件的查询
err := ssql.Execute("SELECT * FROM stream WHERE temperature > 30")

// 复杂的窗口聚合
err := ssql.Execute(`
    SELECT deviceId,
           AVG(temperature) as avg_temp,
           MAX(humidity) as max_humidity
    FROM stream
    WHERE deviceId != 'test'
    GROUP BY deviceId, SlidingWindow('1m', '30s')
    HAVING avg_temp > 25
    LIMIT 100
`)

func (*Streamsql) GetDetailedStats

func (s *Streamsql) GetDetailedStats() map[string]interface{}

GetDetailedStats 获取详细的性能统计信息

func (*Streamsql) GetStats

func (s *Streamsql) GetStats() map[string]int64

GetStats 获取流处理统计信息

func (*Streamsql) Stop

func (s *Streamsql) Stop()

Stop 停止流处理器,释放相关资源。 调用此方法后,流处理器将停止接收和处理新数据。

建议在应用程序退出前调用此方法进行清理:

defer ssql.Stop()

注意: 停止后的StreamSQL实例不能重新启动,需要创建新实例。

func (*Streamsql) Stream

func (s *Streamsql) Stream() *stream.Stream

Stream 返回底层的流处理器实例。 通过此方法可以访问更底层的流处理功能。

返回值:

  • *stream.Stream: 底层流处理器实例,如果未执行SQL则返回nil

常用场景:

  • 添加结果处理回调
  • 获取结果通道
  • 手动控制流处理生命周期

示例:

// 添加结果处理回调
ssql.Stream().AddSink(func(result interface{}) {
    fmt.Printf("处理结果: %v\n", result)
})

// 获取结果通道
resultChan := ssql.Stream().GetResultsChan()
go func() {
    for result := range resultChan {
        // 处理结果
    }
}()

Directories

Path Synopsis
examples
Package logger 提供StreamSQL的日志记录功能。
Package logger 提供StreamSQL的日志记录功能。
utils
Package window 提供了窗口操作的实现,包括滚动窗口(Tumbling Window)。
Package window 提供了窗口操作的实现,包括滚动窗口(Tumbling Window)。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL