Documentation ¶
Index ¶
- Constants
- Variables
- func ForeachChannel(sourcePChannels, targetPChannels []string, ...) error
- func GetTSManager() *tsManager
- func GetVChannelByPChannel(pChannel string, vChannels []string) string
- func GreedyConsumeChan(packChan chan *msgstream.MsgPack, f func(*msgstream.MsgPack))
- func IsCollectionNotFoundError(err error) bool
- func IsDatabaseNotFoundError(err error) bool
- func IsDroppedObject(name string) bool
- func NewChannelReader(channelName, seekPosition string, mqConfig config.MQConfig, ...) (api.Reader, error)
- func NewCollectionReader(id string, channelManager api.ChannelManager, metaOp api.MetaOp, ...) (api.Reader, error)
- func NewEtcdOp(endpoints []string, rootPath, metaPath, defaultPartitionName string, ...) (api.MetaOp, error)
- func NewReplicateChannelManager(mqConfig config.MQConfig, factoryCreator FactoryCreator, client api.TargetAPI, ...) (api.ChannelManager, error)
- func NewTarget(ctx context.Context, config TargetConfig) (api.TargetAPI, error)
- type Barrier
- type ChannelReader
- type CollectionInfo
- type CollectionReader
- type DefaultFactoryCreator
- type EtcdOp
- func (e *EtcdOp) GetAllCollection(ctx context.Context, filter api.CollectionFilter) ([]*pb.CollectionInfo, error)
- func (e *EtcdOp) GetAllDroppedObj() map[string]map[string]uint64
- func (e *EtcdOp) GetAllPartition(ctx context.Context, filter api.PartitionFilter) ([]*pb.PartitionInfo, error)
- func (e *EtcdOp) GetCollectionNameByID(ctx context.Context, id int64) string
- func (e *EtcdOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo
- func (e *EtcdOp) StartWatch()
- func (e *EtcdOp) SubscribeCollectionEvent(taskID string, consumer api.CollectionEventConsumer)
- func (e *EtcdOp) SubscribePartitionEvent(taskID string, consumer api.PartitionEventConsumer)
- func (e *EtcdOp) UnsubscribeEvent(taskID string, eventType api.WatchEventType)
- func (e *EtcdOp) WatchCollection(ctx context.Context, filter api.CollectionFilter)
- func (e *EtcdOp) WatchPartition(ctx context.Context, filter api.PartitionFilter)
- type FactoryCreator
- type ShouldReadFunc
- type TargetClient
- func (t *TargetClient) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
- func (t *TargetClient) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error)
- func (t *TargetClient) GetMilvus(ctx context.Context, databaseName string) (client.Client, error)
- func (t *TargetClient) GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
- type TargetConfig
Constants ¶
View Source
const (
	TomeObject = "_tome" // marker for an object that has been marked as deleted
	SkipCollectionState = pb.CollectionState(-100)
	SkipPartitionState = pb.PartitionState(-100)
)
View Source
const (
AllCollection = "*"
)
Variables ¶
View Source
var (
	TSMetricVec = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "milvus",
			Subsystem: "cdc",
			Name: "center_tt",
			Help: "the center ts, unit: ms",
		}, []string{"channel_name"})
)
Functions ¶
func ForeachChannel ¶
func GetTSManager ¶
func GetTSManager() *tsManager
func GetVChannelByPChannel ¶
func GreedyConsumeChan ¶
func IsDatabaseNotFoundError ¶
func IsDroppedObject ¶
func NewChannelReader ¶
func NewCollectionReader ¶
func NewCollectionReader(id string,
	channelManager api.ChannelManager,
	metaOp api.MetaOp,
	seekPosition map[string]*msgpb.MsgPosition,
	shouldReadFunc ShouldReadFunc,
	readerConfig config.ReaderConfig,
) (api.Reader, error)
func NewReplicateChannelManager ¶
func NewReplicateChannelManager(mqConfig config.MQConfig,
	factoryCreator FactoryCreator,
	client api.TargetAPI,
	readConfig config.ReaderConfig,
	metaOp api.MetaOp,
	msgPackCallback func(string, *msgstream.MsgPack),
) (api.ChannelManager, error)
Types ¶
type ChannelReader ¶
type ChannelReader struct {
	api.DefaultReader
	// contains filtered or unexported fields
}
func (*ChannelReader) QuitRead ¶
func (c *ChannelReader) QuitRead(ctx context.Context)
func (*ChannelReader) StartRead ¶
func (c *ChannelReader) StartRead(ctx context.Context)
type CollectionInfo ¶
type CollectionInfo struct {
// contains filtered or unexported fields
}
type CollectionReader ¶
type CollectionReader struct {
	api.DefaultReader
	// contains filtered or unexported fields
}
func (*CollectionReader) ErrorChan ¶
func (reader *CollectionReader) ErrorChan() <-chan error
func (*CollectionReader) QuitRead ¶
func (reader *CollectionReader) QuitRead(ctx context.Context)
func (*CollectionReader) StartRead ¶
func (reader *CollectionReader) StartRead(ctx context.Context)
type DefaultFactoryCreator ¶
type DefaultFactoryCreator struct{}
func (*DefaultFactoryCreator) NewKmsFactory ¶
func (d *DefaultFactoryCreator) NewKmsFactory(cfg *config.KafkaConfig) msgstream.Factory
func (*DefaultFactoryCreator) NewPmsFactory ¶
func (d *DefaultFactoryCreator) NewPmsFactory(cfg *config.PulsarConfig) msgstream.Factory
type EtcdOp ¶
type EtcdOp struct {
// contains filtered or unexported fields
}
func (*EtcdOp) GetAllCollection ¶
func (e *EtcdOp) GetAllCollection(ctx context.Context, filter api.CollectionFilter) ([]*pb.CollectionInfo, error)
func (*EtcdOp) GetAllPartition ¶
func (e *EtcdOp) GetAllPartition(ctx context.Context, filter api.PartitionFilter) ([]*pb.PartitionInfo, error)
func (*EtcdOp) GetCollectionNameByID ¶
func (*EtcdOp) GetDatabaseInfoForCollection ¶
func (*EtcdOp) StartWatch ¶
func (e *EtcdOp) StartWatch()
func (*EtcdOp) SubscribeCollectionEvent ¶
func (e *EtcdOp) SubscribeCollectionEvent(taskID string, consumer api.CollectionEventConsumer)
func (*EtcdOp) SubscribePartitionEvent ¶
func (e *EtcdOp) SubscribePartitionEvent(taskID string, consumer api.PartitionEventConsumer)
func (*EtcdOp) UnsubscribeEvent ¶
func (e *EtcdOp) UnsubscribeEvent(taskID string, eventType api.WatchEventType)
func (*EtcdOp) WatchCollection ¶
func (e *EtcdOp) WatchCollection(ctx context.Context, filter api.CollectionFilter)
func (*EtcdOp) WatchPartition ¶
func (e *EtcdOp) WatchPartition(ctx context.Context, filter api.PartitionFilter)
type FactoryCreator ¶
type FactoryCreator interface {
	NewPmsFactory(cfg *config.PulsarConfig) msgstream.Factory
	NewKmsFactory(cfg *config.KafkaConfig) msgstream.Factory
}
func NewDefaultFactoryCreator ¶
func NewDefaultFactoryCreator() FactoryCreator
type ShouldReadFunc ¶
type ShouldReadFunc func(*pb.CollectionInfo) bool
type TargetClient ¶
type TargetClient struct {
// contains filtered or unexported fields
}
func (*TargetClient) GetCollectionInfo ¶
func (t *TargetClient) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
func (*TargetClient) GetDatabaseName ¶
func (*TargetClient) GetPartitionInfo ¶
func (t *TargetClient) GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
Click to show internal directories.
Click to hide internal directories.