db

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2021 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Copyright © 2020 Marvin

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

View Source
var (
	// Oracle/Mysql 对于 'NULL' 统一字符 NULL 处理,查询出来转成 NULL,所以需要判断处理
	// 查询字段值 NULL
	// 如果字段值 = NULLABLE 则表示值是 NULL
	// 如果字段值 = "" 则表示值是空字符串
	// 如果字段值 = 'NULL' 则表示值是 NULL 字符串
	// 如果字段值 = 'null' 则表示值是 null 字符串
	IsNull = "NULLABLE"
)

Functions

func NewOracleDBEngine

func NewOracleDBEngine(oraCfg config.SourceConfig) (*sql.DB, error)

创建 oracle 数据库引擎

func Query

func Query(db *sql.DB, querySQL string) ([]string, []map[string]string, error)

查询返回表字段列和对应的字段行数据

Types

type CustomSchemaColumnTypeMap

type CustomSchemaColumnTypeMap struct {
	ID               uint       `gorm:"primary_key;autoIncrement;comment:'自增编号'" json:"id"`
	SourceSchemaName string     `gorm:"not null;index:unique_schema_col,unique;comment:'源端库 schema'" json:"source_schema_name"`
	SourceColumnType string     `gorm:"not null;index:unique_schema_col,unique;comment:'源端表字段类型'" json:"source_column_type"`
	TargetColumnType string     `gorm:"not null;index:unique_schema_col,unique;comment:'目标表字段类型'" json:"target_column_type"`
	CreatedAt        *time.Time `gorm:"type:timestamp;not null;default:current_timestamp;comment:'创建时间'" json:"createdAt"`
	UpdatedAt        *time.Time `gorm:"type:timestamp;not null on update current_timestamp;default:current_timestamp;comment:'更新时间'" json:"updatedAt"`
}

自定义表结构字段类型转换规则 - schema 级别

func (*CustomSchemaColumnTypeMap) GetCustomSchemaColumnType

func (c *CustomSchemaColumnTypeMap) GetCustomSchemaColumnType() string

type CustomTableColumnTypeMap

type CustomTableColumnTypeMap struct {
	ID               uint       `gorm:"primary_key;autoIncrement;comment:'自增编号'" json:"id"`
	SourceSchemaName string     `gorm:"not null;index:unique_schema_table_col,unique;comment:'源端库 schema'" json:"source_schema_name"`
	SourceTableName  string     `gorm:"not null;index:unique_schema_table_col,unique;comment:'源端表名'" json:"source_table_name"`
	SourceColumnType string     `gorm:"not null;index:unique_schema_table_col,unique;comment:'源端表字段类型'" json:"source_column_type"`
	TargetColumnType string     `gorm:"not null;index:unique_schema_table_col,unique;comment:'目标表字段类型'" json:"target_column_type"`
	CreatedAt        *time.Time `gorm:"type:timestamp;not null;default:current_timestamp;comment:'创建时间'" json:"createdAt"`
	UpdatedAt        *time.Time `gorm:"type:timestamp;not null on update current_timestamp;default:current_timestamp;comment:'更新时间'" json:"updatedAt"`
}

自定义表结构字段类型转换规则 - table 级别 如果同步表未单独配置 table 级别字段类型映射规则,并且未单独配置 schema 级别字段类型映射规则,则采用程序内置规则转换 优先级: table > schema > internal

func (*CustomTableColumnTypeMap) GetCustomTableColumnType

func (c *CustomTableColumnTypeMap) GetCustomTableColumnType(tableName string) string

type CustomTableNameMap

type CustomTableNameMap struct {
	ID               uint       `gorm:"primary_key;autoIncrement;comment:'自增编号'" json:"id"`
	SourceSchemaName string     `gorm:"not null;index:unique_schema_table,unique;comment:'源端库 schema'" json:"source_schema_name"`
	SourceTableName  string     `gorm:"not null;index:unique_schema_table,unique;comment:'源端表名'" json:"source_table_name"`
	TargetTableName  string     `gorm:"not null;index:unique_schema_table,unique;comment:'目标表名'" json:"target_table_name"`
	CreatedAt        *time.Time `gorm:"type:timestamp;not null;default:current_timestamp;comment:'创建时间'" json:"createdAt"`
	UpdatedAt        *time.Time `gorm:"type:timestamp;not null on update current_timestamp;default:current_timestamp;comment:'更新时间'" json:"updatedAt"`
}
上下游字段类型映射表

自定义表结构名称转换规则 - table 级别 如果同步表未单独配置表结构名称转换规则 ,则采用源表默认

type Engine

type Engine struct {
	OracleDB *sql.DB
	MysqlDB  *sql.DB
	GormDB   *gorm.DB
}

定义数据库引擎

func NewMySQLEngineGeneralDB

func NewMySQLEngineGeneralDB(mysqlCfg config.TargetConfig, slowQueryThreshold int) (*Engine, error)

func NewMySQLEnginePrepareDB

func NewMySQLEnginePrepareDB(mysqlCfg config.TargetConfig, slowQueryThreshold int) (*Engine, error)

创建 mysql 数据库引擎

func (*Engine) AddOracleLogminerlogFile

func (e *Engine) AddOracleLogminerlogFile(logFile string) error

func (*Engine) AdjustFullStageMySQLTableMetaRecord added in v1.0.2

func (e *Engine) AdjustFullStageMySQLTableMetaRecord(schemaName string, tableMetas []TableMeta) (bool, []string, []string, error)

该函数只应用于全量同步模式或者 ALL 同步模式 1、断点续传判断,判断是否可进行断点续传 2、判断是否存在未初始化元信息的表

func (*Engine) ClearMySQLTableMetaRecord added in v1.0.2

func (e *Engine) ClearMySQLTableMetaRecord(metaSchemaName, sourceSchemaName string) error

func (*Engine) EndOracleLogminerStoredProcedure

func (e *Engine) EndOracleLogminerStoredProcedure() error

func (*Engine) FilterDifferenceOracleTable

func (e *Engine) FilterDifferenceOracleTable(schemaName string, excludeTables []string) ([]string, error)

func (*Engine) FilterIntersectionMySQLTable

func (e *Engine) FilterIntersectionMySQLTable(schemaName string, includeTables []string) ([]string, error)

func (*Engine) FilterOraclePartitionTable

func (e *Engine) FilterOraclePartitionTable(schemaName string, tableSlice []string) ([]string, error)

func (*Engine) GetCustomSchemaColumnTypeMap

func (e *Engine) GetCustomSchemaColumnTypeMap(schemaName string) ([]CustomSchemaColumnTypeMap, error)

func (*Engine) GetCustomTableColumnTypeMap

func (e *Engine) GetCustomTableColumnTypeMap(schemaName string) ([]CustomTableColumnTypeMap, error)

func (*Engine) GetCustomTableNameMap

func (e *Engine) GetCustomTableNameMap(schemaName string) ([]CustomTableNameMap, error)

func (*Engine) GetMySQLTableFullMetaRowIDRecord added in v1.0.2

func (e *Engine) GetMySQLTableFullMetaRowIDRecord(schemaName, tableName string) ([]string, error)

func (*Engine) GetMySQLTableFullMetaSchemaTableRecord added in v1.0.2

func (e *Engine) GetMySQLTableFullMetaSchemaTableRecord(schemaName string) ([]string, error)

func (*Engine) GetMySQLTableIncrementMetaMinGlobalSCNTime added in v1.0.2

func (e *Engine) GetMySQLTableIncrementMetaMinGlobalSCNTime(sourceSchemaName string) (int, error)

func (*Engine) GetMySQLTableIncrementMetaMinSourceTableSCNTime added in v1.0.2

func (e *Engine) GetMySQLTableIncrementMetaMinSourceTableSCNTime(sourceSchemaName string) (int, error)

func (*Engine) GetMySQLTableIncrementMetaRecord

func (e *Engine) GetMySQLTableIncrementMetaRecord(sourceSchemaName string) ([]string, map[string]int, error)

func (*Engine) GetMySQLTableMetaRecord added in v1.0.2

func (e *Engine) GetMySQLTableMetaRecord(schemaName string) ([]TableMeta, []string, error)

func (*Engine) GetOracleALLRedoLogFile added in v1.0.2

func (e *Engine) GetOracleALLRedoLogFile() ([]string, error)

func (*Engine) GetOracleArchivedLogFile

func (e *Engine) GetOracleArchivedLogFile(scn string) ([]map[string]string, error)

func (*Engine) GetOracleArchivedLogSCN

func (e *Engine) GetOracleArchivedLogSCN(scn string) (int, error)

func (*Engine) GetOracleCurrentRedoMaxSCN added in v1.0.2

func (e *Engine) GetOracleCurrentRedoMaxSCN() (int, int, string, error)

func (*Engine) GetOracleCurrentSnapshotSCN

func (e *Engine) GetOracleCurrentSnapshotSCN() (int, error)

func (*Engine) GetOracleLogminerContentToMySQL

func (e *Engine) GetOracleLogminerContentToMySQL(schemaName string, sourceTableNameList string, lastCheckpoint string, logminerQueryTimeout int) ([]LogminerContent, error)

func (*Engine) GetOracleRedoLogFile

func (e *Engine) GetOracleRedoLogFile(scn string) ([]map[string]string, error)

func (*Engine) GetOracleRedoLogSCN

func (e *Engine) GetOracleRedoLogSCN(scn string) (int, error)

func (*Engine) GetOracleTable

func (e *Engine) GetOracleTable(schemaName string) ([]string, error)

func (*Engine) GetOracleTableCheckKey

func (e *Engine) GetOracleTableCheckKey(schemaName string, tableName string) ([]map[string]string, error)

func (*Engine) GetOracleTableColumn

func (e *Engine) GetOracleTableColumn(schemaName string, tableName string) ([]map[string]string, error)

func (*Engine) GetOracleTableComment

func (e *Engine) GetOracleTableComment(schemaName string, tableName string) ([]map[string]string, error)

func (*Engine) GetOracleTableForeignKey

func (e *Engine) GetOracleTableForeignKey(schemaName string, tableName string) ([]map[string]string, error)

func (*Engine) GetOracleTableIndex

func (e *Engine) GetOracleTableIndex(schemaName string, tableName string) ([]map[string]string, error)

func (*Engine) GetOracleTablePrimaryKey

func (e *Engine) GetOracleTablePrimaryKey(schemaName string, tableName string) ([]map[string]string, error)

func (*Engine) GetOracleTableRecordByRowIDSQL

func (e *Engine) GetOracleTableRecordByRowIDSQL(sql string) ([]string, []string, error)

func (*Engine) GetOracleTableUniqueKey

func (e *Engine) GetOracleTableUniqueKey(schemaName string, tableName string) ([]map[string]string, error)

func (*Engine) InitMySQLTableFullMeta

func (e *Engine) InitMySQLTableFullMeta(schemaName, tableName string, globalSCN int, extractorBatch, insertBatchSize int) error

func (*Engine) InitMySQLTableIncrementMeta

func (e *Engine) InitMySQLTableIncrementMeta(schemaName, tableName string, globalSCN int) error

func (*Engine) InitMySQLTableMetaRecord added in v1.0.2

func (e *Engine) InitMySQLTableMetaRecord(schemaName string, tableName []string) error

func (*Engine) InitMysqlEngineDB

func (e *Engine) InitMysqlEngineDB() error

初始化同步表结构

func (*Engine) IsExistMySQLSchema

func (e *Engine) IsExistMySQLSchema(schemaName string) (bool, error)

func (*Engine) IsExistMysqlIndex

func (e *Engine) IsExistMysqlIndex(schemaName, tableName, indexName string) bool

func (*Engine) IsExistOracleSchema

func (e *Engine) IsExistOracleSchema(schemaName string) error

func (*Engine) IsExistOracleTable

func (e *Engine) IsExistOracleTable(schemaName string, includeTables []string) error

func (*Engine) IsNotExistMySQLTableIncrementMetaRecord

func (e *Engine) IsNotExistMySQLTableIncrementMetaRecord() (bool, error)

func (*Engine) ModifyMySQLTableMetaRecord added in v1.0.2

func (e *Engine) ModifyMySQLTableMetaRecord(metaSchemaName, sourceSchemaName, sourceTableName, rowidSQL string) error

清理并更新同步任务元数据表 1、全量每成功同步一张表记录,再清理记录 2、更新同步数据表元信息

func (*Engine) QueryFormatOracleRows

func (e *Engine) QueryFormatOracleRows(querySQL string) ([]string, []string, error)

查询 Oracle 数据并按行返回对应字段以及行数据 -> 按字段类型返回行数据 用于拼接 batch

func (*Engine) RenameMySQLTableName

func (e *Engine) RenameMySQLTableName(schemaName string, tableName string) error

func (*Engine) StartOracleLogminerStoredProcedure

func (e *Engine) StartOracleLogminerStoredProcedure(scn string) error

func (*Engine) TruncateMySQLTableFullMetaRecord

func (e *Engine) TruncateMySQLTableFullMetaRecord(metaSchemaName string) error

清理全量同步任务元数据表

func (*Engine) TruncateMySQLTableRecord added in v1.0.2

func (e *Engine) TruncateMySQLTableRecord(targetSchemaName string, tableMetas []TableMeta) error

func (*Engine) UpdateMySQLTableMetaRecord added in v1.0.2

func (e *Engine) UpdateMySQLTableMetaRecord(schemaName, tableName string, rowCounts, globalSCN int) error

func (*Engine) UpdateSingleTableIncrementMetaSCNByArchivedLog added in v1.0.2

func (e *Engine) UpdateSingleTableIncrementMetaSCNByArchivedLog(
	sourceSchemaName string, logFileEndSCN int, transferTableSlice []string) error

func (*Engine) UpdateSingleTableIncrementMetaSCNByCurrentRedo added in v1.0.2

func (e *Engine) UpdateSingleTableIncrementMetaSCNByCurrentRedo(
	sourceSchemaName string, lastRedoLogMaxSCN, logFileStartSCN, logFileEndSCN int) error

func (*Engine) UpdateSingleTableIncrementMetaSCNByNonCurrentRedo added in v1.0.2

func (e *Engine) UpdateSingleTableIncrementMetaSCNByNonCurrentRedo(
	sourceSchemaName string, lastRedoLogMaxSCN, logFileStartSCN, logFileEndSCN int, transferTableSlice []string) error

func (*Engine) UpdateTableIncrementMetaALLSCNRecord added in v1.0.2

func (e *Engine) UpdateTableIncrementMetaALLSCNRecord(sourceSchemaName, sourceTableName, operationType string, globalSCN, sourceTableSCN int) error

type LogminerContent added in v1.0.2

type LogminerContent struct {
	SCN       int
	SegOwner  string
	TableName string
	SQLRedo   string
	SQLUndo   string
	Operation string
}

获取 Oracle logminer 日志内容并过滤筛选已提交的 INSERT/DELETE/UPDATE 事务语句 考虑异构数据库 MySQL,只同步 INSERT/DELETE/UPDATE 事务语句以及 TRUNCATE TABLE/DROP TABLE DDL 语句,其他类型 SQL 不同步 V$LOGMNR_CONTENTS 字段解释参考链接 https://docs.oracle.com/en/database/oracle/oracle-database/21/refrn/V-LOGMNR_CONTENTS.html#GUID-B9196942-07BF-4935-B603-FA875064F5C3

type Postgres

type Postgres struct {
	DB *sql.DB
}

func NewPostgresDSN

func NewPostgresDSN(dbUser, dbPassword, ipAddr, dbPort, dbName string) *Postgres

func (*Postgres) GetSchemaMeta

func (p *Postgres) GetSchemaMeta() (schemaMeta []string)

func (*Postgres) GetTableColumnMeta

func (p *Postgres) GetTableColumnMeta(schemaName string, tableName string) (colMeta []map[string]string)

func (*Postgres) GetTableForeignKey

func (p *Postgres) GetTableForeignKey(schemaName string, tableName string) (fkList []map[string]string)

func (*Postgres) GetTableIndexMeta

func (p *Postgres) GetTableIndexMeta(schemaName string, tableName string) (idxMeta []map[string]string)

func (*Postgres) GetTableMeta

func (p *Postgres) GetTableMeta(schemaName string) (tableMeta []map[string]string)

func (*Postgres) GetTablePrimaryKey

func (p *Postgres) GetTablePrimaryKey(schemaName string, tableName string) (pkList []map[string]string)

func (*Postgres) GetTableUniqueKey

func (p *Postgres) GetTableUniqueKey(schemaName string, tableName string) (ukList []map[string]string)

func (*Postgres) GetViewMeta

func (p *Postgres) GetViewMeta(schemaName, viewName string) (viewMeta []map[string]string)

func (*Postgres) QuerySQL

func (p *Postgres) QuerySQL(querySQL string) (cols []string, res []map[string]string)

type TableFullMeta

type TableFullMeta struct {
	ID               uint       `gorm:"primary_key;autoIncrement;comment:'自增编号'" json:"id"`
	SourceSchemaName string     `gorm:"not null;index:idx_schema_table_rowid;comment:'源端 schema'" json:"source_schema_name"`
	SourceTableName  string     `gorm:"not null;index:idx_schema_table_rowid;comment:'源端表名'" json:"source_table_name"`
	GlobalSCN        int        `gorm:"comment:'全局 SCN'" json:"global_scn"`
	RowidSQL         string     `gorm:"not null;index:idx_schema_table_rowid;comment:'表 rowid 切分SQL'" json:"rowid_sql"`
	IsPartition      string     `gorm:"comment:'是否是分区表'" json:"is_partition"` // 同步转换统一转换成非分区表,此处只做标志
	CreatedAt        *time.Time `gorm:"type:timestamp;not null;default:current_timestamp;comment:'创建时间'" json:"createdAt"`
	UpdatedAt        *time.Time `gorm:"type:timestamp;not null on update current_timestamp;default:current_timestamp;comment:'更新时间'" json:"updatedAt"`
}

全量同步元数据表

func (*TableFullMeta) GetTableFullMetaRecordCounts

func (f *TableFullMeta) GetTableFullMetaRecordCounts(schemaName, tableName string, engine *Engine) (int, error)

type TableIncrementMeta

type TableIncrementMeta struct {
	ID               uint       `gorm:"primary_key;autoIncrement;comment:'自增编号'" json:"id"`
	GlobalSCN        int        `gorm:"comment:'全局 SCN'" json:"global_scn"`
	SourceSchemaName string     `gorm:"not null;index:unique_schema_table,unique;comment:'源端 schema'" json:"source_schema_name"`
	SourceTableName  string     `gorm:"not null;index:unique_schema_table,unique;comment:'源端表名'" json:"source_table_name"`
	SourceTableSCN   int        `gorm:"comment:'表同步 SCN'" json:"source_table_scn"`
	CreatedAt        *time.Time `gorm:"type:timestamp;not null;default:current_timestamp;comment:'创建时间'" json:"createdAt"`
	UpdatedAt        *time.Time `gorm:"type:timestamp;not null on update current_timestamp;default:current_timestamp;comment:'更新时间'" json:"updatedAt"`
}

增量同步元数据表

func (*TableIncrementMeta) GetTableIncrementMetaRowCounts

func (i *TableIncrementMeta) GetTableIncrementMetaRowCounts(engine *Engine) (int, error)

type TableMeta added in v1.0.2

type TableMeta struct {
	ID               uint       `gorm:"primary_key;autoIncrement;comment:'自增编号'" json:"id"`
	SourceSchemaName string     `gorm:"not null;index:idx_schema_table,unique;comment:'源端 schema'" json:"source_schema_name"`
	SourceTableName  string     `gorm:"not null;index:idx_schema_table,unique;comment:'源端表名'" json:"source_table_name"`
	FullGlobalSCN    int        `gorm:"comment:'全量全局 SCN'" json:"full_global_scn"`
	FullSplitTimes   int        `gorm:"comment:'全量任务切分 SQL 次数'" json:"full_split_times"`
	CreatedAt        *time.Time `gorm:"type:timestamp;not null;default:current_timestamp;comment:'创建时间'" json:"createdAt"`
	UpdatedAt        *time.Time `gorm:"type:timestamp;not null on update current_timestamp;default:current_timestamp;comment:'更新时间'" json:"updatedAt"`
}

同步元数据表

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL