Documentation
¶
Index ¶
- func ConvertInternalImportFile(file *msgpb.ImportFile, _ int) *internalpb.ImportFile
- func DeregisterRateCollector(label string)
- func RegisterRateCollector(label string)
- func StartTracer(msg msgstream.TsMsg, name string) (context.Context, trace.Span)
- type ChannelCheckpointUpdater
- type MsgHandler
- type PipelineParams
- type RateCollector
- type StatsUpdater
- type Tickler
- type TimeRange
- type TimeTickSender
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConvertInternalImportFile ¶
func ConvertInternalImportFile(file *msgpb.ImportFile, _ int) *internalpb.ImportFile
func DeregisterRateCollector ¶
func DeregisterRateCollector(label string)
func RegisterRateCollector ¶
func RegisterRateCollector(label string)
Types ¶
type ChannelCheckpointUpdater ¶
type ChannelCheckpointUpdater struct {
// contains filtered or unexported fields
}
func NewChannelCheckpointUpdater ¶
func NewChannelCheckpointUpdater(broker broker.Broker) *ChannelCheckpointUpdater
func NewChannelCheckpointUpdaterWithCallback ¶
func NewChannelCheckpointUpdaterWithCallback(broker broker.Broker, updateDoneCallback func(*msgpb.MsgPosition)) *ChannelCheckpointUpdater
NewChannelCheckpointUpdaterWithCallback creates a ChannelCheckpointUpdater with a callback function.
func (*ChannelCheckpointUpdater) AddTask ¶
func (ccu *ChannelCheckpointUpdater) AddTask(channelPos *msgpb.MsgPosition, flush bool, callback func())
func (*ChannelCheckpointUpdater) Close ¶
func (ccu *ChannelCheckpointUpdater) Close()
func (*ChannelCheckpointUpdater) Start ¶
func (ccu *ChannelCheckpointUpdater) Start()
type MsgHandler ¶
type MsgHandler interface { HandleCreateSegment(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2) error HandleFlush(vchannel string, flushMsg message.ImmutableFlushMessageV2) error HandleManualFlush(vchannel string, flushMsg message.ImmutableManualFlushMessageV2) error HandleImport(ctx context.Context, vchannel string, importMsg *msgpb.ImportMsg) error HandleSchemaChange(ctx context.Context, vchannel string, msg *adaptor.SchemaChangeMessageBody) error }
type PipelineParams ¶
type PipelineParams struct {
	Ctx                context.Context
	Broker             broker.Broker
	SyncMgr            syncmgr.SyncManager
	TimeTickSender     StatsUpdater       // reference to TimeTickSender
	CompactionExecutor compactor.Executor // reference to compaction executor
	MsgStreamFactory   dependency.Factory
	DispClient         msgdispatcher.Client
	ChunkManager       storage.ChunkManager
	Session            *sessionutil.Session
	WriteBufferManager writebuffer.BufferManager
	CheckpointUpdater  *ChannelCheckpointUpdater
	Allocator          allocator.Interface
	MsgHandler         MsgHandler
}
type RateCollector ¶
type RateCollector struct {
	*ratelimitutil.RateCollector
	// contains filtered or unexported fields
}
RateCollector helps to collect and calculate values (such as rate, timeTick, etc.).
func GetRateCollector ¶
func GetRateCollector() *RateCollector
func (*RateCollector) GetMinFlowGraphTt ¶
func (r *RateCollector) GetMinFlowGraphTt() (string, typeutil.Timestamp)
GetMinFlowGraphTt returns the vchannel and minimal time tick of flow graphs.
func (*RateCollector) RemoveFlowGraphChannel ¶
func (r *RateCollector) RemoveFlowGraphChannel(channel string)
RemoveFlowGraphChannel removes channel from flowGraphTt.
func (*RateCollector) UpdateFlowGraphTt ¶
func (r *RateCollector) UpdateFlowGraphTt(channel string, t typeutil.Timestamp)
UpdateFlowGraphTt updates RateCollector's flow graph time tick.
type StatsUpdater ¶
type Tickler ¶
type Tickler struct {
// contains filtered or unexported fields
}
Tickler counts each call to inc().
func NewTickler ¶
func NewTickler() *Tickler
func (*Tickler) GetProgressSig ¶
func (t *Tickler) GetProgressSig() chan struct{}
type TimeTickSender ¶
type TimeTickSender struct {
// contains filtered or unexported fields
}
TimeTickSender merges the channel states updated by flow graph nodes and sends them to datacoord periodically. TimeTickSender holds a segmentStats cache for each channel; after a send succeeds, it cleans the cache entries earlier than the last sent timestamp.
func NewTimeTickSender ¶
func (*TimeTickSender) GetLatestTimestamp ¶
func (m *TimeTickSender) GetLatestTimestamp(channel string) typeutil.Timestamp
func (*TimeTickSender) Start ¶
func (m *TimeTickSender) Start()
func (*TimeTickSender) Stop ¶
func (m *TimeTickSender) Stop()
func (*TimeTickSender) Update ¶
func (m *TimeTickSender) Update(channelName string, timestamp uint64, segStats []*commonpb.SegmentStats)