Documentation ¶
Index ¶
- Constants
- func InitMetrics(registry *prometheus.Registry)
- type Manager
- type Sink
- type Statistics
- func (b *Statistics) AddDDLCount()
- func (b *Statistics) AddRowsCount(count int)
- func (b *Statistics) PrintStatus(ctx context.Context)
- func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error
- func (b *Statistics) RecordDDLExecution(executor func() error) error
- func (b *Statistics) SubRowsCount(count int)
- func (b *Statistics) TotalRowsCount() uint64
- type SyncpointStore
Constants ¶
const (
	OptChangefeedID = "_changefeed_id"
	OptCaptureAddr  = "_capture_addr"
)
Sink option keys.
Variables ¶
This section is empty.
Functions ¶
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics in this file
Types ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages table sinks and maintains the relationship between the table sinks and the backendSink.
func NewManager ¶
func NewManager(ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts) *Manager
NewManager creates a new Sink manager
type Sink ¶
type Sink interface {
	Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error

	// EmitRowChangedEvents sends Row Changed Event to Sink
	// EmitRowChangedEvents may write rows to downstream directly;
	EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error

	// EmitDDLEvent sends DDL Event to Sink
	// EmitDDLEvent should execute DDL to downstream synchronously
	EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error

	// FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream.
	// TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents`
	FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error)

	// EmitCheckpointTs sends CheckpointTs to Sink
	// TiCDC guarantees that all Events **in the cluster** which of commitTs less than or equal `checkpointTs` are sent to downstream successfully.
	EmitCheckpointTs(ctx context.Context, ts uint64) error

	// Close closes the Sink
	Close(ctx context.Context) error

	// Barrier is a synchronous function to wait all events to be flushed in underlying sink
	// Note once Barrier is called, the resolved ts won't be pushed until the Barrier call returns.
	Barrier(ctx context.Context) error
}
Sink is an abstraction for anything that a changefeed may emit into.
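The emit-then-flush contract described in the interface comments can be illustrated with a minimal in-memory sketch. Everything in it is hypothetical: `RowChangedEvent` is a simplified stand-in for `model.RowChangedEvent`, `memSink` mirrors only the two row-related methods, and the returned `uint64` is assumed to be the timestamp up to which rows have been flushed, which this page does not state.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// RowChangedEvent is a simplified, hypothetical stand-in for
// model.RowChangedEvent; only the fields needed here are kept.
type RowChangedEvent struct {
	Table    string
	CommitTs uint64
}

// memSink is a hypothetical in-memory sink that mirrors the row-related
// methods of the Sink interface. It is not part of the package.
type memSink struct {
	mu      sync.Mutex
	pending []*RowChangedEvent // emitted but not yet flushed
	flushed []*RowChangedEvent // stands in for the downstream system
}

// EmitRowChangedEvents buffers rows; nothing reaches "downstream" yet.
func (s *memSink) EmitRowChangedEvents(ctx context.Context, rows ...*RowChangedEvent) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending = append(s.pending, rows...)
	return nil
}

// FlushRowChangedEvents moves every buffered row whose CommitTs is less than
// or equal to resolvedTs into flushed. Returning resolvedTs as the flushed
// timestamp is an assumption made for this sketch.
func (s *memSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var rest []*RowChangedEvent
	for _, r := range s.pending {
		if r.CommitTs <= resolvedTs {
			s.flushed = append(s.flushed, r)
		} else {
			rest = append(rest, r)
		}
	}
	s.pending = rest
	return resolvedTs, nil
}

// runSinkDemo emits three rows, flushes up to ts 15, and reports how many
// rows were flushed and how many remain buffered.
func runSinkDemo() (flushedRows, pendingRows int, checkpoint uint64, err error) {
	ctx := context.Background()
	s := &memSink{}
	err = s.EmitRowChangedEvents(ctx,
		&RowChangedEvent{Table: "t1", CommitTs: 10},
		&RowChangedEvent{Table: "t1", CommitTs: 15},
		&RowChangedEvent{Table: "t2", CommitTs: 20},
	)
	if err != nil {
		return 0, 0, 0, err
	}
	checkpoint, err = s.FlushRowChangedEvents(ctx, 15)
	if err != nil {
		return 0, 0, 0, err
	}
	return len(s.flushed), len(s.pending), checkpoint, nil
}

func main() {
	flushed, pending, ts, err := runSinkDemo()
	fmt.Println(flushed, pending, ts, err)
}
```

The row with commit timestamp 20 stays buffered because it is greater than the resolved timestamp passed to the flush call. A real implementation would also need `Initialize`, `EmitDDLEvent`, `EmitCheckpointTs`, `Close`, and `Barrier`.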
type Statistics ¶
type Statistics struct {
// contains filtered or unexported fields
}
Statistics maintains some status and metrics of the Sink
func NewStatistics ¶
NewStatistics creates a new Statistics.
func (*Statistics) AddDDLCount ¶
func (b *Statistics) AddDDLCount()
AddDDLCount records the total number of DDL statements that need to be flushed.
func (*Statistics) AddRowsCount ¶
func (b *Statistics) AddRowsCount(count int)
AddRowsCount records the total number of rows that need to be flushed.
func (*Statistics) PrintStatus ¶
func (b *Statistics) PrintStatus(ctx context.Context)
PrintStatus prints the status of the Sink
func (*Statistics) RecordBatchExecution ¶
func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error
RecordBatchExecution records the time cost of a batch execution and the batch size.
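The signature suggests a wrapper pattern: the caller passes the batch write as a callback that returns the batch size, and the recorder times it. The sketch below shows that pattern with a hypothetical `batchStats` type; the real `Statistics` keeps its fields unexported, so the fields and the choice to skip recording on error are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// batchStats is a hypothetical stand-in for Statistics. Its fields are
// invented for this sketch.
type batchStats struct {
	totalRows uint64
	lastCost  time.Duration
}

// recordBatchExecution runs executor, measures how long it took, and adds
// the returned batch size to the running total. On error nothing is
// recorded and the error is returned to the caller (an assumption).
func (b *batchStats) recordBatchExecution(executor func() (int, error)) error {
	start := time.Now()
	batchSize, err := executor()
	if err != nil {
		return err
	}
	b.lastCost = time.Since(start)
	b.totalRows += uint64(batchSize)
	return nil
}

// runBatchDemo records one successful batch of 3 rows and one failing
// batch, then returns the row total and the error from the failing batch.
func runBatchDemo() (uint64, error) {
	stats := &batchStats{}
	err := stats.recordBatchExecution(func() (int, error) {
		// A real executor would write rows downstream here.
		return 3, nil
	})
	if err != nil {
		return 0, err
	}
	failErr := stats.recordBatchExecution(func() (int, error) {
		return 0, errors.New("downstream unavailable")
	})
	return stats.totalRows, failErr
}

func main() {
	total, err := runBatchDemo()
	fmt.Println(total, err)
}
```

Passing the work as a closure keeps the timing logic in one place, so each sink implementation does not have to repeat it.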
func (*Statistics) RecordDDLExecution ¶
func (b *Statistics) RecordDDLExecution(executor func() error) error
RecordDDLExecution records the time cost of executing a DDL statement.
func (*Statistics) SubRowsCount ¶
func (b *Statistics) SubRowsCount(count int)
SubRowsCount subtracts count from the total number of rows that need to be flushed.
func (*Statistics) TotalRowsCount ¶
func (b *Statistics) TotalRowsCount() uint64
TotalRowsCount returns the total number of rows.
type SyncpointStore ¶
type SyncpointStore interface {
	// CreateSynctable create a table to record the syncpoints
	CreateSynctable(ctx context.Context) error

	// SinkSyncpoint record the syncpoint(a map with ts) in downstream db
	SinkSyncpoint(ctx context.Context, id string, checkpointTs uint64) error

	// Close closes the SyncpointSink
	Close() error
}
SyncpointStore is an abstraction for anything that a changefeed may record syncpoints into.
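Because the interface uses only standard-library types, it can be reproduced locally and satisfied by a small in-memory implementation, which may be handy in tests. The sketch below is an assumption-laden illustration: `memSyncpointStore` and its map-backed "table" are hypothetical, and requiring `CreateSynctable` before `SinkSyncpoint` is a choice made here rather than documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// SyncpointStore is copied from the interface documented above.
type SyncpointStore interface {
	CreateSynctable(ctx context.Context) error
	SinkSyncpoint(ctx context.Context, id string, checkpointTs uint64) error
	Close() error
}

// memSyncpointStore is a hypothetical in-memory implementation; the map
// stands in for the downstream sync table.
type memSyncpointStore struct {
	created bool
	records map[string]uint64 // changefeed id -> latest checkpointTs
}

// Compile-time check that memSyncpointStore satisfies the interface.
var _ SyncpointStore = (*memSyncpointStore)(nil)

// CreateSynctable initializes the in-memory "table".
func (m *memSyncpointStore) CreateSynctable(ctx context.Context) error {
	m.records = make(map[string]uint64)
	m.created = true
	return nil
}

// SinkSyncpoint stores the latest checkpointTs for the given id.
func (m *memSyncpointStore) SinkSyncpoint(ctx context.Context, id string, checkpointTs uint64) error {
	if !m.created {
		return errors.New("sync table has not been created")
	}
	m.records[id] = checkpointTs
	return nil
}

// Close marks the store as unusable until the table is created again.
func (m *memSyncpointStore) Close() error {
	m.created = false
	return nil
}

// runSyncpointDemo writes two syncpoints for one hypothetical changefeed id
// and returns the value that remains recorded.
func runSyncpointDemo() (uint64, error) {
	ctx := context.Background()
	store := &memSyncpointStore{}
	if err := store.SinkSyncpoint(ctx, "cf-1", 50); err == nil {
		return 0, errors.New("expected an error before CreateSynctable")
	}
	if err := store.CreateSynctable(ctx); err != nil {
		return 0, err
	}
	if err := store.SinkSyncpoint(ctx, "cf-1", 100); err != nil {
		return 0, err
	}
	if err := store.SinkSyncpoint(ctx, "cf-1", 200); err != nil {
		return 0, err
	}
	latest := store.records["cf-1"]
	return latest, store.Close()
}

func main() {
	latest, err := runSyncpointDemo()
	fmt.Println(latest, err)
}
```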
func NewSyncpointStore ¶
func NewSyncpointStore(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string) (SyncpointStore, error)
NewSyncpointStore creates a new syncpoint store from the sink URI.