Documentation
¶
Index ¶
- Variables
- func NewBatchSQLWithMock(ctx context.Context, config PipelineConfig) (*BatchSQL, *MockExecutor)
- func NewBatchSQLWithMockDriver(ctx context.Context, config PipelineConfig, sqlDriver SQLDriver) (*BatchSQL, *MockExecutor)
- type BatchExecutor
- type BatchProcessor
- type BatchSQL
- func NewBatchSQL(ctx context.Context, buffSize uint32, flushSize uint32, ...) *BatchSQL
- func NewMySQLBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL
- func NewMySQLBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL
- func NewPostgreSQLBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL
- func NewPostgreSQLBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL
- func NewRedisBatchSQL(ctx context.Context, db *redisV9.Client, config PipelineConfig) *BatchSQL
- func NewRedisBatchSQLWithDriver(ctx context.Context, db *redisV9.Client, config PipelineConfig, ...) *BatchSQL
- func NewSQLiteBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL
- func NewSQLiteBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL
- type ConcurrencyCapable
- type ConflictStrategy
- type MetricsCapable
- type MetricsConfig
- type MetricsReporter
- type MockDriver
- type MockExecutor
- type MySQLDriver
- type NoopMetricsReporter
- func (*NoopMetricsReporter) DecInflight()
- func (*NoopMetricsReporter) IncError(string, string)
- func (*NoopMetricsReporter) IncInflight()
- func (*NoopMetricsReporter) ObserveBatchAssemble(time.Duration)
- func (*NoopMetricsReporter) ObserveBatchSize(int)
- func (*NoopMetricsReporter) ObserveEnqueueLatency(time.Duration)
- func (*NoopMetricsReporter) ObserveExecuteDuration(string, int, time.Duration, string)
- func (*NoopMetricsReporter) SetConcurrency(int)
- func (*NoopMetricsReporter) SetQueueLength(int)
- type Operations
- type PipelineConfig
- type PostgreSQLDriver
- type RedisBatchProcessor
- type RedisCmd
- type RedisDriver
- type RedisPipelineDriver
- type Request
- func (r *Request) Columns() map[string]any
- func (r *Request) GetBool(colName string) (bool, error)
- func (r *Request) GetFloat64(colName string) (float64, error)
- func (r *Request) GetInt32(colName string) (int32, error)
- func (r *Request) GetInt64(colName string) (int64, error)
- func (r *Request) GetOrderedValues() []any
- func (r *Request) GetString(colName string) (string, error)
- func (r *Request) GetTime(colName string) (time.Time, error)
- func (r *Request) Schema() *Schema
- func (r *Request) Set(colName string, value any) *Request
- func (r *Request) SetBool(colName string, value bool) *Request
- func (r *Request) SetBytes(colName string, value []byte) *Request
- func (r *Request) SetFloat32(colName string, value float32) *Request
- func (r *Request) SetFloat64(colName string, value float64) *Request
- func (r *Request) SetInt32(colName string, value int32) *Request
- func (r *Request) SetInt64(colName string, value int64) *Request
- func (r *Request) SetNull(colName string) *Request
- func (r *Request) SetString(colName string, value string) *Request
- func (r *Request) SetTime(colName string, value time.Time) *Request
- func (r *Request) Validate() error
- type RetryConfig
- type SQLBatchProcessor
- type SQLDriver
- type SQLiteDriver
- type Schema
- type ThrottledBatchExecutor
- func NewRedisThrottledBatchExecutor(client *redis.Client) *ThrottledBatchExecutor
- func NewRedisThrottledBatchExecutorWithDriver(client *redis.Client, driver RedisDriver) *ThrottledBatchExecutor
- func NewSQLThrottledBatchExecutorWithDriver(db *sql.DB, driver SQLDriver) *ThrottledBatchExecutor
- func NewThrottledBatchExecutor(processor BatchProcessor) *ThrottledBatchExecutor
- func (e *ThrottledBatchExecutor) ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error
- func (e *ThrottledBatchExecutor) MetricsReporter() MetricsReporter
- func (e *ThrottledBatchExecutor) WithConcurrencyLimit(limit int) *ThrottledBatchExecutor
- func (e *ThrottledBatchExecutor) WithMetricsReporter(metricsReporter MetricsReporter) *ThrottledBatchExecutor
- func (e *ThrottledBatchExecutor) WithRetryConfig(cfg RetryConfig) *ThrottledBatchExecutor
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrEmptyRequest 空请求错误
	ErrEmptyRequest = errors.New("empty request")
	// ErrContextCanceled 上下文被取消错误
	ErrContextCanceled = errors.New("context canceled")
	// ErrInvalidSchema 无效的 schema 错误
	ErrInvalidSchema = errors.New("invalid schema")
	// ErrMissingColumn 缺少列错误
	ErrMissingColumn = errors.New("missing required column")
	// ErrInvalidColumnType 无效的列类型错误
	ErrInvalidColumnType = errors.New("invalid column type")
	// ErrEmptyBatch 空批次错误
	ErrEmptyBatch = errors.New("empty batch")
	// ErrEmptySchemaName 空表名错误
	ErrEmptySchemaName = errors.New("empty schema name")
)
var DefaultMySQLDriver = NewMySQLDriver()
var DefaultPostgreSQLDriver = NewPostgreSQLDriver()
var DefaultRedisPipelineDriver = NewRedisPipelineDriver()
var DefaultSQLiteDriver = NewSQLiteDriver()
Functions ¶
func NewBatchSQLWithMock ¶
func NewBatchSQLWithMock(ctx context.Context, config PipelineConfig) (*BatchSQL, *MockExecutor)
NewBatchSQLWithMock 使用模拟执行器创建 BatchSQL 实例(用于测试) 内部架构:BatchSQL -> MockExecutor(直接实现BatchExecutor,无真实数据库操作) 适用于单元测试,不依赖真实数据库连接
func NewBatchSQLWithMockDriver ¶
func NewBatchSQLWithMockDriver(ctx context.Context, config PipelineConfig, sqlDriver SQLDriver) (*BatchSQL, *MockExecutor)
NewBatchSQLWithMockDriver 使用模拟执行器创建 BatchSQL 实例(测试特定SQLDriver) 内部架构:BatchSQL -> MockExecutor(模拟CommonExecutor行为,测试SQLDriver逻辑) 适用于测试自定义SQLDriver的SQL生成逻辑
Types ¶
type BatchExecutor ¶
type BatchExecutor interface {
	// ExecuteBatch 执行批量操作
	ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error
}
BatchExecutor 批量执行器接口 - 所有数据库驱动的统一入口
type BatchProcessor ¶
type BatchProcessor interface {
	// GenerateOperations 生成批量操作
	GenerateOperations(ctx context.Context, schema *Schema, data []map[string]any) (operations Operations, err error)
	// ExecuteOperations 执行批量操作
	ExecuteOperations(ctx context.Context, operations Operations) error
}
BatchProcessor 批量处理器接口 - SQL数据库的核心处理逻辑
type BatchSQL ¶
type BatchSQL struct {
// contains filtered or unexported fields
}
BatchSQL 批量处理管道 核心组件,整合 go-pipeline 和 BatchExecutor,提供统一的批量处理接口
架构层次: Application -> BatchSQL -> gopipeline -> BatchExecutor -> Database
支持的 BatchExecutor 实现: - SQL 数据库:ThrottledBatchExecutor + SQLBatchProcessor + SQLDriver - NoSQL 数据库:ThrottledBatchExecutor + RedisBatchProcessor + RedisDriver(直接生成/执行命令) - 测试环境:MockExecutor(直接实现 BatchExecutor) 可选能力: - WithConcurrencyLimit:通过信号量限制 ExecuteBatch 并发,避免攒批后同时冲击数据库(limit <= 0 等价于不限流)
func NewBatchSQL ¶
func NewBatchSQL(ctx context.Context, buffSize uint32, flushSize uint32, flushInterval time.Duration, executor BatchExecutor) *BatchSQL
NewBatchSQL 创建 BatchSQL 实例 这是最底层的构造函数,接受任何实现了BatchExecutor接口的执行器 通常不直接使用,而是通过具体数据库的工厂方法创建
func NewMySQLBatchSQL ¶
func NewMySQLBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL
NewMySQLBatchSQL 创建MySQL BatchSQL实例(使用默认Driver)
内部架构:BatchSQL -> ThrottledBatchExecutor -> SQLBatchProcessor -> MySQLDriver -> MySQL
这是推荐的使用方式,使用MySQL优化的默认配置
func NewMySQLBatchSQLWithDriver ¶
func NewMySQLBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL
NewMySQLBatchSQLWithDriver 创建MySQL BatchSQL实例(使用自定义Driver)
内部架构:BatchSQL -> ThrottledBatchExecutor -> SQLBatchProcessor -> CustomDriver -> MySQL
适用于需要自定义SQL生成逻辑的场景(如TiDB优化)
func NewPostgreSQLBatchSQL ¶
func NewPostgreSQLBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL
NewPostgreSQLBatchSQL 创建PostgreSQL BatchSQL实例(使用默认Driver)
func NewPostgreSQLBatchSQLWithDriver ¶
func NewPostgreSQLBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL
NewPostgreSQLBatchSQLWithDriver 创建PostgreSQL BatchSQL实例(使用自定义Driver)
func NewRedisBatchSQL ¶
func NewRedisBatchSQL(ctx context.Context, db *redisV9.Client, config PipelineConfig) *BatchSQL
NewRedisBatchSQL 创建Redis BatchSQL实例
内部架构(NoSQL):BatchSQL -> ThrottledBatchExecutor -> RedisBatchProcessor -> RedisDriver -> Redis 说明:NoSQL 路径不使用 SQL 抽象层,直接生成并执行 Redis 命令;仍可启用 WithConcurrencyLimit 控制批次并发。
func NewRedisBatchSQLWithDriver ¶
func NewRedisBatchSQLWithDriver(ctx context.Context, db *redisV9.Client, config PipelineConfig, driver RedisDriver) *BatchSQL
func NewSQLiteBatchSQL ¶
func NewSQLiteBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL
NewSQLiteBatchSQL 创建SQLite BatchSQL实例(使用默认Driver)
func NewSQLiteBatchSQLWithDriver ¶
func NewSQLiteBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL
NewSQLiteBatchSQLWithDriver 创建SQLite BatchSQL实例(使用自定义Driver)
type ConcurrencyCapable ¶ added in v1.3.0
type ConflictStrategy ¶
type ConflictStrategy uint8
ConflictStrategy 冲突处理策略
const ( ConflictIgnore ConflictStrategy = iota ConflictReplace ConflictUpdate )
type MetricsCapable ¶ added in v1.3.0
type MetricsCapable[T any] interface {
	// WithMetricsReporter 设置性能监控报告器(返回实现者类型以支持链式)
	WithMetricsReporter(MetricsReporter) T
	// MetricsReporter 返回当前性能监控报告器(可能为 nil)
	MetricsReporter() MetricsReporter
}
Metrics 相关接口设计说明
- BatchExecutor:仅负责“执行”职责,保持最小接口。
- MetricsCapable[T](泛型):提供“读 + 写”的度量能力,方法返回自类型 T,便于在具体类型或已实例化接口上进行链式配置。 注意:泛型接口在运行时类型断言时必须使用具体的类型实参(如 MetricsCapable[*ThrottledBatchExecutor])。
- 兼容性与运行时探测: 在 BatchSQL 等仅持有 BatchExecutor 的位置,无法统一断言到 MetricsCapable[T], 因此使用一个非泛型只读探测接口(即仅调用 MetricsReporter())来判断是否已有 Reporter; 若返回 nil,则在本地使用 Noop 兜底,不强制写回(写回需要具体类型 T)。
这允许: - 在需要链式的调用点:以具体类型或已实例化能力接口使用 Set/With 风格链式; - 在框架内部通用路径:通过只读探测保证安全、零开销的观测兜底。
MetricsCapable 扩展接口:支持性能监控报告器(自类型泛型)
type MetricsConfig ¶
type MetricsConfig struct {
Enabled bool
}
MetricsConfig 指标导出配置(Step 1:骨架;默认关闭采样)
type MetricsReporter ¶
type MetricsReporter interface {
	// 阶段耗时
	ObserveEnqueueLatency(d time.Duration)                                      // Submit -> 入队
	ObserveBatchAssemble(d time.Duration)                                       // 攒批/组装
	ObserveExecuteDuration(table string, n int, d time.Duration, status string) // 执行

	// 其他观测
	ObserveBatchSize(n int)
	IncError(table string, typ string)
	SetConcurrency(n int)
	SetQueueLength(n int)

	// 在途批次数(不限流也可观察执行压力)
	IncInflight()
	DecInflight()
}
MetricsReporter 统一指标接口(默认 Noop 实现,避免启用前引入开销)
type MockDriver ¶
type MockDriver struct {
// contains filtered or unexported fields
}
func NewMockDriver ¶
func NewMockDriver(databaseType string) *MockDriver
type MockExecutor ¶
type MockExecutor struct {
	ExecutedBatches [][]map[string]any
	// contains filtered or unexported fields
}
MockExecutor 模拟批量执行器(用于测试)
func NewMockExecutorWithDriver ¶
func NewMockExecutorWithDriver(driver SQLDriver) *MockExecutor
NewMockExecutorWithDriver 创建模拟批量执行器(使用自定义Driver)
func (*MockExecutor) ExecuteBatch ¶
func (e *MockExecutor) ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error
ExecuteBatch 模拟执行批量操作
func (*MockExecutor) SnapshotExecutedBatches ¶
func (e *MockExecutor) SnapshotExecutedBatches() [][]map[string]any
SnapshotExecutedBatches 返回一次性快照,避免并发读写竞态
func (*MockExecutor) SnapshotResults ¶
func (e *MockExecutor) SnapshotResults() map[string]map[string]int64
SnapshotResults 返回只读快照(拷贝),用于测试收尾输出或断言
type MySQLDriver ¶
type MySQLDriver struct {
// contains filtered or unexported fields
}
func NewMySQLDriver ¶
func NewMySQLDriver() *MySQLDriver
type NoopMetricsReporter ¶
type NoopMetricsReporter struct{}
NoopMetricsReporter 默认关闭时的无操作实现(零开销路径)
func NewNoopMetricsReporter ¶
func NewNoopMetricsReporter() *NoopMetricsReporter
func (*NoopMetricsReporter) DecInflight ¶
func (*NoopMetricsReporter) DecInflight()
func (*NoopMetricsReporter) IncError ¶
func (*NoopMetricsReporter) IncError(string, string)
func (*NoopMetricsReporter) IncInflight ¶
func (*NoopMetricsReporter) IncInflight()
func (*NoopMetricsReporter) ObserveBatchAssemble ¶
func (*NoopMetricsReporter) ObserveBatchAssemble(time.Duration)
func (*NoopMetricsReporter) ObserveBatchSize ¶
func (*NoopMetricsReporter) ObserveBatchSize(int)
func (*NoopMetricsReporter) ObserveEnqueueLatency ¶
func (*NoopMetricsReporter) ObserveEnqueueLatency(time.Duration)
func (*NoopMetricsReporter) ObserveExecuteDuration ¶
func (*NoopMetricsReporter) ObserveExecuteDuration(string, int, time.Duration, string)
func (*NoopMetricsReporter) SetConcurrency ¶
func (*NoopMetricsReporter) SetConcurrency(int)
func (*NoopMetricsReporter) SetQueueLength ¶
func (*NoopMetricsReporter) SetQueueLength(int)
type Operations ¶
type Operations []any
type PipelineConfig ¶
type PipelineConfig struct {
	BufferSize    uint32
	FlushSize     uint32
	FlushInterval time.Duration
	// Step 2: 可选重试配置(零值=关闭,向后兼容)
	Retry RetryConfig
}
PipelineConfig 管道配置
type PostgreSQLDriver ¶
type PostgreSQLDriver struct {
// contains filtered or unexported fields
}
func NewPostgreSQLDriver ¶
func NewPostgreSQLDriver() *PostgreSQLDriver
type RedisBatchProcessor ¶
type RedisBatchProcessor struct {
// contains filtered or unexported fields
}
RedisBatchProcessor Redis批量处理器 实现 BatchProcessor 接口,专注于Redis的核心处理逻辑
func NewRedisBatchProcessor ¶
func NewRedisBatchProcessor(client *redis.Client, driver RedisDriver) *RedisBatchProcessor
NewRedisBatchProcessor 创建Redis批量处理器 参数: - client: Redis客户端连接 - driver: Redis操作生成器
func (*RedisBatchProcessor) ExecuteOperations ¶
func (rp *RedisBatchProcessor) ExecuteOperations(ctx context.Context, operations Operations) error
func (*RedisBatchProcessor) GenerateOperations ¶
func (rp *RedisBatchProcessor) GenerateOperations(ctx context.Context, schema *Schema, data []map[string]any) (operations Operations, err error)
GenerateOperations 生成批量操作
type RedisDriver ¶
type RedisPipelineDriver ¶
type RedisPipelineDriver struct{}
func NewRedisPipelineDriver ¶
func NewRedisPipelineDriver() *RedisPipelineDriver
func (*RedisPipelineDriver) GenerateCmds ¶
type Request ¶
type Request struct {
// contains filtered or unexported fields
}
Request 用来存储请求数据的各字段信息及其对应的 schema
func NewRequest ¶
func (*Request) GetOrderedValues ¶
func (r *Request) GetOrderedValues() []any
GetOrderedValues 按照 schema 中定义的列顺序返回值
type RetryConfig ¶
type RetryConfig struct {
	Enabled     bool
	MaxAttempts int           // 总尝试次数(含首轮),建议 2~3
	BackoffBase time.Duration // 退避基值(指数退避起点)
	MaxBackoff  time.Duration // 最大退避时长(上限)
	// 自定义错误分类(可选);返回是否可重试与原因标签
	Classifier func(error) (retryable bool, reason string)
}
RetryConfig 可选重试配置(零值关闭)
type SQLBatchProcessor ¶
type SQLBatchProcessor struct {
// contains filtered or unexported fields
}
SQLBatchProcessor SQL数据库批量处理器 实现 BatchProcessor 接口,专注于SQL数据库的核心处理逻辑
func NewSQLBatchProcessor ¶
func NewSQLBatchProcessor(db *sql.DB, driver SQLDriver) *SQLBatchProcessor
NewSQLBatchProcessor 创建SQL批量处理器 参数: - db: 数据库连接(用户管理连接池) - driver: 数据库特定的SQL生成器
func (*SQLBatchProcessor) ExecuteOperations ¶
func (bp *SQLBatchProcessor) ExecuteOperations(ctx context.Context, operations Operations) error
func (*SQLBatchProcessor) GenerateOperations ¶
func (bp *SQLBatchProcessor) GenerateOperations(ctx context.Context, schema *Schema, data []map[string]any) (operations Operations, err error)
type SQLDriver ¶
type SQLDriver interface {
GenerateInsertSQL(ctx context.Context, schema *Schema, data []map[string]any) (sql string, args []any, err error)
}
SQLDriver 数据库特定的SQL生成器接口
type SQLiteDriver ¶
type SQLiteDriver struct {
// contains filtered or unexported fields
}
func NewSQLiteDriver ¶
func NewSQLiteDriver() *SQLiteDriver
type Schema ¶
type Schema struct { Name string Columns []string ConflictStrategy ConflictStrategy }
Schema 表结构定义
type ThrottledBatchExecutor ¶
type ThrottledBatchExecutor struct {
// contains filtered or unexported fields
}
ThrottledBatchExecutor SQL数据库通用批量执行器 实现 BatchExecutor 接口,为SQL数据库提供统一的执行逻辑 架构:ThrottledBatchExecutor -> BatchProcessor -> SQLDriver -> Database
设计优势: - 代码复用:所有SQL数据库共享相同的执行逻辑和指标收集 - 职责分离:执行控制与具体处理逻辑分离 - 易于扩展:新增SQL数据库只需实现SQLDriver接口
func NewRedisThrottledBatchExecutor ¶
func NewRedisThrottledBatchExecutor(client *redis.Client) *ThrottledBatchExecutor
func NewRedisThrottledBatchExecutorWithDriver ¶
func NewRedisThrottledBatchExecutorWithDriver(client *redis.Client, driver RedisDriver) *ThrottledBatchExecutor
func NewSQLThrottledBatchExecutorWithDriver ¶
func NewSQLThrottledBatchExecutorWithDriver(db *sql.DB, driver SQLDriver) *ThrottledBatchExecutor
NewSQLThrottledBatchExecutorWithDriver 创建SQL数据库执行器(推荐方式) 内部使用 SQLBatchProcessor + SQLDriver 组合
func NewThrottledBatchExecutor ¶
func NewThrottledBatchExecutor(processor BatchProcessor) *ThrottledBatchExecutor
NewThrottledBatchExecutor 创建通用执行器(使用自定义BatchProcessor)
func (*ThrottledBatchExecutor) ExecuteBatch ¶
func (e *ThrottledBatchExecutor) ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error
ExecuteBatch 执行批量操作
func (*ThrottledBatchExecutor) MetricsReporter ¶
func (e *ThrottledBatchExecutor) MetricsReporter() MetricsReporter
MetricsReporter 返回当前性能监控报告器(可能为 nil)
func (*ThrottledBatchExecutor) WithConcurrencyLimit ¶
func (e *ThrottledBatchExecutor) WithConcurrencyLimit(limit int) *ThrottledBatchExecutor
WithConcurrencyLimit 设置并发上限(limit <= 0 表示不启用限流)
func (*ThrottledBatchExecutor) WithMetricsReporter ¶
func (e *ThrottledBatchExecutor) WithMetricsReporter(metricsReporter MetricsReporter) *ThrottledBatchExecutor
WithMetricsReporter 设置指标报告器
func (*ThrottledBatchExecutor) WithRetryConfig ¶
func (e *ThrottledBatchExecutor) WithRetryConfig(cfg RetryConfig) *ThrottledBatchExecutor
WithRetryConfig 启用/配置重试(仅对 ThrottledBatchExecutor 可用)
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
examples
|
|
test
|
|
integration
command
|
|
sqlite/tools/benchmark
command
|
|
sqlite/tools/clear-test
command
|
|
sqlite/tools/config-analysis
command
|
|
sqlite/tools/path-compatibility
command
|