Documentation ¶
Index ¶
- Variables
- func Check(e error, msg string)
- func Compress(reader io.Reader) io.Reader
- func Decompress(reader io.Reader) (gReader io.Reader, err error)
- func Error(e error, msg string) error
- func F(format string, args ...interface{}) string
- func GetType(myvar interface{}) string
- func IsErr(err error, msg string) bool
- func IsErrExit(err error, msg string)
- func Log(text string)
- func LogC(text string, col string, w io.Writer)
- func LogCBlue(text string)
- func LogCCyan(text string)
- func LogCGreen(text string)
- func LogCMagenta(text string)
- func LogCRed(text string)
- func LogCRedErr(text string)
- func LogCWhite(text string)
- func LogError(E error)
- func LogErrorExit(E error)
- func LogErrorMail(E error)
- func LogIfError(E error)
- func Now() int64
- func Panic(e error, msg string)
- func ParseString(s string) interface{}
- func PrintT(v interface{})
- func PrintV(v interface{})
- func Propagate(err error, msg string) error
- func R(format string, args ...string) string
- func RandString(charset string, n int) string
- func Rm(format string, m map[string]interface{}) string
- func SendMail(from string, to []string, subject string, textHTML string) error
- func Tee(reader io.Reader, limit int) io.Reader
- type BaseConn
- func (conn *BaseConn) BulkExportStream(sql string) (ds Datastream, err error)
- func (conn *BaseConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error)
- func (conn *BaseConn) Close() error
- func (conn *BaseConn) Connect() error
- func (conn *BaseConn) Context() Context
- func (conn *BaseConn) Db() *sqlx.DB
- func (conn *BaseConn) DropTable(tableNames ...string) (err error)
- func (conn *BaseConn) DropView(viewNames ...string) (err error)
- func (conn *BaseConn) GenerateDDL(tableFName string, data Dataset) (string, error)
- func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string) string
- func (conn *BaseConn) GetColumns(tableFName string) (Dataset, error)
- func (conn *BaseConn) GetColumnsFull(tableFName string) (Dataset, error)
- func (conn *BaseConn) GetCount(tableFName string) (uint64, error)
- func (conn *BaseConn) GetDDL(tableFName string) (string, error)
- func (conn *BaseConn) GetGormConn() (*gorm.DB, error)
- func (conn *BaseConn) GetIndexes(tableFName string) (Dataset, error)
- func (conn *BaseConn) GetObjects(schema string, objectType string) (Dataset, error)
- func (conn *BaseConn) GetPrimaryKeys(tableFName string) (Dataset, error)
- func (conn *BaseConn) GetProp(key string) string
- func (conn *BaseConn) GetSchemas() (Dataset, error)
- func (conn *BaseConn) GetSchemata(schemaName string) (Schema, error)
- func (conn *BaseConn) GetTables(schema string) (Dataset, error)
- func (conn *BaseConn) GetTemplateValue(path string) (value string)
- func (conn *BaseConn) GetType() string
- func (conn *BaseConn) GetViews(schema string) (Dataset, error)
- func (conn *BaseConn) Import(data Dataset, tableName string) error
- func (conn *BaseConn) Init() (err error)
- func (conn *BaseConn) InsertBatchStream(tableFName string, columns []string, streamRow <-chan []interface{}) error
- func (conn *BaseConn) InsertStream(tableFName string, ds Datastream) (count uint64, err error)
- func (conn *BaseConn) Kill() error
- func (conn *BaseConn) LoadYAML() error
- func (conn *BaseConn) Query(sql string) (Dataset, error)
- func (conn *BaseConn) QueryContext(ctx context.Context, sql string) (Dataset, error)
- func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (Dataset, error)
- func (conn *BaseConn) RunAnalysisField(analysisName string, tableFName string, fields ...string) (Dataset, error)
- func (conn *BaseConn) RunAnalysisTable(analysisName string, tableFNames ...string) (Dataset, error)
- func (conn *BaseConn) Schemata() *Schemata
- func (conn *BaseConn) SetProp(key string, val string)
- func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)
- func (conn *BaseConn) StreamRows(sql string) (ds Datastream, err error)
- func (conn *BaseConn) StreamRowsContext(ctx context.Context, sql string) (ds Datastream, err error)
- func (conn *BaseConn) Template() *Template
- type CSV
- type Column
- type ColumnStats
- type Connection
- type Context
- type Dataset
- type Datastream
- type MsSQLServerConn
- type MySQLConn
- func (conn *MySQLConn) BulkExportStream(sql string) (ds Datastream, err error)
- func (conn *MySQLConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error)
- func (conn *MySQLConn) Init() error
- func (conn *MySQLConn) LoadDataInFile(tableFName string, ds Datastream) (count uint64, err error)
- func (conn *MySQLConn) LoadDataOutFile(sql string) (stdOutReader io.Reader, err error)
- type OracleConn
- type Parquet
- type PostgresConn
- func (conn *PostgresConn) BulkExportStream(sql string) (ds Datastream, err error)
- func (conn *PostgresConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error)
- func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)
- func (conn *PostgresConn) Init() error
- type RedshiftConn
- type S3
- type Schema
- type Schemata
- type Table
- type Template
Constants ¶
This section is empty.
Variables ¶
var ( // SMTPServer is email SMTP server host SMTPServer = "smtp.gmail.com" // SMTPPort is email SMTP server port SMTPPort = 465 // SMTPUser is SMTP user name SMTPUser = os.Getenv("SMTP_USER") // SMTPPass is user password SMTPPass = os.Getenv("SMTP_PASS") // AlertEmail is the email address to send errors to AlertEmail = os.Getenv("ALERT_EMAIL") )
Functions ¶
func Decompress ¶
Decompress uses gzip to decompress the reader if its content is gzip-compressed. Otherwise it returns the same reader.
func LogErrorExit ¶
func LogErrorExit(E error)
LogErrorExit handles logging of an error and exits, useful for reporting
func LogErrorMail ¶
func LogErrorMail(E error)
LogErrorMail handles logging of an error and mails it to self
func LogIfError ¶
func LogIfError(E error)
LogIfError handles logging of an error if it is not nil, useful for reporting
func ParseString ¶
func ParseString(s string) interface{}
ParseString returns an interface. string: "varchar", integer: "integer", decimal: "decimal", date: "date", datetime: "timestamp", timestamp: "timestamp", text: "text"
func RandString ¶
RandString returns a random string of length n using the provided character set. charset can be `aplha` or `aplhanumeric`.
func SendMail ¶
SendMail sends an email to the specified email addresses. See https://godoc.org/gopkg.in/gomail.v2#example-package
Types ¶
type BaseConn ¶
type BaseConn struct { Connection URL string Type string // the type of database for sqlx: postgres, mysql, sqlite Data Dataset // contains filtered or unexported fields }
BaseConn is a database connection
func (*BaseConn) BulkExportStream ¶
func (conn *BaseConn) BulkExportStream(sql string) (ds Datastream, err error)
BulkExportStream streams the rows in bulk
func (*BaseConn) BulkImportStream ¶
func (conn *BaseConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error)
BulkImportStream imports the stream rows in bulk
func (*BaseConn) GenerateDDL ¶
GenerateDDL generates a DDL based on a dataset
func (*BaseConn) GenerateInsertStatement ¶
GenerateInsertStatement returns the proper INSERT statement
func (*BaseConn) GetColumns ¶
GetColumns returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2` fields should be `column_name|data_type`
func (*BaseConn) GetColumnsFull ¶
GetColumnsFull returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2`. Fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`
func (*BaseConn) GetGormConn ¶
GetGormConn returns the gorm db connection
func (*BaseConn) GetIndexes ¶
GetIndexes returns indexes for given table.
func (*BaseConn) GetObjects ¶
GetObjects returns objects (tables or views) for given schema `objectType` can be either 'table', 'view' or 'all'
func (*BaseConn) GetPrimaryKeys ¶
GetPrimaryKeys returns primary keys for given table.
func (*BaseConn) GetSchemas ¶
GetSchemas returns schemas
func (*BaseConn) GetSchemata ¶
GetSchemata obtains full schemata info
func (*BaseConn) GetTemplateValue ¶
GetTemplateValue returns the value of the path
func (*BaseConn) InsertBatchStream ¶
func (conn *BaseConn) InsertBatchStream(tableFName string, columns []string, streamRow <-chan []interface{}) error
InsertBatchStream inserts a stream into a table in batch
func (*BaseConn) InsertStream ¶
func (conn *BaseConn) InsertStream(tableFName string, ds Datastream) (count uint64, err error)
InsertStream inserts a stream into a table
func (*BaseConn) QueryContext ¶
QueryContext runs a sql query with ctx, returns `result`, `error`
func (*BaseConn) RunAnalysis ¶
func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (Dataset, error)
RunAnalysis runs an analysis
func (*BaseConn) RunAnalysisField ¶
func (conn *BaseConn) RunAnalysisField(analysisName string, tableFName string, fields ...string) (Dataset, error)
RunAnalysisField runs a field level analysis
func (*BaseConn) RunAnalysisTable ¶
RunAnalysisTable runs a table level analysis
func (*BaseConn) StreamRecords ¶
StreamRecords streams the records of a sql query, returns `result`, `error`
func (*BaseConn) StreamRows ¶
func (conn *BaseConn) StreamRows(sql string) (ds Datastream, err error)
StreamRows streams the rows of a sql query, returns `result`, `error`
func (*BaseConn) StreamRowsContext ¶
StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`
type CSV ¶
CSV is a csv object
func (*CSV) NewReader ¶
func (c *CSV) NewReader() (*io.PipeReader, error)
NewReader creates a Reader
func (*CSV) ReadStream ¶
func (c *CSV) ReadStream() (ds Datastream, err error)
ReadStream returns the read CSV stream with Line 1 as header
func (*CSV) WriteStream ¶
func (c *CSV) WriteStream(ds Datastream) (cnt uint64, err error)
WriteStream to CSV file
type Column ¶
type Column struct { Position int64 `json:"position"` Name string `json:"name"` Type string `json:"type"` // contains filtered or unexported fields }
Column represents a schemata column
type ColumnStats ¶
type ColumnStats struct {
// contains filtered or unexported fields
}
ColumnStats holds statistics for a column
type Connection ¶
type Connection interface { Init() error Connect() error Kill() error Close() error GetType() string GetGormConn() (*gorm.DB, error) LoadYAML() error StreamRows(sql string) (Datastream, error) BulkExportStream(sql string) (Datastream, error) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error) Query(sql string) (Dataset, error) QueryContext(ctx context.Context, sql string) (Dataset, error) GenerateDDL(tableFName string, data Dataset) (string, error) GenerateInsertStatement(tableName string, fields []string) string DropTable(...string) error DropView(...string) error InsertStream(tableFName string, ds Datastream) (count uint64, err error) Db() *sqlx.DB Schemata() *Schemata Template() *Template SetProp(string, string) GetProp(string) string GetTemplateValue(path string) (value string) Context() Context StreamRecords(sql string) (<-chan map[string]interface{}, error) GetDDL(string) (string, error) GetSchemata(string) (Schema, error) GetSchemas() (Dataset, error) GetTables(string) (Dataset, error) GetViews(string) (Dataset, error) GetColumns(string) (Dataset, error) GetPrimaryKeys(string) (Dataset, error) GetIndexes(string) (Dataset, error) GetColumnsFull(string) (Dataset, error) GetCount(string) (uint64, error) RunAnalysis(string, map[string]interface{}) (Dataset, error) RunAnalysisTable(string, ...string) (Dataset, error) RunAnalysisField(string, string, ...string) (Dataset, error) // contains filtered or unexported methods }
Connection is the Base interface for Connections
func GetConn ¶
func GetConn(URL string) Connection
GetConn returns the most appropriate connection for a given database
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context is to manage context
type Dataset ¶
type Dataset struct { Result *sqlx.Rows Columns []Column Rows [][]interface{} SQL string Duration float64 }
Dataset is a query returned dataset
func (*Dataset) InferColumnTypes ¶
func (data *Dataset) InferColumnTypes()
InferColumnTypes determines the columns types
type Datastream ¶
type Datastream struct { Columns []Column Rows chan []interface{} Buffer [][]interface{} // contains filtered or unexported fields }
Datastream is a stream of rows
func ReadCsvStream ¶
func ReadCsvStream(path string) (Datastream, error)
ReadCsvStream reads CSV and returns datastream
func (*Datastream) Collect ¶
func (ds *Datastream) Collect() Dataset
Collect reads a stream and returns a dataset
func (*Datastream) GetFields ¶
func (ds *Datastream) GetFields() []string
GetFields returns the fields of the Data
func (*Datastream) InferTypes ¶
func (ds *Datastream) InferTypes()
InferTypes infers types if needed and adds to Buffer. Experimental.
func (*Datastream) NewCsvReader ¶
func (ds *Datastream) NewCsvReader(limit int) *io.PipeReader
NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.
type MsSQLServerConn ¶
MsSQLServerConn is a Microsoft SQL Server connection
type MySQLConn ¶
MySQLConn is a MySQL connection
func (*MySQLConn) BulkExportStream ¶
func (conn *MySQLConn) BulkExportStream(sql string) (ds Datastream, err error)
BulkExportStream bulk Export
func (*MySQLConn) BulkImportStream ¶
func (conn *MySQLConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MySQLConn) LoadDataInFile ¶
func (conn *MySQLConn) LoadDataInFile(tableFName string, ds Datastream) (count uint64, err error)
LoadDataInFile Bulk Import
func (*MySQLConn) LoadDataOutFile ¶
LoadDataOutFile Bulk Export. Possible error: ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation. The FILE privilege needs to be granted to the user; also the --secure-file-priv option needs to be set properly for it to work.
type OracleConn ¶
OracleConn is an Oracle connection
func (*OracleConn) BulkImportStream ¶
func (conn *OracleConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*OracleConn) SQLLoad ¶
func (conn *OracleConn) SQLLoad(tableFName string, ds Datastream) (count uint64, err error)
SQLLoad uses sqlldr to Bulk Import cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr
type Parquet ¶
type Parquet struct { Path string Columns []Column File *os.File PFile source.ParquetFile Data *Dataset }
Parquet is a parquet object
func (*Parquet) ReadStream ¶
func (p *Parquet) ReadStream() (Datastream, error)
ReadStream returns the read Parquet stream into a Datastream https://github.com/xitongsys/parquet-go/blob/master/example/read_partial.go
func (*Parquet) WriteStream ¶
func (p *Parquet) WriteStream(ds Datastream) error
WriteStream to Parquet file from datastream
type PostgresConn ¶
PostgresConn is a Postgres connection
func (*PostgresConn) BulkExportStream ¶
func (conn *PostgresConn) BulkExportStream(sql string) (ds Datastream, err error)
BulkExportStream uses the bulk dumping (COPY)
func (*PostgresConn) BulkImportStream ¶
func (conn *PostgresConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*PostgresConn) CopyToStdout ¶
func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)
CopyToStdout Copy TO STDOUT
type RedshiftConn ¶
RedshiftConn is a Redshift connection
func (*RedshiftConn) BulkExportStream ¶
func (conn *RedshiftConn) BulkExportStream(sql string) (ds Datastream, err error)
BulkExportStream reads in bulk
func (*RedshiftConn) BulkImportStream ¶
func (conn *RedshiftConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
type S3 ¶
S3 is an AWS S3 object
func (*S3) ReadStream ¶
func (s *S3) ReadStream(key string) (*io.PipeReader, error)
ReadStream reads from an S3 bucket (download). Example: S3 file stream into Database or CSV
type Schemata ¶
type Schemata struct { Schemas map[string]Schema Tables map[string]*Table // all tables with full name lower case (schema.table) }
Schemata contains the full schema for a connection
type Table ¶
type Table struct { Name string `json:"name"` FullName string `json:"full_name"` IsView bool `json:"is_view"` // whether is a view Columns []Column ColumnsMap map[string]*Column }
Table represents a schemata table
type Template ¶
type Template struct { Core map[string]string Metadata map[string]string Analysis map[string]string Function map[string]string GeneralTypeMap map[string]string `yaml:"general_type_map"` NativeTypeMap map[string]string `yaml:"native_type_map"` Variable map[string]string }
Template is a database YAML template