sharding

package module
v8.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2020 License: BSD-2-Clause Imports: 15 Imported by: 0

README

PostgreSQL sharding for go-pg and Golang

Build Status

Uptrace.dev - distributed traces, logs, and errors in one place

This package uses a go-pg PostgreSQL client to help shard your data across a set of PostgreSQL servers as described in Sharding & IDs at Instagram. In short, it maps many (2048-8192) logical shards implemented using PostgreSQL schemas to far fewer physical PostgreSQL servers.

API docs: http://godoc.org/github.com/go-pg/sharding. Examples: http://godoc.org/github.com/go-pg/sharding#pkg-examples.

Installation

This package requires Go modules support:

go get github.com/go-pg/sharding/v8

Quickstart

package sharding_test

import (
	"fmt"

	"github.com/go-pg/sharding/v8"
	"github.com/go-pg/pg/v10"
)

// User is the example model stored in the users table of a logical shard.
// Users are sharded by AccountId, i.e. users with same account id are
// placed on the same shard.
type User struct {
	// tableName only carries the table name tag. ?SHARD looks like a
	// placeholder for the shard's schema — confirm against the go-pg docs.
	tableName string `pg:"?SHARD.users"`

	// Id is the user id; the users table defaults it to ?SHARD.next_id().
	// AccountId is the value CreateUser and GetUsers pick the shard by.
	// Emails is stored in a jsonb column.
	Id        int64
	AccountId int64
	Name      string
	Emails    []string
}

// String returns the user's name, so printing a User prints its Name.
func (u User) String() string {
	return u.Name
}

// CreateUser picks the shard by account id and inserts the user into it.
//
// The insert goes through Model(user).Insert(), the query-builder form used
// with the go-pg v10 client that this example imports; the shortcut
// DB.Insert(user) is not available there. The result of the insert is
// discarded; only the error is reported.
func CreateUser(cluster *sharding.Cluster, user *User) error {
	_, err := cluster.Shard(user.AccountId).Model(user).Insert()
	return err
}

// GetUser extracts the shard from the user id and fetches the user from that
// shard.
//
// On failure it returns a nil user together with the error, so that a
// zero-value User is never mistaken for a record that was actually loaded.
func GetUser(cluster *sharding.Cluster, id int64) (*User, error) {
	var user User
	err := cluster.SplitShard(id).Model(&user).Where("id = ?", id).Select()
	if err != nil {
		return nil, err
	}
	return &user, nil
}

// GetUsers selects every user of the given account from the shard that the
// account id maps to.
func GetUsers(cluster *sharding.Cluster, accountId int64) ([]User, error) {
	var users []User

	shard := cluster.Shard(accountId)
	query := shard.Model(&users).Where("account_id = ?", accountId)
	if err := query.Select(); err != nil {
		return users, err
	}
	return users, nil
}

// createShard rebuilds the database schema of the given shard: it drops and
// recreates the schema, runs sqlFuncs, and creates the users table. It stops
// at the first statement that fails and returns that error.
func createShard(shard *pg.DB) error {
	statements := [...]string{
		`DROP SCHEMA IF EXISTS ?SHARD CASCADE`,
		`CREATE SCHEMA ?SHARD`,
		sqlFuncs,
		`CREATE TABLE ?SHARD.users (id bigint DEFAULT ?SHARD.next_id(), account_id int, name text, emails jsonb)`,
	}

	for i := range statements {
		if _, err := shard.Exec(statements[i]); err != nil {
			return err
		}
	}
	return nil
}

// ExampleCluster builds a cluster with one physical server and two logical
// shards, stores three users in it, and reads them back by user id and by
// account id.
func ExampleCluster() {
	db := pg.Connect(&pg.Options{
		User: "postgres",
	})

	dbs := []*pg.DB{db} // list of physical PostgreSQL servers
	nshards := 2        // 2 logical shards
	// Create cluster with 1 physical server and 2 logical shards.
	cluster := sharding.NewCluster(dbs, nshards)
	// Close the cluster when the example returns (also on panic). The close
	// error is deliberately ignored: the example has nothing left to do with it.
	defer cluster.Close()

	// Create database schema for our logical shards.
	for i := 0; i < nshards; i++ {
		if err := createShard(cluster.Shard(int64(i))); err != nil {
			panic(err)
		}
	}

	// user1 will be created in shard1 because AccountId % nshards = shard1.
	user1 := &User{
		Name:      "user1",
		AccountId: 1,
		Emails:    []string{"user1@domain"},
	}
	// user2 will be created in shard1 too because AccountId is the same.
	user2 := &User{
		Name:      "user2",
		AccountId: 1,
		Emails:    []string{"user2@domain"},
	}
	// user3 will be created in shard0 because AccountId % nshards = shard0.
	user3 := &User{
		Name:      "user3",
		AccountId: 2,
		Emails:    []string{"user3@domain"},
	}

	// Insert in declaration order, exactly as three separate calls would.
	for _, u := range []*User{user1, user2, user3} {
		if err := CreateUser(cluster, u); err != nil {
			panic(err)
		}
	}

	user, err := GetUser(cluster, user1.Id)
	if err != nil {
		panic(err)
	}

	users, err := GetUsers(cluster, 1)
	if err != nil {
		panic(err)
	}

	fmt.Println(user)
	fmt.Println(users[0], users[1])
	// Output: user1
	// user1 user2
}

// sqlFuncs is the SQL that createShard runs for every shard. It defines the
// shared public.make_id function, the per-shard make_id and next_id wrappers,
// and the per-shard id sequence.
//
// next_id must be defined here: the users table created by createShard uses
// ?SHARD.next_id() as the default of its id column.
const sqlFuncs = `
CREATE OR REPLACE FUNCTION public.make_id(tm timestamptz, seq_id bigint, shard_id int)
RETURNS bigint AS $$
DECLARE
  max_shard_id CONSTANT bigint := 2048;
  max_seq_id CONSTANT bigint := 4096;
  id bigint;
BEGIN
  shard_id := shard_id % max_shard_id;
  seq_id := seq_id % max_seq_id;
  id := (floor(extract(epoch FROM tm) * 1000)::bigint - ?EPOCH) << 23;
  id := id | (shard_id << 12);
  id := id | seq_id;
  RETURN id;
END;
$$
LANGUAGE plpgsql IMMUTABLE;

CREATE FUNCTION ?SHARD.make_id(tm timestamptz, seq_id bigint)
RETURNS bigint AS $$
BEGIN
   RETURN public.make_id(tm, seq_id, ?SHARD_ID);
END;
$$
LANGUAGE plpgsql IMMUTABLE;

CREATE FUNCTION ?SHARD.next_id()
RETURNS bigint AS $$
BEGIN
   RETURN ?SHARD.make_id(clock_timestamp(), nextval('?SHARD.id_seq'));
END;
$$
LANGUAGE plpgsql;

CREATE SEQUENCE ?SHARD.id_seq;
`

Howto

Please see the Golang PostgreSQL client docs to get an idea of how to use this package.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// DefaultIDGen is an id generator created with 41 time bits, 11 shard
	// bits, and 12 sequence bits, using the package-level _epoch as its epoch.
	DefaultIDGen = NewIDGen(41, 11, 12, _epoch)
)

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster maps many (up to 2048) logical database shards implemented using PostgreSQL schemas to far fewer physical PostgreSQL servers.

Example
package main

import (
	"fmt"

	"github.com/go-pg/sharding/v8"

	"github.com/go-pg/pg/v10"
)

// User is the example model stored in the users table of a logical shard.
// Users are sharded by AccountId, i.e. users with the same account id are
// placed on the same shard.
type User struct {
	// tableName only carries the table name tag. ?SHARD looks like a
	// placeholder for the shard's schema — confirm against the go-pg docs.
	tableName string `pg:"?SHARD.users"`

	// Id is the user id; the users table defaults it to ?SHARD.next_id().
	// AccountId is the value CreateUser and GetUsers pick the shard by.
	// Emails is stored in a jsonb column.
	Id        int64
	AccountId int64
	Name      string
	Emails    []string
}

// String returns the user's name, so printing a User prints its Name.
func (u User) String() string {
	return u.Name
}

// CreateUser inserts the user into the shard selected by the user's account
// id. The insert result is discarded; only the error is returned.
func CreateUser(cluster *sharding.Cluster, user *User) error {
	shard := cluster.Shard(user.AccountId)
	if _, err := shard.Model(user).Insert(); err != nil {
		return err
	}
	return nil
}

// GetUser extracts the shard from the user id and fetches the user from that
// shard.
//
// On failure it returns a nil user together with the error, so that a
// zero-value User is never mistaken for a record that was actually loaded.
func GetUser(cluster *sharding.Cluster, id int64) (*User, error) {
	var user User
	if err := cluster.SplitShard(id).Model(&user).Where("id = ?", id).Select(); err != nil {
		return nil, err
	}
	return &user, nil
}

// GetUsers selects every user of the given account from the shard that the
// account id maps to.
func GetUsers(cluster *sharding.Cluster, accountId int64) ([]User, error) {
	var found []User

	query := cluster.Shard(accountId).Model(&found)
	query = query.Where("account_id = ?", accountId)

	err := query.Select()
	return found, err
}

// createShard rebuilds the database schema of the given shard: it drops and
// recreates the schema, runs sqlFuncs, and creates the users table. The
// first failing statement aborts the sequence and its error is returned.
func createShard(shard *pg.DB) error {
	const (
		dropSchema   = `DROP SCHEMA IF EXISTS ?SHARD CASCADE`
		createSchema = `CREATE SCHEMA ?SHARD`
		createUsers  = `CREATE TABLE ?SHARD.users (id bigint DEFAULT ?SHARD.next_id(), account_id int, name text, emails jsonb)`
	)

	for _, stmt := range []string{dropSchema, createSchema, sqlFuncs, createUsers} {
		if _, err := shard.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}

// main builds a cluster with one physical server and two logical shards,
// stores three users in it, and prints the users read back by user id and by
// account id.
func main() {
	db := pg.Connect(&pg.Options{
		User: "postgres",
	})

	dbs := []*pg.DB{db} // list of physical PostgreSQL servers
	nshards := 2        // 2 logical shards
	// Create cluster with 1 physical server and 2 logical shards.
	cluster := sharding.NewCluster(dbs, nshards)
	// Close the cluster when main returns (also on panic). The close error is
	// deliberately ignored: the program is finishing and cannot act on it.
	defer cluster.Close()

	// Create database schema for our logical shards.
	for i := 0; i < nshards; i++ {
		if err := createShard(cluster.Shard(int64(i))); err != nil {
			panic(err)
		}
	}

	// user1 will be created in shard1 because AccountId % nshards = shard1.
	user1 := &User{
		Name:      "user1",
		AccountId: 1,
		Emails:    []string{"user1@domain"},
	}
	err := CreateUser(cluster, user1)
	if err != nil {
		panic(err)
	}

	// user2 will be created in shard1 too because AccountId is the same.
	user2 := &User{
		Name:      "user2",
		AccountId: 1,
		Emails:    []string{"user2@domain"},
	}
	err = CreateUser(cluster, user2)
	if err != nil {
		panic(err)
	}

	// user3 will be created in shard0 because AccountId % nshards = shard0.
	user3 := &User{
		Name:      "user3",
		AccountId: 2,
		Emails:    []string{"user3@domain"},
	}
	err = CreateUser(cluster, user3)
	if err != nil {
		panic(err)
	}

	user, err := GetUser(cluster, user1.Id)
	if err != nil {
		panic(err)
	}

	users, err := GetUsers(cluster, 1)
	if err != nil {
		panic(err)
	}

	fmt.Println(user)
	fmt.Println(users[0], users[1])
}

// sqlFuncs is the SQL that createShard runs for every shard. It defines:
//   - public.make_id, which packs a millisecond timestamp (relative to
//     ?EPOCH, shifted left by 23 bits), a shard id (modulo 2048, shifted left
//     by 12 bits), and a sequence id (modulo 4096) into a single bigint;
//   - ?SHARD.make_id, which calls public.make_id with ?SHARD_ID;
//   - ?SHARD.next_id, which passes clock_timestamp() and the next value of
//     ?SHARD.id_seq to ?SHARD.make_id;
//   - the ?SHARD.id_seq sequence.
//
// NOTE(review): ?SHARD, ?SHARD_ID and ?EPOCH look like placeholders that are
// filled in per shard before the SQL is executed — confirm against the
// package documentation.
const sqlFuncs = `
CREATE OR REPLACE FUNCTION public.make_id(tm timestamptz, seq_id bigint, shard_id int)
RETURNS bigint AS $$
DECLARE
  max_shard_id CONSTANT bigint := 2048;
  max_seq_id CONSTANT bigint := 4096;
  id bigint;
BEGIN
  shard_id := shard_id % max_shard_id;
  seq_id := seq_id % max_seq_id;
  id := (floor(extract(epoch FROM tm) * 1000)::bigint - ?EPOCH) << 23;
  id := id | (shard_id << 12);
  id := id | seq_id;
  RETURN id;
END;
$$
LANGUAGE plpgsql IMMUTABLE;

CREATE FUNCTION ?SHARD.make_id(tm timestamptz, seq_id bigint)
RETURNS bigint AS $$
BEGIN
   RETURN public.make_id(tm, seq_id, ?SHARD_ID);
END;
$$
LANGUAGE plpgsql IMMUTABLE;

CREATE FUNCTION ?SHARD.next_id()
RETURNS bigint AS $$
BEGIN
   RETURN ?SHARD.make_id(clock_timestamp(), nextval('?SHARD.id_seq'));
END;
$$
LANGUAGE plpgsql;

CREATE SEQUENCE ?SHARD.id_seq;
`
Output:

user1
user1 user2

func NewCluster

func NewCluster(dbs []*pg.DB, nshards int) *Cluster

func NewClusterWithGen

func NewClusterWithGen(dbs []*pg.DB, nshards int, gen *IDGen) *Cluster

NewClusterWithGen returns new PostgreSQL cluster consisting of physical dbs and running nshards logical shards.

func (*Cluster) Close

func (cl *Cluster) Close() error

func (*Cluster) DB

func (cl *Cluster) DB(number int64) (int, *pg.DB)

DB returns db id and db for the number.

func (*Cluster) DBs

func (cl *Cluster) DBs() []*pg.DB

DBs returns list of database servers in the cluster.

func (*Cluster) ForEachDB

func (cl *Cluster) ForEachDB(fn func(db *pg.DB) error) error

ForEachDB concurrently calls the fn on each database in the cluster.

func (*Cluster) ForEachNShards

func (cl *Cluster) ForEachNShards(n int, fn func(shard *pg.DB) error) error

ForEachNShards concurrently calls the fn on each N shards in the cluster.

func (*Cluster) ForEachShard

func (cl *Cluster) ForEachShard(fn func(shard *pg.DB) error) error

ForEachShard concurrently calls the fn on each shard in the cluster. It is the same as ForEachNShards(1, fn).

func (*Cluster) IDGen

func (cl *Cluster) IDGen() *IDGen

func (*Cluster) Shard

func (cl *Cluster) Shard(number int64) *pg.DB

Shard maps the number to the corresponding shard in the cluster.

func (*Cluster) Shards

func (cl *Cluster) Shards(db *pg.DB) []*pg.DB

Shards returns list of shards running in the db. If db is nil all shards are returned.

func (*Cluster) SplitShard

func (cl *Cluster) SplitShard(id int64) *pg.DB

SplitShard uses SplitID to extract shard id from the id and then returns corresponding Shard in the cluster.

func (*Cluster) SubCluster

func (cl *Cluster) SubCluster(number int64, size int) *SubCluster

SubCluster returns a subset of the cluster of the given size.

type IDGen

type IDGen struct {
	// contains filtered or unexported fields
}

func NewIDGen

func NewIDGen(timeBits, shardBits, seqBits uint, epoch time.Time) *IDGen

func (*IDGen) MakeID

func (g *IDGen) MakeID(tm time.Time, shard, seq int64) int64

MakeID returns an id for the time. Note that you can only generate 4096 unique numbers per millisecond.

func (*IDGen) MaxID

func (g *IDGen) MaxID(tm time.Time) int64

MaxID returns max id for the time.

func (*IDGen) MinID

func (g *IDGen) MinID(tm time.Time) int64

MinID returns min id for the time.

func (*IDGen) NumShards

func (g *IDGen) NumShards() int

func (*IDGen) SplitID

func (g *IDGen) SplitID(id int64) (tm time.Time, shardID int64, seqID int64)

SplitID splits id into time, shard id, and sequence id.

type ShardIDGen

type ShardIDGen struct {
	// contains filtered or unexported fields
}

ShardIDGen generates sortable unique int64 numbers for a single shard. With DefaultIDGen they consist of: - 41 bits for time in milliseconds. - 11 bits for shard id. - 12 bits for auto-incrementing sequence.

As a result we can generate 4096 ids per millisecond for each of 2048 shards. Minimum supported time is 1975-02-28, maximum is 2044-12-31.

func NewShardIDGen

func NewShardIDGen(shard int64, gen *IDGen) *ShardIDGen

NewShardIDGen returns id generator for the shard.

func (*ShardIDGen) MaxID

func (g *ShardIDGen) MaxID(tm time.Time) int64

MaxID returns max id for the time.

func (*ShardIDGen) MinID

func (g *ShardIDGen) MinID(tm time.Time) int64

MinID returns min id for the time.

func (*ShardIDGen) NextID

func (g *ShardIDGen) NextID(tm time.Time) int64

NextID returns incremental id for the time. Note that you can only generate 4096 unique numbers per millisecond.

func (*ShardIDGen) SplitID

func (g *ShardIDGen) SplitID(id int64) (tm time.Time, shardID int64, seqID int64)

SplitID splits id into time, shard id, and sequence id.

type SubCluster

type SubCluster struct {
	// contains filtered or unexported fields
}

SubCluster is a subset of the cluster.

func (*SubCluster) ForEachNShards

func (cl *SubCluster) ForEachNShards(n int, fn func(shard *pg.DB) error) error

ForEachNShards concurrently calls the fn on each N shards in the subcluster.

func (*SubCluster) ForEachShard

func (cl *SubCluster) ForEachShard(fn func(shard *pg.DB) error) error

ForEachShard concurrently calls the fn on each shard in the subcluster. It is the same as ForEachNShards(1, fn).

func (*SubCluster) Shard

func (cl *SubCluster) Shard(number int64) *pg.DB

Shard maps the number to the corresponding shard in the subcluster.

func (*SubCluster) SplitShard

func (cl *SubCluster) SplitShard(id int64) *pg.DB

SplitShard uses SplitID to extract shard id from the id and then returns corresponding Shard in the subcluster.

type UUID

type UUID [uuidLen]byte

func NewUUID

func NewUUID(shardID int64, tm time.Time) UUID

func ParseUUID

func ParseUUID(b []byte) (UUID, error)

func (UUID) AppendValue

func (u UUID) AppendValue(b []byte, quote int) ([]byte, error)

func (*UUID) IsZero

func (u *UUID) IsZero() bool

func (UUID) MarshalBinary

func (u UUID) MarshalBinary() ([]byte, error)

func (UUID) MarshalJSON

func (u UUID) MarshalJSON() ([]byte, error)

func (UUID) MarshalText

func (u UUID) MarshalText() ([]byte, error)

func (*UUID) Scan

func (u *UUID) Scan(b interface{}) error

func (*UUID) ShardID

func (u *UUID) ShardID() int64

func (*UUID) Split

func (u *UUID) Split() (shardID int64, tm time.Time)

func (UUID) String

func (u UUID) String() string

func (*UUID) Time

func (u *UUID) Time() time.Time

func (*UUID) UnmarshalBinary

func (u *UUID) UnmarshalBinary(b []byte) error

func (*UUID) UnmarshalJSON

func (u *UUID) UnmarshalJSON(b []byte) error

func (*UUID) UnmarshalText

func (u *UUID) UnmarshalText(b []byte) error

func (UUID) Value

func (u UUID) Value() (driver.Value, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL