Documentation
¶
Index ¶
- Variables
- func MetricDataSearch(ctx context.Context, param *models.ExecuteParam, statement *stmtpkg.Query, ...) (any, error)
- func MetricMetadataSearch(ctx context.Context, param *models.ExecuteParam, ...) (any, error)
- func MetricMetadataSearchWithResult(ctx context.Context, param *models.ExecuteParam, ...) (any, error)
- func NewTransportManager(taskClientFactory rpc.TaskClientFactory, ...) rpc.TransportManager
- type Pipeline
- type PipelineManager
- type RequestManager
- type SearchMgr
- type TaskHandler
- type TaskManager
- type TaskProcessor
Constants ¶
This section is empty.
Variables ¶
var ( ErrOnlySupportIntermediateTask = errors.New("only intermediate task is supported") ErrNoAvailableStorageNode = errors.New("no available storage node for server") ErrDatabaseNotExist = errors.New("database not exist") ErrUnmarshalPlan = errors.New("unmarshal physical plan error") ErrUnmarshalQuery = errors.New("unmarshal query statement error") ErrUnmarshalSuggest = errors.New("unmarshal metadata suggest statement error") ErrBadPhysicalPlan = errors.New("bad plan") ErrNoSendStream = errors.New("send stream not found") ErrTaskSend = errors.New("send task request error") ErrResponseSend = errors.New("send response error") ErrNoDatabase = errors.New("not found database") )
Functions ¶
func MetricDataSearch ¶ added in v0.2.4
func MetricDataSearch(ctx context.Context, param *models.ExecuteParam, statement *stmtpkg.Query, mgr *SearchMgr, ) (any, error)
MetricDataSearch represents the broker query executor, 1) chooses the storage nodes whose data is relatively complete 2) chooses broker nodes for root and intermediate computing from all available broker nodes 3) storage node as leaf computing node does filter and atomic compute 4) intermediate computing nodes are optional, only needed if the query has grouping, does order by for grouping 5) root computing node does function and expression computing 6) finally returns result set to user
NOTICE: there are some scenarios:
- some assigned shards may not be in the queried replica shards, so some expected results may be lost because their data is in offline shards. WHY can incomplete data still be queried? For the sake of system availability.
func MetricMetadataSearch ¶ added in v0.2.4
func MetricMetadataSearch(ctx context.Context, param *models.ExecuteParam, statement *stmtpkg.MetricMetadata, mgr *SearchMgr, ) (any, error)
MetricMetadataSearch represents the metadata query executor, includes: 1. suggest metric name 2. suggest tag keys by specified metric name 3. suggest tag values by specified metric name and tag key 4. suggest fields by specified metric name
func MetricMetadataSearchWithResult ¶ added in v0.2.4
func MetricMetadataSearchWithResult(ctx context.Context, param *models.ExecuteParam, statement *stmtpkg.MetricMetadata, mgr *SearchMgr, ) (any, error)
MetricMetadataSearchWithResult represents the metadata query executor and returns the final result set.
func NewTransportManager ¶ added in v0.2.4
func NewTransportManager( taskClientFactory rpc.TaskClientFactory, taskServerFactory rpc.TaskServerFactory, registry *linmetric.Registry, ) rpc.TransportManager
NewTransportManager creates a rpc transport manager instance.
Types ¶
type Pipeline ¶
type Pipeline interface { // Execute executes the stage(sub plan tree). Execute(stage stagepkg.Stage) // Stats returns the stats of stages. Stats() []*models.StageStats }
Pipeline represents the pipeline execution model; a pipeline executes all query stages.
func NewExecutePipeline ¶
func NewExecutePipeline(tracker *trackerpkg.StageTracker, completeCallback func(err error)) Pipeline
NewExecutePipeline creates a Pipeline instance for executing query stage.
type PipelineManager ¶
type PipelineManager interface { // AddPipeline adds a Pipeline when it starts. AddPipeline(requestID string, pipeline Pipeline) // RemovePipeline removes a Pipeline when it completed. RemovePipeline(requestID string) // GetPipeline returns a Pipeline by given request id, if not exist then return nil. GetPipeline(requestID string) Pipeline // GetAllAlivePipelines returns all alive request ids for pipelines. GetAllAlivePipelines() []string }
PipelineManager represents the manager which stores the currently executing Pipelines.
func GetPipelineManager ¶
func GetPipelineManager() PipelineManager
GetPipelineManager returns a singleton PipelineManager instance.
type RequestManager ¶ added in v0.2.4
type RequestManager interface { // NewRequest creates a new request and returns request id. NewRequest(req *models.Request) string // CompleteRequest completes a request by given request id. CompleteRequest(requestID string) // GetAliveRequests returns all alive requests. GetAliveRequests() []*models.Request }
RequestManager represents the request manager which stores lin query requests.
func GetRequestManager ¶ added in v0.2.4
func GetRequestManager() RequestManager
GetRequestManager returns a singleton RequestManager instance.
type SearchMgr ¶ added in v0.2.4
type SearchMgr struct { // for intermediate processor set request id, must keep using same request id RequestID string Timeout time.Duration CurNode models.StatelessNode Choose flow.NodeChoose TaskMgr TaskManager TransportMgr rpc.TransportManager }
SearchMgr represents the dependencies for searching.
type TaskHandler ¶
type TaskHandler struct {
// contains filtered or unexported fields
}
TaskHandler represents the task rpc handler
func NewTaskHandler ¶
func NewTaskHandler( cfg config.Query, fct rpc.TaskServerFactory, processor TaskProcessor, pool concurrent.Pool, ) *TaskHandler
NewTaskHandler creates the task rpc handler
func (*TaskHandler) Handle ¶
func (q *TaskHandler) Handle(stream protoCommonV1.TaskService_HandleServer) (err error)
Handle handles the task request based on grpc stream
type TaskManager ¶ added in v0.2.4
type TaskManager interface { rpc.TaskReceiver // AddTask adds task context by request id. AddTask(requestID string, taskCtx context.TaskContext) // RemoveTask removes task context by request id. RemoveTask(requestID string) }
TaskManager represents the task manager for current node. FIXME: need to remove when the target is offline
func NewTaskManager ¶ added in v0.2.4
func NewTaskManager(workerPool concurrent.Pool, registry *linmetric.Registry) TaskManager
NewTaskManager creates the task manager.
type TaskProcessor ¶
type TaskProcessor interface { // Process processes the task request. Process(ctx *flow.TaskContext, stream protoCommonV1.TaskService_HandleServer, req *protoCommonV1.TaskRequest) error }
TaskProcessor represents the task processor, all task processors are async.
func NewIntermediateTaskProcessor ¶ added in v0.2.4
func NewIntermediateTaskProcessor( curNode models.StatelessNode, timeout time.Duration, stateMgr broker.StateManager, taskMgr TaskManager, transportMgr rpc.TransportManager, ) TaskProcessor
NewIntermediateTaskProcessor creates an intermediate task processor.
func NewLeafTaskProcessor ¶
func NewLeafTaskProcessor( currentNode models.Node, engine tsdb.Engine, taskServerFactory rpc.TaskServerFactory, ) TaskProcessor
NewLeafTaskProcessor creates the leaf task