canal

package
v0.0.0-...-64b8ef3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2019 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Action names for sync.
// NOTE(review): RowsEvent has a string Action field; presumably it holds one
// of these values — confirm.
const (
	UpdateAction = "update"
	InsertAction = "insert"
	DeleteAction = "delete"
)

The action names for sync.

Variables

View Source
// ErrExcludedTable is a sentinel error whose message is "excluded table meta".
// NOTE(review): presumably returned when meta is requested for a table that is
// filtered out by the include/exclude rules — confirm against the callers.
var ErrExcludedTable = errors.New("excluded table meta")
View Source
// UnknownTableRetryPeriod is initialized to 10 seconds. It is an exported
// variable, so importing code can reassign it.
var UnknownTableRetryPeriod = time.Second * time.Duration(10)

UnknownTableRetryPeriod is the period after which canal retries fetching an unknown table's meta.

Functions

func IsDdlOrDclStatement

func IsDdlOrDclStatement(statement string) bool

func TrimStatement

func TrimStatement(statement string) string

Types

type Canal

// Canal exposes no exported fields; it is created with NewCanal and used
// through its methods.
type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data to other systems, such as Elasticsearch, Redis, etc. MySQL must use the row format for its binlog.

func NewCanal

func NewCanal(cfg *Config) (*Canal, error)

func (*Canal) AddDumpDatabases

func (c *Canal) AddDumpDatabases(dbs ...string)

func (*Canal) AddDumpIgnoreTables

func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)

func (*Canal) AddDumpTables

func (c *Canal) AddDumpTables(db string, tables ...string)

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout time.Duration) error

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(image string) error

CheckBinlogRowImage checks the MySQL binlog row image, which must be one of FULL, MINIMAL, or NOBLOB.

func (*Canal) Close

func (c *Canal) Close()

func (*Canal) DDLCount

func (c *Canal) DDLCount() uint64

func (*Canal) Dump

func (c *Canal) Dump() error

Dump dumps all data from the MySQL master using `mysqldump`, without syncing the binlog.

func (*Canal) Err

func (c *Canal) Err() <-chan error

func (*Canal) ExecDDL

func (c *Canal) ExecDDL(db string, statement string) error

func (*Canal) Execute

func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)

Execute executes a SQL statement.

func (*Canal) FlushBinlog

func (c *Canal) FlushBinlog() error

func (*Canal) GetDatabases

func (c *Canal) GetDatabases() []string

func (*Canal) GetMasterGTIDSet

func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)

func (*Canal) GetMasterPos

func (c *Canal) GetMasterPos() (prog.Position, error)

func (*Canal) GetMasterServerID

func (c *Canal) GetMasterServerID() (uint32, error)

func (*Canal) GetTableDef

func (c *Canal) GetTableDef(db string, table string) (*schema.TableDef, error)

func (*Canal) GetTables

func (c *Canal) GetTables(db string) ([]string, error)

func (*Canal) IsDdlStatement

func (c *Canal) IsDdlStatement(statement string) (bool, error)

func (*Canal) IudCount

func (c *Canal) IudCount() uint64

func (*Canal) RegisterBeforeSchemaChangeHook

func (c *Canal) RegisterBeforeSchemaChangeHook(fn func(string, string) error)

Register a hook that will be called before schema change

func (*Canal) RegisterBeforeServerIDChangeHook

func (c *Canal) RegisterBeforeServerIDChangeHook(fn func(uint32, uint32) error)

Register a hook that will be called before server_id change

func (*Canal) RegisterOnSchemaChangeFailedHook

func (c *Canal) RegisterOnSchemaChangeFailedHook(fn func(string, string, error) (bool, error))

Register a hook that will be called when a DDL fails

func (*Canal) SetEventHandler

func (c *Canal) SetEventHandler(h EventHandler)

`SetEventHandler` registers the sync handler. You must register your own handler before starting Canal.

func (*Canal) Start

func (c *Canal) Start(p prog.Progress) error

Start first tries to dump all data from the MySQL master using `mysqldump`, then syncs from the binlog position in the dump data. It runs forever until it meets an error or Canal is closed.

func (*Canal) SyncedProgress

func (c *Canal) SyncedProgress() prog.Progress

func (*Canal) TrxCount

func (c *Canal) TrxCount() uint64

func (*Canal) WaitDumpDone

func (c *Canal) WaitDumpDone() <-chan struct{}

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos prog.Position, timeout time.Duration) error

type Config

// Config is the configuration accepted by NewCanal. Every field carries a
// `toml` tag naming its key in a TOML document.
// NOTE(review): presumably NewConfig and NewConfigWithFile decode TOML into
// this struct — confirm.
type Config struct {
	// NOTE(review): presumably the address and credentials of the source
	// MySQL server — confirm.
	Addr     string `toml:"addr"`
	User     string `toml:"user"`
	Password string `toml:"password"`

	Charset string `toml:"charset"`

	GtidEnabled bool `toml:"gtid_enabled"`

	ServerID        uint32        `toml:"server_id"`
	Flavor          string        `toml:"flavor"`
	HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
	ReadTimeout     time.Duration `toml:"read_timeout"`

	// IncludeTableRegex and ExcludeTableRegex patterns should contain the database name.
	// Only a table that matches IncludeTableRegex and does not match ExcludeTableRegex will be processed.
	// e.g., IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"]
	//     this includes the 'canal' table of every database, except database 'mysql'.
	// By default IncludeTableRegex and ExcludeTableRegex are empty, which includes all tables.
	IncludeTableRegex []string `toml:"include_table_regex"`
	ExcludeTableRegex []string `toml:"exclude_table_regex"`

	// Discard row events that have no table meta.
	DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`

	// Dump is read from the "dump" TOML key; see DumpConfig.
	Dump DumpConfig `toml:"dump"`

	// Tracker is read from the "schema_tracker" TOML key; see TrackerConfig.
	Tracker TrackerConfig `toml:"schema_tracker"`

	UseDecimal bool `toml:"use_decimal"`
	ParseTime  bool `toml:"parse_time"`

	// SemiSyncEnabled enables semi-sync or not.
	SemiSyncEnabled bool `toml:"semi_sync_enabled"`
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)

func NewDefaultConfig

func NewDefaultConfig() *Config

type DummyEventHandler

// DummyEventHandler is an empty struct that carries no state.
// NOTE(review): presumably a no-op EventHandler implementation meant for
// embedding — confirm against the method bodies.
type DummyEventHandler struct {
}

func (*DummyEventHandler) OnBegin

func (*DummyEventHandler) OnCommit

func (*DummyEventHandler) OnDDL

func (*DummyEventHandler) OnGTID

func (*DummyEventHandler) OnQuery

func (*DummyEventHandler) OnRotate

func (*DummyEventHandler) OnRow

func (*DummyEventHandler) String

func (o *DummyEventHandler) String() string

type DumpConfig

// DumpConfig holds the mysqldump-related settings; it is the type of
// Config.Dump (TOML key "dump").
type DumpConfig struct {
	// mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
	// If not set, mysqldump is not used.
	ExecutionPath string `toml:"mysqldump"`

	// Tables overrides Databases; the tables are in the database named by TableDB (table_db).
	Tables  []string `toml:"tables"`
	TableDB string   `toml:"table_db"`

	Databases []string `toml:"dbs"`

	// Tables to ignore, each in db.table format.
	IgnoreTables []string `toml:"ignore_tables"`

	// Dump only selected records. Quotes are mandatory.
	Where string `toml:"where"`

	// If true, discard error messages; otherwise, output them to stderr.
	DiscardErr bool `toml:"discard_err"`

	// Set true to skip --master-data if we have no privilege to do
	// 'FLUSH TABLES WITH READ LOCK'
	SkipMasterData bool `toml:"skip_master_data"`

	// Set to change the default max_allowed_packet size.
	// NOTE(review): the unit is presumably megabytes, per the field name — confirm.
	MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`
}

type EventHandlerConfig

// EventHandlerConfig is the per-handler configuration passed to
// HandlerMux.RegisterEventHandler.
type EventHandlerConfig struct {
	// You need to tell canal the progress that you have already synced,
	// so that canal can know whether events are duplicated for you
	// and filter out the duplicated events.
	// However, if the current MySQL's server_id is not the same as
	// the server_id of this progress, canal can NOT recognize duplicated events.
	Progress     prog.Progress
	IncludeTable []string
	ExcludeTable []string
}

type EventHandlerWrapper

// EventHandlerWrapper exposes no exported fields; its exported methods are
// IsEventDuplicated and IsIgnored.
type EventHandlerWrapper struct {
	// contains filtered or unexported fields
}

func (*EventHandlerWrapper) IsEventDuplicated

func (o *EventHandlerWrapper) IsEventDuplicated(serverID uint32, logName string, logPos uint32) bool

IsEventDuplicated tells whether this handler has already received the event at some time before.

func (*EventHandlerWrapper) IsIgnored

func (o *EventHandlerWrapper) IsIgnored(database string, table string) bool

IsIgnored tells whether this handler needs to ignore the data of the specified table.

type GtidEvent

// GtidEvent is the event type passed to HandlerMux.OnGTID. It carries a GTID
// set together with LastCommitted and SequenceNumber values.
type GtidEvent struct {
	Gtid           mysql.GTIDSet
	LastCommitted  int64
	SequenceNumber int64
}

type HandlerMux

// HandlerMux exposes no exported fields; it is created with NewHandlerMux,
// and handlers are added to it with RegisterEventHandler.
type HandlerMux struct {
	// contains filtered or unexported fields
}

func NewHandlerMux

func NewHandlerMux() *HandlerMux

func (*HandlerMux) OnBegin

func (o *HandlerMux) OnBegin(h *replication.EventHeader) error

func (*HandlerMux) OnCommit

func (o *HandlerMux) OnCommit(h *replication.EventHeader, p prog.Progress) error

func (*HandlerMux) OnDDL

func (*HandlerMux) OnGTID

func (o *HandlerMux) OnGTID(h *replication.EventHeader, e *GtidEvent) error

func (*HandlerMux) OnQuery

func (*HandlerMux) OnRotate

func (*HandlerMux) OnRow

func (*HandlerMux) RegisterEventHandler

func (o *HandlerMux) RegisterEventHandler(h EventHandler, cfg EventHandlerConfig)

func (*HandlerMux) String

func (o *HandlerMux) String() string

type Observer

// Observer groups three optional hook functions. Each field has the same
// signature as the function accepted by the correspondingly named
// Canal.Register*Hook method.
type Observer struct {
	// Same signature as the hook taken by Canal.RegisterBeforeSchemaChangeHook.
	BeforeSchemaChange   func(string, string) error
	// Same signature as the hook taken by Canal.RegisterOnSchemaChangeFailedHook.
	OnSchemaChangeFailed func(string, string, error) (bool, error)
	// Same signature as the hook taken by Canal.RegisterBeforeServerIDChangeHook.
	BeforeServerIDChange func(uint32, uint32) error
}

type RowsEvent

// RowsEvent is the event for row replication.
type RowsEvent struct {
	Table  *schema.TableDef
	Action string
	// Changed row list.
	// The binlog has three update event versions: v0, v1 and v2.
	// For v1 and v2, the number of rows must be even:
	// two rows for one event, in the format [before update row, after update row].
	// For update v0 there is only one row per event, and we don't support this version.
	Rows [][]interface{}
}

RowsEvent is the event for row replication.

func (*RowsEvent) String

func (r *RowsEvent) String() string

String implements fmt.Stringer interface.

type TrackerConfig

// TrackerConfig holds the schema tracker settings; it is the type of
// Config.Tracker (TOML key "schema_tracker").
type TrackerConfig struct {
	// The server charset of the source MySQL; this charset is needed
	// to handle DDL statements.
	// NOTE(review): the original comment said "charset_set_server"; presumably
	// MySQL's character_set_server system variable is meant — confirm.
	CharsetServer string `toml:"charset_server"`

	// Storage type used to store schema data; may be boltdb or mysql.
	Storage string `toml:"storage"`

	// Boltdb file path used to store data.
	Dir string `toml:"dir"`

	// MySQL connection info.
	// NOTE(review): presumably used only when Storage is mysql — confirm.
	Addr     string `toml:"addr"`
	User     string `toml:"user"`
	Password string `toml:"password"`
	Database string `toml:"database"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL