sharding

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2023 License: MIT Imports: 14 Imported by: 0

README

Gorm Sharding

Go

Gorm Sharding plugin using SQL parser and replace for splits large tables into smaller ones, redirects Query into sharding tables. Give you a high performance database access.

Gorm Sharding 是一个高性能的数据库分表中间件。

它基于 Conn 层做 SQL 拦截、AST 解析、分表路由、自增主键填充,带来的额外开销极小。对开发者友好、透明,使用上与普通 SQL、Gorm 查询无差别,只需要额外注意一下分表键条件。

Features

  • Non-intrusive design. Load the plugin, specify the config, and all done.
  • Lighting-fast. No network based middlewares, as fast as Go.
  • Multiple database (PostgreSQL, MySQL) support.
  • Integrated primary key generator (Snowflake, PostgreSQL Sequence, Custom, ...).

Install

go get -u gorm.io/sharding

Usage

Config the sharding middleware, register the tables which you want to shard.

import (
  "fmt"

  "gorm.io/driver/postgres"
  "gorm.io/gorm"
  "gorm.io/sharding"
)

db, err := gorm.Open(postgres.New(postgres.Config{DSN: "postgres://localhost:5432/sharding-db?sslmode=disable"))

db.Use(sharding.Register(sharding.Config{
    ShardingKey:         "user_id",
    NumberOfShards:      64,
    PrimaryKeyGenerator: sharding.PKSnowflake,
}, "orders", Notification{}, AuditLog{}))
// This case for show up give notifications, audit_logs table use same sharding rule.

Use the db session as usual. Just note that the query should have the Sharding Key when operate sharding tables.

// Gorm create example, this will insert to orders_02
db.Create(&Order{UserID: 2})
// sql: INSERT INTO orders_2 ...

// Show have use Raw SQL to insert, this will insert into orders_03
db.Exec("INSERT INTO orders(user_id) VALUES(?)", int64(3))

// This will throw ErrMissingShardingKey error, because there not have sharding key presented.
db.Create(&Order{Amount: 10, ProductID: 100})
fmt.Println(err)

// Find, this will redirect query to orders_02
var orders []Order
db.Model(&Order{}).Where("user_id", int64(2)).Find(&orders)
fmt.Printf("%#v\n", orders)

// Raw SQL also supported
db.Raw("SELECT * FROM orders WHERE user_id = ?", int64(3)).Scan(&orders)
fmt.Printf("%#v\n", orders)

// This will throw ErrMissingShardingKey error, because WHERE conditions not included sharding key
err = db.Model(&Order{}).Where("product_id", "1").Find(&orders).Error
fmt.Println(err)

// Update and Delete are similar to create and query
db.Exec("UPDATE orders SET product_id = ? WHERE user_id = ?", 2, int64(3))
err = db.Exec("DELETE FROM orders WHERE product_id = 3").Error
fmt.Println(err) // ErrMissingShardingKey

The full example is here.

🚨 NOTE: Gorm config PrepareStmt: true is not supported for now.

🚨 NOTE: Default snowflake generator in multiple nodes may result conflicted primary key, use your custom primary key generator, or regenerate a primary key when conflict occurs.

Primary Key

When you sharding tables, you need consider how the primary key generate.

Recommend options:

Use Snowflake

Built-in Snowflake primary key generator.

db.Use(sharding.Register(sharding.Config{
    ShardingKey:         "user_id",
    NumberOfShards:      64,
    PrimaryKeyGenerator: sharding.PKSnowflake,
}, "orders")
Use PostgreSQL Sequence

There has built-in PostgreSQL sequence primary key implementation in Gorm Sharding, you just configure PrimaryKeyGenerator: sharding.PKPGSequence to use.

You don't need create sequence manually, Gorm Sharding check and create when the PostgreSQL sequence does not exists.

This sequence name followed gorm_sharding_${table_name}_id_seq, for example orders table, the sequence name is gorm_sharding_orders_id_seq.

db.Use(sharding.Register(sharding.Config{
    ShardingKey:         "user_id",
    NumberOfShards:      64,
    PrimaryKeyGenerator: sharding.PKPGSequence,
}, "orders")

Combining with dbresolver

🚨 NOTE: Use dbresolver first.

dsn := "host=localhost user=gorm password=gorm dbname=gorm port=5432 sslmode=disable"
dsnRead := "host=localhost user=gorm password=gorm dbname=gorm-slave port=5432 sslmode=disable"

conn := postgres.Open(dsn)
connRead := postgres.Open(dsnRead)

db, err := gorm.Open(conn, &gorm.Config{})
dbRead, err := gorm.Open(conn, &gorm.Config{})

db.Use(dbresolver.Register(dbresolver.Config{
  Replicas: []gorm.Dialector{dbRead.Dialector},
}))

db.Use(sharding.Register(sharding.Config{
  ShardingKey:         "user_id",
  NumberOfShards:      64,
  PrimaryKeyGenerator: sharding.PKSnowflake,
}))

Sharding process

This graph show up how Gorm Sharding works.

graph TD
first("SELECT * FROM orders WHERE user_id = ? AND status = ?
args = [100, 1]")

first--->gorm(["Gorm Query"])

subgraph "Gorm"
  gorm--->gorm_query
  gorm--->gorm_exec
  gorm--->gorm_queryrow
  gorm_query["connPool.QueryContext(sql, args)"]
  gorm_exec[/"connPool.ExecContext"/]
  gorm_queryrow[/"connPool.QueryRowContext"/]
end

subgraph "database/sql"
  gorm_query-->conn(["Conn"])
  gorm_exec-->conn(["Conn"])
  gorm_queryrow-->conn(["Conn"])
  ExecContext[/"ExecContext"/]
  QueryContext[/"QueryContext"/]
  QueryRowContext[/"QueryRowContext"/]


  conn-->ExecContext
  conn-->QueryRowContext
  conn-->QueryContext
end

subgraph sharding ["Sharding"]
  QueryContext-->router-->| Format to get full SQL string |format_sql-->| Parser to AST |parse-->check_table
  router[["router(sql, args)<br>"]]
  format_sql>"sql = SELECT * FROM orders WHERE user_id = 100 AND status = 1"]

  check_table{"Check sharding rules<br>by table name"}
  check_table-->| Exist |process_ast
  check_table_1{{"Return Raw SQL"}}
  not_match_error[/"Return Error<br>SQL query must has sharding key"\]

  parse[["ast = sqlparser.Parse(sql)"]]

  check_table-.->| Not exist |check_table_1
  process_ast(("Sharding rules"))
  get_new_table_name[["Use value in WhereValue (100) for get sharding table index<br>orders + (100 % 16)<br>Sharding Table = orders_4"]]
  new_sql{{"SELECT * FROM orders_4 WHERE user_id = 100 AND status = 1"}}

  process_ast-.->| Not match ShardingKey |not_match_error
  process_ast-->| Match ShardingKey |match_sharding_key-->| Get table name |get_new_table_name-->| Replace TableName to get new SQL |new_sql
end


subgraph database [Database]
  orders_other[("orders_0, orders_1 ... orders_3")]
  orders_4[(orders_4)]
  orders_last[("orders_5 ... orders_15")]
  other_tables[(Other non-sharding tables<br>users, stocks, topics ...)]

  new_sql-->| Sharding Query | orders_4
  check_table_1-.->| None sharding Query |other_tables
end

orders_4-->result
other_tables-.->result
result[/Query results\]

License

MIT license.

Original fork from Longbridge.

Documentation

Index

Constants

View Source
const (
	// Use Snowflake primary key generator
	PKSnowflake = iota
	// Use PostgreSQL sequence primary key generator
	PKPGSequence
	// Use custom primary key generator
	PKCustom
)

Variables

View Source
var (
	ErrMissingShardingKey = errors.New("sharding key or id required, and use operator =")
	ErrInvalidID          = errors.New("invalid id format")
	ErrInsertDiffSuffix   = errors.New("can not insert different suffix table in one query ")
)
View Source
var (
	ShardingIgnoreStoreKey = "sharding_ignore"
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// When DoubleWrite enabled, data will double write to both main table and sharding table.
	DoubleWrite bool

	// ShardingKey specifies the table column you want to used for sharding the table rows.
	// For example, for a product order table, you may want to split the rows by `user_id`.
	ShardingKey string

	// NumberOfShards specifies how many tables you want to sharding.
	NumberOfShards uint

	// ShardingAlgorithm specifies a function to generate the sharding
	// table's suffix by the column value.
	// For example, this function implements a mod sharding algorithm.
	//
	// 	func(value interface{}) (suffix string, err error) {
	//		if uid, ok := value.(int64);ok {
	//			return fmt.Sprintf("_%02d", user_id % 64), nil
	//		}
	//		return "", errors.New("invalid user_id")
	// 	}
	ShardingAlgorithm func(columnValue interface{}) (suffix string, err error)

	// ShardingSuffixs specifies a function to generate all table's suffix.
	// Used to support Migrator and generate PrimaryKey.
	// For example, this function get a mod all sharding suffixs.
	//
	// func () (suffixs []string) {
	// 	numberOfShards := 5
	// 	for i := 0; i < numberOfShards; i++ {
	// 		suffixs = append(suffixs, fmt.Sprintf("_%02d", i%numberOfShards))
	// 	}
	// 	return
	// }
	ShardingSuffixs func() (suffixs []string)

	// ShardingAlgorithmByPrimaryKey specifies a function to generate the sharding
	// table's suffix by the primary key. Used when no sharding key specified.
	// For example, this function use the Snowflake library to generate the suffix.
	//
	// 	func(id int64) (suffix string) {
	//		return fmt.Sprintf("_%02d", snowflake.ParseInt64(id).Node())
	//	}
	ShardingAlgorithmByPrimaryKey func(id int64) (suffix string)

	// PrimaryKeyGenerator specifies the primary key generate algorithm.
	// Used only when insert and the record does not contains an id field.
	// Options are PKSnowflake, PKPGSequence and PKCustom.
	// When use PKCustom, you should also specify PrimaryKeyGeneratorFn.
	PrimaryKeyGenerator int

	// PrimaryKeyGeneratorFn specifies a function to generate the primary key.
	// When use auto-increment like generator, the tableIdx argument could ignored.
	// For example, this function use the Snowflake library to generate the primary key.
	//
	// 	func(tableIdx int64) int64 {
	//		return nodes[tableIdx].Generate().Int64()
	//	}
	PrimaryKeyGeneratorFn func(tableIdx int64) int64
	// contains filtered or unexported fields
}

Config specifies the configuration for sharding.

type ConnPool

type ConnPool struct {
	gorm.ConnPool
	// contains filtered or unexported fields
}

ConnPool Implement a ConnPool for replace db.Statement.ConnPool in Gorm

func (*ConnPool) BeginTx

func (pool *ConnPool) BeginTx(ctx context.Context, opt *sql.TxOptions) (gorm.ConnPool, error)

BeginTx Implement ConnPoolBeginner.BeginTx

func (*ConnPool) Commit

func (pool *ConnPool) Commit() error

Implement TxCommitter.Commit

func (ConnPool) ExecContext

func (pool ConnPool) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

func (*ConnPool) Ping

func (pool *ConnPool) Ping() error

func (ConnPool) PrepareContext

func (pool ConnPool) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

func (ConnPool) QueryContext

func (pool ConnPool) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

https://github.com/go-gorm/gorm/blob/v1.21.11/callbacks/query.go#L18

func (ConnPool) QueryRowContext

func (pool ConnPool) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

func (*ConnPool) Rollback

func (pool *ConnPool) Rollback() error

Implement TxCommitter.Rollback

func (*ConnPool) String

func (pool *ConnPool) String() string

type Sharding

type Sharding struct {
	*gorm.DB
	ConnPool *ConnPool
	// contains filtered or unexported fields
}

func NewSharding added in v0.0.3

func NewSharding(config Config) *Sharding

func Register

func Register(config Config, tables ...interface{}) *Sharding

func (*Sharding) Initialize

func (s *Sharding) Initialize(db *gorm.DB) error

Initialize implement for Gorm plugin interface

func (*Sharding) LastQuery

func (s *Sharding) LastQuery() string

LastQuery get last SQL query

func (*Sharding) Name

func (s *Sharding) Name() string

Name plugin name for Gorm plugin interface

func (*Sharding) Register added in v0.0.3

func (s *Sharding) Register(config Config, tables ...interface{}) *Sharding

func (*Sharding) RegisterDefault added in v0.0.3

func (s *Sharding) RegisterDefault(tables ...interface{}) *Sharding

func (*Sharding) RegisterOne added in v0.0.3

func (s *Sharding) RegisterOne(config Config, table interface{}) *Sharding

type ShardingDialector

type ShardingDialector struct {
	gorm.Dialector
	// contains filtered or unexported fields
}

func NewShardingDialector

func NewShardingDialector(d gorm.Dialector, s *Sharding) ShardingDialector

func (ShardingDialector) Migrator

func (d ShardingDialector) Migrator(db *gorm.DB) gorm.Migrator

type ShardingMigrator

type ShardingMigrator struct {
	gorm.Migrator
	// contains filtered or unexported fields
}

func (ShardingMigrator) AutoMigrate

func (m ShardingMigrator) AutoMigrate(dst ...interface{}) error

func (ShardingMigrator) BuildIndexOptions

func (m ShardingMigrator) BuildIndexOptions(opts []schema.IndexOption, stmt *gorm.Statement) (results []interface{})

func (ShardingMigrator) DropTable

func (m ShardingMigrator) DropTable(dst ...interface{}) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL