Documentation ¶
Index ¶
- Variables
- func MaxId(tm time.Time) int64
- func MinId(tm time.Time) int64
- func SplitId(id int64) (tm time.Time, shardId int64, seqId int64)
- type Cluster
- func (cl *Cluster) Close() error
- func (cl *Cluster) DB(number int64) *pg.DB
- func (cl *Cluster) DBs() []*pg.DB
- func (cl *Cluster) ForEachDB(fn func(db *pg.DB) error) error
- func (cl *Cluster) ForEachNShards(n int, fn func(shard *pg.DB) error) error
- func (cl *Cluster) ForEachShard(fn func(shard *pg.DB) error) error
- func (cl *Cluster) Shard(number int64) *pg.DB
- func (cl *Cluster) Shards(db *pg.DB) []*pg.DB
- func (cl *Cluster) SplitShard(id int64) *pg.DB
- func (cl *Cluster) SubCluster(number int64, size int) *SubCluster
- type IdGen
- type ShardIdGen
- type SubCluster
- type UUID
- func (u UUID) AppendValue(b []byte, quote int) ([]byte, error)
- func (u *UUID) IsZero() bool
- func (u *UUID) Scan(b interface{}) error
- func (u *UUID) ShardId() int64
- func (u *UUID) Split() (shardId int64, tm time.Time)
- func (u UUID) String() string
- func (u *UUID) Time() time.Time
- func (u UUID) Value() (driver.Value, error)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
DefaultIdGen = NewIdGen(41, 11, 12, epoch)
)
Functions ¶
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster maps many (up to 2048) logical database shards implemented using PostgreSQL schemas to far fewer physical PostgreSQL servers.
Example ¶
package main import ( "fmt" "github.com/go-pg/sharding" "github.com/go-pg/pg" ) // Users are sharded by AccountId, i.e. users with same account id are // placed on same shard. type User struct { tableName string `sql:"?shard.users"` Id int64 AccountId int64 Name string Emails []string } func (u User) String() string { return u.Name } // CreateUser picks shard by account id and creates user in the shard. func CreateUser(cluster *sharding.Cluster, user *User) error { return cluster.Shard(user.AccountId).Insert(user) } // GetUser splits shard from user id and fetches user from the shard. func GetUser(cluster *sharding.Cluster, id int64) (*User, error) { var user User err := cluster.SplitShard(id).Model(&user).Where("id = ?", id).Select() return &user, err } // GetUsers picks shard by account id and fetches users from the shard. func GetUsers(cluster *sharding.Cluster, accountId int64) ([]User, error) { var users []User err := cluster.Shard(accountId).Model(&users).Where("account_id = ?", accountId).Select() return users, err } // createShard creates database schema for a given shard. func createShard(shard *pg.DB) error { queries := []string{ `DROP SCHEMA IF EXISTS ?shard CASCADE`, `CREATE SCHEMA ?shard`, sqlFuncs, `CREATE TABLE ?shard.users (id bigint DEFAULT ?shard.next_id(), account_id int, name text, emails jsonb)`, } for _, q := range queries { _, err := shard.Exec(q) if err != nil { return err } } return nil } func main() { db := pg.Connect(&pg.Options{ User: "postgres", }) dbs := []*pg.DB{db} // list of physical PostgreSQL servers nshards := 2 // 2 logical shards // Create cluster with 1 physical server and 2 logical shards. cluster := sharding.NewCluster(dbs, nshards) // Create database schema for our logical shards. for i := 0; i < nshards; i++ { if err := createShard(cluster.Shard(int64(i))); err != nil { panic(err) } } // user1 will be created in shard1 because AccountId % nshards = shard1. 
user1 := &User{ Name: "user1", AccountId: 1, Emails: []string{"user1@domain"}, } err := CreateUser(cluster, user1) if err != nil { panic(err) } // user2 will be created in shard1 too AccountId is the same. user2 := &User{ Name: "user2", AccountId: 1, Emails: []string{"user2@domain"}, } err = CreateUser(cluster, user2) if err != nil { panic(err) } // user3 will be created in shard0 because AccountId % nshards = shard0. user3 := &User{ Name: "user3", AccountId: 2, Emails: []string{"user3@domain"}, } err = CreateUser(cluster, user3) if err != nil { panic(err) } user, err := GetUser(cluster, user1.Id) if err != nil { panic(err) } users, err := GetUsers(cluster, 1) if err != nil { panic(err) } fmt.Println(user) fmt.Println(users[0], users[1]) } const sqlFuncs = ` CREATE SEQUENCE ?shard.id_seq; -- _next_id returns unique sortable id. CREATE FUNCTION ?shard._next_id(tm timestamptz, shard_id int, seq_id bigint) RETURNS bigint AS $$ DECLARE our_epoch CONSTANT bigint := 1262304000000; max_shard_id CONSTANT bigint := 2048; max_seq_id CONSTANT bigint := 4096; id bigint; BEGIN shard_id := shard_id % max_shard_id; seq_id := seq_id % max_seq_id; id := (floor(extract(epoch FROM tm) * 1000)::bigint - our_epoch) << 23; id := id | (shard_id << 12); id := id | seq_id; RETURN id; END; $$ LANGUAGE plpgsql IMMUTABLE; CREATE FUNCTION ?shard.next_id() RETURNS bigint AS $$ BEGIN RETURN ?shard._next_id(clock_timestamp(), ?shard_id, nextval('?shard.id_seq')); END; $$ LANGUAGE plpgsql; `
Output: user1 user1 user2
func NewClusterWithGen ¶
NewClusterWithGen returns a new PostgreSQL cluster consisting of physical dbs and running nshards logical shards.
func (*Cluster) ForEachNShards ¶
ForEachNShards concurrently calls the fn on each N shards in the cluster.
func (*Cluster) ForEachShard ¶
ForEachShard concurrently calls the fn on each shard in the cluster. It is the same as ForEachNShards(1, fn).
func (*Cluster) Shards ¶
Shards returns the list of shards running in the db. If db is nil, all shards are returned.
func (*Cluster) SplitShard ¶
SplitShard uses SplitId to extract shard id from the id and then returns the corresponding Shard in the cluster.
func (*Cluster) SubCluster ¶
func (cl *Cluster) SubCluster(number int64, size int) *SubCluster
SubCluster returns a subset of the cluster of the given size.
type IdGen ¶
type IdGen struct {
// contains filtered or unexported fields
}
type ShardIdGen ¶
type ShardIdGen struct {
// contains filtered or unexported fields
}
IdGen generates sortable unique int64 numbers that consist of:
- 41 bits for time in milliseconds.
- 11 bits for shard id.
- 12 bits for auto-incrementing sequence.
As a result we can generate 4096 ids per millisecond for each of 2048 shards. Minimum supported time is 1975-02-28, maximum is 2044-12-31.
func NewShardIdGen ¶
func NewShardIdGen(shard int64, gen *IdGen) *ShardIdGen
NewShardIdGen returns id generator for the shard.
func (*ShardIdGen) MaxId ¶
func (g *ShardIdGen) MaxId(tm time.Time) int64
MaxId returns max id for the time.
type SubCluster ¶
type SubCluster struct {
// contains filtered or unexported fields
}
SubCluster is a subset of the cluster.
func (*SubCluster) ForEachNShards ¶
ForEachNShards concurrently calls the fn on each N shards in the subcluster.
func (*SubCluster) ForEachShard ¶
func (cl *SubCluster) ForEachShard(fn func(shard *pg.DB) error) error
ForEachShard concurrently calls the fn on each shard in the subcluster. It is the same as ForEachNShards(1, fn).
func (*SubCluster) Shard ¶
func (cl *SubCluster) Shard(number int64) *pg.DB
Shard maps the number to the corresponding shard in the subcluster.
func (*SubCluster) SplitShard ¶
func (cl *SubCluster) SplitShard(id int64) *pg.DB
SplitShard uses SplitId to extract shard id from the id and then returns corresponding Shard in the subcluster.