Documentation ¶
Overview ¶
Package files is a cloud (gcs, s3) and local file datasource that translates json, csv, etc. files into an appropriate interface for the qlbridge DataSource so we can run queries against them. It provides a FileHandler interface to allow custom file-type handling.
Package files implements Cloud Files logic for getting, reading, and converting files into databases. It reads cloud (or local) files, gets lists of tables, and can scan through them using a distributed query engine.
Index ¶
- Constants
- Variables
- func FileStoreLoader(ss *schema.Schema) (cloudstorage.StoreReader, error)
- func RegisterFileHandler(scannerType string, fh FileHandler)
- func RegisterFileStore(storeType string, fs FileStoreCreator)
- func SipPartitioner(partitionCt uint64, fi *FileInfo) int
- func TableFromFileAndPath(path, fileIn string) string
- type FileHandler
- type FileHandlerSchema
- type FileHandlerTables
- type FileInfo
- type FilePager
- func (m *FilePager) Close() error
- func (m *FilePager) Columns() []string
- func (m *FilePager) Next() schema.Message
- func (m *FilePager) NextFile() (*FileReader, error)
- func (m *FilePager) NextScanner() (schema.ConnScanner, error)
- func (m *FilePager) RunFetcher()
- func (m *FilePager) WalkExecSource(p *plan.Source) (exec.Task, error)
- type FileReader
- type FileReaderIterator
- type FileSource
- func (m *FileSource) Close() error
- func (m *FileSource) File(o cloudstorage.Object) *FileInfo
- func (m *FileSource) Init()
- func (m *FileSource) Open(tableName string) (schema.Conn, error)
- func (m *FileSource) Setup(ss *schema.Schema) error
- func (m *FileSource) Table(tableName string) (*schema.Table, error)
- func (m *FileSource) Tables() []string
- type FileStore
- type FileStoreCreator
- type FileTable
- type Partitioner
Constants ¶
const (
// SourceType is the registered Source name in the qlbridge source registry
SourceType = "cloudstore"
)
Variables ¶
var (
// Default file queue size to buffer by pager
FileBufferSize = 5
)
var (
    // FileColumns are the default columns for the "file" table
    FileColumns = []string{"file", "table", "path", "partialpath", "size", "partition", "updated", "deleted", "filetype"}
)
Functions ¶
func FileStoreLoader ¶
func FileStoreLoader(ss *schema.Schema) (cloudstorage.StoreReader, error)
FileStoreLoader loads a cloudstorage StoreReader for the given schema.
func RegisterFileHandler ¶
func RegisterFileHandler(scannerType string, fh FileHandler)
RegisterFileHandler registers a FileHandler, making it available under the provided @scannerType.
func RegisterFileStore ¶
func RegisterFileStore(storeType string, fs FileStoreCreator)
RegisterFileStore registers implementations of FileStore factories in the global registry under the provided @storeType.
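The registration functions above follow Go's common global-registry pattern. The sketch below is a hedged illustration of that pattern, not the package's actual implementation; the `FileStoreCreator` signature here is a hypothetical stand-in (the real one takes a `*schema.Schema`).

```go
package main

import (
	"fmt"
	"sync"
)

// FileStoreCreator is a hypothetical stand-in for the package's factory type.
type FileStoreCreator func(name string) (string, error)

var (
	registryMu sync.RWMutex
	fileStores = make(map[string]FileStoreCreator)
)

// RegisterFileStore mirrors the global-registry pattern: store the factory
// under its type name and fail fast on duplicate registration.
func RegisterFileStore(storeType string, fs FileStoreCreator) {
	registryMu.Lock()
	defer registryMu.Unlock()
	if _, dup := fileStores[storeType]; dup {
		panic(fmt.Sprintf("filestore %q already registered", storeType))
	}
	fileStores[storeType] = fs
}

func main() {
	RegisterFileStore("gcs", func(name string) (string, error) {
		return "gcs-store:" + name, nil
	})
	creator := fileStores["gcs"]
	s, _ := creator("my-bucket")
	fmt.Println(s)
}
```

Registries like this are typically populated from `init()` functions in driver packages, so importing a driver for its side effects makes the store type available by name.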
func SipPartitioner ¶
func SipPartitioner(partitionCt uint64, fi *FileInfo) int
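A partitioner maps each file to a stable partition number so that each node in the distributed query engine scans a disjoint subset of files. The name suggests the package uses SipHash; the sketch below uses FNV-1a instead so the example is self-contained, and is an illustration of the technique rather than the package's implementation.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition assigns a file name to one of partitionCt partitions by
// hashing the name. The same name always maps to the same partition, so a
// pager scanning partition p sees a stable subset of files.
func hashPartition(partitionCt uint64, name string) int {
	h := fnv.New64a()
	h.Write([]byte(name))
	return int(h.Sum64() % partitionCt)
}

func main() {
	for _, f := range []string{"appearances1.csv", "appearances2.csv", "players1.csv"} {
		fmt.Printf("%s -> partition %d of 4\n", f, hashPartition(4, f))
	}
}
```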
func TableFromFileAndPath ¶
func TableFromFileAndPath(path, fileIn string) string
TableFromFileAndPath finds the table name from the full path of a file and the root path of tables.
There are two "table" naming conventions used to find table names:
- Multiple partitioned files in a folder named for the table: rootpath/tables/nameoftable/nameoftable1.csv, rootpath/tables/nameoftable/nameoftable2.csv
- Table as the name of a file inside a folder: rootpath/users.csv, rootpath/accounts.csv
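The two conventions above can be sketched as follows. This `tableFromPath` is a hypothetical simplification for illustration, not the package's actual `TableFromFileAndPath`:

```go
package main

import (
	"fmt"
	"strings"
)

// tableFromPath derives a table name from a file path under root, covering
// the two conventions:
//
//	rootpath/tables/nameoftable/nameoftable1.csv -> "nameoftable"
//	rootpath/users.csv                           -> "users"
func tableFromPath(root, file string) string {
	rel := strings.TrimPrefix(file, strings.TrimSuffix(root, "/")+"/")
	parts := strings.Split(rel, "/")
	if len(parts) > 1 {
		// Partitioned layout: the enclosing folder names the table.
		return strings.ToLower(parts[len(parts)-2])
	}
	// Single-file layout: the file name without extension names the table.
	base := parts[0]
	if i := strings.Index(base, "."); i > 0 {
		base = base[:i]
	}
	return strings.ToLower(base)
}

func main() {
	fmt.Println(tableFromPath("rootpath", "rootpath/tables/appearances/appearances1.csv"))
	fmt.Println(tableFromPath("rootpath", "rootpath/users.csv"))
}
```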
Types ¶
type FileHandler ¶
type FileHandler interface {
    Init(store FileStore, ss *schema.Schema) error
    // File is called each time the underlying FileStore finds a new file; the
    // filehandler determines whether it is a File or not (directory?), extracts any
    // metadata such as partition, and parses out fields that may exist in the File/Folder path.
    File(path string, obj cloudstorage.Object) *FileInfo
    // Scanner creates a scanner for a particular file.
    Scanner(store cloudstorage.StoreReader, fr *FileReader) (schema.ConnScanner, error)
    // FileAppendColumns provides additional columns for the files-list table,
    // ie column info extracted from folder paths and file names.
    // For example: `tables/appearances/2017/appearances.csv` may extract the "2017"
    // as "year" and append that as a column to all rows.
    // Optional: may be nil
    FileAppendColumns() []string
}
FileHandler defines an interface for developers to build new file processing so that custom file types can be queried with SQL.
Features of FileHandler:
- File() converts a cloudstorage object to a FileInfo that describes the file.
- Scanner() creates a file-scanner, allowing the scanner to implement custom file-type reading (csv, protobuf, json, encryption).
The file reading, opening, and listing is a separate layer; see FileSource for the cloudstorage layer.
So a FileHandler is a factory that creates Scanners for a specific format type such as csv or json.
func NewJsonHandler ¶
func NewJsonHandler(lh datasource.FileLineHandler) FileHandler
NewJsonHandler creates a json file handler for paging new-line delimited rows of a json file.
func NewJsonHandlerTables ¶
func NewJsonHandlerTables(lh datasource.FileLineHandler, tables []string) FileHandler
NewJsonHandlerTables creates a json file handler for paging new-line delimited rows of json files for the provided tables.
type FileHandlerSchema ¶
type FileHandlerSchema interface { FileHandler schema.SourceTableSchema }
FileHandlerSchema - file handlers may optionally provide schema info about the tables they contain.
type FileHandlerTables ¶
type FileHandlerTables interface { FileHandler Tables() []*FileTable }
FileHandlerTables - file handlers may optionally provide info about the tables contained in the store.
type FileInfo ¶
type FileInfo struct {
    Path        string         // Root path
    Name        string         // Name/Path of file
    PartialPath string         // non-file-name part of path
    Table       string         // Table name this file participates in
    FileType    string         // csv, json, etc
    Partition   int            // which partition
    Size        int            // Content-Length size in bytes
    AppendCols  []driver.Value // Additional Column info extracted from file name/folder path
    // contains filtered or unexported fields
}
FileInfo describes a single file. Say a folder of "./tables" is the specified "root path", and it has folders for "table names" underneath a redundant "tables":
    ./tables/
        /tables/
            /appearances/appearances1.csv
            /players/players1.csv
then for the first file:
    Name        = "tables/appearances/appearances1.csv"
    Table       = "appearances"
    PartialPath = tables/appearances
func FileInfoFromCloudObject ¶
func FileInfoFromCloudObject(path string, obj cloudstorage.Object) *FileInfo
FileInfoFromCloudObject converts a cloudstorage object to a FileInfo, interpreting the table name from the given full file path.
type FilePager ¶
type FilePager struct {
    Limit int
    schema.ConnScanner
    // contains filtered or unexported fields
}
FilePager acts like a partitioned Data Source Conn, wrapping the underlying FileSource and paging through the list of files, scanning only those that match this pager's partition. By default the partition count is -1, which means no partitioning.
func NewFilePager ¶
func NewFilePager(tableName string, fs *FileSource) *FilePager
NewFilePager creates a new FilePager with defaults.
func (*FilePager) Columns ¶
func (m *FilePager) Columns() []string
Columns is part of the Conn interface, providing the columns for this table/conn.
func (*FilePager) Next ¶
func (m *FilePager) Next() schema.Message
Next is the iterator for the next message; it wraps the file Scanner and NextFile abstractions.
func (*FilePager) NextFile ¶
func (m *FilePager) NextFile() (*FileReader, error)
NextFile gets the next file.
func (*FilePager) NextScanner ¶
func (m *FilePager) NextScanner() (schema.ConnScanner, error)
NextScanner provides the next scanner, assuming each scanner represents a different file and a single source may have multiple files.
func (*FilePager) RunFetcher ¶
func (m *FilePager) RunFetcher()
type FileReader ¶
type FileReader struct {
    *FileInfo
    F    io.ReadCloser // Actual file reader
    Exit chan bool     // exit channel to shutdown reader
}
FileReader provides file info and access to the file, supplied to ScannerMakers.
type FileReaderIterator ¶
type FileReaderIterator interface {
    // NextFile returns io.EOF on last file
    NextFile() (*FileReader, error)
}
FileReaderIterator defines a file source that can page through files, getting the next file from its partition.
type FileSource ¶
type FileSource struct {
    Partitioner string // random, ?? (date, keyed?)
    // contains filtered or unexported fields
}
FileSource is a Source for reading files and scanning them, allowing the contents to be treated as a database, like doing a full table scan in mysql. But you can partition across files.
- readers: gcs, local-fs
- tablesource: translates lists of files into tables. Normally there are multiple files per table (ie partitioned, per-day, etc.)
- scanners: responsible for file-type-specific reading
- files table: a "table" of all the files from this cloud source
func NewFileSource ¶
func NewFileSource() *FileSource
NewFileSource provides a singleton manager for a particular Source Schema and File-Handler to read/manage all files from a source such as gcs folder x or s3 folder y.
func (*FileSource) File ¶
func (m *FileSource) File(o cloudstorage.Object) *FileInfo
func (*FileSource) Init ¶
func (m *FileSource) Init()
func (*FileSource) Open ¶
func (m *FileSource) Open(tableName string) (schema.Conn, error)
Open a connection to the given table; part of the Source interface.
func (*FileSource) Setup ¶
func (m *FileSource) Setup(ss *schema.Schema) error
Setup the filesource with schema info
type FileStore ¶
type FileStore interface {
    cloudstorage.StoreReader
}
FileStore defines a handler for reading files, understanding folders, and creating scanners/formatters for files. Created by a FileStoreCreator.
The overall flow:
    FileStoreCreator(schema)  -> FileStore
    FileStore.Objects()       -> File
    FileHandler(File)         -> FileScanner
    FileScanner.Next()        -> Row
type FileStoreCreator ¶
FileStoreCreator defines a Factory type for creating FileStore