mycanal

package
v0.0.0-...-202847b
Published: Jan 1, 2023 License: Apache-2.0, MIT Imports: 22 Imported by: 0

Documentation

Overview

Package mycanal adds an event listener to a MySQL/MariaDB-compatible binlog. It is based on package myreplicator and provides triggerless events.

Triggers are stored routines which are invoked per row upon INSERT, DELETE, or UPDATE on a table. They were introduced in MySQL 5.0. A trigger may contain a set of queries, and these queries run in the same transaction space as the query that manipulates the table. This makes the original operation on the table and the trigger-invoked operations atomic.

Triggers, overhead

A trigger in MySQL is a stored routine. MySQL stored routines are interpreted, never compiled. With triggers, for every INSERT, DELETE, or UPDATE on the often busy tables, the server pays not only the necessary price of the additional write, but also the price of interpreting the trigger body.

We know this to be a visible overhead on very busy or very large tables.

Triggers, locks

When a table with triggers is concurrently being written to, the triggers, being in the same transaction space as the incoming queries, are also executed concurrently. While concurrent queries compete for resources via locks (e.g. the auto_increment value), the triggers need to simultaneously compete for their own locks (e.g., likewise on the auto_increment value on the ghost table, in a synchronous solution). These competitions are not coordinated.

We have witnessed near-complete or complete lockdowns in production, rendering the table or the entire database inaccessible due to lock contention.

Triggers also cannot be paused; they must keep operating. On busy servers, we have seen that even as the online operation throttles, the master is brought down by the load of the triggers.

Constants

const (
	MySQLFlavor   = "mysql"
	MariaDBFlavor = "mariadb"
)

Use flavor for different MySQL versions.

const (
	ConfigPathBackendPosition     = `sql/mycanal/master_position`
	ConfigPathIncludeTableRegex   = `sql/mycanal/include_table_regex`
	ConfigPathExcludeTableRegex   = `sql/mycanal/exclude_table_regex`
	ConfigPathBinlogStartFile     = `sql/mycanal/binlog_start_file`
	ConfigPathBinlogStartPosition = `sql/mycanal/binlog_start_position`
	ConfigPathBinlogSlaveID       = `sql/mycanal/binlog_slave_id`
	ConfigPathServerFlavor        = `sql/mycanal/server_flavor`
)

Configuration paths for config.Service.

const (
	UpdateAction = "update"
	InsertAction = "insert"
	DeleteAction = "delete"
)

Action constants identify the type of an event. These constants are passed to the interface RowsEventHandler.

Variables

This section is empty.

Functions

This section is empty.

Types

type Canal

type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data. MySQL must use the binlog format ROW.

func NewCanal

func NewCanal(dsn string, db DBConFactory, opt *Options) (*Canal, error)

NewCanal creates a new canal object to start reading the MySQL binary log. The DSN is needed to set up two different connections: one for reading the binary stream and a second one for executing queries. The second argument `db` is used to execute the queries, such as setting variables or getting table information. The default database flavor is `mariadb`. Example DSN: export CS_DSN='root:PASSWORD@tcp(localhost:3306)/DATABASE_NAME'

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout time.Duration) error

CatchMasterPos reads the current master position and waits until the canal has reached it.

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(ctx context.Context, image string) error

CheckBinlogRowImage checks the MySQL binlog row image. The image must be one of FULL, MINIMAL, or NOBLOB.
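
As a minimal sketch, the three row image names listed above can be validated on the caller's side before they are passed on. The helper validRowImage is hypothetical and not part of this package; treating the comparison as case-insensitive is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// validRowImage reports whether image names one of the three binlog row
// images listed in the documentation. This helper is hypothetical; the
// case-insensitive comparison is an assumption, not documented behaviour.
func validRowImage(image string) bool {
	switch strings.ToUpper(strings.TrimSpace(image)) {
	case "FULL", "MINIMAL", "NOBLOB":
		return true
	}
	return false
}

func main() {
	for _, img := range []string{"FULL", "minimal", "PARTIAL"} {
		fmt.Printf("%-8s valid=%v\n", img, validRowImage(img))
	}
}
```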

func (*Canal) ClearTableCache

func (c *Canal) ClearTableCache(db string, table string)

ClearTableCache clears the table cache.

func (*Canal) Close

func (c *Canal) Close() error

Close closes all underlying connections.

func (*Canal) FindTable

func (c *Canal) FindTable(ctx context.Context, tableName string) (*ddl.Table, error)

FindTable tries to find a table by its name. If the table cannot be found by the first search, it adds the table to the internal map, loads its columns from the information_schema, and then returns the fully defined table. Only tables found in the database named in the DSN get loaded.
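
Because table lookups are limited to the database named in the DSN, it can help to see which part of the DSN that is. The following is a rough sketch using only the standard library; dbNameFromDSN is a hypothetical helper, and the package itself may well parse the DSN differently (for example through its MySQL driver).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// dbNameFromDSN extracts the database name from a DSN of the form
// user:password@tcp(host:port)/dbname?param=value.
// It splits on the last '/' so that a '/' inside the password does not
// confuse it; parameter values are assumed to be URL-encoded.
func dbNameFromDSN(dsn string) (string, error) {
	i := strings.LastIndexByte(dsn, '/')
	if i < 0 {
		return "", errors.New("dsn has no '/' separator")
	}
	name := dsn[i+1:]
	if q := strings.IndexByte(name, '?'); q >= 0 {
		name = name[:q] // drop connection parameters
	}
	if name == "" {
		return "", errors.New("dsn has no database name")
	}
	return name, nil
}

func main() {
	name, err := dbNameFromDSN("root:PASSWORD@tcp(localhost:3306)/DATABASE_NAME")
	fmt.Println(name, err)
}
```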

func (*Canal) FlushBinlog

func (c *Canal) FlushBinlog() error

FlushBinlog executes FLUSH BINARY LOGS.

func (*Canal) GetMasterPos

func (c *Canal) GetMasterPos() (ms ddl.MasterStatus, err error)

func (*Canal) RegisterRowsEventHandler

func (c *Canal) RegisterRowsEventHandler(tableNames []string, h ...RowsEventHandler)

RegisterRowsEventHandler adds a new event handler to the internal list. If a table name is provided, the event handler is bound to that exact table name, provided the table has not been excluded via the global regexes. An empty tableNames slice calls the event handler for all tables. If a table name already exists, the RowsEventHandler gets appended to that table's list.
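
The registration rules above can be pictured as a map from table name to a list of handlers. This is only an illustrative sketch: the types handler, named and registry are hypothetical stand-ins, the empty string as the "all tables" key is an assumption, and the package's internal bookkeeping is not shown in this documentation.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for RowsEventHandler, reduced to the
// String method so the sketch stays self-contained.
type handler interface{ String() string }

type named string

func (n named) String() string { return string(n) }

// registry maps a table name to its handlers. The empty key holds the
// handlers that run for all tables (an assumption made for this sketch).
type registry map[string][]handler

// register appends h to every given table name, or to the catch-all list
// when no table name is provided.
func (r registry) register(tableNames []string, h ...handler) {
	if len(tableNames) == 0 {
		tableNames = []string{""}
	}
	for _, t := range tableNames {
		r[t] = append(r[t], h...)
	}
}

// handlersFor returns the catch-all handlers followed by the handlers
// bound to the given table. The ordering is an arbitrary choice here.
func (r registry) handlersFor(table string) []handler {
	out := append([]handler{}, r[""]...)
	if table != "" {
		out = append(out, r[table]...)
	}
	return out
}

func main() {
	r := registry{}
	r.register(nil, named("audit"))
	r.register([]string{"orders"}, named("indexer"))
	r.register([]string{"orders"}, named("mailer"))
	fmt.Println(r.handlersFor("orders"), r.handlersFor("customers"))
}
```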

func (*Canal) Start

func (c *Canal) Start(ctx context.Context) error

Start starts the sync process in the background as a goroutine. You can stop the goroutine via the context.

func (*Canal) SyncedPosition

func (c *Canal) SyncedPosition() ddl.MasterStatus

SyncedPosition returns the current synced position as retrieved from the SQL server.

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos ddl.MasterStatus, timeout time.Duration) error

WaitUntilPos flushes the binary logs until we've reached the desired position.

type DBConFactory

type DBConFactory func(dsn string) (*dml.ConnPool, error)

DBConFactory creates a new database connection.

func WithDB

func WithDB(db *sql.DB) DBConFactory

WithDB allows you to set your own DB connection.

func WithMySQL

func WithMySQL() DBConFactory

WithMySQL adds the database/sql.DB driver including a ping to the database from the provided DSN.

type Options

type Options struct {
	// ConfigScoped defines the configuration to load the following fields from.
	// If not set the data won't be loaded.
	ConfigScoped config.Scoped
	// ConfigSet is used to persist the master position of the binlog stream.
	ConfigSet config.Setter
	Log       log.Logger
	TLSConfig *tls.Config // Needs some rework
	// IncludeTableRegex defines the regex which matches the allowed table
	// names. Default state of WithIncludeTables is empty, which includes all
	// tables.
	IncludeTableRegex []string
	// ExcludeTableRegex defines the regex which matches the excluded table
	// names. Default state of WithExcludeTables is empty, which excludes
	// nothing and includes all tables.
	ExcludeTableRegex []string

	// Set to change the maximum number of attempts to re-establish a broken
	// connection
	MaxReconnectAttempts int

	BinlogStartFile     string
	BinlogStartPosition uint64
	BinlogSlaveId       uint64
	// Flavor defines if `mariadb` or `mysql` should be used. Defaults to
	// `mariadb`.
	Flavor                   string
	MasterStatusQueryTimeout time.Duration
	// OnClose runs before the database connection gets closed and after the
	// syncer has been closed. The syncer does not "see" the changes coming
	// from the queries executed in the callback.
	OnClose func(*dml.ConnPool) error
}

Options provides multiple options for NewCanal. Some of these options can be loaded via config.Scoped.
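
How IncludeTableRegex and ExcludeTableRegex might combine can be sketched as follows. The type tableFilter and its functions are hypothetical; only the two defaults (empty include list allows everything, empty exclude list excludes nothing) come from the field comments above. That an exclude match wins over an include match, and that patterns are matched against the bare table name, are assumptions.

```go
package main

import (
	"fmt"
	"regexp"
)

// tableFilter is a hypothetical helper holding the compiled patterns.
type tableFilter struct {
	include, exclude []*regexp.Regexp
}

// compileAll compiles every pattern and fails on the first invalid one.
func compileAll(patterns []string) ([]*regexp.Regexp, error) {
	out := make([]*regexp.Regexp, 0, len(patterns))
	for _, p := range patterns {
		re, err := regexp.Compile(p)
		if err != nil {
			return nil, fmt.Errorf("invalid table regex %q: %w", p, err)
		}
		out = append(out, re)
	}
	return out, nil
}

func newTableFilter(include, exclude []string) (*tableFilter, error) {
	in, err := compileAll(include)
	if err != nil {
		return nil, err
	}
	ex, err := compileAll(exclude)
	if err != nil {
		return nil, err
	}
	return &tableFilter{include: in, exclude: ex}, nil
}

// allowed reports whether events for table should be processed.
// Assumption: an exclude match always wins over an include match.
func (f *tableFilter) allowed(table string) bool {
	for _, re := range f.exclude {
		if re.MatchString(table) {
			return false
		}
	}
	if len(f.include) == 0 {
		return true // empty include list allows all tables
	}
	for _, re := range f.include {
		if re.MatchString(table) {
			return true
		}
	}
	return false
}

func main() {
	f, err := newTableFilter([]string{`^sales_`}, []string{`_tmp$`})
	if err != nil {
		panic(err)
	}
	for _, t := range []string{"sales_order", "sales_order_tmp", "catalog"} {
		fmt.Printf("%-16s allowed=%v\n", t, f.allowed(t))
	}
}
```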

type RowsEventHandler

type RowsEventHandler interface {
	// Do handles a RowsEvent bound to a specific database. If it returns an
	// error with the behaviour "Interrupted", the canal type will stop the
	// syncer. The binlog has three update event versions: v0, v1 and v2. For
	// v1 and v2, the number of rows must be even: two rows per event, in the
	// format [before update row, after update row]. For update v0 there is
	// only one row per event, and this version is not supported yet. The Do
	// function will run in its own goroutine. The provided argument `t` of
	// type ddl.Table must only be used for reading; changing `t` causes race
	// conditions.
	Do(ctx context.Context, action string, t *ddl.Table, rows [][]any) error
	// Complete runs before a binlog rotation event happens. The same error
	// rules apply here as for Do(). The Complete function will run in its
	// own goroutine.
	Complete(context.Context) error
	// String returns the name of the handler
	String() string
}

RowsEventHandler calls your code when an event gets dispatched.
