execute

package
v0.58.1-0...-af594cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2020 License: MIT Imports: 29 Imported by: 0

Documentation

Overview

Package execute contains the implementation of the execution phase in the query engine.

Index

Constants

View Source
const (
	MaxTime = math.MaxInt64
	MinTime = math.MinInt64
)
View Source
const (
	DefaultStartColLabel = "_start"
	DefaultStopColLabel  = "_stop"
	DefaultTimeColLabel  = "_time"
	DefaultValueColLabel = "_value"
)

Variables

View Source
var AllTime = Bounds{
	Start: MinTime,
	Stop:  MaxTime,
}
View Source
var DefaultAggregateConfig = AggregateConfig{
	Columns: []string{DefaultValueColLabel},
}
View Source
var DefaultSelectorConfig = SelectorConfig{
	Column: DefaultValueColLabel,
}

Functions

func AddNewTableCols

func AddNewTableCols(t flux.Table, builder TableBuilder, colMap []int) ([]int, error)

AddNewCols adds the columns of b onto builder that did not already exist. Returns the mapping of builder cols to table cols.

func AddTableCols

func AddTableCols(t flux.Table, builder TableBuilder) error

AddTableCols adds the columns of b onto builder.

func AddTableKeyCols

func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error

func AppendCol

func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error

AppendCol append a column from cr onto builder The indexes bj and cj are builder and col reader indexes respectively.

func AppendCols

func AppendCols(cr flux.ColReader, builder TableBuilder) error

AppendCols appends all columns from cr onto builder. This function assumes that builder and cr have the same column schema.

func AppendKeyValues

func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error

AppendKeyValues appends the key values to the right columns in the builder. The builder is expected to contain the key columns.

func AppendKeyValuesN

func AppendKeyValuesN(key flux.GroupKey, builder TableBuilder, n int) error

AppendKeyValuesN runs AppendKeyValues `n` times. This is different from ```

for i := 0; i < n; i++ {
  AppendKeyValues(key, builder)
}

``` Because it saves the overhead of calculating the column mapping `n` times.

func AppendMappedCols

func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error

AppendMappedCols appends all columns from cr onto builder. The colMap is a map of builder column index to cr column index.

func AppendMappedRecordExplicit

func AppendMappedRecordExplicit(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error

AppendMappedRecordExplicit appends the records from cr onto builder, using colMap as a map of builder index to cr index. if an entry in the colMap indicates a mismatched column, no value is appended.

func AppendMappedRecordWithNulls

func AppendMappedRecordWithNulls(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error

AppendMappedRecordWithNulls appends the records from cr onto builder, using colMap as a map of builder index to cr index. if an entry in the colMap indicates a mismatched column, the column is created with null values

func AppendMappedTable

func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error

AppendMappedTable appends data from table t onto builder. The colMap is a map of builder column index to table column index.

func AppendRecord

func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error

AppendRecord appends the record from cr onto builder assuming matching columns.

func AppendTable

func AppendTable(t flux.Table, builder TableBuilder) error

AppendTable appends data from table t onto builder. This function assumes builder and t have the same column schema.

func BuilderColsMatchReader

func BuilderColsMatchReader(builder TableBuilder, cr flux.ColReader) bool

BuilderColsMatchReader returns true if builder and cr have identical column sets (order dependent)

func CheckColType

func CheckColType(col flux.ColMeta, typ flux.ColType)

func ColIdx

func ColIdx(label string, cols []flux.ColMeta) int

func ColMap

func ColMap(colMap []int, builder TableBuilder, cols []flux.ColMeta) []int

ColMap writes a mapping of builder index to cols index into colMap. When colMap does not have enough capacity a new colMap is allocated. The colMap is always returned

func ContainsStr

func ContainsStr(strs []string, str string) bool

func ConvertFromKind

func ConvertFromKind(k semantic.Nature) flux.ColType

func ConvertToKind

func ConvertToKind(t flux.ColType) semantic.Nature

func CopyTable

func CopyTable(t flux.Table) (flux.BufferedTable, error)

CopyTable returns a buffered copy of the table and consumes the input table. If the input table is already buffered, it "consumes" the input and returns the same table.

The buffered table can then be copied additional times using the BufferedTable.Copy method.

This method should be used sparingly if at all. It will retain each of the buffers of data coming out of a table so the entire table is materialized in memory. For large datasets, this could potentially cause a problem. The allocator is meant to catch when this happens and prevent it.

func FormatResult

func FormatResult(w io.Writer, res flux.Result) error

FormatResult prints the result to w.

func GroupKeyForRowOn

func GroupKeyForRowOn(i int, cr flux.ColReader, on map[string]bool) flux.GroupKey

func HasCol

func HasCol(label string, cols []flux.ColMeta) bool

func NewAggregateTransformation

func NewAggregateTransformation(d Dataset, c TableBuilderCache, agg Aggregate, config AggregateConfig) *aggregateTransformation

func NewDataset

func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset

func NewEmptyTable

func NewEmptyTable(key flux.GroupKey, cols []flux.ColMeta) flux.Table

NewEmptyTable constructs a new empty table with the given group key and columns.

func NewGroupKey

func NewGroupKey(cols []flux.ColMeta, values []values.Value) flux.GroupKey

func NewIndexSelectorTransformation

func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation

func NewRowSelectorTransformation

func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation

func NewTableBuilderCache

func NewTableBuilderCache(a *memory.Allocator) *tableBuilderCache

func PanicUnknownType

func PanicUnknownType(typ flux.ColType)

func RegisterSource

func RegisterSource(k plan.ProcedureKind, c CreateSource)

func RegisterTransformation

func RegisterTransformation(k plan.ProcedureKind, c CreateTransformation)

RegisterTransformation adds a new registration mapping of procedure kind to transformation.

func ReplaceTransformation

func ReplaceTransformation(k plan.ProcedureKind, c CreateTransformation)

ReplaceTransformation changes an existing transformation registration.

func TablesEqual

func TablesEqual(left, right flux.Table, alloc *memory.Allocator) (bool, error)

TablesEqual takes two flux tables and compares them. Returns false if the tables have different keys, different columns, or if the data in any column does not match. Returns true otherwise. This function will consume the ColumnReader so if you are calling this from the a Process method, you may need to copy the table if you need to iterate over the data for other calculations.

func ValueForRow

func ValueForRow(cr flux.ColReader, i, j int) values.Value

ValueForRow retrieves a value from an arrow column reader at the given index.

Types

type AccumulationMode

type AccumulationMode int
const (
	// DiscardingMode will discard the data associated with a group key
	// after it has been processed.
	DiscardingMode AccumulationMode = iota

	// AccumulatingMode will retain the data associated with a group key
	// after it has been processed. If it has already sent a table with
	// that group key to a downstream transformation, it will signal
	// to that transformation that the previous table should be retracted.
	//
	// This is not implemented at the moment.
	AccumulatingMode
)

type Administration

type Administration interface {
	Context() context.Context

	ResolveTime(qt flux.Time) Time
	StreamContext() StreamContext
	Allocator() *memory.Allocator
	Parents() []DatasetID
}

type Aggregate

type Aggregate interface {
	NewBoolAgg() DoBoolAgg
	NewIntAgg() DoIntAgg
	NewUIntAgg() DoUIntAgg
	NewFloatAgg() DoFloatAgg
	NewStringAgg() DoStringAgg
}

type AggregateConfig

type AggregateConfig struct {
	plan.DefaultCost
	Columns []string `json:"columns"`
}

func (AggregateConfig) Copy

func (*AggregateConfig) ReadArgs

func (c *AggregateConfig) ReadArgs(args flux.Arguments) error

type Allocator

type Allocator struct {
	*memory.Allocator
}

Allocator tracks the amount of memory being consumed by a query. The allocator provides methods similar to make and append, to allocate large slices of data. The allocator also provides a Free method to account for when memory will be freed.

func (*Allocator) AppendBools

func (a *Allocator) AppendBools(slice []bool, vs ...bool) []bool

AppendBools appends bools to a slice

func (*Allocator) AppendFloats

func (a *Allocator) AppendFloats(slice []float64, vs ...float64) []float64

AppendFloats appends float64s to a slice

func (*Allocator) AppendInts

func (a *Allocator) AppendInts(slice []int64, vs ...int64) []int64

AppendInts appends int64s to a slice

func (*Allocator) AppendStrings

func (a *Allocator) AppendStrings(slice []string, vs ...string) []string

AppendStrings appends strings to a slice. Only the string headers are accounted for.

func (*Allocator) AppendTimes

func (a *Allocator) AppendTimes(slice []Time, vs ...Time) []Time

AppendTimes appends Times to a slice

func (*Allocator) AppendUInts

func (a *Allocator) AppendUInts(slice []uint64, vs ...uint64) []uint64

AppendUInts appends uint64s to a slice

func (*Allocator) Bools

func (a *Allocator) Bools(l, c int) []bool

Bools makes a slice of bool values.

func (*Allocator) Floats

func (a *Allocator) Floats(l, c int) []float64

Floats makes a slice of float64 values.

func (*Allocator) Free

func (a *Allocator) Free(n, size int)

Free informs the allocator that memory has been freed.

func (*Allocator) GrowBools

func (a *Allocator) GrowBools(slice []bool, n int) []bool

func (*Allocator) GrowFloats

func (a *Allocator) GrowFloats(slice []float64, n int) []float64

func (*Allocator) GrowInts

func (a *Allocator) GrowInts(slice []int64, n int) []int64

func (*Allocator) GrowStrings

func (a *Allocator) GrowStrings(slice []string, n int) []string

func (*Allocator) GrowTimes

func (a *Allocator) GrowTimes(slice []Time, n int) []Time

func (*Allocator) GrowUInts

func (a *Allocator) GrowUInts(slice []uint64, n int) []uint64

func (*Allocator) Ints

func (a *Allocator) Ints(l, c int) []int64

Ints makes a slice of int64 values.

func (*Allocator) Strings

func (a *Allocator) Strings(l, c int) []string

Strings makes a slice of string values. Only the string headers are accounted for.

func (*Allocator) Times

func (a *Allocator) Times(l, c int) []Time

Times makes a slice of Time values.

func (*Allocator) UInts

func (a *Allocator) UInts(l, c int) []uint64

UInts makes a slice of uint64 values.

type BoolValueFunc

type BoolValueFunc interface {
	ValueBool() bool
}

type Bounds

type Bounds struct {
	Start Time
	Stop  Time
}

func (Bounds) Contains

func (b Bounds) Contains(t Time) bool

func (Bounds) Duration

func (b Bounds) Duration() Duration

func (Bounds) Equal

func (b Bounds) Equal(o Bounds) bool

func (*Bounds) Intersect

func (b *Bounds) Intersect(o Bounds) Bounds

Intersect returns the intersection of two bounds. It returns empty bounds if one of the input bounds are empty. TODO: there are several places that implement bounds and related utilities.

consider a central place for them?

func (Bounds) IsEmpty

func (b Bounds) IsEmpty() bool

func (Bounds) Overlaps

func (b Bounds) Overlaps(o Bounds) bool

func (Bounds) Shift

func (b Bounds) Shift(d Duration) Bounds

func (Bounds) String

func (b Bounds) String() string

type ColListTable

type ColListTable struct {
	// contains filtered or unexported fields
}

ColListTable implements Table using list of columns. All data for the table is stored in RAM. As a result At* methods are provided directly on the table for easy access.

func (*ColListTable) Bools

func (t *ColListTable) Bools(j int) *array.Boolean

func (*ColListTable) Cols

func (t *ColListTable) Cols() []flux.ColMeta

func (*ColListTable) Do

func (t *ColListTable) Do(f func(flux.ColReader) error) error

func (*ColListTable) Done

func (t *ColListTable) Done()

func (*ColListTable) Empty

func (t *ColListTable) Empty() bool

func (*ColListTable) Floats

func (t *ColListTable) Floats(j int) *array.Float64

func (*ColListTable) GetRow

func (t *ColListTable) GetRow(row int) values.Object

GetRow takes a row index and returns the record located at that index in the cache

func (*ColListTable) Ints

func (t *ColListTable) Ints(j int) *array.Int64

func (*ColListTable) Key

func (t *ColListTable) Key() flux.GroupKey

func (*ColListTable) Len

func (t *ColListTable) Len() int

func (*ColListTable) NRows

func (t *ColListTable) NRows() int

func (*ColListTable) RefCount

func (t *ColListTable) RefCount(n int)

func (*ColListTable) Release

func (t *ColListTable) Release()

func (*ColListTable) Retain

func (t *ColListTable) Retain()

func (*ColListTable) Strings

func (t *ColListTable) Strings(j int) *array.Binary

func (*ColListTable) Times

func (t *ColListTable) Times(j int) *array.Int64

func (*ColListTable) UInts

func (t *ColListTable) UInts(j int) *array.Uint64

type ColListTableBuilder

type ColListTableBuilder struct {
	// contains filtered or unexported fields
}

func NewColListTableBuilder

func NewColListTableBuilder(key flux.GroupKey, a *memory.Allocator) *ColListTableBuilder

func (*ColListTableBuilder) AddCol

func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)

func (*ColListTableBuilder) AppendBool

func (b *ColListTableBuilder) AppendBool(j int, value bool) error

func (*ColListTableBuilder) AppendBools

func (b *ColListTableBuilder) AppendBools(j int, vs *array.Boolean) error

func (*ColListTableBuilder) AppendFloat

func (b *ColListTableBuilder) AppendFloat(j int, value float64) error

func (*ColListTableBuilder) AppendFloats

func (b *ColListTableBuilder) AppendFloats(j int, vs *array.Float64) error

func (*ColListTableBuilder) AppendInt

func (b *ColListTableBuilder) AppendInt(j int, value int64) error

func (*ColListTableBuilder) AppendInts

func (b *ColListTableBuilder) AppendInts(j int, vs *array.Int64) error

func (*ColListTableBuilder) AppendNil

func (b *ColListTableBuilder) AppendNil(j int) error

func (*ColListTableBuilder) AppendString

func (b *ColListTableBuilder) AppendString(j int, value string) error

func (*ColListTableBuilder) AppendStrings

func (b *ColListTableBuilder) AppendStrings(j int, vs *array.Binary) error

func (*ColListTableBuilder) AppendTime

func (b *ColListTableBuilder) AppendTime(j int, value Time) error

func (*ColListTableBuilder) AppendTimes

func (b *ColListTableBuilder) AppendTimes(j int, vs *array.Int64) error

func (*ColListTableBuilder) AppendUInt

func (b *ColListTableBuilder) AppendUInt(j int, value uint64) error

func (*ColListTableBuilder) AppendUInts

func (b *ColListTableBuilder) AppendUInts(j int, vs *array.Uint64) error

func (*ColListTableBuilder) AppendValue

func (b *ColListTableBuilder) AppendValue(j int, v values.Value) error

func (*ColListTableBuilder) Bools

func (b *ColListTableBuilder) Bools(j int) []bool

func (*ColListTableBuilder) ClearData

func (b *ColListTableBuilder) ClearData()

func (*ColListTableBuilder) Cols

func (b *ColListTableBuilder) Cols() []flux.ColMeta

func (*ColListTableBuilder) Floats

func (b *ColListTableBuilder) Floats(j int) []float64

func (*ColListTableBuilder) GetRow

func (b *ColListTableBuilder) GetRow(row int) values.Object

GetRow takes a row index and returns the record located at that index in the cache

func (*ColListTableBuilder) GrowBools

func (b *ColListTableBuilder) GrowBools(j, n int) error

func (*ColListTableBuilder) GrowFloats

func (b *ColListTableBuilder) GrowFloats(j, n int) error

func (*ColListTableBuilder) GrowInts

func (b *ColListTableBuilder) GrowInts(j, n int) error

func (*ColListTableBuilder) GrowStrings

func (b *ColListTableBuilder) GrowStrings(j, n int) error

func (*ColListTableBuilder) GrowTimes

func (b *ColListTableBuilder) GrowTimes(j, n int) error

func (*ColListTableBuilder) GrowUInts

func (b *ColListTableBuilder) GrowUInts(j, n int) error

func (*ColListTableBuilder) Ints

func (b *ColListTableBuilder) Ints(j int) []int64

func (*ColListTableBuilder) Key

func (*ColListTableBuilder) Len

func (b *ColListTableBuilder) Len() int

func (*ColListTableBuilder) LevelColumns

func (b *ColListTableBuilder) LevelColumns() error

func (*ColListTableBuilder) NCols

func (b *ColListTableBuilder) NCols() int

func (*ColListTableBuilder) NRows

func (b *ColListTableBuilder) NRows() int

func (*ColListTableBuilder) Release

func (b *ColListTableBuilder) Release()

func (*ColListTableBuilder) SetBool

func (b *ColListTableBuilder) SetBool(i int, j int, value bool) error

func (*ColListTableBuilder) SetFloat

func (b *ColListTableBuilder) SetFloat(i int, j int, value float64) error

func (*ColListTableBuilder) SetInt

func (b *ColListTableBuilder) SetInt(i int, j int, value int64) error

func (*ColListTableBuilder) SetNil

func (b *ColListTableBuilder) SetNil(i, j int) error

func (*ColListTableBuilder) SetString

func (b *ColListTableBuilder) SetString(i int, j int, value string) error

func (*ColListTableBuilder) SetTime

func (b *ColListTableBuilder) SetTime(i int, j int, value Time) error

func (*ColListTableBuilder) SetUInt

func (b *ColListTableBuilder) SetUInt(i int, j int, value uint64) error

func (*ColListTableBuilder) SetValue

func (b *ColListTableBuilder) SetValue(i, j int, v values.Value) error

func (*ColListTableBuilder) SliceColumns

func (b *ColListTableBuilder) SliceColumns(start, stop int) error

SliceColumns iterates over each column of b and re-slices them to the range [start:stop].

func (*ColListTableBuilder) Sort

func (b *ColListTableBuilder) Sort(cols []string, desc bool)

func (*ColListTableBuilder) Strings

func (b *ColListTableBuilder) Strings(j int) []string

func (*ColListTableBuilder) Table

func (b *ColListTableBuilder) Table() (flux.Table, error)

func (*ColListTableBuilder) Times

func (b *ColListTableBuilder) Times(j int) []values.Time

func (*ColListTableBuilder) UInts

func (b *ColListTableBuilder) UInts(j int) []uint64

type CreateSource

type CreateSource func(spec plan.ProcedureSpec, id DatasetID, ctx Administration) (Source, error)

type CreateTransformation

type CreateTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)

type DataCache

type DataCache interface {
	Table(flux.GroupKey) (flux.Table, error)

	ForEach(func(flux.GroupKey))
	ForEachWithContext(func(flux.GroupKey, Trigger, TableContext))

	DiscardTable(flux.GroupKey)
	ExpireTable(flux.GroupKey)

	SetTriggerSpec(t plan.TriggerSpec)
}

DataCache holds all working data for a transformation.

type Dataset

type Dataset interface {
	Node

	RetractTable(key flux.GroupKey) error
	UpdateProcessingTime(t Time) error
	UpdateWatermark(mark Time) error
	Finish(error)

	SetTriggerSpec(t plan.TriggerSpec)
}

Dataset represents the set of data produced by a transformation.

func NewAggregateTransformationAndDataset

func NewAggregateTransformationAndDataset(id DatasetID, mode AccumulationMode, agg Aggregate, config AggregateConfig, a *memory.Allocator) (*aggregateTransformation, Dataset)

func NewIndexSelectorTransformationAndDataset

func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, config SelectorConfig, a *memory.Allocator) (*indexSelectorTransformation, Dataset)

func NewRowSelectorTransformationAndDataset

func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, config SelectorConfig, a *memory.Allocator) (*rowSelectorTransformation, Dataset)

type DatasetID

type DatasetID uuid.UUID
var ZeroDatasetID DatasetID

func DatasetIDFromNodeID

func DatasetIDFromNodeID(id plan.NodeID) DatasetID

func (DatasetID) IsZero

func (id DatasetID) IsZero() bool

func (DatasetID) String

func (id DatasetID) String() string

type Dispatcher

type Dispatcher interface {
	// Schedule fn to be executed
	Schedule(fn ScheduleFunc)
}

Dispatcher schedules work for a query. Each transformation submits work to be done to the dispatcher. Then the dispatcher schedules to work based on the available resources.

type DoBoolAgg

type DoBoolAgg interface {
	ValueFunc
	DoBool(*array.Boolean)
}

type DoBoolIndexSelector

type DoBoolIndexSelector interface {
	DoBool(*array.Boolean) []int
}

type DoBoolRowSelector

type DoBoolRowSelector interface {
	Rower
	DoBool(vs *array.Boolean, cr flux.ColReader)
}

type DoFloatAgg

type DoFloatAgg interface {
	ValueFunc
	DoFloat(*array.Float64)
}

type DoFloatIndexSelector

type DoFloatIndexSelector interface {
	DoFloat(*array.Float64) []int
}

type DoFloatRowSelector

type DoFloatRowSelector interface {
	Rower
	DoFloat(vs *array.Float64, cr flux.ColReader)
}

type DoIntAgg

type DoIntAgg interface {
	ValueFunc
	DoInt(*array.Int64)
}

type DoIntIndexSelector

type DoIntIndexSelector interface {
	DoInt(*array.Int64) []int
}

type DoIntRowSelector

type DoIntRowSelector interface {
	Rower
	DoInt(vs *array.Int64, cr flux.ColReader)
}

type DoStringAgg

type DoStringAgg interface {
	ValueFunc
	DoString(*array.Binary)
}

type DoStringIndexSelector

type DoStringIndexSelector interface {
	DoString(*array.Binary) []int
}

type DoStringRowSelector

type DoStringRowSelector interface {
	Rower
	DoString(vs *array.Binary, cr flux.ColReader)
}

type DoTimeIndexSelector

type DoTimeIndexSelector interface {
	DoTime(*array.Int64) []int
}

type DoTimeRowSelector

type DoTimeRowSelector interface {
	Rower
	DoTime(vs *array.Int64, cr flux.ColReader)
}

type DoUIntAgg

type DoUIntAgg interface {
	ValueFunc
	DoUInt(*array.Uint64)
}

type DoUIntIndexSelector

type DoUIntIndexSelector interface {
	DoUInt(*array.Uint64) []int
}

type DoUIntRowSelector

type DoUIntRowSelector interface {
	Rower
	DoUInt(vs *array.Uint64, cr flux.ColReader)
}

type Duration

type Duration = values.Duration

type Executor

type Executor interface {
	// Execute will begin execution of the plan.Spec using the memory allocator.
	// This returns a mapping of names to the query results.
	// This will also return a channel for the Metadata from the query. The channel
	// may return zero or more values. The returned channel must not require itself to
	// be read so the executor must allocate enough space in the channel so if the channel
	// is unread that it will not block.
	Execute(ctx context.Context, p *plan.Spec, a *memory.Allocator) (map[string]flux.Result, <-chan flux.Metadata, error)
}

func NewExecutor

func NewExecutor(logger *zap.Logger) Executor

type FinishMsg

type FinishMsg interface {
	Message
	Error() error
}

type FloatValueFunc

type FloatValueFunc interface {
	ValueFloat() float64
}

type FormatOptions

type FormatOptions struct {
	// RepeatHeaderCount is the number of rows to print before printing the header again.
	// If zero then the headers are not repeated.
	RepeatHeaderCount int

	NullRepresentation string
}

func DefaultFormatOptions

func DefaultFormatOptions() *FormatOptions

type Formatter

type Formatter struct {
	// contains filtered or unexported fields
}

Formatter writes a table to a Writer.

func NewFormatter

func NewFormatter(tbl flux.Table, opts *FormatOptions) *Formatter

NewFormatter creates a Formatter for a given table. If opts is nil, the DefaultFormatOptions are used.

func (*Formatter) WriteTo

func (f *Formatter) WriteTo(out io.Writer) (int64, error)

WriteTo writes the formatted table data to w.

type GroupKeyBuilder

type GroupKeyBuilder struct {
	// contains filtered or unexported fields
}

GroupKeyBuilder is used to construct a GroupKey by keeping a mutable copy in memory.

func NewGroupKeyBuilder

func NewGroupKeyBuilder(key flux.GroupKey) *GroupKeyBuilder

NewGroupKeyBuilder creates a new GroupKeyBuilder from an existing GroupKey. If the GroupKey passed is nil, a blank GroupKeyBuilder is constructed.

func (*GroupKeyBuilder) AddKeyValue

func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder

AddKeyValue will add a new group key to the existing builder.

func (*GroupKeyBuilder) Build

func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)

Build will construct the GroupKey. If there is any problem with the GroupKey (such as one of the columns is not a valid type), the error will be returned here.

func (*GroupKeyBuilder) Grow

func (gkb *GroupKeyBuilder) Grow(n int)

Grow will grow the internal capacity of the group key to the given number.

func (*GroupKeyBuilder) Len

func (gkb *GroupKeyBuilder) Len() int

Len returns the current length of the group key.

func (*GroupKeyBuilder) SetKeyValue

func (gkb *GroupKeyBuilder) SetKeyValue(key string, value values.Value) *GroupKeyBuilder

SetKeyValue will set an existing key/value to the given pair, or if key is not found, add a new group key to the existing builder.

type GroupLookup

type GroupLookup struct {
	// contains filtered or unexported fields
}

GroupLookup is a container that maps group keys to a value.

The GroupLookup container is optimized for appending values in order and iterating over them in the same order. The GroupLookup will always have a deterministic order for the Range call, but that order may be influenced by the order that inserts happen.

At the current moment, the GroupLookup maintains the groups in sorted order although future implementations may change that.

To optimize inserts, the lookup is kept in an array of arrays. The first layer keeps a group of sorted key groups and each of these groups maintains their own sorted list of keys. Each time a new key is added, it is appended to the end of one of the key lists. If a key needs to be added in the middle of a list, the list is split into two so that the key can be appended. The index of the last list to be used is maintained so that future inserts can skip past the first search for the key list and an insert can be done in constant time. Similarly, a lookup for a key that was just inserted will also be in constant time with the worst case time being O(n log n).

func NewGroupLookup

func NewGroupLookup() *GroupLookup

NewGroupLookup constructs a GroupLookup.

func (*GroupLookup) Clear

func (l *GroupLookup) Clear()

Clear will clear the group lookup and reset it to contain nothing.

func (*GroupLookup) Delete

func (l *GroupLookup) Delete(key flux.GroupKey) (v interface{}, found bool)

Delete will remove the key from this GroupLookup. It will return the same thing as a call to Lookup.

func (*GroupLookup) Lookup

func (l *GroupLookup) Lookup(key flux.GroupKey) (interface{}, bool)

Lookup will retrieve the value associated with the given key if it exists.

func (*GroupLookup) LookupOrCreate

func (l *GroupLookup) LookupOrCreate(key flux.GroupKey, fn func() interface{}) interface{}

LookupOrCreate will retrieve the value associated with the given key or, if it does not exist, will invoke the function to create one and set it in the group lookup.

func (*GroupLookup) Range

func (l *GroupLookup) Range(f func(key flux.GroupKey, value interface{}))

Range will iterate over all groups keys in a stable ordering. Range must not be called within another call to Range. It is safe to call Set/Delete while ranging.

func (*GroupLookup) Set

func (l *GroupLookup) Set(key flux.GroupKey, value interface{})

Set will set the value for the given key. It will overwrite an existing value.

type IndexSelector

type IndexSelector interface {
	NewTimeSelector() DoTimeIndexSelector
	NewBoolSelector() DoBoolIndexSelector
	NewIntSelector() DoIntIndexSelector
	NewUIntSelector() DoUIntIndexSelector
	NewFloatSelector() DoFloatIndexSelector
	NewStringSelector() DoStringIndexSelector
}

type IntValueFunc

type IntValueFunc interface {
	ValueInt() int64
}

type Message

type Message interface {
	Type() MessageType
	SrcDatasetID() DatasetID
}

type MessageQueue

type MessageQueue interface {
	Push(Message)
	Pop() Message
}

MessageQueue provides a concurrency safe queue for messages. The queue must have a single consumer calling Pop.

type MessageType

type MessageType int
const (
	RetractTableType MessageType = iota
	ProcessType
	UpdateWatermarkType
	UpdateProcessingTimeType
	FinishType
)

type MetadataNode

type MetadataNode interface {
	Node
	Metadata() flux.Metadata
}

MetadataNode is a node that has additional metadata that should be added to the result after it is processed.

type Node

type Node interface {
	AddTransformation(t Transformation)
}

type PassthroughDataset

type PassthroughDataset struct {
	// contains filtered or unexported fields
}

PassthroughDataset is a Dataset that will passthrough the processed data to the next Transformation.

func NewPassthroughDataset

func NewPassthroughDataset(id DatasetID) *PassthroughDataset

NewPassthroughDataset constructs a new PassthroughDataset.

func (*PassthroughDataset) AddTransformation

func (d *PassthroughDataset) AddTransformation(t Transformation)

func (*PassthroughDataset) Finish

func (d *PassthroughDataset) Finish(err error)

func (*PassthroughDataset) Process

func (d *PassthroughDataset) Process(tbl flux.Table) error

func (*PassthroughDataset) RetractTable

func (d *PassthroughDataset) RetractTable(key flux.GroupKey) error

func (*PassthroughDataset) SetTriggerSpec

func (d *PassthroughDataset) SetTriggerSpec(t plan.TriggerSpec)

func (*PassthroughDataset) UpdateProcessingTime

func (d *PassthroughDataset) UpdateProcessingTime(t Time) error

func (*PassthroughDataset) UpdateWatermark

func (d *PassthroughDataset) UpdateWatermark(mark Time) error

type ProcessMsg

type ProcessMsg interface {
	Message
	Table() flux.Table
}

type RandomAccessGroupLookup

type RandomAccessGroupLookup struct {
	// contains filtered or unexported fields
}

RandomAccessGroupLookup is a GroupLookup container that is optimized for random access.

func NewRandomAccessGroupLookup

func NewRandomAccessGroupLookup() *RandomAccessGroupLookup

NewRandomAccessGroupLookup constructs a RandomAccessGroupLookup.

func (*RandomAccessGroupLookup) Clear

func (l *RandomAccessGroupLookup) Clear()

Clear will clear the group lookup and reset it to contain nothing.

func (*RandomAccessGroupLookup) Delete

func (l *RandomAccessGroupLookup) Delete(key flux.GroupKey) (v interface{}, found bool)

Delete will remove the key from this GroupLookup. It will return the same thing as a call to Lookup.

func (*RandomAccessGroupLookup) Lookup

func (l *RandomAccessGroupLookup) Lookup(key flux.GroupKey) (interface{}, bool)

Lookup will retrieve the value associated with the given key if it exists.

func (*RandomAccessGroupLookup) LookupOrCreate

func (l *RandomAccessGroupLookup) LookupOrCreate(key flux.GroupKey, fn func() interface{}) interface{}

LookupOrCreate will retrieve the value associated with the given key or, if it does not exist, will invoke the function to create one and set it in the group lookup.

func (*RandomAccessGroupLookup) Range

func (l *RandomAccessGroupLookup) Range(f func(key flux.GroupKey, value interface{}))

Range will iterate over all groups keys in a stable ordering. Range must not be called within another call to Range. It is safe to call Set/Delete while ranging.

func (*RandomAccessGroupLookup) Set

func (l *RandomAccessGroupLookup) Set(key flux.GroupKey, value interface{})

Set will set the value for the given key. It will overwrite an existing value.

type RetractTableMsg

type RetractTableMsg interface {
	Message
	Key() flux.GroupKey
}

type Row

type Row struct {
	Values []interface{}
}

func ReadRow

func ReadRow(i int, cr flux.ColReader) (row Row)

type RowMapFn

type RowMapFn struct {
	// contains filtered or unexported fields
}

func NewRowMapFn

func NewRowMapFn(fn *semantic.FunctionExpression, scope compiler.Scope) (*RowMapFn, error)

func (*RowMapFn) Eval

func (f *RowMapFn) Eval(ctx context.Context, row int, cr flux.ColReader) (values.Object, error)

func (*RowMapFn) Prepare

func (f *RowMapFn) Prepare(cols []flux.ColMeta) error

func (*RowMapFn) Type

func (f *RowMapFn) Type() semantic.MonoType

type RowPredicateFn

type RowPredicateFn struct {
	// contains filtered or unexported fields
}

func NewRowPredicateFn

func NewRowPredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) (*RowPredicateFn, error)

func (*RowPredicateFn) Eval

func (f *RowPredicateFn) Eval(ctx context.Context, record values.Object) (bool, error)

func (*RowPredicateFn) EvalRow

func (f *RowPredicateFn) EvalRow(ctx context.Context, row int, cr flux.ColReader) (bool, error)

func (*RowPredicateFn) InferredInputType

func (f *RowPredicateFn) InferredInputType() semantic.MonoType

InferredInputType will return the inferred input type. This type may contain type variables and will only contain the properties that could be inferred from type inference.

func (*RowPredicateFn) InputType

func (f *RowPredicateFn) InputType() semantic.MonoType

InputType will return the prepared input type. This will be a fully realized type that was created after preparing the function with Prepare.

func (*RowPredicateFn) Prepare

func (f *RowPredicateFn) Prepare(cols []flux.ColMeta) error

type RowReader

type RowReader interface {
	Next() bool
	GetNextRow() ([]values.Value, error)
	ColumnNames() []string
	ColumnTypes() []flux.ColType
	SetColumns([]interface{})
	io.Closer
}

type RowReduceFn

type RowReduceFn struct {
	// contains filtered or unexported fields
}

func NewRowReduceFn

func NewRowReduceFn(fn *semantic.FunctionExpression, scope compiler.Scope) (*RowReduceFn, error)

func (*RowReduceFn) Eval

func (f *RowReduceFn) Eval(ctx context.Context, row int, cr flux.ColReader, extraParams map[string]values.Value) (values.Object, error)

func (*RowReduceFn) Prepare

func (f *RowReduceFn) Prepare(cols []flux.ColMeta, reducerType map[string]semantic.MonoType) error

func (*RowReduceFn) Type

func (f *RowReduceFn) Type() semantic.MonoType

type RowSelector

type RowSelector interface {
	NewTimeSelector() DoTimeRowSelector
	NewBoolSelector() DoBoolRowSelector
	NewIntSelector() DoIntRowSelector
	NewUIntSelector() DoUIntRowSelector
	NewFloatSelector() DoFloatRowSelector
	NewStringSelector() DoStringRowSelector
}

type Rower

type Rower interface {
	Rows() []Row
}

type ScheduleFunc

type ScheduleFunc func(ctx context.Context, throughput int)

ScheduleFunc is a function that represents work to do. The throughput is the maximum number of messages to process for this scheduling.

type SelectorConfig

type SelectorConfig struct {
	plan.DefaultCost
	Column string `json:"column"`
}

func (*SelectorConfig) ReadArgs

func (c *SelectorConfig) ReadArgs(args flux.Arguments) error

type Source

type Source interface {
	Node
	Run(ctx context.Context)
}

func CreateSourceFromDecoder

func CreateSourceFromDecoder(decoder SourceDecoder, dsid DatasetID, a Administration) (Source, error)

CreateSourceFromDecoder takes an implementation of a SourceDecoder, as well as a dataset ID and Administration type and creates an execute.Source.

func CreateSourceFromIterator

func CreateSourceFromIterator(iterator SourceIterator, dsid DatasetID) (Source, error)

CreateSourceFromIterator takes an implementation of a SourceIterator as well as a dataset ID and creates an execute.Source.

type SourceDecoder

type SourceDecoder interface {
	Connect(ctx context.Context) error
	Fetch(ctx context.Context) (bool, error)
	Decode(ctx context.Context) (flux.Table, error)
	Close() error
}

SourceDecoder is an interface that generalizes the process of retrieving data from an unspecified data source.

Connect implements the logic needed to connect directly to the data source.

Fetch implements a single fetch of data from the source (may be called multiple times). Should return false when there is no more data to retrieve.

Decode implements the process of marshaling the data returned by the source into a flux.Table type.

In executing the retrieval process, Connect is called once at the onset, and subsequent calls of Fetch() and Decode() are called iteratively until the data source is fully consumed.

type SourceIterator

type SourceIterator interface {
	// Do will invoke the Source and cause each materialized flux.Table
	// to the given function.
	Do(ctx context.Context, f func(flux.Table) error) error
}

SourceIterator is an interface for iterating over flux.Table values in a source. It provides a common interface for creating an execute.Source in an iterative way.

type StreamContext

type StreamContext interface {
	Bounds() *Bounds
}

StreamContext represents necessary context for a single stream of query data.

type StringValueFunc

type StringValueFunc interface {
	ValueString() string
}

type TableBuilder

type TableBuilder interface {
	Key() flux.GroupKey

	NRows() int
	NCols() int
	Cols() []flux.ColMeta

	// AddCol increases the size of the table by one column.
	// The index of the column is returned.
	AddCol(flux.ColMeta) (int, error)

	// Set sets the value at the specified coordinates
	// The rows and columns must exist before calling set, otherwise Set panics.
	SetValue(i, j int, value values.Value) error
	SetNil(i, j int) error

	// Append will add a single value to the end of a column.  Will set the number of
	// rows in the table to the size of the new column. It's the caller's job to make sure
	// that the expected number of rows in each column is equal.
	AppendBool(j int, value bool) error
	AppendInt(j int, value int64) error
	AppendUInt(j int, value uint64) error
	AppendFloat(j int, value float64) error
	AppendString(j int, value string) error
	AppendTime(j int, value Time) error
	AppendValue(j int, value values.Value) error
	AppendNil(j int) error

	// AppendBools and similar functions will append multiple values to column j.  As above,
	// it will set the numer of rows in the table to the size of the new column.  It's the
	// caller's job to make sure that the expected number of rows in each column is equal.
	AppendBools(j int, vs *array.Boolean) error
	AppendInts(j int, vs *array.Int64) error
	AppendUInts(j int, vs *array.Uint64) error
	AppendFloats(j int, vs *array.Float64) error
	AppendStrings(j int, vs *array.Binary) error
	AppendTimes(j int, vs *array.Int64) error

	// GrowBools and similar functions will extend column j by n zero-values for the respective type.
	// If the column has enough capacity, no reallocation is necessary.  If the capacity is insufficient,
	// a new slice is allocated with 1.5*newCapacity.  As with the Append functions, it is the
	// caller's job to make sure that the expected number of rows in each column is equal.
	GrowBools(j, n int) error
	GrowInts(j, n int) error
	GrowUInts(j, n int) error
	GrowFloats(j, n int) error
	GrowStrings(j, n int) error
	GrowTimes(j, n int) error

	// LevelColumns will check for columns that are too short and Grow them
	// so that each column is of uniform size.
	LevelColumns() error

	// Sort the rows of the by the values of the columns in the order listed.
	Sort(cols []string, desc bool)

	// ClearData removes all rows, while preserving the column meta data.
	ClearData()

	// Release releases any extraneous memory that has been retained.
	Release()

	// Table returns the table that has been built.
	// Further modifications of the builder will not effect the returned table.
	Table() (flux.Table, error)
}

TableBuilder builds tables that can be used multiple times

type TableBuilderCache

type TableBuilderCache interface {
	// TableBuilder returns an existing or new TableBuilder for the given meta data.
	// The boolean return value indicates if TableBuilder is new.
	TableBuilder(key flux.GroupKey) (TableBuilder, bool)
	ForEachBuilder(f func(flux.GroupKey, TableBuilder))
}

type TableContext

type TableContext struct {
	Key   flux.GroupKey
	Count int
}

type TablePredicateFn

type TablePredicateFn struct {
	// contains filtered or unexported fields
}

func NewTablePredicateFn

func NewTablePredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) (*TablePredicateFn, error)

func (*TablePredicateFn) Eval

func (f *TablePredicateFn) Eval(ctx context.Context, tbl flux.Table) (bool, error)

func (*TablePredicateFn) Prepare

func (f *TablePredicateFn) Prepare(tbl flux.Table) error

type Time

type Time = values.Time

func Now

func Now() Time

type Transformation

type Transformation interface {
	RetractTable(id DatasetID, key flux.GroupKey) error
	// Process takes in one flux Table, performs data processing on it and
	// writes that table to a DataCache
	Process(id DatasetID, tbl flux.Table) error
	UpdateWatermark(id DatasetID, t Time) error
	UpdateProcessingTime(id DatasetID, t Time) error
	// Finish indicates that the Transformation is done processing. It is
	// the last method called on the Transformation
	Finish(id DatasetID, err error)
}

Transformation represents functions that stream a set of tables, performs data processing on them and produces an output stream of tables

type TransformationSet

type TransformationSet []Transformation

TransformationSet is a group of transformations.

func (TransformationSet) Finish

func (ts TransformationSet) Finish(id DatasetID, err error)

func (TransformationSet) Process

func (ts TransformationSet) Process(id DatasetID, tbl flux.Table) error

func (TransformationSet) RetractTable

func (ts TransformationSet) RetractTable(id DatasetID, key flux.GroupKey) error

func (TransformationSet) UpdateProcessingTime

func (ts TransformationSet) UpdateProcessingTime(id DatasetID, time Time) error

func (TransformationSet) UpdateWatermark

func (ts TransformationSet) UpdateWatermark(id DatasetID, time Time) error

type Transport

type Transport interface {
	Transformation
	// Finished reports when the Transport has completed and there is no more work to do.
	Finished() <-chan struct{}
}

type Trigger

type Trigger interface {
	Triggered(TriggerContext) bool
	Finished() bool
	Reset()
}

func NewTriggerFromSpec

func NewTriggerFromSpec(spec plan.TriggerSpec) Trigger

type TriggerContext

type TriggerContext struct {
	Table                 TableContext
	Watermark             Time
	CurrentProcessingTime Time
}

type UIntValueFunc

type UIntValueFunc interface {
	ValueUInt() uint64
}

type UpdateProcessingTimeMsg

type UpdateProcessingTimeMsg interface {
	Message
	ProcessingTime() Time
}

type UpdateWatermarkMsg

type UpdateWatermarkMsg interface {
	Message
	WatermarkTime() Time
}

type ValueFunc

type ValueFunc interface {
	Type() flux.ColType
	IsNull() bool
}

type Window

type Window struct {
	Every  Duration
	Period Duration
	Offset Duration
}

func NewWindow

func NewWindow(every, period, offset Duration) (Window, error)

NewWindow creates a window with the given parameters, and normalizes the offset to a small positive duration. It also validates that the durations are valid when used within a window.

func (Window) GetEarliestBounds

func (w Window) GetEarliestBounds(t Time) Bounds

GetEarliestBounds returns the bounds for the earliest window bounds that contains the given time t. For underlapping windows that do not contain time t, the window directly after time t will be returned.

func (Window) GetOverlappingBounds

func (w Window) GetOverlappingBounds(b Bounds) []Bounds

GetOverlappingBounds returns a slice of bounds for each window that overlaps the input bounds b.

func (Window) IsValid

func (w Window) IsValid() error

IsValid will check if this Window is valid and it will return an error if it isn't.

Directories

Path Synopsis
Package executetest contains utilities for testing the query execution phase.
Package executetest contains utilities for testing the query execution phase.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL