storage

package
v0.0.0-...-34e0b2d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2024 License: Apache-2.0 Imports: 63 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Ts is blob key "ts"
	Ts = "ts"
	// DDL is blob key "ddl"
	DDL = "ddl"
	// IndexParamsKey is blob key "indexParams"
	IndexParamsKey = "indexParams"
)
View Source
const (
	CloudProviderGCP     = "gcp"
	CloudProviderAWS     = "aws"
	CloudProviderAliyun  = "aliyun"
	CloudProviderAzure   = "azure"
	CloudProviderTencent = "tencent"
)
View Source
const InvalidUniqueID = UniqueID(-1)

InvalidUniqueID is used when the UniqueID is not set (like in return with err)

View Source
const (
	// MagicNumber used in binlog
	MagicNumber int32 = 0xfffabc
)
View Source
const MultiField = "MULTI_FIELD"

mark useMultiFieldFormat if there are multi fields in a log file

Variables

View Source
var (
	// ErrNoMoreRecord is the error that the iterator does not have next record.
	ErrNoMoreRecord = errors.New("no more record")
	// ErrDisposed is the error that the iterator is disposed.
	ErrDisposed = errors.New("iterator is disposed")
)
View Source
var (
	BitMask        = [8]byte{1, 2, 4, 8, 16, 32, 64, 128}
	FlippedBitMask = [8]byte{254, 253, 251, 247, 239, 223, 191, 127}
)
View Source
var CheckBucketRetryAttempts uint = 20

Functions

func AddFieldDataToPayload

func AddFieldDataToPayload(eventWriter *insertEventWriter, dataType schemapb.DataType, singleData FieldData) error

func AddInsertData

func AddInsertData(dataType schemapb.DataType, data interface{}, insertData *InsertData, fieldID int64, rowNum int, eventReader *EventReader, dim int, validData []bool) (dataLength int, err error)

func FindPartitionStatsMaxVersion

func FindPartitionStatsMaxVersion(filePaths []string) (int64, string)

func GetDimFromParams

func GetDimFromParams(params []*commonpb.KeyValuePair) (int, error)

GetDimFromParams get dim from params.

func GetFilesSize

func GetFilesSize(ctx context.Context, paths []string, cm ChunkManager) (int64, error)

func ListAllChunkWithPrefix

func ListAllChunkWithPrefix(ctx context.Context, manager ChunkManager, prefix string, recursive bool) ([]string, []time.Time, error)

ListAllChunkWithPrefix is a helper function to list all objects with same @prefix by using `ListWithPrefix`. `ListWithPrefix` is more efficient way to call if you don't need all chunk at same time.

func Locations

func Locations(pk PrimaryKey, k uint, bfType bloomfilter.BFType) []uint64

Locations returns a list of hash locations representing a data item.

func MergeFieldData

func MergeFieldData(data *InsertData, fid FieldID, field FieldData)

MergeFieldData merge field into data.

func MergeInsertData

func MergeInsertData(buffer *InsertData, datas ...*InsertData)

MergeInsertData append the insert datas to the original buffer.

func Min

func Min(a, b int64) int64

func NewBaseDescriptorEvent

func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int64) *descriptorEvent

func NewBinlogStreamWriters

func NewBinlogStreamWriters(collectionID, partitionID, segmentID UniqueID,
	schema []*schemapb.FieldSchema,
) map[FieldID]*BinlogStreamWriter

func Open

func Open(filepath string) (*os.File, error)

Open opens file as os.Open works, also converts the os errors to Milvus errors

func ParseInt64s2IDs

func ParseInt64s2IDs(pks ...int64) *schemapb.IDs

func ParsePrimaryKeys2IDs

func ParsePrimaryKeys2IDs(pks []PrimaryKey) *schemapb.IDs

func PrintBinlogFiles

func PrintBinlogFiles(fileList []string) error

PrintBinlogFiles call printBinlogFile in turn for the file list specified by parameter fileList. Return an error early if it encounters any error.

func ReadBinary

func ReadBinary(reader io.Reader, receiver interface{}, dataType schemapb.DataType)

ReadBinary read data in bytes and write it into receiver.

The receiver can be any type in int8, int16, int32, int64, float32, float64 and bool
ReadBinary uses LittleEndian ByteOrder.

func ReadData

func ReadData[T any, E interface {
	Value(int) T
	NullBitmapBytes() []byte
}](reader *file.Reader, value []T, validData []bool, numRows int64) (int64, error)

func ReadDataFromAllRowGroups

func ReadDataFromAllRowGroups[T any, E interface {
	ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
}](reader *file.Reader, values []T, columnIdx int, numRows int64) (int64, error)

ReadDataFromAllRowGroups iterates all row groups of file.Reader, and convert column to E. then calls ReadBatch with provided parameters.

func ReadDescriptorEvent

func ReadDescriptorEvent(buffer io.Reader) (*descriptorEvent, error)

ReadDescriptorEvent reads a descriptorEvent from buffer

func ReadFile

func ReadFile(filepath string) ([]byte, error)

ReadFile reads file as os.ReadFile works, also converts the os errors to Milvus errors

func SerializePartitionStatsSnapshot

func SerializePartitionStatsSnapshot(partStats *PartitionStatsSnapshot) ([]byte, error)

func TransferColumnBasedInsertDataToRowBased

func TransferColumnBasedInsertDataToRowBased(data *InsertData) (
	Timestamps []uint64,
	RowIDs []int64,
	RowData []*commonpb.Blob,
	err error,
)

TransferColumnBasedInsertDataToRowBased transfer column-based insert data to row-based rows. Note:

  • ts column must exist in insert data;
  • row id column must exist in insert data;
  • the row num of all column must be equal;
  • num_rows = len(RowData), a row will be assembled into the value of blob with field id order;

func TransferInsertDataToInsertRecord

func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.InsertRecord, error)

func TransferInsertMsgToInsertRecord

func TransferInsertMsgToInsertRecord(schema *schemapb.CollectionSchema, msg *msgstream.InsertMsg) (*segcorepb.InsertRecord, error)

func UnsafeReadBool

func UnsafeReadBool(buf []byte, idx int) bool

#nosec G103

func UnsafeReadByte

func UnsafeReadByte(buf []byte, idx int) byte

#nosec G103

func UnsafeReadFloat32

func UnsafeReadFloat32(buf []byte, idx int) float32

#nosec G103

func UnsafeReadFloat64

func UnsafeReadFloat64(buf []byte, idx int) float64

#nosec G103

func UnsafeReadInt16

func UnsafeReadInt16(buf []byte, idx int) int16

#nosec G103

func UnsafeReadInt32

func UnsafeReadInt32(buf []byte, idx int) int32

#nosec G103

func UnsafeReadInt64

func UnsafeReadInt64(buf []byte, idx int) int64

#nosec G103

func UnsafeReadInt8

func UnsafeReadInt8(buf []byte, idx int) int8

#nosec G103

func WriteFile

func WriteFile(filepath string, data []byte, perm fs.FileMode) error

WriteFile writes file as os.WriteFile works, also converts the os errors to Milvus errors

Types

type ArrayFieldData

type ArrayFieldData struct {
	ElementType schemapb.DataType
	Data        []*schemapb.ScalarField
	ValidData   []bool
}

func (*ArrayFieldData) AppendRow

func (data *ArrayFieldData) AppendRow(row interface{}) error

func (*ArrayFieldData) AppendRows

func (data *ArrayFieldData) AppendRows(rows interface{}) error

func (*ArrayFieldData) GetDataType

func (data *ArrayFieldData) GetDataType() schemapb.DataType

func (*ArrayFieldData) GetMemorySize

func (data *ArrayFieldData) GetMemorySize() int

func (*ArrayFieldData) GetNullable

func (data *ArrayFieldData) GetNullable() bool

func (*ArrayFieldData) GetRow

func (data *ArrayFieldData) GetRow(i int) any

func (*ArrayFieldData) GetRowSize

func (data *ArrayFieldData) GetRowSize(i int) int

func (*ArrayFieldData) GetRows

func (data *ArrayFieldData) GetRows() any

func (*ArrayFieldData) RowNum

func (data *ArrayFieldData) RowNum() int

type AzureObjectStorage

type AzureObjectStorage struct {
	*service.Client
}

func (*AzureObjectStorage) GetObject

func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error)

func (*AzureObjectStorage) PutObject

func (AzureObjectStorage *AzureObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error

func (*AzureObjectStorage) RemoveObject

func (AzureObjectStorage *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error

func (*AzureObjectStorage) StatObject

func (AzureObjectStorage *AzureObjectStorage) StatObject(ctx context.Context, bucketName, objectName string) (int64, error)

func (*AzureObjectStorage) WalkWithObjects

func (AzureObjectStorage *AzureObjectStorage) WalkWithObjects(ctx context.Context, bucketName string, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) error

type BFloat16VectorFieldData

type BFloat16VectorFieldData struct {
	Data []byte
	Dim  int
}

func (*BFloat16VectorFieldData) AppendRow

func (data *BFloat16VectorFieldData) AppendRow(row interface{}) error

func (*BFloat16VectorFieldData) AppendRows

func (data *BFloat16VectorFieldData) AppendRows(rows interface{}) error

AppendRows appends FLATTEN vectors to field data.

func (*BFloat16VectorFieldData) GetDataType

func (data *BFloat16VectorFieldData) GetDataType() schemapb.DataType

func (*BFloat16VectorFieldData) GetMemorySize

func (data *BFloat16VectorFieldData) GetMemorySize() int

func (*BFloat16VectorFieldData) GetNullable

func (data *BFloat16VectorFieldData) GetNullable() bool

func (*BFloat16VectorFieldData) GetRow

func (data *BFloat16VectorFieldData) GetRow(i int) interface{}

func (*BFloat16VectorFieldData) GetRowSize

func (data *BFloat16VectorFieldData) GetRowSize(i int) int

func (*BFloat16VectorFieldData) GetRows

func (data *BFloat16VectorFieldData) GetRows() any

func (*BFloat16VectorFieldData) RowNum

func (data *BFloat16VectorFieldData) RowNum() int

type BatchLocationsCache

type BatchLocationsCache struct {
	// contains filtered or unexported fields
}

func NewBatchLocationsCache

func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache

func (*BatchLocationsCache) Locations

func (lc *BatchLocationsCache) Locations(k uint, bfType bloomfilter.BFType) [][]uint64

func (*BatchLocationsCache) PKs

func (lc *BatchLocationsCache) PKs() []PrimaryKey

func (*BatchLocationsCache) Size

func (lc *BatchLocationsCache) Size() int

type BinaryVectorFieldData

type BinaryVectorFieldData struct {
	Data []byte
	Dim  int
}

func (*BinaryVectorFieldData) AppendRow

func (data *BinaryVectorFieldData) AppendRow(row interface{}) error

func (*BinaryVectorFieldData) AppendRows

func (data *BinaryVectorFieldData) AppendRows(rows interface{}) error

AppendRows appends FLATTEN vectors to field data.

func (*BinaryVectorFieldData) GetDataType

func (data *BinaryVectorFieldData) GetDataType() schemapb.DataType

func (*BinaryVectorFieldData) GetMemorySize

func (data *BinaryVectorFieldData) GetMemorySize() int

func (*BinaryVectorFieldData) GetNullable

func (data *BinaryVectorFieldData) GetNullable() bool

func (*BinaryVectorFieldData) GetRow

func (data *BinaryVectorFieldData) GetRow(i int) any

func (*BinaryVectorFieldData) GetRowSize

func (data *BinaryVectorFieldData) GetRowSize(i int) int

func (*BinaryVectorFieldData) GetRows

func (data *BinaryVectorFieldData) GetRows() any

func (*BinaryVectorFieldData) RowNum

func (data *BinaryVectorFieldData) RowNum() int

type BinlogReader

type BinlogReader struct {
	// contains filtered or unexported fields
}

BinlogReader is an object to read binlog file. Binlog file's format can be found in design docs.

func NewBinlogReader

func NewBinlogReader(data []byte) (*BinlogReader, error)

NewBinlogReader creates binlogReader to read binlog file.

func (*BinlogReader) Close

func (reader *BinlogReader) Close()

Close closes the BinlogReader object. It mainly calls the Close method of the internal events, reclaims resources, and marks itself as closed.

func (*BinlogReader) GetMemoryUsageInBytes

func (event *BinlogReader) GetMemoryUsageInBytes() int32

GetMemoryUsageInBytes returns descriptor Event memory usage in bytes

func (*BinlogReader) NextEventReader

func (reader *BinlogReader) NextEventReader() (*EventReader, error)

NextEventReader iters all events reader to read the binlog file.

func (*BinlogReader) Write

func (event *BinlogReader) Write(buffer io.Writer) error

Write writes descriptor event into buffer

type BinlogStreamWriter

type BinlogStreamWriter struct {
	// contains filtered or unexported fields
}

func (*BinlogStreamWriter) Finalize

func (bsw *BinlogStreamWriter) Finalize() (*Blob, error)

func (*BinlogStreamWriter) GetRecordWriter

func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error)

type BinlogType

type BinlogType int32

BinlogType is to distinguish different files saving different data.

const (
	// InsertBinlog BinlogType for insert data
	InsertBinlog BinlogType = iota
	// DeleteBinlog BinlogType for delete data
	DeleteBinlog
	// DDLBinlog BinlogType for DDL
	DDLBinlog
	// IndexFileBinlog BinlogType for index
	IndexFileBinlog
	// StatsBinlog BinlogType for stats data
	StatsBinlog
)

type Blob

type Blob struct {
	Key        string
	Value      []byte
	MemorySize int64
	RowNum     int64
}

Blob is a pack of key&value

func (Blob) GetKey

func (b Blob) GetKey() string

GetKey returns the key of blob

func (Blob) GetMemorySize

func (b Blob) GetMemorySize() int64

GetMemorySize returns the memory size of blob

func (Blob) GetValue

func (b Blob) GetValue() []byte

GetValue returns the value of blob

type BlobInfo

type BlobInfo struct {
	Length int
}

TODO: fill it info for each blob

type BlobList

type BlobList []*Blob

BlobList implements sort.Interface for a list of Blob

func (BlobList) Len

func (s BlobList) Len() int

Len implements Len in sort.Interface

func (BlobList) Less

func (s BlobList) Less(i, j int) bool

Less implements Less in sort.Interface

func (BlobList) Swap

func (s BlobList) Swap(i, j int)

Swap implements Swap in sort.Interface

type BlobReader

type BlobReader struct {
	// contains filtered or unexported fields
}

BlobReader is implemented because Azure's stream body does not have ReadAt and Seek interfaces. BlobReader is not concurrency safe.

func NewBlobReader

func NewBlobReader(client *blockblob.Client, offset int64) (*BlobReader, error)

func (*BlobReader) Close

func (b *BlobReader) Close() error

func (*BlobReader) Read

func (b *BlobReader) Read(p []byte) (n int, err error)

func (*BlobReader) ReadAt

func (b *BlobReader) ReadAt(p []byte, off int64) (n int, err error)

func (*BlobReader) Seek

func (b *BlobReader) Seek(offset int64, whence int) (int64, error)

type BoolFieldData

type BoolFieldData struct {
	Data      []bool
	ValidData []bool
}

func (*BoolFieldData) AppendRow

func (data *BoolFieldData) AppendRow(row interface{}) error

AppendRow implements FieldData.AppendRow

func (*BoolFieldData) AppendRows

func (data *BoolFieldData) AppendRows(rows interface{}) error

func (*BoolFieldData) GetDataType

func (data *BoolFieldData) GetDataType() schemapb.DataType

GetDataType implements FieldData.GetDataType

func (*BoolFieldData) GetMemorySize

func (data *BoolFieldData) GetMemorySize() int

GetMemorySize implements FieldData.GetMemorySize

func (*BoolFieldData) GetNullable

func (data *BoolFieldData) GetNullable() bool

func (*BoolFieldData) GetRow

func (data *BoolFieldData) GetRow(i int) any

GetRow implements FieldData.GetRow

func (*BoolFieldData) GetRowSize

func (data *BoolFieldData) GetRowSize(i int) int

func (*BoolFieldData) GetRows

func (data *BoolFieldData) GetRows() any

func (*BoolFieldData) RowNum

func (data *BoolFieldData) RowNum() int

RowNum implements FieldData.RowNum

type ChunkManager

type ChunkManager interface {
	// RootPath returns current root path.
	RootPath() string
	// Path returns path of @filePath.
	Path(ctx context.Context, filePath string) (string, error)
	// Size returns path of @filePath.
	Size(ctx context.Context, filePath string) (int64, error)
	// Write writes @content to @filePath.
	Write(ctx context.Context, filePath string, content []byte) error
	// MultiWrite writes multi @content to @filePath.
	MultiWrite(ctx context.Context, contents map[string][]byte) error
	// Exist returns true if @filePath exists.
	Exist(ctx context.Context, filePath string) (bool, error)
	// Read reads @filePath and returns content.
	Read(ctx context.Context, filePath string) ([]byte, error)
	// Reader return a reader for @filePath
	Reader(ctx context.Context, filePath string) (FileReader, error)
	// MultiRead reads @filePath and returns content.
	MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)
	// WalkWithPrefix list files with same @prefix and call @walkFunc for each file.
	// 1. walkFunc return false or reach the last object, WalkWithPrefix will stop and return nil.
	// 2. underlying walking failed or context canceled, WalkWithPrefix will stop and return a error.
	WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) error
	Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)
	// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.
	// if all bytes are read, @err is io.EOF.
	// return other error if read failed.
	ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)
	// Remove delete @filePath.
	Remove(ctx context.Context, filePath string) error
	// MultiRemove delete @filePaths.
	MultiRemove(ctx context.Context, filePaths []string) error
	// RemoveWithPrefix remove files with same @prefix.
	RemoveWithPrefix(ctx context.Context, prefix string) error
}

ChunkManager is to manager chunks. Include Read, Write, Remove chunks.

type ChunkManagerFactory

type ChunkManagerFactory struct {
	// contains filtered or unexported fields
}

func NewChunkManagerFactory

func NewChunkManagerFactory(persistentStorage string, opts ...Option) *ChunkManagerFactory

func NewChunkManagerFactoryWithParam

func NewChunkManagerFactoryWithParam(params *paramtable.ComponentParam) *ChunkManagerFactory

func NewTestChunkManagerFactory

func NewTestChunkManagerFactory(params *paramtable.ComponentParam, rootPath string) *ChunkManagerFactory

func (*ChunkManagerFactory) NewPersistentStorageChunkManager

func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)

type ChunkObjectInfo

type ChunkObjectInfo struct {
	FilePath   string
	ModifyTime time.Time
}

ChunkObjectInfo is to store object info.

type ChunkObjectWalkFunc

type ChunkObjectWalkFunc func(chunkObjectInfo *ChunkObjectInfo) bool

ChunkObjectWalkFunc is the callback function for walking objects. If return false, WalkWithObjects will stop. Otherwise, WalkWithObjects will continue until reach the last object.

type DDLBinlogWriter

type DDLBinlogWriter struct {
	// contains filtered or unexported fields
}

DDLBinlogWriter is an object to write binlog file which saves ddl information.

func NewDDLBinlogWriter

func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter

NewDDLBinlogWriter creates DDLBinlogWriter to write binlog file.

func (*DDLBinlogWriter) Close

func (writer *DDLBinlogWriter) Close()

func (*DDLBinlogWriter) Finish

func (writer *DDLBinlogWriter) Finish() error

Finish allocates buffer and releases resource

func (*DDLBinlogWriter) GetBinlogType

func (writer *DDLBinlogWriter) GetBinlogType() BinlogType

GetBinlogType returns writer's binlogType

func (*DDLBinlogWriter) GetBuffer

func (writer *DDLBinlogWriter) GetBuffer() ([]byte, error)

GetBuffer gets binlog buffer. Return nil if binlog is not finished yet.

func (*DDLBinlogWriter) GetEventNums

func (writer *DDLBinlogWriter) GetEventNums() int32

GetEventNums returns the number of event writers

func (*DDLBinlogWriter) GetRowNums

func (writer *DDLBinlogWriter) GetRowNums() (int32, error)

GetRowNums returns writer's number of rows

func (*DDLBinlogWriter) NextCreateCollectionEventWriter

func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error)

NextCreateCollectionEventWriter returns an event writer to write CreateCollection information to an event.

func (*DDLBinlogWriter) NextCreatePartitionEventWriter

func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error)

NextCreatePartitionEventWriter returns an event writer to write CreatePartition information to an event.

func (*DDLBinlogWriter) NextDropCollectionEventWriter

func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error)

NextDropCollectionEventWriter returns an event writer to write DropCollection information to an event.

func (*DDLBinlogWriter) NextDropPartitionEventWriter

func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error)

NextDropPartitionEventWriter returns an event writer to write DropPartition information to an event.

type DataDefinitionCodec

type DataDefinitionCodec struct {
	// contains filtered or unexported fields
}

DataDefinitionCodec serializes and deserializes the data definition Blob key example: ${tenant}/data_definition_log/${collection_id}/ts/${log_idx} ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}

func NewDataDefinitionCodec

func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec

NewDataDefinitionCodec is constructor for DataDefinitionCodec

func (*DataDefinitionCodec) Deserialize

func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error)

Deserialize transfer blob back to data definition data. From schema, it get all fields. It will sort blob by blob key for blob logid is increasing by time. For each field, it will create a binlog reader, and read all event to the buffer. It returns origin @ts and @ddRequests in the end.

func (*DataDefinitionCodec) Serialize

func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error)

Serialize transfer @ts and @ddRequsts to blob. From schema, it get all fields. For each field, it will create a binlog writer, and write specific event according to the dataDefinition type. It returns blobs in the end.

type DataSet

type DataSet[T any, E interface {
	ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
}] struct {
	// contains filtered or unexported fields
}

func NewDataSet

func NewDataSet[T any, E interface {
	ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
}](reader *file.Reader, columnIdx int, numRows int64) *DataSet[T, E]

func (*DataSet[T, E]) HasNext

func (s *DataSet[T, E]) HasNext() bool

func (*DataSet[T, E]) NextBatch

func (s *DataSet[T, E]) NextBatch(batch int64) ([]T, error)

type DataSorter

type DataSorter struct {
	InsertCodec *InsertCodec
	InsertData  *InsertData
}

DataSorter sorts insert data

func (*DataSorter) Len

func (ds *DataSorter) Len() int

Len returns length of the insert data

func (*DataSorter) Less

func (ds *DataSorter) Less(i, j int) bool

Less returns whether i-th entry is less than j-th entry, using ID field comparison result

func (*DataSorter) Swap

func (ds *DataSorter) Swap(i, j int)

Swap swaps each field's i-th and j-th element

type DeleteBinlogWriter

type DeleteBinlogWriter struct {
	// contains filtered or unexported fields
}

DeleteBinlogWriter is an object to write binlog file which saves delete data.

func NewDeleteBinlogWriter

func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter

NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.

func (*DeleteBinlogWriter) Close

func (writer *DeleteBinlogWriter) Close()

func (*DeleteBinlogWriter) Finish

func (writer *DeleteBinlogWriter) Finish() error

Finish allocates buffer and releases resource

func (*DeleteBinlogWriter) GetBinlogType

func (writer *DeleteBinlogWriter) GetBinlogType() BinlogType

GetBinlogType returns writer's binlogType

func (*DeleteBinlogWriter) GetBuffer

func (writer *DeleteBinlogWriter) GetBuffer() ([]byte, error)

GetBuffer gets binlog buffer. Return nil if binlog is not finished yet.

func (*DeleteBinlogWriter) GetEventNums

func (writer *DeleteBinlogWriter) GetEventNums() int32

GetEventNums returns the number of event writers

func (*DeleteBinlogWriter) GetRowNums

func (writer *DeleteBinlogWriter) GetRowNums() (int32, error)

GetRowNums returns writer's number of rows

func (*DeleteBinlogWriter) NextDeleteEventWriter

func (writer *DeleteBinlogWriter) NextDeleteEventWriter(opts ...PayloadWriterOptions) (*deleteEventWriter, error)

NextDeleteEventWriter returns an event writer to write delete data to an event.

type DeleteCodec

type DeleteCodec struct{}

DeleteCodec serializes and deserializes the delete data

func NewDeleteCodec

func NewDeleteCodec() *DeleteCodec

NewDeleteCodec returns a DeleteCodec

func (*DeleteCodec) Deserialize

func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error)

Deserialize deserializes the deltalog blobs into DeleteData

func (*DeleteCodec) Serialize

func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) (*Blob, error)

Serialize transfer delete data to blob. . For each delete message, it will save "pk,ts" string to binlog.

type DeleteData

type DeleteData struct {
	Pks      []PrimaryKey // primary keys
	Tss      []Timestamp  // timestamps
	RowCount int64
	// contains filtered or unexported fields
}

DeleteData saves each entity delete message represented as <primarykey,timestamp> map. timestamp represents the time when this instance was deleted

func NewDeleteData

func NewDeleteData(pks []PrimaryKey, tss []Timestamp) *DeleteData

func (*DeleteData) Append

func (data *DeleteData) Append(pk PrimaryKey, ts Timestamp)

Append append 1 pk&ts pair to DeleteData

func (*DeleteData) AppendBatch

func (data *DeleteData) AppendBatch(pks []PrimaryKey, tss []Timestamp)

Append append 1 pk&ts pair to DeleteData

func (*DeleteData) Merge

func (data *DeleteData) Merge(other *DeleteData)

func (*DeleteData) Size

func (data *DeleteData) Size() int64

type DeleteLog

type DeleteLog struct {
	Pk     PrimaryKey `json:"pk"`
	Ts     uint64     `json:"ts"`
	PkType int64      `json:"pkType"`
}

func NewDeleteLog

func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog

func (*DeleteLog) Parse

func (dl *DeleteLog) Parse(val string) error

Parse tries to parse string format delete log it try json first then use "," split int,ts format

func (*DeleteLog) UnmarshalJSON

func (dl *DeleteLog) UnmarshalJSON(data []byte) error

type DeltaData

type DeltaData struct {
	// contains filtered or unexported fields
}

DeltaData stores delta data currently only delete tuples are stored

type DeltalogStreamWriter

type DeltalogStreamWriter struct {
	// contains filtered or unexported fields
}

func NewDeltalogStreamWriter

func NewDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *DeltalogStreamWriter

func (*DeltalogStreamWriter) Finalize

func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error)

func (*DeltalogStreamWriter) GetRecordWriter

func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error)

type DescriptorEventDataFixPart

type DescriptorEventDataFixPart struct {
	CollectionID    int64
	PartitionID     int64
	SegmentID       int64
	FieldID         int64
	StartTimestamp  typeutil.Timestamp
	EndTimestamp    typeutil.Timestamp
	PayloadDataType schemapb.DataType
}

DescriptorEventDataFixPart is a memory struct saves events' DescriptorEventData.

type DeserializeReader

type DeserializeReader[T any] struct {
	// contains filtered or unexported fields
}

func NewBinlogDeserializeReader

func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*Value], error)

func NewDeltalogDeserializeReader

func NewDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error)

NewDeltalogDeserializeReader is the entry point for the delta log reader. It includes NewDeltalogOneFieldReader, which uses the existing log format with only one column in a log file, and NewDeltalogMultiFieldReader, which uses the new format and supports multiple fields in a log file.

func NewDeltalogMultiFieldReader

func NewDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error)

func NewDeltalogOneFieldReader

func NewDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error)

func NewDeserializeReader

func NewDeserializeReader[T any](rr RecordReader, deserializer Deserializer[T]) *DeserializeReader[T]

func (*DeserializeReader[T]) Close

func (deser *DeserializeReader[T]) Close()

func (*DeserializeReader[T]) Next

func (deser *DeserializeReader[T]) Next() error

Iterate to next value, return error or EOF if no more value.

func (*DeserializeReader[T]) Value

func (deser *DeserializeReader[T]) Value() T

type Deserializer

type Deserializer[T any] func(Record, []T) error

type DoubleFieldData

type DoubleFieldData struct {
	Data      []float64
	ValidData []bool
}

func (*DoubleFieldData) AppendRow

func (data *DoubleFieldData) AppendRow(row interface{}) error

func (*DoubleFieldData) AppendRows

func (data *DoubleFieldData) AppendRows(rows interface{}) error

func (*DoubleFieldData) GetDataType

func (data *DoubleFieldData) GetDataType() schemapb.DataType

func (*DoubleFieldData) GetMemorySize

func (data *DoubleFieldData) GetMemorySize() int

func (*DoubleFieldData) GetNullable

func (data *DoubleFieldData) GetNullable() bool

func (*DoubleFieldData) GetRow

func (data *DoubleFieldData) GetRow(i int) any

func (*DoubleFieldData) GetRowSize

func (data *DoubleFieldData) GetRowSize(i int) int

func (*DoubleFieldData) GetRows

func (data *DoubleFieldData) GetRows() any

func (*DoubleFieldData) RowNum

func (data *DoubleFieldData) RowNum() int

type DoubleFieldValue

type DoubleFieldValue struct {
	Value float64 `json:"value"`
}

DataType_Double

func NewDoubleFieldValue

func NewDoubleFieldValue(v float64) *DoubleFieldValue

func (*DoubleFieldValue) EQ

func (ifv *DoubleFieldValue) EQ(obj ScalarFieldValue) bool

func (*DoubleFieldValue) GE

func (ifv *DoubleFieldValue) GE(obj ScalarFieldValue) bool

func (*DoubleFieldValue) GT

func (ifv *DoubleFieldValue) GT(obj ScalarFieldValue) bool

func (*DoubleFieldValue) GetValue

func (ifv *DoubleFieldValue) GetValue() interface{}

func (*DoubleFieldValue) LE

func (ifv *DoubleFieldValue) LE(obj ScalarFieldValue) bool

func (*DoubleFieldValue) LT

func (ifv *DoubleFieldValue) LT(obj ScalarFieldValue) bool

func (*DoubleFieldValue) MarshalJSON

func (ifv *DoubleFieldValue) MarshalJSON() ([]byte, error)

func (*DoubleFieldValue) SetValue

func (ifv *DoubleFieldValue) SetValue(data interface{}) error

func (*DoubleFieldValue) Size

func (ifv *DoubleFieldValue) Size() int64

func (*DoubleFieldValue) Type

func (ifv *DoubleFieldValue) Type() schemapb.DataType

func (*DoubleFieldValue) UnmarshalJSON

func (ifv *DoubleFieldValue) UnmarshalJSON(data []byte) error

type EventReader

type EventReader struct {
	PayloadReaderInterface
	// contains filtered or unexported fields
}

EventReader is used to parse the events contained in the Binlog file.

func (*EventReader) Close

func (reader *EventReader) Close()

Close closes EventReader object. It mainly calls the Close method of inner PayloadReaderInterface and mark itself as closed.

type EventTypeCode

type EventTypeCode int8

EventTypeCode represents event type by code

const (
	DescriptorEventType EventTypeCode = iota
	InsertEventType
	DeleteEventType
	CreateCollectionEventType
	DropCollectionEventType
	CreatePartitionEventType
	DropPartitionEventType
	IndexFileEventType
	EventTypeEnd
)

EventTypeCode definitions

func (EventTypeCode) String

func (code EventTypeCode) String() string

String returns the string representation

type EventWriter

type EventWriter interface {
	PayloadWriterInterface
	// Finish set meta in header and no data can be added to event writer
	Finish() error
	// Close release resources
	Close()
	// Write serialize to buffer, should call Finish first
	Write(buffer *bytes.Buffer) error
	GetMemoryUsageInBytes() (int32, error)
	SetOffset(offset int32)
}

EventWriter abstracts event writer

type Factory

type Factory interface {
	NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}

type FieldData

type FieldData interface {
	GetMemorySize() int
	RowNum() int
	GetRow(i int) any
	GetRowSize(i int) int
	GetRows() any
	AppendRow(row interface{}) error
	AppendRows(rows interface{}) error
	GetDataType() schemapb.DataType
	GetNullable() bool
}

FieldData defines field data interface

func GetPkFromInsertData

func GetPkFromInsertData(collSchema *schemapb.CollectionSchema, data *InsertData) (FieldData, error)

TODO: string type.

func NewFieldData

func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, cap int) (FieldData, error)

type FieldID

type FieldID = typeutil.UniqueID

FieldID represent the identity number of field in collection and its type is UniqueID

type FieldStats

type FieldStats struct {
	FieldID   int64                            `json:"fieldID"`
	Type      schemapb.DataType                `json:"type"`
	Max       ScalarFieldValue                 `json:"max"`       // for scalar field
	Min       ScalarFieldValue                 `json:"min"`       // for scalar field
	BFType    bloomfilter.BFType               `json:"bfType"`    // for scalar field
	BF        bloomfilter.BloomFilterInterface `json:"bf"`        // for scalar field
	Centroids []VectorFieldValue               `json:"centroids"` // for vector field
}

FieldStats contains statistics data for any column todo: compatible to PrimaryKeyStats

func DeserializeFieldStats

func DeserializeFieldStats(blob *Blob) ([]*FieldStats, error)

func NewFieldStats

func NewFieldStats(fieldID int64, pkType schemapb.DataType, rowNum int64) (*FieldStats, error)

func (*FieldStats) Clone

func (stats *FieldStats) Clone() FieldStats

func (*FieldStats) SetVectorCentroids

func (stats *FieldStats) SetVectorCentroids(centroids ...VectorFieldValue)

SetVectorCentroids update centroids value

func (*FieldStats) UnmarshalJSON

func (stats *FieldStats) UnmarshalJSON(data []byte) error

UnmarshalJSON unmarshal bytes to FieldStats

func (*FieldStats) Update

func (stats *FieldStats) Update(pk ScalarFieldValue)

func (*FieldStats) UpdateByMsgs

func (stats *FieldStats) UpdateByMsgs(msgs FieldData)

func (*FieldStats) UpdateMinMax

func (stats *FieldStats) UpdateMinMax(pk ScalarFieldValue)

UpdateMinMax update min and max value

type FieldStatsReader

type FieldStatsReader struct {
	// contains filtered or unexported fields
}

FieldStatsReader reads stats

func (*FieldStatsReader) GetFieldStatsList

func (sr *FieldStatsReader) GetFieldStatsList() ([]*FieldStats, error)

GetFieldStatsList returns buffer as FieldStats

func (*FieldStatsReader) SetBuffer

func (sr *FieldStatsReader) SetBuffer(buffer []byte)

SetBuffer sets buffer

type FieldStatsWriter

type FieldStatsWriter struct {
	// contains filtered or unexported fields
}

FieldStatsWriter writes stats to buffer

func (*FieldStatsWriter) GenerateByData

func (sw *FieldStatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs ...FieldData) error

GenerateByData writes data from @msgs with @fieldID to @buffer

func (*FieldStatsWriter) GenerateList

func (sw *FieldStatsWriter) GenerateList(stats []*FieldStats) error

GenerateList writes Stats slice to buffer

func (*FieldStatsWriter) GetBuffer

func (sw *FieldStatsWriter) GetBuffer() []byte

GetBuffer returns buffer

type FileReader

type FileReader interface {
	io.Reader
	io.Closer
	io.ReaderAt
	io.Seeker
}

type Float16VectorFieldData

type Float16VectorFieldData struct {
	Data []byte
	Dim  int
}

func (*Float16VectorFieldData) AppendRow

func (data *Float16VectorFieldData) AppendRow(row interface{}) error

func (*Float16VectorFieldData) AppendRows

func (data *Float16VectorFieldData) AppendRows(rows interface{}) error

AppendRows appends FLATTEN vectors to field data.

func (*Float16VectorFieldData) GetDataType

func (data *Float16VectorFieldData) GetDataType() schemapb.DataType

func (*Float16VectorFieldData) GetMemorySize

func (data *Float16VectorFieldData) GetMemorySize() int

func (*Float16VectorFieldData) GetNullable

func (data *Float16VectorFieldData) GetNullable() bool

func (*Float16VectorFieldData) GetRow

func (data *Float16VectorFieldData) GetRow(i int) interface{}

func (*Float16VectorFieldData) GetRowSize

func (data *Float16VectorFieldData) GetRowSize(i int) int

func (*Float16VectorFieldData) GetRows

func (data *Float16VectorFieldData) GetRows() any

func (*Float16VectorFieldData) RowNum

func (data *Float16VectorFieldData) RowNum() int

type FloatFieldData

type FloatFieldData struct {
	Data      []float32
	ValidData []bool
}

func (*FloatFieldData) AppendRow

func (data *FloatFieldData) AppendRow(row interface{}) error

func (*FloatFieldData) AppendRows

func (data *FloatFieldData) AppendRows(rows interface{}) error

func (*FloatFieldData) GetDataType

func (data *FloatFieldData) GetDataType() schemapb.DataType

func (*FloatFieldData) GetMemorySize

func (data *FloatFieldData) GetMemorySize() int

func (*FloatFieldData) GetNullable

func (data *FloatFieldData) GetNullable() bool

func (*FloatFieldData) GetRow

func (data *FloatFieldData) GetRow(i int) any

func (*FloatFieldData) GetRowSize

func (data *FloatFieldData) GetRowSize(i int) int

func (*FloatFieldData) GetRows

func (data *FloatFieldData) GetRows() any

func (*FloatFieldData) RowNum

func (data *FloatFieldData) RowNum() int

type FloatFieldValue

type FloatFieldValue struct {
	Value float32 `json:"value"`
}

DataType_Float

func NewFloatFieldValue

func NewFloatFieldValue(v float32) *FloatFieldValue

func (*FloatFieldValue) EQ

func (ifv *FloatFieldValue) EQ(obj ScalarFieldValue) bool

func (*FloatFieldValue) GE

func (ifv *FloatFieldValue) GE(obj ScalarFieldValue) bool

func (*FloatFieldValue) GT

func (ifv *FloatFieldValue) GT(obj ScalarFieldValue) bool

func (*FloatFieldValue) GetValue

func (ifv *FloatFieldValue) GetValue() interface{}

func (*FloatFieldValue) LE

func (ifv *FloatFieldValue) LE(obj ScalarFieldValue) bool

func (*FloatFieldValue) LT

func (ifv *FloatFieldValue) LT(obj ScalarFieldValue) bool

func (*FloatFieldValue) MarshalJSON

func (ifv *FloatFieldValue) MarshalJSON() ([]byte, error)

func (*FloatFieldValue) SetValue

func (ifv *FloatFieldValue) SetValue(data interface{}) error

func (*FloatFieldValue) Size

func (ifv *FloatFieldValue) Size() int64

func (*FloatFieldValue) Type

func (ifv *FloatFieldValue) Type() schemapb.DataType

func (*FloatFieldValue) UnmarshalJSON

func (ifv *FloatFieldValue) UnmarshalJSON(data []byte) error

type FloatVectorFieldData

type FloatVectorFieldData struct {
	Data []float32
	Dim  int
}

func (*FloatVectorFieldData) AppendRow

func (data *FloatVectorFieldData) AppendRow(row interface{}) error

func (*FloatVectorFieldData) AppendRows

func (data *FloatVectorFieldData) AppendRows(rows interface{}) error

AppendRows appends FLATTEN vectors to field data.

func (*FloatVectorFieldData) GetDataType

func (data *FloatVectorFieldData) GetDataType() schemapb.DataType

func (*FloatVectorFieldData) GetMemorySize

func (data *FloatVectorFieldData) GetMemorySize() int

func (*FloatVectorFieldData) GetNullable

func (data *FloatVectorFieldData) GetNullable() bool

func (*FloatVectorFieldData) GetRow

func (data *FloatVectorFieldData) GetRow(i int) interface{}

func (*FloatVectorFieldData) GetRowSize

func (data *FloatVectorFieldData) GetRowSize(i int) int

func (*FloatVectorFieldData) GetRows

func (data *FloatVectorFieldData) GetRows() any

func (*FloatVectorFieldData) RowNum

func (data *FloatVectorFieldData) RowNum() int

type FloatVectorFieldValue

type FloatVectorFieldValue struct {
	Value []float32 `json:"value"`
}

func NewFloatVectorFieldValue

func NewFloatVectorFieldValue(v []float32) *FloatVectorFieldValue

func (*FloatVectorFieldValue) GetValue

func (ifv *FloatVectorFieldValue) GetValue() interface{}

func (*FloatVectorFieldValue) MarshalJSON

func (ifv *FloatVectorFieldValue) MarshalJSON() ([]byte, error)

func (*FloatVectorFieldValue) SetValue

func (ifv *FloatVectorFieldValue) SetValue(data interface{}) error

func (*FloatVectorFieldValue) Size

func (ifv *FloatVectorFieldValue) Size() int64

func (*FloatVectorFieldValue) Type

func (*FloatVectorFieldValue) UnmarshalJSON

func (ifv *FloatVectorFieldValue) UnmarshalJSON(data []byte) error

type IndexCodec

type IndexCodec struct{}

IndexCodec can serialize and deserialize index

func NewIndexCodec

func NewIndexCodec() *IndexCodec

NewIndexCodec creates IndexCodec

func (*IndexCodec) Deserialize

func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]string, string, UniqueID, error)

Deserialize deserializes index

func (*IndexCodec) Serialize

func (indexCodec *IndexCodec) Serialize(blobs []*Blob, params map[string]string, indexName string, indexID UniqueID) ([]*Blob, error)

Serialize serializes index

type IndexFileBinlogCodec

type IndexFileBinlogCodec struct{}

func NewIndexFileBinlogCodec

func NewIndexFileBinlogCodec() *IndexFileBinlogCodec

NewIndexFileBinlogCodec is constructor for IndexFileBinlogCodec

func (*IndexFileBinlogCodec) Deserialize

func (codec *IndexFileBinlogCodec) Deserialize(blobs []*Blob) (
	datas []*Blob,
	indexParams map[string]string,
	indexName string,
	indexID UniqueID,
	err error,
)

func (*IndexFileBinlogCodec) DeserializeImpl

func (codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) (
	indexBuildID UniqueID,
	version int64,
	collectionID UniqueID,
	partitionID UniqueID,
	segmentID UniqueID,
	fieldID UniqueID,
	indexParams map[string]string,
	indexName string,
	indexID UniqueID,
	datas []*Blob,
	err error,
)

func (*IndexFileBinlogCodec) Serialize

func (codec *IndexFileBinlogCodec) Serialize(
	indexBuildID UniqueID,
	version int64,
	collectionID UniqueID,
	partitionID UniqueID,
	segmentID UniqueID,
	fieldID UniqueID,
	indexParams map[string]string,
	indexName string,
	indexID UniqueID,
	datas []*Blob,
) ([]*Blob, error)

Serialize serilizes data as blobs.

func (*IndexFileBinlogCodec) SerializeIndexParams

func (codec *IndexFileBinlogCodec) SerializeIndexParams(
	indexBuildID UniqueID,
	version int64,
	collectionID UniqueID,
	partitionID UniqueID,
	segmentID UniqueID,
	fieldID UniqueID,
	indexParams map[string]string,
	indexName string,
	indexID UniqueID,
) (*Blob, error)

SerializeIndexParams serilizes index params as blob.

type IndexFileBinlogWriter

type IndexFileBinlogWriter struct {
	// contains filtered or unexported fields
}

IndexFileBinlogWriter is an object to write binlog file which saves index files

func NewIndexFileBinlogWriter

func NewIndexFileBinlogWriter(
	indexBuildID UniqueID,
	version int64,
	collectionID UniqueID,
	partitionID UniqueID,
	segmentID UniqueID,
	fieldID UniqueID,
	indexName string,
	indexID UniqueID,
	key string,
) *IndexFileBinlogWriter

NewIndexFileBinlogWriter returns a new IndexFileBinlogWriter with provided parameters

func (*IndexFileBinlogWriter) Close

func (writer *IndexFileBinlogWriter) Close()

func (*IndexFileBinlogWriter) Finish

func (writer *IndexFileBinlogWriter) Finish() error

Finish allocates buffer and releases resource

func (*IndexFileBinlogWriter) GetBinlogType

func (writer *IndexFileBinlogWriter) GetBinlogType() BinlogType

GetBinlogType returns writer's binlogType

func (*IndexFileBinlogWriter) GetBuffer

func (writer *IndexFileBinlogWriter) GetBuffer() ([]byte, error)

GetBuffer gets binlog buffer. Return nil if binlog is not finished yet.

func (*IndexFileBinlogWriter) GetEventNums

func (writer *IndexFileBinlogWriter) GetEventNums() int32

GetEventNums returns the number of event writers

func (*IndexFileBinlogWriter) GetRowNums

func (writer *IndexFileBinlogWriter) GetRowNums() (int32, error)

GetRowNums returns writer's number of rows

func (*IndexFileBinlogWriter) NextIndexFileEventWriter

func (writer *IndexFileBinlogWriter) NextIndexFileEventWriter() (*indexFileEventWriter, error)

NextIndexFileEventWriter return next available EventWriter

type InsertBinlogIterator

type InsertBinlogIterator struct {
	PKfieldID int64
	PkType    schemapb.DataType
	// contains filtered or unexported fields
}

InsertBinlogIterator is the iterator of binlog

func NewInsertBinlogIterator deprecated

func NewInsertBinlogIterator(blobs []*Blob, PKfieldID UniqueID, pkType schemapb.DataType) (*InsertBinlogIterator, error)

NewInsertBinlogIterator creates a new iterator

Deprecated: use storage.NewBinlogDeserializeReader instead

func (*InsertBinlogIterator) Dispose

func (itr *InsertBinlogIterator) Dispose()

Dispose disposes the iterator

func (*InsertBinlogIterator) HasNext

func (itr *InsertBinlogIterator) HasNext() bool

HasNext returns true if the iterator have unread record

func (*InsertBinlogIterator) Next

func (itr *InsertBinlogIterator) Next() (interface{}, error)

Next returns the next record

type InsertBinlogWriter

type InsertBinlogWriter struct {
	// contains filtered or unexported fields
}

InsertBinlogWriter is an object to write binlog file which saves insert data.

func NewInsertBinlogWriter

func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID, FieldID int64, nullable bool) *InsertBinlogWriter

NewInsertBinlogWriter creates InsertBinlogWriter to write binlog file.

func (*InsertBinlogWriter) Close

func (writer *InsertBinlogWriter) Close()

func (*InsertBinlogWriter) Finish

func (writer *InsertBinlogWriter) Finish() error

Finish allocates buffer and releases resource

func (*InsertBinlogWriter) GetBinlogType

func (writer *InsertBinlogWriter) GetBinlogType() BinlogType

GetBinlogType returns writer's binlogType

func (*InsertBinlogWriter) GetBuffer

func (writer *InsertBinlogWriter) GetBuffer() ([]byte, error)

GetBuffer gets binlog buffer. Return nil if binlog is not finished yet.

func (*InsertBinlogWriter) GetEventNums

func (writer *InsertBinlogWriter) GetEventNums() int32

GetEventNums returns the number of event writers

func (*InsertBinlogWriter) GetRowNums

func (writer *InsertBinlogWriter) GetRowNums() (int32, error)

GetRowNums returns writer's number of rows

func (*InsertBinlogWriter) NextInsertEventWriter

func (writer *InsertBinlogWriter) NextInsertEventWriter(opts ...PayloadWriterOptions) (*insertEventWriter, error)

NextInsertEventWriter returns an event writer to write insert data to an event.

type InsertCodec

type InsertCodec struct {
	Schema *etcdpb.CollectionMeta
}

InsertCodec serializes and deserializes the insert data Blob key example: ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}

func NewInsertCodec

func NewInsertCodec() *InsertCodec

NewInsertCodec creates an InsertCodec

func NewInsertCodecWithSchema

func NewInsertCodecWithSchema(schema *etcdpb.CollectionMeta) *InsertCodec

NewInsertCodecWithSchema creates an InsertCodec with provided collection meta

func (*InsertCodec) Deserialize

func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error)

Deserialize transfer blob back to insert data. From schema, it get all fields. For each field, it will create a binlog reader, and read all event to the buffer. It returns origin @InsertData in the end.

func (*InsertCodec) DeserializeAll

func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
	collectionID UniqueID,
	partitionID UniqueID,
	segmentID UniqueID,
	data *InsertData,
	err error,
)

func (*InsertCodec) DeserializeInto

func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int, insertData *InsertData) (
	collectionID UniqueID,
	partitionID UniqueID,
	segmentID UniqueID,
	err error,
)

func (*InsertCodec) Serialize

func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data ...*InsertData) ([]*Blob, error)

Serialize transforms insert data to blob. It will sort insert data by timestamp. From schema, it gets all fields. For each field, it will create a binlog writer, and write an event to the binlog. It returns binlog buffer in the end.

func (*InsertCodec) SerializePkStats

func (insertCodec *InsertCodec) SerializePkStats(stats *PrimaryKeyStats, rowNum int64) (*Blob, error)

Serialize Pk stats log

func (*InsertCodec) SerializePkStatsByData

func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob, error)

Serialize Pk stats log by insert data

func (*InsertCodec) SerializePkStatsList

func (insertCodec *InsertCodec) SerializePkStatsList(stats []*PrimaryKeyStats, rowNum int64) (*Blob, error)

Serialize Pk stats list to one blob

type InsertData

type InsertData struct {
	// TODO, data should be zero copy by passing data directly to event reader or change Data to map[FieldID]FieldDataArray
	Data  map[FieldID]FieldData // field id to field data
	Infos []BlobInfo
}

InsertData example row_schema: {float_field, int_field, float_vector_field, string_field} Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}

system filed id: 0: unique row id 1: timestamp 100: first user field id 101: second user field id 102: ...

func ColumnBasedInsertMsgToInsertData

func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemapb.CollectionSchema) (idata *InsertData, err error)

ColumnBasedInsertMsgToInsertData converts an InsertMsg msg into InsertData based on provided CollectionSchema collSchema.

This function checks whether all fields are provided in the collSchema.Fields. If any field is missing in the msg, an error will be returned.

This funcion also checks the length of each column. All columns shall have the same length. Also, the InsertData.Infos shall have BlobInfo with this length returned. When the length is not aligned, an error will be returned.

func InsertMsgToInsertData

func InsertMsgToInsertData(msg *msgstream.InsertMsg, schema *schemapb.CollectionSchema) (idata *InsertData, err error)

func NewInsertData

func NewInsertData(schema *schemapb.CollectionSchema) (*InsertData, error)

func NewInsertDataWithCap

func NewInsertDataWithCap(schema *schemapb.CollectionSchema, cap int) (*InsertData, error)

func RowBasedInsertMsgToInsertData

func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemapb.CollectionSchema) (idata *InsertData, err error)

func (*InsertData) Append

func (i *InsertData) Append(row map[FieldID]interface{}) error

func (*InsertData) GetMemorySize

func (i *InsertData) GetMemorySize() int

func (*InsertData) GetRow

func (i *InsertData) GetRow(idx int) map[FieldID]interface{}

func (*InsertData) GetRowNum

func (i *InsertData) GetRowNum() int

func (*InsertData) GetRowSize

func (i *InsertData) GetRowSize(idx int) int

func (*InsertData) IsEmpty

func (iData *InsertData) IsEmpty() bool

type Int16FieldData

type Int16FieldData struct {
	Data      []int16
	ValidData []bool
}

func (*Int16FieldData) AppendRow

func (data *Int16FieldData) AppendRow(row interface{}) error

func (*Int16FieldData) AppendRows

func (data *Int16FieldData) AppendRows(rows interface{}) error

func (*Int16FieldData) GetDataType

func (data *Int16FieldData) GetDataType() schemapb.DataType

func (*Int16FieldData) GetMemorySize

func (data *Int16FieldData) GetMemorySize() int

func (*Int16FieldData) GetNullable

func (data *Int16FieldData) GetNullable() bool

func (*Int16FieldData) GetRow

func (data *Int16FieldData) GetRow(i int) any

func (*Int16FieldData) GetRowSize

func (data *Int16FieldData) GetRowSize(i int) int

func (*Int16FieldData) GetRows

func (data *Int16FieldData) GetRows() any

func (*Int16FieldData) RowNum

func (data *Int16FieldData) RowNum() int

type Int16FieldValue

type Int16FieldValue struct {
	Value int16 `json:"value"`
}

DataType_Int16

func NewInt16FieldValue

func NewInt16FieldValue(v int16) *Int16FieldValue

func (*Int16FieldValue) EQ

func (ifv *Int16FieldValue) EQ(obj ScalarFieldValue) bool

func (*Int16FieldValue) GE

func (ifv *Int16FieldValue) GE(obj ScalarFieldValue) bool

func (*Int16FieldValue) GT

func (ifv *Int16FieldValue) GT(obj ScalarFieldValue) bool

func (*Int16FieldValue) GetValue

func (ifv *Int16FieldValue) GetValue() interface{}

func (*Int16FieldValue) LE

func (ifv *Int16FieldValue) LE(obj ScalarFieldValue) bool

func (*Int16FieldValue) LT

func (ifv *Int16FieldValue) LT(obj ScalarFieldValue) bool

func (*Int16FieldValue) MarshalJSON

func (ifv *Int16FieldValue) MarshalJSON() ([]byte, error)

func (*Int16FieldValue) SetValue

func (ifv *Int16FieldValue) SetValue(data interface{}) error

func (*Int16FieldValue) Size

func (ifv *Int16FieldValue) Size() int64

func (*Int16FieldValue) Type

func (ifv *Int16FieldValue) Type() schemapb.DataType

func (*Int16FieldValue) UnmarshalJSON

func (ifv *Int16FieldValue) UnmarshalJSON(data []byte) error

type Int32FieldData

type Int32FieldData struct {
	Data      []int32
	ValidData []bool
}

func (*Int32FieldData) AppendRow

func (data *Int32FieldData) AppendRow(row interface{}) error

func (*Int32FieldData) AppendRows

func (data *Int32FieldData) AppendRows(rows interface{}) error

func (*Int32FieldData) GetDataType

func (data *Int32FieldData) GetDataType() schemapb.DataType

func (*Int32FieldData) GetMemorySize

func (data *Int32FieldData) GetMemorySize() int

func (*Int32FieldData) GetNullable

func (data *Int32FieldData) GetNullable() bool

func (*Int32FieldData) GetRow

func (data *Int32FieldData) GetRow(i int) any

func (*Int32FieldData) GetRowSize

func (data *Int32FieldData) GetRowSize(i int) int

func (*Int32FieldData) GetRows

func (data *Int32FieldData) GetRows() any

func (*Int32FieldData) RowNum

func (data *Int32FieldData) RowNum() int

type Int32FieldValue

type Int32FieldValue struct {
	Value int32 `json:"value"`
}

DataType_Int32

func NewInt32FieldValue

func NewInt32FieldValue(v int32) *Int32FieldValue

func (*Int32FieldValue) EQ

func (ifv *Int32FieldValue) EQ(obj ScalarFieldValue) bool

func (*Int32FieldValue) GE

func (ifv *Int32FieldValue) GE(obj ScalarFieldValue) bool

func (*Int32FieldValue) GT

func (ifv *Int32FieldValue) GT(obj ScalarFieldValue) bool

func (*Int32FieldValue) GetValue

func (ifv *Int32FieldValue) GetValue() interface{}

func (*Int32FieldValue) LE

func (ifv *Int32FieldValue) LE(obj ScalarFieldValue) bool

func (*Int32FieldValue) LT

func (ifv *Int32FieldValue) LT(obj ScalarFieldValue) bool

func (*Int32FieldValue) MarshalJSON

func (ifv *Int32FieldValue) MarshalJSON() ([]byte, error)

func (*Int32FieldValue) SetValue

func (ifv *Int32FieldValue) SetValue(data interface{}) error

func (*Int32FieldValue) Size

func (ifv *Int32FieldValue) Size() int64

func (*Int32FieldValue) Type

func (ifv *Int32FieldValue) Type() schemapb.DataType

func (*Int32FieldValue) UnmarshalJSON

func (ifv *Int32FieldValue) UnmarshalJSON(data []byte) error

type Int64FieldData

type Int64FieldData struct {
	Data      []int64
	ValidData []bool
}

func GetTimestampFromInsertData

func GetTimestampFromInsertData(data *InsertData) (*Int64FieldData, error)

GetTimestampFromInsertData returns the Int64FieldData for timestamp field.

func (*Int64FieldData) AppendRow

func (data *Int64FieldData) AppendRow(row interface{}) error

func (*Int64FieldData) AppendRows

func (data *Int64FieldData) AppendRows(rows interface{}) error

func (*Int64FieldData) GetDataType

func (data *Int64FieldData) GetDataType() schemapb.DataType

func (*Int64FieldData) GetMemorySize

func (data *Int64FieldData) GetMemorySize() int

func (*Int64FieldData) GetNullable

func (data *Int64FieldData) GetNullable() bool

func (*Int64FieldData) GetRow

func (data *Int64FieldData) GetRow(i int) any

func (*Int64FieldData) GetRowSize

func (data *Int64FieldData) GetRowSize(i int) int

func (*Int64FieldData) GetRows

func (data *Int64FieldData) GetRows() any

func (*Int64FieldData) RowNum

func (data *Int64FieldData) RowNum() int

type Int64FieldValue

type Int64FieldValue struct {
	Value int64 `json:"value"`
}

DataType_Int64

func NewInt64FieldValue

func NewInt64FieldValue(v int64) *Int64FieldValue

func (*Int64FieldValue) EQ

func (ifv *Int64FieldValue) EQ(obj ScalarFieldValue) bool

func (*Int64FieldValue) GE

func (ifv *Int64FieldValue) GE(obj ScalarFieldValue) bool

func (*Int64FieldValue) GT

func (ifv *Int64FieldValue) GT(obj ScalarFieldValue) bool

func (*Int64FieldValue) GetValue

func (ifv *Int64FieldValue) GetValue() interface{}

func (*Int64FieldValue) LE

func (ifv *Int64FieldValue) LE(obj ScalarFieldValue) bool

func (*Int64FieldValue) LT

func (ifv *Int64FieldValue) LT(obj ScalarFieldValue) bool

func (*Int64FieldValue) MarshalJSON

func (ifv *Int64FieldValue) MarshalJSON() ([]byte, error)

func (*Int64FieldValue) SetValue

func (ifv *Int64FieldValue) SetValue(data interface{}) error

func (*Int64FieldValue) Size

func (ifv *Int64FieldValue) Size() int64

func (*Int64FieldValue) Type

func (ifv *Int64FieldValue) Type() schemapb.DataType

func (*Int64FieldValue) UnmarshalJSON

func (ifv *Int64FieldValue) UnmarshalJSON(data []byte) error

type Int64PrimaryKey

type Int64PrimaryKey struct {
	Value int64 `json:"pkValue"`
}

func NewInt64PrimaryKey

func NewInt64PrimaryKey(v int64) *Int64PrimaryKey

func (*Int64PrimaryKey) EQ

func (ip *Int64PrimaryKey) EQ(key PrimaryKey) bool

func (*Int64PrimaryKey) GE

func (ip *Int64PrimaryKey) GE(key PrimaryKey) bool

func (*Int64PrimaryKey) GT

func (ip *Int64PrimaryKey) GT(key PrimaryKey) bool

func (*Int64PrimaryKey) GetValue

func (ip *Int64PrimaryKey) GetValue() interface{}

func (*Int64PrimaryKey) LE

func (ip *Int64PrimaryKey) LE(key PrimaryKey) bool

func (*Int64PrimaryKey) LT

func (ip *Int64PrimaryKey) LT(key PrimaryKey) bool

func (*Int64PrimaryKey) MarshalJSON

func (ip *Int64PrimaryKey) MarshalJSON() ([]byte, error)

func (*Int64PrimaryKey) SetValue

func (ip *Int64PrimaryKey) SetValue(data interface{}) error

func (*Int64PrimaryKey) Size

func (ip *Int64PrimaryKey) Size() int64

func (*Int64PrimaryKey) Type

func (ip *Int64PrimaryKey) Type() schemapb.DataType

func (*Int64PrimaryKey) UnmarshalJSON

func (ip *Int64PrimaryKey) UnmarshalJSON(data []byte) error

type Int64PrimaryKeys

type Int64PrimaryKeys struct {
	// contains filtered or unexported fields
}

func NewInt64PrimaryKeys

func NewInt64PrimaryKeys(cap int) *Int64PrimaryKeys

func (*Int64PrimaryKeys) Append

func (pks *Int64PrimaryKeys) Append(values ...PrimaryKey) error

func (*Int64PrimaryKeys) AppendRaw

func (pks *Int64PrimaryKeys) AppendRaw(values ...int64)

func (*Int64PrimaryKeys) Get

func (pks *Int64PrimaryKeys) Get(idx int) PrimaryKey

func (*Int64PrimaryKeys) Len

func (pks *Int64PrimaryKeys) Len() int

func (*Int64PrimaryKeys) MustAppend

func (pks *Int64PrimaryKeys) MustAppend(values ...PrimaryKey)

func (*Int64PrimaryKeys) MustMerge

func (pks *Int64PrimaryKeys) MustMerge(another PrimaryKeys)

func (*Int64PrimaryKeys) Size

func (pks *Int64PrimaryKeys) Size() int64

func (*Int64PrimaryKeys) Type

func (pks *Int64PrimaryKeys) Type() schemapb.DataType

type Int8FieldData

type Int8FieldData struct {
	Data      []int8
	ValidData []bool
}

func (*Int8FieldData) AppendRow

func (data *Int8FieldData) AppendRow(row interface{}) error

func (*Int8FieldData) AppendRows

func (data *Int8FieldData) AppendRows(rows interface{}) error

func (*Int8FieldData) GetDataType

func (data *Int8FieldData) GetDataType() schemapb.DataType

func (*Int8FieldData) GetMemorySize

func (data *Int8FieldData) GetMemorySize() int

func (*Int8FieldData) GetNullable

func (data *Int8FieldData) GetNullable() bool

func (*Int8FieldData) GetRow

func (data *Int8FieldData) GetRow(i int) any

func (*Int8FieldData) GetRowSize

func (data *Int8FieldData) GetRowSize(i int) int

func (*Int8FieldData) GetRows

func (data *Int8FieldData) GetRows() any

func (*Int8FieldData) RowNum

func (data *Int8FieldData) RowNum() int

type Int8FieldValue

type Int8FieldValue struct {
	Value int8 `json:"value"`
}

DataType_Int8

func NewInt8FieldValue

func NewInt8FieldValue(v int8) *Int8FieldValue

func (*Int8FieldValue) EQ

func (ifv *Int8FieldValue) EQ(obj ScalarFieldValue) bool

func (*Int8FieldValue) GE

func (ifv *Int8FieldValue) GE(obj ScalarFieldValue) bool

func (*Int8FieldValue) GT

func (ifv *Int8FieldValue) GT(obj ScalarFieldValue) bool

func (*Int8FieldValue) GetValue

func (ifv *Int8FieldValue) GetValue() interface{}

func (*Int8FieldValue) LE

func (ifv *Int8FieldValue) LE(obj ScalarFieldValue) bool

func (*Int8FieldValue) LT

func (ifv *Int8FieldValue) LT(obj ScalarFieldValue) bool

func (*Int8FieldValue) MarshalJSON

func (ifv *Int8FieldValue) MarshalJSON() ([]byte, error)

func (*Int8FieldValue) SetValue

func (ifv *Int8FieldValue) SetValue(data interface{}) error

func (*Int8FieldValue) Size

func (ifv *Int8FieldValue) Size() int64

func (*Int8FieldValue) Type

func (ifv *Int8FieldValue) Type() schemapb.DataType

func (*Int8FieldValue) UnmarshalJSON

func (ifv *Int8FieldValue) UnmarshalJSON(data []byte) error

type Iterator

type Iterator interface {
	// HasNext returns true if the iterator have unread record
	HasNext() bool
	// Next returns the next record
	Next() (interface{}, error)
	// Dispose disposes the iterator
	Dispose()
}

Iterator is the iterator interface.

type JSONFieldData

type JSONFieldData struct {
	Data      [][]byte
	ValidData []bool
}

func (*JSONFieldData) AppendRow

func (data *JSONFieldData) AppendRow(row interface{}) error

func (*JSONFieldData) AppendRows

func (data *JSONFieldData) AppendRows(rows interface{}) error

func (*JSONFieldData) GetDataType

func (data *JSONFieldData) GetDataType() schemapb.DataType

func (*JSONFieldData) GetMemorySize

func (data *JSONFieldData) GetMemorySize() int

func (*JSONFieldData) GetNullable

func (data *JSONFieldData) GetNullable() bool

func (*JSONFieldData) GetRow

func (data *JSONFieldData) GetRow(i int) any

func (*JSONFieldData) GetRowSize

func (data *JSONFieldData) GetRowSize(i int) int

func (*JSONFieldData) GetRows

func (data *JSONFieldData) GetRows() any

func (*JSONFieldData) RowNum

func (data *JSONFieldData) RowNum() int

type LocalChunkManager

type LocalChunkManager struct {
	// contains filtered or unexported fields
}

LocalChunkManager is responsible for read and write local file.

func NewLocalChunkManager

func NewLocalChunkManager(opts ...Option) *LocalChunkManager

NewLocalChunkManager create a new local manager object.

func (*LocalChunkManager) Exist

func (lcm *LocalChunkManager) Exist(ctx context.Context, filePath string) (bool, error)

Exist checks whether chunk is saved to local storage.

func (*LocalChunkManager) Mmap

func (lcm *LocalChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)

func (*LocalChunkManager) MultiRead

func (lcm *LocalChunkManager) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)

MultiRead reads the local storage data if exists.

func (*LocalChunkManager) MultiRemove

func (lcm *LocalChunkManager) MultiRemove(ctx context.Context, filePaths []string) error

func (*LocalChunkManager) MultiWrite

func (lcm *LocalChunkManager) MultiWrite(ctx context.Context, contents map[string][]byte) error

MultiWrite writes the data to local storage.

func (*LocalChunkManager) Path

func (lcm *LocalChunkManager) Path(ctx context.Context, filePath string) (string, error)

Path returns the path of local data if exists.

func (*LocalChunkManager) Read

func (lcm *LocalChunkManager) Read(ctx context.Context, filePath string) ([]byte, error)

Read reads the local storage data if exists.

func (*LocalChunkManager) ReadAt

func (lcm *LocalChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error)

ReadAt reads specific position data of local storage if exists.

func (*LocalChunkManager) Reader

func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error)

func (*LocalChunkManager) Remove

func (lcm *LocalChunkManager) Remove(ctx context.Context, filePath string) error

func (*LocalChunkManager) RemoveWithPrefix

func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error

func (*LocalChunkManager) RootPath

func (lcm *LocalChunkManager) RootPath() string

RootPath returns lcm root path.

func (*LocalChunkManager) Size

func (lcm *LocalChunkManager) Size(ctx context.Context, filePath string) (int64, error)

func (*LocalChunkManager) WalkWithPrefix

func (lcm *LocalChunkManager) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) (err error)

func (*LocalChunkManager) Write

func (lcm *LocalChunkManager) Write(ctx context.Context, filePath string, content []byte) error

Write writes the data to local storage.

type LocationsCache

type LocationsCache struct {
	// contains filtered or unexported fields
}

LocationsCache is a helper struct caching pk bloom filter locations. Note that this helper is not concurrent safe and shall be used in same goroutine.

func NewLocationsCache

func NewLocationsCache(pk PrimaryKey) *LocationsCache

func (*LocationsCache) GetPk

func (lc *LocationsCache) GetPk() PrimaryKey

func (*LocationsCache) Locations

func (lc *LocationsCache) Locations(k uint, bfType bloomfilter.BFType) []uint64

type MergeIterator

type MergeIterator struct {
	// contains filtered or unexported fields
}

MergeIterator merge iterators.

func NewMergeIterator

func NewMergeIterator(iterators []Iterator) *MergeIterator

NewMergeIterator return a new MergeIterator.

func (*MergeIterator) Dispose

func (itr *MergeIterator) Dispose()

Dispose disposes the iterator

func (*MergeIterator) HasNext

func (itr *MergeIterator) HasNext() bool

HasNext returns true if the iterator have unread record

func (*MergeIterator) Next

func (itr *MergeIterator) Next() (interface{}, error)

Next returns the next record

type MinioObjectStorage

type MinioObjectStorage struct {
	*minio.Client
}

func (*MinioObjectStorage) GetObject

func (minioObjectStorage *MinioObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error)

func (*MinioObjectStorage) PutObject

func (minioObjectStorage *MinioObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error

func (*MinioObjectStorage) RemoveObject

func (minioObjectStorage *MinioObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error

func (*MinioObjectStorage) StatObject

func (minioObjectStorage *MinioObjectStorage) StatObject(ctx context.Context, bucketName, objectName string) (int64, error)

func (*MinioObjectStorage) WalkWithObjects

func (minioObjectStorage *MinioObjectStorage) WalkWithObjects(ctx context.Context, bucketName string, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) (err error)

type MultiFieldDeltalogStreamWriter

type MultiFieldDeltalogStreamWriter struct {
	// contains filtered or unexported fields
}

func NewMultiFieldDeltalogStreamWriter

func NewMultiFieldDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID, schema []*schemapb.FieldSchema) *MultiFieldDeltalogStreamWriter

func (*MultiFieldDeltalogStreamWriter) Finalize

func (dsw *MultiFieldDeltalogStreamWriter) Finalize() (*Blob, error)

func (*MultiFieldDeltalogStreamWriter) GetRecordWriter

func (dsw *MultiFieldDeltalogStreamWriter) GetRecordWriter() (RecordWriter, error)

type NativePayloadWriter

type NativePayloadWriter struct {
	// contains filtered or unexported fields
}

func (*NativePayloadWriter) AddBFloat16VectorToPayload

func (w *NativePayloadWriter) AddBFloat16VectorToPayload(data []byte, dim int) error

func (*NativePayloadWriter) AddBinaryVectorToPayload

func (w *NativePayloadWriter) AddBinaryVectorToPayload(data []byte, dim int) error

func (*NativePayloadWriter) AddBoolToPayload

func (w *NativePayloadWriter) AddBoolToPayload(data []bool, validData []bool) error

func (*NativePayloadWriter) AddByteToPayload

func (w *NativePayloadWriter) AddByteToPayload(data []byte, validData []bool) error

func (*NativePayloadWriter) AddDataToPayload

func (w *NativePayloadWriter) AddDataToPayload(data interface{}, validData []bool) error

func (*NativePayloadWriter) AddDoubleToPayload

func (w *NativePayloadWriter) AddDoubleToPayload(data []float64, validData []bool) error

func (*NativePayloadWriter) AddFloat16VectorToPayload

func (w *NativePayloadWriter) AddFloat16VectorToPayload(data []byte, dim int) error

func (*NativePayloadWriter) AddFloatToPayload

func (w *NativePayloadWriter) AddFloatToPayload(data []float32, validData []bool) error

func (*NativePayloadWriter) AddFloatVectorToPayload

func (w *NativePayloadWriter) AddFloatVectorToPayload(data []float32, dim int) error

func (*NativePayloadWriter) AddInt16ToPayload

func (w *NativePayloadWriter) AddInt16ToPayload(data []int16, validData []bool) error

func (*NativePayloadWriter) AddInt32ToPayload

func (w *NativePayloadWriter) AddInt32ToPayload(data []int32, validData []bool) error

func (*NativePayloadWriter) AddInt64ToPayload

func (w *NativePayloadWriter) AddInt64ToPayload(data []int64, validData []bool) error

func (*NativePayloadWriter) AddInt8ToPayload

func (w *NativePayloadWriter) AddInt8ToPayload(data []int8, validData []bool) error

func (*NativePayloadWriter) AddOneArrayToPayload

func (w *NativePayloadWriter) AddOneArrayToPayload(data *schemapb.ScalarField, isValid bool) error

func (*NativePayloadWriter) AddOneJSONToPayload

func (w *NativePayloadWriter) AddOneJSONToPayload(data []byte, isValid bool) error

func (*NativePayloadWriter) AddOneStringToPayload

func (w *NativePayloadWriter) AddOneStringToPayload(data string, isValid bool) error

func (*NativePayloadWriter) AddSparseFloatVectorToPayload

func (w *NativePayloadWriter) AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error

func (*NativePayloadWriter) Close

func (w *NativePayloadWriter) Close()

func (*NativePayloadWriter) FinishPayloadWriter

func (w *NativePayloadWriter) FinishPayloadWriter() error

func (*NativePayloadWriter) GetPayloadBufferFromWriter

func (w *NativePayloadWriter) GetPayloadBufferFromWriter() ([]byte, error)

func (*NativePayloadWriter) GetPayloadLengthFromWriter

func (w *NativePayloadWriter) GetPayloadLengthFromWriter() (int, error)

func (*NativePayloadWriter) ReleasePayloadWriter

func (w *NativePayloadWriter) ReleasePayloadWriter()

func (*NativePayloadWriter) Reserve

func (w *NativePayloadWriter) Reserve(size int)

type NullableInt

type NullableInt struct {
	Value *int
}

func NewNullableInt

func NewNullableInt(value int) *NullableInt

NewNullableInt creates a new NullableInt instance

func (NullableInt) GetValue

func (ni NullableInt) GetValue() int

func (NullableInt) IsNull

func (ni NullableInt) IsNull() bool

IsNull checks if the NullableInt is null

type ObjectStorage

type ObjectStorage interface {
	GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error)
	PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error
	StatObject(ctx context.Context, bucketName, objectName string) (int64, error)
	// WalkWithPrefix walks all objects with prefix @prefix, and call walker for each object.
	// WalkWithPrefix will stop if following conditions met:
	// 1. cb return false or reach the last object, WalkWithPrefix will stop and return nil.
	// 2. underlying walking failed or context canceled, WalkWithPrefix will stop and return a error.
	WalkWithObjects(ctx context.Context, bucketName string, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) error
	RemoveObject(ctx context.Context, bucketName, objectName string) error
}

type Option

type Option func(*config)

Option is used to config the retry function.

func AccessKeyID

func AccessKeyID(accessKeyID string) Option

func Address

func Address(addr string) Option

func BucketName

func BucketName(bucketName string) Option

func CloudProvider

func CloudProvider(cloudProvider string) Option

func CreateBucket

func CreateBucket(createBucket bool) Option

func IAMEndpoint

func IAMEndpoint(iamEndpoint string) Option

func Region

func Region(region string) Option

func RequestTimeout

func RequestTimeout(requestTimeoutMs int64) Option

func RootPath

func RootPath(rootPath string) Option

func SecretAccessKeyID

func SecretAccessKeyID(secretAccessKeyID string) Option

func SslCACert

func SslCACert(sslCACert string) Option

func UseIAM

func UseIAM(useIAM bool) Option

func UseSSL

func UseSSL(useSSL bool) Option

func UseVirtualHost

func UseVirtualHost(useVirtualHost bool) Option

type PartitionStatsSnapshot

type PartitionStatsSnapshot struct {
	SegmentStats map[UniqueID]SegmentStats `json:"segmentStats"`
	Version      int64
}

func DeserializePartitionsStatsSnapshot

func DeserializePartitionsStatsSnapshot(data []byte) (*PartitionStatsSnapshot, error)

func NewPartitionStatsSnapshot

func NewPartitionStatsSnapshot() *PartitionStatsSnapshot

func (*PartitionStatsSnapshot) GetVersion

func (ps *PartitionStatsSnapshot) GetVersion() int64

func (*PartitionStatsSnapshot) SetVersion

func (ps *PartitionStatsSnapshot) SetVersion(v int64)

func (*PartitionStatsSnapshot) UpdateSegmentStats

func (ps *PartitionStatsSnapshot) UpdateSegmentStats(segmentID UniqueID, segmentStats SegmentStats)

type PayloadReader

type PayloadReader struct {
	// contains filtered or unexported fields
}

PayloadReader reads data from payload

func NewPayloadReader

func NewPayloadReader(colType schemapb.DataType, buf []byte, nullable bool) (*PayloadReader, error)

func (*PayloadReader) Close

func (r *PayloadReader) Close() error

Close closes the payload reader

func (*PayloadReader) GetArrayFromPayload

func (r *PayloadReader) GetArrayFromPayload() ([]*schemapb.ScalarField, []bool, error)

func (*PayloadReader) GetArrowRecordReader

func (r *PayloadReader) GetArrowRecordReader() (pqarrow.RecordReader, error)

func (*PayloadReader) GetBFloat16VectorFromPayload

func (r *PayloadReader) GetBFloat16VectorFromPayload() ([]byte, int, error)

GetBFloat16VectorFromPayload returns vector, dimension, error

func (*PayloadReader) GetBinaryVectorFromPayload

func (r *PayloadReader) GetBinaryVectorFromPayload() ([]byte, int, error)

GetBinaryVectorFromPayload returns vector, dimension, error

func (*PayloadReader) GetBoolFromPayload

func (r *PayloadReader) GetBoolFromPayload() ([]bool, []bool, error)

GetBoolFromPayload returns bool slice from payload.

func (*PayloadReader) GetByteArrayDataSet

func (*PayloadReader) GetByteFromPayload

func (r *PayloadReader) GetByteFromPayload() ([]byte, []bool, error)

GetByteFromPayload returns byte slice from payload

func (*PayloadReader) GetDataFromPayload

func (r *PayloadReader) GetDataFromPayload() (interface{}, []bool, int, error)
	`interface{}`: all types.
 `[]bool`: validData, only meaningful to ScalarField.
	`int`: dim, only meaningful to FLOAT/BINARY VECTOR type.
	`error`: error.

func (*PayloadReader) GetDoubleFromPayload

func (r *PayloadReader) GetDoubleFromPayload() ([]float64, []bool, error)

func (*PayloadReader) GetFloat16VectorFromPayload

func (r *PayloadReader) GetFloat16VectorFromPayload() ([]byte, int, error)

GetFloat16VectorFromPayload returns vector, dimension, error

func (*PayloadReader) GetFloatFromPayload

func (r *PayloadReader) GetFloatFromPayload() ([]float32, []bool, error)

func (*PayloadReader) GetFloatVectorFromPayload

func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error)

GetFloatVectorFromPayload returns vector, dimension, error

func (*PayloadReader) GetInt16FromPayload

func (r *PayloadReader) GetInt16FromPayload() ([]int16, []bool, error)

func (*PayloadReader) GetInt32FromPayload

func (r *PayloadReader) GetInt32FromPayload() ([]int32, []bool, error)

func (*PayloadReader) GetInt64FromPayload

func (r *PayloadReader) GetInt64FromPayload() ([]int64, []bool, error)

func (*PayloadReader) GetInt8FromPayload

func (r *PayloadReader) GetInt8FromPayload() ([]int8, []bool, error)

func (*PayloadReader) GetJSONFromPayload

func (r *PayloadReader) GetJSONFromPayload() ([][]byte, []bool, error)

func (*PayloadReader) GetPayloadLengthFromReader

func (r *PayloadReader) GetPayloadLengthFromReader() (int, error)

func (*PayloadReader) GetSparseFloatVectorFromPayload

func (r *PayloadReader) GetSparseFloatVectorFromPayload() (*SparseFloatVectorFieldData, int, error)

func (*PayloadReader) GetStringFromPayload

func (r *PayloadReader) GetStringFromPayload() ([]string, []bool, error)

func (*PayloadReader) ReleasePayloadReader

func (r *PayloadReader) ReleasePayloadReader() error

ReleasePayloadReader release payload reader.

type PayloadReaderInterface

type PayloadReaderInterface interface {
	GetDataFromPayload() (any, []bool, int, error)
	GetBoolFromPayload() ([]bool, []bool, error)
	GetByteFromPayload() ([]byte, []bool, error)
	GetInt8FromPayload() ([]int8, []bool, error)
	GetInt16FromPayload() ([]int16, []bool, error)
	GetInt32FromPayload() ([]int32, []bool, error)
	GetInt64FromPayload() ([]int64, []bool, error)
	GetFloatFromPayload() ([]float32, []bool, error)
	GetDoubleFromPayload() ([]float64, []bool, error)
	GetStringFromPayload() ([]string, []bool, error)
	GetArrayFromPayload() ([]*schemapb.ScalarField, []bool, error)
	GetJSONFromPayload() ([][]byte, []bool, error)
	GetBinaryVectorFromPayload() ([]byte, int, error)
	GetFloat16VectorFromPayload() ([]byte, int, error)
	GetBFloat16VectorFromPayload() ([]byte, int, error)
	GetFloatVectorFromPayload() ([]float32, int, error)
	GetSparseFloatVectorFromPayload() (*SparseFloatVectorFieldData, int, error)
	GetPayloadLengthFromReader() (int, error)

	GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error)
	GetArrowRecordReader() (pqarrow.RecordReader, error)

	ReleasePayloadReader() error
	Close() error
}

PayloadReaderInterface abstracts PayloadReader

type PayloadWriterInterface

type PayloadWriterInterface interface {
	AddDataToPayload(msgs any, valids []bool) error
	AddBoolToPayload(msgs []bool, valids []bool) error
	AddByteToPayload(msgs []byte, valids []bool) error
	AddInt8ToPayload(msgs []int8, valids []bool) error
	AddInt16ToPayload(msgs []int16, valids []bool) error
	AddInt32ToPayload(msgs []int32, valids []bool) error
	AddInt64ToPayload(msgs []int64, valids []bool) error
	AddFloatToPayload(msgs []float32, valids []bool) error
	AddDoubleToPayload(msgs []float64, valids []bool) error
	AddOneStringToPayload(msgs string, isValid bool) error
	AddOneArrayToPayload(msg *schemapb.ScalarField, isValid bool) error
	AddOneJSONToPayload(msg []byte, isValid bool) error
	AddBinaryVectorToPayload(binVec []byte, dim int) error
	AddFloatVectorToPayload(binVec []float32, dim int) error
	AddFloat16VectorToPayload(binVec []byte, dim int) error
	AddBFloat16VectorToPayload(binVec []byte, dim int) error
	AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error
	FinishPayloadWriter() error
	GetPayloadBufferFromWriter() ([]byte, error)
	GetPayloadLengthFromWriter() (int, error)
	ReleasePayloadWriter()
	Reserve(size int)
	Close()
}

PayloadWriterInterface abstracts PayloadWriter

func NewPayloadWriter

func NewPayloadWriter(colType schemapb.DataType, options ...PayloadWriterOptions) (PayloadWriterInterface, error)

type PayloadWriterOptions

type PayloadWriterOptions func(*NativePayloadWriter)

func WithDim

func WithDim(dim int) PayloadWriterOptions

func WithNullable

func WithNullable(nullable bool) PayloadWriterOptions

func WithWriterProps

func WithWriterProps(writerProps *parquet.WriterProperties) PayloadWriterOptions

type PkStatistics

type PkStatistics struct {
	PkFilter bloomfilter.BloomFilterInterface //  bloom filter of pk inside a segment
	MinPK    PrimaryKey                       //	minimal pk value, shortcut for checking whether a pk is inside this segment
	MaxPK    PrimaryKey                       //  maximal pk value, same above
}

pkStatistics contains pk field statistic information

func (*PkStatistics) BatchPkExist

func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []bool

func (*PkStatistics) PkExist

func (st *PkStatistics) PkExist(pk PrimaryKey) bool

func (*PkStatistics) TestLocationCache

func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool

func (*PkStatistics) UpdateMinMax

func (st *PkStatistics) UpdateMinMax(pk PrimaryKey) error

update set pk min/max value if input value is beyond former range.

func (*PkStatistics) UpdatePKRange

func (st *PkStatistics) UpdatePKRange(ids FieldData) error

type PrimaryKey

type PrimaryKey interface {
	GT(key PrimaryKey) bool
	GE(key PrimaryKey) bool
	LT(key PrimaryKey) bool
	LE(key PrimaryKey) bool
	EQ(key PrimaryKey) bool
	MarshalJSON() ([]byte, error)
	UnmarshalJSON(data []byte) error
	SetValue(interface{}) error
	GetValue() interface{}
	Type() schemapb.DataType
	Size() int64
}

func GenInt64PrimaryKeys

func GenInt64PrimaryKeys(data ...int64) ([]PrimaryKey, error)

func GenPrimaryKeyByRawData

func GenPrimaryKeyByRawData(data interface{}, pkType schemapb.DataType) (PrimaryKey, error)

func GenVarcharPrimaryKeys

func GenVarcharPrimaryKeys(data ...string) ([]PrimaryKey, error)

func ParseFieldData2PrimaryKeys

func ParseFieldData2PrimaryKeys(data *schemapb.FieldData) ([]PrimaryKey, error)

func ParseIDs2PrimaryKeys

func ParseIDs2PrimaryKeys(ids *schemapb.IDs) []PrimaryKey

type PrimaryKeyStats

type PrimaryKeyStats struct {
	FieldID int64                            `json:"fieldID"`
	Max     int64                            `json:"max"` // useless, will delete
	Min     int64                            `json:"min"` // useless, will delete
	BFType  bloomfilter.BFType               `json:"bfType"`
	BF      bloomfilter.BloomFilterInterface `json:"bf"`
	PkType  int64                            `json:"pkType"`
	MaxPk   PrimaryKey                       `json:"maxPk"`
	MinPk   PrimaryKey                       `json:"minPk"`
}

PrimaryKeyStats contains statistics data for pk column

func DeserializeStats

func DeserializeStats(blobs []*Blob) ([]*PrimaryKeyStats, error)

DeserializeStats deserialize @blobs as []*PrimaryKeyStats

func DeserializeStatsList

func DeserializeStatsList(blob *Blob) ([]*PrimaryKeyStats, error)

func NewPrimaryKeyStats

func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error)

func (*PrimaryKeyStats) UnmarshalJSON

func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error

UnmarshalJSON unmarshal bytes to PrimaryKeyStats

func (*PrimaryKeyStats) Update

func (stats *PrimaryKeyStats) Update(pk PrimaryKey)

func (*PrimaryKeyStats) UpdateByMsgs

func (stats *PrimaryKeyStats) UpdateByMsgs(msgs FieldData)

func (*PrimaryKeyStats) UpdateMinMax

func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey)

updatePk update minPk and maxPk value

type PrimaryKeys

type PrimaryKeys interface {
	Append(pks ...PrimaryKey) error
	MustAppend(pks ...PrimaryKey)
	Get(idx int) PrimaryKey
	Type() schemapb.DataType
	Size() int64
	Len() int
	MustMerge(pks PrimaryKeys)
}

PrimaryKeys is the interface holding a slice of PrimaryKey

type Record

type Record interface {
	Schema() map[FieldID]schemapb.DataType
	ArrowSchema() *arrow.Schema
	Column(i FieldID) arrow.Array
	Len() int
	Release()
}

type RecordReader

type RecordReader interface {
	Next() error
	Record() Record
	Close()
}

type RecordWriter

type RecordWriter interface {
	Write(r Record) error
	Close()
}

type RecordWriterOptions

type RecordWriterOptions func(*singleFieldRecordWriter)

func WithRecordWriterProps

func WithRecordWriterProps(writerProps *parquet.WriterProperties) RecordWriterOptions

type RemoteChunkManager

type RemoteChunkManager struct {
	// contains filtered or unexported fields
}

RemoteChunkManager is responsible for read and write data stored in minio.

func NewRemoteChunkManager

func NewRemoteChunkManager(ctx context.Context, c *config) (*RemoteChunkManager, error)

func NewRemoteChunkManagerForTesting

func NewRemoteChunkManagerForTesting(c *minio.Client, bucket string, rootPath string) *RemoteChunkManager

NewRemoteChunkManagerForTesting is used for testing.

func (*RemoteChunkManager) Exist

func (mcm *RemoteChunkManager) Exist(ctx context.Context, filePath string) (bool, error)

Exist checks whether chunk is saved to minio storage.

func (*RemoteChunkManager) Mmap

func (mcm *RemoteChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)

func (*RemoteChunkManager) MultiRead

func (mcm *RemoteChunkManager) MultiRead(ctx context.Context, keys []string) ([][]byte, error)

func (*RemoteChunkManager) MultiRemove

func (mcm *RemoteChunkManager) MultiRemove(ctx context.Context, keys []string) error

MultiRemove deletes a objects with @keys.

func (*RemoteChunkManager) MultiWrite

func (mcm *RemoteChunkManager) MultiWrite(ctx context.Context, kvs map[string][]byte) error

MultiWrite saves multiple objects, the path is the key of @kvs. The object value is the value of @kvs.

func (*RemoteChunkManager) Path

func (mcm *RemoteChunkManager) Path(ctx context.Context, filePath string) (string, error)

Path returns the path of minio data if exists.

func (*RemoteChunkManager) Read

func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byte, error)

Read reads the minio storage data if exists.

func (*RemoteChunkManager) ReadAt

func (mcm *RemoteChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error)

ReadAt reads specific position data of minio storage if exists.

func (*RemoteChunkManager) Reader

func (mcm *RemoteChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error)

Reader returns the path of minio data if exists.

func (*RemoteChunkManager) Remove

func (mcm *RemoteChunkManager) Remove(ctx context.Context, filePath string) error

Remove deletes an object with @key.

func (*RemoteChunkManager) RemoveWithPrefix

func (mcm *RemoteChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error

RemoveWithPrefix removes all objects with the same prefix @prefix from minio.

func (*RemoteChunkManager) RootPath

func (mcm *RemoteChunkManager) RootPath() string

RootPath returns minio root path.

func (*RemoteChunkManager) Size

func (mcm *RemoteChunkManager) Size(ctx context.Context, filePath string) (int64, error)

func (*RemoteChunkManager) UnderlyingObjectStorage

func (mcm *RemoteChunkManager) UnderlyingObjectStorage() ObjectStorage

UnderlyingObjectStorage returns the underlying object storage.

func (*RemoteChunkManager) WalkWithPrefix

func (mcm *RemoteChunkManager) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) (err error)

func (*RemoteChunkManager) Write

func (mcm *RemoteChunkManager) Write(ctx context.Context, filePath string, content []byte) error

Write writes the data to minio storage.

type ScalarFieldValue

type ScalarFieldValue interface {
	GT(key ScalarFieldValue) bool
	GE(key ScalarFieldValue) bool
	LT(key ScalarFieldValue) bool
	LE(key ScalarFieldValue) bool
	EQ(key ScalarFieldValue) bool
	MarshalJSON() ([]byte, error)
	UnmarshalJSON(data []byte) error
	SetValue(interface{}) error
	GetValue() interface{}
	Type() schemapb.DataType
	Size() int64
}

func NewScalarFieldValue

func NewScalarFieldValue(dtype schemapb.DataType, data interface{}) ScalarFieldValue

func NewScalarFieldValueFromGenericValue

func NewScalarFieldValueFromGenericValue(dtype schemapb.DataType, gVal *planpb.GenericValue) (ScalarFieldValue, error)

type SegmentStats

type SegmentStats struct {
	FieldStats []FieldStats `json:"fieldStats"`
	NumRows    int
}

func NewSegmentStats

func NewSegmentStats(fieldStats []FieldStats, rows int) *SegmentStats

type SerializeWriter

type SerializeWriter[T any] struct {
	// contains filtered or unexported fields
}

func NewBinlogSerializeWriter

func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID,
	eventWriters map[FieldID]*BinlogStreamWriter, batchSize int,
) (*SerializeWriter[*Value], error)

func NewDeltalogMultiFieldWriter

func NewDeltalogMultiFieldWriter(partitionID, segmentID UniqueID, eventWriter *MultiFieldDeltalogStreamWriter, batchSize int,
) (*SerializeWriter[*DeleteLog], error)

func NewDeltalogSerializeWriter

func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *DeltalogStreamWriter, batchSize int,
) (*SerializeWriter[*DeleteLog], error)

func NewSerializeRecordWriter

func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T], batchSize int) *SerializeWriter[T]

func (*SerializeWriter[T]) Close

func (sw *SerializeWriter[T]) Close() error

func (*SerializeWriter[T]) Flush

func (sw *SerializeWriter[T]) Flush() error

func (*SerializeWriter[T]) Write

func (sw *SerializeWriter[T]) Write(value T) error

func (*SerializeWriter[T]) WrittenMemorySize

func (sw *SerializeWriter[T]) WrittenMemorySize() uint64

type Serializer

type Serializer[T any] func([]T) (Record, uint64, error)

type SparseFloatVectorFieldData

type SparseFloatVectorFieldData struct {
	schemapb.SparseFloatArray
}

func (*SparseFloatVectorFieldData) AppendAllRows

func (dst *SparseFloatVectorFieldData) AppendAllRows(src *SparseFloatVectorFieldData)

func (*SparseFloatVectorFieldData) AppendRow

func (data *SparseFloatVectorFieldData) AppendRow(row interface{}) error

func (*SparseFloatVectorFieldData) AppendRows

func (data *SparseFloatVectorFieldData) AppendRows(rows interface{}) error

func (*SparseFloatVectorFieldData) GetDataType

func (data *SparseFloatVectorFieldData) GetDataType() schemapb.DataType

func (*SparseFloatVectorFieldData) GetMemorySize

func (data *SparseFloatVectorFieldData) GetMemorySize() int

func (*SparseFloatVectorFieldData) GetNullable

func (data *SparseFloatVectorFieldData) GetNullable() bool

func (*SparseFloatVectorFieldData) GetRow

func (data *SparseFloatVectorFieldData) GetRow(i int) interface{}

func (*SparseFloatVectorFieldData) GetRowSize

func (data *SparseFloatVectorFieldData) GetRowSize(i int) int

func (*SparseFloatVectorFieldData) GetRows

func (data *SparseFloatVectorFieldData) GetRows() any

func (*SparseFloatVectorFieldData) RowNum

func (data *SparseFloatVectorFieldData) RowNum() int

type StatsLogType

type StatsLogType int64
const (
	DefaultStatsType StatsLogType = iota + 0

	// CompundStatsType log save multiple stats
	// and bloom filters to one file
	CompoundStatsType
)

func (StatsLogType) LogIdx

func (s StatsLogType) LogIdx() string

type StatsReader

type StatsReader struct {
	// contains filtered or unexported fields
}

StatsReader reads stats

func (*StatsReader) GetPrimaryKeyStats

func (sr *StatsReader) GetPrimaryKeyStats() (*PrimaryKeyStats, error)

GetInt64Stats returns buffer as PrimaryKeyStats

func (*StatsReader) GetPrimaryKeyStatsList

func (sr *StatsReader) GetPrimaryKeyStatsList() ([]*PrimaryKeyStats, error)

GetInt64Stats returns buffer as PrimaryKeyStats

func (*StatsReader) SetBuffer

func (sr *StatsReader) SetBuffer(buffer []byte)

SetBuffer sets buffer

type StatsWriter

type StatsWriter struct {
	// contains filtered or unexported fields
}

StatsWriter writes stats to buffer

func (*StatsWriter) Generate

func (sw *StatsWriter) Generate(stats *PrimaryKeyStats) error

Generate writes Stats to buffer

func (*StatsWriter) GenerateByData

func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs FieldData) error

GenerateByData writes Int64Stats or StringStats from @msgs with @fieldID to @buffer

func (*StatsWriter) GenerateList

func (sw *StatsWriter) GenerateList(stats []*PrimaryKeyStats) error

GenerateList writes Stats slice to buffer

func (*StatsWriter) GetBuffer

func (sw *StatsWriter) GetBuffer() []byte

GetBuffer returns buffer

type StringFieldData

type StringFieldData struct {
	Data      []string
	DataType  schemapb.DataType
	ValidData []bool
}

func (*StringFieldData) AppendRow

func (data *StringFieldData) AppendRow(row interface{}) error

func (*StringFieldData) AppendRows

func (data *StringFieldData) AppendRows(rows interface{}) error

func (*StringFieldData) GetDataType

func (data *StringFieldData) GetDataType() schemapb.DataType

func (*StringFieldData) GetMemorySize

func (data *StringFieldData) GetMemorySize() int

why not binary.Size(data) directly? binary.Size(data) return -1 binary.Size returns how many bytes Write would generate to encode the value v, which must be a fixed-size value or a slice of fixed-size values, or a pointer to such data. If v is neither of these, binary.Size returns -1.

func (*StringFieldData) GetNullable

func (data *StringFieldData) GetNullable() bool

func (*StringFieldData) GetRow

func (data *StringFieldData) GetRow(i int) any

func (*StringFieldData) GetRowSize

func (data *StringFieldData) GetRowSize(i int) int

func (*StringFieldData) GetRows

func (data *StringFieldData) GetRows() any

func (*StringFieldData) RowNum

func (data *StringFieldData) RowNum() int

type StringFieldValue

type StringFieldValue struct {
	Value string `json:"value"`
}

func NewStringFieldValue

func NewStringFieldValue(v string) *StringFieldValue

func (*StringFieldValue) EQ

func (sfv *StringFieldValue) EQ(obj ScalarFieldValue) bool

func (*StringFieldValue) GE

func (sfv *StringFieldValue) GE(obj ScalarFieldValue) bool

func (*StringFieldValue) GT

func (sfv *StringFieldValue) GT(obj ScalarFieldValue) bool

func (*StringFieldValue) GetValue

func (sfv *StringFieldValue) GetValue() interface{}

func (*StringFieldValue) LE

func (sfv *StringFieldValue) LE(obj ScalarFieldValue) bool

func (*StringFieldValue) LT

func (sfv *StringFieldValue) LT(obj ScalarFieldValue) bool

func (*StringFieldValue) MarshalJSON

func (sfv *StringFieldValue) MarshalJSON() ([]byte, error)

func (*StringFieldValue) SetValue

func (sfv *StringFieldValue) SetValue(data interface{}) error

func (*StringFieldValue) Size

func (sfv *StringFieldValue) Size() int64

func (*StringFieldValue) Type

func (sfv *StringFieldValue) Type() schemapb.DataType

func (*StringFieldValue) UnmarshalJSON

func (sfv *StringFieldValue) UnmarshalJSON(data []byte) error

type Timestamp

type Timestamp = typeutil.Timestamp

Timestamp is type alias of typeutil.Timestamp

type UniqueID

type UniqueID = typeutil.UniqueID

UniqueID is type alias of typeutil.UniqueID

func ParseSegmentIDByBinlog

func ParseSegmentIDByBinlog(rootPath, path string) (UniqueID, error)

ParseSegmentIDByBinlog parse segment id from binlog paths if path format is not expected, returns error

type Value

type Value struct {
	ID        int64
	PK        PrimaryKey
	Timestamp int64
	IsDeleted bool
	Value     interface{}
}

Value is the return value of Next

type VarCharFieldValue

type VarCharFieldValue struct {
	Value string `json:"value"`
}

func NewVarCharFieldValue

func NewVarCharFieldValue(v string) *VarCharFieldValue

func (*VarCharFieldValue) EQ

func (vcfv *VarCharFieldValue) EQ(obj ScalarFieldValue) bool

func (*VarCharFieldValue) GE

func (vcfv *VarCharFieldValue) GE(obj ScalarFieldValue) bool

func (*VarCharFieldValue) GT

func (vcfv *VarCharFieldValue) GT(obj ScalarFieldValue) bool

func (*VarCharFieldValue) GetValue

func (vcfv *VarCharFieldValue) GetValue() interface{}

func (*VarCharFieldValue) LE

func (vcfv *VarCharFieldValue) LE(obj ScalarFieldValue) bool

func (*VarCharFieldValue) LT

func (vcfv *VarCharFieldValue) LT(obj ScalarFieldValue) bool

func (*VarCharFieldValue) MarshalJSON

func (vcfv *VarCharFieldValue) MarshalJSON() ([]byte, error)

func (*VarCharFieldValue) SetValue

func (vcfv *VarCharFieldValue) SetValue(data interface{}) error

func (*VarCharFieldValue) Size

func (vcfv *VarCharFieldValue) Size() int64

func (*VarCharFieldValue) Type

func (vcfv *VarCharFieldValue) Type() schemapb.DataType

func (*VarCharFieldValue) UnmarshalJSON

func (vcfv *VarCharFieldValue) UnmarshalJSON(data []byte) error

type VarCharPrimaryKey

type VarCharPrimaryKey struct {
	Value string
}

func NewVarCharPrimaryKey

func NewVarCharPrimaryKey(v string) *VarCharPrimaryKey

func (*VarCharPrimaryKey) EQ

func (vcp *VarCharPrimaryKey) EQ(key PrimaryKey) bool

func (*VarCharPrimaryKey) GE

func (vcp *VarCharPrimaryKey) GE(key PrimaryKey) bool

func (*VarCharPrimaryKey) GT

func (vcp *VarCharPrimaryKey) GT(key PrimaryKey) bool

func (*VarCharPrimaryKey) GetValue

func (vcp *VarCharPrimaryKey) GetValue() interface{}

func (*VarCharPrimaryKey) LE

func (vcp *VarCharPrimaryKey) LE(key PrimaryKey) bool

func (*VarCharPrimaryKey) LT

func (vcp *VarCharPrimaryKey) LT(key PrimaryKey) bool

func (*VarCharPrimaryKey) MarshalJSON

func (vcp *VarCharPrimaryKey) MarshalJSON() ([]byte, error)

func (*VarCharPrimaryKey) SetValue

func (vcp *VarCharPrimaryKey) SetValue(data interface{}) error

func (*VarCharPrimaryKey) Size

func (vcp *VarCharPrimaryKey) Size() int64

func (*VarCharPrimaryKey) Type

func (vcp *VarCharPrimaryKey) Type() schemapb.DataType

func (*VarCharPrimaryKey) UnmarshalJSON

func (vcp *VarCharPrimaryKey) UnmarshalJSON(data []byte) error

type VarcharPrimaryKeys

type VarcharPrimaryKeys struct {
	// contains filtered or unexported fields
}

func NewVarcharPrimaryKeys

func NewVarcharPrimaryKeys(cap int) *VarcharPrimaryKeys

func (*VarcharPrimaryKeys) Append

func (pks *VarcharPrimaryKeys) Append(values ...PrimaryKey) error

func (*VarcharPrimaryKeys) AppendRaw

func (pks *VarcharPrimaryKeys) AppendRaw(values ...string)

func (*VarcharPrimaryKeys) Get

func (pks *VarcharPrimaryKeys) Get(idx int) PrimaryKey

func (*VarcharPrimaryKeys) Len

func (pks *VarcharPrimaryKeys) Len() int

func (*VarcharPrimaryKeys) MustAppend

func (pks *VarcharPrimaryKeys) MustAppend(values ...PrimaryKey)

func (*VarcharPrimaryKeys) MustMerge

func (pks *VarcharPrimaryKeys) MustMerge(another PrimaryKeys)

func (*VarcharPrimaryKeys) Size

func (pks *VarcharPrimaryKeys) Size() int64

func (*VarcharPrimaryKeys) Type

func (pks *VarcharPrimaryKeys) Type() schemapb.DataType

type VectorFieldValue

type VectorFieldValue interface {
	MarshalJSON() ([]byte, error)
	UnmarshalJSON(data []byte) error
	SetValue(interface{}) error
	GetValue() interface{}
	Type() schemapb.DataType
	Size() int64
}

func NewVectorFieldValue

func NewVectorFieldValue(dtype schemapb.DataType, data *schemapb.VectorField) VectorFieldValue

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL