Documentation
¶
Overview ¶
Copyright © 2020 Marvin
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright © 2020 Marvin ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright © 2020 Marvin ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright © 2020 Marvin ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright © 2020 Marvin ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright © 2020 Marvin ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright © 2020 Marvin ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright © 2020 Marvin ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright © 2020 Marvin ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func GetTableSortedColumnChangedEvent(cols []*ConvColumn) ([]string, []string, map[string]string, map[string]interface{})
- func InspectTiDBConsumeTask(taskName, taskFlow, taskMode string, databaseS database.IDatabase) error
- func QuoteSchemaTable(schema string, table string) string
- type BatchDecoder
- func (b *BatchDecoder) AddKeyValue(key, value []byte) error
- func (b *BatchDecoder) HasNext() (MsgEventType, bool, error)
- func (b *BatchDecoder) NextDDLEvent() (*DDLChangedEvent, error)
- func (b *BatchDecoder) NextResolvedEvent() (uint64, error)
- func (b *BatchDecoder) NextRowChangedEvent() (*RowChangedEvent, error)
- type Column
- type ColumnFlagType
- func (b ColumnFlagType) IsBinary() bool
- func (b ColumnFlagType) IsGeneratedColumn() bool
- func (b ColumnFlagType) IsHandleKey() bool
- func (b ColumnFlagType) IsMultipleKey() bool
- func (b ColumnFlagType) IsNullable() bool
- func (b ColumnFlagType) IsPrimaryKey() bool
- func (b ColumnFlagType) IsUniqueKey() bool
- func (b ColumnFlagType) IsUnsigned() bool
- type ColumnType
- type Consumer
- type ConsumerGroup
- func (cg *ConsumerGroup) CheckObsoleteMessages(currentTs, nowTs uint64) bool
- func (cg *ConsumerGroup) Close() error
- func (cg *ConsumerGroup) CommitMessage(ctx context.Context, msg kafka.Message) error
- func (cg *ConsumerGroup) Consume(c *Consumer)
- func (cg *ConsumerGroup) Pause(partition int) error
- func (cg *ConsumerGroup) Resume(partition int) error
- func (cg *ConsumerGroup) Run() error
- func (cg *ConsumerGroup) Start(ctx context.Context, partition int) error
- func (cg *ConsumerGroup) Stop(partition int) error
- func (cg *ConsumerGroup) StopAll() error
- func (cg *ConsumerGroup) WriteMessage(c *Consumer, msg kafka.Message) (bool, error)
- type ConvColumn
- type DDLChangedEvent
- type DDLChangedEvents
- type DDLType
- type DdlCoordinator
- func (d *DdlCoordinator) Append(key *DDLChangedEvent, value int)
- func (d *DdlCoordinator) Get(key *DDLChangedEvent) []int
- func (d *DdlCoordinator) GetFrontDDL() *DDLChangedEvent
- func (d *DdlCoordinator) IsDDLFlush(key *DDLChangedEvent) bool
- func (d *DdlCoordinator) IsResolvedFlush(resolvedTs uint64) bool
- func (d *DdlCoordinator) Len(key *DDLChangedEvent) int
- func (d *DdlCoordinator) PopDDL()
- func (d *DdlCoordinator) Remove(key *DDLChangedEvent) error
- type EventGroup
- type Flag
- type MessageDDLEventValue
- type MessageEventKey
- type MessageRowEventValue
- type MsgEventType
- type RowChangedEvent
- type RowEventDecoder
Constants ¶
const BatchVersion uint64 = 1
BatchVersion represents the version of ticdc batch key-value format
Variables ¶
var ColumnTypeMap = map[ColumnType]string{ TypeUnspecified: "NONE", TypeTinyint: "TINYINT", TypeBool: "BOOL", TypeSmallint: "SMALLINT", TypeInt: "INT", TypeFloat: "FLOAT", TypeDouble: "DOUBLE", TypeNull: "NULL", TypeTimestamp: "TIMESTAMP", TypeBigint: "BIGINT", TypeMediumint: "MEDIUMINT", TypeDate: "DATE", TypeTime: "TIME", TypeDatetime: "DATETIME", TypeYear: "YEAR", TypeNewDate: "DATE", TypeVarchar: "VARCHAR", TypeBit: "BIT", TypeJSON: "JSON", TypeDecimal: "DECIMAL", TypeEnum: "ENUM", TypeSet: "SET", TypeTinyBlob: "TINYBLOB", TypeMediumBlob: "MEDIUMBLOB", TypeLongBlob: "LONGBLOB", TypeBlob: "BLOB", TypeVarbinary: "VARBINARY", TypeChar: "CHAR", TypeBinary: "Binary", TypeGeometry: "GEOMETRY", TypeTiDBVectorFloat32: "VECTOR", }
var DDLTypeMap = map[DDLType]string{ DDLCreateSchema: "create schema", DDLDropSchema: "drop schema", DDLCreateTable: "create table", DDLCreateTables: "create tables", DDLDropTable: "drop table", DDLAddColumn: "add column", DDLDropColumn: "drop column", DDLAddIndex: "add index", DDLDropIndex: "drop index", DDLAddForeignKey: "add foreign key", DDLDropForeignKey: "drop foreign key", DDLTruncateTable: "truncate table", DDLModifyColumn: "modify column", DDLRebaseAutoID: "rebase auto_increment ID", DDLRenameTable: "rename table", DDLRenameTables: "rename tables", DDLSetDefaultValue: "set default value", DDLShardRowID: "shard row ID", DDLModifyTableComment: "modify table comment", DDLRenameIndex: "rename index", DDLAddTablePartition: "add partition", DDLDropTablePartition: "drop partition", DDLCreateView: "create view", DDLModifyTableCharsetAndCollate: "modify table charset and collate", DDLTruncateTablePartition: "truncate partition", DDLDropView: "drop view", DDLRecoverTable: "recover table", DDLModifySchemaCharsetAndCollate: "modify schema charset and collate", DDLLockTable: "lock table", DDLUnlockTable: "unlock table", DDLRepairTable: "repair table", DDLSetTiFlashReplica: "set tiflash replica", DDLUpdateTiFlashReplicaStatus: "update tiflash replica status", DDLAddPrimaryKey: "add primary key", DDLDropPrimaryKey: "drop primary key", DDLCreateSequence: "create sequence", DDLAlterSequence: "alter sequence", DDLDropSequence: "drop sequence", DDLModifyTableAutoIDCache: "modify auto id cache", DDLRebaseAutoRandomBase: "rebase auto_random ID", DDLAlterIndexVisibility: "alter index visibility", DDLExchangeTablePartition: "exchange partition", DDLAddCheckConstraint: "add check constraint", DDLDropCheckConstraint: "drop check constraint", DDLAlterCheckConstraint: "alter check constraint", DDLAlterTableAttributes: "alter table attributes", DDLAlterTablePartitionPlacement: "alter table partition placement", DDLAlterTablePartitionAttributes: "alter table partition attributes", 
DDLCreatePlacementPolicy: "create placement policy", DDLAlterPlacementPolicy: "alter placement policy", DDLDropPlacementPolicy: "drop placement policy", DDLModifySchemaDefaultPlacement: "modify schema default placement", DDLAlterTablePlacement: "alter table placement", DDLAlterCacheTable: "alter table cache", DDLAlterNoCacheTable: "alter table nocache", DDLAlterTableStatsOptions: "alter table statistics options", DDLMultiSchemaChange: "alter table multi-schema change", DDLFlashbackCluster: "flashback cluster", DDLRecoverSchema: "flashback schema", DDLReorganizePartition: "alter table reorganize partition", DDLAlterTTLInfo: "alter table ttl", DDLAlterTTLRemove: "alter table no_ttl", DDLCreateResourceGroup: "create resource group", DDLAlterResourceGroup: "alter resource group", DDLDropResourceGroup: "drop resource group", DDLAlterTablePartitioning: "alter table partition by", DDLRemovePartitioning: "alter table remove partitioning", DDLAddVectorIndex: "add vector index", // contains filtered or unexported fields }
DDLTypeMap is the map of DDLType to string.
Functions ¶
func InspectTiDBConsumeTask ¶
func InspectTiDBConsumeTask(taskName, taskFlow, taskMode string, databaseS database.IDatabase) error
Compatibility of TiDB TiCDC update events with the index-value dispatcher (see https://docs.pingcap.com/zh/tidb/dev/ticdc-split-update-behavior): for the v6.5 series [>=v6.5.5], TiDB versions from v6.5.5 up to but not including v7.0.0 are supported normally; for v7 and later [>=v7.1.2], all TiDB versions from v7.1.2 onward are supported normally.
func QuoteSchemaTable ¶
QuoteSchemaTable quotes a table name
Types ¶
type BatchDecoder ¶
type BatchDecoder struct {
// contains filtered or unexported fields
}
Solved Case:
key [A{"ts":454376015366979596,"scm":"marvin","tbl":"t1","rid":1,"t":1}A{"ts":454376015366979596,"scm":"marvin","tbl":"t1","rid":3,"t":1}]
values [I{"u":{"id":{"t":3,"h":true,"f":11,"v":1},"val":{"t":15,"f":64,"v":"aa"}}}I{"u":{"id":{"t":3,"h":true,"f":11,"v":3},"val":{"t":15,"f":64,"v":"cc"}}}]
BatchDecoder decodes the bytes of a batch into the original messages.
func (*BatchDecoder) AddKeyValue ¶
func (b *BatchDecoder) AddKeyValue(key, value []byte) error
AddKeyValue implements the RowEventDecoder interface
func (*BatchDecoder) HasNext ¶
func (b *BatchDecoder) HasNext() (MsgEventType, bool, error)
HasNext implements the RowEventDecoder interface
func (*BatchDecoder) NextDDLEvent ¶
func (b *BatchDecoder) NextDDLEvent() (*DDLChangedEvent, error)
NextDDLEvent implements the RowEventDecoder interface
func (*BatchDecoder) NextResolvedEvent ¶
func (b *BatchDecoder) NextResolvedEvent() (uint64, error)
NextResolvedEvent implements the RowEventDecoder interface
func (*BatchDecoder) NextRowChangedEvent ¶
func (b *BatchDecoder) NextRowChangedEvent() (*RowChangedEvent, error)
NextRowChangedEvent implements the RowEventDecoder interface
type Column ¶
type Column struct {
ColumnType ColumnType `json:"t"`
WhereHandle bool `json:"h,omitempty"`
ColumnFlag ColumnFlagType `json:"f"`
ColumnValue interface{} `json:"v"`
}
func FormatColumn ¶
FormatColumn formats a codec column.
func (*Column) ConvRowChangeColumn ¶
func (c *Column) ConvRowChangeColumn(name string) (*ConvColumn, error)
ConvRowChangeColumn converts from a codec column to a row changed event column.
type ColumnFlagType ¶
type ColumnFlagType Flag
ColumnFlagType is for encapsulating the flag operations for different flags.
const ( // BinaryFlag means the column charset is binary BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) // HandleKeyFlag means the column is selected as the handle key // The handleKey is chosen by the following rules in the order: // 1. if the table has primary key, it's the handle key. // 2. If the table has not null unique key, it's the handle key. // 3. If the table has no primary key and no not null unique key, it has no handleKey. HandleKeyFlag // GeneratedColumnFlag means the column is a generated column GeneratedColumnFlag // PrimaryKeyFlag means the column is primary key PrimaryKeyFlag // UniqueKeyFlag means the column is unique key UniqueKeyFlag // MultipleKeyFlag means the column is multiple key MultipleKeyFlag // NullableFlag means the column is nullable NullableFlag // UnsignedFlag means the column stores an unsigned integer UnsignedFlag )
func (ColumnFlagType) IsBinary ¶
func (b ColumnFlagType) IsBinary() bool
IsBinary shows whether BinaryFlag is set
func (ColumnFlagType) IsGeneratedColumn ¶
func (b ColumnFlagType) IsGeneratedColumn() bool
IsGeneratedColumn shows whether GeneratedColumnFlag is set
func (ColumnFlagType) IsHandleKey ¶
func (b ColumnFlagType) IsHandleKey() bool
IsHandleKey shows whether HandleKeyFlag is set
func (ColumnFlagType) IsMultipleKey ¶
func (b ColumnFlagType) IsMultipleKey() bool
IsMultipleKey shows whether MultipleKeyFlag is set
func (ColumnFlagType) IsNullable ¶
func (b ColumnFlagType) IsNullable() bool
IsNullable shows whether NullableFlag is set
func (ColumnFlagType) IsPrimaryKey ¶
func (b ColumnFlagType) IsPrimaryKey() bool
IsPrimaryKey shows whether PrimaryKeyFlag is set
func (ColumnFlagType) IsUniqueKey ¶
func (b ColumnFlagType) IsUniqueKey() bool
IsUniqueKey shows whether UniqueKeyFlag is set
func (ColumnFlagType) IsUnsigned ¶
func (b ColumnFlagType) IsUnsigned() bool
IsUnsigned shows whether UnsignedFlag is set
type ColumnType ¶
type ColumnType uint64
const ( TypeUnspecified ColumnType = 0 TypeTinyint ColumnType = 1 // TINYINT / BOOL TypeSmallint ColumnType = 2 // SMALLINT TypeInt ColumnType = 3 // INT TypeFloat ColumnType = 4 TypeDouble ColumnType = 5 TypeNull ColumnType = 6 TypeTimestamp ColumnType = 7 TypeBigint ColumnType = 8 // BIGINT TypeMediumint ColumnType = 9 // MEDIUMINT TypeDate ColumnType = 10 // Date -> 10/14 TypeTime ColumnType = 11 TypeDatetime ColumnType = 12 TypeYear ColumnType = 13 TypeNewDate ColumnType = 14 TypeVarchar ColumnType = 15 TypeBit ColumnType = 16 TypeJSON ColumnType = 245 TypeDecimal ColumnType = 246 TypeEnum ColumnType = 247 TypeSet ColumnType = 248 // value 编码为 Base64 TypeTinyBlob ColumnType = 249 // TINYTEXT/TINYBLOB -> 249 TypeMediumBlob ColumnType = 250 // MEDIUMTEXT/MEDIUMBLOB -> 250 TypeLongBlob ColumnType = 251 // LONGTEXT/LONGBLOB -> 251 TypeBlob ColumnType = 252 // TEXT/BLOB -> 252 TypeVarbinary ColumnType = 253 // value 编码为 UTF-8;当上游类型为 BINARY 时,将对不可见的字符转义 TypeChar ColumnType = 254 // CHAR/BINARY /// not support TypeGeometry ColumnType = 255 TypeTiDBVectorFloat32 ColumnType = 256 // 处理同个 ColumnType 代表不同数据类型 Case TypeBool ColumnType = 1001 TypeBinary ColumnType = 1002 )
func (ColumnType) ColumnType ¶
func (c ColumnType) ColumnType() string
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup( ctx context.Context, task *task.Task, schemaRoute *rule.SchemaRouteRule, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, consumeTables []string, param *pb.CdcConsumeParam, partitions []int, dbTypeT string, database database.IDatabase) *ConsumerGroup
func (*ConsumerGroup) CheckObsoleteMessages ¶
func (cg *ConsumerGroup) CheckObsoleteMessages(currentTs, nowTs uint64) bool
CheckObsoleteMessages applies in two situations: 1. At initial startup, it filters synchronously consumed events. 2. During operation, after the corresponding partition's DDL/ResolvedTs event is refreshed: CDC guarantees that all events before the DDL/ResolvedTs event have already been sent, so there should be no more events with a Ts smaller than that of the DDL/ResolvedTs event.
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
func (*ConsumerGroup) CommitMessage ¶
func (*ConsumerGroup) Consume ¶
func (cg *ConsumerGroup) Consume(c *Consumer)
Consume will read message from Kafka.
func (*ConsumerGroup) Pause ¶
func (cg *ConsumerGroup) Pause(partition int) error
func (*ConsumerGroup) Resume ¶
func (cg *ConsumerGroup) Resume(partition int) error
func (*ConsumerGroup) Run ¶
func (cg *ConsumerGroup) Run() error
func (*ConsumerGroup) Start ¶
func (cg *ConsumerGroup) Start(ctx context.Context, partition int) error
func (*ConsumerGroup) Stop ¶
func (cg *ConsumerGroup) Stop(partition int) error
func (*ConsumerGroup) StopAll ¶
func (cg *ConsumerGroup) StopAll() error
func (*ConsumerGroup) WriteMessage ¶
WriteMessage is to decode kafka message to event.
type ConvColumn ¶
type ConvColumn struct {
ColumnName string `json:"columnName"`
ColumnType string `json:"columnType"`
ColumnFlag ColumnFlagType `json:"columnFlag"`
ColumnValue interface{} `json:"columnValue"`
}
type DDLChangedEvent ¶
type DDLChangedEvent struct {
CommitTs uint64 `json:"commitTs"`
SchemaName string `json:"schemaName"`
TableName string `json:"tableName"`
DdlQuery string `json:"ddlQuery"`
DdlType DDLType `json:"ddlType"`
}
func MsgConvDDLEvent ¶
func MsgConvDDLEvent(key *MessageEventKey, value *MessageDDLEventValue) *DDLChangedEvent
func (*DDLChangedEvent) String ¶
func (d *DDLChangedEvent) String() string
type DDLChangedEvents ¶
type DDLChangedEvents []*DDLChangedEvent
DDLChangedEvents is a slice of DDLChangedEvent and implements the sort.Interface interface.
func (*DDLChangedEvents) Add ¶
func (d *DDLChangedEvents) Add(event *DDLChangedEvent)
Add adds a new DDLChangedEvent and keeps the DDLs sorted by CommitTs.
func (DDLChangedEvents) Len ¶
func (d DDLChangedEvents) Len() int
func (DDLChangedEvents) Less ¶
func (d DDLChangedEvents) Less(i, j int) bool
func (DDLChangedEvents) Swap ¶
func (d DDLChangedEvents) Swap(i, j int)
type DDLType ¶
type DDLType int
DDLType is the type for DDL actions.
const ( DDLNone DDLType = 0 DDLCreateSchema DDLType = 1 DDLDropSchema DDLType = 2 DDLCreateTable DDLType = 3 DDLDropTable DDLType = 4 DDLAddColumn DDLType = 5 DDLDropColumn DDLType = 6 DDLAddIndex DDLType = 7 DDLDropIndex DDLType = 8 DDLAddForeignKey DDLType = 9 DDLDropForeignKey DDLType = 10 DDLTruncateTable DDLType = 11 DDLModifyColumn DDLType = 12 DDLRebaseAutoID DDLType = 13 DDLRenameTable DDLType = 14 DDLSetDefaultValue DDLType = 15 DDLShardRowID DDLType = 16 DDLModifyTableComment DDLType = 17 DDLRenameIndex DDLType = 18 DDLAddTablePartition DDLType = 19 DDLDropTablePartition DDLType = 20 DDLCreateView DDLType = 21 DDLModifyTableCharsetAndCollate DDLType = 22 DDLTruncateTablePartition DDLType = 23 DDLDropView DDLType = 24 DDLRecoverTable DDLType = 25 DDLModifySchemaCharsetAndCollate DDLType = 26 DDLLockTable DDLType = 27 DDLUnlockTable DDLType = 28 DDLRepairTable DDLType = 29 DDLSetTiFlashReplica DDLType = 30 DDLUpdateTiFlashReplicaStatus DDLType = 31 DDLAddPrimaryKey DDLType = 32 DDLDropPrimaryKey DDLType = 33 DDLCreateSequence DDLType = 34 DDLAlterSequence DDLType = 35 DDLDropSequence DDLType = 36 DDLAddColumns DDLType = 37 // Deprecated, we use DDLMultiSchemaChange instead. DDLDropColumns DDLType = 38 // Deprecated, we use DDLMultiSchemaChange instead. 
DDLModifyTableAutoIDCache DDLType = 39 DDLRebaseAutoRandomBase DDLType = 40 DDLAlterIndexVisibility DDLType = 41 DDLExchangeTablePartition DDLType = 42 DDLAddCheckConstraint DDLType = 43 DDLDropCheckConstraint DDLType = 44 DDLAlterCheckConstraint DDLType = 45 DDLRenameTables DDLType = 47 DDLAlterTableAttributes DDLType = 49 DDLAlterTablePartitionAttributes DDLType = 50 DDLCreatePlacementPolicy DDLType = 51 DDLAlterPlacementPolicy DDLType = 52 DDLDropPlacementPolicy DDLType = 53 DDLAlterTablePartitionPlacement DDLType = 54 DDLModifySchemaDefaultPlacement DDLType = 55 DDLAlterTablePlacement DDLType = 56 DDLAlterCacheTable DDLType = 57 // not used DDLAlterTableStatsOptions DDLType = 58 DDLAlterNoCacheTable DDLType = 59 DDLCreateTables DDLType = 60 DDLMultiSchemaChange DDLType = 61 DDLFlashbackCluster DDLType = 62 DDLRecoverSchema DDLType = 63 DDLReorganizePartition DDLType = 64 DDLAlterTTLInfo DDLType = 65 DDLAlterTTLRemove DDLType = 67 DDLCreateResourceGroup DDLType = 68 DDLAlterResourceGroup DDLType = 69 DDLDropResourceGroup DDLType = 70 DDLAlterTablePartitioning DDLType = 71 DDLRemovePartitioning DDLType = 72 DDLAddVectorIndex DDLType = 73 )
List DDL actions.
type DdlCoordinator ¶
type DdlCoordinator struct {
// ticdc open protocol, DDL message events will be distributed to all partitions
// That is, if any partition receives the DDL Event, all other partitions need to receive it, indicating that the DDL event is complete
CoordNums int `json:"coordNums"`
// ddl coordinator
// ddl -> partitions
// 1. Determine whether the ddl event is received completely, len([]int32) == coordNums
// 2. Receive the complete ddl event and append the ddl task queue
Coordinators map[string][]int `json:"Coordinators"`
// store ddl max commitTs information
DdlWithMaxCommitTs *DDLChangedEvent `json:"ddlWithMaxCommits"`
// ddl task queue, stores ddl events in order of commitTs
Ddls DDLChangedEvents `json:"Ddls"`
// contains filtered or unexported fields
}
func NewDdlCoordinator ¶
func NewDdlCoordinator(coords int) *DdlCoordinator
func (*DdlCoordinator) Append ¶
func (d *DdlCoordinator) Append(key *DDLChangedEvent, value int)
func (*DdlCoordinator) Get ¶
func (d *DdlCoordinator) Get(key *DDLChangedEvent) []int
func (*DdlCoordinator) GetFrontDDL ¶
func (d *DdlCoordinator) GetFrontDDL() *DDLChangedEvent
func (*DdlCoordinator) IsDDLFlush ¶
func (d *DdlCoordinator) IsDDLFlush(key *DDLChangedEvent) bool
func (*DdlCoordinator) IsResolvedFlush ¶
func (d *DdlCoordinator) IsResolvedFlush(resolvedTs uint64) bool
func (*DdlCoordinator) Len ¶
func (d *DdlCoordinator) Len(key *DDLChangedEvent) int
func (*DdlCoordinator) PopDDL ¶
func (d *DdlCoordinator) PopDDL()
func (*DdlCoordinator) Remove ¶
func (d *DdlCoordinator) Remove(key *DDLChangedEvent) error
type EventGroup ¶
type EventGroup struct {
// contains filtered or unexported fields
}
EventGroup stores changed DDL and DML event messages.
func (*EventGroup) Append ¶
func (g *EventGroup) Append(e *RowChangedEvent)
Append will append an event to event groups.
func (*EventGroup) DDLCommitTs ¶
func (g *EventGroup) DDLCommitTs(ddlCommitTs uint64) []*RowChangedEvent
DDLCommitTs returns all events strictly < ddlCommitTs
func (*EventGroup) ResolvedTs ¶
func (g *EventGroup) ResolvedTs(resolveTs uint64) []*RowChangedEvent
ResolvedTs will get events whose CommitTs is less than or equal to resolveTs, and at the same time remove events whose CommitTs is less than or equal to resolveTs from the original queue
type MessageDDLEventValue ¶
func (*MessageDDLEventValue) Decode ¶
func (m *MessageDDLEventValue) Decode(data []byte) error
func (*MessageDDLEventValue) Encode ¶
func (m *MessageDDLEventValue) Encode() ([]byte, error)
type MessageEventKey ¶
type MessageEventKey struct {
CommitTs uint64 `json:"ts"`
SchemaName string `json:"scm,omitempty"`
TableName string `json:"tbl,omitempty"`
RowID int64 `json:"rid,omitempty"`
MsgEventType MsgEventType `json:"t"`
}
func (*MessageEventKey) Decode ¶
func (m *MessageEventKey) Decode(data []byte) error
Decode decodes a message key from a byte slice.
func (*MessageEventKey) Encode ¶
func (m *MessageEventKey) Encode() ([]byte, error)
Encode encodes the message key to a byte slice.
type MessageRowEventValue ¶
type MessageRowEventValue struct {
// Update events are uniformly split into Delete/Insert; Before corresponds to Delete.
// If a before value exists, Upsert corresponds to an Update event; otherwise Upsert corresponds to an Insert event.
Upsert map[string]Column `json:"u,omitempty"`
Before map[string]Column `json:"p,omitempty"`
Delete map[string]Column `json:"d,omitempty"`
}
func (*MessageRowEventValue) Decode ¶
func (m *MessageRowEventValue) Decode(data []byte) error
func (*MessageRowEventValue) Encode ¶
func (m *MessageRowEventValue) Encode() ([]byte, error)
type MsgEventType ¶
type MsgEventType int
MsgEventType is the type of message, which is used by MqSink and RedoLog.
const ( // MsgEventTypeUnknown is unknown type of message key MsgEventTypeUnknown MsgEventType = iota // MsgEventTypeRow is row type of message key MsgEventTypeRow // MsgEventTypeDDL is ddl type of message key MsgEventTypeDDL // MsgEventTypeResolved is resolved type of message key MsgEventTypeResolved )
type RowChangedEvent ¶
type RowChangedEvent struct {
SchemaName string `json:"schemaName"`
TableName string `json:"tableName"`
QueryType string `json:"queryType"`
CommitTs uint64 `json:"commitTS"`
IsDDL bool `json:"isDDL"`
DdlQuery string `json:"ddlQuery"`
// The table synchronized by TiCDC needs to have at least one valid index. The definition of a valid index is as follows:
// 1, the primary key (PRIMARY KEY) is a valid index.
// 2, each column in the unique index (UNIQUE INDEX) is explicitly defined as NOT NULL in the table structure and there are no virtual generated columns (VIRTUAL GENERATED COLUMNS).
// Data synchronization TiCDC will select a valid index as the Handle Index. The HandleKeyFlag of the columns included in the Handle Index is set to 1.
ValidUniqColumns []string `json:"validUniqColumns"`
// Represents all field names, but Kafka information does not carry field offsets, and is not guaranteed to be strictly consistent with the order in which the fields are created in the table structure. Sort by field name
ColumnNames []string `json:"columnNames"`
ColumnType map[string]string `json:"columnType"`
NewColumnData map[string]interface{} `json:"newColumnData"`
// Only when this message is generated by an Update type event, record the name of each column and the data value before Update
OldColumnData map[string]interface{} `json:"oldColumnData"`
}
RowChangedEvent stores the DDL and DML event.
func MsgConvRowChangedEvent ¶
func MsgConvRowChangedEvent(key *MessageEventKey, value *MessageRowEventValue) (*RowChangedEvent, error)
func (*RowChangedEvent) Delete ¶
func (e *RowChangedEvent) Delete( dbTypeT string, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, caseFieldRuleT string) (string, []interface{})
func (*RowChangedEvent) Replace ¶
func (e *RowChangedEvent) Replace(dbTypeT string, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, caseFieldRuleT string) (string, []interface{})
func (*RowChangedEvent) String ¶
func (e *RowChangedEvent) String() string
type RowEventDecoder ¶
type RowEventDecoder interface {
// AddKeyValue add the received key and values to the decoder,
// should be called before `HasNext`
// decoder decode the key and value into the event format.
AddKeyValue(key, value []byte) error
// HasNext returns
// 1. the type of the next event
// 2. a bool indicating whether the next event exists
// 3. error
HasNext() (MsgEventType, bool, error)
// NextResolvedEvent returns the next resolved event if exists
NextResolvedEvent() (uint64, error)
// NextRowChangedEvent returns the next row changed event if exists
NextRowChangedEvent() (*RowChangedEvent, error)
// NextDDLEvent returns the next DDL event if exists
NextDDLEvent() (*DDLChangedEvent, error)
}
RowEventDecoder is an abstraction for event decoders. This interface is only for testing now.
func NewBatchDecoder ¶
func NewBatchDecoder(msgCompresion string) RowEventDecoder
NewBatchDecoder creates a new BatchDecoder.