canal

package
v0.0.0-...-d18ddd8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2017 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UpdateAction = "update"
	InsertAction = "insert"
	DeleteAction = "delete"
)

Variables

This section is empty.

Functions

func GetColumnValue

func GetColumnValue(table *schema.Table, column string, row []interface{}) (interface{}, error)

Get term column's value

func GetPKValues

func GetPKValues(table *schema.Table, row []interface{}) ([]interface{}, error)

Get primary keys in one row for a table, a table may use multi fields as the PK

Types

type Canal

type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc... MySQL must open row format for binlog

func NewCanal

func NewCanal(cfg *Config) (*Canal, error)

func (*Canal) AddDumpDatabases

func (c *Canal) AddDumpDatabases(dbs ...string)

func (*Canal) AddDumpIgnoreTables

func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)

func (*Canal) AddDumpTables

func (c *Canal) AddDumpTables(db string, tables ...string)

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout time.Duration) error

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(image string) error

Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB

func (*Canal) ClearTableCache

func (c *Canal) ClearTableCache(db []byte, table []byte)

ClearTableCache clear table cache

func (*Canal) Close

func (c *Canal) Close()

func (*Canal) Ctx

func (c *Canal) Ctx() context.Context

func (*Canal) Execute

func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)

Execute a SQL

func (*Canal) GetMasterPos

func (c *Canal) GetMasterPos() (mysql.Position, error)

func (*Canal) GetTable

func (c *Canal) GetTable(db string, table string) (*schema.Table, error)

func (*Canal) SetEventHandler

func (c *Canal) SetEventHandler(h EventHandler)

`SetEventHandler` registers the sync handler, you must register your own handler before starting Canal.

func (*Canal) Start

func (c *Canal) Start() error

Start will first try to dump all data from MySQL master `mysqldump`, then sync from the binlog position in the dump data.

func (*Canal) StartFrom

func (c *Canal) StartFrom(pos mysql.Position) error

StartFrom will sync from the binlog position directly, ignore mysqldump.

func (*Canal) SyncedPosition

func (c *Canal) SyncedPosition() mysql.Position

func (*Canal) WaitDumpDone

func (c *Canal) WaitDumpDone() <-chan struct{}

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error

type Config

type Config struct {
	Addr     string `toml:"addr"`
	User     string `toml:"user"`
	Password string `toml:"password"`

	Charset  string `toml:"charset"`
	ServerID uint32 `toml:"server_id"`
	Flavor   string `toml:"flavor"`

	Dump DumpConfig `toml:"dump"`
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)

func NewDefaultConfig

func NewDefaultConfig() *Config

type DummyEventHandler

type DummyEventHandler struct {
}

func (*DummyEventHandler) OnDDL

func (*DummyEventHandler) OnRotate

func (*DummyEventHandler) OnRow

func (h *DummyEventHandler) OnRow(*RowsEvent) error

func (*DummyEventHandler) OnXID

func (*DummyEventHandler) String

func (h *DummyEventHandler) String() string

type DumpConfig

type DumpConfig struct {
	// mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
	// If not set, ignore using mysqldump.
	ExecutionPath string `toml:"mysqldump"`

	// Will override Databases, tables is in database table_db
	Tables  []string `toml:"tables"`
	TableDB string   `toml:"table_db"`

	Databases []string `toml:"dbs"`

	// Ignore table format is db.table
	IgnoreTables []string `toml:"ignore_tables"`

	// If true, discard error msg, else, output to stderr
	DiscardErr bool `toml:"discard_err"`

	// Set true to skip --master-data if we have no privilege to do
	// 'FLUSH TABLES WITH READ LOCK'
	SkipMasterData bool `toml:"skip_master_data"`
}

type EventHandler

type EventHandler interface {
	OnRotate(roateEvent *replication.RotateEvent) error
	OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
	OnRow(e *RowsEvent) error
	OnXID(nextPos mysql.Position) error
	String() string
}

type RowsEvent

type RowsEvent struct {
	Table  *schema.Table
	Action string
	// changed row list
	// binlog has three update event version, v0, v1 and v2.
	// for v1 and v2, the rows number must be even.
	// Two rows for one event, format is [before update row, after update row]
	// for update v0, only one row for a event, and we don't support this version.
	Rows [][]interface{}
}

func (*RowsEvent) String

func (r *RowsEvent) String() string

String implements fmt.Stringer interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL