cdc

package
v0.0.0-...-485a10e
Warning

This package is not in the latest version of its module.
Published: May 26, 2022 License: Apache-2.0 Imports: 64 Imported by: 0

Documentation

Constants

const (
	// APIOpVarAdminJob is the key of admin job in HTTP API
	APIOpVarAdminJob = "admin-job"
	// APIOpVarChangefeedID is the key of changefeed ID in HTTP API
	APIOpVarChangefeedID = "cf-id"
	// APIOpVarTargetCaptureID is the key of to-capture ID in HTTP API
	APIOpVarTargetCaptureID = "target-cp-id"
	// APIOpVarTableID is the key of table ID in HTTP API
	APIOpVarTableID = "table-id"
	// APIOpForceRemoveChangefeed is used when removing a changefeed
	APIOpForceRemoveChangefeed = "force-remove"
)
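The `APIOpVar*` constants name the keys of the HTTP API. As a rough sketch of how a client might encode such keys into a form body: the two constants are copied from the block above, while the helper `adminJobForm`, the changefeed ID, and the job value `1` are hypothetical placeholders. The endpoint path is not documented on this page and is left out.

```go
package main

import (
	"fmt"
	"net/url"
)

// Keys copied from the constants documented above.
const (
	APIOpVarAdminJob     = "admin-job"
	APIOpVarChangefeedID = "cf-id"
)

// adminJobForm is a hypothetical helper that encodes an admin-job request
// as URL-encoded form data. The meaning of jobType is an assumption.
func adminJobForm(changefeedID string, jobType int) string {
	form := url.Values{}
	form.Set(APIOpVarChangefeedID, changefeedID)
	form.Set(APIOpVarAdminJob, fmt.Sprint(jobType))
	return form.Encode() // Encode sorts the output by key
}

func main() {
	fmt.Println(adminJobForm("example-feed", 1)) // admin-job=1&cf-id=example-feed
}
```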
const (
	// CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint.
	CDCServiceSafePointID = "ticdc"
	// GCSafepointUpdateInterval is the minimal interval at which CDC can update the GC safepoint
	GCSafepointUpdateInterval = 2 * time.Second
	// MinGCSafePointCacheUpdateInterval is the interval at which minGCSafePointCache is updated
	MinGCSafePointCacheUpdateInterval = time.Second * 2
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncSink

type AsyncSink interface {
	Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error
	// EmitCheckpointTs emits the checkpoint Ts to the downstream data source.
	// It returns immediately after recording the given checkpointTs in memory;
	// the recorded checkpointTs is sent to the downstream data source every second.
	// It returns an error for the unit test TestOwnerCalcResolvedTs in owner_test.
	EmitCheckpointTs(ctx cdcContext.Context, ts uint64) error
	// EmitDDLEvent emits a DDL event asynchronously and returns true once the DDL has been executed.
	// The DDL event is sent to another goroutine, which executes it downstream.
	// The caller can call this function repeatedly until it returns true.
	EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error)
	SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error
	Close(ctx context.Context) error
}

AsyncSink is an async sink design for the owner. EmitCheckpointTs and EmitDDLEvent are asynchronous for now; the other functions are still synchronous.

type Capture

type Capture struct {
	// contains filtered or unexported fields
}

Capture represents a Capture server. It monitors the changefeed information in etcd and schedules Tasks on it.

func NewCapture

func NewCapture(
	stdCtx context.Context,
	pdEndpoints []string,
	pdCli pd.Client,
	kvStorage tidbkv.Storage,
) (c *Capture, err error)

NewCapture returns a new Capture instance

func (*Capture) Campaign

func (c *Capture) Campaign(ctx context.Context) error

Campaign campaigns to become the owner.

func (*Capture) Cleanup

func (c *Capture) Cleanup()

Cleanup cleans all dynamic resources

func (*Capture) Close

func (c *Capture) Close(ctx context.Context) error

Close closes the capture by unregistering it from etcd

func (*Capture) Resign

func (c *Capture) Resign(ctx context.Context) error

Resign lets an owner start a new election.

func (*Capture) Run

func (c *Capture) Run(ctx context.Context) (err error)

Run runs the Capture main loop.

type ChangeFeedRWriter

type ChangeFeedRWriter interface {

	// GetChangeFeeds returns kv revision and a map mapping from changefeedID to changefeed detail mvccpb.KeyValue
	GetChangeFeeds(ctx context.Context) (int64, map[string]*mvccpb.KeyValue, error)

	// GetAllTaskStatus queries all task status of a changefeed, and returns a map
	// mapping from captureID to TaskStatus
	GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error)

	// RemoveAllTaskStatus removes all task status of a changefeed
	RemoveAllTaskStatus(ctx context.Context, changefeedID string) error

	// GetAllTaskPositions queries all task positions of a changefeed, and returns a map
	// mapping from captureID to TaskPositions
	GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error)

	// RemoveAllTaskPositions removes all task positions of a changefeed
	RemoveAllTaskPositions(ctx context.Context, changefeedID string) error

	// GetChangeFeedStatus queries the checkpointTs and resolvedTs of a given changefeed
	GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error)

	// PutAllChangeFeedStatus writes the changefeed statuses to storage such as etcd.
	PutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus) error

	// LeaseGuardRemoveAllTaskStatus wraps RemoveAllTaskStatus with a context restricted by lease TTL.
	LeaseGuardRemoveAllTaskStatus(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error

	// LeaseGuardRemoveAllTaskPositions wraps RemoveAllTaskPositions with a context restricted by lease TTL.
	LeaseGuardRemoveAllTaskPositions(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error

	// LeaseGuardPutAllChangeFeedStatus wraps PutAllChangeFeedStatus with a context restricted by lease TTL.
	LeaseGuardPutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus, leaseID clientv3.LeaseID) error
}

ChangeFeedRWriter defines the Reader and Writer for changeFeed

type ChangefeedCommonInfo

type ChangefeedCommonInfo struct {
	ID             string              `json:"id"`
	FeedState      model.FeedState     `json:"state"`
	CheckpointTSO  uint64              `json:"checkpoint-tso"`
	CheckpointTime JSONTime            `json:"checkpoint-time"`
	RunningError   *model.RunningError `json:"error"`
}

ChangefeedCommonInfo holds commonly used information about a changefeed and is used by the RESTful API only.

type ChangefeedResp

type ChangefeedResp struct {
	FeedState    string              `json:"state"`
	TSO          uint64              `json:"tso"`
	Checkpoint   string              `json:"checkpoint"`
	RunningError *model.RunningError `json:"error"`
}

ChangefeedResp holds the most common usage information for a changefeed

type JSONTime

type JSONTime time.Time

JSONTime wraps time.Time for JSON encoding.

func (JSONTime) MarshalJSON

func (t JSONTime) MarshalJSON() ([]byte, error)

MarshalJSON specifies the time format used in the JSON output.

type Owner

type Owner struct {
	// contains filtered or unexported fields
}

Owner manages the cdc cluster

func NewOwner

func NewOwner(
	ctx context.Context,
	pdClient pd.Client,
	grpcPool kv.GrpcPool,
	sess *concurrency.Session,
	gcTTL int64,
	flushChangefeedInterval time.Duration,
) (*Owner, error)

NewOwner creates a new Owner instance

func (*Owner) Close

func (o *Owner) Close(ctx context.Context, stepDown func(ctx context.Context) error)

Close stops a running owner

func (*Owner) EnqueueJob

func (o *Owner) EnqueueJob(job model.AdminJob) error

EnqueueJob adds an admin job

func (*Owner) ManualSchedule

func (o *Owner) ManualSchedule(changefeedID model.ChangeFeedID, to model.CaptureID, tableID model.TableID)

ManualSchedule moves a table from one capture to another.

func (*Owner) Run

func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error

Run runs the owner. TODO: avoid this tick style; it means we get `tickTime` latency here.

func (*Owner) TriggerRebalance

func (o *Owner) TriggerRebalance(changefeedID model.ChangeFeedID)

TriggerRebalance triggers the rebalance in the specified changefeed

type OwnerDDLHandler

type OwnerDDLHandler interface {
	// PullDDL pulls the ddl jobs and returns resolvedTs of DDL Puller and job list.
	PullDDL() (resolvedTs uint64, jobs []*timodel.Job, err error)

	// Close cancels the executing of OwnerDDLHandler and releases resource
	Close() error
}

OwnerDDLHandler defines the ddl handler for Owner which can pull ddl jobs and execute ddl jobs

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the capture server

func NewServer

func NewServer(pdEndpoints []string) (*Server, error)

NewServer creates a Server instance.

func (*Server) Close

func (s *Server) Close()

Close closes the server.

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

Run runs the server.

type Task

type Task struct {
	ChangeFeedID string
	CheckpointTS uint64
}

Task is dispatched by the owner

type TaskEvent

type TaskEvent struct {
	Op   TaskEventOp
	Task *Task
	Err  error
}

TaskEvent represents the creation or deletion of a task.

type TaskEventOp

type TaskEventOp string

TaskEventOp is the operation of a task

const (
	TaskOpCreate TaskEventOp = "create"
	TaskOpDelete TaskEventOp = "delete"
)

Task Event Operations

type TaskWatcher

type TaskWatcher struct {
	// contains filtered or unexported fields
}

TaskWatcher watches for new tasks.

func NewTaskWatcher

func NewTaskWatcher(c *Capture, cfg *TaskWatcherConfig) *TaskWatcher

NewTaskWatcher returns a TaskWatcher

func (*TaskWatcher) Watch

func (w *TaskWatcher) Watch(ctx context.Context) <-chan *TaskEvent

Watch watches for new tasks and returns a channel of task events.
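A consumer of the channel returned by Watch would typically range over it and switch on the event's Op. The sketch below copies the Task, TaskEvent, and TaskEventOp declarations from this page so that it runs on its own; the helper `applyEvents`, the hand-filled channel, and the choice to stop at the first event carrying an error are assumptions, since this page does not describe when the real channel closes or how errors should be treated.

```go
package main

import "fmt"

// Declarations copied from the types documented on this page.
type TaskEventOp string

const (
	TaskOpCreate TaskEventOp = "create"
	TaskOpDelete TaskEventOp = "delete"
)

type Task struct {
	ChangeFeedID string
	CheckpointTS uint64
}

type TaskEvent struct {
	Op   TaskEventOp
	Task *Task
	Err  error
}

// applyEvents is a hypothetical consumer: it tracks active tasks by
// changefeed ID and stops at the first event that carries an error.
func applyEvents(events <-chan *TaskEvent) (map[string]uint64, error) {
	active := make(map[string]uint64)
	for ev := range events {
		if ev.Err != nil {
			return active, ev.Err
		}
		switch ev.Op {
		case TaskOpCreate:
			active[ev.Task.ChangeFeedID] = ev.Task.CheckpointTS
		case TaskOpDelete:
			delete(active, ev.Task.ChangeFeedID)
		}
	}
	return active, nil
}

func main() {
	events := make(chan *TaskEvent, 3)
	events <- &TaskEvent{Op: TaskOpCreate, Task: &Task{ChangeFeedID: "feed-a", CheckpointTS: 100}}
	events <- &TaskEvent{Op: TaskOpCreate, Task: &Task{ChangeFeedID: "feed-b", CheckpointTS: 200}}
	events <- &TaskEvent{Op: TaskOpDelete, Task: &Task{ChangeFeedID: "feed-a"}}
	close(events)

	active, err := applyEvents(events)
	fmt.Println(active, err) // map[feed-b:200] <nil>
}
```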

type TaskWatcherConfig

type TaskWatcherConfig struct {
	Prefix      string
	ChannelSize int64
}

TaskWatcherConfig configures a watcher

Directories

Path Synopsis
Package processor implements the processor logic based on ETCD worker(pkg/orchestrator).
producer/pulsar
Package pulsar provides a pulsar based mq Producer implementation.
