flux

package module
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2018 License: MIT Imports: 19 Imported by: 88

README

Flux - Influx data language

CircleCI

fluxd is an HTTP server for running Flux queries to one or more InfluxDB servers.

fluxd runs on port 8093 by default

Specification

A complete specification can be found in SPEC.md.

Installation

Generic Installation
  1. Use InfluxDB nightly builds, which can be found here: https://portal.influxdata.com/downloads. Write data to the instance by using telegraf or some other data source.

Note: InfluxDB uses port 8082 for handling Flux queries. Ensure this port is accessible. This port has no authentication.

  1. Download fluxd from nightly builds: https://portal.influxdata.com/downloads .

  2. Start fluxd. It will connect to the default host and port which is localhost:8082. To run in federated mode, add the --storage-hosts option with each host separated by a comma.

fluxd --storage-hosts localhost:8082
  1. To run a query, POST a Flux query string to /query as the query parameter:
curl -XPOST --data-urlencode \
'query=from(bucket:"telegraf/autogen")
    |> filter(fn: (r) => r._measurement == "cpu" AND r._field == "usage_user")
    |> range(start:-170h)
    |> sum()' \
http://localhost:8093/query?organization=my-org

Any value can be used for the organization parameter. It does not apply to running flux queries against the InfluxDB 1.x nightlies but is required.

Docker Installation

There are now images for Flux and InfluxDB nightlies. If you have docker installed on your machine, this can be an easier method of trying out Flux on your local computer.

  1. Create a docker network.
docker network create influxdb
  1. Start the InfluxDB nightly. You can use either the nightly or nightly-alpine tag.
docker volume create influxdb
docker run -d --name=influxdb --net=influxdb -p 8086:8086 -v influxdb:/var/lib/influxdb quay.io/influxdb/influxdb:nightly

Note: If you run influxd in a container and fluxd outside of a container, you must add -p 8082:8082 to expose the flux port.

  1. Start fluxd using the nightly image. There is no alpine image for this yet.
docker run -d --name=flux --net=influxdb -p 8093:8093 quay.io/influxdb/flux:nightly
  1. Follow the instructions from the General Installation section for how to query the server.

  2. When updating, ensure that you pull both nightlies and restart them at the same time. We are making changes often and using the nightlies from the same night will result in fewer problems than only updating one of them.

Prometheus metrics

Metrics are exposed on /metrics. fluxd records the number of queries and the number of different functions within Flux queries

Federated Mode

By passing multiple hosts to the --storage-hosts option, fluxd will query multiple InfluxDB servers.

For example:

fluxd --storage-hosts influxdb1:8082,influxdb2:8082

The results from multiple InfluxDB are merged together as if there was one server.

Getting Started

Flux runs a query by reading a data source into a collection of tables and then passing each table through transformation steps to describe the desired query operations. Each table is composed of zero or more rows. Transformations are represented as functions which take a table of data as an input argument and return a new table that has been transformed. There are also special functions that combine and separate existing tables into new tables with a different grouping.

Basic Syntax

All queries begin with the function from. It is a source function that does not accept any input, but produces a stream of tables as output. All queries must be followed by a range function which will limit the time range that data is selected from. Even if you want all data, you must specify a time range. This is so it is explicit that a user wants to query the entire database instead of the much more common range of time data.

from(bucket: "telegraf/autogen") |> range(start: -5m)

Function parameters are all keyword arguments. There are no positional arguments. The from function takes a single parameter: bucket. This specifies the InfluxDB bucket that data should be read from. The from function will organize the data so that each series is its own table. That means that all transformations will happen per series unless this is changed by using group or window (explained below). If you are familiar with InfluxDB 1.x, this is the opposite of InfluxQL's default behavior. InfluxQL would automatically group all series into the same table.

Functions are separated by the |> operator. This operator will take the stream of tables from one function and it will send it as input to another function that takes a stream of tables as the input. In this case, the from function outputs a stream of tables and sends that output to the range function which filters out any rows from each table that are not within the specified time range. The range function takes two parameters: start and stop. The default stop time is the current time and a duration, like -5m, can be used to specify a relative time from the current time. That means the above query is asking for all data within the last 5 minutes.

The from function creates a table where each row has the following attributes:

  • _measurement - the measurement of the series
  • _field - the field of the series
  • _value - the output value
  • _start - the start time of the table (equal to the range start)
  • _stop - the stop time of the table (equal to the range stop)
  • _time - the time for each row

The |> operator is used extensively in flux so you will see it in all of the query examples. Each transformation can be chained to another transformation so, while the examples below will be simple, they can be combined to yield the desired query and table structure.

Filter rows with an anonymous function

The rows can be filtered by using the filter function. When communicating with influxd, the measurement name is put into the _measurement tag and the field name is put into the _field tag. If you wanted to filter by a specific measurement or field, you could do that by using filter like this:

from(bucket: "telegraf/autogen") |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")

The filter function takes a function returning a boolean as its only parameter. The function parameter name is fn. An anonymous function is defined by using parenthesis to specify the function arguments, using the => operator, and then either followed by a single line with the expression. See the User Defined Functions section for more information about defining functions and how to create your own.

When accessing data in a table, dot syntax or indexing syntax can be used. In the above, we used dot syntax. You can also use the indexing syntax like this: r["_measurement"]. This document will use the dot syntax, but either one can be used anywhere.

It is also common in flux to break up longer lines by including a newline between the different function calls. It is convention to have a newline followed by a tab and the pipe operator before writing the next function.

Limit the number of rows

The limit function can be used to limit the number of points in each table. It takes a single parameter which is n.

from(bucket: "telegraf/autogen") |> range(start: -5m) |> limit(n: 1)
Aggregates and Selectors

Aggregates and selectors will execute the aggregation/selection function on each table. The output is defined for each function, but most aggregates will output a single row for each table and most selectors will select a single row for each table.

To find the mean of each table, you could use the mean() function.

from(bucket: "telegraf/autogen") |> range(start: -5m) |> mean()

If you wanted to find the maximum value in each table, you could use max().

from(bucket: "telegraf/autogen") |> range(start: -5m) |> max()

The full list of aggregation and selection functions is located in the spec.

Grouping and Windowing

Since flux will group each series into its own table, we sometimes need to modify this grouping if we wanted to combine different series. As an example, what if we wanted to know the average user cpu usage for each AWS region? A common schema would be to have two tags: region and host. We would write that query like this:

from(bucket: "telegraf/autogen") |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
    |> group(columns: ["region"])
    |> mean()

The group function would take every row that had the same region value and put it into a single table. If we had servers in two different regions, it would result in us having two different tables.

Similarly, if we wanted to group points into buckets of time, the window function can do that. We can modify the above function to give us the mean for every minute pretty easily.

from(bucket: "telegraf/autogen") |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
    |> group(columns: ["region"])
    |> window(every: 1m)
    |> mean()
Map

It is also possible to perform math and rename the columns by using the map function. The map function takes a function and will execute that function on each row to output a new row for the output table.

from(bucket: "telegraf/autogen") |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
    |> map(fn: (r) => ({_value: r._value / 100}))

The function passed into map must return an object. An object is a key/value structure. By default, the map function will merge any columns within the grouping key into the new row so you do not have to specify all of the existing columns that you do not want to modify. If you do not want to automtaically merge those columns, you can use mergeKey: false as a parameter to map.

Note: Math support is limited right now and the filter is required because the query engine will throw an error if the value is of different types with different series. So you must filter the results so only fields with a single type are selected at the moment.

User Defined Functions

A user can define their own function which can then be reused. To do this, we use the function syntax and assign it to a variable.

add = (table=<-, n) => map(table: table, fn: (r) => ({_value: r._value + n}))
from(bucket: "telegraf/autogen") |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
    |> add(n: 5)

When defining a function, default arguments can be specified by using an equals sign. In addition, a table processing function can be specified by including one parameter that takes <- as an input. The typical parameter name for these is table, but it can be any name since the pipe operator does not use a specific name. In the above example, we build a new function around the existing map function by passing the table to the map function as a parameter instead of with the pipe. If you wanted to use the pipe operator instead, the following is also valid:

add = (table=<-, n) => table |> map(fn: (r) => ({_value: r._value + n}))
from(bucket: "telegraf/autogen") |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
    |> add(n: 5)

Documentation

Overview

Example (Option)

Example_option demonstrates retrieving an option from the Flux interpreter

package main

import (
	"fmt"
	"os"

	"github.com/influxdata/flux"

	_ "github.com/influxdata/flux/options"
)

func main() {
	// Instantiate a new Flux interpreter with pre-populated option and global scopes
	itrp := flux.NewInterpreter()

	// Retrieve the default value for an option
	nowFunc := itrp.Option("now")

	// The now option is a function value whose default behavior is to return
	// the current system time when called. The function now() doesn't take
	// any arguments so can be called with nil.
	nowTime, _ := nowFunc.Function().Call(nil)
	fmt.Fprintf(os.Stderr, "The current system time (UTC) is: %v\n", nowTime)
}
Output:

Example (OverrideDefaultOptionExternally)

Example_overrideDefaultOptionExternally demonstrates how declaring an option in a Flux script will change that option's binding in the options scope of the interpreter.

package main

import (
	"fmt"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/parser"
	"github.com/influxdata/flux/semantic"

	_ "github.com/influxdata/flux/options"
)

func main() {
	queryString := `
		now = () => 2018-07-13T00:00:00Z
		what_time_is_it = now()`

	itrp := flux.NewInterpreter()

	ast, _ := parser.NewAST(queryString)
	semanticProgram, _ := semantic.New(ast)

	// Evaluate program
	err := itrp.Eval(semanticProgram, nil)
	if err != nil {
		fmt.Println(err)
	}

	// After evaluating the program, lookup the value of what_time_is_it
	now, _ := itrp.GlobalScope().Lookup("what_time_is_it")

	// what_time_is_it? Why it's ....
	fmt.Printf("The new current time (UTC) is: %v", now)
}
Output:

The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z
Example (OverrideDefaultOptionInternally)

Example_overrideDefaultOptionInternally demonstrates how one can override a default option that is used in a query before that query is evaluated by the interpreter.

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/parser"
	"github.com/influxdata/flux/semantic"
	"github.com/influxdata/flux/values"

	_ "github.com/influxdata/flux/options"
)

func main() {
	queryString := `what_time_is_it = now()`

	itrp := flux.NewInterpreter()

	ast, _ := parser.NewAST(queryString)
	semanticProgram, _ := semantic.New(ast)

	// Define a new now function which returns a static time value of 2018-07-13T00:00:00.000000000Z
	timeValue := time.Date(2018, 7, 13, 0, 0, 0, 0, time.UTC)
	functionName := "newTime"
	functionType := semantic.NewFunctionType(semantic.FunctionSignature{
		Return: semantic.Time,
	})
	functionCall := func(args values.Object) (values.Value, error) {
		return values.NewTime(values.ConvertTime(timeValue)), nil
	}
	sideEffect := false

	newNowFunc := values.NewFunction(functionName, functionType, functionCall, sideEffect)

	// Override the default now function with the new one
	itrp.SetOption("now", newNowFunc)

	// Evaluate program
	err := itrp.Eval(semanticProgram, nil)
	if err != nil {
		fmt.Println(err)
	}

	// After evaluating the program, lookup the value of what_time_is_it
	now, _ := itrp.GlobalScope().Lookup("what_time_is_it")

	// what_time_is_it? Why it's ....
	fmt.Printf("The new current time (UTC) is: %v", now)
}
Output:

The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z
Example (SetOption)

Example_setOption demonstrates setting an option from the Flux interpreter

package main

import (
	"fmt"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/values"

	_ "github.com/influxdata/flux/options"
)

func main() {
	// Instantiate a new Flux interpreter with pre-populated option and global scopes
	itrp := flux.NewInterpreter()

	// Set a new option from the interpreter
	itrp.SetOption("dummy_option", values.NewInt(3))

	fmt.Printf("dummy_option = %d", itrp.Option("dummy_option").Int())
}
Output:

dummy_option = 3

Index

Examples

Constants

View Source
const (
	TablesParameter = "tables"
)

Variables

View Source
var (
	MinTime = Time{
		Absolute: time.Unix(0, math.MinInt64),
	}
	MaxTime = Time{
		Absolute: time.Unix(0, math.MaxInt64),
	}
	Now = Time{
		IsRelative: true,
	}
)
View Source
var DefaultTrigger = AfterWatermarkTriggerSpec{}
View Source
var TableObjectMonoType semantic.Type
View Source
var TableObjectType = semantic.NewObjectPolyType(

	map[string]semantic.PolyType{
		tableKindKey: semantic.String,
	},
	nil,

	semantic.LabelSet{tableKindKey},
)

Functions

func BuiltIns

func BuiltIns() map[string]values.Value

BuiltIns returns a copy of the builtin values and their declarations.

func Eval

func Eval(itrp *interpreter.Interpreter, q string) error

Eval evaluates the flux string q and update the given interpreter

func FinalizeBuiltIns

func FinalizeBuiltIns()

FinalizeBuiltIns must be called to complete registration. Future calls to RegisterFunction, RegisterBuiltIn or RegisterBuiltInValue will panic.

func FmtJSON

func FmtJSON(f *formatter)

func Formatted

func Formatted(q *Spec, opts ...FormatOption) fmt.Formatter

func FunctionSignature

func FunctionSignature(parameters map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature

FunctionSignature returns a standard functions signature which accepts a table piped argument, with any additional arguments.

func IsEncoderError

func IsEncoderError(err error) bool

IsEncoderError reports whether or not the underlying cause of an error is a valid EncoderError.

func NewInterpreter

func NewInterpreter() *interpreter.Interpreter

NewInterpreter returns an interpreter instance with pre-constructed options and global scopes.

func NumberOfOperations

func NumberOfOperations() int

func RegisterBuiltIn

func RegisterBuiltIn(name, script string)

RegisterBuiltIn adds any variable declarations written in Flux script to the builtin scope.

func RegisterBuiltInOption

func RegisterBuiltInOption(name string, v values.Value)

RegisterBuiltInOption adds the value to the builtin scope.

func RegisterBuiltInValue

func RegisterBuiltInValue(name string, v values.Value)

RegisterBuiltInValue adds the value to the builtin scope.

func RegisterFunction

func RegisterFunction(name string, c CreateOperationSpec, sig semantic.FunctionPolySignature)

RegisterFunction adds a new builtin top level function. Name is the name of the function as it would be called. c is a function reference of type CreateOperationSpec sig is a function signature type that specifies the names and types of each argument for the function.

func RegisterFunctionWithSideEffect

func RegisterFunctionWithSideEffect(name string, c CreateOperationSpec, sig semantic.FunctionPolySignature)

RegisterFunctionWithSideEffect adds a new builtin top level function that produces side effects. For example, the builtin functions yield(), toKafka(), and toHTTP() all produce side effects. name is the name of the function as it would be called c is a function reference of type CreateOperationSpec sig is a function signature type that specifies the names and types of each argument for the function

func RegisterOpSpec

func RegisterOpSpec(k OperationKind, c NewOperationSpec)

RegisterOpSpec registers an operation spec with a given kind. k is a label that uniquely identifies this operation. If the kind has already been registered the call panics. c is a function reference that creates a new, default-initialized opSpec for the given kind. TODO:(nathanielc) make this part of RegisterMethod/RegisterFunction

Types

type Administration

type Administration struct {
	// contains filtered or unexported fields
}

func (*Administration) AddParent

func (a *Administration) AddParent(np *TableObject)

AddParent instructs the evaluation Context that a new edge should be created from the parent to the current operation. Duplicate parents will be removed, so the caller need not concern itself with which parents have already been added.

func (*Administration) AddParentFromArgs

func (a *Administration) AddParentFromArgs(args Arguments) error

AddParentFromArgs reads the args for the `table` argument and adds the value as a parent.

type AfterAtLeastCountTriggerSpec

type AfterAtLeastCountTriggerSpec struct {
	Count int
}

func (AfterAtLeastCountTriggerSpec) Kind

type AfterProcessingTimeTriggerSpec

type AfterProcessingTimeTriggerSpec struct {
	Duration Duration
}

func (AfterProcessingTimeTriggerSpec) Kind

type AfterWatermarkTriggerSpec

type AfterWatermarkTriggerSpec struct {
	AllowedLateness Duration
}

func (AfterWatermarkTriggerSpec) Kind

type Arguments

type Arguments struct {
	interpreter.Arguments
}

func (Arguments) GetDuration

func (a Arguments) GetDuration(name string) (Duration, bool, error)

func (Arguments) GetRequiredDuration

func (a Arguments) GetRequiredDuration(name string) (Duration, error)

func (Arguments) GetRequiredTime

func (a Arguments) GetRequiredTime(name string) (Time, error)

func (Arguments) GetTime

func (a Arguments) GetTime(name string) (Time, bool, error)

type ArrowColReader added in v0.10.0

type ArrowColReader interface {
	Key() GroupKey
	// Cols returns a list of column metadata.
	Cols() []ColMeta
	// Len returns the length of the slices.
	// All slices will have the same length.
	Len() int
	Bools(j int) *array.Boolean
	Ints(j int) *array.Int64
	UInts(j int) *array.Uint64
	Floats(j int) *array.Float64
	Strings(j int) *array.Binary
	Times(j int) *array.Int64
}

ArrowColReader allows access to reading arrow buffers of column data. All data the ArrowColReader exposes is guaranteed to be in memory. Once an ArrowColReader goes out of scope, all slices are considered invalid.

type Bounds

type Bounds struct {
	Start Time
	Stop  Time
	Now   time.Time
}

func (Bounds) HasZero

func (b Bounds) HasZero() bool

HasZero returns true if the given bounds contain a Go zero time value as either Start or Stop.

func (Bounds) IsEmpty

func (b Bounds) IsEmpty() bool

IsEmpty reports whether the given bounds are empty, i.e., if start >= stop.

type ColMeta

type ColMeta struct {
	// Label is the name of the column. The label is unique per table.
	Label string
	// Type is the type of the column. Only basic types are allowed.
	Type ColType
}

ColMeta contains the information about the column metadata.

type ColReader

type ColReader interface {
	Key() GroupKey
	// Cols returns a list of column metadata.
	Cols() []ColMeta
	// Len returns the length of the slices.
	// All slices will have the same length.
	Len() int
	Bools(j int) []bool
	Ints(j int) []int64
	UInts(j int) []uint64
	Floats(j int) []float64
	Strings(j int) []string
	Times(j int) []values.Time
}

ColReader allows access to reading slices of column data. All data the ColReader exposes is guaranteed to be in memory. Once a ColReader goes out of scope all slices are considered invalid.

type ColType

type ColType int

ColType is the type for a column. This covers only basic data types.

const (
	TInvalid ColType = iota
	TBool
	TInt
	TUInt
	TFloat
	TString
	TTime
)

func ColumnType

func ColumnType(typ semantic.Type) ColType

ColumnType returns the column type when given a semantic.Type. It returns flux.TInvalid if the Type is not a valid column type.

func (ColType) String

func (t ColType) String() string

String returns a string representation of the column type.

type Compiler

type Compiler interface {
	// Compile produces a specification for the query.
	Compile(ctx context.Context) (*Spec, error)
	CompilerType() CompilerType
}

Compiler produces a specification for the query.

type CompilerMappings

type CompilerMappings map[CompilerType]CreateCompiler

func (CompilerMappings) Add

type CompilerType

type CompilerType string

CompilerType is the name of a query compiler.

type CreateCompiler

type CreateCompiler func() Compiler

type CreateDialect

type CreateDialect func() Dialect

type CreateOperationSpec

type CreateOperationSpec func(args Arguments, a *Administration) (OperationSpec, error)

type DelimitedMultiResultEncoder

type DelimitedMultiResultEncoder struct {
	Delimiter []byte
	Encoder   interface {
		ResultEncoder
		// EncodeError encodes an error on the writer.
		EncodeError(w io.Writer, err error) error
	}
}

DelimitedMultiResultEncoder encodes multiple results using a trailing delimiter. The delimiter is written after every result.

If an error is encountered when iterating and the error is an encoder error, the error will be returned. Otherwise, the error is assumed to have arisen from query execution, and said error will be encoded with the EncodeError method of the Encoder field.

If the io.Writer implements flusher, it will be flushed after each delimiter.

func (*DelimitedMultiResultEncoder) Encode

type Dialect

type Dialect interface {
	// Encoder creates an encoder for the results
	Encoder() MultiResultEncoder
	// DialectType report the type of the dialect
	DialectType() DialectType
}

Dialect describes how to encode results.

type DialectMappings

type DialectMappings map[DialectType]CreateDialect

func (DialectMappings) Add

type DialectType

type DialectType string

DialectType is the name of a query result dialect.

type Duration

type Duration time.Duration

Duration is a marshalable duration type. TODO make this the real duration parsing not just time.ParseDuration

func (Duration) MarshalText

func (d Duration) MarshalText() ([]byte, error)

func (*Duration) UnmarshalText

func (d *Duration) UnmarshalText(data []byte) error

type Edge

type Edge struct {
	Parent OperationID `json:"parent"`
	Child  OperationID `json:"child"`
}

Edge is a data flow relationship between a parent and a child

type EncoderError

type EncoderError interface {
	IsEncoderError() bool
}

EncoderError is an interface that any error produced from a ResultEncoder implementation should conform to. It allows for differentiation between errors that occur in results, and errors that occur while encoding results.

type FormatOption

type FormatOption func(*formatter)

TODO(nathanielc): Add better options for formatting plans as Graphviz dot format.

type GroupKey

type GroupKey interface {
	Cols() []ColMeta
	Values() []values.Value

	HasCol(label string) bool
	LabelValue(label string) values.Value

	ValueBool(j int) bool
	ValueUInt(j int) uint64
	ValueInt(j int) int64
	ValueFloat(j int) float64
	ValueString(j int) string
	ValueDuration(j int) values.Duration
	ValueTime(j int) values.Time
	Value(j int) values.Value

	Equal(o GroupKey) bool
	Less(o GroupKey) bool
	String() string
}

type IDer

type IDer interface {
	ID(*TableObject) OperationID
}

IDer produces the mapping of table Objects to OpertionIDs

type IDerOpSpec

type IDerOpSpec interface {
	IDer(ider IDer)
}

IDerOpSpec is the interface any operation spec that needs access to OperationIDs in the query spec must implement.

type MultiResultDecoder

type MultiResultDecoder interface {
	// Decode decodes multiple results from r.
	Decode(r io.ReadCloser) (ResultIterator, error)
}

MultiResultDecoder can decode multiple results from a reader.

type MultiResultEncoder

type MultiResultEncoder interface {
	// Encode writes multiple results from r into w.
	// Returns the number of bytes written to w and any error resulting from the encoding process.
	// Errors obtained from the results object should be encoded to w and then discarded.
	Encode(w io.Writer, results ResultIterator) (int64, error)
}

MultiResultEncoder can encode multiple results into a writer.

type NewOperationSpec

type NewOperationSpec func() OperationSpec

func OperationSpecNewFn

func OperationSpecNewFn(k OperationKind) NewOperationSpec

type Operation

type Operation struct {
	ID   OperationID   `json:"id"`
	Spec OperationSpec `json:"spec"`
}

Operation denotes a single operation in a query.

func (Operation) MarshalJSON

func (o Operation) MarshalJSON() ([]byte, error)

func (*Operation) UnmarshalJSON

func (o *Operation) UnmarshalJSON(data []byte) error

type OperationID

type OperationID string

OperationID is a unique ID within a query for the operation.

type OperationKind

type OperationKind string

OperationKind denotes the kind of operations.

type OperationSpec

type OperationSpec interface {
	// Kind returns the kind of the operation.
	Kind() OperationKind
}

OperationSpec specifies an operation as part of a query.

type Option

type Option func(*options)

func Verbose

func Verbose(v bool) Option

type OrFinallyTriggerSpec

type OrFinallyTriggerSpec struct {
	Main    TriggerSpec
	Finally TriggerSpec
}

func (OrFinallyTriggerSpec) Kind

type Priority

type Priority int32

Priority is an integer that represents the query priority. Any positive 32bit integer value may be used. Special constants are provided to represent the extreme high and low priorities.

const (
	// High is the highest possible priority = 0
	High Priority = 0
	// Low is the lowest possible priority = MaxInt32
	Low Priority = math.MaxInt32
)

func (Priority) MarshalText

func (p Priority) MarshalText() ([]byte, error)

func (*Priority) UnmarshalText

func (p *Priority) UnmarshalText(txt []byte) error

type Query

type Query interface {
	// Spec returns the spec used to execute this query.
	// Spec must not be modified.
	Spec() *Spec

	// Ready returns a channel that will deliver the query results.
	// Its possible that the channel is closed before any results arrive,
	// in which case the query should be inspected for an error using Err().
	Ready() <-chan map[string]Result

	// Done must always be called to free resources. It is safe to call Done
	// multiple times.
	Done()

	// Cancel will signal that query execution should stop.
	// Done must still be called to free resources.
	// It is safe to call Cancel multiple times.
	Cancel()

	// Err reports any error the query may have encountered.
	Err() error

	Statisticser
}

Query represents an active query.

type RepeatedTriggerSpec

type RepeatedTriggerSpec struct {
	Trigger TriggerSpec
}

func (RepeatedTriggerSpec) Kind

type ResourceManagement

type ResourceManagement struct {
	// Priority or the query.
	// Queries with a lower value will move to the front of the priority queue.
	// A zero value indicates the highest priority.
	Priority Priority `json:"priority"`
	// ConcurrencyQuota is the number of concurrency workers allowed to process this query.
	// A zero value indicates the planner can pick the optimal concurrency.
	ConcurrencyQuota int `json:"concurrency_quota"`
	// MemoryBytesQuota is the number of bytes of RAM this query may consume.
	// There is a small amount of overhead memory being consumed by a query that will not be counted towards this limit.
	// A zero value indicates unlimited.
	MemoryBytesQuota int64 `json:"memory_bytes_quota"`
}

ResourceManagement defines how the query should consume avaliable resources.

type Result

type Result interface {
	Name() string
	// Tables returns a TableIterator for iterating through results
	Tables() TableIterator
	// Statistics returns statistics collected the processing of the result.
	Statistics() Statistics
}

type ResultDecoder

type ResultDecoder interface {
	// Decode decodes data from r into a result.
	Decode(r io.Reader) (Result, error)
}

ResultDecoder can decode a result from a reader.

type ResultEncoder

type ResultEncoder interface {
	// Encode encodes data from the result into w.
	// Returns the number of bytes written to w and any error.
	Encode(w io.Writer, result Result) (int64, error)
}

ResultEncoder can encode a result into a writer.

type ResultIterator

type ResultIterator interface {
	// More indicates if there are more results.
	More() bool

	// Next returns the next result.
	// If More is false, Next panics.
	Next() Result

	// Release discards the remaining results and frees the currently used resources.
	// It must always be called to free resources. It can be called even if there are
	// more results. It is safe to call Release multiple times.
	Release()

	// Err reports the first error encountered.
	// Err will not report anything unless More has returned false,
	// or the query has been cancelled.
	Err() error

	// Statistics returns any statistics computed by the resultset.
	Statistics() Statistics
}

ResultIterator allows iterating through all results synchronously. A ResultIterator is not thread-safe and all of the methods are expected to be called within the same goroutine. A ResultIterator may implement Statisticser.

func NewMapResultIterator

func NewMapResultIterator(results map[string]Result) ResultIterator

func NewResultIteratorFromQuery

func NewResultIteratorFromQuery(q Query) ResultIterator

func NewSliceResultIterator

func NewSliceResultIterator(results []Result) ResultIterator

type Spec

type Spec struct {
	Operations []*Operation       `json:"operations"`
	Edges      []Edge             `json:"edges"`
	Resources  ResourceManagement `json:"resources"`
	Now        time.Time          `json:"now"`
	// contains filtered or unexported fields
}

Spec specifies a query.

func Compile

func Compile(ctx context.Context, q string, now time.Time, opts ...Option) (*Spec, error)

Compile evaluates a Flux script producing a query Spec. now parameter must be non-zero, that is the default now time should be set before compiling.

func ToSpec

func ToSpec(itrp *interpreter.Interpreter, vals ...values.Value) *Spec

ToSpec creates a query spec from the interpreter and list of values.

func (*Spec) Children

func (q *Spec) Children(id OperationID) []*Operation

Children returns a list of children for a given operation. If the query is invalid no children will be returned.

func (*Spec) Functions

func (q *Spec) Functions() ([]string, error)

Functions return the names of all functions used in the plan

func (*Spec) Parents

func (q *Spec) Parents(id OperationID) []*Operation

Parents returns a list of parents for a given operation. If the query is invalid no parents will be returned.

func (*Spec) Validate

func (q *Spec) Validate() error

Validate ensures the query is a valid DAG.

func (*Spec) Walk

func (q *Spec) Walk(f func(o *Operation) error) error

Walk calls f on each operation exactly once. The function f will be called on an operation only after all of its parents have already been passed to f.

type Statistics

type Statistics struct {
	// TotalDuration is the total amount of time in nanoseconds spent.
	TotalDuration time.Duration `json:"total_duration"`
	// CompileDuration is the amount of time in nanoseconds spent compiling the query.
	CompileDuration time.Duration `json:"compile_duration"`
	// QueueDuration is the amount of time in nanoseconds spent queueing.
	QueueDuration time.Duration `json:"queue_duration"`
	// PlanDuration is the amount of time in nanoseconds spent in plannig the query.
	PlanDuration time.Duration `json:"plan_duration"`
	// RequeueDuration is the amount of time in nanoseconds spent requeueing.
	RequeueDuration time.Duration `json:"requeue_duration"`
	// ExecuteDuration is the amount of time in nanoseconds spent in executing the query.
	ExecuteDuration time.Duration `json:"execute_duration"`

	// Concurrency is the number of goroutines allocated to process the query
	Concurrency int `json:"concurrency"`
	// MaxAllocated is the maximum number of bytes the query allocated.
	MaxAllocated int64 `json:"max_allocated"`

	// ScannedValues is the number of values scanned.
	ScannedValues int `json:"scanned_values"`
	// ScannedBytes number of uncompressed bytes scanned.
	ScannedBytes int `json:"scanned_bytes"`
}

Statistics is a collection of statisitcs about the processing of a query.

func (Statistics) Add added in v0.7.1

func (s Statistics) Add(other Statistics) Statistics

Add returns the sum of s and other.

type Statisticser

type Statisticser interface {
	// Statistics reports the statisitcs for the query.
	// The statisitcs are not complete until the query is finished.
	Statistics() Statistics
}

Statisticser reports statisitcs about query processing.

type Table

type Table interface {
	Key() GroupKey

	Cols() []ColMeta

	// Do calls f to process the data contained within the table.
	// The function f will be called zero or more times.
	Do(f func(ColReader) error) error

	// DoArrow calls f to process the data contained within the table.
	// It uses the arrow buffers.
	DoArrow(f func(ArrowColReader) error) error

	// RefCount modifies the reference count on the table by n.
	// When the RefCount goes to zero, the table is freed.
	RefCount(n int)

	// Empty returns whether the table contains no records.
	Empty() bool

	// Stats returns collected statistics about this table during processing.
	Statistics() Statistics
}

type TableIterator

type TableIterator interface {
	Do(f func(Table) error) error
	Statistics() Statistics
}

type TableObject

type TableObject struct {
	Kind    OperationKind
	Spec    OperationSpec
	Parents values.Array
	// contains filtered or unexported fields
}

func (*TableObject) Array

func (t *TableObject) Array() values.Array

func (*TableObject) Bool

func (t *TableObject) Bool() bool

func (*TableObject) Duration

func (t *TableObject) Duration() values.Duration

func (*TableObject) Equal

func (t *TableObject) Equal(rhs values.Value) bool

func (*TableObject) Float

func (t *TableObject) Float() float64

func (*TableObject) Function

func (t *TableObject) Function() values.Function

func (*TableObject) Get

func (t *TableObject) Get(name string) (values.Value, bool)

func (*TableObject) Int

func (t *TableObject) Int() int64

func (*TableObject) Len

func (t *TableObject) Len() int

func (*TableObject) Object

func (t *TableObject) Object() values.Object

func (*TableObject) Operation

func (t *TableObject) Operation(ider IDer) *Operation

func (*TableObject) PolyType

func (t *TableObject) PolyType() semantic.PolyType

func (*TableObject) Range

func (t *TableObject) Range(f func(name string, v values.Value))

func (*TableObject) Regexp

func (t *TableObject) Regexp() *regexp.Regexp

func (*TableObject) Set

func (t *TableObject) Set(name string, v values.Value)

func (*TableObject) Str

func (t *TableObject) Str() string

func (*TableObject) String

func (t *TableObject) String() string

func (*TableObject) Time

func (t *TableObject) Time() values.Time

func (*TableObject) ToSpec

func (t *TableObject) ToSpec() *Spec

func (*TableObject) Type

func (t *TableObject) Type() semantic.Type

func (*TableObject) UInt

func (t *TableObject) UInt() uint64

type Time

type Time struct {
	IsRelative bool
	Relative   time.Duration
	Absolute   time.Time
}

Time represents either a relative or absolute time. If Time is its zero value then it represents a time.Time{}. To represent the now time you must set IsRelative to true.

func ToQueryTime

func ToQueryTime(value values.Value) (Time, error)

func (Time) IsZero

func (t Time) IsZero() bool

func (Time) MarshalText

func (t Time) MarshalText() ([]byte, error)

func (Time) Time

func (t Time) Time(now time.Time) time.Time

Time returns the time specified relative to now.

func (*Time) UnmarshalText

func (t *Time) UnmarshalText(data []byte) error

type TriggerKind

type TriggerKind int
const (
	AfterWatermark TriggerKind = iota
	Repeated
	AfterProcessingTime
	AfterAtLeastCount
	OrFinally
)

type TriggerSpec

type TriggerSpec interface {
	Kind() TriggerKind
}

Directories

Path Synopsis
ast
Package ast declares the types used to represent the syntax tree for Flux source code.
Package ast declares the types used to represent the syntax tree for Flux source code.
asttest
Package asttest implements utilities for testing the abstract syntax tree.
Package asttest implements utilities for testing the abstract syntax tree.
asttest/cmpgen
cmpgen generates comparison options for the asttest package.
cmpgen generates comparison options for the asttest package.
Package builtin contains all packages related to Flux built-ins are imported and initialized.
Package builtin contains all packages related to Flux built-ins are imported and initialized.
cmd
The compiler package provides a compiler and Go runtime for a subset of the Flux language.
The compiler package provides a compiler and Go runtime for a subset of the Flux language.
Package complete provides types to aid with auto-completion of Flux scripts in editors.
Package complete provides types to aid with auto-completion of Flux scripts in editors.
Package control controls which resources a query may consume.
Package control controls which resources a query may consume.
controltest
Package controltest provides a controller for use in tests.
Package controltest provides a controller for use in tests.
Package csv contains the csv result encoders and decoders.
Package csv contains the csv result encoders and decoders.
Package execute contains the implementation of the execution phase in the query engine.
Package execute contains the implementation of the execution phase in the query engine.
executetest
Package executetest contains utilities for testing the query execution phase.
Package executetest contains utilities for testing the query execution phase.
Package functions is a collection of built-in functions that are callable in the flux query processor.
Package functions is a collection of built-in functions that are callable in the flux query processor.
transformations
Package transformations contains the implementations for the builtin transformation functions.
Package transformations contains the implementations for the builtin transformation functions.
internal
staticarray
Package staticarray defines implementations of flux arrays with static data.
Package staticarray defines implementations of flux arrays with static data.
tools Module
Package interpreter provides the implementation of the Flux interpreter.
Package interpreter provides the implementation of the Flux interpreter.
Package mock contains mock implementations of the query package interfaces for testing.
Package mock contains mock implementations of the query package interfaces for testing.
Package parser implements a parser for Flux source files.
Package parser implements a parser for Flux source files.
plantest
Package plantest contains utilities for testing each query planning phase
Package plantest contains utilities for testing each query planning phase
Package querytest contains utilities for testing the query end-to-end.
Package querytest contains utilities for testing the query end-to-end.
Package repl implements the read-eval-print-loop for the command line flux query console.
Package repl implements the read-eval-print-loop for the command line flux query console.
The semantic package provides a graph structure that represents the meaning of a Flux script.
The semantic package provides a graph structure that represents the meaning of a Flux script.
semantictest
Package semantictest contains utilities for testing the semantic package.
Package semantictest contains utilities for testing the semantic package.
Package values declares the flux data types and implements them.
Package values declares the flux data types and implements them.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL