Documentation ¶
Index ¶
- Constants
- Variables
- func CreateStorage(pdAddr string) (storage kv.Storage, err error)
- func CreateTiStore(urls string, credential *security.Credential) (kv.Storage, error)
- func GetEtcdKeyCaptureInfo(id string) string
- func GetEtcdKeyChangeFeedInfo(changefeedID string) string
- func GetEtcdKeyChangeFeedList() string
- func GetEtcdKeyChangeFeedStatus(changefeedID string) string
- func GetEtcdKeyJob(changeFeedID string) string
- func GetEtcdKeyTaskPosition(changefeedID, captureID string) string
- func GetEtcdKeyTaskPositionList(changefeedID string) string
- func GetEtcdKeyTaskStatus(changeFeedID, captureID string) string
- func GetEtcdKeyTaskStatusList(changefeedID string) string
- func GetEtcdKeyTaskWorkload(changeFeedID, captureID string) string
- func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) (*meta.Meta, error)
- func InitMetrics(registry *prometheus.Registry)
- func InitWorkerPool()
- func NewSizedRegionRouter(ctx context.Context, sizeLimit int) *sizedRegionRouter
- func RunWorkerPool(ctx context.Context) error
- func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage tikv.Storage, kvStore kv.Storage)
- func TestSplit(t require.TestingT, pdCli pd.Client, storage tikv.Storage, kvStore kv.Storage)
- type CDCClient
- type CDCEtcdClient
- func (c CDCEtcdClient) AtomicPutTaskStatus(ctx context.Context, changefeedID string, captureID string, ...) (*model.TaskStatus, int64, error)
- func (c CDCEtcdClient) ClearAllCDCInfo(ctx context.Context) error
- func (c CDCEtcdClient) Close() error
- func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error
- func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, id string) error
- func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) error
- func (c CDCEtcdClient) DeleteTaskPosition(ctx context.Context, changefeedID string, captureID string) error
- func (c CDCEtcdClient) DeleteTaskStatus(ctx context.Context, cfID string, captureID string) error
- func (c CDCEtcdClient) DeleteTaskWorkload(ctx context.Context, changefeedID string, captureID string) error
- func (c CDCEtcdClient) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error)
- func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) (map[string]*model.ChangeFeedStatus, error)
- func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error)
- func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error)
- func (c CDCEtcdClient) GetAllTaskWorkloads(ctx context.Context, changefeedID string) (map[string]*model.TaskWorkload, error)
- func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *model.CaptureInfo, err error)
- func (c CDCEtcdClient) GetCaptureLeases(ctx context.Context) (map[string]int64, error)
- func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error)
- func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id string) (*model.ChangeFeedInfo, error)
- func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error)
- func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) (int64, map[string]*mvccpb.KeyValue, error)
- func (c CDCEtcdClient) GetOwnerID(ctx context.Context, key string) (string, error)
- func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error)
- func (c CDCEtcdClient) GetTaskPosition(ctx context.Context, changefeedID string, captureID string) (int64, *model.TaskPosition, error)
- func (c CDCEtcdClient) GetTaskStatus(ctx context.Context, changefeedID string, captureID string) (int64, *model.TaskStatus, error)
- func (c CDCEtcdClient) GetTaskWorkload(ctx context.Context, changefeedID string, captureID string) (model.TaskWorkload, error)
- func (c CDCEtcdClient) LeaseGuardAtomicPutTaskStatus(ctx context.Context, changefeedID string, captureID string, ...) (*model.TaskStatus, int64, error)
- func (c CDCEtcdClient) LeaseGuardDeleteChangeFeedInfo(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) LeaseGuardDeleteTaskPosition(ctx context.Context, cfID string, captureID string, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) LeaseGuardDeleteTaskStatus(ctx context.Context, cfID string, captureID string, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) LeaseGuardDeleteTaskWorkload(ctx context.Context, cfID string, captureID string, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) LeaseGuardPutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus, ...) error
- func (c CDCEtcdClient) LeaseGuardPutChangeFeedStatus(ctx context.Context, changefeedID string, status *model.ChangeFeedStatus, ...) error
- func (c CDCEtcdClient) LeaseGuardRemoveAllTaskPositions(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) LeaseGuardRemoveAllTaskStatus(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) LeaseGuardRemoveChangeFeedStatus(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) LeaseGuardSaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changefeedID string, ...) error
- func (c CDCEtcdClient) PutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus) error
- func (c CDCEtcdClient) PutCaptureInfo(ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID) error
- func (c CDCEtcdClient) PutChangeFeedStatus(ctx context.Context, changefeedID string, status *model.ChangeFeedStatus) error
- func (c CDCEtcdClient) PutTaskPositionOnChange(ctx context.Context, changefeedID string, captureID string, ...) (bool, error)
- func (c CDCEtcdClient) PutTaskStatus(ctx context.Context, changefeedID string, captureID string, ...) error
- func (c CDCEtcdClient) PutTaskWorkload(ctx context.Context, changefeedID string, captureID model.CaptureID, ...) error
- func (c CDCEtcdClient) RemoveAllTaskPositions(ctx context.Context, changefeedID string) error
- func (c CDCEtcdClient) RemoveAllTaskStatus(ctx context.Context, changefeedID string) error
- func (c CDCEtcdClient) RemoveChangeFeedStatus(ctx context.Context, changefeedID string) error
- func (c CDCEtcdClient) RevokeAllLeases(ctx context.Context, leases map[string]int64) error
- func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error
- func (c CDCEtcdClient) SetChangeFeedStatusTTL(ctx context.Context, changefeedID string, ttl int64) error
- type CDCKVClient
- type GrpcPool
- type GrpcPoolImpl
- type LimitRegionRouter
- type PullerInitialization
- type StorageWithCurVersionCache
- type TiKVStorage
- type UpdateTaskStatusFunc
Constants ¶
const ( // EtcdKeyBase is the common prefix of the keys in CDC EtcdKeyBase = "/tidb/cdc" // CaptureOwnerKey is the capture owner path that is saved to etcd CaptureOwnerKey = EtcdKeyBase + "/owner" // CaptureInfoKeyPrefix is the capture info path that is saved to etcd CaptureInfoKeyPrefix = EtcdKeyBase + "/capture" // TaskKeyPrefix is the prefix of task keys TaskKeyPrefix = EtcdKeyBase + "/task" // TaskWorkloadKeyPrefix is the prefix of task workload keys TaskWorkloadKeyPrefix = TaskKeyPrefix + "/workload" // TaskStatusKeyPrefix is the prefix of task status keys TaskStatusKeyPrefix = TaskKeyPrefix + "/status" // TaskPositionKeyPrefix is the prefix of task position keys TaskPositionKeyPrefix = TaskKeyPrefix + "/position" // JobKeyPrefix is the prefix of job keys JobKeyPrefix = EtcdKeyBase + "/job" )
Variables ¶
var NewCDCKVClient func( ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, ) CDCKVClient = NewCDCClient
NewCDCKVClient is the constructor of CDC KV client
Functions ¶
func CreateStorage ¶
CreateStorage creates a tikv Storage instance.
func CreateTiStore ¶
CreateTiStore creates a new tikv storage client
func GetEtcdKeyCaptureInfo ¶
GetEtcdKeyCaptureInfo returns the key of a capture info
func GetEtcdKeyChangeFeedInfo ¶
GetEtcdKeyChangeFeedInfo returns the key of a changefeed config
func GetEtcdKeyChangeFeedList ¶
func GetEtcdKeyChangeFeedList() string
GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config
func GetEtcdKeyChangeFeedStatus ¶
GetEtcdKeyChangeFeedStatus returns the key of a changefeed status
func GetEtcdKeyJob ¶
GetEtcdKeyJob returns the key for a job status
func GetEtcdKeyTaskPosition ¶
GetEtcdKeyTaskPosition returns the key of a task position
func GetEtcdKeyTaskPositionList ¶
GetEtcdKeyTaskPositionList returns the key of a task position without captureID part
func GetEtcdKeyTaskStatus ¶
GetEtcdKeyTaskStatus returns the key for the task status
func GetEtcdKeyTaskStatusList ¶
GetEtcdKeyTaskStatusList returns the key of a task status without captureID part
func GetEtcdKeyTaskWorkload ¶
GetEtcdKeyTaskWorkload returns the key for the task workload
func GetSnapshotMeta ¶
GetSnapshotMeta returns tidb meta information. TODO: Simplify the signature of this function.
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics in the kv package
func InitWorkerPool ¶
func InitWorkerPool()
InitWorkerPool initializes the worker pool once. The worker pool must be initialized before any kv event is received.
func NewSizedRegionRouter ¶
NewSizedRegionRouter creates a new sizedRegionRouter
func RunWorkerPool ¶
RunWorkerPool runs the worker pool used by the region worker in kv client v2. It must be running before the region worker starts to work.
func TestGetKVSimple ¶
TestGetKVSimple tests simple KV operations
Types ¶
type CDCClient ¶
type CDCClient struct {
// contains filtered or unexported fields
}
CDCClient to get events from TiKV
func (*CDCClient) EventFeed ¶
func (c *CDCClient) EventFeed( ctx context.Context, span regionspan.ComparableSpan, ts uint64, enableOldValue bool, lockResolver txnutil.LockResolver, isPullerInit PullerInitialization, eventCh chan<- model.RegionFeedEvent, ) error
EventFeed divides an EventFeed request on range boundaries and establishes an EventFeed to each of the individual regions. It streams back results on the provided channel. The `Start` and `End` fields in the input span must be memcomparable encoded.
type CDCEtcdClient ¶
CDCEtcdClient is a wrapper around the etcd client
func NewCDCEtcdClient ¶
func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client) CDCEtcdClient
NewCDCEtcdClient returns a new CDCEtcdClient
func (CDCEtcdClient) AtomicPutTaskStatus ¶
func (c CDCEtcdClient) AtomicPutTaskStatus( ctx context.Context, changefeedID string, captureID string, updateFuncs ...UpdateTaskStatusFunc, ) (*model.TaskStatus, int64, error)
AtomicPutTaskStatus puts task status into etcd atomically.
func (CDCEtcdClient) ClearAllCDCInfo ¶
func (c CDCEtcdClient) ClearAllCDCInfo(ctx context.Context) error
ClearAllCDCInfo deletes all keys created by CDC
func (CDCEtcdClient) Close ¶
func (c CDCEtcdClient) Close() error
Close releases resources in CDCEtcdClient
func (CDCEtcdClient) CreateChangefeedInfo ¶
func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error
CreateChangefeedInfo creates a change feed info in etcd and fails if it already exists.
func (CDCEtcdClient) DeleteCaptureInfo ¶
func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, id string) error
DeleteCaptureInfo deletes capture info from etcd.
func (CDCEtcdClient) DeleteChangeFeedInfo ¶
func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) error
DeleteChangeFeedInfo deletes a changefeed config from etcd
func (CDCEtcdClient) DeleteTaskPosition ¶
func (c CDCEtcdClient) DeleteTaskPosition(ctx context.Context, changefeedID string, captureID string) error
DeleteTaskPosition removes task position from etcd
func (CDCEtcdClient) DeleteTaskStatus ¶
func (c CDCEtcdClient) DeleteTaskStatus( ctx context.Context, cfID string, captureID string, ) error
DeleteTaskStatus deletes task status from etcd
func (CDCEtcdClient) DeleteTaskWorkload ¶
func (c CDCEtcdClient) DeleteTaskWorkload( ctx context.Context, changefeedID string, captureID string, ) error
DeleteTaskWorkload deletes task workload from etcd
func (CDCEtcdClient) GetAllCDCInfo ¶
GetAllCDCInfo gets all keys created by CDC
func (CDCEtcdClient) GetAllChangeFeedStatus ¶
func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) (map[string]*model.ChangeFeedStatus, error)
GetAllChangeFeedStatus queries all changefeed job status
func (CDCEtcdClient) GetAllTaskPositions ¶
func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error)
GetAllTaskPositions queries all task positions of a changefeed, and returns a map mapping from captureID to TaskPositions
func (CDCEtcdClient) GetAllTaskStatus ¶
func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error)
GetAllTaskStatus queries all task status of a changefeed, and returns a map mapping from captureID to TaskStatus
func (CDCEtcdClient) GetAllTaskWorkloads ¶
func (c CDCEtcdClient) GetAllTaskWorkloads(ctx context.Context, changefeedID string) (map[string]*model.TaskWorkload, error)
GetAllTaskWorkloads queries all task workloads of a changefeed, and returns a map mapping from captureID to TaskWorkloads
func (CDCEtcdClient) GetCaptureInfo ¶
func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *model.CaptureInfo, err error)
GetCaptureInfo gets capture info from etcd. It returns errCaptureNotExist if the capture does not exist.
func (CDCEtcdClient) GetCaptureLeases ¶
GetCaptureLeases returns a map mapping from capture ID to its lease
func (CDCEtcdClient) GetCaptures ¶
func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error)
GetCaptures returns kv revision and CaptureInfo list
func (CDCEtcdClient) GetChangeFeedInfo ¶
func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id string) (*model.ChangeFeedInfo, error)
GetChangeFeedInfo queries the config of a given changefeed
func (CDCEtcdClient) GetChangeFeedStatus ¶
func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error)
GetChangeFeedStatus queries the checkpointTs and resolvedTs of a given changefeed
func (CDCEtcdClient) GetChangeFeeds ¶
func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) (int64, map[string]*mvccpb.KeyValue, error)
GetChangeFeeds returns kv revision and a map mapping from changefeedID to changefeed detail mvccpb.KeyValue
func (CDCEtcdClient) GetOwnerID ¶
GetOwnerID returns the owner id by querying etcd
func (CDCEtcdClient) GetProcessors ¶
func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error)
GetProcessors queries all processors of the cdc cluster, and returns a slice of ProcInfoSnap(without table info)
func (CDCEtcdClient) GetTaskPosition ¶
func (c CDCEtcdClient) GetTaskPosition( ctx context.Context, changefeedID string, captureID string, ) (int64, *model.TaskPosition, error)
GetTaskPosition queries task position from etcd, returns
- ModRevision of the given key
- *model.TaskPosition unmarshaled from the value
- error if error happens
func (CDCEtcdClient) GetTaskStatus ¶
func (c CDCEtcdClient) GetTaskStatus( ctx context.Context, changefeedID string, captureID string, ) (int64, *model.TaskStatus, error)
GetTaskStatus queries task status from etcd, returns
- ModRevision of the given key
- *model.TaskStatus unmarshaled from the value
- error if error happens
func (CDCEtcdClient) GetTaskWorkload ¶
func (c CDCEtcdClient) GetTaskWorkload( ctx context.Context, changefeedID string, captureID string, ) (model.TaskWorkload, error)
GetTaskWorkload queries task workload from etcd, returns
- model.TaskWorkload unmarshaled from the value
- error if error happens
func (CDCEtcdClient) LeaseGuardAtomicPutTaskStatus ¶
func (c CDCEtcdClient) LeaseGuardAtomicPutTaskStatus( ctx context.Context, changefeedID string, captureID string, leaseID clientv3.LeaseID, updateFuncs ...UpdateTaskStatusFunc, ) (*model.TaskStatus, int64, error)
LeaseGuardAtomicPutTaskStatus puts task status into etcd atomically.
func (CDCEtcdClient) LeaseGuardDeleteChangeFeedInfo ¶
func (c CDCEtcdClient) LeaseGuardDeleteChangeFeedInfo( ctx context.Context, changefeedID string, leaseID clientv3.LeaseID, ) error
LeaseGuardDeleteChangeFeedInfo is a wrapper to DeleteChangeFeedInfo, with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardDeleteTaskPosition ¶
func (c CDCEtcdClient) LeaseGuardDeleteTaskPosition( ctx context.Context, cfID string, captureID string, leaseID clientv3.LeaseID, ) error
LeaseGuardDeleteTaskPosition is a wrapper to DeleteTaskPosition, with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardDeleteTaskStatus ¶
func (c CDCEtcdClient) LeaseGuardDeleteTaskStatus( ctx context.Context, cfID string, captureID string, leaseID clientv3.LeaseID, ) error
LeaseGuardDeleteTaskStatus is a wrapper to DeleteTaskStatus, with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardDeleteTaskWorkload ¶
func (c CDCEtcdClient) LeaseGuardDeleteTaskWorkload( ctx context.Context, cfID string, captureID string, leaseID clientv3.LeaseID, ) error
LeaseGuardDeleteTaskWorkload is a wrapper to DeleteTaskWorkload, with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardPutAllChangeFeedStatus ¶
func (c CDCEtcdClient) LeaseGuardPutAllChangeFeedStatus( ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus, leaseID clientv3.LeaseID, ) error
LeaseGuardPutAllChangeFeedStatus wraps PutAllChangeFeedStatus with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardPutChangeFeedStatus ¶
func (c CDCEtcdClient) LeaseGuardPutChangeFeedStatus( ctx context.Context, changefeedID string, status *model.ChangeFeedStatus, leaseID clientv3.LeaseID, ) error
LeaseGuardPutChangeFeedStatus is a wrapper to PutChangeFeedStatus, with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardRemoveAllTaskPositions ¶
func (c CDCEtcdClient) LeaseGuardRemoveAllTaskPositions( ctx context.Context, changefeedID string, leaseID clientv3.LeaseID, ) error
LeaseGuardRemoveAllTaskPositions wraps RemoveAllTaskPositions with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardRemoveAllTaskStatus ¶
func (c CDCEtcdClient) LeaseGuardRemoveAllTaskStatus( ctx context.Context, changefeedID string, leaseID clientv3.LeaseID, ) error
LeaseGuardRemoveAllTaskStatus wraps RemoveAllTaskStatus, with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardRemoveChangeFeedStatus ¶
func (c CDCEtcdClient) LeaseGuardRemoveChangeFeedStatus( ctx context.Context, changefeedID string, leaseID clientv3.LeaseID, ) error
LeaseGuardRemoveChangeFeedStatus is a wrapper to RemoveChangeFeedStatus, with a context restricted by lease TTL.
func (CDCEtcdClient) LeaseGuardSaveChangeFeedInfo ¶
func (c CDCEtcdClient) LeaseGuardSaveChangeFeedInfo( ctx context.Context, info *model.ChangeFeedInfo, changefeedID string, leaseID clientv3.LeaseID, ) error
LeaseGuardSaveChangeFeedInfo is a wrapper to SaveChangeFeedInfo, with a context restricted by lease TTL.
func (CDCEtcdClient) PutAllChangeFeedStatus ¶
func (c CDCEtcdClient) PutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus) error
PutAllChangeFeedStatus puts ChangeFeedStatus of each changefeed into etcd
func (CDCEtcdClient) PutCaptureInfo ¶
func (c CDCEtcdClient) PutCaptureInfo(ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID) error
PutCaptureInfo puts capture info into etcd.
func (CDCEtcdClient) PutChangeFeedStatus ¶
func (c CDCEtcdClient) PutChangeFeedStatus( ctx context.Context, changefeedID string, status *model.ChangeFeedStatus, ) error
PutChangeFeedStatus puts changefeed synchronization status into etcd
func (CDCEtcdClient) PutTaskPositionOnChange ¶
func (c CDCEtcdClient) PutTaskPositionOnChange( ctx context.Context, changefeedID string, captureID string, info *model.TaskPosition, ) (bool, error)
PutTaskPositionOnChange puts task position information into etcd if the task position value changes or the previous value does not exist in etcd. It returns true if the task position is written to etcd.
func (CDCEtcdClient) PutTaskStatus ¶
func (c CDCEtcdClient) PutTaskStatus( ctx context.Context, changefeedID string, captureID string, info *model.TaskStatus, ) error
PutTaskStatus puts task status into etcd.
func (CDCEtcdClient) PutTaskWorkload ¶
func (c CDCEtcdClient) PutTaskWorkload( ctx context.Context, changefeedID string, captureID model.CaptureID, info *model.TaskWorkload, ) error
PutTaskWorkload puts task workload into etcd.
func (CDCEtcdClient) RemoveAllTaskPositions ¶
func (c CDCEtcdClient) RemoveAllTaskPositions(ctx context.Context, changefeedID string) error
RemoveAllTaskPositions removes all task positions of a changefeed
func (CDCEtcdClient) RemoveAllTaskStatus ¶
func (c CDCEtcdClient) RemoveAllTaskStatus(ctx context.Context, changefeedID string) error
RemoveAllTaskStatus removes all task status of a changefeed
func (CDCEtcdClient) RemoveChangeFeedStatus ¶
func (c CDCEtcdClient) RemoveChangeFeedStatus( ctx context.Context, changefeedID string, ) error
RemoveChangeFeedStatus removes changefeed job status from etcd
func (CDCEtcdClient) RevokeAllLeases ¶
RevokeAllLeases revokes all leases passed from parameter
func (CDCEtcdClient) SaveChangeFeedInfo ¶
func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error
SaveChangeFeedInfo stores change feed info into etcd. TODO: this should be called from outer system, such as from a TiDB client
func (CDCEtcdClient) SetChangeFeedStatusTTL ¶
func (c CDCEtcdClient) SetChangeFeedStatusTTL( ctx context.Context, changefeedID string, ttl int64, ) error
SetChangeFeedStatusTTL sets the TTL of changefeed synchronization status
type CDCKVClient ¶
type CDCKVClient interface { EventFeed( ctx context.Context, span regionspan.ComparableSpan, ts uint64, enableOldValue bool, lockResolver txnutil.LockResolver, isPullerInit PullerInitialization, eventCh chan<- model.RegionFeedEvent, ) error Close() error }
CDCKVClient is an interface to receive kv change logs from TiKV
func NewCDCClient ¶
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool) (c CDCKVClient)
NewCDCClient creates a CDCClient instance
type GrpcPool ¶
type GrpcPool interface { // GetConn returns an available gRPC ClientConn GetConn(target string) (*sharedConn, error) // ReleaseConn is called when a gRPC stream is released ReleaseConn(sc *sharedConn, target string) // Recycle recycles idle connections periodically RecycleConn(ctx context.Context) // Close tears down all ClientConns maintained in pool Close() }
GrpcPool defines an interface that can serve as a gRPC connection pool. It provides an API to get a shared connection from the pool and an API to decrease the usage reference of the shared connection
type GrpcPoolImpl ¶
type GrpcPoolImpl struct {
// contains filtered or unexported fields
}
GrpcPoolImpl implements the GrpcPool interface
func NewGrpcPoolImpl ¶
func NewGrpcPoolImpl(ctx context.Context, credential *security.Credential) *GrpcPoolImpl
NewGrpcPoolImpl creates a new GrpcPoolImpl instance
func (*GrpcPoolImpl) GetConn ¶
func (pool *GrpcPoolImpl) GetConn(addr string) (*sharedConn, error)
GetConn implements GrpcPool.GetConn
func (*GrpcPoolImpl) RecycleConn ¶
func (pool *GrpcPoolImpl) RecycleConn(ctx context.Context)
RecycleConn implements GrpcPool.RecycleConn
func (*GrpcPoolImpl) ReleaseConn ¶
func (pool *GrpcPoolImpl) ReleaseConn(sc *sharedConn, addr string)
ReleaseConn implements GrpcPool.ReleaseConn
type LimitRegionRouter ¶
type LimitRegionRouter interface { // Chan returns a singleRegionInfo channel that can be consumed from Chan() <-chan singleRegionInfo // AddRegion adds an singleRegionInfo to buffer, this function is thread-safe AddRegion(task singleRegionInfo) // Acquire acquires one token Acquire(id string) // Release gives back one token, this function is thread-safe Release(id string) // Run runs in background and does some logic work Run(ctx context.Context) error }
LimitRegionRouter defines an interface that can buffer singleRegionInfo and provide token based consumption
type PullerInitialization ¶
type PullerInitialization interface {
IsInitialized() bool
}
PullerInitialization is a workaround to solve a cyclic import.
type StorageWithCurVersionCache ¶
StorageWithCurVersionCache adds GetCachedCurrentVersion() to tikv.Storage
func (*StorageWithCurVersionCache) GetCachedCurrentVersion ¶
func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version tidbkv.Version, err error)
GetCachedCurrentVersion gets the cached version of currentVersion, and update the cache if necessary
type TiKVStorage ¶
type TiKVStorage interface { tikv.Storage GetCachedCurrentVersion() (version tidbkv.Version, err error) }
TiKVStorage is the tikv storage interface used by CDC.
type UpdateTaskStatusFunc ¶
type UpdateTaskStatusFunc func(int64, *model.TaskStatus) (updated bool, err error)
UpdateTaskStatusFunc is a function that updates the task status