sink

package
v0.0.0-...-485a10e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2022 License: Apache-2.0 Imports: 43 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OptChangefeedID = "_changefeed_id"
	OptCaptureAddr  = "_capture_addr"
)

Sink options keys

Variables

This section is empty.

Functions

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics in this file

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages table sinks, maintains the relationship between table sinks and backendSink

func NewManager

func NewManager(ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts) *Manager

NewManager creates a new Sink manager

func (*Manager) Close

func (m *Manager) Close(ctx context.Context) error

Close closes the Sink manager and backend Sink, this method can be reentrantly called

func (*Manager) CreateTableSink

func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) Sink

CreateTableSink creates a table sink

type Sink

type Sink interface {
	Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error

	// EmitRowChangedEvents sends Row Changed Event to Sink
	// EmitRowChangedEvents may write rows to downstream directly;
	EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error

	// EmitDDLEvent sends DDL Event to Sink
	// EmitDDLEvent should execute DDL to downstream synchronously
	EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error

	// FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream.
	// TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents`
	FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error)

	// EmitCheckpointTs sends CheckpointTs to Sink
	// TiCDC guarantees that all Events **in the cluster** which of commitTs less than or equal `checkpointTs` are sent to downstream successfully.
	EmitCheckpointTs(ctx context.Context, ts uint64) error

	// Close closes the Sink
	Close(ctx context.Context) error

	// Barrier is a synchronous function to wait all events to be flushed in underlying sink
	// Note once Barrier is called, the resolved ts won't be pushed until the Barrier call returns.
	Barrier(ctx context.Context) error
}

Sink is an abstraction for anything that a changefeed may emit into.

func NewSink

func NewSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string, filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error)

NewSink creates a new sink with the sink-uri

type Statistics

type Statistics struct {
	// contains filtered or unexported fields
}

Statistics maintains some status and metrics of the Sink

func NewStatistics

func NewStatistics(ctx context.Context, name string, opts map[string]string) *Statistics

NewStatistics creates a statistics

func (*Statistics) AddDDLCount

func (b *Statistics) AddDDLCount()

AddDDLCount records total number of ddl needs to flush

func (*Statistics) AddRowsCount

func (b *Statistics) AddRowsCount(count int)

AddRowsCount records total number of rows needs to flush

func (*Statistics) PrintStatus

func (b *Statistics) PrintStatus(ctx context.Context)

PrintStatus prints the status of the Sink

func (*Statistics) RecordBatchExecution

func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error

RecordBatchExecution records the cost time of batch execution and batch size

func (*Statistics) RecordDDLExecution

func (b *Statistics) RecordDDLExecution(executor func() error) error

RecordDDLExecution record the time cost of execute ddl

func (*Statistics) SubRowsCount

func (b *Statistics) SubRowsCount(count int)

SubRowsCount records total number of rows needs to flush

func (*Statistics) TotalRowsCount

func (b *Statistics) TotalRowsCount() uint64

TotalRowsCount returns total number of rows

type SyncpointStore

type SyncpointStore interface {
	// CreateSynctable create a table to record the syncpoints
	CreateSynctable(ctx context.Context) error

	// SinkSyncpoint record the syncpoint(a map with ts) in downstream db
	SinkSyncpoint(ctx context.Context, id string, checkpointTs uint64) error

	// Close closes the SyncpointSink
	Close() error
}

SyncpointStore is an abstraction for anything that a changefeed may emit into.

func NewSyncpointStore

func NewSyncpointStore(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string) (SyncpointStore, error)

NewSyncpointStore creates a new Spyncpoint sink with the sink-uri

Directories

Path Synopsis
pulsar
Package pulsar provider a pulsar based mq Producer implementation.
Package pulsar provider a pulsar based mq Producer implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL