Documentation ¶
Index ¶
- func AddSQLCond(cond string, args []interface{}, lop string, name string, op string, ...) (string, []interface{})
- func CheckMySQLVersion(expected string) bool
- func ClearNeedSnapshot(id int64, ts time.Time) error
- func Close() error
- func ConnectInfoGet(l *db.Loc, connType db.ConnectionType) (*db.Addr, error)
- func DeleteClusterInfo(name string) error
- func DeleteSchema(name string, typ string) error
- func DeregisterTable(cluster, svc, sdb, table, input, output string, version int) bool
- func DeregisterTableFromState(dbl *db.Loc, table, input, output string, version int, regid int64) bool
- func EmitRegisteredTablesCount()
- func GetClusterInfo(cond string, args ...interface{}) ([]db.Addr, error)
- func GetClusterTask(input string, workerID string, lockTimeout time.Duration) (s string, c string, d string, err error)
- func GetCount(includeDeleted bool) (int, error)
- func GetDB() *sql.DB
- func GetDBAddr() *db.Addr
- func GetGTID(cluster string) (gtid string, seqno int64, err error)
- func GetNeedSnapshotFlag(id int64) (ns bool, err error)
- func GetNoDB() *sql.DB
- func GetOutputSchema(name string, typ string) string
- func GetSchema(svc, sdb, table, input string, output string, version int) (*types.TableSchema, error)
- func GetServerTimestamp() (int64, error)
- func InitManager(ctx context.Context, cfg *config.AppConfig) error
- func InsertClusterInfo(ci *db.Addr) error
- func InsertSchema(name string, typ string, schema string) error
- func PullCurrentSchema(dbl *db.Loc, table, input string) (*types.TableSchema, string)
- func RefreshClusterLock(cluster string, workerID string) bool
- func RefreshTableLock(stateID int64, workerID string) bool
- func RegisterTable(cluster, svc, sdb, table, input, output string, version int, ...) bool
- func RegisterTableInState(dbl *db.Loc, table, input, output string, version int, format string, ...) bool
- func ReplaceSchema(svc, cluster string, s *types.TableSchema, ...) bool
- func Reset() bool
- func SanitizeRegParams(cluster, svc, sdb, table, input string) (string, string, string, string)
- func SaveBinlogState(cluster, gtid string, seqNo uint64) error
- func SchemaGet(namespace string, schemaName string, typ string) (*types.AvroSchema, error)
- func SetGTID(cluster, gtid string) error
- func SyncDeregisteredTables() bool
- func SyncRegisteredTables() bool
- func TableMaxVersion(svc, cluster, sdb, table, input, output string) (int, error)
- func TableRegistered(svc, cluster, sdb, table, input, output string, version int, ...) (bool, error)
- func TableRegisteredInState(id int64) (bool, error)
- func UpdateSchema(name string, typ string, schema string) error
- func ValidateRegistration(svc, sdb, table, input, output string, version int) bool
- type Mgr
- type Row
- type SchemaRow
- type Type
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddSQLCond ¶
func AddSQLCond(cond string, args []interface{}, lop string, name string, op string, val string) (string, []interface{})
AddSQLCond is a helper function that helps with creating conditional queries over state tables
func CheckMySQLVersion ¶
CheckMySQLVersion checks whether the state db cluster is of the given version
func ClearNeedSnapshot ¶
ClearNeedSnapshot clears flag indicating that streamer has taken a snapshot of table
func ConnectInfoGet ¶
ConnectInfoGet resolves the database address using the state clusters table. If there are no slaves, we need to fall back to the master.
func DeleteClusterInfo ¶
DeleteClusterInfo deletes cluster connection info from the state database
func DeleteSchema ¶
DeleteSchema deletes output schema from the state
func DeregisterTable ¶
DeregisterTable removes given table from the state
func DeregisterTableFromState ¶
func DeregisterTableFromState(dbl *db.Loc, table, input, output string, version int, regid int64) bool
DeregisterTableFromState removes given table from the state
func EmitRegisteredTablesCount ¶
func EmitRegisteredTablesCount()
EmitRegisteredTablesCount emits a stat on the number of tables currently registered in state
func GetClusterInfo ¶
GetClusterInfo lists cluster connection info from state database
func GetClusterTask ¶
func GetClusterTask(input string, workerID string, lockTimeout time.Duration) (s string, c string, d string, err error)
GetClusterTask picks the first available cluster task and locks it
func GetNeedSnapshotFlag ¶
GetNeedSnapshotFlag returns need_snapshot flag saved in the STATE
func GetNoDB ¶
GetNoDB returns an active connection to the state database server, without connecting to any specific db
func GetOutputSchema ¶
GetOutputSchema returns output schema from the state
func GetSchema ¶
func GetSchema(svc, sdb, table, input string, output string, version int) (*types.TableSchema, error)
GetSchema returns the structured schema saved in the state for the given table
func GetServerTimestamp ¶
GetServerTimestamp fetches the current unix timestamp on the MySQL server
func InitManager ¶
InitManager creates a new state manager
func InsertClusterInfo ¶
InsertClusterInfo adds connection information for the cluster "name" to state
func InsertSchema ¶
InsertSchema inserts output schema into state
func PullCurrentSchema ¶
PullCurrentSchema pulls current table schema from the source MySQL cluster
func RefreshClusterLock ¶
RefreshClusterLock updates the lock time of the given cluster and worker. Returns false if the task is not locked by the given worker anymore.
func RefreshTableLock ¶
RefreshTableLock updates the lock time of the given task and worker. Returns false if the task is not locked by the given worker anymore.
func RegisterTable ¶
func RegisterTable(cluster, svc, sdb, table, input, output string, version int, outputFormat string, params string) bool
RegisterTable adds a table for registration. The entry is processed later on to update the state table.
func RegisterTableInState ¶
func RegisterTableInState(dbl *db.Loc, table, input, output string, version int, format string, params string, regid int64) bool
RegisterTableInState adds table to the state
func ReplaceSchema ¶
func ReplaceSchema(svc, cluster string, s *types.TableSchema, rawSchema, oldGTID, newGTID, input, output string, version int, format string, params string) bool
ReplaceSchema replaces both the structured and raw schema definitions saved in the state with the new versions provided as parameters. Called from the changelog reader on an ALTER TABLE statement.
func SanitizeRegParams ¶
SanitizeRegParams sanitizes the registration options
func SaveBinlogState ¶
SaveBinlogState saves current state of the binlog reader to the state DB. Binlog state is current GTID set and current seqNo
func SyncDeregisteredTables ¶
func SyncDeregisteredTables() bool
SyncDeregisteredTables deregisters the tables based on the deregistration requests that were persisted in the registrations table. TODO: Deduplicate with SyncRegisteredTables.
func SyncRegisteredTables ¶
func SyncRegisteredTables() bool
SyncRegisteredTables registers the tables based on the registration requests that were persisted in the registrations table
func TableMaxVersion ¶
TableMaxVersion returns maximum version for the table
func TableRegistered ¶
func TableRegistered(svc, cluster, sdb, table, input, output string, version int, includeDeleted bool) (bool, error)
TableRegistered checks if given table is listed in registrations table
func TableRegisteredInState ¶
TableRegisteredInState checks if the table with the given id is still registered in the state. ID is the id field from the row structure.
func UpdateSchema ¶
UpdateSchema updates output schema in state
func ValidateRegistration ¶
ValidateRegistration validates the registration request
Types ¶
type Mgr ¶
type Mgr struct {
// contains filtered or unexported fields
}
Mgr is a state manager that controls all the state handling
type Row ¶
type Row struct {
	types.TableLoc
	ID            int64
	OutputFormat  string
	Gtid          string
	GtidUpdatedAt time.Time
	SeqNo         uint64
	SchemaGtid    string
	RawSchema     string
	SnapshottedAt time.Time
	CreatedAt     time.Time
	UpdatedAt     time.Time
	NeedSnapshot  bool
	Deleted       bool
	ParamsRaw     string
	Params        *config.TableParams
}
Row represents a row in the state table
func GetTableByID ¶
GetTableByID returns the state row for the given table id
func GetTableTask ¶
GetTableTask picks the first available task which needs attention and whose lock is expired. The task is locked by the given workerID and the lock time is set to the current time.
func UpdateSnapshottedAt ¶
UpdateSnapshottedAt advances snapshotted_at to be no older than TableParams.Schedule.Interval from now
func (*Row) SnapshotTimeChanged ¶
SnapshotTimeChanged determines if row has SnapshottedAt updated
type SchemaRow ¶
type SchemaRow struct {
	Name string `json:"name"`
	Type string `json:"type"`
	Body string `json:"body"`
}
SchemaRow represents state schema row
func ListOutputSchema ¶
ListOutputSchema lists schemas from state, filtered by cond
type Type ¶
type Type []Row
Type is in-memory representation of state
func GetCond ¶
GetCond returns state rows with given condition in the state, rows with deleted flags are ignored
func GetCondLow ¶
GetCondLow returns state rows with given condition in the state. Allows to select rows with deleted flag set
func GetForCluster ¶
GetForCluster returns state rows for given cluster name
func GetRegCond ¶
GetRegCond returns state rows with given condition in the state, rows with deleted flags are ignored