canal

package
v1.8.8
Published: Mar 27, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CheckPoint

type CheckPoint struct {
	GTID     string
	FileName string
	Offset   int
	ID       int
}

type LogCanal

type LogCanal struct {
}

func (*LogCanal) Fire

func (p *LogCanal) Fire(e *canalLog.Entry) error

for canal log system

func (*LogCanal) Levels

func (p *LogCanal) Levels() []canalLog.Level

for canal log system

type ServiceCanal

type ServiceCanal struct {
	Host     string
	Port     int
	User     string
	Password string
	Flavor   string
	ServerID int
	// Array of regex strings. A table's binlog is collected only when its name (schema
	// included, such as db.test) matches at least one regex in this array.
	// Example: "IncludeTables": [".*\\..*"] collects all tables; note the backslash for escaping.
	IncludeTables []string
	// Array of regex strings. If a table's name (schema included, too) matches any regex
	// in this array, its binlog is not collected.
	// Example: "ExcludeTables": ["^mysql\\..*$"] excludes all tables in the schema named 'mysql'.
	ExcludeTables []string
	// Start* specify the point to start synchronization from. StartGTID is preferred if
	// both kinds are specified. **Only works when no checkpoint exists.**
	// Start* are abandoned once any error is encountered during synchronization, because the
	// plugin then recovers from the latest checkpoint (obtained from the server).
	// StartGTID does not synchronize the events it specifies. For example, if StartGTID is
	// set to 'uuid:1-9', the plugin skips 1 to 9 and synchronizes from 10.
	// However, StartBinName and StartBinLogPos start synchronizing from the event they
	// identify.
	// For StartGTID:
	// - If its format is invalid (parse error), the plugin starts synchronization from the
	//   latest position, because in that case the first synchronization fails and the
	//   plugin gets the latest checkpoint from the server to fail over.
	// - If its format is valid but its value is invalid, the plugin's behavior depends on
	//   the server. It might start synchronizing from the beginning, or hit an error and
	//   start from the latest position.
	// For StartBinName and StartBinLogPos:
	// - If either of them is invalid, the plugin starts synchronization from the latest
	//   position (failover).
	StartGTID       string
	StartBinName    string
	StartBinLogPos  int
	HeartBeatPeriod int
	ReadTimeout     int
	EnableDDL       bool
	EnableXID       bool
	EnableGTID      bool
	EnableInsert    bool
	EnableUpdate    bool
	EnableDelete    bool
	TextToString    bool
	EnableEventMeta bool
	// True by default, convert field with SET type to string in format [elem1, elem2, ...].
	SetToString bool
	// True by default, convert field in Go []byte type to string, such as JSON type.
	ByteValueToString bool
	// TODO: This parameter is not exposed in the documentation.
	// StartFromBegining only works when no checkpoint exists and all Start* parameters are unset.
	// If StartFromBegining is set, the plugin will not try to get the latest position from the server.
	StartFromBegining bool
	Charset           string
	// Pack values into two fields: new_data and old_data. False by default.
	PackValues bool
	// contains filtered or unexported fields
}
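
The IncludeTables and ExcludeTables comments in the struct above describe regex filtering on the schema-qualified table name. The following is a minimal sketch of that rule, not the plugin's actual code: tableFilter, newTableFilter and shouldCollect are hypothetical names, and checking the exclude list before the include list is an assumption. Note that the double backslash in the comments is JSON escaping; Go raw strings need only one.

```go
package main

import (
	"fmt"
	"regexp"
)

// tableFilter is a hypothetical helper, not part of the canal package API.
type tableFilter struct {
	include []*regexp.Regexp
	exclude []*regexp.Regexp
}

// compileAll compiles every pattern and reports the first invalid one.
func compileAll(patterns []string) ([]*regexp.Regexp, error) {
	out := make([]*regexp.Regexp, 0, len(patterns))
	for _, p := range patterns {
		re, err := regexp.Compile(p)
		if err != nil {
			return nil, fmt.Errorf("invalid pattern %q: %w", p, err)
		}
		out = append(out, re)
	}
	return out, nil
}

func newTableFilter(include, exclude []string) (*tableFilter, error) {
	inc, err := compileAll(include)
	if err != nil {
		return nil, err
	}
	exc, err := compileAll(exclude)
	if err != nil {
		return nil, err
	}
	return &tableFilter{include: inc, exclude: exc}, nil
}

// shouldCollect takes a name in "schema.table" form.
// Assumption: a match in exclude wins over a match in include.
func (f *tableFilter) shouldCollect(name string) bool {
	for _, re := range f.exclude {
		if re.MatchString(name) {
			return false
		}
	}
	for _, re := range f.include {
		if re.MatchString(name) {
			return true
		}
	}
	return false
}

func main() {
	f, err := newTableFilter([]string{`.*\..*`}, []string{`^mysql\..*$`})
	if err != nil {
		panic(err)
	}
	fmt.Println(f.shouldCollect("db.test"))    // true
	fmt.Println(f.shouldCollect("mysql.user")) // false
}
```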

ServiceCanal is a service input plugin that collects binlog from MySQL. It works as a slave node that continually collects binlog from the master (source). It supports two replication modes: GTID and binlog-file.

GTID mode uses the GTID as the checkpoint, which requires the server to enable this feature (by setting gtid_mode=ON). At startup, the plugin checks this condition and falls back to binlog-file mode if gtid_mode is OFF. One special case: the server has turned on gtid_mode, but the latest GTID is empty. In that case, the plugin also falls back to binlog-file mode at startup.
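
The startup check described above can be sketched as a small decision function. This is an illustration only: chooseMode and replicationMode are hypothetical names, and the two inputs stand in for whatever the plugin actually queries from the server.

```go
package main

import "fmt"

// replicationMode is a hypothetical type used only for this sketch.
type replicationMode int

const (
	modeBinlogFile replicationMode = iota
	modeGTID
)

func (m replicationMode) String() string {
	if m == modeGTID {
		return "GTID"
	}
	return "binlog-file"
}

// chooseMode picks GTID mode only when the server has gtid_mode=ON and
// reports a non-empty latest GTID; otherwise it falls back to binlog-file mode.
func chooseMode(gtidModeOn bool, latestGTID string) replicationMode {
	if gtidModeOn && latestGTID != "" {
		return modeGTID
	}
	return modeBinlogFile
}

func main() {
	fmt.Println(chooseMode(true, "uuid:1-9")) // GTID
	fmt.Println(chooseMode(true, ""))         // binlog-file
	fmt.Println(chooseMode(false, ""))        // binlog-file
}
```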

Binlog-file mode uses the sequence number of the binlog file as the checkpoint, which might lead to data repetition in a master-slave architecture. For the same binlog content, master and slave nodes can use different sequence numbers to store it, so if the sequence number on the slave is higher when an HA switch happens, data is repeated. That is why GTID mode is preferred.
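
The precedence among a stored checkpoint, the Start* fields and StartFromBegining, as described in the field comments of ServiceCanal, can be summarized in a short sketch. It is not the plugin's implementation: checkPoint, startConfig and resolveStart are hypothetical, the returned strings are just labels, and treating a non-empty StartBinName as "binlog position specified" is an assumption.

```go
package main

import "fmt"

// checkPoint mirrors the exported CheckPoint fields relevant here.
type checkPoint struct {
	GTID     string
	FileName string
	Offset   int
}

// startConfig holds only the ServiceCanal fields that affect the start point.
type startConfig struct {
	StartGTID         string
	StartBinName      string
	StartBinLogPos    int
	StartFromBegining bool
}

// resolveStart returns a label describing where synchronization would begin.
func resolveStart(cp *checkPoint, cfg startConfig) string {
	switch {
	case cp != nil:
		// Start* only works when no checkpoint exists.
		return "checkpoint"
	case cfg.StartGTID != "":
		// StartGTID is preferred when both kinds are specified.
		return "gtid:" + cfg.StartGTID
	case cfg.StartBinName != "":
		return fmt.Sprintf("file:%s:%d", cfg.StartBinName, cfg.StartBinLogPos)
	case cfg.StartFromBegining:
		// Only consulted when no checkpoint and no Start* value is present.
		return "beginning"
	default:
		// Otherwise the latest position is fetched from the server.
		return "latest"
	}
}

func main() {
	both := startConfig{StartGTID: "uuid:1-9", StartBinName: "mysql-bin.000001", StartBinLogPos: 4}
	fmt.Println(resolveStart(nil, both))
	fmt.Println(resolveStart(&checkPoint{GTID: "uuid:1-20"}, both))
	fmt.Println(resolveStart(nil, startConfig{StartFromBegining: true}))
	fmt.Println(resolveStart(nil, startConfig{}))
}
```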

func NewServiceCanal

func NewServiceCanal() *ServiceCanal

func (*ServiceCanal) Collect

func (sc *ServiceCanal) Collect(pipeline.Collector) error

Collect takes in an accumulator and adds the metrics that the Input gathers. This is called every "interval".

func (*ServiceCanal) Description

func (sc *ServiceCanal) Description() string

func (*ServiceCanal) GetBinlogLatestPos

func (sc *ServiceCanal) GetBinlogLatestPos() mysql.Position

GetBinlogLatestPos gets the latest binlog position from server.

func (*ServiceCanal) Init

func (sc *ServiceCanal) Init(context pipeline.Context) (int, error)

func (*ServiceCanal) OnDDL

OnDDL...

func (*ServiceCanal) OnGTID

func (sc *ServiceCanal) OnGTID(s mysql.GTIDSet) error

OnGTID reports the GTID of the event that follows (OnRow, OnDDL), so the checkpoint cannot be updated here. The GTID is only recorded, and the checkpoint is updated in OnRow.

This strategy has a potential problem: the checkpoint is only updated when OnRow is called, but calls to OnRow are filtered by IncludeTables and ExcludeTables. So, if the plugin restarts before the next OnRow call arrives, it reruns from an old checkpoint. This should be negligible when valid data arrives continuously.

func (*ServiceCanal) OnPosSynced

func (sc *ServiceCanal) OnPosSynced(pos mysql.Position, _ mysql.GTIDSet, force bool) error

func (*ServiceCanal) OnRotate

func (sc *ServiceCanal) OnRotate(r *replication.RotateEvent) error

func (*ServiceCanal) OnRow

func (sc *ServiceCanal) OnRow(e *canal.RowsEvent) error

OnRow processes the row event and, according to the user's config, constructs the data to send.

func (*ServiceCanal) OnTableChanged

func (sc *ServiceCanal) OnTableChanged(schema string, table string) error

func (*ServiceCanal) OnXID

func (sc *ServiceCanal) OnXID(p mysql.Position) error

func (*ServiceCanal) Start

func (sc *ServiceCanal) Start(c pipeline.Collector) error

Start starts the ServiceInput's service, whatever that may be.

func (*ServiceCanal) Stop

func (sc *ServiceCanal) Stop() error

Stop stops the services and closes any necessary channels and connections.

func (*ServiceCanal) String

func (sc *ServiceCanal) String() string
