iop

package
v1.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2024 License: GPL-3.0 Imports: 70 Imported by: 5

README

Input-Process-Output (iop)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// RemoveTrailingDecZeros removes the trailing zeros in CastToString
	RemoveTrailingDecZeros = false
	SampleSize             = 900
)
View Source
var Transforms = map[Transform]TransformFunc{}

Functions

func AutoDecompress

func AutoDecompress(reader io.Reader) (gReader io.Reader, err error)

AutoDecompress auto-detects the compression type and decompresses the reader accordingly. Otherwise, it returns the same reader.

func CleanHeaderRow

func CleanHeaderRow(header []string) []string

CleanHeaderRow cleans incompatible characters from the header row.

func CleanName

func CleanName(name string) (newName string)

func CompareColumns

func CompareColumns(columns1 Columns, columns2 Columns) (reshape bool, err error)

CompareColumns compares two sets of columns to see if they are similar.

func CreateDummyFields

func CreateDummyFields(numCols int) (cols []string)

CreateDummyFields creates dummy columns for csvs with no header row

func DecimalByteArrayToString added in v1.1.6

func DecimalByteArrayToString(dec []byte, precision int, scale int) string

DecimalByteArrayToString converts bytes to decimal string from https://github.com/xitongsys/parquet-go/blob/8ca067b2bd324788a77bf61d4e1ef9a5f8b4b1d2/types/converter.go#L131

func GetISO8601DateMap

func GetISO8601DateMap(t time.Time) map[string]interface{}

GetISO8601DateMap returns a map of date parts for string formatting.

func IsDummy

func IsDummy(columns []Column) bool

IsDummy returns true if the columns are injected by CreateDummyFields

func MakeDecNumScale added in v1.1.6

func MakeDecNumScale(scale int) *big.Rat

func MakeRowsChan

func MakeRowsChan() chan []any

MakeRowsChan returns a buffered channel with default size

func NewJSONStream

func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream

func ParseFIX added in v1.1.13

func ParseFIX(message string) (fixMap map[string]any, err error)

ParseFIX converts a FIX message into a json format

func Row

func Row(vals ...any) []any

Row returns the given values as a row.

func ScanCarrRet

func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)

ScanCarrRet removes the \r runes that are not immediately followed by \n.

func StrIntToBinary added in v1.1.6

func StrIntToBinary(num string, order string, length int, signed bool) string

order is LittleEndian or BigEndian; length is the number of bytes.

func StringToDecimalByteArray added in v1.1.6

func StringToDecimalByteArray(s string, numSca *big.Rat, pType parquet.Type, length int) (decBytes []byte)

StringToDecimalByteArray converts a string decimal to bytes, adapted from https://github.com/xitongsys/parquet-go/blob/8ca067b2bd324788a77bf61d4e1ef9a5f8b4b1d2/types/types.go#L81. This function is costly and slows writes dramatically. TODO: Find ways to optimize, if possible.

func Unzip

func Unzip(src string, dest string) (nodes []map[string]any, err error)

Unzip will decompress a zip archive, moving all files and folders within the zip file (parameter 1) to an output directory (parameter 2).

Types

type Avro

type Avro struct {
	Path   string
	Reader *goavro.OCFReader
	Data   *Dataset
	// contains filtered or unexported fields
}

Avro is an avro object.

func NewAvroStream

func NewAvroStream(reader io.ReadSeeker, columns Columns) (a *Avro, err error)

func (*Avro) Columns

func (a *Avro) Columns() Columns

type Batch

type Batch struct {
	Columns  Columns
	Rows     chan []any
	Previous *Batch
	Count    int64
	Limit    int64
	// contains filtered or unexported fields
}

func (*Batch) AddTransform

func (b *Batch) AddTransform(transf func(row []any) []any)

func (*Batch) Close

func (b *Batch) Close()

func (*Batch) ColumnsChanged

func (b *Batch) ColumnsChanged() bool

func (*Batch) Ds

func (b *Batch) Ds() *Datastream

func (*Batch) ID

func (b *Batch) ID() string

func (*Batch) IsFirst

func (b *Batch) IsFirst() bool

func (*Batch) Push

func (b *Batch) Push(row []any)

func (*Batch) Shape

func (b *Batch) Shape(tgtColumns Columns, pause ...bool) (err error)

type BatchReader

type BatchReader struct {
	Batch   *Batch
	Columns Columns
	Reader  io.Reader
	Counter int
}

type CSV

type CSV struct {
	Path            string
	NoHeader        bool
	Delimiter       rune
	Escape          string
	FieldsPerRecord int
	Columns         []Column
	File            *os.File
	Data            Dataset
	Reader          io.Reader
	Config          map[string]string
	NoDebug         bool
	// contains filtered or unexported fields
}

CSV is a csv object

func (*CSV) InferSchema

func (c *CSV) InferSchema() error

InferSchema infers the schema of the CSV.

func (*CSV) NewReader

func (c *CSV) NewReader() (*io.PipeReader, error)

NewReader creates a Reader

func (*CSV) Read

func (c *CSV) Read() (data Dataset, err error)

Read returns the CSV data as a dataset, with line 1 as the header.

func (*CSV) ReadStream

func (c *CSV) ReadStream() (ds *Datastream, err error)

ReadStream returns the read CSV stream with Line 1 as header

func (*CSV) ReadStreamContext added in v1.1.15

func (c *CSV) ReadStreamContext(ctx context.Context) (ds *Datastream, err error)

ReadStreamContext returns the read CSV stream with line 1 as the header, using the provided context.

func (*CSV) Sample

func (c *CSV) Sample(n int) (Dataset, error)

Sample returns a sample of n rows

func (*CSV) SetFields

func (c *CSV) SetFields(fields []string)

SetFields sets the fields

func (*CSV) WriteStream

func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)

WriteStream to CSV file

type Column

type Column struct {
	Position    int         `json:"position"`
	Name        string      `json:"name"`
	Type        ColumnType  `json:"type"`
	DbType      string      `json:"db_type,omitempty"`
	DbPrecision int         `json:"db_precision,omitempty"`
	DbScale     int         `json:"db_scale,omitempty"`
	Sourced     bool        `json:"-"` // whether col was sourced/inferred from a typed source
	Stats       ColumnStats `json:"stats,omitempty"`

	Table       string `json:"table,omitempty"`
	Schema      string `json:"schema,omitempty"`
	Database    string `json:"database,omitempty"`
	Description string `json:"description,omitempty"`
	FileURI     string `json:"file_uri,omitempty"`

	Metadata map[string]string `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

Column represents a schemata column

func InferFromStats

func InferFromStats(columns []Column, safe bool, noDebug bool) []Column

InferFromStats using the stats to infer data types

func (*Column) GoType

func (col *Column) GoType() reflect.Type

func (*Column) HasNulls added in v1.1.7

func (col *Column) HasNulls() bool

func (*Column) HasNullsPlus1 added in v1.1.7

func (col *Column) HasNullsPlus1() bool

HasNullsPlus1 denotes when a column is all nulls plus 1 non-null

func (*Column) IsBinary added in v1.2.3

func (col *Column) IsBinary() bool

IsBinary returns whether the column is a binary

func (*Column) IsBool

func (col *Column) IsBool() bool

IsBool returns whether the column is a boolean

func (*Column) IsDate added in v1.1.14

func (col *Column) IsDate() bool

IsDate returns whether the column is a date object.

func (*Column) IsDatetime

func (col *Column) IsDatetime() bool

IsDatetime returns whether the column is a datetime object

func (*Column) IsDecimal

func (col *Column) IsDecimal() bool

IsDecimal returns whether the column is a decimal

func (*Column) IsFloat added in v1.1.14

func (col *Column) IsFloat() bool

IsFloat returns whether the column is a float

func (*Column) IsInteger

func (col *Column) IsInteger() bool

IsInteger returns whether the column is an integer

func (*Column) IsKeyType added in v1.1.12

func (col *Column) IsKeyType(keyType KeyType) bool

func (*Column) IsNumber

func (col *Column) IsNumber() bool

IsNumber returns whether the column is a decimal or an integer

func (*Column) IsString

func (col *Column) IsString() bool

IsString returns whether the column is a string

func (*Column) IsUnique

func (col *Column) IsUnique() bool

func (*Column) Key

func (col *Column) Key() string

func (*Column) SetLengthPrecisionScale

func (col *Column) SetLengthPrecisionScale()

SetLengthPrecisionScale parses the length, precision and scale.

func (*Column) SetMetadata

func (col *Column) SetMetadata(key string, value string)

type ColumnStats

type ColumnStats struct {
	MinLen      int    `json:"min_len,omitempty"`
	MaxLen      int    `json:"max_len,omitempty"`
	MaxDecLen   int    `json:"max_dec_len,omitempty"`
	Min         int64  `json:"min"`
	Max         int64  `json:"max"`
	NullCnt     int64  `json:"null_cnt"`
	IntCnt      int64  `json:"int_cnt,omitempty"`
	DecCnt      int64  `json:"dec_cnt,omitempty"`
	BoolCnt     int64  `json:"bool_cnt,omitempty"`
	JsonCnt     int64  `json:"json_cnt,omitempty"`
	StringCnt   int64  `json:"string_cnt,omitempty"`
	DateCnt     int64  `json:"date_cnt,omitempty"`
	DateTimeCnt int64  `json:"datetime_cnt,omitempty"`
	TotalCnt    int64  `json:"total_cnt"`
	UniqCnt     int64  `json:"uniq_cnt"`
	Checksum    uint64 `json:"checksum"`
}

ColumnStats holds statistics for a column

func (*ColumnStats) DistinctPercent

func (cs *ColumnStats) DistinctPercent() float64

func (*ColumnStats) DuplicateCount

func (cs *ColumnStats) DuplicateCount() int64

func (*ColumnStats) DuplicatePercent

func (cs *ColumnStats) DuplicatePercent() float64

type ColumnType

type ColumnType string
const (
	BigIntType     ColumnType = "bigint"
	BinaryType     ColumnType = "binary"
	BoolType       ColumnType = "bool"
	DateType       ColumnType = "date"
	DatetimeType   ColumnType = "datetime"
	DecimalType    ColumnType = "decimal"
	IntegerType    ColumnType = "integer"
	JsonType       ColumnType = "json"
	SmallIntType   ColumnType = "smallint"
	StringType     ColumnType = "string"
	TextType       ColumnType = "text"
	TimestampType  ColumnType = "timestamp"
	TimestampzType ColumnType = "timestampz"
	FloatType      ColumnType = "float"
	TimeType       ColumnType = "time"
	TimezType      ColumnType = "timez"
)

func (ColumnType) IsBinary added in v1.2.3

func (ct ColumnType) IsBinary() bool

IsBinary returns whether the column is a binary

func (ColumnType) IsBool

func (ct ColumnType) IsBool() bool

IsBool returns whether the column is a boolean

func (ColumnType) IsDate added in v1.1.8

func (ct ColumnType) IsDate() bool

IsDate returns whether the column type is a date.

func (ColumnType) IsDatetime

func (ct ColumnType) IsDatetime() bool

IsDatetime returns whether the column is a datetime object

func (ColumnType) IsDecimal

func (ct ColumnType) IsDecimal() bool

IsDecimal returns whether the column is a decimal

func (ColumnType) IsFloat added in v1.1.14

func (ct ColumnType) IsFloat() bool

IsFloat returns whether the column is a float

func (ColumnType) IsInteger

func (ct ColumnType) IsInteger() bool

IsInteger returns whether the column is an integer

func (ColumnType) IsJSON

func (ct ColumnType) IsJSON() bool

IsJSON returns whether the column is a json

func (ColumnType) IsNumber

func (ct ColumnType) IsNumber() bool

IsNumber returns whether the column is a decimal or an integer

func (ColumnType) IsString

func (ct ColumnType) IsString() bool

IsString returns whether the column is a string

func (ColumnType) IsValid

func (ct ColumnType) IsValid() bool

IsValid returns whether the column has a valid type

type Columns

type Columns []Column

Columns represent many columns

func NewColumns

func NewColumns(cols ...Column) Columns

NewColumns creates Columns from the given columns.

func NewColumnsFromFields

func NewColumnsFromFields(fields ...string) (cols Columns)

NewColumnsFromFields creates Columns from fields

func (Columns) Clone

func (cols Columns) Clone() (newCols Columns)

Clone returns a copy of the columns.

func (Columns) Coerce

func (cols Columns) Coerce(castCols Columns, hasHeader bool) (newCols Columns)

Coerce casts columns into specified types

func (Columns) Data added in v1.2.2

func (cols Columns) Data(includeParent bool) (fields []string, rows [][]any)

func (Columns) Dataset

func (cols Columns) Dataset() Dataset

Dataset returns an empty inferred dataset.

func (Columns) DbTypes

func (cols Columns) DbTypes(args ...bool) []string

DbTypes returns the column names/db types. args -> (lower bool, cleanUp bool)

func (Columns) FieldMap

func (cols Columns) FieldMap(toLower bool) map[string]int

FieldMap returns a map of field names to indexes. When `toLower` is true, field keys are lower-cased.

func (Columns) GetColumn

func (cols Columns) GetColumn(name string) Column

GetColumn returns the column matching the given name.

func (Columns) GetKeys

func (cols Columns) GetKeys(keyType KeyType) Columns

GetKeys gets key columns

func (Columns) GetMissing added in v1.1.8

func (cols Columns) GetMissing(newCols ...Column) (missing Columns)

GetMissing returns the missing columns from newCols

func (Columns) IsDifferent

func (cols Columns) IsDifferent(newCols Columns) bool

func (Columns) IsDummy

func (cols Columns) IsDummy() bool

IsDummy returns true if the columns are injected by CreateDummyFields

func (Columns) IsSimilarTo

func (cols Columns) IsSimilarTo(otherCols Columns) bool

IsSimilarTo returns true if otherCols has the same number of columns and contains the same columns, possibly in a different order.

func (Columns) JSON added in v1.2.2

func (cols Columns) JSON(includeParent bool) (output string)

JSON returns the columns as a JSON string.

func (Columns) Keys

func (cols Columns) Keys() []string

Keys returns the column keys.

func (Columns) MakeRec

func (cols Columns) MakeRec(row []any) map[string]any

func (Columns) MakeShaper

func (cols Columns) MakeShaper(tgtColumns Columns) (shaper *Shaper, err error)

func (Columns) Merge added in v1.1.15

func (cols Columns) Merge(newCols Columns, overwrite bool) (col2 Columns, added schemaChg, changed []schemaChg)

func (Columns) Names

func (cols Columns) Names(args ...bool) []string

Names returns the column names. args -> (lower bool, cleanUp bool)

func (Columns) PrettyTable added in v1.1.8

func (cols Columns) PrettyTable(includeParent bool) (output string)

PrettyTable returns a text pretty table

func (Columns) SetKeys

func (cols Columns) SetKeys(keyType KeyType, colNames ...string) (err error)

SetKeys sets key columns

func (Columns) Sourced added in v1.1.6

func (cols Columns) Sourced() (sourced bool)

Sourced returns true if the columns are all sourced

func (Columns) Types

func (cols Columns) Types(args ...bool) []string

Types returns the column names/types. args -> (lower bool, cleanUp bool)

func (Columns) WithoutMeta added in v1.2.2

func (cols Columns) WithoutMeta() (newCols Columns)

WithoutMeta returns the columns without the metadata columns.

type Compressor

type Compressor interface {
	Self() Compressor
	Compress(io.Reader) io.Reader
	Decompress(io.Reader) (io.Reader, error)
	Suffix() string
}

Compressor implements different kinds of compression.

func NewCompressor

func NewCompressor(cpType CompressorType) Compressor

type CompressorType

type CompressorType string

CompressorType is a string enum type for the compressor type.

const (
	// AutoCompressorType is for auto compression
	AutoCompressorType CompressorType = "AUTO"
	// NoneCompressorType is for no compression
	NoneCompressorType CompressorType = "NONE"
	// ZipCompressorType is for Zip compression
	ZipCompressorType CompressorType = "ZIP"
	// GzipCompressorType is for Gzip compression
	GzipCompressorType CompressorType = "GZIP"
	// SnappyCompressorType is for Snappy compression
	SnappyCompressorType CompressorType = "SNAPPY"
	// ZStandardCompressorType is for ZStandard
	ZStandardCompressorType CompressorType = "ZSTD"
)

func CompressorTypePtr

func CompressorTypePtr(v CompressorType) *CompressorType

CompressorTypePtr returns a pointer to the CompressorType value passed in.

type Dataflow

type Dataflow struct {
	Columns     Columns
	Buffer      [][]interface{}
	StreamCh    chan *Datastream
	Streams     []*Datastream
	Context     *g.Context
	Limit       uint64
	EgressBytes uint64

	Ready           bool
	Inferred        bool
	FsURL           string
	OnColumnChanged func(col Column) error
	OnColumnAdded   func(col Column) error

	StreamMap map[string]*Datastream

	SchemaVersion int // for column type version
	// contains filtered or unexported fields
}

Dataflow is a collection of concurrent Datastreams

func MakeDataFlow

func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)

MakeDataFlow creates a dataflow from datastreams.

func NewDataflow

func NewDataflow(limit ...int) (df *Dataflow)

NewDataflow creates a new dataflow

func NewDataflowContext added in v1.1.15

func NewDataflowContext(ctx context.Context, limit ...int) (df *Dataflow)

func (*Dataflow) AddColumns

func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)

AddColumns adds the given columns to the dataflow.

func (*Dataflow) AddEgressBytes added in v1.2.2

func (df *Dataflow) AddEgressBytes(bytes uint64)

AddEgressBytes add egress bytes

func (*Dataflow) Bytes

func (df *Dataflow) Bytes() (inBytes, outBytes uint64)

func (*Dataflow) ChangeColumn

func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool

ChangeColumn changes the type of column i to newType.

func (*Dataflow) CleanUp

func (df *Dataflow) CleanUp()

CleanUp runs the deferred functions.

func (*Dataflow) Close

func (df *Dataflow) Close()

Close closes the df

func (*Dataflow) CloseCurrentBatches

func (df *Dataflow) CloseCurrentBatches()

func (*Dataflow) Collect

func (df *Dataflow) Collect() (data Dataset, err error)

Collect reads from one or more streams and returns a dataset.

func (*Dataflow) Count

func (df *Dataflow) Count() (cnt uint64)

Count returns the aggregate count

func (*Dataflow) Defer

func (df *Dataflow) Defer(f func())

Defer runs the given function when the Dataflow is closed.

func (*Dataflow) DsTotalBytes

func (df *Dataflow) DsTotalBytes() (bytes uint64)

func (*Dataflow) Err

func (df *Dataflow) Err() (err error)

Err returns the error, if any.

func (*Dataflow) IsClosed

func (df *Dataflow) IsClosed() bool

IsClosed returns true if the dataflow is closed.

func (*Dataflow) IsEmpty

func (df *Dataflow) IsEmpty() bool

IsEmpty returns true if ds.Rows of all channels are empty.

func (*Dataflow) MakeStreamCh

func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)

MakeStreamCh determines whether to merge all the streams into one or keep them separate. If the data per stream is small, it's best to merge. For example, BigQuery limits the number of operations that can be called within a time window.

func (*Dataflow) MergeColumns added in v1.1.15

func (df *Dataflow) MergeColumns(columns []Column, inferred bool) (processOk bool)

MergeColumns merges the given columns into the dataflow columns.

func (*Dataflow) Pause

func (df *Dataflow) Pause(exceptDs ...string) bool

Pause pauses all streams

func (*Dataflow) PushStreamChan

func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)

func (*Dataflow) ResetConfig added in v1.1.15

func (df *Dataflow) ResetConfig()

ResetConfig resets the Sp config, so that, for example, delimiter settings are not carried through.

func (*Dataflow) SetConfig added in v1.1.15

func (df *Dataflow) SetConfig(cfg *StreamConfig)

SetConfig sets the Sp config.

func (*Dataflow) SetEmpty

func (df *Dataflow) SetEmpty()

SetEmpty sets all underlying datastreams empty

func (*Dataflow) SetReady

func (df *Dataflow) SetReady()

SetReady sets the df.ready

func (*Dataflow) Size

func (df *Dataflow) Size() int

Size is the number of streams

func (*Dataflow) SyncColumns

func (df *Dataflow) SyncColumns()

SyncColumns is a workaround to sync ds.Columns to df.Columns.

func (*Dataflow) SyncStats

func (df *Dataflow) SyncStats()

SyncStats syncs the aggregated stream processor stats to df.Columns.

func (*Dataflow) Unpause

func (df *Dataflow) Unpause(exceptDs ...string)

Unpause unpauses all streams

func (*Dataflow) WaitClosed

func (df *Dataflow) WaitClosed()

WaitClosed waits until the dataflow is closed. This is a hack to make sure all streams are pushed.

func (*Dataflow) WaitReady

func (df *Dataflow) WaitReady() error

WaitReady waits until dataflow is ready

type Dataset

type Dataset struct {
	Result        *sqlx.Rows
	Columns       Columns
	Rows          [][]interface{}
	SQL           string
	Duration      float64
	Sp            *StreamProcessor
	Inferred      bool
	SafeInference bool
	NoDebug       bool
}

Dataset is a query returned dataset

func NewDataset

func NewDataset(columns Columns) (data Dataset)

NewDataset returns a new dataset.

func NewDatasetFromMap

func NewDatasetFromMap(m map[string]interface{}) (data Dataset)

NewDatasetFromMap returns a new dataset.

func NewExcelDataset added in v1.2.2

func NewExcelDataset(reader io.Reader, props map[string]string) (data Dataset, err error)

func ReadCsv

func ReadCsv(path string) (Dataset, error)

ReadCsv reads CSV and returns dataset

func (*Dataset) AddColumns

func (data *Dataset) AddColumns(newCols Columns, overwrite bool) (added Columns)

AddColumns adds the given columns to the dataset.

func (*Dataset) Append

func (data *Dataset) Append(row ...[]any)

Append appends a new row

func (*Dataset) ColValues

func (data *Dataset) ColValues(col int) []interface{}

ColValues returns the values of one column as an array.

func (*Dataset) ColValuesStr

func (data *Dataset) ColValuesStr(col int) []string

ColValuesStr returns the values of one column as an array of strings.

func (*Dataset) FirstRow

func (data *Dataset) FirstRow() []interface{}

FirstRow returns the first row

func (*Dataset) FirstVal

func (data *Dataset) FirstVal() interface{}

FirstVal returns the first value from the first row

func (*Dataset) GetFields

func (data *Dataset) GetFields(lower ...bool) []string

GetFields returns the fields of the Data.

func (*Dataset) InferColumnTypes

func (data *Dataset) InferColumnTypes()

InferColumnTypes determines the columns types

func (*Dataset) Pick

func (data *Dataset) Pick(colNames ...string) (nData Dataset)

Pick returns a new dataset with specified columns

func (*Dataset) PrettyTable added in v1.1.8

func (data *Dataset) PrettyTable(fields ...string) (output string)

func (*Dataset) Print

func (data *Dataset) Print(limit int)

Print pretty-prints the data, up to limit rows. A limit of 0 is unlimited.

func (*Dataset) Records

func (data *Dataset) Records(lower ...bool) []map[string]interface{}

Records returns the rows as maps.

func (*Dataset) RecordsCasted

func (data *Dataset) RecordsCasted(lower ...bool) []map[string]interface{}

RecordsCasted returns the rows as maps of casted values.

func (*Dataset) RecordsString

func (data *Dataset) RecordsString(lower ...bool) []map[string]interface{}

RecordsString returns the rows as maps of string values.

func (*Dataset) SetFields

func (data *Dataset) SetFields(fields []string)

SetFields sets the fields/columns of the Dataset.

func (*Dataset) Sort

func (data *Dataset) Sort(args ...any)

Sort sorts by cols. Example: `data.Sort(0, 2, 3, false)` sorts by col0, col2, col3 descending. Example: `data.Sort(0, 2, true)` sorts by col0, col2 ascending.

func (*Dataset) Stream

func (data *Dataset) Stream(Props ...map[string]string) *Datastream

Stream returns a datastream of the dataset

func (*Dataset) ToJSONMap

func (data *Dataset) ToJSONMap() map[string]interface{}

ToJSONMap converts the dataset to a JSON object.

func (*Dataset) WriteCsv

func (data *Dataset) WriteCsv(dest io.Writer) (tbw int, err error)

WriteCsv writes to a writer

type Datastream

type Datastream struct {
	Columns       Columns
	Buffer        [][]any
	BatchChan     chan *Batch
	Batches       []*Batch
	CurrentBatch  *Batch
	Count         uint64
	Context       *g.Context
	Ready         bool
	Bytes         atomic.Uint64
	Sp            *StreamProcessor
	SafeInference bool
	NoDebug       bool
	Inferred      bool

	ID       string
	Metadata Metadata // map of column name to metadata type
	// contains filtered or unexported fields
}

Datastream is a stream of rows

func MergeDataflow

func MergeDataflow(df *Dataflow) (dsN *Datastream)

MergeDataflow merges the dataflow streams into one

func NewDatastream

func NewDatastream(columns Columns) (ds *Datastream)

NewDatastream returns a new datastream.

func NewDatastreamContext

func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)

NewDatastreamContext returns a new datastream.

func NewDatastreamIt

func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)

NewDatastreamIt returns a new datastream backed by an iterator that uses nextFunc.

func ReadCsvStream

func ReadCsvStream(path string) (ds *Datastream, err error)

ReadCsvStream reads a CSV and returns a datastream.

func (*Datastream) AddBytes

func (ds *Datastream) AddBytes(b int64)

AddBytes adds bytes as processed.

func (*Datastream) AddColumns

func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)

AddColumns adds the given columns to the datastream.

func (*Datastream) CastRowToString

func (ds *Datastream) CastRowToString(row []any) []string

CastRowToString returns the row as string casted

func (*Datastream) CastRowToStringSafe added in v1.2.6

func (ds *Datastream) CastRowToStringSafe(row []any) []string

CastRowToStringSafe returns the row as string casted (safer)

func (*Datastream) ChangeColumn

func (ds *Datastream) ChangeColumn(i int, newType ColumnType)

ChangeColumn applies a column type change

func (*Datastream) Chunk

func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)

Chunk splits the datastream into chunk datastreams (in sequence)

func (*Datastream) Close

func (ds *Datastream) Close()

Close closes the datastream

func (*Datastream) Collect

func (ds *Datastream) Collect(limit int) (Dataset, error)

Collect reads a stream and returns a dataset. A limit of 0 is unlimited.

func (*Datastream) ConsumeAvroReader

func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)

ConsumeAvroReader uses the provided reader to stream rows

func (*Datastream) ConsumeAvroReaderSeeker

func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)

ConsumeAvroReaderSeeker uses the provided reader to stream rows

func (*Datastream) ConsumeCsvReader

func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)

ConsumeCsvReader uses the provided reader to stream rows

func (*Datastream) ConsumeCsvReaderChl added in v1.2.4

func (ds *Datastream) ConsumeCsvReaderChl(readerChn chan *ReaderReady) (err error)

ConsumeCsvReaderChl reads a channel of readers. Should be safe to use with header top row

func (*Datastream) ConsumeExcelReader added in v1.2.2

func (ds *Datastream) ConsumeExcelReader(reader io.Reader, props map[string]string) (err error)

ConsumeExcelReader uses the provided reader to stream rows.

func (*Datastream) ConsumeExcelReaderSeeker added in v1.2.2

func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[string]string) (err error)

ConsumeExcelReaderSeeker uses the provided reader to stream rows.

func (*Datastream) ConsumeJsonReader

func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)

ConsumeJsonReader uses the provided reader to stream JSON. This puts each JSON record into one string value so that the payload can be processed downstream.

func (*Datastream) ConsumeJsonReaderChl added in v1.2.6

func (ds *Datastream) ConsumeJsonReaderChl(readerChn chan *ReaderReady, isXML bool) (err error)

func (*Datastream) ConsumeParquetReader

func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)

ConsumeParquetReader uses the provided reader to stream rows

func (*Datastream) ConsumeParquetReaderSeeker

func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error)

ConsumeParquetReaderSeeker uses the provided reader to stream rows.

func (*Datastream) ConsumeSASReader

func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)

ConsumeSASReader uses the provided reader to stream rows

func (*Datastream) ConsumeSASReaderSeeker

func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)

ConsumeSASReaderSeeker uses the provided reader to stream rows

func (*Datastream) ConsumeXmlReader

func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)

ConsumeXmlReader uses the provided reader to stream XML. This puts each XML record into one string value so that the payload can be processed downstream.

func (*Datastream) Defer

func (ds *Datastream) Defer(f func())

Defer runs a given function as close of Datastream

func (*Datastream) Df

func (ds *Datastream) Df() *Dataflow

func (*Datastream) Err

func (ds *Datastream) Err() (err error)

Err returns the error, if any.

func (*Datastream) GetConfig

func (ds *Datastream) GetConfig() (configMap map[string]string)

GetConfig returns the config.

func (*Datastream) GetFields

func (ds *Datastream) GetFields(args ...bool) []string

GetFields returns the fields of the Data.

func (*Datastream) IsClosed

func (ds *Datastream) IsClosed() bool

IsClosed returns true if ds is closed.

func (*Datastream) LatestBatch

func (ds *Datastream) LatestBatch() *Batch

func (*Datastream) Limited added in v1.2.4

func (ds *Datastream) Limited(limit ...int) bool

func (*Datastream) Map

func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)

Map applies the provided function to every row and returns the result

func (*Datastream) MapParallel

func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)

MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.

func (*Datastream) NewBatch

func (ds *Datastream) NewBatch(columns Columns) *Batch

NewBatch creates a new batch with fixed columns. It should be used each time a column type changes or columns are added.

func (*Datastream) NewCsvBufferReader

func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader

NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream) NewCsvBufferReaderChnl

func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)

NewCsvBufferReaderChnl provides a new reader on the channel each time the limit is reached. Data is read into memory, whereas NewCsvReaderChnl does not hold it in memory.

func (*Datastream) NewCsvBytesChnl

func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)

NewCsvBytesChnl returns a channel that yields chunks of CSV bytes.

func (*Datastream) NewCsvReader

func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader

NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream) NewCsvReaderChnl

func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)

NewCsvReaderChnl provides a new reader on the channel each time the limit is reached. Each reader flows as fast as the consumer consumes.

func (*Datastream) NewExcelReaderChnl added in v1.2.2

func (ds *Datastream) NewExcelReaderChnl(rowLimit int, bytesLimit int64, sheetName string) (readerChn chan *BatchReader)

func (*Datastream) NewIterator

func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator

func (*Datastream) NewJsonLinesReaderChnl

func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)

NewJsonLinesReaderChnl provides a new reader on the channel each time the limit is reached. Each reader flows as fast as the consumer consumes.

func (*Datastream) NewJsonReaderChnl

func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)

func (*Datastream) NewParquetArrowReaderChnl added in v1.1.7

func (ds *Datastream) NewParquetArrowReaderChnl(rowLimit int, bytesLimit int64, compression CompressorType) (readerChn chan *BatchReader)

NewParquetArrowReaderChnl provides a channel of readers, emitting a new reader each time the limit is reached. Each reader flows as fast as the consumer consumes it. WARN: this function is not currently used because it does not write decimals properly.

func (*Datastream) NewParquetReaderChnl

func (ds *Datastream) NewParquetReaderChnl(rowLimit int, bytesLimit int64, compression CompressorType) (readerChn chan *BatchReader)

NewParquetReaderChnl provides a channel of readers, emitting a new reader each time the limit is reached. Each reader flows as fast as the consumer consumes it.

func (*Datastream) Pause

func (ds *Datastream) Pause()

func (*Datastream) Push

func (ds *Datastream) Push(row []any)

Push pushes the provided row into the Datastream.

func (*Datastream) Records

func (ds *Datastream) Records() <-chan map[string]any

Records returns rows as maps.

func (*Datastream) Rows

func (ds *Datastream) Rows() chan []any

func (*Datastream) SetConfig

func (ds *Datastream) SetConfig(configMap map[string]string)

SetConfig sets the ds.config values

func (*Datastream) SetEmpty

func (ds *Datastream) SetEmpty()

SetEmpty sets the ds.Rows channel as empty

func (*Datastream) SetFields

func (ds *Datastream) SetFields(fields []string)

SetFields sets the fields/columns of the Datastream

func (*Datastream) SetFileURI added in v1.1.15

func (ds *Datastream) SetFileURI()

SetFileURI sets the FileURI of the columns of the Datastream

func (*Datastream) SetIterator added in v1.1.14

func (ds *Datastream) SetIterator(it *Iterator)

func (*Datastream) SetMetadata

func (ds *Datastream) SetMetadata(jsonStr string)

func (*Datastream) SetReady

func (ds *Datastream) SetReady()

SetReady sets the ds.ready

func (*Datastream) Shape

func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)

Shape changes the column types as needed to match the provided columns. It casts rows that were previously cast incorrectly and does not recast rows that were already cast correctly.

func (*Datastream) Split

func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)

Split splits the datastream into parallel datastreams

func (*Datastream) Start

func (ds *Datastream) Start() (err error)

Start generates the stream. It should cycle the iterator function until done.

func (*Datastream) TryPause

func (ds *Datastream) TryPause() bool

func (*Datastream) Unpause

func (ds *Datastream) Unpause()

Unpause unpauses all streams

func (*Datastream) WaitClosed

func (ds *Datastream) WaitClosed()

WaitClosed waits until the dataflow is closed. This is a hack to make sure all streams are pushed.

func (*Datastream) WaitReady

func (ds *Datastream) WaitReady() error

WaitReady waits until datastream is ready

type Excel added in v1.2.2

type Excel struct {
	File   *excelize.File
	Sheets []string
	Path   string
	// contains filtered or unexported fields
}

Excel represents an Excel object pointing to its file.

func NewExcel added in v1.2.2

func NewExcel() (xls *Excel)

NewExcel creates a new excel file

func NewExcelFromFile added in v1.2.2

func NewExcelFromFile(path string) (xls *Excel, err error)

NewExcelFromFile returns a new Excel instance from a local file.

func NewExcelFromReader added in v1.2.2

func NewExcelFromReader(reader io.Reader) (xls *Excel, err error)

NewExcelFromReader returns a new Excel instance from a reader.

func (*Excel) GetDataset added in v1.2.2

func (xls *Excel) GetDataset(sheet string) (data Dataset)

GetDataset returns a dataset of the provided sheet

func (*Excel) GetDatasetFromRange added in v1.2.2

func (xls *Excel) GetDatasetFromRange(sheet, cellRange string) (data Dataset, err error)

GetDatasetFromRange returns a dataset of the provided sheet and range. Example cellRange values: `$AH$13:$AI$20`, `AH13:AI20`, or `A:E`.

func (*Excel) RefreshSheets added in v1.2.2

func (xls *Excel) RefreshSheets() (err error)

RefreshSheets refreshes the sheet index data.

func (*Excel) WriteSheet added in v1.2.2

func (xls *Excel) WriteSheet(shtName string, ds *Datastream, mode string) (err error)

WriteSheet writes a datastream into a sheet. mode can be `new`, `append`, or `overwrite`; the default is `new`.

func (*Excel) WriteToFile added in v1.2.2

func (xls *Excel) WriteToFile(path string) (err error)

WriteToFile writes to a file.

func (*Excel) WriteToWriter added in v1.2.2

func (xls *Excel) WriteToWriter(w io.Writer) (err error)

WriteToWriter writes to the provided writer.

type GoogleSheet added in v1.2.2

type GoogleSheet struct {
	Sheets        []string
	SpreadsheetID string
	// contains filtered or unexported fields
}

GoogleSheet represents a Google Sheet object.

func NewGoogleSheet added in v1.2.2

func NewGoogleSheet(props ...string) (ggs *GoogleSheet, err error)

NewGoogleSheet creates a blank spreadsheet; title is the title of the new spreadsheet.

func NewGoogleSheetFromURL added in v1.2.2

func NewGoogleSheetFromURL(urlStr string, props ...string) (ggs *GoogleSheet, err error)

NewGoogleSheetFromURL returns a new GoogleSheet instance from the provided URL.

func (*GoogleSheet) DeleteSheet added in v1.2.2

func (ggs *GoogleSheet) DeleteSheet(shtName string) (err error)

func (*GoogleSheet) GetDataset added in v1.2.2

func (ggs *GoogleSheet) GetDataset(shtName string) (data Dataset, err error)

GetDataset returns a dataset of the sheet

func (*GoogleSheet) GetDatasetFromRange added in v1.2.2

func (ggs *GoogleSheet) GetDatasetFromRange(shtName, cellRange string) (data Dataset, err error)

GetDatasetFromRange returns a dataset from the specified range

func (*GoogleSheet) RefreshSheets added in v1.2.2

func (ggs *GoogleSheet) RefreshSheets() (err error)

RefreshSheets refreshes sheets data

func (*GoogleSheet) URL added in v1.2.2

func (ggs *GoogleSheet) URL() string

func (*GoogleSheet) WriteSheet added in v1.2.2

func (ggs *GoogleSheet) WriteSheet(shtName string, ds *Datastream, mode string) (err error)

WriteSheet writes a datastream into a sheet. mode can be `new`, `append`, or `overwrite`; the default is `new`.

type GzipCompressor

type GzipCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*GzipCompressor) Compress

func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader

Compress uses gzip to compress

func (*GzipCompressor) Decompress

func (cp *GzipCompressor) Decompress(reader io.Reader) (gReader io.Reader, err error)

Decompress uses gzip to decompress the input if it is gzip-compressed; otherwise it returns the same reader.

func (*GzipCompressor) Suffix

func (cp *GzipCompressor) Suffix() string

type Iterator

type Iterator struct {
	Row          []any
	Reprocess    chan []any
	IsCasted     bool
	RowIsCasted  bool
	Counter      uint64
	StreamRowNum uint64
	Context      *g.Context
	Closed       bool
	// contains filtered or unexported fields
}

Iterator is the row provider for a datastream

func (*Iterator) Ds

func (it *Iterator) Ds() *Datastream

type KeyType

type KeyType string
const (
	AggregateKey    KeyType = "aggregate"
	ClusterKey      KeyType = "cluster"
	DistributionKey KeyType = "distribution"
	DuplicateKey    KeyType = "duplicate"
	HashKey         KeyType = "hash"
	IndexKey        KeyType = "index"
	PartitionKey    KeyType = "partition"
	PrimaryKey      KeyType = "primary"
	SortKey         KeyType = "sort"
	UniqueKey       KeyType = "unique"
	UpdateKey       KeyType = "update"
)

type KeyValue

type KeyValue struct {
	Key   string `json:"key"`
	Value any    `json:"value"`
}

type Metadata

type Metadata struct {
	StreamURL KeyValue `json:"stream_url"`
	LoadedAt  KeyValue `json:"loaded_at"`
	RowNum    KeyValue `json:"row_num"`
	RowID     KeyValue `json:"row_id"`
}

func (*Metadata) AsMap

func (m *Metadata) AsMap() map[string]any

AsMap return as map

type NoneCompressor

type NoneCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*NoneCompressor) Compress

func (cp *NoneCompressor) Compress(reader io.Reader) io.Reader

func (*NoneCompressor) Decompress

func (cp *NoneCompressor) Decompress(reader io.Reader) (gReader io.Reader, err error)

func (*NoneCompressor) Suffix

func (cp *NoneCompressor) Suffix() string

type Parquet

type Parquet struct {
	Path   string
	Reader *parquet.Reader
	Data   *Dataset
	// contains filtered or unexported fields
}

Parquet is a parquet object

func NewParquetReader added in v1.1.7

func NewParquetReader(reader io.ReaderAt, columns Columns) (p *Parquet, err error)

func (*Parquet) Columns

func (p *Parquet) Columns() Columns

type ParquetArrowDumper added in v1.1.6

type ParquetArrowDumper struct {
	// contains filtered or unexported fields
}

func NewParquetArrowDumper added in v1.1.6

func NewParquetArrowDumper(ccReader file.ColumnChunkReader) *ParquetArrowDumper

func (*ParquetArrowDumper) Next added in v1.1.6

func (pad *ParquetArrowDumper) Next() (interface{}, bool)

type ParquetArrowReader added in v1.1.6

type ParquetArrowReader struct {
	Path    string
	Reader  *file.Reader
	Data    *Dataset
	Context *g.Context
	// contains filtered or unexported fields
}

ParquetArrowReader is a parquet reader object

func NewParquetArrowReader added in v1.1.7

func NewParquetArrowReader(reader *os.File, selected []string) (p *ParquetArrowReader, err error)

func (*ParquetArrowReader) Columns added in v1.1.6

func (p *ParquetArrowReader) Columns() Columns

type ParquetArrowWriter added in v1.1.6

type ParquetArrowWriter struct {
	Writer *file.Writer
	// contains filtered or unexported fields
}

func NewParquetArrowWriter added in v1.1.6

func NewParquetArrowWriter(w io.Writer, columns Columns, codec compress.Compression) (p *ParquetArrowWriter, err error)

func (*ParquetArrowWriter) AppendNewRowGroup added in v1.1.6

func (p *ParquetArrowWriter) AppendNewRowGroup() (err error)

func (*ParquetArrowWriter) Close added in v1.1.6

func (p *ParquetArrowWriter) Close() (err error)

func (*ParquetArrowWriter) Columns added in v1.1.6

func (p *ParquetArrowWriter) Columns() Columns

func (*ParquetArrowWriter) WriteRow added in v1.1.6

func (p *ParquetArrowWriter) WriteRow(row []any) (err error)

type ParquetWriter added in v1.1.7

type ParquetWriter struct {
	Writer *parquet.Writer
	// contains filtered or unexported fields
}

func NewParquetWriter added in v1.1.7

func NewParquetWriter(w io.Writer, columns Columns, codec compress.Codec) (p *ParquetWriter, err error)

func (*ParquetWriter) Close added in v1.1.7

func (pw *ParquetWriter) Close() error

func (*ParquetWriter) WriteRow added in v1.1.7

func (pw *ParquetWriter) WriteRow(row []any) error

type ReaderReady added in v1.2.6

type ReaderReady struct {
	Reader io.Reader
	URI    string
}

type RecNode

type RecNode struct {
	// contains filtered or unexported fields
}

func NewRecNode

func NewRecNode(cols Columns) *RecNode

func (*RecNode) Compression

func (rn *RecNode) Compression() compress.Codec

func (*RecNode) Encoding

func (rn *RecNode) Encoding() encoding.Encoding

func (*RecNode) Fields

func (rn *RecNode) Fields() []parquet.Field

func (*RecNode) GoType

func (rn *RecNode) GoType() reflect.Type

func (*RecNode) ID

func (rn *RecNode) ID() int

func (*RecNode) Leaf

func (rn *RecNode) Leaf() bool

func (*RecNode) Optional

func (rn *RecNode) Optional() bool

func (*RecNode) Repeated

func (rn *RecNode) Repeated() bool

func (*RecNode) Required

func (rn *RecNode) Required() bool

func (*RecNode) String

func (rn *RecNode) String() string

func (*RecNode) Type

func (rn *RecNode) Type() parquet.Type

type Record

type Record struct {
	Columns *Columns
	Values  []any
}

type SAS

type SAS struct {
	Path   string
	Reader *datareader.SAS7BDAT
	Data   *Dataset
	// contains filtered or unexported fields
}

SAS is a sas7bdat object

func NewSASStream

func NewSASStream(reader io.ReadSeeker, columns Columns) (s *SAS, err error)

func (*SAS) Columns

func (s *SAS) Columns() Columns

type SSHClient

type SSHClient struct {
	Host       string
	Port       int
	User       string
	Password   string
	TgtHost    string
	TgtPort    int
	PrivateKey string
	Passphrase string
	Err        error
	// contains filtered or unexported fields
}

SSHClient is a client to connect to a ssh server with the main goal of forwarding ports

func (*SSHClient) Close

func (s *SSHClient) Close()

Close stops the client connection

func (*SSHClient) Connect

func (s *SSHClient) Connect() (err error)

Connect connects to the server

func (*SSHClient) GetOutput

func (s *SSHClient) GetOutput() (stdout string, stderr string)

GetOutput returns the stdout and stderr outputs.

func (*SSHClient) OpenPortForward

func (s *SSHClient) OpenPortForward() (localPort int, err error)

OpenPortForward forwards the port as specified

func (*SSHClient) RunAsProcess

func (s *SSHClient) RunAsProcess() (localPort int, err error)

RunAsProcess uses a separate process, which enables public key authentication. See https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key

func (*SSHClient) SftpClient

func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)

SftpClient returns an SftpClient

type Shaper

type Shaper struct {
	Func       func([]any) []any
	SrcColumns Columns
	TgtColumns Columns
	ColMap     map[int]int
}

type SnappyCompressor

type SnappyCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*SnappyCompressor) Compress

func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader

Compress uses snappy to compress.

func (*SnappyCompressor) Decompress

func (cp *SnappyCompressor) Decompress(reader io.Reader) (sReader io.Reader, err error)

Decompress uses snappy to decompress the input if it is snappy-compressed; otherwise it returns the same reader.

func (*SnappyCompressor) Suffix

func (cp *SnappyCompressor) Suffix() string

type StreamConfig added in v1.1.15

type StreamConfig struct {
	TrimSpace      bool    `json:"trim_space"`
	EmptyAsNull    bool    `json:"empty_as_null"`
	Header         bool    `json:"header"`
	Compression    string  `json:"compression"` // AUTO | ZIP | GZIP | SNAPPY | NONE
	NullIf         string  `json:"null_if"`
	NullAs         string  `json:"null_as"`
	DatetimeFormat string  `json:"datetime_format"`
	SkipBlankLines bool    `json:"skip_blank_lines"`
	Delimiter      string  `json:"delimiter"`
	Escape         string  `json:"escape"`
	FileMaxRows    int64   `json:"file_max_rows"`
	MaxDecimals    int     `json:"max_decimals"`
	Flatten        bool    `json:"flatten"`
	FieldsPerRec   int     `json:"fields_per_rec"`
	Jmespath       string  `json:"jmespath"`
	BoolAsInt      bool    `json:"-"`
	Columns        Columns `json:"columns"` // list of column types. Can be partial list! likely is!

	Map map[string]string `json:"-"`
	// contains filtered or unexported fields
}

func DefaultStreamConfig added in v1.1.15

func DefaultStreamConfig() *StreamConfig

type StreamProcessor

type StreamProcessor struct {
	N uint64

	Config *StreamConfig
	// contains filtered or unexported fields
}

StreamProcessor processes rows and values

func NewStreamProcessor

func NewStreamProcessor() *StreamProcessor

NewStreamProcessor returns a new StreamProcessor

func (*StreamProcessor) CastRow

func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}

CastRow casts each value of a row. It is estimated to slow down processing by about 40%.

func (*StreamProcessor) CastToString

func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string

CastToString converts a value to a string and is used for CSV writing. It slows processing down by about 5% with an upstream CastRow, or about 35% without one.

func (*StreamProcessor) CastToStringSafe added in v1.2.6

func (sp *StreamProcessor) CastToStringSafe(i int, val interface{}, valType ...ColumnType) string

CastToStringSafe converts a value to a string (a safer variant of CastToString).

func (*StreamProcessor) CastToTime

func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)

CastToTime converts interface to time

func (*StreamProcessor) CastType

func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}

CastType casts the type of an interface value. It is presumably used to cast interface placeholders.

func (*StreamProcessor) CastVal

func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}

CastVal casts values with stats collection, which degrades performance by about 10%. Benchmark with: go test -benchmem -run='^$' github.com/slingdata-io/sling-cli/core/dbio/iop -bench '^BenchmarkProcessVal'

func (*StreamProcessor) CastValWithoutStats

func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}

CastValWithoutStats casts the value without counting stats

func (*StreamProcessor) EncodingTransform added in v1.2.3

func (sp *StreamProcessor) EncodingTransform(t Transform, val string) (newVal string, err error)

func (*StreamProcessor) GetType

func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)

GetType returns the type of an interface

func (*StreamProcessor) ParseString

func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}

ParseString returns an interface value. Types map as follows: string: "varchar", integer: "integer", decimal: "decimal", date: "date", datetime: "timestamp", timestamp: "timestamp", text: "text".

func (*StreamProcessor) ParseTime

func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)

ParseTime parses a date string and returns time.Time

func (*StreamProcessor) ParseVal

func (sp *StreamProcessor) ParseVal(val interface{}) interface{}

ParseVal parses the value into its appropriate type

func (*StreamProcessor) ProcessRow

func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}

ProcessRow processes a row

func (*StreamProcessor) ProcessVal

func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}

ProcessVal processes a value

func (*StreamProcessor) ResetConfig added in v1.1.15

func (sp *StreamProcessor) ResetConfig()

func (*StreamProcessor) SetConfig

func (sp *StreamProcessor) SetConfig(configMap map[string]string)

SetConfig sets the data.Sp.config values

type Transform added in v1.2.2

type Transform string
const (
	TransformDecodeLatin1        Transform = "decode_latin1"
	TransformDecodeLatin5        Transform = "decode_latin5"
	TransformDecodeLatin9        Transform = "decode_latin9"
	TransformDecodeUtf8          Transform = "decode_utf8"
	TransformDecodeUtf8Bom       Transform = "decode_utf8_bom"
	TransformDecodeUtf16         Transform = "decode_utf16"
	TransformDecodeWindows1250   Transform = "decode_windows1250"
	TransformDecodeWindows1252   Transform = "decode_windows1252"
	TransformDuckdbListToText    Transform = "duckdb_list_to_text"
	TransformEncodeLatin1        Transform = "encode_latin1"
	TransformEncodeLatin5        Transform = "encode_latin5"
	TransformEncodeLatin9        Transform = "encode_latin9"
	TransformEncodeUtf8          Transform = "encode_utf8"
	TransformEncodeUtf8Bom       Transform = "encode_utf8_bom"
	TransformEncodeUtf16         Transform = "encode_utf16"
	TransformEncodeWindows1250   Transform = "encode_windows1250"
	TransformEncodeWindows1252   Transform = "encode_windows1252"
	TransformHashMd5             Transform = "hash_md5"
	TransformHashSha256          Transform = "hash_sha256"
	TransformHashSha512          Transform = "hash_sha512"
	TransformParseBit            Transform = "parse_bit"
	TransformParseFix            Transform = "parse_fix"
	TransformParseUuid           Transform = "parse_uuid"
	TransformReplace0x00         Transform = "replace_0x00"
	TransformReplaceAccents      Transform = "replace_accents"
	TransformReplaceNonPrintable Transform = "replace_non_printable"
	TransformTrimSpace           Transform = "trim_space"
)

type TransformFunc

type TransformFunc func(*StreamProcessor, string) (string, error)

type Transformers added in v1.2.2

type Transformers struct {
	Accent transform.Transformer

	DecodeUTF8        transform.Transformer
	DecodeUTF8BOM     transform.Transformer
	DecodeUTF16       transform.Transformer
	DecodeISO8859_1   transform.Transformer
	DecodeISO8859_5   transform.Transformer
	DecodeISO8859_15  transform.Transformer
	DecodeWindows1250 transform.Transformer
	DecodeWindows1252 transform.Transformer

	EncodeUTF8        transform.Transformer
	EncodeUTF8BOM     transform.Transformer
	EncodeUTF16       transform.Transformer
	EncodeISO8859_1   transform.Transformer
	EncodeISO8859_5   transform.Transformer
	EncodeISO8859_15  transform.Transformer
	EncodeWindows1250 transform.Transformer
	EncodeWindows1252 transform.Transformer
}

func NewTransformers added in v1.2.2

func NewTransformers() Transformers

type ZStandardCompressor

type ZStandardCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*ZStandardCompressor) Compress

func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader

Compress uses zstandard (zstd) to compress.

func (*ZStandardCompressor) Decompress

func (cp *ZStandardCompressor) Decompress(reader io.Reader) (sReader io.Reader, err error)

Decompress uses zstandard (zstd) to decompress the input if it is zstd-compressed; otherwise it returns the same reader.

func (*ZStandardCompressor) Suffix

func (cp *ZStandardCompressor) Suffix() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL