Documentation
¶
Index ¶
- Constants
- Variables
- func BuildAddIndexSQL(tableName string, curTblInfo, desiredTblInfo *model.TableInfo) (singleSQL string, multiSQLs []string)
- func BuildDropIndexSQL(dbName, tableName string, idxInfo *model.IndexInfo) string
- func ConnectMySQL(cfg *mysql.Config) (*sql.DB, error)
- func EncodeIntRowID(rowID int64) []byte
- func EscapeIdentifier(identifier string) string
- func FprintfWithIdentifiers(w io.Writer, format string, identifiers ...string) (int, error)
- func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo
- func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error)
- func GetDropIndexInfos(tblInfo *model.TableInfo) (remainIndexes []*model.IndexInfo, dropIndexes []*model.IndexInfo)
- func GetExplicitRequestSourceTypeFromDB(ctx context.Context, db *sql.DB) (string, error)
- func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error)
- func GetJSON(ctx context.Context, client *http.Client, url string, v any) error
- func GetMaxAutoIDBase(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) (int64, error)
- func GetMockTLSUrl(tls *TLS) string
- func GetPDEnableFollowerHandleRegion(ctx context.Context, db *sql.DB) (bool, error)
- func InterpolateMySQLString(s string) string
- func IsAccessDeniedNeedConfigPrivilegeError(err error) bool
- func IsContextCanceledError(err error) bool
- func IsDirExists(name string) bool
- func IsDupKeyError(err error) bool
- func IsEmptyDir(name string) bool
- func IsFunctionNotExistErr(err error, functionName string) bool
- func IsRaftKV2(ctx context.Context, db *sql.DB) (bool, error)
- func IsRetryableError(err error) bool
- func KillMySelf() error
- func NormalizeError(err error) error
- func NormalizeOrWrapErr(rfcErr *errors.Error, err error, args ...any) error
- func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, ...) error
- func Retry(purpose string, parentLogger log.Logger, action func() error) error
- func SameDisk(dir1 string, dir2 string) (bool, error)
- func SchemaExists(ctx context.Context, db dbutil.QueryExecutor, schema string) (bool, error)
- func SkipReadRowCount(tblInfo *model.TableInfo) bool
- func SprintfWithIdentifiers(format string, identifiers ...string) string
- func TableExists(ctx context.Context, db dbutil.QueryExecutor, schema, table string) (bool, error)
- func TableHasAutoID(info *model.TableInfo) bool
- func TableHasAutoRowID(info *model.TableInfo) bool
- func ToGRPCDialOption(tls *tls.Config) grpc.DialOption
- func UniqueTable(schema string, table string) string
- func WriteMySQLIdentifier(builder *strings.Builder, identifier string)
- type ChunkFlushStatus
- type ConnPool
- type DupDetectKeyAdapter
- type DupDetectOpt
- type DupDetector
- type GRPCConns
- type KVIter
- type KeyAdapter
- type KvPair
- type MySQLConnectParam
- type NoopKeyAdapter
- type OnceError
- type Pauser
- type SQLWithRetry
- func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, args ...any) error
- func (t SQLWithRetry) QueryRow(ctx context.Context, purpose string, query string, dest ...any) error
- func (t SQLWithRetry) QueryStringRows(ctx context.Context, purpose string, query string) ([][]string, error)
- func (t SQLWithRetry) Transact(ctx context.Context, purpose string, ...) error
- type StorageSize
- type TLS
- func (tc *TLS) GetJSON(ctx context.Context, path string, v any) error
- func (tc *TLS) TLSConfig() *tls.Config
- func (tc *TLS) ToGRPCDialOption() grpc.DialOption
- func (tc *TLS) ToPDSecurityOption() pd.SecurityOption
- func (tc *TLS) ToTiKVSecurityConfig() config.Security
- func (tc *TLS) WithHost(host string) *TLS
- func (tc *TLS) WrapListener(l net.Listener) net.Listener
Constants ¶
const (
// IndexEngineID is the engine ID for index engine.
IndexEngineID = -1
)
Variables ¶
var ( ErrUnknown = errors.Normalize("unknown error", errors.RFCCodeText("Lightning:Common:ErrUnknown")) ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("Lightning:Common:ErrInvalidArgument")) ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("Lightning:Common:ErrVersionMismatch")) ErrReadConfigFile = errors.Normalize("cannot read config file '%s'", errors.RFCCodeText("Lightning:Config:ErrReadConfigFile")) ErrParseConfigFile = errors.Normalize("cannot parse config file '%s'", errors.RFCCodeText("Lightning:Config:ErrParseConfigFile")) ErrInvalidConfig = errors.Normalize("invalid config", errors.RFCCodeText("Lightning:Config:ErrInvalidConfig")) ErrInvalidTLSConfig = errors.Normalize("invalid tls config", errors.RFCCodeText("Lightning:Config:ErrInvalidTLSConfig")) ErrInvalidSortedKVDir = errors.Normalize("invalid sorted-kv-dir '%s' for local backend, please change the config or delete the path", errors.RFCCodeText("Lightning:Config:ErrInvalidSortedKVDir")) ErrStorageUnknown = errors.Normalize("unknown storage error", errors.RFCCodeText("Lightning:Storage:ErrStorageUnknown")) ErrInvalidPermission = errors.Normalize("invalid permission", errors.RFCCodeText("Lightning:Storage:ErrInvalidPermission")) ErrInvalidStorageConfig = errors.Normalize("invalid data-source-dir", errors.RFCCodeText("Lightning:Storage:ErrInvalidStorageConfig")) ErrEmptySourceDir = errors.Normalize("data-source-dir '%s' doesn't exist or contains no files", errors.RFCCodeText("Lightning:Storage:ErrEmptySourceDir")) ErrTableRoute = errors.Normalize("table route error", errors.RFCCodeText("Lightning:Loader:ErrTableRoute")) ErrInvalidSchemaFile = errors.Normalize("invalid schema file", errors.RFCCodeText("Lightning:Loader:ErrInvalidSchemaFile")) ErrTooManySourceFiles = errors.Normalize("too many source files", errors.RFCCodeText("Lightning:Loader:ErrTooManySourceFiles")) ErrSystemRequirementNotMet = errors.Normalize("system requirement not met", errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet")) ErrCheckpointSchemaConflict = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict")) ErrPreCheckFailed = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed")) ErrCheckClusterRegion = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion")) ErrCheckLocalResource = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource")) ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty")) ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader")) ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource")) ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR")) ErrCheckPDTiDBFromSameCluster = errors.Normalize("check PD and TiDB in the same cluster error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckPDTiDBSameCluster")) ErrOpenCheckpoint = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint")) ErrReadCheckpoint = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint")) ErrUpdateCheckpoint = errors.Normalize("update checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrUpdateCheckpoint")) ErrUnknownCheckpointDriver = errors.Normalize("unknown checkpoint driver '%s'", errors.RFCCodeText("Lightning:Checkpoint:ErrUnknownCheckpointDriver")) ErrInvalidCheckpoint = errors.Normalize("invalid checkpoint", errors.RFCCodeText("Lightning:Checkpoint:ErrInvalidCheckpoint")) ErrCheckpointNotFound = errors.Normalize("checkpoint not found", errors.RFCCodeText("Lightning:Checkpoint:ErrCheckpointNotFound")) ErrInitCheckpoint = errors.Normalize("init checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrInitCheckpoint")) ErrCleanCheckpoint = errors.Normalize("clean checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrCleanCheckpoint")) ErrMetaMgrUnknown = errors.Normalize("unknown error occur on meta manager", errors.RFCCodeText("Lightning:MetaMgr:ErrMetaMgrUnknown")) ErrDBConnect = errors.Normalize("failed to connect database", errors.RFCCodeText("Lightning:DB:ErrDBConnect")) ErrInitErrManager = errors.Normalize("init error manager error", errors.RFCCodeText("Lightning:DB:ErrInitErrManager")) ErrInitMetaManager = errors.Normalize("init meta manager error", errors.RFCCodeText("Lightning:DB:ErrInitMetaManager")) ErrUpdatePD = errors.Normalize("update pd error", errors.RFCCodeText("Lightning:PD:ErrUpdatePD")) ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient")) ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC")) ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion")) ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient")) ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest")) ErrNoLeader = errors.Normalize("write to tikv with no leader returned, region '%d', leader: %d", errors.RFCCodeText("Lightning:KV:ErrNoLeader")) ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend")) ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile")) ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB")) ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists")) ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt")) ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema")) ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns")) ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch")) ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable")) ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV")) ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs")) ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus")) ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming")) ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows")) // ErrFoundDuplicateKeys shoud be replaced with ErrFoundDataConflictRecords and ErrFoundIndexConflictRecords (TODO) ErrFoundDuplicateKeys = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey")) ErrAddIndexFailed = errors.Normalize("add index on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrAddIndexFailed")) ErrDropIndexFailed = errors.Normalize("drop index %s on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrDropIndexFailed")) ErrFoundDataConflictRecords = errors.Normalize("found data conflict records in table %s, primary key is '%s', row data is '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDataConflictRecords")) ErrFoundIndexConflictRecords = errors.Normalize("found index conflict records in table %s, index name is '%s', unique key is '%s', primary key is '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundIndexConflictRecords")) )
error definitions
var DefaultImportVariablesTiDB = map[string]string{
"tidb_row_format_version": "1",
}
DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system variables from downstream in local/importer backend. The values record the default values if missing.
var DefaultImportantVariables = map[string]string{
"max_allowed_packet": "67108864",
"div_precision_increment": "4",
"time_zone": "SYSTEM",
"lc_time_names": "en_US",
"default_week_format": "0",
"block_encryption_mode": "aes-128-ecb",
"group_concat_max_len": "1024",
"tidb_backoff_weight": "6",
}
DefaultImportantVariables is used in ObtainImportantVariables to retrieve the system variables from downstream which may affect KV encode result. The values record the default values if missing.
var ErrWriteTooSlow = errors.New("write too slow, maybe gRPC is blocked forever")
ErrWriteTooSlow is used to get rid of the gRPC blocking issue. there are some strange blocking issues of gRPC like https://github.com/pingcap/tidb/issues/48352 https://github.com/pingcap/tidb/issues/46321 and I don't know why 😭
var ( // MinRowID is the minimum rowID value after DupDetectKeyAdapter.Encode(). MinRowID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0} )
Functions ¶
func BuildAddIndexSQL ¶
func BuildAddIndexSQL( tableName string, curTblInfo, desiredTblInfo *model.TableInfo, ) (singleSQL string, multiSQLs []string)
BuildAddIndexSQL builds the SQL statement to create missing indexes. It returns both a single SQL statement that creates all indexes at once, and a list of SQL statements that creates each index individually.
func BuildDropIndexSQL ¶
BuildDropIndexSQL builds the SQL statement to drop index.
func ConnectMySQL ¶
ConnectMySQL connects MySQL with the dsn. If access is denied and the password is a valid base64 encoding, we will try to connect MySQL with the base64 decoding of the password.
func EncodeIntRowID ¶
EncodeIntRowID encodes an int64 row id.
func EscapeIdentifier ¶
EscapeIdentifier quote and escape an sql identifier
func FprintfWithIdentifiers ¶
FprintfWithIdentifiers escapes the identifiers and fprintf them. The input identifiers must not be escaped.
func GetAutoRandomColumn ¶
func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo
GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it. todo: better put in ddl package, but this will cause import cycle since ddl package import lightning
func GetBackoffWeightFromDB ¶
GetBackoffWeightFromDB gets the backoff weight from database.
func GetDropIndexInfos ¶
func GetDropIndexInfos( tblInfo *model.TableInfo, ) (remainIndexes []*model.IndexInfo, dropIndexes []*model.IndexInfo)
GetDropIndexInfos returns the index infos that need to be dropped and the remain indexes.
func GetExplicitRequestSourceTypeFromDB ¶
GetExplicitRequestSourceTypeFromDB gets the explicit request source type from database.
func GetGlobalAutoIDAlloc ¶
func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error)
GetGlobalAutoIDAlloc returns the autoID allocators for a table. export it for testing.
func GetJSON ¶
GetJSON fetches a page and parses it as JSON. The parsed result will be stored into the `v`. The variable `v` must be a pointer to a type that can be unmarshalled from JSON.
Example:
client := &http.Client{} var resp struct { IP string } if err := util.GetJSON(client, "http://api.ipify.org/?format=json", &resp); err != nil { return errors.Trace(err) } fmt.Println(resp.IP)
func GetMaxAutoIDBase ¶
GetMaxAutoIDBase returns the max auto ID base for a table.
func GetMockTLSUrl ¶
GetMockTLSUrl returns tls's host for mock test
func GetPDEnableFollowerHandleRegion ¶
GetPDEnableFollowerHandleRegion gets the pd_enable_follower_handle_region from database.
func InterpolateMySQLString ¶
InterpolateMySQLString interpolates a string into a MySQL string literal.
func IsAccessDeniedNeedConfigPrivilegeError ¶
IsAccessDeniedNeedConfigPrivilegeError checks if err is generated from a query to TiDB which failed due to missing CONFIG privilege.
func IsContextCanceledError ¶
IsContextCanceledError returns whether the error is caused by context cancellation. This function should only be used when the code logic is affected by whether the error is canceling or not.
This function returns `false` (not a context-canceled error) if `err == nil`.
func IsDupKeyError ¶
IsDupKeyError checks if err is a duplicate index error.
func IsFunctionNotExistErr ¶
IsFunctionNotExistErr checks if err is a function not exist error.
func IsRetryableError ¶
IsRetryableError returns whether the error is transient (e.g. network connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This function returns `false` (irrecoverable) if `err == nil`.
If the error is a multierr, returns true only if all suberrors are retryable.
func KillMySelf ¶
func KillMySelf() error
KillMySelf sends sigint to current process, used in integration test only
Only works on Unix. Signaling on Windows is not supported.
func NormalizeError ¶
NormalizeError converts an arbitrary error to *errors.Error based above predefined errors. If the underlying err is already an *error.Error which is prefixed by "Lightning:", leave error ID unchanged. Otherwise, converts the error ID to Lightning's predefined error IDs.
func NormalizeOrWrapErr ¶
NormalizeOrWrapErr tries to normalize err. If the returned error is ErrUnknown, wraps it with the given rfcErr.
func RebaseTableAllocators ¶
func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) error
RebaseTableAllocators rebase the allocators of a table. This function only rebase a table allocator when its new base is given in `bases` param, else it will be skipped. base is the max id that have been used by the table, the next usable id will be base + 1, see Allocator.Alloc.
func Retry ¶
Retry is shared by SQLWithRetry.perform, implementation of GlueCheckpointsDB and TiDB's glue implementation
func SchemaExists ¶
SchemaExists return whether schema with specified name exists.
func SkipReadRowCount ¶
SkipReadRowCount determines whether the target table requires a precise row count. If any unique index/clustered primary contains columns with auto_random/auto_increment, we must read the actual row count to prevent generating duplicate keys. Otherwise, we can skip reading it to improve performance.
func SprintfWithIdentifiers ¶
SprintfWithIdentifiers escapes the identifiers and sprintf them. The input identifiers must not be escaped.
func TableExists ¶
TableExists return whether table with specified name exists in target db
func TableHasAutoID ¶
TableHasAutoID return whether table has auto generated id.
func TableHasAutoRowID ¶
TableHasAutoRowID return whether table has auto generated row id
func ToGRPCDialOption ¶
func ToGRPCDialOption(tls *tls.Config) grpc.DialOption
ToGRPCDialOption constructs a gRPC dial option from tls.Config.
func UniqueTable ¶
UniqueTable returns an unique table name.
func WriteMySQLIdentifier ¶
WriteMySQLIdentifier writes a MySQL identifier into the string builder. Writes a MySQL identifier into the string builder. The identifier is always escaped into the form "`foo`".
Types ¶
type ChunkFlushStatus ¶
type ChunkFlushStatus interface {
Flushed() bool
}
ChunkFlushStatus is the status of a chunk flush.
type ConnPool ¶
type ConnPool struct {
// contains filtered or unexported fields
}
ConnPool is a lazy pool of gRPC channels. When `Get` called, it lazily allocates new connection if connection not full. If it's full, then it will return allocated channels round-robin.
func NewConnPool ¶
func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error), logger log.Logger) *ConnPool
NewConnPool creates a new connPool by the specified conn factory function and capacity.
func (*ConnPool) TakeConns ¶
func (p *ConnPool) TakeConns() (conns []*grpc.ClientConn)
TakeConns takes all connections from the pool.
type DupDetectKeyAdapter ¶
type DupDetectKeyAdapter struct{}
DupDetectKeyAdapter is a key adapter that appends rowID to the key to avoid overwritten.
func (DupDetectKeyAdapter) Decode ¶
func (DupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error)
Decode implements KeyAdapter.
func (DupDetectKeyAdapter) Encode ¶
func (DupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte
Encode implements KeyAdapter.
func (DupDetectKeyAdapter) EncodedLen ¶
func (DupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int
EncodedLen implements KeyAdapter.
type DupDetectOpt ¶
type DupDetectOpt struct {
ReportErrOnDup bool
}
DupDetectOpt is the option for duplicate detection.
type DupDetector ¶
type DupDetector struct {
// contains filtered or unexported fields
}
DupDetector extract the decoded key and value from the iter which may contain duplicate keys and store the keys encoded by KeyAdapter. The duplicate keys and values will be saved in dupDB.
func NewDupDetector ¶
func NewDupDetector( keyAdaptor KeyAdapter, dupDBWriteBatch *pebble.Batch, logger log.Logger, option DupDetectOpt, ) *DupDetector
NewDupDetector creates a new DupDetector. dupDBWriteBatch will be closed when DupDetector is closed.
type GRPCConns ¶
type GRPCConns struct {
// contains filtered or unexported fields
}
GRPCConns is a pool of gRPC connections.
func (*GRPCConns) Close ¶
func (conns *GRPCConns) Close()
Close closes all gRPC connections in the pool.
func (*GRPCConns) GetGrpcConn ¶
func (conns *GRPCConns) GetGrpcConn(ctx context.Context, storeID uint64, tcpConcurrency int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) (*grpc.ClientConn, error)
GetGrpcConn gets a gRPC connection from the pool.
type KeyAdapter ¶
type KeyAdapter interface { // Encode encodes the key with its corresponding rowID. It appends the encoded // key to dst and returns the resulting slice. The encoded key is guaranteed to // be in ascending order for comparison. // rowID must be a coded mem-comparable value, one way to get it is to use // tidb/util/codec package. Encode(dst []byte, key []byte, rowID []byte) []byte // Decode decodes the original key to dst. It appends the encoded key to dst and returns the resulting slice. Decode(dst []byte, data []byte) ([]byte, error) // EncodedLen returns the encoded key length. EncodedLen(key []byte, rowID []byte) int }
KeyAdapter is used to encode and decode keys so that duplicate key can be identified by rowID and avoid overwritten.
type KvPair ¶
type KvPair struct { // Key is the key of the KV pair Key []byte // Val is the value of the KV pair Val []byte // RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It has // two sources: // // When the KvPair is generated from ADD INDEX, the RowID is the encoded handle. // // Otherwise, the RowID is related to the row number in the source files, and // encode the number with `codec.EncodeComparableVarint`. RowID []byte }
KvPair contains a key-value pair and other fields that can be used to ingest KV pairs into TiKV.
type MySQLConnectParam ¶
type MySQLConnectParam struct { Host string Port int User string Password string SQLMode string MaxAllowedPacket uint64 TLSConfig *tls.Config AllowFallbackToPlaintext bool Net string Vars map[string]string }
MySQLConnectParam records the parameters needed to connect to a MySQL database.
func (*MySQLConnectParam) Connect ¶
func (param *MySQLConnectParam) Connect() (*sql.DB, error)
Connect creates a new connection to the database.
func (*MySQLConnectParam) ToDriverConfig ¶
func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config
ToDriverConfig converts the MySQLConnectParam to a mysql.Config.
type NoopKeyAdapter ¶
type NoopKeyAdapter struct{}
NoopKeyAdapter is a key adapter that does nothing.
func (NoopKeyAdapter) Decode ¶
func (NoopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error)
Decode implements KeyAdapter.
func (NoopKeyAdapter) Encode ¶
func (NoopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte
Encode implements KeyAdapter.
func (NoopKeyAdapter) EncodedLen ¶
func (NoopKeyAdapter) EncodedLen(key []byte, _ []byte) int
EncodedLen implements KeyAdapter.
type OnceError ¶
type OnceError struct {
// contains filtered or unexported fields
}
OnceError is an error value which will can be assigned once.
The zero value is ready for use.
type Pauser ¶
type Pauser struct {
// contains filtered or unexported fields
}
Pauser is a type which could allow multiple goroutines to wait on demand, similar to a gate or traffic light.
type SQLWithRetry ¶
type SQLWithRetry struct { // either *sql.DB or *sql.Conn DB dbutil.DBExecutor Logger log.Logger HideQueryLog bool }
SQLWithRetry constructs a retryable transaction.
func (SQLWithRetry) QueryRow ¶
func (t SQLWithRetry) QueryRow(ctx context.Context, purpose string, query string, dest ...any) error
QueryRow executes a query that is expected to return at most one row.
func (SQLWithRetry) QueryStringRows ¶
func (t SQLWithRetry) QueryStringRows(ctx context.Context, purpose string, query string) ([][]string, error)
QueryStringRows executes a query that is expected to return multiple rows whose every column is string.
type StorageSize ¶
StorageSize represents the storage's capacity and available size Learn from tidb-binlog source code.
func GetStorageSize ¶
func GetStorageSize(dir string) (size StorageSize, err error)
GetStorageSize gets storage's capacity and available size
type TLS ¶
type TLS struct {
// contains filtered or unexported fields
}
TLS is a wrapper around a TLS configuration.
func NewTLS ¶
func NewTLS(caPath, certPath, keyPath, host string, caBytes, certBytes, keyBytes []byte) (*TLS, error)
NewTLS constructs a new HTTP client with TLS configured with the CA, certificate and key paths.
func NewTLSFromMockServer ¶
NewTLSFromMockServer constructs a new TLS instance from the certificates of an *httptest.Server.
func (*TLS) ToGRPCDialOption ¶
func (tc *TLS) ToGRPCDialOption() grpc.DialOption
ToGRPCDialOption constructs a gRPC dial option.
func (*TLS) ToPDSecurityOption ¶
func (tc *TLS) ToPDSecurityOption() pd.SecurityOption
ToPDSecurityOption converts the TLS configuration to a PD security option.
func (*TLS) ToTiKVSecurityConfig ¶
ToTiKVSecurityConfig converts the TLS configuration to a TiKV security config. TODO: TiKV does not support pass in content.