ingest

package
v0.0.0-...-503c688 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2023 · License: Apache-2.0 · Imports: 41 · Imported by: 0

Documentation

Index

Constants

View Source
const (
	JobCheckpointVersionCurrent = JobCheckpointVersion1
	JobCheckpointVersion1       = 1
)

JobCheckpointVersionCurrent is the current version of the checkpoint.

View Source
const (
	LitErrAllocMemFail      string = "[ddl-ingest] allocate memory failed"
	LitErrCreateDirFail     string = "[ddl-ingest] create ingest sort path error"
	LitErrStatDirFail       string = "[ddl-ingest] stat ingest sort path error"
	LitErrDeleteDirFail     string = "[ddl-ingest] delete ingest sort path error"
	LitErrCreateBackendFail string = "[ddl-ingest] build ingest backend failed"
	LitErrGetBackendFail    string = "[ddl-ingest] cannot get ingest backend"
	LitErrCreateEngineFail  string = "[ddl-ingest] build ingest engine failed"
	LitErrCreateContextFail string = "[ddl-ingest] build ingest writer context failed"
	LitErrGetEngineFail     string = "[ddl-ingest] can not get ingest engine info"
	LitErrGetStorageQuota   string = "[ddl-ingest] get storage quota error"
	LitErrCloseEngineErr    string = "[ddl-ingest] close engine error"
	LitErrCleanEngineErr    string = "[ddl-ingest] clean engine error"
	LitErrFlushEngineErr    string = "[ddl-ingest] flush engine data err"
	LitErrIngestDataErr     string = "[ddl-ingest] ingest data into storage error"
	LitErrRemoteDupExistErr string = "[ddl-ingest] remote duplicate index key exist"
	LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than ingest limit"
	LitErrUpdateDiskStats   string = "[ddl-ingest] update disk usage error"
	LitWarnEnvInitFail      string = "[ddl-ingest] initialize environment failed"
	LitWarnConfigError      string = "[ddl-ingest] build config for backend failed"
	LitInfoEnvInitSucc      string = "[ddl-ingest] init global ingest backend environment finished"
	LitInfoSortDir          string = "[ddl-ingest] the ingest sorted directory"
	LitInfoCreateBackend    string = "[ddl-ingest] create one backend for an DDL job"
	LitInfoCloseBackend     string = "[ddl-ingest] close one backend for DDL job"
	LitInfoOpenEngine       string = "[ddl-ingest] open an engine for index reorg task"
	LitInfoAddWriter        string = "[ddl-ingest] reuse engine and add a writer for index reorg task"
	LitInfoCreateWrite      string = "[ddl-ingest] create one local writer for index reorg task"
	LitInfoCloseEngine      string = "[ddl-ingest] flush all writer and get closed engine"
	LitInfoRemoteDupCheck   string = "[ddl-ingest] start remote duplicate checking"
	LitInfoStartImport      string = "[ddl-ingest] start to import data"
	LitInfoChgMemSetting    string = "[ddl-ingest] change memory setting for ingest"
	LitInfoInitMemSetting   string = "[ddl-ingest] initial memory setting for ingest"
	LitInfoUnsafeImport     string = "[ddl-ingest] do a partial import data into the storage"
	LitErrCloseWriterErr    string = "[ddl-ingest] close writer error"
)

Message text constants for DDL ingest errors, warnings, and informational messages.

Variables

View Source
var (
	// LitBackCtxMgr is the entry for the lightning backfill process.
	LitBackCtxMgr BackendCtxMgr
	// LitMemRoot is used to track the memory usage of the lightning backfill process.
	LitMemRoot MemRoot
	// LitDiskRoot is used to track the disk usage of the lightning backfill process.
	LitDiskRoot DiskRoot
	// LitRLimit is the max open file number of the lightning backfill process.
	LitRLimit uint64
	// LitSortPath is the sort path for the lightning backfill process.
	LitSortPath string
	// LitInitialized is the flag that indicates whether the lightning backfill process is initialized.
	LitInitialized bool
)
View Source
var (
	// StructSizeBackendCtx is the size of litBackendCtx.
	StructSizeBackendCtx int64
	// StructSizeEngineInfo is the size of engineInfo.
	StructSizeEngineInfo int64
	// StructSizeWriterCtx is the size of writerContext.
	StructSizeWriterCtx int64
)
View Source
var GenLightningDataDirForTest = genLightningDataDir

GenLightningDataDirForTest is only used in tests.

View Source
var ImporterRangeConcurrencyForTest *atomic.Int32

ImporterRangeConcurrencyForTest is only used in tests.

Functions

func ConfigSortPath

func ConfigSortPath() string

ConfigSortPath returns the sort path for lightning.

func DecodeBackendTag

func DecodeBackendTag(name string) (int64, error)

DecodeBackendTag decodes a backend tag back into the job ID.

func EncodeBackendTag

func EncodeBackendTag(jobID int64) string

EncodeBackendTag encodes the job ID into a backend tag. The backend tag is also used as the file name of the local index data files.

func InitGlobalLightningEnv

func InitGlobalLightningEnv()

InitGlobalLightningEnv initializes the Lightning backfill environment.

func InitInstanceAddr

func InitInstanceAddr() string

InitInstanceAddr returns a string formed by concatenating the instance address and the temp-dir.

func NewMemRootImpl

func NewMemRootImpl(maxQuota int64, bcCtxMgr *litBackendCtxMgr) *memRootImpl

NewMemRootImpl creates a new memRootImpl.

func RiskOfDiskFull

func RiskOfDiskFull(available, capacity uint64) bool

RiskOfDiskFull checks whether the available disk space is less than 10% of the disk capacity.

Types

type BackendCtx

type BackendCtx interface {
	Register(jobID, indexID int64, schemaName, tableName string) (Engine, error)
	Unregister(jobID, indexID int64)

	CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error
	FinishImport(indexID int64, unique bool, tbl table.Table) error
	ResetWorkers(jobID, indexID int64)
	Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error)
	Done() bool
	SetDone()

	AttachCheckpointManager(*CheckpointManager)
	GetCheckpointManager() *CheckpointManager
}

BackendCtx is the backend context for an add-index reorg task.

type BackendCtxMgr

type BackendCtxMgr interface {
	CheckAvailable() (bool, error)
	Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error)
	Unregister(jobID int64)
	Load(jobID int64) (BackendCtx, bool)
}

BackendCtxMgr is used to manage the backend context.

type CheckpointManager

type CheckpointManager struct {
	// contains filtered or unexported fields
}

CheckpointManager is a checkpoint manager implementation that is used by non-distributed reorganization.

func NewCheckpointManager

func NewCheckpointManager(ctx context.Context, flushCtrl FlushController,
	sessPool *sess.Pool, jobID, indexID int64) (*CheckpointManager, error)

NewCheckpointManager creates a new checkpoint manager.

func (*CheckpointManager) Close

func (s *CheckpointManager) Close()

Close closes the checkpoint manager.

func (*CheckpointManager) IsComplete

func (s *CheckpointManager) IsComplete(end kv.Key) bool

IsComplete checks if the task is complete. This is called before the reader reads the data and decides whether to skip the current task.

func (*CheckpointManager) Register

func (s *CheckpointManager) Register(taskID int, end kv.Key)

Register registers a new task.

func (*CheckpointManager) Reset

func (s *CheckpointManager) Reset(newPhysicalID int64, start, end kv.Key)

Reset resets the checkpoint manager between two partitions.

func (*CheckpointManager) Status

func (s *CheckpointManager) Status() (int, kv.Key)

Status returns the status of the checkpoint.

func (*CheckpointManager) Sync

func (s *CheckpointManager) Sync()

Sync syncs the checkpoint.

func (*CheckpointManager) UpdateCurrent

func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error

UpdateCurrent updates the current keys of the task. This is called by the writer after writing to the local engine to update the current number of rows written.

func (*CheckpointManager) UpdateTotal

func (s *CheckpointManager) UpdateTotal(taskID int, added int, last bool)

UpdateTotal updates the total keys of the task. This is called by the reader after reading the data to update the number of rows contained in the current chunk.

type Config

type Config struct {
	Lightning    *lightning.Config
	KeyspaceName string
}

Config is the configuration for the lightning local backend used in DDL.

type DiskRoot

type DiskRoot interface {
	UpdateUsage()
	ShouldImport() bool
	UsageInfo() string
	PreCheckUsage() error
	StartupCheck() error
}

DiskRoot is used to track the disk usage for the lightning backfill process.

func NewDiskRootImpl

func NewDiskRootImpl(path string, bcCtx *litBackendCtxMgr) DiskRoot

NewDiskRootImpl creates a new DiskRoot.

type Engine

type Engine interface {
	Flush() error
	ImportAndClean() error
	Clean()
	CreateWriter(id int, unique bool) (Writer, error)
}

Engine is the interface for the engine that can be used to write key-value pairs.

type FlushController

type FlushController interface {
	Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error)
}

FlushController is an interface to control the flush of the checkpoint.

type FlushMode

type FlushMode byte

FlushMode is used to control how to flush.

const (
	// FlushModeAuto means flush when the memory table size reaches the threshold.
	FlushModeAuto FlushMode = iota
	// FlushModeForceLocal means flush all data to local storage.
	FlushModeForceLocal
	// FlushModeForceLocalAndCheckDiskQuota means flush all data to local storage and check disk quota.
	FlushModeForceLocalAndCheckDiskQuota
	// FlushModeForceGlobal means import all data in local storage to global storage.
	FlushModeForceGlobal
)

type JobReorgMeta

type JobReorgMeta struct {
	Checkpoint *ReorgCheckpoint `json:"reorg_checkpoint"`
}

JobReorgMeta is the metadata for a reorg job.

type MemRoot

type MemRoot interface {
	Consume(size int64)
	Release(size int64)
	CheckConsume(size int64) bool
	ConsumeWithTag(tag string, size int64)
	ReleaseWithTag(tag string)

	SetMaxMemoryQuota(quota int64)
	MaxMemoryQuota() int64
	CurrentUsage() int64
	CurrentUsageWithTag(tag string) int64
	RefreshConsumption()
}

MemRoot is used to track the memory usage for the lightning backfill process.

type MockBackendCtx

type MockBackendCtx struct {
	// contains filtered or unexported fields
}

MockBackendCtx is a mock backend context.

func (*MockBackendCtx) AttachCheckpointManager

func (m *MockBackendCtx) AttachCheckpointManager(mgr *CheckpointManager)

AttachCheckpointManager attaches a checkpoint manager to the backend context.

func (*MockBackendCtx) CollectRemoteDuplicateRows

func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) error

CollectRemoteDuplicateRows implements BackendCtx.CollectRemoteDuplicateRows interface.

func (*MockBackendCtx) Done

func (*MockBackendCtx) Done() bool

Done implements BackendCtx.Done interface.

func (*MockBackendCtx) FinishImport

func (*MockBackendCtx) FinishImport(indexID int64, _ bool, _ table.Table) error

FinishImport implements BackendCtx.FinishImport interface.

func (*MockBackendCtx) Flush

func (*MockBackendCtx) Flush(_ int64, _ FlushMode) (flushed bool, imported bool, err error)

Flush implements BackendCtx.Flush interface.

func (*MockBackendCtx) GetCheckpointManager

func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager

GetCheckpointManager returns the checkpoint manager attached to the backend context.

func (*MockBackendCtx) Register

func (m *MockBackendCtx) Register(jobID, indexID int64, _, _ string) (Engine, error)

Register implements BackendCtx.Register interface.

func (*MockBackendCtx) ResetWorkers

func (*MockBackendCtx) ResetWorkers(_, _ int64)

ResetWorkers implements BackendCtx.ResetWorkers interface.

func (*MockBackendCtx) SetDone

func (*MockBackendCtx) SetDone()

SetDone implements BackendCtx.SetDone interface.

func (*MockBackendCtx) Unregister

func (*MockBackendCtx) Unregister(jobID, indexID int64)

Unregister implements BackendCtx.Unregister interface.

type MockBackendCtxMgr

type MockBackendCtxMgr struct {
	// contains filtered or unexported fields
}

MockBackendCtxMgr is a mock backend context manager.

func NewMockBackendCtxMgr

func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBackendCtxMgr

NewMockBackendCtxMgr creates a new mock backend context manager.

func (*MockBackendCtxMgr) CheckAvailable

func (*MockBackendCtxMgr) CheckAvailable() (bool, error)

CheckAvailable implements BackendCtxMgr.CheckAvailable interface.

func (*MockBackendCtxMgr) Load

func (m *MockBackendCtxMgr) Load(jobID int64) (BackendCtx, bool)

Load implements BackendCtxMgr.Load interface.

func (*MockBackendCtxMgr) Register

func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error)

Register implements BackendCtxMgr.Register interface.

func (*MockBackendCtxMgr) Unregister

func (m *MockBackendCtxMgr) Unregister(jobID int64)

Unregister implements BackendCtxMgr.Unregister interface.

type MockEngineInfo

type MockEngineInfo struct {
	// contains filtered or unexported fields
}

MockEngineInfo is a mock engine info.

func (*MockEngineInfo) Clean

func (*MockEngineInfo) Clean()

Clean implements Engine.Clean interface.

func (*MockEngineInfo) CreateWriter

func (m *MockEngineInfo) CreateWriter(id int, _ bool) (Writer, error)

CreateWriter implements Engine.CreateWriter interface.

func (*MockEngineInfo) Flush

func (*MockEngineInfo) Flush() error

Flush implements Engine.Flush interface.

func (*MockEngineInfo) ImportAndClean

func (*MockEngineInfo) ImportAndClean() error

ImportAndClean implements Engine.ImportAndClean interface.

type MockWriter

type MockWriter struct {
	// contains filtered or unexported fields
}

MockWriter is a mock writer.

func (*MockWriter) LockForWrite

func (*MockWriter) LockForWrite() func()

LockForWrite implements Writer.LockForWrite interface.

func (*MockWriter) WriteRow

func (m *MockWriter) WriteRow(key, idxVal []byte, _ kv.Handle) error

WriteRow implements Writer.WriteRow interface.

type ReorgCheckpoint

type ReorgCheckpoint struct {
	LocalSyncKey   kv.Key `json:"local_sync_key"`
	LocalKeyCount  int    `json:"local_key_count"`
	GlobalSyncKey  kv.Key `json:"global_sync_key"`
	GlobalKeyCount int    `json:"global_key_count"`
	InstanceAddr   string `json:"instance_addr"`

	PhysicalID int64  `json:"physical_id"`
	StartKey   kv.Key `json:"start_key"`
	EndKey     kv.Key `json:"end_key"`

	Version int64 `json:"version"`
}

ReorgCheckpoint is the checkpoint for a reorg job.

type TaskCheckpoint

type TaskCheckpoint struct {
	// contains filtered or unexported fields
}

TaskCheckpoint is the checkpoint for a single task.

type Writer

type Writer interface {
	WriteRow(key, idxVal []byte, handle tidbkv.Handle) error
	LockForWrite() (unlock func())
}

Writer is the interface for the writer that can be used to write key-value pairs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL