Back to

Package restore

v0.0.0 (181c081)
Latest Go to latest
Published: 1 day ago | License: Apache-2.0 | Module:


Package Files


const (
	SplitRetryTimes       = 32
	SplitRetryInterval    = 50 * time.Millisecond
	SplitMaxRetryInterval = time.Second

	SplitCheckMaxRetryTimes = 64
	SplitCheckInterval      = 8 * time.Millisecond
	SplitMaxCheckInterval   = time.Second

	ScatterWaitMaxRetryTimes = 64
	ScatterWaitInterval      = 50 * time.Millisecond
	ScatterMaxWaitInterval   = time.Second
	ScatterWaitUpperInterval = 180 * time.Second

	RejectStoreCheckRetryTimes  = 64
	RejectStoreCheckInterval    = 100 * time.Millisecond
	RejectStoreMaxCheckInterval = 2 * time.Second

Constants for split retry machinery.


var (
	// ErrEpochNotMatch is the error raised when ingestion failed with "epoch
	// not match". This error is retryable.
	ErrEpochNotMatch = errors.NewNoStackError("epoch not match")
	// ErrKeyNotInRegion is the error raised when ingestion failed with "key not
	// in region". This error cannot be retried.
	ErrKeyNotInRegion = errors.NewNoStackError("key not in region")
	// ErrRewriteRuleNotFound is the error raised when download failed with
	// "rewrite rule not found". This error cannot be retried
	ErrRewriteRuleNotFound = errors.NewNoStackError("rewrite rule not found")
	// ErrRangeIsEmpty is the error raised when download failed with "range is
	// empty". This error cannot be retried.
	ErrRangeIsEmpty = errors.NewNoStackError("range is empty")
	// ErrDownloadFailed indicates a generic download error, expected to be
	// retryable.
	ErrDownloadFailed = errors.NewNoStackError("download sst failed")
	// ErrIngestFailed indicates a generic, retryable ingest error.
	ErrIngestFailed = errors.NewNoStackError("ingest sst failed")

func AttachFilesToRanges

func AttachFilesToRanges(
	files []*backup.File,
	ranges []rtree.Range,
) []rtree.Range

AttachFilesToRanges attach files to ranges. Panic if range is overlapped or no range for files.

func EstimateRangeSize

func EstimateRangeSize(files []*backup.File) int

EstimateRangeSize estimates the total range count by file.

func Exhaust

func Exhaust(ec <-chan error) []error

Exhaust drains all remaining errors in the channel, into a slice of errors.

func FilterDDLJobs

func FilterDDLJobs(allDDLJobs []*model.Job, tables []*utils.Table) (ddlJobs []*model.Job)

FilterDDLJobs filters ddl jobs.

func GetSSTMetaFromFile

func GetSSTMetaFromFile(
	id []byte,
	file *backup.File,
	region *metapb.Region,
	regionRule *import_sstpb.RewriteRule,
) import_sstpb.SSTMeta

GetSSTMetaFromFile compares the keys in file, region and rewrite rules, then returns a sst conn. The range of the returned sst meta is [regionRule.NewKeyPrefix, append(regionRule.NewKeyPrefix, 0xff)].

func GoValidateFileRanges

func GoValidateFileRanges(
	ctx context.Context,
	tableStream <-chan CreatedTable,
	fileOfTable map[int64][]*backup.File,
	errCh chan<- error,
) <-chan TableWithRange

GoValidateFileRanges validate files by a stream of tables and yields tables with range.

func MapTableToFiles

func MapTableToFiles(files []*backup.File) map[int64][]*backup.File

MapTableToFiles makes a map that mapping table ID to its backup files. aware that one file can and only can hold one table.

func NewBackoffer

func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) utils.Backoffer

NewBackoffer creates a new controller regulating a truncated exponential backoff.

func SortRanges

func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range, error)

SortRanges checks if the range overlapped and sort them.

func SplitRanges

func SplitRanges(
	ctx context.Context,
	client *Client,
	ranges []rtree.Range,
	rewriteRules *RewriteRules,
	updateCh glue.Progress,
) error

SplitRanges splits region by 1. data range after rewrite. 2. rewrite rules.

func ValidateFileRanges

func ValidateFileRanges(
	files []*backup.File,
	rewriteRules *RewriteRules,
) ([]rtree.Range, error)

ValidateFileRanges checks and returns the ranges of the files.

func ValidateFileRewriteRule

func ValidateFileRewriteRule(file *backup.File, rewriteRules *RewriteRules) error

ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file.

func ZapRanges

func ZapRanges(ranges []rtree.Range) []zapcore.Field

ZapRanges make zap fields for debuging, which contains kv, size and count of ranges.

func ZapTables

func ZapTables(tables []CreatedTable) zapcore.Field

ZapTables make zap field of table for debuging, including table names.

type BatchSender

type BatchSender interface {
	// RestoreBatch will send the restore request.
	RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error

BatchSender is the abstract of how the batcher send a batch.

func NewTiKVSender

func NewTiKVSender(
	ctx context.Context,
	cli *Client,
	updateCh glue.Progress,

	removeTiFlash bool,
) (BatchSender, error)

NewTiKVSender make a sender that send restore requests to TiKV.

type Batcher

type Batcher struct {
	// contains filtered or unexported fields

Batcher collects ranges to restore and send batching split/ingest request.

func NewBatcher

func NewBatcher(
	ctx context.Context,
	sender BatchSender,
	manager ContextManager,
	errCh chan<- error,
) (*Batcher, <-chan CreatedTable)

NewBatcher creates a new batcher by a sender and a context manager. the former defines how the 'restore' a batch(i.e. send, or 'push down' the task to where). the context manager defines the 'lifetime' of restoring tables(i.e. how to enter 'restore' mode, and how to exit). this batcher will work background, send batches per second, or batch size reaches limit. and it will emit full-restored tables to the output channel returned.

func (*Batcher) Add

func (b *Batcher) Add(tbs TableWithRange)

Add adds a task to the Batcher.

func (*Batcher) Close

func (b *Batcher) Close()

Close closes the batcher, sending all pending requests, close updateCh.

func (*Batcher) DisableAutoCommit

func (b *Batcher) DisableAutoCommit()

DisableAutoCommit blocks the current goroutine until the worker can gracefully stop, and then disable auto commit.

func (*Batcher) EnableAutoCommit

func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration)

EnableAutoCommit enables the batcher commit batch periodically even batcher size isn't big enough. we make this function for disable AutoCommit in some case.

func (*Batcher) Len

func (b *Batcher) Len() int

Len calculate the current size of this batcher.

func (*Batcher) Send

func (b *Batcher) Send(ctx context.Context) ([]CreatedTable, error)

Send sends all pending requests in the batcher. returns tables sent FULLY in the current batch.

func (*Batcher) SetThreshold

func (b *Batcher) SetThreshold(newThreshold int)

SetThreshold sets the threshold that how big the batch size reaching need to send batch. note this function isn't goroutine safe yet, just set threshold before anything starts(e.g. EnableAutoCommit), please.

type Client

type Client struct {
	// contains filtered or unexported fields

Client sends requests to restore files.

func NewRestoreClient

func NewRestoreClient(
	ctx context.Context,
	g glue.Glue,
	pdClient pd.Client,
	store kv.Storage,
	tlsConf *tls.Config,
) (*Client, error)

NewRestoreClient returns a new RestoreClient.

func (*Client) Close

func (rc *Client) Close()

Close a client.

func (*Client) CreateDatabase

func (rc *Client) CreateDatabase(db *model.DBInfo) error

CreateDatabase creates a database.

func (*Client) CreateTables

func (rc *Client) CreateTables(
	dom *domain.Domain,
	tables []*utils.Table,
	newTS uint64,
) (*RewriteRules, []*model.TableInfo, error)

CreateTables creates multiple tables, and returns their rewrite rules.

func (*Client) EnableOnline

func (rc *Client) EnableOnline()

EnableOnline sets the mode of restore to online.

func (*Client) EnableSkipCreateSQL

func (rc *Client) EnableSkipCreateSQL()

EnableSkipCreateSQL sets switch of skip create schema and tables.

func (*Client) ExecDDLs

func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error

ExecDDLs executes the queries of the ddl jobs.

func (*Client) GetDDLJobs

func (rc *Client) GetDDLJobs() []*model.Job

GetDDLJobs returns ddl jobs.

func (*Client) GetDatabase

func (rc *Client) GetDatabase(name string) *utils.Database

GetDatabase returns a database by name.

func (*Client) GetDatabases

func (rc *Client) GetDatabases() []*utils.Database

GetDatabases returns all databases.

func (*Client) GetFilesInRawRange

func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) ([]*backup.File, error)

GetFilesInRawRange gets all files that are in the given range or intersects with the given range.

func (*Client) GetPDClient

func (rc *Client) GetPDClient() pd.Client

GetPDClient returns a pd client.

func (*Client) GetPlacementRules

func (rc *Client) GetPlacementRules(pdAddrs []string) ([]placement.Rule, error)

GetPlacementRules return the current placement rules.

func (*Client) GetTLSConfig

func (rc *Client) GetTLSConfig() *tls.Config

GetTLSConfig returns the tls config.

func (*Client) GetTS

func (rc *Client) GetTS(ctx context.Context) (uint64, error)

GetTS gets a new timestamp from PD.

func (*Client) GetTableSchema

func (rc *Client) GetTableSchema(
	dom *domain.Domain,
	dbName model.CIStr,
	tableName model.CIStr,
) (*model.TableInfo, error)

GetTableSchema returns the schema of a table from TiDB.

func (*Client) GoCreateTables

func (rc *Client) GoCreateTables(
	ctx context.Context,
	dom *domain.Domain,
	tables []*utils.Table,
	newTS uint64,
	dbPool []*DB,
	errCh chan<- error,
) <-chan CreatedTable

GoCreateTables create tables, and generate their information. this function will use workers as the same number of sessionPool, leave sessionPool nil to send DDLs sequential.

func (*Client) GoValidateChecksum

func (rc *Client) GoValidateChecksum(
	ctx context.Context,
	tableStream <-chan CreatedTable,
	kvClient kv.Client,
	errCh chan<- error,
	updateCh glue.Progress,
) <-chan struct{}

GoValidateChecksum forks a goroutine to validate checksum after restore. it returns a channel fires a struct{} when all things get done.

func (*Client) InitBackupMeta

func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.StorageBackend) error

InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.

func (*Client) IsIncremental

func (rc *Client) IsIncremental() bool

IsIncremental returns whether this backup is incremental.

func (*Client) IsOnline

func (rc *Client) IsOnline() bool

IsOnline tells if it's a online restore.

func (*Client) IsRawKvMode

func (rc *Client) IsRawKvMode() bool

IsRawKvMode checks whether the backup data is in raw kv format, in which case transactional recover is forbidden.

func (*Client) IsSkipCreateSQL

func (rc *Client) IsSkipCreateSQL() bool

IsSkipCreateSQL returns whether we need skip create schema and tables in restore.

func (*Client) LoadRestoreStores

func (rc *Client) LoadRestoreStores(ctx context.Context) error

LoadRestoreStores loads the stores used to restore data.

func (*Client) RecoverTiFlashOfTable

func (rc *Client) RecoverTiFlashOfTable(table *utils.Table) error

RecoverTiFlashOfTable recovers TiFlash replica of some table. TODO: remove this after tiflash supports restore.

func (*Client) RecoverTiFlashReplica

func (rc *Client) RecoverTiFlashReplica(tables []*utils.Table) error

RecoverTiFlashReplica recovers all the tiflash replicas of a table TODO: remove this after tiflash supports restore.

func (*Client) RemoveTiFlashOfTable

func (rc *Client) RemoveTiFlashOfTable(table CreatedTable, rule []placement.Rule) (int, error)

RemoveTiFlashOfTable removes TiFlash replica of some table, returns the removed count of TiFlash nodes. TODO: save the removed TiFlash information into disk. TODO: remove this after tiflash supports restore.

func (*Client) ResetPlacementRules

func (rc *Client) ResetPlacementRules(ctx context.Context, tables []*model.TableInfo) error

ResetPlacementRules removes placement rules for tables.

func (*Client) ResetRestoreLabels

func (rc *Client) ResetRestoreLabels(ctx context.Context) error

ResetRestoreLabels removes the exclusive labels of the restore stores.

func (*Client) ResetTS

func (rc *Client) ResetTS(pdAddrs []string) error

ResetTS resets the timestamp of PD to a bigger value.

func (*Client) RestoreFiles

func (rc *Client) RestoreFiles(
	files []*backup.File,
	rewriteRules *RewriteRules,
	rejectStoreMap map[uint64]bool,
	updateCh glue.Progress,
) (err error)

RestoreFiles tries to restore the files.

func (*Client) RestoreRaw

func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress) error

RestoreRaw tries to restore raw keys in the specified range.

func (*Client) SetConcurrency

func (rc *Client) SetConcurrency(c uint)

SetConcurrency sets the concurrency of dbs tables files.

func (*Client) SetRateLimit

func (rc *Client) SetRateLimit(rateLimit uint64)

SetRateLimit to set rateLimit.

func (*Client) SetStorage

func (rc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) error

SetStorage set ExternalStorage for client.

func (*Client) SetSwitchModeInterval

func (rc *Client) SetSwitchModeInterval(interval time.Duration)

SetSwitchModeInterval set switch mode interval for client.

func (*Client) SetupPlacementRules

func (rc *Client) SetupPlacementRules(ctx context.Context, tables []*model.TableInfo) error

SetupPlacementRules sets rules for the tables' regions.

func (*Client) SwitchToImportMode

func (rc *Client) SwitchToImportMode(ctx context.Context)

SwitchToImportMode switch tikv cluster to import mode.

func (*Client) SwitchToNormalMode

func (rc *Client) SwitchToNormalMode(ctx context.Context) error

SwitchToNormalMode switch tikv cluster to normal mode.

func (*Client) WaitPlacementSchedule

func (rc *Client) WaitPlacementSchedule(ctx context.Context, tables []*model.TableInfo) error

WaitPlacementSchedule waits PD to move tables to restore stores.

type ContextManager

type ContextManager interface {
	// Enter make some tables 'enter' this context(a.k.a., prepare for restore).
	Enter(ctx context.Context, tables []CreatedTable) error
	// Leave make some tables 'leave' this context(a.k.a., restore is done, do some post-works).
	Leave(ctx context.Context, tables []CreatedTable) error

ContextManager is the struct to manage a TiKV 'context' for restore. Batcher will call Enter when any table should be restore on batch, so you can do some prepare work here(e.g. set placement rules for online restore).

func NewBRContextManager

func NewBRContextManager(client *Client) ContextManager

NewBRContextManager makes a BR context manager, that is, set placement rules for online restore when enter(see <splitPrepareWork>), unset them when leave.

type CreatedTable

type CreatedTable struct {
	RewriteRule *RewriteRules
	Table       *model.TableInfo
	OldTable    *utils.Table

CreatedTable is a table created on restore process, but not yet filled with data.

type DB

type DB struct {
	// contains filtered or unexported fields

DB is a TiDB instance, not thread-safe.

func MakeDBPool

func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error)

MakeDBPool makes a session pool with specficated size by sessionFactory.

func NewDB

func NewDB(g glue.Glue, store kv.Storage) (*DB, error)

NewDB returns a new DB.

func (*DB) AlterTiflashReplica

func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count int) error

AlterTiflashReplica alters the replica count of tiflash.

func (*DB) Close

func (db *DB) Close()

Close closes the connection.

func (*DB) CreateDatabase

func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error

CreateDatabase executes a CREATE DATABASE SQL.

func (*DB) CreateTable

func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error

CreateTable executes a CREATE TABLE SQL.

func (*DB) ExecDDL

func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error

ExecDDL executes the query of a ddl job.

type FileImporter

type FileImporter struct {
	// contains filtered or unexported fields

FileImporter used to import a file to TiKV.

func NewFileImporter

func NewFileImporter(
	metaClient SplitClient,
	importClient ImporterClient,
	backend *backup.StorageBackend,
	isRawKvMode bool,
	rateLimit uint64,
) FileImporter

NewFileImporter returns a new file importClient.

func (*FileImporter) Import

func (importer *FileImporter) Import(
	ctx context.Context,
	file *backup.File,
	rejectStoreMap map[uint64]bool,
	rewriteRules *RewriteRules,
) error

Import tries to import a file. All rules must contain encoded keys.

func (*FileImporter) SetRawRange

func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error

SetRawRange sets the range to be restored in raw kv mode.

type ImporterClient

type ImporterClient interface {
		ctx context.Context,
		storeID uint64,
		req *import_sstpb.DownloadRequest,
	) (*import_sstpb.DownloadResponse, error)

		ctx context.Context,
		storeID uint64,
		req *import_sstpb.IngestRequest,
	) (*import_sstpb.IngestResponse, error)

		ctx context.Context,
		storeID uint64,
		req *import_sstpb.SetDownloadSpeedLimitRequest,
	) (*import_sstpb.SetDownloadSpeedLimitResponse, error)

ImporterClient is used to import a file to TiKV.

func NewImportClient

func NewImportClient(metaClient SplitClient, tlsConf *tls.Config) ImporterClient

NewImportClient returns a new ImporterClient.

type OnSplitFunc

type OnSplitFunc func(key [][]byte)

OnSplitFunc is called before split a range.

type RegionInfo

type RegionInfo struct {
	Region *metapb.Region
	Leader *metapb.Peer

RegionInfo includes a region and the leader of the region.

func NeedSplit

func NeedSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo

NeedSplit checks whether a key is necessary to split, if true returns the split region.

func PaginateScanRegion

func PaginateScanRegion(
	ctx context.Context, client SplitClient, startKey, endKey []byte, limit int,
) ([]*RegionInfo, error)

PaginateScanRegion scan regions with a limit pagination and return all regions at once. It reduces max gRPC message size.

func (*RegionInfo) ContainsInterior

func (region *RegionInfo) ContainsInterior(key []byte) bool

ContainsInterior returns whether the region contains the given key, and also that the key does not fall on the boundary (start key) of the region.

type RegionSplitter

type RegionSplitter struct {
	// contains filtered or unexported fields

RegionSplitter is a executor of region split by rules.

func NewRegionSplitter

func NewRegionSplitter(client SplitClient) *RegionSplitter

NewRegionSplitter returns a new RegionSplitter.

func (*RegionSplitter) Split

func (rs *RegionSplitter) Split(
	ctx context.Context,
	ranges []rtree.Range,
	rewriteRules *RewriteRules,
	onSplit OnSplitFunc,
) error

Split executes a region split. It will split regions by the rewrite rules, then it will split regions by the end key of each range. tableRules includes the prefix of a table, since some ranges may have a prefix with record sequence or index sequence. note: all ranges and rewrite rules must have raw key.

type RewriteRules

type RewriteRules struct {
	Table []*import_sstpb.RewriteRule
	Data  []*import_sstpb.RewriteRule

RewriteRules contains rules for rewriting keys of tables.

func EmptyRewriteRule

func EmptyRewriteRule() *RewriteRules

EmptyRewriteRule make a new, empty rewrite rule.

func GetRewriteRules

func GetRewriteRules(
	newTable *model.TableInfo,
	oldTable *model.TableInfo,
	newTimeStamp uint64,
) *RewriteRules

GetRewriteRules returns the rewrite rule of the new table and the old table.

func (*RewriteRules) Append

func (r *RewriteRules) Append(other RewriteRules)

Append append its argument to this rewrite rules.

type SendType

type SendType int

SendType is the 'type' of a send. when we make a 'send' command to worker, we may want to flush all pending ranges (when auto commit enabled), or, we just want to clean overflowing ranges(when just adding a table to batcher).

const (
	// SendUntilLessThanBatch will make the batcher send batch until
	// its remaining range is less than its batchSizeThreshold.
	SendUntilLessThanBatch SendType = iota
	// SendAll will make the batcher send all pending ranges.
	// SendAllThenClose will make the batcher send all pending ranges and then close itself.

type SplitClient

type SplitClient interface {
	// GetStore gets a store by a store id.
	GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error)
	// GetRegion gets a region which includes a specified key.
	GetRegion(ctx context.Context, key []byte) (*RegionInfo, error)
	// GetRegionByID gets a region by a region id.
	GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error)
	// SplitRegion splits a region from a key, if key is not included in the region, it will return nil.
	// note: the key should not be encoded
	SplitRegion(ctx context.Context, regionInfo *RegionInfo, key []byte) (*RegionInfo, error)
	// BatchSplitRegions splits a region from a batch of keys.
	// note: the keys should not be encoded
	BatchSplitRegions(ctx context.Context, regionInfo *RegionInfo, keys [][]byte) ([]*RegionInfo, error)
	// ScatterRegion scatters a specified region.
	ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error
	// GetOperator gets the status of operator of the specified region.
	GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
	// ScanRegion gets a list of regions, starts from the region that contains key.
	// Limit limits the maximum number of regions returned.
	ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error)
	// GetPlacementRule loads a placement rule from PD.
	GetPlacementRule(ctx context.Context, groupID, ruleID string) (placement.Rule, error)
	// SetPlacementRule insert or update a placement rule to PD.
	SetPlacementRule(ctx context.Context, rule placement.Rule) error
	// DeletePlacementRule removes a placement rule from PD.
	DeletePlacementRule(ctx context.Context, groupID, ruleID string) error
	// SetStoreLabel add or update specified label of stores. If labelValue
	// is empty, it clears the label.
	SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error

SplitClient is an external client used by RegionSplitter.

func NewSplitClient

func NewSplitClient(client pd.Client, tlsConf *tls.Config) SplitClient

NewSplitClient returns a client used by RegionSplitter.

type TableWithRange

type TableWithRange struct {

	Range []rtree.Range

TableWithRange is a CreatedTable that has been bind to some of key ranges.

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier