Documentation ¶
Index ¶
- Variables
- func AutoDecompress(reader io.Reader) (gReader io.Reader, err error)
- func CleanHeaderRow(header []string) []string
- func CleanName(name string) (newName string)
- func CompareColumns(columns1 Columns, columns2 Columns) (reshape bool, err error)
- func CreateDummyFields(numCols int) (cols []string)
- func DecimalByteArrayToString(dec []byte, precision int, scale int) string
- func EnsureBinDuckDB(version string) (binPath string, err error)
- func FormatValue(val any, column Column, connType dbio.Type) (newVal string)
- func GetISO8601DateMap(t time.Time) map[string]interface{}
- func IsDummy(columns []Column) bool
- func Iso8601ToGoLayout(dateFormat string) (goDateFormat string)
- func MakeDecNumScale(scale int) *big.Rat
- func MakeRowsChan() chan []any
- func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream
- func OpenTunnelSSH(tgtHost string, tgtPort int, tunnelURL, privateKey, passphrase string) (localPort int, err error)
- func Row(vals ...any) []any
- func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)
- func StrIntToBinary(num string, order string, length int, signed bool) string
- func StringToDecimalByteArray(s string, numSca *big.Rat, pType parquet.Type, length int) (decBytes []byte)
- func Unzip(src string, dest string) (nodes []map[string]any, err error)
- type Avro
- type Batch
- func (b *Batch) AddTransform(transf func(row []any) []any)
- func (b *Batch) Close()
- func (b *Batch) ColumnsChanged() bool
- func (b *Batch) Ds() *Datastream
- func (b *Batch) ID() string
- func (b *Batch) IsFirst() bool
- func (b *Batch) Push(row []any)
- func (b *Batch) Shape(tgtColumns Columns, pause ...bool) (err error)
- type BatchReader
- type CSV
- func (c *CSV) InferSchema() error
- func (c *CSV) NewReader() (*io.PipeReader, error)
- func (c *CSV) Read() (data Dataset, err error)
- func (c *CSV) ReadStream() (ds *Datastream, err error)
- func (c *CSV) ReadStreamContext(ctx context.Context) (ds *Datastream, err error)
- func (c *CSV) Sample(n int) (Dataset, error)
- func (c *CSV) SetFields(fields []string)
- func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
- type Column
- func (col *Column) EvaluateConstraint(value any, sp *StreamProcessor) (err error)
- func (col *Column) GetNativeType(t dbio.Type) (nativeType string, err error)
- func (col *Column) GoType() reflect.Type
- func (col *Column) HasNulls() bool
- func (col *Column) HasNullsPlus1() bool
- func (col *Column) IsBinary() bool
- func (col *Column) IsBool() bool
- func (col *Column) IsDate() bool
- func (col *Column) IsDatetime() bool
- func (col *Column) IsDecimal() bool
- func (col *Column) IsFloat() bool
- func (col *Column) IsInteger() bool
- func (col *Column) IsKeyType(keyType KeyType) bool
- func (col *Column) IsNumber() bool
- func (col *Column) IsString() bool
- func (col *Column) IsUnique() bool
- func (col *Column) Key() string
- func (col *Column) SetConstraint()
- func (col *Column) SetLengthPrecisionScale()
- func (col *Column) SetMetadata(key string, value string)
- type ColumnCasing
- type ColumnConstraint
- type ColumnStats
- type ColumnType
- func (ct ColumnType) IsBinary() bool
- func (ct ColumnType) IsBool() bool
- func (ct ColumnType) IsDate() bool
- func (ct ColumnType) IsDatetime() bool
- func (ct ColumnType) IsDecimal() bool
- func (ct ColumnType) IsFloat() bool
- func (ct ColumnType) IsInteger() bool
- func (ct ColumnType) IsJSON() bool
- func (ct ColumnType) IsNumber() bool
- func (ct ColumnType) IsString() bool
- func (ct ColumnType) IsValid() bool
- type Columns
- func (cols Columns) Clone() (newCols Columns)
- func (cols Columns) Coerce(castCols Columns, hasHeader bool) (newCols Columns)
- func (cols Columns) Data(includeParent bool) (fields []string, rows [][]any)
- func (cols Columns) Dataset() Dataset
- func (cols Columns) DbTypes(args ...bool) []string
- func (cols Columns) FieldMap(toLower bool) map[string]int
- func (cols Columns) GetColumn(name string) *Column
- func (cols Columns) GetKeys(keyType KeyType) Columns
- func (cols Columns) GetMissing(newCols ...Column) (missing Columns)
- func (cols Columns) IsDifferent(newCols Columns) bool
- func (cols Columns) IsDummy() bool
- func (cols Columns) IsSimilarTo(otherCols Columns) bool
- func (cols Columns) JSON(includeParent bool) (output string)
- func (cols Columns) Keys() []string
- func (cols Columns) MakeRec(row []any) map[string]any
- func (cols Columns) MakeShaper(tgtColumns Columns) (shaper *Shaper, err error)
- func (cols Columns) Merge(newCols Columns, overwrite bool) (col2 Columns, added schemaChg, changed []schemaChg)
- func (cols Columns) Names(args ...bool) []string
- func (cols Columns) PrettyTable(includeParent bool) (output string)
- func (cols Columns) SetKeys(keyType KeyType, colNames ...string) (err error)
- func (cols Columns) SetMetadata(key, value string, colNames ...string) (err error)
- func (cols Columns) Sourced() (sourced bool)
- func (cols Columns) Types(args ...bool) []string
- func (cols Columns) WithoutMeta() (newCols Columns)
- type Compressor
- type CompressorType
- type ConstraintEvalFunc
- type CsvDuckDb
- type Dataflow
- func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)
- func (df *Dataflow) AddEgressBytes(bytes uint64)
- func (df *Dataflow) BufferDataset() Dataset
- func (df *Dataflow) Bytes() (inBytes, outBytes uint64)
- func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool
- func (df *Dataflow) CleanUp()
- func (df *Dataflow) Close()
- func (df *Dataflow) CloseCurrentBatches()
- func (df *Dataflow) Collect() (data Dataset, err error)
- func (df *Dataflow) Count() (cnt uint64)
- func (df *Dataflow) Defer(f func())
- func (df *Dataflow) DsTotalBytes() (bytes uint64)
- func (df *Dataflow) Err() (err error)
- func (df *Dataflow) IsClosed() bool
- func (df *Dataflow) IsEmpty() bool
- func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)
- func (df *Dataflow) MergeColumns(columns []Column, inferred bool) (processOk bool)
- func (df *Dataflow) Pause(exceptDs ...string) bool
- func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)
- func (df *Dataflow) SetBatchLimit(limit int64)
- func (df *Dataflow) SetConfig(cfg StreamConfig)
- func (df *Dataflow) SetEmpty()
- func (df *Dataflow) SetReady()
- func (df *Dataflow) Size() int
- func (df *Dataflow) StreamConfig() (cfg StreamConfig)
- func (df *Dataflow) SyncColumns()
- func (df *Dataflow) SyncStats()
- func (df *Dataflow) Unpause(exceptDs ...string)
- func (df *Dataflow) WaitClosed()
- func (df *Dataflow) WaitReady() error
- type Dataset
- func (data *Dataset) AddColumns(newCols Columns, overwrite bool) (added Columns)
- func (data *Dataset) Append(row ...[]any)
- func (data *Dataset) ColValues(col int) []interface{}
- func (data *Dataset) ColValuesStr(col int) []string
- func (data *Dataset) FirstRow() []interface{}
- func (data *Dataset) FirstVal() interface{}
- func (data *Dataset) GetFields(lower ...bool) []string
- func (data *Dataset) InferColumnTypes()
- func (data *Dataset) Pick(colNames ...string) (nData Dataset)
- func (data *Dataset) PrettyTable(fields ...string) (output string)
- func (data *Dataset) Print(limit int)
- func (data *Dataset) Records(lower ...bool) []map[string]interface{}
- func (data *Dataset) RecordsCasted(lower ...bool) []map[string]interface{}
- func (data *Dataset) RecordsString(lower ...bool) []map[string]string
- func (data *Dataset) SetFields(fields []string)
- func (data *Dataset) Sort(args ...any)
- func (data *Dataset) Stream(Props ...map[string]string) *Datastream
- func (data *Dataset) ToJSONMap() map[string]interface{}
- func (data *Dataset) WriteCsv(dest io.Writer) (tbw int, err error)
- type Datastream
- func MergeDataflow(df *Dataflow) (dsN *Datastream)
- func NewDatastream(columns Columns) (ds *Datastream)
- func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
- func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
- func ReadCsvStream(path string) (ds *Datastream, err error)
- func (ds *Datastream) AddBytes(b int64)
- func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)
- func (ds *Datastream) CastRowToString(row []any) []string
- func (ds *Datastream) CastRowToStringSafe(row []any) []string
- func (ds *Datastream) CastToStringSafeMask(row []any) []string
- func (ds *Datastream) ChangeColumn(i int, newType ColumnType)
- func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
- func (ds *Datastream) Close()
- func (ds *Datastream) Collect(limit int) (Dataset, error)
- func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)
- func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeCsvReaderChl(readerChn chan *ReaderReady) (err error)
- func (ds *Datastream) ConsumeCsvReaderDuckDb(uri string, sc FileStreamConfig) (err error)
- func (ds *Datastream) ConsumeDeltaReader(uri string, sc FileStreamConfig) (err error)
- func (ds *Datastream) ConsumeExcelReader(reader io.Reader, props map[string]string) (err error)
- func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[string]string) (err error)
- func (ds *Datastream) ConsumeIcebergReader(uri string, sc FileStreamConfig) (err error)
- func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeJsonReaderChl(readerChn chan *ReaderReady, isXML bool) (err error)
- func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeParquetReaderDuckDb(uri string, sc FileStreamConfig) (err error)
- func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error)
- func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)
- func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
- func (ds *Datastream) Defer(f func())
- func (ds *Datastream) Df() *Dataflow
- func (ds *Datastream) Err() (err error)
- func (ds *Datastream) GetConfig() (configMap map[string]string)
- func (ds *Datastream) GetFields(args ...bool) []string
- func (ds *Datastream) IsClosed() bool
- func (ds *Datastream) LatestBatch() *Batch
- func (ds *Datastream) Limited(limit ...int) bool
- func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)
- func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)
- func (ds *Datastream) NewBatch(columns Columns) *Batch
- func (ds *Datastream) NewCsvBufferReader(sc StreamConfig) *bytes.Reader
- func (ds *Datastream) NewCsvBufferReaderChnl(sc StreamConfig) (readerChn chan *bytes.Reader)
- func (ds *Datastream) NewCsvBytesChnl(sc StreamConfig) (dataChn chan *[]byte)
- func (ds *Datastream) NewCsvReader(sc StreamConfig) *io.PipeReader
- func (ds *Datastream) NewCsvReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) NewExcelReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator
- func (ds *Datastream) NewJsonLinesReaderChnl(sc StreamConfig) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewJsonReaderChnl(sc StreamConfig) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewParquetArrowReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) NewParquetReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) Pause()
- func (ds *Datastream) Push(row []any)
- func (ds *Datastream) Records() <-chan map[string]any
- func (ds *Datastream) Rows() chan []any
- func (ds *Datastream) SetConfig(configMap map[string]string)
- func (ds *Datastream) SetEmpty()
- func (ds *Datastream) SetFields(fields []string)
- func (ds *Datastream) SetFileURI()
- func (ds *Datastream) SetIterator(it *Iterator)
- func (ds *Datastream) SetMetadata(jsonStr string)
- func (ds *Datastream) SetReady()
- func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
- func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
- func (ds *Datastream) Start() (err error)
- func (ds *Datastream) TryPause() bool
- func (ds *Datastream) Unpause()
- func (ds *Datastream) WaitClosed()
- func (ds *Datastream) WaitReady() error
- type DeltaReader
- type DuckDb
- func (duck *DuckDb) AddExtension(extension string)
- func (duck *DuckDb) Close() error
- func (duck *DuckDb) Describe(query string) (columns Columns, err error)
- func (duck *DuckDb) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (duck *DuckDb) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (duck *DuckDb) ExecMultiContext(ctx context.Context, sqls ...string) (result sql.Result, err error)
- func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPath string, options DuckDbCopyOptions) (sql string, err error)
- func (duck *DuckDb) GetProp(key string) string
- func (duck *DuckDb) GetScannerFunc(format dbio.FileType) (scanFunc string)
- func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStreamConfig) (sql string)
- func (duck *DuckDb) Open(timeOut ...int) (err error)
- func (duck *DuckDb) PrepareFsSecretAndURI(uri string) string
- func (duck *DuckDb) Props() map[string]string
- func (duck *DuckDb) Query(sql string, options ...map[string]interface{}) (data Dataset, err error)
- func (duck *DuckDb) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (data Dataset, err error)
- func (duck *DuckDb) Quote(col string) (qName string)
- func (duck *DuckDb) SetProp(key string, value string)
- func (duck *DuckDb) Stream(sql string, options ...map[string]interface{}) (ds *Datastream, err error)
- func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *Datastream, err error)
- func (duck *DuckDb) SubmitSQL(sql string, showChanges bool) (err error)
- type DuckDbCopyOptions
- type Excel
- func (xls *Excel) GetDataset(sheet string) (data Dataset)
- func (xls *Excel) GetDatasetFromRange(sheet, cellRange string) (data Dataset, err error)
- func (xls *Excel) RefreshSheets() (err error)
- func (xls *Excel) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
- func (xls *Excel) WriteToFile(path string) (err error)
- func (xls *Excel) WriteToWriter(w io.Writer) (err error)
- type FileStreamConfig
- type GoogleSheet
- func (ggs *GoogleSheet) DeleteSheet(shtName string) (err error)
- func (ggs *GoogleSheet) GetDataset(shtName string) (data Dataset, err error)
- func (ggs *GoogleSheet) GetDatasetFromRange(shtName, cellRange string) (data Dataset, err error)
- func (ggs *GoogleSheet) RefreshSheets() (err error)
- func (ggs *GoogleSheet) URL() string
- func (ggs *GoogleSheet) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
- type GzipCompressor
- type IcebergReader
- type Iterator
- type KeyType
- type KeyValue
- type Metadata
- type NoneCompressor
- type Parquet
- type ParquetArrowDumper
- type ParquetArrowReader
- type ParquetArrowWriter
- type ParquetDuckDb
- type ParquetWriter
- type ReaderReady
- type RecNode
- func (rn *RecNode) Compression() compress.Codec
- func (rn *RecNode) Encoding() encoding.Encoding
- func (rn *RecNode) Fields() []parquet.Field
- func (rn *RecNode) GoType() reflect.Type
- func (rn *RecNode) ID() int
- func (rn *RecNode) Leaf() bool
- func (rn *RecNode) Optional() bool
- func (rn *RecNode) Repeated() bool
- func (rn *RecNode) Required() bool
- func (rn *RecNode) String() string
- func (rn *RecNode) Type() parquet.Type
- type Record
- type SAS
- type SSHClient
- func (s *SSHClient) Close()
- func (s *SSHClient) Connect() (err error)
- func (s *SSHClient) GetOutput() (stdout string, stderr string)
- func (s *SSHClient) OpenPortForward() (localPort int, err error)
- func (s *SSHClient) RunAsProcess() (localPort int, err error)
- func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)
- type Shaper
- type SnappyCompressor
- type StreamConfig
- type StreamProcessor
- func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}
- func (sp *StreamProcessor) CastToBool(i interface{}) (b bool, err error)
- func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string
- func (sp *StreamProcessor) CastToStringSafe(i int, val interface{}, valType ...ColumnType) string
- func (sp *StreamProcessor) CastToStringSafeMask(i int, val interface{}, valType ...ColumnType) string
- func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)
- func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}
- func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}
- func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}
- func (sp *StreamProcessor) ColStats() map[int]*ColumnStats
- func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)
- func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}
- func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)
- func (sp *StreamProcessor) ParseVal(val interface{}) interface{}
- func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}
- func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}
- func (sp *StreamProcessor) ResetConfig()
- func (sp *StreamProcessor) SetConfig(configMap map[string]string)
- type Transform
- type TransformList
- type Transformers
- type ZStandardCompressor
Constants ¶
This section is empty.
Variables ¶
var (
	// RemoveTrailingDecZeros removes the trailing zeros in CastToString
	RemoveTrailingDecZeros = false

	SampleSize = 900
)
var (
	DuckDbVersion     = "1.1.1"
	DuckDbUseTempFile = false
)
var (
	TransformDecodeLatin1 = Transform{Name: "decode_latin1", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_1, val); return newVal, err }}
	TransformDecodeLatin5 = Transform{Name: "decode_latin5", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_5, val); return newVal, err }}
	TransformDecodeLatin9 = Transform{Name: "decode_latin9", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_15, val); return newVal, err }}
	TransformDecodeUtf8 = Transform{Name: "decode_utf8", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF8, val); return newVal, err }}
	TransformDecodeUtf8Bom = Transform{Name: "decode_utf8_bom", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF8BOM, val); return newVal, err }}
	TransformDecodeUtf16 = Transform{Name: "decode_utf16", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF16, val); return newVal, err }}
	TransformDecodeWindows1250 = Transform{Name: "decode_windows1250", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeWindows1250, val); return newVal, err }}
	TransformDecodeWindows1252 = Transform{Name: "decode_windows1252", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeWindows1252, val); return newVal, err }}
	TransformDuckdbListToText = Transform{Name: "duckdb_list_to_text", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.duckDbListAsText(val), nil }}
	TransformEncodeLatin1 = Transform{Name: "encode_latin1", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_1, val); return newVal, err }}
	TransformEncodeLatin5 = Transform{Name: "encode_latin5", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_5, val); return newVal, err }}
	TransformEncodeLatin9 = Transform{Name: "encode_latin9", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_15, val); return newVal, err }}
	TransformEncodeUtf8 = Transform{Name: "encode_utf8", FuncString: func(sp *StreamProcessor, val string) (string, error) {
		return fmt.Sprintf("%q", val), nil
		newVal, _, err := transform.String(sp.transformers.EncodeUTF8, val)
		return newVal, err
	}}
	TransformEncodeUtf8Bom = Transform{Name: "encode_utf8_bom", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeUTF8BOM, val); return newVal, err }}
	TransformEncodeUtf16 = Transform{Name: "encode_utf16", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeUTF16, val); return newVal, err }}
	TransformEncodeWindows1250 = Transform{Name: "encode_windows1250", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeWindows1250, val); return newVal, err }}
	TransformEncodeWindows1252 = Transform{Name: "encode_windows1252", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeWindows1252, val); return newVal, err }}
	TransformHashMd5 = Transform{Name: "hash_md5", FuncString: func(sp *StreamProcessor, val string) (string, error) { return g.MD5(val), nil }}
	TransformHashSha256 = Transform{Name: "hash_sha256", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.SHA256(val), nil }}
	TransformHashSha512 = Transform{Name: "hash_sha512", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.SHA512(val), nil }}
	TransformParseBit = Transform{Name: "parse_bit", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseBit(sp, val) }}
	TransformParseFix = Transform{Name: "parse_fix", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseFIX(sp, val) }}
	TransformParseUuid = Transform{Name: "parse_uuid", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseUUID(sp, val) }}
	TransformParseMsUuid = Transform{Name: "parse_ms_uuid", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseMsUUID(sp, val) }}
	TransformReplace0x00 = Transform{Name: "replace_0x00", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.Replace0x00(sp, val) }}
	TransformReplaceAccents = Transform{Name: "replace_accents", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.Accent, val); return newVal, err }}
	TransformReplaceNonPrintable = Transform{Name: "replace_non_printable", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ReplaceNonPrintable(val), nil }}
	TransformTrimSpace = Transform{Name: "trim_space", FuncString: func(sp *StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil }}
	// used as lookup, cannot return null since is not pointer
	TransformEmptyAsNull = Transform{Name: "empty_as_null", FuncString: func(sp *StreamProcessor, val string) (string, error) { return val, nil }}
	TransformSetTimezone = Transform{
		Name: "set_timezone",
		// contains filtered or unexported fields
	}
)
var AllCompressorType = []struct {
	Value  CompressorType
	TSName string
}{
	{AutoCompressorType, "AutoCompressorType"},
	{NoneCompressorType, "NoneCompressorType"},
	{ZipCompressorType, "ZipCompressorType"},
	{GzipCompressorType, "GzipCompressorType"},
	{SnappyCompressorType, "SnappyCompressorType"},
	{ZStandardCompressorType, "ZStandardCompressorType"},
}
var KeyTypes = []KeyType{AggregateKey, ClusterKey, DuplicateKey, HashKey, IndexKey, PartitionKey, PrimaryKey, SortKey, UniqueKey, UpdateKey}
var Transforms transformsNS
var TransformsMap = map[string]Transform{}
Functions ¶
func AutoDecompress ¶
AutoDecompress auto-detects the compression and returns a decompressing reader. If no compression is detected, it returns the same reader.
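A minimal usage sketch (the file path, the helper name readMaybeCompressed, the iop import alias, and the "io"/"os" imports are assumptions for illustration):

func readMaybeCompressed(path string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// if no known compression is detected, AutoDecompress returns f unchanged
	r, err := iop.AutoDecompress(f)
	if err != nil {
		return nil, err
	}
	return io.ReadAll(r)
}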
func CleanHeaderRow ¶
CleanHeaderRow cleans the header row from incompatible characters
func CompareColumns ¶
CompareColumns compares two sets of columns to see if they are similar
func CreateDummyFields ¶
CreateDummyFields creates dummy columns for CSVs with no header row
func DecimalByteArrayToString ¶ added in v1.1.6
DecimalByteArrayToString converts bytes to decimal string from https://github.com/xitongsys/parquet-go/blob/8ca067b2bd324788a77bf61d4e1ef9a5f8b4b1d2/types/converter.go#L131
func EnsureBinDuckDB ¶ added in v1.2.16
EnsureBinDuckDB ensures the duckdb binary exists; if missing, it downloads and uses it
func FormatValue ¶ added in v1.2.21
FormatValue formats the value as a SQL expression (adds quotes)
func GetISO8601DateMap ¶
GetISO8601DateMap returns a map of date parts for string formatting
func Iso8601ToGoLayout ¶
https://www.w3.org/QA/Tips/iso-date https://www.w3.org/TR/NOTE-datetime https://www.iso.org/iso-8601-date-and-time-format.html
func MakeDecNumScale ¶ added in v1.1.6
func MakeRowsChan ¶
func MakeRowsChan() chan []any
MakeRowsChan returns a buffered channel with default size
func NewJSONStream ¶
func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream
func OpenTunnelSSH ¶ added in v1.2.15
func ScanCarrRet ¶
ScanCarrRet removes \r runes that are not immediately followed by \n
func StrIntToBinary ¶ added in v1.1.6
order is LittleEndian or BigEndian; length is the number of bytes
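A hedged sketch of a call, assuming the order tokens are the literal strings named above ("LittleEndian" / "BigEndian") and that "fmt" and the iop alias are imported:

// encode the decimal string "1234" as a 4-byte, big-endian, signed integer
b := iop.StrIntToBinary("1234", "BigEndian", 4, true)
fmt.Printf("% x\n", []byte(b)) // 1234 = 0x04d2, so the expected bytes are: 00 00 04 d2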
func StringToDecimalByteArray ¶ added in v1.1.6
func StringToDecimalByteArray(s string, numSca *big.Rat, pType parquet.Type, length int) (decBytes []byte)
StringToDecimalByteArray converts a string decimal to bytes, improvised from https://github.com/xitongsys/parquet-go/blob/8ca067b2bd324788a77bf61d4e1ef9a5f8b4b1d2/types/types.go#L81. This function is costly and slows writes dramatically. TODO: find ways to optimize, if possible.
Types ¶
type Avro ¶
type Avro struct {
	Path   string
	Reader *goavro.OCFReader
	Data   *Dataset
	// contains filtered or unexported fields
}
Avro is an Avro object
func NewAvroStream ¶
func NewAvroStream(reader io.ReadSeeker, columns Columns) (a *Avro, err error)
type Batch ¶
type Batch struct {
	Columns  Columns
	Rows     chan []any
	Previous *Batch
	Count    int64
	Limit    int64
	// contains filtered or unexported fields
}
func (*Batch) AddTransform ¶
func (*Batch) ColumnsChanged ¶
func (*Batch) Ds ¶
func (b *Batch) Ds() *Datastream
type BatchReader ¶
type CSV ¶
type CSV struct {
	Path            string
	NoHeader        bool
	Delimiter       rune
	Escape          string
	Quote           string
	FieldsPerRecord int
	Columns         []Column
	File            *os.File
	Data            Dataset
	Reader          io.Reader
	Config          map[string]string
	NoDebug         bool
	// contains filtered or unexported fields
}
CSV is a CSV object
func (*CSV) NewReader ¶
func (c *CSV) NewReader() (*io.PipeReader, error)
NewReader creates a Reader
func (*CSV) ReadStream ¶
func (c *CSV) ReadStream() (ds *Datastream, err error)
ReadStream returns the read CSV stream with Line 1 as header
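A minimal sketch of reading a local CSV file into a Dataset (the path, the 100-row preview limit, the helper name, and the iop import alias are illustrative assumptions):

func previewCSV(path string) (iop.Dataset, error) {
	c := iop.CSV{Path: path} // NoHeader defaults to false, so line 1 is treated as the header
	ds, err := c.ReadStream()
	if err != nil {
		return iop.Dataset{}, err
	}
	return ds.Collect(100) // Collect with a row limit; 0 would read the entire stream
}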
func (*CSV) ReadStreamContext ¶ added in v1.1.15
func (c *CSV) ReadStreamContext(ctx context.Context) (ds *Datastream, err error)
ReadStreamContext returns the read CSV stream with Line 1 as header
func (*CSV) WriteStream ¶
func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
WriteStream writes a datastream to a CSV file
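A sketch that chains ReadStream and WriteStream to copy a CSV file (the paths and the helper name copyCSV are assumptions):

func copyCSV(srcPath, tgtPath string) (uint64, error) {
	src := iop.CSV{Path: srcPath}
	ds, err := src.ReadStream()
	if err != nil {
		return 0, err
	}
	tgt := iop.CSV{Path: tgtPath}
	return tgt.WriteStream(ds) // returns the number of rows written
}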
type Column ¶
type Column struct {
	Position    int               `json:"position"`
	Name        string            `json:"name"`
	Type        ColumnType        `json:"type"`
	DbType      string            `json:"db_type,omitempty"`
	DbPrecision int               `json:"db_precision,omitempty"`
	DbScale     int               `json:"db_scale,omitempty"`
	Sourced     bool              `json:"-"` // whether col was sourced/inferred from a typed source
	Stats       ColumnStats       `json:"stats,omitempty"`
	Table       string            `json:"table,omitempty"`
	Schema      string            `json:"schema,omitempty"`
	Database    string            `json:"database,omitempty"`
	Description string            `json:"description,omitempty"`
	FileURI     string            `json:"file_uri,omitempty"`
	Constraint  *ColumnConstraint `json:"constraint,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}
Column represents a schemata column
func InferFromStats ¶
InferFromStats uses the stats to infer data types
func (*Column) EvaluateConstraint ¶ added in v1.2.16
func (col *Column) EvaluateConstraint(value any, sp *StreamProcessor) (err error)
EvaluateConstraint evaluates a value against the constraint function
func (*Column) GetNativeType ¶ added in v1.2.16
GetNativeType returns the native column type from the generic type
func (*Column) HasNullsPlus1 ¶ added in v1.1.7
HasNullsPlus1 denotes when a column is all nulls except for one non-null value
func (*Column) IsDatetime ¶
IsDatetime returns whether the column is a datetime object
func (*Column) SetConstraint ¶ added in v1.2.16
func (col *Column) SetConstraint()
func (*Column) SetLengthPrecisionScale ¶
func (col *Column) SetLengthPrecisionScale()
SetLengthPrecisionScale parses the length, precision and scale
func (*Column) SetMetadata ¶
type ColumnCasing ¶ added in v1.2.22
type ColumnCasing string
ColumnCasing is the casing method to use
const (
	SourceColumnCasing ColumnCasing = "source" // keeps source column name casing. The default.
	TargetColumnCasing ColumnCasing = "target" // converts casing according to target database. Lower-case for files.
	SnakeColumnCasing  ColumnCasing = "snake"  // converts snake casing according to target database. Lower-case for files.
	UpperColumnCasing  ColumnCasing = "upper"  // make it upper case
	LowerColumnCasing  ColumnCasing = "lower"  // make it lower case
)
func (*ColumnCasing) Apply ¶ added in v1.2.22
func (cc *ColumnCasing) Apply(name string, tgtConnType dbio.Type) string
Apply applies the column casing to the provided name. If cc is nil or SourceColumnCasing, it returns the original value.
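A sketch of applying a casing policy to a column name; the dbio.TypeDbPostgres value is assumed purely as an example target type, and the exact output depends on the target:

cc := iop.SnakeColumnCasing
// convert the name according to the target connection type
name := cc.Apply("OrderTotalUSD", dbio.TypeDbPostgres)
// with snake casing and a lower-casing target, a value like "order_total_usd" would be expected
_ = name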
func (*ColumnCasing) Equals ¶ added in v1.2.22
func (cc *ColumnCasing) Equals(val ColumnCasing) bool
Equals evaluates equality for column casing (pointer safe)
func (*ColumnCasing) IsEmpty ¶ added in v1.2.22
func (cc *ColumnCasing) IsEmpty() bool
IsEmpty returns true if nil or blank
type ColumnConstraint ¶ added in v1.2.16
type ColumnConstraint struct {
	Expression string             `json:"expression,omitempty"`
	Errors     []string           `json:"errors,omitempty"`
	FailCnt    uint64             `json:"fail_cnt,omitempty"`
	EvalFunc   ConstraintEvalFunc `json:"-"`
}
type ColumnStats ¶
type ColumnStats struct {
	MinLen       int    `json:"min_len,omitempty"`
	MaxLen       int    `json:"max_len,omitempty"`
	MaxDecLen    int    `json:"max_dec_len,omitempty"`
	Min          int64  `json:"min"`
	Max          int64  `json:"max"`
	NullCnt      int64  `json:"null_cnt"`
	IntCnt       int64  `json:"int_cnt,omitempty"`
	DecCnt       int64  `json:"dec_cnt,omitempty"`
	BoolCnt      int64  `json:"bool_cnt,omitempty"`
	JsonCnt      int64  `json:"json_cnt,omitempty"`
	StringCnt    int64  `json:"string_cnt,omitempty"`
	DateCnt      int64  `json:"date_cnt,omitempty"`
	DateTimeCnt  int64  `json:"datetime_cnt,omitempty"`
	DateTimeZCnt int64  `json:"datetimez_cnt,omitempty"`
	TotalCnt     int64  `json:"total_cnt"`
	UniqCnt      int64  `json:"uniq_cnt"`
	Checksum     uint64 `json:"checksum"`
}
ColumnStats holds statistics for a column
func (*ColumnStats) DistinctPercent ¶
func (cs *ColumnStats) DistinctPercent() float64
func (*ColumnStats) DuplicateCount ¶
func (cs *ColumnStats) DuplicateCount() int64
func (*ColumnStats) DuplicatePercent ¶
func (cs *ColumnStats) DuplicatePercent() float64
type ColumnType ¶
type ColumnType string
const (
	BigIntType     ColumnType = "bigint"
	BinaryType     ColumnType = "binary"
	BoolType       ColumnType = "bool"
	DateType       ColumnType = "date"
	DatetimeType   ColumnType = "datetime"
	DecimalType    ColumnType = "decimal"
	IntegerType    ColumnType = "integer"
	JsonType       ColumnType = "json"
	SmallIntType   ColumnType = "smallint"
	StringType     ColumnType = "string"
	TextType       ColumnType = "text"
	TimestampType  ColumnType = "timestamp"
	TimestampzType ColumnType = "timestampz"
	FloatType      ColumnType = "float"
	TimeType       ColumnType = "time"
	TimezType      ColumnType = "timez"
)
func NativeTypeToGeneral ¶ added in v1.2.19
func NativeTypeToGeneral(name, dbType string, connType dbio.Type) (colType ColumnType)
func (ColumnType) IsBinary ¶ added in v1.2.3
func (ct ColumnType) IsBinary() bool
IsBinary returns whether the column is a binary
func (ColumnType) IsBool ¶
func (ct ColumnType) IsBool() bool
IsBool returns whether the column is a boolean
func (ColumnType) IsDate ¶ added in v1.1.8
func (ct ColumnType) IsDate() bool
IsDate returns whether the column is a date object
func (ColumnType) IsDatetime ¶
func (ct ColumnType) IsDatetime() bool
IsDatetime returns whether the column is a datetime object
func (ColumnType) IsDecimal ¶
func (ct ColumnType) IsDecimal() bool
IsDecimal returns whether the column is a decimal
func (ColumnType) IsFloat ¶ added in v1.1.14
func (ct ColumnType) IsFloat() bool
IsFloat returns whether the column is a float
func (ColumnType) IsInteger ¶
func (ct ColumnType) IsInteger() bool
IsInteger returns whether the column is an integer
func (ColumnType) IsJSON ¶
func (ct ColumnType) IsJSON() bool
IsJSON returns whether the column is a json
func (ColumnType) IsNumber ¶
func (ct ColumnType) IsNumber() bool
IsNumber returns whether the column is a decimal or an integer
func (ColumnType) IsString ¶
func (ct ColumnType) IsString() bool
IsString returns whether the column is a string
func (ColumnType) IsValid ¶
func (ct ColumnType) IsValid() bool
IsValid returns whether the column has a valid type
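A quick sketch of the ColumnType predicates using the constants defined above (iop is the assumed import alias):

ct := iop.DecimalType
_ = ct.IsValid()   // true
_ = ct.IsDecimal() // true
_ = ct.IsNumber()  // true, since decimals and integers count as numbers
_ = ct.IsString()  // false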
type Columns ¶
type Columns []Column
Columns represent many columns
func NewColumns ¶
NewColumns creates Columns from the provided columns
func NewColumnsFromFields ¶
NewColumnsFromFields creates Columns from fields
func (Columns) DbTypes ¶
DbTypes returns the column names/db types. args -> (lower bool, cleanUp bool)
func (Columns) FieldMap ¶
FieldMap returns the map of field indexes; when `toLower` is true, field keys are lower-cased
func (Columns) GetMissing ¶ added in v1.1.8
GetMissing returns the missing columns from newCols
func (Columns) IsDifferent ¶
func (Columns) IsSimilarTo ¶
IsSimilarTo returns true if it has the same number of columns and contains the same columns, though they may be in a different order
func (Columns) MakeShaper ¶
func (Columns) PrettyTable ¶ added in v1.1.8
PrettyTable returns a text pretty table
func (Columns) SetMetadata ¶ added in v1.2.15
SetMetadata sets metadata for columns
func (Columns) WithoutMeta ¶ added in v1.2.2
WithoutMeta returns the columns without metadata columns
type Compressor ¶
type Compressor interface {
	Self() Compressor
	Compress(io.Reader) io.Reader
	Decompress(io.Reader) (io.Reader, error)
	Suffix() string
}
Compressor implements different kinds of compression
func NewCompressor ¶
func NewCompressor(cpType CompressorType) Compressor
type CompressorType ¶
type CompressorType string
CompressorType is a string enum for the compressor type
const (
	// AutoCompressorType is for auto compression
	AutoCompressorType CompressorType = "auto"
	// NoneCompressorType is for no compression
	NoneCompressorType CompressorType = "none"
	// ZipCompressorType is for Zip compression
	ZipCompressorType CompressorType = "zip"
	// GzipCompressorType is for Gzip compression
	GzipCompressorType CompressorType = "gzip"
	// SnappyCompressorType is for Snappy compression
	SnappyCompressorType CompressorType = "snappy"
	// ZStandardCompressorType is for ZStandard
	ZStandardCompressorType CompressorType = "zstd"
)
func CompressorTypePtr ¶
func CompressorTypePtr(v CompressorType) *CompressorType
CompressorTypePtr returns a pointer to the CompressorType value passed in.
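A sketch of compressing an in-memory payload with a gzip Compressor obtained from NewCompressor (the helper name and the "bytes"/"io" imports are assumptions):

func gzipBytes(data []byte) ([]byte, error) {
	cp := iop.NewCompressor(iop.GzipCompressorType)
	r := cp.Compress(bytes.NewReader(data)) // wraps the reader with gzip compression
	// cp.Suffix() can be used when naming output files
	return io.ReadAll(r)
}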
type ConstraintEvalFunc ¶ added in v1.2.16
type CsvDuckDb ¶ added in v1.2.19
func NewCsvReaderDuckDb ¶ added in v1.2.19
func NewCsvReaderDuckDb(uri string, sc *StreamConfig, props ...string) (*CsvDuckDb, error)
func (*CsvDuckDb) MakeQuery ¶ added in v1.2.19
func (r *CsvDuckDb) MakeQuery(fsc FileStreamConfig) string
type Dataflow ¶
type Dataflow struct {
	Columns         Columns
	Buffer          [][]interface{}
	StreamCh        chan *Datastream
	Streams         []*Datastream
	Context         *g.Context
	Limit           uint64
	EgressBytes     uint64
	Ready           bool
	Inferred        bool
	FsURL           string
	OnColumnChanged func(col Column) error
	OnColumnAdded   func(col Column) error
	StreamMap       map[string]*Datastream
	SchemaVersion   int // for column type version
	// contains filtered or unexported fields
}
Dataflow is a collection of concurrent Datastreams
func MakeDataFlow ¶
func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)
MakeDataFlow creates a dataflow from datastreams
func NewDataflowContext ¶ added in v1.1.15
func (*Dataflow) AddColumns ¶
func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)
AddColumns adds the provided columns to the dataflow
func (*Dataflow) AddEgressBytes ¶ added in v1.2.2
AddEgressBytes adds egress bytes
func (*Dataflow) BufferDataset ¶ added in v1.2.25
BufferDataset returns the buffer as a dataset
func (*Dataflow) ChangeColumn ¶
func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool
ChangeColumn applies a column type change across the dataflow's streams
func (*Dataflow) CloseCurrentBatches ¶
func (df *Dataflow) CloseCurrentBatches()
func (*Dataflow) Defer ¶
func (df *Dataflow) Defer(f func())
Defer runs a given function as close of Dataflow
func (*Dataflow) DsTotalBytes ¶
func (*Dataflow) MakeStreamCh ¶
func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)
MakeStreamCh determines whether to merge all the streams into one or keep them separate. If data is small per stream, it's best to merge. For example, BigQuery has limits on the number of operations that can be called within a time limit.
func (*Dataflow) MergeColumns ¶ added in v1.1.15
MergeColumns merges the provided columns into the dataflow columns
func (*Dataflow) PushStreamChan ¶
func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)
func (*Dataflow) SetBatchLimit ¶ added in v1.2.11
SetBatchLimit sets the ds.Batch.Limit
func (*Dataflow) SetConfig ¶ added in v1.1.15
func (df *Dataflow) SetConfig(cfg StreamConfig)
SetConfig sets the Sp config
func (*Dataflow) SetEmpty ¶
func (df *Dataflow) SetEmpty()
SetEmpty sets all underlying datastreams empty
func (*Dataflow) StreamConfig ¶ added in v1.2.15
func (df *Dataflow) StreamConfig() (cfg StreamConfig)
StreamConfig gets the first Sp config
func (*Dataflow) SyncColumns ¶
func (df *Dataflow) SyncColumns()
SyncColumns is a workaround to sync the ds.Columns to the df.Columns
func (*Dataflow) SyncStats ¶
func (df *Dataflow) SyncStats()
SyncStats syncs the stream processor stats, aggregated into the df.Columns
func (*Dataflow) WaitClosed ¶
func (df *Dataflow) WaitClosed()
WaitClosed waits until the dataflow is closed; a hack to make sure all streams are pushed
type Dataset ¶
type Dataset struct {
	Result        *sqlx.Rows
	Columns       Columns
	Rows          [][]interface{}
	SQL           string
	Duration      float64
	Sp            *StreamProcessor
	Inferred      bool
	SafeInference bool
	NoDebug       bool
}
Dataset is a query returned dataset
func NewDatasetFromMap ¶
NewDatasetFromMap returns a new dataset
func NewExcelDataset ¶ added in v1.2.2
func (*Dataset) AddColumns ¶
AddColumns adds the provided columns to the dataset
func (*Dataset) ColValuesStr ¶
ColValuesStr returns the values of one column as an array of strings
func (*Dataset) FirstRow ¶
func (data *Dataset) FirstRow() []interface{}
FirstRow returns the first row
func (*Dataset) FirstVal ¶
func (data *Dataset) FirstVal() interface{}
FirstVal returns the first value from the first row
func (*Dataset) InferColumnTypes ¶
func (data *Dataset) InferColumnTypes()
InferColumnTypes determines the columns types
func (*Dataset) PrettyTable ¶ added in v1.1.8
func (*Dataset) RecordsCasted ¶
RecordsCasted returns rows of maps of casted values
func (*Dataset) RecordsString ¶
RecordsString returns rows of maps of string values
func (*Dataset) Sort ¶
Sort sorts by cols. Example: `data.Sort(0, 2, 3, false)` will sort col0, col2, col3 descending. Example: `data.Sort(0, 2, true)` will sort col0, col2 ascending.
func (*Dataset) Stream ¶
func (data *Dataset) Stream(Props ...map[string]string) *Datastream
Stream returns a datastream of the dataset
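A sketch that builds a small in-memory Dataset from Columns and streams it; the column names, types, and rows are illustrative, and the iop/"fmt" imports are assumed:

cols := iop.Columns{
	{Name: "id", Type: iop.BigIntType},
	{Name: "name", Type: iop.StringType},
}
data := cols.Dataset()                              // empty dataset with those columns
data.Append(iop.Row(1, "alice"), iop.Row(2, "bob")) // Row builds a []any
ds := data.Stream()                                 // consume the buffered rows as a Datastream
for row := range ds.Rows() {
	// if no rows flow, the stream may still need ds.Start() first (see Datastream.Start)
	fmt.Println(row)
}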
type Datastream ¶
type Datastream struct {
	Columns       Columns
	Buffer        [][]any
	BatchChan     chan *Batch
	Batches       []*Batch
	CurrentBatch  *Batch
	Count         uint64
	Context       *g.Context
	Ready         bool
	Bytes         atomic.Uint64
	Sp            *StreamProcessor
	SafeInference bool
	NoDebug       bool
	Inferred      bool
	ID            string
	Metadata      Metadata // map of column name to metadata type
	// contains filtered or unexported fields
}
Datastream is a stream of rows
func MergeDataflow ¶
func MergeDataflow(df *Dataflow) (dsN *Datastream)
MergeDataflow merges the dataflow streams into one
func NewDatastream ¶
func NewDatastream(columns Columns) (ds *Datastream)
NewDatastream return a new datastream
func NewDatastreamContext ¶
func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
NewDatastreamContext return a new datastream
func NewDatastreamIt ¶
func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
NewDatastreamIt returns a new datastream backed by the provided iterator function
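A sketch of a custom iterator-backed datastream that yields n rows; the counter logic, column definition, and helper name are illustrative, and "context" plus the iop alias are assumed imports:

func countStream(n int) (*iop.Datastream, error) {
	cols := iop.Columns{{Name: "n", Type: iop.IntegerType}}
	i := 0
	ds := iop.NewDatastreamIt(context.Background(), cols, func(it *iop.Iterator) bool {
		if i >= n {
			return false // false signals the end of the stream
		}
		i++
		it.Row = []any{i} // the row the iterator hands to the datastream
		return true
	})
	if err := ds.Start(); err != nil { // Start cycles the iterator function until done
		return nil, err
	}
	return ds, nil
}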
func ReadCsvStream ¶
func ReadCsvStream(path string) (ds *Datastream, err error)
ReadCsvStream reads a CSV and returns a datastream
func (*Datastream) AddBytes ¶
func (ds *Datastream) AddBytes(b int64)
AddBytes adds bytes as processed
func (*Datastream) AddColumns ¶
func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)
AddColumns adds the provided columns to the datastream
func (*Datastream) CastRowToString ¶
func (ds *Datastream) CastRowToString(row []any) []string
CastRowToString returns the row casted as strings
func (*Datastream) CastRowToStringSafe ¶ added in v1.2.6
func (ds *Datastream) CastRowToStringSafe(row []any) []string
CastRowToStringSafe returns the row casted as strings (safer)
func (*Datastream) CastToStringSafeMask ¶ added in v1.2.14
func (ds *Datastream) CastToStringSafeMask(row []any) []string
CastToStringSafeMask returns the row casted as masked strings (even safer)
func (*Datastream) ChangeColumn ¶
func (ds *Datastream) ChangeColumn(i int, newType ColumnType)
ChangeColumn applies a column type change
func (*Datastream) Chunk ¶
func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
Chunk splits the datastream into chunk datastreams (in sequence)
func (*Datastream) Collect ¶
func (ds *Datastream) Collect(limit int) (Dataset, error)
Collect reads a stream and returns a dataset. A limit of 0 is unlimited.
func (*Datastream) ConsumeAvroReader ¶
func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)
ConsumeAvroReader uses the provided reader to stream rows
func (*Datastream) ConsumeAvroReaderSeeker ¶
func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)
ConsumeAvroReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeCsvReader ¶
func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
ConsumeCsvReader uses the provided reader to stream rows
func (*Datastream) ConsumeCsvReaderChl ¶ added in v1.2.4
func (ds *Datastream) ConsumeCsvReaderChl(readerChn chan *ReaderReady) (err error)
ConsumeCsvReaderChl reads a channel of readers. Should be safe to use with header top row
func (*Datastream) ConsumeCsvReaderDuckDb ¶ added in v1.2.19
func (ds *Datastream) ConsumeCsvReaderDuckDb(uri string, sc FileStreamConfig) (err error)
ConsumeCsvReaderDuckDb uses the provided reader to stream rows
func (*Datastream) ConsumeDeltaReader ¶ added in v1.2.16
func (ds *Datastream) ConsumeDeltaReader(uri string, sc FileStreamConfig) (err error)
ConsumeDeltaReader uses the provided reader to stream rows
func (*Datastream) ConsumeExcelReader ¶ added in v1.2.2
ConsumeExcelReader uses the provided reader to stream rows
func (*Datastream) ConsumeExcelReaderSeeker ¶ added in v1.2.2
func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[string]string) (err error)
ConsumeExcelReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeIcebergReader ¶ added in v1.2.16
func (ds *Datastream) ConsumeIcebergReader(uri string, sc FileStreamConfig) (err error)
ConsumeIcebergReader uses the provided reader to stream rows
func (*Datastream) ConsumeJsonReader ¶
func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
ConsumeJsonReader uses the provided reader to stream JSON. This will put each JSON rec as one string value so the payload can be processed downstream.
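A sketch of streaming JSON from any io.Reader into a Dataset; the empty Columns value, the helper name, and the "io"/iop imports are assumptions:

func jsonToDataset(r io.Reader) (iop.Dataset, error) {
	ds := iop.NewDatastream(iop.Columns{}) // columns get populated from the JSON records
	if err := ds.ConsumeJsonReader(r); err != nil {
		return iop.Dataset{}, err
	}
	return ds.Collect(0) // 0 = no row limit
}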
func (*Datastream) ConsumeJsonReaderChl ¶ added in v1.2.6
func (ds *Datastream) ConsumeJsonReaderChl(readerChn chan *ReaderReady, isXML bool) (err error)
func (*Datastream) ConsumeParquetReader ¶
func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)
ConsumeParquetReader uses the provided reader to stream rows
func (*Datastream) ConsumeParquetReaderDuckDb ¶ added in v1.2.16
func (ds *Datastream) ConsumeParquetReaderDuckDb(uri string, sc FileStreamConfig) (err error)
ConsumeParquetReaderDuckDb uses the provided URI to stream rows via DuckDB
func (*Datastream) ConsumeParquetReaderSeeker ¶
func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error)
ConsumeParquetReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeSASReader ¶
func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)
ConsumeSASReader uses the provided reader to stream rows
func (*Datastream) ConsumeSASReaderSeeker ¶
func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)
ConsumeSASReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeXmlReader ¶
func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
ConsumeXmlReader uses the provided reader to stream XML. This will put each XML rec as one string value so the payload can be processed downstream.
func (*Datastream) Defer ¶
func (ds *Datastream) Defer(f func())
Defer runs a given function as close of Datastream
func (*Datastream) Df ¶
func (ds *Datastream) Df() *Dataflow
func (*Datastream) GetConfig ¶
func (ds *Datastream) GetConfig() (configMap map[string]string)
GetConfig gets the config
func (*Datastream) GetFields ¶
func (ds *Datastream) GetFields(args ...bool) []string
GetFields return the fields of the Data
func (*Datastream) IsClosed ¶
func (ds *Datastream) IsClosed() bool
IsClosed returns true if the ds is closed
func (*Datastream) LatestBatch ¶
func (ds *Datastream) LatestBatch() *Batch
func (*Datastream) Limited ¶ added in v1.2.4
func (ds *Datastream) Limited(limit ...int) bool
func (*Datastream) Map ¶
func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)
Map applies the provided function to every row and returns the result
func (*Datastream) MapParallel ¶
func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)
MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.
func (*Datastream) NewBatch ¶
func (ds *Datastream) NewBatch(columns Columns) *Batch
NewBatch creates a new batch with fixed columns. Should be used each time a column type changes, or columns are added.
func (*Datastream) NewCsvBufferReader ¶
func (ds *Datastream) NewCsvBufferReader(sc StreamConfig) *bytes.Reader
NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.
func (*Datastream) NewCsvBufferReaderChnl ¶
func (ds *Datastream) NewCsvBufferReaderChnl(sc StreamConfig) (readerChn chan *bytes.Reader)
NewCsvBufferReaderChnl provides a channel of readers as the limit is reached. Data is read into memory, whereas NewCsvReaderChnl does not hold it in memory.
func (*Datastream) NewCsvBytesChnl ¶
func (ds *Datastream) NewCsvBytesChnl(sc StreamConfig) (dataChn chan *[]byte)
NewCsvBytesChnl returns a channel that yields chunks of CSV bytes
func (*Datastream) NewCsvReader ¶
func (ds *Datastream) NewCsvReader(sc StreamConfig) *io.PipeReader
NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.
func (*Datastream) NewCsvReaderChnl ¶
func (ds *Datastream) NewCsvReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
NewCsvReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.
func (*Datastream) NewExcelReaderChnl ¶ added in v1.2.2
func (ds *Datastream) NewExcelReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
func (*Datastream) NewIterator ¶
func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator
func (*Datastream) NewJsonLinesReaderChnl ¶
func (ds *Datastream) NewJsonLinesReaderChnl(sc StreamConfig) (readerChn chan *io.PipeReader)
NewJsonLinesReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.
func (*Datastream) NewJsonReaderChnl ¶
func (ds *Datastream) NewJsonReaderChnl(sc StreamConfig) (readerChn chan *io.PipeReader)
func (*Datastream) NewParquetArrowReaderChnl ¶ added in v1.1.7
func (ds *Datastream) NewParquetArrowReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
NewParquetArrowReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes. WARN: Not using this one since it doesn't write Decimals properly.
func (*Datastream) NewParquetReaderChnl ¶
func (ds *Datastream) NewParquetReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
NewParquetReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.
func (*Datastream) Pause ¶
func (ds *Datastream) Pause()
func (*Datastream) Records ¶
func (ds *Datastream) Records() <-chan map[string]any
Records returns rows of maps
func (*Datastream) Rows ¶
func (ds *Datastream) Rows() chan []any
func (*Datastream) SetConfig ¶
func (ds *Datastream) SetConfig(configMap map[string]string)
SetConfig sets the ds.config values
func (*Datastream) SetEmpty ¶
func (ds *Datastream) SetEmpty()
SetEmpty sets the ds.Rows channel as empty
func (*Datastream) SetFields ¶
func (ds *Datastream) SetFields(fields []string)
SetFields sets the fields/columns of the Datastream
func (*Datastream) SetFileURI ¶ added in v1.1.15
func (ds *Datastream) SetFileURI()
SetFileURI sets the FileURI of the columns of the Datastream
func (*Datastream) SetIterator ¶ added in v1.1.14
func (ds *Datastream) SetIterator(it *Iterator)
func (*Datastream) SetMetadata ¶
func (ds *Datastream) SetMetadata(jsonStr string)
func (*Datastream) Shape ¶
func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
Shape changes the column types as needed, to the provided columns var. It will cast the already wrongly casted rows, and not recast the correctly casted rows.
func (*Datastream) Split ¶
func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
Split splits the datastream into parallel datastreams
func (*Datastream) Start ¶
func (ds *Datastream) Start() (err error)
Start generates the stream. Should cycle the Iter Func until done.
func (*Datastream) TryPause ¶
func (ds *Datastream) TryPause() bool
func (*Datastream) WaitClosed ¶
func (ds *Datastream) WaitClosed()
WaitClosed waits until the datastream is closed; a hack to make sure all streams are pushed
func (*Datastream) WaitReady ¶
func (ds *Datastream) WaitReady() error
WaitReady waits until datastream is ready
type DeltaReader ¶ added in v1.2.16
func NewDeltaReader ¶ added in v1.2.16
func NewDeltaReader(uri string, props ...string) (*DeltaReader, error)
func (*DeltaReader) Close ¶ added in v1.2.16
func (r *DeltaReader) Close() error
func (*DeltaReader) Columns ¶ added in v1.2.16
func (r *DeltaReader) Columns() (Columns, error)
func (*DeltaReader) MakeQuery ¶ added in v1.2.19
func (r *DeltaReader) MakeQuery(sc FileStreamConfig) string
type DuckDb ¶ added in v1.2.16
type DuckDb struct {
	Context *g.Context
	Proc    *process.Proc
	// contains filtered or unexported fields
}
DuckDb is a Duck DB compute layer
func NewDuckDb ¶ added in v1.2.16
NewDuckDb creates a new DuckDb instance with the given context and properties
func (*DuckDb) AddExtension ¶ added in v1.2.16
AddExtension adds an extension to the DuckDb instance if it's not already present
func (*DuckDb) ExecContext ¶ added in v1.2.16
func (duck *DuckDb) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
ExecContext executes a SQL query with context and returns the result
func (*DuckDb) ExecMultiContext ¶ added in v1.2.16
func (duck *DuckDb) ExecMultiContext(ctx context.Context, sqls ...string) (result sql.Result, err error)
ExecMultiContext executes multiple SQL queries with context and returns the result
func (*DuckDb) GenerateCopyStatement ¶ added in v1.2.25
func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPath string, options DuckDbCopyOptions) (sql string, err error)
func (*DuckDb) GetProp ¶ added in v1.2.16
GetProp retrieves a property value for the DuckDb instance
func (*DuckDb) GetScannerFunc ¶ added in v1.2.19
func (*DuckDb) MakeScanQuery ¶ added in v1.2.19
func (*DuckDb) PrepareFsSecretAndURI ¶ added in v1.2.16
PrepareFsSecretAndURI prepares the secret configuration from the fs_props and modifies the URI if necessary for different storage types (S3, Google Cloud Storage, Azure Blob Storage). It returns the modified URI string.
The function handles the following storage types:
- Local files: Removes the "file://" prefix
- S3: Configures AWS credentials and handles Cloudflare R2 storage
- Google Cloud Storage: Sets up GCS credentials
- Azure Blob Storage: Configures Azure connection string or account name
It uses the DuckDb instance's properties to populate the secret configuration.
func (*DuckDb) Props ¶ added in v1.2.16
Props returns all properties of the DuckDb instance as a map
func (*DuckDb) QueryContext ¶ added in v1.2.16
func (duck *DuckDb) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (data Dataset, err error)
QueryContext runs a sql query with context, returns `Dataset`
func (*DuckDb) Stream ¶ added in v1.2.16
func (duck *DuckDb) Stream(sql string, options ...map[string]interface{}) (ds *Datastream, err error)
Stream runs a sql query, returns `Datastream`
func (*DuckDb) StreamContext ¶ added in v1.2.16
func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *Datastream, err error)
StreamContext runs a sql query with context, returns `Datastream`
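A sketch of an embedded DuckDb query; the NewDuckDb argument list beyond the context, the SQL used, and the "context"/"fmt"/iop imports are assumptions:

func duckDbVersion(ctx context.Context) (string, error) {
	duck := iop.NewDuckDb(ctx)
	if err := duck.Open(); err != nil { // Open ensures the duckdb binary is present (see EnsureBinDuckDB)
		return "", err
	}
	defer duck.Close()

	data, err := duck.Query("select version() as v")
	if err != nil {
		return "", err
	}
	return fmt.Sprint(data.FirstVal()), nil
}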
type DuckDbCopyOptions ¶ added in v1.2.25
type Excel ¶ added in v1.2.2
type Excel struct {
	File   *excelize.File
	Sheets []string
	Path   string
	// contains filtered or unexported fields
}
Excel represent an Excel object pointing to its file
func NewExcelFromFile ¶ added in v1.2.2
NewExcelFromFile returns a new Excel instance from a local file
func NewExcelFromReader ¶ added in v1.2.2
NewExcelFromReader returns a new Excel instance from a reader
func (*Excel) GetDataset ¶ added in v1.2.2
GetDataset returns a dataset of the provided sheet
func (*Excel) GetDatasetFromRange ¶ added in v1.2.2
GetDatasetFromRange returns a dataset of the provided sheet / range. cellRange example: `$AH$13:$AI$20`, `AH13:AI20` or `A:E`
func (*Excel) RefreshSheets ¶ added in v1.2.2
RefreshSheets refreshes the sheet index data
func (*Excel) WriteSheet ¶ added in v1.2.2
func (xls *Excel) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
WriteSheet writes a datastream into a sheet. mode can be: `new`, `append` or `overwrite`. Default is `new`.
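A sketch of appending a datastream to an existing workbook sheet; the NewExcelFromFile signature shown is assumed from its description above, and the helper name is illustrative:

func appendToSheet(path, sheet string, ds *iop.Datastream) error {
	xls, err := iop.NewExcelFromFile(path) // assumed signature: (path string) (*Excel, error)
	if err != nil {
		return err
	}
	if err := xls.WriteSheet(sheet, ds, "append"); err != nil { // modes: new, append, overwrite
		return err
	}
	return xls.WriteToFile(path) // persist the workbook back to disk
}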
func (*Excel) WriteToFile ¶ added in v1.2.2
WriteToFile writes to a file
type FileStreamConfig ¶ added in v1.2.19
type FileStreamConfig struct {
	Limit            int               `json:"limit"`
	Select           []string          `json:"select"`
	SQL              string            `json:"sql"`
	Format           dbio.FileType     `json:"format"`
	IncrementalKey   string            `json:"incremental_key"`
	IncrementalValue string            `json:"incremental_value"`
	FileSelect       *[]string         `json:"file_select"` // a list of files to include.
	Props            map[string]string `json:"props"`
}
func (*FileStreamConfig) ShouldUseDuckDB ¶ added in v1.2.19
func (sc *FileStreamConfig) ShouldUseDuckDB() bool
type GoogleSheet ¶ added in v1.2.2
type GoogleSheet struct {
	Sheets        []string
	SpreadsheetID string
	// contains filtered or unexported fields
}
GoogleSheet represent a Google Sheet object
func NewGoogleSheet ¶ added in v1.2.2
func NewGoogleSheet(props ...string) (ggs *GoogleSheet, err error)
NewGoogleSheet creates a blank spreadsheet; title is the new spreadsheet title
func NewGoogleSheetFromURL ¶ added in v1.2.2
func NewGoogleSheetFromURL(urlStr string, props ...string) (ggs *GoogleSheet, err error)
NewGoogleSheetFromURL returns a new GoogleSheet instance from a provided URL
func (*GoogleSheet) DeleteSheet ¶ added in v1.2.2
func (ggs *GoogleSheet) DeleteSheet(shtName string) (err error)
func (*GoogleSheet) GetDataset ¶ added in v1.2.2
func (ggs *GoogleSheet) GetDataset(shtName string) (data Dataset, err error)
GetDataset returns a dataset of the sheet
func (*GoogleSheet) GetDatasetFromRange ¶ added in v1.2.2
func (ggs *GoogleSheet) GetDatasetFromRange(shtName, cellRange string) (data Dataset, err error)
GetDatasetFromRange returns a dataset from the specified range
func (*GoogleSheet) RefreshSheets ¶ added in v1.2.2
func (ggs *GoogleSheet) RefreshSheets() (err error)
RefreshSheets refreshes sheets data
func (*GoogleSheet) URL ¶ added in v1.2.2
func (ggs *GoogleSheet) URL() string
func (*GoogleSheet) WriteSheet ¶ added in v1.2.2
func (ggs *GoogleSheet) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
WriteSheet writes a datastream into a sheet. mode can be `new`, `append`, or `overwrite`; the default is `new`
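A sketch of reading and writing through GoogleSheet. The credential property key is an assumption; props are passed as `key=value` strings elsewhere in this package, and that convention is assumed here.

```go
ggs, err := iop.NewGoogleSheetFromURL(
	"https://docs.google.com/spreadsheets/d/<spreadsheet-id>/edit",
	"GSHEET_CRED_FILE=/path/to/service-account.json", // assumed property name
)
if err != nil {
	return err
}

data, err := ggs.GetDataset("Sheet1")
if err != nil {
	return err
}
_ = data

// append a datastream to another sheet
err = ggs.WriteSheet("Sheet2", ds, "append")
```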
type GzipCompressor ¶
type GzipCompressor struct {
	Compressor
	// contains filtered or unexported fields
}
func (*GzipCompressor) Compress ¶
func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader
Compress uses gzip to compress
func (*GzipCompressor) Decompress ¶
Decompress uses gzip to decompress if the stream is gzip-compressed. Otherwise, it returns the same reader
func (*GzipCompressor) Suffix ¶
func (cp *GzipCompressor) Suffix() string
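A hedged round-trip sketch. The Decompress signature is not shown above and is assumed to return (io.Reader, error); using the zero value of GzipCompressor is also an assumption.

```go
cp := &iop.GzipCompressor{} // assumed: zero value is usable

gzReader := cp.Compress(strings.NewReader("hello world"))

plainReader, err := cp.Decompress(gzReader) // assumed signature
if err != nil {
	return err
}
out, _ := io.ReadAll(plainReader)
println(string(out)) // "hello world"
```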
type IcebergReader ¶ added in v1.2.16
func NewIcebergReader ¶ added in v1.2.16
func NewIcebergReader(uri string, props ...string) (*IcebergReader, error)
func (*IcebergReader) Close ¶ added in v1.2.16
func (i *IcebergReader) Close() error
func (*IcebergReader) Columns ¶ added in v1.2.16
func (r *IcebergReader) Columns() (Columns, error)
func (*IcebergReader) MakeQuery ¶ added in v1.2.19
func (r *IcebergReader) MakeQuery(sc FileStreamConfig) string
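A sketch of reading Iceberg table metadata and building a query. The property strings are assumptions (passed as `key=value` pairs), and only a Limit is set on the FileStreamConfig.

```go
ir, err := iop.NewIcebergReader(
	"s3://my-bucket/warehouse/db/events",
	"AWS_ACCESS_KEY_ID=...", "AWS_SECRET_ACCESS_KEY=...", // assumed property names
)
if err != nil {
	return err
}
defer ir.Close()

cols, err := ir.Columns()
if err != nil {
	return err
}
_ = cols

sql := ir.MakeQuery(iop.FileStreamConfig{Limit: 100})
_ = sql // e.g. feed this query to a DuckDb instance
```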
type Iterator ¶
type Iterator struct {
	Row          []any
	Reprocess    chan []any
	IsCasted     bool
	RowIsCasted  bool
	Counter      uint64
	StreamRowNum uint64
	Context      *g.Context
	Closed       bool
	// contains filtered or unexported fields
}
Iterator is the row provider for a datastream
func (*Iterator) BelowEqualIncrementalVal ¶ added in v1.2.10
BelowEqualIncrementalVal evaluates the incremental value against the incrementalCol. This is used when the stream is a file in incremental mode (unable to filter at the source like a database). it.incrementalVal and it.incrementalColI need to be set
func (*Iterator) Ds ¶
func (it *Iterator) Ds() *Datastream
type KeyType ¶
type KeyType string
const (
	AggregateKey    KeyType = "aggregate"
	ClusterKey      KeyType = "cluster"
	DistributionKey KeyType = "distribution"
	DuplicateKey    KeyType = "duplicate"
	HashKey         KeyType = "hash"
	IndexKey        KeyType = "index"
	PartitionKey    KeyType = "partition"
	PrimaryKey      KeyType = "primary"
	SortKey         KeyType = "sort"
	UniqueKey       KeyType = "unique"
	UpdateKey       KeyType = "update"
)
func (KeyType) MetadataKey ¶ added in v1.2.15
type Metadata ¶
type NoneCompressor ¶
type NoneCompressor struct {
	Compressor
	// contains filtered or unexported fields
}
func (*NoneCompressor) Decompress ¶
func (*NoneCompressor) Suffix ¶
func (cp *NoneCompressor) Suffix() string
type Parquet ¶
type Parquet struct {
	Path   string
	Reader *parquet.Reader
	Data   *Dataset
	// contains filtered or unexported fields
}
Parquet is a parquet object
func NewParquetReader ¶ added in v1.1.7
type ParquetArrowDumper ¶ added in v1.1.6
type ParquetArrowDumper struct {
// contains filtered or unexported fields
}
func NewParquetArrowDumper ¶ added in v1.1.6
func NewParquetArrowDumper(ccReader file.ColumnChunkReader) *ParquetArrowDumper
func (*ParquetArrowDumper) Next ¶ added in v1.1.6
func (pad *ParquetArrowDumper) Next() (interface{}, bool)
type ParquetArrowReader ¶ added in v1.1.6
type ParquetArrowReader struct {
	Path    string
	Reader  *file.Reader
	Data    *Dataset
	Context *g.Context
	// contains filtered or unexported fields
}
ParquetArrowReader is a parquet reader object
func NewParquetArrowReader ¶ added in v1.1.7
func NewParquetArrowReader(reader *os.File, selected []string) (p *ParquetArrowReader, err error)
func (*ParquetArrowReader) Columns ¶ added in v1.1.6
func (p *ParquetArrowReader) Columns() Columns
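A sketch of listing columns from a local parquet file. Passing nil for the selected columns (to read all of them) is an assumption based on the signature above.

```go
f, err := os.Open("data.parquet")
if err != nil {
	return err
}
defer f.Close()

p, err := iop.NewParquetArrowReader(f, nil) // nil = all columns (assumed)
if err != nil {
	return err
}

for _, col := range p.Columns() {
	println(col.Name, string(col.Type))
}
```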
type ParquetArrowWriter ¶ added in v1.1.6
func NewParquetArrowWriter ¶ added in v1.1.6
func NewParquetArrowWriter(w io.Writer, columns Columns, codec compress.Compression) (p *ParquetArrowWriter, err error)
func (*ParquetArrowWriter) AppendNewRowGroup ¶ added in v1.1.6
func (p *ParquetArrowWriter) AppendNewRowGroup() (err error)
func (*ParquetArrowWriter) Close ¶ added in v1.1.6
func (p *ParquetArrowWriter) Close() (err error)
func (*ParquetArrowWriter) Columns ¶ added in v1.1.6
func (p *ParquetArrowWriter) Columns() Columns
func (*ParquetArrowWriter) WriteRow ¶ added in v1.1.6
func (p *ParquetArrowWriter) WriteRow(row []any) (err error)
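A hedged sketch of writing rows. The column-type constants and the Arrow compress import path are assumptions; the codec argument is any compress.Compression value from the Apache Arrow Go parquet module.

```go
// import "github.com/apache/arrow/go/v16/parquet/compress" // version path assumed

f, _ := os.Create("out.parquet")
defer f.Close()

cols := iop.Columns{
	{Name: "id", Type: iop.IntegerType}, // assumed type constants
	{Name: "name", Type: iop.StringType},
}

w, err := iop.NewParquetArrowWriter(f, cols, compress.Codecs.Snappy)
if err != nil {
	return err
}

if err = w.WriteRow([]any{1, "alice"}); err != nil {
	return err
}
err = w.Close() // flushes the final row group
```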
type ParquetDuckDb ¶ added in v1.2.16
func NewParquetReaderDuckDb ¶ added in v1.2.16
func NewParquetReaderDuckDb(uri string, props ...string) (*ParquetDuckDb, error)
func (*ParquetDuckDb) Close ¶ added in v1.2.16
func (r *ParquetDuckDb) Close() error
func (*ParquetDuckDb) Columns ¶ added in v1.2.16
func (r *ParquetDuckDb) Columns() (Columns, error)
func (*ParquetDuckDb) MakeQuery ¶ added in v1.2.19
func (r *ParquetDuckDb) MakeQuery(sc FileStreamConfig) string
type ParquetWriter ¶ added in v1.1.7
type ParquetWriter struct {
	Writer    *parquet.Writer
	WriterMap *parquet.GenericWriter[map[string]any]
	// contains filtered or unexported fields
}
func NewParquetWriter ¶ added in v1.1.7
func NewParquetWriterMap ¶ added in v1.2.12
func (*ParquetWriter) Close ¶ added in v1.1.7
func (pw *ParquetWriter) Close() error
func (*ParquetWriter) WriteRec ¶ added in v1.2.12
func (pw *ParquetWriter) WriteRec(row []any) error
func (*ParquetWriter) WriteRow ¶ added in v1.1.7
func (pw *ParquetWriter) WriteRow(row []any) error
type ReaderReady ¶ added in v1.2.6
type RecNode ¶
type RecNode struct {
// contains filtered or unexported fields
}
func NewRecNode ¶
func (*RecNode) Compression ¶
type SAS ¶
type SAS struct {
	Path   string
	Reader *datareader.SAS7BDAT
	Data   *Dataset
	// contains filtered or unexported fields
}
SAS is a sas7bdat object
func NewSASStream ¶
func NewSASStream(reader io.ReadSeeker, columns Columns) (s *SAS, err error)
type SSHClient ¶
type SSHClient struct {
	Host       string
	Port       int
	User       string
	Password   string
	TgtHost    string
	TgtPort    int
	PrivateKey string
	Passphrase string
	Err        error
	// contains filtered or unexported fields
}
SSHClient is a client to connect to a ssh server with the main goal of forwarding ports
func (*SSHClient) OpenPortForward ¶
OpenPortForward forwards the port as specified
func (*SSHClient) RunAsProcess ¶
RunAsProcess uses a separate process, which enables the use of public key auth. See https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key
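A hedged sketch of forwarding a port through a bastion host. OpenPortForward's return values are not shown above; returning the local port and an error is an assumption.

```go
sshc := &iop.SSHClient{
	Host:       "bastion.example.com",
	Port:       22,
	User:       "deploy",
	PrivateKey: "/home/deploy/.ssh/id_ed25519",
	TgtHost:    "10.0.0.5", // host reachable from the bastion
	TgtPort:    5432,
}

localPort, err := sshc.OpenPortForward() // assumed return values
if err != nil {
	return err
}
// connect to 127.0.0.1:localPort to reach 10.0.0.5:5432 through the tunnel
_ = localPort
```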
type SnappyCompressor ¶
type SnappyCompressor struct {
	Compressor
	// contains filtered or unexported fields
}
func (*SnappyCompressor) Compress ¶
func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader
Compress uses snappy to compress
func (*SnappyCompressor) Decompress ¶
Decompress uses snappy to decompress if the stream is snappy-compressed. Otherwise, it returns the same reader
func (*SnappyCompressor) Suffix ¶
func (cp *SnappyCompressor) Suffix() string
type StreamConfig ¶ added in v1.1.15
type StreamConfig struct {
	EmptyAsNull    bool              `json:"empty_as_null"`
	Header         bool              `json:"header"`
	Compression    CompressorType    `json:"compression"` // AUTO | ZIP | GZIP | SNAPPY | NONE
	NullIf         string            `json:"null_if"`
	NullAs         string            `json:"null_as"`
	DatetimeFormat string            `json:"datetime_format"`
	SkipBlankLines bool              `json:"skip_blank_lines"`
	Delimiter      string            `json:"delimiter"`
	Escape         string            `json:"escape"`
	Quote          string            `json:"quote"`
	FileMaxRows    int64             `json:"file_max_rows"`
	FileMaxBytes   int64             `json:"file_max_bytes"`
	BatchLimit     int64             `json:"batch_limit"`
	MaxDecimals    int               `json:"max_decimals"`
	Flatten        bool              `json:"flatten"`
	FieldsPerRec   int               `json:"fields_per_rec"`
	Jmespath       string            `json:"jmespath"`
	Sheet          string            `json:"sheet"`
	ColumnCasing   ColumnCasing      `json:"column_casing"`
	BoolAsInt      bool              `json:"-"`
	Columns        Columns           `json:"columns"` // list of column types. Can be partial list! likely is!
	Map            map[string]string `json:"-"`
	// contains filtered or unexported fields
}
func DefaultStreamConfig ¶ added in v1.1.15
func DefaultStreamConfig() StreamConfig
func LoaderStreamConfig ¶ added in v1.2.22
func LoaderStreamConfig(header bool) StreamConfig
func (*StreamConfig) ToMap ¶ added in v1.2.22
func (sc *StreamConfig) ToMap() map[string]string
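A short sketch: start from the defaults, override a few fields, and flatten to a string map. That ToMap keys follow the json tags shown above is an assumption.

```go
cfg := iop.DefaultStreamConfig()
cfg.Header = true
cfg.Delimiter = "|"
cfg.FileMaxRows = 500_000

props := cfg.ToMap()
println(props["delimiter"]) // "|" — key naming assumed to follow the json tags
```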
type StreamProcessor ¶
type StreamProcessor struct {
	N      uint64
	Config StreamConfig
	// contains filtered or unexported fields
}
StreamProcessor processes rows and values
func NewStreamProcessor ¶
func NewStreamProcessor() *StreamProcessor
NewStreamProcessor returns a new StreamProcessor
func (*StreamProcessor) CastRow ¶
func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}
CastRow casts each value of a row. This slows down processing by about 40%
func (*StreamProcessor) CastToBool ¶ added in v1.2.12
func (sp *StreamProcessor) CastToBool(i interface{}) (b bool, err error)
CastToBool converts interface to bool
func (*StreamProcessor) CastToString ¶
func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string
CastToString casts a value to string. Used for CSV writing; slows processing down about 5% with an upstream CastRow, or 35% without one
func (*StreamProcessor) CastToStringSafe ¶ added in v1.2.6
func (sp *StreamProcessor) CastToStringSafe(i int, val interface{}, valType ...ColumnType) string
CastToStringSafe casts a value to string (safer)
func (*StreamProcessor) CastToStringSafeMask ¶ added in v1.2.14
func (sp *StreamProcessor) CastToStringSafeMask(i int, val interface{}, valType ...ColumnType) string
CastToStringSafeMask casts a value to a mask used to count bytes (even safer)
func (*StreamProcessor) CastToTime ¶
func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)
CastToTime converts interface to time
func (*StreamProcessor) CastType ¶
func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}
CastType casts the type of an interface. CastType is used to cast the interface placeholders
func (*StreamProcessor) CastVal ¶
func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}
CastVal casts values with stats collection, which degrades performance by ~10%. Benchmark with: go test -benchmem -run='^$' github.com/slingdata-io/sling-cli/core/dbio/iop -bench '^BenchmarkProcessVal'
func (*StreamProcessor) CastValWithoutStats ¶
func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}
CastValWithoutStats casts the value without counting stats
func (*StreamProcessor) ColStats ¶ added in v1.2.10
func (sp *StreamProcessor) ColStats() map[int]*ColumnStats
func (*StreamProcessor) GetType ¶
func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)
GetType returns the type of an interface
func (*StreamProcessor) ParseString ¶
func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}
ParseString returns an interface.
string: "varchar"
integer: "integer"
decimal: "decimal"
date: "date"
datetime: "timestamp"
timestamp: "timestamp"
text: "text"
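A sketch of the parsing behavior described above. The concrete Go types returned (e.g. int64 vs. int) are not specified here, so the comments only name the logical category.

```go
sp := iop.NewStreamProcessor()

v1 := sp.ParseString("123")        // parsed as an integer value
v2 := sp.ParseString("123.45")     // parsed as a decimal value
v3 := sp.ParseString("2024-01-15") // parsed as a date/timestamp value
v4 := sp.ParseString("hello")      // left as a string
_, _, _, _ = v1, v2, v3, v4
```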
func (*StreamProcessor) ParseTime ¶
func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)
ParseTime parses a date string and returns time.Time
func (*StreamProcessor) ParseVal ¶
func (sp *StreamProcessor) ParseVal(val interface{}) interface{}
ParseVal parses the value into its appropriate type
func (*StreamProcessor) ProcessRow ¶
func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}
ProcessRow processes a row
func (*StreamProcessor) ProcessVal ¶
func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}
ProcessVal processes a value
func (*StreamProcessor) ResetConfig ¶ added in v1.1.15
func (sp *StreamProcessor) ResetConfig()
func (*StreamProcessor) SetConfig ¶
func (sp *StreamProcessor) SetConfig(configMap map[string]string)
SetConfig sets the data.Sp.config values
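A hedged sketch of SetConfig. The key names are assumed to mirror the StreamConfig json tags listed earlier in this section.

```go
sp := iop.NewStreamProcessor()
sp.SetConfig(map[string]string{
	"delimiter":       "|", // assumed keys, mirroring the json tags
	"header":          "true",
	"empty_as_null":   "true",
	"datetime_format": "2006-01-02 15:04:05",
})
```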
type Transform ¶ added in v1.2.2
type Transform struct {
	Name       string
	FuncString func(*StreamProcessor, string) (string, error)
	FuncTime   func(*StreamProcessor, *time.Time) error
	// contains filtered or unexported fields
}
type TransformList ¶ added in v1.2.25
type TransformList []Transform
func (TransformList) HasTransform ¶ added in v1.2.25
func (tl TransformList) HasTransform(t Transform) bool
type Transformers ¶ added in v1.2.2
type Transformers struct {
	Accent            transform.Transformer
	DecodeUTF8        transform.Transformer
	DecodeUTF8BOM     transform.Transformer
	DecodeUTF16       transform.Transformer
	DecodeISO8859_1   transform.Transformer
	DecodeISO8859_5   transform.Transformer
	DecodeISO8859_15  transform.Transformer
	DecodeWindows1250 transform.Transformer
	DecodeWindows1252 transform.Transformer
	EncodeUTF8        transform.Transformer
	EncodeUTF8BOM     transform.Transformer
	EncodeUTF16       transform.Transformer
	EncodeISO8859_1   transform.Transformer
	EncodeISO8859_5   transform.Transformer
	EncodeISO8859_15  transform.Transformer
	EncodeWindows1250 transform.Transformer
	EncodeWindows1252 transform.Transformer
}
func NewTransformers ¶ added in v1.2.2
func NewTransformers() Transformers
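The fields are standard golang.org/x/text transformers, so they can be plugged into transform.NewReader to re-encode a byte stream; a small sketch:

```go
t := iop.NewTransformers()

latin1 := []byte{0xe9, 0x74, 0xe9} // "été" encoded as ISO-8859-1
r := transform.NewReader(bytes.NewReader(latin1), t.DecodeISO8859_1)

utf8Bytes, err := io.ReadAll(r)
if err != nil {
	return err
}
println(string(utf8Bytes)) // "été"
```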
type ZStandardCompressor ¶
type ZStandardCompressor struct {
	Compressor
	// contains filtered or unexported fields
}
func (*ZStandardCompressor) Compress ¶
func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader
Compress uses zstandard to compress
func (*ZStandardCompressor) Decompress ¶
Decompress uses zstandard to decompress if the stream is zstandard-compressed. Otherwise, it returns the same reader
func (*ZStandardCompressor) Suffix ¶
func (cp *ZStandardCompressor) Suffix() string