package module
Version: v0.0.0-...-05fb06b Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2022 License: MIT Imports: 42 Imported by: 12



Ghostferry is a library that enables you to selectively copy data from one mysql instance to another with minimal amount of downtime.

It is inspired by Github's gh-ost, although instead of copying data from and to the same database, Ghostferry copies data from one database to another and has the ability to only partially copy data.

There is an example application called ghostferry-copydb included (under the copydb directory) that demonstrates this library by copying an entire database from one machine to another.

Talk to us on IRC at #ghostferry.

Overview of How it Works

An overview of Ghostferry's high-level design is expressed in the TLA+ specification, under the tlaplus directory. It may be good to consult with that as it has a concise definition. However, the specification might not be entirely correct as proofs remain elusive.

On a high-level, Ghostferry is broken into several components, enabling it to copy data. This is documented at

Development Setup

For Internal Contributors

dev up

For External Contributors
  • Have Docker installed
  • Clone the repo
  • docker-compose up -d
  • nix-shell


Run all tests
  • make test
Run example copydb usage
  • make copydb && ghostferry-copydb -verbose examples/copydb/conf.json
  • For a more detailed tutorial, see the documentation.
Ruby Integration Tests

Kindly take note of following options:

  • DEBUG=1: To see more detailed debug output by Ghostferry live, as opposed to only when the test fails. This is helpful for debugging hanging test.


DEBUG=1 ruby test/main.rb -v -n "TrivialIntegrationTests#test_logged_query_omits_columns"




View Source
const (
	VerifierTypeChecksumTable  = "ChecksumTable"
	VerifierTypeIterative      = "Iterative"
	VerifierTypeInline         = "Inline"
	VerifierTypeNoVerification = "NoVerification"

	DefaultNet          = "tcp"
	DefaultMarginalia   = "application:ghostferry"
	MySQLNumParamsLimit = 1<<16 - 1 // see num_params
View Source
const (
	StateStarting            = "starting"
	StateCopying             = "copying"
	StateWaitingForCutover   = "wait-for-cutover"
	StateVerifyBeforeCutover = "verify-before-cutover"
	StateCutover             = "cutover"
	StateDone                = "done"
View Source
const (
	TableActionWaiting   = "waiting"
	TableActionCopying   = "copying"
	TableActionCompleted = "completed"
View Source
const (
	// CompressionSnappy is used to identify Snappy ( compressed column data
	CompressionSnappy = "SNAPPY"


View Source
var (
	VersionString string = "?.?.?+??????????????+???????"
	WebUiBasedir  string = ""


func CheckDbIsAReplica

func CheckDbIsAReplica(db *sql.DB) (bool, error)

func ConvertTableColumnsToStrings

func ConvertTableColumnsToStrings(columns []schema.TableColumn) []string

func DefaultBuildSelect

func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey, batchSize uint64) squirrel.SelectBuilder

func GetMd5HashesSql

func GetMd5HashesSql(schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (string, []interface{}, error)

func Int64Value

func Int64Value(value interface{}) (int64, bool)

func MaskedDSN

func MaskedDSN(c *mysql.Config) string

func NewMysqlPosition

func NewMysqlPosition(file string, position uint32, err error) (mysql.Position, error)

func NonExistingPaginationKeyColumnError

func NonExistingPaginationKeyColumnError(schema, table, paginationKey string) error

NonExistingPaginationKeyColumnError exported to facilitate black box testing

func NonExistingPaginationKeyError

func NonExistingPaginationKeyError(schema, table string) error

NonExistingPaginationKeyError exported to facilitate black box testing

func NonNumericPaginationKeyError

func NonNumericPaginationKeyError(schema, table, paginationKey string) error

NonNumericPaginationKeyError exported to facilitate black box testing

func QuoteField

func QuoteField(field string) string

func QuoteFields

func QuoteFields(fields []string) (out []string)

func QuotedTableName

func QuotedTableName(table *TableSchema) string

func QuotedTableNameFromString

func QuotedTableNameFromString(database, table string) string

func ScanByteRow

func ScanByteRow(rows *sqlorig.Rows, columnCount int) ([][]byte, error)

func ShowMasterStatusBinlogPosition

func ShowMasterStatusBinlogPosition(db *sql.DB) (mysql.Position, error)

func TargetToSourceRewrites

func TargetToSourceRewrites(databaseRewrites map[string]string) (map[string]string, error)

func Uint64Value

func Uint64Value(value interface{}) (uint64, bool)

func WaitForThrottle

func WaitForThrottle(t Throttler)

func WithRetries

func WithRetries(maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, f func() error) (err error)

func WithRetriesContext

func WithRetriesContext(ctx context.Context, maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, f func() error) (err error)


type AtomicBoolean

type AtomicBoolean int32

func (*AtomicBoolean) Get

func (a *AtomicBoolean) Get() bool

func (*AtomicBoolean) Set

func (a *AtomicBoolean) Set(b bool)

type BatchWriter

type BatchWriter struct {
	DB                        *sql.DB
	InlineVerifier            *InlineVerifier
	StateTracker              *StateTracker
	EnforceInlineVerification bool // Only needed when running the BatchWriter during cutover

	DatabaseRewrites map[string]string
	TableRewrites    map[string]string

	WriteRetries int
	// contains filtered or unexported fields

func (*BatchWriter) Initialize

func (w *BatchWriter) Initialize()

func (*BatchWriter) WriteRowBatch

func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error

type BatchWriterVerificationFailed

type BatchWriterVerificationFailed struct {
	// contains filtered or unexported fields

func (BatchWriterVerificationFailed) Error

type BinlogDeleteEvent

type BinlogDeleteEvent struct {
	// contains filtered or unexported fields

func (*BinlogDeleteEvent) AsSQLString

func (e *BinlogDeleteEvent) AsSQLString(schemaName, tableName string) (string, error)

func (*BinlogDeleteEvent) NewValues

func (e *BinlogDeleteEvent) NewValues() RowData

func (*BinlogDeleteEvent) OldValues

func (e *BinlogDeleteEvent) OldValues() RowData

func (*BinlogDeleteEvent) PaginationKey

func (e *BinlogDeleteEvent) PaginationKey() (uint64, error)

type BinlogEventState

type BinlogEventState struct {
	// contains filtered or unexported fields

this is passed into event handlers to keep track of state of the binlog event stream.

type BinlogInsertEvent

type BinlogInsertEvent struct {
	// contains filtered or unexported fields

func (*BinlogInsertEvent) AsSQLString

func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error)

func (*BinlogInsertEvent) NewValues

func (e *BinlogInsertEvent) NewValues() RowData

func (*BinlogInsertEvent) OldValues

func (e *BinlogInsertEvent) OldValues() RowData

func (*BinlogInsertEvent) PaginationKey

func (e *BinlogInsertEvent) PaginationKey() (uint64, error)

type BinlogStreamer

type BinlogStreamer struct {
	DB           *sql.DB
	DBConfig     *DatabaseConfig
	MyServerId   uint32
	ErrorHandler ErrorHandler
	Filter       CopyFilter

	TableSchema TableSchemaCache
	LogTag      string

	// These rewrite structures are used specifically for the Target
	// Verifier as it needs to map events streamed from the Target back
	// to the TableSchemaCache of the Source
	// See for details
	DatabaseRewrites map[string]string
	TableRewrites    map[string]string
	// contains filtered or unexported fields

func (*BinlogStreamer) AddBinlogEventHandler

func (s *BinlogStreamer) AddBinlogEventHandler(evType replication.EventType, eh func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)) error

Attach an event handler to a replication BinLogEvent We only support attaching events to any of the events defined in custom event handlers are provided the replication BinLogEvent and a state object that carries the current state of the binlog event stream.

func (*BinlogStreamer) AddEventListener

func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error)

func (*BinlogStreamer) ConnectBinlogStreamerToMysql

func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error)

func (*BinlogStreamer) ConnectBinlogStreamerToMysqlFrom

func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error)

func (*BinlogStreamer) FlushAndStop

func (s *BinlogStreamer) FlushAndStop()

func (*BinlogStreamer) GetLastStreamedBinlogPosition

func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position

func (*BinlogStreamer) IsAlmostCaughtUp

func (s *BinlogStreamer) IsAlmostCaughtUp() bool

func (*BinlogStreamer) Run

func (s *BinlogStreamer) Run()

type BinlogUpdateEvent

type BinlogUpdateEvent struct {
	// contains filtered or unexported fields

func (*BinlogUpdateEvent) AsSQLString

func (e *BinlogUpdateEvent) AsSQLString(schemaName, tableName string) (string, error)

func (*BinlogUpdateEvent) NewValues

func (e *BinlogUpdateEvent) NewValues() RowData

func (*BinlogUpdateEvent) OldValues

func (e *BinlogUpdateEvent) OldValues() RowData

func (*BinlogUpdateEvent) PaginationKey

func (e *BinlogUpdateEvent) PaginationKey() (uint64, error)

type BinlogVerifyBatch

type BinlogVerifyBatch struct {
	SchemaName     string
	TableName      string
	PaginationKeys []uint64

type BinlogVerifySerializedStore

type BinlogVerifySerializedStore map[string]map[string]map[uint64]int

func (BinlogVerifySerializedStore) Copy

func (BinlogVerifySerializedStore) EntriesCount

func (s BinlogVerifySerializedStore) EntriesCount() uint64

func (BinlogVerifySerializedStore) RowCount

func (s BinlogVerifySerializedStore) RowCount() uint64

type BinlogVerifyStore

type BinlogVerifyStore struct {
	EmitLogPerRowsAdded uint64
	// contains filtered or unexported fields

This struct is very similar to ReverifyStore, but it is more optimized for serialization into JSON.

TODO: remove IterativeVerifier and remove this comment.

func NewBinlogVerifyStore

func NewBinlogVerifyStore() *BinlogVerifyStore

func NewBinlogVerifyStoreFromSerialized

func NewBinlogVerifyStoreFromSerialized(serialized BinlogVerifySerializedStore) *BinlogVerifyStore

func (*BinlogVerifyStore) Add

func (s *BinlogVerifyStore) Add(table *TableSchema, paginationKey uint64)

func (*BinlogVerifyStore) Batches

func (s *BinlogVerifyStore) Batches(batchsize int) []BinlogVerifyBatch

func (*BinlogVerifyStore) CurrentEntriesCount

func (s *BinlogVerifyStore) CurrentEntriesCount() uint64

func (*BinlogVerifyStore) CurrentRowCount

func (s *BinlogVerifyStore) CurrentRowCount() uint64

func (*BinlogVerifyStore) RemoveVerifiedBatch

func (s *BinlogVerifyStore) RemoveVerifiedBatch(batch BinlogVerifyBatch)

func (*BinlogVerifyStore) Serialize

type BinlogWriter

type BinlogWriter struct {
	DB               *sql.DB
	DatabaseRewrites map[string]string
	TableRewrites    map[string]string
	Throttler        Throttler

	BatchSize    int
	WriteRetries int

	ErrorHandler ErrorHandler
	StateTracker *StateTracker
	// contains filtered or unexported fields

func (*BinlogWriter) BufferBinlogEvents

func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error

func (*BinlogWriter) Run

func (b *BinlogWriter) Run()

func (*BinlogWriter) Stop

func (b *BinlogWriter) Stop()

type CascadingPaginationColumnConfig

type CascadingPaginationColumnConfig struct {
	// PerTable has greatest specificity and takes precedence over the other options
	PerTable map[string]map[string]string // SchemaName => TableName => ColumnName

	// FallbackColumn is a global default to fallback to and is less specific than the
	// default, which is the Primary Key
	FallbackColumn string

CascadingPaginationColumnConfig to configure pagination columns to be used. The term `Cascading` to denote that greater specificity takes precedence.

func (*CascadingPaginationColumnConfig) FallbackPaginationColumnName

func (c *CascadingPaginationColumnConfig) FallbackPaginationColumnName() (string, bool)

FallbackPaginationColumnName retreives the column name specified as a fallback when the Primary Key isn't suitable for pagination

func (*CascadingPaginationColumnConfig) PaginationColumnFor

func (c *CascadingPaginationColumnConfig) PaginationColumnFor(schemaName, tableName string) (string, bool)

PaginationColumnFor is a helper function to retrieve the column name to paginate by

type ChecksumTableVerifier

type ChecksumTableVerifier struct {
	Tables           []*TableSchema
	DatabaseRewrites map[string]string
	TableRewrites    map[string]string
	SourceDB         *sql.DB
	TargetDB         *sql.DB
	// contains filtered or unexported fields

func (*ChecksumTableVerifier) Message

func (v *ChecksumTableVerifier) Message() string

func (*ChecksumTableVerifier) Result

func (*ChecksumTableVerifier) StartInBackground

func (v *ChecksumTableVerifier) StartInBackground() error

func (*ChecksumTableVerifier) VerifyBeforeCutover

func (v *ChecksumTableVerifier) VerifyBeforeCutover() error

func (*ChecksumTableVerifier) VerifyDuringCutover

func (v *ChecksumTableVerifier) VerifyDuringCutover() (VerificationResult, error)

func (*ChecksumTableVerifier) Wait

func (v *ChecksumTableVerifier) Wait()

type ColumnCompressionConfig

type ColumnCompressionConfig map[string]map[string]map[string]string

SchemaName => TableName => ColumnName => CompressionAlgorithm Example: blog1 => articles => body => snappy

(SELECT body FROM blog1.articles => returns compressed blob)

func (ColumnCompressionConfig) CompressedColumnsFor

func (c ColumnCompressionConfig) CompressedColumnsFor(schemaName, tableName string) map[string]string

type ColumnIgnoreConfig

type ColumnIgnoreConfig map[string]map[string]map[string]struct{}

SchemaName => TableName => ColumnName => struct{}{} These columns will be ignored during InlineVerification

func (ColumnIgnoreConfig) IgnoredColumnsFor

func (c ColumnIgnoreConfig) IgnoredColumnsFor(schemaName, tableName string) map[string]struct{}

type CompressionVerifier

type CompressionVerifier struct {
	// contains filtered or unexported fields

CompressionVerifier provides support for verifying the payload of compressed columns that may have different hashes for the same data by first decompressing the compressed data before fingerprinting

func NewCompressionVerifier

func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error)

NewCompressionVerifier first checks the map for supported compression algorithms before initializing and returning the initialized instance.

func (*CompressionVerifier) Decompress

func (c *CompressionVerifier) Decompress(table, column, algorithm string, compressed []byte) ([]byte, error)

Decompress will apply the configured decompression algorithm to the configured columns data

func (*CompressionVerifier) GetCompressedHashes

func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (map[uint64][]byte, error)

GetCompressedHashes compares the source data with the target data to ensure the integrity of the data being copied.

The GetCompressedHashes method checks if the existing table contains compressed data and will apply the decompression algorithm to the applicable columns if necessary. After the columns are decompressed, the hashes of the data are used to verify equality

func (*CompressionVerifier) HashRow

func (c *CompressionVerifier) HashRow(decompressedRowData [][]byte) ([]byte, error)

HashRow will fingerprint the non-primary columns of the row to verify data equality

func (*CompressionVerifier) IsCompressedTable

func (c *CompressionVerifier) IsCompressedTable(table string) bool

IsCompressedTable will identify whether or not a table is compressed

type Config

type Config struct {
	// Source database connection configuration
	// Required
	Source *DatabaseConfig

	// Target database connection configuration
	// Required
	Target *DatabaseConfig

	// Map database name on the source database (key of the map) to a
	// different name on the target database (value of the associated key).
	// This allows one to move data and change the database name in the
	// process.
	// Optional: defaults to empty map/no rewrites
	DatabaseRewrites map[string]string

	// Map the table name on the source database to a different name on
	// the target database. See DatabaseRewrite.
	// Optional: defaults to empty map/no rewrites
	TableRewrites map[string]string

	// The maximum number of retries for writes if the writes failed on
	// the target database.
	// Optional: defaults to 5.
	DBWriteRetries int

	// Filter out the databases/tables when detecting the source databases
	// and tables.
	// Required
	TableFilter TableFilter

	// Filter out unwanted data/events from being copied.
	// Optional: defaults to nil/no filter.
	CopyFilter CopyFilter

	// The server id used by Ghostferry to connect to MySQL as a replication
	// slave. This id must be unique on the MySQL server. If 0 is specified,
	// a random id will be generated upon connecting to the MySQL server.
	// Optional: defaults to an automatically generated one
	MyServerId uint32

	// The maximum number of binlog events to write at once. Note this is a
	// maximum: if there are not a lot of binlog events, they will be written
	// one at a time such the binlog streamer lag is as low as possible. This
	// batch size will only be hit if there is a log of binlog at the same time.
	// Optional: defaults to 100
	BinlogEventBatchSize int

	// This optional config uses different data points to calculate
	// batch size per table using linear interpolation
	DataIterationBatchSizePerTableOverride *DataIterationBatchSizePerTableOverride

	// The maximum number of retries for reads if the reads fail on the source
	// database.
	// Optional: defaults to 5
	DBReadRetries int

	// This specify the number of concurrent goroutines, each iterating over
	// a single table.
	// At this point in time, parallelize iteration within a single table. This
	// may be possible to add to the future.
	// Optional: defaults to 4
	DataIterationConcurrency int

	// This specifies if Ghostferry will pause before cutover or not.
	// Optional: defaults to false
	AutomaticCutover bool

	// This specifies whether or not Ferry.Run will handle SIGINT and SIGTERM
	// by dumping the current state to stdout and the error HTTP callback.
	// The dumped state can be used to resume Ghostferry.
	DumpStateOnSignal bool

	// This specifies whether or not Ghostferry will dump the current state to stdout
	// before exiting due to an error.
	// Optional: defaults to false
	DumpStateToStdoutOnError bool

	// This excludes schema cache from the state dump in both the HTTP callback
	// and the stdout dumping. This may save a lot of space if you don't need
	// to deal with schema migrations.
	DoNotIncludeSchemaCacheInStateDump bool

	// Config for the ControlServer
	ControlServerConfig *ControlServerConfig

	// Report progress via an HTTP callback. The Payload field of the callback
	// will be sent to the server as the CustomPayload field in the Progress
	// struct. The unit of ProgressReportFrequency is in milliseconds.
	ProgressCallback        HTTPCallback
	ProgressReportFrequency int

	// Report state via an HTTP callback. The SerializedState struct will be
	// sent as the Payload parameter. The unit of StateReportFrequency is
	// in milliseconds.
	StateCallback        HTTPCallback
	StateReportFrequency int

	// Report error via an HTTP callback. The Payload field will contain the ErrorType,
	// ErrorMessage and the StateDump.
	ErrorCallback HTTPCallback

	// Report when ghostferry is entering cutover
	CutoverLock HTTPCallback

	// Report when ghostferry is finished cutover
	CutoverUnlock HTTPCallback

	// If the callback returns a non OK status, these two values configure the number of times Ferry should attempt to
	// retry acquiring the cutover lock, and for how long the Ferry should wait
	// before attempting another lock acquisition
	// MaxCutoverRetries default is 1 retry
	// CutoverRetryWaitSeconds default is 1 second
	MaxCutoverRetries       int
	CutoverRetryWaitSeconds int

	// The state to resume from as dumped by the PanicErrorHandler.
	// If this is null, a new Ghostferry run will be started. Otherwise, the
	// reconciliation process will start and Ghostferry will resume after that.
	StateToResumeFrom *SerializableState

	// The verifier to use during the run. Valid choices are:
	// ChecksumTable
	// Iterative
	// NoVerification
	// If it is left blank, the Verifier member variable on the Ferry will be
	// used. If that member variable is nil, no verification will be done.
	VerifierType string

	// Only useful if VerifierType == Iterative.
	// This specifies the configurations to the IterativeVerifier.
	// This option is in the process of being deprecated.
	IterativeVerifierConfig IterativeVerifierConfig

	// Only useful if VerifierType == Inline.
	// This specifies the configurations to the InlineVerifierConfig.
	InlineVerifierConfig InlineVerifierConfig

	// For old versions mysql<5.6.2, MariaDB<10.1.6 which has no related var
	// Make sure you have binlog_row_image=FULL when turning on this
	SkipBinlogRowImageCheck bool

	// This config is necessary for inline verification for a special case of
	// Ghostferry:
	// - If you are copying a table where the data is already partially on the
	//   target through some other means.
	//   - Specifically, the PaginationKey of this row on both the source and the target are
	//     the same. Thus, INSERT IGNORE will skip copying this row, leaving the
	//     data on the target unchanged.
	//   - If the data on the target is already identical to the source, then
	//     verification will pass and all is well.
	// - However, if this data is compressed with a non-determinstic algorithm
	//   such as snappy, the compressed blob may not be equal even when the
	//   uncompressed data is equal.
	// - This column signals to the InlineVerifier that it needs to decompress
	//   the data to compare identity.
	// Note: a similar option exists in IterativeVerifier. However, the
	// IterativeVerifier is being deprecated and this will be the correct place
	// to specify it if you don't need the IterativeVerifier.
	CompressedColumnsForVerification ColumnCompressionConfig

	// This config is also for inline verification for the same special case of
	// Ghostferry as documented with the CompressedColumnsForVerification option:
	// - If you're copying a table where the data is partially already on the
	//   the target through some other means.
	// - A difference in a particular column could be acceptable.
	//   - An example would be a table with a data field and a created_at field.
	//     Maybe the created_at field is not important for data integrity as long
	//     as the data field is correct.
	// - Putting the column in this config will cause the InlineVerifier to skip
	//   this column for verification.
	IgnoredColumnsForVerification ColumnIgnoreConfig

	// Map an index to a table, will add `FORCE INDEX (index_name)` to the fingerprint SELECT query.
	// Index hinting might be necessary if you are running into slow queries during copy on your target.
	// Example:
	// "ForceIndexForVerification": {
	//   "blog": {
	//     "users": "ix_users_some_id"
	//   }
	// }
	ForceIndexForVerification ForceIndexConfig

	// Ghostferry requires a single numeric column to paginate over tables. Inferring that column is done in the following exact order:
	// 1. Use the PerTable pagination column, if configured for a table. Fail if we cannot find this column in the table.
	// 2. Use the table's primary key column as the pagination column. Fail if the primary key is not numeric or is a composite key without a FallbackColumn specified.
	// 3. Use the FallbackColumn pagination column, if configured. Fail if we cannot find this column in the table.
	CascadingPaginationColumnConfig *CascadingPaginationColumnConfig

	// SkipTargetVerification is used to enable or disable target verification during moves.
	// This feature is currently only available while using the InlineVerifier.
	// This does so by inspecting the annotations (configured as Marginalia in the DatabaseConfig above)
	// and will fail the move unless all applicable DMLs (as identified by the sharding key) sent to the
	// Target were sent from Ghostferry.
	// NOTE:
	// The Target database must be configured with binlog_rows_query_log_events
	// set to "ON" for this to function properly. Ghostferry not allow the move
	// process to begin if this is enabled and the above option is set to "OFF".
	// Required: defaults to false
	SkipTargetVerification bool

	// During initialization, Ghostferry will raise an error if any
	// foreign key constraints are detected in the source database.
	// This check can be bypassed by setting this value to true.
	// Using Ghostferry with foreign keys is highly discouraged and
	// disabling this check makes no guarantees of the success of the run.
	// Required: defaults to false
	SkipForeignKeyConstraintsCheck bool

	// EnableRowBatchsize is used to enable or disable the calculation of number of bytes written for each row batch.
	// Optional: Defaults to false.
	// NOTE:
	// Turning off the EnableRowBatchSize flag would show the NumBytes written per RowBatch to be zero
	// in the Progress. This behaviour is perfectly okay and doesn't mean there are no rows being written
	// to the target DB.
	EnableRowBatchSize bool

	// If the target DB is set to read_only ghostferry will throw an error during the initialization step.
	// AllowSuperUserOnReadOnly flag allows to run ghostferry even if the target DB is read_only. This is helpful in
	// scenarios where target DB needs to be restricted from writes made by any other user then the ghostferry user.
	// Optional: Defaults to false.
	// NOTE:
	// The ghostferry target user should have SUPER permissions to actually write to the target DB,
	// if ghostferry is ran with AllowSuperUserOnReadOnly = true and the target DB is set to read_only.
	AllowSuperUserOnReadOnly bool

	// If true, net/http/pprof will be enabled on port 6060.
	EnablePProf bool

	UpdatableConfig UpdatableConfig

	// ----------------------------------------------------------------------------------------------------------------
	// The following configs are deprecated
	DataIterationBatchSize     uint64              // replaced by UpdatableConfig.DataIterationBatchSize
	ServerBindAddr             string              // replaced by ControlServerConfig.ServerBindAddr
	WebBasedir                 string              // replaced by ControlServerConfig.WebBasedir
	ControlServerCustomScripts map[string][]string // replaced by ControlServerConfig.CustomScripts

func (*Config) Update

func (c *Config) Update(updatedConfig UpdatableConfig)

func (*Config) ValidateConfig

func (c *Config) ValidateConfig() error

type ControlServer

type ControlServer struct {
	Config   *ControlServerConfig
	F        *Ferry
	Verifier Verifier
	// contains filtered or unexported fields

func (*ControlServer) HandleConfigGet

func (this *ControlServer) HandleConfigGet(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleConfigPost

func (this *ControlServer) HandleConfigPost(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleCutover

func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleIndex

func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandlePause

func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleScript

func (this *ControlServer) HandleScript(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleStatus

func (this *ControlServer) HandleStatus(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleStop

func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleUnpause

func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleVerify

func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)

func (*ControlServer) Initialize

func (this *ControlServer) Initialize() (err error)

func (*ControlServer) Run

func (this *ControlServer) Run()

func (*ControlServer) ServeHTTP

func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*ControlServer) Wait

func (this *ControlServer) Wait()

type ControlServerConfig

type ControlServerConfig struct {
	// enable/disable http control server
	Enabled bool

	// Bind control server address
	ServerBindAddr string

	// Path to `web` base dir
	WebBasedir string

	// TODO: refactor control server config out of the base ferry at some point
	// This adds optional buttons in the web ui that runs a script located at the
	// path specified.
	// The format is "script name" => ["path to script", "arg1", "arg2"]. The script name
	// will be displayed on the web ui.
	CustomScripts map[string][]string

func (*ControlServerConfig) Validate

func (c *ControlServerConfig) Validate() error

type ControlServerStatus

type ControlServerStatus struct {
	GhostferryVersion string

	SourceHostPort string
	TargetHostPort string

	OverallState      string
	StartTime         time.Time
	CurrentTime       time.Time
	TimeTaken         time.Duration
	ETA               time.Duration
	BinlogStreamerLag time.Duration

	AutomaticCutover            bool
	BinlogStreamerStopRequested bool

	CompletedTableCount int
	TotalTableCount     int
	TableStatuses       []*ControlServerTableStatus
	AllTableNames       []string
	AllDatabaseNames    []string

	VerifierSupport     bool
	VerifierAvailable   bool
	VerificationStarted bool
	VerificationDone    bool
	VerificationResult  VerificationResult
	VerificationErr     error

	// TODO: this is populated by the control server. Clearly this all needs a refactor.
	CustomScriptStatuses map[string]CustomScriptStatus

type ControlServerTableStatus

type ControlServerTableStatus struct {
	TableName                   string
	PaginationKeyName           string
	Status                      string
	LastSuccessfulPaginationKey uint64
	TargetPaginationKey         uint64

	BatchSize uint64

type CopyFilter

type CopyFilter interface {
	// BuildSelect is used to set up the query used for batch data copying,
	// allowing for restricting copying to a subset of data. Returning an error
	// here will cause the query to be retried, until the retry limit is
	// reached, at which point the ferry will be aborted. BuildSelect is passed
	// the columns to be selected, table being copied, the last primary key value
	// from the previous batch, and the batch size. Call DefaultBuildSelect to
	// generate the default query, which may be used as a starting point.
	BuildSelect([]string, *TableSchema, uint64, uint64) (sq.SelectBuilder, error)

	// ApplicableEvent is used to filter events for rows that have been
	// filtered in ConstrainSelect. ApplicableEvent should return true if the
	// event is for a row that would be selected by ConstrainSelect, and false
	// otherwise.
	// Returning an error here will cause the ferry to be aborted.
	ApplicableEvent(DMLEvent) (bool, error)

CopyFilter provides an interface for restricting the copying to a subset of data. This typically involves adding a WHERE condition in the ConstrainSelect function, and returning false for unwanted rows in ApplicableEvent.

type CountMetric

type CountMetric struct {
	Value int64

type Cursor

type Cursor struct {

	Table            *TableSchema
	MaxPaginationKey uint64
	RowLock          bool
	// contains filtered or unexported fields

func (*Cursor) Each

func (c *Cursor) Each(f func(*RowBatch) error) error

func (*Cursor) Fetch

func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos uint64, err error)

type CursorConfig

type CursorConfig struct {
	DB        *sql.DB
	Throttler Throttler

	ColumnsToSelect []string
	BuildSelect     func([]string, *TableSchema, uint64, uint64) (squirrel.SelectBuilder, error)
	// BatchSize is a pointer to the BatchSize in Config.UpdatableConfig which can be independently updated from this code.
	// Having it as a pointer allows the updated value to be read without needing additional code to copy the batch size value into the cursor config for each cursor we create.
	BatchSize                 *uint64
	BatchSizePerTableOverride *DataIterationBatchSizePerTableOverride
	ReadRetries               int

func (CursorConfig) GetBatchSize

func (c CursorConfig) GetBatchSize(schemaName string, tableName string) uint64

func (*CursorConfig) NewCursor

func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor

returns a new Cursor with an embedded copy of itself

func (*CursorConfig) NewCursorWithoutRowLock

func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor

returns a new Cursor with an embedded copy of itself

type CustomScriptStatus

type CustomScriptStatus struct {
	Name     string
	Status   string
	Logs     string
	ExitCode int
	Running  bool

type DMLEvent

type DMLEvent interface {
	Database() string
	Table() string
	TableSchema() *TableSchema
	AsSQLString(string, string) (string, error)
	OldValues() RowData
	NewValues() RowData
	PaginationKey() (uint64, error)
	BinlogPosition() mysql.Position
	ResumableBinlogPosition() mysql.Position
	Annotation() (string, error)
	Timestamp() time.Time

func NewBinlogDMLEvents

func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position, query []byte) ([]DMLEvent, error)

func NewBinlogDeleteEvents

func NewBinlogDeleteEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)

func NewBinlogInsertEvents

func NewBinlogInsertEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)

func NewBinlogUpdateEvents

func NewBinlogUpdateEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)

type DMLEventBase

type DMLEventBase struct {
	// contains filtered or unexported fields

The base of DMLEvent to provide the necessary methods.

func NewDMLEventBase

func NewDMLEventBase(table *TableSchema, pos, resumablePos mysql.Position, query []byte, timestamp time.Time) *DMLEventBase

func (*DMLEventBase) Annotation

func (e *DMLEventBase) Annotation() (string, error)

Annotation will return the first prefixed comment on the SQL string, or an error if the query attribute of the DMLEvent is not set

func (*DMLEventBase) BinlogPosition

func (e *DMLEventBase) BinlogPosition() mysql.Position

func (*DMLEventBase) Database

func (e *DMLEventBase) Database() string

func (*DMLEventBase) ResumableBinlogPosition

func (e *DMLEventBase) ResumableBinlogPosition() mysql.Position

func (*DMLEventBase) Table

func (e *DMLEventBase) Table() string

func (*DMLEventBase) TableSchema

func (e *DMLEventBase) TableSchema() *TableSchema

func (*DMLEventBase) Timestamp

func (e *DMLEventBase) Timestamp() time.Time

type DataIterationBatchSizePerTableOverride

type DataIterationBatchSizePerTableOverride struct {
	// Lower limit for rowSize, if a rowSize <= MinRowSize, ControlPoints[MinRowSize] will be used
	MinRowSize int
	// Upper limit for rowSize, if a rowSize >= MaxRowSize, ControlPoints[MaxRowSize] will be used
	MaxRowSize int
	// Map of rowSize  => batchSize used to calculate batchSize for new rowSizes, results stored in TableOverride
	ControlPoints map[int]uint64
	// Map of schemaName(source schema) => tableName => batchSize to override default values for certain tables
	TableOverride map[string]map[string]uint64

func (*DataIterationBatchSizePerTableOverride) CalculateBatchSize

func (d *DataIterationBatchSizePerTableOverride) CalculateBatchSize(rowSize int) int

func (*DataIterationBatchSizePerTableOverride) UpdateBatchSizes

func (d *DataIterationBatchSizePerTableOverride) UpdateBatchSizes(db *sql.DB, tables TableSchemaCache) error

func (*DataIterationBatchSizePerTableOverride) Validate

type DataIterator

type DataIterator struct {
	DB                *sql.DB
	Concurrency       int
	SelectFingerprint bool

	ErrorHandler ErrorHandler
	CursorConfig *CursorConfig
	StateTracker *StateTracker
	TableSorter  DataIteratorSorter
	// contains filtered or unexported fields

func (*DataIterator) AddBatchListener

func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error)

func (*DataIterator) AddDoneListener

func (d *DataIterator) AddDoneListener(listener func() error)

func (*DataIterator) Run

func (d *DataIterator) Run(tables []*TableSchema)

type DataIteratorSorter

type DataIteratorSorter interface {
	Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error)

DataIteratorSorter is an interface for the DataIterator to choose which order it will process table

type DatabaseConfig

type DatabaseConfig struct {
	Host      string
	Port      uint16
	Net       string
	User      string
	Pass      string
	Collation string
	Params    map[string]string
	TLS       *TLSConfig

	// ReadTimeout is used to configure the MySQL client timeout for waiting for data from server.
	// Timeout is in seconds. Defaults to 0, which means no timeout.
	ReadTimeout uint64

	// WriteTimeout is used to configure the MySQL client timeout for writing data to server.
	// Timeout is in seconds. Defaults to 0, which means no timeout.
	WriteTimeout uint64

	// SQL annotations used to differentiate Ghostferry's DMLs
	// against other actor's. This will default to the defaultMarginalia
	// constant above if not set.
	// This is used to ensure any changes to the Target during the move process
	// are performed only by Ghostferry (until cutover). Otherwise, the modification
	// will be identified as data corruption and fail the move.
	Marginalia string

func (*DatabaseConfig) MySQLConfig

func (c *DatabaseConfig) MySQLConfig() (*mysql.Config, error)

func (*DatabaseConfig) SqlDB

func (c *DatabaseConfig) SqlDB(logger *logrus.Entry) (*sql.DB, error)

func (*DatabaseConfig) Validate

func (c *DatabaseConfig) Validate() error

type ErrorHandler

type ErrorHandler interface {
	// Usually called from Fatal. When called from Fatal, if this method returns
	// true, Fatal should panic, otherwise it should not.
	ReportError(from string, err error)
	Fatal(from string, err error)

type Ferry

type Ferry struct {

	SourceDB *sql.DB
	TargetDB *sql.DB

	ControlServer *ControlServer

	BinlogStreamer *BinlogStreamer
	BinlogWriter   *BinlogWriter

	TargetVerifier *TargetVerifier

	DataIterator *DataIterator
	BatchWriter  *BatchWriter

	StateTracker                       *StateTracker
	ErrorHandler                       ErrorHandler
	Throttler                          Throttler
	WaitUntilReplicaIsCaughtUpToMaster *WaitUntilReplicaIsCaughtUpToMaster

	// This can be specified by the caller. If specified, do not specify
	// VerifierType in Config (or as an empty string) or an error will be
	// returned in Initialize.
	// If VerifierType is specified and this is nil on Ferry initialization, a
	// Verifier will be created by Initialize. If an IterativeVerifier is to be
	// created, IterativeVerifierConfig will be used to create the verifier.
	Verifier Verifier

	Tables TableSchemaCache

	StartTime    time.Time
	DoneTime     time.Time
	OverallState atomic.Value
	// contains filtered or unexported fields

func (*Ferry) EndCutover

func (f *Ferry) EndCutover(cutoverStart time.Time)

func (*Ferry) FlushBinlogAndStopStreaming

func (f *Ferry) FlushBinlogAndStopStreaming()

After you stop writing to the source and made sure that all inflight transactions to the source are completed, call this method to ensure that the binlog streaming has caught up and stop the binlog streaming.

This method will actually not shutdown the BinlogStreamer immediately. You will know that the BinlogStreamer finished when .Run() returns.

func (*Ferry) Initialize

func (f *Ferry) Initialize() (err error)

Initialize all the components of Ghostferry and connect to the Database

func (*Ferry) NewBatchWriter

func (f *Ferry) NewBatchWriter() *BatchWriter

func (*Ferry) NewBatchWriterWithoutStateTracker

func (f *Ferry) NewBatchWriterWithoutStateTracker() *BatchWriter

func (*Ferry) NewBinlogWriter

func (f *Ferry) NewBinlogWriter() *BinlogWriter

func (*Ferry) NewBinlogWriterWithoutStateTracker

func (f *Ferry) NewBinlogWriterWithoutStateTracker() *BinlogWriter

func (*Ferry) NewChecksumTableVerifier

func (f *Ferry) NewChecksumTableVerifier() *ChecksumTableVerifier

func (*Ferry) NewControlServer

func (f *Ferry) NewControlServer() (*ControlServer, error)

func (*Ferry) NewDataIterator

func (f *Ferry) NewDataIterator() *DataIterator

func (*Ferry) NewDataIteratorWithoutStateTracker

func (f *Ferry) NewDataIteratorWithoutStateTracker() *DataIterator

func (*Ferry) NewInlineVerifier

func (f *Ferry) NewInlineVerifier() *InlineVerifier

func (*Ferry) NewInlineVerifierWithoutStateTracker

func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier

func (*Ferry) NewIterativeVerifier

func (f *Ferry) NewIterativeVerifier() (*IterativeVerifier, error)

func (*Ferry) NewSourceBinlogStreamer

func (f *Ferry) NewSourceBinlogStreamer() *BinlogStreamer

func (*Ferry) NewTargetBinlogStreamer

func (f *Ferry) NewTargetBinlogStreamer() (*BinlogStreamer, error)

func (*Ferry) Progress

func (f *Ferry) Progress() *Progress

func (*Ferry) ReportProgress

func (f *Ferry) ReportProgress()

func (*Ferry) ReportState

func (f *Ferry) ReportState()

ReportState may have a slight performance impact as it will temporarily lock the StateTracker when it is serialized before posting to the callback

func (*Ferry) Run

func (f *Ferry) Run()

Spawns the background tasks that actually perform the run. Wait for the background tasks to finish.

func (*Ferry) RunStandaloneDataCopy

func (f *Ferry) RunStandaloneDataCopy(tables []*TableSchema) error

func (*Ferry) SerializeStateToJSON

func (f *Ferry) SerializeStateToJSON() (string, error)

func (*Ferry) Start

func (f *Ferry) Start() error

Attach event listeners for Ghostferry components and connect the binlog streamer to the source shard

Note: Binlog streaming doesn't start until Run. Here we simply connect to MySQL.

func (*Ferry) StartCutover

func (f *Ferry) StartCutover() time.Time

func (*Ferry) StopTargetVerifier

func (f *Ferry) StopTargetVerifier()

func (*Ferry) WaitUntilBinlogStreamerCatchesUp

func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()

func (*Ferry) WaitUntilRowCopyIsComplete

func (f *Ferry) WaitUntilRowCopyIsComplete()

Call this method and perform the cutover after this method returns.

type ForceIndexConfig

type ForceIndexConfig map[string]map[string]string

SchemaName => TableName => IndexName These indices will be forced for queries in InlineVerification

func (ForceIndexConfig) IndexFor

func (c ForceIndexConfig) IndexFor(schemaName, tableName string) string

type GaugeMetric

type GaugeMetric struct {
	Value float64

type HTTPCallback

type HTTPCallback struct {
	URI     string
	Payload string

func (HTTPCallback) Post

func (h HTTPCallback) Post(client *http.Client) error

type IncompleteVerificationError

type IncompleteVerificationError struct{}

func (IncompleteVerificationError) Error

type InlineVerifier

type InlineVerifier struct {
	SourceDB                   *sql.DB
	TargetDB                   *sql.DB
	DatabaseRewrites           map[string]string
	TableRewrites              map[string]string
	TableSchemaCache           TableSchemaCache
	BatchSize                  int
	VerifyBinlogEventsInterval time.Duration
	MaxExpectedDowntime        time.Duration

	StateTracker *StateTracker
	ErrorHandler ErrorHandler
	// contains filtered or unexported fields

func (*InlineVerifier) CheckFingerprintInline

func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]uint64, error)

func (*InlineVerifier) Message

func (v *InlineVerifier) Message() string

func (*InlineVerifier) PeriodicallyVerifyBinlogEvents

func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context)

func (*InlineVerifier) Result

func (*InlineVerifier) StartInBackground

func (v *InlineVerifier) StartInBackground() error

This is called from the control server, which is triggered by pushing Run Verification during cutover. This step is necessary to ensure the binlogs are verified in Ghostferry.

func (*InlineVerifier) VerifyBeforeCutover

func (v *InlineVerifier) VerifyBeforeCutover() error

func (*InlineVerifier) VerifyDuringCutover

func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error)

func (*InlineVerifier) Wait

func (v *InlineVerifier) Wait()

type InlineVerifierConfig

type InlineVerifierConfig struct {
	// The maximum expected downtime during cutover, in the format of
	// time.ParseDuration. If nothing is specified, the InlineVerifier will not
	// try to estimate the downtime and will always allow cutover.
	MaxExpectedDowntime string

	// The interval at which the periodic binlog reverification occurs, in the
	// format of time.ParseDuration. Default: 1s.
	VerifyBinlogEventsInterval string
	// contains filtered or unexported fields

func (*InlineVerifierConfig) Validate

func (c *InlineVerifierConfig) Validate() error

type IterativeVerifier

type IterativeVerifier struct {
	CompressionVerifier *CompressionVerifier
	CursorConfig        *CursorConfig
	BinlogStreamer      *BinlogStreamer
	TableSchemaCache    TableSchemaCache
	SourceDB            *sql.DB
	TargetDB            *sql.DB

	Tables              []*TableSchema
	IgnoredTables       []string
	IgnoredColumns      map[string]map[string]struct{}
	DatabaseRewrites    map[string]string
	TableRewrites       map[string]string
	Concurrency         int
	MaxExpectedDowntime time.Duration
	// contains filtered or unexported fields

func (*IterativeVerifier) GetHashes

func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (map[uint64][]byte, error)

func (*IterativeVerifier) Initialize

func (v *IterativeVerifier) Initialize() error

func (*IterativeVerifier) Message

func (v *IterativeVerifier) Message() string

func (*IterativeVerifier) Result

func (*IterativeVerifier) SanityCheckParameters

func (v *IterativeVerifier) SanityCheckParameters() error

func (*IterativeVerifier) StartInBackground

func (v *IterativeVerifier) StartInBackground() error

func (*IterativeVerifier) VerifyBeforeCutover

func (v *IterativeVerifier) VerifyBeforeCutover() error

func (*IterativeVerifier) VerifyDuringCutover

func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)

func (*IterativeVerifier) VerifyOnce

func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)

func (*IterativeVerifier) Wait

func (v *IterativeVerifier) Wait()

type IterativeVerifierConfig

type IterativeVerifierConfig struct {
	// List of tables that should be ignored by the IterativeVerifier.
	IgnoredTables []string

	// List of columns that should be ignored by the IterativeVerifier.
	// This is in the format of table_name -> [list of column names]
	IgnoredColumns map[string][]string

	// The number of concurrent verifiers. Note that a single table can only be
	// assigned to one goroutine and currently multiple goroutines per table
	// is not supported.
	Concurrency int

	// The maximum expected downtime during cutover, in the format of
	// time.ParseDuration.
	MaxExpectedDowntime string

	// Map of the table and column identifying the compression type
	// (if any) of the column. This is used during verification to ensure
	// the data was successfully copied as some compression algorithms can
	// output different compressed data with the same input data.
	// The data structure is a map of table names to a map of column names
	// to the compression algorithm.
	// ex: {books: {contents: snappy}}
	// Currently supported compression algorithms are:
	//	1. Snappy ( as "SNAPPY"
	// Optional: defaults to empty map/no compression
	// Note that the IterativeVerifier is in the process of being deprecated.
	// If this is specified, ColumnCompressionConfig should also be filled out in
	// the main Config.
	TableColumnCompression TableColumnCompressionConfig

func (*IterativeVerifierConfig) Validate

func (c *IterativeVerifierConfig) Validate() error

type LagThrottler

type LagThrottler struct {

	DB *sql.DB
	// contains filtered or unexported fields

func NewLagThrottler

func NewLagThrottler(config *LagThrottlerConfig) (*LagThrottler, error)

func (*LagThrottler) Run

func (t *LagThrottler) Run(ctx context.Context) error

func (*LagThrottler) Throttled

func (t *LagThrottler) Throttled() bool

type LagThrottlerConfig

type LagThrottlerConfig struct {
	Connection     *DatabaseConfig
	MaxLag         int
	Query          string
	UpdateInterval string

type MaxPaginationKeySorter

type MaxPaginationKeySorter struct{}

MaxPaginationKeySorter arranges table based on the MaxPaginationKey in DESC order

func (*MaxPaginationKeySorter) Sort

func (s *MaxPaginationKeySorter) Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error)

type MaxTableSizeSorter

type MaxTableSizeSorter struct {
	DataIterator *DataIterator

MaxTableSizeSorter uses `information_schema.tables` to estimate the size of the DB and sorts tables in DESC order

func (*MaxTableSizeSorter) Sort

func (s *MaxTableSizeSorter) Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error)

type MetricBase

type MetricBase struct {
	Key        string
	Tags       []MetricTag
	SampleRate float64

type MetricTag

type MetricTag struct {
	Name  string
	Value string

type Metrics

type Metrics struct {
	Prefix      string
	DefaultTags []MetricTag
	Sink        chan interface{}
	// contains filtered or unexported fields

func SetGlobalMetrics

func SetGlobalMetrics(prefix string, sink chan interface{}) *Metrics

func (*Metrics) AddConsumer

func (m *Metrics) AddConsumer()

func (*Metrics) Count

func (m *Metrics) Count(key string, value int64, tags []MetricTag, sampleRate float64)

func (*Metrics) DoneConsumer

func (m *Metrics) DoneConsumer()

func (*Metrics) Gauge

func (m *Metrics) Gauge(key string, value float64, tags []MetricTag, sampleRate float64)

func (*Metrics) Measure

func (m *Metrics) Measure(key string, tags []MetricTag, sampleRate float64, f func())

func (*Metrics) StopAndFlush

func (m *Metrics) StopAndFlush()

func (*Metrics) Timer

func (m *Metrics) Timer(key string, duration time.Duration, tags []MetricTag, sampleRate float64)

type PaginationKeyPositionLog

type PaginationKeyPositionLog struct {
	Position uint64
	At       time.Time

For tracking the speed of the copy

type PanicErrorHandler

type PanicErrorHandler struct {
	Ferry                    *Ferry
	ErrorCallback            HTTPCallback
	DumpStateToStdoutOnError bool
	// contains filtered or unexported fields

func (*PanicErrorHandler) Fatal

func (this *PanicErrorHandler) Fatal(from string, err error)

func (*PanicErrorHandler) ReportError

func (this *PanicErrorHandler) ReportError(from string, err error)

type PauserThrottler

type PauserThrottler struct {
	// contains filtered or unexported fields

func (*PauserThrottler) Run

func (t *PauserThrottler) Run(ctx context.Context) error

func (*PauserThrottler) SetPaused

func (t *PauserThrottler) SetPaused(paused bool)

func (*PauserThrottler) Throttled

func (t *PauserThrottler) Throttled() bool

type Progress

type Progress struct {
	// Possible values are defined in ferry.go
	// Shows what the ferry is currently doing in one word.
	CurrentState string

	// The Payload field of the ProgressCallback config will be copied to here
	// verbatim.
	// Example usecase: you can be sending all the status to some aggregation
	// server and you want some sort of custom identification with this field.
	CustomPayload string

	Tables                  map[string]TableProgress
	LastSuccessfulBinlogPos mysql.Position
	BinlogStreamerLag       float64 // This is the amount of seconds the binlog streamer is lagging by (seconds)
	BinlogWriterLag         float64 // This is the amount of seconds the binlog writer is lagging by (seconds)
	Throttled               bool

	// if the TargetVerifier is enabled, we emit this lag, otherwise this number will be 0
	TargetBinlogStreamerLag float64

	// The number of data iterators currently active.
	ActiveDataIterators int

	// The behaviour of Ghostferry varies with respect to the VerifierType.
	// For example: a long cutover is OK if
	VerifierType string

	// The message that the verifier may emit for additional information
	VerifierMessage string

	// These are some variables that are only filled when CurrentState == done.
	FinalBinlogPos mysql.Position

	// A best estimate on the speed at which the copying is taking place. If
	// there are large gaps in the PaginationKey space, this probably will be inaccurate.
	PaginationKeysPerSecond uint64
	ETA                     float64 // seconds
	TimeTaken               float64 // seconds

type ReplicatedMasterPositionFetcher

type ReplicatedMasterPositionFetcher interface {
	Current(*sql.DB) (mysql.Position, error)

type ReplicatedMasterPositionViaCustomQuery

type ReplicatedMasterPositionViaCustomQuery struct {
	// The custom query executing should return a single row with two values:
	// the string file and the integer position. For pt-heartbeat, this query
	// would be:
	// "SELECT file, position FROM meta.ptheartbeat WHERE server_id = %d" % serverId
	// where serverId is the master server id, and meta.ptheartbeat is the table
	// where pt-heartbeat writes to.
	// For pt-heartbeat in particular, you should not use the
	// relay_master_log_file and exec_master_log_pos of the DB being replicated
	// as these values are not the master binlog positions.
	Query string

Selects the master position that we have replicated until from a heartbeat table of sort.

func (ReplicatedMasterPositionViaCustomQuery) Current

type ReverifyBatch

type ReverifyBatch struct {
	PaginationKeys []uint64
	Table          TableIdentifier

type ReverifyEntry

type ReverifyEntry struct {
	PaginationKey uint64
	Table         *TableSchema

type ReverifyStore

type ReverifyStore struct {
	MapStore map[TableIdentifier]map[uint64]struct{}

	BatchStore         []ReverifyBatch
	RowCount           uint64
	EmitLogPerRowCount uint64
	// contains filtered or unexported fields

func NewReverifyStore

func NewReverifyStore() *ReverifyStore

func (*ReverifyStore) Add

func (r *ReverifyStore) Add(entry ReverifyEntry)

func (*ReverifyStore) FlushAndBatchByTable

func (r *ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch

type RowBatch

type RowBatch struct {
	// contains filtered or unexported fields

func NewRowBatch

func NewRowBatch(table *TableSchema, values []RowData, paginationKeyIndex int) *RowBatch

func (*RowBatch) AsSQLQuery

func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface{}, error)

func (*RowBatch) EstimateByteSize

func (e *RowBatch) EstimateByteSize() uint64

func (*RowBatch) Fingerprints

func (e *RowBatch) Fingerprints() map[uint64][]byte

func (*RowBatch) PaginationKeyIndex

func (e *RowBatch) PaginationKeyIndex() int

func (*RowBatch) Size

func (e *RowBatch) Size() int

func (*RowBatch) TableSchema

func (e *RowBatch) TableSchema() *TableSchema

func (*RowBatch) Values

func (e *RowBatch) Values() []RowData

func (*RowBatch) ValuesContainPaginationKey

func (e *RowBatch) ValuesContainPaginationKey() bool

type RowData

type RowData []interface{}

func ScanGenericRow

func ScanGenericRow(rows *sqlorig.Rows, columnCount int) (RowData, error)

func (RowData) GetUint64

func (r RowData) GetUint64(colIdx int) (uint64, error)

For historical reasons, this function ended up being used at two different places: the DataIterator and the DMLEvent (which indirectly is used by the BinlogStreamer, InlineVerifier, etc).

The original code here was introduced in 152caec0ff5195d4698672df6dc0f72fb77b02fc, where it is used solely in context of the DataIterator. In this context, the value being parsed here is given to us by the go-sql-driver/mysql driver. This value could either by int64 or it could be a byte slice decimal string with the uint64 value in it, which is why we have this awkward byte slice to integer trick. This also means the original code is not designed to handle uint64, as go-sql-driver/mysql never returns uint64. This could possibly be an upstream problem that have since been fixed, but this was not investigated. (A possibly related PR:

At some point, this code was refactored into this function, such that the BinlogStreamer also uses the same code to decode integers. The binlog data is given to us by go-mysql-org/go-mysql. The go-mysql-org/go-mysql library should not be giving us awkward byte slices. Instead, it should properly gives us uint64. This code thus panics when it encounters such case. See

In summary:

  • This code receives values from both go-sql-driver/mysql and go-mysql-org/go-mysql.
  • go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte slice for unsigned integer.
  • go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for unsigned integer.
  • We currently make this function deal with both cases. In the future we can investigate alternative solutions.

type RowStats

type RowStats struct {
	NumRows  uint64
	NumBytes uint64

type SerializableState

type SerializableState struct {
	GhostferryVersion         string
	LastKnownTableSchemaCache TableSchemaCache

	LastSuccessfulPaginationKeys              map[string]uint64
	CompletedTables                           map[string]bool
	LastWrittenBinlogPosition                 mysql.Position
	BinlogVerifyStore                         BinlogVerifySerializedStore
	LastStoredBinlogPositionForInlineVerifier mysql.Position
	LastStoredBinlogPositionForTargetVerifier mysql.Position

func (*SerializableState) MinSourceBinlogPosition

func (s *SerializableState) MinSourceBinlogPosition() mysql.Position

type SqlDBWithFakeRollback

type SqlDBWithFakeRollback struct {

func (*SqlDBWithFakeRollback) Rollback

func (d *SqlDBWithFakeRollback) Rollback() error

type SqlPreparer

type SqlPreparer interface {
	Prepare(string) (*sqlorig.Stmt, error)

both `sql.Tx` and `sql.DB` allow a SQL query to be `Prepare`d

type SqlPreparerAndRollbacker

type SqlPreparerAndRollbacker interface {
	Rollback() error

sql.DB does not implement Rollback, but can use SqlDBWithFakeRollback to perform a noop.

type StateTracker

type StateTracker struct {
	BinlogRWMutex *sync.RWMutex
	CopyRWMutex   *sync.RWMutex
	// contains filtered or unexported fields

func NewStateTracker

func NewStateTracker(speedLogCount int) *StateTracker

func NewStateTrackerFromSerializedState

func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *SerializableState) *StateTracker

serializedState is a state the tracker should start from, as opposed to starting from the beginning.

func (*StateTracker) EstimatedPaginationKeysPerSecond

func (s *StateTracker) EstimatedPaginationKeysPerSecond() float64

This is reasonably accurate if the rows copied are distributed uniformly between paginationKey = 0 -> max(paginationKey). It would not be accurate if the distribution is concentrated in a particular region.

func (*StateTracker) IsTableComplete

func (s *StateTracker) IsTableComplete(table string) bool

func (*StateTracker) LastSuccessfulPaginationKey

func (s *StateTracker) LastSuccessfulPaginationKey(table string) uint64

func (*StateTracker) MarkTableAsCompleted

func (s *StateTracker) MarkTableAsCompleted(table string)

func (*StateTracker) RowStatsWrittenPerTable

func (s *StateTracker) RowStatsWrittenPerTable() map[string]RowStats

func (*StateTracker) Serialize

func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore) *SerializableState

func (*StateTracker) UpdateLastResumableBinlogPositionForTargetVerifier

func (s *StateTracker) UpdateLastResumableBinlogPositionForTargetVerifier(pos mysql.Position)

func (*StateTracker) UpdateLastResumableSourceBinlogPosition

func (s *StateTracker) UpdateLastResumableSourceBinlogPosition(pos mysql.Position)

func (*StateTracker) UpdateLastResumableSourceBinlogPositionForInlineVerifier

func (s *StateTracker) UpdateLastResumableSourceBinlogPositionForInlineVerifier(pos mysql.Position)

func (*StateTracker) UpdateLastSuccessfulPaginationKey

func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginationKey uint64, rowStats RowStats)

type StmtCache

type StmtCache struct {
	// contains filtered or unexported fields

func NewStmtCache

func NewStmtCache() *StmtCache

func (*StmtCache) StmtFor

func (c *StmtCache) StmtFor(p SqlPreparer, query string) (*sqlorig.Stmt, error)

type TLSConfig

type TLSConfig struct {
	CertPath   string
	ServerName string
	// contains filtered or unexported fields

func (*TLSConfig) BuildConfig

func (this *TLSConfig) BuildConfig() (*tls.Config, error)

type TableColumnCompressionConfig

type TableColumnCompressionConfig map[string]map[string]string

TableColumnCompressionConfig represents compression configuration for a column in a table as table -> column -> compression-type ex: books -> contents -> snappy

type TableFilter

type TableFilter interface {
	ApplicableTables([]*TableSchema) ([]*TableSchema, error)
	ApplicableDatabases([]string) ([]string, error)

type TableIdentifier

type TableIdentifier struct {
	SchemaName string
	TableName  string

A comparable and lightweight type that stores the schema and table name.

func NewTableIdentifierFromSchemaTable

func NewTableIdentifierFromSchemaTable(table *TableSchema) TableIdentifier

type TableMaxPaginationKey

type TableMaxPaginationKey struct {
	Table            *TableSchema
	MaxPaginationKey uint64

type TableProgress

type TableProgress struct {
	LastSuccessfulPaginationKey uint64
	TargetPaginationKey         uint64
	CurrentAction               string // Possible values are defined via the constants TableAction*
	RowsWritten                 uint64
	BatchSize                   uint64
	BytesWritten                uint64

type TableSchema

type TableSchema struct {

	CompressedColumnsForVerification map[string]string   // Map of column name => compression type
	IgnoredColumnsForVerification    map[string]struct{} // Set of column name
	ForcedIndexForVerification       string              // Forced index name
	PaginationKeyColumn              *schema.TableColumn
	PaginationKeyIndex               int
	// contains filtered or unexported fields

This is a wrapper on schema.Table with some custom information we need.

func MaxPaginationKeys

func MaxPaginationKeys(db *sql.DB, tables []*TableSchema, logger *logrus.Entry) (map[*TableSchema]uint64, []*TableSchema, error)

func (*TableSchema) FingerprintQuery

func (t *TableSchema) FingerprintQuery(schemaName, tableName string, numRows int) string

This query returns the MD5 hash for a row on this table. This query is valid for both the source and the target shard.

Any compressed columns specified via CompressedColumnsForVerification are excluded in this checksum and the raw data is returned directly.

Any columns specified in IgnoredColumnsForVerification are excluded from the checksum and the raw data will not be returned.

Note that the MD5 hash should consists of at least 1 column: the paginationKey column. This is to say that there should never be a case where the MD5 hash is derived from an empty string.

func (*TableSchema) GetPaginationColumn

func (t *TableSchema) GetPaginationColumn() *schema.TableColumn

GetPaginationColumn retrieves PaginationKeyColumn

func (*TableSchema) GetPaginationKeyIndex

func (t *TableSchema) GetPaginationKeyIndex() int

func (*TableSchema) RowMd5Query

func (t *TableSchema) RowMd5Query() string

type TableSchemaCache

type TableSchemaCache map[string]*TableSchema

func LoadTables

func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig ColumnCompressionConfig, columnIgnoreConfig ColumnIgnoreConfig, forceIndexConfig ForceIndexConfig, cascadingPaginationColumnConfig *CascadingPaginationColumnConfig) (TableSchemaCache, error)

func (TableSchemaCache) AllTableNames

func (c TableSchemaCache) AllTableNames() (tableNames []string)

func (TableSchemaCache) AsSlice

func (c TableSchemaCache) AsSlice() (tables []*TableSchema)

func (TableSchemaCache) Get

func (c TableSchemaCache) Get(database, table string) *TableSchema

func (TableSchemaCache) GetTableListWithPriority

func (c TableSchemaCache) GetTableListWithPriority(priorityList []string) (prioritizedTableNames []string)

Helper to sort a given map of tables with a second list giving a priority. If an element is present in the input and the priority lists, the item will appear first (in the order of the priority list), all other items appear in the order given in the input

type TargetVerifier

type TargetVerifier struct {
	DB             *sql.DB
	BinlogStreamer *BinlogStreamer
	StateTracker   *StateTracker
	// contains filtered or unexported fields

func NewTargetVerifier

func NewTargetVerifier(targetDB *sql.DB, stateTracker *StateTracker, binlogStreamer *BinlogStreamer) (*TargetVerifier, error)

func (*TargetVerifier) BinlogEventListener

func (t *TargetVerifier) BinlogEventListener(evs []DMLEvent) error

Verify that all DMLs against the target are coming from Ghostferry for the duration of the move. Once cutover has completed, we no longer need to perform this verification as all writes from the application are directed to the target

type Throttler

type Throttler interface {
	Throttled() bool
	Disabled() bool
	Run(context.Context) error

type ThrottlerBase

type ThrottlerBase struct {
	// contains filtered or unexported fields

func (*ThrottlerBase) Disabled

func (t *ThrottlerBase) Disabled() bool

func (*ThrottlerBase) SetDisabled

func (t *ThrottlerBase) SetDisabled(disabled bool)

type TimerMetric

type TimerMetric struct {
	Value time.Duration

type UnsupportedCompressionError

type UnsupportedCompressionError struct {
	// contains filtered or unexported fields

UnsupportedCompressionError is used to identify errors resulting from attempting to decompress unsupported algorithms

func (UnsupportedCompressionError) Error

type UpdatableConfig

type UpdatableConfig struct {
	// The batch size used to iterate the data during data copy. This batch size
	// is always used: if this is specified to be 100, 100 rows will be copied
	// per iteration.
	// With the current implementation of Ghostferry, we need to lock the rows
	// we select. This means, the larger this number is, the longer we need to
	// hold this lock. On the flip side, the smaller this number is, the slower
	// the copy will likely be.
	// Optional: defaults to 200
	DataIterationBatchSize uint64

UpdatableConfig defines config fields that support dynamic updates

func (*UpdatableConfig) Validate

func (c *UpdatableConfig) Validate() error

type VerificationResult

type VerificationResult struct {
	DataCorrect     bool
	Message         string
	IncorrectTables []string

func NewCorrectVerificationResult

func NewCorrectVerificationResult() VerificationResult

func (VerificationResult) Error

func (e VerificationResult) Error() string

type VerificationResultAndStatus

type VerificationResultAndStatus struct {

	StartTime time.Time
	DoneTime  time.Time

func (VerificationResultAndStatus) IsDone

func (r VerificationResultAndStatus) IsDone() bool

func (VerificationResultAndStatus) IsStarted

func (r VerificationResultAndStatus) IsStarted() bool

type Verifier

type Verifier interface {
	// If the Verifier needs to do anything immediately after the DataIterator
	// finishes copying data and before cutover occurs, implement this function.
	VerifyBeforeCutover() error

	// This is called during cutover and should give the result of the
	// verification.
	VerifyDuringCutover() (VerificationResult, error)

	// Start the verifier in the background during the cutover phase.
	// Traditionally, this is called from within the ControlServer.
	// This method maybe called multiple times and it's up to the verifier
	// to decide if it is possible to re-run the verification.
	StartInBackground() error

	// Wait for the verifier until it finishes verification after it was
	// started with the StartInBackground.
	// A verification is "done" when it verified the dbs (either
	// correct or incorrect) OR when it experiences an error.

	// Returns arbitrary message that is consumed by the control server.
	// Can just be "".
	Message() string

	// Returns the result and the status of the verification.
	// To check the status, call IsStarted() and IsDone() on
	// VerificationResultAndStatus.
	// If the verification has been completed successfully (without errors) and
	// the data checks out to be "correct", the result will be
	// VerificationResult{true, ""}, with error = nil.
	// Otherwise, the result will be VerificationResult{false, "message"}, with
	// error = nil.
	// If the verification is "done" but experienced an error during the check,
	// the result will be VerificationResult{}, with err = yourErr.
	Result() (VerificationResultAndStatus, error)

The sole purpose of this interface is to make it easier for one to implement their own strategy for verification and hook it up with the ControlServer. If there is no such need, one does not need to implement this interface.

type WaitUntilReplicaIsCaughtUpToMaster

type WaitUntilReplicaIsCaughtUpToMaster struct {
	MasterDB                        *sql.DB
	ReplicatedMasterPositionFetcher ReplicatedMasterPositionFetcher
	Timeout                         time.Duration

	ReplicaDB *sql.DB
	// contains filtered or unexported fields

Only set the MasterDB and ReplicatedMasterPosition options in your code as the others will be overwritten by the ferry.

func (*WaitUntilReplicaIsCaughtUpToMaster) IsCaughtUp

func (w *WaitUntilReplicaIsCaughtUpToMaster) IsCaughtUp(targetMasterPos mysql.Position, retries int) (bool, error)

func (*WaitUntilReplicaIsCaughtUpToMaster) Wait

type WorkerPool

type WorkerPool struct {
	Concurrency int
	Process     func(int) (interface{}, error)

func (*WorkerPool) Run

func (p *WorkerPool) Run(n int) ([]interface{}, error)

Returns a list of results of the size same as the concurrency number. Returns the first error that occurs during the run. Also as soon as a single worker errors, all workers terminates.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL