fluid

module
v0.0.0-...-d8b93a5 Latest Latest
Published: Jan 6, 2026 License: MIT

README

Fluid

Overview

Fluid is a small data stream processor that aggregates and filters time-stamped data such as syslog using a simple query language. Similar to other command-line data transformation commands like grep and cut, the fluid utility ...

  • reads data from stdin,
  • writes results to stdout, and
  • logs errors and status information to stderr.

This repository contains the code to build the compiler and the engine, i.e., the core tool to manage data streams. Docker deployment files, tools, and examples for experimenting with Fluid can be found in the fluid repository.

Motivation

Many database systems, spreadsheets, and statistical tools crunch static data quickly and efficiently, but they often lack a simple way to process live data. Over the past decades, dozens of data stream systems have emerged that can deal with complex event and stream processing challenges.

In contrast, fluid is a small and simple tool that can process temporal data, covering both data sequences in files and real-time data such as sensor signals. It is basic but hopefully comprehensive enough to serve as a potential seed for a bigger project that uses distributed computing for complex queries. Therefore, we aimed for a query execution plan format based on Cap'n Proto that is easily extensible, so that plans could in the future be transferred over the internet.

This is a tiny project, hence the name Fluid. Like a 🐻 standing in a stream catching salmon, Fluid might help you pick and consume tasty data rows and hopefully produce some insightful nuggets in one form or another.

Prerequisites

Install Cap'n Proto
git clone -b master https://github.com/capnproto/capnproto.git
cd capnproto/c++
sudo apt-get install autoconf
sudo apt-get install libtool
autoreconf -i
./configure
make -j3 check
sudo make install
Acknowledgment

The Fluid project rests on the awesomeness of the following technologies and the people behind them, to whom I am grateful:

Getting started

Fluid works in two stages:

  1. we compile a query into an execution plan and generated Go code, then
  2. an engine processes input data and produces results according to the plan.

Usage

make fluidc
make fluid
make syslog-example
  • make fluidc creates the compiler, which is based on the FQL grammar described below. The compiler translates an FQL query into a file containing the query execution plan.

  • make fluid creates the engine. It uses the plan file created by fluidc and waits for data on stdin that is compatible with the schema named in the query's from clause as defined in the catalog.json file described below.

  • make syslog-example runs a simple FQL query over live syslog data on your system (Linux or macOS).

Example

With the Fluid Query Language (FQL) we can specify a task in an intuitive manner. Imagine we want to process time-stamped CSV data like the following from the file foo.csv:

x t
1 2030-01-01T17:00:01-07:00
2 2030-01-01T17:00:04-07:00
3 2030-01-01T17:00:11-07:00
4 2030-01-01T17:00:12-07:00
5 2030-01-01T17:00:17-07:00
6 2030-01-01T17:00:26-07:00
7 2030-01-01T17:00:40-07:00
8 2030-01-01T17:00:43-07:00
9 2030-01-01T17:00:49-07:00

This query computes aggregate results every 10 seconds on the wall clock:

from instance1.database1.schema1.table1 \
group by g2, g1 \
window slice 20 rows \
based on rowid \
aggregate count(a) as aCount, avg(a) as aAvg, avg(b) as bAvg, sum(a) as aSum, first(c) as cFirst, last(c) as cLast, first(t2) as t2First, last(t2) as t2Last, first(rowid) as rowidFirst, last(rowid) as rowidLast \
append t2First, t2Last, rowidFirst, rowidLast, aCount, aSum, aAvg, bAvg, cFirst, cLast, bAvg, aCount \
where aAvg <= 12 \
to xxx \
from
  foo
window
  slice
  10 seconds
  based on t
aggregate
  avg(x) as avg,
  sum(x) as total,
  count() as n,
  first(t) as begin,
  last(t) as end
append
  avg,
  total,
  n,
  seconds(end - begin) as duration,
  end as close
to
  bar
every
    10 wall clock seconds
    chunking
    based on t
from
    foo
where
    x < 10
aggregate
    avg(x) as avg,
    sum(x) as total,
    count(*) as n,
    first(t) as begin,
    last(t) as end
where
    n < 20 and total > 0
append
    avg,
    total,
    n,
    seconds(end - begin) as duration,
    end as close
where
    duration % 3 = 0
to
    bar

The where clauses are shown for illustration only; they have no influence on the result because all of their conditions are true for this input.

The result is also in the file bar.csv:

avg total n duration close
1.5 3 2 3 2030-01-01T17:00:04-07:00
4 12 3 6 2030-01-01T17:00:17-07:00
6 6 1 0 2030-01-01T17:00:26-07:00
8 24 3 9 2030-01-01T17:00:49-07:00

For the 10-second time period between 17:00:30 and 17:00:40 there is no input data. Therefore, we won't output any result row for that time window.

Query language

The Query.g4 grammar file contains the language details using ANTLR.

The order of the clauses is:

  1. from
  2. group by (optional)
  3. where (optional)
  4. window
  5. based on (optional)
  6. aggregate
  7. where (optional)
  8. append
  9. where (optional)
  10. to
The from clause
The group by clause
The where clause
The window clause

A window specifies the properties of the sub-sequence of rows in the input data.

A window can be described by

  • time or
  • row count.

There are several forms of windows, which can be placed on a spectrum of flexibility. In Fluid, we give the following names to three degrees of flexibility:

  • session (most general),
  • slide, and
  • slice (most restrictive).
The slice window

This is the simplest window, and it is a special case of the slide window described next. A slice is an equal-width window (a duration or a number of rows), and slices are contiguous, one right after the other.

The slide window

An extreme case of a slide window is where the start of the window remains unchanged. You can think of it as a "rubber band" behavior.

The session window

This is the most general kind of window. Instead of a fixed start and duration, the start and the end of a window are each defined by a condition.

The name derives from the textbook example of session window:

A user's online session, where a user

  • logs in, and then
  • logs out,
  • or does not log out, and the session eventually times out.

The syntax for a session could be used to achieve the same behavior as slide and slice. And slide can be used to emulate a slice window. Here is an example. The following window clauses achieve the same result:

slice "10 seconds" based on t
slide "10 seconds" advance every "3 seconds" based on t
session begin when t mod "10 seconds" == "0 seconds" expire after "10 seconds" based on t
// end when t >= t.start + "10 seconds" based on t

session begin when t mod "3 seconds" == "0 seconds" end t == t.start + "10 seconds" based on t
The based on clause

If this clause is present, it specifies the field used to divide the flow of time into intervals. If the field is of type timestamp, we compare its values based on time intervals. If it is of type int64, we use the difference in integer values as the distance in number of rows.

The aggregate clause
The append clause
The to clause

At a high level, an FQL query consists of the following clauses, each named after its first keyword.

  • every specifies the window size and how the window moves along the input data.
  • from references the input schema in the catalog.json file.
  • to specifies the name of the output schema, which may or may not exist in the catalog.
  • aggregate is a list of aggregate function calls; this is where the main processing of a window is specified.
  • append can be thought of as the SELECT clause in SQL; it allows for projections and simple calculations over scalar values.
  • where uses Boolean expressions to remove rows of the previous clause that we're no longer interested in.

Windows

We implemented three types of window behaviors explained below.

Slice window

Example:

session
  begin  when action = "login"
  end    when action = "logout"
  expire after 30 minutes
group by
  userid

Aggregate functions

Fluid comes with a few typical aggregate functions out-of-the-box.

Function   Description
count()    Number of input rows
avg(x)     Average value of x
sum(x)     Total value of x
min(x)     Minimum value of x
max(x)     Maximum value of x
first(x)   First value of x
last(x)    Last value of x
hll(x)     HyperLogLog
cms(x)     CountMin Sketch

Aggregate function extensions

You can extend the family of aggregate functions by:

  • either adding your own implementation in the source code
  • or by using Fluid's facility to add implementations by dynamically linking your code. The implementation of HLL and CMS follows this approach.
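As a rough idea of what adding an implementation in the source code might involve, here is a Go sketch of an aggregate function abstraction. The interface aggregator, the registry map, and the function aggregate are hypothetical and are not taken from Fluid's code base; Fluid's real extension points may look quite different.

```go
package main

import "fmt"

// aggregator is a hypothetical interface (not Fluid's actual API):
// it receives the values of one window and returns a single result.
type aggregator interface {
	add(x float64)
	result() float64
}

// avgAgg computes the average of the values it has seen.
type avgAgg struct {
	sum float64
	n   int
}

func (a *avgAgg) add(x float64) {
	a.sum += x
	a.n++
}

func (a *avgAgg) result() float64 {
	if a.n == 0 {
		return 0
	}
	return a.sum / float64(a.n)
}

// maxAgg keeps the largest value it has seen.
type maxAgg struct {
	seen bool
	best float64
}

func (m *maxAgg) add(x float64) {
	if !m.seen || x > m.best {
		m.seen, m.best = true, x
	}
}

func (m *maxAgg) result() float64 { return m.best }

// registry maps a function name used in a query to a constructor.
// Adding a new aggregate function means adding one entry here.
var registry = map[string]func() aggregator{
	"avg": func() aggregator { return &avgAgg{} },
	"max": func() aggregator { return &maxAgg{} },
}

// aggregate applies the named function to the values of one window.
func aggregate(name string, xs []float64) (float64, error) {
	newAgg, ok := registry[name]
	if !ok {
		return 0, fmt.Errorf("unknown aggregate function %q", name)
	}
	agg := newAgg()
	for _, x := range xs {
		agg.add(x)
	}
	return agg.result(), nil
}

func main() {
	window := []float64{3, 4, 5}
	for _, name := range []string{"avg", "max"} {
		v, err := aggregate(name, window)
		fmt.Println(name, v, err)
	}
}
```

Keeping the state inside a small value with add and result methods lets the same function be instantiated once per window, or once per group within a window.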

Schemas

The fluid command processes data continuously. It needs to know the format and meaning of the input rows. These details are defined in a catalog.json file.

Here is an excerpt of the catalog that can be used for our example:

{
  "name": "my cool catalog",
  "schemas": [
    {
      "name": "foo",
      "format": "csv",
      "fields": [
        {
          "name": "a",
          "type": "integer16",
          "usage": "data"
        },
        {
          "name": "t",
          "type": "timestamp",
          "usage": "time"
        }
      ]
    },
    ...
  ],
  "functions": [
    ...
  ]
}

Schema foo describes the query's input. If we wanted to use the output of the query as input to another query, we could add its schema to the catalog as well.

The usage attribute of a field has two possible values:

  • data means that the attribute is treated like normal input
  • time means that this attribute serves as the reference to base window calculations on. There may be several timestamp attributes in the input but only one of them can serve as the time attribute.
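A catalog of this shape can be read with Go's encoding/json package. The sketch below is an assumption-laden illustration, not Fluid's own loader: the struct and function names are made up, the sample document contains only the foo schema, and the functions section is left out because its structure is not shown in the excerpt. The helper timeField enforces the rule that at most one field may have the usage time.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// The struct names below are illustrative; only the JSON keys
// are taken from the catalog excerpt.
type field struct {
	Name  string `json:"name"`
	Type  string `json:"type"`
	Usage string `json:"usage"`
}

type schema struct {
	Name   string  `json:"name"`
	Format string  `json:"format"`
	Fields []field `json:"fields"`
}

type catalog struct {
	Name    string   `json:"name"`
	Schemas []schema `json:"schemas"`
}

// sample is a reduced catalog that contains only the schema foo.
const sample = `{
  "name": "my cool catalog",
  "schemas": [
    {
      "name": "foo",
      "format": "csv",
      "fields": [
        {"name": "a", "type": "integer16", "usage": "data"},
        {"name": "t", "type": "timestamp", "usage": "time"}
      ]
    }
  ]
}`

// parseCatalog decodes a catalog document.
func parseCatalog(data []byte) (catalog, error) {
	var c catalog
	err := json.Unmarshal(data, &c)
	return c, err
}

// timeField returns the name of the field with usage "time", or ""
// if there is none. More than one such field is reported as an error.
func timeField(s schema) (string, error) {
	name := ""
	for _, f := range s.Fields {
		if f.Usage != "time" {
			continue
		}
		if name != "" {
			return "", fmt.Errorf("schema %q has more than one time field", s.Name)
		}
		name = f.Name
	}
	return name, nil
}

func main() {
	c, err := parseCatalog([]byte(sample))
	if err != nil {
		fmt.Println("cannot parse catalog:", err)
		return
	}
	for _, s := range c.Schemas {
		name, err := timeField(s)
		fmt.Println(s.Name, s.Format, name, err)
	}
}
```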

Behind the scenes

We use data structures called operators that form a pipelined execution plan like the following:

cat foo.csv → ingress → filter → window → aggregate → filter → append → filter → egress → | tee bar.csv

One of my original design aims was to have each operator run in a different thread and perhaps on a different compute node. The pipeline is currently linear, but sharding and other data distribution techniques are conceivable for the future and would make such a pipeline more bushy.
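The idea of one operator per thread maps naturally onto goroutines connected by channels. The sketch below illustrates that design idea only and does not reflect how Fluid's engine is actually structured; the names row, operator, ingress, filter, project, and run are made up for this example.

```go
package main

import "fmt"

// row is a simplified record with a single value.
type row struct{ x int }

// operator consumes rows from an input channel and returns the channel
// on which it emits its output. Each operator runs in its own goroutine.
type operator func(in <-chan row) <-chan row

// ingress turns a slice of rows into a stream.
func ingress(rows []row) <-chan row {
	out := make(chan row)
	go func() {
		defer close(out)
		for _, r := range rows {
			out <- r
		}
	}()
	return out
}

// filter passes on only the rows for which pred is true.
func filter(pred func(row) bool) operator {
	return func(in <-chan row) <-chan row {
		out := make(chan row)
		go func() {
			defer close(out)
			for r := range in {
				if pred(r) {
					out <- r
				}
			}
		}()
		return out
	}
}

// project applies f to every row.
func project(f func(row) row) operator {
	return func(in <-chan row) <-chan row {
		out := make(chan row)
		go func() {
			defer close(out)
			for r := range in {
				out <- f(r)
			}
		}()
		return out
	}
}

// run chains the operators into a linear pipeline and collects the
// rows that reach the end (the role of the egress operator).
func run(src <-chan row, ops ...operator) []row {
	for _, op := range ops {
		src = op(src)
	}
	var out []row
	for r := range src {
		out = append(out, r)
	}
	return out
}

func main() {
	rows := []row{{1}, {2}, {3}, {4}, {5}}
	even := filter(func(r row) bool { return r.x%2 == 0 })
	double := project(func(r row) row { return row{r.x * 2} })
	fmt.Println(run(ingress(rows), even, double))
}
```

Because every stage only sees channels, a stage could later be replaced by one that forwards rows to another process or node without changing its neighbors.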

The correspondences between Fluid query clauses and Fluid plan operators are shown below:

Query clause   Plan operator   Description
from           ingress         reads rows from a data source
to             egress          transforms the result rows into a certain format (e.g., CSV)
every          window          reads a certain number of rows from the ingress filter output
aggregate      aggregate       aggregates the group of rows from the window operator
append         project         applies expressions and removes fields from a row
where          filter          removes rows from the previous operator's output

Internally, we use further operators for each of the different aggregate functions, i.e., instead of a single aggregate operator, there may be several different kinds.

Directories

Path Synopsis
capnp
data module
cmd
api-server command
call-program command
catalog command
chatty command
datagen command
demo/client command
demo/web command
websockets.go
finnhub-trades command
fluid command
fluidc command
pipe/client command
pipe/server command
run-cmd command
run-stuff command
syslog command
throttle command
tools/reverse command
pkg
engine
Package engine implements the actual query processor.
_out/query module
