datasource

package
v0.0.0-...-daaaa79 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2015 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Register

func Register(name string, source DataSource)

Register makes a datasource available by the provided name. If Register is called twice with the same name or if source is nil, it panics.

func SourceIterChannel

func SourceIterChannel(iter Iterator, filter expr.Node, sigCh <-chan bool) <-chan Message

Types

type Aggregations

type Aggregations interface {
	DataSource
	Aggregate(expr.SqlStatement) error
}

type ContextSimple

type ContextSimple struct {
	Data map[string]value.Value
	// contains filtered or unexported fields
}

func NewContextSimple

func NewContextSimple() *ContextSimple

func NewContextSimpleData

func NewContextSimpleData(data map[string]value.Value) *ContextSimple

func NewContextSimpleTs

func NewContextSimpleTs(data map[string]value.Value, ts time.Time) *ContextSimple

func (*ContextSimple) All

func (m *ContextSimple) All() map[string]value.Value

func (*ContextSimple) Body

func (m *ContextSimple) Body() interface{}

func (*ContextSimple) Commit

func (m *ContextSimple) Commit(rowInfo []expr.SchemaInfo, row expr.RowWriter) error

func (*ContextSimple) Delete

func (m *ContextSimple) Delete(row map[string]value.Value) error

func (ContextSimple) Get

func (m ContextSimple) Get(key string) (value.Value, bool)

func (*ContextSimple) Key

func (m *ContextSimple) Key() uint64

func (*ContextSimple) Put

func (*ContextSimple) Row

func (m *ContextSimple) Row() map[string]value.Value

func (*ContextSimple) Ts

func (m *ContextSimple) Ts() time.Time

type ContextUrlValues

type ContextUrlValues struct {
	Data url.Values
	// contains filtered or unexported fields
}

func NewContextUrlValues

func NewContextUrlValues(uv url.Values) *ContextUrlValues

func NewContextUrlValuesTs

func NewContextUrlValuesTs(uv url.Values, ts time.Time) *ContextUrlValues

func (*ContextUrlValues) Body

func (m *ContextUrlValues) Body() interface{}

func (*ContextUrlValues) Delete

func (m *ContextUrlValues) Delete(delRow map[string]value.Value) error

func (ContextUrlValues) Get

func (m ContextUrlValues) Get(key string) (value.Value, bool)

func (*ContextUrlValues) Key

func (m *ContextUrlValues) Key() uint64

func (ContextUrlValues) Put

func (ContextUrlValues) Row

func (m ContextUrlValues) Row() map[string]value.Value

func (*ContextUrlValues) String

func (m *ContextUrlValues) String() string

func (ContextUrlValues) Ts

func (m ContextUrlValues) Ts() time.Time

type ContextWriterEmpty

type ContextWriterEmpty struct{}

func (*ContextWriterEmpty) Delete

func (m *ContextWriterEmpty) Delete(delRow map[string]value.Value) error

func (*ContextWriterEmpty) Put

type CsvDataSource

type CsvDataSource struct {
	// contains filtered or unexported fields
}

Csv DataStoure, implements qlbridge DataSource to scan through data

see interfaces possible but they are

func NewCsvSource

func NewCsvSource(ior io.Reader, exit <-chan bool) (*CsvDataSource, error)

Csv reader assumes we are getting first row as headers

func (*CsvDataSource) Close

func (m *CsvDataSource) Close() error

func (*CsvDataSource) Columns

func (m *CsvDataSource) Columns() []string

func (*CsvDataSource) CreateIterator

func (m *CsvDataSource) CreateIterator(filter expr.Node) Iterator

func (*CsvDataSource) MesgChan

func (m *CsvDataSource) MesgChan(filter expr.Node) <-chan Message

func (*CsvDataSource) Next

func (m *CsvDataSource) Next() Message

func (*CsvDataSource) Open

func (m *CsvDataSource) Open(connInfo string) (SourceConn, error)

func (*CsvDataSource) Tables

func (m *CsvDataSource) Tables() []string

type DataSource

type DataSource interface {
	Tables() []string
	Open(connInfo string) (SourceConn, error)
	Close() error
}

A datasource is most likely a database, file, api, in-mem data etc something that provides data rows. If the source is a regular database it can do its own Filter, Seek, Sort, etc. It may not implement all features of a database, in which case we will use our own execution engine.

Minimum Features:

  • Scanning: iterate through messages/rows, use expr to evaluate this is the minium we need to implement sql select
  • Schema Tables: at a minium tables available, the column level data can be introspected so is optional

Optional Features:

  • Seek ie, key-value lookup, or indexed rows
  • Projection ie, selecting specific fields
  • Where filtering response
  • GroupBy
  • Aggregations ie, count(*), avg() etc
  • Sort sort response, very important for fast joins

Non Select based Sql DML Operations:

  • Delete
  • Update
  • Upsert
  • Insert

DDL/Schema Operations

  • schema discovery
  • create
  • index

type DataSourceFeatures

type DataSourceFeatures struct {
	Features Features
	DataSource
}

func NewFeaturedSource

func NewFeaturedSource(src DataSource) *DataSourceFeatures

type DataSources

type DataSources struct {
	// contains filtered or unexported fields
}

Our internal map of different types of datasources that are registered for our runtime system to use

func DataSourcesRegistry

func DataSourcesRegistry() *DataSources

get registry of all datasource types

func (*DataSources) Get

func (m *DataSources) Get(sourceType string) *DataSourceFeatures

func (*DataSources) String

func (m *DataSources) String() string

type Features

type Features struct {
	Scan         bool
	Seek         bool
	Where        bool
	GroupBy      bool
	Sort         bool
	Aggregations bool
}

We do type introspection in advance to speed up runtime feature detection for datasources

type GroupBy

type GroupBy interface {
	DataSource
	GroupBy(expr.SqlStatement) error
}

type Iterator

type Iterator interface {
	Next() Message
}

simple iterator interface for paging through a datastore Messages/rows

  • used for scanning
  • for datasources that implement exec.Visitor() (ie, select) this represents the alreader filtered, calculated rows

type JsonWrapper

type JsonWrapper json.RawMessage

func (*JsonWrapper) MarshalJSON

func (m *JsonWrapper) MarshalJSON() ([]byte, error)

func (*JsonWrapper) Scan

func (m *JsonWrapper) Scan(src interface{}) error

func (*JsonWrapper) Unmarshal

func (m *JsonWrapper) Unmarshal(v interface{}) error

func (*JsonWrapper) UnmarshalJSON

func (m *JsonWrapper) UnmarshalJSON(data []byte) error

Unmarshall bytes into this typed struct

func (JsonWrapper) Value

func (m JsonWrapper) Value() (driver.Value, error)

This is the go sql/driver interface we need to implement to allow conversion back forth

type Message

type Message interface {
	Key() uint64
	Body() interface{}
}

represents a message routable by the topology. The Key() method is used to route the message in certain topologies. Body() is used to express something user specific. see "https://github.com/mdmarek/topo" AND http://github.com/lytics/grid

type Projection

type Projection interface {
	// Describe the Columns etc
	Projection() (*expr.Projection, error)
}

Some data sources that implement more features, can provide

their own projection.

type RuntimeConfig

type RuntimeConfig struct {
	Sources *DataSources // All registered DataSources from which we can create connections

	DisableRecover bool
	// contains filtered or unexported fields
}

The RuntimeSchema config providing access to available datasources

given connection info, get datasource

func NewRuntimeConfig

func NewRuntimeConfig() *RuntimeConfig

func (*RuntimeConfig) Conn

func (m *RuntimeConfig) Conn(db string) SourceConn

Get connection for given Database

@db      database name

func (*RuntimeConfig) DataSource

func (m *RuntimeConfig) DataSource(connInfo string) DataSource

given connection info, get datasource

@connInfo =    csv:///dev/stdin
               mockcsv

func (*RuntimeConfig) SetConnInfo

func (m *RuntimeConfig) SetConnInfo(connInfo string)

Our RunTime configuration possibly only supports a single schema/connection info. for example, the sql/driver interface, so will be set here.

@connInfo =    csv:///dev/stdin

type Scanner

type Scanner interface {
	ScannerColumns
	// create a new iterator for underlying datasource
	CreateIterator(filter expr.Node) Iterator
	MesgChan(filter expr.Node) <-chan Message
}

A scanner, most basic of data sources, just iterate through

rows without any optimizations

type ScannerColumns

type ScannerColumns interface {
	Columns() []string
}

Interface for a data source exposing column positions for []driver.Value iteration

type Seeker

type Seeker interface {
	DataSource
	// Just because we have Get, Multi-Get, doesn't mean we can seek all
	// expressions, find out.
	CanSeek(*expr.SqlSelect)
	Get(key string) Message
	MultiGet(keys []string) []Message
}

Interface for Seeking row values instead of scanning (ie, Indexed)

type Sort

type Sort interface {
	DataSource
	Sort(expr.SqlStatement) error
}

type SourceConn

type SourceConn interface {
	Close() error
}

Connection, only one guaranteed feature, although should implement many more (scan, seek, etc)

func OpenConn

func OpenConn(sourceName, sourceConfig string) (SourceConn, error)

Open a datasource

sourcename = "csv", "elasticsearch"

type SourcePlanner

type SourcePlanner interface {
	// Accept a sql statement, to plan the execution
	//  ideally, this would be done by planner but, we need
	//  source specific planners, as each backend has different features
	Accept(expr.SubVisitor) (Scanner, error)
}

Some sources can do their own planning

type SqlDriverMessage

type SqlDriverMessage struct {
	Vals []driver.Value
	Id   uint64
}

func (*SqlDriverMessage) Body

func (m *SqlDriverMessage) Body() interface{}

func (*SqlDriverMessage) Key

func (m *SqlDriverMessage) Key() uint64

type SqlDriverMessageMap

type SqlDriverMessageMap struct {
	Vals map[string]driver.Value
	Id   uint64
}

func NewSqlDriverMessageMap

func NewSqlDriverMessageMap() *SqlDriverMessageMap

func (*SqlDriverMessageMap) Body

func (m *SqlDriverMessageMap) Body() interface{}

func (*SqlDriverMessageMap) Get

func (m *SqlDriverMessageMap) Get(key string) (value.Value, bool)

func (*SqlDriverMessageMap) Key

func (m *SqlDriverMessageMap) Key() uint64

func (*SqlDriverMessageMap) Row

func (m *SqlDriverMessageMap) Row() map[string]value.Value

func (*SqlDriverMessageMap) Ts

func (m *SqlDriverMessageMap) Ts() time.Time

type StaticDataSource

type StaticDataSource struct {
	// contains filtered or unexported fields
}

Static DataSource, implements qlbridge DataSource to allow

in memory native go data to have a Schema and implement
other DataSource interfaces such as Open, Close

func NewStaticDataSource

func NewStaticDataSource(name string, data [][]driver.Value, cols []string) *StaticDataSource

func NewStaticDataValue

func NewStaticDataValue(data interface{}, name string) *StaticDataSource

func (*StaticDataSource) Close

func (m *StaticDataSource) Close() error

func (*StaticDataSource) Columns

func (m *StaticDataSource) Columns() []string

func (*StaticDataSource) CreateIterator

func (m *StaticDataSource) CreateIterator(filter expr.Node) Iterator

func (*StaticDataSource) MesgChan

func (m *StaticDataSource) MesgChan(filter expr.Node) <-chan Message

func (*StaticDataSource) Next

func (m *StaticDataSource) Next() Message

func (*StaticDataSource) Open

func (m *StaticDataSource) Open(connInfo string) (SourceConn, error)

func (*StaticDataSource) Tables

func (m *StaticDataSource) Tables() []string

type TimeValue

type TimeValue time.Time

func (*TimeValue) MarshalJSON

func (m *TimeValue) MarshalJSON() ([]byte, error)

func (*TimeValue) Scan

func (m *TimeValue) Scan(src interface{}) error

func (TimeValue) Time

func (m TimeValue) Time() time.Time

func (*TimeValue) Unmarshal

func (m *TimeValue) Unmarshal(v interface{}) error

func (*TimeValue) UnmarshalJSON

func (m *TimeValue) UnmarshalJSON(data []byte) error

func (TimeValue) Value

func (m TimeValue) Value() (driver.Value, error)

type UrlValuesMsg

type UrlValuesMsg struct {
	// contains filtered or unexported fields
}

func NewUrlValuesMsg

func NewUrlValuesMsg(id uint64, body *ContextUrlValues) *UrlValuesMsg

func (*UrlValuesMsg) Body

func (m *UrlValuesMsg) Body() interface{}

func (*UrlValuesMsg) Key

func (m *UrlValuesMsg) Key() uint64

func (*UrlValuesMsg) String

func (m *UrlValuesMsg) String() string

type ValueContextWrapper

type ValueContextWrapper struct {
	*SqlDriverMessage
	// contains filtered or unexported fields
}

func NewValueContextWrapper

func NewValueContextWrapper(msg *SqlDriverMessage, cols map[string]*expr.Column) *ValueContextWrapper

func (*ValueContextWrapper) Get

func (m *ValueContextWrapper) Get(key string) (value.Value, bool)

func (*ValueContextWrapper) Row

func (m *ValueContextWrapper) Row() map[string]value.Value

func (*ValueContextWrapper) Ts

func (m *ValueContextWrapper) Ts() time.Time

type WhereFilter

type WhereFilter interface {
	DataSource
	Filter(expr.SqlStatement) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL