logic

package
v1.0.55 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2024 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	NoPrintStatusRule           PrintStatusRule = iota
	HeuristicPrintStatusRule                    = iota
	ForcePrintStatusRule                        = iota
	ForcePrintStatusOnlyRule                    = iota
	ForcePrintStatusAndHintRule                 = iota
)
View Source
const (
	EventsChannelBufferSize       = 1
	ReconnectStreamerSleepSeconds = 5
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Applier

type Applier struct {
	// contains filtered or unexported fields
}

Applier connects and writes to the applier-server, which is the server where the migration happens. This is typically the master, but could be a replica when `--test-on-replica` or `--execute-on-replica` are given. Applier is the one to actually write row data and apply binlog events onto the ghost table. It is where the ghost & changelog tables get created. It is where the cut-over phase happens.

func NewApplier

func NewApplier(migrationContext *base.MigrationContext) *Applier

func (*Applier) AlterGhost

func (this *Applier) AlterGhost() error

AlterGhost applies `alter` statement on ghost table

func (*Applier) ApplyDMLEventQueries

func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error

ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table

func (*Applier) ApplyDMLEventQuery

func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error

ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted original-table binlog event

func (*Applier) ApplyIterationInsertQuery

func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error)

ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where data actually gets copied from original table.

func (*Applier) AtomicCutOverMagicLock

func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error

AtomicCutOverMagicLock

func (*Applier) AtomicCutoverRename

func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error

AtomicCutoverRename

func (*Applier) CalculateNextIterationRangeEndValues

func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error)

CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, which will be used for copying the next chunk of rows. It returns "false" if there is no further chunk to work through, i.e. we're past the last chunk and are done with iterating the range (and thus done with copying row chunks).

func (*Applier) CreateAtomicCutOverSentryTable

func (this *Applier) CreateAtomicCutOverSentryTable() error

CreateAtomicCutOverSentryTable

func (*Applier) CreateChangelogTable

func (this *Applier) CreateChangelogTable() error

CreateChangelogTable creates the changelog table on the applier host

func (*Applier) CreateGhostTable

func (this *Applier) CreateGhostTable() error

CreateGhostTable creates the ghost table on the applier host

func (*Applier) DropAtomicCutOverSentryTableIfExists

func (this *Applier) DropAtomicCutOverSentryTableIfExists() error

DropAtomicCutOverSentryTableIfExists checks if the "old" table name happens to be a cut-over magic table; if so, it drops it.

func (*Applier) DropChangelogTable

func (this *Applier) DropChangelogTable() error

DropChangelogTable drops the changelog table on the applier host

func (*Applier) DropGhostTable

func (this *Applier) DropGhostTable() error

DropGhostTable drops the ghost table on the applier host

func (*Applier) DropOldTable

func (this *Applier) DropOldTable() error

DropOldTable drops the _Old table on the applier host

func (*Applier) ExecuteThrottleQuery

func (this *Applier) ExecuteThrottleQuery() (int64, error)

ExecuteThrottleQuery executes the `--throttle-query` and returns its results.

func (*Applier) ExpectProcess

func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error

ExpectProcess expects a process to show up in `SHOW PROCESSLIST` that has given characteristics

func (*Applier) ExpectUsedLock

func (this *Applier) ExpectUsedLock(sessionId int64) error

ExpectUsedLock expects the special hint voluntary lock to exist on given session

func (*Applier) GetSessionLockName

func (this *Applier) GetSessionLockName(sessionId int64) string

GetSessionLockName returns a name for the special hint session voluntary lock

func (*Applier) InitDBConnections

func (this *Applier) InitDBConnections() (err error)

func (*Applier) InitiateHeartbeat

func (this *Applier) InitiateHeartbeat()

InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. This is done asynchronously

func (*Applier) LockOriginalTable

func (this *Applier) LockOriginalTable() error

LockOriginalTable places a write lock on the original table

func (*Applier) ReadMigrationMaxValues

func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error

ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy

func (*Applier) ReadMigrationMinValues

func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error

ReadMigrationMinValues returns the minimum values to be iterated on rowcopy

func (*Applier) ReadMigrationRangeValues

func (this *Applier) ReadMigrationRangeValues() error

ReadMigrationRangeValues reads min/max values that will be used for rowcopy

func (*Applier) RenameTablesRollback

func (this *Applier) RenameTablesRollback() (renameError error)

RenameTablesRollback renames back both tables: original back to ghost, _old back to original. This is used by `--test-on-replica`

func (*Applier) ShowStatusVariable

func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error)

func (*Applier) StartReplication

func (this *Applier) StartReplication() error

StartReplication is used by `--test-on-replica` on cut-over failure

func (*Applier) StartSlaveIOThread

func (this *Applier) StartSlaveIOThread() error

StartSlaveIOThread is applicable with --test-on-replica

func (*Applier) StartSlaveSQLThread

func (this *Applier) StartSlaveSQLThread() error

StartSlaveSQLThread is applicable with --test-on-replica

func (*Applier) StopReplication

func (this *Applier) StopReplication() error

StopReplication is used by `--test-on-replica` and stops replication.

func (*Applier) StopSlaveIOThread

func (this *Applier) StopSlaveIOThread() error

StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh. We need to keep the SQL thread active so as to complete processing received events, and have them written to the binary log, so that we can then read them via streamer.

func (*Applier) StopSlaveSQLThread

func (this *Applier) StopSlaveSQLThread() error

StopSlaveSQLThread is applicable with --test-on-replica

func (*Applier) SwapTablesQuickAndBumpy

func (this *Applier) SwapTablesQuickAndBumpy() error

SwapTablesQuickAndBumpy issues a two-step swap table operation: (1) rename original table to _old; (2) rename ghost table to original. There is a point in time in between where the table does not exist.

func (*Applier) Teardown

func (this *Applier) Teardown()

func (*Applier) UnlockTables

func (this *Applier) UnlockTables() error

UnlockTables makes tea. No wait, it unlocks tables.

func (*Applier) ValidateOrDropExistingTables

func (this *Applier) ValidateOrDropExistingTables() error

ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, or attempts to drop them if instructed to.

func (*Applier) WriteAndLogChangelog

func (this *Applier) WriteAndLogChangelog(hint, value string) (string, error)

func (*Applier) WriteChangelog

func (this *Applier) WriteChangelog(hint, value string) (string, error)

WriteChangelog writes a value to the changelog table. It returns the hint as given, for convenience

func (*Applier) WriteChangelogState

func (this *Applier) WriteChangelogState(value string) (string, error)

type BinlogEventListener

type BinlogEventListener struct {
	// contains filtered or unexported fields
}

type ChangelogState

type ChangelogState string
const (
	GhostTableMigrated         ChangelogState = "GhostTableMigrated"
	AllEventsUpToLockProcessed                = "AllEventsUpToLockProcessed"
)

func ReadChangelogState

func ReadChangelogState(s string) ChangelogState

type EventsStreamer

type EventsStreamer struct {
	// contains filtered or unexported fields
}

EventsStreamer reads data from binary logs and streams it on. It acts as a publisher, and interested parties may subscribe for per-table events.

func NewEventsStreamer

func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer

func (*EventsStreamer) AddListener

func (this *EventsStreamer) AddListener(
	async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error)

AddListener registers a new listener for binlog events, on a per-table basis

func (*EventsStreamer) Close

func (this *EventsStreamer) Close() (err error)

func (*EventsStreamer) GetCurrentBinlogCoordinates

func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates

func (*EventsStreamer) GetReconnectBinlogCoordinates

func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordinates

func (*EventsStreamer) InitDBConnections

func (this *EventsStreamer) InitDBConnections() (err error)

func (*EventsStreamer) StreamEvents

func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error

StreamEvents will begin streaming events. It will be blocking, so should be executed by a goroutine

func (*EventsStreamer) Teardown

func (this *EventsStreamer) Teardown()

type HooksExecutor

type HooksExecutor struct {
	// contains filtered or unexported fields
}

func NewHooksExecutor

func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor

type Inspector

type Inspector struct {
	// contains filtered or unexported fields
}

Inspector reads data from the read-MySQL-server (typically a replica, but can be the master). It is used for gaining initial status and structure, and later also for following up on progress and changelog.

func NewInspector

func NewInspector(migrationContext *base.MigrationContext) *Inspector

func (*Inspector) CountTableRows

func (this *Inspector) CountTableRows() error

CountTableRows counts exact number of rows on the original table

func (*Inspector) InitDBConnections

func (this *Inspector) InitDBConnections() (err error)

func (*Inspector) InspectOriginalTable

func (this *Inspector) InspectOriginalTable() (err error)

func (*Inspector) InspectTableColumnsAndUniqueKeys

func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (columns *sql.ColumnList, virtualColumns *sql.ColumnList, uniqueKeys [](*sql.UniqueKey), err error)

func (*Inspector) Teardown

func (this *Inspector) Teardown()

func (*Inspector) ValidateOriginalTable

func (this *Inspector) ValidateOriginalTable() (err error)

type Migrator

type Migrator struct {

	// Log *io.Writer
	Log   *bytes.Buffer
	Error error
	// contains filtered or unexported fields
}

Migrator is the main schema migration flow manager.

func NewMigrator

func NewMigrator(context *base.MigrationContext) *Migrator

func (*Migrator) ExecOnFailureHook

func (this *Migrator) ExecOnFailureHook() (err error)

ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external hook access point

func (*Migrator) FinalizeMigration

func (this *Migrator) FinalizeMigration()

func (*Migrator) Migrate

func (this *Migrator) Migrate() (err error)

Migrate executes the complete migration logic. This is *the* major gh-ost function.

type PrintStatusRule

type PrintStatusRule int

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server listens for requests on a socket file or via TCP

func NewServer

func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server

func (*Server) BindSocketFile

func (this *Server) BindSocketFile() (err error)

func (*Server) BindTCPPort

func (this *Server) BindTCPPort() (err error)

func (*Server) RemoveSocketFile

func (this *Server) RemoveSocketFile() (err error)

func (*Server) Serve

func (this *Server) Serve() (err error)

Serve begins listening & serving on whichever device was configured

type Throttler

type Throttler struct {
	// contains filtered or unexported fields
}

Throttler collects metrics related to throttling and makes an informed decision whether throttling should take place.

func NewThrottler

func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler

func (*Throttler) Teardown

func (this *Throttler) Teardown()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL