Documentation ¶
Index ¶
- Constants
- Variables
- func AllocateEngineIDs(filesRegions []*TableRegion, dataFileSizes []float64, batchSize float64, ...)
- func CalculateBatchSize(mydumperBatchSize float64, isRowOrdered bool, totalSize float64) float64
- func EstimateRealSizeForFile(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) int64
- func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, ...) ([]byte, error)
- func IndexAnyByte(s []byte, as *byteSet) int
- func OpenParquetReader(ctx context.Context, store storage.ExternalStorage, path string, size int64) (source.ParquetFile, error)
- func OpenReader(ctx context.Context, fileMeta *SourceFileMeta, store storage.ExternalStorage, ...) (reader storage.ReadSeekCloser, err error)
- func ParallelProcess[T, R any](ctx context.Context, inputs []R, concurrency int, ...) ([]T, error)
- func ReadParquetFileRowCountByFile(ctx context.Context, store storage.ExternalStorage, fileMeta SourceFileMeta) (int64, error)
- func ReadUntil(parser Parser, pos int64) error
- func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
- func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
- func ToStorageCompressType(compression Compression) (storage.CompressType, error)
- type CSVParser
- func (parser *CSVParser) Close() error
- func (parser *CSVParser) Columns() []string
- func (parser *CSVParser) LastRow() Row
- func (parser *CSVParser) Pos() (pos int64, lastRowID int64)
- func (parser *CSVParser) ReadColumns() error
- func (parser *CSVParser) ReadRow() error
- func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error)
- func (parser *CSVParser) RecycleRow(row Row)
- func (parser *CSVParser) ScannedPos() (int64, error)
- func (parser *CSVParser) SetColumns(columns []string)
- func (parser *CSVParser) SetLogger(logger log.Logger)
- func (parser *CSVParser) SetPos(pos int64, rowID int64) error
- func (parser *CSVParser) SetRowID(rowID int64)
- type CharsetConvertor
- type Chunk
- type ChunkParser
- func (parser *ChunkParser) Close() error
- func (parser *ChunkParser) Columns() []string
- func (parser *ChunkParser) LastRow() Row
- func (parser *ChunkParser) Pos() (pos int64, lastRowID int64)
- func (parser *ChunkParser) ReadRow() error
- func (parser *ChunkParser) RecycleRow(row Row)
- func (parser *ChunkParser) ScannedPos() (int64, error)
- func (parser *ChunkParser) SetColumns(columns []string)
- func (parser *ChunkParser) SetLogger(logger log.Logger)
- func (parser *ChunkParser) SetPos(pos int64, rowID int64) error
- func (parser *ChunkParser) SetRowID(rowID int64)
- type Compression
- type DataDivideConfig
- type ExtendColumnData
- type FileHandler
- type FileInfo
- type FileIterator
- type FileRouter
- type LoaderConfig
- type MDDatabaseMeta
- type MDLoader
- type MDLoaderSetupConfig
- type MDLoaderSetupOption
- type MDTableMeta
- type ParquetParser
- func (pp *ParquetParser) Close() error
- func (pp *ParquetParser) Columns() []string
- func (pp *ParquetParser) LastRow() Row
- func (pp *ParquetParser) Pos() (pos int64, rowID int64)
- func (pp *ParquetParser) ReadRow() error
- func (*ParquetParser) RecycleRow(_ Row)
- func (pp *ParquetParser) ScannedPos() (int64, error)
- func (*ParquetParser) SetColumns(_ []string)
- func (pp *ParquetParser) SetLogger(l log.Logger)
- func (pp *ParquetParser) SetPos(pos int64, rowID int64) error
- func (pp *ParquetParser) SetRowID(rowID int64)
- type Parser
- type PooledReader
- type RawFile
- type ReadSeekCloser
- type RegexRouter
- type RouteResult
- type Row
- type SchemaImporter
- type SourceFileMeta
- type SourceType
- type StringReader
- type TableRegion
- func MakeSourceFileRegion(ctx context.Context, cfg *DataDivideConfig, fi FileInfo) ([]*TableRegion, []float64, error)
- func MakeTableRegions(ctx context.Context, cfg *DataDivideConfig) ([]*TableRegion, error)
- func SplitLargeCSV(ctx context.Context, cfg *DataDivideConfig, dataFile FileInfo) (regions []*TableRegion, dataFileSizes []float64, err error)
Constants ¶
const (
	// TableFileSizeINF is used as the size for compressed files. For Lightning, 10 TB is a
	// relatively big value and will strongly affect efficiency. It is used to make sure
	// compressed files can be read until EOF, because we can't get the exact decompressed
	// size of a compressed file.
	TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
	// CompressSizeFactor is used to adjust the compressed data size.
	CompressSizeFactor = 5
)
const (
	// SchemaSchema is the source type value for a schema file for a DB.
	SchemaSchema = "schema-schema"
	// TableSchema is the source type value for a schema file for a table.
	TableSchema = "table-schema"
	// ViewSchema is the source type value for a schema file for a view.
	ViewSchema = "view-schema"
	// TypeSQL is the source type value for a SQL data file.
	TypeSQL = "sql"
	// TypeCSV is the source type value for a CSV data file.
	TypeCSV = "csv"
	// TypeParquet is the source type value for a parquet data file.
	TypeParquet = "parquet"
	// TypeIgnore is the source type value for an ignored data file.
	TypeIgnore = "ignore"
)
Variables ¶
var (
	// ErrInsertStatementNotFound is the error reported when the insert statement cannot be found.
	ErrInsertStatementNotFound = errors.New("insert statement not found")
)
var (
	// LargestEntryLimit is the max size for reading file to buf
	LargestEntryLimit int
)
Functions ¶
func AllocateEngineIDs ¶
func AllocateEngineIDs(
	filesRegions []*TableRegion,
	dataFileSizes []float64,
	batchSize float64,
	batchImportRatio float64,
	engineConcurrency float64,
)
AllocateEngineIDs allocates the table engine IDs.
func CalculateBatchSize ¶
func CalculateBatchSize(mydumperBatchSize float64, isRowOrdered bool, totalSize float64) float64
CalculateBatchSize calculates the batch size according to row order and file size.
func EstimateRealSizeForFile ¶
func EstimateRealSizeForFile(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) int64
EstimateRealSizeForFile estimates the real size of the file. If the file is not compressed, the real size is the same as the file size. If the file is compressed, the real size is the estimated uncompressed size.
func ExportStatement ¶
func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error)
ExportStatement exports the SQL statement in the schema file.
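For illustration, a minimal sketch of reading a table-schema file with ExportStatement. Identifiers are written unqualified as if inside this package, standard imports are elided, the helper name and placeholder path/size are assumptions, and the external storage is expected to be created elsewhere (e.g. via the br storage package):

func readSchemaSQL(ctx context.Context, store storage.ExternalStorage) ([]byte, error) {
	// Describe the schema file to read; Path and FileSize are placeholders.
	schemaFile := FileInfo{
		FileMeta: SourceFileMeta{
			Path:     "db1.tbl1-schema.sql",
			Type:     SourceTypeTableSchema,
			FileSize: 1 << 10,
		},
	}
	// "utf8mb4" is the character set used to decode the schema file.
	return ExportStatement(ctx, store, schemaFile, "utf8mb4")
}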
func IndexAnyByte ¶
func IndexAnyByte(s []byte, as *byteSet) int
IndexAnyByte returns the index of the first occurrence in s of any of the bytes in as. It returns -1 if there is no byte in common.
func OpenParquetReader ¶
func OpenParquetReader(
	ctx context.Context,
	store storage.ExternalStorage,
	path string,
	size int64,
) (source.ParquetFile, error)
OpenParquetReader opens a parquet file and returns a handle that can at least read the file.
func OpenReader ¶
func OpenReader(
	ctx context.Context,
	fileMeta *SourceFileMeta,
	store storage.ExternalStorage,
	decompressCfg storage.DecompressConfig,
) (reader storage.ReadSeekCloser, err error)
OpenReader opens a reader for the given file and storage.
func ParallelProcess ¶
func ParallelProcess[T, R any](
	ctx context.Context,
	inputs []R,
	concurrency int,
	hdl func(ctx context.Context, f R) (T, error),
) ([]T, error)
ParallelProcess is a helper function to parallel process inputs and keep the order of the outputs same as the inputs. It's used for both lightning and IMPORT INTO.
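As an illustration, a sketch that estimates the real sizes of several source files concurrently while preserving input order; the helper name and the worker count are arbitrary:

func estimateRealSizes(ctx context.Context, store storage.ExternalStorage, files []SourceFileMeta) ([]int64, error) {
	// 8 concurrent workers; the returned slice keeps the same order as `files`.
	return ParallelProcess(ctx, files, 8,
		func(ctx context.Context, f SourceFileMeta) (int64, error) {
			return EstimateRealSizeForFile(ctx, f, store), nil
		})
}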
func ReadParquetFileRowCountByFile ¶
func ReadParquetFileRowCountByFile(
	ctx context.Context,
	store storage.ExternalStorage,
	fileMeta SourceFileMeta,
) (int64, error)
ReadParquetFileRowCountByFile reads the row count of a parquet file. It is a special function that fetches the parquet file row count quickly.
func ReadUntil ¶
func ReadUntil(parser Parser, pos int64) error
ReadUntil parses the entire file and splits it into continuous chunks of size >= minSize.
func SampleFileCompressRatio ¶
func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
SampleFileCompressRatio samples the compress ratio of the compressed file. Exported for test.
func SampleParquetRowSize ¶
func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
SampleParquetRowSize samples row size of the parquet file.
func ToStorageCompressType ¶
func ToStorageCompressType(compression Compression) (storage.CompressType, error)
ToStorageCompressType converts Compression to storage.CompressType.
Types ¶
type CSVParser ¶
type CSVParser struct {
// contains filtered or unexported fields
}
CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
func NewCSVParser ¶
func NewCSVParser(
	ctx context.Context,
	cfg *config.CSVConfig,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
	shouldParseHeader bool,
	charsetConvertor *CharsetConvertor,
) (*CSVParser, error)
NewCSVParser creates a CSV parser. The ownership of the reader is transferred to the parser.
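A sketch of constructing a CSVParser and draining its rows. The CSV config is taken as a parameter because its field set varies across versions; the block size is arbitrary, worker.NewPool(ctx, 4, "io") is assumed to be the pool constructor, passing nil for the charset convertor is assumed to mean no conversion, and io.EOF is assumed to signal end of data:

func countCSVRows(ctx context.Context, csvCfg *config.CSVConfig, r ReadSeekCloser) (int64, error) {
	ioWorkers := worker.NewPool(ctx, 4, "io") // throttles concurrent reads
	parser, err := NewCSVParser(ctx, csvCfg, r, 64*1024, ioWorkers, false /*shouldParseHeader*/, nil /*charsetConvertor*/)
	if err != nil {
		return 0, err
	}
	defer parser.Close()

	var rows int64
	for {
		if err := parser.ReadRow(); err != nil {
			if errors.Is(err, io.EOF) { // assumed end-of-data signal
				return rows, nil
			}
			return rows, err
		}
		row := parser.LastRow() // copy of the row just parsed
		parser.RecycleRow(row)  // return it to the allocation pool
		rows++
	}
}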
func (*CSVParser) LastRow ¶
func (parser *CSVParser) LastRow() Row
LastRow is the copy of the row parsed by the last call to ReadRow().
func (*CSVParser) Pos ¶
func (parser *CSVParser) Pos() (pos int64, lastRowID int64)
Pos returns the current file offset. Attention: for compressed SQL/CSV files, pos is the position in the uncompressed file.
func (*CSVParser) ReadColumns ¶
func (parser *CSVParser) ReadColumns() error
ReadColumns reads the columns of this CSV file.
func (*CSVParser) ReadUntilTerminator ¶
func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error)
ReadUntilTerminator seeks the file until the terminator token is found, and returns:
- the content including the terminator, or the content read before an error occurred
- the file offset beyond the terminator, or the offset at which the error occurred
- the error
Note that the terminator string pattern may appear inside a quoted field. The caller should make sure to handle this case.
func (*CSVParser) RecycleRow ¶
func (parser *CSVParser) RecycleRow(row Row)
RecycleRow places the row object back into the allocation pool.
func (*CSVParser) ScannedPos ¶
func (parser *CSVParser) ScannedPos() (int64, error)
ScannedPos gets the read position of the current reader. It always returns the position in the underlying file, whether compressed or not.
func (*CSVParser) SetColumns ¶
func (parser *CSVParser) SetColumns(columns []string)
type CharsetConvertor ¶
type CharsetConvertor struct {
// contains filtered or unexported fields
}
CharsetConvertor is used to convert a character set to utf8mb4 encoding. In Lightning, we mainly use it to do the GB18030/GBK -> UTF8MB4 conversion.
func NewCharsetConvertor ¶
func NewCharsetConvertor(dataCharacterSet, dataInvalidCharReplace string) (*CharsetConvertor, error)
NewCharsetConvertor creates a new CharsetConvertor.
func (*CharsetConvertor) Decode ¶
func (cc *CharsetConvertor) Decode(src string) (string, error)
Decode does the charset conversion work from sourceCharacterSet to utf8mb4. It will return a string as the conversion result whose length may be less or greater than the original string `src`. TODO: maybe using generic type later to make Decode/Encode accept both []byte and string.
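For example, a sketch that converts GBK-encoded bytes to UTF-8; the charset name "gbk" and the U+FFFD replacement string are assumptions about what the convertor accepts:

func gbkToUTF8(data []byte) (string, error) {
	cc, err := NewCharsetConvertor("gbk", "\uFFFD")
	if err != nil {
		return "", err
	}
	// Decode converts from the source charset to utf8mb4.
	return cc.Decode(string(data))
}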
type Chunk ¶
type Chunk struct {
	Offset int64
	// for a parquet file, EndOffset is the total row count (see makeParquetFileRegion)
	EndOffset  int64
	RealOffset int64
	// We estimate the row-id range of the chunk using the file size divided by some factor
	// (depending on the column count). After estimation, the range is rebased for all chunks
	// of this table on this instance, then rebased again across all instances of a parallel
	// import. The allocatable row-id range is (PrevRowIDMax, RowIDMax].
	// PrevRowIDMax is increased during local encoding.
	PrevRowIDMax int64
	RowIDMax     int64
	// Columns is only assigned when using strict-mode for CSV files and the file contains a header.
	Columns []string
}
Chunk represents a portion of the data file.
type ChunkParser ¶
type ChunkParser struct {
// contains filtered or unexported fields
}
ChunkParser is a parser of the data files (the file containing only INSERT statements).
func NewChunkParser ¶
func NewChunkParser(
	ctx context.Context,
	sqlMode mysql.SQLMode,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
) *ChunkParser
NewChunkParser creates a new parser which can read chunks out of a file.
func (*ChunkParser) LastRow ¶
func (parser *ChunkParser) LastRow() Row
LastRow is the copy of the row parsed by the last call to ReadRow().
func (*ChunkParser) Pos ¶
func (parser *ChunkParser) Pos() (pos int64, lastRowID int64)
Pos returns the current file offset. Attention: for compressed SQL/CSV files, pos is the position in the uncompressed file.
func (*ChunkParser) ReadRow ¶
func (parser *ChunkParser) ReadRow() error
ReadRow reads a row from the datafile.
func (*ChunkParser) RecycleRow ¶
func (parser *ChunkParser) RecycleRow(row Row)
RecycleRow places the row object back into the allocation pool.
func (*ChunkParser) ScannedPos ¶
func (parser *ChunkParser) ScannedPos() (int64, error)
ScannedPos gets the read position of the current reader. It always returns the position in the underlying file, whether compressed or not.
func (*ChunkParser) SetColumns ¶
func (parser *ChunkParser) SetColumns(columns []string)
type Compression ¶
type Compression int
Compression specifies the compression type.
const (
	// CompressionNone means no compression.
	CompressionNone Compression = iota
	// CompressionGZ is the compression type that uses the GZ algorithm.
	CompressionGZ
	// CompressionLZ4 is the compression type that uses the LZ4 algorithm.
	CompressionLZ4
	// CompressionZStd is the compression type that uses the ZStd algorithm.
	CompressionZStd
	// CompressionXZ is the compression type that uses the XZ algorithm.
	CompressionXZ
	// CompressionLZO is the compression type that uses the LZO algorithm.
	CompressionLZO
	// CompressionSnappy is the compression type that uses the Snappy algorithm.
	CompressionSnappy
)
func ParseCompressionOnFileExtension ¶
func ParseCompressionOnFileExtension(filename string) Compression
ParseCompressionOnFileExtension parses the compression type from the file extension.
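A small sketch combining this with ToStorageCompressType to derive the storage-level compress type from a data file name; the example file name and ".gz" extension are assumptions:

func storageCompressTypeOf(filename string) (storage.CompressType, error) {
	// e.g. "db1.tbl1.000000000.sql.gz" is expected to yield CompressionGZ.
	compression := ParseCompressionOnFileExtension(filename)
	return ToStorageCompressType(compression)
}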
type DataDivideConfig ¶
type DataDivideConfig struct {
	ColumnCnt int
	// EngineDataSize is the limit of engine size. We have a complex algorithm to calculate
	// the best engine size, see AllocateEngineIDs.
	EngineDataSize int64
	// MaxChunkSize is the max chunk size (inside this file we name it "region", which
	// collides with the TiKV region).
	MaxChunkSize int64
	// Concurrency is the number of concurrent workers used to divide data files.
	Concurrency int
	// EngineConcurrency is the number of engines that run concurrently; it is needed to
	// calculate the best engine size for pipelining local-sort and import.
	// TODO: remove these 2 params; the algorithm seems useless since we can import
	// concurrently now, so the foundational assumption of the algorithm is broken.
	EngineConcurrency int
	// BatchImportRatio is used together with the previous param. It is 0.75 nearly all the
	// time, see Mydumper.BatchImportRatio. It is defined as
	// speed-write-to-TiKV / speed-to-do-local-sort.
	BatchImportRatio float64
	// IOWorkers is used to split large CSV files and to limit the concurrency of data
	// read/seek operations. When nil, there is no limit.
	IOWorkers *worker.Pool
	// Store is needed to read the row count for parquet files, and to read line terminators
	// when splitting large CSV files.
	Store     storage.ExternalStorage
	TableMeta *MDTableMeta
	// StrictFormat is only used when splitting large CSV files.
	StrictFormat           bool
	DataCharacterSet       string
	DataInvalidCharReplace string
	ReadBlockSize          int64
	CSV                    config.CSVConfig
}
DataDivideConfig is the config used to divide data files into chunks/engines (regions in this context).
func NewDataDivideConfig ¶
func NewDataDivideConfig(
	cfg *config.Config,
	columns int,
	ioWorkers *worker.Pool,
	store storage.ExternalStorage,
	meta *MDTableMeta,
) *DataDivideConfig
NewDataDivideConfig creates a new DataDivideConfig from lightning cfg.
type ExtendColumnData ¶
ExtendColumnData contains the extended column names and values information for a table.
type FileHandler ¶
FileHandler is the interface for handling a file, given its path and size. It is mainly used as a parameter of `FileIterator`.
type FileInfo ¶
type FileInfo struct { TableName filter.Table FileMeta SourceFileMeta }
FileInfo contains the information for a data file in a table.
type FileIterator ¶
type FileIterator interface {
IterateFiles(ctx context.Context, hdl FileHandler) error
}
FileIterator is the interface to iterate files in a data source. Use this interface to customize the file iteration policy.
type FileRouter ¶
type FileRouter interface {
	// Route applies the rule to the path. It returns nil if the path doesn't match any
	// route rule; it returns an error if the path matches a rule but the captured value
	// for a field is invalid.
	Route(path string) (*RouteResult, error)
}
FileRouter provides operations to apply routing rules that map a file path to a target schema/table.
func NewDefaultFileRouter ¶
func NewDefaultFileRouter(logger log.Logger) (FileRouter, error)
NewDefaultFileRouter creates a new file router with the default file route rules.
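A sketch of routing a Mydumper-style file name with the default rules; the example path and the expectation about what it resolves to are assumptions:

func routeWithDefaults(logger log.Logger, path string) (*RouteResult, error) {
	router, err := NewDefaultFileRouter(logger)
	if err != nil {
		return nil, err
	}
	// For a name like "db1.tbl1.000000000.sql", the result is expected to carry the
	// schema/table and SourceTypeSQL; a nil result means no rule matched.
	return router.Route(path)
}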
func NewFileRouter ¶
func NewFileRouter(cfg []*config.FileRouteRule, logger log.Logger) (FileRouter, error)
NewFileRouter creates a new file router with the rule.
type LoaderConfig ¶
type LoaderConfig struct {
	// SourceID is the unique identifier for the data source. It's used in DM only and must
	// be used together with Routes.
	SourceID string
	// SourceURL is the URL of the data source.
	SourceURL string
	// Routes is the routing rules for the tables, exclusive with FileRouters.
	// It's deprecated in Lightning but still used in DM. When using this,
	// DefaultFileRules must be true.
	Routes config.Routes
	// CharacterSet is the character set of the schema SQL files.
	CharacterSet string
	// Filter is the filter for the tables; files related to filtered-out tables are not
	// loaded. It must be specified, else all tables are filtered out; see
	// config.GetDefaultFilter.
	Filter      []string
	FileRouters []*config.FileRouteRule
	// CaseSensitive indicates whether Routes and Filter are case-sensitive.
	CaseSensitive bool
	// DefaultFileRules indicates whether to use the default file routing rules.
	// If it's true, the default file routing rules are appended to the FileRouters.
	// A little confusing, but it's true only when FileRouters is empty.
	DefaultFileRules bool
}
LoaderConfig is the configuration for constructing a MDLoader.
func NewLoaderCfg ¶
func NewLoaderCfg(cfg *config.Config) LoaderConfig
NewLoaderCfg creates loader config from lightning config.
type MDDatabaseMeta ¶
type MDDatabaseMeta struct { Name string SchemaFile FileInfo Tables []*MDTableMeta Views []*MDTableMeta // contains filtered or unexported fields }
MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
func NewMDDatabaseMeta ¶
func NewMDDatabaseMeta(charSet string) *MDDatabaseMeta
NewMDDatabaseMeta creates a Mydumper database meta with the specified character set.
func (*MDDatabaseMeta) GetSchema ¶
func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) string
GetSchema gets the schema SQL for a source database.
type MDLoader ¶
type MDLoader struct {
// contains filtered or unexported fields
}
MDLoader is for 'Mydumper File Loader', which loads the files in the data source and generates a set of metadata.
func NewLoader ¶
func NewLoader(ctx context.Context, cfg LoaderConfig, opts ...MDLoaderSetupOption) (*MDLoader, error)
NewLoader constructs a MyDumper loader that scans the data source and constructs a set of metadata.
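A sketch of building a loader from a Lightning config and walking the discovered metadata; the option values are arbitrary:

func listSources(ctx context.Context, cfg *config.Config) error {
	loader, err := NewLoader(ctx, NewLoaderCfg(cfg),
		WithMaxScanFiles(100000), // stop after scanning this many files
		WithScanFileConcurrency(8),
	)
	if err != nil {
		return err
	}
	for _, db := range loader.GetDatabases() {
		for _, tbl := range db.Tables {
			fmt.Printf("%s.%s: %d data files, %d bytes\n",
				db.Name, tbl.Name, len(tbl.DataFiles), tbl.TotalSize)
		}
	}
	return nil
}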
func NewLoaderWithStore ¶
func NewLoaderWithStore(ctx context.Context, cfg LoaderConfig, store storage.ExternalStorage, opts ...MDLoaderSetupOption) (*MDLoader, error)
NewLoaderWithStore constructs a MyDumper loader with the provided external storage that scans the data source and constructs a set of metadata.
func (*MDLoader) GetDatabases ¶
func (l *MDLoader) GetDatabases() []*MDDatabaseMeta
GetDatabases gets the list of scanned MDDatabaseMeta for the loader.
func (*MDLoader) GetStore ¶
func (l *MDLoader) GetStore() storage.ExternalStorage
GetStore gets the external storage used by the loader.
type MDLoaderSetupConfig ¶
type MDLoaderSetupConfig struct {
	// MaxScanFiles specifies the maximum number of files to scan.
	// If the value is <= 0, as many data source files as possible will be scanned.
	MaxScanFiles int
	// ScanFileConcurrency specifies the concurrency of scanning source files.
	ScanFileConcurrency int
	// ReturnPartialResultOnError specifies whether the files scanned so far are still
	// analyzed and the partial result is returned when an error occurs.
	ReturnPartialResultOnError bool
	// FileIter controls the file iteration policy when constructing a MDLoader.
	FileIter FileIterator
}
MDLoaderSetupConfig stores the configs when setting up a MDLoader. This can control the behavior when constructing an MDLoader.
func DefaultMDLoaderSetupConfig ¶
func DefaultMDLoaderSetupConfig() *MDLoaderSetupConfig
DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig.
type MDLoaderSetupOption ¶
type MDLoaderSetupOption func(cfg *MDLoaderSetupConfig)
MDLoaderSetupOption is the option type for setting up a MDLoaderSetupConfig.
func ReturnPartialResultOnError ¶
func ReturnPartialResultOnError(supportPartialResult bool) MDLoaderSetupOption
ReturnPartialResultOnError generates an option that controls whether to return the partially scanned result on error when setting up a MDLoader.
func WithFileIterator ¶
func WithFileIterator(fileIter FileIterator) MDLoaderSetupOption
WithFileIterator generates an option that specifies the file iteration policy.
func WithMaxScanFiles ¶
func WithMaxScanFiles(maxScanFiles int) MDLoaderSetupOption
WithMaxScanFiles generates an option that limits the max scan files when setting up a MDLoader.
func WithScanFileConcurrency ¶
func WithScanFileConcurrency(concurrency int) MDLoaderSetupOption
WithScanFileConcurrency generates an option that sets the concurrency for scanning files when setting up a MDLoader.
type MDTableMeta ¶
type MDTableMeta struct {
	DB         string
	Name       string
	SchemaFile FileInfo
	DataFiles  []FileInfo
	TotalSize  int64
	IndexRatio float64
	// IsRowOrdered defaults to true. If we do a precheck, this field is updated using the
	// data sampling result, so it's not accurate.
	IsRowOrdered bool
	// contains filtered or unexported fields
}
MDTableMeta contains some parsed metadata for a table in the source by MyDumper Loader.
func NewMDTableMeta ¶
func NewMDTableMeta(charSet string) *MDTableMeta
NewMDTableMeta creates a Mydumper table meta with the specified character set.
func (*MDTableMeta) GetSchema ¶
func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error)
GetSchema gets the table-creating SQL for a source table.
type ParquetParser ¶
type ParquetParser struct {
	Reader *preader.ParquetReader
	// contains filtered or unexported fields
}
ParquetParser parses a parquet file for import. It implements the Parser interface.
func NewParquetParser ¶
func NewParquetParser(
	ctx context.Context,
	store storage.ExternalStorage,
	r storage.ReadSeekCloser,
	path string,
) (*ParquetParser, error)
NewParquetParser generates a parquet parser.
func (*ParquetParser) Close ¶
func (pp *ParquetParser) Close() error
Close closes the parquet file of the parser. It implements the Parser interface.
func (*ParquetParser) Columns ¶
func (pp *ParquetParser) Columns() []string
Columns returns the _lower-case_ column names corresponding to values in the LastRow.
func (*ParquetParser) LastRow ¶
func (pp *ParquetParser) LastRow() Row
LastRow gets the last row parsed by the parser. It implements the Parser interface.
func (*ParquetParser) Pos ¶
func (pp *ParquetParser) Pos() (pos int64, rowID int64)
Pos returns the current row number of the parquet file.
func (*ParquetParser) ReadRow ¶
func (pp *ParquetParser) ReadRow() error
ReadRow reads a row in the parquet file by the parser. It implements the Parser interface.
func (*ParquetParser) RecycleRow ¶
func (*ParquetParser) RecycleRow(_ Row)
RecycleRow implements the Parser interface.
func (*ParquetParser) ScannedPos ¶
func (pp *ParquetParser) ScannedPos() (int64, error)
ScannedPos implements the Parser interface. For parquet, it's the current position of the parquet file reader.
func (*ParquetParser) SetColumns ¶
func (*ParquetParser) SetColumns(_ []string)
SetColumns sets the restored column names for the parser.
func (*ParquetParser) SetLogger ¶
func (pp *ParquetParser) SetLogger(l log.Logger)
SetLogger sets the logger used in the parser. It implements the Parser interface.
func (*ParquetParser) SetPos ¶
func (pp *ParquetParser) SetPos(pos int64, rowID int64) error
SetPos sets the position in a parquet file. It implements the Parser interface.
func (*ParquetParser) SetRowID ¶
func (pp *ParquetParser) SetRowID(rowID int64)
SetRowID sets the rowID in a parquet file when we start a compressed file. It implements the Parser interface.
type Parser ¶
type Parser interface {
	// Pos returns the position that the parser has already handled. It's mainly used for
	// checkpoints.
	// For normal files it's the file offset we have handled.
	// For parquet files it's the row count we have handled.
	// For compressed files it's the uncompressed file offset we have handled.
	// TODO: replace pos with a new structure to specify position offset and rows offset
	Pos() (pos int64, rowID int64)
	SetPos(pos int64, rowID int64) error
	// ScannedPos always returns the current location of the file reader pointer.
	ScannedPos() (int64, error)
	Close() error
	ReadRow() error
	LastRow() Row
	RecycleRow(row Row)
	// Columns returns the _lower-case_ column names corresponding to values in the LastRow.
	Columns() []string
	// SetColumns sets the restored column names for the parser.
	SetColumns([]string)
	SetLogger(log.Logger)
	SetRowID(rowID int64)
}
Parser provides some methods to parse a source data file.
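As an illustration of how the interface could back a simple checkpoint-and-resume loop. This is only a sketch of the interface, not Lightning's actual checkpoint logic; io.EOF is assumed to signal end of data:

func resumeAndDrain(parser Parser, ckptPos, ckptRowID int64) error {
	// Seek back to the last recorded checkpoint before reading.
	if err := parser.SetPos(ckptPos, ckptRowID); err != nil {
		return err
	}
	for {
		if err := parser.ReadRow(); err != nil {
			if errors.Is(err, io.EOF) { // assumed end-of-data signal
				return nil
			}
			return err
		}
		row := parser.LastRow()
		// ... encode / deliver row.Row here ...
		parser.RecycleRow(row)

		// Pos gives the next checkpoint candidate: the offset and row ID already handled.
		pos, rowID := parser.Pos()
		_, _ = pos, rowID
	}
}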
type PooledReader ¶
type PooledReader struct {
// contains filtered or unexported fields
}
PooledReader is a throttled reader wrapper, where Read() calls have an upper limit of concurrency imposed by the given worker pool.
func MakePooledReader ¶
func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReader
MakePooledReader constructs a new PooledReader.
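A short sketch of sharing one I/O pool across readers so that concurrent Read() calls are throttled; the pool constructor, its arguments, and the limit of 4 are assumptions:

func newThrottledReader(ctx context.Context, r ReadSeekCloser) PooledReader {
	// At most 4 Read() calls proceed concurrently across readers sharing this pool.
	ioWorkers := worker.NewPool(ctx, 4, "io")
	return MakePooledReader(r, ioWorkers)
}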
func (PooledReader) Read ¶
func (pr PooledReader) Read(p []byte) (n int, err error)
Read implements io.Reader
type ReadSeekCloser ¶
ReadSeekCloser = Reader + Seeker + Closer
type RegexRouter ¶
type RegexRouter struct {
// contains filtered or unexported fields
}
RegexRouter is a `FileRouter` implementation that applies a specific regex pattern to the file path. If the pattern matches, each extractor captures its part of the match and sets the value on the target field in `RouteResult`.
func (*RegexRouter) Route ¶
func (r *RegexRouter) Route(path string) (*RouteResult, error)
Route routes a file path to a source file type.
type RouteResult ¶
type RouteResult struct {
	filter.Table
	Key         string
	Compression Compression
	Type        SourceType
}
RouteResult contains the information for a file routing.
type Row ¶
type Row struct {
	// RowID is the row id of the row.
	// As objects of this struct are reused, the RowID is increased when reading the next row.
	RowID  int64
	Row    []types.Datum
	Length int
}
Row is the content of a row.
func (Row) MarshalLogArray ¶
func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error
MarshalLogArray implements the zapcore.ArrayMarshaler interface
type SchemaImporter ¶
type SchemaImporter struct {
// contains filtered or unexported fields
}
SchemaImporter is used to import schema from dump files.
func NewSchemaImporter ¶
func NewSchemaImporter(logger log.Logger, sqlMode mysql.SQLMode, db *sql.DB, store storage.ExternalStorage, concurrency int) *SchemaImporter
NewSchemaImporter creates a new SchemaImporter instance.
func (*SchemaImporter) Run ¶
func (si *SchemaImporter) Run(ctx context.Context, dbMetas []*MDDatabaseMeta) (err error)
Run imports all schemas from the given database metas.
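A sketch of wiring a loader's result into the schema importer; the SQL mode, the concurrency of 4, and the helper name are arbitrary choices:

func createSchemas(ctx context.Context, logger log.Logger, db *sql.DB,
	store storage.ExternalStorage, loader *MDLoader) error {
	importer := NewSchemaImporter(logger, mysql.ModeNone, db, store, 4 /*concurrency*/)
	// Run imports all schemas described by the loaded database metas.
	return importer.Run(ctx, loader.GetDatabases())
}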
type SourceFileMeta ¶
type SourceFileMeta struct {
	Path        string
	Type        SourceType
	Compression Compression
	SortKey     string
	// FileSize is the size of the file in the storage.
	FileSize int64
	// WARNING: the fields below are not persistent.
	ExtendData ExtendColumnData
	// RealSize is the same as FileSize if the file is neither compressed nor parquet.
	// If the file is compressed, RealSize is the estimated uncompressed size.
	// If the file is parquet, RealSize is the estimated data size after conversion to
	// row-oriented storage.
	RealSize int64
	Rows     int64 // only for parquet
}
SourceFileMeta contains some analyzed metadata for a source file by MyDumper Loader.
type SourceType ¶
type SourceType int
SourceType specifies the source file types.
const (
	// SourceTypeIgnore means this source file is ignored.
	SourceTypeIgnore SourceType = iota
	// SourceTypeSchemaSchema means this source file is a schema file for the DB.
	SourceTypeSchemaSchema
	// SourceTypeTableSchema means this source file is a schema file for the table.
	SourceTypeTableSchema
	// SourceTypeSQL means this source file is a SQL data file.
	SourceTypeSQL
	// SourceTypeCSV means this source file is a CSV data file.
	SourceTypeCSV
	// SourceTypeParquet means this source file is a parquet data file.
	SourceTypeParquet
	// SourceTypeViewSchema means this source file is a schema file for the view.
	SourceTypeViewSchema
)
func (SourceType) String ¶
func (s SourceType) String() string
type StringReader ¶
StringReader is a wrapper around *strings.Reader with an additional Close() method
func NewStringReader ¶
func NewStringReader(s string) StringReader
NewStringReader constructs a new StringReader
type TableRegion ¶
type TableRegion struct {
	EngineID   int32
	DB         string
	Table      string
	FileMeta   SourceFileMeta
	ExtendData ExtendColumnData
	Chunk      Chunk
}
TableRegion contains information for a table region during import.
func MakeSourceFileRegion ¶
func MakeSourceFileRegion(
	ctx context.Context,
	cfg *DataDivideConfig,
	fi FileInfo,
) ([]*TableRegion, []float64, error)
MakeSourceFileRegion creates a new source file region.
func MakeTableRegions ¶
func MakeTableRegions(
	ctx context.Context,
	cfg *DataDivideConfig,
) ([]*TableRegion, error)
MakeTableRegions creates the table regions for a table. The row-id ranges of the returned TableRegions increase monotonically.
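A sketch of dividing one table's data files into regions, starting from a Lightning config and a table meta produced by the loader; the pool size, the pool constructor, and the helper name are assumptions:

func divideTable(ctx context.Context, cfg *config.Config, store storage.ExternalStorage,
	meta *MDTableMeta, columnCnt int) ([]*TableRegion, error) {
	ioWorkers := worker.NewPool(ctx, 4, "io") // limits concurrent read/seek while splitting CSVs
	divideCfg := NewDataDivideConfig(cfg, columnCnt, ioWorkers, store, meta)
	return MakeTableRegions(ctx, divideCfg)
}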
func SplitLargeCSV ¶
func SplitLargeCSV(
	ctx context.Context,
	cfg *DataDivideConfig,
	dataFile FileInfo,
) (regions []*TableRegion, dataFileSizes []float64, err error)
SplitLargeCSV splits a large CSV file into multiple regions; the size of each region is specified by `config.MaxRegionSize`. Note: we split the file coarsely, so the CSV format must be strict. For example:
- a CSV file with a header is invalid
- a complete tuple split across multiple lines is invalid
func (*TableRegion) Offset ¶
func (reg *TableRegion) Offset() int64
Offset gets the offset in the file of this table region.
func (*TableRegion) RowIDMin ¶
func (reg *TableRegion) RowIDMin() int64
RowIDMin returns the minimum row ID of this table region.
func (*TableRegion) Rows ¶
func (reg *TableRegion) Rows() int64
Rows returns the row counts of this table region.
func (*TableRegion) Size ¶
func (reg *TableRegion) Size() int64
Size gets the size of this table region.