Documentation ¶
Index ¶
- Constants
- type AsyncSink
- type Capture
- type ChangeFeedRWriter
- type ChangefeedCommonInfo
- type ChangefeedResp
- type JSONTime
- type Owner
- func (o *Owner) Close(ctx context.Context, stepDown func(ctx context.Context) error)
- func (o *Owner) EnqueueJob(job model.AdminJob) error
- func (o *Owner) ManualSchedule(changefeedID model.ChangeFeedID, to model.CaptureID, tableID model.TableID)
- func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error
- func (o *Owner) TriggerRebalance(changefeedID model.ChangeFeedID)
- type OwnerDDLHandler
- type Server
- type Task
- type TaskEvent
- type TaskEventOp
- type TaskWatcher
- type TaskWatcherConfig
Constants ¶
const ( // APIOpVarAdminJob is the key of admin job in HTTP API APIOpVarAdminJob = "admin-job" // APIOpVarChangefeedID is the key of changefeed ID in HTTP API APIOpVarChangefeedID = "cf-id" // APIOpVarTargetCaptureID is the key of to-capture ID in HTTP API APIOpVarTargetCaptureID = "target-cp-id" // APIOpVarTableID is the key of table ID in HTTP API APIOpVarTableID = "table-id" // APIOpForceRemoveChangefeed is used when remove a changefeed APIOpForceRemoveChangefeed = "force-remove" )
const ( // CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint. CDCServiceSafePointID = "ticdc" // GCSafepointUpdateInterval is the minimual interval that CDC can update gc safepoint GCSafepointUpdateInterval = 2 * time.Second // MinGCSafePointCacheUpdateInterval is the interval that update minGCSafePointCache MinGCSafePointCacheUpdateInterval = time.Second * 2 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncSink ¶
type AsyncSink interface { Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error // EmitCheckpointTs emits the checkpoint Ts to downstream data source // this function will return after recording the checkpointTs specified in memory immediately // and the recorded checkpointTs will be sent and updated to downstream data source every second // return err here for the unit test TestOwnerCalcResolvedTs in owner_test EmitCheckpointTs(ctx cdcContext.Context, ts uint64) error // EmitDDLEvent emits DDL event asynchronously and return true if the DDL is executed // the DDL event will be sent to another goroutine and execute to downstream // the caller of this function can call again and again until a true returned EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error Close(ctx context.Context) error }
AsyncSink is an async sink design for owner The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now Other functions are still synchronization
type Capture ¶
type Capture struct {
// contains filtered or unexported fields
}
Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
func NewCapture ¶
func NewCapture( stdCtx context.Context, pdEndpoints []string, pdCli pd.Client, kvStorage tidbkv.Storage, ) (c *Capture, err error)
NewCapture returns a new Capture instance
type ChangeFeedRWriter ¶
type ChangeFeedRWriter interface { // GetChangeFeeds returns kv revision and a map mapping from changefeedID to changefeed detail mvccpb.KeyValue GetChangeFeeds(ctx context.Context) (int64, map[string]*mvccpb.KeyValue, error) // GetAllTaskStatus queries all task status of a changefeed, and returns a map // mapping from captureID to TaskStatus GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error) // RemoveAllTaskStatus removes all task status of a changefeed RemoveAllTaskStatus(ctx context.Context, changefeedID string) error // GetAllTaskPositions queries all task positions of a changefeed, and returns a map // mapping from captureID to TaskPositions GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error) // RemoveAllTaskPositions removes all task partitions of a changefeed RemoveAllTaskPositions(ctx context.Context, changefeedID string) error // GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error) // PutAllChangeFeedStatus the changefeed info to storage such as etcd. PutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus) error // LeaseGuardRemoveAllTaskStatus wraps RemoveAllTaskStatus with a context restricted by lease TTL. LeaseGuardRemoveAllTaskStatus(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error // LeaseGuardRemoveAllTaskPositions wraps RemoveAllTaskPositions with a context restricted by lease TTL. LeaseGuardRemoveAllTaskPositions(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error // LeaseGuardPutAllChangeFeedStatus wraps PutAllChangeFeedStatus with a context restricted by lease TTL. LeaseGuardPutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus, leaseID clientv3.LeaseID) error }
ChangeFeedRWriter defines the Reader and Writer for changeFeed
type ChangefeedCommonInfo ¶
type ChangefeedCommonInfo struct { ID string `json:"id"` FeedState model.FeedState `json:"state"` CheckpointTSO uint64 `json:"checkpoint-tso"` CheckpointTime JSONTime `json:"checkpoint-time"` RunningError *model.RunningError `json:"error"` }
ChangefeedCommonInfo holds some common usage information of a changefeed and use by RESTful API only.
type ChangefeedResp ¶
type ChangefeedResp struct { FeedState string `json:"state"` TSO uint64 `json:"tso"` Checkpoint string `json:"checkpoint"` RunningError *model.RunningError `json:"error"` }
ChangefeedResp holds the most common usage information for a changefeed
type JSONTime ¶
JSONTime used to wrap time into json format
func (JSONTime) MarshalJSON ¶
MarshalJSON use to specify the time format
type Owner ¶
type Owner struct {
// contains filtered or unexported fields
}
Owner manages the cdc cluster
func NewOwner ¶
func NewOwner( ctx context.Context, pdClient pd.Client, grpcPool kv.GrpcPool, sess *concurrency.Session, gcTTL int64, flushChangefeedInterval time.Duration, ) (*Owner, error)
NewOwner creates a new Owner instance
func (*Owner) EnqueueJob ¶
EnqueueJob adds an admin job
func (*Owner) ManualSchedule ¶
func (o *Owner) ManualSchedule(changefeedID model.ChangeFeedID, to model.CaptureID, tableID model.TableID)
ManualSchedule moves the table from a capture to another capture
func (*Owner) Run ¶
Run the owner TODO avoid this tick style, this means we get `tickTime` latency here.
func (*Owner) TriggerRebalance ¶
func (o *Owner) TriggerRebalance(changefeedID model.ChangeFeedID)
TriggerRebalance triggers the rebalance in the specified changefeed
type OwnerDDLHandler ¶
type OwnerDDLHandler interface { // PullDDL pulls the ddl jobs and returns resolvedTs of DDL Puller and job list. PullDDL() (resolvedTs uint64, jobs []*timodel.Job, err error) // Close cancels the executing of OwnerDDLHandler and releases resource Close() error }
OwnerDDLHandler defines the ddl handler for Owner which can pull ddl jobs and execute ddl jobs
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the capture server
type TaskEvent ¶
type TaskEvent struct { Op TaskEventOp Task *Task Err error }
TaskEvent represents a task is created or deleted
type TaskEventOp ¶
type TaskEventOp string
TaskEventOp is the operation of a task
const ( TaskOpCreate TaskEventOp = "create" TaskOpDelete TaskEventOp = "delete" )
Task Event Operatrions
type TaskWatcher ¶
type TaskWatcher struct {
// contains filtered or unexported fields
}
TaskWatcher watches on new tasks
func NewTaskWatcher ¶
func NewTaskWatcher(c *Capture, cfg *TaskWatcherConfig) *TaskWatcher
NewTaskWatcher returns a TaskWatcher
type TaskWatcherConfig ¶
TaskWatcherConfig configures a watcher
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package processor implements the processor logic based on ETCD worker(pkg/orchestrator).
|
Package processor implements the processor logic based on ETCD worker(pkg/orchestrator). |
producer/pulsar
Package pulsar provider a pulsar based mq Producer implementation.
|
Package pulsar provider a pulsar based mq Producer implementation. |