const (
	OptChangefeedID = "_changefeed_id"
	OptCaptureAddr  = "_capture_addr"
)

Sink options keys




func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics in this file


type Sink

type Sink interface {
	Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error

	// EmitRowChangedEvents sends row changed events to the Sink.
	// EmitRowChangedEvents may write rows to the downstream directly.
	EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error

	// EmitDDLEvent sends DDL Event to Sink
	// EmitDDLEvent should execute DDL to downstream synchronously
	EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error

	// FlushRowChangedEvents flushes every row whose commitTs is less than or equal to `resolvedTs` into the downstream.
	// TiCDC guarantees that all events whose commitTs is less than or equal to `resolvedTs` have been sent to the Sink through `EmitRowChangedEvents`.
	FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error)

	// EmitCheckpointTs sends CheckpointTs to Sink
	// TiCDC guarantees that all events **in the cluster** whose commitTs is less than or equal to `checkpointTs` have been sent to the downstream successfully.
	EmitCheckpointTs(ctx context.Context, ts uint64) error

	// Close closes the Sink
	Close() error
}

Sink is an abstraction for anything that a changefeed may emit into.

func NewSink

func NewSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string, filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error)

NewSink creates a new Sink from the given sink URI.

type Statistics

type Statistics struct {

	// contains filtered or unexported fields
}


Statistics maintains some status and metrics of the Sink

func NewStatistics

func NewStatistics(ctx context.Context, name string, opts map[string]string) *Statistics

NewStatistics creates a new Statistics.

func (*Statistics) AddRowsCount

func (b *Statistics) AddRowsCount(count int)

AddRowsCount adds count to the total number of rows that need to be flushed.

func (*Statistics) PrintStatus

func (b *Statistics) PrintStatus()

PrintStatus prints the status of the Sink

func (*Statistics) RecordBatchExecution

func (b *Statistics) RecordBatchExecution(executer func() (int, error)) error

RecordBatchExecution records the time taken by a batch execution and the batch size.

func (*Statistics) SubRowsCount

func (b *Statistics) SubRowsCount(count int)

SubRowsCount subtracts count from the total number of rows that need to be flushed.
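A pending-rows counter of this kind is typically updated from several goroutines, so one plausible shape is an atomic integer with paired add and subtract operations. The sketch below assumes that design. `rowCounter` and its methods are hypothetical names, and nothing here is taken from the actual `Statistics` implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// rowCounter is a hypothetical counter of rows that still need to be flushed.
type rowCounter struct {
	pending int64 // accessed only through sync/atomic
}

// add increases the number of pending rows by n.
func (c *rowCounter) add(n int) { atomic.AddInt64(&c.pending, int64(n)) }

// sub decreases the number of pending rows by n.
func (c *rowCounter) sub(n int) { atomic.AddInt64(&c.pending, -int64(n)) }

// load returns the current number of pending rows.
func (c *rowCounter) load() int64 { return atomic.LoadInt64(&c.pending) }

func main() {
	var c rowCounter
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.add(10) // ten rows received
			c.sub(3)  // three of them flushed
		}()
	}
	wg.Wait()
	fmt.Println(c.load()) // prints 28
}
```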


Path Synopsis
producer/pulsar Package pulsar provides a Pulsar-based MQ Producer implementation.