qdb

package
v0.0.0-...-0caea15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: PostgreSQL Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
// Status values that a MoveKeyRange record can carry in its Status field.
const (
	MoveKeyRangePlanned  MoveKeyRangeStatus = "PLANNED"
	MoveKeyRangeStarted  MoveKeyRangeStatus = "STARTED"
	MoveKeyRangeComplete MoveKeyRangeStatus = "COMPLETE"
)
View Source
// States that a Router can hold in its State field.
const (
	CLOSED RouterState = "CLOSED"
	OPENED RouterState = "OPENED"
)
View Source
// Status values recorded in DataTransferTransaction.Status.
const (
	Planned    TxStatus = "planned"
	DataCopied TxStatus = "data_copied"
)
View Source
// CoordKeepAliveTtl is an untyped integer constant equal to 3.
// NOTE(review): presumably the keep-alive TTL of the coordinator entry; its
// unit and its consumer are not visible here — confirm.
const CoordKeepAliveTtl = 3
View Source
// KRLocked is the KeyRangeStatus value "LOCKED".
const KRLocked KeyRangeStatus = "LOCKED"
View Source
// KRUnLocked is the KeyRangeStatus value "UNLOCKED".
const KRUnLocked KeyRangeStatus = "UNLOCKED"

Variables

View Source
// Names of the known column types, held as package-level string variables.
// NOTE(review): presumably the values expected in Distribution.ColTypes —
// confirm against the callers.
var (
	ColumnTypeVarchar  string = "varchar"
	ColumnTypeInteger  string = "integer"
	ColumnTypeUinteger string = "uinteger"
)

Functions

func ExecuteCommands

func ExecuteCommands(saver func() error, commands ...Command) error

Types

type Command

// Command is a pair of operations, Do and Undo, each of which reports failure
// through its error result.
// NOTE(review): Undo presumably reverts the effect of a preceding Do — confirm
// against the implementations and ExecuteCommands.
type Command interface {
	Do() error
	Undo() error
}

type CustomCommand

type CustomCommand struct {
	// contains filtered or unexported fields
}

func NewCustomCommand

func NewCustomCommand(do func() error, undo func() error) *CustomCommand

func (*CustomCommand) Do

func (c *CustomCommand) Do() error

func (*CustomCommand) Undo

func (c *CustomCommand) Undo() error

type DataTransferTransaction

// DataTransferTransaction contains information about a data transfer from one
// shard to another: the destination shard (JSON key "to_shard"), the source
// shard ("from_shard") and the transfer's TxStatus ("status").
type DataTransferTransaction struct {
	ToShardId   string   `json:"to_shard"`
	FromShardId string   `json:"from_shard"`
	Status      TxStatus `json:"status"`
}

DataTransferTransaction contains information about a data transfer from one shard to another.

type DeleteCommand

type DeleteCommand[T any] struct {
	// contains filtered or unexported fields
}

func NewDeleteCommand

func NewDeleteCommand[T any](m map[string]T, key string) *DeleteCommand[T]

func (*DeleteCommand[T]) Do

func (c *DeleteCommand[T]) Do() error

func (*DeleteCommand[T]) Undo

func (c *DeleteCommand[T]) Undo() error

type DistributedRelation

// DistributedRelation pairs a relation name with the entries of its
// distribution key. Note that DistributionKey is serialized under the JSON key
// "column_names", not under its Go field name.
type DistributedRelation struct {
	Name            string                 `json:"name"`
	DistributionKey []DistributionKeyEntry `json:"column_names"`
}

type DistributedXactKepper

// DistributedXactKepper keeps track of the status of the two-phase data move
// transaction. Records are addressed by a string key; as the method names
// indicate, RecordTransferTx stores a record, GetTransferTx returns it and
// RemoveTransferTx removes it.
type DistributedXactKepper interface {
	RecordTransferTx(ctx context.Context, key string, info *DataTransferTransaction) error
	GetTransferTx(ctx context.Context, key string) (*DataTransferTransaction, error)
	RemoveTransferTx(ctx context.Context, key string) error
}

DistributedXactKepper keeps track of the status of the two-phase data move transaction.

type Distribution

// Distribution is identified by ID and carries a list of column type names
// (left out of the JSON encoding when empty) together with its relations,
// held in a map with string keys.
// NOTE(review): the map key is presumably the relation name — confirm.
type Distribution struct {
	ID       string   `json:"id"`
	ColTypes []string `json:"col_types,omitempty"`

	Relations map[string]*DistributedRelation `json:"relations"`
}

func NewDistribution

func NewDistribution(id string, coltypes []string) *Distribution

type DistributionKeyEntry

// DistributionKeyEntry names one column of a distribution key together with a
// hash function identifier (JSON keys "column" and "hash").
type DistributionKeyEntry struct {
	Column       string `json:"column"`
	HashFunction string `json:"hash"`
}

type DropCommand

type DropCommand[T any] struct {
	// contains filtered or unexported fields
}

func NewDropCommand

func NewDropCommand[T any](m map[string]T) *DropCommand[T]

func (*DropCommand[T]) Do

func (c *DropCommand[T]) Do() error

func (*DropCommand[T]) Undo

func (c *DropCommand[T]) Undo() error

type EtcdQDB

type EtcdQDB struct {
	// contains filtered or unexported fields
}

func NewEtcdQDB

func NewEtcdQDB(addr string) (*EtcdQDB, error)

func (*EtcdQDB) AddRouter

func (q *EtcdQDB) AddRouter(ctx context.Context, r *Router) error

TODO : unit tests

func (*EtcdQDB) AddShard

func (q *EtcdQDB) AddShard(ctx context.Context, shard *Shard) error

TODO : unit tests

func (*EtcdQDB) AlterDistributionAttach

func (q *EtcdQDB) AlterDistributionAttach(ctx context.Context, id string, rels []*DistributedRelation) error

TODO : unit tests

func (*EtcdQDB) AlterDistributionDetach

func (q *EtcdQDB) AlterDistributionDetach(ctx context.Context, id string, relName string) error

TODO: unit tests

func (*EtcdQDB) CheckLockedKeyRange

func (q *EtcdQDB) CheckLockedKeyRange(ctx context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) Client

func (q *EtcdQDB) Client() *clientv3.Client

func (*EtcdQDB) CloseRouter

func (q *EtcdQDB) CloseRouter(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) CreateDistribution

func (q *EtcdQDB) CreateDistribution(ctx context.Context, distribution *Distribution) error

TODO : unit tests

func (*EtcdQDB) CreateKeyRange

func (q *EtcdQDB) CreateKeyRange(ctx context.Context, keyRange *KeyRange) error

TODO : unit tests

func (*EtcdQDB) DeleteKeyRangeMove

func (q *EtcdQDB) DeleteKeyRangeMove(ctx context.Context, moveId string) error

func (*EtcdQDB) DeleteRouter

func (q *EtcdQDB) DeleteRouter(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) DropDistribution

func (q *EtcdQDB) DropDistribution(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) DropKeyRange

func (q *EtcdQDB) DropKeyRange(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) DropKeyRangeAll

func (q *EtcdQDB) DropKeyRangeAll(ctx context.Context) error

TODO : unit tests

func (*EtcdQDB) DropShard

func (q *EtcdQDB) DropShard(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) GetCoordinator

func (q *EtcdQDB) GetCoordinator(ctx context.Context) (string, error)

TODO : unit tests

func (*EtcdQDB) GetDistribution

func (q *EtcdQDB) GetDistribution(ctx context.Context, id string) (*Distribution, error)

TODO : unit tests

func (*EtcdQDB) GetKeyRange

func (q *EtcdQDB) GetKeyRange(ctx context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) GetRelationDistribution

func (q *EtcdQDB) GetRelationDistribution(ctx context.Context, relName string) (*Distribution, error)

TODO : unit tests

func (*EtcdQDB) GetShard

func (q *EtcdQDB) GetShard(ctx context.Context, id string) (*Shard, error)

TODO : unit tests

func (*EtcdQDB) GetTaskGroup

func (q *EtcdQDB) GetTaskGroup(ctx context.Context) (*TaskGroup, error)

func (*EtcdQDB) GetTransferTx

func (q *EtcdQDB) GetTransferTx(ctx context.Context, key string) (*DataTransferTransaction, error)

TODO : unit tests

func (*EtcdQDB) ListAllKeyRanges

func (q *EtcdQDB) ListAllKeyRanges(ctx context.Context) ([]*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) ListDistributions

func (q *EtcdQDB) ListDistributions(ctx context.Context) ([]*Distribution, error)

TODO : unit tests

func (*EtcdQDB) ListKeyRangeMoves

func (q *EtcdQDB) ListKeyRangeMoves(ctx context.Context) ([]*MoveKeyRange, error)

TODO : unit tests

func (*EtcdQDB) ListKeyRanges

func (q *EtcdQDB) ListKeyRanges(ctx context.Context, distribution string) ([]*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) ListRouters

func (q *EtcdQDB) ListRouters(ctx context.Context) ([]*Router, error)

TODO : unit tests

func (*EtcdQDB) ListShards

func (q *EtcdQDB) ListShards(ctx context.Context) ([]*Shard, error)

TODO : unit tests

func (*EtcdQDB) LockKeyRange

func (q *EtcdQDB) LockKeyRange(ctx context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) OpenRouter

func (q *EtcdQDB) OpenRouter(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) RecordKeyRangeMove

func (q *EtcdQDB) RecordKeyRangeMove(ctx context.Context, m *MoveKeyRange) error

TODO : unit tests

func (*EtcdQDB) RecordTransferTx

func (q *EtcdQDB) RecordTransferTx(ctx context.Context, key string, info *DataTransferTransaction) error

TODO : unit tests

func (*EtcdQDB) RemoveTaskGroup

func (q *EtcdQDB) RemoveTaskGroup(ctx context.Context) error

func (*EtcdQDB) RemoveTransferTx

func (q *EtcdQDB) RemoveTransferTx(ctx context.Context, key string) error

TODO : unit tests

func (*EtcdQDB) ShareKeyRange

func (q *EtcdQDB) ShareKeyRange(id string) error

TODO : unit tests

func (*EtcdQDB) TryCoordinatorLock

func (q *EtcdQDB) TryCoordinatorLock(ctx context.Context) error

TODO : unit tests

func (*EtcdQDB) UnlockKeyRange

func (q *EtcdQDB) UnlockKeyRange(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) UpdateCoordinator

func (q *EtcdQDB) UpdateCoordinator(ctx context.Context, address string) error

TODO: unit tests. TODO: implement.

func (*EtcdQDB) UpdateKeyRange

func (q *EtcdQDB) UpdateKeyRange(ctx context.Context, keyRange *KeyRange) error

TODO : unit tests

func (*EtcdQDB) UpdateKeyRangeMoveStatus

func (q *EtcdQDB) UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s MoveKeyRangeStatus) error

TODO : unit tests

func (*EtcdQDB) WriteTaskGroup

func (q *EtcdQDB) WriteTaskGroup(ctx context.Context, group *TaskGroup) error

type KeyRange

// KeyRange describes a key range: its lower bound (JSON key "from"), the ID of
// the shard it refers to, its own ID and the ID of its distribution.
// If marshalled with encoding/json, the []byte LowerBound is emitted as a
// base64-encoded string.
// NOTE(review): no upper bound is stored; presumably a range ends where the
// next range's lower bound begins — confirm.
type KeyRange struct {
	LowerBound     []byte `json:"from"`
	ShardID        string `json:"shard_id"`
	KeyRangeID     string `json:"key_range_id"`
	DistributionId string `json:"distribution_id"`
}

type KeyRangeStatus

// KeyRangeStatus is a string-based status type; KRLocked and KRUnLocked are
// its declared values.
type KeyRangeStatus string

type MemQDB

// MemQDB exposes its state as JSON-tagged fields: string-keyed maps of locks,
// key ranges, shards, distributions, routers and data transfer transactions,
// plus the coordinator string and a single task group.
// NOTE(review): the meaning of Freq and the key/value roles of
// RelationDistribution (presumably relation name -> distribution ID) are not
// visible here — confirm. Further unexported fields are elided from this view.
type MemQDB struct {
	Locks                map[string]*sync.RWMutex            `json:"locks"`
	Freq                 map[string]bool                     `json:"freq"`
	Krs                  map[string]*KeyRange                `json:"krs"`
	Shards               map[string]*Shard                   `json:"shards"`
	Distributions        map[string]*Distribution            `json:"distributions"`
	RelationDistribution map[string]string                   `json:"relation_distribution"`
	Routers              map[string]*Router                  `json:"routers"`
	Transactions         map[string]*DataTransferTransaction `json:"transactions"`
	Coordinator          string                              `json:"coordinator"`
	TaskGroup            *TaskGroup                          `json:"taskGroup"`
	// contains filtered or unexported fields
}

func NewMemQDB

func NewMemQDB(backupPath string) (*MemQDB, error)

func RestoreQDB

func RestoreQDB(backupPath string) (*MemQDB, error)

TODO : unit tests

func (*MemQDB) AddRouter

func (q *MemQDB) AddRouter(_ context.Context, r *Router) error

TODO : unit tests

func (*MemQDB) AddShard

func (q *MemQDB) AddShard(_ context.Context, shard *Shard) error

TODO : unit tests

func (*MemQDB) AlterDistributionAttach

func (q *MemQDB) AlterDistributionAttach(_ context.Context, id string, rels []*DistributedRelation) error

TODO : unit tests

func (*MemQDB) AlterDistributionDetach

func (q *MemQDB) AlterDistributionDetach(_ context.Context, id string, relName string) error

TODO: unit tests

func (*MemQDB) CheckLockedKeyRange

func (q *MemQDB) CheckLockedKeyRange(ctx context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*MemQDB) CloseRouter

func (q *MemQDB) CloseRouter(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) CreateDistribution

func (q *MemQDB) CreateDistribution(_ context.Context, distribution *Distribution) error

TODO : unit tests

func (*MemQDB) CreateKeyRange

func (q *MemQDB) CreateKeyRange(_ context.Context, keyRange *KeyRange) error

TODO : unit tests

func (*MemQDB) DeleteKeyRangeMove

func (q *MemQDB) DeleteKeyRangeMove(ctx context.Context, moveId string) error

func (*MemQDB) DeleteRouter

func (q *MemQDB) DeleteRouter(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) DropDistribution

func (q *MemQDB) DropDistribution(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) DropKeyRange

func (q *MemQDB) DropKeyRange(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) DropKeyRangeAll

func (q *MemQDB) DropKeyRangeAll(_ context.Context) error

TODO : unit tests

func (*MemQDB) DropShard

func (q *MemQDB) DropShard(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) DumpState

func (q *MemQDB) DumpState() error

TODO : unit tests

func (*MemQDB) GetCoordinator

func (q *MemQDB) GetCoordinator(ctx context.Context) (string, error)

func (*MemQDB) GetDistribution

func (q *MemQDB) GetDistribution(_ context.Context, id string) (*Distribution, error)

TODO : unit tests

func (*MemQDB) GetKeyRange

func (q *MemQDB) GetKeyRange(_ context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*MemQDB) GetRelationDistribution

func (q *MemQDB) GetRelationDistribution(_ context.Context, relation string) (*Distribution, error)

func (*MemQDB) GetShard

func (q *MemQDB) GetShard(_ context.Context, id string) (*Shard, error)

TODO : unit tests

func (*MemQDB) GetTaskGroup

func (q *MemQDB) GetTaskGroup(_ context.Context) (*TaskGroup, error)

func (*MemQDB) GetTransferTx

func (q *MemQDB) GetTransferTx(_ context.Context, key string) (*DataTransferTransaction, error)

TODO : unit tests

func (*MemQDB) ListAllKeyRanges

func (q *MemQDB) ListAllKeyRanges(_ context.Context) ([]*KeyRange, error)

TODO : unit tests

func (*MemQDB) ListDistributions

func (q *MemQDB) ListDistributions(_ context.Context) ([]*Distribution, error)

TODO : unit tests

func (*MemQDB) ListKeyRangeMoves

func (q *MemQDB) ListKeyRangeMoves(ctx context.Context) ([]*MoveKeyRange, error)

func (*MemQDB) ListKeyRanges

func (q *MemQDB) ListKeyRanges(_ context.Context, distribution string) ([]*KeyRange, error)

TODO : unit tests

func (*MemQDB) ListRouters

func (q *MemQDB) ListRouters(_ context.Context) ([]*Router, error)

TODO : unit tests

func (*MemQDB) ListShards

func (q *MemQDB) ListShards(_ context.Context) ([]*Shard, error)

TODO : unit tests

func (*MemQDB) LockKeyRange

func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*MemQDB) OpenRouter

func (q *MemQDB) OpenRouter(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) RecordKeyRangeMove

func (q *MemQDB) RecordKeyRangeMove(ctx context.Context, m *MoveKeyRange) error

func (*MemQDB) RecordTransferTx

func (q *MemQDB) RecordTransferTx(_ context.Context, key string, info *DataTransferTransaction) error

TODO : unit tests

func (*MemQDB) RemoveTaskGroup

func (q *MemQDB) RemoveTaskGroup(_ context.Context) error

func (*MemQDB) RemoveTransferTx

func (q *MemQDB) RemoveTransferTx(_ context.Context, key string) error

TODO : unit tests

func (*MemQDB) ShareKeyRange

func (q *MemQDB) ShareKeyRange(id string) error

TODO : unit tests

func (*MemQDB) TryCoordinatorLock

func (q *MemQDB) TryCoordinatorLock(_ context.Context) error

func (*MemQDB) TryLockKeyRange

func (q *MemQDB) TryLockKeyRange(lock *sync.RWMutex, id string, read bool) error

TODO : unit tests

func (*MemQDB) UnlockKeyRange

func (q *MemQDB) UnlockKeyRange(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) UpdateCoordinator

func (q *MemQDB) UpdateCoordinator(_ context.Context, address string) error

TODO : unit tests

func (*MemQDB) UpdateKeyRange

func (q *MemQDB) UpdateKeyRange(_ context.Context, keyRange *KeyRange) error

TODO : unit tests

func (*MemQDB) UpdateKeyRangeMoveStatus

func (q *MemQDB) UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s MoveKeyRangeStatus) error

func (*MemQDB) WriteTaskGroup

func (q *MemQDB) WriteTaskGroup(_ context.Context, group *TaskGroup) error

type MoveKeyRange

// MoveKeyRange is the record of a key range move: the move's own ID, a shard
// ID, the ID of the key range concerned and the move's status.
// NOTE(review): ShardId is presumably the destination shard — confirm.
type MoveKeyRange struct {
	MoveId     string             `json:"move_id"`
	ShardId    string             `json:"shard_id"`
	KeyRangeID string             `json:"key_range_id"`
	Status     MoveKeyRangeStatus `json:"status"`
}

type MoveKeyRangeStatus

// MoveKeyRangeStatus is the string-based type of MoveKeyRange.Status; see the
// MoveKeyRangePlanned, MoveKeyRangeStarted and MoveKeyRangeComplete constants.
type MoveKeyRangeStatus string

type QDB

// QDB is the storage interface for the routing schema. Its methods fall into
// groups: key ranges (including locking), shards, distributions and the
// relations attached to them, the task group, and the coordinator address.
// Every method that takes a context names it ctx; ShareKeyRange is the only
// method that takes no context.
type QDB interface {
	// Key ranges.
	CreateKeyRange(ctx context.Context, keyRange *KeyRange) error
	GetKeyRange(ctx context.Context, id string) (*KeyRange, error)
	UpdateKeyRange(ctx context.Context, keyRange *KeyRange) error
	DropKeyRange(ctx context.Context, id string) error
	DropKeyRangeAll(ctx context.Context) error
	ListKeyRanges(ctx context.Context, distribution string) ([]*KeyRange, error)
	ListAllKeyRanges(ctx context.Context) ([]*KeyRange, error)
	LockKeyRange(ctx context.Context, id string) (*KeyRange, error)
	UnlockKeyRange(ctx context.Context, id string) error
	CheckLockedKeyRange(ctx context.Context, id string) (*KeyRange, error)
	ShareKeyRange(id string) error

	// Shards.
	AddShard(ctx context.Context, shard *Shard) error
	ListShards(ctx context.Context) ([]*Shard, error)
	GetShard(ctx context.Context, shardID string) (*Shard, error)
	DropShard(ctx context.Context, shardID string) error

	// Distributions.
	CreateDistribution(ctx context.Context, distr *Distribution) error
	ListDistributions(ctx context.Context) ([]*Distribution, error)
	DropDistribution(ctx context.Context, id string) error

	// Relations attached to a distribution.
	AlterDistributionAttach(ctx context.Context, id string, rels []*DistributedRelation) error
	AlterDistributionDetach(ctx context.Context, id string, relName string) error

	GetDistribution(ctx context.Context, id string) (*Distribution, error)
	// TODO: fix this by passing FQRN (fully qualified relation name (+schema))
	GetRelationDistribution(ctx context.Context, relation string) (*Distribution, error)

	// Task group.
	GetTaskGroup(ctx context.Context) (*TaskGroup, error)
	WriteTaskGroup(ctx context.Context, group *TaskGroup) error
	RemoveTaskGroup(ctx context.Context) error

	// Coordinator address.
	UpdateCoordinator(ctx context.Context, address string) error
	GetCoordinator(ctx context.Context) (string, error)
}
This is a generic interface to be used by both the coordinator and the router.

The router should use a memory-based version of this interface to cache the state of the routing schema, while the coordinator should use an etcd-based implementation to keep the distributed state in sync.

type Router

// Router describes a router by address, ID and state. State is left out of the
// JSON encoding when it is empty.
type Router struct {
	Address string      `json:"address"`
	ID      string      `json:"id"`
	State   RouterState `json:"state,omitempty"`
}

func NewRouter

func NewRouter(addr, id string, rst RouterState) *Router

func (Router) Addr

func (r Router) Addr() string

type RouterState

// RouterState is the string-based type of Router.State; CLOSED and OPENED are
// its declared values.
type RouterState string

type Shard

// Shard is a shard ID together with the list of its hosts.
type Shard struct {
	ID    string   `json:"id"`
	Hosts []string `json:"hosts"`
}

func NewShard

func NewShard(ID string, hosts []string) *Shard

type ShardKey

// ShardKey has a Name and an RW flag; unlike the other structs in this package
// it carries no JSON tags.
// NOTE(review): RW presumably marks a read-write target — confirm.
type ShardKey struct {
	Name string
	RW   bool
}

type ShardingSchemaKeeper

// ShardingSchemaKeeper records key range moves and their status.
type ShardingSchemaKeeper interface {
	// RecordKeyRangeMove persists the start of a key range move in distributed storage.
	RecordKeyRangeMove(ctx context.Context, m *MoveKeyRange) error
	// ListKeyRangeMoves lists all key range moves in progress.
	ListKeyRangeMoves(ctx context.Context) ([]*MoveKeyRange, error)
	// UpdateKeyRangeMoveStatus sets the status of the move identified by moveId
	// to s, for example to mark the move as completed.
	UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s MoveKeyRangeStatus) error
	// DeleteKeyRangeMove deletes the stored info about a key range move.
	DeleteKeyRangeMove(ctx context.Context, moveId string) error
}

type Task

// Task is one element of a TaskGroup. It names a source and a target shard,
// source, target and temporary key range IDs, a bound and an integer state.
// NOTE(review): the meaning of the State values and of Bound (presumably the
// boundary key of the operation) is not visible here — confirm.
type Task struct {
	ShardFromId string `json:"shard_from_id"`
	ShardToId   string `json:"shard_to_id"`
	KrIdFrom    string `json:"kr_id_from"`
	KrIdTo      string `json:"kr_id_to"`
	Bound       []byte `json:"bound"`
	KrIdTemp    string `json:"kr_id_temp"`
	State       int    `json:"state"`
}

type TaskGroup

// TaskGroup is a list of tasks plus an integer join type.
// NOTE(review): the set of valid JoinType values is not defined in this view.
type TaskGroup struct {
	Tasks    []*Task `json:"tasks"`
	JoinType int     `json:"join_type"`
}

type TopolodyKeeper

// TopolodyKeeper manages the set of known routers and switches individual
// routers between the online and offline modes.
type TopolodyKeeper interface {
	AddRouter(ctx context.Context, r *Router) error
	DeleteRouter(ctx context.Context, rID string) error
	ListRouters(ctx context.Context) ([]*Router, error)

	// OpenRouter changes the state of the router to online ("Online" mode),
	// making it usable for query execution.
	OpenRouter(ctx context.Context, rID string) error

	// CloseRouter changes the state of the router to offline ("Offline" mode),
	// making it unusable for query execution.
	CloseRouter(ctx context.Context, rID string) error
}

type TxStatus

// TxStatus is the string-based type of DataTransferTransaction.Status; Planned
// and DataCopied are its declared values.
type TxStatus string

type UpdateCommand

type UpdateCommand[T any] struct {
	// contains filtered or unexported fields
}

func NewUpdateCommand

func NewUpdateCommand[T any](m map[string]T, key string, value T) *UpdateCommand[T]

func (*UpdateCommand[T]) Do

func (c *UpdateCommand[T]) Do() error

func (*UpdateCommand[T]) Undo

func (c *UpdateCommand[T]) Undo() error

type XQDB

// XQDB means extended QDB: it embeds QDB, TopolodyKeeper, ShardingSchemaKeeper
// and DistributedXactKepper and adds TryCoordinatorLock.
type XQDB interface {
	// routing schema
	QDB
	// router topology
	TopolodyKeeper
	// data move state
	ShardingSchemaKeeper
	DistributedXactKepper

	TryCoordinatorLock(ctx context.Context) error
}

XQDB means extended QDB. The coordinator should use an etcd-based implementation to keep the distributed state in sync.

func NewXQDB

func NewXQDB(qdbType string) (XQDB, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL