canal

package
v0.0.0-...-d30ddaf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2023 License: MIT Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
// Action names for sync. Each constant's value is the lowercase name of the
// row operation it stands for.
const (
	UpdateAction = "update"
	InsertAction = "insert"
	DeleteAction = "delete"
)

The action names for sync.

Variables

View Source
// ErrExcludedTable is a sentinel error carrying the message "excluded table meta".
// NOTE(review): presumably returned when the requested table is filtered out
// by the include/exclude table regexes — confirm against the callers.
var ErrExcludedTable = errors.New("excluded table meta")
View Source
// UnknownTableRetryPeriod is the period (10 seconds by default) after which
// canal will retry fetching an unknown table's meta.
var UnknownTableRetryPeriod = time.Second * time.Duration(10)

UnknownTableRetryPeriod is the period after which canal will retry fetching an unknown table's meta.

Functions

This section is empty.

Types

type Canal

// Canal syncs MySQL data to other systems. Its fields are unexported, so a
// Canal is obtained through NewCanal and driven through its methods.
type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data to other systems, such as Elasticsearch, Redis, etc. MySQL must use the row format for its binlog.

func NewCanal

func NewCanal(cfg *Config) (*Canal, error)

func (*Canal) AddDumpDatabases

func (c *Canal) AddDumpDatabases(dbs ...string)

func (*Canal) AddDumpIgnoreTables

func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)

func (*Canal) AddDumpTables

func (c *Canal) AddDumpTables(db string, tables ...string)

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout time.Duration) error

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(image string) error

CheckBinlogRowImage checks the MySQL binlog row image, which must be one of FULL, MINIMAL, or NOBLOB.

func (*Canal) ClearTableCache

func (c *Canal) ClearTableCache(db []byte, table []byte)

ClearTableCache clears the table cache.

func (*Canal) Close

func (c *Canal) Close()

func (*Canal) Ctx

func (c *Canal) Ctx() context.Context

func (*Canal) Dump

func (c *Canal) Dump() error

Dump dumps all data from the MySQL master using `mysqldump`, without syncing the binlog.

func (*Canal) Execute

func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)

Execute executes a SQL statement.

func (*Canal) FlushBinlog

func (c *Canal) FlushBinlog() error

func (*Canal) GetDelay

func (c *Canal) GetDelay() uint32

func (*Canal) GetMasterGTIDSet

func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)

func (*Canal) GetMasterPos

func (c *Canal) GetMasterPos() (mysql.Position, error)

func (*Canal) GetTable

func (c *Canal) GetTable(db string, table string) (*schema.Table, error)

func (*Canal) GetTableForEvent

func (c *Canal) GetTableForEvent(te *replication.TableMapEvent) (*schema.Table, error)

func (*Canal) Run

func (c *Canal) Run(syncErrorCh chan error) error

Run will first try to dump all data from the MySQL master using `mysqldump`, then sync from the binlog position in the dump data. It will run forever until it meets an error or the Canal is closed.

func (*Canal) RunFrom

func (c *Canal) RunFrom(pos mysql.Position, syncErrorCh chan error) error

RunFrom will sync from the given binlog position directly, ignoring mysqldump.

func (*Canal) SetEventHandler

func (c *Canal) SetEventHandler(h EventHandler)

`SetEventHandler` registers the sync handler. You must register your own handler before starting Canal.

func (*Canal) SetTableCache

func (c *Canal) SetTableCache(db []byte, table []byte, schema *schema.Table)

SetTableCache sets table cache value for the given table

func (*Canal) StartFromGTID

func (c *Canal) StartFromGTID(set mysql.GTIDSet, syncErrorCh chan error) error

func (*Canal) SyncedGTIDSet

func (c *Canal) SyncedGTIDSet() mysql.GTIDSet

func (*Canal) SyncedPosition

func (c *Canal) SyncedPosition() mysql.Position

func (*Canal) SyncedTimestamp

func (c *Canal) SyncedTimestamp() uint32

func (*Canal) WaitDumpDone

func (c *Canal) WaitDumpDone() <-chan struct{}

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error

type Config

// Config holds the settings passed to NewCanal. Fields carrying a `toml` tag
// are mapped to the named TOML key; TimestampStringLocation, TLSConfig,
// Logger, Dialer and InfoLoader have no tag and must be set in code.
type Config struct {
	// NOTE(review): Addr, User and Password look like the MySQL connection
	// address and credentials — confirm the expected Addr format.
	Addr     string `toml:"addr"`
	User     string `toml:"user"`
	Password string `toml:"password"`

	// NOTE(review): the fields below are undocumented in this listing.
	// HeartbeatPeriod and ReadTimeout are time.Duration values; confirm how
	// they are written in TOML and what a zero value means.
	Charset         string        `toml:"charset"`
	ServerID        uint32        `toml:"server_id"`
	Flavor          string        `toml:"flavor"`
	DataDir         string        `toml:"data_dir"`
	HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
	ReadTimeout     time.Duration `toml:"read_timeout"`

	// IncludeTableRegex and ExcludeTableRegex patterns should contain the database name.
	// Only a table that matches IncludeTableRegex and does not match ExcludeTableRegex will be processed.
	// E.g. IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"]
	//     includes the 'canal' table of every database except database 'mysql'.
	// By default IncludeTableRegex and ExcludeTableRegex are empty, which includes all tables.
	IncludeTableRegex []string `toml:"include_table_regex"`
	ExcludeTableRegex []string `toml:"exclude_table_regex"`

	// DiscardNoMetaRowEvent discards row events that have no table meta.
	DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`

	// Dump holds the mysqldump settings; see DumpConfig.
	Dump DumpConfig `toml:"dump"`

	// NOTE(review): UseDecimal and ParseTime are undocumented here; presumably
	// they control how decimal and time column values are decoded — confirm.
	UseDecimal bool `toml:"use_decimal"`
	ParseTime  bool `toml:"parse_time"`

	// NOTE(review): presumably the time zone used when rendering timestamp
	// values as strings — confirm. Has no toml tag.
	TimestampStringLocation *time.Location

	// SemiSyncEnabled enables semi-sync or not.
	SemiSyncEnabled bool `toml:"semi_sync_enabled"`

	// MaxReconnectAttempts is the maximum number of attempts to re-establish a
	// broken connection; zero or a negative number means infinite retry.
	// This setting has no effect if DisableRetrySync is true.
	MaxReconnectAttempts int `toml:"max_reconnect_attempts"`

	// DisableRetrySync disables re-sync for a broken connection.
	DisableRetrySync bool `toml:"disable_retry_sync"`

	// TLSConfig sets the TLS config.
	TLSConfig *tls.Config

	// Logger sets the logger.
	Logger loggers.Advanced

	// Dialer sets the dialer.
	Dialer client.Dialer

	// InfoLoader loads and saves the master info; see MasterInfoLoader.
	InfoLoader MasterInfoLoader
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)

func NewDefaultConfig

func NewDefaultConfig() *Config

type DummyEventHandler

// DummyEventHandler is an empty struct whose methods carry the same names as
// those of EventHandler.
// NOTE(review): presumably a no-op implementation meant to be embedded so a
// handler only overrides the callbacks it needs — confirm.
type DummyEventHandler struct {
}

func (*DummyEventHandler) OnDDL

func (*DummyEventHandler) OnGTID

func (*DummyEventHandler) OnPosSynced

func (*DummyEventHandler) OnRotate

func (*DummyEventHandler) OnRow

func (h *DummyEventHandler) OnRow(*RowsEvent) error

func (*DummyEventHandler) OnTableChanged

func (*DummyEventHandler) OnXID

func (*DummyEventHandler) String

func (h *DummyEventHandler) String() string

type DumpConfig

// DumpConfig holds the mysqldump settings; it is the type of Config.Dump
// (TOML key "dump").
type DumpConfig struct {
	// ExecutionPath is the mysqldump execution path, e.g. mysqldump or
	// /usr/bin/mysqldump. If not set, mysqldump is not used.
	ExecutionPath string `toml:"mysqldump"`

	// Tables overrides Databases when set; the listed tables are in
	// database TableDB (TOML key "table_db").
	Tables  []string `toml:"tables"`
	TableDB string   `toml:"table_db"`

	// Databases (TOML key "dbs") is overridden by Tables.
	// NOTE(review): presumably the list of databases to dump — confirm.
	Databases []string `toml:"dbs"`

	// IgnoreTables lists tables to ignore; each entry has the format db.table.
	IgnoreTables []string `toml:"ignore_tables"`

	// Where dumps only selected records. Quotes are mandatory.
	Where string `toml:"where"`

	// DiscardErr: if true, error messages are discarded; otherwise they are
	// output to stderr.
	DiscardErr bool `toml:"discard_err"`

	// SkipMasterData: set to true to skip --master-data if we have no
	// privilege to do 'FLUSH TABLES WITH READ LOCK'.
	SkipMasterData bool `toml:"skip_master_data"`

	// MaxAllowedPacketMB: set to change the default max_allowed_packet size
	// (presumably in megabytes, per the field name — confirm).
	MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`

	// Protocol: set to change the default protocol to connect with.
	Protocol string `toml:"protocol"`

	// ExtraOptions sets extra options.
	ExtraOptions []string `toml:"extra_options"`
}

type EventHandler

// EventHandler is the handler interface registered with Canal.SetEventHandler.
// Every callback returns an error.
// NOTE(review): what Canal does with a non-nil error, and which binlog event
// triggers each callback, is not shown here — confirm.
type EventHandler interface {
	// OnRotate receives the event header and the rotate event.
	OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error
	// OnTableChanged is called when the table is created, altered, renamed or dropped.
	// You need to clear data associated with the table, such as caches.
	// It will be called before OnDDL.
	OnTableChanged(header *replication.EventHeader, schema string, table string) error
	// OnDDL receives the event header, the next position and the query event.
	OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
	// OnRow receives a RowsEvent.
	OnRow(e *RowsEvent) error
	// OnXID receives the event header and the next position.
	OnXID(header *replication.EventHeader, nextPos mysql.Position) error
	// OnGTID receives the event header and a GTID set.
	OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
	// OnPosSynced lets you sync the position in your own way. When force is
	// true, sync the position immediately.
	OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
	// String returns a string for the handler.
	// NOTE(review): presumably a name used for identification/logging — confirm.
	String() string
}

type MasterInfoLoader

// MasterInfoLoader abstracts the way in which the master info is loaded and
// saved. Config.InfoLoader has this type, and NewFsInfoLoader returns one.
type MasterInfoLoader interface {
	// Load is given a MasterInfoSetter callback taking (addr, name, position).
	// NOTE(review): presumably Load calls setValues with the stored values — confirm.
	Load(setValues MasterInfoSetter) error
	// Save is given the addr, name and position to store, plus a force flag.
	// NOTE(review): presumably force requests an immediate write — confirm.
	Save(addr, name string, position uint32, force bool) error
}

func NewFsInfoLoader

func NewFsInfoLoader(path string) MasterInfoLoader

type MasterInfoSetter

type MasterInfoSetter func(addr, name string, position uint32) error

Abstracts the way in which the master info is loaded and saved.

type RowsEvent

// RowsEvent is the event for row replication; it is the argument passed to
// EventHandler.OnRow.
type RowsEvent struct {
	// Table is the table schema associated with this event.
	Table  *schema.Table
	// Action is the action name.
	// NOTE(review): presumably one of UpdateAction, InsertAction or
	// DeleteAction — confirm.
	Action string
	// Rows is the list of changed rows.
	// The binlog has three update event versions: v0, v1 and v2.
	// For v1 and v2 the number of rows must be even: two rows per event,
	// in the format [before update row, after update row].
	// For update v0 there is only one row per event, and that version is
	// not supported.
	Rows [][]interface{}
	// Header can be used to inspect the event.
	Header *replication.EventHeader
}

RowsEvent is the event for row replication.

func (*RowsEvent) String

func (r *RowsEvent) String() string

String implements fmt.Stringer interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL