Package importv2
Version: v0.0.0-...-34e0b2d (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2024 | License: Apache-2.0 | Imports: 27 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ImportTaskTypeName = map[TaskType]string{
	0: "PreImportTask",
	1: "ImportTask",
	2: "L0PreImportTaskType",
	3: "L0ImportTaskType",
}

Functions

func AppendSystemFieldsData

func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData) error

func CheckRowsEqual

func CheckRowsEqual(schema *schemapb.CollectionSchema, data *storage.InsertData) error

func GetDeleteStats

func GetDeleteStats(task Task, delData *storage.DeleteData) (map[string]*datapb.PartitionImportStats, error)

func GetExecPool

func GetExecPool() *conc.Pool[any]

func GetInsertDataRowCount

func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.CollectionSchema) int

func GetRowsStats

func GetRowsStats(task Task, rows *storage.InsertData) (map[string]*datapb.PartitionImportStats, error)

func HashDeleteData

func HashDeleteData(task Task, delData *storage.DeleteData) ([]*storage.DeleteData, error)

func LogStats

func LogStats(manager TaskManager)

func MergeHashedStats

func MergeHashedStats(src, dst map[string]*datapb.PartitionImportStats)

func NewImportSegmentInfo

func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error)

func NewMetaCache

func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache

func NewSyncTask

func NewSyncTask(ctx context.Context,
	allocator allocator.Interface,
	metaCaches map[string]metacache.MetaCache,
	ts uint64,
	segmentID, partitionID, collectionID int64, vchannel string,
	insertData *storage.InsertData,
	deleteData *storage.DeleteData,
) (syncmgr.Task, error)

func PickSegment

func PickSegment(segments []*datapb.ImportRequestSegment, vchannel string, partitionID int64) int64

func UnsetAutoID

func UnsetAutoID(schema *schemapb.CollectionSchema)

func WrapLogFields

func WrapLogFields(task Task, fields ...zap.Field) []zap.Field

func WrapTaskNotFoundError

func WrapTaskNotFoundError(taskID int64) error

Types

type HashedData

type HashedData [][]*storage.InsertData // [vchannelIndex][partitionIndex]*storage.InsertData

func HashData

func HashData(task Task, rows *storage.InsertData) (HashedData, error)

type ImportTask

type ImportTask struct {
	*datapb.ImportTaskV2
	// contains filtered or unexported fields
}

func (*ImportTask) Cancel

func (t *ImportTask) Cancel()

func (*ImportTask) Clone

func (t *ImportTask) Clone() Task

func (*ImportTask) Execute

func (t *ImportTask) Execute() []*conc.Future[any]

func (*ImportTask) GetPartitionIDs

func (t *ImportTask) GetPartitionIDs() []int64

func (*ImportTask) GetSchema

func (t *ImportTask) GetSchema() *schemapb.CollectionSchema

func (*ImportTask) GetSegmentsInfo

func (t *ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo

func (*ImportTask) GetType

func (t *ImportTask) GetType() TaskType

func (*ImportTask) GetVchannels

func (t *ImportTask) GetVchannels() []string

type L0ImportTask

type L0ImportTask struct {
	*datapb.ImportTaskV2
	// contains filtered or unexported fields
}

func (*L0ImportTask) Cancel

func (t *L0ImportTask) Cancel()

func (*L0ImportTask) Clone

func (t *L0ImportTask) Clone() Task

func (*L0ImportTask) Execute

func (t *L0ImportTask) Execute() []*conc.Future[any]

func (*L0ImportTask) GetPartitionIDs

func (t *L0ImportTask) GetPartitionIDs() []int64

func (*L0ImportTask) GetSchema

func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema

func (*L0ImportTask) GetSegmentsInfo

func (t *L0ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo

func (*L0ImportTask) GetType

func (t *L0ImportTask) GetType() TaskType

func (*L0ImportTask) GetVchannels

func (t *L0ImportTask) GetVchannels() []string

type L0PreImportTask

type L0PreImportTask struct {
	*datapb.PreImportTask
	// contains filtered or unexported fields
}

func (*L0PreImportTask) Cancel

func (t *L0PreImportTask) Cancel()

func (*L0PreImportTask) Clone

func (t *L0PreImportTask) Clone() Task

func (*L0PreImportTask) Execute

func (t *L0PreImportTask) Execute() []*conc.Future[any]

func (*L0PreImportTask) GetPartitionIDs

func (t *L0PreImportTask) GetPartitionIDs() []int64

func (*L0PreImportTask) GetSchema

func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema

func (*L0PreImportTask) GetType

func (t *L0PreImportTask) GetType() TaskType

func (*L0PreImportTask) GetVchannels

func (t *L0PreImportTask) GetVchannels() []string

type PreImportTask

type PreImportTask struct {
	*datapb.PreImportTask
	// contains filtered or unexported fields
}

func (*PreImportTask) Cancel

func (t *PreImportTask) Cancel()

func (*PreImportTask) Clone

func (t *PreImportTask) Clone() Task

func (*PreImportTask) Execute

func (t *PreImportTask) Execute() []*conc.Future[any]

func (*PreImportTask) GetPartitionIDs

func (t *PreImportTask) GetPartitionIDs() []int64

func (*PreImportTask) GetSchema

func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema

func (*PreImportTask) GetType

func (t *PreImportTask) GetType() TaskType

func (*PreImportTask) GetVchannels

func (t *PreImportTask) GetVchannels() []string

type Scheduler

type Scheduler interface {
	Start()
	Slots() int64
	Close()
}

func NewScheduler

func NewScheduler(manager TaskManager) Scheduler

type Task

type Task interface {
	Execute() []*conc.Future[any]
	GetJobID() int64
	GetTaskID() int64
	GetCollectionID() int64
	GetPartitionIDs() []int64
	GetVchannels() []string
	GetType() TaskType
	GetState() datapb.ImportTaskStateV2
	GetReason() string
	GetSchema() *schemapb.CollectionSchema
	Cancel()
	Clone() Task
}

func NewImportTask

func NewImportTask(req *datapb.ImportRequest,
	manager TaskManager,
	syncMgr syncmgr.SyncManager,
	cm storage.ChunkManager,
) Task

func NewL0ImportTask

func NewL0ImportTask(req *datapb.ImportRequest,
	manager TaskManager,
	syncMgr syncmgr.SyncManager,
	cm storage.ChunkManager,
) Task

func NewL0PreImportTask

func NewL0PreImportTask(req *datapb.PreImportRequest,
	manager TaskManager,
	cm storage.ChunkManager,
) Task

func NewPreImportTask

func NewPreImportTask(req *datapb.PreImportRequest,
	manager TaskManager,
	cm storage.ChunkManager,
) Task

type TaskFilter

type TaskFilter func(task Task) bool

func WithStates

func WithStates(states ...datapb.ImportTaskStateV2) TaskFilter

func WithType

func WithType(taskType TaskType) TaskFilter

type TaskManager

type TaskManager interface {
	Add(task Task)
	Update(taskID int64, actions ...UpdateAction)
	Get(taskID int64) Task
	GetBy(filters ...TaskFilter) []Task
	Remove(taskID int64)
}

func NewTaskManager

func NewTaskManager() TaskManager

type TaskType

type TaskType int

const (
	PreImportTaskType   TaskType = 0
	ImportTaskType      TaskType = 1
	L0PreImportTaskType TaskType = 2
	L0ImportTaskType    TaskType = 3
)

func (TaskType) String

func (t TaskType) String() string

type UpdateAction

type UpdateAction func(task Task)

func UpdateFileStat

func UpdateFileStat(idx int, fileStat *datapb.ImportFileStats) UpdateAction

func UpdateReason

func UpdateReason(reason string) UpdateAction

func UpdateSegmentInfo

func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction

func UpdateState

func UpdateState(state datapb.ImportTaskStateV2) UpdateAction

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL