state

package
v1.1.0-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2020 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddSQLCond

func AddSQLCond(cond string, args []interface{}, lop string, name string, op string, val string) (string, []interface{})

AddSQLCond is a helper function that helps with creating conditional queries over state tables

func CheckMySQLVersion

func CheckMySQLVersion(expected string) bool

CheckMySQLVersion check is state db cluster is of given version

func ClearNeedSnapshot

func ClearNeedSnapshot(id int64, ts time.Time) error

ClearNeedSnapshot clears flag indicating that streamer has taken a snapshot of table

func Close

func Close() error

Close uninitializes the state

func ConnectInfoGet

func ConnectInfoGet(l *db.Loc, connType db.ConnectionType) (*db.Addr, error)

ConnectInfoGet resolves database address using state clusters table If no Slaves we need to fall back to master

func DeleteClusterInfo

func DeleteClusterInfo(name string) error

DeleteClusterInfo delete cluster connection info from state database

func DeleteSchema

func DeleteSchema(name string, typ string) error

DeleteSchema deletes output schema from the state

func DeregisterTable

func DeregisterTable(cluster, svc, sdb, table, input, output string, version int) bool

DeregisterTable removes given table from the state

func DeregisterTableFromState

func DeregisterTableFromState(dbl *db.Loc, table, input, output string, version int, regid int64) bool

DeregisterTableFromState removes given table from the state

func EmitRegisteredTablesCount

func EmitRegisteredTablesCount()

EmitRegisteredTablesCount emits a stat on the number of tables currently registered in state

func GetClusterInfo

func GetClusterInfo(cond string, args ...interface{}) ([]db.Addr, error)

GetClusterInfo lists cluster connection info from state database

func GetClusterTask

func GetClusterTask(input string, workerID string, lockTimeout time.Duration) (s string, c string, d string, err error)

GetClusterTask pick first available cluster task and locks it

func GetCount

func GetCount(includeDeleted bool) (int, error)

GetCount returns number of rows in the state

func GetDB

func GetDB() *sql.DB

GetDB returns active db connection to the state

func GetDBAddr

func GetDBAddr() *db.Addr

GetDBAddr return low level address of the database: Host, Port, User, Password

func GetGTID

func GetGTID(cluster string) (gtid string, seqno int64, err error)

GetGTID returns GTID saved in the state for given db locator

func GetNeedSnapshotFlag

func GetNeedSnapshotFlag(id int64) (ns bool, err error)

GetNeedSnapshotFlag returns need_snapshot flag saved in the STATE

func GetNoDB

func GetNoDB() *sql.DB

GetNoDB returns active connection to the state database server Without connecting to any specific db

func GetOutputSchema

func GetOutputSchema(name string, typ string) string

GetOutputSchema returns output schema from the state

func GetSchema

func GetSchema(svc, sdb, table, input string, output string, version int) (*types.TableSchema, error)

GetSchema returns structured schema saved in the state for give table

func GetServerTimestamp

func GetServerTimestamp() (int64, error)

GetServerTimestamp fetches the current unix timestamp on the MySQL server

func InitManager

func InitManager(ctx context.Context, cfg *config.AppConfig) error

InitManager creates a new state manager

func InsertClusterInfo

func InsertClusterInfo(ci *db.Addr) error

InsertClusterInfo adds connection information for the cluster "name" to state

func InsertSchema

func InsertSchema(name string, typ string, schema string) error

InsertSchema inserts output schema into state

func PullCurrentSchema

func PullCurrentSchema(dbl *db.Loc, table, input string) (*types.TableSchema, string)

PullCurrentSchema pulls current table schema from the source MySQL cluster

func RefreshClusterLock

func RefreshClusterLock(cluster string, workerID string) bool

RefreshClusterLock updates the lock time of the given cluster and worker Returns false if the task is not locked by given worker anymore

func RefreshTableLock

func RefreshTableLock(stateID int64, workerID string) bool

RefreshTableLock updates the lock time of the given task and worker Returns false if the task is not locked by given worker anymore

func RegisterTable

func RegisterTable(cluster, svc, sdb, table, input, output string, version int, outputFormat string, params string) bool

RegisterTable adds table for registration. The entry here is subsequently processed later on to update the state table

func RegisterTableInState

func RegisterTableInState(dbl *db.Loc, table, input, output string, version int, format string, params string, regid int64) bool

RegisterTableInState adds table to the state

func ReplaceSchema

func ReplaceSchema(svc, cluster string, s *types.TableSchema, rawSchema, oldGTID, newGTID, input, output string, version int, format string, params string) bool

ReplaceSchema replaces both structured and raw schema definitions saved in the state with new versions provided as parameters Called from changelog reader on ALTER table statement

func Reset

func Reset() bool

Reset resets the state tables and deletes all the existing data

func SanitizeRegParams

func SanitizeRegParams(cluster, svc, sdb, table, input string) (string, string, string, string)

SanitizeRegParams sanitizes the registration options

func SaveBinlogState

func SaveBinlogState(cluster, gtid string, seqNo uint64) error

SaveBinlogState saves current state of the binlog reader to the state DB. Binlog state is current GTID set and current seqNo

func SchemaGet

func SchemaGet(namespace string, schemaName string, typ string) (*types.AvroSchema, error)

SchemaGet is builtin schema resolver

func SetGTID

func SetGTID(cluster, gtid string) error

SetGTID saves given gtid for given db locator

func SyncDeregisteredTables

func SyncDeregisteredTables() bool

SyncDeregisteredTables deregisters the tables based on the deregistration requests that were persisted in the registrations table TODO: Deduplicate with SyncRegisteredTables

func SyncRegisteredTables

func SyncRegisteredTables() bool

SyncRegisteredTables registers the tables based on the registration requests that were persisted in the registrations table

func TableMaxVersion

func TableMaxVersion(svc, cluster, sdb, table, input, output string) (int, error)

TableMaxVersion returns maximum version for the table

func TableRegistered

func TableRegistered(svc, cluster, sdb, table, input, output string, version int, includeDeleted bool) (bool, error)

TableRegistered checks if given table is listed in registrations table

func TableRegisteredInState

func TableRegisteredInState(id int64) (bool, error)

TableRegisteredInState checks if table with given id is still registered in the state ID is id field from row structure

func UpdateSchema

func UpdateSchema(name string, typ string, schema string) error

UpdateSchema inserts output schema into state

func ValidateRegistration

func ValidateRegistration(svc, sdb, table, input, output string, version int) bool

ValidateRegistration validates the registration request

Types

type Mgr

type Mgr struct {
	// contains filtered or unexported fields
}

Mgr is a state manager that controls all the state handling

type Row

type Row struct {
	types.TableLoc
	ID            int64
	OutputFormat  string
	Gtid          string
	GtidUpdatedAt time.Time
	SeqNo         uint64
	SchemaGtid    string
	RawSchema     string
	SnapshottedAt time.Time
	CreatedAt     time.Time
	UpdatedAt     time.Time
	NeedSnapshot  bool
	Deleted       bool
	ParamsRaw     string
	Params        *config.TableParams
}

Row represents a row in the state table

func GetTable

func GetTable(service, cluster, db, table, input string, output string, version int) (*Row, error)

GetTable returns state rows for given service,cluster,db,table,input

func GetTableByID

func GetTableByID(id int64) (*Row, error)

GetTableByID return state row for the given table id

func GetTableTask

func GetTableTask(workerID string, lockTimeout time.Duration) (r *Row, err error)

GetTableTask picks first available task which needs attention and which lock is expired The task is locked by the given workerID and lock time is set to current time

func UpdateSnapshottedAt

func UpdateSnapshottedAt(row *Row, tm time.Time) (*Row, error)

UpdateSnapshottedAt advances snapshotted_at to be no older then TableParams.Schedule.Interval from now

func (*Row) SnapshotTimeChanged

func (r *Row) SnapshotTimeChanged(prev time.Time) bool

SnapshotTimeChanged determines if row has SnapshottedAt updated

func (*Row) TimeForSnapshot

func (r *Row) TimeForSnapshot(now time.Time) bool

TimeForSnapshot determines if row requires snapshot to be taken

type SchemaRow

type SchemaRow struct {
	Name string `json:"name"`
	Type string `json:"type"`
	Body string `json:"body"`
}

SchemaRow represents state schema row

func ListOutputSchema

func ListOutputSchema(cond string, args ...interface{}) ([]SchemaRow, error)

ListOutputSchema lists schemas from state, filtered by cond

type Type

type Type []Row

Type is in-memory representation of state

func Get

func Get() (Type, error)

Get returns all the rows in the state corresponding to non-deleted tables

func GetCond

func GetCond(cond string, args ...interface{}) (Type, error)

GetCond returns state rows with given condition in the state, rows with deleted flags are ignored

func GetCondLow

func GetCondLow(deleted bool, cond string, args ...interface{}) (Type, error)

GetCondLow returns state rows with given condition in the state. Allows to select rows with deleted flag set

func GetForCluster

func GetForCluster(cluster string) (Type, error)

GetForCluster returns state rows for given cluster name

func GetRegCond

func GetRegCond(cond string, args ...interface{}) (Type, error)

GetRegCond returns state rows with given condition in the state, rows with deleted flags are ignored

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL