binlog

package module
v0.0.0-...-79f86a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2023 License: GPL-2.0 Imports: 16 Imported by: 0

README

mysql_binlog_utils

Some utilities for mysql binlog

WARNING: these utilities are tested for mysql 5.5.33, but are not tested for mysql 5.6+

Dump binlog from pos

DumpBinlogFromPos(srcFilePath string, startPos uint, targetFilePath string) error

This function will dump binlog (at srcFilePath) from pos (startPos), and the output is at targetFilePath

If startPos is 0 or 4, the whole binlog will be dumped. Otherwise, the source binlog header (including FORMAT_DESCRIPTION_EVENT & ROTATE_EVENT & PREVIOUS_GTIDS_LOG_EVENT) will be dumped as the target binlog header, and then the source data will be dumped from startPos

This makes the target binlog complete and available to replay.

Rotate relay log

RotateRelayLog(relayLogPath string, endPos int)

This function will add a rotate event to a relay log (at relayLogPath), after the position (endPos), and truncate the data after the position (endPos)

WARNING: after manually rotating a relay log, DO NOT forget to update relay-log.index

Fake master server

NewFakeMasterServer(port int, unusedServerId int, characterSet int, keepAliveWhenFinish bool, baseDir string)

WARNING: only support mysql 5.5.x

This fake master server is very helpful if you want to replay some binlog files to a mysql instance, and you're afraid of mysqlbinlog (http://bugs.mysql.com/bug.php?id=33048, for example)

Mysql replication is a more reliable way to replay binlog. What we need is:

  1. server := NewFakeMasterServer(...)
  2. server.Start()
  3. In target mysql instance, change master to the fake server, and start slave
  4. the server will be closed when done or error
  5. you can abort the server by server.Abort()

#### Some other features:

  1. start slave until is also supported
  2. large packet (>= 1<<24-1 bytes) is supported
  3. multiple binlog files are supported, fake server will act as a real replication master (rotate to the next when one is finished)

#### Arguments:

Argument Description
port the fake server port
unusedServerId the fake server id, must not duplicate the server id of any other mysql instance
characterSet the fake server character set id, should be the same as the target mysql instance's. You can get the id by SELECT id, collation_name FROM information_schema.collations ORDER BY id
keepAliveWhenFinish when false, the fake server will quit when all binlogs are replayed. when true, the fake server will wait for more binlogs.
baseDir where the binlog files are located

Get unexecuted binlog files by gtid

GetUnexecutedBinlogFilesByGtid(binlogDir string, binlogBaseName string, executedGtidDesc string, includeEventBeforeFirst bool) (ret []string, err error)

This function will search the binlog files in binlogDir for those that contain an event whose gtid is not contained in executedGtidDesc

The scenario is mysql replication: if the mysql master is broken, the mysql slave can call GetUnexecutedBinlogFilesByGtid to search the mysql master binlog dir for the binlog files which need to be replayed on the slave

Get unexecuted binlog pos by gtid

GetUnexecutedBinlogPosByGtid(binlogPath string, executedGtidDesc string, includeEventBeforeFirst bool) (pos uint, err error)

This function will search the binlog file (binlogPath) for the pos of the first binlog event which is not contained in executedGtidDesc

It returns an EOF error if there is no unexecuted binlog event


#### Pull requests and issues are warmly welcome

Documentation

Index

Constants

View Source
// Binlog event type codes. The values run consecutively from 0
// (UNKNOWN_EVENT) through 35 (PREVIOUS_GTIDS_LOG_EVENT); each constant's
// numeric value is repeated in its trailing comment because iota hides it.
// Do not reorder or insert entries: that would renumber every later code.
const (
	UNKNOWN_EVENT            = iota // 0
	START_EVENT_V3                  // 1
	QUERY_EVENT                     // 2
	STOP_EVENT                      // 3
	ROTATE_EVENT                    // 4
	INTVAR_EVENT                    // 5
	LOAD_EVENT                      // 6
	SLAVE_EVENT                     // 7
	CREATE_FILE_EVENT               // 8
	APPEND_BLOCK_EVENT              // 9
	EXEC_LOAD_EVENT                 // 10
	DELETE_FILE_EVENT               // 11
	NEW_LOAD_EVENT                  // 12
	RAND_EVENT                      // 13
	USER_VAR_EVENT                  // 14
	FORMAT_DESCRIPTION_EVENT        // 15
	XID_EVENT                       // 16
	BEGIN_LOAD_QUERY_EVENT          // 17
	EXECUTE_LOAD_QUERY_EVENT        // 18
	TABLE_MAP_EVENT                 // 19
	PRE_GA_WRITE_ROWS_EVENT         // 20
	PRE_GA_UPDATE_ROWS_EVENT        // 21
	PRE_GA_DELETE_ROWS_EVENT        // 22
	WRITE_ROWS_EVENT_V1             // 23
	UPDATE_ROWS_EVENT_V1            // 24
	DELETE_ROWS_EVENT_V1            // 25
	INCIDENT_EVENT                  // 26
	HEARTBEAT_LOG_EVENT             // 27
	IGNORABLE_LOG_EVENT             // 28
	ROWS_QUERY_LOG_EVENT            // 29
	WRITE_ROWS_EVENT                // 30
	UPDATE_ROWS_EVENT               // 31
	DELETE_ROWS_EVENT               // 32
	GTID_LOG_EVENT                  // 33
	ANONYMOUS_GTID_LOG_EVENT        // 34
	PREVIOUS_GTIDS_LOG_EVENT        // 35
)
View Source
// LOG_EVENT_FIXED_HEADER_LEN is 19.
// NOTE(review): presumably the byte length of the fixed header carried by
// every binlog event (see EventFixedHeader) — confirm against the parser.
const LOG_EVENT_FIXED_HEADER_LEN = 19

// MAX_ALLOWED_PACKET is 1 GiB (1024 * 1024 * 1024).
// NOTE(review): presumably the upper bound on a single packet/event size —
// confirm against its callers.
const MAX_ALLOWED_PACKET = 1 << 30
View Source
// Status codes, typed uint8 and numbered 0 through 3.
// NOTE(review): Transaction.Status is also a uint8, so these are presumably
// its possible values — confirm against the code that sets it.
const (
	STATUS_PREPARE  uint8 = 0
	STATUS_BEGIN    uint8 = 1
	STATUS_COMMIT   uint8 = 2
	STATUS_ROLLBACK uint8 = 3
)

Variables

This section is empty.

Functions

func BinlogIndexPath

func BinlogIndexPath(binlogPath string) (string, error)

func DumpBinlogFromPos

func DumpBinlogFromPos(srcFilePath string, startPos uint, targetFilePath string) error

func DumpUnexecutedBinlogByGtid

func DumpUnexecutedBinlogByGtid(srcFilePath string, executedGtidDesc string, targetFilePath string, includeEventBeforeFirst bool) error

func GenBinlogEventBytes

func GenBinlogEventBytes(fh EventFixedHeader, fd EventFixedData, vd EventVariableData) ([]byte, error)

func GetAllGtidOfBinlogDir

func GetAllGtidOfBinlogDir(binlogDir, binlogBaseName string) (gtidDesc string, err error)

func GetFirstPreviousGtidOfBinlogDir

func GetFirstPreviousGtidOfBinlogDir(binlogDir, binlogBaseName string) (gtidDesc string, err error)

func GetGtidOfBinlog

func GetGtidOfBinlog(binlogPath string) (gtidDesc string, err error)

func GetPreviousGtids

func GetPreviousGtids(binlogPath string) (gtidDesc string, err error)

func GetUnexecutedBinlogFilesByGtid

func GetUnexecutedBinlogFilesByGtid(binlogDir string, binlogBaseName string, executedGtidDesc string, includeEventBeforeFirst bool) (
	ret []string, err error)

func GetUnexecutedBinlogPosByGtid

func GetUnexecutedBinlogPosByGtid(binlogPath string, executedGtidDesc string, includeEventBeforeFirst bool) (pos uint, err error)

func GetUnexecutedBinlogPosByGtidAndAllGtid

func GetUnexecutedBinlogPosByGtidAndAllGtid(binlogPath string, executedGtidDesc string, includeEventBeforeFirst bool) (pos uint, allgtid string, err error)

func NextBinlogName

func NextBinlogName(binlogPath string) (string, error)

func NextBinlogPath

func NextBinlogPath(binlogPath string) (string, error)

func ParseBinlogFile

func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error

func ParseBinlogWithFilter

func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error

func SetLogger

func SetLogger(logger Logger)

Types

type BinlogEvent

// BinlogEvent is the element type of the slice returned by ParseBinlogEvent.
//
// NOTE(review): the field meanings noted below are inferred from the field
// names only; confirm them against the ParseBinlogEvent implementation.
type BinlogEvent struct {
	// Type: presumably the kind of event or statement.
	Type            string
	// DB and TB: presumably the database and table the event applies to.
	DB              string
	TB              string
	// Data: presumably the textual payload (for example the SQL text).
	Data            string
	// RowCnt: presumably the number of rows carried by the event.
	RowCnt          uint32
	// Rows: presumably one inner slice of column values per row.
	Rows            [][]interface{}
	// CompressionType: presumably names the compression applied, if any.
	CompressionType string
}

func ParseBinlogEvent

func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent

type BinlogFilter

// BinlogFilter is the filter argument accepted by ParseBinlogWithFilter.
//
// NOTE(review): the per-field notes below are inferred from the field names;
// how zero values are treated (for example whether 0 means "no limit") is not
// visible here — confirm against ParseBinlogWithFilter.
type BinlogFilter struct {
	// IncludeGtid / ExcludeGtid: presumably GTID descriptions to keep / drop.
	IncludeGtid   string
	ExcludeGtid   string
	// IncludeTables / ExcludeTables: presumably table names to keep / drop.
	IncludeTables []string
	ExcludeTables []string
	// StartPos / EndPos: presumably a binlog position range.
	StartPos      int
	EndPos        int
	// StartDate / EndDate: presumably a time range.
	StartDate     time.Time
	EndDate       time.Time
	// BigThan / SmallThan: presumably size thresholds — units unverified.
	BigThan       int
	SmallThan     int
	// OnlyShowGtid / OnlyShowDML: presumably restrict what is reported.
	OnlyShowGtid  bool
	OnlyShowDML   bool
}

type EventFixedData

// EventFixedData wraps a raw byte slice. It is the second argument (fd) of
// GenBinlogEventBytes.
// NOTE(review): presumably the fixed-size data section of a binlog event —
// confirm against GenBinlogEventBytes.
type EventFixedData struct {
	Bytes []byte
}

type EventFixedHeader

// EventFixedHeader is the first argument (fh) of GenBinlogEventBytes. It
// carries a raw byte slice alongside individual header fields.
//
// NOTE(review): presumably Bytes is the raw encoding of the other fields and
// is LOG_EVENT_FIXED_HEADER_LEN bytes long; whether the two are kept in sync
// is not visible here — confirm against the code that builds and reads it.
type EventFixedHeader struct {
	Bytes        []byte
	Timestamp    int
	// EventType: presumably one of the *_EVENT type codes.
	EventType    int
	ServerId     int
	EventLength  uint
	// NextPosition: presumably the position of the event that follows.
	NextPosition int
	Flags        int
}

type EventVariableData

// EventVariableData wraps a raw byte slice. It is the third argument (vd) of
// GenBinlogEventBytes.
// NOTE(review): presumably the variable-length data section of a binlog
// event — confirm against GenBinlogEventBytes.
type EventVariableData struct {
	Bytes []byte
}

type Logger

// Logger is the logging interface accepted by SetLogger. Any type with a
// printf-style Tracef method satisfies it.
type Logger interface {
	// Tracef takes a format string followed by its arguments.
	Tracef(fmt string, args ...interface{})
}

type Transaction

// Transaction is the value handed to the callback of ParseBinlogFile and
// ParseBinlogWithFilter. Its exported fields carry JSON tags, and Txs holds
// the TxDetail entries belonging to it.
//
// NOTE(review): the per-field notes below are inferred from names and types;
// confirm against the parser.
type Transaction struct {
	GTID        string    `json:"gtid"`
	// Timestamp and Time: presumably the same instant in two representations.
	Timestamp   int64     `json:"timestamp"`
	Time        time.Time `json:"time"`
	// StartPos / EndPos: presumably binlog positions bounding the transaction.
	StartPos    int       `json:"startPos"`
	EndPos      int       `json:"endPos"`
	// Size: presumably derived from the positions — units unverified.
	Size        int       `json:"size"`
	RowsCount   int       `json:"rowsCount"`
	// Status: presumably one of the STATUS_* constants (same uint8 type).
	Status      uint8     `json:"status"`
	TxStartTime int64     `json:"txStartTime"`
	TxEndTime   int64     `json:"txEndTime"`

	Txs []TxDetail `json:"txs"`
	// contains filtered or unexported fields
}

func (Transaction) GetSqlOrigin

func (t Transaction) GetSqlOrigin() []string

type TxDetail

// TxDetail is the element type of Transaction.Txs.
//
// NOTE(review): presumably one entry per statement or row event inside the
// transaction; the per-field notes below are inferred from names — confirm
// against the parser.
type TxDetail struct {
	// StartPos / EndPos: presumably binlog positions bounding this entry.
	StartPos        int       `json:"startPos"`
	EndPos          int       `json:"endPos"`
	RowCount        int       `json:"rowCount"`
	Timestamp       int64     `json:"timestamp"`
	Time            time.Time `json:"time"`
	Sql             string    `json:"sql"`
	Db              string    `json:"db"`
	Table           string    `json:"table"`
	SqlType         string    `json:"sqlType"`
	CompressionType string    `json:"compressionType"`
	// Rows is the only field without a json tag, so encoding/json emits it
	// under the key "Rows" rather than a lower-camel-case key.
	Rows            [][]interface{}
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL