Documentation ¶
Index ¶
- Constants
- Variables
- func IsDdlOrDclStatement(statement string) bool
- func TrimStatement(statement string) string
- type Canal
- func (c *Canal) AddDumpDatabases(dbs ...string)
- func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)
- func (c *Canal) AddDumpTables(db string, tables ...string)
- func (c *Canal) CatchMasterPos(timeout time.Duration) error
- func (c *Canal) CheckBinlogRowImage(image string) error
- func (c *Canal) Close()
- func (c *Canal) DDLCount() uint64
- func (c *Canal) Dump() error
- func (c *Canal) Err() <-chan error
- func (c *Canal) ExecDDL(db string, statement string) error
- func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)
- func (c *Canal) FlushBinlog() error
- func (c *Canal) GetDatabases() []string
- func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)
- func (c *Canal) GetMasterPos() (prog.Position, error)
- func (c *Canal) GetMasterServerID() (uint32, error)
- func (c *Canal) GetTableDef(db string, table string) (*schema.TableDef, error)
- func (c *Canal) GetTables(db string) ([]string, error)
- func (c *Canal) IsDdlStatement(statement string) (bool, error)
- func (c *Canal) IudCount() uint64
- func (c *Canal) RegisterBeforeSchemaChangeHook(fn func(string, string) error)
- func (c *Canal) RegisterBeforeServerIDChangeHook(fn func(uint32, uint32) error)
- func (c *Canal) RegisterOnSchemaChangeFailedHook(fn func(string, string, error) (bool, error))
- func (c *Canal) SetEventHandler(h EventHandler)
- func (c *Canal) Start(p prog.Progress) error
- func (c *Canal) SyncedProgress() prog.Progress
- func (c *Canal) TrxCount() uint64
- func (c *Canal) WaitDumpDone() <-chan struct{}
- func (c *Canal) WaitUntilPos(pos prog.Position, timeout time.Duration) error
- type Config
- type DummyEventHandler
- func (o *DummyEventHandler) OnBegin(h *replication.EventHeader) error
- func (o *DummyEventHandler) OnCommit(h *replication.EventHeader, p prog.Progress) error
- func (o *DummyEventHandler) OnDDL(h *replication.EventHeader, e *replication.QueryEvent, p prog.Progress) error
- func (o *DummyEventHandler) OnGTID(h *replication.EventHeader, e *GtidEvent) error
- func (o *DummyEventHandler) OnQuery(h *replication.EventHeader, e *replication.QueryEvent) error
- func (o *DummyEventHandler) OnRotate(h *replication.EventHeader, e *replication.RotateEvent) error
- func (o *DummyEventHandler) OnRow(h *replication.EventHeader, e *RowsEvent) error
- func (o *DummyEventHandler) String() string
- type DumpConfig
- type EventHandler
- type EventHandlerConfig
- type EventHandlerWrapper
- type GtidEvent
- type HandlerMux
- func (o *HandlerMux) OnBegin(h *replication.EventHeader) error
- func (o *HandlerMux) OnCommit(h *replication.EventHeader, p prog.Progress) error
- func (o *HandlerMux) OnDDL(h *replication.EventHeader, e *replication.QueryEvent, p prog.Progress) error
- func (o *HandlerMux) OnGTID(h *replication.EventHeader, e *GtidEvent) error
- func (o *HandlerMux) OnQuery(h *replication.EventHeader, e *replication.QueryEvent) error
- func (o *HandlerMux) OnRotate(h *replication.EventHeader, e *replication.RotateEvent) error
- func (o *HandlerMux) OnRow(h *replication.EventHeader, e *RowsEvent) error
- func (o *HandlerMux) RegisterEventHandler(h EventHandler, cfg EventHandlerConfig)
- func (o *HandlerMux) String() string
- type Observer
- type RowsEvent
- type TrackerConfig
Constants ¶
const ( UpdateAction = "update" InsertAction = "insert" DeleteAction = "delete" )
The action name for sync.
Variables ¶
var ErrExcludedTable = errors.New("excluded table meta")
var UnknownTableRetryPeriod = time.Second * time.Duration(10)
Canal will retry fetching an unknown table's meta after UnknownTableRetryPeriod.
Functions ¶
func IsDdlOrDclStatement ¶
func TrimStatement ¶
Types ¶
type Canal ¶
type Canal struct {
// contains filtered or unexported fields
}
Canal can sync your MySQL data to anywhere, such as Elasticsearch, Redis, etc. MySQL must use the row format for the binlog.
func (*Canal) AddDumpDatabases ¶
func (*Canal) AddDumpIgnoreTables ¶
func (*Canal) AddDumpTables ¶
func (*Canal) CheckBinlogRowImage ¶
Check the MySQL binlog row image; it must be one of FULL, MINIMAL, or NOBLOB.
func (*Canal) FlushBinlog ¶
func (*Canal) GetDatabases ¶
func (*Canal) GetMasterServerID ¶
func (*Canal) GetTableDef ¶
func (*Canal) RegisterBeforeSchemaChangeHook ¶
Register a hook that will be called before a schema change.
func (*Canal) RegisterBeforeServerIDChangeHook ¶
Register a hook that will be called before a server_id change.
func (*Canal) RegisterOnSchemaChangeFailedHook ¶
Register a hook that will be called when a DDL fails.
func (*Canal) SetEventHandler ¶
func (c *Canal) SetEventHandler(h EventHandler)
`SetEventHandler` registers the sync handler; you must register your own handler before starting Canal.
func (*Canal) Start ¶
Start will first try to dump all data from the MySQL master using `mysqldump`, then sync from the binlog position in the dump data. It will run forever until it meets an error or Canal is closed.
func (*Canal) SyncedProgress ¶
func (*Canal) WaitDumpDone ¶
func (c *Canal) WaitDumpDone() <-chan struct{}
type Config ¶
type Config struct { Addr string `toml:"addr"` User string `toml:"user"` Password string `toml:"password"` Charset string `toml:"charset"` GtidEnabled bool `toml:"gtid_enabled"` ServerID uint32 `toml:"server_id"` Flavor string `toml:"flavor"` HeartbeatPeriod time.Duration `toml:"heartbeat_period"` ReadTimeout time.Duration `toml:"read_timeout"` // IncludeTableRegex or ExcludeTableRegex should contain database name // Only a table which matches IncludeTableRegex and does not match ExcludeTableRegex will be processed // eg, IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"] // this will include all database's 'canal' table, except database 'mysql' // Default IncludeTableRegex and ExcludeTableRegex are empty, this will include all tables IncludeTableRegex []string `toml:"include_table_regex"` ExcludeTableRegex []string `toml:"exclude_table_regex"` // discard row event without table meta DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"` Dump DumpConfig `toml:"dump"` Tracker TrackerConfig `toml:"schema_tracker"` UseDecimal bool `toml:"use_decimal"` ParseTime bool `toml:"parse_time"` // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool `toml:"semi_sync_enabled"` }
func NewConfigWithFile ¶
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
type DummyEventHandler ¶
type DummyEventHandler struct { }
func (*DummyEventHandler) OnBegin ¶
func (o *DummyEventHandler) OnBegin(h *replication.EventHeader) error
func (*DummyEventHandler) OnCommit ¶
func (o *DummyEventHandler) OnCommit(h *replication.EventHeader, p prog.Progress) error
func (*DummyEventHandler) OnDDL ¶
func (o *DummyEventHandler) OnDDL(h *replication.EventHeader, e *replication.QueryEvent, p prog.Progress) error
func (*DummyEventHandler) OnGTID ¶
func (o *DummyEventHandler) OnGTID(h *replication.EventHeader, e *GtidEvent) error
func (*DummyEventHandler) OnQuery ¶
func (o *DummyEventHandler) OnQuery(h *replication.EventHeader, e *replication.QueryEvent) error
func (*DummyEventHandler) OnRotate ¶
func (o *DummyEventHandler) OnRotate(h *replication.EventHeader, e *replication.RotateEvent) error
func (*DummyEventHandler) OnRow ¶
func (o *DummyEventHandler) OnRow(h *replication.EventHeader, e *RowsEvent) error
func (*DummyEventHandler) String ¶
func (o *DummyEventHandler) String() string
type DumpConfig ¶
type DumpConfig struct { // mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc... // If not set, ignore using mysqldump. ExecutionPath string `toml:"mysqldump"` // Will override Databases, tables is in database table_db Tables []string `toml:"tables"` TableDB string `toml:"table_db"` Databases []string `toml:"dbs"` // Ignore table format is db.table IgnoreTables []string `toml:"ignore_tables"` // Dump only selected records. Quotes are mandatory Where string `toml:"where"` // If true, discard error msg, else, output to stderr DiscardErr bool `toml:"discard_err"` // Set true to skip --master-data if we have no privilege to do // 'FLUSH TABLES WITH READ LOCK' SkipMasterData bool `toml:"skip_master_data"` // Set to change the default max_allowed_packet size MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"` }
type EventHandler ¶
type EventHandler interface { OnBegin(h *replication.EventHeader) error OnCommit(h *replication.EventHeader, p prog.Progress) error OnQuery(h *replication.EventHeader, e *replication.QueryEvent) error OnDDL(h *replication.EventHeader, e *replication.QueryEvent, p prog.Progress) error OnRow(h *replication.EventHeader, e *RowsEvent) error OnGTID(h *replication.EventHeader, e *GtidEvent) error OnRotate(h *replication.EventHeader, e *replication.RotateEvent) error String() string }
type EventHandlerConfig ¶
type EventHandlerConfig struct { // You need to tell canal the progress that you have synced, // so canal can know the events are duplicated or not for you, // and filter the duplicated events. // However, if current MySQL's server_id is not the same as // the server_id of this progress, canal can NOT recognize duplicated events. Progress prog.Progress IncludeTable []string ExcludeTable []string }
type EventHandlerWrapper ¶
type EventHandlerWrapper struct {
// contains filtered or unexported fields
}
func (*EventHandlerWrapper) IsEventDuplicated ¶
func (o *EventHandlerWrapper) IsEventDuplicated(serverID uint32, logName string, logPos uint32) bool
IsEventDuplicated tells whether this handler has already received the event at some earlier time.
type HandlerMux ¶
type HandlerMux struct {
// contains filtered or unexported fields
}
func NewHandlerMux ¶
func NewHandlerMux() *HandlerMux
func (*HandlerMux) OnBegin ¶
func (o *HandlerMux) OnBegin(h *replication.EventHeader) error
func (*HandlerMux) OnCommit ¶
func (o *HandlerMux) OnCommit(h *replication.EventHeader, p prog.Progress) error
func (*HandlerMux) OnDDL ¶
func (o *HandlerMux) OnDDL(h *replication.EventHeader, e *replication.QueryEvent, p prog.Progress) error
func (*HandlerMux) OnGTID ¶
func (o *HandlerMux) OnGTID(h *replication.EventHeader, e *GtidEvent) error
func (*HandlerMux) OnQuery ¶
func (o *HandlerMux) OnQuery(h *replication.EventHeader, e *replication.QueryEvent) error
func (*HandlerMux) OnRotate ¶
func (o *HandlerMux) OnRotate(h *replication.EventHeader, e *replication.RotateEvent) error
func (*HandlerMux) OnRow ¶
func (o *HandlerMux) OnRow(h *replication.EventHeader, e *RowsEvent) error
func (*HandlerMux) RegisterEventHandler ¶
func (o *HandlerMux) RegisterEventHandler(h EventHandler, cfg EventHandlerConfig)
func (*HandlerMux) String ¶
func (o *HandlerMux) String() string
type RowsEvent ¶
type RowsEvent struct { Table *schema.TableDef Action string // changed row list // binlog has three update event versions, v0, v1 and v2. // for v1 and v2, the rows number must be even. // Two rows for one event, format is [before update row, after update row] // for update v0, only one row for an event, and we don't support this version. Rows [][]interface{} }
RowsEvent is the event for row replication.
type TrackerConfig ¶
type TrackerConfig struct { // The character_set_server of source MySQL, we need // this charset to handle ddl statement CharsetServer string `toml:"charset_server"` // Storage type to store schema data, may be boltdb or mysql Storage string `toml:"storage"` // Boltdb file path to store data Dir string `toml:"dir"` // MySQL info to connect Addr string `toml:"addr"` User string `toml:"user"` Password string `toml:"password"` Database string `toml:"database"` }