Documentation ¶
Index ¶
- type ChannelManager
- type CollectionEventConsumer
- type CollectionFilter
- type CreateCollectionParam
- type CreateDatabaseParam
- type CreateIndexParam
- type CreatePartitionParam
- type DataHandler
- type DefaultChannelManager
- func (d *DefaultChannelManager) AddDroppedCollection(ids []int64)
- func (d *DefaultChannelManager) AddDroppedPartition(ids []int64)
- func (d *DefaultChannelManager) AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, ...) error
- func (d *DefaultChannelManager) GetChannelChan() <-chan string
- func (d *DefaultChannelManager) GetEventChan() <-chan *ReplicateAPIEvent
- func (d *DefaultChannelManager) GetMsgChan(pChannel string) <-chan *msgstream.MsgPack
- func (d *DefaultChannelManager) SetCtx(ctx context.Context)
- func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, ...) error
- func (d *DefaultChannelManager) StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
- type DefaultDataHandler
- func (d *DefaultDataHandler) CreateCollection(ctx context.Context, param *CreateCollectionParam) error
- func (d *DefaultDataHandler) CreateDatabase(ctx context.Context, param *CreateDatabaseParam) error
- func (d *DefaultDataHandler) CreateIndex(ctx context.Context, param *CreateIndexParam) error
- func (d *DefaultDataHandler) CreatePartition(ctx context.Context, param *CreatePartitionParam) error
- func (d *DefaultDataHandler) Delete(ctx context.Context, param *DeleteParam) error
- func (d *DefaultDataHandler) DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error
- func (d *DefaultDataHandler) DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error
- func (d *DefaultDataHandler) DescribePartition(ctx context.Context, param *DescribePartitionParam) error
- func (d *DefaultDataHandler) DropCollection(ctx context.Context, param *DropCollectionParam) error
- func (d *DefaultDataHandler) DropDatabase(ctx context.Context, param *DropDatabaseParam) error
- func (d *DefaultDataHandler) DropIndex(ctx context.Context, param *DropIndexParam) error
- func (d *DefaultDataHandler) DropPartition(ctx context.Context, param *DropPartitionParam) error
- func (d *DefaultDataHandler) Flush(ctx context.Context, param *FlushParam) error
- func (d *DefaultDataHandler) Insert(ctx context.Context, param *InsertParam) error
- func (d *DefaultDataHandler) LoadCollection(ctx context.Context, param *LoadCollectionParam) error
- func (d *DefaultDataHandler) LoadPartitions(ctx context.Context, param *LoadPartitionsParam) error
- func (d *DefaultDataHandler) ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error
- func (d *DefaultDataHandler) ReleasePartitions(ctx context.Context, param *ReleasePartitionsParam) error
- func (d *DefaultDataHandler) ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error
- type DefaultMessageManager
- type DefaultMetaOp
- func (d *DefaultMetaOp) GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error)
- func (d *DefaultMetaOp) GetAllDroppedObj() map[string]map[string]uint64
- func (d *DefaultMetaOp) GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error)
- func (d *DefaultMetaOp) GetCollectionNameByID(ctx context.Context, id int64) string
- func (d *DefaultMetaOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo
- func (d *DefaultMetaOp) StartWatch()
- func (d *DefaultMetaOp) SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer)
- func (d *DefaultMetaOp) SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer)
- func (d *DefaultMetaOp) UnsubscribeEvent(taskID string, eventType WatchEventType)
- func (d *DefaultMetaOp) WatchCollection(ctx context.Context, filter CollectionFilter)
- func (d *DefaultMetaOp) WatchPartition(ctx context.Context, filter PartitionFilter)
- type DefaultReader
- type DefaultTargetAPI
- func (d *DefaultTargetAPI) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
- func (d *DefaultTargetAPI) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error)
- func (d *DefaultTargetAPI) GetPartitionInfo(ctx context.Context, collectionName string, databaseName string) (*model.CollectionInfo, error)
- type DefaultWriter
- func (d *DefaultWriter) HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error)
- func (d *DefaultWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
- func (d *DefaultWriter) HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error)
- type DeleteParam
- type DescribeCollectionParam
- type DescribeDatabaseParam
- type DescribePartitionParam
- type DropCollectionParam
- type DropDatabaseParam
- type DropIndexParam
- type DropPartitionParam
- type FlushParam
- type InsertParam
- type LoadCollectionParam
- type LoadPartitionsParam
- type MessageManager
- type MetaOp
- type MsgBaseParam
- type PartitionEventConsumer
- type PartitionFilter
- type Reader
- type ReleaseCollectionParam
- type ReleasePartitionsParam
- type ReplicateAPIEvent
- type ReplicateAPIEventType
- type ReplicateMessage
- type ReplicateMessageParam
- type ReplicateParam
- type TargetAPI
- type WatchEventType
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelManager ¶
type ChannelManager interface { SetCtx(ctx context.Context) AddDroppedCollection(ids []int64) AddDroppedPartition(ids []int64) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error GetChannelChan() <-chan string GetMsgChan(pChannel string) <-chan *msgstream.MsgPack GetEventChan() <-chan *ReplicateAPIEvent }
ChannelManager a target must promise a manager
type CollectionEventConsumer ¶
type CollectionEventConsumer CollectionFilter
type CollectionFilter ¶
type CollectionFilter func(*pb.CollectionInfo) bool
CollectionFilter the filter will be used before the collection is filled the schema info
type CreateCollectionParam ¶
type CreateCollectionParam struct { MsgBaseParam ReplicateParam Schema *entity.Schema ShardsNum int32 ConsistencyLevel commonpb.ConsistencyLevel Properties []*commonpb.KeyValuePair }
type CreateDatabaseParam ¶
type CreateDatabaseParam struct { ReplicateParam milvuspb.CreateDatabaseRequest }
type CreateIndexParam ¶
type CreateIndexParam struct { ReplicateParam milvuspb.CreateIndexRequest }
type CreatePartitionParam ¶
type CreatePartitionParam struct { MsgBaseParam ReplicateParam CollectionName string PartitionName string }
type DataHandler ¶
type DataHandler interface { CreateCollection(ctx context.Context, param *CreateCollectionParam) error DropCollection(ctx context.Context, param *DropCollectionParam) error CreatePartition(ctx context.Context, param *CreatePartitionParam) error DropPartition(ctx context.Context, param *DropPartitionParam) error // Deprecated Insert(ctx context.Context, param *InsertParam) error // Deprecated Delete(ctx context.Context, param *DeleteParam) error Flush(ctx context.Context, param *FlushParam) error LoadCollection(ctx context.Context, param *LoadCollectionParam) error ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error LoadPartitions(ctx context.Context, param *LoadPartitionsParam) error ReleasePartitions(ctx context.Context, param *ReleasePartitionsParam) error CreateIndex(ctx context.Context, param *CreateIndexParam) error DropIndex(ctx context.Context, param *DropIndexParam) error CreateDatabase(ctx context.Context, param *CreateDatabaseParam) error DropDatabase(ctx context.Context, param *DropDatabaseParam) error ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error DescribePartition(ctx context.Context, param *DescribePartitionParam) error }
type DefaultChannelManager ¶
type DefaultChannelManager struct{}
func (*DefaultChannelManager) AddDroppedCollection ¶
func (d *DefaultChannelManager) AddDroppedCollection(ids []int64)
func (*DefaultChannelManager) AddDroppedPartition ¶
func (d *DefaultChannelManager) AddDroppedPartition(ids []int64)
func (*DefaultChannelManager) AddPartition ¶
func (d *DefaultChannelManager) AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error
func (*DefaultChannelManager) GetChannelChan ¶
func (d *DefaultChannelManager) GetChannelChan() <-chan string
func (*DefaultChannelManager) GetEventChan ¶
func (d *DefaultChannelManager) GetEventChan() <-chan *ReplicateAPIEvent
func (*DefaultChannelManager) GetMsgChan ¶
func (d *DefaultChannelManager) GetMsgChan(pChannel string) <-chan *msgstream.MsgPack
func (*DefaultChannelManager) SetCtx ¶
func (d *DefaultChannelManager) SetCtx(ctx context.Context)
func (*DefaultChannelManager) StartReadCollection ¶
func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error
func (*DefaultChannelManager) StopReadCollection ¶
func (d *DefaultChannelManager) StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
type DefaultDataHandler ¶
type DefaultDataHandler struct{}
func (*DefaultDataHandler) CreateCollection ¶
func (d *DefaultDataHandler) CreateCollection(ctx context.Context, param *CreateCollectionParam) error
func (*DefaultDataHandler) CreateDatabase ¶
func (d *DefaultDataHandler) CreateDatabase(ctx context.Context, param *CreateDatabaseParam) error
func (*DefaultDataHandler) CreateIndex ¶
func (d *DefaultDataHandler) CreateIndex(ctx context.Context, param *CreateIndexParam) error
func (*DefaultDataHandler) CreatePartition ¶
func (d *DefaultDataHandler) CreatePartition(ctx context.Context, param *CreatePartitionParam) error
func (*DefaultDataHandler) Delete ¶
func (d *DefaultDataHandler) Delete(ctx context.Context, param *DeleteParam) error
func (*DefaultDataHandler) DescribeCollection ¶
func (d *DefaultDataHandler) DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error
func (*DefaultDataHandler) DescribeDatabase ¶
func (d *DefaultDataHandler) DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error
func (*DefaultDataHandler) DescribePartition ¶
func (d *DefaultDataHandler) DescribePartition(ctx context.Context, param *DescribePartitionParam) error
func (*DefaultDataHandler) DropCollection ¶
func (d *DefaultDataHandler) DropCollection(ctx context.Context, param *DropCollectionParam) error
func (*DefaultDataHandler) DropDatabase ¶
func (d *DefaultDataHandler) DropDatabase(ctx context.Context, param *DropDatabaseParam) error
func (*DefaultDataHandler) DropIndex ¶
func (d *DefaultDataHandler) DropIndex(ctx context.Context, param *DropIndexParam) error
func (*DefaultDataHandler) DropPartition ¶
func (d *DefaultDataHandler) DropPartition(ctx context.Context, param *DropPartitionParam) error
func (*DefaultDataHandler) Flush ¶
func (d *DefaultDataHandler) Flush(ctx context.Context, param *FlushParam) error
func (*DefaultDataHandler) Insert ¶
func (d *DefaultDataHandler) Insert(ctx context.Context, param *InsertParam) error
func (*DefaultDataHandler) LoadCollection ¶
func (d *DefaultDataHandler) LoadCollection(ctx context.Context, param *LoadCollectionParam) error
func (*DefaultDataHandler) LoadPartitions ¶
func (d *DefaultDataHandler) LoadPartitions(ctx context.Context, param *LoadPartitionsParam) error
func (*DefaultDataHandler) ReleaseCollection ¶
func (d *DefaultDataHandler) ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error
func (*DefaultDataHandler) ReleasePartitions ¶
func (d *DefaultDataHandler) ReleasePartitions(ctx context.Context, param *ReleasePartitionsParam) error
func (*DefaultDataHandler) ReplicateMessage ¶
func (d *DefaultDataHandler) ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error
type DefaultMessageManager ¶
type DefaultMessageManager struct{}
func (*DefaultMessageManager) Close ¶
func (d *DefaultMessageManager) Close(channelName string)
func (*DefaultMessageManager) ReplicateMessage ¶
func (d *DefaultMessageManager) ReplicateMessage(message *ReplicateMessage)
type DefaultMetaOp ¶
type DefaultMetaOp struct{}
func (*DefaultMetaOp) GetAllCollection ¶
func (d *DefaultMetaOp) GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error)
func (*DefaultMetaOp) GetAllDroppedObj ¶
func (d *DefaultMetaOp) GetAllDroppedObj() map[string]map[string]uint64
func (*DefaultMetaOp) GetAllPartition ¶
func (d *DefaultMetaOp) GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error)
func (*DefaultMetaOp) GetCollectionNameByID ¶
func (d *DefaultMetaOp) GetCollectionNameByID(ctx context.Context, id int64) string
func (*DefaultMetaOp) GetDatabaseInfoForCollection ¶
func (d *DefaultMetaOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo
func (*DefaultMetaOp) StartWatch ¶
func (d *DefaultMetaOp) StartWatch()
func (*DefaultMetaOp) SubscribeCollectionEvent ¶
func (d *DefaultMetaOp) SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer)
func (*DefaultMetaOp) SubscribePartitionEvent ¶
func (d *DefaultMetaOp) SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer)
func (*DefaultMetaOp) UnsubscribeEvent ¶
func (d *DefaultMetaOp) UnsubscribeEvent(taskID string, eventType WatchEventType)
func (*DefaultMetaOp) WatchCollection ¶
func (d *DefaultMetaOp) WatchCollection(ctx context.Context, filter CollectionFilter)
func (*DefaultMetaOp) WatchPartition ¶
func (d *DefaultMetaOp) WatchPartition(ctx context.Context, filter PartitionFilter)
type DefaultReader ¶
type DefaultReader struct{}
DefaultReader All CDCReader implements should combine it
func (*DefaultReader) ErrorChan ¶
func (d *DefaultReader) ErrorChan() <-chan error
func (*DefaultReader) QuitRead ¶
func (d *DefaultReader) QuitRead(ctx context.Context)
func (*DefaultReader) StartRead ¶
func (d *DefaultReader) StartRead(ctx context.Context)
StartRead the return value is nil, and if you receive the data from the nil chan, will block forever, not panic
type DefaultTargetAPI ¶
type DefaultTargetAPI struct{}
func (*DefaultTargetAPI) GetCollectionInfo ¶
func (d *DefaultTargetAPI) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
func (*DefaultTargetAPI) GetDatabaseName ¶
func (*DefaultTargetAPI) GetPartitionInfo ¶
func (d *DefaultTargetAPI) GetPartitionInfo(ctx context.Context, collectionName string, databaseName string) (*model.CollectionInfo, error)
type DefaultWriter ¶
type DefaultWriter struct{}
func (*DefaultWriter) HandleOpMessagePack ¶
func (*DefaultWriter) HandleReplicateAPIEvent ¶
func (d *DefaultWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
type DeleteParam ¶
type DeleteParam struct { MsgBaseParam ReplicateParam CollectionName string PartitionName string Column entity.Column }
type DescribeCollectionParam ¶
type DescribeCollectionParam struct { ReplicateParam Name string }
type DescribeDatabaseParam ¶
type DescribeDatabaseParam struct { ReplicateParam Name string }
type DescribePartitionParam ¶
type DescribePartitionParam struct { ReplicateParam CollectionName string PartitionName string }
type DropCollectionParam ¶
type DropCollectionParam struct { MsgBaseParam ReplicateParam CollectionName string }
type DropDatabaseParam ¶
type DropDatabaseParam struct { ReplicateParam milvuspb.DropDatabaseRequest }
type DropIndexParam ¶
type DropIndexParam struct { ReplicateParam milvuspb.DropIndexRequest }
type DropPartitionParam ¶
type DropPartitionParam struct { MsgBaseParam ReplicateParam CollectionName string PartitionName string }
type FlushParam ¶
type FlushParam struct { ReplicateParam milvuspb.FlushRequest }
type InsertParam ¶
type InsertParam struct { MsgBaseParam ReplicateParam CollectionName string PartitionName string Columns []entity.Column }
type LoadCollectionParam ¶
type LoadCollectionParam struct { ReplicateParam milvuspb.LoadCollectionRequest }
type LoadPartitionsParam ¶
type LoadPartitionsParam struct { ReplicateParam milvuspb.LoadPartitionsRequest }
type MessageManager ¶
type MessageManager interface { ReplicateMessage(message *ReplicateMessage) Close(channelName string) }
type MetaOp ¶
type MetaOp interface { // WatchCollection its implementation should make sure it's only called once. The WatchPartition is same WatchCollection(ctx context.Context, filter CollectionFilter) WatchPartition(ctx context.Context, filter PartitionFilter) StartWatch() // SubscribeCollectionEvent an event only is consumed once. The SubscribePartitionEvent is same // TODO need to consider the many target, maybe try the method a meta op corresponds to a target SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer) SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer) UnsubscribeEvent(taskID string, eventType WatchEventType) GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error) GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error) GetAllDroppedObj() map[string]map[string]uint64 GetCollectionNameByID(ctx context.Context, id int64) string GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo }
MetaOp meta operation
type MsgBaseParam ¶
type PartitionEventConsumer ¶
type PartitionEventConsumer PartitionFilter
type PartitionFilter ¶
type PartitionFilter func(info *pb.PartitionInfo) bool
type ReleaseCollectionParam ¶
type ReleaseCollectionParam struct { ReplicateParam milvuspb.ReleaseCollectionRequest }
type ReleasePartitionsParam ¶
type ReleasePartitionsParam struct { ReplicateParam milvuspb.ReleasePartitionsRequest }
type ReplicateAPIEvent ¶
type ReplicateAPIEvent struct { EventType ReplicateAPIEventType CollectionInfo *pb.CollectionInfo PartitionInfo *pb.PartitionInfo ReplicateInfo *commonpb.ReplicateInfo ReplicateParam ReplicateParam Error error }
type ReplicateAPIEventType ¶
type ReplicateAPIEventType int
const ( ReplicateCreateCollection ReplicateAPIEventType = iota + 1 ReplicateDropCollection ReplicateCreatePartition ReplicateDropPartition ReplicateError = 100 )
func (ReplicateAPIEventType) String ¶
func (r ReplicateAPIEventType) String() string
type ReplicateMessage ¶
type ReplicateMessage struct { Ctx context.Context Param *ReplicateMessageParam SuccessFunc func(param *ReplicateMessageParam) FailFunc func(param *ReplicateMessageParam, err error) }
type ReplicateMessageParam ¶
type ReplicateMessageParam struct { MsgBaseParam ReplicateParam ChannelName string BeginTs, EndTs uint64 MsgsBytes [][]byte StartPositions, EndPositions []*msgpb.MsgPosition TargetMsgPosition string }
type ReplicateParam ¶
type ReplicateParam struct {
Database string
}
type TargetAPI ¶
type TargetAPI interface { GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error) GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error) }
type WatchEventType ¶
type WatchEventType int
const ( CollectionEventType WatchEventType = iota + 1 PartitionEventType )
type Writer ¶
type Writer interface { HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error) HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error) }
Click to show internal directories.
Click to hide internal directories.