flow

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildPhysicalPlan added in v0.2.4

func BuildPhysicalPlan(database string, liveNodes []models.StatelessNode, numOfNodes int) *models.PhysicalPlan

BuildPhysicalPlan returns a physical plan based on the live nodes and the number of nodes; the live nodes need to be shuffled.

Types

type DataFilter

type DataFilter interface {
	// Filter filters the data based on metricIDs/fields/seriesIDs/timeRange,
	// if finds data then returns filter result set, else returns nil.
	Filter(shardExecuteContext *ShardExecuteContext) ([]FilterResultSet, error)
}

DataFilter represents the ability to filter data in the memory database and in the files under a data family.

type DataLoadContext

type DataLoadContext struct {
	ShardExecuteCtx          *ShardExecuteContext
	IsMultiField, IsGrouping bool

	MinSeriesID, MaxSeriesID uint16
	// range of min/max low series id
	// if no grouping value is low series ids
	// if grouping value is index of GroupingSeriesAgg
	LowSeriesIDs          []uint16
	SeriesIDHighKey       uint16
	LowSeriesIDsContainer roaring.Container

	GroupingSeriesAggRefs    []uint16 // series id => GroupingSeriesAgg index
	WithoutGroupingSeriesAgg *GroupingSeriesAgg
	GroupingSeriesAgg        []*GroupingSeriesAgg

	Decoder      *encoding.TSDDecoder
	DownSampling func(slotRange timeutil.SlotRange, seriesIdx uint16, fieldIdx int, getter encoding.TSDValueGetter)

	PendingDataLoadTasks *atomic.Int32
	// contains filtered or unexported fields
}

DataLoadContext represents the query execution context at the data-load level.

func (*DataLoadContext) GetSeriesAggregator

func (ctx *DataLoadContext) GetSeriesAggregator(lowSeriesIdx uint16, fieldIdx int) (result aggregation.SeriesAggregator)

GetSeriesAggregator gets series aggregator with low series id and field index.

func (*DataLoadContext) Grouping

func (ctx *DataLoadContext) Grouping()

Grouping prepares context for grouping query.

func (*DataLoadContext) HasGroupingData

func (ctx *DataLoadContext) HasGroupingData() bool

HasGroupingData reports whether the context has grouping data.

func (*DataLoadContext) IterateLowSeriesIDs

func (ctx *DataLoadContext) IterateLowSeriesIDs(lowSeriesIDsFromStorage roaring.Container,
	fn func(seriesIdxFromQuery uint16, seriesIdxFromStorage int),
)

IterateLowSeriesIDs iterates over the low series ids from storage and finds the low series ids that the query needs.

func (*DataLoadContext) NewSeriesAggregator

func (ctx *DataLoadContext) NewSeriesAggregator(groupingKey string) uint16

NewSeriesAggregator creates a series aggregator with the given grouping key for a grouping query and returns the index of the grouping aggregator.

func (*DataLoadContext) PrepareAggregatorWithoutGrouping

func (ctx *DataLoadContext) PrepareAggregatorWithoutGrouping()

PrepareAggregatorWithoutGrouping prepares the context for a query without grouping.

func (*DataLoadContext) Reduce

func (ctx *DataLoadContext) Reduce(reduceFn func(it series.GroupedIterator))

Reduce reduces down sampling result.

type DataLoader

type DataLoader interface {
	// Load loads the metric data by given low series id.
	Load(ctx *DataLoadContext)
}

DataLoader represents the loader which loads metric data from storage.

type FilterResultSet

type FilterResultSet interface {
	// Identifier identifies the source of result set(mem/kv etc.).
	Identifier() string
	// FamilyTime returns the family time of storage.
	FamilyTime() int64
	// SlotRange returns the slot range of storage.
	SlotRange() timeutil.SlotRange
	// Load loads the data from storage, then returns the data loader.
	Load(ctx *DataLoadContext) DataLoader
	// SeriesIDs returns the series ids which matches with query series ids.
	SeriesIDs() *roaring.Bitmap
	// Close release the resource during doing query operation.
	Close()
}

FilterResultSet represents the filter result set, loads data based on this interface.

type Grouping

type Grouping interface {
	// GetGroupingScanner returns the grouping scanners based on tag key ids and series ids
	GetGroupingScanner(tagKeyID tag.KeyID, seriesIDs *roaring.Bitmap) ([]GroupingScanner, error)
}

Grouping represents the getter of grouping scanners for a group-by query on tag keys.

type GroupingBuilder

type GroupingBuilder interface {
	// GetGroupingContext returns the context of group by
	GetGroupingContext(ctx *ShardExecuteContext) error
}

GroupingBuilder represents grouping tag builder.

type GroupingContext

type GroupingContext interface {
	// BuildGroup builds the grouped series ids by the high key of series id
	// and the container includes low keys of series id
	BuildGroup(ctx *DataLoadContext)
	// ScanTagValueIDs scans grouping context by high key/container of series ids,
	// then returns grouped tag value ids for each tag key
	ScanTagValueIDs(highKey uint16, container roaring.Container) []*roaring.Bitmap
}

GroupingContext represents the context of group by query for tag keys

func NewGroupContext

func NewGroupContext(tagKeys []tag.KeyID, scanners map[tag.KeyID][]GroupingScanner) GroupingContext

NewGroupContext creates a GroupingContext

type GroupingScanner

type GroupingScanner interface {
	// GetSeriesAndTagValue returns group by container and tag value ids
	GetSeriesAndTagValue(highKey uint16) (roaring.Container, []uint32)
	// GetSeriesIDs returns the series ids in current scanner.
	GetSeriesIDs() *roaring.Bitmap
}

GroupingScanner represents the scanner which scans the group by data by high key of series id

type GroupingSeriesAgg

type GroupingSeriesAgg struct {
	Key         string
	Aggregator  aggregation.SeriesAggregator // for single field query
	Aggregators aggregation.FieldAggregates  // for multi fields query
}

GroupingSeriesAgg represents grouping series aggregator.

type NodeChoose added in v0.2.4

type NodeChoose interface {
	// Choose chooses the compute nodes then builds physical plan.
	Choose(database string, numOfNodes int) ([]*models.PhysicalPlan, error)
}

NodeChoose represents the node chooser for data queries.

type QueryTask

type QueryTask interface {
	// BeforeRun invokes before task run.
	BeforeRun()
	// Run executes task query logic.
	Run() error
	// AfterRun invokes after task run.
	AfterRun()
}

QueryTask represents query task for data search flow.

type ShardExecuteContext

type ShardExecuteContext struct {
	StorageExecuteCtx  *StorageExecuteContext
	TimeSegmentContext *TimeSegmentContext // result set for each time segment

	GroupingContext         GroupingContext // after get grouping context if it has grouping query
	SeriesIDsAfterFiltering *roaring.Bitmap // after data filter
}

ShardExecuteContext represents the query execution context at the shard level.

func NewShardExecuteContext

func NewShardExecuteContext(storageExecuteCtx *StorageExecuteContext) *ShardExecuteContext

NewShardExecuteContext creates a shard execute context.

func (*ShardExecuteContext) IsSeriesIDsEmpty

func (ctx *ShardExecuteContext) IsSeriesIDsEmpty() bool

IsSeriesIDsEmpty reports whether no series were found.

func (*ShardExecuteContext) Release

func (ctx *ShardExecuteContext) Release()

Release releases shard context's resource after query.

type Stage

type Stage int

Stage is the definition of query stage

const (
	FilteringStage Stage = iota + 1
	GroupingStage
	ScannerStage
	DownSamplingStage
)

func (Stage) String

func (qs Stage) String() string

String returns string value of stage.

type StorageExecuteContext

type StorageExecuteContext struct {
	TaskCtx       *TaskContext
	Query         *stmt.Query
	ShardIDs      []models.ShardID
	ShardContexts []*ShardExecuteContext

	// set value in plan stage when lookup table.
	MetricID metric.ID

	// set value in plan stage when lookup select fields.
	Fields            field.Metas
	DownSamplingSpecs aggregation.AggregatorSpecs
	AggregatorSpecs   aggregation.AggregatorSpecs

	// TagKeys cache all tag keys metadata for current query session
	TagKeys map[string]tag.KeyID // for cache tag key

	// result which after tag condition metadata filter
	// set value in tag search, the where clause condition that user input
	// first find all tag values in where clause, then do tag match
	TagFilterResult map[string]*TagFilterResult // TODO rename to tag lookup???

	// set value in plan stage when lookup group by tags.
	GroupByTags      tag.Metas
	GroupByTagKeyIDs []tag.KeyID
	// for group by query store tag value ids for each group tag key
	GroupingTagValueIDs []*roaring.Bitmap
	// contains filtered or unexported fields
}

StorageExecuteContext represents the query execution context at the storage level.

func (*StorageExecuteContext) CalcSourceSlotRange

func (ctx *StorageExecuteContext) CalcSourceSlotRange(familyTime int64) timeutil.SlotRange

CalcSourceSlotRange returns slot range for filtering by family time and query time range.

func (*StorageExecuteContext) CollectTagValues

func (ctx *StorageExecuteContext) CollectTagValues(fn func())

CollectTagValues collects tag value with lock.

func (*StorageExecuteContext) HasGroupingTagValueIDs

func (ctx *StorageExecuteContext) HasGroupingTagValueIDs() bool

HasGroupingTagValueIDs reports whether grouping tag values need to be collected.

func (*StorageExecuteContext) HasWhereCondition

func (ctx *StorageExecuteContext) HasWhereCondition() bool

HasWhereCondition reports whether the query has a where-clause condition.

func (*StorageExecuteContext) Release

func (ctx *StorageExecuteContext) Release()

Release releases context's resource after query.

func (*StorageExecuteContext) SortFields

func (ctx *StorageExecuteContext) SortFields()

SortFields sorts fields by field ids for reading data in order.

type StorageQueryFlow

type StorageQueryFlow interface {
	// Prepare prepares the query flow, builds the flow execute context based on group aggregator specs.
	Prepare()
	// Submit submits an async task when do query pipeline.
	Submit(stage Stage, task func())
	// Reduce reduces the down sampling aggregator's result.
	Reduce(it series.GroupedIterator)
	// ReduceTagValues reduces the group by tag values.
	ReduceTagValues(tagKeyIndex int, tagValues map[uint32]string)
	// Complete completes the query flow with error.
	Complete(err error)
}

StorageQueryFlow represents the storage query engine execute flow.

type TagFilterResult

type TagFilterResult struct {
	TagKeyID    tag.KeyID
	TagValueIDs *roaring.Bitmap
}

TagFilterResult represents the tag filter result, including the tag key id and the tag value ids.

type TaskContext

type TaskContext struct {
	Ctx    context.Context
	Cancel context.CancelFunc

	Start time.Time
}

TaskContext represents the task execution context.

func NewTaskContextWithTimeout

func NewTaskContextWithTimeout(ctx context.Context, timeout time.Duration) *TaskContext

NewTaskContextWithTimeout creates a task context with timeout.

func (*TaskContext) Release

func (ctx *TaskContext) Release()

Release releases context's resource after query.

type TimeSegmentContext

type TimeSegmentContext struct {
	TimeSegments map[int64]*TimeSegmentResultSet // familyTime -> time segment result set list
	SeriesIDs    *roaring.Bitmap                 // matched series ids after data filter
}

TimeSegmentContext represents time segment context

func NewTimeSegmentContext

func NewTimeSegmentContext() *TimeSegmentContext

NewTimeSegmentContext creates a time segment context.

func (*TimeSegmentContext) AddFilterResultSet

func (ts *TimeSegmentContext) AddFilterResultSet(interval timeutil.Interval, rs FilterResultSet)

AddFilterResultSet adds a result set after data filtering.

func (*TimeSegmentContext) GetTimeSegments

func (ts *TimeSegmentContext) GetTimeSegments() (rs TimeSegmentContexts)

GetTimeSegments returns the time segment result sets as a TimeSegmentContexts slice.

func (*TimeSegmentContext) Release

func (ts *TimeSegmentContext) Release()

Release releases time segment's data resource after query.

type TimeSegmentContexts

type TimeSegmentContexts []*TimeSegmentResultSet

TimeSegmentContexts represents the time segment slice in query time range.

func (TimeSegmentContexts) Len

func (f TimeSegmentContexts) Len() int

func (TimeSegmentContexts) Less

func (f TimeSegmentContexts) Less(i, j int) bool

func (TimeSegmentContexts) Swap

func (f TimeSegmentContexts) Swap(i, j int)

type TimeSegmentResultSet

type TimeSegmentResultSet struct {
	FamilyTime int64

	Source timeutil.SlotRange
	Target timeutil.SlotRange

	BaseSlot int

	Interval      timeutil.Interval
	IntervalRatio uint16

	FilterRS []FilterResultSet
}

TimeSegmentResultSet represents the time segment in query time range.

func (*TimeSegmentResultSet) Release

func (ctx *TimeSegmentResultSet) Release()

Release releases filter result set's resource after query.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL