api

package
v0.0.0-...-34f85fd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2024 License: Apache-2.0 Imports: 9 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelManager

// ChannelManager groups the methods for starting and stopping the read of a
// collection, registering partitions and dropped IDs, and obtaining
// receive-only Go channels of channel names, message packs and replicate API
// events.
// NOTE(review): the notes below are read off the method signatures only; no
// implementation is visible here, so confirm semantics against one.
type ChannelManager interface {
	// SetCtx passes a context to the manager.
	SetCtx(ctx context.Context)
	// AddDroppedCollection and AddDroppedPartition take a list of int64 IDs;
	// presumably the IDs of objects that were already dropped — confirm.
	AddDroppedCollection(ids []int64)
	AddDroppedPartition(ids []int64)

	// StartReadCollection additionally receives the message positions to seek
	// to; StopReadCollection and AddPartition only receive the collection
	// (and partition) info. All three report failure through the error result.
	StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error
	StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
	AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error

	// The getters below return receive-only channels. GetMsgChan is keyed by
	// pChannel (presumably a physical channel name — confirm).
	GetChannelChan() <-chan string
	GetMsgChan(pChannel string) <-chan *msgstream.MsgPack
	GetEventChan() <-chan *ReplicateAPIEvent
}

ChannelManager is the manager that every target must provide.

type CollectionEventConsumer

// CollectionEventConsumer is a distinct named type whose underlying type is
// that of CollectionFilter, i.e. func(*pb.CollectionInfo) bool. It is the
// consumer type accepted by MetaOp.SubscribeCollectionEvent.
type CollectionEventConsumer CollectionFilter

type CollectionFilter

// CollectionFilter is a predicate over *pb.CollectionInfo. It is the filter
// type accepted by MetaOp.WatchCollection and MetaOp.GetAllCollection.
// NOTE(review): whether a true result means "keep" or "skip" is not visible
// here — confirm against an implementation.
type CollectionFilter func(*pb.CollectionInfo) bool

CollectionFilter is a filter that is applied before the collection's schema info has been filled in.

type CreateCollectionParam

// CreateCollectionParam is the argument type of DataHandler.CreateCollection.
// It embeds MsgBaseParam and ReplicateParam, so their Base and Database
// fields are promoted onto this struct.
type CreateCollectionParam struct {
	MsgBaseParam
	ReplicateParam
	// Schema is the collection schema, expressed with the entity package type.
	Schema           *entity.Schema
	// ShardsNum is presumably the shard count of the collection — confirm.
	ShardsNum        int32
	// ConsistencyLevel uses the commonpb enum type.
	ConsistencyLevel commonpb.ConsistencyLevel
	// Properties is a list of key/value pairs; their meaning is not visible here.
	Properties       []*commonpb.KeyValuePair
}

type CreateDatabaseParam

// CreateDatabaseParam is the argument type of DataHandler.CreateDatabase.
// It embeds ReplicateParam and, by value, milvuspb.CreateDatabaseRequest, so
// the fields of both are promoted onto this struct.
type CreateDatabaseParam struct {
	ReplicateParam
	milvuspb.CreateDatabaseRequest
}

type CreateIndexParam

// CreateIndexParam is the argument type of DataHandler.CreateIndex.
// It embeds ReplicateParam and, by value, milvuspb.CreateIndexRequest, so the
// fields of both are promoted onto this struct.
type CreateIndexParam struct {
	ReplicateParam
	milvuspb.CreateIndexRequest
}

type CreatePartitionParam

// CreatePartitionParam is the argument type of DataHandler.CreatePartition.
// It embeds MsgBaseParam and ReplicateParam and names the collection and the
// partition as plain strings.
type CreatePartitionParam struct {
	MsgBaseParam
	ReplicateParam
	CollectionName string
	PartitionName  string
}

type DataHandler

// DataHandler lists the operations applied to a target. Every method takes a
// context plus a pointer to the parameter struct named after the method, and
// reports failure only through its error result.
type DataHandler interface {
	// Collection and partition creation/removal.
	CreateCollection(ctx context.Context, param *CreateCollectionParam) error
	DropCollection(ctx context.Context, param *DropCollectionParam) error
	CreatePartition(ctx context.Context, param *CreatePartitionParam) error
	DropPartition(ctx context.Context, param *DropPartitionParam) error

	// Deprecated: NOTE(review): the original marker names no replacement —
	// presumably ReplicateMessage; confirm and state it here.
	Insert(ctx context.Context, param *InsertParam) error
	// Deprecated: NOTE(review): the original marker names no replacement —
	// presumably ReplicateMessage; confirm and state it here.
	Delete(ctx context.Context, param *DeleteParam) error
	Flush(ctx context.Context, param *FlushParam) error

	// Load/release of collections and partitions.
	LoadCollection(ctx context.Context, param *LoadCollectionParam) error
	ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error
	LoadPartitions(ctx context.Context, param *LoadPartitionsParam) error
	ReleasePartitions(ctx context.Context, param *ReleasePartitionsParam) error

	// Index creation/removal.
	CreateIndex(ctx context.Context, param *CreateIndexParam) error
	DropIndex(ctx context.Context, param *DropIndexParam) error

	// Database creation/removal.
	CreateDatabase(ctx context.Context, param *CreateDatabaseParam) error
	DropDatabase(ctx context.Context, param *DropDatabaseParam) error

	// ReplicateMessage takes the raw message bytes and positions carried by
	// ReplicateMessageParam.
	ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error

	// The Describe* methods return only an error, not a description value;
	// presumably they serve as existence checks — confirm.
	DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error
	DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error
	DescribePartition(ctx context.Context, param *DescribePartitionParam) error
}

type DefaultChannelManager

// DefaultChannelManager is an empty struct whose pointer-receiver methods
// cover every method of the ChannelManager interface.
// NOTE(review): the method bodies are not shown here; presumably they are
// no-op defaults intended for embedding — confirm.
type DefaultChannelManager struct{}

func (*DefaultChannelManager) AddDroppedCollection

func (d *DefaultChannelManager) AddDroppedCollection(ids []int64)

func (*DefaultChannelManager) AddDroppedPartition

func (d *DefaultChannelManager) AddDroppedPartition(ids []int64)

func (*DefaultChannelManager) AddPartition

func (d *DefaultChannelManager) AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error

func (*DefaultChannelManager) GetChannelChan

func (d *DefaultChannelManager) GetChannelChan() <-chan string

func (*DefaultChannelManager) GetEventChan

func (d *DefaultChannelManager) GetEventChan() <-chan *ReplicateAPIEvent

func (*DefaultChannelManager) GetMsgChan

func (d *DefaultChannelManager) GetMsgChan(pChannel string) <-chan *msgstream.MsgPack

func (*DefaultChannelManager) SetCtx

func (d *DefaultChannelManager) SetCtx(ctx context.Context)

func (*DefaultChannelManager) StartReadCollection

func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error

func (*DefaultChannelManager) StopReadCollection

func (d *DefaultChannelManager) StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error

type DefaultDataHandler

// DefaultDataHandler is an empty struct whose pointer-receiver methods cover
// every method of the DataHandler interface.
// NOTE(review): the method bodies are not shown here; presumably they are
// no-op defaults intended for embedding — confirm.
type DefaultDataHandler struct{}

func (*DefaultDataHandler) CreateCollection

func (d *DefaultDataHandler) CreateCollection(ctx context.Context, param *CreateCollectionParam) error

func (*DefaultDataHandler) CreateDatabase

func (d *DefaultDataHandler) CreateDatabase(ctx context.Context, param *CreateDatabaseParam) error

func (*DefaultDataHandler) CreateIndex

func (d *DefaultDataHandler) CreateIndex(ctx context.Context, param *CreateIndexParam) error

func (*DefaultDataHandler) CreatePartition

func (d *DefaultDataHandler) CreatePartition(ctx context.Context, param *CreatePartitionParam) error

func (*DefaultDataHandler) Delete

func (d *DefaultDataHandler) Delete(ctx context.Context, param *DeleteParam) error

func (*DefaultDataHandler) DescribeCollection

func (d *DefaultDataHandler) DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error

func (*DefaultDataHandler) DescribeDatabase

func (d *DefaultDataHandler) DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error

func (*DefaultDataHandler) DescribePartition

func (d *DefaultDataHandler) DescribePartition(ctx context.Context, param *DescribePartitionParam) error

func (*DefaultDataHandler) DropCollection

func (d *DefaultDataHandler) DropCollection(ctx context.Context, param *DropCollectionParam) error

func (*DefaultDataHandler) DropDatabase

func (d *DefaultDataHandler) DropDatabase(ctx context.Context, param *DropDatabaseParam) error

func (*DefaultDataHandler) DropIndex

func (d *DefaultDataHandler) DropIndex(ctx context.Context, param *DropIndexParam) error

func (*DefaultDataHandler) DropPartition

func (d *DefaultDataHandler) DropPartition(ctx context.Context, param *DropPartitionParam) error

func (*DefaultDataHandler) Flush

func (d *DefaultDataHandler) Flush(ctx context.Context, param *FlushParam) error

func (*DefaultDataHandler) Insert

func (d *DefaultDataHandler) Insert(ctx context.Context, param *InsertParam) error

func (*DefaultDataHandler) LoadCollection

func (d *DefaultDataHandler) LoadCollection(ctx context.Context, param *LoadCollectionParam) error

func (*DefaultDataHandler) LoadPartitions

func (d *DefaultDataHandler) LoadPartitions(ctx context.Context, param *LoadPartitionsParam) error

func (*DefaultDataHandler) ReleaseCollection

func (d *DefaultDataHandler) ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error

func (*DefaultDataHandler) ReleasePartitions

func (d *DefaultDataHandler) ReleasePartitions(ctx context.Context, param *ReleasePartitionsParam) error

func (*DefaultDataHandler) ReplicateMessage

func (d *DefaultDataHandler) ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error

type DefaultMessageManager

// DefaultMessageManager is an empty struct whose pointer-receiver methods
// (Close and ReplicateMessage) cover the MessageManager interface.
// NOTE(review): the method bodies are not shown here; presumably they are
// no-op defaults intended for embedding — confirm.
type DefaultMessageManager struct{}

func (*DefaultMessageManager) Close

func (d *DefaultMessageManager) Close(channelName string)

func (*DefaultMessageManager) ReplicateMessage

func (d *DefaultMessageManager) ReplicateMessage(message *ReplicateMessage)

type DefaultMetaOp

// DefaultMetaOp is an empty struct whose pointer-receiver methods cover every
// method of the MetaOp interface.
// NOTE(review): the method bodies are not shown here; presumably they are
// no-op defaults intended for embedding — confirm.
type DefaultMetaOp struct{}

func (*DefaultMetaOp) GetAllCollection

func (d *DefaultMetaOp) GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error)

func (*DefaultMetaOp) GetAllDroppedObj

func (d *DefaultMetaOp) GetAllDroppedObj() map[string]map[string]uint64

func (*DefaultMetaOp) GetAllPartition

func (d *DefaultMetaOp) GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error)

func (*DefaultMetaOp) GetCollectionNameByID

func (d *DefaultMetaOp) GetCollectionNameByID(ctx context.Context, id int64) string

func (*DefaultMetaOp) GetDatabaseInfoForCollection

func (d *DefaultMetaOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo

func (*DefaultMetaOp) StartWatch

func (d *DefaultMetaOp) StartWatch()

func (*DefaultMetaOp) SubscribeCollectionEvent

func (d *DefaultMetaOp) SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer)

func (*DefaultMetaOp) SubscribePartitionEvent

func (d *DefaultMetaOp) SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer)

func (*DefaultMetaOp) UnsubscribeEvent

func (d *DefaultMetaOp) UnsubscribeEvent(taskID string, eventType WatchEventType)

func (*DefaultMetaOp) WatchCollection

func (d *DefaultMetaOp) WatchCollection(ctx context.Context, filter CollectionFilter)

func (*DefaultMetaOp) WatchPartition

func (d *DefaultMetaOp) WatchPartition(ctx context.Context, filter PartitionFilter)

type DefaultReader

// DefaultReader is an empty struct whose pointer-receiver methods (StartRead,
// QuitRead and ErrorChan) cover the Reader interface.
// NOTE(review): the method bodies are not shown here; presumably they are
// no-op defaults intended for embedding — confirm.
type DefaultReader struct{}

DefaultReader: all CDCReader implementations should embed it.

func (*DefaultReader) ErrorChan

func (d *DefaultReader) ErrorChan() <-chan error

func (*DefaultReader) QuitRead

func (d *DefaultReader) QuitRead(ctx context.Context)

func (*DefaultReader) StartRead

func (d *DefaultReader) StartRead(ctx context.Context)

StartRead: the return value is nil. Note that receiving from a nil channel blocks forever; it does not panic.

type DefaultTargetAPI

// DefaultTargetAPI is an empty struct whose pointer-receiver methods
// (GetCollectionInfo, GetPartitionInfo and GetDatabaseName) cover the
// TargetAPI interface.
// NOTE(review): the method bodies are not shown here; presumably they are
// no-op defaults intended for embedding — confirm.
type DefaultTargetAPI struct{}

func (*DefaultTargetAPI) GetCollectionInfo

func (d *DefaultTargetAPI) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)

func (*DefaultTargetAPI) GetDatabaseName

func (d *DefaultTargetAPI) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error)

func (*DefaultTargetAPI) GetPartitionInfo

func (d *DefaultTargetAPI) GetPartitionInfo(ctx context.Context, collectionName string, databaseName string) (*model.CollectionInfo, error)

type DefaultWriter

// DefaultWriter is an empty struct whose pointer-receiver methods
// (HandleReplicateAPIEvent, HandleReplicateMessage and HandleOpMessagePack)
// cover the Writer interface.
// NOTE(review): the method bodies are not shown here; presumably they are
// no-op defaults intended for embedding — confirm.
type DefaultWriter struct{}

func (*DefaultWriter) HandleOpMessagePack

func (d *DefaultWriter) HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error)

func (*DefaultWriter) HandleReplicateAPIEvent

func (d *DefaultWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error

func (*DefaultWriter) HandleReplicateMessage

func (d *DefaultWriter) HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error)

type DeleteParam

// DeleteParam is the argument type of DataHandler.Delete, a method that the
// interface marks as deprecated. It embeds MsgBaseParam and ReplicateParam.
type DeleteParam struct {
	MsgBaseParam
	ReplicateParam
	CollectionName string
	PartitionName  string
	// Column is a single entity column; presumably the primary-key values of
	// the rows to delete — confirm.
	Column         entity.Column
}

type DescribeCollectionParam

// DescribeCollectionParam is the argument type of
// DataHandler.DescribeCollection. It embeds ReplicateParam.
type DescribeCollectionParam struct {
	ReplicateParam
	// Name is presumably the collection name — confirm.
	Name string
}

type DescribeDatabaseParam

// DescribeDatabaseParam is the argument type of DataHandler.DescribeDatabase.
// It embeds ReplicateParam, which itself already carries a Database string.
type DescribeDatabaseParam struct {
	ReplicateParam
	// Name is presumably the database name; how it relates to the promoted
	// Database field is not visible here — confirm.
	Name string
}

type DescribePartitionParam

// DescribePartitionParam is the argument type of
// DataHandler.DescribePartition. It embeds ReplicateParam and names the
// collection and the partition as plain strings.
type DescribePartitionParam struct {
	ReplicateParam
	CollectionName string
	PartitionName  string
}

type DropCollectionParam

// DropCollectionParam is the argument type of DataHandler.DropCollection.
// It embeds MsgBaseParam and ReplicateParam and names the collection.
type DropCollectionParam struct {
	MsgBaseParam
	ReplicateParam
	CollectionName string
}

type DropDatabaseParam

// DropDatabaseParam is the argument type of DataHandler.DropDatabase.
// It embeds ReplicateParam and, by value, milvuspb.DropDatabaseRequest, so
// the fields of both are promoted onto this struct.
type DropDatabaseParam struct {
	ReplicateParam
	milvuspb.DropDatabaseRequest
}

type DropIndexParam

// DropIndexParam is the argument type of DataHandler.DropIndex.
// It embeds ReplicateParam and, by value, milvuspb.DropIndexRequest, so the
// fields of both are promoted onto this struct.
type DropIndexParam struct {
	ReplicateParam
	milvuspb.DropIndexRequest
}

type DropPartitionParam

// DropPartitionParam is the argument type of DataHandler.DropPartition.
// It embeds MsgBaseParam and ReplicateParam and names the collection and the
// partition as plain strings.
type DropPartitionParam struct {
	MsgBaseParam
	ReplicateParam
	CollectionName string
	PartitionName  string
}

type FlushParam

// FlushParam is the argument type of DataHandler.Flush.
// It embeds ReplicateParam and, by value, milvuspb.FlushRequest, so the
// fields of both are promoted onto this struct.
type FlushParam struct {
	ReplicateParam
	milvuspb.FlushRequest
}

type InsertParam

// InsertParam is the argument type of DataHandler.Insert, a method that the
// interface marks as deprecated. It embeds MsgBaseParam and ReplicateParam.
type InsertParam struct {
	MsgBaseParam
	ReplicateParam
	CollectionName string
	PartitionName  string
	// Columns holds the entity columns; presumably the column-wise data of
	// the rows to insert — confirm.
	Columns        []entity.Column
}

type LoadCollectionParam

// LoadCollectionParam is the argument type of DataHandler.LoadCollection.
// It embeds ReplicateParam and, by value, milvuspb.LoadCollectionRequest, so
// the fields of both are promoted onto this struct.
type LoadCollectionParam struct {
	ReplicateParam
	milvuspb.LoadCollectionRequest
}

type LoadPartitionsParam

// LoadPartitionsParam is the argument type of DataHandler.LoadPartitions.
// It embeds ReplicateParam and, by value, milvuspb.LoadPartitionsRequest, so
// the fields of both are promoted onto this struct.
type LoadPartitionsParam struct {
	ReplicateParam
	milvuspb.LoadPartitionsRequest
}

type MessageManager

// MessageManager accepts ReplicateMessage values and can be closed per
// channel name. Neither method returns a value; the outcome of a
// ReplicateMessage call is presumably reported through the message's
// SuccessFunc/FailFunc callbacks — confirm against an implementation.
type MessageManager interface {
	ReplicateMessage(message *ReplicateMessage)
	Close(channelName string)
}

type MetaOp

// MetaOp groups the metadata operations: watching collections and
// partitions, subscribing consumers to the resulting events, and querying
// collection, partition and database information.
type MetaOp interface {
	// WatchCollection: implementations should make sure it is called only
	// once. The same applies to WatchPartition.
	WatchCollection(ctx context.Context, filter CollectionFilter)
	WatchPartition(ctx context.Context, filter PartitionFilter)
	StartWatch()

	// SubscribeCollectionEvent: an event is consumed only once. The same
	// applies to SubscribePartitionEvent.
	// TODO: consider the case of many targets; one option is to have one
	// MetaOp per target.
	SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer)
	SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer)
	// UnsubscribeEvent takes the task ID used when subscribing and the kind
	// of event (collection or partition) to unsubscribe from.
	UnsubscribeEvent(taskID string, eventType WatchEventType)

	// Query methods. GetAllDroppedObj returns a two-level map whose key and
	// value meanings are not visible here — confirm before relying on them.
	// GetCollectionNameByID and GetDatabaseInfoForCollection return no error
	// value.
	GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error)
	GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error)
	GetAllDroppedObj() map[string]map[string]uint64
	GetCollectionNameByID(ctx context.Context, id int64) string
	GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo
}

MetaOp defines the meta (metadata) operations.

type MsgBaseParam

// MsgBaseParam carries a pointer to a commonpb.MsgBase. It is embedded by
// several parameter structs in this package (for example
// CreateCollectionParam, InsertParam and ReplicateMessageParam), which
// promotes the Base field onto them.
type MsgBaseParam struct {
	Base *commonpb.MsgBase
}

type PartitionEventConsumer

// PartitionEventConsumer is a distinct named type whose underlying type is
// that of PartitionFilter, i.e. func(info *pb.PartitionInfo) bool. It is the
// consumer type accepted by MetaOp.SubscribePartitionEvent.
type PartitionEventConsumer PartitionFilter

type PartitionFilter

// PartitionFilter is a predicate over *pb.PartitionInfo. It is the filter
// type accepted by MetaOp.WatchPartition and MetaOp.GetAllPartition.
// NOTE(review): whether a true result means "keep" or "skip" is not visible
// here — confirm against an implementation.
type PartitionFilter func(info *pb.PartitionInfo) bool

type Reader

// Reader is started and stopped with a context and exposes its errors
// through a receive-only channel. StartRead and QuitRead return nothing, so
// ErrorChan is the only error path visible in this interface.
type Reader interface {
	StartRead(ctx context.Context)
	QuitRead(ctx context.Context)
	ErrorChan() <-chan error
}

type ReleaseCollectionParam

// ReleaseCollectionParam is the argument type of
// DataHandler.ReleaseCollection. It embeds ReplicateParam and, by value,
// milvuspb.ReleaseCollectionRequest, so the fields of both are promoted onto
// this struct.
type ReleaseCollectionParam struct {
	ReplicateParam
	milvuspb.ReleaseCollectionRequest
}

type ReleasePartitionsParam

// ReleasePartitionsParam is the argument type of
// DataHandler.ReleasePartitions. It embeds ReplicateParam and, by value,
// milvuspb.ReleasePartitionsRequest, so the fields of both are promoted onto
// this struct.
type ReleasePartitionsParam struct {
	ReplicateParam
	milvuspb.ReleasePartitionsRequest
}

type ReplicateAPIEvent

// ReplicateAPIEvent is the element type of the channel returned by
// ChannelManager.GetEventChan and the argument of
// Writer.HandleReplicateAPIEvent.
type ReplicateAPIEvent struct {
	// EventType is one of the ReplicateAPIEventType constants.
	EventType      ReplicateAPIEventType
	// CollectionInfo and PartitionInfo are pointers and may be nil;
	// presumably which one is set depends on EventType — confirm.
	CollectionInfo *pb.CollectionInfo
	PartitionInfo  *pb.PartitionInfo
	ReplicateInfo  *commonpb.ReplicateInfo
	// ReplicateParam is a named field here, not an embedded one, so its
	// Database field is reached as ReplicateParam.Database.
	ReplicateParam ReplicateParam
	// Error is presumably populated for ReplicateError events — confirm.
	Error          error
}

type ReplicateAPIEventType

// ReplicateAPIEventType enumerates the kinds of ReplicateAPIEvent.
type ReplicateAPIEventType int
const (
	// The first four constants are typed ReplicateAPIEventType and take the
	// values 1 through 4 (iota + 1), so the zero value of the type matches
	// none of them.
	ReplicateCreateCollection ReplicateAPIEventType = iota + 1
	ReplicateDropCollection
	ReplicateCreatePartition
	ReplicateDropPartition

	// NOTE(review): ReplicateError has its own expression and no explicit
	// type, so it is an untyped integer constant (100) rather than a
	// ReplicateAPIEventType. It is still assignable to that type, but the
	// String method is not available on the bare constant. Consider declaring
	// it as `ReplicateError ReplicateAPIEventType = 100` after checking
	// that no caller relies on it being untyped.
	ReplicateError = 100
)

func (ReplicateAPIEventType) String

func (r ReplicateAPIEventType) String() string

type ReplicateMessage

// ReplicateMessage is the argument type of MessageManager.ReplicateMessage.
// It bundles a context, the message parameters and two callbacks.
type ReplicateMessage struct {
	// Ctx is carried inside the struct because MessageManager.ReplicateMessage
	// takes no separate context parameter.
	Ctx         context.Context
	Param       *ReplicateMessageParam
	// SuccessFunc and FailFunc both receive the param; FailFunc additionally
	// receives an error. Presumably exactly one of them is invoked per
	// message — confirm against a MessageManager implementation.
	SuccessFunc func(param *ReplicateMessageParam)
	FailFunc    func(param *ReplicateMessageParam, err error)
}

type ReplicateMessageParam

// ReplicateMessageParam is the argument type of DataHandler.ReplicateMessage
// and the payload of ReplicateMessage. It embeds MsgBaseParam and
// ReplicateParam.
type ReplicateMessageParam struct {
	MsgBaseParam
	ReplicateParam
	ChannelName                  string
	// BeginTs and EndTs are unsigned 64-bit timestamps; their unit/encoding
	// is not visible here — confirm.
	BeginTs, EndTs               uint64
	// MsgsBytes holds one byte slice per message (presumably serialized
	// messages — confirm).
	MsgsBytes                    [][]byte
	StartPositions, EndPositions []*msgpb.MsgPosition

	// TargetMsgPosition is a string; presumably an output filled in with the
	// position reported by the target — confirm.
	TargetMsgPosition string
}

type ReplicateParam

// ReplicateParam carries a single Database string (presumably the database
// name — confirm). It is embedded by the DataHandler parameter structs and
// is a named field of ReplicateAPIEvent.
type ReplicateParam struct {
	Database string
}

type TargetAPI

// TargetAPI looks up information on the target by collection name and
// database name.
type TargetAPI interface {
	GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
	// NOTE(review): GetPartitionInfo takes no partition name and returns
	// *model.CollectionInfo, the same type as GetCollectionInfo; presumably
	// the partition data is carried inside that struct — confirm.
	GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
	// GetDatabaseName receives a database name and returns one; how the
	// result differs from the input is not visible here — confirm.
	GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error)
}

type WatchEventType

// WatchEventType selects the kind of event passed to MetaOp.UnsubscribeEvent.
type WatchEventType int
const (
	// Values start at 1 (iota + 1): CollectionEventType is 1 and
	// PartitionEventType is 2, so the zero value matches neither.
	CollectionEventType WatchEventType = iota + 1
	PartitionEventType
)

type Writer

// Writer handles replicate API events and message packs.
// NOTE(review): the meaning of the []byte results is not visible here
// (presumably encoded positions) — confirm against an implementation.
type Writer interface {
	HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
	// HandleReplicateMessage returns two byte slices plus an error.
	HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error)
	// HandleOpMessagePack returns one byte slice plus an error.
	HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL