reader

package
v0.0.0-...-34f85fd Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2024 License: Apache-2.0 Imports: 39 Imported by: 3

Documentation

Index

Constants

View Source
const (
	TomeObject = "_tome" // marker for an object that has been marked as deleted

	SkipCollectionState = pb.CollectionState(-100)
	SkipPartitionState  = pb.PartitionState(-100)
)
View Source
const (
	AllCollection = "*"
)

Variables

View Source
var (
	TSMetricVec = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "milvus",
			Subsystem: "cdc",
			Name:      "center_tt",
			Help:      "the center ts, unit: ms",
		}, []string{"channel_name"})
)

Functions

func ForeachChannel

func ForeachChannel(sourcePChannels, targetPChannels []string, f func(sourcePChannel, targetPChannel string) error) error

func GetTSManager

func GetTSManager() *tsManager

func GetVChannelByPChannel

func GetVChannelByPChannel(pChannel string, vChannels []string) string

func GreedyConsumeChan

func GreedyConsumeChan(packChan chan *msgstream.MsgPack, f func(*msgstream.MsgPack))

func IsCollectionNotFoundError

func IsCollectionNotFoundError(err error) bool

func IsDatabaseNotFoundError

func IsDatabaseNotFoundError(err error) bool

func IsDroppedObject

func IsDroppedObject(name string) bool

func NewChannelReader

func NewChannelReader(channelName, seekPosition string,
	mqConfig config.MQConfig,
	dataHandler func(context.Context, *msgstream.MsgPack) bool,
	creator FactoryCreator,
) (api.Reader, error)

func NewCollectionReader

func NewCollectionReader(id string,
	channelManager api.ChannelManager, metaOp api.MetaOp,
	seekPosition map[string]*msgpb.MsgPosition,
	shouldReadFunc ShouldReadFunc,
	readerConfig config.ReaderConfig,
) (api.Reader, error)

func NewEtcdOp

func NewEtcdOp(endpoints []string,
	rootPath, metaPath, defaultPartitionName string,
	etcdConfig config.EtcdConfig, target api.TargetAPI,
) (api.MetaOp, error)

func NewReplicateChannelManager

func NewReplicateChannelManager(mqConfig config.MQConfig,
	factoryCreator FactoryCreator,
	client api.TargetAPI,
	readConfig config.ReaderConfig,
	metaOp api.MetaOp,
	msgPackCallback func(string, *msgstream.MsgPack),
) (api.ChannelManager, error)

func NewTarget

func NewTarget(ctx context.Context, config TargetConfig) (api.TargetAPI, error)

Types

type Barrier

type Barrier struct {
	Dest        int
	BarrierChan chan uint64
	CloseChan   chan struct{}
}

func NewBarrier

func NewBarrier(count int, f func(msgTs uint64, b *Barrier)) *Barrier

type ChannelReader

type ChannelReader struct {
	api.DefaultReader
	// contains filtered or unexported fields
}

func (*ChannelReader) QuitRead

func (c *ChannelReader) QuitRead(ctx context.Context)

func (*ChannelReader) StartRead

func (c *ChannelReader) StartRead(ctx context.Context)

type CollectionInfo

type CollectionInfo struct {
	// contains filtered or unexported fields
}

type CollectionReader

type CollectionReader struct {
	api.DefaultReader
	// contains filtered or unexported fields
}

func (*CollectionReader) ErrorChan

func (reader *CollectionReader) ErrorChan() <-chan error

func (*CollectionReader) QuitRead

func (reader *CollectionReader) QuitRead(ctx context.Context)

func (*CollectionReader) StartRead

func (reader *CollectionReader) StartRead(ctx context.Context)

type DefaultFactoryCreator

type DefaultFactoryCreator struct{}

func (*DefaultFactoryCreator) NewKmsFactory

func (d *DefaultFactoryCreator) NewKmsFactory(cfg *config.KafkaConfig) msgstream.Factory

func (*DefaultFactoryCreator) NewPmsFactory

func (d *DefaultFactoryCreator) NewPmsFactory(cfg *config.PulsarConfig) msgstream.Factory

type EtcdOp

type EtcdOp struct {
	// contains filtered or unexported fields
}

func (*EtcdOp) GetAllCollection

func (e *EtcdOp) GetAllCollection(ctx context.Context, filter api.CollectionFilter) ([]*pb.CollectionInfo, error)

func (*EtcdOp) GetAllDroppedObj

func (e *EtcdOp) GetAllDroppedObj() map[string]map[string]uint64

func (*EtcdOp) GetAllPartition

func (e *EtcdOp) GetAllPartition(ctx context.Context, filter api.PartitionFilter) ([]*pb.PartitionInfo, error)

func (*EtcdOp) GetCollectionNameByID

func (e *EtcdOp) GetCollectionNameByID(ctx context.Context, id int64) string

func (*EtcdOp) GetDatabaseInfoForCollection

func (e *EtcdOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo

func (*EtcdOp) StartWatch

func (e *EtcdOp) StartWatch()

func (*EtcdOp) SubscribeCollectionEvent

func (e *EtcdOp) SubscribeCollectionEvent(taskID string, consumer api.CollectionEventConsumer)

func (*EtcdOp) SubscribePartitionEvent

func (e *EtcdOp) SubscribePartitionEvent(taskID string, consumer api.PartitionEventConsumer)

func (*EtcdOp) UnsubscribeEvent

func (e *EtcdOp) UnsubscribeEvent(taskID string, eventType api.WatchEventType)

func (*EtcdOp) WatchCollection

func (e *EtcdOp) WatchCollection(ctx context.Context, filter api.CollectionFilter)

func (*EtcdOp) WatchPartition

func (e *EtcdOp) WatchPartition(ctx context.Context, filter api.PartitionFilter)

type FactoryCreator

type FactoryCreator interface {
	NewPmsFactory(cfg *config.PulsarConfig) msgstream.Factory
	NewKmsFactory(cfg *config.KafkaConfig) msgstream.Factory
}

func NewDefaultFactoryCreator

func NewDefaultFactoryCreator() FactoryCreator

type ShouldReadFunc

type ShouldReadFunc func(*pb.CollectionInfo) bool

type TargetClient

type TargetClient struct {
	// contains filtered or unexported fields
}

func (*TargetClient) GetCollectionInfo

func (t *TargetClient) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)

func (*TargetClient) GetDatabaseName

func (t *TargetClient) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error)

func (*TargetClient) GetMilvus

func (t *TargetClient) GetMilvus(ctx context.Context, databaseName string) (client.Client, error)

func (*TargetClient) GetPartitionInfo

func (t *TargetClient) GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)

type TargetConfig

type TargetConfig struct {
	Address   string
	Username  string
	Password  string
	APIKey    string
	EnableTLS bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL