drainer

package
v0.0.0-...-6fba4f8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: Apache-2.0 Imports: 72 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenCheckPointCfg

func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error)

GenCheckPointCfg returns an CheckPoint config instance

Types

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

Collector collects binlog from all pump, and send binlog to syncer.

func NewCollector

func NewCollector(cfg *Config, clusterID uint64, s *Syncer, cpt checkpoint.CheckPoint) (*Collector, error)

NewCollector returns an instance of Collector

func (*Collector) HTTPStatus

func (c *Collector) HTTPStatus() *HTTPStatus

HTTPStatus returns a snapshot of current http status.

func (*Collector) Notify

func (c *Collector) Notify() error

Notify notifies to detcet pumps

func (*Collector) Start

func (c *Collector) Start(ctx context.Context)

Start run a loop of collecting binlog from pumps online

func (*Collector) Status

func (c *Collector) Status(w http.ResponseWriter, r *http.Request)

Status exposes collector's status to HTTP handler.

type Config

type Config struct {
	*flag.FlagSet   `toml:"-" json:"-"`
	LogLevel        string          `toml:"log-level" json:"log-level"`
	NodeID          string          `toml:"node-id" json:"node-id"`
	ListenAddr      string          `toml:"addr" json:"addr"`
	AdvertiseAddr   string          `toml:"advertise-addr" json:"advertise-addr"`
	DataDir         string          `toml:"data-dir" json:"data-dir"`
	DetectInterval  int             `toml:"detect-interval" json:"detect-interval"`
	EtcdURLs        string          `toml:"pd-urls" json:"pd-urls"`
	LogFile         string          `toml:"log-file" json:"log-file"`
	InitialCommitTS int64           `toml:"initial-commit-ts" json:"initial-commit-ts"`
	SyncerCfg       *SyncerConfig   `toml:"syncer" json:"sycner"`
	Security        security.Config `toml:"security" json:"security"`
	SyncedCheckTime int             `toml:"synced-check-time" json:"synced-check-time"`
	Compressor      string          `toml:"compressor" json:"compressor"`
	EtcdTimeout     time.Duration
	MetricsAddr     string
	MetricsInterval int
	// contains filtered or unexported fields
}

Config holds the configuration of drainer

func NewConfig

func NewConfig() *Config

NewConfig return an instance of configuration

func (*Config) Parse

func (cfg *Config) Parse(args []string) error

Parse parses all config from command-line flags, environment vars or the configuration file

func (*Config) String

func (cfg *Config) String() string

type HTTPStatus

type HTTPStatus struct {
	PumpPos map[string]int64 `json:"PumpPos"`
	Synced  bool             `json:"Synced"`
	LastTS  int64            `json:"LastTS"`
	TsMap   string           `json:"TsMap"`
}

HTTPStatus exposes current status of the collector via HTTP

func (*HTTPStatus) Status

func (s *HTTPStatus) Status(w http.ResponseWriter, r *http.Request)

Status implements http.ServeHTTP interface

type HeapStrategy

type HeapStrategy struct {
	// contains filtered or unexported fields
}

HeapStrategy is a strategy to get min item using heap

func NewHeapStrategy

func NewHeapStrategy() *HeapStrategy

NewHeapStrategy returns a new HeapStrategy

func (*HeapStrategy) Exist

func (h *HeapStrategy) Exist(sourceID string) bool

Exist implements MergeStrategy's Exist function

func (*HeapStrategy) Pop

func (h *HeapStrategy) Pop() MergeItem

Pop implements MergeStrategy's Pop function

func (*HeapStrategy) Push

func (h *HeapStrategy) Push(item MergeItem)

Push implements MergeStrategy's Push function

type MergeItem

type MergeItem interface {
	GetCommitTs() int64

	GetSourceID() string
}

MergeItem is the item in Merger

type MergeItems

type MergeItems []MergeItem

MergeItems is a heap of MergeItems.

func (MergeItems) Len

func (m MergeItems) Len() int

func (MergeItems) Less

func (m MergeItems) Less(i, j int) bool

func (*MergeItems) Pop

func (m *MergeItems) Pop() interface{}

Pop implements heap.Interface's Pop function

func (*MergeItems) Push

func (m *MergeItems) Push(x interface{})

Push implements heap.Interface's Push function

func (MergeItems) Swap

func (m MergeItems) Swap(i, j int)

type MergeSource

type MergeSource struct {
	ID     string
	Source chan MergeItem
}

MergeSource contains a source info about binlog

type MergeStrategy

type MergeStrategy interface {
	Push(MergeItem)
	Pop() MergeItem
	Exist(string) bool
}

MergeStrategy is a strategy interface for merge item

type Merger

type Merger struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Merger do merge sort of binlog

func NewMerger

func NewMerger(ts int64, strategy string, sources ...MergeSource) *Merger

NewMerger creates a instance of Merger

func (*Merger) AddSource

func (m *Merger) AddSource(source MergeSource)

AddSource add a source to Merger

func (*Merger) Close

func (m *Merger) Close()

Close close the output chan when all the source id drained

func (*Merger) Continue

func (m *Merger) Continue()

Continue continue merge

func (*Merger) GetLatestTS

func (m *Merger) GetLatestTS() int64

GetLatestTS returns the last binlog's ts send to syncer

func (*Merger) Output

func (m *Merger) Output() chan MergeItem

Output get the output chan of binlog

func (*Merger) RemoveSource

func (m *Merger) RemoveSource(sourceID string)

RemoveSource removes a source from Merger

func (*Merger) Stop

func (m *Merger) Stop()

Stop stops merge

type NormalStrategy

type NormalStrategy struct {
	// contains filtered or unexported fields
}

NormalStrategy is a strategy to get min item using normal way

func NewNormalStrategy

func NewNormalStrategy() *NormalStrategy

NewNormalStrategy returns a new NormalStrategy

func (*NormalStrategy) Exist

func (n *NormalStrategy) Exist(sourceID string) bool

Exist implements MergeStrategy's Exist function

func (*NormalStrategy) Pop

func (n *NormalStrategy) Pop() MergeItem

Pop implements MergeStrategy's Pop function

func (*NormalStrategy) Push

func (n *NormalStrategy) Push(item MergeItem)

Push implements MergeStrategy's Push function

type Pump

type Pump struct {
	// contains filtered or unexported fields
}

Pump holds the connection to a pump node, and keeps the savepoint of binlog last read

func NewPump

func NewPump(nodeID, addr string, tlsConfig *tls.Config, clusterID uint64, startTs int64, errCh chan error) *Pump

NewPump returns an instance of Pump

func (*Pump) Close

func (p *Pump) Close()

Close sets isClose to 1, and pull binlog will be exit.

func (*Pump) Continue

func (p *Pump) Continue(pctx context.Context)

Continue sets isPaused to 0, and continue pull binlog from pump. This function is reentrant.

func (*Pump) Pause

func (p *Pump) Pause()

Pause sets isPaused to 1, and stop pull binlog from pump. This function is reentrant.

func (*Pump) PullBinlog

func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem

PullBinlog returns the chan to get item from pump

type RelayConfig

type RelayConfig struct {
	LogDir      string `toml:"log-dir" json:"log-dir"`
	MaxFileSize int64  `toml:"max-file-size" json:"max-file-size"`
}

RelayConfig is the Relay log's configuration.

func (RelayConfig) IsEnabled

func (rc RelayConfig) IsEnabled() bool

IsEnabled return true if we need to handle relay log.

type Schema

type Schema struct {
	// contains filtered or unexported fields
}

Schema stores the source TiDB all schema infomations schema infomations could be changed by drainer init and ddls appear

func NewSchema

func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error)

NewSchema returns the Schema object

func (*Schema) CanAppendDefaultValue

func (s *Schema) CanAppendDefaultValue(id int64, schemaVersion int64) bool

CanAppendDefaultValue means we can safely add the default value to the column if missing the value.

func (*Schema) CreateSchema

func (s *Schema) CreateSchema(db *model.DBInfo) error

CreateSchema adds new DBInfo

func (*Schema) CreateTable

func (s *Schema) CreateTable(schemaVersion int64, schema *model.DBInfo, table *model.TableInfo) error

CreateTable creates new TableInfo

func (*Schema) DropSchema

func (s *Schema) DropSchema(id int64) (string, error)

DropSchema deletes the given DBInfo

func (*Schema) DropTable

func (s *Schema) DropTable(id int64) (string, error)

DropTable deletes the given TableInfo

func (*Schema) InitForCreateMySQLSchema

func (s *Schema) InitForCreateMySQLSchema()

InitForCreateMySQLSchema create the schema info for `mysql`, since it's created by KV after TiDB 6.2.

func (*Schema) IsDroppingColumn

func (s *Schema) IsDroppingColumn(id int64) bool

IsDroppingColumn returns true if the table is in the middle of dropping a column

func (*Schema) IsTruncateTableID

func (s *Schema) IsTruncateTableID(id int64) bool

IsTruncateTableID returns true if the table id have been truncated by truncate table DDL

func (*Schema) ReplaceTable

func (s *Schema) ReplaceTable(schemaVersion int64, table *model.TableInfo) error

ReplaceTable replace the table by new tableInfo

func (*Schema) SchemaAndTableName

func (s *Schema) SchemaAndTableName(id int64) (string, string, bool)

SchemaAndTableName returns the tableName by table id

func (*Schema) SchemaByID

func (s *Schema) SchemaByID(id int64) (val *model.DBInfo, ok bool)

SchemaByID returns the DBInfo by schema id

func (*Schema) SchemaByTableID

func (s *Schema) SchemaByTableID(tableID int64) (*model.DBInfo, bool)

SchemaByTableID returns the schema ID by table ID

func (*Schema) SchemaMetaVersion

func (s *Schema) SchemaMetaVersion() int64

SchemaMetaVersion returns the current schemaversion in drainer

func (*Schema) String

func (s *Schema) String() string

func (*Schema) TableByID

func (s *Schema) TableByID(id int64) (val *model.TableInfo, ok bool)

TableByID returns the TableInfo by table id

func (*Schema) TableBySchemaVersion

func (s *Schema) TableBySchemaVersion(id int64, schemaVersion int64) (table *model.TableInfo, ok bool)

TableBySchemaVersion get the table info according the schemaVersion and table id.

type Server

type Server struct {
	ID string
	// contains filtered or unexported fields
}

Server implements the gRPC interface, and maintains the runtime status

func NewServer

func NewServer(cfg *Config) (*Server, error)

NewServer return a instance of binlog-server

func (*Server) ApplyAction

func (s *Server) ApplyAction(w http.ResponseWriter, r *http.Request)

ApplyAction change the pump's state, now can be pause or close.

func (*Server) Close

func (s *Server) Close()

Close stops all goroutines started by drainer server gracefully

func (*Server) DumpBinlog

func (s *Server) DumpBinlog(req *binlog.DumpBinlogReq, stream binlog.Cistern_DumpBinlogServer) (err error)

DumpBinlog implements the gRPC interface of drainer server

func (*Server) DumpDDLJobs

func (s *Server) DumpDDLJobs(ctx context.Context, req *binlog.DumpDDLJobsReq) (resp *binlog.DumpDDLJobsResp, err error)

DumpDDLJobs implements the gRPC interface of drainer server

func (*Server) GetLatestTS

func (s *Server) GetLatestTS(w http.ResponseWriter, r *http.Request)

GetLatestTS returns the last binlog's commit ts which synced to downstream.

func (*Server) Notify

func (s *Server) Notify(ctx context.Context, in *binlog.NotifyReq) (*binlog.NotifyResp, error)

Notify implements the gRPC interface of drainer server

func (*Server) Start

func (s *Server) Start() error

Start runs CisternServer to serve the listening addr, and starts to collect binlog

type Syncer

type Syncer struct {
	// contains filtered or unexported fields
}

Syncer converts tidb binlog to the specified DB sqls, and sync it to target DB

func NewSyncer

func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (*Syncer, error)

NewSyncer returns a Drainer instance

func (*Syncer) Add

func (s *Syncer) Add(b *binlogItem)

Add adds binlogItem to the syncer's input channel

func (*Syncer) Close

func (s *Syncer) Close() error

Close closes syncer.

func (*Syncer) GetLastSyncTime

func (s *Syncer) GetLastSyncTime() time.Time

GetLastSyncTime returns lastSyncTime

func (*Syncer) GetLatestCommitTS

func (s *Syncer) GetLatestCommitTS() int64

GetLatestCommitTS returns the latest commit ts.

func (*Syncer) Start

func (s *Syncer) Start() error

Start starts to sync.

type SyncerConfig

type SyncerConfig struct {
	StrSQLMode        *string            `toml:"sql-mode" json:"sql-mode"`
	SQLMode           mysql.SQLMode      `toml:"-" json:"-"`
	IgnoreTxnCommitTS []int64            `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"`
	IgnoreSchemas     string             `toml:"ignore-schemas" json:"ignore-schemas"`
	IgnoreTables      []filter.TableName `toml:"ignore-table" json:"ignore-table"`
	TxnBatch          int                `toml:"txn-batch" json:"txn-batch"`
	LoopbackControl   bool               `toml:"loopback-control" json:"loopback-control"`
	SyncDDL           bool               `toml:"sync-ddl" json:"sync-ddl"`
	ChannelID         int64              `toml:"channel-id" json:"channel-id"`
	WorkerCount       int                `toml:"worker-count" json:"worker-count"`
	To                *dsync.DBConfig    `toml:"to" json:"to"`
	DoTables          []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
	DoDBs             []string           `toml:"replicate-do-db" json:"replicate-do-db"`
	DestDBType        string             `toml:"db-type" json:"db-type"`
	Relay             RelayConfig        `toml:"relay" json:"relay"`
	// disable* is keep for backward compatibility.
	// if both setted, the disable one take affect.
	DisableDispatchFlag *bool `toml:"-" json:"disable-dispatch-flag"`
	EnableDispatchFlag  *bool `toml:"-" json:"enable-dispatch-flag"`
	DisableDispatchFile *bool `toml:"disable-dispatch" json:"disable-dispatch"`
	EnableDispatchFile  *bool `toml:"enable-dispatch" json:"enable-dispatch"`
	SafeMode            bool  `toml:"safe-mode" json:"safe-mode"`
	// for backward compatibility.
	// disable* is keep for backward compatibility.
	// if both setted, the disable one take affect.
	DisableCausalityFlag *bool `toml:"-" json:"disable-detect-flag"`
	EnableCausalityFlag  *bool `toml:"-" json:"enable-detect-flag"`
	DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
	EnableCausalityFile  *bool `toml:"enable-detect" json:"enable-detect"`
	LoadSchemaSnapshot   bool  `toml:"load-schema-snapshot" json:"load-schema-snapshot"`

	// v2 filter rules
	CaseSensitive    bool                   `toml:"case-sensitive" json:"case-sensitive"`
	TableMigrateRule []TaskTableMigrateRule `toml:"table-migrate-rule" json:"table-migrate-rule"`

	BinlogFilterRule map[string]TaskBinLogFilterRule `toml:"binlog-filter-rule,omitempty" json:"binlog-filter-rule,omitempty"`
}

SyncerConfig is the Syncer's configuration.

func (*SyncerConfig) EnableCausality

func (c *SyncerConfig) EnableCausality() bool

EnableCausality return true if enable causality.

func (*SyncerConfig) EnableDispatch

func (c *SyncerConfig) EnableDispatch() bool

EnableDispatch return true if enable dispatch.

type TableName

type TableName = filter.TableName

TableName stores the table and schema name

type TaskBinLogFilterRule

type TaskBinLogFilterRule struct {
	// event type
	IgnoreEvent *[]string `toml:"ignore-event,omitempty" json:"ignore-event,omitempty"`

	// sql pattern to filter
	IgnoreSQL *[]string `toml:"ignore-sql,omitempty" json:"ignore-sql,omitempty"`
}

TaskBinLogFilterRule defines filtering rules at binlog level

type TaskTableMigrateRule

type TaskTableMigrateRule struct {
	// filter rule name
	BinlogFilterRule *[]string `toml:"binlog-filter-rule,omitempty" json:"binlog-filter-rule,omitempty"`

	// source-related configuration
	Source struct {
		// schema name, wildcard support
		Schema string `toml:"schema" json:"schema"`

		// table name, wildcard support
		Table string `toml:"table" json:"table"`
	} `toml:"source" json:"source"`

	// downstream-related configuration
	Target *struct {
		// schema name, does not support wildcards
		Schema string `toml:"schema" json:"schema"`

		// table name, does not support wildcards
		Table string `toml:"table" json:"table"`
	} `toml:"target,omitempty" json:"target,omitempty"`
}

TaskTableMigrateRule defines upstream table to downstream migrate rules

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL