Documentation ¶
Index ¶
- Constants
- func NewMetrics(ctxLabelKeys []string) *metrics
- func ReadFilterSource(id execute.DatasetID, r Reader, readSpec ReadFilterSpec, ...) execute.Source
- func ReadGroupSource(id execute.DatasetID, r Reader, readSpec ReadGroupSpec, ...) execute.Source
- func ReadTagKeysSource(id execute.DatasetID, r Reader, readSpec ReadTagKeysSpec, ...) execute.Source
- func ReadTagValuesSource(id execute.DatasetID, r Reader, readSpec ReadTagValuesSpec, ...) execute.Source
- type AllBucketLookup
- type BucketDependencies
- type BucketLookup
- type BucketsDecoder
- type Dependencies
- type FromDependencies
- type FromOpSpec
- type FromProcedureSpec
- type GroupMode
- type HostLookup
- type OrganizationLookup
- type PushDownFilterRule
- type PushDownGroupRule
- type PushDownRangeRule
- type PushDownReadTagKeysRule
- type PushDownReadTagValuesRule
- type ReadFilterSpec
- type ReadGroupPhysSpec
- type ReadGroupSpec
- type ReadRangePhysSpec
- func (s *ReadRangePhysSpec) Copy() plan.ProcedureSpec
- func (s *ReadRangePhysSpec) Kind() plan.ProcedureKind
- func (s *ReadRangePhysSpec) LookupBucketID(ctx context.Context, orgID influxdb.ID, buckets BucketLookup) (influxdb.ID, error)
- func (s *ReadRangePhysSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds
- type ReadTagKeysPhysSpec
- type ReadTagKeysSpec
- type ReadTagValuesPhysSpec
- type ReadTagValuesSpec
- type Reader
- type SortedPivotRule
- type Source
- type StaticLookup
- type Stats
- type StorageDependencies
- type TableIterator
- type ToDependencies
- type ToOpSpec
- type ToProcedureSpec
- type ToTransformation
- func (t *ToTransformation) Finish(id execute.DatasetID, err error)
- func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error
- func (t *ToTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error
- func (t *ToTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error
- func (t *ToTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error
Constants ¶
const ( ReadRangePhysKind = "ReadRangePhysKind" ReadGroupPhysKind = "ReadGroupPhysKind" ReadTagKeysPhysKind = "ReadTagKeysPhysKind" ReadTagValuesPhysKind = "ReadTagValuesPhysKind" )
const DefaultBufferSize = 1 << 14
const DefaultMeasurementColLabel = "_measurement"
const FromKind = "influxDBFrom"
const ToKind = influxdb.ToKind
ToKind is the kind for the `to` flux function
Variables ¶
This section is empty.
Functions ¶
func NewMetrics ¶
func NewMetrics(ctxLabelKeys []string) *metrics
NewMetrics produces a new metrics objects for an influxdb source. Currently it just collects the duration of read requests into a histogram. ctxLabelKeys is a list of labels to add to the produced metrics. The value for a given key will be read off the context. The context value must be a string or an implementation of the Stringer interface. In addition, produced metrics will be labeled with the orgID and type of operation requested.
func ReadFilterSource ¶
func ReadFilterSource(id execute.DatasetID, r Reader, readSpec ReadFilterSpec, a execute.Administration) execute.Source
func ReadGroupSource ¶
func ReadGroupSource(id execute.DatasetID, r Reader, readSpec ReadGroupSpec, a execute.Administration) execute.Source
func ReadTagKeysSource ¶
func ReadTagKeysSource(id execute.DatasetID, r Reader, readSpec ReadTagKeysSpec, a execute.Administration) execute.Source
func ReadTagValuesSource ¶
func ReadTagValuesSource(id execute.DatasetID, r Reader, readSpec ReadTagValuesSpec, a execute.Administration) execute.Source
Types ¶
type AllBucketLookup ¶
type BucketDependencies ¶
type BucketDependencies AllBucketLookup
type BucketLookup ¶
type BucketsDecoder ¶
type BucketsDecoder struct {
// contains filtered or unexported fields
}
func (*BucketsDecoder) Close ¶
func (bd *BucketsDecoder) Close() error
type Dependencies ¶
type Dependencies struct { StorageDeps StorageDependencies FluxDeps flux.Dependencies }
func NewDependencies ¶
func NewDependencies( reader Reader, writer storage.PointsWriter, bucketSvc influxdb.BucketService, orgSvc influxdb.OrganizationService, ss influxdb.SecretService, metricLabelKeys []string, ) (Dependencies, error)
func (Dependencies) PrometheusCollectors ¶
func (d Dependencies) PrometheusCollectors() []prometheus.Collector
PrometheusCollectors satisfies the prom.PrometheusCollector interface.
type FromDependencies ¶
type FromDependencies struct { Reader Reader BucketLookup BucketLookup OrganizationLookup OrganizationLookup Metrics *metrics }
func (FromDependencies) PrometheusCollectors ¶
func (d FromDependencies) PrometheusCollectors() []prometheus.Collector
PrometheusCollectors satisfies the PrometheusCollector interface.
func (FromDependencies) Validate ¶
func (d FromDependencies) Validate() error
type FromOpSpec ¶
type FromOpSpec struct { Bucket string `json:"bucket,omitempty"` BucketID string `json:"bucketID,omitempty"` }
func (*FromOpSpec) BucketsAccessed ¶
func (s *FromOpSpec) BucketsAccessed(orgID *platform.ID) (readBuckets, writeBuckets []platform.BucketFilter)
BucketsAccessed makes FromOpSpec a query.BucketAwareOperationSpec
func (*FromOpSpec) Kind ¶
func (s *FromOpSpec) Kind() flux.OperationKind
type FromProcedureSpec ¶
func (*FromProcedureSpec) Copy ¶
func (s *FromProcedureSpec) Copy() plan.ProcedureSpec
func (*FromProcedureSpec) Kind ¶
func (s *FromProcedureSpec) Kind() plan.ProcedureKind
func (*FromProcedureSpec) PostPhysicalValidate ¶
func (s *FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error
type GroupMode ¶
type GroupMode int
func ToGroupMode ¶
ToGroupMode accepts the group mode from Flux and produces the appropriate storage group mode.
type HostLookup ¶
type HostLookup interface { Hosts() []string Watch() <-chan struct{} }
type OrganizationLookup ¶
type PushDownFilterRule ¶
type PushDownFilterRule struct{}
PushDownFilterRule is a rule that pushes filters into from procedures to be evaluated in the storage layer. This rule is likely to be replaced by a more generic rule when we have a better framework for pushing filters, etc into sources.
func (PushDownFilterRule) Name ¶
func (PushDownFilterRule) Name() string
func (PushDownFilterRule) Pattern ¶
func (PushDownFilterRule) Pattern() plan.Pattern
type PushDownGroupRule ¶
type PushDownGroupRule struct{}
PushDownGroupRule pushes down a group operation to storage
func (PushDownGroupRule) Name ¶
func (rule PushDownGroupRule) Name() string
func (PushDownGroupRule) Pattern ¶
func (rule PushDownGroupRule) Pattern() plan.Pattern
type PushDownRangeRule ¶
type PushDownRangeRule struct{}
PushDownRangeRule pushes down a range filter to storage
func (PushDownRangeRule) Name ¶
func (rule PushDownRangeRule) Name() string
func (PushDownRangeRule) Pattern ¶
func (rule PushDownRangeRule) Pattern() plan.Pattern
Pattern matches 'from |> range'
type PushDownReadTagKeysRule ¶
type PushDownReadTagKeysRule struct{}
PushDownReadTagKeysRule matches 'ReadRange |> keys() |> keep() |> distinct()'. The 'from()' must have already been merged with 'range' and, optionally, may have been merged with 'filter'. If any other properties have been set on the from procedure, this rule will not rewrite anything.
func (PushDownReadTagKeysRule) Name ¶
func (rule PushDownReadTagKeysRule) Name() string
func (PushDownReadTagKeysRule) Pattern ¶
func (rule PushDownReadTagKeysRule) Pattern() plan.Pattern
type PushDownReadTagValuesRule ¶
type PushDownReadTagValuesRule struct{}
PushDownReadTagValuesRule matches 'ReadRange |> keep(columns: [tag]) |> group() |> distinct(column: tag)'. The 'from()' must have already been merged with 'range' and, optionally, may have been merged with 'filter'. If any other properties have been set on the from procedure, this rule will not rewrite anything.
func (PushDownReadTagValuesRule) Name ¶
func (rule PushDownReadTagValuesRule) Name() string
func (PushDownReadTagValuesRule) Pattern ¶
func (rule PushDownReadTagValuesRule) Pattern() plan.Pattern
type ReadFilterSpec ¶
type ReadGroupPhysSpec ¶
type ReadGroupPhysSpec struct { plan.DefaultCost ReadRangePhysSpec GroupMode flux.GroupMode GroupKeys []string AggregateMethod string }
func (*ReadGroupPhysSpec) Copy ¶
func (s *ReadGroupPhysSpec) Copy() plan.ProcedureSpec
func (*ReadGroupPhysSpec) Kind ¶
func (s *ReadGroupPhysSpec) Kind() plan.ProcedureKind
type ReadGroupSpec ¶
type ReadGroupSpec struct { ReadFilterSpec GroupMode GroupMode GroupKeys []string AggregateMethod string }
type ReadRangePhysSpec ¶
type ReadRangePhysSpec struct { plan.DefaultCost Bucket string BucketID string // FilterSet is set to true if there is a filter. FilterSet bool // Filter is the filter to use when calling into // storage. It must be possible to push down this // filter. Filter *semantic.FunctionExpression Bounds flux.Bounds }
func (*ReadRangePhysSpec) Copy ¶
func (s *ReadRangePhysSpec) Copy() plan.ProcedureSpec
func (*ReadRangePhysSpec) Kind ¶
func (s *ReadRangePhysSpec) Kind() plan.ProcedureKind
func (*ReadRangePhysSpec) LookupBucketID ¶
func (s *ReadRangePhysSpec) LookupBucketID(ctx context.Context, orgID influxdb.ID, buckets BucketLookup) (influxdb.ID, error)
func (*ReadRangePhysSpec) TimeBounds ¶
func (s *ReadRangePhysSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds
TimeBounds implements plan.BoundsAwareProcedureSpec.
type ReadTagKeysPhysSpec ¶
type ReadTagKeysPhysSpec struct {
ReadRangePhysSpec
}
func (*ReadTagKeysPhysSpec) Copy ¶
func (s *ReadTagKeysPhysSpec) Copy() plan.ProcedureSpec
func (*ReadTagKeysPhysSpec) Kind ¶
func (s *ReadTagKeysPhysSpec) Kind() plan.ProcedureKind
type ReadTagKeysSpec ¶
type ReadTagKeysSpec struct {
ReadFilterSpec
}
type ReadTagValuesPhysSpec ¶
type ReadTagValuesPhysSpec struct { ReadRangePhysSpec TagKey string }
func (*ReadTagValuesPhysSpec) Copy ¶
func (s *ReadTagValuesPhysSpec) Copy() plan.ProcedureSpec
func (*ReadTagValuesPhysSpec) Kind ¶
func (s *ReadTagValuesPhysSpec) Kind() plan.ProcedureKind
type ReadTagValuesSpec ¶
type ReadTagValuesSpec struct { ReadFilterSpec TagKey string }
type Reader ¶
type Reader interface { ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc *memory.Allocator) (TableIterator, error) ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc *memory.Allocator) (TableIterator, error) ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc *memory.Allocator) (TableIterator, error) ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc *memory.Allocator) (TableIterator, error) Close() }
type SortedPivotRule ¶
type SortedPivotRule struct{}
SortedPivotRule is a rule that optimizes a pivot when it is directly after an influxdb from.
func (SortedPivotRule) Name ¶
func (SortedPivotRule) Name() string
func (SortedPivotRule) Pattern ¶
func (SortedPivotRule) Pattern() plan.Pattern
type Source ¶
type Source struct {
// contains filtered or unexported fields
}
func (*Source) AddTransformation ¶
func (s *Source) AddTransformation(t execute.Transformation)
type StaticLookup ¶
type StaticLookup struct {
// contains filtered or unexported fields
}
func NewStaticLookup ¶
func NewStaticLookup(hosts []string) StaticLookup
func (StaticLookup) Hosts ¶
func (l StaticLookup) Hosts() []string
func (StaticLookup) Watch ¶
func (l StaticLookup) Watch() <-chan struct{}
type StorageDependencies ¶
type StorageDependencies struct { FromDeps FromDependencies BucketDeps BucketDependencies ToDeps ToDependencies }
func GetStorageDependencies ¶
func GetStorageDependencies(ctx context.Context) StorageDependencies
func (StorageDependencies) Inject ¶
func (d StorageDependencies) Inject(ctx context.Context) context.Context
func (StorageDependencies) PrometheusCollectors ¶
func (d StorageDependencies) PrometheusCollectors() []prometheus.Collector
PrometheusCollectors satisfies the prom.PrometheusCollector interface.
type TableIterator ¶
type TableIterator interface { flux.TableIterator Statistics() cursors.CursorStats }
TableIterator is a table iterator that also keeps track of cursor statistics from the storage engine.
type ToDependencies ¶
type ToDependencies struct { BucketLookup BucketLookup OrganizationLookup OrganizationLookup PointsWriter storage.PointsWriter }
ToDependencies contains the dependencies for executing the `to` function.
func (ToDependencies) Validate ¶
func (d ToDependencies) Validate() error
Validate returns an error if any required field is unset.
type ToOpSpec ¶
type ToOpSpec struct { Bucket string `json:"bucket"` BucketID string `json:"bucketID"` Org string `json:"org"` OrgID string `json:"orgID"` Host string `json:"host"` Token string `json:"token"` TimeColumn string `json:"timeColumn"` MeasurementColumn string `json:"measurementColumn"` TagColumns []string `json:"tagColumns"` FieldFn interpreter.ResolvedFunction `json:"fieldFn"` }
ToOpSpec is the flux.OperationSpec for the `to` flux function.
func (*ToOpSpec) BucketsAccessed ¶
func (o *ToOpSpec) BucketsAccessed(orgID *platform.ID) (readBuckets, writeBuckets []platform.BucketFilter)
BucketsAccessed returns the buckets accessed by the spec.
func (ToOpSpec) Kind ¶
func (ToOpSpec) Kind() flux.OperationKind
Kind returns the kind for the ToOpSpec function.
type ToProcedureSpec ¶
type ToProcedureSpec struct { plan.DefaultCost Spec *ToOpSpec }
ToProcedureSpec is the procedure spec for the `to` flux function.
func (*ToProcedureSpec) Copy ¶
func (o *ToProcedureSpec) Copy() plan.ProcedureSpec
Copy clones the procedure spec for `to` flux function.
func (*ToProcedureSpec) Kind ¶
func (o *ToProcedureSpec) Kind() plan.ProcedureKind
Kind returns the kind for the procedure spec for the `to` flux function.
type ToTransformation ¶
type ToTransformation struct { Ctx context.Context OrgID platform.ID BucketID platform.ID // contains filtered or unexported fields }
ToTransformation is the transformation for the `to` flux function.
func NewToTransformation ¶
func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.TableBuilderCache, toSpec *ToProcedureSpec, deps ToDependencies) (x *ToTransformation, err error)
NewToTransformation returns a new *ToTransformation with the appropriate fields set.
func (*ToTransformation) Finish ¶
func (t *ToTransformation) Finish(id execute.DatasetID, err error)
Finish is called after the `to` flux function's transformation is done processing.
func (*ToTransformation) RetractTable ¶
RetractTable retracts the table for the transformation for the `to` flux function.
func (*ToTransformation) UpdateProcessingTime ¶
UpdateProcessingTime updates the processing time for the transformation for the `to` flux function.
func (*ToTransformation) UpdateWatermark ¶
UpdateWatermark updates the watermark for the transformation for the `to` flux function.