functions

package
v0.10.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 27, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

README

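StreamSQL Functions Module Extension

Overview

This extension provides unified management of aggregate and analytical functions and simplifies adding custom functions. A function now only needs to be implemented in the functions module to become available automatically in the aggregator module.

Key improvements

1. Unified function interfaces
  • AggregatorFunction: interface for aggregate functions that support incremental computation
  • AnalyticalFunction: interface for analytical functions that support state management
  • Function: the base function interface
2. Automatic adapters
  • AggregatorAdapter: adapts aggregate functions from the functions module to the aggregator module
  • AnalyticalAdapter: adapts analytical functions from the functions module to the aggregator module
3. Simplified extension workflow

Adding a custom function now takes only these steps:

  1. Implement the function in the functions module
  2. Register the function and its adapter
  3. Leave the aggregator module unchanged

Usage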

Creating a custom aggregate function
// 1. Define the function struct.
type CustomSumFunction struct {
    *BaseFunction
    sum float64
}

// 2. Implement the base interface.
func (f *CustomSumFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
    // Implement the function logic here.
    return f.sum, nil // placeholder so the stub compiles
}

// 3. Implement the AggregatorFunction interface.
func (f *CustomSumFunction) New() AggregatorFunction {
    return &CustomSumFunction{BaseFunction: f.BaseFunction}
}

func (f *CustomSumFunction) Add(value interface{}) {
    // Incremental computation logic goes here.
}

func (f *CustomSumFunction) Result() interface{} {
    return f.sum
}

func (f *CustomSumFunction) Reset() {
    f.sum = 0
}

func (f *CustomSumFunction) Clone() AggregatorFunction {
    return &CustomSumFunction{BaseFunction: f.BaseFunction, sum: f.sum}
}

// 4. Register the function.
// NewCustomSumFunction is a constructor you write yourself; it is not shown here.
func init() {
    Register(NewCustomSumFunction())
    RegisterAggregatorAdapter("custom_sum")
}

Creating a custom analytical function
// 1. Define the function struct.
type CustomAnalyticalFunction struct {
    *BaseFunction
    state interface{}
}

// 2. Implement the base interface.
func (f *CustomAnalyticalFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
    // Implement the analysis logic here.
    return nil, nil // placeholder so the stub compiles
}

// 3. Implement the AnalyticalFunction interface.
// AnalyticalFunction embeds AggregatorFunction, so New, Add and Result
// are required as well; they are omitted from this excerpt.
func (f *CustomAnalyticalFunction) Reset() {
    f.state = nil
}

func (f *CustomAnalyticalFunction) Clone() AnalyticalFunction {
    return &CustomAnalyticalFunction{BaseFunction: f.BaseFunction, state: f.state}
}

// 4. Register the function.
// NewCustomAnalyticalFunction is a constructor you write yourself; it is not shown here.
func init() {
    Register(NewCustomAnalyticalFunction())
    RegisterAnalyticalAdapter("custom_analytical")
}

Using the simplified registration
// Register a simple custom function.
RegisterCustomFunction("double", TypeAggregation, "math functions", "doubles the value", 1, 1,
    func(ctx *FunctionContext, args []interface{}) (interface{}, error) {
        val, err := cast.ToFloat64E(args[0])
        if err != nil {
            return nil, err
        }
        return val * 2, nil
    })
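The registration flow above boils down to storing a callback under a name and looking it up at execution time. The following is a minimal, self-contained sketch of that idea, not the package's implementation: `executor`, `registry`, `register`, `execute` and `double` are hypothetical stand-ins, the `*FunctionContext` argument is left out, and a type switch replaces the cast helper so that only the standard library is needed.

```go
package main

import (
	"errors"
	"fmt"
)

// executor mirrors the callback shape passed to RegisterCustomFunction,
// without the *FunctionContext argument (not modelled in this sketch).
type executor func(args []interface{}) (interface{}, error)

// registry is a hypothetical stand-in for a global function table.
var registry = map[string]executor{}

// register stores fn under name and rejects duplicate names.
func register(name string, fn executor) error {
	if _, exists := registry[name]; exists {
		return fmt.Errorf("function %q is already registered", name)
	}
	registry[name] = fn
	return nil
}

// execute looks a function up by name and calls it with args.
func execute(name string, args []interface{}) (interface{}, error) {
	fn, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("function %q not found", name)
	}
	return fn(args)
}

// double multiplies its single numeric argument by two.
func double(args []interface{}) (interface{}, error) {
	if len(args) != 1 {
		return nil, errors.New("double expects exactly one argument")
	}
	switch v := args[0].(type) {
	case float64:
		return v * 2, nil
	case int:
		return float64(v) * 2, nil
	default:
		return nil, fmt.Errorf("unsupported argument type %T", v)
	}
}

func main() {
	if err := register("double", double); err != nil {
		panic(err)
	}
	out, err := execute("double", []interface{}{21})
	fmt.Println(out, err) // 42 <nil>
}
```

Rejecting duplicate names in `register` is a design choice of this sketch; the documentation above does not say how the real Register handles a name that is already taken.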

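Built-in functions

Aggregate functions
  • sum: sum
  • avg: average
  • min: minimum
  • max: maximum
  • count: count
  • stddev: standard deviation
  • median: median
  • percentile: percentile
  • collect: collects all values
  • last_value: last value
  • merge_agg: merge aggregation
  • stddevs: sample standard deviation
  • deduplicate: removes duplicates
  • var: population variance
  • vars: sample variance
Analytical functions
  • lag: lag function
  • latest: latest value
  • changed_col: changed column
  • had_changed: whether the value changed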

Custom function examples

See the examples in custom_example.go:

  • CustomProductFunction: product aggregate function
  • CustomGeometricMeanFunction: geometric mean aggregate function
  • CustomMovingAverageFunction: moving average analytical function

Documentation

Index

Constants

const (
	SumStr         = string(Sum)
	CountStr       = string(Count)
	AvgStr         = string(Avg)
	MaxStr         = string(Max)
	MinStr         = string(Min)
	MedianStr      = string(Median)
	PercentileStr  = string(Percentile)
	WindowStartStr = string(WindowStart)
	WindowEndStr   = string(WindowEnd)
	CollectStr     = string(Collect)
	LastValueStr   = string(LastValue)
	MergeAggStr    = string(MergeAgg)
	StdStr         = "std"
	StdDevStr      = string(StdDev)
	StdDevSStr     = string(StdDevS)
	DeduplicateStr = string(Deduplicate)
	VarStr         = string(Var)
	VarSStr        = string(VarS)
	// Analytical functions
	LagStr        = string(Lag)
	LatestStr     = string(Latest)
	ChangedColStr = string(ChangedCol)
	HadChangedStr = string(HadChanged)
	// Expression aggregator
	ExpressionStr = string(Expression)
)

String-constant versions of the aggregate types, provided for convenience.
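A short sketch of why plain-string versions are convenient: a constant converted with `string(...)` has type `string`, so it can be used wherever a `string` is expected without a conversion at each call site. The declarations below are local copies of two of the names above, made only so the example compiles on its own.

```go
package main

import "fmt"

// AggregateType is a local copy of the package's string-based type.
type AggregateType string

const (
	Sum   AggregateType = "sum"
	Count AggregateType = "count"
)

// The *Str constants are ordinary string constants.
const (
	SumStr   = string(Sum)
	CountStr = string(Count)
)

func main() {
	// A map[string]string accepts SumStr directly; Sum would need string(Sum).
	fields := map[string]string{"temperature": SumStr, "events": CountStr}
	fmt.Println(fields["temperature"], AggregateType(fields["events"]) == Count) // sum true
}
```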

Variables

This section is empty.

Functions

func CreateAnalyticalAggregatorFromFunctions

func CreateAnalyticalAggregatorFromFunctions(funcType string) interface{}

CreateAnalyticalAggregatorFromFunctions creates an analytical-function aggregator from the functions module.

func CreateBuiltinAggregatorFromFunctions

func CreateBuiltinAggregatorFromFunctions(aggType string) interface{}

CreateBuiltinAggregatorFromFunctions creates an aggregator from the functions module.

func EvaluateWithBridge

func EvaluateWithBridge(expression string, data map[string]interface{}) (interface{}, error)

Convenience function: evaluates an expression directly.

func Execute

func Execute(name string, ctx *FunctionContext, args []interface{}) (interface{}, error)

Execute executes the named function.

func GetAggregatorAdapter

func GetAggregatorAdapter(name string) (func() interface{}, bool)

GetAggregatorAdapter returns the aggregator adapter registered under name.

func GetAllAvailableFunctions

func GetAllAvailableFunctions() map[string]interface{}

Convenience function: returns information about all available functions.

func GetAnalyticalAdapter

func GetAnalyticalAdapter(name string) (func() *AnalyticalAdapter, bool)

GetAnalyticalAdapter returns the analytical-function adapter registered under name.

func IsUnnestResult

func IsUnnestResult(value interface{}) bool

IsUnnestResult reports whether value is an unnest result.

func ListAll

func ListAll() map[string]Function

func ProcessUnnestResult

func ProcessUnnestResult(value interface{}) []map[string]interface{}

ProcessUnnestResult processes an unnest result and converts it into multiple rows.

func Register

func Register(fn Function) error

Global function registration and lookup methods.

func RegisterAggregatorAdapter

func RegisterAggregatorAdapter(name string) error

RegisterAggregatorAdapter registers an aggregator adapter.

func RegisterAnalyticalAdapter

func RegisterAnalyticalAdapter(name string) error

RegisterAnalyticalAdapter registers an analytical-function adapter.

func RegisterCustomFunction

func RegisterCustomFunction(name string, fnType FunctionType, category, description string,
	minArgs, maxArgs int, executor func(ctx *FunctionContext, args []interface{}) (interface{}, error)) error

RegisterCustomFunction registers a custom function.

func RegisterCustomFunctions

func RegisterCustomFunctions()

RegisterCustomFunctions is an example of registering custom functions.

func RegisterLegacyAggregator

func RegisterLegacyAggregator(name string, constructor func() LegacyAggregatorFunction)

RegisterLegacyAggregator registers a legacy aggregator in the global registry.

func Unregister

func Unregister(name string) bool

Types

type AbsFunction

type AbsFunction struct {
	*BaseFunction
}

AbsFunction is the absolute value function.

func NewAbsFunction

func NewAbsFunction() *AbsFunction

func (*AbsFunction) Execute

func (f *AbsFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AbsFunction) Validate

func (f *AbsFunction) Validate(args []interface{}) error

type AcosFunction

type AcosFunction struct {
	*BaseFunction
}

AcosFunction is the arccosine function.

func NewAcosFunction

func NewAcosFunction() *AcosFunction

func (*AcosFunction) Execute

func (f *AcosFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AcosFunction) Validate

func (f *AcosFunction) Validate(args []interface{}) error

type AggregateType

type AggregateType string

AggregateType is the aggregation type, migrated from aggregator.AggregateType.

const (
	Sum         AggregateType = "sum"
	Count       AggregateType = "count"
	Avg         AggregateType = "avg"
	Max         AggregateType = "max"
	Min         AggregateType = "min"
	Median      AggregateType = "median"
	Percentile  AggregateType = "percentile"
	WindowStart AggregateType = "window_start"
	WindowEnd   AggregateType = "window_end"
	Collect     AggregateType = "collect"
	LastValue   AggregateType = "last_value"
	MergeAgg    AggregateType = "merge_agg"
	StdDev      AggregateType = "stddev"
	StdDevS     AggregateType = "stddevs"
	Deduplicate AggregateType = "deduplicate"
	Var         AggregateType = "var"
	VarS        AggregateType = "vars"
	// Analytical functions
	Lag        AggregateType = "lag"
	Latest     AggregateType = "latest"
	ChangedCol AggregateType = "changed_col"
	HadChanged AggregateType = "had_changed"
	// Expression aggregator, used to handle custom functions
	Expression AggregateType = "expression"
)

type AggregatorAdapter

type AggregatorAdapter struct {
	// contains filtered or unexported fields
}

AggregatorAdapter is an aggregator adapter that is compatible with the original aggregator interface.

func NewAggregatorAdapter

func NewAggregatorAdapter(name string) (*AggregatorAdapter, error)

NewAggregatorAdapter creates an aggregator adapter.

func (*AggregatorAdapter) Add

func (a *AggregatorAdapter) Add(value interface{})

Add adds a value.

func (*AggregatorAdapter) GetFunctionName

func (a *AggregatorAdapter) GetFunctionName() string

GetFunctionName returns the name of the underlying function; it is used to support the context mechanism.

func (*AggregatorAdapter) New

func (a *AggregatorAdapter) New() interface{}

New creates a new aggregator instance.

func (*AggregatorAdapter) Result

func (a *AggregatorAdapter) Result() interface{}

Result returns the result.

type AggregatorFunction

type AggregatorFunction interface {
	Function
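	// New creates a new aggregator instance.
	New() AggregatorFunction
	// Add adds a value for incremental computation.
	Add(value interface{})
	// Result returns the aggregation result.
	Result() interface{}
	// Reset resets the aggregator state.
	Reset()
	// Clone clones the aggregator (used for window functions and similar cases).
	Clone() AggregatorFunction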
}

AggregatorFunction is the aggregator function interface; it supports incremental computation.
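To make the incremental contract concrete, here is a minimal sketch of a running average built around the five state-handling methods listed in the interface. It is not the package's AvgFunction: `aggregator` is a local, hypothetical copy of the interface without the embedded Function, and skipping non-numeric input in `Add` is an assumption of this sketch.

```go
package main

import "fmt"

// aggregator is a local copy of the state-handling methods of
// AggregatorFunction; the embedded Function interface is left out.
type aggregator interface {
	New() aggregator
	Add(value interface{})
	Result() interface{}
	Reset()
	Clone() aggregator
}

// avgAgg keeps a running sum and count, so Result never rescans old values.
type avgAgg struct {
	sum   float64
	count int
}

// New returns an instance with empty state, e.g. one per window or group.
func (a *avgAgg) New() aggregator { return &avgAgg{} }

// Add folds one value into the running state; non-numeric values are skipped.
func (a *avgAgg) Add(value interface{}) {
	switch v := value.(type) {
	case float64:
		a.sum += v
	case int:
		a.sum += float64(v)
	default:
		return
	}
	a.count++
}

// Result returns the current average, or nil when nothing was added.
func (a *avgAgg) Result() interface{} {
	if a.count == 0 {
		return nil
	}
	return a.sum / float64(a.count)
}

// Reset clears the state so the instance can be reused.
func (a *avgAgg) Reset() { a.sum, a.count = 0, 0 }

// Clone copies the current state into an independent instance.
func (a *avgAgg) Clone() aggregator { return &avgAgg{sum: a.sum, count: a.count} }

func main() {
	proto := &avgAgg{}
	win := proto.New()
	for _, v := range []interface{}{1, 2.0, "skip", 6} {
		win.Add(v)
	}
	snap := win.Clone() // the snapshot is unaffected by the Reset below
	win.Reset()
	fmt.Println(snap.Result(), win.Result()) // 3 <nil>
}
```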

func CreateAggregator

func CreateAggregator(name string) (AggregatorFunction, error)

CreateAggregator creates an aggregator instance.

type AnalyticalAdapter

type AnalyticalAdapter struct {
	// contains filtered or unexported fields
}

AnalyticalAdapter is the analytical-function adapter.

func CreateAnalyticalFromFunctions

func CreateAnalyticalFromFunctions(funcType string) *AnalyticalAdapter

CreateAnalyticalFromFunctions creates an analytical function from the functions module.

func NewAnalyticalAdapter

func NewAnalyticalAdapter(name string) (*AnalyticalAdapter, error)

NewAnalyticalAdapter creates an analytical-function adapter.

func (*AnalyticalAdapter) Clone

Clone clones the instance.

func (*AnalyticalAdapter) Execute

func (a *AnalyticalAdapter) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

Execute executes the analytical function.

func (*AnalyticalAdapter) Reset

func (a *AnalyticalAdapter) Reset()

Reset resets the state.

type AnalyticalAggregatorAdapter

type AnalyticalAggregatorAdapter struct {
	// contains filtered or unexported fields
}

AnalyticalAggregatorAdapter adapts an analytical function to an aggregator.

func NewAnalyticalAggregatorAdapter

func NewAnalyticalAggregatorAdapter(name string) (*AnalyticalAggregatorAdapter, error)

NewAnalyticalAggregatorAdapter creates an analytical-function aggregator adapter.

func (*AnalyticalAggregatorAdapter) Add

func (a *AnalyticalAggregatorAdapter) Add(value interface{})

Add adds a value.

func (*AnalyticalAggregatorAdapter) New

func (a *AnalyticalAggregatorAdapter) New() interface{}

New creates a new adapter instance.

func (*AnalyticalAggregatorAdapter) Result

func (a *AnalyticalAggregatorAdapter) Result() interface{}

Result returns the result.

type AnalyticalAggregatorWrapper

type AnalyticalAggregatorWrapper struct {
	// contains filtered or unexported fields
}

AnalyticalAggregatorWrapper wraps an analytical-function aggregator from the functions module so that it is compatible with the original interface.

func (*AnalyticalAggregatorWrapper) Add

func (w *AnalyticalAggregatorWrapper) Add(value interface{})

func (*AnalyticalAggregatorWrapper) New

func (*AnalyticalAggregatorWrapper) Result

func (w *AnalyticalAggregatorWrapper) Result() interface{}

type AnalyticalFunction

type AnalyticalFunction interface {
	AggregatorFunction
}

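AnalyticalFunction is the analytical function interface; it supports state management. It now embeds AggregatorFunction and therefore also supports incremental computation.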
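As an illustration of what state management means for an analytical function, the sketch below keeps the previous input between calls and returns it, in the spirit of a one-step lag. It is not the package's lag implementation: `lagState` and its fields are hypothetical, and a fixed offset of one with a nil default is an assumption.

```go
package main

import "fmt"

// lagState remembers the current and the previous input value.
type lagState struct {
	prev interface{}
	curr interface{}
}

// Add shifts the current value into prev and stores the new one.
func (l *lagState) Add(v interface{}) { l.prev, l.curr = l.curr, v }

// Result returns the value seen one step earlier, or nil if there is none.
func (l *lagState) Result() interface{} { return l.prev }

// Reset forgets all remembered values.
func (l *lagState) Reset() { l.prev, l.curr = nil, nil }

// Clone copies the state so the original and the copy evolve independently.
func (l *lagState) Clone() *lagState { return &lagState{prev: l.prev, curr: l.curr} }

func main() {
	lag := &lagState{}
	for _, v := range []interface{}{10, 20, 30} {
		lag.Add(v)
		fmt.Println(v, "->", lag.Result())
	}
	// Prints:
	// 10 -> <nil>
	// 20 -> 10
	// 30 -> 20
}
```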

func CreateAnalytical

func CreateAnalytical(name string) (AnalyticalFunction, error)

CreateAnalytical creates an analytical function instance.

type ArrayContainsFunction

type ArrayContainsFunction struct {
	*BaseFunction
}

ArrayContainsFunction checks whether an array contains the given value.

func NewArrayContainsFunction

func NewArrayContainsFunction() *ArrayContainsFunction

func (*ArrayContainsFunction) Execute

func (f *ArrayContainsFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayContainsFunction) Validate

func (f *ArrayContainsFunction) Validate(args []interface{}) error

type ArrayDistinctFunction

type ArrayDistinctFunction struct {
	*BaseFunction
}

ArrayDistinctFunction removes duplicate elements from an array.

func NewArrayDistinctFunction

func NewArrayDistinctFunction() *ArrayDistinctFunction

func (*ArrayDistinctFunction) Execute

func (f *ArrayDistinctFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayDistinctFunction) Validate

func (f *ArrayDistinctFunction) Validate(args []interface{}) error

type ArrayExceptFunction

type ArrayExceptFunction struct {
	*BaseFunction
}

ArrayExceptFunction returns the difference of two arrays.

func NewArrayExceptFunction

func NewArrayExceptFunction() *ArrayExceptFunction

func (*ArrayExceptFunction) Execute

func (f *ArrayExceptFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayExceptFunction) Validate

func (f *ArrayExceptFunction) Validate(args []interface{}) error

type ArrayIntersectFunction

type ArrayIntersectFunction struct {
	*BaseFunction
}

ArrayIntersectFunction returns the intersection of two arrays.

func NewArrayIntersectFunction

func NewArrayIntersectFunction() *ArrayIntersectFunction

func (*ArrayIntersectFunction) Execute

func (f *ArrayIntersectFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayIntersectFunction) Validate

func (f *ArrayIntersectFunction) Validate(args []interface{}) error

type ArrayLengthFunction

type ArrayLengthFunction struct {
	*BaseFunction
}

ArrayLengthFunction returns the length of an array.

func NewArrayLengthFunction

func NewArrayLengthFunction() *ArrayLengthFunction

func (*ArrayLengthFunction) Execute

func (f *ArrayLengthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayLengthFunction) Validate

func (f *ArrayLengthFunction) Validate(args []interface{}) error

type ArrayPositionFunction

type ArrayPositionFunction struct {
	*BaseFunction
}

ArrayPositionFunction returns the position of a value in an array.

func NewArrayPositionFunction

func NewArrayPositionFunction() *ArrayPositionFunction

func (*ArrayPositionFunction) Execute

func (f *ArrayPositionFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayPositionFunction) Validate

func (f *ArrayPositionFunction) Validate(args []interface{}) error

type ArrayRemoveFunction

type ArrayRemoveFunction struct {
	*BaseFunction
}

ArrayRemoveFunction removes the given value from an array.

func NewArrayRemoveFunction

func NewArrayRemoveFunction() *ArrayRemoveFunction

func (*ArrayRemoveFunction) Execute

func (f *ArrayRemoveFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayRemoveFunction) Validate

func (f *ArrayRemoveFunction) Validate(args []interface{}) error

type ArrayUnionFunction

type ArrayUnionFunction struct {
	*BaseFunction
}

ArrayUnionFunction returns the union of two arrays.

func NewArrayUnionFunction

func NewArrayUnionFunction() *ArrayUnionFunction

func (*ArrayUnionFunction) Execute

func (f *ArrayUnionFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayUnionFunction) Validate

func (f *ArrayUnionFunction) Validate(args []interface{}) error

type AsinFunction

type AsinFunction struct {
	*BaseFunction
}

AsinFunction is the arcsine function.

func NewAsinFunction

func NewAsinFunction() *AsinFunction

func (*AsinFunction) Execute

func (f *AsinFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AsinFunction) Validate

func (f *AsinFunction) Validate(args []interface{}) error

type Atan2Function

type Atan2Function struct {
	*BaseFunction
}

Atan2Function is the two-argument arctangent function.

func NewAtan2Function

func NewAtan2Function() *Atan2Function

func (*Atan2Function) Execute

func (f *Atan2Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Atan2Function) Validate

func (f *Atan2Function) Validate(args []interface{}) error

type AtanFunction

type AtanFunction struct {
	*BaseFunction
}

AtanFunction is the arctangent function.

func NewAtanFunction

func NewAtanFunction() *AtanFunction

func (*AtanFunction) Execute

func (f *AtanFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AtanFunction) Validate

func (f *AtanFunction) Validate(args []interface{}) error

type AvgFunction

type AvgFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

AvgFunction is the average function.

func NewAvgFunction

func NewAvgFunction() *AvgFunction

func (*AvgFunction) Add

func (f *AvgFunction) Add(value interface{})

func (*AvgFunction) Clone

func (f *AvgFunction) Clone() AggregatorFunction

func (*AvgFunction) Execute

func (f *AvgFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AvgFunction) New

Implements the AggregatorFunction interface.

func (*AvgFunction) Reset

func (f *AvgFunction) Reset()

func (*AvgFunction) Result

func (f *AvgFunction) Result() interface{}

func (*AvgFunction) Validate

func (f *AvgFunction) Validate(args []interface{}) error

type BaseFunction

type BaseFunction struct {
	// contains filtered or unexported fields
}

BaseFunction is the base function implementation; it provides common functionality.

func NewBaseFunction

func NewBaseFunction(name string, fnType FunctionType, category, description string, minArgs, maxArgs int) *BaseFunction

NewBaseFunction creates a base function.

func (*BaseFunction) GetCategory

func (bf *BaseFunction) GetCategory() string

func (*BaseFunction) GetDescription

func (bf *BaseFunction) GetDescription() string

func (*BaseFunction) GetName

func (bf *BaseFunction) GetName() string

func (*BaseFunction) GetType

func (bf *BaseFunction) GetType() FunctionType

func (*BaseFunction) ValidateArgCount

func (bf *BaseFunction) ValidateArgCount(args []interface{}) error

ValidateArgCount validates the number of arguments.
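NewBaseFunction takes minArgs and maxArgs, which suggests an argument-count check along the following lines. This is only a sketch under assumptions: `argSpec` and `validateArgCount` are hypothetical names, and treating a negative maxArgs as "no upper bound" is a convention chosen for this example, not something this page confirms.

```go
package main

import "fmt"

// argSpec holds the argument-count limits for one function.
type argSpec struct {
	name    string
	minArgs int
	maxArgs int // negative means "no upper bound" in this sketch
}

// validateArgCount returns an error when len(args) is outside the limits.
func (s argSpec) validateArgCount(args []interface{}) error {
	n := len(args)
	if n < s.minArgs {
		return fmt.Errorf("%s requires at least %d argument(s), got %d", s.name, s.minArgs, n)
	}
	if s.maxArgs >= 0 && n > s.maxArgs {
		return fmt.Errorf("%s accepts at most %d argument(s), got %d", s.name, s.maxArgs, n)
	}
	return nil
}

func main() {
	abs := argSpec{name: "abs", minArgs: 1, maxArgs: 1}
	concat := argSpec{name: "concat", minArgs: 1, maxArgs: -1}

	fmt.Println(abs.validateArgCount([]interface{}{-3}))               // <nil>
	fmt.Println(abs.validateArgCount(nil))                             // abs requires at least 1 argument(s), got 0
	fmt.Println(concat.validateArgCount([]interface{}{"a", "b", "c"})) // <nil>
}
```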

type BitAndFunction

type BitAndFunction struct {
	*BaseFunction
}

BitAndFunction is the bitwise AND function.

func NewBitAndFunction

func NewBitAndFunction() *BitAndFunction

func (*BitAndFunction) Execute

func (f *BitAndFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*BitAndFunction) Validate

func (f *BitAndFunction) Validate(args []interface{}) error

type BitNotFunction

type BitNotFunction struct {
	*BaseFunction
}

BitNotFunction is the bitwise NOT function.

func NewBitNotFunction

func NewBitNotFunction() *BitNotFunction

func (*BitNotFunction) Execute

func (f *BitNotFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*BitNotFunction) Validate

func (f *BitNotFunction) Validate(args []interface{}) error

type BitOrFunction

type BitOrFunction struct {
	*BaseFunction
}

BitOrFunction is the bitwise OR function.

func NewBitOrFunction

func NewBitOrFunction() *BitOrFunction

func (*BitOrFunction) Execute

func (f *BitOrFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*BitOrFunction) Validate

func (f *BitOrFunction) Validate(args []interface{}) error

type BitXorFunction

type BitXorFunction struct {
	*BaseFunction
}

BitXorFunction is the bitwise XOR function.

func NewBitXorFunction

func NewBitXorFunction() *BitXorFunction

func (*BitXorFunction) Execute

func (f *BitXorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*BitXorFunction) Validate

func (f *BitXorFunction) Validate(args []interface{}) error

type CaseWhenFunction

type CaseWhenFunction struct {
	*BaseFunction
}

CaseWhenFunction implements the CASE WHEN expression.

func NewCaseWhenFunction

func NewCaseWhenFunction() *CaseWhenFunction

func (*CaseWhenFunction) Execute

func (f *CaseWhenFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CaseWhenFunction) Validate

func (f *CaseWhenFunction) Validate(args []interface{}) error

type CastFunction

type CastFunction struct {
	*BaseFunction
}

CastFunction is the type conversion function.

func NewCastFunction

func NewCastFunction() *CastFunction

func (*CastFunction) Execute

func (f *CastFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CastFunction) Validate

func (f *CastFunction) Validate(args []interface{}) error

type CeilingFunction

type CeilingFunction struct {
	*BaseFunction
}

CeilingFunction rounds a value up to the nearest integer.

func NewCeilingFunction

func NewCeilingFunction() *CeilingFunction

func (*CeilingFunction) Execute

func (f *CeilingFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CeilingFunction) Validate

func (f *CeilingFunction) Validate(args []interface{}) error

type ChangedColFunction

type ChangedColFunction struct {
	*BaseFunction
	PreviousValues map[string]interface{}
}

ChangedColFunction is the changed-column function: it returns the names of the columns whose values changed.

func NewChangedColFunction

func NewChangedColFunction() *ChangedColFunction

func (*ChangedColFunction) Add

func (f *ChangedColFunction) Add(value interface{})

func (*ChangedColFunction) Clone

func (*ChangedColFunction) Execute

func (f *ChangedColFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ChangedColFunction) New

Implements the AggregatorFunction interface (incremental computation support).

func (*ChangedColFunction) Reset

func (f *ChangedColFunction) Reset()

func (*ChangedColFunction) Result

func (f *ChangedColFunction) Result() interface{}

func (*ChangedColFunction) Validate

func (f *ChangedColFunction) Validate(args []interface{}) error

type ChrFunction

type ChrFunction struct {
	*BaseFunction
}

ChrFunction returns the ASCII character for the given code.

func NewChrFunction

func NewChrFunction() *ChrFunction

func (*ChrFunction) Execute

func (f *ChrFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ChrFunction) Validate

func (f *ChrFunction) Validate(args []interface{}) error

type CoalesceFunction

type CoalesceFunction struct {
	*BaseFunction
}

CoalesceFunction returns the first non-NULL value.
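The "first non-NULL value" rule can be sketched in a few lines. The helper below is a hypothetical illustration, not the package's Execute method, and it assumes that NULL is represented by an untyped Go nil.

```go
package main

import "fmt"

// coalesce returns the first argument that is not nil, or nil if all are nil.
// Only an untyped nil counts as NULL here; a typed nil pointer stored in an
// interface value would be returned as-is.
func coalesce(args ...interface{}) interface{} {
	for _, a := range args {
		if a != nil {
			return a
		}
	}
	return nil
}

func main() {
	fmt.Println(coalesce(nil, nil, "fallback", 7)) // fallback
	fmt.Println(coalesce(nil, nil))                // <nil>
}
```

Note that zero values such as 0 or "" are not nil, so this sketch returns them rather than skipping them.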

func NewCoalesceFunction

func NewCoalesceFunction() *CoalesceFunction

func (*CoalesceFunction) Execute

func (f *CoalesceFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CoalesceFunction) Validate

func (f *CoalesceFunction) Validate(args []interface{}) error

type CollectAggregatorFunction

type CollectAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CollectAggregatorFunction adds an AggregatorFunction implementation for CollectFunction.

func NewCollectAggregatorFunction

func NewCollectAggregatorFunction() *CollectAggregatorFunction

func (*CollectAggregatorFunction) Add

func (f *CollectAggregatorFunction) Add(value interface{})

func (*CollectAggregatorFunction) Clone

func (*CollectAggregatorFunction) Execute

func (f *CollectAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CollectAggregatorFunction) New

func (*CollectAggregatorFunction) Reset

func (f *CollectAggregatorFunction) Reset()

func (*CollectAggregatorFunction) Result

func (f *CollectAggregatorFunction) Result() interface{}

func (*CollectAggregatorFunction) Validate

func (f *CollectAggregatorFunction) Validate(args []interface{}) error

type CollectFunction

type CollectFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CollectFunction is the collect function: it returns an array of the column values from all messages in the current window.

func NewCollectFunction

func NewCollectFunction() *CollectFunction

func (*CollectFunction) Add

func (f *CollectFunction) Add(value interface{})

func (*CollectFunction) Clone

func (*CollectFunction) Execute

func (f *CollectFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CollectFunction) New

Implements the AggregatorFunction interface.

func (*CollectFunction) Reset

func (f *CollectFunction) Reset()

func (*CollectFunction) Result

func (f *CollectFunction) Result() interface{}

func (*CollectFunction) Validate

func (f *CollectFunction) Validate(args []interface{}) error

type ConcatFunction

type ConcatFunction struct {
	*BaseFunction
}

ConcatFunction is the string concatenation function.

func NewConcatFunction

func NewConcatFunction() *ConcatFunction

func (*ConcatFunction) Execute

func (f *ConcatFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ConcatFunction) Validate

func (f *ConcatFunction) Validate(args []interface{}) error

type ContextAggregator

type ContextAggregator interface {
	GetContextKey() string
}

ContextAggregator is the interface for aggregators that support the context mechanism.

type ConvertTzFunction

type ConvertTzFunction struct {
	*BaseFunction
}

ConvertTzFunction is the time zone conversion function.

func NewConvertTzFunction

func NewConvertTzFunction() *ConvertTzFunction

func (*ConvertTzFunction) Execute

func (f *ConvertTzFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ConvertTzFunction) Validate

func (f *ConvertTzFunction) Validate(args []interface{}) error

type CosFunction

type CosFunction struct {
	*BaseFunction
}

CosFunction is the cosine function.

func NewCosFunction

func NewCosFunction() *CosFunction

func (*CosFunction) Execute

func (f *CosFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CosFunction) Validate

func (f *CosFunction) Validate(args []interface{}) error

type CoshFunction

type CoshFunction struct {
	*BaseFunction
}

CoshFunction is the hyperbolic cosine function.

func NewCoshFunction

func NewCoshFunction() *CoshFunction

func (*CoshFunction) Execute

func (f *CoshFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CoshFunction) Validate

func (f *CoshFunction) Validate(args []interface{}) error

type CountFunction

type CountFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CountFunction is the count function.

func NewCountFunction

func NewCountFunction() *CountFunction

func (*CountFunction) Add

func (f *CountFunction) Add(value interface{})

func (*CountFunction) Clone

func (f *CountFunction) Clone() AggregatorFunction

func (*CountFunction) Execute

func (f *CountFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CountFunction) New

Implements the AggregatorFunction interface.

func (*CountFunction) Reset

func (f *CountFunction) Reset()

func (*CountFunction) Result

func (f *CountFunction) Result() interface{}

func (*CountFunction) Validate

func (f *CountFunction) Validate(args []interface{}) error

type CurrentDateFunction

type CurrentDateFunction struct {
	*BaseFunction
}

CurrentDateFunction returns the current date.

func NewCurrentDateFunction

func NewCurrentDateFunction() *CurrentDateFunction

func (*CurrentDateFunction) Execute

func (f *CurrentDateFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CurrentDateFunction) Validate

func (f *CurrentDateFunction) Validate(args []interface{}) error

type CurrentTimeFunction

type CurrentTimeFunction struct {
	*BaseFunction
}

CurrentTimeFunction returns the current time.

func NewCurrentTimeFunction

func NewCurrentTimeFunction() *CurrentTimeFunction

func (*CurrentTimeFunction) Execute

func (f *CurrentTimeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CurrentTimeFunction) Validate

func (f *CurrentTimeFunction) Validate(args []interface{}) error

type CustomFunction

type CustomFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CustomFunction is the custom function implementation.

func (*CustomFunction) Execute

func (f *CustomFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CustomFunction) Validate

func (f *CustomFunction) Validate(args []interface{}) error

type CustomGeometricMeanFunction

type CustomGeometricMeanFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CustomGeometricMeanFunction is an example of a custom geometric mean aggregate function.

func NewCustomGeometricMeanFunction

func NewCustomGeometricMeanFunction() *CustomGeometricMeanFunction

func (*CustomGeometricMeanFunction) Add

func (f *CustomGeometricMeanFunction) Add(value interface{})

func (*CustomGeometricMeanFunction) Clone

func (*CustomGeometricMeanFunction) Execute

func (f *CustomGeometricMeanFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CustomGeometricMeanFunction) New

Implements the AggregatorFunction interface.

func (*CustomGeometricMeanFunction) Reset

func (f *CustomGeometricMeanFunction) Reset()

func (*CustomGeometricMeanFunction) Result

func (f *CustomGeometricMeanFunction) Result() interface{}

func (*CustomGeometricMeanFunction) Validate

func (f *CustomGeometricMeanFunction) Validate(args []interface{}) error

type CustomMovingAverageFunction

type CustomMovingAverageFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CustomMovingAverageFunction is an example of a custom moving average analytical function.
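A moving average over the last windowSize values can be kept incrementally by storing only those values. The sketch below illustrates the idea; it is not the code from custom_example.go, `movingAvg` and `newMovingAvg` are hypothetical names, and clamping a window size below 1 to 1 is an assumption of this sketch.

```go
package main

import "fmt"

// movingAvg keeps at most size recent values.
type movingAvg struct {
	window []float64
	size   int
}

// newMovingAvg creates a moving average over the last size values.
func newMovingAvg(size int) *movingAvg {
	if size < 1 {
		size = 1
	}
	return &movingAvg{size: size}
}

// Add appends v and drops the oldest value once the window is full.
func (m *movingAvg) Add(v float64) {
	m.window = append(m.window, v)
	if len(m.window) > m.size {
		m.window = m.window[1:]
	}
}

// Result returns the mean of the values currently in the window.
func (m *movingAvg) Result() float64 {
	if len(m.window) == 0 {
		return 0
	}
	var sum float64
	for _, v := range m.window {
		sum += v
	}
	return sum / float64(len(m.window))
}

// Reset empties the window.
func (m *movingAvg) Reset() { m.window = m.window[:0] }

func main() {
	ma := newMovingAvg(3)
	for _, v := range []float64{1, 2, 3, 4} {
		ma.Add(v)
		fmt.Println(ma.Result())
	}
	// Prints 1, 1.5, 2 and 3 on separate lines.
}
```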

func NewCustomMovingAverageFunction

func NewCustomMovingAverageFunction(windowSize int) *CustomMovingAverageFunction

func (*CustomMovingAverageFunction) Add

func (f *CustomMovingAverageFunction) Add(value interface{})

func (*CustomMovingAverageFunction) Clone

func (*CustomMovingAverageFunction) Execute

func (f *CustomMovingAverageFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CustomMovingAverageFunction) New

Implements the AggregatorFunction interface (incremental computation support).

func (*CustomMovingAverageFunction) Reset

func (f *CustomMovingAverageFunction) Reset()

Implements the AnalyticalFunction interface.

func (*CustomMovingAverageFunction) Result

func (f *CustomMovingAverageFunction) Result() interface{}

func (*CustomMovingAverageFunction) Validate

func (f *CustomMovingAverageFunction) Validate(args []interface{}) error

type CustomProductFunction

type CustomProductFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CustomProductFunction is an example of a custom product aggregate function.

func NewCustomProductFunction

func NewCustomProductFunction() *CustomProductFunction

func (*CustomProductFunction) Add

func (f *CustomProductFunction) Add(value interface{})

func (*CustomProductFunction) Clone

func (*CustomProductFunction) Execute

func (f *CustomProductFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CustomProductFunction) New

Implements the AggregatorFunction interface.

func (*CustomProductFunction) Reset

func (f *CustomProductFunction) Reset()

func (*CustomProductFunction) Result

func (f *CustomProductFunction) Result() interface{}

func (*CustomProductFunction) Validate

func (f *CustomProductFunction) Validate(args []interface{}) error

type DateAddFunction

type DateAddFunction struct {
	*BaseFunction
}

DateAddFunction is the date addition function.

func NewDateAddFunction

func NewDateAddFunction() *DateAddFunction

func (*DateAddFunction) Execute

func (f *DateAddFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateAddFunction) Validate

func (f *DateAddFunction) Validate(args []interface{}) error

type DateDiffFunction

type DateDiffFunction struct {
	*BaseFunction
}

DateDiffFunction is the date difference function.

func NewDateDiffFunction

func NewDateDiffFunction() *DateDiffFunction

func (*DateDiffFunction) Execute

func (f *DateDiffFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateDiffFunction) Validate

func (f *DateDiffFunction) Validate(args []interface{}) error

type DateFormatFunction

type DateFormatFunction struct {
	*BaseFunction
}

DateFormatFunction is the date formatting function.

func NewDateFormatFunction

func NewDateFormatFunction() *DateFormatFunction

func (*DateFormatFunction) Execute

func (f *DateFormatFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateFormatFunction) Validate

func (f *DateFormatFunction) Validate(args []interface{}) error

type DateParseFunction

type DateParseFunction struct {
	*BaseFunction
}

DateParseFunction is the date parsing function.

func NewDateParseFunction

func NewDateParseFunction() *DateParseFunction

func (*DateParseFunction) Execute

func (f *DateParseFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateParseFunction) Validate

func (f *DateParseFunction) Validate(args []interface{}) error

type DateSubFunction

type DateSubFunction struct {
	*BaseFunction
}

DateSubFunction is the date subtraction function.

func NewDateSubFunction

func NewDateSubFunction() *DateSubFunction

func (*DateSubFunction) Execute

func (f *DateSubFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateSubFunction) Validate

func (f *DateSubFunction) Validate(args []interface{}) error

type DayFunction

type DayFunction struct {
	*BaseFunction
}

DayFunction extracts the day from a date.

func NewDayFunction

func NewDayFunction() *DayFunction

func (*DayFunction) Execute

func (f *DayFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DayFunction) Validate

func (f *DayFunction) Validate(args []interface{}) error

type DayOfWeekFunction

type DayOfWeekFunction struct {
	*BaseFunction
}

DayOfWeekFunction returns the day of the week.

func NewDayOfWeekFunction

func NewDayOfWeekFunction() *DayOfWeekFunction

func (*DayOfWeekFunction) Execute

func (f *DayOfWeekFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DayOfWeekFunction) Validate

func (f *DayOfWeekFunction) Validate(args []interface{}) error

type DayOfYearFunction

type DayOfYearFunction struct {
	*BaseFunction
}

DayOfYearFunction returns the day of the year.

func NewDayOfYearFunction

func NewDayOfYearFunction() *DayOfYearFunction

func (*DayOfYearFunction) Execute

func (f *DayOfYearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DayOfYearFunction) Validate

func (f *DayOfYearFunction) Validate(args []interface{}) error
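The two calendar lookups above map directly onto Go's standard `time` package. The sketch below shows one way they could be computed; the helpers `dayOfYear` and `dayOfWeek` are hypothetical, and the numbering (0 for Sunday, taken from `time.Weekday`) is an assumption that may differ from what the package's Execute methods return.

```go
package main

import (
	"fmt"
	"time"
)

// dayOfYear is a hypothetical helper, not part of the package API.
// It returns the 1-based day of the year for the given calendar date.
func dayOfYear(year, month, day int) int {
	return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC).YearDay()
}

// dayOfWeek is a hypothetical helper, not part of the package API.
// It follows Go's time.Weekday numbering: 0 is Sunday, 6 is Saturday.
func dayOfWeek(year, month, day int) int {
	return int(time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC).Weekday())
}

func main() {
	// 2024 is a leap year, so 1 March falls after 31 + 29 days.
	fmt.Println(dayOfYear(2024, 3, 1), dayOfWeek(2024, 3, 1))
}
```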

type Dec2HexFunction

type Dec2HexFunction struct {
	*BaseFunction
}

Dec2HexFunction converts decimal to hexadecimal.

func NewDec2HexFunction

func NewDec2HexFunction() *Dec2HexFunction

func (*Dec2HexFunction) Execute

func (f *Dec2HexFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Dec2HexFunction) Validate

func (f *Dec2HexFunction) Validate(args []interface{}) error
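The conversion itself is documented only by name. As a rough illustration, the standard `strconv` package covers both directions; the helper names `dec2hex` and `hex2dec`, and the lowercase unprefixed output, are assumptions and may not match the values Execute actually returns.

```go
package main

import (
	"fmt"
	"strconv"
)

// dec2hex is a hypothetical helper, not part of the package API.
// It renders a signed integer in base 16 with lowercase digits and no prefix.
func dec2hex(n int64) string {
	return strconv.FormatInt(n, 16)
}

// hex2dec is the hypothetical inverse: it parses an unprefixed base-16 string.
func hex2dec(s string) (int64, error) {
	return strconv.ParseInt(s, 16, 64)
}

func main() {
	fmt.Println(dec2hex(255))
	v, err := hex2dec("ff")
	fmt.Println(v, err)
}
```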

type DecodeFunction

type DecodeFunction struct {
	*BaseFunction
}

DecodeFunction decodes an encoded string back into the original data.

func NewDecodeFunction

func NewDecodeFunction() *DecodeFunction

func (*DecodeFunction) Execute

func (f *DecodeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DecodeFunction) Validate

func (f *DecodeFunction) Validate(args []interface{}) error

type DeduplicateAggregatorFunction

type DeduplicateAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

DeduplicateAggregatorFunction adds an AggregatorFunction interface implementation for DeduplicateFunction.

func NewDeduplicateAggregatorFunction

func NewDeduplicateAggregatorFunction() *DeduplicateAggregatorFunction

func (*DeduplicateAggregatorFunction) Add

func (f *DeduplicateAggregatorFunction) Add(value interface{})

func (*DeduplicateAggregatorFunction) Clone

func (*DeduplicateAggregatorFunction) Execute

func (f *DeduplicateAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DeduplicateAggregatorFunction) New

func (*DeduplicateAggregatorFunction) Reset

func (f *DeduplicateAggregatorFunction) Reset()

func (*DeduplicateAggregatorFunction) Result

func (f *DeduplicateAggregatorFunction) Result() interface{}

func (*DeduplicateAggregatorFunction) Validate

func (f *DeduplicateAggregatorFunction) Validate(args []interface{}) error

type DeduplicateFunction

type DeduplicateFunction struct {
	*BaseFunction
}

DeduplicateFunction is the deduplication function.

func NewDeduplicateFunction

func NewDeduplicateFunction() *DeduplicateFunction

func (*DeduplicateFunction) Execute

func (f *DeduplicateFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DeduplicateFunction) Validate

func (f *DeduplicateFunction) Validate(args []interface{}) error

type EncodeFunction

type EncodeFunction struct {
	*BaseFunction
}

EncodeFunction encodes the input value as a string in the specified format.

func NewEncodeFunction

func NewEncodeFunction() *EncodeFunction

func (*EncodeFunction) Execute

func (f *EncodeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*EncodeFunction) Validate

func (f *EncodeFunction) Validate(args []interface{}) error

type EndswithFunction

type EndswithFunction struct {
	*BaseFunction
}

EndswithFunction checks whether a string ends with the specified suffix.

func NewEndswithFunction

func NewEndswithFunction() *EndswithFunction

func (*EndswithFunction) Execute

func (f *EndswithFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*EndswithFunction) Validate

func (f *EndswithFunction) Validate(args []interface{}) error

type ExpFunction

type ExpFunction struct {
	*BaseFunction
}

ExpFunction is the exponential function.

func NewExpFunction

func NewExpFunction() *ExpFunction

func (*ExpFunction) Execute

func (f *ExpFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExpFunction) Validate

func (f *ExpFunction) Validate(args []interface{}) error

type ExprBridge

type ExprBridge struct {
	// contains filtered or unexported fields
}

ExprBridge bridges the StreamSQL function system and expr-lang/expr.

func GetExprBridge

func GetExprBridge() *ExprBridge

GetExprBridge returns the global bridge instance.

func NewExprBridge

func NewExprBridge() *ExprBridge

NewExprBridge creates a new expression bridge.

func (*ExprBridge) CompileExpressionWithStreamSQLFunctions

func (bridge *ExprBridge) CompileExpressionWithStreamSQLFunctions(expression string, dataType interface{}) (*vm.Program, error)

CompileExpressionWithStreamSQLFunctions compiles an expression with the StreamSQL functions included.

func (*ExprBridge) ContainsIsNullOperator

func (bridge *ExprBridge) ContainsIsNullOperator(expression string) bool

ContainsIsNullOperator reports whether the expression contains an IS NULL or IS NOT NULL operator.

func (*ExprBridge) ContainsLikeOperator

func (bridge *ExprBridge) ContainsLikeOperator(expression string) bool

ContainsLikeOperator reports whether the expression contains a LIKE operator.

func (*ExprBridge) CreateEnhancedExprEnvironment

func (bridge *ExprBridge) CreateEnhancedExprEnvironment(data map[string]interface{}) map[string]interface{}

CreateEnhancedExprEnvironment creates an enhanced expr execution environment.

func (*ExprBridge) EvaluateExpression

func (bridge *ExprBridge) EvaluateExpression(expression string, data map[string]interface{}) (interface{}, error)

EvaluateExpression evaluates an expression, automatically selecting the most suitable engine.

func (*ExprBridge) GetFunctionInfo

func (bridge *ExprBridge) GetFunctionInfo() map[string]interface{}

GetFunctionInfo returns function information, unifying the functions of the two systems.

func (*ExprBridge) IsExprLangFunction

func (bridge *ExprBridge) IsExprLangFunction(name string) bool

IsExprLangFunction reports whether the function name is an expr-lang built-in function.

func (*ExprBridge) PreprocessIsNullExpression

func (bridge *ExprBridge) PreprocessIsNullExpression(expression string) (string, error)

PreprocessIsNullExpression preprocesses IS NULL and IS NOT NULL expressions, converting them into expressions that expr-lang can understand.

func (*ExprBridge) PreprocessLikeExpression

func (bridge *ExprBridge) PreprocessLikeExpression(expression string) (string, error)

PreprocessLikeExpression preprocesses LIKE expressions, converting them into function calls that expr-lang can understand.
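The rewritten form that this method produces is not shown on this page. To illustrate the semantics any such rewrite has to preserve, here is a sketch of standard SQL LIKE matching, where `%` matches any run of characters and `_` matches exactly one. The helpers `likeToRegexp` and `likeMatch` are hypothetical, and translating to a regular expression is an assumed technique, not necessarily what the bridge does.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// likeToRegexp is a hypothetical helper, not part of the package API.
// It translates a SQL LIKE pattern into an anchored Go regular expression.
func likeToRegexp(pattern string) (*regexp.Regexp, error) {
	var b strings.Builder
	// (?s) lets the wildcards match newlines as well.
	b.WriteString("(?s)^")
	for _, r := range pattern {
		switch r {
		case '%':
			b.WriteString(".*") // any run of characters, including none
		case '_':
			b.WriteString(".") // exactly one character
		default:
			// Every other character is matched literally.
			b.WriteString(regexp.QuoteMeta(string(r)))
		}
	}
	b.WriteString("$")
	return regexp.Compile(b.String())
}

// likeMatch reports whether s matches the LIKE pattern.
func likeMatch(pattern, s string) (bool, error) {
	re, err := likeToRegexp(pattern)
	if err != nil {
		return false, err
	}
	return re.MatchString(s), nil
}

func main() {
	ok, err := likeMatch("sensor_%", "sensorA-01")
	fmt.Println(ok, err)
}
```

This sketch ignores ESCAPE clauses and case-insensitive collations, which a full implementation would need to consider.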

func (*ExprBridge) RegisterStreamSQLFunctionsToExpr

func (bridge *ExprBridge) RegisterStreamSQLFunctionsToExpr() []expr.Option

RegisterStreamSQLFunctionsToExpr registers the StreamSQL functions in the expr environment.

func (*ExprBridge) ResolveFunction

func (bridge *ExprBridge) ResolveFunction(name string) (interface{}, bool, string)

ResolveFunction resolves a function call, preferring StreamSQL functions.

type ExprFunction

type ExprFunction struct {
	*BaseFunction
}

ExprFunction is the expr function, used to evaluate expressions in SQL.

func NewExprFunction

func NewExprFunction() *ExprFunction

func (*ExprFunction) Execute

func (f *ExprFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExprFunction) Validate

func (f *ExprFunction) Validate(args []interface{}) error

type ExpressionAggregatorFunction

type ExpressionAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

ExpressionAggregatorFunction is the expression aggregator function, used to handle non-aggregate functions that appear in aggregate queries.

func NewExpressionAggregatorFunction

func NewExpressionAggregatorFunction() *ExpressionAggregatorFunction

func (*ExpressionAggregatorFunction) Add

func (f *ExpressionAggregatorFunction) Add(value interface{})

func (*ExpressionAggregatorFunction) Clone

func (*ExpressionAggregatorFunction) Execute

func (f *ExpressionAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExpressionAggregatorFunction) New

New implements the AggregatorFunction interface.

func (*ExpressionAggregatorFunction) Reset

func (f *ExpressionAggregatorFunction) Reset()

func (*ExpressionAggregatorFunction) Result

func (f *ExpressionAggregatorFunction) Result() interface{}

func (*ExpressionAggregatorFunction) Validate

func (f *ExpressionAggregatorFunction) Validate(args []interface{}) error

type ExpressionFunction

type ExpressionFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

ExpressionFunction is the expression function, used to handle custom expressions.

func NewExpressionFunction

func NewExpressionFunction() *ExpressionFunction

func (*ExpressionFunction) Add

func (f *ExpressionFunction) Add(value interface{})

func (*ExpressionFunction) Clone

func (*ExpressionFunction) Execute

func (f *ExpressionFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExpressionFunction) New

New implements the AggregatorFunction interface.

func (*ExpressionFunction) Reset

func (f *ExpressionFunction) Reset()

func (*ExpressionFunction) Result

func (f *ExpressionFunction) Result() interface{}

func (*ExpressionFunction) Validate

func (f *ExpressionFunction) Validate(args []interface{}) error

type ExtractFunction

type ExtractFunction struct {
	*BaseFunction
}

ExtractFunction extracts a part of a date.

func NewExtractFunction

func NewExtractFunction() *ExtractFunction

func (*ExtractFunction) Execute

func (f *ExtractFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExtractFunction) Validate

func (f *ExtractFunction) Validate(args []interface{}) error

type FirstValueFunction

type FirstValueFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

FirstValueFunction returns the first value in the window.

func NewFirstValueFunction

func NewFirstValueFunction() *FirstValueFunction

func (*FirstValueFunction) Add

func (f *FirstValueFunction) Add(value interface{})

func (*FirstValueFunction) Clone

func (*FirstValueFunction) Execute

func (f *FirstValueFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FirstValueFunction) New

New implements the AggregatorFunction interface.

func (*FirstValueFunction) Reset

func (f *FirstValueFunction) Reset()

func (*FirstValueFunction) Result

func (f *FirstValueFunction) Result() interface{}

func (*FirstValueFunction) Validate

func (f *FirstValueFunction) Validate(args []interface{}) error

type FloorFunction

type FloorFunction struct {
	*BaseFunction
}

FloorFunction rounds a value down to the nearest integer.

func NewFloorFunction

func NewFloorFunction() *FloorFunction

func (*FloorFunction) Execute

func (f *FloorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FloorFunction) Validate

func (f *FloorFunction) Validate(args []interface{}) error

type FormatFunction

type FormatFunction struct {
	*BaseFunction
}

FormatFunction is the formatting function.

func NewFormatFunction

func NewFormatFunction() *FormatFunction

func (*FormatFunction) Execute

func (f *FormatFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FormatFunction) Validate

func (f *FormatFunction) Validate(args []interface{}) error

type FromJsonFunction

type FromJsonFunction struct {
	*BaseFunction
}

FromJsonFunction parses a value from a JSON string.

func NewFromJsonFunction

func NewFromJsonFunction() *FromJsonFunction

func (*FromJsonFunction) Execute

func (f *FromJsonFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FromJsonFunction) Validate

func (f *FromJsonFunction) Validate(args []interface{}) error

type FromUnixtimeFunction

type FromUnixtimeFunction struct {
	*BaseFunction
}

FromUnixtimeFunction converts from a Unix timestamp.

func NewFromUnixtimeFunction

func NewFromUnixtimeFunction() *FromUnixtimeFunction

func (*FromUnixtimeFunction) Execute

func (f *FromUnixtimeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FromUnixtimeFunction) Validate

func (f *FromUnixtimeFunction) Validate(args []interface{}) error
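The page does not state the timestamp unit or the output format. As a sketch only, the following helper assumes seconds since the epoch, UTC, and an RFC 3339 string; `fromUnixtime` is a hypothetical name and the real function may differ on all three points.

```go
package main

import (
	"fmt"
	"time"
)

// fromUnixtime is a hypothetical helper, not part of the package API.
// Assumptions: sec is in seconds, the result is rendered in UTC as RFC 3339.
func fromUnixtime(sec int64) string {
	return time.Unix(sec, 0).UTC().Format(time.RFC3339)
}

func main() {
	fmt.Println(fromUnixtime(0))
	fmt.Println(fromUnixtime(86400))
}
```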

type Function

type Function interface {
	// GetName returns the function name
	GetName() string
	// GetType returns the function type
	GetType() FunctionType
	// GetCategory returns the function category
	GetCategory() string
	// Validate validates the arguments
	Validate(args []interface{}) error
	// Execute executes the function
	Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)
	// GetDescription returns the function description
	GetDescription() string
}

Function defines the function interface.
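A type satisfies this interface by providing all six methods. The sketch below mirrors the documented declarations locally so that it compiles on its own; `DoubleFunction`, its category string, and its error messages are hypothetical, and `WindowInfo` is left as an empty placeholder because its fields are not shown in this section. In real code, embedding `*BaseFunction` would presumably supply the getters.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirrors of the documented declarations (assumed to match the package).
type FunctionType string

const TypeCustom FunctionType = "custom"

type WindowInfo struct{} // placeholder; real fields are not shown here

type FunctionContext struct {
	Data       map[string]interface{}
	WindowInfo *WindowInfo
	Extra      map[string]interface{}
}

type Function interface {
	GetName() string
	GetType() FunctionType
	GetCategory() string
	Validate(args []interface{}) error
	Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)
	GetDescription() string
}

// DoubleFunction is a hypothetical custom function that doubles a float64.
type DoubleFunction struct{}

func (DoubleFunction) GetName() string        { return "double" }
func (DoubleFunction) GetType() FunctionType  { return TypeCustom }
func (DoubleFunction) GetCategory() string    { return "example" }
func (DoubleFunction) GetDescription() string { return "doubles a numeric argument" }

// Validate checks the argument count only; type checks happen in Execute.
func (DoubleFunction) Validate(args []interface{}) error {
	if len(args) != 1 {
		return errors.New("double expects exactly one argument")
	}
	return nil
}

// Execute validates first, then doubles the single float64 argument.
func (f DoubleFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
	if err := f.Validate(args); err != nil {
		return nil, err
	}
	v, ok := args[0].(float64)
	if !ok {
		return nil, errors.New("double expects a float64 argument")
	}
	return v * 2, nil
}

// Compile-time check that DoubleFunction satisfies Function.
var _ Function = DoubleFunction{}

func main() {
	f := DoubleFunction{}
	out, err := f.Execute(&FunctionContext{}, []interface{}{21.0})
	fmt.Println(out, err)
}
```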

func Get

func Get(name string) (Function, bool)

func GetByType

func GetByType(fnType FunctionType) []Function

type FunctionAggregatorWrapper

type FunctionAggregatorWrapper struct {
	// contains filtered or unexported fields
}

FunctionAggregatorWrapper wraps an aggregator from the functions module to make it compatible with the original interface.

func (*FunctionAggregatorWrapper) Add

func (w *FunctionAggregatorWrapper) Add(value interface{})

func (*FunctionAggregatorWrapper) GetContextKey

func (w *FunctionAggregatorWrapper) GetContextKey() string

GetContextKey implements the ContextAggregator interface, supporting the context mechanism of window functions.

func (*FunctionAggregatorWrapper) New

func (*FunctionAggregatorWrapper) Result

func (w *FunctionAggregatorWrapper) Result() interface{}

type FunctionContext

type FunctionContext struct {
	// Current data row
	Data map[string]interface{}
	// Window information (if applicable)
	WindowInfo *WindowInfo
	// Other context information
	Extra map[string]interface{}
}

FunctionContext is the function execution context.

type FunctionRegistry

type FunctionRegistry struct {
	// contains filtered or unexported fields
}

FunctionRegistry is the function registry.

func NewFunctionRegistry

func NewFunctionRegistry() *FunctionRegistry

NewFunctionRegistry creates a new function registry.

func (*FunctionRegistry) Get

func (r *FunctionRegistry) Get(name string) (Function, bool)

Get retrieves a function.

func (*FunctionRegistry) GetByType

func (r *FunctionRegistry) GetByType(fnType FunctionType) []Function

GetByType returns the list of functions of the given type.

func (*FunctionRegistry) ListAll

func (r *FunctionRegistry) ListAll() map[string]Function

ListAll lists all registered functions.

func (*FunctionRegistry) Register

func (r *FunctionRegistry) Register(fn Function) error

Register registers a function.

func (*FunctionRegistry) Unregister

func (r *FunctionRegistry) Unregister(name string) bool

Unregister unregisters a function.
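The registry's fields are unexported, so its internals are not visible here. The sketch below shows one plausible shape for a registry with the same Register, Get, and Unregister signatures: a mutex-guarded map keyed by function name. Everything in it (`registry`, `namedFunction`, `stub`, and the choice to reject duplicate names) is an assumption for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// namedFunction is a reduced, hypothetical stand-in for the Function interface.
type namedFunction interface {
	GetName() string
}

// registry is a hypothetical, concurrency-safe name-to-function map.
type registry struct {
	mu  sync.RWMutex
	fns map[string]namedFunction
}

func newRegistry() *registry {
	return &registry{fns: make(map[string]namedFunction)}
}

// Register stores fn under its name; rejecting duplicates is an assumption.
func (r *registry) Register(fn namedFunction) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	name := fn.GetName()
	if _, exists := r.fns[name]; exists {
		return fmt.Errorf("function %q is already registered", name)
	}
	r.fns[name] = fn
	return nil
}

// Get looks a function up by name.
func (r *registry) Get(name string) (namedFunction, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	fn, ok := r.fns[name]
	return fn, ok
}

// Unregister removes a function and reports whether it was present.
func (r *registry) Unregister(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.fns[name]
	delete(r.fns, name)
	return ok
}

// stub is a trivial namedFunction used only for the demonstration.
type stub string

func (s stub) GetName() string { return string(s) }

func main() {
	r := newRegistry()
	fmt.Println(r.Register(stub("custom_sum")))
	_, ok := r.Get("custom_sum")
	fmt.Println(ok, r.Unregister("custom_sum"))
}
```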

type FunctionType

type FunctionType string

FunctionType is the function type enumeration.

const (
	// Aggregation functions
	TypeAggregation FunctionType = "aggregation"
	// Window functions
	TypeWindow FunctionType = "window"
	// Date and time functions
	TypeDateTime FunctionType = "datetime"
	// Conversion functions
	TypeConversion FunctionType = "conversion"
	// Math functions
	TypeMath FunctionType = "math"
	// String functions
	TypeString FunctionType = "string"
	// Analytical functions
	TypeAnalytical FunctionType = "analytical"
	// User-defined functions
	TypeCustom FunctionType = "custom"
)

type GreatestFunction

type GreatestFunction struct {
	*BaseFunction
}

GreatestFunction returns the greatest value.

func NewGreatestFunction

func NewGreatestFunction() *GreatestFunction

func (*GreatestFunction) Execute

func (f *GreatestFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*GreatestFunction) Validate

func (f *GreatestFunction) Validate(args []interface{}) error

type HadChangedFunction

type HadChangedFunction struct {
	*BaseFunction
	PreviousValue interface{}
	IsSet         bool // indicates whether PreviousValue has been set
}

HadChangedFunction is the had-changed function; it determines whether the value of the specified column has changed.

func NewHadChangedFunction

func NewHadChangedFunction() *HadChangedFunction

func (*HadChangedFunction) Add

func (f *HadChangedFunction) Add(value interface{})

func (*HadChangedFunction) Clone

func (*HadChangedFunction) Execute

func (f *HadChangedFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*HadChangedFunction) New

New implements the AggregatorFunction interface, supporting incremental computation.

func (*HadChangedFunction) Reset

func (f *HadChangedFunction) Reset()

func (*HadChangedFunction) Result

func (f *HadChangedFunction) Result() interface{}

func (*HadChangedFunction) Validate

func (f *HadChangedFunction) Validate(args []interface{}) error
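The exported PreviousValue and IsSet fields suggest a simple change detector: keep the last value seen, plus a flag so that a legitimate nil value can be told apart from "nothing seen yet". The sketch below illustrates that idea with a hypothetical `changeDetector` type. Treating the very first observation as a change, and comparing with `reflect.DeepEqual`, are assumptions rather than documented behavior.

```go
package main

import (
	"fmt"
	"reflect"
)

// changeDetector is a hypothetical type that mirrors the two exported fields.
type changeDetector struct {
	PreviousValue interface{}
	IsSet         bool
}

// Observe records v and reports whether it differs from the previous value.
// Assumption: the first observation counts as a change.
func (d *changeDetector) Observe(v interface{}) bool {
	// DeepEqual avoids the panic that == would raise for maps or slices.
	changed := !d.IsSet || !reflect.DeepEqual(d.PreviousValue, v)
	d.PreviousValue = v
	d.IsSet = true
	return changed
}

func main() {
	d := &changeDetector{}
	for _, v := range []interface{}{nil, nil, 20.5, 20.5, 21.0} {
		fmt.Println(v, d.Observe(v))
	}
}
```

Without IsSet, an initial nil reading would be indistinguishable from the zero value of PreviousValue, and the detector could not report it as a first observation.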

type Hex2DecFunction

type Hex2DecFunction struct {
	*BaseFunction
}

Hex2DecFunction converts hexadecimal to decimal.

func NewHex2DecFunction

func NewHex2DecFunction() *Hex2DecFunction

func (*Hex2DecFunction) Execute

func (f *Hex2DecFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Hex2DecFunction) Validate

func (f *Hex2DecFunction) Validate(args []interface{}) error

type HourFunction

type HourFunction struct {
	*BaseFunction
}

HourFunction extracts the hour.

func NewHourFunction

func NewHourFunction() *HourFunction

func (*HourFunction) Execute

func (f *HourFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*HourFunction) Validate

func (f *HourFunction) Validate(args []interface{}) error

type IfNullFunction

type IfNullFunction struct {
	*BaseFunction
}

IfNullFunction returns the second argument if the first argument is NULL.

func NewIfNullFunction

func NewIfNullFunction() *IfNullFunction

func (*IfNullFunction) Execute

func (f *IfNullFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IfNullFunction) Validate

func (f *IfNullFunction) Validate(args []interface{}) error

type IndexofFunction

type IndexofFunction struct {
	*BaseFunction
}

IndexofFunction returns the position of a substring within a string.

func NewIndexofFunction

func NewIndexofFunction() *IndexofFunction

func (*IndexofFunction) Execute

func (f *IndexofFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IndexofFunction) Validate

func (f *IndexofFunction) Validate(args []interface{}) error

type IsArrayFunction

type IsArrayFunction struct {
	*BaseFunction
}

IsArrayFunction checks whether a value is of array type.

func NewIsArrayFunction

func NewIsArrayFunction() *IsArrayFunction

func (*IsArrayFunction) Execute

func (f *IsArrayFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsArrayFunction) Validate

func (f *IsArrayFunction) Validate(args []interface{}) error

type IsBoolFunction

type IsBoolFunction struct {
	*BaseFunction
}

IsBoolFunction checks whether a value is of boolean type.

func NewIsBoolFunction

func NewIsBoolFunction() *IsBoolFunction

func (*IsBoolFunction) Execute

func (f *IsBoolFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsBoolFunction) Validate

func (f *IsBoolFunction) Validate(args []interface{}) error

type IsNotNullFunction

type IsNotNullFunction struct {
	*BaseFunction
}

IsNotNullFunction checks whether a value is not NULL.

func NewIsNotNullFunction

func NewIsNotNullFunction() *IsNotNullFunction

func (*IsNotNullFunction) Execute

func (f *IsNotNullFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsNotNullFunction) Validate

func (f *IsNotNullFunction) Validate(args []interface{}) error

type IsNullFunction

type IsNullFunction struct {
	*BaseFunction
}

IsNullFunction checks whether a value is NULL.

func NewIsNullFunction

func NewIsNullFunction() *IsNullFunction

func (*IsNullFunction) Execute

func (f *IsNullFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsNullFunction) Validate

func (f *IsNullFunction) Validate(args []interface{}) error

type IsNumericFunction

type IsNumericFunction struct {
	*BaseFunction
}

IsNumericFunction checks whether a value is of numeric type.

func NewIsNumericFunction

func NewIsNumericFunction() *IsNumericFunction

func (*IsNumericFunction) Execute

func (f *IsNumericFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsNumericFunction) Validate

func (f *IsNumericFunction) Validate(args []interface{}) error

type IsObjectFunction

type IsObjectFunction struct {
	*BaseFunction
}

IsObjectFunction checks whether a value is of object type.

func NewIsObjectFunction

func NewIsObjectFunction() *IsObjectFunction

func (*IsObjectFunction) Execute

func (f *IsObjectFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsObjectFunction) Validate

func (f *IsObjectFunction) Validate(args []interface{}) error

type IsStringFunction

type IsStringFunction struct {
	*BaseFunction
}

IsStringFunction checks whether a value is of string type.

func NewIsStringFunction

func NewIsStringFunction() *IsStringFunction

func (*IsStringFunction) Execute

func (f *IsStringFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsStringFunction) Validate

func (f *IsStringFunction) Validate(args []interface{}) error

type JsonExtractFunction

type JsonExtractFunction struct {
	*BaseFunction
}

JsonExtractFunction extracts the value of a JSON field.

func NewJsonExtractFunction

func NewJsonExtractFunction() *JsonExtractFunction

func (*JsonExtractFunction) Execute

func (f *JsonExtractFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*JsonExtractFunction) Validate

func (f *JsonExtractFunction) Validate(args []interface{}) error

type JsonLengthFunction

type JsonLengthFunction struct {
	*BaseFunction
}

JsonLengthFunction returns the length of a JSON array or object.

func NewJsonLengthFunction

func NewJsonLengthFunction() *JsonLengthFunction

func (*JsonLengthFunction) Execute

func (f *JsonLengthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*JsonLengthFunction) Validate

func (f *JsonLengthFunction) Validate(args []interface{}) error

type JsonTypeFunction

type JsonTypeFunction struct {
	*BaseFunction
}

JsonTypeFunction returns the type of a JSON value.

func NewJsonTypeFunction

func NewJsonTypeFunction() *JsonTypeFunction

func (*JsonTypeFunction) Execute

func (f *JsonTypeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*JsonTypeFunction) Validate

func (f *JsonTypeFunction) Validate(args []interface{}) error

type JsonValidFunction

type JsonValidFunction struct {
	*BaseFunction
}

JsonValidFunction checks whether a value is valid JSON.

func NewJsonValidFunction

func NewJsonValidFunction() *JsonValidFunction

func (*JsonValidFunction) Execute

func (f *JsonValidFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*JsonValidFunction) Validate

func (f *JsonValidFunction) Validate(args []interface{}) error

type LagFunction

type LagFunction struct {
	*BaseFunction
	PreviousValues []interface{}
	DefaultValue   interface{}
	Offset         int
}

LagFunction is the LAG function; it returns the value of the Nth row before the current row.

func NewLagFunction

func NewLagFunction() *LagFunction

func (*LagFunction) Add

func (f *LagFunction) Add(value interface{})

func (*LagFunction) Clone

func (f *LagFunction) Clone() AggregatorFunction

func (*LagFunction) Execute

func (f *LagFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LagFunction) New

New implements the AggregatorFunction interface, supporting incremental computation.

func (*LagFunction) Reset

func (f *LagFunction) Reset()

func (*LagFunction) Result

func (f *LagFunction) Result() interface{}

func (*LagFunction) Validate

func (f *LagFunction) Validate(args []interface{}) error
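The exported fields PreviousValues, DefaultValue, and Offset hint at how a LAG can be computed over a stream: buffer the last Offset values and fall back to the default until enough rows have arrived. The sketch below shows that technique with a hypothetical `lag` type and `Next` method. It is not the package's Add/Result implementation, whose exact behavior is not documented on this page.

```go
package main

import "fmt"

// lag is a hypothetical type that reuses the documented field names.
type lag struct {
	PreviousValues []interface{}
	DefaultValue   interface{}
	Offset         int
}

// newLag builds a lag buffer; offsets below 1 are clamped to 1 (an assumption).
func newLag(offset int, def interface{}) *lag {
	if offset < 1 {
		offset = 1
	}
	return &lag{Offset: offset, DefaultValue: def}
}

// Next records v and returns the value seen Offset rows earlier,
// or DefaultValue while fewer than Offset rows have been seen.
func (l *lag) Next(v interface{}) interface{} {
	out := l.DefaultValue
	if len(l.PreviousValues) >= l.Offset {
		out = l.PreviousValues[len(l.PreviousValues)-l.Offset]
	}
	l.PreviousValues = append(l.PreviousValues, v)
	// Keep only the last Offset values; older ones can never be returned.
	if len(l.PreviousValues) > l.Offset {
		l.PreviousValues = l.PreviousValues[1:]
	}
	return out
}

func main() {
	l := newLag(2, "n/a")
	for _, v := range []int{10, 20, 30, 40} {
		fmt.Println(v, l.Next(v))
	}
}
```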

type LastValueAggregatorFunction

type LastValueAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

LastValueAggregatorFunction adds an AggregatorFunction interface implementation for LastValueFunction.

func NewLastValueAggregatorFunction

func NewLastValueAggregatorFunction() *LastValueAggregatorFunction

func (*LastValueAggregatorFunction) Add

func (f *LastValueAggregatorFunction) Add(value interface{})

func (*LastValueAggregatorFunction) Clone

func (*LastValueAggregatorFunction) Execute

func (f *LastValueAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LastValueAggregatorFunction) New

func (*LastValueAggregatorFunction) Reset

func (f *LastValueAggregatorFunction) Reset()

func (*LastValueAggregatorFunction) Result

func (f *LastValueAggregatorFunction) Result() interface{}

func (*LastValueAggregatorFunction) Validate

func (f *LastValueAggregatorFunction) Validate(args []interface{}) error

type LastValueFunction

type LastValueFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

LastValueFunction is the last-value function; it returns the value of the last row in the group.

func NewLastValueFunction

func NewLastValueFunction() *LastValueFunction

func (*LastValueFunction) Add

func (f *LastValueFunction) Add(value interface{})

func (*LastValueFunction) Clone

func (*LastValueFunction) Execute

func (f *LastValueFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LastValueFunction) New

New implements the AggregatorFunction interface.

func (*LastValueFunction) Reset

func (f *LastValueFunction) Reset()

func (*LastValueFunction) Result

func (f *LastValueFunction) Result() interface{}

func (*LastValueFunction) Validate

func (f *LastValueFunction) Validate(args []interface{}) error

type LatestFunction

type LatestFunction struct {
	*BaseFunction
	LatestValue interface{}
}

LatestFunction is the latest-value function; it returns the latest value of the specified column.

func NewLatestFunction

func NewLatestFunction() *LatestFunction

func (*LatestFunction) Add

func (f *LatestFunction) Add(value interface{})

func (*LatestFunction) Clone

func (*LatestFunction) Execute

func (f *LatestFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LatestFunction) New

New implements the AggregatorFunction interface, supporting incremental computation.

func (*LatestFunction) Reset

func (f *LatestFunction) Reset()

func (*LatestFunction) Result

func (f *LatestFunction) Result() interface{}

func (*LatestFunction) Validate

func (f *LatestFunction) Validate(args []interface{}) error

type LeadFunction

type LeadFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

LeadFunction returns the value of the Nth row after the current row.

func NewLeadFunction

func NewLeadFunction() *LeadFunction

func (*LeadFunction) Add

func (f *LeadFunction) Add(value interface{})

func (*LeadFunction) Clone

func (f *LeadFunction) Clone() AggregatorFunction

func (*LeadFunction) Execute

func (f *LeadFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LeadFunction) New

New implements the AggregatorFunction interface.

func (*LeadFunction) Reset

func (f *LeadFunction) Reset()

func (*LeadFunction) Result

func (f *LeadFunction) Result() interface{}

func (*LeadFunction) Validate

func (f *LeadFunction) Validate(args []interface{}) error

type LeastFunction

type LeastFunction struct {
	*BaseFunction
}

LeastFunction returns the least value.

func NewLeastFunction

func NewLeastFunction() *LeastFunction

func (*LeastFunction) Execute

func (f *LeastFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LeastFunction) Validate

func (f *LeastFunction) Validate(args []interface{}) error

type LegacyAggregatorFunction

type LegacyAggregatorFunction interface {
	New() LegacyAggregatorFunction
	Add(value interface{})
	Result() interface{}
}

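LegacyAggregatorFunction is an aggregator function interface compatible with the original aggregator interface. It matches the original interface and exists for backward compatibility.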

func CreateLegacyAggregator

func CreateLegacyAggregator(aggType AggregateType) LegacyAggregatorFunction

CreateLegacyAggregator creates a legacy aggregator, preferring the functions module.
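The legacy interface has only three methods, so an implementation is small. The sketch below repeats the interface locally so that it compiles on its own and implements it with a hypothetical `sumAggregator`; silently skipping values that are neither `float64` nor `int` is an assumption made to keep the example short.

```go
package main

import "fmt"

// Local mirror of the documented interface.
type LegacyAggregatorFunction interface {
	New() LegacyAggregatorFunction
	Add(value interface{})
	Result() interface{}
}

// sumAggregator is a hypothetical running-sum aggregator.
type sumAggregator struct {
	total float64
}

// New returns a fresh accumulator so each group or window starts from zero.
func (s *sumAggregator) New() LegacyAggregatorFunction {
	return &sumAggregator{}
}

// Add folds one value into the running total; other types are ignored.
func (s *sumAggregator) Add(value interface{}) {
	switch v := value.(type) {
	case float64:
		s.total += v
	case int:
		s.total += float64(v)
	}
}

// Result returns the current total as a float64.
func (s *sumAggregator) Result() interface{} {
	return s.total
}

func main() {
	proto := &sumAggregator{}
	a := proto.New()
	a.Add(1.5)
	a.Add(2)
	fmt.Println(a.Result())
}
```

New acts as a factory on a prototype instance, which lets a caller hold one registered value per function name and still give every group its own state.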

type LengthFunction

type LengthFunction struct {
	*BaseFunction
}

LengthFunction returns the length of a string.

func NewLengthFunction

func NewLengthFunction() *LengthFunction

func (*LengthFunction) Execute

func (f *LengthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LengthFunction) Validate

func (f *LengthFunction) Validate(args []interface{}) error

type LnFunction

type LnFunction struct {
	*BaseFunction
}

LnFunction is the natural logarithm function.

func NewLnFunction

func NewLnFunction() *LnFunction

func (*LnFunction) Execute

func (f *LnFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LnFunction) Validate

func (f *LnFunction) Validate(args []interface{}) error

type Log10Function

type Log10Function struct {
	*BaseFunction
}

Log10Function is the base-10 logarithm function.

func NewLog10Function

func NewLog10Function() *Log10Function

func (*Log10Function) Execute

func (f *Log10Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Log10Function) Validate

func (f *Log10Function) Validate(args []interface{}) error

type Log2Function

type Log2Function struct {
	*BaseFunction
}

Log2Function is the base-2 logarithm function.

func NewLog2Function

func NewLog2Function() *Log2Function

func (*Log2Function) Execute

func (f *Log2Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Log2Function) Validate

func (f *Log2Function) Validate(args []interface{}) error

type LogFunction

type LogFunction struct {
	*BaseFunction
}

LogFunction is the natural logarithm function (the log alias).

func NewLogFunction

func NewLogFunction() *LogFunction

func (*LogFunction) Execute

func (f *LogFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LogFunction) Validate

func (f *LogFunction) Validate(args []interface{}) error

type LowerFunction

type LowerFunction struct {
	*BaseFunction
}

LowerFunction converts a string to lowercase.

func NewLowerFunction

func NewLowerFunction() *LowerFunction

func (*LowerFunction) Execute

func (f *LowerFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LowerFunction) Validate

func (f *LowerFunction) Validate(args []interface{}) error

type LpadFunction

type LpadFunction struct {
	*BaseFunction
}

LpadFunction left-pads a string.

func NewLpadFunction

func NewLpadFunction() *LpadFunction

func (*LpadFunction) Execute

func (f *LpadFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LpadFunction) Validate

func (f *LpadFunction) Validate(args []interface{}) error

type LtrimFunction

type LtrimFunction struct {
	*BaseFunction
}

LtrimFunction removes leading whitespace.

func NewLtrimFunction

func NewLtrimFunction() *LtrimFunction

func (*LtrimFunction) Execute

func (f *LtrimFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LtrimFunction) Validate

func (f *LtrimFunction) Validate(args []interface{}) error

type MaxFunction

type MaxFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MaxFunction computes the maximum value.

func NewMaxFunction

func NewMaxFunction() *MaxFunction

func (*MaxFunction) Add

func (f *MaxFunction) Add(value interface{})

func (*MaxFunction) Clone

func (f *MaxFunction) Clone() AggregatorFunction

func (*MaxFunction) Execute

func (f *MaxFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MaxFunction) New

New implements the AggregatorFunction interface.

func (*MaxFunction) Reset

func (f *MaxFunction) Reset()

func (*MaxFunction) Result

func (f *MaxFunction) Result() interface{}

func (*MaxFunction) Validate

func (f *MaxFunction) Validate(args []interface{}) error

type Md5Function

type Md5Function struct {
	*BaseFunction
}

Md5Function computes the MD5 hash.

func NewMd5Function

func NewMd5Function() *Md5Function

func (*Md5Function) Execute

func (f *Md5Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Md5Function) Validate

func (f *Md5Function) Validate(args []interface{}) error

type MedianAggregatorFunction

type MedianAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MedianAggregatorFunction adds an AggregatorFunction interface implementation for MedianFunction.

func NewMedianAggregatorFunction

func NewMedianAggregatorFunction() *MedianAggregatorFunction

func (*MedianAggregatorFunction) Add

func (f *MedianAggregatorFunction) Add(value interface{})

func (*MedianAggregatorFunction) Clone

func (*MedianAggregatorFunction) Execute

func (f *MedianAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MedianAggregatorFunction) New

func (*MedianAggregatorFunction) Reset

func (f *MedianAggregatorFunction) Reset()

func (*MedianAggregatorFunction) Result

func (f *MedianAggregatorFunction) Result() interface{}

func (*MedianAggregatorFunction) Validate

func (f *MedianAggregatorFunction) Validate(args []interface{}) error
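Unlike a sum or a maximum, a median cannot be updated from a single running value, so an incremental median aggregator typically has to retain its inputs. The sketch below shows that approach with a hypothetical `medianAggregator`: Add collects values and Result sorts a copy on demand. Accepting only `float64` and returning nil for an empty set are assumptions, and the package's unexported state may be organized differently.

```go
package main

import (
	"fmt"
	"sort"
)

// medianAggregator is a hypothetical aggregator that buffers its inputs.
type medianAggregator struct {
	values []float64
}

// Add stores one observation; non-float64 values are ignored in this sketch.
func (m *medianAggregator) Add(value interface{}) {
	if v, ok := value.(float64); ok {
		m.values = append(m.values, v)
	}
}

// Result returns the median of the values seen so far, or nil if there are none.
func (m *medianAggregator) Result() interface{} {
	n := len(m.values)
	if n == 0 {
		return nil
	}
	// Sort a copy so that arrival order is preserved in the buffer.
	sorted := append([]float64(nil), m.values...)
	sort.Float64s(sorted)
	if n%2 == 1 {
		return sorted[n/2]
	}
	return (sorted[n/2-1] + sorted[n/2]) / 2
}

// Reset discards all buffered values.
func (m *medianAggregator) Reset() {
	m.values = m.values[:0]
}

func main() {
	m := &medianAggregator{}
	for _, v := range []float64{3, 1, 2, 4} {
		m.Add(v)
	}
	fmt.Println(m.Result())
}
```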

type MedianFunction

type MedianFunction struct {
	*BaseFunction
}

MedianFunction is the median function.

func NewMedianFunction

func NewMedianFunction() *MedianFunction

func (*MedianFunction) Execute

func (f *MedianFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MedianFunction) Validate

func (f *MedianFunction) Validate(args []interface{}) error

type MergeAggAggregatorFunction

type MergeAggAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MergeAggAggregatorFunction adds an AggregatorFunction interface implementation for MergeAggFunction.

func NewMergeAggAggregatorFunction

func NewMergeAggAggregatorFunction() *MergeAggAggregatorFunction

func (*MergeAggAggregatorFunction) Add

func (f *MergeAggAggregatorFunction) Add(value interface{})

func (*MergeAggAggregatorFunction) Clone

func (*MergeAggAggregatorFunction) Execute

func (f *MergeAggAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MergeAggAggregatorFunction) New

func (*MergeAggAggregatorFunction) Reset

func (f *MergeAggAggregatorFunction) Reset()

func (*MergeAggAggregatorFunction) Result

func (f *MergeAggAggregatorFunction) Result() interface{}

func (*MergeAggAggregatorFunction) Validate

func (f *MergeAggAggregatorFunction) Validate(args []interface{}) error

type MergeAggFunction

type MergeAggFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MergeAggFunction is the merge aggregation function; it merges the values in a group into a single value.

func NewMergeAggFunction

func NewMergeAggFunction() *MergeAggFunction

func (*MergeAggFunction) Add

func (f *MergeAggFunction) Add(value interface{})

func (*MergeAggFunction) Clone

func (*MergeAggFunction) Execute

func (f *MergeAggFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MergeAggFunction) New

Implements the AggregatorFunction interface.

func (*MergeAggFunction) Reset

func (f *MergeAggFunction) Reset()

func (*MergeAggFunction) Result

func (f *MergeAggFunction) Result() interface{}

func (*MergeAggFunction) Validate

func (f *MergeAggFunction) Validate(args []interface{}) error

type MinFunction

type MinFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MinFunction computes the minimum value.

func NewMinFunction

func NewMinFunction() *MinFunction

func (*MinFunction) Add

func (f *MinFunction) Add(value interface{})

func (*MinFunction) Clone

func (f *MinFunction) Clone() AggregatorFunction

func (*MinFunction) Execute

func (f *MinFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MinFunction) New

Implements the AggregatorFunction interface.

func (*MinFunction) Reset

func (f *MinFunction) Reset()

func (*MinFunction) Result

func (f *MinFunction) Result() interface{}

func (*MinFunction) Validate

func (f *MinFunction) Validate(args []interface{}) error

type MinuteFunction

type MinuteFunction struct {
	*BaseFunction
}

MinuteFunction extracts the minute.

func NewMinuteFunction

func NewMinuteFunction() *MinuteFunction

func (*MinuteFunction) Execute

func (f *MinuteFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MinuteFunction) Validate

func (f *MinuteFunction) Validate(args []interface{}) error

type ModFunction

type ModFunction struct {
	*BaseFunction
}

ModFunction computes the modulo.

func NewModFunction

func NewModFunction() *ModFunction

func (*ModFunction) Execute

func (f *ModFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ModFunction) Validate

func (f *ModFunction) Validate(args []interface{}) error

type MonthFunction

type MonthFunction struct {
	*BaseFunction
}

MonthFunction extracts the month.

func NewMonthFunction

func NewMonthFunction() *MonthFunction

func (*MonthFunction) Execute

func (f *MonthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MonthFunction) Validate

func (f *MonthFunction) Validate(args []interface{}) error

type NowFunction

type NowFunction struct {
	*BaseFunction
}

NowFunction returns the current time.

func NewNowFunction

func NewNowFunction() *NowFunction

func (*NowFunction) Execute

func (f *NowFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*NowFunction) Validate

func (f *NowFunction) Validate(args []interface{}) error

type NthValueFunction

type NthValueFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

NthValueFunction returns the Nth value in the window.

func NewNthValueFunction

func NewNthValueFunction() *NthValueFunction

func (*NthValueFunction) Add

func (f *NthValueFunction) Add(value interface{})

func (*NthValueFunction) Clone

func (*NthValueFunction) Execute

func (f *NthValueFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*NthValueFunction) New

Implements the AggregatorFunction interface.

func (*NthValueFunction) Reset

func (f *NthValueFunction) Reset()

func (*NthValueFunction) Result

func (f *NthValueFunction) Result() interface{}

func (*NthValueFunction) Validate

func (f *NthValueFunction) Validate(args []interface{}) error

type NullIfFunction

type NullIfFunction struct {
	*BaseFunction
}

NullIfFunction returns NULL if the two values are equal.

func NewNullIfFunction

func NewNullIfFunction() *NullIfFunction

func (*NullIfFunction) Execute

func (f *NullIfFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*NullIfFunction) Validate

func (f *NullIfFunction) Validate(args []interface{}) error
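
The NULLIF behaviour described above can be sketched in a few lines. This is an illustrative sketch only: the helper `nullIf` is hypothetical, and the use of `reflect.DeepEqual` for equality is an assumption, since the page does not say how the package compares values.

```go
package main

import (
	"fmt"
	"reflect"
)

// nullIf returns nil when a and b are equal, and a otherwise.
// Equality is reflect.DeepEqual here, which is an assumption of this sketch:
// it treats values of different dynamic types (for example int 1 and
// float64 1.0) as unequal.
func nullIf(a, b interface{}) interface{} {
	if reflect.DeepEqual(a, b) {
		return nil
	}
	return a
}

func main() {
	fmt.Println(nullIf(0, 0))
	fmt.Println(nullIf(7, 0))
}
```

A common use is guarding a division: mapping a zero divisor to NULL so the quotient becomes NULL instead of an error.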

type OptimizedStdDevFunction

type OptimizedStdDevFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

OptimizedStdDevFunction is an optimized standard deviation function that uses Welford's algorithm to achieve O(1) space complexity.

func NewOptimizedStdDevFunction

func NewOptimizedStdDevFunction() *OptimizedStdDevFunction

func (*OptimizedStdDevFunction) Add

func (f *OptimizedStdDevFunction) Add(value interface{})

func (*OptimizedStdDevFunction) Clone

func (*OptimizedStdDevFunction) Execute

func (f *OptimizedStdDevFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*OptimizedStdDevFunction) New

Implements the AggregatorFunction interface using Welford's algorithm.

func (*OptimizedStdDevFunction) Reset

func (f *OptimizedStdDevFunction) Reset()

func (*OptimizedStdDevFunction) Result

func (f *OptimizedStdDevFunction) Result() interface{}

func (*OptimizedStdDevFunction) Validate

func (f *OptimizedStdDevFunction) Validate(args []interface{}) error

type OptimizedStdDevSFunction

type OptimizedStdDevSFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

OptimizedStdDevSFunction is an optimized sample standard deviation function.

func NewOptimizedStdDevSFunction

func NewOptimizedStdDevSFunction() *OptimizedStdDevSFunction

func (*OptimizedStdDevSFunction) Add

func (f *OptimizedStdDevSFunction) Add(value interface{})

func (*OptimizedStdDevSFunction) Clone

func (*OptimizedStdDevSFunction) Execute

func (f *OptimizedStdDevSFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*OptimizedStdDevSFunction) New

Implements the AggregatorFunction interface using Welford's algorithm.

func (*OptimizedStdDevSFunction) Reset

func (f *OptimizedStdDevSFunction) Reset()

func (*OptimizedStdDevSFunction) Result

func (f *OptimizedStdDevSFunction) Result() interface{}

func (*OptimizedStdDevSFunction) Validate

func (f *OptimizedStdDevSFunction) Validate(args []interface{}) error

type OptimizedVarFunction

type OptimizedVarFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

OptimizedVarFunction is an optimized variance function that uses Welford's algorithm to achieve O(1) space complexity.

func NewOptimizedVarFunction

func NewOptimizedVarFunction() *OptimizedVarFunction

func (*OptimizedVarFunction) Add

func (f *OptimizedVarFunction) Add(value interface{})

func (*OptimizedVarFunction) Clone

func (*OptimizedVarFunction) Execute

func (f *OptimizedVarFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*OptimizedVarFunction) New

Implements the AggregatorFunction interface using Welford's algorithm.

func (*OptimizedVarFunction) Reset

func (f *OptimizedVarFunction) Reset()

func (*OptimizedVarFunction) Result

func (f *OptimizedVarFunction) Result() interface{}

func (*OptimizedVarFunction) Validate

func (f *OptimizedVarFunction) Validate(args []interface{}) error

type OptimizedVarSFunction

type OptimizedVarSFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

OptimizedVarSFunction is an optimized sample variance function.

func NewOptimizedVarSFunction

func NewOptimizedVarSFunction() *OptimizedVarSFunction

func (*OptimizedVarSFunction) Add

func (f *OptimizedVarSFunction) Add(value interface{})

func (*OptimizedVarSFunction) Clone

func (*OptimizedVarSFunction) Execute

func (f *OptimizedVarSFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*OptimizedVarSFunction) New

Implements the AggregatorFunction interface using Welford's algorithm.

func (*OptimizedVarSFunction) Reset

func (f *OptimizedVarSFunction) Reset()

func (*OptimizedVarSFunction) Result

func (f *OptimizedVarSFunction) Result() interface{}

func (*OptimizedVarSFunction) Validate

func (f *OptimizedVarSFunction) Validate(args []interface{}) error

type PercentileAggregatorFunction

type PercentileAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

PercentileAggregatorFunction implements the AggregatorFunction interface for PercentileFunction.

func NewPercentileAggregatorFunction

func NewPercentileAggregatorFunction() *PercentileAggregatorFunction

func (*PercentileAggregatorFunction) Add

func (f *PercentileAggregatorFunction) Add(value interface{})

func (*PercentileAggregatorFunction) Clone

func (*PercentileAggregatorFunction) Execute

func (f *PercentileAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*PercentileAggregatorFunction) New

func (*PercentileAggregatorFunction) Reset

func (f *PercentileAggregatorFunction) Reset()

func (*PercentileAggregatorFunction) Result

func (f *PercentileAggregatorFunction) Result() interface{}

func (*PercentileAggregatorFunction) Validate

func (f *PercentileAggregatorFunction) Validate(args []interface{}) error

type PercentileFunction

type PercentileFunction struct {
	*BaseFunction
}

PercentileFunction computes a percentile.

func NewPercentileFunction

func NewPercentileFunction() *PercentileFunction

func (*PercentileFunction) Execute

func (f *PercentileFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*PercentileFunction) Validate

func (f *PercentileFunction) Validate(args []interface{}) error
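
Percentile definitions vary between libraries, and this page does not say which one the package uses. The sketch below shows one common choice, linear interpolation between the two closest ranks, with the percentile given as a fraction in [0, 1]. The helper `percentile` and both of those conventions are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"sort"
)

// percentile returns the p-th percentile of vals (0 <= p <= 1) using linear
// interpolation between the closest ranks. Hypothetical helper; the package's
// own interpolation rule and argument scale may differ.
func percentile(vals []float64, p float64) (float64, error) {
	if len(vals) == 0 {
		return 0, errors.New("percentile: no values")
	}
	if p < 0 || p > 1 {
		return 0, errors.New("percentile: p must be in [0, 1]")
	}
	sorted := append([]float64(nil), vals...)
	sort.Float64s(sorted)
	// Fractional position of the requested percentile in the sorted data.
	rank := p * float64(len(sorted)-1)
	lo := int(math.Floor(rank))
	hi := int(math.Ceil(rank))
	frac := rank - float64(lo)
	return sorted[lo] + frac*(sorted[hi]-sorted[lo]), nil
}

func main() {
	v, err := percentile([]float64{50, 10, 40, 20, 30}, 0.5)
	fmt.Println(v, err)
}
```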

type PowerFunction

type PowerFunction struct {
	*BaseFunction
}

PowerFunction is the power (exponentiation) function.

func NewPowerFunction

func NewPowerFunction() *PowerFunction

func (*PowerFunction) Execute

func (f *PowerFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*PowerFunction) Validate

func (f *PowerFunction) Validate(args []interface{}) error

type RandFunction

type RandFunction struct {
	*BaseFunction
}

RandFunction generates a random number.

func NewRandFunction

func NewRandFunction() *RandFunction

func (*RandFunction) Execute

func (f *RandFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RandFunction) Validate

func (f *RandFunction) Validate(args []interface{}) error

type RegexpMatchesFunction

type RegexpMatchesFunction struct {
	*BaseFunction
}

RegexpMatchesFunction performs regular expression matching.

func NewRegexpMatchesFunction

func NewRegexpMatchesFunction() *RegexpMatchesFunction

func (*RegexpMatchesFunction) Execute

func (f *RegexpMatchesFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RegexpMatchesFunction) Validate

func (f *RegexpMatchesFunction) Validate(args []interface{}) error

type RegexpReplaceFunction

type RegexpReplaceFunction struct {
	*BaseFunction
}

RegexpReplaceFunction performs regular expression replacement.

func NewRegexpReplaceFunction

func NewRegexpReplaceFunction() *RegexpReplaceFunction

func (*RegexpReplaceFunction) Execute

func (f *RegexpReplaceFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RegexpReplaceFunction) Validate

func (f *RegexpReplaceFunction) Validate(args []interface{}) error

type RegexpSubstringFunction

type RegexpSubstringFunction struct {
	*BaseFunction
}

RegexpSubstringFunction extracts a substring using a regular expression.

func NewRegexpSubstringFunction

func NewRegexpSubstringFunction() *RegexpSubstringFunction

func (*RegexpSubstringFunction) Execute

func (f *RegexpSubstringFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RegexpSubstringFunction) Validate

func (f *RegexpSubstringFunction) Validate(args []interface{}) error
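
The three regular expression operations listed above (match, replace, extract) map naturally onto Go's standard `regexp` package. The sketch below shows that mapping with hypothetical helper names and an assumed argument order; it does not claim to reproduce how the package's Execute methods interpret their arguments.

```go
package main

import (
	"fmt"
	"regexp"
)

// regexpMatches reports whether s contains a match for pattern.
func regexpMatches(s, pattern string) (bool, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return false, err
	}
	return re.MatchString(s), nil
}

// regexpReplace replaces every match of pattern in s with repl.
func regexpReplace(s, pattern, repl string) (string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return "", err
	}
	return re.ReplaceAllString(s, repl), nil
}

// regexpSubstring returns the leftmost match of pattern in s, or "" if none.
func regexpSubstring(s, pattern string) (string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return "", err
	}
	return re.FindString(s), nil
}

func main() {
	ok, _ := regexpMatches("order-42", `[0-9]+`)
	out, _ := regexpReplace("a1b22", `[0-9]+`, "#")
	sub, _ := regexpSubstring("order-42-7", `[0-9]+`)
	fmt.Println(ok, out, sub)
}
```

Compiling the pattern on every call is simple but wasteful for streaming workloads; caching compiled patterns is a common refinement.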

type ReplaceFunction

type ReplaceFunction struct {
	*BaseFunction
}

ReplaceFunction replaces content within a string.

func NewReplaceFunction

func NewReplaceFunction() *ReplaceFunction

func (*ReplaceFunction) Execute

func (f *ReplaceFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ReplaceFunction) Validate

func (f *ReplaceFunction) Validate(args []interface{}) error

type RoundFunction

type RoundFunction struct {
	*BaseFunction
}

RoundFunction is the rounding function.

func NewRoundFunction

func NewRoundFunction() *RoundFunction

func (*RoundFunction) Execute

func (f *RoundFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RoundFunction) Validate

func (f *RoundFunction) Validate(args []interface{}) error

type RowNumberFunction

type RowNumberFunction struct {
	*BaseFunction
	CurrentRowNumber int64
}

RowNumberFunction is the row number function.

func NewRowNumberFunction

func NewRowNumberFunction() *RowNumberFunction

func (*RowNumberFunction) Execute

func (f *RowNumberFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RowNumberFunction) Reset

func (f *RowNumberFunction) Reset()

func (*RowNumberFunction) Validate

func (f *RowNumberFunction) Validate(args []interface{}) error
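
RowNumberFunction exposes a `CurrentRowNumber` field and a `Reset` method, which suggests a stateful counter. The sketch below illustrates that pattern with a hypothetical `rowCounter` type. That numbering starts at 1 and advances once per call is an assumption, not something this page states.

```go
package main

import "fmt"

// rowCounter is a hypothetical stand-in for a stateful row-number function.
type rowCounter struct {
	CurrentRowNumber int64
}

// Next advances the counter and returns the new, 1-based row number.
func (r *rowCounter) Next() int64 {
	r.CurrentRowNumber++
	return r.CurrentRowNumber
}

// Reset returns the counter to its initial state, for example at a window
// or partition boundary.
func (r *rowCounter) Reset() {
	r.CurrentRowNumber = 0
}

func main() {
	var rc rowCounter
	fmt.Println(rc.Next(), rc.Next())
	rc.Reset()
	fmt.Println(rc.Next())
}
```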

type RpadFunction

type RpadFunction struct {
	*BaseFunction
}

RpadFunction right-pads a string.

func NewRpadFunction

func NewRpadFunction() *RpadFunction

func (*RpadFunction) Execute

func (f *RpadFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RpadFunction) Validate

func (f *RpadFunction) Validate(args []interface{}) error

type RtrimFunction

type RtrimFunction struct {
	*BaseFunction
}

RtrimFunction removes trailing whitespace.

func NewRtrimFunction

func NewRtrimFunction() *RtrimFunction

func (*RtrimFunction) Execute

func (f *RtrimFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RtrimFunction) Validate

func (f *RtrimFunction) Validate(args []interface{}) error

type SecondFunction

type SecondFunction struct {
	*BaseFunction
}

SecondFunction extracts the seconds.

func NewSecondFunction

func NewSecondFunction() *SecondFunction

func (*SecondFunction) Execute

func (f *SecondFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SecondFunction) Validate

func (f *SecondFunction) Validate(args []interface{}) error

type Sha1Function

type Sha1Function struct {
	*BaseFunction
}

Sha1Function computes the SHA1 hash.

func NewSha1Function

func NewSha1Function() *Sha1Function

func (*Sha1Function) Execute

func (f *Sha1Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Sha1Function) Validate

func (f *Sha1Function) Validate(args []interface{}) error

type Sha256Function

type Sha256Function struct {
	*BaseFunction
}

Sha256Function computes the SHA256 hash.

func NewSha256Function

func NewSha256Function() *Sha256Function

func (*Sha256Function) Execute

func (f *Sha256Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Sha256Function) Validate

func (f *Sha256Function) Validate(args []interface{}) error

type Sha512Function

type Sha512Function struct {
	*BaseFunction
}

Sha512Function computes the SHA512 hash.

func NewSha512Function

func NewSha512Function() *Sha512Function

func (*Sha512Function) Execute

func (f *Sha512Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Sha512Function) Validate

func (f *Sha512Function) Validate(args []interface{}) error
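
The three hash functions above correspond to digests available in Go's standard library. The sketch below computes each one for a string and renders it as lowercase hexadecimal. The helper names and the hex output format are assumptions; this page does not specify how the package encodes its results.

```go
package main

import (
	"crypto/sha1"
	"crypto/sha256"
	"crypto/sha512"
	"encoding/hex"
	"fmt"
)

// sha1Hex returns the SHA-1 digest of s as a 40-character hex string.
func sha1Hex(s string) string {
	sum := sha1.Sum([]byte(s))
	return hex.EncodeToString(sum[:])
}

// sha256Hex returns the SHA-256 digest of s as a 64-character hex string.
func sha256Hex(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}

// sha512Hex returns the SHA-512 digest of s as a 128-character hex string.
func sha512Hex(s string) string {
	sum := sha512.Sum512([]byte(s))
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(sha1Hex("abc"))
	fmt.Println(sha256Hex("abc"))
	fmt.Println(len(sha512Hex("abc")))
}
```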

type SignFunction

type SignFunction struct {
	*BaseFunction
}

SignFunction is the sign function.

func NewSignFunction

func NewSignFunction() *SignFunction

func (*SignFunction) Execute

func (f *SignFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SignFunction) Validate

func (f *SignFunction) Validate(args []interface{}) error

type SinFunction

type SinFunction struct {
	*BaseFunction
}

SinFunction is the sine function.

func NewSinFunction

func NewSinFunction() *SinFunction

func (*SinFunction) Execute

func (f *SinFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SinFunction) Validate

func (f *SinFunction) Validate(args []interface{}) error

type SinhFunction

type SinhFunction struct {
	*BaseFunction
}

SinhFunction is the hyperbolic sine function.

func NewSinhFunction

func NewSinhFunction() *SinhFunction

func (*SinhFunction) Execute

func (f *SinhFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SinhFunction) Validate

func (f *SinhFunction) Validate(args []interface{}) error

type SplitFunction

type SplitFunction struct {
	*BaseFunction
}

SplitFunction splits a string by a delimiter.

func NewSplitFunction

func NewSplitFunction() *SplitFunction

func (*SplitFunction) Execute

func (f *SplitFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SplitFunction) Validate

func (f *SplitFunction) Validate(args []interface{}) error

type SqrtFunction

type SqrtFunction struct {
	*BaseFunction
}

SqrtFunction is the square root function.

func NewSqrtFunction

func NewSqrtFunction() *SqrtFunction

func (*SqrtFunction) Execute

func (f *SqrtFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SqrtFunction) Validate

func (f *SqrtFunction) Validate(args []interface{}) error

type StartswithFunction

type StartswithFunction struct {
	*BaseFunction
}

StartswithFunction checks whether a string starts with the specified prefix.

func NewStartswithFunction

func NewStartswithFunction() *StartswithFunction

func (*StartswithFunction) Execute

func (f *StartswithFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StartswithFunction) Validate

func (f *StartswithFunction) Validate(args []interface{}) error

type StdDevAggregatorFunction

type StdDevAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

StdDevAggregatorFunction implements the AggregatorFunction interface for StdDevFunction.

func NewStdDevAggregatorFunction

func NewStdDevAggregatorFunction() *StdDevAggregatorFunction

func (*StdDevAggregatorFunction) Add

func (f *StdDevAggregatorFunction) Add(value interface{})

func (*StdDevAggregatorFunction) Clone

func (*StdDevAggregatorFunction) Execute

func (f *StdDevAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StdDevAggregatorFunction) New

func (*StdDevAggregatorFunction) Reset

func (f *StdDevAggregatorFunction) Reset()

func (*StdDevAggregatorFunction) Result

func (f *StdDevAggregatorFunction) Result() interface{}

func (*StdDevAggregatorFunction) Validate

func (f *StdDevAggregatorFunction) Validate(args []interface{}) error

type StdDevFunction

type StdDevFunction struct {
	*BaseFunction
}

StdDevFunction is the standard deviation function.

func NewStdDevFunction

func NewStdDevFunction() *StdDevFunction

func (*StdDevFunction) Execute

func (f *StdDevFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StdDevFunction) Validate

func (f *StdDevFunction) Validate(args []interface{}) error

type StdDevSAggregatorFunction

type StdDevSAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

StdDevSAggregatorFunction implements the AggregatorFunction interface for StdDevSFunction.

func NewStdDevSAggregatorFunction

func NewStdDevSAggregatorFunction() *StdDevSAggregatorFunction

func (*StdDevSAggregatorFunction) Add

func (f *StdDevSAggregatorFunction) Add(value interface{})

func (*StdDevSAggregatorFunction) Clone

func (*StdDevSAggregatorFunction) Execute

func (f *StdDevSAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StdDevSAggregatorFunction) New

func (*StdDevSAggregatorFunction) Reset

func (f *StdDevSAggregatorFunction) Reset()

func (*StdDevSAggregatorFunction) Result

func (f *StdDevSAggregatorFunction) Result() interface{}

func (*StdDevSAggregatorFunction) Validate

func (f *StdDevSAggregatorFunction) Validate(args []interface{}) error

type StdDevSFunction

type StdDevSFunction struct {
	*BaseFunction
}

StdDevSFunction is the sample standard deviation function.

func NewStdDevSFunction

func NewStdDevSFunction() *StdDevSFunction

func (*StdDevSFunction) Execute

func (f *StdDevSFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StdDevSFunction) Validate

func (f *StdDevSFunction) Validate(args []interface{}) error

type SubstringFunction

type SubstringFunction struct {
	*BaseFunction
}

SubstringFunction extracts a substring.

func NewSubstringFunction

func NewSubstringFunction() *SubstringFunction

func (*SubstringFunction) Execute

func (f *SubstringFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SubstringFunction) Validate

func (f *SubstringFunction) Validate(args []interface{}) error

type SumFunction

type SumFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

SumFunction is the sum function.

func NewSumFunction

func NewSumFunction() *SumFunction

func (*SumFunction) Add

func (f *SumFunction) Add(value interface{})

func (*SumFunction) Clone

func (f *SumFunction) Clone() AggregatorFunction

func (*SumFunction) Execute

func (f *SumFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SumFunction) New

Implements the AggregatorFunction interface.

func (*SumFunction) Reset

func (f *SumFunction) Reset()

func (*SumFunction) Result

func (f *SumFunction) Result() interface{}

func (*SumFunction) Validate

func (f *SumFunction) Validate(args []interface{}) error
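
Aggregate types such as SumFunction expose New, Add, Result, Reset, and Clone. The sketch below illustrates how that lifecycle typically fits together, using a locally defined `aggregator` interface modelled on the README's AggregatorFunction example. The interface, the `sumAgg` type, and the set of accepted value types are all assumptions for illustration; the real interface may include further methods.

```go
package main

import "fmt"

// aggregator is a local stand-in for the incremental aggregation contract.
type aggregator interface {
	New() aggregator
	Add(value interface{})
	Result() interface{}
	Reset()
	Clone() aggregator
}

// sumAgg accumulates a running sum of numeric values.
type sumAgg struct {
	sum float64
}

// New returns a fresh instance with empty state, typically one per group or window.
func (s *sumAgg) New() aggregator { return &sumAgg{} }

// Add folds one value into the sum; non-numeric values are ignored in this sketch.
func (s *sumAgg) Add(value interface{}) {
	switch n := value.(type) {
	case float64:
		s.sum += n
	case int:
		s.sum += float64(n)
	}
}

// Result returns the current sum.
func (s *sumAgg) Result() interface{} { return s.sum }

// Reset clears the accumulated state.
func (s *sumAgg) Reset() { s.sum = 0 }

// Clone copies the current state into an independent instance.
func (s *sumAgg) Clone() aggregator { return &sumAgg{sum: s.sum} }

func main() {
	proto := &sumAgg{}
	agg := proto.New()
	agg.Add(1)
	agg.Add(2.5)
	snapshot := agg.Clone()
	agg.Reset()
	fmt.Println(agg.Result(), snapshot.Result())
}
```

Keeping New and Clone separate lets a caller hold one registered prototype while each group or window works on its own independent state.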

type TanFunction

type TanFunction struct {
	*BaseFunction
}

TanFunction is the tangent function.

func NewTanFunction

func NewTanFunction() *TanFunction

func (*TanFunction) Execute

func (f *TanFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*TanFunction) Validate

func (f *TanFunction) Validate(args []interface{}) error

type TanhFunction

type TanhFunction struct {
	*BaseFunction
}

TanhFunction is the hyperbolic tangent function.

func NewTanhFunction

func NewTanhFunction() *TanhFunction

func (*TanhFunction) Execute

func (f *TanhFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*TanhFunction) Validate

func (f *TanhFunction) Validate(args []interface{}) error

type ToJsonFunction

type ToJsonFunction struct {
	*BaseFunction
}

ToJsonFunction converts a value to a JSON string.

func NewToJsonFunction

func NewToJsonFunction() *ToJsonFunction

func (*ToJsonFunction) Execute

func (f *ToJsonFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ToJsonFunction) Validate

func (f *ToJsonFunction) Validate(args []interface{}) error

type ToSecondsFunction

type ToSecondsFunction struct {
	*BaseFunction
}

ToSecondsFunction converts a value to a Unix timestamp in seconds.

func NewToSecondsFunction

func NewToSecondsFunction() *ToSecondsFunction

func (*ToSecondsFunction) Execute

func (f *ToSecondsFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ToSecondsFunction) Validate

func (f *ToSecondsFunction) Validate(args []interface{}) error

type TrimFunction

type TrimFunction struct {
	*BaseFunction
}

TrimFunction removes leading and trailing spaces.

func NewTrimFunction

func NewTrimFunction() *TrimFunction

func (*TrimFunction) Execute

func (f *TrimFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*TrimFunction) Validate

func (f *TrimFunction) Validate(args []interface{}) error

type TruncFunction

type TruncFunction struct {
	*BaseFunction
}

TruncFunction truncates a number to a given number of decimal places.

func NewTruncFunction

func NewTruncFunction() *TruncFunction

NewTruncFunction creates a new trunc function.

func (*TruncFunction) Execute

func (f *TruncFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

Execute executes the function.

func (*TruncFunction) Validate

func (f *TruncFunction) Validate(args []interface{}) error

Validate validates the arguments.

type UnixTimestampFunction

type UnixTimestampFunction struct {
	*BaseFunction
}

UnixTimestampFunction is the Unix timestamp function.

func NewUnixTimestampFunction

func NewUnixTimestampFunction() *UnixTimestampFunction

func (*UnixTimestampFunction) Execute

func (f *UnixTimestampFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UnixTimestampFunction) Validate

func (f *UnixTimestampFunction) Validate(args []interface{}) error

type UnnestFunction

type UnnestFunction struct {
	*BaseFunction
}

UnnestFunction expands an array into multiple rows.

func NewUnnestFunction

func NewUnnestFunction() *UnnestFunction

func (*UnnestFunction) Execute

func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UnnestFunction) Validate

func (f *UnnestFunction) Validate(args []interface{}) error

type UnnestResult

type UnnestResult struct {
	Rows []map[string]interface{}
}

UnnestResult represents the result of the unnest function.
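
To make the array-to-rows idea concrete, the sketch below expands a slice into a result shaped like UnnestResult, with one map per row. The local type `unnestResult`, the helper `unnest`, and the `"value"` key used for non-map elements are hypothetical; this page does not document how the package names the columns of expanded rows.

```go
package main

import "fmt"

// unnestResult mirrors the shape of UnnestResult: one map per output row.
type unnestResult struct {
	Rows []map[string]interface{}
}

// unnest turns each element of items into one row. Map elements become rows
// directly; any other element is wrapped under the hypothetical key "value".
func unnest(items []interface{}) unnestResult {
	rows := make([]map[string]interface{}, 0, len(items))
	for _, item := range items {
		if m, ok := item.(map[string]interface{}); ok {
			rows = append(rows, m)
			continue
		}
		rows = append(rows, map[string]interface{}{"value": item})
	}
	return unnestResult{Rows: rows}
}

func main() {
	res := unnest([]interface{}{1, "a", map[string]interface{}{"id": 7}})
	for _, row := range res.Rows {
		fmt.Println(row)
	}
}
```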

type UpperFunction

type UpperFunction struct {
	*BaseFunction
}

UpperFunction converts a string to uppercase.

func NewUpperFunction

func NewUpperFunction() *UpperFunction

func (*UpperFunction) Execute

func (f *UpperFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UpperFunction) Validate

func (f *UpperFunction) Validate(args []interface{}) error

type UrlDecodeFunction

type UrlDecodeFunction struct {
	*BaseFunction
}

UrlDecodeFunction is the URL decoding function.

func NewUrlDecodeFunction

func NewUrlDecodeFunction() *UrlDecodeFunction

func (*UrlDecodeFunction) Execute

func (f *UrlDecodeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UrlDecodeFunction) Validate

func (f *UrlDecodeFunction) Validate(args []interface{}) error

type UrlEncodeFunction

type UrlEncodeFunction struct {
	*BaseFunction
}

UrlEncodeFunction is the URL encoding function.

func NewUrlEncodeFunction

func NewUrlEncodeFunction() *UrlEncodeFunction

func (*UrlEncodeFunction) Execute

func (f *UrlEncodeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UrlEncodeFunction) Validate

func (f *UrlEncodeFunction) Validate(args []interface{}) error
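
URL encoding and decoding are available in Go's standard `net/url` package. The sketch below uses query-component escaping (`url.QueryEscape` and `url.QueryUnescape`). Whether the package uses query escaping or path escaping is not stated on this page, so that choice and the helper names are assumptions.

```go
package main

import (
	"fmt"
	"net/url"
)

// urlEncode escapes s so it can be placed safely in a URL query component.
func urlEncode(s string) string {
	return url.QueryEscape(s)
}

// urlDecode reverses urlEncode and reports malformed escape sequences.
func urlDecode(s string) (string, error) {
	return url.QueryUnescape(s)
}

func main() {
	enc := urlEncode("a b&c=d")
	dec, err := urlDecode(enc)
	fmt.Println(enc, dec, err)
}
```

Query escaping encodes a space as `+`, whereas path escaping (`url.PathEscape`) encodes it as `%20`, so the two are not interchangeable when the output is compared against other systems.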

type VarAggregatorFunction

type VarAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

VarAggregatorFunction implements the AggregatorFunction interface for VarFunction.

func NewVarAggregatorFunction

func NewVarAggregatorFunction() *VarAggregatorFunction

func (*VarAggregatorFunction) Add

func (f *VarAggregatorFunction) Add(value interface{})

func (*VarAggregatorFunction) Clone

func (*VarAggregatorFunction) Execute

func (f *VarAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*VarAggregatorFunction) New

func (*VarAggregatorFunction) Reset

func (f *VarAggregatorFunction) Reset()

func (*VarAggregatorFunction) Result

func (f *VarAggregatorFunction) Result() interface{}

func (*VarAggregatorFunction) Validate

func (f *VarAggregatorFunction) Validate(args []interface{}) error

type VarFunction

type VarFunction struct {
	*BaseFunction
}

VarFunction is the population variance function.

func NewVarFunction

func NewVarFunction() *VarFunction

func (*VarFunction) Execute

func (f *VarFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*VarFunction) Validate

func (f *VarFunction) Validate(args []interface{}) error

type VarSAggregatorFunction

type VarSAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

VarSAggregatorFunction implements the AggregatorFunction interface for VarSFunction.

func NewVarSAggregatorFunction

func NewVarSAggregatorFunction() *VarSAggregatorFunction

func (*VarSAggregatorFunction) Add

func (f *VarSAggregatorFunction) Add(value interface{})

func (*VarSAggregatorFunction) Clone

func (*VarSAggregatorFunction) Execute

func (f *VarSAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*VarSAggregatorFunction) New

func (*VarSAggregatorFunction) Reset

func (f *VarSAggregatorFunction) Reset()

func (*VarSAggregatorFunction) Result

func (f *VarSAggregatorFunction) Result() interface{}

func (*VarSAggregatorFunction) Validate

func (f *VarSAggregatorFunction) Validate(args []interface{}) error

type VarSFunction

type VarSFunction struct {
	*BaseFunction
}

VarSFunction is the sample variance function.

func NewVarSFunction

func NewVarSFunction() *VarSFunction

func (*VarSFunction) Execute

func (f *VarSFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*VarSFunction) Validate

func (f *VarSFunction) Validate(args []interface{}) error

type WeekOfYearFunction

type WeekOfYearFunction struct {
	*BaseFunction
}

WeekOfYearFunction returns the week of the year.

func NewWeekOfYearFunction

func NewWeekOfYearFunction() *WeekOfYearFunction

func (*WeekOfYearFunction) Execute

func (f *WeekOfYearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*WeekOfYearFunction) Validate

func (f *WeekOfYearFunction) Validate(args []interface{}) error

type WindowEndFunction

type WindowEndFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

WindowEndFunction is the window end time function.

func NewWindowEndFunction

func NewWindowEndFunction() *WindowEndFunction

func (*WindowEndFunction) Add

func (f *WindowEndFunction) Add(value interface{})

func (*WindowEndFunction) Clone

func (*WindowEndFunction) Execute

func (f *WindowEndFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*WindowEndFunction) New

Implements the AggregatorFunction interface.

func (*WindowEndFunction) Reset

func (f *WindowEndFunction) Reset()

func (*WindowEndFunction) Result

func (f *WindowEndFunction) Result() interface{}

func (*WindowEndFunction) Validate

func (f *WindowEndFunction) Validate(args []interface{}) error

type WindowInfo

type WindowInfo struct {
	WindowStart int64
	WindowEnd   int64
	RowCount    int
}

WindowInfo holds window information.

type WindowStartFunction

type WindowStartFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

WindowStartFunction is the window start time function.

func NewWindowStartFunction

func NewWindowStartFunction() *WindowStartFunction

func (*WindowStartFunction) Add

func (f *WindowStartFunction) Add(value interface{})

func (*WindowStartFunction) Clone

func (*WindowStartFunction) Execute

func (f *WindowStartFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*WindowStartFunction) New

Implements the AggregatorFunction interface.

func (*WindowStartFunction) Reset

func (f *WindowStartFunction) Reset()

func (*WindowStartFunction) Result

func (f *WindowStartFunction) Result() interface{}

func (*WindowStartFunction) Validate

func (f *WindowStartFunction) Validate(args []interface{}) error

type YearFunction

type YearFunction struct {
	*BaseFunction
}

YearFunction extracts the year.

func NewYearFunction

func NewYearFunction() *YearFunction

func (*YearFunction) Execute

func (f *YearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*YearFunction) Validate

func (f *YearFunction) Validate(args []interface{}) error
