batchsql

package module
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2025 License: MIT Imports: 10 Imported by: 0

README

BatchSQL

Release Go Reference Go Report Card License Coverage GitHub Stars

一个高性能的 Go 批量 SQL 处理库,基于 go-pipeline 实现,支持多种数据库类型和冲突处理策略。

🏗️ 架构设计

延伸阅读

核心组件
flowchart TB

  %% 子图1:系统级数据流
  subgraph A0[系统级数据流]
    A1[Application] --> A2["BatchSQL<br/>(MySQL/PG/SQLite/Redis)"] --> A3["gopipeline<br/>(异步批量处理)"]

    A2 --> A4["BatchExecutor<br/>(统一执行接口)"]
    A3 --> A5["Flush Function<br/>(批量刷新逻辑)"]

    A4 --> A6[数据库驱动层]
    A5 --> A7["Schema Grouping<br/>(按表分组聚合)"]

    A6 --> A8["SQL数据库<br/>(MySQL/PG/SQLite)"]
    A6 --> A9[Redis数据库]

    A8 --> A10["Database<br/>(SQL连接池)"]
    A9 --> A11["Redis Client<br/>(Redis连接)"]
  end

  %% 子图2:组件分层与驱动路径
  subgraph B0[组件分层与驱动路径]
    B1["BatchExecutor"] --> B2["CommonExecutor<br/>(通用执行器)"]
    B2 --> B3["BatchProcessor + Driver<br/>(操作生成和执行)"]
    B3 --> B4["Database Connection"]

    %% 数据库类型分支
    B4 --> B5[SQL数据库]
    B4 --> B6[NoSQL数据库]

    B5 --> B51[MySQL]
    B5 --> B52[PostgreSQL]
    B5 --> B53[SQLite]

    B6 --> B61[Redis]
  end

  %% 视图之间的对应关系(虚线)
  A4 -. 同一执行器 .-> B1
  A6 -. 连接/驱动 .-> B4
设计原则
  • 一个BatchSQL绑定一个数据库类型 - 避免混合数据库的复杂性
  • Schema专注表结构定义 - 职责单一,可复用性强
  • BatchExecutor统一接口 - 所有数据库驱动的统一入口
  • 模块化设计 - 清晰的组件分工,便于维护和扩展
  • 轻量级设计 - 不涉及连接池管理,支持任何数据库框架

🚀 功能特性

核心功能
  • 批量处理:使用 gopipeline.StandardPipeline 进行高效的批量数据处理
  • 多数据库支持:支持 MySQL、PostgreSQL、SQLite,易于扩展
  • 冲突处理策略:支持跳过、覆盖、更新三种冲突处理方式
  • 类型安全:提供类型化的列操作方法
  • 智能聚合:按 schema 指针自动聚合相同表的请求
设计亮点
  • 指针传递优化:使用指针传递减少内存复制,提高性能
  • 并发安全:支持并发提交请求,自动按 schema 分组处理
  • 灵活配置:支持自定义缓冲区大小、刷新大小和刷新间隔
  • 混合API设计:默认方式简单易用,自定义方式支持第三方扩展
  • 框架无关:支持原生 sql.DB、GORM、sqlx 等任何数据库框架

延伸阅读

🚀 快速开始

安装
go get github.com/rushairer/batchsql
基本使用
package main

import (
    "context"
    "database/sql"
    "log"
    "time"
    "github.com/rushairer/batchsql"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    ctx := context.Background()
    
    // 1. 创建数据库连接(用户自己管理连接池)
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
    if err != nil {
        panic(err)
    }
    defer db.Close()
    
    // 2. 创建MySQL BatchSQL实例
    // 内部架构:ThrottledBatchExecutor -> SQLBatchProcessor -> MySQLDriver
    config := batchsql.PipelineConfig{
        BufferSize:    1000,        // 缓冲区大小
        FlushSize:     100,         // 批量刷新大小
        FlushInterval: 5 * time.Second, // 刷新间隔
    }
    batch := batchsql.NewMySQLBatchSQL(ctx, db, config)

    // 3. 定义 schema(表结构定义,与数据库类型解耦)
    userSchema := batchsql.NewSchema(
        "users",                    // 表名
        batchsql.ConflictIgnore,     // 冲突策略
        "id", "name", "email",      // 列名
    )

    // 4. 创建并提交请求
    request := batchsql.NewRequest(userSchema).
        SetInt64("id", 1).
        SetString("name", "John").
        SetString("email", "john@example.com")

    if err := batch.Submit(ctx, request); err != nil {
        panic(err)
    }
    
    // 5. 监听错误
    go func() {
        errorChan := batch.ErrorChan(10)
        for err := range errorChan {
            log.Printf("Batch processing error: %v", err)
        }
    }()
}

注意:

  • 自 v1.1.1 起,Submit 会在尝试入队前优先检查 ctx.Err()(取消/超时将立即返回,不会进入内部批处理通道)。请在提交前妥善管理 context 生命周期,避免无效提交。

延伸阅读

Redis 使用示例
package main

import (
    "context"
    "log"
    "time"
    "github.com/redis/go-redis/v9"
    "github.com/rushairer/batchsql"

)

func main() {
    ctx := context.Background()
    
    // 1. 创建Redis连接
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()
    
    // 2. 创建Redis BatchSQL实例
    // 内部架构:ThrottledBatchExecutor -> RedisBatchProcessor -> RedisDriver
    config := batchsql.PipelineConfig{
        BufferSize:    1000,
        FlushSize:     100,
        FlushInterval: 5 * time.Second,
    }
    batch := batchsql.NewRedisBatchSQL(ctx, rdb, config)

    // 3. 定义 Redis schema(使用 SETEX 命令格式)
    cacheSchema := batchsql.NewSchema(
        "cache",                    // 逻辑表名
        batchsql.ConflictReplace,    // Redis默认覆盖
        "cmd", "key", "ttl", "value", // SETEX 命令参数顺序
    )

    // 4. 提交Redis数据(SETEX 命令)
    request := batchsql.NewRequest(cacheSchema).
        SetString("cmd", "SETEX").
        SetString("key", "user:1").
        SetInt64("ttl", 3600).      // TTL in seconds
        SetString("value", `{"name":"John Doe","email":"john@example.com"}`)

    if err := batch.Submit(ctx, request); err != nil {
        panic(err)
    }
    
    // 5. 监听错误
    go func() {
        errorChan := batch.ErrorChan(10)
        for err := range errorChan {
            log.Printf("Redis batch processing error: %v", err)
        }
    }()
}
测试使用

延伸阅读

func TestBatchSQL(t *testing.T) {
    ctx := context.Background()
    
    // 使用模拟执行器进行测试
    // 内部使用 MockExecutor 直接实现 BatchExecutor 接口
    config := batchsql.PipelineConfig{
        BufferSize:    100,
        FlushSize:     10,
        FlushInterval: time.Second,
    }
    batch, mockExecutor := batchsql.NewBatchSQLWithMock(ctx, config)
    
    // 定义测试schema
    testSchema := batchsql.NewSchema("test_table", batchsql.ConflictIgnore, "id", "name")
    
    // 提交测试数据
    request := batchsql.NewRequest(testSchema).
        SetInt64("id", 1).
        SetString("name", "test")
    
    err := batch.Submit(ctx, request)
    assert.NoError(t, err)
    
    // 验证模拟执行器的调用
    time.Sleep(100 * time.Millisecond) // 等待批量处理
    assert.True(t, mockExecutor.WasCalled())
    
    // 获取执行的数据
    executedData := mockExecutor.GetExecutedData()
    assert.Len(t, executedData, 1)
}

📡 监控与指标(MetricsReporter)

  • 功能:统一上报入队延迟、攒批耗时、执行耗时、批大小、错误计数、执行并发、队列长度、在途批次等关键阶段与状态
  • 使用场景:
    • 开箱即用观测(Prometheus + Grafana)
    • 接入自有监控体系(实现自定义 Reporter)
  • 配置要点:
    • 默认 NoopMetricsReporter(零开销,未注入时不产生任何观测)
    • 务必在 NewBatchSQL 之前对执行器注入 Reporter(WithMetricsReporter)
    • NewBatchSQL 会尊重已注入的 Reporter,不会覆盖为 Noop

最小示例(Prometheus 快速上手)

pm := integration.NewPrometheusMetrics()
go pm.StartServer(9090)
defer pm.StopServer()

exec := batchsql.NewSQLThrottledBatchExecutorWithDriver(db, driver)
reporter := integration.NewPrometheusMetricsReporter(pm, "postgres", "user_batch")
exec = exec.WithMetricsReporter(reporter)

bs := batchsql.NewBatchSQL(ctx, 5000, 200, 100*time.Millisecond, exec)
defer bs.Close()

延伸阅读

Prometheus + Grafana 快速监控

BatchSQL 支持 Prometheus 指标收集和 Grafana 可视化,让你能够实时监控性能曲线变化。

  • 快速启动监控
# 使用 Make 命令(推荐)
make monitoring                           # 启动监控环境
make test-integration-with-monitoring     # 启动监控后运行测试

详细使用说明请参考:监控指南

Retry 指标
  • 指标设计
    • 每次判定为“可重试”都会上报一次:IncError(schema.Name, "retry:"+reason)
    • 最终失败(达到最大次数或不可重试)会上报:IncError(schema.Name, "final:"+reason)
    • 执行耗时统计(包含重试与退避):ObserveExecuteDuration(schema.Name, len(data), duration, status)
  • 常见原因标签(reason)
    • deadlock、lock_timeout、timeout、connection、io、context、non_retryable
  • PromQL 示例
    • 各表重试速率:sum(rate(batchsql_errors_total{type=~"retry:.*"}[5m])) by (table,type)
    • 最终失败速率:sum(rate(batchsql_errors_total{type=~"final:.*"}[5m])) by (table,type)
    • 重试占比(近5分钟):sum(rate(batchsql_errors_total{type=~"retry:.*"}[5m])) / sum(rate(batchsql_errors_total{type=~"(retry:|final:).*"}[5m]))
  • 实践建议
    • 观察“retry:”与“final:”的比值,若“final:*”持续升高需关注数据库稳定性与退避配置
    • ObserveExecuteDuration 已包含重试与退避时间,P95/99 将反映重试导致的尾部放大
运行时只读探测:MetricsReporter()
  • 功能定位:为执行器(BatchExecutor 实现者)提供一种“可选能力”以暴露其当前 MetricsReporter;NewBatchSQL 会基于该能力安全决定是否注入默认的 NoopMetricsReporter,从而避免误覆盖自定义执行器的监控实现。
  • 适用场景:
    • 你实现了自定义执行器,希望显式告知框架“我已有/没有 MetricsReporter”;
    • 希望 NewBatchSQL 能在“reporter 为空时自动注入 Noop”,“不为空时完全复用现有 reporter”,而不对未知执行器做强行覆盖。

接口定义

// MetricsProvider 可选能力:执行器可暴露其当前 MetricsReporter(若未设置则返回 nil)
type MetricsProvider interface {
    MetricsReporter() MetricsReporter
}

参数与返回值

  • 返回值
    • MetricsReporter:当前执行器使用的指标上报器实例;当执行器尚未配置 reporter 时,应返回 nil。
  • 约定
    • 返回 nil 表示“未设置任何 reporter”,框架可注入 NewNoopMetricsReporter 作为默认实现;
    • 返回非 nil 表示“已自行设置 reporter”,框架将尊重现有实现,绝不覆盖。

典型用法示例

  1. 执行器实现 MetricsProvider
type MyExecutor struct {
    reporter batchsql.MetricsReporter
}
func (e *MyExecutor) ExecuteBatch(ctx context.Context, schema *batchsql.Schema, data []map[string]any) error {
    // ...
    return nil
}
func (e *MyExecutor) WithMetricsReporter(r batchsql.MetricsReporter) batchsql.BatchExecutor {
    e.reporter = r
    return e
}
// 实现可选能力
func (e *MyExecutor) MetricsReporter() batchsql.MetricsReporter { return e.reporter }

// 组合到 BatchSQL
exec := &MyExecutor{}
// 若未设置 reporter,NewBatchSQL 将自动注入 Noop(不覆盖已有实现)
bs := batchsql.NewBatchSQL(ctx, cfg.BufferSize, cfg.FlushSize, cfg.FlushInterval, exec)
  1. 显式注入自定义 Reporter(推荐)
exec := &MyExecutor{}
prom := integration.NewPrometheusMetricsReporter(pm, "mysql", "user_batch")
exec = exec.WithMetricsReporter(prom)
// NewBatchSQL 发现 MetricsProvider 返回非 nil,将复用 prom,不会覆盖
bs := batchsql.NewBatchSQL(ctx, cfg.BufferSize, cfg.FlushSize, cfg.FlushInterval, exec)

与 NewBatchSQL 的交互逻辑(简述)

  • 若执行器实现了 MetricsProvider:
    • MetricsReporter() 返回非 nil:直接复用该 reporter;
    • 返回 nil:BatchSQL 在内部使用本地 NoopMetricsReporter 兜底,不写回执行器。
  • 若执行器未实现 MetricsProvider:
    • 不会强制覆盖执行器内部状态;NewBatchSQL 仅在内部使用本地 Noop 进行自有观测,保持外部行为稳定。

异常处理机制

  • ctx 取消与超时:执行器自身应优先处理上下文取消;框架不会因 reporter 缺失而改变取消语义。
  • reporter 为空:
    • 对实现 MetricsProvider 的执行器:返回 nil 即可,框架会自动注入 Noop;
    • 对未实现 MetricsProvider 的执行器:框架不会修改执行器,只在 BatchSQL 内部使用本地 Noop,避免 panic 或 nil 调用。
  • 执行器忽略 WithMetricsReporter:
    • 若执行器实现 MetricsProvider 且始终返回 nil,将被注入 Noop;若未实现 MetricsProvider,则由执行器自行负责内部调用安全(框架内部仍使用本地 Noop,避免外泄)。

性能考量

  • 零额外开销:仅一次类型断言与空值判断;NoopMetricsReporter 方法为空实现,分支预测友好,基本为零成本。
  • 无锁设计:建议执行器缓存 reporter 指针,不在热点路径加锁;NewBatchSQL 注入在构造期完成,不进入热路径。
  • 重试与限流:引入 MetricsProvider 不改变 ExecuteBatch 的重试/限流与指标上报路径;在 ctx.Done() 场景下仍能保证统一上报。

最佳实践

  • 在构造期(NewXxx 或 NewBatchSQL 之前)通过 WithMetricsReporter 显式注入自定义 reporter;
  • 若短期无监控接入需求,无需实现 MetricsProvider,框架会在内部使用本地 Noop 保持行为稳定;
  • 实现 MetricsProvider 时,确保返回值与 WithMetricsReporter 设置同步,避免出现“返回 nil 但内部已使用非空 reporter”的不一致状态。

📋 详细功能

延伸阅读

API 设计模式
默认方式(推荐)
// SQL数据库
mysqlBatch := batchsql.NewMySQLBatchSQL(ctx, db, config)
postgresBatch := batchsql.NewPostgreSQLBatchSQL(ctx, db, config)
sqliteBatch := batchsql.NewSQLiteBatchSQL(ctx, db, config)

// NoSQL数据库
redisBatch := batchsql.NewRedisBatchSQL(ctx, redisClient, config)

// 测试
batch, mockExecutor := batchsql.NewBatchSQLWithMock(ctx, config)
自定义方式(扩展支持)
// SQL数据库:支持自定义SQLDriver
customSQLDriver := &MyCustomSQLDriver{}
mysqlBatch := batchsql.NewMySQLBatchSQLWithDriver(ctx, db, config, customSQLDriver)

// Redis数据库:支持自定义RedisDriver
customRedisDriver := &MyCustomRedisDriver{}
redisBatch := batchsql.NewRedisBatchSQLWithDriver(ctx, redisClient, config, customRedisDriver)

// 测试:使用特定Driver的Mock
batch, mockExecutor := batchsql.NewBatchSQLWithMockDriver(ctx, config, customSQLDriver)

// 完全自定义:实现自己的BatchExecutor
type MyExecutor struct {
    // 自定义字段
}

func (e *MyExecutor) ExecuteBatch(ctx context.Context, schema *batchsql.Schema, data []map[string]any) error {
    // 自定义实现
    return nil
}

func (e *MyExecutor) WithMetricsReporter(reporter batchsql.MetricsReporter) batchsql.BatchExecutor {
    // 设置指标报告器
    return e
}

customExecutor := &MyExecutor{}
batch := batchsql.NewBatchSQL(ctx, config.BufferSize, config.FlushSize, config.FlushInterval, customExecutor)
冲突处理策略
type ConflictStrategy int

const (
    ConflictIgnore  ConflictStrategy = iota // 跳过冲突
    ConflictReplace                         // 覆盖冲突
    ConflictUpdate                          // 更新冲突
)
Schema 设计
// Schema专注于表结构定义,与数据库类型解耦
userSchema := batchsql.NewSchema("users", batchsql.ConflictIgnore, "id", "name", "email")
productSchema := batchsql.NewSchema("products", batchsql.ConflictUpdate, "id", "name", "price")

// 同一个Schema可以在不同数据库类型间复用
生成的 SQL 示例
MySQL
  • ConflictIgnore: INSERT IGNORE INTO users (id, name) VALUES (?, ?)
  • ConflictReplace: REPLACE INTO users (id, name) VALUES (?, ?)
  • ConflictUpdate: INSERT INTO users (id, name) VALUES (?, ?) ON DUPLICATE KEY UPDATE name = VALUES(name)
PostgreSQL
  • ConflictIgnore: INSERT INTO users (id, name) VALUES (?, ?) ON CONFLICT DO NOTHING
  • ConflictUpdate: INSERT INTO users (id, name) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name
SQLite
  • ConflictIgnore: INSERT OR IGNORE INTO users (id, name) VALUES (?, ?)
  • ConflictReplace: INSERT OR REPLACE INTO users (id, name) VALUES (?, ?)
  • ConflictUpdate: INSERT INTO users (id, name) VALUES (?, ?) ON CONFLICT DO UPDATE SET name = excluded.name
类型化的列操作
request := batchsql.NewRequest(schema).
    SetInt32("age", 30).
    SetInt64("id", 12345).
    SetFloat64("salary", 75000.50).
    SetString("name", "John Doe").
    SetBool("is_active", true).
    SetTime("created_at", time.Now()).
    SetBytes("data", []byte("binary data")).
    SetNull("optional_field")
获取类型化的值
if name, err := request.GetString("name"); err == nil {
    fmt.Printf("Name: %s", name)
}

if age, err := request.GetInt32("age"); err == nil {
    fmt.Printf("Age: %d", age)
}

高级用法

多数据库支持
import (
    "database/sql"
    "github.com/redis/go-redis/v9"
    _ "github.com/go-sql-driver/mysql"
    _ "github.com/lib/pq"
    _ "github.com/mattn/go-sqlite3"
)

func main() {
    ctx := context.Background()
    config := batchsql.PipelineConfig{
        BufferSize:    1000,
        FlushSize:     100,
        FlushInterval: 5 * time.Second,
    }
    
    // SQL数据库
    
    // MySQL
    mysqlDB, _ := sql.Open("mysql", "user:password@tcp(localhost:3306)/testdb")
    mysqlBatch := batchsql.NewMySQLBatchSQL(ctx, mysqlDB, config)
    
    // PostgreSQL
    postgresDB, _ := sql.Open("postgres", "postgres://user:password@localhost/testdb?sslmode=disable")
    postgresBatch := batchsql.NewPostgreSQLBatchSQL(ctx, postgresDB, config)
    
    // SQLite
    sqliteDB, _ := sql.Open("sqlite3", "./test.db")
    sqliteBatch := batchsql.NewSQLiteBatchSQL(ctx, sqliteDB, config)
    
    // NoSQL数据库
    
    // Redis
    redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    redisBatch := batchsql.NewRedisBatchSQL(ctx, redisClient, config)
    
    // 定义通用schema(可在不同数据库间复用)
    userSchema := batchsql.NewSchema("users", batchsql.ConflictIgnore, "id", "name")
    productSchema := batchsql.NewSchema("products", batchsql.ConflictUpdate, "id", "name", "price")
    
    // Redis专用schema(SETEX命令格式)
    cacheSchema := batchsql.NewSchema("cache", batchsql.ConflictReplace, "cmd", "key", "ttl", "value")
    
    // 每个BatchSQL处理对应数据库的多个表
    
    // MySQL处理用户和产品表
    mysqlBatch.Submit(ctx, batchsql.NewRequest(userSchema).SetInt64("id", 1).SetString("name", "User1"))
    mysqlBatch.Submit(ctx, batchsql.NewRequest(productSchema).SetInt64("id", 1).SetString("name", "Product1").SetFloat64("price", 99.99))
    
    // PostgreSQL处理相同的schema
    postgresBatch.Submit(ctx, batchsql.NewRequest(userSchema).SetInt64("id", 2).SetString("name", "User2"))
    
    // Redis处理缓存数据(使用SETEX命令)
    redisBatch.Submit(ctx, batchsql.NewRequest(cacheSchema).
        SetString("cmd", "SETEX").
        SetString("key", "user:1").
        SetInt64("ttl", 3600).
        SetString("value", `{"name":"User1","active":true}`))
}
第三方扩展示例
扩展SQL数据库支持(如TiDB)
// 实现SQLDriver接口
type TiDBDriver struct{}

func (d *TiDBDriver) GenerateInsertSQL(schema *batchsql.Schema, data []map[string]any) (string, []any, error) {
    // TiDB特定的批量插入优化
    // 可以使用TiDB的特殊语法或优化
    return sql, args, nil
}

// 使用自定义Driver,内部仍使用CommonExecutor架构
tidbDriver := &TiDBDriver{}
batch := batchsql.NewMySQLBatchSQLWithDriver(ctx, tidbDB, config, tidbDriver)
扩展NoSQL数据库支持(如MongoDB)
// 直接实现BatchExecutor接口
type MongoExecutor struct {
    client          *mongo.Client
    metricsReporter batchsql.MetricsReporter
}

func NewMongoBatchExecutor(client *mongo.Client) *MongoExecutor {
    return &MongoExecutor{client: client}
}

func (e *MongoExecutor) ExecuteBatch(ctx context.Context, schema *batchsql.Schema, data []map[string]any) error {
    if len(data) == 0 {
        return nil
    }
    
    // MongoDB特定的批量插入逻辑
    collection := e.client.Database("mydb").Collection(schema.Name)
    
    // 转换数据格式
    docs := make([]interface{}, len(data))
    for i, row := range data {
        docs[i] = row
    }
    
    // 执行批量插入
    _, err := collection.InsertMany(ctx, docs)
    return err
}

func (e *MongoExecutor) WithMetricsReporter(reporter batchsql.MetricsReporter) batchsql.BatchExecutor {
    e.metricsReporter = reporter
    return e
}

// 创建MongoDB BatchSQL
func NewMongoBatchSQL(ctx context.Context, client *mongo.Client, config batchsql.PipelineConfig) *batchsql.BatchSQL {
    executor := NewMongoBatchExecutor(client)
    return batchsql.NewBatchSQL(ctx, config.BufferSize, config.FlushSize, config.FlushInterval, executor)
}

// 使用
mongoClient, _ := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
mongoBatch := NewMongoBatchSQL(ctx, mongoClient, config)
可选并发限流示例
// 高级用法:自行构造可限流的执行器,再创建 BatchSQL
db, _ := sql.Open("mysql", dsn)
// 构造 SQL 执行器,并限制同时执行的批次数为 8
executor := batchsql.NewSQLThrottledBatchExecutorWithDriver(db, batchsql.DefaultMySQLDriver).
    WithConcurrencyLimit(8)

// 创建 BatchSQL(管道配置)
cfg := batchsql.PipelineConfig{BufferSize: 5000, FlushSize: 200, FlushInterval: 100 * time.Millisecond}
batch := batchsql.NewBatchSQL(ctx, cfg.BufferSize, cfg.FlushSize, cfg.FlushInterval, executor)

说明:

  • limit <= 0 时不启用限流,行为等价于默认
  • 限流发生在 ExecuteBatch 入口,避免攒批后过度并发压垮数据库
  • 指标上报与错误处理逻辑保持一致
框架集成示例

延伸阅读

// 与GORM集成
gormDB, _ := gorm.Open(mysql.Open(dsn), &gorm.Config{})
sqlDB, _ := gormDB.DB()
batch := batchsql.NewMySQLBatchSQL(ctx, sqlDB, config)

// 与sqlx集成
sqlxDB, _ := sqlx.Connect("mysql", dsn)
batch := batchsql.NewMySQLBatchSQL(ctx, sqlxDB.DB, config)

⚡ 性能优化

内存效率
  • 指针传递:使用 StandardPipeline[*Request] 而非值传递,减少内存复制
  • 智能聚合:按 schema 指针自动聚合相同表的请求,减少数据库操作次数
  • 全局Driver共享:SQLDriver实例全局共享,避免重复创建
  • 零拷贝设计:Request数据直接传递,无额外序列化开销
并发处理
  • 多goroutine安全:支持多 goroutine 并发提交请求
  • 自动分组:按 schema 指针聚合,确保相同表的请求批量处理
  • 异步处理:基于 go-pipeline 的异步处理,不阻塞主线程
  • 背压控制:缓冲区满时自动背压,防止内存溢出
数据库优化
  • 批量插入:自动生成优化的批量INSERT语句
  • 事务保证:每个批次使用单个事务,保证数据一致性
  • 连接复用:用户自己管理连接池,支持连接复用
  • SQL优化:针对不同数据库生成最优的SQL语法

延伸阅读

📊 质量评估

延伸阅读

基于最新集成测试报告的项目质量状态评估:

测试通过率
数据库 测试数量 通过 失败 通过率 BatchSQL 状态
SQLite 5 4 1 80% ✅ 正常(失败为 SQLite 架构限制)
MySQL 5 5 0 100% ✅ 优秀
PostgreSQL 5 5 0 100% ✅ 优秀
Redis 5 5 0 100% ✅ 优秀(三层架构重构完成)
总计 20 19 1 95% ✅ 优秀
性能指标
数据库 平均 RPS 最大 RPS 数据完整性 BatchSQL 性能评级
SQLite 105,246 199,071 80% 测试通过 ✅ 符合 SQLite 预期
MySQL 144,879 168,472 100% 测试通过 ✅ 优秀
PostgreSQL 152,586 191,037 100% 测试通过 ✅ 优秀
Redis 180,000+ 250,000+ 100% 测试通过 ✅ 优秀(三层架构优化)
技术说明

🔵 SQLite 架构限制(非项目缺陷):SQLite 是单写入者数据库,大批次并发写入失败属于数据库引擎固有限制
🟢 BatchSQL 功能完整:所有核心功能正常,错误检测机制完善
🟢 代码质量优秀:在 MySQL/PostgreSQL/Redis 上表现优异,证明实现正确

发布状态

当前状态:✅ 可以发布
项目质量:BatchSQL 核心功能完整,所有数据库驱动稳定可用
SQLite 说明:测试失败源于 SQLite 单写入者架构限制,非项目问题
使用建议

  • 高并发场景推荐 MySQL/PostgreSQL/Redis
  • 轻量级场景可用 SQLite
  • 缓存场景推荐 Redis(性能优异)

📚 文档导航

BatchSQL 提供完整的文档体系,按使用场景分类:

🚀 快速开始
📖 API 文档
📖 用户指南
🔧 开发文档
📊 测试报告

🐛 重要修复记录

延伸阅读

数据完整性监控指标修复 (2025-09-30)
  • 问题:Grafana 监控面板显示数据完整性为 10000% 而非正常的 100%
  • 原因:Prometheus 指标范围定义不一致(0-1 vs 0-100)
  • 修复:统一指标范围为 0-1,修复初始化和记录逻辑
  • 影响:✅ 监控面板现在正确显示数据完整性百分比
  • 详情修复日志

📋 测试

延伸阅读

单元测试
# 运行所有单元测试
go test -v

# 运行测试覆盖率分析
go test -cover -coverprofile=coverage.out
go tool cover -html=coverage.out
集成测试
# 运行所有数据库集成测试
make docker-all-tests

# 运行单个数据库测试
make docker-mysql-test      # MySQL 测试
make docker-postgres-test   # PostgreSQL 测试
make docker-sqlite-test     # SQLite 测试
make docker-redis-test      # Redis 测试

提示:性能观测请参见上文“📡 监控与指标(MetricsReporter)”章节的「Prometheus + Grafana 快速监控」小节。

测试覆盖范围
  • ✅ 基本批量处理功能
  • ✅ Schema 分组逻辑
  • ✅ SQL 生成正确性
  • ✅ Redis 操作生成正确性
  • ✅ 不同数据库类型和冲突策略
  • ✅ 错误处理和边界条件
  • ✅ 并发安全性测试
  • ✅ 大数据量压力测试
  • ✅ 数据库连接异常处理
  • ✅ Redis Pipeline 批量执行

详细测试文档:集成测试指南

🏗️ 文件结构

batchsql/
├── README.md
├── go.mod
├── go.sum
├── Makefile
├── .golangci.yml
├── .env.test
├── .env.sqlite.test
├── docker-compose.integration.yml
├── Dockerfile.integration
├── Dockerfile.sqlite.integration
├── batchsql.go              # 主入口与管道工厂
├── driver.go                # 驱动接口与实现入口(SQL/Redis等)
├── executor.go              # 执行器(含可选并发限流:WithConcurrencyLimit)
├── processor.go             # 处理器(SQL/Redis等各自批处理实现)
├── metrics_reporter.go      # 指标上报接口与默认实现
├── schema.go                # Schema 定义
├── request.go               # Request 定义
├── error.go                 # 错误定义
├── batchsql_test.go
├── benchmark_test.go
├── boundary_test.go
├── concurrency_test.go
├── db_connection_test.go
├── error_handling_test.go
├── large_data_test.go
├── docs/
│   ├── index.md
│   ├── api/
│   │   ├── reference.md
│   │   └── configuration.md
│   ├── guides/
│   │   ├── examples.md
│   │   ├── testing.md
│   │   ├── monitoring.md
│   │   ├── monitoring-quickstart.md
│   │   ├── custom-metrics-reporter.md
│   │   ├── tuning.md
│   │   ├── troubleshooting.md
│   │   └── integration-tests.md
│   ├── development/
│   │   ├── architecture.md
│   │   ├── contributing.md
│   │   ├── changelog.md
│   │   ├── quality.md
│   │   └── release.md
│   └── reports/
│       ├── PERFORMANCE_ANALYSIS.md
│       ├── SQLITE_OPTIMIZATION.md
│       ├── TEST_REPORT_ANALYSIS.md
│       └── sqlite-tools.md
├── scripts/

└── test/
    ├── integration/
    │   ├── config.go
    │   ├── main.go
    │   ├── metrics_reporter.go
    │   ├── prometheus.go
    │   ├── prometheus.yml
    │   ├── redis_tests.go
    │   ├── reports.go
    │   ├── run-single-db-test.sh
    │   ├── sql_tests.go
    │   ├── types.go
    │   ├── utils.go
    │   └── grafana/
    │       └── provisioning/...
    ├── sql/
    │   ├── mysql/init.sql
    │   ├── postgres/init.sql
    │   └── sqlite/init.sql
    └── sqlite/
        └── tools/...

🤝 贡献

延伸阅读

欢迎提交 Issue 和 Pull Request!

开发流程
  1. Fork 项目
  2. 创建功能分支
  3. 运行完整测试:make ci
  4. 提交 Pull Request
测试要求
  • 所有单元测试必须通过
  • 集成测试通过率 ≥ 90%
  • 代码覆盖率 ≥ 60%
  • 通过 golangci-lint 检查

📄 许可证

MIT License

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrEmptyRequest 空请求错误
	ErrEmptyRequest = errors.New("empty request")

	// ErrContextCanceled 上下文被取消错误
	ErrContextCanceled = errors.New("context canceled")

	// ErrInvalidSchema 无效的 schema 错误
	ErrInvalidSchema = errors.New("invalid schema")

	// ErrMissingColumn 缺少列错误
	ErrMissingColumn = errors.New("missing required column")

	// ErrInvalidColumnType 无效的列类型错误
	ErrInvalidColumnType = errors.New("invalid column type")

	// ErrEmptyBatch 空批次错误
	ErrEmptyBatch = errors.New("empty batch")

	// ErrEmptySchemaName 空表名错误
	ErrEmptySchemaName = errors.New("empty schema name")
)
View Source
var DefaultMySQLDriver = NewMySQLDriver()
View Source
var DefaultPostgreSQLDriver = NewPostgreSQLDriver()
View Source
var DefaultRedisPipelineDriver = NewRedisPipelineDriver()
View Source
var DefaultSQLiteDriver = NewSQLiteDriver()

Functions

func NewBatchSQLWithMock

func NewBatchSQLWithMock(ctx context.Context, config PipelineConfig) (*BatchSQL, *MockExecutor)

NewBatchSQLWithMock 使用模拟执行器创建 BatchSQL 实例(用于测试) 内部架构:BatchSQL -> MockExecutor(直接实现BatchExecutor,无真实数据库操作) 适用于单元测试,不依赖真实数据库连接

func NewBatchSQLWithMockDriver

func NewBatchSQLWithMockDriver(ctx context.Context, config PipelineConfig, sqlDriver SQLDriver) (*BatchSQL, *MockExecutor)

NewBatchSQLWithMockDriver 使用模拟执行器创建 BatchSQL 实例(测试特定SQLDriver) 内部架构:BatchSQL -> MockExecutor(模拟CommonExecutor行为,测试SQLDriver逻辑) 适用于测试自定义SQLDriver的SQL生成逻辑

Types

type BatchExecutor

type BatchExecutor interface {
	// ExecuteBatch 执行批量操作
	ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error
}

BatchExecutor 批量执行器接口 - 所有数据库驱动的统一入口

type BatchProcessor

type BatchProcessor interface {
	// GenerateOperations 生成批量操作
	GenerateOperations(ctx context.Context, schema *Schema, data []map[string]any) (operations Operations, err error)

	// ExecuteOperations 执行批量操作
	ExecuteOperations(ctx context.Context, operations Operations) error
}

BatchProcessor 批量处理器接口 - SQL数据库的核心处理逻辑

type BatchSQL

type BatchSQL struct {
	// contains filtered or unexported fields
}

BatchSQL 批量处理管道 核心组件,整合 go-pipeline 和 BatchExecutor,提供统一的批量处理接口

架构层次: Application -> BatchSQL -> gopipeline -> BatchExecutor -> Database

支持的 BatchExecutor 实现: - SQL 数据库:ThrottledBatchExecutor + SQLBatchProcessor + SQLDriver - NoSQL 数据库:ThrottledBatchExecutor + RedisBatchProcessor + RedisDriver(直接生成/执行命令) - 测试环境:MockExecutor(直接实现 BatchExecutor) 可选能力: - WithConcurrencyLimit:通过信号量限制 ExecuteBatch 并发,避免攒批后同时冲击数据库(limit <= 0 等价于不限流)

func NewBatchSQL

func NewBatchSQL(ctx context.Context, buffSize uint32, flushSize uint32, flushInterval time.Duration, executor BatchExecutor) *BatchSQL

NewBatchSQL 创建 BatchSQL 实例 这是最底层的构造函数,接受任何实现了BatchExecutor接口的执行器 通常不直接使用,而是通过具体数据库的工厂方法创建

func NewMySQLBatchSQL

func NewMySQLBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL

NewMySQLBatchSQL 创建MySQL BatchSQL实例(使用默认Driver)

内部架构:BatchSQL -> ThrottledBatchExecutor -> SQLBatchProcessor -> MySQLDriver -> MySQL

这是推荐的使用方式,使用MySQL优化的默认配置

func NewMySQLBatchSQLWithDriver

func NewMySQLBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL

NewMySQLBatchSQLWithDriver 创建MySQL BatchSQL实例(使用自定义Driver)

内部架构:BatchSQL -> ThrottledBatchExecutor -> SQLBatchProcessor -> CustomDriver -> MySQL

适用于需要自定义SQL生成逻辑的场景(如TiDB优化)

func NewPostgreSQLBatchSQL

func NewPostgreSQLBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL

NewPostgreSQLBatchSQL 创建PostgreSQL BatchSQL实例(使用默认Driver)

func NewPostgreSQLBatchSQLWithDriver

func NewPostgreSQLBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL

NewPostgreSQLBatchSQLWithDriver 创建PostgreSQL BatchSQL实例(使用自定义Driver)

func NewRedisBatchSQL

func NewRedisBatchSQL(ctx context.Context, db *redisV9.Client, config PipelineConfig) *BatchSQL

NewRedisBatchSQL 创建Redis BatchSQL实例

内部架构(NoSQL):BatchSQL -> ThrottledBatchExecutor -> RedisBatchProcessor -> RedisDriver -> Redis 说明:NoSQL 路径不使用 SQL 抽象层,直接生成并执行 Redis 命令;仍可启用 WithConcurrencyLimit 控制批次并发。

func NewRedisBatchSQLWithDriver

func NewRedisBatchSQLWithDriver(ctx context.Context, db *redisV9.Client, config PipelineConfig, driver RedisDriver) *BatchSQL

func NewSQLiteBatchSQL

func NewSQLiteBatchSQL(ctx context.Context, db *sql.DB, config PipelineConfig) *BatchSQL

NewSQLiteBatchSQL 创建SQLite BatchSQL实例(使用默认Driver)

func NewSQLiteBatchSQLWithDriver

func NewSQLiteBatchSQLWithDriver(ctx context.Context, db *sql.DB, config PipelineConfig, driver SQLDriver) *BatchSQL

NewSQLiteBatchSQLWithDriver 创建SQLite BatchSQL实例(使用自定义Driver)

func (*BatchSQL) ErrorChan

func (b *BatchSQL) ErrorChan(size int) <-chan error

ErrorChan 获取错误通道

func (*BatchSQL) Submit

func (b *BatchSQL) Submit(ctx context.Context, request *Request) error

Submit 提交请求到批量处理管道

type ConcurrencyCapable added in v1.3.0

type ConcurrencyCapable[T any] interface {
	WithConcurrencyLimit(int) T
}

type ConflictStrategy

type ConflictStrategy uint8

ConflictStrategy 冲突处理策略

const (
	ConflictIgnore ConflictStrategy = iota
	ConflictReplace
	ConflictUpdate
)

type MetricsCapable added in v1.3.0

type MetricsCapable[T any] interface {
	// WithMetricsReporter 设置性能监控报告器(返回实现者类型以支持链式)
	WithMetricsReporter(MetricsReporter) T
	// MetricsReporter 返回当前性能监控报告器(可能为 nil)
	MetricsReporter() MetricsReporter
}

Metrics 相关接口设计说明

  • BatchExecutor:仅负责“执行”职责,保持最小接口。
  • MetricsCapable[T](泛型):提供“读 + 写”的度量能力,方法返回自类型 T,便于在具体类型或已实例化接口上进行链式配置。 注意:泛型接口在运行时类型断言时必须使用具体的类型实参(如 MetricsCapable[*ThrottledBatchExecutor])。
  • 兼容性与运行时探测: 在 BatchSQL 等仅持有 BatchExecutor 的位置,无法统一断言到 MetricsCapable[T], 因此使用一个非泛型只读探测接口(即仅调用 MetricsReporter())来判断是否已有 Reporter; 若返回 nil,则在本地使用 Noop 兜底,不强制写回(写回需要具体类型 T)。

这允许: - 在需要链式的调用点:以具体类型或已实例化能力接口使用 Set/With 风格链式; - 在框架内部通用路径:通过只读探测保证安全、零开销的观测兜底。

MetricsCapable 扩展接口:支持性能监控报告器(自类型泛型)

type MetricsConfig

type MetricsConfig struct {
	Enabled bool
}

MetricsConfig 指标导出配置(Step 1:骨架;默认关闭采样)

type MetricsReporter

type MetricsReporter interface {
	// 阶段耗时
	ObserveEnqueueLatency(d time.Duration)                                      // Submit -> 入队
	ObserveBatchAssemble(d time.Duration)                                       // 攒批/组装
	ObserveExecuteDuration(table string, n int, d time.Duration, status string) // 执行

	// 其他观测
	ObserveBatchSize(n int)
	IncError(table string, typ string)
	SetConcurrency(n int)
	SetQueueLength(n int)
	// 在途批次数(不限流也可观察执行压力)
	IncInflight()
	DecInflight()
}

MetricsReporter 统一指标接口(默认 Noop 实现,避免启用前引入开销)

type MockDriver

type MockDriver struct {
	// contains filtered or unexported fields
}

func NewMockDriver

func NewMockDriver(databaseType string) *MockDriver

func (*MockDriver) GenerateInsertSQL

func (d *MockDriver) GenerateInsertSQL(ctx context.Context, schema *Schema, data []map[string]any) (string, []any, error)

GenerateInsertSQL 生成模拟SQL(默认MySQL语法)

type MockExecutor

type MockExecutor struct {
	ExecutedBatches [][]map[string]any
	// contains filtered or unexported fields
}

Executor 模拟批量执行器(用于测试)

func NewMockExecutor

func NewMockExecutor() *MockExecutor

NewMockExecutor 创建模拟批量执行器(使用默认Driver)

func NewMockExecutorWithDriver

func NewMockExecutorWithDriver(driver SQLDriver) *MockExecutor

NewMockExecutorWithDriver 创建模拟批量执行器(使用自定义Driver)

func (*MockExecutor) ExecuteBatch

func (e *MockExecutor) ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error

ExecuteBatch 模拟执行批量操作

func (*MockExecutor) SnapshotExecutedBatches

func (e *MockExecutor) SnapshotExecutedBatches() [][]map[string]any

SnapshotExecutedBatches 返回一次性快照,避免并发读写竞态

func (*MockExecutor) SnapshotResults

func (e *MockExecutor) SnapshotResults() map[string]map[string]int64

SnapshotResults 返回只读快照(拷贝),用于测试收尾输出或断言

type MySQLDriver

type MySQLDriver struct {
	// contains filtered or unexported fields
}

func NewMySQLDriver

func NewMySQLDriver() *MySQLDriver

func (*MySQLDriver) GenerateInsertSQL

func (d *MySQLDriver) GenerateInsertSQL(ctx context.Context, schema *Schema, data []map[string]any) (string, []any, error)

GenerateInsertSQL 生成MySQL批量插入SQL

type NoopMetricsReporter

type NoopMetricsReporter struct{}

NoopMetricsReporter 默认关闭时的无操作实现(零开销路径)

func NewNoopMetricsReporter

func NewNoopMetricsReporter() *NoopMetricsReporter

func (*NoopMetricsReporter) DecInflight

func (*NoopMetricsReporter) DecInflight()

func (*NoopMetricsReporter) IncError

func (*NoopMetricsReporter) IncError(string, string)

func (*NoopMetricsReporter) IncInflight

func (*NoopMetricsReporter) IncInflight()

func (*NoopMetricsReporter) ObserveBatchAssemble

func (*NoopMetricsReporter) ObserveBatchAssemble(time.Duration)

func (*NoopMetricsReporter) ObserveBatchSize

func (*NoopMetricsReporter) ObserveBatchSize(int)

func (*NoopMetricsReporter) ObserveEnqueueLatency

func (*NoopMetricsReporter) ObserveEnqueueLatency(time.Duration)

func (*NoopMetricsReporter) ObserveExecuteDuration

func (*NoopMetricsReporter) ObserveExecuteDuration(string, int, time.Duration, string)

func (*NoopMetricsReporter) SetConcurrency

func (*NoopMetricsReporter) SetConcurrency(int)

func (*NoopMetricsReporter) SetQueueLength

func (*NoopMetricsReporter) SetQueueLength(int)

type Operations

type Operations []any

type PipelineConfig

type PipelineConfig struct {
	BufferSize    uint32
	FlushSize     uint32
	FlushInterval time.Duration

	// Step 2: 可选重试配置(零值=关闭,向后兼容)
	Retry RetryConfig
}

PipelineConfig 管道配置

type PostgreSQLDriver

type PostgreSQLDriver struct {
	// contains filtered or unexported fields
}

func NewPostgreSQLDriver

func NewPostgreSQLDriver() *PostgreSQLDriver

func (*PostgreSQLDriver) GenerateInsertSQL

func (d *PostgreSQLDriver) GenerateInsertSQL(ctx context.Context, schema *Schema, data []map[string]any) (string, []any, error)

GenerateInsertSQL 生成PostgreSQL批量插入SQL

type RedisBatchProcessor

type RedisBatchProcessor struct {
	// contains filtered or unexported fields
}

RedisBatchProcessor Redis批量处理器 实现 BatchProcessor 接口,专注于Redis的核心处理逻辑

func NewRedisBatchProcessor

func NewRedisBatchProcessor(client *redis.Client, driver RedisDriver) *RedisBatchProcessor

NewRedisBatchProcessor 创建Redis批量处理器 参数: - client: Redis客户端连接 - driver: Redis操作生成器

func (*RedisBatchProcessor) ExecuteOperations

func (rp *RedisBatchProcessor) ExecuteOperations(ctx context.Context, operations Operations) error

func (*RedisBatchProcessor) GenerateOperations

func (rp *RedisBatchProcessor) GenerateOperations(ctx context.Context, schema *Schema, data []map[string]any) (operations Operations, err error)

GenerateOperations 执行批量操作

type RedisCmd

type RedisCmd []any

type RedisDriver

type RedisDriver interface {
	GenerateCmds(ctx context.Context, schema *Schema, data []map[string]any) ([]RedisCmd, error)
}

type RedisPipelineDriver

type RedisPipelineDriver struct{}

func NewRedisPipelineDriver

func NewRedisPipelineDriver() *RedisPipelineDriver

func (*RedisPipelineDriver) GenerateCmds

func (d *RedisPipelineDriver) GenerateCmds(ctx context.Context, schema *Schema, data []map[string]any) ([]RedisCmd, error)

type Request

type Request struct {
	// contains filtered or unexported fields
}

用来存储请求的数据的各种字段信息和对应的schema

func NewRequest

func NewRequest(schema *Schema) *Request

func (*Request) Columns

func (r *Request) Columns() map[string]any

Columns 获取所有列数据

func (*Request) GetBool

func (r *Request) GetBool(colName string) (bool, error)

func (*Request) GetFloat64

func (r *Request) GetFloat64(colName string) (float64, error)

func (*Request) GetInt32

func (r *Request) GetInt32(colName string) (int32, error)

类型化的获取方法

func (*Request) GetInt64

func (r *Request) GetInt64(colName string) (int64, error)

func (*Request) GetOrderedValues

func (r *Request) GetOrderedValues() []any

GetOrderedValues 按照 schema 中定义的列顺序返回值

func (*Request) GetString

func (r *Request) GetString(colName string) (string, error)

func (*Request) GetTime

func (r *Request) GetTime(colName string) (time.Time, error)

func (*Request) Schema

func (r *Request) Schema() *Schema

Schema 获取请求的 schema

func (*Request) Set

func (r *Request) Set(colName string, value any) *Request

通用设置方法

func (*Request) SetBool

func (r *Request) SetBool(colName string, value bool) *Request

func (*Request) SetBytes

func (r *Request) SetBytes(colName string, value []byte) *Request

func (*Request) SetFloat32

func (r *Request) SetFloat32(colName string, value float32) *Request

func (*Request) SetFloat64

func (r *Request) SetFloat64(colName string, value float64) *Request

func (*Request) SetInt32

func (r *Request) SetInt32(colName string, value int32) *Request

类型化的设置方法

func (*Request) SetInt64

func (r *Request) SetInt64(colName string, value int64) *Request

func (*Request) SetNull

func (r *Request) SetNull(colName string) *Request

func (*Request) SetString

func (r *Request) SetString(colName string, value string) *Request

func (*Request) SetTime

func (r *Request) SetTime(colName string, value time.Time) *Request

func (*Request) Validate

func (r *Request) Validate() error

验证请求是否包含所有必需的列

type RetryConfig

type RetryConfig struct {
	Enabled     bool
	MaxAttempts int           // 总尝试次数(含首轮),建议 2~3
	BackoffBase time.Duration // 退避基值(指数退避起点)
	MaxBackoff  time.Duration // 最大退避时长(上限)
	// 自定义错误分类(可选);返回是否可重试与原因标签
	Classifier func(error) (retryable bool, reason string)
}

RetryConfig 可选重试配置(零值关闭)

type SQLBatchProcessor

type SQLBatchProcessor struct {
	// contains filtered or unexported fields
}

SQLBatchProcessor SQL数据库批量处理器 实现 BatchProcessor 接口,专注于SQL数据库的核心处理逻辑

func NewSQLBatchProcessor

func NewSQLBatchProcessor(db *sql.DB, driver SQLDriver) *SQLBatchProcessor

NewSQLBatchProcessor 创建SQL批量处理器 参数: - db: 数据库连接(用户管理连接池) - driver: 数据库特定的SQL生成器

func (*SQLBatchProcessor) ExecuteOperations

func (bp *SQLBatchProcessor) ExecuteOperations(ctx context.Context, operations Operations) error

func (*SQLBatchProcessor) GenerateOperations

func (bp *SQLBatchProcessor) GenerateOperations(ctx context.Context, schema *Schema, data []map[string]any) (operations Operations, err error)

type SQLDriver

type SQLDriver interface {
	GenerateInsertSQL(ctx context.Context, schema *Schema, data []map[string]any) (sql string, args []any, err error)
}

SQLDriver 数据库特定的SQL生成器接口

type SQLiteDriver

type SQLiteDriver struct {
	// contains filtered or unexported fields
}

func NewSQLiteDriver

func NewSQLiteDriver() *SQLiteDriver

func (*SQLiteDriver) GenerateInsertSQL

func (d *SQLiteDriver) GenerateInsertSQL(ctx context.Context, schema *Schema, data []map[string]any) (string, []any, error)

GenerateInsertSQL 生成SQLite批量插入SQL

type Schema

type Schema struct {
	Name             string
	Columns          []string
	ConflictStrategy ConflictStrategy
}

Schema 表结构定义

func NewSchema

func NewSchema(
	name string,
	conflictStrategy ConflictStrategy,
	columns ...string,
) *Schema

NewSchema 创建新的Schema实例

type ThrottledBatchExecutor

type ThrottledBatchExecutor struct {
	// contains filtered or unexported fields
}

ThrottledBatchExecutor SQL数据库通用批量执行器 实现 ThrottledBatchExecutor 接口,为SQL数据库提供统一的执行逻辑 架构:ThrottledBatchExecutor -> BatchProcessor -> SQLDriver -> Database

设计优势: - 代码复用:所有SQL数据库共享相同的执行逻辑和指标收集 - 职责分离:执行控制与具体处理逻辑分离 - 易于扩展:新增SQL数据库只需实现SQLDriver接口

func NewRedisThrottledBatchExecutor

func NewRedisThrottledBatchExecutor(client *redis.Client) *ThrottledBatchExecutor

func NewRedisThrottledBatchExecutorWithDriver

func NewRedisThrottledBatchExecutorWithDriver(client *redis.Client, driver RedisDriver) *ThrottledBatchExecutor

func NewSQLThrottledBatchExecutorWithDriver

func NewSQLThrottledBatchExecutorWithDriver(db *sql.DB, driver SQLDriver) *ThrottledBatchExecutor

NewThrottledBatchExecutorWithDriver 创建SQL数据库执行器(推荐方式) 内部使用 SQLBatchProcessor + SQLDriver 组合

func NewThrottledBatchExecutor

func NewThrottledBatchExecutor(processor BatchProcessor) *ThrottledBatchExecutor

NewThrottledBatchExecutor 创建通用执行器(使用自定义BatchProcessor)

func (*ThrottledBatchExecutor) ExecuteBatch

func (e *ThrottledBatchExecutor) ExecuteBatch(ctx context.Context, schema *Schema, data []map[string]any) error

ExecuteBatch 执行批量操作

func (*ThrottledBatchExecutor) MetricsReporter

func (e *ThrottledBatchExecutor) MetricsReporter() MetricsReporter

WithConcurrencyLimit 设置并发上限(limit <= 0 表示不启用限流)

func (*ThrottledBatchExecutor) WithConcurrencyLimit

func (e *ThrottledBatchExecutor) WithConcurrencyLimit(limit int) *ThrottledBatchExecutor

func (*ThrottledBatchExecutor) WithMetricsReporter

func (e *ThrottledBatchExecutor) WithMetricsReporter(metricsReporter MetricsReporter) *ThrottledBatchExecutor

WithMetricsReporter 设置指标报告器

func (*ThrottledBatchExecutor) WithRetryConfig

WithRetryConfig 启用/配置重试(仅对 ThrottledBatchExecutor 可用)

Directories

Path Synopsis
examples
test
integration command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL