cassandra

package
v0.0.0-...-f111e62 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2017 License: MIT Imports: 20 Imported by: 0

README

Cassandra Data source

Provides SQL access to Cassandra.

dataux_cass

Cassandra CQL SQL Query
Tables show tables;
Column Families describe mytable;
WHERE select count(*) from table WHERE exists(a); Some of these are pushed down to Cassandra if available.
filter: terms select * from table WHERE year IN (2015,2014,2013);
filter: gte, range select * from table WHERE year BETWEEN 2012 AND 2014
aggs min, max, avg, sum select min(year), max(year), avg(year), sum(year) from table WHERE exists(a); These are poly-filled in the distributed query engine.

Example Usage

go get -u github.com/dataux/dataux
cd $GOPATH/src/github.com/dataux/dataux
go build
./dataux --config=backends/cassandra/cassandra.conf



Hacking

Some notes and random info for hacking.

# start docker container for testing

docker run --name dataux-cass-test -d cassandra:2


# log onto container to get a cqlsh
docker run -it --rm --net container:dataux-cass-test cassandra:2 cqlsh

INSERT INTO user (id, name, deleted, created, updated) VALUES ('user814', 'test_name', false, '2016-07-24','2016-07-24 23:46:01')

cqlsh:datauxtest> select * from system.schema_columnfamilies WHERE keyspace_name = "datauxtest";
SyntaxException: <ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:65 no viable alternative at input 'datauxtest' (...system.schema_columnfamilies WHERE keyspace_name = ["datauxtes]...)">

cqlsh:datauxtest> select * from system.schema_columnfamilies WHERE keyspace_name = 'datauxtest';


 keyspace_name | columnfamily_name | bloom_filter_fp_chance | caching                                     | cf_id                                | column_aliases | comment | compaction_strategy_class                                       | compaction_strategy_options | comparator                                                                                                                                                                                                                                         | compression_parameters                                                   | default_time_to_live | default_validator                         | dropped_columns | gc_grace_seconds | index_interval | is_dense | key_aliases    | key_validator                                                                                                                    | local_read_repair_chance | max_compaction_threshold | max_index_interval | memtable_flush_period_in_ms | min_compaction_threshold | min_index_interval | read_repair_chance | speculative_retry | subcomparator | type     | value_alias
---------------+-------------------+------------------------+---------------------------------------------+--------------------------------------+----------------+---------+-----------------------------------------------------------------+-----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------+----------------------+-------------------------------------------+-----------------+------------------+----------------+----------+----------------+----------------------------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+--------------------+-----------------------------+--------------------------+--------------------+--------------------+-------------------+---------------+----------+-------------
    datauxtest |           article |                   0.01 | {"keys":"ALL", "rows_per_partition":"NONE"} | bfd51db0-4628-11e6-a329-4f04994f1f6a |             [] |         | org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy |                          {} | org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.ColumnToCollectionType(63617465676f7279:org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type))) | {"sstable_compression":"org.apache.cassandra.io.compress.LZ4Compressor"} |                    0 | org.apache.cassandra.db.marshal.BytesType |            null |           864000 |           null |    False |     ["author"] |                                                                                         org.apache.cassandra.db.marshal.UTF8Type |                      0.1 |                       32 |               2048 |                           0 |                        4 |                128 |                  0 |    99.0PERCENTILE |          null | Standard |        null
    datauxtest |             event |                   0.01 | {"keys":"ALL", "rows_per_partition":"NONE"} | 5d461c00-46cb-11e6-a329-4f04994f1f6a |         ["ts"] |         | org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy |                          {} |                                                                                                              org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.TimestampType,org.apache.cassandra.db.marshal.UTF8Type) | {"sstable_compression":"org.apache.cassandra.io.compress.LZ4Compressor"} |                    0 | org.apache.cassandra.db.marshal.BytesType |            null |           864000 |           null |    False | ["date","url"] | org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type) |                      0.1 |                       32 |               2048 |                           0 |                        4 |                128 |                  0 |    99.0PERCENTILE |          null | Standard |        null
    datauxtest |              user |                   0.01 | {"keys":"ALL", "rows_per_partition":"NONE"} | bfe91ae0-4628-11e6-a329-4f04994f1f6a |             [] |         | org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy |                          {} |       org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.ColumnToCollectionType(726f6c6573:org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type))) | {"sstable_compression":"org.apache.cassandra.io.compress.LZ4Compressor"} |                    0 | org.apache.cassandra.db.marshal.BytesType |            null |           864000 |           null |    False |         ["id"] |                                                                                         org.apache.cassandra.db.marshal.UTF8Type |                      0.1 |                       32 |               2048 |                           0 |                        4 |                128 |                  0 |    99.0PERCENTILE |          null | Standard |        null

select columnfamily_name AS cf, key_aliases from system.schema_columnfamilies WHERE keyspace_name = 'datauxtest';

-- Events Table
CREATE TABLE event (
  url varchar,
  ts timestamp,
  date text,
  jsondata text,
  PRIMARY KEY ((date, url), ts)
);

cqlsh:datauxtest> describe table datauxtest.event;

CREATE TABLE datauxtest.event (
    date text,
    url text,
    ts timestamp,
    jsondata text,
    PRIMARY KEY ((date, url), ts)
) WITH CLUSTERING ORDER BY (ts ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';



# this is from cassandra 2.2.6
cqlsh:system> describe table system.schema_columnfamilies;

CREATE TABLE system.schema_columnfamilies (
    keyspace_name text,
    columnfamily_name text,
    bloom_filter_fp_chance double,
    caching text,
    cf_id uuid,
    comment text,
    compaction_strategy_class text,
    compaction_strategy_options text,
    comparator text,
    compression_parameters text,
    default_time_to_live int,
    default_validator text,
    dropped_columns map<text, bigint>,
    gc_grace_seconds int,
    is_dense boolean,
    key_validator text,
    local_read_repair_chance double,
    max_compaction_threshold int,
    max_index_interval int,
    memtable_flush_period_in_ms int,
    min_compaction_threshold int,
    min_index_interval int,
    read_repair_chance double,
    speculative_retry text,
    subcomparator text,
    type text,
    PRIMARY KEY (keyspace_name, columnfamily_name)
) WITH CLUSTERING ORDER BY (columnfamily_name ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = 'table definitions'
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 604800
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 3600000
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

# this is from cassandra 2.1.8

cqlsh:datauxtest> describe table system.schema_columnfamilies;

CREATE TABLE system.schema_columnfamilies (
    keyspace_name text,
    columnfamily_name text,
    bloom_filter_fp_chance double,
    caching text,
    cf_id uuid,
    column_aliases text,
    comment text,
    compaction_strategy_class text,
    compaction_strategy_options text,
    comparator text,
    compression_parameters text,
    default_time_to_live int,
    default_validator text,
    dropped_columns map<text, bigint>,
    gc_grace_seconds int,
    index_interval int,
    is_dense boolean,
    key_aliases text,
    key_validator text,
    local_read_repair_chance double,
    max_compaction_threshold int,
    max_index_interval int,
    memtable_flush_period_in_ms int,
    min_compaction_threshold int,
    min_index_interval int,
    read_repair_chance double,
    speculative_retry text,
    subcomparator text,
    type text,
    value_alias text,
    PRIMARY KEY (keyspace_name, columnfamily_name)
) WITH CLUSTERING ORDER BY (columnfamily_name ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = 'ColumnFamily definitions'
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 604800
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 3600000
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

Documentation

Overview

Package cassandra implements a data source (backend) that allows dataux to query Cassandra via SQL.

Index

Constants

View Source
const (
	DataSourceLabel = "cassandra"
)

Variables

View Source
var (
	ErrNoSchema = fmt.Errorf("No schema or configuration exists")

	SchemaRefreshInterval = time.Duration(time.Minute * 5)
)
View Source
var (
	// Default page limit
	DefaultLimit = 5000
)

Functions

func NewCassDialect

func NewCassDialect() expr.DialectWriter

Types

type Mutator

type Mutator struct {
	// contains filtered or unexported fields
}

Mutator is a Cassandra mutator connection.

type ResultReader

type ResultReader struct {
	*exec.TaskBase

	Total int
	Req   *SqlToCql
	// contains filtered or unexported fields
}

ResultReader implements result paging and reading.

func NewResultReader

func NewResultReader(req *SqlToCql) *ResultReader

func (*ResultReader) Close

func (m *ResultReader) Close() error

func (*ResultReader) Run

func (m *ResultReader) Run() error

Run runs the Cassandra query.

type ResultReaderNext

type ResultReaderNext struct {
	*ResultReader
}

ResultReaderNext is a wrapper that allows us to implement the sql/driver Next() interface, which is different from the qlbridge/datasource Next().

type Source

type Source struct {
	// contains filtered or unexported fields
}

Source is a Cassandra data source that provides reads, inserts, updates, and deletes. It is a singleton shared instance that creates connections to Cassandra (the connections perform the queries) and provides schema info about the Cassandra keyspace.

func (*Source) Close

func (m *Source) Close() error

func (*Source) DataSource

func (m *Source) DataSource() schema.Source

func (*Source) Init

func (m *Source) Init()

func (*Source) Open

func (m *Source) Open(tableName string) (schema.Conn, error)

func (*Source) Setup

func (m *Source) Setup(ss *schema.SchemaSource) error

func (*Source) Table

func (m *Source) Table(table string) (*schema.Table, error)

func (*Source) Tables

func (m *Source) Tables() []string

type SqlToCql

type SqlToCql struct {
	*exec.TaskBase
	// contains filtered or unexported fields
}

SqlToCql converts a SQL query to a Cassandra CQL query. It acts as a dialect translator and is responsible for pushing down as much logic to CQL as possible.

func NewSqlToCql

func NewSqlToCql(s *Source, t *schema.Table) *SqlToCql

NewSqlToCql creates a SQL -> CQL AST converter.

func (*SqlToCql) CreateMutator

func (m *SqlToCql) CreateMutator(pc interface{}) (schema.ConnMutator, error)

CreateMutator is part of the Mutator interface; it allows data sources to create a stateful mutation context for update/delete operations.

func (*SqlToCql) Delete

func (m *SqlToCql) Delete(key driver.Value) (int, error)

Delete deletes a row by its key.

func (*SqlToCql) DeleteExpression

func (m *SqlToCql) DeleteExpression(p interface{}, where expr.Node) (int, error)

DeleteExpression deletes by expression (WHERE clause).

  • For WHERE columns contained in the partition keys, we can push the delete down to Cassandra.
  • For other columns, we might have to do a select -> delete.

func (*SqlToCql) Put

func (m *SqlToCql) Put(ctx context.Context, key schema.Key, val interface{}) (schema.Key, error)

Put is the interface for mutation (insert, update).

func (*SqlToCql) PutMulti

func (m *SqlToCql) PutMulti(ctx context.Context, keys []schema.Key, src interface{}) ([]schema.Key, error)

func (*SqlToCql) WalkExecSource

func (m *SqlToCql) WalkExecSource(p *plan.Source) (exec.Task, error)

WalkExecSource is an executor interface that allows this source to create its own execution Task, so that it can push down as much as it can to Cassandra.

func (*SqlToCql) WalkSourceSelect

func (m *SqlToCql) WalkSourceSelect(planner plan.Planner, p *plan.Source) (plan.Task, error)

WalkSourceSelect is an interface implemented by this connection that allows the planner to push down as much logic into this source as possible.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL