binlogsync

package
v0.0.0-...-b553abb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2019 License: Apache-2.0, MIT Imports: 21 Imported by: 0

Documentation

Overview

Package binlogsync adds event listener to a MySQL compatible binlog, based on pkg myreplicator.

Index

Constants

View Source
const (
	MySQLFlavor   = "mysql"
	MariaDBFlavor = "mariadb"
)

Use flavor for different MySQL versions,

View Source
const (
	ConfigPathBackendPosition     = `sql/binlogsync/master_position`
	ConfigPathIncludeTableRegex   = `sql/binlogsync/include_table_regex`
	ConfigPathExcludeTableRegex   = `sql/binlogsync/exclude_table_regex`
	ConfigPathBinlogStartFile     = `sql/binlogsync/binlog_start_file`
	ConfigPathBinlogStartPosition = `sql/binlogsync/binlog_start_position`
	ConfigPathBinlogSlaveID       = `sql/binlogsync/binlog_slave_id`
	ConfigPathServerFlavor        = `sql/binlogsync/server_flavor`
)

Configuration paths for config.Service

View Source
const (
	UpdateAction = "update"
	InsertAction = "insert"
	DeleteAction = "delete"
)

Action constants to figure out the type of an event. Those constants will be passed to the interface RowsEventHandler.

Variables

This section is empty.

Functions

This section is empty.

Types

type Canal

type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data. MySQL must use the binlog format ROW.

func NewCanal

func NewCanal(dsn string, db DBConFactory, opt *Options) (*Canal, error)

NewCanal creates a new canal object to start reading the MySQL binary log. The DSN is need to setup two different connections. One connection for reading the binary stream and the 2nd connection to execute queries. The 2nd argument `db` gets used to executed the queries, like setting variables or getting table information. Default database flavor is `mariadb`. export CS_DSN='root:PASSWORD@tcp(localhost:3306)/DATABASE_NAME

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout time.Duration) error

CatchMasterPos reads the current master position and waits until we reached it.

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(ctx context.Context, image string) error

CheckBinlogRowImage checks MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB

func (*Canal) ClearTableCache

func (c *Canal) ClearTableCache(db string, table string)

ClearTableCache clear table cache

func (*Canal) Close

func (c *Canal) Close() error

Close closes all underlying connections

func (*Canal) FindTable

func (c *Canal) FindTable(ctx context.Context, tableName string) (*ddl.Table, error)

FindTable tries to find a table by its ID. If the table cannot be found by the first search, it will add the table to the internal map and performs a column load from the information_schema and then returns the fully defined table. Only tables which are found in the database name of the DSN get loaded.

func (*Canal) FlushBinlog

func (c *Canal) FlushBinlog() error

FlushBinlog executes FLUSH BINARY LOGS.

func (*Canal) GetMasterPos

func (c *Canal) GetMasterPos() (ms ddl.MasterStatus, err error)

func (*Canal) RegisterRowsEventHandler

func (c *Canal) RegisterRowsEventHandler(tableName string, h ...RowsEventHandler)

RegisterRowsEventHandler adds a new event handler to the internal list. If a table name gets provided the event handler is bound to that exact table name, if the table has not been excluded via the global regexes. An empty tableName calls the event handler for all tables.

func (*Canal) Start

func (c *Canal) Start(ctx context.Context) error

Start starts the sync process in the background as a goroutine. You can stop the goroutine via the context.

func (*Canal) SyncedPosition

func (c *Canal) SyncedPosition() ddl.MasterStatus

SyncedPosition returns the current synced position as retrieved from the SQl server.

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos ddl.MasterStatus, timeout time.Duration) error

WaitUntilPos flushes the binary logs until we've reached the desired position.

type DBConFactory

type DBConFactory func(dsn string) (*dml.ConnPool, error)

DBConFactory creates a new database connection.

func WithDB

func WithDB(db *sql.DB) DBConFactory

WithDB allows to set your own DB connection.

func WithMySQL

func WithMySQL() DBConFactory

WithMySQL adds the database/sql.DB driver including a ping to the database from the provided DSN.

type Options

type Options struct {
	// ConfigScoped defines the configuration to load the following fields from.
	// If not set the data won't be loaded.
	ConfigScoped config.Scoped
	// ConfigSet used to persists the master position of the binlog stream.
	ConfigSet config.Setter
	Log       log.Logger
	TLSConfig *tls.Config // Needs some rework
	// IncludeTableRegex defines the regex which matches the allowed table
	// names. Default state of WithIncludeTables is empty, this will include all
	// tables.
	IncludeTableRegex []string
	// ExcludeTableRegex defines the regex which matches the excluded table
	// names. Default state of WithExcludeTables is empty, ignores exluding and
	// includes all tables.
	ExcludeTableRegex []string

	BinlogStartFile     string
	BinlogStartPosition uint64
	BinlogSlaveId       uint64
	// Flavor defines if `mariadb` or `mysql` should be used. Defaults to
	// `mariadb`.
	Flavor                   string
	MasterStatusQueryTimeout time.Duration
	// OnClose runs before the database connection gets closed and after the
	// syncer has been closed. The syncer does not "see" the changes comming
	// from the queries executed in the call back.
	OnClose func(*dml.ConnPool) error
}

Options provides multiple options for NewCanal. Part of those options can get loaded via config.Scoped.

type RowsEventHandler

type RowsEventHandler interface {
	// Do function handles a RowsEvent bound to a specific database. If it
	// returns an error behaviour of "Interrupted", the canal type will stop the
	// syncer. Binlog has three update event version, v0, v1 and v2. For v1 and
	// v2, the rows number must be even. Two rows for one event, format is
	// [before update row, after update row] for update v0, only one row for a
	// event, and we don't support this version yet. The Do function will run in
	// its own Goroutine. The provided argument `t` of type ddl.Table must only
	// be used for reading, changing `t` causes race conditions.
	Do(ctx context.Context, action string, t *ddl.Table, rows [][]interface{}) error
	// Complete runs before a binlog rotation event happens. Same error rules
	// apply here like for function Do(). The Complete function will run in its
	// own Goroutine.
	Complete(context.Context) error
	// String returns the name of the handler
	String() string
}

RowsEventHandler calls your code when an event gets dispatched.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL