Documentation ¶
Index ¶
- func BuildPhysicalPlan(database string, liveNodes []models.StatelessNode, numOfNodes int) *models.PhysicalPlan
- type DataFilter
- type DataLoadContext
- func (ctx *DataLoadContext) GetSeriesAggregator(lowSeriesIdx uint16, fieldIdx int) (result aggregation.SeriesAggregator)
- func (ctx *DataLoadContext) Grouping()
- func (ctx *DataLoadContext) HasGroupingData() bool
- func (ctx *DataLoadContext) IterateLowSeriesIDs(lowSeriesIDsFromStorage roaring.Container, ...)
- func (ctx *DataLoadContext) NewSeriesAggregator(groupingKey string) uint16
- func (ctx *DataLoadContext) PrepareAggregatorWithoutGrouping()
- func (ctx *DataLoadContext) Reduce(reduceFn func(it series.GroupedIterator))
- type DataLoader
- type FilterResultSet
- type Grouping
- type GroupingBuilder
- type GroupingContext
- type GroupingScanner
- type GroupingSeriesAgg
- type NodeChoose
- type QueryTask
- type ShardExecuteContext
- type Stage
- type StorageExecuteContext
- func (ctx *StorageExecuteContext) CalcSourceSlotRange(familyTime int64) timeutil.SlotRange
- func (ctx *StorageExecuteContext) CollectTagValues(fn func())
- func (ctx *StorageExecuteContext) HasGroupingTagValueIDs() bool
- func (ctx *StorageExecuteContext) HasWhereCondition() bool
- func (ctx *StorageExecuteContext) Release()
- func (ctx *StorageExecuteContext) SortFields()
- type StorageQueryFlow
- type TagFilterResult
- type TaskContext
- type TimeSegmentContext
- type TimeSegmentContexts
- type TimeSegmentResultSet
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildPhysicalPlan ¶ added in v0.2.4
func BuildPhysicalPlan(database string, liveNodes []models.StatelessNode, numOfNodes int) *models.PhysicalPlan
BuildPhysicalPlan returns physical plan based on live nodes and node number, need shuffle live node.
Types ¶
type DataFilter ¶
type DataFilter interface { // Filter filters the data based on metricIDs/fields/seriesIDs/timeRange, // if finds data then returns filter result set, else returns nil. Filter(shardExecuteContext *ShardExecuteContext) ([]FilterResultSet, error) }
DataFilter represents the filter ability over memory database and files under data family.
type DataLoadContext ¶
type DataLoadContext struct { ShardExecuteCtx *ShardExecuteContext IsMultiField, IsGrouping bool MinSeriesID, MaxSeriesID uint16 // range of min/max low series id // if no grouping value is low series ids // if grouping value is index of GroupingSeriesAgg LowSeriesIDs []uint16 SeriesIDHighKey uint16 LowSeriesIDsContainer roaring.Container GroupingSeriesAggRefs []uint16 // series id => GroupingSeriesAgg index WithoutGroupingSeriesAgg *GroupingSeriesAgg GroupingSeriesAgg []*GroupingSeriesAgg Decoder *encoding.TSDDecoder DownSampling func(slotRange timeutil.SlotRange, seriesIdx uint16, fieldIdx int, getter encoding.TSDValueGetter) PendingDataLoadTasks *atomic.Int32 // contains filtered or unexported fields }
DataLoadContext represents data load level query execute context.
func (*DataLoadContext) GetSeriesAggregator ¶
func (ctx *DataLoadContext) GetSeriesAggregator(lowSeriesIdx uint16, fieldIdx int) (result aggregation.SeriesAggregator)
GetSeriesAggregator gets series aggregator with low series id and field index.
func (*DataLoadContext) Grouping ¶
func (ctx *DataLoadContext) Grouping()
Grouping prepares context for grouping query.
func (*DataLoadContext) HasGroupingData ¶
func (ctx *DataLoadContext) HasGroupingData() bool
HasGroupingData returns if it is grouping data.
func (*DataLoadContext) IterateLowSeriesIDs ¶
func (ctx *DataLoadContext) IterateLowSeriesIDs(lowSeriesIDsFromStorage roaring.Container, fn func(seriesIdxFromQuery uint16, seriesIdxFromStorage int), )
IterateLowSeriesIDs iterates low series ids from storage, then found low series id which query need.
func (*DataLoadContext) NewSeriesAggregator ¶
func (ctx *DataLoadContext) NewSeriesAggregator(groupingKey string) uint16
NewSeriesAggregator creates the series aggregator with grouping key for grouping query, returns index of grouping aggregator.
func (*DataLoadContext) PrepareAggregatorWithoutGrouping ¶
func (ctx *DataLoadContext) PrepareAggregatorWithoutGrouping()
PrepareAggregatorWithoutGrouping prepares context for without grouping query.
func (*DataLoadContext) Reduce ¶
func (ctx *DataLoadContext) Reduce(reduceFn func(it series.GroupedIterator))
Reduce reduces down sampling result.
type DataLoader ¶
type DataLoader interface { // Load loads the metric data by given low series id. Load(ctx *DataLoadContext) }
DataLoader represents the loader which load metric data from storage.
type FilterResultSet ¶
type FilterResultSet interface { // Identifier identifies the source of result set(mem/kv etc.). Identifier() string // FamilyTime returns the family time of storage. FamilyTime() int64 // SlotRange returns the slot range of storage. SlotRange() timeutil.SlotRange // Load loads the data from storage, then returns the data loader. Load(ctx *DataLoadContext) DataLoader // SeriesIDs returns the series ids which matches with query series ids. SeriesIDs() *roaring.Bitmap // Close release the resource during doing query operation. Close() }
FilterResultSet represents the filter result set, loads data based on this interface.
type Grouping ¶
type Grouping interface { // GetGroupingScanner returns the grouping scanners based on tag key ids and series ids GetGroupingScanner(tagKeyID tag.KeyID, seriesIDs *roaring.Bitmap) ([]GroupingScanner, error) }
Grouping represents the getter grouping scanners for tag key group by query
type GroupingBuilder ¶
type GroupingBuilder interface { // GetGroupingContext returns the context of group by GetGroupingContext(ctx *ShardExecuteContext) error }
GroupingBuilder represents grouping tag builder.
type GroupingContext ¶
type GroupingContext interface { // BuildGroup builds the grouped series ids by the high key of series id // and the container includes low keys of series id BuildGroup(ctx *DataLoadContext) // ScanTagValueIDs scans grouping context by high key/container of series ids, // then returns grouped tag value ids for each tag key ScanTagValueIDs(highKey uint16, container roaring.Container) []*roaring.Bitmap }
GroupingContext represents the context of group by query for tag keys
func NewGroupContext ¶
func NewGroupContext(tagKeys []tag.KeyID, scanners map[tag.KeyID][]GroupingScanner) GroupingContext
NewGroupContext creates a GroupingContext
type GroupingScanner ¶
type GroupingScanner interface { // GetSeriesAndTagValue returns group by container and tag value ids GetSeriesAndTagValue(highKey uint16) (roaring.Container, []uint32) // GetSeriesIDs returns the series ids in current scanner. GetSeriesIDs() *roaring.Bitmap }
GroupingScanner represents the scanner which scans the group by data by high key of series id
type GroupingSeriesAgg ¶
type GroupingSeriesAgg struct { Key string Aggregator aggregation.SeriesAggregator // for single field query Aggregators aggregation.FieldAggregates // for multi fields query }
GroupingSeriesAgg represents grouping series aggregator.
type NodeChoose ¶ added in v0.2.4
type NodeChoose interface { // Choose chooses the compute nodes then builds physical plan. Choose(database string, numOfNodes int) ([]*models.PhysicalPlan, error) }
NodeChoose represents node choose for data query.
type QueryTask ¶
type QueryTask interface { // BeforeRun invokes before task run. BeforeRun() // Run executes task query logic. Run() error // AfterRun invokes after task run. AfterRun() }
QueryTask represents query task for data search flow.
type ShardExecuteContext ¶
type ShardExecuteContext struct { StorageExecuteCtx *StorageExecuteContext TimeSegmentContext *TimeSegmentContext // result set for each time segment GroupingContext GroupingContext // after get grouping context if it has grouping query SeriesIDsAfterFiltering *roaring.Bitmap // after data filter }
ShardExecuteContext represents shard level query execute context.
func NewShardExecuteContext ¶
func NewShardExecuteContext(storageExecuteCtx *StorageExecuteContext) *ShardExecuteContext
NewShardExecuteContext creates a shard execute context.
func (*ShardExecuteContext) IsSeriesIDsEmpty ¶
func (ctx *ShardExecuteContext) IsSeriesIDsEmpty() bool
IsSeriesIDsEmpty returns if series not found.
func (*ShardExecuteContext) Release ¶
func (ctx *ShardExecuteContext) Release()
Release releases shard context's resource after query.
type StorageExecuteContext ¶
type StorageExecuteContext struct { TaskCtx *TaskContext Query *stmt.Query ShardIDs []models.ShardID ShardContexts []*ShardExecuteContext // set value in plan stage when lookup table. MetricID metric.ID // set value in plan stage when lookup select fields. Fields field.Metas DownSamplingSpecs aggregation.AggregatorSpecs AggregatorSpecs aggregation.AggregatorSpecs // TagKeys cache all tag keys metadata for current query session TagKeys map[string]tag.KeyID // for cache tag key // result which after tag condition metadata filter // set value in tag search, the where clause condition that user input // first find all tag values in where clause, then do tag match TagFilterResult map[string]*TagFilterResult // TODO rename to tag lookup??? // set value in plan stage when lookup group by tags. GroupByTags tag.Metas GroupByTagKeyIDs []tag.KeyID // for group by query store tag value ids for each group tag key GroupingTagValueIDs []*roaring.Bitmap // contains filtered or unexported fields }
StorageExecuteContext represents storage level query execute context.
func (*StorageExecuteContext) CalcSourceSlotRange ¶
func (ctx *StorageExecuteContext) CalcSourceSlotRange(familyTime int64) timeutil.SlotRange
CalcSourceSlotRange returns slot range for filtering by family time and query time range.
func (*StorageExecuteContext) CollectTagValues ¶
func (ctx *StorageExecuteContext) CollectTagValues(fn func())
CollectTagValues collects tag value with lock.
func (*StorageExecuteContext) HasGroupingTagValueIDs ¶
func (ctx *StorageExecuteContext) HasGroupingTagValueIDs() bool
HasGroupingTagValueIDs returns if it needs collect grouping tag value.
func (*StorageExecuteContext) HasWhereCondition ¶
func (ctx *StorageExecuteContext) HasWhereCondition() bool
HasWhereCondition returns if query has where clause condition.
func (*StorageExecuteContext) Release ¶
func (ctx *StorageExecuteContext) Release()
Release releases context's resource after query.
func (*StorageExecuteContext) SortFields ¶
func (ctx *StorageExecuteContext) SortFields()
SortFields sorts fields by field ids for reading data in order.
type StorageQueryFlow ¶
type StorageQueryFlow interface { // Prepare prepares the query flow, builds the flow execute context based on group aggregator specs. Prepare() // Submit submits an async task when do query pipeline. Submit(stage Stage, task func()) // Reduce reduces the down sampling aggregator's result. Reduce(it series.GroupedIterator) // ReduceTagValues reduces the group by tag values. ReduceTagValues(tagKeyIndex int, tagValues map[uint32]string) // Complete completes the query flow with error. Complete(err error) }
StorageQueryFlow represents the storage query engine execute flow.
type TagFilterResult ¶
TagFilterResult represents the tag filter result, include tag key id and tag value ids.
type TaskContext ¶
TaskContext represents task execute context.
func NewTaskContextWithTimeout ¶
func NewTaskContextWithTimeout(ctx context.Context, timeout time.Duration) *TaskContext
NewTaskContextWithTimeout creates a task context with timeout.
func (*TaskContext) Release ¶
func (ctx *TaskContext) Release()
Release releases context's resource after query.
type TimeSegmentContext ¶
type TimeSegmentContext struct { TimeSegments map[int64]*TimeSegmentResultSet // familyTime -> time segment result set list SeriesIDs *roaring.Bitmap // matched series ids after data filter }
TimeSegmentContext represents time segment context
func NewTimeSegmentContext ¶
func NewTimeSegmentContext() *TimeSegmentContext
NewTimeSegmentContext creates a time segment context.
func (*TimeSegmentContext) AddFilterResultSet ¶
func (ts *TimeSegmentContext) AddFilterResultSet(interval timeutil.Interval, rs FilterResultSet)
AddFilterResultSet adds a result set after data filtering.
func (*TimeSegmentContext) GetTimeSegments ¶
func (ts *TimeSegmentContext) GetTimeSegments() (rs TimeSegmentContexts)
GetTimeSegments returns
func (*TimeSegmentContext) Release ¶
func (ts *TimeSegmentContext) Release()
Release releases time segment's data resource after query.
type TimeSegmentContexts ¶
type TimeSegmentContexts []*TimeSegmentResultSet
TimeSegmentContexts represents the time segment slice in query time range.
func (TimeSegmentContexts) Len ¶
func (f TimeSegmentContexts) Len() int
func (TimeSegmentContexts) Less ¶
func (f TimeSegmentContexts) Less(i, j int) bool
func (TimeSegmentContexts) Swap ¶
func (f TimeSegmentContexts) Swap(i, j int)
type TimeSegmentResultSet ¶
type TimeSegmentResultSet struct { FamilyTime int64 Source timeutil.SlotRange Target timeutil.SlotRange BaseSlot int Interval timeutil.Interval IntervalRatio uint16 FilterRS []FilterResultSet }
TimeSegmentResultSet represents the time segment in query time range.
func (*TimeSegmentResultSet) Release ¶
func (ctx *TimeSegmentResultSet) Release()
Release releases filter result set's resource after query.