Documentation ¶
Index ¶
- Variables
- func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData) error
- func CheckRowsEqual(schema *schemapb.CollectionSchema, data *storage.InsertData) error
- func GetDeleteStats(task Task, delData *storage.DeleteData) (map[string]*datapb.PartitionImportStats, error)
- func GetExecPool() *conc.Pool[any]
- func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.CollectionSchema) int
- func GetRowsStats(task Task, rows *storage.InsertData) (map[string]*datapb.PartitionImportStats, error)
- func HashDeleteData(task Task, delData *storage.DeleteData) ([]*storage.DeleteData, error)
- func LogStats(manager TaskManager)
- func MergeHashedStats(src, dst map[string]*datapb.PartitionImportStats)
- func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error)
- func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache
- func NewSyncTask(ctx context.Context, allocator allocator.Interface, ...) (syncmgr.Task, error)
- func PickSegment(segments []*datapb.ImportRequestSegment, vchannel string, partitionID int64) int64
- func UnsetAutoID(schema *schemapb.CollectionSchema)
- func WrapLogFields(task Task, fields ...zap.Field) []zap.Field
- func WrapTaskNotFoundError(taskID int64) error
- type HashedData
- type ImportTask
- func (t *ImportTask) Cancel()
- func (t *ImportTask) Clone() Task
- func (t *ImportTask) Execute() []*conc.Future[any]
- func (t *ImportTask) GetPartitionIDs() []int64
- func (t *ImportTask) GetSchema() *schemapb.CollectionSchema
- func (t *ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo
- func (t *ImportTask) GetType() TaskType
- func (t *ImportTask) GetVchannels() []string
- type L0ImportTask
- func (t *L0ImportTask) Cancel()
- func (t *L0ImportTask) Clone() Task
- func (t *L0ImportTask) Execute() []*conc.Future[any]
- func (t *L0ImportTask) GetPartitionIDs() []int64
- func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema
- func (t *L0ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo
- func (t *L0ImportTask) GetType() TaskType
- func (t *L0ImportTask) GetVchannels() []string
- type L0PreImportTask
- func (t *L0PreImportTask) Cancel()
- func (t *L0PreImportTask) Clone() Task
- func (t *L0PreImportTask) Execute() []*conc.Future[any]
- func (t *L0PreImportTask) GetPartitionIDs() []int64
- func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema
- func (t *L0PreImportTask) GetType() TaskType
- func (t *L0PreImportTask) GetVchannels() []string
- type PreImportTask
- func (t *PreImportTask) Cancel()
- func (t *PreImportTask) Clone() Task
- func (t *PreImportTask) Execute() []*conc.Future[any]
- func (t *PreImportTask) GetPartitionIDs() []int64
- func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema
- func (t *PreImportTask) GetType() TaskType
- func (t *PreImportTask) GetVchannels() []string
- type Scheduler
- type Task
- func NewImportTask(req *datapb.ImportRequest, manager TaskManager, syncMgr syncmgr.SyncManager, ...) Task
- func NewL0ImportTask(req *datapb.ImportRequest, manager TaskManager, syncMgr syncmgr.SyncManager, ...) Task
- func NewL0PreImportTask(req *datapb.PreImportRequest, manager TaskManager, cm storage.ChunkManager) Task
- func NewPreImportTask(req *datapb.PreImportRequest, manager TaskManager, cm storage.ChunkManager) Task
- type TaskFilter
- type TaskManager
- type TaskType
- type UpdateAction
Constants ¶
This section is empty.
Variables ¶
View Source
var ImportTaskTypeName = map[TaskType]string{
0: "PreImportTask",
1: "ImportTask",
2: "L0PreImportTaskType",
3: "L0ImportTaskType",
}
Functions ¶
func AppendSystemFieldsData ¶
func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData) error
func CheckRowsEqual ¶
func CheckRowsEqual(schema *schemapb.CollectionSchema, data *storage.InsertData) error
func GetDeleteStats ¶
func GetDeleteStats(task Task, delData *storage.DeleteData) (map[string]*datapb.PartitionImportStats, error)
func GetExecPool ¶
func GetInsertDataRowCount ¶
func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.CollectionSchema) int
func GetRowsStats ¶
func GetRowsStats(task Task, rows *storage.InsertData) (map[string]*datapb.PartitionImportStats, error)
func HashDeleteData ¶
func HashDeleteData(task Task, delData *storage.DeleteData) ([]*storage.DeleteData, error)
func LogStats ¶
func LogStats(manager TaskManager)
func MergeHashedStats ¶
func MergeHashedStats(src, dst map[string]*datapb.PartitionImportStats)
func NewImportSegmentInfo ¶
func NewMetaCache ¶
func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache
func NewSyncTask ¶
func PickSegment ¶
func PickSegment(segments []*datapb.ImportRequestSegment, vchannel string, partitionID int64) int64
func UnsetAutoID ¶
func UnsetAutoID(schema *schemapb.CollectionSchema)
func WrapTaskNotFoundError ¶
Types ¶
type HashedData ¶
type HashedData [][]*storage.InsertData // [vchannelIndex][partitionIndex]*storage.InsertData
func HashData ¶
func HashData(task Task, rows *storage.InsertData) (HashedData, error)
type ImportTask ¶
type ImportTask struct { *datapb.ImportTaskV2 // contains filtered or unexported fields }
func (*ImportTask) Cancel ¶
func (t *ImportTask) Cancel()
func (*ImportTask) Clone ¶
func (t *ImportTask) Clone() Task
func (*ImportTask) GetPartitionIDs ¶
func (t *ImportTask) GetPartitionIDs() []int64
func (*ImportTask) GetSchema ¶
func (t *ImportTask) GetSchema() *schemapb.CollectionSchema
func (*ImportTask) GetSegmentsInfo ¶
func (t *ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo
func (*ImportTask) GetType ¶
func (t *ImportTask) GetType() TaskType
func (*ImportTask) GetVchannels ¶
func (t *ImportTask) GetVchannels() []string
type L0ImportTask ¶
type L0ImportTask struct { *datapb.ImportTaskV2 // contains filtered or unexported fields }
func (*L0ImportTask) Cancel ¶
func (t *L0ImportTask) Cancel()
func (*L0ImportTask) Clone ¶
func (t *L0ImportTask) Clone() Task
func (*L0ImportTask) GetPartitionIDs ¶
func (t *L0ImportTask) GetPartitionIDs() []int64
func (*L0ImportTask) GetSchema ¶
func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema
func (*L0ImportTask) GetSegmentsInfo ¶
func (t *L0ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo
func (*L0ImportTask) GetType ¶
func (t *L0ImportTask) GetType() TaskType
func (*L0ImportTask) GetVchannels ¶
func (t *L0ImportTask) GetVchannels() []string
type L0PreImportTask ¶
type L0PreImportTask struct { *datapb.PreImportTask // contains filtered or unexported fields }
func (*L0PreImportTask) Cancel ¶
func (t *L0PreImportTask) Cancel()
func (*L0PreImportTask) Clone ¶
func (t *L0PreImportTask) Clone() Task
func (*L0PreImportTask) GetPartitionIDs ¶
func (t *L0PreImportTask) GetPartitionIDs() []int64
func (*L0PreImportTask) GetSchema ¶
func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema
func (*L0PreImportTask) GetType ¶
func (t *L0PreImportTask) GetType() TaskType
func (*L0PreImportTask) GetVchannels ¶
func (t *L0PreImportTask) GetVchannels() []string
type PreImportTask ¶
type PreImportTask struct { *datapb.PreImportTask // contains filtered or unexported fields }
func (*PreImportTask) Cancel ¶
func (t *PreImportTask) Cancel()
func (*PreImportTask) Clone ¶
func (t *PreImportTask) Clone() Task
func (*PreImportTask) GetPartitionIDs ¶
func (t *PreImportTask) GetPartitionIDs() []int64
func (*PreImportTask) GetSchema ¶
func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema
func (*PreImportTask) GetType ¶
func (t *PreImportTask) GetType() TaskType
func (*PreImportTask) GetVchannels ¶
func (t *PreImportTask) GetVchannels() []string
type Scheduler ¶
type Scheduler interface { Start() Slots() int64 Close() }
func NewScheduler ¶
func NewScheduler(manager TaskManager) Scheduler
type Task ¶
type Task interface { Execute() []*conc.Future[any] GetJobID() int64 GetTaskID() int64 GetCollectionID() int64 GetPartitionIDs() []int64 GetVchannels() []string GetType() TaskType GetState() datapb.ImportTaskStateV2 GetReason() string GetSchema() *schemapb.CollectionSchema Cancel() Clone() Task }
func NewImportTask ¶
func NewImportTask(req *datapb.ImportRequest, manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager, ) Task
func NewL0ImportTask ¶
func NewL0ImportTask(req *datapb.ImportRequest, manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager, ) Task
func NewL0PreImportTask ¶
func NewL0PreImportTask(req *datapb.PreImportRequest, manager TaskManager, cm storage.ChunkManager, ) Task
func NewPreImportTask ¶
func NewPreImportTask(req *datapb.PreImportRequest, manager TaskManager, cm storage.ChunkManager, ) Task
type TaskFilter ¶
func WithStates ¶
func WithStates(states ...datapb.ImportTaskStateV2) TaskFilter
func WithType ¶
func WithType(taskType TaskType) TaskFilter
type TaskManager ¶
type TaskManager interface { Add(task Task) Update(taskID int64, actions ...UpdateAction) Get(taskID int64) Task GetBy(filters ...TaskFilter) []Task Remove(taskID int64) }
func NewTaskManager ¶
func NewTaskManager() TaskManager
type UpdateAction ¶
type UpdateAction func(task Task)
func UpdateFileStat ¶
func UpdateFileStat(idx int, fileStat *datapb.ImportFileStats) UpdateAction
func UpdateReason ¶
func UpdateReason(reason string) UpdateAction
func UpdateSegmentInfo ¶
func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction
func UpdateState ¶
func UpdateState(state datapb.ImportTaskStateV2) UpdateAction
Click to show internal directories.
Click to hide internal directories.