common

package
v0.2.1-0...-d023f04 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2023 License: MPL-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	NotDML int8 = iota
	InsertDML
	UpdateDML
	DeleteDML
)
View Source
const (
	OPTION_AUTO_IS_NULL          uint32 = 0x00004000
	OPTION_NOT_AUTOCOMMIT        uint32 = 0x00080000
	OPTION_NO_FOREIGN_KEY_CHECKS uint32 = 0x04000000
	OPTION_RELAXED_UNIQUE_CHECKS uint32 = 0x08000000
)
View Source
const (
	// see mysql-server/libbinlogevents/include/statement_events.h
	Q_FLAGS2_CODE byte = iota
	Q_SQL_MODE_CODE
	Q_CATALOG
	Q_AUTO_INCREMENT
	Q_CHARSET_CODE
	Q_TIME_ZONE_CODE
	Q_CATALOG_NZ_CODE
	Q_LC_TIME_NAMES_CODE
	Q_CHARSET_DATABASE_CODE
	Q_TABLE_MAP_FOR_UPDATE_CODE
	Q_MASTER_DATA_WRITTEN_CODE
	Q_INVOKERS
	Q_UPDATED_DB_NAMES
	Q_MICROSECONDS
	Q_COMMIT_TS
	Q_COMMIT_TS2
	Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP
	Q_DDL_LOGGED_WITH_XID
	Q_DEFAULT_COLLATION_FOR_UTF8MB4
	Q_SQL_REQUIRE_PRIMARY_KEY
	Q_DEFAULT_TABLE_ENCRYPTION
)
View Source
const (
	RowsEventFlagEndOfStatement     uint16 = 1
	RowsEventFlagNoForeignKeyChecks uint16 = 2
	RowsEventFlagNoUniqueKeyChecks  uint16 = 4
	RowsEventFlagRowHasAColumns     uint16 = 8
)
View Source
const (
	DefaultConnectWaitSecond = 10
	DefaultConnectWait       = DefaultConnectWaitSecond * time.Second

	DtleJobStatusNonPaused   = "non-paused"
	DtleJobStatusPaused      = "paused"
	DtleJobStatusUndefined   = "undefined"
	DtleJobStatusReverseInit = "reverse-init"
	DtleJobStatusStop        = "stop"
	TargetGtidFinished       = "finished"
)
View Source
const (
	// TODO: Using configuration to set jwt secret
	JWTSecret              = "secret"
	DefaultAdminTenant     = "platform"
	DefaultAdminUser       = "admin"
	DefaultAdminPwd        = "admin"
	DefaultEncryptAdminPwd = "" /* 172-byte string literal not displayed */
	DefaultRole            = "admin"
	DefaultAdminAuth       = "" /* 6003-byte string literal not displayed */
)
View Source
const (
	ControlMsgError  int32 = 1
	ControlMsgFinish int32 = 2
)
View Source
const (
	StageFinishedReadingOneBinlogSwitchingToNextBinlog = "Finished reading one binlog; switching to next binlog"
	StageMasterHasSentAllBinlogToSlave                 = "Master has sent all binlog to slave; waiting for more updates"
	StageRegisteringSlaveOnMaster                      = "Registering slave on master"
	StageRequestingBinlogDump                          = "Requesting binlog dump"
	StageSearchingRowsForUpdate                        = "Searching rows for update"
	StageSendingBinlogEventToSlave                     = "Sending binlog event to slave"
	StageSendingData                                   = "Sending data"
	StageSlaveHasReadAllRelayLog                       = "Slave has read all relay log; waiting for more updates"
	StageSlaveWaitingForWorkersToProcessQueue          = "Waiting for slave workers to process their queues"
	StageWaitingForGtidToBeCommitted                   = "Waiting for GTID to be committed"
	StageWaitingForMasterToSendEvent                   = "Waiting for master to send event"
)
View Source
const (
	DefaultChannelBufferSize        = 32
	DefaultChunkSize                = 2000
	DefaultNumWorkers               = 1
	DefaultClusterID                = "dtle-nats"
	DefaultSrcGroupMaxSize          = 1
	DefaultSrcGroupTimeout          = 100
	DefaultKafkaMessageGroupMaxSize = 1
	DefaultKafkaMessageGroupTimeout = 100
	DefaultDependencyHistorySize    = 2500

	TaskTypeSrc     = "src"
	TaskTypeDest    = "dest"
	TaskTypeUnknown = "unknown"
)
View Source
const (
	DtleFlagCreateSchemaIfNotExists = 0x1
)
View Source
const (
	TaskStateDead = 2
)

Variables

View Source
var (
	ErrNoConsul = fmt.Errorf("consul return nil value. check if consul is started or reachable")
)

Functions

func Compress

func Compress(bs []byte) (outBs []byte, err error)

func Decode

func Decode(data []byte, out GencodeType) (err error)

func DtleParseMysqlGTIDSet

func DtleParseMysqlGTIDSet(gtidSetStr string) (*mysql.MysqlGTIDSet, error)

func Encode

func Encode(v GencodeType) ([]byte, error)

func EncodeTable

func EncodeTable(v *Table) ([]byte, error)

func GetFieldValue

func GetFieldValue(fieldName string) interface{}

todo Support key value general settings

func GetGtidFromConsul

func GetGtidFromConsul(sm *StoreManager, subject string, logger g.LoggerType, mysqlContext *MySQLDriverConfig) error

func IgnoreDbByReplicateIgnoreDb

func IgnoreDbByReplicateIgnoreDb(replicateIgnoreDb []*DataSource, dbName string) bool

func IgnoreTbByReplicateIgnoreDb

func IgnoreTbByReplicateIgnoreDb(replicateIgnoreDb []*DataSource, dbName, tbName string) bool

func MysqlVersionInDigit

func MysqlVersionInDigit(v string) (int, error)

func RegularlyUpdateJobStatus

func RegularlyUpdateJobStatus(store *StoreManager, shutdownCh chan struct{}, jobId string)

regularly update the task status value by the memory usage

func RowColumnIsNull

func RowColumnIsNull(row []interface{}, index int) bool

func RowGetBytesColumn

func RowGetBytesColumn(row []interface{}, index int) []byte

func SetField

func SetField(fieldName string, fieldValue interface{})

func TaskTypeFromString

func TaskTypeFromString(s string) string

func ToEventDML

func ToEventDML(eventType replication.EventType) int8

func UpdateGtidSet

func UpdateGtidSet(gtidSet *mysql.MysqlGTIDSet, sid uuid.UUID, txGno int64)

func ValidateJobName

func ValidateJobName(name string) error

func WriteWaitCh

func WriteWaitCh(ch chan<- *drivers.ExitResult, r *drivers.ExitResult)

Types

type ApplierTableItem

type ApplierTableItem struct {
	Columns     *ColumnList
	PsInsert0   []*sql.Stmt
	PsInsert1   []*sql.Stmt
	PsInsert2   []*sql.Stmt
	PsInsert3   []*sql.Stmt
	PsDelete    []*sql.Stmt
	PsUpdate    []*sql.Stmt
	ColumnMapTo []string
}

func NewApplierTableItem

func NewApplierTableItem(parallelWorkers int) *ApplierTableItem

func (*ApplierTableItem) Reset

func (ait *ApplierTableItem) Reset()

type BigTxAck

type BigTxAck struct {
	GNO   int64
	Index int32
}

func (*BigTxAck) Marshal

func (d *BigTxAck) Marshal(buf []byte) ([]byte, error)

func (*BigTxAck) Size

func (d *BigTxAck) Size() (s uint64)

func (*BigTxAck) Unmarshal

func (d *BigTxAck) Unmarshal(buf []byte) (uint64, error)

type BufferStat

type BufferStat struct {
	BinlogEventQueueSize int
	ExtractorTxQueueSize int
	ApplierMsgQueueSize  int
	ApplierTxQueueSize   int
	SendByTimeout        int
	SendBySizeFull       int
}

type ColumnList

type ColumnList struct {
	Columns    []mysqlconfig.Column
	Ordinals   mysqlconfig.ColumnsMap
	UniqueKeys []*UniqueKey
}

ColumnList makes for a named list of columns

func NewColumnList

func NewColumnList(columns []mysqlconfig.Column) *ColumnList

NewColumnList creates an object given ordered list of column names

func ParseColumnList

func ParseColumnList(names string, tableColumns *ColumnList) *ColumnList

ParseColumnList parses a comma delimited list of column names

func (*ColumnList) ColumnList

func (c *ColumnList) ColumnList() []mysqlconfig.Column

func (*ColumnList) GetCharset

func (c *ColumnList) GetCharset(columnName string) string

func (*ColumnList) GetColumn

func (c *ColumnList) GetColumn(columnName string) *mysqlconfig.Column

TODO caller doesn't handle nil.

func (*ColumnList) GetColumnType

func (c *ColumnList) GetColumnType(columnName string) mysqlconfig.ColumnType

func (*ColumnList) HasTimezoneConversion

func (c *ColumnList) HasTimezoneConversion(columnName string) bool

func (*ColumnList) IsSubsetOf

func (c *ColumnList) IsSubsetOf(other *ColumnList) bool

IsSubsetOf returns 'true' when column names of this list are a subset of another list, in arbitrary order (order agnostic)

func (*ColumnList) IsUnsigned

func (c *ColumnList) IsUnsigned(columnName string) bool

func (*ColumnList) Len

func (c *ColumnList) Len() int

func (*ColumnList) Names

func (c *ColumnList) Names() []string

func (*ColumnList) SetCharset

func (c *ColumnList) SetCharset(columnName string, charset string)

func (*ColumnList) SetColumnType

func (c *ColumnList) SetColumnType(columnName string, columnType mysqlconfig.ColumnType)

func (*ColumnList) SetConvertDatetimeToTimestamp

func (c *ColumnList) SetConvertDatetimeToTimestamp(columnName string, toTimezone string)

func (*ColumnList) SetUnsigned

func (c *ColumnList) SetUnsigned(columnName string)

func (*ColumnList) String

func (c *ColumnList) String() string

type ControlMsg

type ControlMsg struct {
	Type int32
	Msg  string
}

func (*ControlMsg) Marshal

func (d *ControlMsg) Marshal(buf []byte) ([]byte, error)

func (*ControlMsg) Size

func (d *ControlMsg) Size() (s uint64)

func (*ControlMsg) Unmarshal

func (d *ControlMsg) Unmarshal(buf []byte) (uint64, error)

type CoordinatesI

type CoordinatesI interface {
	GetSid() interface{}
	GetSidStr() string
	GetGtidForThisTx() string
	GetLogPos() int64
	GetLastCommit() int64
	GetGNO() int64
	GetSequenceNumber() int64
	GetLogFile() string
}

type CurrentCoordinates

type CurrentCoordinates struct {
	// replayed (executed)
	File     string
	Position int64
	GtidSet  string

	// relayed (retrieved)
	RelayMasterLogFile string
	ReadMasterLogPos   int64
	RetrievedGtidSet   string
}

type DataEntries

type DataEntries struct {
	Entries []*DataEntry
}

func (*DataEntries) Marshal

func (d *DataEntries) Marshal(buf []byte) ([]byte, error)

func (*DataEntries) Size

func (d *DataEntries) Size() (s uint64)

func (*DataEntries) Unmarshal

func (d *DataEntries) Unmarshal(buf []byte) (uint64, error)

type DataEntry

type DataEntry struct {
	Coordinates CoordinatesI
	Events      []DataEvent
	Index       int32
	Final       bool
}

func NewBinlogEntry

func NewBinlogEntry() *DataEntry

func (*DataEntry) HasDDL

func (b *DataEntry) HasDDL() bool

func (*DataEntry) IsOneStmtDDL

func (b *DataEntry) IsOneStmtDDL() bool

func (*DataEntry) IsPartOfBigTx

func (b *DataEntry) IsPartOfBigTx() bool

func (*DataEntry) Marshal

func (d *DataEntry) Marshal(buf []byte) ([]byte, error)

func (*DataEntry) Size

func (d *DataEntry) Size() (s uint64)

func (*DataEntry) String

func (b *DataEntry) String() string

Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned

func (*DataEntry) Unmarshal

func (d *DataEntry) Unmarshal(buf []byte) (uint64, error)

type DataEvent

type DataEvent struct {
	Query         string
	CurrentSchema string
	DatabaseName  string
	TableName     string
	DML           int8
	ColumnCount   uint64
	Table         []byte
	LogPos        int64
	Timestamp     uint32
	Flags         []byte
	FKParent      bool
	Rows          [][]interface{}
	DtleFlags     uint32
}

func NewDataEvent

func NewDataEvent(databaseName, tableName string, dml int8, columnCount uint64, timestamp uint32) *DataEvent

func NewQueryEvent

func NewQueryEvent(currentSchema, query string, dml int8, timestamp uint32, flags []byte) DataEvent

func NewQueryEventAffectTable

func NewQueryEventAffectTable(currentSchema, query string, dml int8, affectedTable SchemaTable, timestamp uint32, flags []byte) DataEvent

func (*DataEvent) Marshal

func (d *DataEvent) Marshal(buf []byte) ([]byte, error)

func (*DataEvent) Size

func (d *DataEvent) Size() (s uint64)

func (*DataEvent) String

func (b *DataEvent) String() string

func (*DataEvent) Unmarshal

func (d *DataEvent) Unmarshal(buf []byte) (uint64, error)

type DataSource

type DataSource struct {
	TableSchema       string
	TableSchemaRegex  string
	TableSchemaRename string
	Tables            []*Table
}

TableName is the table configuration slave restrict replication to a given table

func (*DataSource) String

func (d *DataSource) String() string

type DelayCount

type DelayCount struct {
	Num  uint64
	Time int64 // it might be negative if hw clock is wrong
}

type DtleTaskConfig

type DtleTaskConfig struct {
	//Ref:http://dev.mysql.com/doc/refman/5.7/en/replication-options-slave.html#option_mysqld_replicate-do-table
	ReplicateDoDb        []*DataSource `codec:"ReplicateDoDb"`
	ReplicateIgnoreDb    []*DataSource `codec:"ReplicateIgnoreDb"`
	DropTableIfExists    bool          `codec:"DropTableIfExists"`
	ExpandSyntaxSupport  bool          `codec:"ExpandSyntaxSupport"`
	ReplChanBufferSize   int64         `codec:"ReplChanBufferSize"`
	TrafficAgainstLimits int           `codec:"TrafficAgainstLimits"`
	ChunkSize            int64         `codec:"ChunkSize"`
	SqlFilter            []string      `codec:"SqlFilter"`
	GroupMaxSize         int           `codec:"GroupMaxSize"`
	GroupTimeout         int           `codec:"GroupTimeout"`
	Gtid                 string        `codec:"Gtid"`
	BinlogFile           string        `codec:"BinlogFile"`
	BinlogPos            int64         `codec:"BinlogPos"`
	GtidStart            string        `codec:"GtidStart"`
	AutoGtid             bool          `codec:"AutoGtid"`
	BinlogRelay          bool          `codec:"BinlogRelay"`
	WaitOnJob            string        `codec:"WaitOnJob"`
	BulkInsert1          int           `codec:"BulkInsert1"`
	BulkInsert2          int           `codec:"BulkInsert2"`
	BulkInsert3          int           `codec:"BulkInsert3"`
	RetryTxLimit         int           `codec:"RetryTxLimit"`
	SlaveNetWriteTimeout int           `codec:"SlaveNetWriteTimeout"`
	BigTxSrcQueue        int32         `codec:"BigTxSrcQueue"`
	TwoWaySync           bool          `codec:"TwoWaySync"`
	TwoWaySyncGtid       string        `codec:"TwoWaySyncGtid"`

	ParallelWorkers       int  `codec:"ParallelWorkers"`
	DependencyHistorySize int  `codec:"DependencyHistorySize"`
	UseMySQLDependency    bool `codec:"UseMySQLDependency"`
	ForeignKeyChecks      bool `codec:"ForeignKeyChecks"`
	DumpEntryLimit        int  `codec:"DumpEntryLimit"`
	SetGtidNext           bool `codec:"SetGtidNext"`

	SkipCreateDbTable    bool                          `codec:"SkipCreateDbTable"`
	SkipPrivilegeCheck   bool                          `codec:"SkipPrivilegeCheck"`
	SkipIncrementalCopy  bool                          `codec:"SkipIncrementalCopy"`
	SrcConnectionConfig  *mysqlconfig.ConnectionConfig `codec:"SrcConnectionConfig"`
	DestConnectionConfig *mysqlconfig.ConnectionConfig `codec:"DestConnectionConfig"`
	KafkaConfig          *KafkaConfig                  `codec:"KafkaConfig"`
	DestType             string                        `codec:"DestType"`
	// support oracle extractor/applier
	SrcOracleConfig *config.OracleConfig `codec:"SrcOracleConfig"`
}

func (*DtleTaskConfig) SetDefaultForEmpty

func (d *DtleTaskConfig) SetDefaultForEmpty()

type DumpCoordinates

type DumpCoordinates interface {
	GetLogPos() int64
	GetTxSet() string
	GetLogFile() string
}

type DumpEntry

type DumpEntry struct {
	SystemVariables [][2]string
	SqlMode         string
	DbSQL           string
	TableName       string
	TableSchema     string
	TbSQL           []string
	ValuesX         [][]*[]byte
	TotalCount      int64
	Table           []byte
	ColumnMapTo     []string
}

func (*DumpEntry) Marshal

func (d *DumpEntry) Marshal(buf []byte) ([]byte, error)

func (*DumpEntry) Size

func (d *DumpEntry) Size() (s uint64)

func (*DumpEntry) Unmarshal

func (d *DumpEntry) Unmarshal(buf []byte) (uint64, error)

type DumpStatResult

type DumpStatResult struct {
	Coord      DumpCoordinates
	Type       int32
	TableSpecs []*TableSpec
}

func (*DumpStatResult) Marshal

func (d *DumpStatResult) Marshal(buf []byte) ([]byte, error)

func (*DumpStatResult) Size

func (d *DumpStatResult) Size() (s uint64)

func (*DumpStatResult) Unmarshal

func (d *DumpStatResult) Unmarshal(buf []byte) (uint64, error)

type Dumper

type Dumper struct {
	Ctx                context.Context
	Logger             g.LoggerType
	ChunkSize          int64
	TableSchema        string
	EscapedTableSchema string
	TableName          string
	EscapedTableName   string
	Table              *Table
	Iteration          int64
	Columns            string
	// ResultsChannel should be closed after writing all entries.
	ResultsChannel chan *DumpEntry
	// Set Err (if there is) before closing ResultsChannel.
	Err error

	ShutdownCh chan struct{}

	Memory *int64

	GetChunkData      GetChunkDataFn
	PrepareForDumping PrepareFn
	// contains filtered or unexported fields
}

func NewDumper

func NewDumper(ctx context.Context, table *Table, chunkSize int64, logger g.LoggerType, memory *int64) *Dumper

func (*Dumper) Close

func (d *Dumper) Close() error

func (*Dumper) Dump

func (d *Dumper) Dump() error

type EntryContext

type EntryContext struct {
	Entry *DataEntry
	// Only a DML has a tableItem. For a DDL, its tableItem is nil.
	TableItems   []*ApplierTableItem
	OriginalSize int // size of binlog entry
	Rows         int // for logging
}

type ExecContext

type ExecContext struct {
	Subject  string
	StateDir string
}

type GencodeType

type GencodeType interface {
	Marshal(buf []byte) ([]byte, error)
	Unmarshal(buf []byte) (uint64, error)
	Size() (s uint64)
}

type GetChunkDataFn

type GetChunkDataFn func() (nRows int64, err error)

type JobListItemV2

type JobListItemV2 struct {
	JobId            string            `json:"job_id"`
	JobStatus        string            `json:"job_status"`
	Topic            string            `json:"topic"`
	JobCreateTime    string            `json:"job_create_time"`
	SrcDatabaseType  string            `json:"src_database_type"`
	DstDatabaseType  string            `json:"dst_database_type"`
	SrcAddrList      []string          `json:"src_addr_list"`
	DstAddrList      []string          `json:"dst_addr_list"`
	User             string            `json:"user"`
	JobSteps         []JobStep         `json:"job_steps"`
	AllocationStatus map[string]string `json:"allocation_status"`
}

type JobStep

type JobStep struct {
	StepName      string  `json:"step_name"`
	StepStatus    string  `json:"step_status"`
	StepSchedule  float64 `json:"step_schedule"`
	JobCreateTime string  `json:"job_create_time"`
}

func NewJobStep

func NewJobStep(stepName string) JobStep

type KafkaConfig

type KafkaConfig struct {
	Brokers             []string
	Topic               string
	Converter           string
	DateTimeZone        string
	User                string
	Password            string
	MessageGroupMaxSize uint64
	MessageGroupTimeout uint64

	TopicWithSchemaTable bool
	SchemaChangeTopic    string
}

type MemoryStat

type MemoryStat struct {
	Full int64
	Incr int64
}

type MySQLCoordinateTx

type MySQLCoordinateTx struct {
	LogFile       string
	LogPos        int64
	SID           [16]byte
	GNO           int64
	LastCommitted int64
	SeqenceNumber int64
}

func (*MySQLCoordinateTx) GetGNO

func (b *MySQLCoordinateTx) GetGNO() int64

func (*MySQLCoordinateTx) GetGtidForThisTx

func (b *MySQLCoordinateTx) GetGtidForThisTx() string

func (*MySQLCoordinateTx) GetLastCommit

func (b *MySQLCoordinateTx) GetLastCommit() int64

func (*MySQLCoordinateTx) GetLogFile

func (b *MySQLCoordinateTx) GetLogFile() string

func (*MySQLCoordinateTx) GetLogPos

func (b *MySQLCoordinateTx) GetLogPos() int64

func (*MySQLCoordinateTx) GetSequenceNumber

func (b *MySQLCoordinateTx) GetSequenceNumber() int64

func (*MySQLCoordinateTx) GetSid

func (b *MySQLCoordinateTx) GetSid() interface{}

func (*MySQLCoordinateTx) GetSidStr

func (b *MySQLCoordinateTx) GetSidStr() string

Do not call this frequently. Cache your result.

func (*MySQLCoordinateTx) Marshal

func (d *MySQLCoordinateTx) Marshal(buf []byte) ([]byte, error)

func (*MySQLCoordinateTx) Size

func (d *MySQLCoordinateTx) Size() (s uint64)

func (*MySQLCoordinateTx) Unmarshal

func (d *MySQLCoordinateTx) Unmarshal(buf []byte) (uint64, error)

type MySQLCoordinates

type MySQLCoordinates struct {
	LogFile string
	LogPos  int64
	GtidSet string
}

func (*MySQLCoordinates) CompareFilePos

func (b *MySQLCoordinates) CompareFilePos(other *MySQLCoordinates) int

func (*MySQLCoordinates) GetLogFile

func (b *MySQLCoordinates) GetLogFile() string

func (*MySQLCoordinates) GetLogPos

func (b *MySQLCoordinates) GetLogPos() int64

func (*MySQLCoordinates) GetTxSet

func (b *MySQLCoordinates) GetTxSet() string

func (*MySQLCoordinates) IsEmpty

func (b *MySQLCoordinates) IsEmpty() bool

IsEmpty returns true if the log file is empty, unnamed

func (*MySQLCoordinates) Marshal

func (d *MySQLCoordinates) Marshal(buf []byte) ([]byte, error)

func (*MySQLCoordinates) Size

func (d *MySQLCoordinates) Size() (s uint64)

func (MySQLCoordinates) String

func (b MySQLCoordinates) String() string

String returns a user-friendly string representation of these coordinates

func (*MySQLCoordinates) Unmarshal

func (d *MySQLCoordinates) Unmarshal(buf []byte) (uint64, error)

type MySQLDriverConfig

type MySQLDriverConfig struct {
	DtleTaskConfig

	RowsEstimate     int64
	DeltaEstimate    int64
	BinlogRowImage   string
	RowCopyStartTime time.Time
	RowCopyEndTime   time.Time

	Stage string
}

func (*MySQLDriverConfig) ElapsedRowCopyTime

func (m *MySQLDriverConfig) ElapsedRowCopyTime() time.Duration

ElapsedRowCopyTime returns time since starting to copy chunks of rows

func (*MySQLDriverConfig) MarkRowCopyEndTime

func (m *MySQLDriverConfig) MarkRowCopyEndTime()

ElapsedRowCopyTime returns time since starting to copy chunks of rows

func (*MySQLDriverConfig) MarkRowCopyStartTime

func (m *MySQLDriverConfig) MarkRowCopyStartTime()

MarkRowCopyStartTime

type NatsMsgMerger

type NatsMsgMerger struct {
	// contains filtered or unexported fields
}

func NewNatsMsgMerger

func NewNatsMsgMerger(logger g.LoggerType) *NatsMsgMerger

func (*NatsMsgMerger) GetBytes

func (nmm *NatsMsgMerger) GetBytes() []byte

func (*NatsMsgMerger) Handle

func (nmm *NatsMsgMerger) Handle(data []byte) (segmentFinished bool, err error)

func (*NatsMsgMerger) Reset

func (nmm *NatsMsgMerger) Reset()

type OracleCoordinateTx

type OracleCoordinateTx struct {
	OldestUncommittedScn int64
	EndSCN               int64
}

func (*OracleCoordinateTx) GetGNO

func (o *OracleCoordinateTx) GetGNO() int64

func (*OracleCoordinateTx) GetGtidForThisTx

func (o *OracleCoordinateTx) GetGtidForThisTx() string

func (*OracleCoordinateTx) GetLastCommit

func (o *OracleCoordinateTx) GetLastCommit() int64

func (*OracleCoordinateTx) GetLogFile

func (o *OracleCoordinateTx) GetLogFile() string

func (*OracleCoordinateTx) GetLogPos

func (o *OracleCoordinateTx) GetLogPos() int64

func (*OracleCoordinateTx) GetSequenceNumber

func (b *OracleCoordinateTx) GetSequenceNumber() int64

func (*OracleCoordinateTx) GetSid

func (o *OracleCoordinateTx) GetSid() interface{}

func (*OracleCoordinateTx) GetSidStr

func (o *OracleCoordinateTx) GetSidStr() string

func (*OracleCoordinateTx) Marshal

func (d *OracleCoordinateTx) Marshal(buf []byte) ([]byte, error)

func (*OracleCoordinateTx) Size

func (d *OracleCoordinateTx) Size() (s uint64)

func (*OracleCoordinateTx) Unmarshal

func (d *OracleCoordinateTx) Unmarshal(buf []byte) (uint64, error)

type OracleCoordinates

type OracleCoordinates struct {
	LaststSCN int64
}

func (*OracleCoordinates) GetLogFile

func (b *OracleCoordinates) GetLogFile() string

func (*OracleCoordinates) GetLogPos

func (b *OracleCoordinates) GetLogPos() int64

func (*OracleCoordinates) GetTxSet

func (b *OracleCoordinates) GetTxSet() string

func (*OracleCoordinates) Marshal

func (d *OracleCoordinates) Marshal(buf []byte) ([]byte, error)

func (*OracleCoordinates) Size

func (d *OracleCoordinates) Size() (s uint64)

func (*OracleCoordinates) Unmarshal

func (d *OracleCoordinates) Unmarshal(buf []byte) (uint64, error)

type PrepareFn

type PrepareFn func() (err error)

type QueryCount

type QueryCount struct {
	ExtractedQueryCount *uint64
	AppliedQueryCount   *uint64
}

type QueryEventFlags

type QueryEventFlags struct {
	NoForeignKeyChecks bool

	// The query is converted to utf8 in dtle-src. Ignore charset/collation flags on dtle-dest.
	CharacterSetClient  string
	CollationConnection string
	CollationServer     string
}

func ParseQueryEventFlags

func ParseQueryEventFlags(bs []byte, logger g.LoggerType) (r QueryEventFlags, err error)

type Role

type Role struct {
	Tenant      string   `json:"tenant"`
	Name        string   `json:"name"`
	ObjectUsers []string `json:"object_users"`
	ObjectType  string   `json:"object_type"`
	Authority   string   `json:"authority"`
}

func NewDefaultRole

func NewDefaultRole(tenant string) *Role

type SchemaContext

type SchemaContext struct {
	TableSchema        string
	TableSchemaRename  string
	CreateSchemaString string

	TableMap map[string]*TableContext
}

func NewSchemaContext

func NewSchemaContext(name string) *SchemaContext

func (*SchemaContext) AddTable

func (sc *SchemaContext) AddTable(table *Table) (err error)

func (*SchemaContext) AddTables

func (sc *SchemaContext) AddTables(tables []*Table) (err error)

type SchemaTable

type SchemaTable struct {
	Schema string
	Table  string
}

type StoreManager

type StoreManager struct {
	// contains filtered or unexported fields
}

func NewStoreManager

func NewStoreManager(consulAddr []string, logger g.LoggerType) (*StoreManager, error)

func (*StoreManager) CheckJobExists

func (sm *StoreManager) CheckJobExists(jobId string) bool

func (*StoreManager) DeleteRole

func (sm *StoreManager) DeleteRole(tenant, name string) error

func (*StoreManager) DeleteUser

func (sm *StoreManager) DeleteUser(tenant, user string) error

func (*StoreManager) DestroyJob

func (sm *StoreManager) DestroyJob(jobId string) error

func (*StoreManager) DstPutNats

func (sm *StoreManager) DstPutNats(jobName string, natsAddr string, stopCh chan struct{}, onWatchError func(error)) error

func (*StoreManager) FindJobList

func (sm *StoreManager) FindJobList() (map[string]*JobListItemV2, error)

func (*StoreManager) FindRoleList

func (sm *StoreManager) FindRoleList(tenant string) ([]*Role, error)

func (*StoreManager) FindTenantList

func (sm *StoreManager) FindTenantList() (tenants []string, err error)

func (*StoreManager) FindUserList

func (sm *StoreManager) FindUserList(userKey string) ([]*User, error)

func (*StoreManager) GetBinlogFilePosForJob

func (sm *StoreManager) GetBinlogFilePosForJob(jobName string) (*mysql.Position, error)

func (*StoreManager) GetConfig

func (sm *StoreManager) GetConfig(jobName string) (*MySQLDriverConfig, error)

func (*StoreManager) GetDumpProgress

func (sm *StoreManager) GetDumpProgress(jobName string) (int64, int64, error)

return: ExecRowCount, TotalRowCount

func (*StoreManager) GetGtidForJob

func (sm *StoreManager) GetGtidForJob(jobName string) (string, error)

func (*StoreManager) GetJobInfo

func (sm *StoreManager) GetJobInfo(jobId string) (*JobListItemV2, error)

func (*StoreManager) GetJobStage

func (sm *StoreManager) GetJobStage(jobName string) (string, error)

func (*StoreManager) GetJobStatus

func (sm *StoreManager) GetJobStatus(jobId string) (string, error)

func (*StoreManager) GetNatsIfExist

func (sm *StoreManager) GetNatsIfExist(jobName string) (string, bool, error)

func (*StoreManager) GetOracleSCNPosForJob

func (sm *StoreManager) GetOracleSCNPosForJob(jobName string) (oldestUncommittedScn int64, committedSCN int64, err error)

func (*StoreManager) GetRole

func (sm *StoreManager) GetRole(tenant, name string) (*Role, bool, error)

func (*StoreManager) GetTargetGtid

func (sm *StoreManager) GetTargetGtid(subject string) (string, error)

func (*StoreManager) GetUser

func (sm *StoreManager) GetUser(tenant, username string) (*User, bool, error)

func (*StoreManager) PutConfig

func (sm *StoreManager) PutConfig(subject string, config *MySQLDriverConfig) error

func (*StoreManager) PutDumpProgress

func (sm *StoreManager) PutDumpProgress(jobName string, exec int64, total int64) error

func (*StoreManager) PutJobStage

func (sm *StoreManager) PutJobStage(jobName string, stage string) error

func (*StoreManager) PutTargetGtid

func (sm *StoreManager) PutTargetGtid(subject string, value string) error

func (*StoreManager) SaveBinlogFilePosForJob

func (sm *StoreManager) SaveBinlogFilePosForJob(jobName string, file string, pos int) error

func (*StoreManager) SaveGtidForJob

func (sm *StoreManager) SaveGtidForJob(jobName string, gtid string) error

func (*StoreManager) SaveJobInfo

func (sm *StoreManager) SaveJobInfo(job JobListItemV2) error

func (*StoreManager) SaveOracleSCNPos

func (sm *StoreManager) SaveOracleSCNPos(jobName string, oldestUncommittedScn, committedSCN int64) error

func (*StoreManager) SaveRole

func (sm *StoreManager) SaveRole(role *Role) error

func (*StoreManager) SaveUser

func (sm *StoreManager) SaveUser(user *User) error

func (*StoreManager) SrcWatchNats

func (sm *StoreManager) SrcWatchNats(jobName string, stopCh chan struct{},
	onWatchError func(error)) (natsAddr string, err error)

func (*StoreManager) WaitKv

func (sm *StoreManager) WaitKv(subject string, key string, stopCh chan struct{}) ([]byte, error)

func (*StoreManager) WaitOnJob

func (sm *StoreManager) WaitOnJob(currentJob string, waitJob string, stopCh chan struct{}) error

func (*StoreManager) WatchTargetGtid

func (sm *StoreManager) WatchTargetGtid(subject string, stopCh chan struct{}) (string, error)

func (*StoreManager) WatchTree

func (sm *StoreManager) WatchTree(dir string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error)

type Table

type Table struct {
	TableName         string
	TableRegex        string
	TableRename       string
	TableSchema       string // not user assigned
	TableSchemaRename string // not user assigned
	Counter           int64
	ColumnMapFrom     []string
	ColumnMapTo       []string

	OriginalTableColumns *ColumnList
	UseUniqueKey         *UniqueKey
	ColumnMap            []int

	TableType string

	Where string // Call GetWhere() instead of directly accessing.
}

func DecodeMaybeTable

func DecodeMaybeTable(data []byte) (*Table, error)

func NewTable

func NewTable(schemaName string, tableName string) *Table

func (*Table) GetWhere

func (t *Table) GetWhere() string

type TableContext

type TableContext struct {
	Table          *Table
	WhereCtx       *WhereContext
	DefChangedSent bool
	FKChildren     map[SchemaTable]struct{}
}

func NewTableContext

func NewTableContext(table *Table) (*TableContext, error)

func (*TableContext) WhereTrue

func (t *TableContext) WhereTrue(row []interface{}) (bool, error)

type TableSpec

type TableSpec struct {
	Schema      string
	Table       string
	ColumnMapTo []string
}

func (*TableSpec) Marshal

func (d *TableSpec) Marshal(buf []byte) ([]byte, error)

func (*TableSpec) Size

func (d *TableSpec) Size() (s uint64)

func (*TableSpec) Unmarshal

func (d *TableSpec) Unmarshal(buf []byte) (uint64, error)

type TableStats

type TableStats struct {
	InsertCount int64
	UpdateCount int64
	DelCount    int64
}

type TaskStatistics

type TaskStatistics struct {
	CurrentCoordinates *CurrentCoordinates
	TableStats         *TableStats
	DelayCount         *DelayCount
	ProgressPct        string
	ExecMasterRowCount int64
	ExecMasterTxCount  int64
	ReadMasterRowCount int64
	ReadMasterTxCount  int64
	ETA                string
	Backlog            string
	ThroughputStat     *ThroughputStat
	MsgStat            gonats.Statistics
	BufferStat         BufferStat
	Stage              string
	Timestamp          int64
	MemoryStat         MemoryStat
	HandledTxCount     TxCount
	HandledQueryCount  QueryCount
}

type ThroughputStat

type ThroughputStat struct {
	Num  uint64
	Time uint64
}

type TxCount

type TxCount struct {
	ExtractedTxCount *uint32
	AppliedTxCount   *uint32
}

type UniqueKey

type UniqueKey struct {
	Name            string
	Columns         ColumnList
	HasNullable     bool
	IsAutoIncrement bool
	LastMaxVals     []string
}

UniqueKey is the combination of a key's name and columns

func (*UniqueKey) IsPrimary

func (c *UniqueKey) IsPrimary() bool

IsPrimary checks if this unique key is primary

func (*UniqueKey) Len

func (c *UniqueKey) Len() int

func (*UniqueKey) String

func (c *UniqueKey) String() string

type User

type User struct {
	Username   string `json:"username"`
	Tenant     string `json:"tenant"`
	Role       string `json:"role"`
	Password   string `json:"password"`
	CreateTime string `json:"create_time"`
	Remark     string `json:"remark"`
}

type WhereContext

type WhereContext struct {
	Where     string
	Ast       expr.Node
	FieldsMap map[string]int
	IsDefault bool // is 'true'
}

func NewWhereCtx

func NewWhereCtx(where string, table *Table) (*WhereContext, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL