README
FluxPipe is an experimental stand-alone Flux API for serverless workers and embedded datasources.
Execute your Flux scripts locally, in serverless functions or anywhere else - decoupled from the data and database.
Fluxpipe runs at 141.6 km/h* and is compatible with InfluxDB 3.0 / IOx, ClickHouse, Grafana and beyond!
InfluxDB Flux is a lightweight scripting language for querying databases and working with data. [^1]
Need a practical Flux introduction? Check out the official page or 3 Minutes to Flux.
Demo
Try our serverless demo or launch your own instance to instantly fall in love with Flux.
Instructions
Download a binary release, use the Docker image, or build from source.
Download Binary
curl -fsSL https://github.com/metrico/fluxpipe/releases/latest/download/fluxpipe-server -O \
&& chmod +x fluxpipe-server
./fluxpipe-server -port 8086
Run with -h for a full list of parameters.
Using Docker
docker pull ghcr.io/metrico/fluxpipe:latest
docker run -ti --rm -p 8086:8086 ghcr.io/metrico/fluxpipe:latest
Usage
Check out the scripts folder for working examples.
Playground
Fluxpipe embeds a playground interface to instantly execute queries (borrowed from ClickHouse [^2])
HTTP API
Fluxpipe serves a simple REST API that is loosely compatible with existing Flux integrations and clients.
Fluxpipe is compatible with the native Grafana InfluxDB/Flux datasource (url + organization fields are required!)
You can query InfluxDB 3.0 IOx with raw SQL using the native sql.from handler:
import "sql"
sql.from(
driverName: "influxdb-iox",
dataSourceName: "iox://iox-server:443/qryn_logs",
query: "SELECT level, sender, body FROM logs WHERE body LIKE '%DELETE%' limit 10",
)
You can query InfluxDB 3.0 IOx with Flux using the iox.from handler:
import "contrib/qxip/iox"
iox.from(
bucket: "test",
host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
token: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
limit: "10",
columns: "time, level, sender",
table: "logs",
start: -100d,
)
_time:time                      level:string            sender:string
------------------------------  ----------------------  ----------------------
2023-08-31T00:00:00.091362490Z  info                    logtest
2023-08-31T00:00:00.091380034Z  info                    logtest
2023-08-31T00:00:00.091381374Z  info                    logtest
2023-08-31T00:00:00.091382470Z  info                    logtest
2023-08-31T00:00:00.091383354Z  info                    logtest
2023-08-31T00:00:00.091384514Z  info                    logtest
2023-08-31T00:00:00.091385496Z  info                    logtest
2023-08-31T00:00:00.091387718Z  info                    logtest
2023-08-31T00:00:00.091389187Z  info                    logtest
2023-08-31T00:00:00.091390136Z  info                    logtest
You can query ClickHouse with SQL using the clickhouse.query handler:
import "contrib/qxip/clickhouse"
clickhouse.query(
url: "https://play@play.clickhouse.com",
query: "SELECT database, total_rows FROM tables WHERE total_rows > 0"
)
|> rename(columns: {database: "_value", total_rows: "_data"})
|> keep(columns: ["_value","_data"])


You can run LogQL range queries using the logql.query_range handler:
import "contrib/qxip/logql"
option logql.defaultURL = "http://qryn:3100"
logql.query_range(
query: "rate({job=\"dummy-server\"}[5m])",
start: v.timeRangeStart,
end: v.timeRangeStop
)
|> map(fn: (r) => ({r with _time: time(v: uint(v: r.timestamp_ns)), _value: float(v: r.value) }))
|> drop(columns: ["timestamp_ns", "value"])
|> sort(columns: ["_time"])
|> group(columns: ["labels"])

Usage with curl
curl -XPOST localhost:8086/api/v2/query -sS \
-H 'Accept:application/csv' \
-H 'Content-type:application/vnd.flux' \
-d 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 3, fn: (n) => n)'
#datatype,string,long,dateTime:RFC3339,long
#group,false,false,false,false
#default,_result,,,
,result,table,_time,_value
,,0,2022-04-01T00:00:00Z,0
,,0,2022-04-01T00:01:00Z,1
,,0,2022-04-01T00:02:00Z,2
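The same request can be sent from a script instead of curl. The following is a minimal Python sketch, not an official client: it assumes a Fluxpipe server is listening on localhost:8086 as in the example above, and the helper names (build_request, parse_annotated_csv, query) are made up for illustration. The parser only covers the annotation rows shown in the output above (#datatype, #group, #default) and leaves timestamps as strings.

```python
import csv
import io
import urllib.request

# Assumption: a Fluxpipe server is listening locally on the port used above.
FLUXPIPE_URL = "http://localhost:8086/api/v2/query"


def build_request(flux_script, url=FLUXPIPE_URL):
    """Build the same POST request as the curl example (nothing is sent yet)."""
    return urllib.request.Request(
        url,
        data=flux_script.encode("utf-8"),
        headers={
            "Accept": "application/csv",
            "Content-Type": "application/vnd.flux",
        },
        method="POST",
    )


def _convert(value, kind):
    """Convert one CSV field according to its #datatype annotation."""
    if kind == "string":
        return value
    if value == "":
        return None
    if kind in ("long", "unsignedLong"):
        return int(value)
    if kind == "double":
        return float(value)
    if kind == "boolean":
        return value == "true"
    return value  # dateTime, duration, etc. stay as strings in this sketch


def parse_annotated_csv(text):
    """Turn annotated CSV into a list of dicts, one per data row."""
    datatypes, defaults, header, rows = [], [], None, []
    for record in csv.reader(io.StringIO(text, newline="")):
        if not record:
            # A blank line separates tables; the next one brings its own header.
            datatypes, defaults, header = [], [], None
            continue
        tag, fields = record[0], record[1:]  # first column holds the annotation name
        if tag == "#datatype":
            datatypes = fields
        elif tag == "#default":
            defaults = fields
        elif tag.startswith("#"):
            continue  # other annotations such as #group are ignored here
        elif header is None:
            header = fields
        else:
            kinds = datatypes or ["string"] * len(header)
            fallbacks = defaults or [""] * len(header)
            row = {}
            for name, kind, fallback, value in zip(header, kinds, fallbacks, fields):
                # An empty field takes the column's #default value.
                row[name] = _convert(value if value != "" else fallback, kind)
            rows.append(row)
    return rows


def query(flux_script, url=FLUXPIPE_URL, timeout=10):
    """Send a Flux script to the server and return the parsed rows."""
    with urllib.request.urlopen(build_request(flux_script, url), timeout=timeout) as resp:
        return parse_annotated_csv(resp.read().decode("utf-8"))
```

With a server running, query('import g "generate" g.from(...)') returns one dict per row, keyed by result, table, _time and _value.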
Secrets
Fluxpipe is built with the EnvironmentSecretService, so Flux scripts can read system environment variables as secrets:
import "influxdata/influxdb/secrets"
key = secrets.get(key: "ENV_SECRET")
STDIN CMD
Fluxpipe can be used as a command-line tool and stdin pipeline processor:
echo 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 5, fn: (n) => 1)' \
| ./fluxpipe-server -stdin
#datatype,string,long,dateTime:RFC3339,long
#group,false,false,false,false
#default,_result,,,
,result,table,_time,_value
,,0,2022-04-01T00:00:00Z,1
,,0,2022-04-01T00:00:36Z,1
,,0,2022-04-01T00:01:12Z,1
,,0,2022-04-01T00:01:48Z,1
,,0,2022-04-01T00:02:24Z,1
cat scripts/csv.flux | ./fluxpipe-server -stdin
cat scripts/sql.flux | ./fluxpipe-server -stdin
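The stdin mode also makes it easy to drive Fluxpipe from another program. The sketch below shows one way to do that from Python with the standard subprocess module. It assumes the fluxpipe-server binary from the download step sits in the current directory; run_flux and run_flux_file are hypothetical helper names, and how the binary reports script errors (exit status, stderr) is not covered here.

```python
import subprocess

# Assumption: the binary from the download step is in the current directory.
FLUXPIPE_CMD = ("./fluxpipe-server", "-stdin")


def run_flux(script, cmd=FLUXPIPE_CMD, timeout=30):
    """Pipe a Flux script to the command's stdin and return its stdout as text."""
    result = subprocess.run(
        list(cmd),
        input=script,
        capture_output=True,
        text=True,
        timeout=timeout,
        check=True,  # raise CalledProcessError if the process exits non-zero
    )
    return result.stdout


def run_flux_file(path, cmd=FLUXPIPE_CMD):
    """Equivalent of `cat path | ./fluxpipe-server -stdin`."""
    with open(path, encoding="utf-8") as handle:
        return run_flux(handle.read(), cmd=cmd)
```

For example, run_flux_file("scripts/csv.flux") mirrors the first cat pipeline above and returns the CSV output as a string.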
Public Demo
Grafana Datasource
Configure your Grafana instance with our public demo endpoint (limited resources)
Documentation
Flux(pipe) is built using the InfluxCommunity/flux fork, which contains additional features and contributions.
All the standard and additional functions available in Fluxpipe are covered in the Flux community documentation.
Status
- Fluxlib
  - parser
  - executor
- Contribs
  - contrib/qxip/clickhouse
  - contrib/qxip/logql
  - contrib/qxip/hash
- ENV secrets
- STDIN pipeline
- HTTP API
  - plaintext
  - JSON support
- web playground
[^1]: Project is not affiliated with or endorsed by InfluxData or Grafana Labs. All rights belong to their respective owners.
[^2]: Used under Apache 2.0 terms. Project is not affiliated with or endorsed by ClickHouse Inc. All rights belong to their respective owners.

