source

package
v0.2.9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateJSONFile

func GenerateJSONFile(columns []string, data [][]interface{}) (string, int, error)

func SlimCondition added in v0.1.8

func SlimCondition(maxThread int, minSplitKey, maxSplitKey int64) [][]int64

func SplitCondition added in v0.1.8

func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey int64) []string

func SplitConditionAccordingMaxGoRoutine added in v0.1.8

func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax int64) <-chan string

func SplitConditionAccordingToTimeSplitKey added in v0.1.8

func SplitConditionAccordingToTimeSplitKey(cfg *config.Config, minTimeSplitKey, maxTimeSplitKey string) ([]string, error)

func SplitTimeConditionsByMaxThread added in v0.1.8

func SplitTimeConditionsByMaxThread(conditions []string, maxThread int) [][]string

Types

type DatabendIngesterStatsData added in v0.1.3

type DatabendIngesterStatsData struct {
	BytesPerSecond float64
	RowsPerSecondd float64
}

type DatabendSourceStatsRecorder added in v0.1.3

type DatabendSourceStatsRecorder struct {
	// contains filtered or unexported fields
}

func NewDatabendIntesterStatsRecorder added in v0.1.3

func NewDatabendIntesterStatsRecorder() *DatabendSourceStatsRecorder

func (*DatabendSourceStatsRecorder) RecordMetric added in v0.1.3

func (stats *DatabendSourceStatsRecorder) RecordMetric(rows int)

func (*DatabendSourceStatsRecorder) Stats added in v0.1.3

type MysqlSource added in v0.1.8

type MysqlSource struct {
	// contains filtered or unexported fields
}

func NewMysqlSource added in v0.1.8

func NewMysqlSource(cfg *config.Config) (*MysqlSource, error)

func (*MysqlSource) AdjustBatchSizeAccordingToSourceDbTable added in v0.1.8

func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() int64

AdjustBatchSizeAccordingToSourceDbTable has a concept called s, where s = (maxKey - minKey) / sourceTableRowCount. If s == 1, the data is uniform in the table; if s is much bigger than 1, the data is not uniform in the table.

func (*MysqlSource) DeleteAfterSync added in v0.1.8

func (s *MysqlSource) DeleteAfterSync() error

func (*MysqlSource) GetAllSourceReadRowsCount added in v0.1.8

func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)

func (*MysqlSource) GetDatabasesAccordingToSourceDbRegex added in v0.1.8

func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*MysqlSource) GetDbTablesAccordingToSourceDbTables added in v0.1.8

func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*MysqlSource) GetMinMaxSplitKey added in v0.1.8

func (s *MysqlSource) GetMinMaxSplitKey() (int64, int64, error)

func (*MysqlSource) GetMinMaxTimeSplitKey added in v0.1.8

func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*MysqlSource) GetSourceReadRowsCount added in v0.1.8

func (s *MysqlSource) GetSourceReadRowsCount() (int, error)

func (*MysqlSource) GetTablesAccordingToSourceTableRegex added in v0.1.8

func (s *MysqlSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*MysqlSource) QueryTableData added in v0.1.8

func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

type OracleSource added in v0.2.0

type OracleSource struct {
	// contains filtered or unexported fields
}

func NewOracleSource added in v0.2.0

func NewOracleSource(cfg *config.Config) (*OracleSource, error)

func (*OracleSource) AdjustBatchSizeAccordingToSourceDbTable added in v0.2.0

func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() int64

func (*OracleSource) DeleteAfterSync added in v0.2.0

func (p *OracleSource) DeleteAfterSync() error

func (*OracleSource) GetAllSourceReadRowsCount added in v0.2.0

func (p *OracleSource) GetAllSourceReadRowsCount() (int, error)

func (*OracleSource) GetDatabasesAccordingToSourceDbRegex added in v0.2.0

func (p *OracleSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*OracleSource) GetDbTablesAccordingToSourceDbTables added in v0.2.0

func (p *OracleSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*OracleSource) GetMinMaxSplitKey added in v0.2.0

func (p *OracleSource) GetMinMaxSplitKey() (int64, int64, error)

func (*OracleSource) GetMinMaxTimeSplitKey added in v0.2.0

func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*OracleSource) GetSourceReadRowsCount added in v0.2.0

func (p *OracleSource) GetSourceReadRowsCount() (int, error)

func (*OracleSource) GetTablesAccordingToSourceTableRegex added in v0.2.0

func (p *OracleSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*OracleSource) QueryTableData added in v0.2.0

func (p *OracleSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

func (*OracleSource) SwitchDatabase added in v0.2.0

func (p *OracleSource) SwitchDatabase() error

type PostgresSource added in v0.1.8

type PostgresSource struct {
	// contains filtered or unexported fields
}

func NewPostgresSource added in v0.1.8

func NewPostgresSource(cfg *config.Config) (*PostgresSource, error)

func (*PostgresSource) AdjustBatchSizeAccordingToSourceDbTable added in v0.1.8

func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() int64

func (*PostgresSource) DeleteAfterSync added in v0.1.8

func (p *PostgresSource) DeleteAfterSync() error

func (*PostgresSource) GetAllSourceReadRowsCount added in v0.1.8

func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)

func (*PostgresSource) GetDatabasesAccordingToSourceDbRegex added in v0.1.8

func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*PostgresSource) GetDbTablesAccordingToSourceDbTables added in v0.1.8

func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*PostgresSource) GetMinMaxSplitKey added in v0.1.8

func (p *PostgresSource) GetMinMaxSplitKey() (int64, int64, error)

func (*PostgresSource) GetMinMaxTimeSplitKey added in v0.1.8

func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*PostgresSource) GetSourceReadRowsCount added in v0.1.8

func (p *PostgresSource) GetSourceReadRowsCount() (int, error)

func (*PostgresSource) GetTablesAccordingToSourceTableRegex added in v0.1.8

func (p *PostgresSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*PostgresSource) QueryTableData added in v0.1.8

func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

func (*PostgresSource) SwitchDatabase added in v0.1.8

func (p *PostgresSource) SwitchDatabase() error

type Sourcer

type Sourcer interface {
	AdjustBatchSizeAccordingToSourceDbTable() int64
	GetSourceReadRowsCount() (int, error)
	GetMinMaxSplitKey() (int64, int64, error)
	GetMinMaxTimeSplitKey() (string, string, error)
	DeleteAfterSync() error
	QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
	GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
	GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
	GetAllSourceReadRowsCount() (int, error)
	GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
}

func NewSource

func NewSource(cfg *config.Config) (Sourcer, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL