Documentation
¶
Index ¶
- func GenerateJSONFile(columns []string, data [][]interface{}) (string, int, error)
- func SlimCondition(maxThread int, minSplitKey, maxSplitKey int64) [][]int64
- func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey int64) []string
- func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax int64) <-chan string
- func SplitConditionAccordingToTimeSplitKey(cfg *config.Config, minTimeSplitKey, maxTimeSplitKey string) ([]string, error)
- func SplitTimeConditionsByMaxThread(conditions []string, maxThread int) [][]string
- type DatabendIngesterStatsData
- type DatabendSourceStatsRecorder
- type MysqlSource
- func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() int64
- func (s *MysqlSource) DeleteAfterSync() error
- func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)
- func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (s *MysqlSource) GetMinMaxSplitKey() (int64, int64, error)
- func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (s *MysqlSource) GetSourceReadRowsCount() (int, error)
- func (s *MysqlSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- type OracleSource
- func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() int64
- func (p *OracleSource) DeleteAfterSync() error
- func (p *OracleSource) GetAllSourceReadRowsCount() (int, error)
- func (p *OracleSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (p *OracleSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (p *OracleSource) GetMinMaxSplitKey() (int64, int64, error)
- func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (p *OracleSource) GetSourceReadRowsCount() (int, error)
- func (p *OracleSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (p *OracleSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- func (p *OracleSource) SwitchDatabase() error
- type PostgresSource
- func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() int64
- func (p *PostgresSource) DeleteAfterSync() error
- func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)
- func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (p *PostgresSource) GetMinMaxSplitKey() (int64, int64, error)
- func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (p *PostgresSource) GetSourceReadRowsCount() (int, error)
- func (p *PostgresSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- func (p *PostgresSource) SwitchDatabase() error
- type Sourcer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenerateJSONFile ¶
func SlimCondition ¶ added in v0.1.8
func SplitCondition ¶ added in v0.1.8
func SplitConditionAccordingMaxGoRoutine ¶ added in v0.1.8
func SplitConditionAccordingToTimeSplitKey ¶ added in v0.1.8
func SplitTimeConditionsByMaxThread ¶ added in v0.1.8
Types ¶
type DatabendIngesterStatsData ¶ added in v0.1.3
type DatabendSourceStatsRecorder ¶ added in v0.1.3
type DatabendSourceStatsRecorder struct {
// contains filtered or unexported fields
}
func NewDatabendIntesterStatsRecorder ¶ added in v0.1.3
func NewDatabendIntesterStatsRecorder() *DatabendSourceStatsRecorder
func (*DatabendSourceStatsRecorder) RecordMetric ¶ added in v0.1.3
func (stats *DatabendSourceStatsRecorder) RecordMetric(rows int)
func (*DatabendSourceStatsRecorder) Stats ¶ added in v0.1.3
func (stats *DatabendSourceStatsRecorder) Stats(statsWindow time.Duration) DatabendIngesterStatsData
type MysqlSource ¶ added in v0.1.8
type MysqlSource struct {
// contains filtered or unexported fields
}
func NewMysqlSource ¶ added in v0.1.8
func NewMysqlSource(cfg *config.Config) (*MysqlSource, error)
func (*MysqlSource) AdjustBatchSizeAccordingToSourceDbTable ¶ added in v0.1.8
func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() int64
AdjustBatchSizeAccordingToSourceDbTable has a concept called s, s = (maxKey - minKey) / sourceTableRowCount if s == 1 it means the data is uniform in the table, if s is much bigger than 1, it means the data is not uniform in the table
func (*MysqlSource) DeleteAfterSync ¶ added in v0.1.8
func (s *MysqlSource) DeleteAfterSync() error
func (*MysqlSource) GetAllSourceReadRowsCount ¶ added in v0.1.8
func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)
func (*MysqlSource) GetDatabasesAccordingToSourceDbRegex ¶ added in v0.1.8
func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*MysqlSource) GetDbTablesAccordingToSourceDbTables ¶ added in v0.1.8
func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*MysqlSource) GetMinMaxSplitKey ¶ added in v0.1.8
func (s *MysqlSource) GetMinMaxSplitKey() (int64, int64, error)
func (*MysqlSource) GetMinMaxTimeSplitKey ¶ added in v0.1.8
func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*MysqlSource) GetSourceReadRowsCount ¶ added in v0.1.8
func (s *MysqlSource) GetSourceReadRowsCount() (int, error)
func (*MysqlSource) GetTablesAccordingToSourceTableRegex ¶ added in v0.1.8
func (*MysqlSource) QueryTableData ¶ added in v0.1.8
func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
type OracleSource ¶ added in v0.2.0
type OracleSource struct {
// contains filtered or unexported fields
}
func NewOracleSource ¶ added in v0.2.0
func NewOracleSource(cfg *config.Config) (*OracleSource, error)
func (*OracleSource) AdjustBatchSizeAccordingToSourceDbTable ¶ added in v0.2.0
func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() int64
func (*OracleSource) DeleteAfterSync ¶ added in v0.2.0
func (p *OracleSource) DeleteAfterSync() error
func (*OracleSource) GetAllSourceReadRowsCount ¶ added in v0.2.0
func (p *OracleSource) GetAllSourceReadRowsCount() (int, error)
func (*OracleSource) GetDatabasesAccordingToSourceDbRegex ¶ added in v0.2.0
func (p *OracleSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*OracleSource) GetDbTablesAccordingToSourceDbTables ¶ added in v0.2.0
func (p *OracleSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*OracleSource) GetMinMaxSplitKey ¶ added in v0.2.0
func (p *OracleSource) GetMinMaxSplitKey() (int64, int64, error)
func (*OracleSource) GetMinMaxTimeSplitKey ¶ added in v0.2.0
func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*OracleSource) GetSourceReadRowsCount ¶ added in v0.2.0
func (p *OracleSource) GetSourceReadRowsCount() (int, error)
func (*OracleSource) GetTablesAccordingToSourceTableRegex ¶ added in v0.2.0
func (*OracleSource) QueryTableData ¶ added in v0.2.0
func (p *OracleSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
func (*OracleSource) SwitchDatabase ¶ added in v0.2.0
func (p *OracleSource) SwitchDatabase() error
type PostgresSource ¶ added in v0.1.8
type PostgresSource struct {
// contains filtered or unexported fields
}
func NewPostgresSource ¶ added in v0.1.8
func NewPostgresSource(cfg *config.Config) (*PostgresSource, error)
func (*PostgresSource) AdjustBatchSizeAccordingToSourceDbTable ¶ added in v0.1.8
func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() int64
func (*PostgresSource) DeleteAfterSync ¶ added in v0.1.8
func (p *PostgresSource) DeleteAfterSync() error
func (*PostgresSource) GetAllSourceReadRowsCount ¶ added in v0.1.8
func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)
func (*PostgresSource) GetDatabasesAccordingToSourceDbRegex ¶ added in v0.1.8
func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*PostgresSource) GetDbTablesAccordingToSourceDbTables ¶ added in v0.1.8
func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*PostgresSource) GetMinMaxSplitKey ¶ added in v0.1.8
func (p *PostgresSource) GetMinMaxSplitKey() (int64, int64, error)
func (*PostgresSource) GetMinMaxTimeSplitKey ¶ added in v0.1.8
func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*PostgresSource) GetSourceReadRowsCount ¶ added in v0.1.8
func (p *PostgresSource) GetSourceReadRowsCount() (int, error)
func (*PostgresSource) GetTablesAccordingToSourceTableRegex ¶ added in v0.1.8
func (*PostgresSource) QueryTableData ¶ added in v0.1.8
func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
func (*PostgresSource) SwitchDatabase ¶ added in v0.1.8
func (p *PostgresSource) SwitchDatabase() error
type Sourcer ¶
type Sourcer interface { AdjustBatchSizeAccordingToSourceDbTable() int64 GetSourceReadRowsCount() (int, error) GetMinMaxSplitKey() (int64, int64, error) GetMinMaxTimeSplitKey() (string, string, error) DeleteAfterSync() error QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error) GetAllSourceReadRowsCount() (int, error) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error) }
Click to show internal directories.
Click to hide internal directories.