replication

package
v0.0.0-...-04766b6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2015 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Replication package is to handle MySQL replication protocol.

Todo:

+ Get table information when handing rows event.

Index

Constants

View Source
const (
	LOG_EVENT_BINLOG_IN_USE_F            uint16 = 0x0001
	LOG_EVENT_FORCED_ROTATE_F            uint16 = 0x0002
	LOG_EVENT_THREAD_SPECIFIC_F          uint16 = 0x0004
	LOG_EVENT_SUPPRESS_USE_F             uint16 = 0x0008
	LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F uint16 = 0x0010
	LOG_EVENT_ARTIFICIAL_F               uint16 = 0x0020
	LOG_EVENT_RELAY_LOG_F                uint16 = 0x0040
	LOG_EVENT_IGNORABLE_F                uint16 = 0x0080
	LOG_EVENT_NO_FILTER_F                uint16 = 0x0100
	LOG_EVENT_MTS_ISOLATE_F              uint16 = 0x0200
)
View Source
const (
	BINLOG_DUMP_NEVER_STOP  uint16 = 0x00
	BINLOG_DUMP_NON_BLOCK   uint16 = 0x01
	BINLOG_THROUGH_POSITION uint16 = 0x02
	BINLOG_THROUGH_GTID     uint16 = 0x04
)
View Source
const (
	BINLOG_ROW_IMAGE_FULL    = "FULL"
	BINLOG_ROW_IAMGE_MINIMAL = "MINIMAL"
	BINLOG_ROW_IMAGE_NOBLOB  = "NOBLOB"
)
View Source
const (
	BINLOG_CHECKSUM_ALG_OFF byte = 0 // Events are without checksum though its generator
	// is checksum-capable New Master (NM).
	BINLOG_CHECKSUM_ALG_CRC32 byte = 1 // CRC32 of zlib algorithm.
	//  BINLOG_CHECKSUM_ALG_ENUM_END,  // the cut line: valid alg range is [1, 0x7f].
	BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum

)
View Source
const DATETIMEF_INT_OFS int64 = 0x8000000000
View Source
const (
	EventHeaderSize = 19
)
View Source
const (
	//we only support MySQL 5.0.0+ binlog format, maybe???
	MinBinlogVersion = 4
)
View Source
const TIMEF_INT_OFS int64 = 0x800000
View Source
const TIMEF_OFS int64 = 0x800000000000

Variables

View Source
var (
	ErrGetEventTimeout = errors.New("Get event timeout, try get later")
	ErrNeedSyncAgain   = errors.New("Last sync error or closed, try sync and get event again")
	ErrSyncClosed      = errors.New("Sync was closed")
)
View Source
var (
	//binlog header [ fe `bin` ]
	BinLogFileHeader []byte = []byte{0xfe, 0x62, 0x69, 0x6e}

	SemiSyncIndicator byte = 0xef
)

Functions

This section is empty.

Types

type BinlogEvent

type BinlogEvent struct {
	// raw binlog data, including crc32 checksum if exists
	RawData []byte

	Header *EventHeader
	Event  Event
}

func (*BinlogEvent) Dump

func (e *BinlogEvent) Dump(w io.Writer)

type BinlogParser

type BinlogParser struct {
	// contains filtered or unexported fields
}

func NewBinlogParser

func NewBinlogParser() *BinlogParser

func (*BinlogParser) ParseFile

func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error

func (*BinlogParser) ParseReader

func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error

func (*BinlogParser) SetRawMode

func (p *BinlogParser) SetRawMode(mode bool)

type BinlogStreamer

type BinlogStreamer struct {
	// contains filtered or unexported fields
}

func (*BinlogStreamer) GetEvent

func (s *BinlogStreamer) GetEvent() (*BinlogEvent, error)

func (*BinlogStreamer) GetEventTimeout

func (s *BinlogStreamer) GetEventTimeout(d time.Duration) (*BinlogEvent, error)

if timeout, ErrGetEventTimeout will returns

type BinlogSyncer

type BinlogSyncer struct {
	// contains filtered or unexported fields
}

func NewBinlogSyncer

func NewBinlogSyncer(serverID uint32, flavor string) *BinlogSyncer

func (*BinlogSyncer) Close

func (b *BinlogSyncer) Close()

func (*BinlogSyncer) EnableSemiSync

func (b *BinlogSyncer) EnableSemiSync() error

func (*BinlogSyncer) ExecuteSql

func (b *BinlogSyncer) ExecuteSql(query string, args ...interface{}) (*Result, error)

func (*BinlogSyncer) GetMasterUUID

func (b *BinlogSyncer) GetMasterUUID() (uuid.UUID, error)

func (*BinlogSyncer) ReRegisterSlave

func (b *BinlogSyncer) ReRegisterSlave() error

If you close sync before and want to restart again, you can call this before other operations This function will close old replication sync if exists

func (*BinlogSyncer) RegisterSlave

func (b *BinlogSyncer) RegisterSlave(host string, port uint16, user string, password string) error

You must register slave at first before you do other operations This function will close old replication sync if exists

func (*BinlogSyncer) SetRawMode

func (b *BinlogSyncer) SetRawMode(mode bool) error

func (*BinlogSyncer) StartBackup

func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error

Like mysqlbinlog remote raw backup Backup remote binlog from position (filename, offset) and write in backupDir

func (*BinlogSyncer) StartSync

func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error)

func (*BinlogSyncer) StartSyncGTID

func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error)

type Event

type Event interface {
	//Dump Event, format like python-mysql-replication
	Dump(w io.Writer)

	Decode(data []byte) error
}

type EventError

type EventError struct {
	Header *EventHeader

	//Error message
	Err string

	//Event data
	Data []byte
}

func (*EventError) Error

func (e *EventError) Error() string

type EventHeader

type EventHeader struct {
	Timestamp uint32
	EventType EventType
	ServerID  uint32
	EventSize uint32
	LogPos    uint32
	Flags     uint16
}

func (*EventHeader) Decode

func (h *EventHeader) Decode(data []byte) error

func (*EventHeader) Dump

func (h *EventHeader) Dump(w io.Writer)

type EventType

type EventType byte
const (
	UNKNOWN_EVENT EventType = iota
	START_EVENT_V3
	QUERY_EVENT
	STOP_EVENT
	ROTATE_EVENT
	INTVAR_EVENT
	LOAD_EVENT
	SLAVE_EVENT
	CREATE_FILE_EVENT
	APPEND_BLOCK_EVENT
	EXEC_LOAD_EVENT
	DELETE_FILE_EVENT
	NEW_LOAD_EVENT
	RAND_EVENT
	USER_VAR_EVENT
	FORMAT_DESCRIPTION_EVENT
	XID_EVENT
	BEGIN_LOAD_QUERY_EVENT
	EXECUTE_LOAD_QUERY_EVENT
	TABLE_MAP_EVENT
	WRITE_ROWS_EVENTv0
	UPDATE_ROWS_EVENTv0
	DELETE_ROWS_EVENTv0
	WRITE_ROWS_EVENTv1
	UPDATE_ROWS_EVENTv1
	DELETE_ROWS_EVENTv1
	INCIDENT_EVENT
	HEARTBEAT_EVENT
	IGNORABLE_EVENT
	ROWS_QUERY_EVENT
	WRITE_ROWS_EVENTv2
	UPDATE_ROWS_EVENTv2
	DELETE_ROWS_EVENTv2
	GTID_EVENT
	ANONYMOUS_GTID_EVENT
	PREVIOUS_GTIDS_EVENT
)
const (
	// MariaDB event starts from 160
	MARIADB_ANNOTATE_ROWS_EVENT EventType = 160 + iota
	MARIADB_BINLOG_CHECKPOINT_EVENT
	MARIADB_GTID_EVENT
	MARIADB_GTID_LIST_EVENT
)

func (EventType) String

func (e EventType) String() string

type FormatDescriptionEvent

type FormatDescriptionEvent struct {
	Version uint16
	//len = 50
	ServerVersion          []byte
	CreateTimestamp        uint32
	EventHeaderLength      uint8
	EventTypeHeaderLengths []byte

	// 0 is off, 1 is for CRC32, 255 is undefined
	ChecksumAlgorithm byte
}

func (*FormatDescriptionEvent) Decode

func (e *FormatDescriptionEvent) Decode(data []byte) error

func (*FormatDescriptionEvent) Dump

func (e *FormatDescriptionEvent) Dump(w io.Writer)

type GTIDEvent

type GTIDEvent struct {
	CommitFlag uint8
	SID        []byte
	GNO        int64
}

func (*GTIDEvent) Decode

func (e *GTIDEvent) Decode(data []byte) error

func (*GTIDEvent) Dump

func (e *GTIDEvent) Dump(w io.Writer)

type GenericEvent

type GenericEvent struct {
	Data []byte
}

we don't parse all event, so some we will use GenericEvent instead

func (*GenericEvent) Decode

func (e *GenericEvent) Decode(data []byte) error

func (*GenericEvent) Dump

func (e *GenericEvent) Dump(w io.Writer)

type MariadbAnnotaeRowsEvent

type MariadbAnnotaeRowsEvent struct {
	Query []byte
}

func (*MariadbAnnotaeRowsEvent) Decode

func (e *MariadbAnnotaeRowsEvent) Decode(data []byte) error

func (*MariadbAnnotaeRowsEvent) Dump

func (e *MariadbAnnotaeRowsEvent) Dump(w io.Writer)

type MariadbBinlogCheckPointEvent

type MariadbBinlogCheckPointEvent struct {
	Info []byte
}

func (*MariadbBinlogCheckPointEvent) Decode

func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error

func (*MariadbBinlogCheckPointEvent) Dump

type MariadbGTIDEvent

type MariadbGTIDEvent struct {
	GTID MariadbGTID
}

func (*MariadbGTIDEvent) Decode

func (e *MariadbGTIDEvent) Decode(data []byte) error

func (*MariadbGTIDEvent) Dump

func (e *MariadbGTIDEvent) Dump(w io.Writer)

type MariadbGTIDListEvent

type MariadbGTIDListEvent struct {
	GTIDs []MariadbGTID
}

func (*MariadbGTIDListEvent) Decode

func (e *MariadbGTIDListEvent) Decode(data []byte) error

func (*MariadbGTIDListEvent) Dump

func (e *MariadbGTIDListEvent) Dump(w io.Writer)

type OnEventFunc

type OnEventFunc func(*BinlogEvent) error

type QueryEvent

type QueryEvent struct {
	SlaveProxyID  uint32
	ExecutionTime uint32
	ErrorCode     uint16
	StatusVars    []byte
	Schema        []byte
	Query         []byte
}

func (*QueryEvent) Decode

func (e *QueryEvent) Decode(data []byte) error

func (*QueryEvent) Dump

func (e *QueryEvent) Dump(w io.Writer)

type RotateEvent

type RotateEvent struct {
	Position    uint64
	NextLogName []byte
}

func (*RotateEvent) Decode

func (e *RotateEvent) Decode(data []byte) error

func (*RotateEvent) Dump

func (e *RotateEvent) Dump(w io.Writer)

type RowsEvent

type RowsEvent struct {
	//0, 1, 2
	Version int

	Table *TableMapEvent

	TableID uint64

	Flags uint16

	//if version == 2
	ExtraData []byte

	//lenenc_int
	ColumnCount uint64
	//len = (ColumnCount + 7) / 8
	ColumnBitmap1 []byte

	//if UPDATE_ROWS_EVENTv1 or v2
	//len = (ColumnCount + 7) / 8
	ColumnBitmap2 []byte

	//rows: invalid: int64, float64, bool, []byte, string
	Rows [][]interface{}
	// contains filtered or unexported fields
}

func (*RowsEvent) Decode

func (e *RowsEvent) Decode(data []byte) error

func (*RowsEvent) Dump

func (e *RowsEvent) Dump(w io.Writer)

type RowsQueryEvent

type RowsQueryEvent struct {
	Query []byte
}

func (*RowsQueryEvent) Decode

func (e *RowsQueryEvent) Decode(data []byte) error

func (*RowsQueryEvent) Dump

func (e *RowsQueryEvent) Dump(w io.Writer)

type TableMapEvent

type TableMapEvent struct {
	TableID uint64

	Flags uint16

	Schema []byte
	Table  []byte

	ColumnCount uint64
	ColumnType  []byte
	ColumnMeta  []uint16

	//len = (ColumnCount + 7) / 8
	NullBitmap []byte
	// contains filtered or unexported fields
}

func (*TableMapEvent) Decode

func (e *TableMapEvent) Decode(data []byte) error

func (*TableMapEvent) Dump

func (e *TableMapEvent) Dump(w io.Writer)

type XIDEvent

type XIDEvent struct {
	XID uint64
}

func (*XIDEvent) Decode

func (e *XIDEvent) Decode(data []byte) error

func (*XIDEvent) Dump

func (e *XIDEvent) Dump(w io.Writer)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL