qdb

package
v0.0.0-...-8a6b5b3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2026 | License: PostgreSQL | Imports: 26 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	CoordKeepAliveTTL = 3

	TaskGroupLeaseTTL = 30 // generous lease

)
View Source
const (
	// maps of MemQDB as `extensions` of QdbStatement
	MapRelationDistribution = "RelationDistribution"
	MapDistributions        = "Distributions"
	MapKrs                  = "Krs"
	MapFreq                 = "Freq"
	MapLocks                = "Locks"
	MapKrVersions           = "KrVersions"
	MapSequences            = "Sequences"
	MapSequenceToValues     = "SequenceToValues"
)
View Source
const (
	MoveKeyRangePlanned              = MoveKeyRangeStatus("PLANNED")
	MoveKeyRangeLocked               = MoveKeyRangeStatus("LOCKED")
	MoveKeyRangeDataMoved            = MoveKeyRangeStatus("DATA_MOVED")
	MoveKeyRangeDataCoordMetaUpdated = MoveKeyRangeStatus("COORD_META_UPDATED")
	MoveKeyRangeComplete             = MoveKeyRangeStatus("COMPLETE")

	// Deprecated: use MoveKeyRangeLocked
	MoveKeyRangeStarted = MoveKeyRangeStatus("STARTED")
)
View Source
const (
	CLOSED = RouterState("CLOSED")
	OPENED = RouterState("OPENED")
)
View Source
const (
	IssuerRedistributeTask = iota + 1
	IssuerBalancerTask
)
View Source
const (
	StopTaskGroup          = "cancel"
	StopTaskGroupImmediate = "cancel immediate"
)
View Source
const (
	Planned    = TxStatus("planned")
	Locked     = TxStatus("locked")
	DataCopied = TxStatus("data_copied")
)
View Source
const (
	CmdPut = iota
	CmdDelete
)
View Source
const DefaultMaxTxnSize uint16 = 128 // like ETCD max-txn-ops default value
View Source
const KRLocked = KeyRangeStatus("LOCKED")
View Source
const KRUnlocked = KeyRangeStatus("UNLOCKED")

Variables

View Source
var (
	ColumnTypeVarchar           = "varchar"
	ColumnTypeVarcharHashed     = "varchar hashed"
	ColumnTypeVarcharDeprecated = "_varchar"
	ColumnTypeInteger           = "integer"
	ColumnTypeUinteger          = "uinteger"
	ColumnTypeUUID              = "uuid"
)

Functions

func ExecuteCommands

func ExecuteCommands(saver func() error, commands ...Command) error

func LockPath

func LockPath(key string) string

Types

type ACLItem

type ACLItem struct {
	AIGrantee string `json:"ai_grantee"` /* ID that this item grants privs to */
	AIGrantor string `json:"ai_grantor"` /* grantor of privs */
	AIPrivs   uint64 `json:"ai_privs"`   /* privilege bits */
}

type AppendCommand

type AppendCommand[T any] struct {
	// contains filtered or unexported fields
}

func NewAppendCommand

func NewAppendCommand[T any](m []T, _ T) *AppendCommand[T]

func (*AppendCommand[T]) Do

func (c *AppendCommand[T]) Do() error

func (*AppendCommand[T]) Undo

func (c *AppendCommand[T]) Undo() error

type BalancerTask

type BalancerTask struct {
	Type      int    `json:"type"`
	KrIdFrom  string `json:"krIdFrom"`
	KrIdTo    string `json:"krIdTo"`
	KrIdTemp  string `json:"krIdTemp"`
	ShardIdTo string `json:"shardIdTo"`
	KeyCount  int64  `json:"keyCount"`
	State     int    `json:"state"`
}

type Command

type Command interface {
	Do() error
	Undo() error
}

type CustomCommand

type CustomCommand struct {
	// contains filtered or unexported fields
}

func NewCustomCommand

func NewCustomCommand(do func() error, undo func() error) *CustomCommand

func (*CustomCommand) Do

func (c *CustomCommand) Do() error

func (*CustomCommand) Undo

func (c *CustomCommand) Undo() error

type DCStateKeeper

type DCStateKeeper interface {
	RecordTwoPhaseMembers(ctx context.Context, gid string, shards []string) error
	ChangeTxStatus(ctx context.Context, gid string, state TwoPhaseTxState) error

	ListTXNames(ctx context.Context) ([]string, error)
	GetTXs(ctx context.Context) (map[string]*TwoPCInfo, error)

	AcquireTxOwnership(ctx context.Context, gid string) (bool, error)
	ReleaseTxOwnership(ctx context.Context, gid string) error

	TXStatus(ctx context.Context, gid string) (TwoPhaseTxState, error)
	TXCohortShards(ctx context.Context, gid string) ([]string, error)

	RemoveTXData(ctx context.Context, gid string) error
	ClearTxStatuses(ctx context.Context) error
}

DCStateKeeper is a state keeper for distributed (2PC) commits. It can be backed by either local storage or etcd.

type DataTransferTransaction

type DataTransferTransaction struct {
	ToShardId   string   `json:"to_shard"`
	FromShardId string   `json:"from_shard"`
	Status      TxStatus `json:"status"`
}

DataTransferTransaction contains information about a data transfer from one shard to another.

type DeleteCommand

type DeleteCommand[T any] struct {
	// contains filtered or unexported fields
}

func NewDeleteCommand

func NewDeleteCommand[T any](m map[string]T, key string) *DeleteCommand[T]

func (*DeleteCommand[T]) Do

func (c *DeleteCommand[T]) Do() error

func (*DeleteCommand[T]) Undo

func (c *DeleteCommand[T]) Undo() error

type DistributedRelation

type DistributedRelation struct {
	Name               string                 `json:"name"`
	SchemaName         string                 `json:"schema_name,omitempty"`
	DistributionKey    []DistributionKeyEntry `json:"column_names"`
	ReplicatedRelation bool                   `json:"replicated_relation,omitempty"`

	Version uint64    `json:"version,omitempty"`
	ACL     []ACLItem `json:"acl,omitempty"`
}

func (*DistributedRelation) QualifiedName

func (r *DistributedRelation) QualifiedName() *rfqn.RelationFQN

type Distribution

type Distribution struct {
	ID            string                          `json:"id"`
	ColTypes      []string                        `json:"col_types,omitempty"`
	Relations     map[string]*DistributedRelation `json:"relations"`
	FQNRelations  map[string]*DistributedRelation `json:"fqn_relations,omitempty"`
	UniqueIndexes map[string]*UniqueIndex         `json:"unique_indexes"`

	Version uint64    `json:"version,omitempty"`
	ACL     []ACLItem `json:"acl,omitempty"`
}

func NewDistribution

func NewDistribution(id string, coltypes []string) *Distribution

func (*Distribution) Copy

func (d *Distribution) Copy() *Distribution

func (*Distribution) GetRelation

func (d *Distribution) GetRelation(fqn *rfqn.RelationFQN) (*DistributedRelation, bool)

type DistributionKeyEntry

type DistributionKeyEntry struct {
	Column       string `json:"column"`
	HashFunction string `json:"hash"`

	Expr RoutingExpr `json:"routing_expression"`
}

type DropCommand

type DropCommand[T any] struct {
	// contains filtered or unexported fields
}

func NewDropCommand

func NewDropCommand[T any](m map[string]T) *DropCommand[T]

func (*DropCommand[T]) Do

func (c *DropCommand[T]) Do() error

func (*DropCommand[T]) Undo

func (c *DropCommand[T]) Undo() error

type EtcdQDB

type EtcdQDB struct {
	// contains filtered or unexported fields
}

func NewEtcdQDB

func NewEtcdQDB(addrs []string, maxCallSendMsgSize int) (*EtcdQDB, error)

func (*EtcdQDB) AddMoveTaskGroupStopFlag

func (q *EtcdQDB) AddMoveTaskGroupStopFlag(ctx context.Context, id string, immediate bool) error

TODO unit test

func (*EtcdQDB) AddRouter

func (q *EtcdQDB) AddRouter(ctx context.Context, r *Router) error

TODO : unit tests

func (*EtcdQDB) AddShard

func (q *EtcdQDB) AddShard(ctx context.Context, shard *Shard) error

TODO : unit tests

func (*EtcdQDB) AlterDistributedRelation

func (q *EtcdQDB) AlterDistributedRelation(ctx context.Context, id string, rel *DistributedRelation) error

TODO : unit tests

func (*EtcdQDB) AlterDistributedRelationDistributionKey

func (q *EtcdQDB) AlterDistributedRelationDistributionKey(ctx context.Context, id string, relationFQN *rfqn.RelationFQN, distributionKey []DistributionKeyEntry) error

TODO : unit tests

func (*EtcdQDB) AlterDistributedRelationSchema

func (q *EtcdQDB) AlterDistributedRelationSchema(ctx context.Context, id string, relationFQN *rfqn.RelationFQN, schemaName string) error

TODO : unit tests

func (*EtcdQDB) AlterDistributionAttach

func (q *EtcdQDB) AlterDistributionAttach(ctx context.Context, id string, rels []*DistributedRelation) error

TODO : unit tests

func (*EtcdQDB) AlterDistributionDetach

func (q *EtcdQDB) AlterDistributionDetach(ctx context.Context, id string, relation *rfqn.RelationFQN) error

TODO: unit tests

func (*EtcdQDB) AlterReferenceRelationStorage

func (q *EtcdQDB) AlterReferenceRelationStorage(ctx context.Context, relation *rfqn.RelationFQN, shs []string) error

AlterReferenceRelationStorage implements XQDB.

func (*EtcdQDB) AlterReplicatedRelationSchema

func (q *EtcdQDB) AlterReplicatedRelationSchema(ctx context.Context, dsID string, relationFQN *rfqn.RelationFQN, schemaName string) error

TODO : unit tests

func (*EtcdQDB) AlterSequenceAttach

func (q *EtcdQDB) AlterSequenceAttach(ctx context.Context, seqName string, relationFQN *rfqn.RelationFQN, colName string) error

func (*EtcdQDB) AlterSequenceDetachRelation

func (q *EtcdQDB) AlterSequenceDetachRelation(ctx context.Context, relationFQN *rfqn.RelationFQN) error

func (*EtcdQDB) AlterShard

func (q *EtcdQDB) AlterShard(ctx context.Context, newShard *Shard) error

func (*EtcdQDB) BeginTransaction

func (q *EtcdQDB) BeginTransaction(ctx context.Context, transaction *QdbTransaction) error

func (*EtcdQDB) CheckDistribution

func (q *EtcdQDB) CheckDistribution(ctx context.Context, id string) (bool, error)

TODO : unit tests

func (*EtcdQDB) CheckLockedKeyRange

func (q *EtcdQDB) CheckLockedKeyRange(ctx context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) CheckMoveTaskGroupStopFlag

func (q *EtcdQDB) CheckMoveTaskGroupStopFlag(ctx context.Context, id string) (bool, bool, error)

TODO unit test

func (*EtcdQDB) CheckSequence

func (q *EtcdQDB) CheckSequence(ctx context.Context, seqName string) (bool, error)

func (*EtcdQDB) CheckTaskGroupLocked

func (q *EtcdQDB) CheckTaskGroupLocked(ctx context.Context, tgId string) (bool, error)

func (*EtcdQDB) Client

func (q *EtcdQDB) Client() *clientv3.Client

func (*EtcdQDB) CloseRouter

func (q *EtcdQDB) CloseRouter(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) CommitTransaction

func (q *EtcdQDB) CommitTransaction(ctx context.Context, transaction *QdbTransaction) error

func (*EtcdQDB) CreateDistribution

func (q *EtcdQDB) CreateDistribution(_ context.Context, distribution *Distribution) ([]QdbStatement, error)

TODO : unit tests

func (*EtcdQDB) CreateKeyRange

func (q *EtcdQDB) CreateKeyRange(_ context.Context, keyRange *KeyRange) ([]QdbStatement, error)

TODO : unit tests

func (*EtcdQDB) CreateRedistributeTask

func (q *EtcdQDB) CreateRedistributeTask(ctx context.Context, task *RedistributeTask) error

TODO: unit tests

func (*EtcdQDB) CreateReferenceRelation

func (q *EtcdQDB) CreateReferenceRelation(ctx context.Context, r *ReferenceRelation) error

CreateReferenceRelation implements XQDB.

func (*EtcdQDB) CreateSequence

func (q *EtcdQDB) CreateSequence(_ context.Context, seqName string, initialValue int64) ([]QdbStatement, error)

func (*EtcdQDB) CreateUniqueIndex

func (q *EtcdQDB) CreateUniqueIndex(ctx context.Context, idx *UniqueIndex) error

func (*EtcdQDB) CurrVal

func (q *EtcdQDB) CurrVal(ctx context.Context, seqName string) (int64, error)

func (*EtcdQDB) DeleteKeyRangeMove

func (q *EtcdQDB) DeleteKeyRangeMove(ctx context.Context, moveId string) error

func (*EtcdQDB) DeleteRouter

func (q *EtcdQDB) DeleteRouter(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) DeleteRouterAll

func (q *EtcdQDB) DeleteRouterAll(ctx context.Context) error

TODO : unit tests

func (*EtcdQDB) DropBalancerTask

func (q *EtcdQDB) DropBalancerTask(ctx context.Context) error

TODO: unit tests

func (*EtcdQDB) DropDistribution

func (q *EtcdQDB) DropDistribution(ctx context.Context, id string) error

func (*EtcdQDB) DropKeyRange

func (q *EtcdQDB) DropKeyRange(_ context.Context, id string) ([]QdbStatement, error)

TODO : unit tests

func (*EtcdQDB) DropKeyRangeAll

func (q *EtcdQDB) DropKeyRangeAll(ctx context.Context) error

TODO : unit tests

func (*EtcdQDB) DropMoveTask

func (q *EtcdQDB) DropMoveTask(ctx context.Context, id string) error

TODO unit test

func (*EtcdQDB) DropMoveTaskGroup

func (q *EtcdQDB) DropMoveTaskGroup(ctx context.Context, id string) error

TODO: unit tests. TODO: drop move task.

func (*EtcdQDB) DropRedistributeTask

func (q *EtcdQDB) DropRedistributeTask(ctx context.Context, task *RedistributeTask) error

TODO: unit tests

func (*EtcdQDB) DropRedistributeTaskLock

func (q *EtcdQDB) DropRedistributeTaskLock(ctx context.Context, id string) error

func (*EtcdQDB) DropReferenceRelation

func (q *EtcdQDB) DropReferenceRelation(ctx context.Context, relation *rfqn.RelationFQN) error

DropReferenceRelation implements XQDB.

func (*EtcdQDB) DropSequence

func (q *EtcdQDB) DropSequence(ctx context.Context, seqName string, force bool) error

func (*EtcdQDB) DropShard

func (q *EtcdQDB) DropShard(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) DropTaskGroupLock

func (q *EtcdQDB) DropTaskGroupLock(ctx context.Context, tgId string) error

func (*EtcdQDB) DropUniqueIndex

func (q *EtcdQDB) DropUniqueIndex(ctx context.Context, id string) error

func (*EtcdQDB) ExecNoTransaction

func (q *EtcdQDB) ExecNoTransaction(ctx context.Context, operations []QdbStatement) error

func (*EtcdQDB) GetAllTaskGroupStatuses

func (q *EtcdQDB) GetAllTaskGroupStatuses(ctx context.Context) (map[string]*TaskGroupStatus, error)

func (*EtcdQDB) GetBalancerTask

func (q *EtcdQDB) GetBalancerTask(ctx context.Context) (*BalancerTask, error)

TODO: unit tests

func (*EtcdQDB) GetCoordinator

func (q *EtcdQDB) GetCoordinator(ctx context.Context) (string, error)

TODO : unit tests

func (*EtcdQDB) GetDistribution

func (q *EtcdQDB) GetDistribution(ctx context.Context, id string) (*Distribution, error)

TODO : unit tests

func (*EtcdQDB) GetKeyRange

func (q *EtcdQDB) GetKeyRange(ctx context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) GetKeyRangeRedistributeTaskId

func (q *EtcdQDB) GetKeyRangeRedistributeTaskId(ctx context.Context, keyRangeId string) (string, error)

func (*EtcdQDB) GetMoveTask

func (q *EtcdQDB) GetMoveTask(ctx context.Context, id string) (*MoveTask, error)

TODO unit test

func (*EtcdQDB) GetMoveTaskByGroup

func (q *EtcdQDB) GetMoveTaskByGroup(ctx context.Context, taskGroupID string) (*MoveTask, error)

func (*EtcdQDB) GetMoveTaskGroup

func (q *EtcdQDB) GetMoveTaskGroup(ctx context.Context, id string) (*MoveTaskGroup, error)

TODO: unit tests

func (*EtcdQDB) GetMoveTaskGroupTotalKeys

func (q *EtcdQDB) GetMoveTaskGroupTotalKeys(ctx context.Context, id string) (int64, error)

TODO: unit tests

func (*EtcdQDB) GetRedistributeTask

func (q *EtcdQDB) GetRedistributeTask(ctx context.Context, id string) (*RedistributeTask, error)

TODO: unit tests

func (*EtcdQDB) GetRedistributeTaskTaskGroupId

func (q *EtcdQDB) GetRedistributeTaskTaskGroupId(ctx context.Context, id string) (string, error)

func (*EtcdQDB) GetReferenceRelation

func (q *EtcdQDB) GetReferenceRelation(ctx context.Context, relation *rfqn.RelationFQN) (*ReferenceRelation, error)

GetReferenceRelation implements XQDB.

func (*EtcdQDB) GetRelationDistribution

func (q *EtcdQDB) GetRelationDistribution(ctx context.Context, relationFQN *rfqn.RelationFQN) (*Distribution, error)

TODO : unit tests

func (*EtcdQDB) GetRelationSequence

func (q *EtcdQDB) GetRelationSequence(ctx context.Context, relationFQN *rfqn.RelationFQN) (map[string]string, error)

func (*EtcdQDB) GetSequenceRelations

func (q *EtcdQDB) GetSequenceRelations(ctx context.Context, seqName string) ([]*rfqn.RelationFQN, error)

func (*EtcdQDB) GetShard

func (q *EtcdQDB) GetShard(ctx context.Context, id string) (*Shard, error)

func (*EtcdQDB) GetTaskGroupStatus

func (q *EtcdQDB) GetTaskGroupStatus(ctx context.Context, id string) (*TaskGroupStatus, error)

func (*EtcdQDB) GetTransferTx

func (q *EtcdQDB) GetTransferTx(ctx context.Context, key string) (*DataTransferTransaction, error)

TODO : unit tests

func (*EtcdQDB) GetTxMetaStorage

func (q *EtcdQDB) GetTxMetaStorage(ctx context.Context) (shards []string, err error)

func (*EtcdQDB) ListAllKeyRanges

func (q *EtcdQDB) ListAllKeyRanges(ctx context.Context) ([]*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) ListDistributions

func (q *EtcdQDB) ListDistributions(ctx context.Context) ([]*Distribution, error)

TODO : unit tests

func (*EtcdQDB) ListKeyRangeMoves

func (q *EtcdQDB) ListKeyRangeMoves(ctx context.Context) ([]*MoveKeyRange, error)

TODO : unit tests

func (*EtcdQDB) ListKeyRanges

func (q *EtcdQDB) ListKeyRanges(ctx context.Context, distribution string) ([]*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) ListLockedKeyRanges

func (q *EtcdQDB) ListLockedKeyRanges(ctx context.Context) ([]string, error)

func (*EtcdQDB) ListMoveTasks

func (q *EtcdQDB) ListMoveTasks(ctx context.Context) (map[string]*MoveTask, error)

TODO unit test

func (*EtcdQDB) ListRedistributeTasks

func (q *EtcdQDB) ListRedistributeTasks(ctx context.Context) ([]*RedistributeTask, error)

func (*EtcdQDB) ListReferenceRelations

func (q *EtcdQDB) ListReferenceRelations(ctx context.Context) ([]*ReferenceRelation, error)

ListReferenceRelations implements XQDB.

func (*EtcdQDB) ListRelationIndexes

func (q *EtcdQDB) ListRelationIndexes(ctx context.Context, relationFQN *rfqn.RelationFQN) (map[string]*UniqueIndex, error)

func (*EtcdQDB) ListRouters

func (q *EtcdQDB) ListRouters(ctx context.Context) ([]*Router, error)

TODO : unit tests

func (*EtcdQDB) ListSequences

func (q *EtcdQDB) ListSequences(ctx context.Context) ([]string, error)

func (*EtcdQDB) ListShards

func (q *EtcdQDB) ListShards(ctx context.Context) ([]*Shard, error)

TODO : unit tests

func (*EtcdQDB) ListTaskGroups

func (q *EtcdQDB) ListTaskGroups(ctx context.Context) (map[string]*MoveTaskGroup, error)

func (*EtcdQDB) ListUniqueIndexes

func (q *EtcdQDB) ListUniqueIndexes(ctx context.Context) (map[string]*UniqueIndex, error)

func (*EtcdQDB) LockKeyRange

func (q *EtcdQDB) LockKeyRange(ctx context.Context, idKeyRange string) (*KeyRange, error)

TODO : unit tests

func (*EtcdQDB) LockRedistributeTask

func (q *EtcdQDB) LockRedistributeTask(ctx context.Context, id, holder string) error

func (*EtcdQDB) NextRange

func (q *EtcdQDB) NextRange(ctx context.Context, seqName string, rangeSize uint64) (*SequenceIdRange, error)

func (*EtcdQDB) OpenRouter

func (q *EtcdQDB) OpenRouter(ctx context.Context, id string) error

TODO : unit tests

func (*EtcdQDB) RecordKeyRangeMove

func (q *EtcdQDB) RecordKeyRangeMove(ctx context.Context, m *MoveKeyRange) error

TODO : unit tests

func (*EtcdQDB) RecordTransferTx

func (q *EtcdQDB) RecordTransferTx(ctx context.Context, key string, info *DataTransferTransaction) error

TODO : unit tests

func (*EtcdQDB) RemoveTransferTx

func (q *EtcdQDB) RemoveTransferTx(ctx context.Context, key string) error

TODO : unit tests

func (*EtcdQDB) RenameKeyRange

func (q *EtcdQDB) RenameKeyRange(ctx context.Context, krId, krIdNew string) error

TODO: unit tests

func (*EtcdQDB) SetTxMetaStorage

func (q *EtcdQDB) SetTxMetaStorage(ctx context.Context, shards []string) error

func (*EtcdQDB) ShareKeyRange

func (q *EtcdQDB) ShareKeyRange(id string) error

TODO : unit tests

func (*EtcdQDB) TryCoordinatorLock

func (q *EtcdQDB) TryCoordinatorLock(ctx context.Context, addr string) error

TODO : unit tests

func (*EtcdQDB) TryTaskGroupLock

func (q *EtcdQDB) TryTaskGroupLock(ctx context.Context, tgId string, holder string) error

func (*EtcdQDB) UnlockKeyRange

func (q *EtcdQDB) UnlockKeyRange(ctx context.Context, idKeyRange string) error

TODO : unit tests

func (*EtcdQDB) UpdateCoordinator

func (q *EtcdQDB) UpdateCoordinator(_ context.Context, _ string) error

TODO: unit tests. TODO: implement.

func (*EtcdQDB) UpdateKeyRange

func (q *EtcdQDB) UpdateKeyRange(_ context.Context, keyRange *KeyRange) ([]QdbStatement, error)

TODO : unit tests

func (*EtcdQDB) UpdateKeyRangeMoveStatus

func (q *EtcdQDB) UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s MoveKeyRangeStatus) error

TODO : unit tests

func (*EtcdQDB) UpdateMoveTask

func (q *EtcdQDB) UpdateMoveTask(ctx context.Context, task *MoveTask) error

TODO unit test

func (*EtcdQDB) UpdateMoveTaskGroupTotalKeys

func (q *EtcdQDB) UpdateMoveTaskGroupTotalKeys(ctx context.Context, id string, totalKeys int64) error

TODO: unit tests

func (*EtcdQDB) UpdateRedistributeTask

func (q *EtcdQDB) UpdateRedistributeTask(ctx context.Context, task *RedistributeTask) error

TODO: unit tests

func (*EtcdQDB) WriteBalancerTask

func (q *EtcdQDB) WriteBalancerTask(ctx context.Context, task *BalancerTask) error

TODO: unit tests

func (*EtcdQDB) WriteMoveTask

func (q *EtcdQDB) WriteMoveTask(ctx context.Context, task *MoveTask) error

TODO unit test

func (*EtcdQDB) WriteMoveTaskGroup

func (q *EtcdQDB) WriteMoveTaskGroup(ctx context.Context, id string, group *MoveTaskGroup, totalKeys int64, task *MoveTask) error

TODO: unit tests

func (*EtcdQDB) WriteTaskGroupStatus

func (q *EtcdQDB) WriteTaskGroupStatus(ctx context.Context, id string, status *TaskGroupStatus) error

type GenericOption

type GenericOption struct {
	Name  string
	Value string
}

type KeyRange

type KeyRange struct {
	LowerBound     [][]byte
	ShardID        string
	KeyRangeID     string
	DistributionId string
	Locked         bool
	Version        int
}

type KeyRangeMeta

type KeyRangeMeta struct {
	Version    int       `json:"version"`
	UpdatedAt  time.Time `json:"updated_at"`
	ModifiedBy string    `json:"modified_by"`
}

type KeyRangeStatus

type KeyRangeStatus string

type MemPgQDB

type MemPgQDB struct {
	*MemQDB
	// contains filtered or unexported fields
}

func GetMemPgQDB

func GetMemPgQDB() (*MemPgQDB, error)

func NewMemPgQDB

func NewMemPgQDB(backupPath string) (*MemPgQDB, error)

func RestoreMemPgQDB

func RestoreMemPgQDB(backupPath string) (*MemPgQDB, error)

func (*MemPgQDB) AcquireTxOwnership

func (q *MemPgQDB) AcquireTxOwnership(ctx context.Context, txid string) (bool, error)

AcquireTxOwnership implements DCStateKeeper.

func (*MemPgQDB) ChangeTxStatus

func (q *MemPgQDB) ChangeTxStatus(ctx context.Context, txid string, state TwoPhaseTxState) error

ChangeTxStatus implements DCStateKeeper.

func (*MemPgQDB) ClearTxStatuses

func (q *MemPgQDB) ClearTxStatuses(ctx context.Context) error

func (*MemPgQDB) GetTXs

func (q *MemPgQDB) GetTXs(ctx context.Context) (map[string]*TwoPCInfo, error)

func (*MemPgQDB) GetTxMetaStorage

func (q *MemPgQDB) GetTxMetaStorage(_ context.Context) ([]string, error)

func (*MemPgQDB) ListTXNames

func (q *MemPgQDB) ListTXNames(ctx context.Context) ([]string, error)

func (*MemPgQDB) RecordTwoPhaseMembers

func (q *MemPgQDB) RecordTwoPhaseMembers(ctx context.Context, txid string, shards []string) error

RecordTwoPhaseMembers implements DCStateKeeper.

func (*MemPgQDB) ReleaseTxOwnership

func (q *MemPgQDB) ReleaseTxOwnership(ctx context.Context, txid string) error

ReleaseTxOwnership implements DCStateKeeper.

func (*MemPgQDB) RemoveTXData

func (q *MemPgQDB) RemoveTXData(ctx context.Context, txid string) error

func (*MemPgQDB) SetTxMetaStorage

func (q *MemPgQDB) SetTxMetaStorage(_ context.Context, storage []string) error

func (*MemPgQDB) TXCohortShards

func (q *MemPgQDB) TXCohortShards(ctx context.Context, txid string) ([]string, error)

TXCohortShards implements DCStateKeeper.

func (*MemPgQDB) TXStatus

func (q *MemPgQDB) TXStatus(ctx context.Context, txid string) (TwoPhaseTxState, error)

TXStatus implements DCStateKeeper.

type MemQDB

type MemQDB struct {
	State *MemQDBState

	SequenceLock sync.RWMutex
	// contains filtered or unexported fields
}

func GetMemQDB

func GetMemQDB() (*MemQDB, error)

func NewMemQDB

func NewMemQDB(backupPath string) (*MemQDB, error)

func RestoreQDB

func RestoreQDB(backupPath string) (*MemQDB, error)

func (*MemQDB) AcquireTxOwnership

func (q *MemQDB) AcquireTxOwnership(_ context.Context, id string) (bool, error)

func (*MemQDB) AddMoveTaskGroupStopFlag

func (q *MemQDB) AddMoveTaskGroupStopFlag(_ context.Context, id string, immediate bool) error

TODO: unit tests

func (*MemQDB) AddRouter

func (q *MemQDB) AddRouter(_ context.Context, r *Router) error

TODO : unit tests

func (*MemQDB) AddShard

func (q *MemQDB) AddShard(_ context.Context, shard *Shard) error

TODO : unit tests

func (*MemQDB) AlterDistributedRelation

func (q *MemQDB) AlterDistributedRelation(_ context.Context, id string, rel *DistributedRelation) error

TODO : unit tests

func (*MemQDB) AlterDistributedRelationDistributionKey

func (q *MemQDB) AlterDistributedRelationDistributionKey(_ context.Context, id string, relation *rfqn.RelationFQN, distributionKey []DistributionKeyEntry) error

TODO : unit tests

func (*MemQDB) AlterDistributedRelationSchema

func (q *MemQDB) AlterDistributedRelationSchema(_ context.Context, id string, relation *rfqn.RelationFQN, schemaName string) error

TODO: unit tests. TODO: explicitly pass version.

func (*MemQDB) AlterDistributionAttach

func (q *MemQDB) AlterDistributionAttach(_ context.Context, id string, rels []*DistributedRelation) error

TODO : unit tests

func (*MemQDB) AlterDistributionDetach

func (q *MemQDB) AlterDistributionDetach(ctx context.Context, id string, relationFQN *rfqn.RelationFQN) error

TODO: unit tests

func (*MemQDB) AlterReferenceRelationStorage

func (q *MemQDB) AlterReferenceRelationStorage(_ context.Context, relationFQN *rfqn.RelationFQN, shs []string) error

AlterReferenceRelationStorage implements XQDB.

func (*MemQDB) AlterReplicatedRelationSchema

func (q *MemQDB) AlterReplicatedRelationSchema(_ context.Context, id string, relation *rfqn.RelationFQN, schemaName string) error

TODO : unit tests

func (*MemQDB) AlterSequenceAttach

func (q *MemQDB) AlterSequenceAttach(_ context.Context, seqName string, relationFQN *rfqn.RelationFQN, colName string) error

func (*MemQDB) AlterSequenceDetachRelation

func (q *MemQDB) AlterSequenceDetachRelation(_ context.Context, relationFQN *rfqn.RelationFQN) error

func (*MemQDB) AlterShard

func (q *MemQDB) AlterShard(_ context.Context, newShard *Shard) error

func (*MemQDB) BeginTransaction

func (q *MemQDB) BeginTransaction(_ context.Context, transaction *QdbTransaction) error

func (*MemQDB) ChangeTxStatus

func (q *MemQDB) ChangeTxStatus(_ context.Context, id string, state TwoPhaseTxState) error

ChangeTxStatus implements DCStateKeeper.

func (*MemQDB) CheckDistribution

func (q *MemQDB) CheckDistribution(_ context.Context, id string) (bool, error)

TODO : unit tests

func (*MemQDB) CheckLockedKeyRange

func (q *MemQDB) CheckLockedKeyRange(_ context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*MemQDB) CheckMoveTaskGroupStopFlag

func (q *MemQDB) CheckMoveTaskGroupStopFlag(_ context.Context, id string) (bool, bool, error)

TODO: unit tests

func (*MemQDB) CheckSequence

func (q *MemQDB) CheckSequence(_ context.Context, seqName string) (bool, error)

func (*MemQDB) CheckTaskGroupLocked

func (q *MemQDB) CheckTaskGroupLocked(_ context.Context, _ string) (bool, error)

func (*MemQDB) ClearTxStatuses

func (q *MemQDB) ClearTxStatuses(_ context.Context) error

func (*MemQDB) CloseRouter

func (q *MemQDB) CloseRouter(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) CommitTransaction

func (q *MemQDB) CommitTransaction(_ context.Context, transaction *QdbTransaction) error

func (*MemQDB) CreateDistribution

func (q *MemQDB) CreateDistribution(_ context.Context, distribution *Distribution) ([]QdbStatement, error)

TODO : unit tests

func (*MemQDB) CreateKeyRange

func (q *MemQDB) CreateKeyRange(_ context.Context, keyRange *KeyRange) ([]QdbStatement, error)

TODO : unit tests

func (*MemQDB) CreateRedistributeTask

func (q *MemQDB) CreateRedistributeTask(_ context.Context, task *RedistributeTask) error

TODO: unit tests

func (*MemQDB) CreateReferenceRelation

func (q *MemQDB) CreateReferenceRelation(_ context.Context, r *ReferenceRelation) error

CreateReferenceRelation implements XQDB.

func (*MemQDB) CreateSequence

func (q *MemQDB) CreateSequence(_ context.Context, seqName string, initialValue int64) ([]QdbStatement, error)

func (*MemQDB) CreateUniqueIndex

func (q *MemQDB) CreateUniqueIndex(_ context.Context, idx *UniqueIndex) error

func (*MemQDB) CurrVal

func (q *MemQDB) CurrVal(_ context.Context, seqName string) (int64, error)

func (*MemQDB) DeleteKeyRangeMove

func (q *MemQDB) DeleteKeyRangeMove(_ context.Context, _ string) error

func (*MemQDB) DeleteRouter

func (q *MemQDB) DeleteRouter(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) DeleteRouterAll

func (q *MemQDB) DeleteRouterAll(_ context.Context) error

TODO : unit tests

func (*MemQDB) DropBalancerTask

func (q *MemQDB) DropBalancerTask(_ context.Context) error

TODO: unit tests

func (*MemQDB) DropDistribution

func (q *MemQDB) DropDistribution(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) DropKeyRange

func (q *MemQDB) DropKeyRange(_ context.Context, id string) ([]QdbStatement, error)

TODO : unit tests

func (*MemQDB) DropKeyRangeAll

func (q *MemQDB) DropKeyRangeAll(_ context.Context) error

TODO : unit tests

func (*MemQDB) DropMoveTask

func (q *MemQDB) DropMoveTask(_ context.Context, id string) error

TODO: unit tests

func (*MemQDB) DropMoveTaskGroup

func (q *MemQDB) DropMoveTaskGroup(_ context.Context, id string) error

TODO: unit tests

func (*MemQDB) DropRedistributeTask

func (q *MemQDB) DropRedistributeTask(_ context.Context, task *RedistributeTask) error

TODO: unit tests

func (*MemQDB) DropRedistributeTaskLock

func (q *MemQDB) DropRedistributeTaskLock(_ context.Context, _ string) error

func (*MemQDB) DropReferenceRelation

func (q *MemQDB) DropReferenceRelation(_ context.Context, relationFQN *rfqn.RelationFQN) error

DropReferenceRelation implements XQDB.

func (*MemQDB) DropSequence

func (q *MemQDB) DropSequence(_ context.Context, seqName string, force bool) error

func (*MemQDB) DropShard

func (q *MemQDB) DropShard(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) DropTaskGroupLock

func (q *MemQDB) DropTaskGroupLock(_ context.Context, _ string) error

func (*MemQDB) DropUniqueIndex

func (q *MemQDB) DropUniqueIndex(_ context.Context, id string) error

func (*MemQDB) DumpState

func (q *MemQDB) DumpState() error

TODO : unit tests

func (*MemQDB) ExecNoTransaction

func (q *MemQDB) ExecNoTransaction(_ context.Context, operations []QdbStatement) error

func (*MemQDB) GetAllTaskGroupStatuses

func (q *MemQDB) GetAllTaskGroupStatuses(_ context.Context) (map[string]*TaskGroupStatus, error)

func (*MemQDB) GetBalancerTask

func (q *MemQDB) GetBalancerTask(_ context.Context) (*BalancerTask, error)

TODO: unit tests

func (*MemQDB) GetCoordinator

func (q *MemQDB) GetCoordinator(_ context.Context) (string, error)

func (*MemQDB) GetDistribution

func (q *MemQDB) GetDistribution(_ context.Context, id string) (*Distribution, error)

TODO : unit tests

func (*MemQDB) GetKeyRange

func (q *MemQDB) GetKeyRange(_ context.Context, id string) (*KeyRange, error)

func (*MemQDB) GetKeyRangeRedistributeTaskId

func (q *MemQDB) GetKeyRangeRedistributeTaskId(_ context.Context, _ string) (string, error)

func (*MemQDB) GetMoveTask

func (q *MemQDB) GetMoveTask(_ context.Context, id string) (*MoveTask, error)

TODO: unit tests

func (*MemQDB) GetMoveTaskByGroup

func (q *MemQDB) GetMoveTaskByGroup(_ context.Context, taskGroupID string) (*MoveTask, error)

func (*MemQDB) GetMoveTaskGroup

func (q *MemQDB) GetMoveTaskGroup(_ context.Context, id string) (*MoveTaskGroup, error)

TODO: unit tests

func (*MemQDB) GetMoveTaskGroupTotalKeys

func (q *MemQDB) GetMoveTaskGroupTotalKeys(_ context.Context, id string) (int64, error)

TODO: unit tests

func (*MemQDB) GetRedistributeTask

func (q *MemQDB) GetRedistributeTask(_ context.Context, id string) (*RedistributeTask, error)

TODO: unit tests

func (*MemQDB) GetRedistributeTaskTaskGroupId

func (q *MemQDB) GetRedistributeTaskTaskGroupId(_ context.Context, id string) (string, error)

func (*MemQDB) GetReferenceRelation

func (q *MemQDB) GetReferenceRelation(_ context.Context, relationFQN *rfqn.RelationFQN) (*ReferenceRelation, error)

GetReferenceRelation implements XQDB.

func (*MemQDB) GetRelationDistribution

func (q *MemQDB) GetRelationDistribution(_ context.Context, relation *rfqn.RelationFQN) (*Distribution, error)

func (*MemQDB) GetRelationSequence

func (q *MemQDB) GetRelationSequence(_ context.Context, relationFQN *rfqn.RelationFQN) (map[string]string, error)

func (*MemQDB) GetSequenceRelations

func (q *MemQDB) GetSequenceRelations(_ context.Context, seqName string) ([]*rfqn.RelationFQN, error)

func (*MemQDB) GetShard

func (q *MemQDB) GetShard(_ context.Context, id string) (*Shard, error)

TODO : unit tests

func (*MemQDB) GetTXs

func (q *MemQDB) GetTXs(_ context.Context) (map[string]*TwoPCInfo, error)

func (*MemQDB) GetTaskGroupStatus

func (q *MemQDB) GetTaskGroupStatus(_ context.Context, id string) (*TaskGroupStatus, error)

func (*MemQDB) GetTransferTx

func (q *MemQDB) GetTransferTx(_ context.Context, key string) (*DataTransferTransaction, error)

TODO : unit tests

func (*MemQDB) GetTxMetaStorage

func (q *MemQDB) GetTxMetaStorage(_ context.Context) ([]string, error)

func (*MemQDB) ListAllKeyRanges

func (q *MemQDB) ListAllKeyRanges(_ context.Context) ([]*KeyRange, error)

TODO : unit tests

func (*MemQDB) ListDistributions

func (q *MemQDB) ListDistributions(_ context.Context) ([]*Distribution, error)

TODO : unit tests

func (*MemQDB) ListKeyRangeMoves

func (q *MemQDB) ListKeyRangeMoves(_ context.Context) ([]*MoveKeyRange, error)

func (*MemQDB) ListKeyRanges

func (q *MemQDB) ListKeyRanges(_ context.Context, distribution string) ([]*KeyRange, error)

TODO : unit tests

func (*MemQDB) ListLockedKeyRanges

func (q *MemQDB) ListLockedKeyRanges(_ context.Context) ([]string, error)

func (*MemQDB) ListMoveTasks

func (q *MemQDB) ListMoveTasks(_ context.Context) (map[string]*MoveTask, error)

func (*MemQDB) ListRedistributeTasks

func (q *MemQDB) ListRedistributeTasks(_ context.Context) ([]*RedistributeTask, error)

TODO: unit tests

func (*MemQDB) ListReferenceRelations

func (q *MemQDB) ListReferenceRelations(_ context.Context) ([]*ReferenceRelation, error)

ListReferenceRelations implements XQDB.

func (*MemQDB) ListRelationIndexes

func (q *MemQDB) ListRelationIndexes(_ context.Context, relationFQN *rfqn.RelationFQN) (map[string]*UniqueIndex, error)

func (*MemQDB) ListRouters

func (q *MemQDB) ListRouters(_ context.Context) ([]*Router, error)

TODO : unit tests

func (*MemQDB) ListSequences

func (q *MemQDB) ListSequences(_ context.Context) ([]string, error)

func (*MemQDB) ListShards

func (q *MemQDB) ListShards(_ context.Context) ([]*Shard, error)

TODO : unit tests

func (*MemQDB) ListTXNames

func (q *MemQDB) ListTXNames(_ context.Context) ([]string, error)

ListTXNames implements DCStateKeeper.

func (*MemQDB) ListTaskGroups

func (q *MemQDB) ListTaskGroups(_ context.Context) (map[string]*MoveTaskGroup, error)

func (*MemQDB) ListUniqueIndexes

func (q *MemQDB) ListUniqueIndexes(_ context.Context) (map[string]*UniqueIndex, error)

func (*MemQDB) LockKeyRange

func (q *MemQDB) LockKeyRange(_ context.Context, id string) (*KeyRange, error)

TODO : unit tests

func (*MemQDB) LockRedistributeTask

func (q *MemQDB) LockRedistributeTask(_ context.Context, _, _ string) error

func (*MemQDB) NextRange

func (q *MemQDB) NextRange(_ context.Context, seqName string, rangeSize uint64) (*SequenceIdRange, error)

func (*MemQDB) OpenRouter

func (q *MemQDB) OpenRouter(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) RecordKeyRangeMove

func (q *MemQDB) RecordKeyRangeMove(_ context.Context, _ *MoveKeyRange) error

func (*MemQDB) RecordTransferTx

func (q *MemQDB) RecordTransferTx(_ context.Context, key string, info *DataTransferTransaction) error

TODO : unit tests

func (*MemQDB) RecordTwoPhaseMembers

func (q *MemQDB) RecordTwoPhaseMembers(_ context.Context, id string, shards []string) error

RecordTwoPhaseMembers implements DCStateKeeper. XXX: check that all members are valid spqr shards

func (*MemQDB) ReleaseTxOwnership

func (q *MemQDB) ReleaseTxOwnership(_ context.Context, gid string) error

func (*MemQDB) RemoveTXData

func (q *MemQDB) RemoveTXData(_ context.Context, gid string) error

func (*MemQDB) RemoveTransferTx

func (q *MemQDB) RemoveTransferTx(_ context.Context, key string) error

TODO : unit tests

func (*MemQDB) RenameKeyRange

func (q *MemQDB) RenameKeyRange(_ context.Context, krId, krIdNew string) error

TODO: unit tests

func (*MemQDB) SetTxMetaStorage

func (q *MemQDB) SetTxMetaStorage(context.Context, []string) error

func (*MemQDB) ShareKeyRange

func (q *MemQDB) ShareKeyRange(id string) error

TODO : unit tests

func (*MemQDB) SwapState

func (q *MemQDB) SwapState(state *MemQDBState)

func (*MemQDB) TXCohortShards

func (q *MemQDB) TXCohortShards(_ context.Context, gid string) ([]string, error)

TXCohortShards implements DCStateKeeper.

func (*MemQDB) TXStatus

func (q *MemQDB) TXStatus(_ context.Context, gid string) (TwoPhaseTxState, error)

TXStatus implements DCStateKeeper.

func (*MemQDB) TryCoordinatorLock

func (q *MemQDB) TryCoordinatorLock(_ context.Context, _ string) error

func (*MemQDB) TryTaskGroupLock

func (q *MemQDB) TryTaskGroupLock(_ context.Context, _ string, _ string) error

func (*MemQDB) UnlockKeyRange

func (q *MemQDB) UnlockKeyRange(_ context.Context, id string) error

TODO : unit tests

func (*MemQDB) UpdateCoordinator

func (q *MemQDB) UpdateCoordinator(_ context.Context, address string) error

TODO : unit tests

func (*MemQDB) UpdateKeyRange

func (q *MemQDB) UpdateKeyRange(_ context.Context, keyRange *KeyRange) ([]QdbStatement, error)

TODO : unit tests

func (*MemQDB) UpdateKeyRangeMoveStatus

func (q *MemQDB) UpdateKeyRangeMoveStatus(_ context.Context, _ string, _ MoveKeyRangeStatus) error

func (*MemQDB) UpdateMoveTask

func (q *MemQDB) UpdateMoveTask(_ context.Context, task *MoveTask) error

TODO: unit tests

func (*MemQDB) UpdateMoveTaskGroupTotalKeys

func (q *MemQDB) UpdateMoveTaskGroupTotalKeys(_ context.Context, id string, totalKeys int64) error

TODO: unit tests

func (*MemQDB) UpdateRedistributeTask

func (q *MemQDB) UpdateRedistributeTask(_ context.Context, task *RedistributeTask) error

TODO: unit tests

func (*MemQDB) WriteBalancerTask

func (q *MemQDB) WriteBalancerTask(_ context.Context, task *BalancerTask) error

TODO: unit tests

func (*MemQDB) WriteMoveTask

func (q *MemQDB) WriteMoveTask(_ context.Context, task *MoveTask) error

TODO: unit tests

func (*MemQDB) WriteMoveTaskGroup

func (q *MemQDB) WriteMoveTaskGroup(_ context.Context, id string, group *MoveTaskGroup, _ int64, _ *MoveTask) error

TODO: unit tests

func (*MemQDB) WriteTaskGroupStatus

func (q *MemQDB) WriteTaskGroupStatus(_ context.Context, id string, status *TaskGroupStatus) error

type MemQDBState

// MemQDBState groups the maps and scalar values that make up the state of a
// MemQDB. Every field carries a JSON tag, and MemQDB.SwapState accepts a
// *MemQDBState.
// NOTE(review): presumably this is also what DumpState serializes — confirm
// against the implementation.
type MemQDBState struct {
	// NOTE(review): sync.RWMutex has no exported fields, so encoding/json has
	// nothing to write for these values — confirm the "locks" entry is intended.
	Locks                       map[string]*sync.RWMutex            `json:"locks"`
	Freq                        map[string]bool                     `json:"freq"`
	Krs                         map[string]*internalKeyRange        `json:"krs"`
	KrVersions                  map[string]int                      `json:"kr_versions"`
	Shards                      map[string]*Shard                   `json:"shards"`
	Distributions               map[string]*Distribution            `json:"distributions"`
	RelationDistribution        map[string]string                   `json:"relation_distribution"`
	Routers                     map[string]*Router                  `json:"routers"`
	Transactions                map[string]*DataTransferTransaction `json:"transactions"`
	Coordinator                 string                              `json:"coordinator"`
	// The JSON key below is camelCase ("taskGroup"), unlike the snake_case keys
	// of the other fields; renaming it would change the serialized format.
	MoveTaskGroups              map[string]*MoveTaskGroup           `json:"taskGroup"`
	TaskGroupIDToStatus         map[string]*TaskGroupStatus         `json:"task_group_statuses"`
	StopMoveTaskGroup           map[string]string                   `json:"stop_move_task_group"`
	MoveTasks                   map[string]*MoveTask                `json:"move_tasks"`
	TotalKeys                   map[string]int64                    `json:"total_keys"`
	RedistributeTasks           map[string]*RedistributeTask        `json:"redistribute_tasks"`
	RedistributeTaskTaskGroupId map[string]string                   `json:"redistribute_task_task_group"`
	KeyRangeRedistributeTasks   map[string]string                   `json:"key_range_redistribute_tasks"`
	BalancerTask                *BalancerTask                       `json:"balancer_task"`
	ReferenceRelations          map[string]*ReferenceRelation       `json:"reference_relations"`
	Sequences                   map[string]bool                     `json:"sequences"`
	ColumnSequence              map[string]string                   `json:"column_sequence"`
	SequenceToValues            map[string]int64                    `json:"sequence_to_values"`
	TaskGroupMoveTaskID         map[string]string                   `json:"task_group_move_task"`
	UniqueIndexes               map[string]*UniqueIndex             `json:"unique_indexes"`
	UniqueIndexesByRel          map[string]map[string]*UniqueIndex  `json:"unique_indexes_by_relation"`

	// Two-phase transaction records (see TwoPCInfo), stored under "two_phase_info".
	TwoPhaseTx map[string]*TwoPCInfo `json:"two_phase_info"`
}

type MoveKeyRange

// MoveKeyRange is the key range move record accepted by
// ShardingSchemaKeeper.RecordKeyRangeMove and returned by ListKeyRangeMoves.
type MoveKeyRange struct {
	MoveId     string             `json:"move_id"`
	// NOTE(review): presumably the shard the key range is moved to — confirm.
	ShardId    string             `json:"shard_id"`
	KeyRangeID string             `json:"key_range_id"`
	Status     MoveKeyRangeStatus `json:"status"`
}

type MoveKeyRangeStatus

// MoveKeyRangeStatus is the string-typed status of a key range move. The
// package declares the values PLANNED, LOCKED, DATA_MOVED, COORD_META_UPDATED
// and COMPLETE, plus the deprecated STARTED (replaced by MoveKeyRangeLocked).
type MoveKeyRangeStatus string

type MoveTask

// MoveTask is the move task record handled by the QDB "MOVE tasks" methods
// (ListMoveTasks, GetMoveTask, WriteMoveTask, UpdateMoveTask).
type MoveTask struct {
	TaskGroupID string `json:"task_group_id"`
	// ID has no json tag, so encoding/json uses the field name "ID" as its key.
	// Adding a tag would change the serialized format.
	ID          string
	Bound       [][]byte `json:"bound"`
	KrIdTemp    string   `json:"kr_id_temp"`
	// NOTE(review): the meaning of the State values is not visible here.
	State       int      `json:"state"`
}

type MoveTaskGroup

// MoveTaskGroup is the task group record handled by the QDB "Task group"
// methods (ListTaskGroups, GetMoveTaskGroup, WriteMoveTaskGroup).
type MoveTaskGroup struct {
	Type      int                  `json:"type"`
	ShardToId string               `json:"shard_to_id"`
	KrIdFrom  string               `json:"kr_id_from"`
	KrIdTo    string               `json:"kr_id_to"`
	// BoundRel is serialized under the shorter key "rel".
	BoundRel  string               `json:"rel"`
	Coeff     float64              `json:"coeff"`
	BatchSize int64                `json:"batch_size"`
	Limit     int64                `json:"limit"`
	CreatedAt time.Time            `json:"created_at"`
	// Issuer is a pointer and may therefore be nil.
	Issuer    *MoveTaskGroupIssuer `json:"issuer"`
}

type MoveTaskGroupIssuer

// MoveTaskGroupIssuer is the value referenced by MoveTaskGroup.Issuer.
type MoveTaskGroupIssuer struct {
	// NOTE(review): presumably one of IssuerRedistributeTask or
	// IssuerBalancerTask — confirm against the callers.
	Type int    `json:"type"`
	Id   string `json:"id"`
}

type PgDCStateKeeper

// PgDCStateKeeper is a type whose documented methods (AcquireTxOwnership,
// ChangeTxStatus, RecordTwoPhaseMembers, ReleaseTxOwnership, TXCohortShards,
// TXStatus) state that they implement DCStateKeeper.
// NOTE(review): the "Pg" prefix suggests a PostgreSQL-backed store — confirm.
type PgDCStateKeeper struct {
	// contains filtered or unexported fields
}

func (*PgDCStateKeeper) AcquireTxOwnership

func (q *PgDCStateKeeper) AcquireTxOwnership(_ context.Context, txid string) (bool, error)

AcquireTxOwnership implements DCStateKeeper.

func (*PgDCStateKeeper) ChangeTxStatus

func (q *PgDCStateKeeper) ChangeTxStatus(ctx context.Context, txid string, state TwoPhaseTxState) error

ChangeTxStatus implements DCStateKeeper.

func (*PgDCStateKeeper) ClearTxStatuses

func (q *PgDCStateKeeper) ClearTxStatuses(ctx context.Context) error

func (*PgDCStateKeeper) GetTXs

func (q *PgDCStateKeeper) GetTXs(ctx context.Context) (map[string]*TwoPCInfo, error)

func (*PgDCStateKeeper) GetTxMetaStorage

func (q *PgDCStateKeeper) GetTxMetaStorage() []string

func (*PgDCStateKeeper) ListTXNames

func (q *PgDCStateKeeper) ListTXNames(ctx context.Context) ([]string, error)

func (*PgDCStateKeeper) RecordTwoPhaseMembers

func (q *PgDCStateKeeper) RecordTwoPhaseMembers(ctx context.Context, txid string, shards []string) error

RecordTwoPhaseMembers implements DCStateKeeper.

func (*PgDCStateKeeper) ReleaseTxOwnership

func (q *PgDCStateKeeper) ReleaseTxOwnership(_ context.Context, txid string) error

ReleaseTxOwnership implements DCStateKeeper.

func (*PgDCStateKeeper) RemoveTXData

func (q *PgDCStateKeeper) RemoveTXData(ctx context.Context, txid string) error

func (*PgDCStateKeeper) SetTxMetaStorage

func (q *PgDCStateKeeper) SetTxMetaStorage(storage []string) error

func (*PgDCStateKeeper) TXCohortShards

func (q *PgDCStateKeeper) TXCohortShards(ctx context.Context, txid string) ([]string, error)

TXCohortShards implements DCStateKeeper.

func (*PgDCStateKeeper) TXStatus

func (q *PgDCStateKeeper) TXStatus(ctx context.Context, txid string) (TwoPhaseTxState, error)

TXStatus implements DCStateKeeper.

type QDB

// QDB is a generic interface used by both the coordinator and the router.
// Its methods are grouped below by the kind of metadata they manage: key
// ranges, distributions, reference relations, unique indexes, task groups,
// move tasks, redistribute tasks, the balancer task, coordinator information
// and sequences.
type QDB interface {
	// Key ranges
	CreateKeyRange(ctx context.Context, keyRange *KeyRange) ([]QdbStatement, error)
	GetKeyRange(ctx context.Context, id string) (*KeyRange, error)
	UpdateKeyRange(ctx context.Context, keyRange *KeyRange) ([]QdbStatement, error)
	DropKeyRange(ctx context.Context, id string) ([]QdbStatement, error)
	DropKeyRangeAll(ctx context.Context) error
	ListKeyRanges(ctx context.Context, distribution string) ([]*KeyRange, error)
	ListAllKeyRanges(ctx context.Context) ([]*KeyRange, error)
	LockKeyRange(ctx context.Context, id string) (*KeyRange, error)
	UnlockKeyRange(ctx context.Context, id string) error
	CheckLockedKeyRange(ctx context.Context, id string) (*KeyRange, error)
	ListLockedKeyRanges(ctx context.Context) ([]string, error)
	// ShareKeyRange is the only method of this interface without a context.
	ShareKeyRange(id string) error
	RenameKeyRange(ctx context.Context, krId, krIdNew string) error

	// Distribution management
	CreateDistribution(ctx context.Context, distr *Distribution) ([]QdbStatement, error)
	ListDistributions(ctx context.Context) ([]*Distribution, error)
	DropDistribution(ctx context.Context, id string) error
	GetDistribution(ctx context.Context, id string) (*Distribution, error)
	CheckDistribution(ctx context.Context, id string) (bool, error)
	GetRelationDistribution(ctx context.Context, relation *rfqn.RelationFQN) (*Distribution, error)

	// Reference relations
	CreateReferenceRelation(ctx context.Context, r *ReferenceRelation) error
	GetReferenceRelation(ctx context.Context, relationFQN *rfqn.RelationFQN) (*ReferenceRelation, error)
	AlterReferenceRelationStorage(ctx context.Context, relationFQN *rfqn.RelationFQN, shs []string) error
	ListReferenceRelations(ctx context.Context) ([]*ReferenceRelation, error)
	DropReferenceRelation(ctx context.Context, relationFQN *rfqn.RelationFQN) error

	// Update distribution
	AlterDistributionAttach(ctx context.Context, id string, rels []*DistributedRelation) error
	AlterDistributionDetach(ctx context.Context, id string, relationFQN *rfqn.RelationFQN) error
	AlterDistributedRelation(ctx context.Context, id string, rel *DistributedRelation) error
	AlterDistributedRelationSchema(ctx context.Context, id string, relation *rfqn.RelationFQN, schemaName string) error
	AlterDistributedRelationDistributionKey(ctx context.Context, id string, relation *rfqn.RelationFQN, distributionKey []DistributionKeyEntry) error
	AlterReplicatedRelationSchema(ctx context.Context, dsID string, relation *rfqn.RelationFQN, schemaName string) error

	// Unique indexes
	CreateUniqueIndex(ctx context.Context, idx *UniqueIndex) error
	DropUniqueIndex(ctx context.Context, id string) error
	ListUniqueIndexes(ctx context.Context) (map[string]*UniqueIndex, error)
	ListRelationIndexes(ctx context.Context, relationFQN *rfqn.RelationFQN) (map[string]*UniqueIndex, error)

	// Task group
	ListTaskGroups(ctx context.Context) (map[string]*MoveTaskGroup, error)
	GetMoveTaskGroup(ctx context.Context, id string) (*MoveTaskGroup, error)
	WriteMoveTaskGroup(ctx context.Context, id string, group *MoveTaskGroup, totalKeys int64, moveTask *MoveTask) error
	GetMoveTaskGroupTotalKeys(ctx context.Context, id string) (int64, error)
	UpdateMoveTaskGroupTotalKeys(ctx context.Context, id string, totalKeys int64) error
	DropMoveTaskGroup(ctx context.Context, id string) error
	AddMoveTaskGroupStopFlag(ctx context.Context, id string, immediate bool) error
	/* XXX: stop move task kind? */
	// NOTE(review): the two bool results are unnamed; presumably they report
	// whether a stop flag is set and whether it is immediate — confirm.
	CheckMoveTaskGroupStopFlag(ctx context.Context, id string) (bool, bool, error)
	WriteTaskGroupStatus(ctx context.Context, id string, status *TaskGroupStatus) error
	GetTaskGroupStatus(ctx context.Context, id string) (*TaskGroupStatus, error)
	GetAllTaskGroupStatuses(ctx context.Context) (map[string]*TaskGroupStatus, error)

	// MOVE tasks
	ListMoveTasks(ctx context.Context) (map[string]*MoveTask, error)
	GetMoveTask(ctx context.Context, id string) (*MoveTask, error)
	WriteMoveTask(ctx context.Context, task *MoveTask) error
	UpdateMoveTask(ctx context.Context, task *MoveTask) error
	DropMoveTask(ctx context.Context, id string) error
	GetMoveTaskByGroup(ctx context.Context, taskGroupId string) (*MoveTask, error)

	// Redistribute tasks
	ListRedistributeTasks(ctx context.Context) ([]*RedistributeTask, error)
	GetRedistributeTask(ctx context.Context, id string) (*RedistributeTask, error)
	CreateRedistributeTask(ctx context.Context, task *RedistributeTask) error
	UpdateRedistributeTask(ctx context.Context, task *RedistributeTask) error
	DropRedistributeTask(ctx context.Context, task *RedistributeTask) error
	GetRedistributeTaskTaskGroupId(ctx context.Context, redistributeTaskId string) (string, error)
	GetKeyRangeRedistributeTaskId(ctx context.Context, keyRangeId string) (string, error)

	// Balancer interaction
	GetBalancerTask(ctx context.Context) (*BalancerTask, error)
	WriteBalancerTask(ctx context.Context, task *BalancerTask) error
	DropBalancerTask(ctx context.Context) error

	// Coordinator interaction
	UpdateCoordinator(ctx context.Context, address string) error
	GetCoordinator(ctx context.Context) (string, error)
	ListRouters(ctx context.Context) ([]*Router, error)

	// Sequences for reference relation
	CreateSequence(ctx context.Context, seqName string, initialValue int64) ([]QdbStatement, error)
	CheckSequence(ctx context.Context, seqName string) (bool, error)
	ListSequences(ctx context.Context) ([]string, error)
	AlterSequenceAttach(ctx context.Context, seqName string, relationFQN *rfqn.RelationFQN, colName string) error
	GetRelationSequence(ctx context.Context, relationFQN *rfqn.RelationFQN) (map[string]string, error)
	NextRange(ctx context.Context, seqName string, rangeSize uint64) (*SequenceIdRange, error)
	CurrVal(ctx context.Context, seqName string) (int64, error)
	DropSequence(ctx context.Context, seqName string, force bool) error
	GetSequenceRelations(ctx context.Context, seqName string) ([]*rfqn.RelationFQN, error)
	AlterSequenceDetachRelation(ctx context.Context, rel *rfqn.RelationFQN) error
}

QDB is a generic interface used by both the coordinator and the router. The router uses a memory-based version of this interface to cache routing schema state, while the coordinator uses an etcd-based implementation to synchronize distributed state.

type QdbStatement

// QdbStatement is a single key/value operation. Slices of it are returned by
// several QDB methods (for example CreateKeyRange) and consumed by
// TXManager.ExecNoTransaction and QdbTransaction.Append.
type QdbStatement struct {
	// NOTE(review): presumably CmdPut or CmdDelete — confirm.
	CmdType int32
	Key     string
	Value   any
	// For the case when the qdb has more than one KV storage. The package
	// declares the Map* constants as the MemQDB maps usable as extensions.
	Extension string
}

func NewQdbStatement

func NewQdbStatement(cmdType int32, key string, value any) (*QdbStatement, error)

func NewQdbStatementExt

func NewQdbStatementExt(cmdType int32, key string, value any, extension string) (*QdbStatement, error)

type QdbTransaction

// QdbTransaction is created by NewTransaction or NewTransactionWithCmd and
// exposes Append (add QdbStatement values), Id (a uuid.UUID) and Validate.
// It is the argument of TXManager.BeginTransaction and CommitTransaction.
type QdbTransaction struct {
	// contains filtered or unexported fields
}

func NewTransaction

func NewTransaction() (*QdbTransaction, error)

func NewTransactionWithCmd

func NewTransactionWithCmd(transactionId uuid.UUID, commands []QdbStatement) *QdbTransaction

func (*QdbTransaction) Append

func (t *QdbTransaction) Append(qdbCommands []QdbStatement) error

func (*QdbTransaction) Id

func (t *QdbTransaction) Id() uuid.UUID

func (*QdbTransaction) Validate

func (t *QdbTransaction) Validate() error

type RedistributeTask

// RedistributeTask is the record handled by the QDB "Redistribute tasks"
// methods (ListRedistributeTasks, GetRedistributeTask, CreateRedistributeTask,
// UpdateRedistributeTask, DropRedistributeTask).
type RedistributeTask struct {
	ID          string `json:"id"`
	TaskGroupId string `json:"task_group_id"`
	KeyRangeId  string `json:"kr_id"`
	ShardId     string `json:"shard_id"`
	// BatchSize is an int here, whereas MoveTaskGroup.BatchSize is an int64.
	BatchSize   int    `json:"batch_size"`
	TempKrId    string `json:"temp_kr_id"`
	// NOTE(review): the meaning of the State values is not visible here.
	State       int    `json:"state"`
}

type ReferenceRelation

// ReferenceRelation is the record handled by the QDB "Reference relations"
// methods (CreateReferenceRelation, GetReferenceRelation,
// ListReferenceRelations).
type ReferenceRelation struct {
	TableName             string            `json:"table_name"`
	SchemaName            string            `json:"schema_name"`
	SchemaVersion         uint64            `json:"schema_version"`
	// NOTE(review): presumably maps a column name to a sequence name — confirm.
	ColumnSequenceMapping map[string]string `json:"column_sequence_mapping"`
	ShardIDs              []string          `json:"shard_ids"`

	// Version and ACL are tagged omitempty and are left out of the JSON
	// encoding when zero/empty.
	Version uint64    `json:"version,omitempty"`
	ACL     []ACLItem `json:"acl,omitempty"`
}

type Router

// Router is the router record used by TopologyKeeper (AddRouter, ListRouters).
// NewRouter builds one from an address, an id and a RouterState.
type Router struct {
	Address string      `json:"address"`
	ID      string      `json:"id"`
	// State is omitted from the JSON encoding when it is the empty string.
	State   RouterState `json:"state,omitempty"`
}

func NewRouter

func NewRouter(addr, id string, rst RouterState) *Router

func (Router) Addr

func (r Router) Addr() string

type RouterState

// RouterState is the string-typed state of a Router. The package declares the
// values CLOSED and OPENED.
type RouterState string

type RoutingExpr

// RoutingExpr holds a list of typed column references.
type RoutingExpr struct {
	// ColRefs is serialized under the versioned key "column_refs_v1".
	ColRefs []TypedColRef `json:"column_refs_v1"`
}

type Sequence

// Sequence pairs a relation name with a column name.
// NOTE(review): presumably the column a sequence is attached to — confirm.
type Sequence struct {
	RelName string `json:"rel_name"`
	ColName string `json:"col_name"`
}

type SequenceIdRange

// SequenceIdRange is a pair of int64 bounds returned by QDB.NextRange and
// constructed by NewSequenceIdRange or NewRangeBySize. Its fields have no JSON
// tags.
// NOTE(review): whether the bounds are inclusive is not visible here.
type SequenceIdRange struct {
	Left  int64
	Right int64
}

func NewRangeBySize

func NewRangeBySize(currentRight int64, rangeSize uint64) (*SequenceIdRange, error)

func NewSequenceIdRange

func NewSequenceIdRange(left int64, right int64) (*SequenceIdRange, error)

type Shard

// Shard is the shard record used by TopologyKeeper (AddShard, ListShards,
// GetShard, AlterShard). NewShard builds one from an id, hosts and options.
type Shard struct {
	ID       string          `json:"id"`
	RawHosts []string        `json:"hosts"` // format host:port:availability_zone
	// Options is omitted from the JSON encoding when empty.
	Options  []GenericOption `json:"options,omitempty"`
}

func NewShard

func NewShard(id string, hosts []string, options []GenericOption) *Shard

type ShardKey

// ShardKey pairs a name with an RW flag. Its fields have no JSON tags.
// NOTE(review): presumably RW marks read-write (as opposed to read-only)
// access — confirm.
type ShardKey struct {
	Name string
	RW   bool
}

type ShardingSchemaKeeper

// ShardingSchemaKeeper declares the operations for recording, listing,
// updating and deleting key range moves.
type ShardingSchemaKeeper interface {
	// RecordKeyRangeMove persists the start of a key range move in distributed storage
	RecordKeyRangeMove(ctx context.Context, m *MoveKeyRange) error
	// ListKeyRangeMoves lists all key-range moves that are in progress
	ListKeyRangeMoves(ctx context.Context) ([]*MoveKeyRange, error)
	// UpdateKeyRangeMoveStatus sets the status of the key range move moveId to s
	UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s MoveKeyRangeStatus) error
	// DeleteKeyRangeMove removes information about the key range move moveId
	DeleteKeyRangeMove(ctx context.Context, moveId string) error
}

type StateKeeperQDB

// StateKeeperQDB combines XQDB with DCStateKeeper. GetStateKeeperQDB returns a
// value of this type.
type StateKeeperQDB interface {
	XQDB
	DCStateKeeper
}

func GetStateKeeperQDB

func GetStateKeeperQDB() (StateKeeperQDB, error)

type TXManager

// TXManager declares execution of QdbStatement batches, either without a
// transaction (ExecNoTransaction) or through a QdbTransaction
// (BeginTransaction, CommitTransaction).
type TXManager interface {
	// batch execution
	ExecNoTransaction(ctx context.Context, operations []QdbStatement) error
	CommitTransaction(ctx context.Context, transaction *QdbTransaction) error
	BeginTransaction(ctx context.Context, transaction *QdbTransaction) error
}

type TaskGroupStatus

// TaskGroupStatus is the status record written by QDB.WriteTaskGroupStatus and
// read by GetTaskGroupStatus and GetAllTaskGroupStatuses.
type TaskGroupStatus struct {
	State     string    `json:"state"`
	// Message is serialized under the shorter key "msg".
	Message   string    `json:"msg"`
	UpdatedAt time.Time `json:"updated_at"`
}

type TaskStateKeeper

// TaskStateKeeper declares the lock operations for task groups and
// redistribute tasks.
// NOTE(review): holder presumably identifies the lock owner — confirm.
type TaskStateKeeper interface {
	TryTaskGroupLock(ctx context.Context, tgId string, holder string) error
	CheckTaskGroupLocked(ctx context.Context, tgId string) (bool, error)
	DropTaskGroupLock(ctx context.Context, tgId string) error
	LockRedistributeTask(ctx context.Context, id string, holder string) error
	DropRedistributeTaskLock(ctx context.Context, id string) error
}

type TopologyKeeper

// TopologyKeeper declares the operations for managing the routers and shards
// of the cluster.
type TopologyKeeper interface {
	// AddRouter adds a new router to the cluster
	AddRouter(ctx context.Context, r *Router) error
	// DeleteRouter removes the router from the cluster
	DeleteRouter(ctx context.Context, rID string) error
	// DeleteRouterAll removes all routers from the cluster
	DeleteRouterAll(ctx context.Context) error
	// ListRouters lists the routers of the cluster
	ListRouters(ctx context.Context) ([]*Router, error)
	// OpenRouter changes the state of the router to online, making it usable for query execution.
	OpenRouter(ctx context.Context, rID string) error
	// CloseRouter changes the state of the router to offline, making it unavailable for query execution.
	CloseRouter(ctx context.Context, rID string) error

	// Shards
	AddShard(ctx context.Context, shard *Shard) error
	ListShards(ctx context.Context) ([]*Shard, error)
	GetShard(ctx context.Context, shardID string) (*Shard, error)
	DropShard(ctx context.Context, shardID string) error
	AlterShard(ctx context.Context, newShard *Shard) error
}

type TransferXactKeeper

// TransferXactKeeper keeps track of the status of the two-phase data move
// transaction. Records are DataTransferTransaction values addressed by a
// string key.
type TransferXactKeeper interface {
	RecordTransferTx(ctx context.Context, key string, info *DataTransferTransaction) error
	GetTransferTx(ctx context.Context, key string) (*DataTransferTransaction, error)
	RemoveTransferTx(ctx context.Context, key string) error
}

TransferXactKeeper keeps track of the status of the two-phase data move transaction.

type TwoPCInfo

// TwoPCInfo is the two-phase transaction record returned (keyed by string) by
// the GetTXs methods and stored in MemQDBState.TwoPhaseTx.
type TwoPCInfo struct {
	Gid       string   `json:"gid"`
	ShardsIDs []string `json:"shard_ids"`

	State TwoPhaseTxState `json:"state"`

	/* ephemeral part of state */
	// The json:"-" tags keep these two fields out of the JSON encoding.
	UpdatedAt time.Time `json:"-"`
	Locked    bool      `json:"-"`
}

type TwoPhaseTxMetaKeeper

// TwoPhaseTxMetaKeeper declares a setter and a getter for the two-phase
// transaction metadata storage, expressed as a list of strings.
// NOTE(review): presumably the strings are shard names — confirm.
type TwoPhaseTxMetaKeeper interface {
	SetTxMetaStorage(context.Context, []string) error
	GetTxMetaStorage(context.Context) ([]string, error)
}

type TwoPhaseTxState

// TwoPhaseTxState is the string-typed state of a two-phase transaction, as
// stored in TwoPCInfo.State. TwoPhaseTXStateFromString returns one for a
// status string.
type TwoPhaseTxState string
// String values of TwoPhaseTxState.
const (
	TwoPhaseInitState  TwoPhaseTxState = "TxInitState"
	TwoPhaseP1         TwoPhaseTxState = "PrepareDone"
	TwoPhaseP2         TwoPhaseTxState = "Done"
	TwoPhaseP2Rejected TwoPhaseTxState = "DoneRejected"
)
XXX: note that this is the data-plane two-phase transaction state, not the control-plane transfer task state.

func TwoPhaseTXStateFromString

func TwoPhaseTXStateFromString(status string) (TwoPhaseTxState, error)

type TxStatus

// TxStatus is a string-typed transaction status. The package declares the
// values planned, locked and data_copied.
type TxStatus string

type TypedColRef

// TypedColRef pairs a column name with a column type; RoutingExpr.ColRefs is
// a slice of these.
type TypedColRef struct {
	ColName string `json:"column_name"`
	// NOTE(review): presumably one of the package's ColumnType* values — confirm.
	ColType string `json:"column_type"`
}

type UniqueIndex

// UniqueIndex is the record handled by the QDB "Unique indexes" methods
// (CreateUniqueIndex, ListUniqueIndexes, ListRelationIndexes).
type UniqueIndex struct {
	ID             string            `json:"id"`
	Relation       *rfqn.RelationFQN `json:"relation"`
	// Both slices below are serialized under singular keys ("column",
	// "column_type"); renaming them would change the serialized format.
	ColumnNames    []string          `json:"column"`
	ColTypes       []string          `json:"column_type"`
	DistributionId string            `json:"distribution_id"`

	// Version and ACL are tagged omitempty and are left out of the JSON
	// encoding when zero/empty.
	Version uint64    `json:"version,omitempty"`
	ACL     []ACLItem `json:"acl,omitempty"`
}

type UpdateCommand

// UpdateCommand is a generic command with Do and Undo methods. NewUpdateCommand
// builds one from a map[string]T, a key and a value.
// NOTE(review): presumably Do stores the value under the key and Undo
// restores the previous entry — confirm against the implementation.
type UpdateCommand[T any] struct {
	// contains filtered or unexported fields
}

func NewUpdateCommand

func NewUpdateCommand[T any](m map[string]T, key string, value T) *UpdateCommand[T]

func (*UpdateCommand[T]) Do

func (c *UpdateCommand[T]) Do() error

func (*UpdateCommand[T]) Undo

func (c *UpdateCommand[T]) Undo() error

type XDCStateKeeper

// XDCStateKeeper combines TopologyKeeper, DCStateKeeper and
// TwoPhaseTxMetaKeeper. NewDataPlaneTwoPhaseStateKeeper returns a value of
// this type.
type XDCStateKeeper interface {
	TopologyKeeper
	DCStateKeeper
	TwoPhaseTxMetaKeeper
}

func NewDataPlaneTwoPhaseStateKeeper

func NewDataPlaneTwoPhaseStateKeeper(qdbType string) (XDCStateKeeper, error)

type XQDB

// XQDB (extended QDB) combines QDB with the topology, data-move,
// transaction, task-lock and two-phase metadata interfaces, and adds
// TryCoordinatorLock. NewXQDB returns a value of this type.
type XQDB interface {
	// routing schema
	QDB
	// router topology
	TopologyKeeper
	// data move state
	ShardingSchemaKeeper
	TransferXactKeeper
	TXManager
	TaskStateKeeper
	TwoPhaseTxMetaKeeper

	// NOTE(review): presumably attempts to take the coordinator lock for the
	// coordinator at addr — confirm against the implementations.
	TryCoordinatorLock(ctx context.Context, addr string) error
}

XQDB means extended QDB. The coordinator should use an etcd-based implementation to keep the distributed state in sync.

func NewXQDB

func NewXQDB(qdbType string) (XQDB, error)

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL