Documentation
¶
Index ¶
- Constants
- Variables
- func AddTransformer(transformerName string, transformer SinkOption) error
- func BalanceBrackets(phrase string) bool
- func BuildIncludeMap(objects []string) (map[TableID]bool, error)
- func DefaultValue(c *changeitem.ColSchema) interface{}
- func GetTypeFromString(dataType string) (schema.Type, error)
- func IsMysqlBinaryType(in string) bool
- func KnownRuntime(r RuntimeType) bool
- func ParseFilter(rawFilter string) (*Table, *WhereStatement, error)
- func RegisterProviderName(providerType ProviderType, name string)
- func RegisterRuntime(r RuntimeType, f func(spec string) (Runtime, error))
- func Restore(column ColSchema, value interface{}) interface{}
- func RestoreChangeItems(batch []ChangeItem)
- func Rows(metrics metrics.Registry, table string, rows int)
- func ToYtSchema(original []ColSchema, fixAnyTypeInPrimaryKey bool) []schema.Column
- func TrimMySQLType(rawColumnType string) string
- func ValidateChangeItem(changeItem *ChangeItem) error
- func ValidateChangeItems(changeItems []ChangeItem) error
- func ValidateChangeItemsPtrs(changeItems []*ChangeItem) error
- type Activate
- type AddTables
- type AsyncMiddleware
- type AsyncSink
- type Asyncer
- type Block
- type ChangeItem
- func MakeDoneTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, ...) []ChangeItem
- func MakeInitTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, ...) []ChangeItem
- func MakeSynchronizeEvent() ChangeItem
- func MakeTxDone(txSequence uint32, lsn uint64, execTS time.Time, ...) ChangeItem
- func UnmarshalChangeItem(changeItemBuf []byte) (*ChangeItem, error)
- func UnmarshalChangeItems(changeItemsBuf []byte) ([]ChangeItem, error)
- type CheckResult
- type CheckType
- type Checksum
- type CleanupResource
- type Closeable
- type ColSchema
- type ColumnInfo
- type ColumnName
- type ColumnSchema
- type ColumnType
- type Committable
- type DBSchema
- type DateColumn
- type Deactivate
- type EmptyIoCloser
- type EndpointDelete
- type EphemeralTask
- type EventSize
- type FakeTask
- type FakeVisitor
- type FastTableSchema
- type Fetchable
- type Filters
- type HomoValuer
- type IncludeTableList
- type Includeable
- type IncrementalStorage
- type IncrementalTable
- type Kind
- type LfLineSplitter
- type LimitedResourceRuntime
- type LoadProgress
- type LocalRuntime
- func (l *LocalRuntime) CurrentJobIndex() int
- func (l *LocalRuntime) IsMain() bool
- func (l *LocalRuntime) NeedRestart(runtime Runtime) bool
- func (l *LocalRuntime) SetVersion(runtimeSpecificVersion string, versionProperties *string) error
- func (l *LocalRuntime) ThreadsNumPerWorker() int
- func (*LocalRuntime) Type() RuntimeType
- func (l *LocalRuntime) Validate() error
- func (l *LocalRuntime) WithDefaults()
- func (l *LocalRuntime) WorkersNum() int
- type LogPosition
- type Middleware
- type MonitorableSlot
- type Movable
- type OldKeysType
- type Partition
- type PositionalStorage
- type Progress
- type PropertyKey
- type ProviderType
- type Pusher
- type ReUpload
- type RegularSnapshot
- type RemoveTables
- type Replication
- type Restart
- type RetriablePartUploadError
- type RunnableTask
- type RunnableVisitor
- type Runtime
- type RuntimeType
- type SampleableStorage
- type ScheduledTask
- type SchemaStorage
- type ShardUploadParams
- type ShardableTask
- type ShardingContextStorage
- type ShardingStorage
- type ShardingTaskRuntime
- type SinkOption
- type Sinker
- type SlotKiller
- type SnapshotableStorage
- type Source
- type SourceReader
- type Start
- type Stop
- type Storage
- type StubSlotKiller
- type Table
- type TableColumns
- type TableDescription
- type TableID
- type TableInfo
- type TableMap
- type TablePartID
- type TableSchema
- type TableSplitter
- type TableUploadError
- type Task
- type TaskType
- func (t *TaskType) DecodeText(ci *pgtype.ConnInfo, src []byte) error
- func (t TaskType) Description(taskParams interface{}) string
- func (t TaskType) EncodeText(ci *pgtype.ConnInfo, buf []byte) (newBuf []byte, err error)
- func (t *TaskType) GobDecode(data []byte) error
- func (t TaskType) GobEncode() ([]byte, error)
- func (t TaskType) MarshalJSON() ([]byte, error)
- func (t TaskType) NewParams() interface{}
- func (t TaskType) String() string
- func (t *TaskType) UnmarshalJSON(data []byte) error
- type TaskTypeName
- type TaskVisitor
- type Termination
- type TestEndpoint
- type TestResult
- type TimeoutableTask
- type TimestampCol
- type Transfer
- type TransferCreate
- type TransferDelete
- type TransferType
- type TransferVersionFreeze
- type TransferVersionFreezeParams
- type TransferVersionUnfreeze
- type TransferVersionUnfreezeParams
- type TransferVersionUpdate
- type TransferVersionUpdateParams
- type Transformation
- type TransformationRuntimeOpts
- type Transformer
- type TransformerError
- type TransformerResult
- type TransformerType
- type TxBound
- type TypedChangeItem
- type TypedValue
- type UpdateTransfer
- type UpdateTransferParams
- type Upload
- type Verify
- type WhereStatement
- type YtCluster
Constants ¶
const ( ProviderTypeMock = ProviderType("mock") ProviderTypeNone = ProviderType("none") )
const ( TransferTypeNone = TransferType("TRANSFER_TYPE_UNSPECIFIED") TransferTypeSnapshotAndIncrement = TransferType("SNAPSHOT_AND_INCREMENT") TransferTypeSnapshotOnly = TransferType("SNAPSHOT_ONLY") TransferTypeIncrementOnly = TransferType("INCREMENT_ONLY") )
const (
LocalRuntimeType = RuntimeType("local")
)
const OriginalTypeMirrorBinary = changeitem.OriginalTypeMirrorBinary
const TableConsumerKeeper = changeitem.TableConsumerKeeper
const TableLSN = changeitem.TableLSN
const TestVersion = 2
TestVersion is the version of unit tests that this will pass
Variables ¶
var AllTasks []Task = makeAllTasks()
var AsyncPushConcurrencyErr = xerrors.NewSentinel("AsyncPush is called after Close")
AsyncPushConcurrencyErr indicates a Push has been called on an already closed AsyncSink. This must not happen and means there are concurrency issues in the implementation of a source.
var ChCreateTableDistributedKind = changeitem.ChCreateTableDistributedKind
var ChCreateTableKind = changeitem.ChCreateTableKind
var ChangeItemFromMap = changeitem.ChangeItemFromMap
var CheckErrorWrapping = dterrors.CheckErrorWrapping
var CheckOpaqueErrorWrapping = dterrors.CheckOpaqueErrorWrapping
var ClickhouseDDLBuilderKind = changeitem.ClickhouseDDLBuilderKind
var ColIDX = changeitem.ColIDX
var Collapse = changeitem.Collapse
var ContainsNonRowItem = changeitem.ContainsNonRowItem
var DDLKind = changeitem.DDLKind
var DeleteKind = changeitem.DeleteKind
var DoneShardedTableLoad = changeitem.DoneShardedTableLoad
var DoneTableLoad = changeitem.DoneTableLoad
var DropTableKind = changeitem.DropTableKind
var Dump = changeitem.Dump
var ElasticsearchDumpIndexKind = changeitem.ElasticsearchDumpIndexKind
var EmptyEventSize = changeitem.EmptyEventSize
var EmptyOldKeys = changeitem.EmptyOldKeys
var FakeTasks []FakeTask = makeFakeTasks()
var FindItemOfKind = changeitem.FindItemOfKind
var GetRawMessageData = changeitem.GetRawMessageData
var InitShardedTableLoad = changeitem.InitShardedTableLoad
var InitTableLoad = changeitem.InitTableLoad
var InsertKind = changeitem.InsertKind
var IsFatal = dterrors.IsFatal
var IsNonShardableError = dterrors.IsNonShardableError
var IsRetriablePartUploadError = dterrors.IsRetriablePartUploadError
var IsSystemTable = changeitem.IsSystemTable
var IsTableUploadError = dterrors.IsTableUploadError
var KeyNames = changeitem.KeyNames
var MakeFastTableSchema = changeitem.MakeFastTableSchema
var MakeMapColNameToIndex = changeitem.MakeMapColNameToIndex
var MakeOriginallyTypedColSchema = changeitem.MakeOriginallyTypedColSchema
var MakeRawMessage = changeitem.MakeRawMessage
var MakeTypedColSchema = changeitem.MakeTypedColSchema
var MongoCreateKind = changeitem.MongoCreateKind
var MongoDropDatabaseKind = changeitem.MongoDropDatabaseKind
var MongoDropKind = changeitem.MongoDropKind
var MongoNoop = changeitem.MongoNoop
var MongoRenameKind = changeitem.MongoRenameKind
var MongoUpdateDocumentKind = changeitem.MongoUpdateDocumentKind
var NewColSchema = changeitem.NewColSchema
var NewFatalError = dterrors.NewFatalError
var NewNonShardableError = dterrors.NewNonShardableError
var NewPartition = changeitem.NewPartition
var NewRetriablePartUploadError = dterrors.NewRetriablePartUploadError
var NewStrictifyError = strictify.NewStrictifyError
var NewTableID = changeitem.NewTableID
var NewTableSchema = changeitem.NewTableSchema
var NewTableUploadError = dterrors.NewTableUploadError
var PgDDLKind = changeitem.PgDDLKind
var PgName = changeitem.PgName
var RawDataColsIDX = changeitem.RawDataColsIDX
var RawDataSchema = changeitem.RawDataSchema
var RawEventSize = changeitem.RawEventSize
var RawMessagePartition = changeitem.RawMessagePartition
var RawMessageSeqNo = changeitem.RawMessageSeqNo
var RawMessageTopic = changeitem.RawMessageTopic
var RawMessageWriteTime = changeitem.RawMessageWriteTime
var RegisterSystemTables = changeitem.RegisterSystemTables
var RunnableTasks []RunnableTask = makeRunnableTasks()
var Sniff = changeitem.Sniff
var SplitByID = changeitem.SplitByID
var SplitByTableID = changeitem.SplitByTableID
var SplitUpdatedPKeys = changeitem.SplitUpdatedPKeys
var SynchronizeKind = changeitem.SynchronizeKind
var TaskTypeByName map[TaskTypeName]TaskType = makeTaskTypeByNameMap()
var TruncateTableKind = changeitem.TruncateTableKind
var UpdateKind = changeitem.UpdateKind
Functions ¶
func AddTransformer ¶
func AddTransformer(transformerName string, transformer SinkOption) error
func BalanceBrackets ¶
func DefaultValue ¶
func DefaultValue(c *changeitem.ColSchema) interface{}
DefaultValue returns a default instance of the type represented by this schema. This method only works safely in heterogeneous transfers.
func IsMysqlBinaryType ¶
func KnownRuntime ¶
func KnownRuntime(r RuntimeType) bool
func ParseFilter ¶
func ParseFilter(rawFilter string) (*Table, *WhereStatement, error)
func RegisterProviderName ¶
func RegisterProviderName(providerType ProviderType, name string)
func RegisterRuntime ¶
func RegisterRuntime(r RuntimeType, f func(spec string) (Runtime, error))
func RestoreChangeItems ¶
func RestoreChangeItems(batch []ChangeItem)
func ToYtSchema ¶
func TrimMySQLType ¶
func ValidateChangeItem ¶
func ValidateChangeItem(changeItem *ChangeItem) error
func ValidateChangeItems ¶
func ValidateChangeItems(changeItems []ChangeItem) error
func ValidateChangeItemsPtrs ¶
func ValidateChangeItemsPtrs(changeItems []*ChangeItem) error
Types ¶
type Activate ¶
type Activate struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Activate) Visit ¶
func (t Activate) Visit(v TaskVisitor) interface{}
func (Activate) VisitRunnable ¶
func (t Activate) VisitRunnable(v RunnableVisitor) interface{}
type AddTables ¶
type AddTables struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (AddTables) Visit ¶
func (t AddTables) Visit(v TaskVisitor) interface{}
func (AddTables) VisitRunnable ¶
func (t AddTables) VisitRunnable(v RunnableVisitor) interface{}
type AsyncMiddleware ¶
func ComposeAsyncMiddleware ¶
func ComposeAsyncMiddleware(mw ...AsyncMiddleware) AsyncMiddleware
ComposeAsyncMiddleware builds a pipeline of AsyncMiddlewares. Arguments order should be the same as the desired data flow. ComposeAsyncMiddleware(A, B, C)(sink) call is equivalent to A(B(C(sink)))
type AsyncSink ¶
type AsyncSink interface {
io.Closer
// AsyncPush writes items asynchronously. The error for the given batch of items will be written into the resulting channel when an underlying (synchronous) Push actually happens.
// Note that AsyncPush takes ownership of the slice `items`, so the caller must not use it afterwards.
AsyncPush(items []ChangeItem) chan error
}
AsyncSink provides asynchronous Push operation, which should be a wrapper over synchronous Push implemented by sink.
All of its methods may be called concurrently.
type ChangeItem ¶
type ChangeItem = changeitem.ChangeItem
func MakeDoneTableLoad ¶
func MakeDoneTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, tableSchema *TableSchema) []ChangeItem
func MakeInitTableLoad ¶
func MakeInitTableLoad(pos LogPosition, table TableDescription, commitTime time.Time, tableSchema *TableSchema) []ChangeItem
func MakeSynchronizeEvent ¶
func MakeSynchronizeEvent() ChangeItem
func MakeTxDone ¶
func UnmarshalChangeItem ¶
func UnmarshalChangeItem(changeItemBuf []byte) (*ChangeItem, error)
func UnmarshalChangeItems ¶
func UnmarshalChangeItems(changeItemsBuf []byte) ([]ChangeItem, error)
type CheckResult ¶
CheckResult describes the result of a particular check that was performed against an endpoint or transfer.
type Checksum ¶
type Checksum struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Checksum) Visit ¶
func (t Checksum) Visit(v TaskVisitor) interface{}
func (Checksum) VisitRunnable ¶
func (t Checksum) VisitRunnable(v RunnableVisitor) interface{}
type CleanupResource ¶
type CleanupResource struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (CleanupResource) Visit ¶
func (t CleanupResource) Visit(v TaskVisitor) interface{}
func (CleanupResource) VisitRunnable ¶
func (t CleanupResource) VisitRunnable(v RunnableVisitor) interface{}
type ColSchema ¶
type ColSchema = changeitem.ColSchema
type ColumnInfo ¶
type ColumnInfo struct {
Name string
Type *ColumnType
Nullable bool
}
type ColumnName ¶
type ColumnName = changeitem.ColumnName
type ColumnSchema ¶
type ColumnType ¶
type ColumnType interface {
GetRepresentation() string
}
type Committable ¶
type Committable interface {
Sinker
// Commit commits all changes made by all sinks during the operation of this transfer.
// This can be implemented in the future for CH and S3. To implement atomicity, you need to use a transaction that
// captures all actions with destination DB during the snapshot. This transaction must be committed when calling Commit().
Commit() error
}
Committable is a sinker that needs to be completed after snapshot.
type DBSchema ¶
type DBSchema = changeitem.DBSchema
type DateColumn ¶
type DateColumn struct {
From, To, Nano string
}
type Deactivate ¶
type Deactivate struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Deactivate) Visit ¶
func (t Deactivate) Visit(v TaskVisitor) interface{}
func (Deactivate) VisitRunnable ¶
func (t Deactivate) VisitRunnable(v RunnableVisitor) interface{}
type EmptyIoCloser ¶
type EmptyIoCloser struct{} // stub implementation of io.Closer
func NewEmptyIoCloser ¶
func NewEmptyIoCloser() *EmptyIoCloser
func (*EmptyIoCloser) Close ¶
func (s *EmptyIoCloser) Close() error
type EndpointDelete ¶
type EndpointDelete struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (EndpointDelete) Visit ¶
func (t EndpointDelete) Visit(v TaskVisitor) interface{}
func (EndpointDelete) VisitFake ¶
func (t EndpointDelete) VisitFake(v FakeVisitor) interface{}
type EphemeralTask ¶
type EphemeralTask interface {
Task
// contains filtered or unexported methods
}
type EventSize ¶
type EventSize = changeitem.EventSize
type FakeTask ¶
type FakeTask interface {
Task
VisitFake(visitor FakeVisitor) interface{}
// contains filtered or unexported methods
}
type FakeVisitor ¶
type FakeVisitor interface {
OnEndpointDelete(t EndpointDelete) interface{}
OnTransferCreate(t TransferCreate) interface{}
OnTransferDelete(t TransferDelete) interface{}
OnReplication(t Replication) interface{}
OnTermination(t Termination) interface{}
OnTransferVersionUpdate(t TransferVersionUpdate) interface{}
OnTransferVersionFreeze(t TransferVersionFreeze) interface{}
OnTransferVersionUnfreeze(t TransferVersionUnfreeze) interface{}
}
type FastTableSchema ¶
type FastTableSchema = changeitem.FastTableSchema
type Fetchable ¶
type Fetchable interface {
Fetch() ([]ChangeItem, error)
}
type HomoValuer ¶
type HomoValuer interface {
HomoValue() any
}
HomoValuer is the same as sql/driver.Valuer, but for homogeneous values.
type IncludeTableList ¶
type IncludeTableList interface {
Includeable
IncludeTableList() ([]TableID, error)
}
type Includeable ¶
type Includeable interface {
// Include returns true if the given table is included
Include(tID TableID) bool
}
func NewIntersectionIncludeable ¶
func NewIntersectionIncludeable(a, b Includeable) Includeable
type IncrementalStorage ¶
type IncrementalStorage interface {
GetIncrementalState(ctx context.Context, incremental []IncrementalTable) ([]TableDescription, error)
SetInitialState(tables []TableDescription, incremental []IncrementalTable)
}
type IncrementalTable ¶
type IncrementalTable struct {
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
CursorField string `yaml:"cursor_field"`
InitialState string `yaml:"initial_state"`
}
func (IncrementalTable) Initialized ¶
func (t IncrementalTable) Initialized() bool
func (IncrementalTable) TableID ¶
func (t IncrementalTable) TableID() TableID
type Kind ¶
type Kind = changeitem.Kind
type LfLineSplitter ¶
type LfLineSplitter string
const ( LfLineSplitterNewLine LfLineSplitter = "\n" LfLineSplitterDoNotSplit LfLineSplitter = "do-not-split" LfLineSplitterProtoseq LfLineSplitter = "protoseq" LfLineSplitterOtelLogsProto LfLineSplitter = "otel-logs-proto" )
type LimitedResourceRuntime ¶
type LoadProgress ¶
type LoadProgress func(current, progress, total uint64)
type LocalRuntime ¶
type LocalRuntime struct {
Host string
CurrentJob int
ShardingUpload ShardUploadParams
}
func (*LocalRuntime) CurrentJobIndex ¶
func (l *LocalRuntime) CurrentJobIndex() int
func (*LocalRuntime) IsMain ¶
func (l *LocalRuntime) IsMain() bool
func (*LocalRuntime) NeedRestart ¶
func (l *LocalRuntime) NeedRestart(runtime Runtime) bool
func (*LocalRuntime) SetVersion ¶
func (l *LocalRuntime) SetVersion(runtimeSpecificVersion string, versionProperties *string) error
func (*LocalRuntime) ThreadsNumPerWorker ¶
func (l *LocalRuntime) ThreadsNumPerWorker() int
func (*LocalRuntime) Type ¶
func (*LocalRuntime) Type() RuntimeType
func (*LocalRuntime) Validate ¶
func (l *LocalRuntime) Validate() error
func (*LocalRuntime) WithDefaults ¶
func (l *LocalRuntime) WithDefaults()
func (*LocalRuntime) WorkersNum ¶
func (l *LocalRuntime) WorkersNum() int
type LogPosition ¶
type Middleware ¶
type MonitorableSlot ¶
type Movable ¶
type Movable interface {
Sinker
// Move moves (renames) the given source table into the given destination table
Move(ctx context.Context, src, dst TableID) error
}
Movable is a sinker which can move tables. This interface allows the use of the temporator middleware.
type OldKeysType ¶
type OldKeysType = changeitem.OldKeysType
type Partition ¶
type Partition = changeitem.Partition
type PositionalStorage ¶
type PositionalStorage interface {
// Position provides info about the snapshot read position
Position(ctx context.Context) (*LogPosition, error)
}
PositionalStorage is implemented by storages that may provide a specific position for snapshot consistency.
type PropertyKey ¶
type PropertyKey = changeitem.PropertyKey
type ProviderType ¶
type ProviderType string
func (ProviderType) Name ¶
func (p ProviderType) Name() string
type Pusher ¶
type Pusher func(items []ChangeItem) error
func PusherFromAsyncSink ¶
PusherFromAsyncSink wraps the given sink into a (synchronous) pusher interface
type ReUpload ¶
type ReUpload struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (ReUpload) Visit ¶
func (t ReUpload) Visit(v TaskVisitor) interface{}
func (ReUpload) VisitRunnable ¶
func (t ReUpload) VisitRunnable(v RunnableVisitor) interface{}
type RegularSnapshot ¶
type RegularSnapshot struct {
Enabled bool `json:"Enabled" yaml:"enabled"`
Interval time.Duration `json:"Interval" yaml:"interval"`
CronExpression string `json:"CronExpression" yaml:"cron_expression"`
IncrementDelaySeconds int64 `json:"IncrementDelaySeconds" yaml:"increment_delay_seconds"`
Incremental []IncrementalTable `json:"Incremental" yaml:"incremental"`
}
type RemoveTables ¶
type RemoveTables struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (RemoveTables) Visit ¶
func (t RemoveTables) Visit(v TaskVisitor) interface{}
func (RemoveTables) VisitRunnable ¶
func (t RemoveTables) VisitRunnable(v RunnableVisitor) interface{}
type Replication ¶
type Replication struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (Replication) Visit ¶
func (t Replication) Visit(v TaskVisitor) interface{}
func (Replication) VisitFake ¶
func (t Replication) VisitFake(v FakeVisitor) interface{}
type Restart ¶
type Restart struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Restart) Visit ¶
func (t Restart) Visit(v TaskVisitor) interface{}
func (Restart) VisitRunnable ¶
func (t Restart) VisitRunnable(v RunnableVisitor) interface{}
type RetriablePartUploadError ¶
type RetriablePartUploadError = dterrors.RetriablePartUploadError
type RunnableTask ¶
type RunnableTask interface {
Task
VisitRunnable(visitor RunnableVisitor) interface{}
// contains filtered or unexported methods
}
type RunnableVisitor ¶
type RunnableVisitor interface {
OnActivate(t Activate) interface{}
OnAddTables(t AddTables) interface{}
OnUpdateTransfer(t UpdateTransfer) interface{}
OnChecksum(t Checksum) interface{}
OnDeactivate(t Deactivate) interface{}
OnCleanupResource(t CleanupResource) interface{}
OnReUpload(t ReUpload) interface{}
OnRemoveTables(t RemoveTables) interface{}
OnRestart(t Restart) interface{}
OnStart(t Start) interface{}
OnStop(t Stop) interface{}
OnUpload(t Upload) interface{}
OnVerify(t Verify) interface{}
OnTestEndpoint(t TestEndpoint) interface{}
}
type Runtime ¶
type Runtime interface {
NeedRestart(runtime Runtime) bool
WithDefaults()
Validate() error
Type() RuntimeType
SetVersion(runtimeSpecificVersion string, versionProperties *string) error
}
func NewRuntime ¶
func NewRuntime(runtime RuntimeType, runtimeSpec string) (Runtime, error)
type RuntimeType ¶
type RuntimeType string
type SampleableStorage ¶
type SampleableStorage interface {
Storage
TableSizeInBytes(table TableID) (uint64, error)
LoadTopBottomSample(table TableDescription, pusher Pusher) error
LoadRandomSample(table TableDescription, pusher Pusher) error
LoadSampleBySet(table TableDescription, keySet []map[string]interface{}, pusher Pusher) error
TableAccessible(table TableDescription) bool
}
SampleableStorage is for dataplane tests
type ScheduledTask ¶
type ScheduledTask interface {
Stop()
Runtime() Runtime
}
type SchemaStorage ¶
SchemaStorage allows resolving the DB schema from a storage.
type ShardUploadParams ¶
Parallelism params
func DefaultShardUploadParams ¶
func DefaultShardUploadParams() *ShardUploadParams
func NewShardUploadParams ¶
func NewShardUploadParams(jobCount int, processCount int) *ShardUploadParams
type ShardableTask ¶
type ShardableTask interface {
Task
// contains filtered or unexported methods
}
type ShardingContextStorage ¶
type ShardingContextStorage interface {
// ShardingContext returns shared data; it is used on the *MAIN* worker.
// Take care: the method returns OperationState_ShardedUploadState, but only fills the field Context,
// because the type of Context is private (this is a protoc thing).
ShardingContext() ([]byte, error)
// SetShardingContext for storage, used on *SECONDARY* worker
SetShardingContext(shardedState []byte) error
}
Storage has data that needs to be shared with all workers.
type ShardingStorage ¶
type ShardingStorage interface {
ShardTable(ctx context.Context, table TableDescription) ([]TableDescription, error)
}
ShardingStorage is for in-table sharding.
type ShardingTaskRuntime ¶
type SinkOption ¶
TODO: Drop by making transformers a common middleware
func GetTransformers ¶
func GetTransformers(middlewareNames []string) ([]SinkOption, error)
type Sinker ¶
type Sinker interface {
io.Closer
// Push writes the given items into destination synchronously. If its result is nil, the items are considered to be successfully written to the destination.
// The method must be retriable: it can be called again after it returns an error, except for fatal errors (these must be wrapped in a particular struct)
Push(items []ChangeItem) error
}
Sinker is the destination's data writer interface.
All its methods are guaranteed to be called non-concurrently (synchronously).
TODO: rename to Sink
type SlotKiller ¶
type SlotKiller interface {
KillSlot() error
}
func MakeStubSlotKiller ¶
func MakeStubSlotKiller() SlotKiller
type SnapshotableStorage ¶
type SourceReader ¶
type Start ¶
type Start struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Start) Visit ¶
func (t Start) Visit(v TaskVisitor) interface{}
func (Start) VisitRunnable ¶
func (t Start) VisitRunnable(v RunnableVisitor) interface{}
type Stop ¶
type Stop struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Stop) Visit ¶
func (t Stop) Visit(v TaskVisitor) interface{}
func (Stop) VisitRunnable ¶
func (t Stop) VisitRunnable(v RunnableVisitor) interface{}
type Storage ¶
type Storage interface {
Closeable
Ping() error
LoadTable(ctx context.Context, table TableDescription, pusher Pusher) error
TableSchema(ctx context.Context, table TableID) (*TableSchema, error)
TableList(filter IncludeTableList) (TableMap, error)
ExactTableRowsCount(table TableID) (uint64, error)
EstimateTableRowsCount(table TableID) (uint64, error)
TableExists(table TableID) (bool, error)
}
Storage is for simple storage implementations. For extra functionality, implement the storages below.
type StubSlotKiller ¶
type StubSlotKiller struct {
}
func (*StubSlotKiller) KillSlot ¶
func (k *StubSlotKiller) KillSlot() error
type TableColumns ¶
type TableColumns = changeitem.TableColumns
type TableDescription ¶
type TableDescription struct {
Name string
Schema string // for example, for MySQL this is the database name
Filter WhereStatement
EtaRow uint64 // estimated number of rows in the table
Offset uint64 // offset (in rows) along the ordering key (not necessarily the primary key)
}
func (*TableDescription) Fqtn ¶
func (t *TableDescription) Fqtn() string
func (*TableDescription) ID ¶
func (t *TableDescription) ID() TableID
func (*TableDescription) PartID ¶
func (t *TableDescription) PartID() string
func (*TableDescription) Same ¶
func (t *TableDescription) Same(table string) bool
func (*TableDescription) String ¶
func (t *TableDescription) String() string
type TableID ¶
type TableID = changeitem.TableID
var NonExistentTableID TableID = *NewTableID("", "")
func NewTableIDFromStringPg ¶
NewTableIDFromStringPg parses the given FQTN in PostgreSQL syntax to construct a TableID.
func ParseTableID ¶
func ParseTableIDs ¶
func TableIDsIntersection ¶
TableIDsIntersection returns an intersection of two lists of TableIDs
type TableInfo ¶
type TableInfo struct {
EtaRow uint64
IsView bool
Schema *TableSchema
}
type TableMap ¶
func (*TableMap) ConvertToTableDescriptions ¶
func (m *TableMap) ConvertToTableDescriptions() []TableDescription
func (*TableMap) FakePkeyTables ¶
func (*TableMap) NoKeysTables ¶
func (*TableMap) ToDBSchema ¶
type TablePartID ¶
type TablePartID = changeitem.TablePartID
type TableSchema ¶
type TableSchema = changeitem.TableSchema
type TableSplitter ¶
type TableSplitter struct {
Columns []string
}
type TableUploadError ¶
type TableUploadError = dterrors.TableUploadError
type Task ¶
type Task interface {
Visit(visitor TaskVisitor) interface{}
// contains filtered or unexported methods
}
type TaskType ¶
type TaskType struct {
Task
}
func (TaskType) Description ¶
func (TaskType) EncodeText ¶
func (TaskType) MarshalJSON ¶
func (*TaskType) UnmarshalJSON ¶
type TaskTypeName ¶
type TaskTypeName = string
type Termination ¶
type Termination struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (Termination) Visit ¶
func (t Termination) Visit(v TaskVisitor) interface{}
func (Termination) VisitFake ¶
func (t Termination) VisitFake(v FakeVisitor) interface{}
type TestEndpoint ¶
type TestEndpoint struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (TestEndpoint) HasTimedOut ¶
func (TestEndpoint) Visit ¶
func (t TestEndpoint) Visit(v TaskVisitor) interface{}
func (TestEndpoint) VisitRunnable ¶
func (t TestEndpoint) VisitRunnable(v RunnableVisitor) interface{}
type TestResult ¶
type TestResult struct {
Checks map[CheckType]CheckResult
Schema TableMap
Preview map[TableID][]ChangeItem
}
TestResult is the aggregated result of a test for an endpoint.
func NewTestResult ¶
func NewTestResult(checks ...CheckType) *TestResult
func (*TestResult) Add ¶
func (t *TestResult) Add(extraChecks ...CheckType)
func (*TestResult) Combine ¶
func (t *TestResult) Combine(partialResults *TestResult)
Combine combines the two checkResult maps into one
func (*TestResult) Err ¶
func (t *TestResult) Err() error
func (*TestResult) NotOk ¶
func (t *TestResult) NotOk(checkType CheckType, err error) *TestResult
func (*TestResult) Ok ¶
func (t *TestResult) Ok(checkType CheckType) *TestResult
type TimeoutableTask ¶
type TimestampCol ¶
type Transfer ¶
type Transfer interface {
// Start the transfer. Retry endlessly when errors occur, until stopped with Stop().
// This method does not block, the work is done in the background.
Start()
// Stop the transfer. May be called multiple times even without prior Start() to clean
// up external resources, e.g. terminate YT operations. Synchronous, i.e. blocks
// until either all resources are released or an error occurs.
Stop() error
Runtime() Runtime
Error() error
}
type TransferCreate ¶
type TransferCreate struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferCreate) Visit ¶
func (t TransferCreate) Visit(v TaskVisitor) interface{}
func (TransferCreate) VisitFake ¶
func (t TransferCreate) VisitFake(v FakeVisitor) interface{}
type TransferDelete ¶
type TransferDelete struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferDelete) Visit ¶
func (t TransferDelete) Visit(v TaskVisitor) interface{}
func (TransferDelete) VisitFake ¶
func (t TransferDelete) VisitFake(v FakeVisitor) interface{}
type TransferType ¶
type TransferType string
func (*TransferType) Expand ¶
func (t *TransferType) Expand() []TransferType
type TransferVersionFreeze ¶
type TransferVersionFreeze struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferVersionFreeze) Visit ¶
func (t TransferVersionFreeze) Visit(v TaskVisitor) interface{}
func (TransferVersionFreeze) VisitFake ¶
func (t TransferVersionFreeze) VisitFake(v FakeVisitor) interface{}
type TransferVersionFreezeParams ¶
type TransferVersionFreezeParams struct {
CurrentVersion string `json:"current_version"`
}
type TransferVersionUnfreeze ¶
type TransferVersionUnfreeze struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferVersionUnfreeze) Visit ¶
func (t TransferVersionUnfreeze) Visit(v TaskVisitor) interface{}
func (TransferVersionUnfreeze) VisitFake ¶
func (t TransferVersionUnfreeze) VisitFake(v FakeVisitor) interface{}
type TransferVersionUnfreezeParams ¶
type TransferVersionUnfreezeParams struct {
CurrentVersion string `json:"current_version"`
}
type TransferVersionUpdate ¶
type TransferVersionUpdate struct {
// contains filtered or unexported fields
}
Fake tasks {{{
func (TransferVersionUpdate) Visit ¶
func (t TransferVersionUpdate) Visit(v TaskVisitor) interface{}
func (TransferVersionUpdate) VisitFake ¶
func (t TransferVersionUpdate) VisitFake(v FakeVisitor) interface{}
type Transformation ¶
type Transformation interface {
MakeSinkMiddleware() SinkOption
AddTransformer(transformer Transformer) error
RuntimeOpts() TransformationRuntimeOpts
}
type TransformationRuntimeOpts ¶
type TransformationRuntimeOpts struct {
JobIndex int
}
type Transformer ¶
type Transformer interface {
Apply(input []ChangeItem) TransformerResult
Suitable(table TableID, schema *TableSchema) bool
ResultSchema(original *TableSchema) (*TableSchema, error)
Description() string
Type() TransformerType
}
type TransformerError ¶
type TransformerError struct {
Input ChangeItem
Error error
}
type TransformerResult ¶
type TransformerResult struct {
Transformed []ChangeItem
Errors []TransformerError
}
type TransformerType ¶
type TransformerType string
type TxBound ¶
type TxBound = changeitem.TxBound
type TypedChangeItem ¶
type TypedChangeItem ChangeItem
func (*TypedChangeItem) MarshalJSON ¶
func (t *TypedChangeItem) MarshalJSON() ([]byte, error)
func (*TypedChangeItem) UnmarshalJSON ¶
func (t *TypedChangeItem) UnmarshalJSON(data []byte) error
type TypedValue ¶
type UpdateTransfer ¶
type UpdateTransfer struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (UpdateTransfer) Visit ¶
func (t UpdateTransfer) Visit(v TaskVisitor) interface{}
func (UpdateTransfer) VisitRunnable ¶
func (t UpdateTransfer) VisitRunnable(v RunnableVisitor) interface{}
type UpdateTransferParams ¶
type UpdateTransferParams struct {
OldObjects []string `json:"old_objects"`
NewObjects []string `json:"new_objects"`
}
func (UpdateTransferParams) AddedTables ¶
func (p UpdateTransferParams) AddedTables() ([]TableDescription, error)
type Upload ¶
type Upload struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Upload) Visit ¶
func (t Upload) Visit(v TaskVisitor) interface{}
func (Upload) VisitRunnable ¶
func (t Upload) VisitRunnable(v RunnableVisitor) interface{}
type Verify ¶
type Verify struct {
// contains filtered or unexported fields
}
Runnable tasks {{{
func (Verify) HasTimedOut ¶
func (Verify) Visit ¶
func (t Verify) Visit(v TaskVisitor) interface{}
func (Verify) VisitRunnable ¶
func (t Verify) VisitRunnable(v RunnableVisitor) interface{}
type WhereStatement ¶
type WhereStatement string
const NoFilter WhereStatement = WhereStatement("")
func FiltersIntersection ¶
func FiltersIntersection(a WhereStatement, b WhereStatement) WhereStatement
func NotStatement ¶
func NotStatement(a WhereStatement) WhereStatement
Source Files
¶
- async_sink.go
- change_item.go
- change_item_builders.go
- closeable.go
- committable.go
- errors.go
- filter.go
- homo_valuer.go
- includeable.go
- local_runtime.go
- metrics.go
- middleware.go
- model.go
- movable.go
- operations.go
- parsers.go
- provider_type.go
- regular_snapshot.go
- restore.go
- runtime.go
- sink.go
- slot_monitor.go
- source.go
- storage.go
- strictify.go
- task_type.go
- test_result.go
- transfer.go
- transfer_type.go
- transformer.go
- type.go
- typed_change_item.go
- validator.go