Documentation ¶
Index ¶
- Variables
- func AutoDecompress(reader io.Reader) (gReader io.Reader, err error)
- func CleanHeaderRow(header []string) []string
- func CompareColumns(columns1 []Column, columns2 []Column) (reshape bool, err error)
- func CreateDummyFields(numCols int) (cols []string)
- func GetISO8601DateMap(t time.Time) map[string]interface{}
- func IsDummy(columns []Column) bool
- func MakeRowsChan() chan []interface{}
- func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool) *jsonStream
- func Row(vals ...interface{}) []interface{}
- func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)
- func Unzip(src string, dest string) ([]string, error)
- type CSV
- func (c *CSV) InferSchema() error
- func (c *CSV) NewReader() (*io.PipeReader, error)
- func (c *CSV) Read() (data Dataset, err error)
- func (c *CSV) ReadStream() (ds *Datastream, err error)
- func (c *CSV) Sample(n int) (Dataset, error)
- func (c *CSV) SetFields(fields []string)
- func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
- type Column
- type ColumnStats
- type Columns
- type Compressor
- type CompressorType
- type Dataflow
- func (df *Dataflow) AddInBytes(bytes uint64)
- func (df *Dataflow) AddOutBytes(bytes uint64)
- func (df *Dataflow) Bytes() (inBytes, outBytes uint64)
- func (df *Dataflow) CleanUp()
- func (df *Dataflow) Close()
- func (df *Dataflow) Count() (cnt uint64)
- func (df *Dataflow) Defer(f func())
- func (df *Dataflow) DsTotalBytes() (bytes uint64)
- func (df *Dataflow) Err() (err error)
- func (df *Dataflow) IsClosed() bool
- func (df *Dataflow) IsEmpty() bool
- func (df *Dataflow) PushStreams(dss ...*Datastream)
- func (df *Dataflow) ResetStats()
- func (df *Dataflow) SetColumns(columns []Column)
- func (df *Dataflow) SetEmpty()
- func (df *Dataflow) Size() int
- func (df *Dataflow) SyncStats()
- func (df *Dataflow) WaitReady() error
- type Dataset
- func (data *Dataset) Append(row []interface{})
- func (data *Dataset) ColValues(col int) []interface{}
- func (data *Dataset) ColValuesStr(col int) []string
- func (data *Dataset) FirstRow() []interface{}
- func (data *Dataset) FirstVal() interface{}
- func (data *Dataset) GetFields() []string
- func (data *Dataset) InferColumnTypes()
- func (data *Dataset) Records() []map[string]interface{}
- func (data *Dataset) SetFields(fields []string)
- func (data *Dataset) Stream() *Datastream
- func (data *Dataset) ToJSONMap() map[string]interface{}
- func (data *Dataset) WriteCsv(path string) error
- type Datastream
- func MergeDataflow(df *Dataflow) (ds *Datastream)
- func NewDatastream(columns Columns) (ds *Datastream)
- func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
- func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
- func ReadCsvStream(path string) (ds *Datastream, err error)
- func (ds *Datastream) AddBytes(b int64)
- func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
- func (ds *Datastream) Clone() *Datastream
- func (ds *Datastream) Close()
- func (ds *Datastream) Collect(limit int) (Dataset, error)
- func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
- func (ds *Datastream) Defer(f func())
- func (ds *Datastream) Err() (err error)
- func (ds *Datastream) GetFields(toLower ...bool) []string
- func (ds *Datastream) IsClosed() bool
- func (ds *Datastream) IsEmpty() bool
- func (ds *Datastream) Map(newColumns Columns, transf func([]interface{}) []interface{}) (nDs *Datastream)
- func (ds *Datastream) MapParallel(transf func([]interface{}) []interface{}, numWorkers int) (nDs *Datastream)
- func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader
- func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)
- func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)
- func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader
- func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
- func (ds *Datastream) Push(row []interface{})
- func (ds *Datastream) Records() <-chan map[string]interface{}
- func (ds *Datastream) ResetStats()
- func (ds *Datastream) SetConfig(configMap map[string]string)
- func (ds *Datastream) SetEmpty()
- func (ds *Datastream) SetFields(fields []string)
- func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
- func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
- func (ds *Datastream) Start() (err error)
- func (ds *Datastream) WaitReady() error
- type GzipCompressor
- type Iterator
- type NoneCompressor
- type Parquet
- type SSHClient
- func (s *SSHClient) Close()
- func (s *SSHClient) Connect() (err error)
- func (s *SSHClient) GetOutput() (stdout string, stderr string)
- func (s *SSHClient) OpenPortForward() (localPort int, err error)
- func (s *SSHClient) RunAsProcess() (localPort int, err error)
- func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)
- type SnappyCompressor
- type StreamProcessor
- func (sp *StreamProcessor) CastRow(row []interface{}, columns []Column) []interface{}
- func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...string) string
- func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)
- func (sp *StreamProcessor) CastType(val interface{}, typ string) interface{}
- func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}
- func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ string) interface{}
- func (sp *StreamProcessor) GetType(val interface{}) (typ string)
- func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}
- func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)
- func (sp *StreamProcessor) ParseVal(val interface{}) interface{}
- func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}
- func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}
- func (sp *StreamProcessor) SetConfig(configMap map[string]string)
- type ZStandardCompressor
Constants ¶
This section is empty.
Variables ¶
var ( // RemoveTrailingDecZeros removes the trailing zeros in CastToString RemoveTrailingDecZeros = false SampleSize = 900 )
Functions ¶
func AutoDecompress ¶ added in v0.2.3
AutoDecompress auto detexts compression to decompress. Otherwise return same reader
func CleanHeaderRow ¶
CleanHeaderRow cleans the header row from incompatible characters
func CompareColumns ¶
CompareColumns compared two columns to see if there are similar
func CreateDummyFields ¶
CreateDummyFields creates dummy columns for csvs with no header row
func GetISO8601DateMap ¶
GetISO8601DateMap return a map of date parts for string formatting
func MakeRowsChan ¶
func MakeRowsChan() chan []interface{}
MakeRowsChan returns a buffered channel with default size
func NewJSONStream ¶ added in v0.3.1
func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool) *jsonStream
func ScanCarrRet ¶
ScanCarrRet removes the \r runes that are without \n rightafter
Types ¶
type CSV ¶
type CSV struct { Path string NoHeader bool Delimiter rune Columns []Column File *os.File Data Dataset Reader io.Reader NoTrace bool // contains filtered or unexported fields }
CSV is a csv object
func (*CSV) NewReader ¶
func (c *CSV) NewReader() (*io.PipeReader, error)
NewReader creates a Reader
func (*CSV) ReadStream ¶
func (c *CSV) ReadStream() (ds *Datastream, err error)
ReadStream returns the read CSV stream with Line 1 as header
func (*CSV) WriteStream ¶
func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
WriteStream to CSV file
type Column ¶
type Column struct { Position int `json:"position"` Name string `json:"name"` Type string `json:"type"` DbType string `json:"-"` DbPrecision int `json:"-"` DbScale int `json:"-"` Sourced bool `json:"-"` // whether is was sourced from a typed source Stats ColumnStats `json:"stats"` ColType *sql.ColumnType `json:"-"` // contains filtered or unexported fields }
Column represents a schemata column
func InferFromStats ¶
InferFromStats using the stats to infer data types
func SyncColumns ¶
SyncColumns syncs two columns together
func (*Column) IsDatetime ¶
IsDatetime returns whether the column is a datetime object
type ColumnStats ¶
type ColumnStats struct { MinLen int MaxLen int MaxDecLen int Min int64 Max int64 NullCnt int64 IntCnt int64 DecCnt int64 BoolCnt int64 StringCnt int64 DateCnt int64 TotalCnt int64 Checksum uint64 }
ColumnStats holds statistics for a column
type Columns ¶
type Columns []Column
Columns represent many columns
func MakeColumns ¶ added in v0.0.5
MakeColumns makes columns from a struct
func NewColumnsFromFields ¶
NewColumnsFromFields creates Columns from fields
func (Columns) FieldMap ¶ added in v0.0.5
FieldMap return the fields map of indexes when `toLower` is true, field keys are lower cased
type Compressor ¶
type Compressor interface { Self() Compressor Compress(io.Reader) io.Reader Decompress(io.Reader) (io.Reader, error) Suffix() string }
Compressor implements differnt kind of compression
func NewCompressor ¶ added in v0.2.3
func NewCompressor(cpType CompressorType) Compressor
type CompressorType ¶
type CompressorType string
CompressorType is an int type for enum for the Compressor Type
const ( // AutoCompressorType is for auto compression AutoCompressorType CompressorType = "AUTO" // NoneCompressorType is for no compression NoneCompressorType CompressorType = "NONE" // ZipCompressorType is for Zip compression ZipCompressorType CompressorType = "ZIP" // GzipCompressorType is for Gzip compression GzipCompressorType CompressorType = "GZIP" // SnappyCompressorType is for Snappy compression SnappyCompressorType CompressorType = "SNAPPY" // ZStandardCompressorType is for ZStandard ZStandardCompressorType CompressorType = "ZSTD" )
type Dataflow ¶
type Dataflow struct { Columns []Column Buffer [][]interface{} StreamCh chan *Datastream Streams []*Datastream Context g.Context Limit uint64 InBytes uint64 OutBytes uint64 Ready bool Inferred bool FsURL string // contains filtered or unexported fields }
Dataflow is a collection of concurrent Datastreams
func MakeDataFlow ¶
func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)
MakeDataFlow create a dataflow from datastreams
func (*Dataflow) AddInBytes ¶ added in v0.3.0
AddInBytes add ingress bytes
func (*Dataflow) AddOutBytes ¶ added in v0.3.0
AddOutBytes add egress bytes
func (*Dataflow) CleanUp ¶ added in v0.3.0
func (df *Dataflow) CleanUp()
CleanUp refers the defer functions
func (*Dataflow) Defer ¶
func (df *Dataflow) Defer(f func())
Defer runs a given function as close of Dataflow
func (*Dataflow) DsTotalBytes ¶ added in v0.3.0
func (*Dataflow) PushStreams ¶
func (df *Dataflow) PushStreams(dss ...*Datastream)
PushStreams waits until each datastream is ready, then adds them to the dataflow. Should be launched as a goroutine
func (*Dataflow) SetColumns ¶
SetColumns sets the columns
func (*Dataflow) SetEmpty ¶
func (df *Dataflow) SetEmpty()
SetEmpty sets all underlying datastreams empty
type Dataset ¶
type Dataset struct { Result *sqlx.Rows Columns Columns Rows [][]interface{} SQL string Duration float64 Sp *StreamProcessor Inferred bool SafeInference bool NoTrace bool }
Dataset is a query returned dataset
func Collect ¶
func Collect(dss ...*Datastream) (data Dataset, err error)
Collect reads from one or more streams and return a dataset
func NewDatasetFromMap ¶
NewDatasetFromMap return a new dataset
func (*Dataset) ColValuesStr ¶
ColValuesStr returns the values of a one column as array or string
func (*Dataset) FirstRow ¶
func (data *Dataset) FirstRow() []interface{}
FirstRow returns the first row
func (*Dataset) FirstVal ¶
func (data *Dataset) FirstVal() interface{}
FirstVal returns the first value from the first row
func (*Dataset) InferColumnTypes ¶
func (data *Dataset) InferColumnTypes()
InferColumnTypes determines the columns types
func (*Dataset) Stream ¶
func (data *Dataset) Stream() *Datastream
Stream returns a datastream of the dataset
type Datastream ¶
type Datastream struct { Columns Columns Rows chan []interface{} Buffer [][]interface{} Count uint64 Context g.Context Ready bool Bytes uint64 Sp *StreamProcessor SafeInference bool NoTrace bool Inferred bool // contains filtered or unexported fields }
Datastream is a stream of rows
func MergeDataflow ¶
func MergeDataflow(df *Dataflow) (ds *Datastream)
MergeDataflow merges the dataflow streams into one
func NewDatastream ¶
func NewDatastream(columns Columns) (ds *Datastream)
NewDatastream return a new datastream
func NewDatastreamContext ¶
func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
NewDatastreamContext return a new datastream
func NewDatastreamIt ¶
func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
NewDatastreamIt with it
func ReadCsvStream ¶
func ReadCsvStream(path string) (ds *Datastream, err error)
ReadCsvStream reads CSV and returns datasream
func (*Datastream) AddBytes ¶
func (ds *Datastream) AddBytes(b int64)
AddBytes add bytes as processed
func (*Datastream) Chunk ¶
func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
Chunk splits the datastream into chunk datastreams (in sequence)
func (*Datastream) Clone ¶ added in v0.0.5
func (ds *Datastream) Clone() *Datastream
Clone returns a new datastream of the same source
func (*Datastream) Collect ¶
func (ds *Datastream) Collect(limit int) (Dataset, error)
Collect reads a stream and return a dataset limit of 0 is unlimited
func (*Datastream) ConsumeCsvReader ¶ added in v0.1.0
func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
ConsumeCsvReader uses the provided reader to stream rows
func (*Datastream) ConsumeJsonReader ¶ added in v0.1.0
func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
ConsumeJsonReader uses the provided reader to stream JSON This will put each JSON rec as one string value so payload can be processed downstream
func (*Datastream) ConsumeXmlReader ¶ added in v0.3.1
func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
ConsumeXmlReader uses the provided reader to stream XML This will put each XML rec as one string value so payload can be processed downstream
func (*Datastream) Defer ¶
func (ds *Datastream) Defer(f func())
Defer runs a given function as close of Datastream
func (*Datastream) GetFields ¶
func (ds *Datastream) GetFields(toLower ...bool) []string
GetFields return the fields of the Data
func (*Datastream) IsClosed ¶
func (ds *Datastream) IsClosed() bool
IsClosed is true is ds is closed
func (*Datastream) IsEmpty ¶
func (ds *Datastream) IsEmpty() bool
IsEmpty returns true is ds.Rows channel as empty
func (*Datastream) Map ¶
func (ds *Datastream) Map(newColumns Columns, transf func([]interface{}) []interface{}) (nDs *Datastream)
Map applies the provided function to every row and returns the result
func (*Datastream) MapParallel ¶
func (ds *Datastream) MapParallel(transf func([]interface{}) []interface{}, numWorkers int) (nDs *Datastream)
MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.
func (*Datastream) NewCsvBufferReader ¶
func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader
NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.
func (*Datastream) NewCsvBufferReaderChnl ¶
func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)
NewCsvBufferReaderChnl provides a channel of readers as the limit is reached data is read in memory, whereas NewCsvReaderChnl does not hold in memory
func (*Datastream) NewCsvBytesChnl ¶
func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)
NewCsvBytesChnl returns a channel yield chunk of bytes of csv
func (*Datastream) NewCsvReader ¶
func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader
NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.
func (*Datastream) NewCsvReaderChnl ¶
func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
NewCsvReaderChnl provides a channel of readers as the limit is reached each channel flows as fast as the consumer consumes
func (*Datastream) NewJsonLinesReaderChnl ¶ added in v0.3.16
func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
NewJsonLinesReaderChnl provides a channel of readers as the limit is reached each channel flows as fast as the consumer consumes
func (*Datastream) NewJsonReaderChnl ¶ added in v0.3.16
func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
NewJsonReaderChnl provides a channel of readers as the limit is reached each channel flows as fast as the consumer consumes
func (*Datastream) Push ¶
func (ds *Datastream) Push(row []interface{})
Push return the fields of the Data
func (*Datastream) Records ¶
func (ds *Datastream) Records() <-chan map[string]interface{}
Records return rows of maps
func (*Datastream) SetConfig ¶
func (ds *Datastream) SetConfig(configMap map[string]string)
SetConfig sets the ds.config values
func (*Datastream) SetEmpty ¶
func (ds *Datastream) SetEmpty()
SetEmpty sets the ds.Rows channel as empty
func (*Datastream) SetFields ¶
func (ds *Datastream) SetFields(fields []string)
SetFields sets the fields/columns of the Datastream
func (*Datastream) Shape ¶
func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
Shape changes the column types as needed, to the provided columns var It will cast the already wrongly casted rows, and not recast the correctly casted rows
func (*Datastream) Split ¶ added in v0.3.0
func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
Split splits the datastream into parallel datastreams
func (*Datastream) Start ¶
func (ds *Datastream) Start() (err error)
Start generates the stream Should cycle the Iter Func until done
func (*Datastream) WaitReady ¶
func (ds *Datastream) WaitReady() error
WaitReady waits until datastream is ready
type GzipCompressor ¶ added in v0.2.3
type GzipCompressor struct { Compressor // contains filtered or unexported fields }
func (*GzipCompressor) Compress ¶ added in v0.2.3
func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader
Compress uses gzip to compress
func (*GzipCompressor) Decompress ¶ added in v0.2.3
Decompress uses gzip to decompress if it is gzip. Otherwise return same reader
func (*GzipCompressor) Suffix ¶ added in v0.2.3
func (cp *GzipCompressor) Suffix() string
type Iterator ¶
type Iterator struct { Row []interface{} Counter uint64 Context g.Context Closed bool // contains filtered or unexported fields }
Iterator is the row provider for a datastream
type NoneCompressor ¶ added in v0.2.3
type NoneCompressor struct { Compressor // contains filtered or unexported fields }
func (*NoneCompressor) Compress ¶ added in v0.2.3
func (cp *NoneCompressor) Compress(reader io.Reader) io.Reader
func (*NoneCompressor) Decompress ¶ added in v0.2.3
func (*NoneCompressor) Suffix ¶ added in v0.2.3
func (cp *NoneCompressor) Suffix() string
type Parquet ¶
type Parquet struct { Path string Columns []Column File *os.File PFile source.ParquetFile Data *Dataset }
Parquet is a parquet object
func (*Parquet) ReadStream ¶
func (p *Parquet) ReadStream() (*Datastream, error)
ReadStream returns the read Parquet stream into a Datastream https://github.com/xitongsys/parquet-go/blob/master/example/read_partial.go
func (*Parquet) WriteStream ¶
func (p *Parquet) WriteStream(ds *Datastream) error
WriteStream to Parquet file from datastream
type SSHClient ¶
type SSHClient struct { Host string Port int User string Password string TgtHost string TgtPort int PrivateKey string Err error // contains filtered or unexported fields }
SSHClient is a client to connect to a ssh server with the main goal of forwarding ports
func (*SSHClient) OpenPortForward ¶
OpenPortForward forwards the port as specified
func (*SSHClient) RunAsProcess ¶
RunAsProcess uses a separate process enables to use public key auth https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key
type SnappyCompressor ¶ added in v0.2.3
type SnappyCompressor struct { Compressor // contains filtered or unexported fields }
func (*SnappyCompressor) Compress ¶ added in v0.2.3
func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader
Compress uses gzip to compress
func (*SnappyCompressor) Decompress ¶ added in v0.2.3
Decompress uses gzip to decompress if it is gzip. Otherwise return same reader
func (*SnappyCompressor) Suffix ¶ added in v0.2.3
func (cp *SnappyCompressor) Suffix() string
type StreamProcessor ¶
type StreamProcessor struct { N uint64 // contains filtered or unexported fields }
StreamProcessor processes rows and values
func NewStreamProcessor ¶
func NewStreamProcessor() *StreamProcessor
NewStreamProcessor returns a new StreamProcessor
func (*StreamProcessor) CastRow ¶
func (sp *StreamProcessor) CastRow(row []interface{}, columns []Column) []interface{}
CastRow casts each value of a row
func (*StreamProcessor) CastToString ¶
func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...string) string
CastToString to string. used for csv writing
func (*StreamProcessor) CastToTime ¶
func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)
CastToTime converts interface to time
func (*StreamProcessor) CastType ¶
func (sp *StreamProcessor) CastType(val interface{}, typ string) interface{}
CastType casts the type of an interface CastType is used to cast the interface place holders?
func (*StreamProcessor) CastVal ¶
func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}
CastVal casts values with stats collection which degrades performance by ~10% go test -benchmem -run='^$ github.com/flarco/dbio/iop' -bench '^BenchmarkProcessVal'
func (*StreamProcessor) CastValWithoutStats ¶
func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ string) interface{}
CastValWithoutStats casts the value without counting stats
func (*StreamProcessor) GetType ¶
func (sp *StreamProcessor) GetType(val interface{}) (typ string)
GetType returns the type of an interface
func (*StreamProcessor) ParseString ¶
func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}
ParseString return an interface string: "varchar" integer: "integer" decimal: "decimal" date: "date" datetime: "timestamp" timestamp: "timestamp" text: "text"
func (*StreamProcessor) ParseTime ¶
func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)
ParseTime parses a date string and returns time.Time
func (*StreamProcessor) ParseVal ¶
func (sp *StreamProcessor) ParseVal(val interface{}) interface{}
ParseVal parses the value into its appropriate type
func (*StreamProcessor) ProcessRow ¶
func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}
ProcessRow processes a row
func (*StreamProcessor) ProcessVal ¶
func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}
ProcessVal processes a value
func (*StreamProcessor) SetConfig ¶
func (sp *StreamProcessor) SetConfig(configMap map[string]string)
SetConfig sets the data.Sp.config values
type ZStandardCompressor ¶ added in v0.2.3
type ZStandardCompressor struct { Compressor // contains filtered or unexported fields }
func (*ZStandardCompressor) Compress ¶ added in v0.2.3
func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader
Compress uses gzip to compress
func (*ZStandardCompressor) Decompress ¶ added in v0.2.3
Decompress uses gzip to decompress if it is gzip. Otherwise return same reader
func (*ZStandardCompressor) Suffix ¶ added in v0.2.3
func (cp *ZStandardCompressor) Suffix() string