database

package
v0.4.86 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2024 License: GPL-3.0 Imports: 61 Imported by: 3

Documentation

Index

Constants

View Source
const RelationManyToMany = "many_to_many"
View Source
const RelationManyToOne = "many_to_one"
View Source
const RelationOneToMany = "one_to_many"
View Source
const RelationOneToOne = "one_to_one"
View Source
const SQLiteVersion = "3.41.0"

Variables

View Source
var Debug = false

Debug prints queries when true

View Source
var DuckDbFileContext = map[string]*g.Context{} // so that collision doesn't happen
View Source
var DuckDbUseTempFile = false
View Source
var DuckDbVersion = "0.9.1"
View Source
var InferDBStream = false

InferDBStream may need to be `true`, since precision and scale are not guaranteed. If `false`, will use the database stream source schema.

View Source
var (
	// UseBulkExportFlowCSV to use BulkExportFlowCSV
	UseBulkExportFlowCSV = false
)

Functions

func AddMissingColumns added in v0.3.66

func AddMissingColumns(conn Connection, table Table, newCols iop.Columns) (ok bool, err error)

func ChangeColumnTypeViaAdd added in v0.3.130

func ChangeColumnTypeViaAdd(conn Connection, table Table, col iop.Column) (err error)

ChangeColumnTypeViaAdd swaps a new column with the old in order to change the type. Need to use this with Snowflake when changing from date to string, or number to string.

func CleanSQL

func CleanSQL(conn Connection, sql string) string

CleanSQL removes creds from the query

func CommonColumns

func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)

CommonColumns returns the common columns

func CopyFromAzure

func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)

CopyFromAzure uses the Snowflake COPY INTO Table command from Azure. See https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func CopyFromS3

func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)

func EnsureBinDuckDB added in v0.3.221

func EnsureBinDuckDB(version string) (binPath string, err error)

EnsureBinDuckDB ensures the duckdb binary exists. If missing, downloads and uses it.

func EnsureBinSQLite added in v0.3.221

func EnsureBinSQLite() (binPath string, err error)

EnsureBinSQLite ensures the sqlite binary exists. If missing, downloads and uses it.

func GenerateAlterDDL added in v0.3.111

func GenerateAlterDDL(conn Connection, table Table, newColumns iop.Columns) (bool, error)

GenerateAlterDDL generates a DDL based on a dataset

func GetQualifierQuote added in v0.3.111

func GetQualifierQuote(dialect dbio.Type) string

func HasVariedCase added in v0.3.275

func HasVariedCase(text string) bool

func InsertBatchStream added in v0.0.5

func InsertBatchStream(conn Connection, tx Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func InsertStream added in v0.0.5

func InsertStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream inserts a stream

func NativeTypeToGeneral added in v0.3.133

func NativeTypeToGeneral(name, dbType string, conn Connection) (colType iop.ColumnType)

func PK added in v0.0.5

func PK(obj interface{}) (pk []string)

PK returns the primary keys of a model

func ParseColumnName added in v0.3.185

func ParseColumnName(text string, dialect dbio.Type) (colName string, err error)

func ParseSQLMultiStatements added in v0.0.5

func ParseSQLMultiStatements(sql string) (sqls g.Strings)

ParseSQLMultiStatements splits a sql text into statements typically by a ';'

func SQLColumns

func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns)

SQLColumns returns the columns from database ColumnType

func SplitTableFullName

func SplitTableFullName(tableName string) (string, string)

SplitTableFullName returns the schema / table name

func TableExists added in v0.3.231

func TableExists(conn Connection, tableFName string) (exists bool, err error)

TableExists returns true if the table exists

func TestPermissions

func TestPermissions(conn Connection, tableName string) (err error)

TestPermissions tests the needed permissions in a given connection

func UID added in v0.0.5

func UID(obj interface{}) string

UID returns a unique ID for that object

func Upsert added in v0.0.5

func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, pkFields []string) (count int64, err error)

Upsert upserts from source table into target table

Types

type BaseConn

type BaseConn struct {
	Connection
	URL  string
	Type dbio.Type // the type of database for sqlx: postgres, mysql, sqlite

	Data iop.Dataset

	Log []string
	// contains filtered or unexported fields
}

BaseConn is a database connection

func (*BaseConn) AddLog added in v0.1.0

func (conn *BaseConn) AddLog(text string)

AddLog logs a text for debugging

func (*BaseConn) Base added in v0.1.0

func (conn *BaseConn) Base() *BaseConn

Base returns the base Conn

func (*BaseConn) BaseURL

func (conn *BaseConn) BaseURL() string

BaseURL returns the base URL with default port

func (*BaseConn) Begin

func (conn *BaseConn) Begin(options ...*sql.TxOptions) (err error)

Begin starts a connection wide transaction

func (*BaseConn) BeginContext added in v0.0.5

func (conn *BaseConn) BeginContext(ctx context.Context, options ...*sql.TxOptions) (err error)

BeginContext starts a connection wide transaction

func (*BaseConn) BulkExportFlow

func (conn *BaseConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err error)

BulkExportFlow creates a dataflow from a sql query

func (*BaseConn) BulkExportFlowCSV

func (conn *BaseConn) BulkExportFlowCSV(tables ...Table) (df *iop.Dataflow, err error)

BulkExportFlowCSV creates a dataflow from a sql query, using CSVs

func (*BaseConn) BulkExportStream

func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream streams the rows in bulk

func (*BaseConn) BulkImportFlow

func (conn *BaseConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow imports the streams rows in bulk concurrently using channels

func (*BaseConn) BulkImportStream

func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream import the stream rows in bulk

func (*BaseConn) CastColumnForSelect

func (conn *BaseConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) string

CastColumnForSelect casts to the correct target column type

func (*BaseConn) CastColumnsForSelect

func (conn *BaseConn) CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string

CastColumnsForSelect cast the source columns into the target Column types

func (*BaseConn) Close

func (conn *BaseConn) Close() error

Close closes the connection

func (*BaseConn) Commit

func (conn *BaseConn) Commit() (err error)

Commit commits a connection wide transaction

func (*BaseConn) CompareChecksums

func (conn *BaseConn) CompareChecksums(tableName string, columns iop.Columns) (err error)

CompareChecksums compares the checksum values from the database side to the checksum values from the StreamProcessor

func (*BaseConn) ConnString added in v0.2.0

func (conn *BaseConn) ConnString() string

ConnString returns the connection string needed for connection

func (*BaseConn) Connect

func (conn *BaseConn) Connect(timeOut ...int) (err error)

Connect connects to the database

func (*BaseConn) Context

func (conn *BaseConn) Context() *g.Context

Context returns the db context

func (*BaseConn) CreateTable added in v0.0.5

func (conn *BaseConn) CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)

CreateTable creates a new table based on provided columns. `tableName` should have 'schema.table' format.

func (*BaseConn) CreateTemporaryTable added in v0.0.5

func (conn *BaseConn) CreateTemporaryTable(tableName string, cols iop.Columns) (err error)

CreateTemporaryTable creates a temp table based on provided columns

func (*BaseConn) CurrentDatabase added in v0.2.0

func (conn *BaseConn) CurrentDatabase() (dbName string, err error)

CurrentDatabase returns the name of the current database

func (*BaseConn) Db

func (conn *BaseConn) Db() *sqlx.DB

Db returns the sqlx db object

func (*BaseConn) DbX added in v0.0.5

func (conn *BaseConn) DbX() *DbX

DbX returns the DbX object

func (*BaseConn) DropTable

func (conn *BaseConn) DropTable(tableNames ...string) (err error)

DropTable drops given table.

func (*BaseConn) DropView

func (conn *BaseConn) DropView(viewNames ...string) (err error)

DropView drops given view.

func (*BaseConn) Exec

func (conn *BaseConn) Exec(sql string, args ...interface{}) (result sql.Result, err error)

Exec runs a sql query, returns `error`

func (*BaseConn) ExecContext

func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecContext runs a sql query with context, returns `error`

func (*BaseConn) ExecMulti added in v0.0.5

func (conn *BaseConn) ExecMulti(sql string, args ...interface{}) (result sql.Result, err error)

ExecMulti runs multiple sql queries, returns `error`

func (*BaseConn) ExecMultiContext added in v0.0.5

func (conn *BaseConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecMultiContext runs multiple sql queries with context, returns `error`

func (*BaseConn) GenerateDDL

func (conn *BaseConn) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)

GenerateDDL generates a DDL based on a dataset

func (*BaseConn) GenerateInsertStatement

func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string

GenerateInsertStatement returns the proper INSERT statement

func (*BaseConn) GenerateUpsertExpressions

func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)

GenerateUpsertExpressions returns a map with needed expressions

func (*BaseConn) GenerateUpsertSQL added in v0.0.5

func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL returns a sql for upsert

func (*BaseConn) GetAnalysis added in v0.0.5

func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)

GetAnalysis runs an analysis

func (*BaseConn) GetColumnStats

func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)

GetColumnStats analyzes the table and returns the column statistics

func (*BaseConn) GetColumns

func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)

func (*BaseConn) GetColumnsFull

func (conn *BaseConn) GetColumnsFull(tableFName string) (iop.Dataset, error)

GetColumnsFull returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2`. Fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`.

func (*BaseConn) GetCount

func (conn *BaseConn) GetCount(tableFName string) (uint64, error)

GetCount returns count of records

func (*BaseConn) GetDDL

func (conn *BaseConn) GetDDL(tableFName string) (string, error)

GetDDL returns DDL for given table.

func (*BaseConn) GetDatabases added in v0.2.0

func (conn *BaseConn) GetDatabases() (iop.Dataset, error)

GetDatabases returns databases for given connection

func (*BaseConn) GetGormConn

func (conn *BaseConn) GetGormConn(config *gorm.Config) (*gorm.DB, error)

GetGormConn returns the gorm db connection

func (*BaseConn) GetIndexes

func (conn *BaseConn) GetIndexes(tableFName string) (iop.Dataset, error)

GetIndexes returns indexes for given table.

func (*BaseConn) GetNativeType added in v0.0.5

func (conn *BaseConn) GetNativeType(col iop.Column) (nativeType string, err error)

GetNativeType returns the native column type from generic

func (*BaseConn) GetObjects

func (conn *BaseConn) GetObjects(schema string, objectType string) (iop.Dataset, error)

GetObjects returns objects (tables or views) for given schema. `objectType` can be either 'table', 'view' or 'all'.

func (*BaseConn) GetPrimaryKeys

func (conn *BaseConn) GetPrimaryKeys(tableFName string) (iop.Dataset, error)

GetPrimaryKeys returns primary keys for given table.

func (*BaseConn) GetProp

func (conn *BaseConn) GetProp(key string) string

GetProp returns the value of a property

func (*BaseConn) GetSQLColumns

func (conn *BaseConn) GetSQLColumns(tables ...Table) (columns iop.Columns, err error)

GetSQLColumns returns columns from a sql query result

func (*BaseConn) GetSchemas

func (conn *BaseConn) GetSchemas() (iop.Dataset, error)

GetSchemas returns schemas

func (*BaseConn) GetSchemata added in v0.1.0

func (conn *BaseConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)

GetSchemata obtain full schemata info for a schema and/or table in current database

func (*BaseConn) GetTableColumns added in v0.3.111

func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)

GetTableColumns returns columns for the given table. The table should include schema and table name, example: `schema1.table2`. Fields should be `column_name|data_type`.

func (*BaseConn) GetTables

func (conn *BaseConn) GetTables(schema string) (iop.Dataset, error)

GetTables returns tables for given schema

func (*BaseConn) GetTemplateValue

func (conn *BaseConn) GetTemplateValue(path string) (value string)

GetTemplateValue returns the value of the path

func (*BaseConn) GetType

func (conn *BaseConn) GetType() dbio.Type

GetType returns the type db object

func (*BaseConn) GetURL

func (conn *BaseConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*BaseConn) GetViews

func (conn *BaseConn) GetViews(schema string) (iop.Dataset, error)

GetViews returns views for given schema

func (*BaseConn) Import

func (conn *BaseConn) Import(data iop.Dataset, tableName string) error

Import imports `data` into `tableName`

func (*BaseConn) Info added in v0.2.6

func (conn *BaseConn) Info() (ci ConnInfo)

Info returns connection information

func (*BaseConn) Init

func (conn *BaseConn) Init() (err error)

Init initiates the connection object & add default port if missing

func (*BaseConn) InsertBatchStream

func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*BaseConn) InsertStream

func (conn *BaseConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream inserts a stream into a table

func (*BaseConn) Kill

func (conn *BaseConn) Kill() error

Kill kills the database connection

func (*BaseConn) LoadTemplates

func (conn *BaseConn) LoadTemplates() error

LoadTemplates loads the appropriate yaml template

func (*BaseConn) MustExec

func (conn *BaseConn) MustExec(sql string, args ...interface{}) (result sql.Result)

MustExec execs the query using e and panics if there was an error. Any placeholder parameters are replaced with supplied args.

func (*BaseConn) NewTransaction added in v0.0.5

func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)

NewTransaction creates a new transaction

func (*BaseConn) OptimizeTable

func (conn *BaseConn) OptimizeTable(table *Table, newColumns iop.Columns) (ok bool, err error)

OptimizeTable analyzes the table and alters the table with the columns data type based on its analysis result. If the table is missing, it is created with a new DDL. Hole in this: will truncate data points, since it is based only on new data being inserted... would need a complete stats of the target table to properly optimize.

func (*BaseConn) Prepare

func (conn *BaseConn) Prepare(query string) (stmt *sql.Stmt, err error)

Prepare prepares the statement

func (*BaseConn) ProcessTemplate added in v0.1.0

func (conn *BaseConn) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)

ProcessTemplate processes a template SQL text at a given level

func (*BaseConn) PropArr

func (conn *BaseConn) PropArr() []string

PropArr returns an array of properties

func (*BaseConn) Props

func (conn *BaseConn) Props() map[string]string

Props returns a map of properties

func (*BaseConn) Query

func (conn *BaseConn) Query(sql string, options ...map[string]interface{}) (data iop.Dataset, err error)

Query runs a sql query, returns `result`, `error`

func (*BaseConn) QueryContext

func (conn *BaseConn) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (iop.Dataset, error)

QueryContext runs a sql query with ctx, returns `result`, `error`

func (*BaseConn) Quote

func (conn *BaseConn) Quote(field string, normalize ...bool) string

Quote adds quotes to the field name

func (*BaseConn) Rollback

func (conn *BaseConn) Rollback() (err error)

Rollback rolls back a connection wide transaction

func (*BaseConn) RunAnalysis

func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)

RunAnalysis runs an analysis

func (*BaseConn) Schemata

func (conn *BaseConn) Schemata() Schemata

Schemata returns the Schemata object

func (*BaseConn) Self

func (conn *BaseConn) Self() Connection

Self returns the respective connection instance. This is useful to refer back to a subclass method from the superclass level. (Aka overloading)

func (*BaseConn) SetProp

func (conn *BaseConn) SetProp(key string, val string)

SetProp sets the value of a property

func (*BaseConn) StreamRecords

func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)

StreamRecords streams the records of a sql query, returns `result`, `error`

func (*BaseConn) StreamRows

func (conn *BaseConn) StreamRows(sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)

StreamRows streams the rows of a sql query, returns `result`, `error`

func (*BaseConn) StreamRowsContext

func (conn *BaseConn) StreamRowsContext(ctx context.Context, query string, options ...map[string]interface{}) (ds *iop.Datastream, err error)

StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`

func (*BaseConn) SumbitTemplate added in v0.1.0

func (conn *BaseConn) SumbitTemplate(level string, templateMap map[string]string, name string, values map[string]interface{}) (data iop.Dataset, err error)

func (*BaseConn) SwapTable

func (conn *BaseConn) SwapTable(srcTable string, tgtTable string) (err error)

SwapTable swaps two tables

func (*BaseConn) Template

func (conn *BaseConn) Template() Template

Template returns the Template object

func (*BaseConn) Tx

func (conn *BaseConn) Tx() Transaction

Tx returns the current sqlx tx object

func (*BaseConn) Unquote

func (conn *BaseConn) Unquote(field string) string

Unquote removes quotes from the field name

func (*BaseConn) Upsert

func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)

Upsert inserts / updates from a srcTable into a target table. Assuming the srcTable has some or all of the tgtTable fields with matching types

func (*BaseConn) ValidateColumnNames

func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)

ValidateColumnNames verifies that source fields are present in the target table. It will return quoted field names as `newColNames`, the same length as `colNames`.

type BaseTransaction added in v0.3.82

type BaseTransaction struct {
	Tx   *sqlx.Tx
	Conn Connection
	// contains filtered or unexported fields
}

BaseTransaction is a database transaction

func (*BaseTransaction) Commit added in v0.3.82

func (t *BaseTransaction) Commit() (err error)

func (*BaseTransaction) Connection added in v0.3.202

func (t *BaseTransaction) Connection() Connection

Connection returns the connection

func (*BaseTransaction) Context added in v0.3.82

func (t *BaseTransaction) Context() *g.Context

Context returns the transaction context

func (*BaseTransaction) DisableTrigger added in v0.3.82

func (t *BaseTransaction) DisableTrigger(tableName, triggerName string) (err error)

DisableTrigger disables a trigger

func (*BaseTransaction) EnableTrigger added in v0.3.82

func (t *BaseTransaction) EnableTrigger(tableName, triggerName string) (err error)

EnableTrigger enables a trigger

func (*BaseTransaction) Exec added in v0.3.82

func (t *BaseTransaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)

Exec runs a sql query, returns `error`

func (*BaseTransaction) ExecContext added in v0.3.82

func (t *BaseTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecContext runs a sql query with context, returns `error`

func (*BaseTransaction) ExecMultiContext added in v0.3.82

func (t *BaseTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecMultiContext runs multiple sql queries with context, returns `error`

func (*BaseTransaction) InsertBatchStream added in v0.3.82

func (t *BaseTransaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*BaseTransaction) InsertStream added in v0.3.82

func (t *BaseTransaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream inserts a stream into a table

func (*BaseTransaction) Prepare added in v0.3.82

func (t *BaseTransaction) Prepare(query string) (stmt *sql.Stmt, err error)

Prepare prepares the statement

func (*BaseTransaction) QueryContext added in v0.3.82

func (t *BaseTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)

QueryContext queries rows

func (*BaseTransaction) Rollback added in v0.3.82

func (t *BaseTransaction) Rollback() (err error)

Rollback rolls back connection wide transaction

func (*BaseTransaction) Upsert added in v0.3.82

func (t *BaseTransaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)

Upsert does an upsert from source table into target table

func (*BaseTransaction) UpsertStream added in v0.3.82

func (t *BaseTransaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)

UpsertStream inserts a stream into a table in batch

type BigQueryConn

type BigQueryConn struct {
	BaseConn
	URL       string
	Client    *bigquery.Client
	ProjectID string
	DatasetID string
	Location  string
	Datasets  []string
}

BigQueryConn is a Google Big Query connection

func (*BigQueryConn) BulkExportFlow

func (conn *BigQueryConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*BigQueryConn) BulkImportFlow

func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow inserts a flow of streams into a table. For BigQuery we need to create CSVs in GCS and then load them from there.

func (*BigQueryConn) BulkImportStream

func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream demonstrates loading data into a BigQuery table using a file on the local filesystem.

func (*BigQueryConn) CastColumnForSelect added in v0.3.239

func (conn *BigQueryConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)

CastColumnForSelect casts to the correct target column type

func (*BigQueryConn) Close

func (conn *BigQueryConn) Close() error

Close closes the connection

func (*BigQueryConn) Connect

func (conn *BigQueryConn) Connect(timeOut ...int) error

Connect connects to the database

func (*BigQueryConn) CopyFromGCS

func (conn *BigQueryConn) CopyFromGCS(gcsURI string, tableFName string, dsColumns []iop.Column) error

func (*BigQueryConn) CopyFromLocal added in v0.3.121

func (conn *BigQueryConn) CopyFromLocal(localURI string, tableFName string, dsColumns []iop.Column) error

CopyFromLocal loads into BigQuery from a local file

func (*BigQueryConn) CopyToGCS

func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error

func (*BigQueryConn) ExecContext

func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)

func (*BigQueryConn) ExecMultiContext added in v0.3.131

func (conn *BigQueryConn) ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)

ExecMultiContext runs multiple sql queries with context, returns `error`

func (*BigQueryConn) ExportToGCS added in v0.3.122

func (conn *BigQueryConn) ExportToGCS(sql string, gcsURI string) error

ExportToGCS exports the result of a sql query to gc storage

func (*BigQueryConn) GenerateUpsertSQL added in v0.0.5

func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*BigQueryConn) GetDatabases added in v0.2.9

func (conn *BigQueryConn) GetDatabases() (iop.Dataset, error)

GetDatabases returns databases

func (*BigQueryConn) GetSchemas added in v0.2.9

func (conn *BigQueryConn) GetSchemas() (iop.Dataset, error)

GetSchemas returns schemas

func (*BigQueryConn) GetSchemata added in v0.2.9

func (conn *BigQueryConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)

GetSchemata obtain full schemata info for a schema and/or table in current database

func (*BigQueryConn) Init

func (conn *BigQueryConn) Init() error

Init initiates the object

func (*BigQueryConn) InsertBatchStream

func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*BigQueryConn) InsertStream

func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream demonstrates loading data into a BigQuery table using a file on the local filesystem.

func (*BigQueryConn) LoadCSVFromReader added in v0.3.120

func (conn *BigQueryConn) LoadCSVFromReader(tableFName string, reader io.Reader, dsColumns []iop.Column) error

LoadCSVFromReader demonstrates loading data into a BigQuery table using a file on the local filesystem. https://cloud.google.com/bigquery/docs/batch-loading-data#loading_data_from_local_files

func (*BigQueryConn) NewTransaction added in v0.3.82

func (conn *BigQueryConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)

NewTransaction creates a new transaction

func (*BigQueryConn) StreamRowsContext

func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)

func (*BigQueryConn) Unload

func (conn *BigQueryConn) Unload(tables ...Table) (gsPath string, err error)

Unload to Google Cloud Storage

type BigTableAction added in v0.3.126

type BigTableAction string
const BTCreateColumnFamily BigTableAction = "create_column_family"
const BTCreateTable BigTableAction = "create_table"
const BTDeleteTable BigTableAction = "delete_table"
const BTTableInfo BigTableAction = "table_info"

type BigTableConn added in v0.3.126

type BigTableConn struct {
	BaseConn
	URL        string
	Client     *bigtable.Client
	ProjectID  string
	InstanceID string
	Location   string
}

BigTableConn is a Google Bigtable connection

func (*BigTableConn) BulkExportFlow added in v0.3.126

func (conn *BigTableConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err error)

func (*BigTableConn) Close added in v0.3.126

func (conn *BigTableConn) Close() error

Close closes the connection

func (*BigTableConn) Connect added in v0.3.126

func (conn *BigTableConn) Connect(timeOut ...int) error

Connect connects to the database

func (*BigTableConn) ExecContext added in v0.3.126

func (conn *BigTableConn) ExecContext(ctx context.Context, payload string, args ...interface{}) (result sql.Result, err error)

func (*BigTableConn) GetColumns added in v0.3.126

func (conn *BigTableConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)

func (*BigTableConn) GetColumnsFull added in v0.3.135

func (conn *BigTableConn) GetColumnsFull(tableFName string) (iop.Dataset, error)

func (*BigTableConn) GetSQLColumns added in v0.3.126

func (conn *BigTableConn) GetSQLColumns(tables ...Table) (columns iop.Columns, err error)

GetSQLColumns returns the columns for the given tables

func (*BigTableConn) GetSchemas added in v0.3.135

func (conn *BigTableConn) GetSchemas() (iop.Dataset, error)

func (*BigTableConn) GetSchemata added in v0.3.128

func (conn *BigTableConn) GetSchemata(schemaName string, tableNames ...string) (schemata Schemata, err error)

func (*BigTableConn) GetTables added in v0.3.126

func (conn *BigTableConn) GetTables(schema string) (data iop.Dataset, err error)

func (*BigTableConn) GetViews added in v0.3.126

func (conn *BigTableConn) GetViews(schema string) (data iop.Dataset, err error)

GetViews returns views for given schema

func (*BigTableConn) Init added in v0.3.126

func (conn *BigTableConn) Init() error

Init initiates the object

func (*BigTableConn) InsertBatchStream added in v0.3.126

func (conn *BigTableConn) InsertBatchStream(table string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*BigTableConn) NewTransaction added in v0.3.126

func (conn *BigTableConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)

NewTransaction creates a new transaction

func (*BigTableConn) StreamRowsContext added in v0.3.126

func (conn *BigTableConn) StreamRowsContext(ctx context.Context, table string, options ...map[string]interface{}) (ds *iop.Datastream, err error)

type BigTableQuery added in v0.3.126

type BigTableQuery struct {
	Action         BigTableAction `json:"action"`
	Table          string         `json:"table"`
	ColumnFamilies []string       `json:"column_family"`
}

type BlankTransaction added in v0.3.82

type BlankTransaction struct {
	Conn Connection
	// contains filtered or unexported fields
}

func (*BlankTransaction) Commit added in v0.3.82

func (t *BlankTransaction) Commit() (err error)

func (*BlankTransaction) Connection added in v0.3.202

func (t *BlankTransaction) Connection() Connection

func (*BlankTransaction) Context added in v0.3.82

func (t *BlankTransaction) Context() *g.Context

func (*BlankTransaction) ExecContext added in v0.3.82

func (t *BlankTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

func (*BlankTransaction) ExecMultiContext added in v0.3.82

func (t *BlankTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

func (*BlankTransaction) Prepare added in v0.3.82

func (t *BlankTransaction) Prepare(query string) (stmt *sql.Stmt, err error)

func (*BlankTransaction) QueryContext added in v0.3.82

func (t *BlankTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)

func (*BlankTransaction) Rollback added in v0.3.82

func (t *BlankTransaction) Rollback() (err error)

type ClickhouseConn added in v0.1.0

type ClickhouseConn struct {
	BaseConn
	URL string
}

ClickhouseConn is a ClickHouse connection

func (*ClickhouseConn) BulkImportStream added in v0.3.118

func (conn *ClickhouseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table

func (*ClickhouseConn) ConnString added in v0.4.64

func (conn *ClickhouseConn) ConnString() string

func (*ClickhouseConn) GenerateInsertStatement added in v0.3.118

func (conn *ClickhouseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string

GenerateInsertStatement returns the proper INSERT statement

func (*ClickhouseConn) GenerateUpsertSQL added in v0.1.0

func (conn *ClickhouseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*ClickhouseConn) Init added in v0.1.0

func (conn *ClickhouseConn) Init() error

Init initiates the object

func (*ClickhouseConn) NewTransaction added in v0.3.84

func (conn *ClickhouseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)

NewTransaction creates a new transaction

type ColumnType added in v0.3.84

type ColumnType struct {
	Name             string
	DatabaseTypeName string
	Length           int
	Precision        int
	Scale            int
	Nullable         bool
	Sourced          bool
}

type ConnInfo added in v0.2.6

type ConnInfo struct {
	Host      string
	Port      int
	Database  string
	User      string
	Password  string
	Schema    string
	Warehouse string
	Role      string
	URL       *net.URL
}

type Connection

type Connection interface {
	Base() *BaseConn
	BaseURL() string
	Begin(options ...*sql.TxOptions) error
	BeginContext(ctx context.Context, options ...*sql.TxOptions) error
	BulkExportFlow(tables ...Table) (*iop.Dataflow, error)
	BulkExportStream(sql string) (*iop.Datastream, error)
	BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
	BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	CastColumnForSelect(srcColumn iop.Column, tgtColumn iop.Column) string
	CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string
	Close() error
	Commit() error
	CompareChecksums(tableName string, columns iop.Columns) (err error)
	Connect(timeOut ...int) error
	ConnString() string
	Context() *g.Context
	CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)
	CreateTemporaryTable(tableName string, cols iop.Columns) (err error)
	CurrentDatabase() (string, error)
	Db() *sqlx.DB
	DbX() *DbX
	DropTable(...string) error
	DropView(...string) error
	Exec(sql string, args ...interface{}) (result sql.Result, err error)
	ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
	ExecMulti(sql string, args ...interface{}) (result sql.Result, err error)
	ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
	GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)
	GenerateInsertStatement(tableName string, fields []string, numRows int) string
	GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
	GetAnalysis(string, map[string]interface{}) (string, error)
	GetColumns(tableFName string, fields ...string) (iop.Columns, error)
	GetColumnsFull(string) (iop.Dataset, error)
	GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
	GetCount(string) (uint64, error)
	GetDatabases() (iop.Dataset, error)
	GetDDL(string) (string, error)
	GetGormConn(config *gorm.Config) (*gorm.DB, error)
	GetIndexes(string) (iop.Dataset, error)
	GetNativeType(col iop.Column) (nativeType string, err error)
	GetPrimaryKeys(string) (iop.Dataset, error)
	GetProp(string) string
	GetSchemas() (iop.Dataset, error)
	GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
	GetSQLColumns(tables ...Table) (columns iop.Columns, err error)
	GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
	GetTables(string) (iop.Dataset, error)
	GetTemplateValue(path string) (value string)
	GetType() dbio.Type
	GetURL(newURL ...string) string
	GetViews(string) (iop.Dataset, error)
	Info() ConnInfo
	Init() error
	InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	Kill() error
	LoadTemplates() error
	MustExec(sql string, args ...interface{}) (result sql.Result)
	NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
	OptimizeTable(table *Table, columns iop.Columns) (ok bool, err error)
	Prepare(query string) (stmt *sql.Stmt, err error)
	ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)
	Props() map[string]string
	PropsArr() []string
	Query(sql string, options ...map[string]interface{}) (iop.Dataset, error)
	QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (iop.Dataset, error)
	Quote(field string, normalize ...bool) string
	RenameTable(table string, newTable string) (err error)
	Rollback() error
	RunAnalysis(string, map[string]interface{}) (iop.Dataset, error)
	Schemata() Schemata
	Self() Connection

	SetProp(string, string)
	StreamRecords(sql string) (<-chan map[string]interface{}, error)
	StreamRows(sql string, options ...map[string]interface{}) (*iop.Datastream, error)
	StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
	SumbitTemplate(level string, templateMap map[string]string, name string, values map[string]interface{}) (data iop.Dataset, err error)
	SwapTable(srcTable string, tgtTable string) (err error)
	Template() Template
	Tx() Transaction
	Unquote(string) string
	Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
	ValidateColumnNames(tgtColName []string, colNames []string, quote bool) (newColNames []string, err error)
	// contains filtered or unexported methods
}

Connection is the Base interface for Connections

func Clone added in v0.3.111

func Clone(conn Connection) (newConn Connection, err error)

func NewConn

func NewConn(URL string, props ...string) (Connection, error)

NewConn returns the most appropriate connection for a given database

func NewConnContext

func NewConnContext(ctx context.Context, URL string, props ...string) (Connection, error)

NewConnContext returns the most appropriate connection for a given database, with context. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`

type DataAnalyzer added in v0.3.65

type DataAnalyzer struct {
	Conn        Connection
	Schemata    Schemata
	ColumnMap   map[string]iop.Column
	RelationMap map[string]map[string]map[string]Relation // table > column A > column B > relation
	Options     DataAnalyzerOptions
}

func NewDataAnalyzer added in v0.3.65

func NewDataAnalyzer(conn Connection, opts DataAnalyzerOptions) (da *DataAnalyzer, err error)

func (*DataAnalyzer) AnalyzeColumns added in v0.3.65

func (da *DataAnalyzer) AnalyzeColumns(sampleSize int, includeViews bool) (err error)

func (*DataAnalyzer) GetManyToMany added in v0.3.65

func (da *DataAnalyzer) GetManyToMany(nonUniqueCols iop.Columns, asString bool) (err error)

func (*DataAnalyzer) GetOneToMany added in v0.3.65

func (da *DataAnalyzer) GetOneToMany(uniqueCols, nonUniqueCols iop.Columns, asString bool) (err error)

func (*DataAnalyzer) GetOneToOne added in v0.3.65

func (da *DataAnalyzer) GetOneToOne(uniqueCols iop.Columns, asString bool) (err error)

func (*DataAnalyzer) GetSchemata added in v0.3.65

func (da *DataAnalyzer) GetSchemata(force bool) (err error)

func (*DataAnalyzer) ProcessRelations added in v0.3.65

func (da *DataAnalyzer) ProcessRelations() (err error)

func (*DataAnalyzer) ProcessRelationsInteger added in v0.3.244

func (da *DataAnalyzer) ProcessRelationsInteger() (err error)

func (*DataAnalyzer) ProcessRelationsString added in v0.3.244

func (da *DataAnalyzer) ProcessRelationsString() (err error)

func (*DataAnalyzer) WriteRelationsYaml added in v0.3.244

func (da *DataAnalyzer) WriteRelationsYaml(path string) (err error)

type DataAnalyzerOptions added in v0.3.65

type DataAnalyzerOptions struct {
	DbName      string
	SchemaNames []string
}

type Database added in v0.1.7

type Database struct {
	Name    string            `json:"name"`
	Schemas map[string]Schema `json:"schemas"`
}

Database represents a schemata database

func (*Database) Columns added in v0.1.7

func (db *Database) Columns() map[string]iop.Column

func (*Database) Tables added in v0.1.7

func (db *Database) Tables() map[string]Table

type DbX added in v0.0.5

type DbX struct {
	// contains filtered or unexported fields
}

DbX is db express

func (*DbX) Delete added in v0.0.5

func (x *DbX) Delete(o interface{}) (cnt int, err error)

Delete deletes from object or slice

func (*DbX) Get added in v0.0.5

func (x *DbX) Get(o interface{}, fields ...string) (err error)

Get retrieves object

func (*DbX) Insert added in v0.0.5

func (x *DbX) Insert(o interface{}, fields ...string) (err error)

Insert inserts object or slice

func (*DbX) Select added in v0.0.5

func (x *DbX) Select(o interface{}, fields ...string) (err error)

Select retrieves objects

func (*DbX) TableName added in v0.0.5

func (x *DbX) TableName(o interface{}) (name string)

TableName returns the table name of object or slice

func (*DbX) Update added in v0.0.5

func (x *DbX) Update(o interface{}, fields ...string) (cnt int, err error)

Update updates from object or slice

func (*DbX) Upsert added in v0.0.5

func (x *DbX) Upsert(o interface{}, fields ...string) (cnt int, err error)

Upsert upserts from object or slice

func (*DbX) Where added in v0.0.5

func (x *DbX) Where(where ...interface{}) *DbX

Where adds a where clause

type DuckDbConn added in v0.3.198

type DuckDbConn struct {
	BaseConn
	URL string
	// contains filtered or unexported fields
}

DuckDbConn is a Duck DB connection

func (*DuckDbConn) BulkImportStream added in v0.3.198

func (conn *DuckDbConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table

func (*DuckDbConn) Close added in v0.3.221

func (conn *DuckDbConn) Close() error

Close closes the connection

func (*DuckDbConn) Connect added in v0.3.221

func (conn *DuckDbConn) Connect(timeOut ...int) (err error)

func (*DuckDbConn) ExecContext added in v0.3.221

func (conn *DuckDbConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)

func (*DuckDbConn) ExecMultiContext added in v0.3.221

func (conn *DuckDbConn) ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)

ExecMultiContext runs multiple sql queries with context, returns `error`

func (*DuckDbConn) GenerateUpsertSQL added in v0.3.210

func (conn *DuckDbConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*DuckDbConn) GetURL added in v0.3.198

func (conn *DuckDbConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*DuckDbConn) Init added in v0.3.198

func (conn *DuckDbConn) Init() error

Init initiates the object

func (*DuckDbConn) InsertBatchStream added in v0.3.221

func (conn *DuckDbConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*DuckDbConn) InsertStream added in v0.3.221

func (conn *DuckDbConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream inserts a stream into a table

func (*DuckDbConn) StreamRowsContext added in v0.3.221

func (conn *DuckDbConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)

type ManualTransaction added in v0.3.82

type ManualTransaction struct {
	Conn Connection
	// contains filtered or unexported fields
}

func (*ManualTransaction) Commit added in v0.3.82

func (t *ManualTransaction) Commit() (err error)

func (*ManualTransaction) Context added in v0.3.82

func (t *ManualTransaction) Context() *g.Context

func (*ManualTransaction) ExecContext added in v0.3.82

func (t *ManualTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

func (*ManualTransaction) ExecMultiContext added in v0.3.82

func (t *ManualTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

func (*ManualTransaction) Prepare added in v0.3.82

func (t *ManualTransaction) Prepare(query string) (stmt *sql.Stmt, err error)

func (*ManualTransaction) QueryContext added in v0.3.82

func (t *ManualTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)

func (*ManualTransaction) Rollback added in v0.3.82

func (t *ManualTransaction) Rollback() (err error)

type ModelDbX added in v0.0.5

type ModelDbX struct {
	Ptr          interface{} `json:"-"`
	RowsAffected int         `json:"-"`
	// contains filtered or unexported fields
}

ModelDbX is the base for any SQL model

func (*ModelDbX) Bind added in v0.0.5

func (m *ModelDbX) Bind(bindFunc func(p interface{}) error, objPtr interface{}) (err error)

Bind extracts values from provided echo context

func (*ModelDbX) Delete added in v0.0.5

func (m *ModelDbX) Delete(db *sqlx.DB) (err error)

Delete deletes a record

func (*ModelDbX) Fields added in v0.0.5

func (m *ModelDbX) Fields() (fields []string)

Fields returns the model fields

func (*ModelDbX) Get added in v0.0.5

func (m *ModelDbX) Get(db *sqlx.DB, fields ...string) (err error)

Get gets the first record

func (*ModelDbX) Insert added in v0.0.5

func (m *ModelDbX) Insert(db *sqlx.DB, fields ...string) (err error)

Insert inserts one record

func (*ModelDbX) Rec added in v0.0.5

func (m *ModelDbX) Rec() map[string]interface{}

Rec returns the record

func (*ModelDbX) Select added in v0.0.5

func (m *ModelDbX) Select(db *sqlx.DB, objPtr interface{}, fields ...string) (err error)

Select returns multiple records

func (*ModelDbX) TableName added in v0.0.5

func (m *ModelDbX) TableName(objPtr interface{}) string

TableName returns the table name of the underlying pointer

func (*ModelDbX) Update added in v0.0.5

func (m *ModelDbX) Update(db *sqlx.DB, fields ...string) (err error)

Update updates one record

func (*ModelDbX) Values added in v0.0.5

func (m *ModelDbX) Values(fields []string) (values []interface{}, err error)

Values returns the values for the provided fields

func (*ModelDbX) Where added in v0.0.5

func (m *ModelDbX) Where(where ...interface{}) *ModelDbX

Where adds a where clause

type MsSQLServerConn

type MsSQLServerConn struct {
	BaseConn
	URL string
	// contains filtered or unexported fields
}

MsSQLServerConn is a Microsoft SQL Server connection

func (*MsSQLServerConn) BcpExport

func (conn *MsSQLServerConn) BcpExport() (err error)

BcpExport exports data to datastream

func (*MsSQLServerConn) BcpImportFile added in v0.3.215

func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count uint64, err error)

BcpImportFile imports using the bcp tool. See https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15. Example: `bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000`. Limitation: if a comma or the delimiter is in a field, it will error; need to use a delimiter that is not in any field, or do some other transformation.

func (*MsSQLServerConn) BcpImportFileParrallel added in v0.3.215

func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)

BcpImportFileParrallel uses goroutines to import partitioned files

func (*MsSQLServerConn) BulkImportFlow

func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow bulk import flow

func (*MsSQLServerConn) BulkImportStream

func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*MsSQLServerConn) CopyFromAzure

func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)

CopyFromAzure uses the COPY INTO Table command from Azure https://docs.microsoft.com/en-us/sql/t-sql/statements/copy-into-transact-sql?view=azure-sqldw-latest

func (*MsSQLServerConn) CopyViaAzure

func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAzure uses the Azure DWH COPY INTO Table command

func (*MsSQLServerConn) GenerateUpsertSQL added in v0.0.5

func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*MsSQLServerConn) GetURL

func (conn *MsSQLServerConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*MsSQLServerConn) Init

func (conn *MsSQLServerConn) Init() error

Init initiates the object

type MySQLConn

type MySQLConn struct {
	BaseConn
	URL string
}

MySQLConn is a MySQL or MariaDB connection

func (*MySQLConn) BulkExportStream

func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream bulk Export

func (*MySQLConn) BulkImportStream

func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*MySQLConn) GenerateUpsertSQL added in v0.0.5

func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL. For background on UPSERT, see https://vladmihalcea.com/how-do-upsert-and-merge-work-in-oracle-sql-server-postgresql-and-mysql/

func (*MySQLConn) GetURL

func (conn *MySQLConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*MySQLConn) Init

func (conn *MySQLConn) Init() error

Init initiates the object

func (*MySQLConn) LoadDataInFile

func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)

LoadDataInFile Bulk Import

func (*MySQLConn) LoadDataOutFile

func (conn *MySQLConn) LoadDataOutFile(sql string) (stdOutReader io.Reader, err error)

LoadDataOutFile Bulk Export. Possible error: `ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation`. The FILE privilege needs to be granted to the user; also, the --secure-file-priv option needs to be set properly for it to work. See https://stackoverflow.com/questions/9819271/why-is-mysql-innodb-insert-so-slow to improve innodb insert speed.

type OracleConn

type OracleConn struct {
	BaseConn
	URL string
}

OracleConn is an Oracle connection

func (*OracleConn) BulkImportStream

func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*OracleConn) ExecMultiContext added in v0.3.120

func (conn *OracleConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecMultiContext runs multiple sql queries with context, returns `error`

func (*OracleConn) GenerateInsertStatement

func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string

GenerateInsertStatement returns the proper INSERT statement

func (*OracleConn) GenerateUpsertSQL added in v0.0.5

func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*OracleConn) Init

func (conn *OracleConn) Init() error

Init initiates the object

func (*OracleConn) SQLLoad

func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)

SQLLoad uses sqlldr to Bulk Import. Example: `cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr`. Limitation: cannot import when there is a newline in a value; need to scan for new lines.

type Pool

type Pool struct {
	Dbs     map[string]*sqlx.DB
	DuckDbs map[string]*DuckDbConn
	Mux     sync.Mutex
}

Pool is a pool of connections

type PostgresConn

type PostgresConn struct {
	BaseConn
	URL string
}

PostgresConn is a Postgres connection

func (*PostgresConn) BulkExportStream

func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream uses the bulk dumping (COPY)

func (*PostgresConn) BulkImportStream

func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table

func (*PostgresConn) CastColumnForSelect

func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)

CastColumnForSelect casts to the correct target column type

func (*PostgresConn) CopyToStdout

func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)

CopyToStdout Copy TO STDOUT

func (*PostgresConn) GenerateUpsertSQL added in v0.0.5

func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*PostgresConn) Init

func (conn *PostgresConn) Init() error

Init initiates the object

type RedshiftConn

type RedshiftConn struct {
	BaseConn
	URL string
}

RedshiftConn is a Redshift connection

func (*RedshiftConn) BulkExportFlow

func (conn *RedshiftConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*RedshiftConn) BulkExportStream

func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream reads in bulk

func (*RedshiftConn) BulkImportFlow

func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.

func (*RedshiftConn) BulkImportStream

func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.

func (*RedshiftConn) ConnString added in v0.2.0

func (conn *RedshiftConn) ConnString() string

func (*RedshiftConn) CopyFromS3

func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)

CopyFromS3 uses the COPY INTO Table command from AWS S3

func (*RedshiftConn) GenerateUpsertSQL added in v0.0.5

func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*RedshiftConn) Init

func (conn *RedshiftConn) Init() error

Init initiates the object

func (*RedshiftConn) Unload

func (conn *RedshiftConn) Unload(tables ...Table) (s3Path string, err error)

Unload unloads a query to S3

type Relation added in v0.3.65

type Relation string

type Result added in v0.0.5

type Result struct {
	// contains filtered or unexported fields
}

func (Result) LastInsertId added in v0.0.5

func (r Result) LastInsertId() (int64, error)

func (Result) RowsAffected added in v0.0.5

func (r Result) RowsAffected() (int64, error)

type SQLiteConn

type SQLiteConn struct {
	BaseConn
	URL string
}

SQLiteConn is a SQLite connection

func (*SQLiteConn) BulkImportStream added in v0.3.206

func (conn *SQLiteConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table

func (*SQLiteConn) GenerateUpsertSQL added in v0.0.5

func (conn *SQLiteConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*SQLiteConn) GetSchemata added in v0.3.279

func (conn *SQLiteConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)

GetSchemata obtains full schemata info for a schema and/or table in the current database

func (*SQLiteConn) GetURL added in v0.3.113

func (conn *SQLiteConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*SQLiteConn) Init

func (conn *SQLiteConn) Init() error

Init initiates the object

type Schema

type Schema struct {
	Name   string           `json:"name"`
	Tables map[string]Table `json:"tables"`
}

Schema represents a schemata schema

func (*Schema) Columns added in v0.1.7

func (schema *Schema) Columns() map[string]iop.Column

func (*Schema) ToData

func (schema *Schema) ToData() (data iop.Dataset)

ToData converts schema objects to tabular format

type Schemata

type Schemata struct {
	Databases map[string]Database `json:"databases"`
	// contains filtered or unexported fields
}

Schemata contains the full schema for a connection

func GetSchemataAll added in v0.2.0

func GetSchemataAll(conn Connection) (schemata Schemata, err error)

GetSchemataAll obtains the schemata for all databases detected

func GetTablesSchemata added in v0.3.162

func GetTablesSchemata(conn Connection, tableNames ...string) (schemata Schemata, err error)

GetTablesSchemata obtains the schemata for specified tables

func (*Schemata) Columns added in v0.1.7

func (s *Schemata) Columns() map[string]iop.Column

func (*Schemata) Database added in v0.1.7

func (s *Schemata) Database() Database

Database returns the first encountered database

func (*Schemata) LoadTablesJSON added in v0.2.0

func (s *Schemata) LoadTablesJSON(payload string) error

LoadTablesJSON loads from a json string

func (*Schemata) Tables

func (s *Schemata) Tables() map[string]Table

type SnowflakeConn

type SnowflakeConn struct {
	BaseConn
	URL        string
	Warehouse  string
	CopyMethod string
}

SnowflakeConn is a Snowflake connection

func (*SnowflakeConn) BulkExportFlow

func (conn *SnowflakeConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*SnowflakeConn) BulkImportFlow

func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow bulk import flow

func (*SnowflakeConn) BulkImportStream

func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*SnowflakeConn) ConnString added in v0.2.0

func (conn *SnowflakeConn) ConnString() string

func (*SnowflakeConn) Connect

func (conn *SnowflakeConn) Connect(timeOut ...int) error

Connect connects to the database

func (*SnowflakeConn) CopyFromAzure

func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)

CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyFromS3

func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)

CopyFromS3 uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyToAzure

func (conn *SnowflakeConn) CopyToAzure(tables ...Table) (azPath string, err error)

CopyToAzure exports a query to an Azure location

func (*SnowflakeConn) CopyToS3

func (conn *SnowflakeConn) CopyToS3(tables ...Table) (s3Path string, err error)

CopyToS3 exports a query to an S3 location

func (*SnowflakeConn) CopyViaAWS

func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAWS uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyViaAzure

func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyViaStage

func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaStage uses the Snowflake COPY INTO Table command https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) GenerateUpsertSQL added in v0.0.5

func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*SnowflakeConn) GetColumnsFull added in v0.1.8

func (conn *SnowflakeConn) GetColumnsFull(tableFName string) (data iop.Dataset, err error)

GetColumnsFull returns columns for the given table. `tableFName` should include schema and table, example: `schema1.table2`. Fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`

func (*SnowflakeConn) GetDatabases added in v0.2.0

func (conn *SnowflakeConn) GetDatabases() (data iop.Dataset, err error)

GetDatabases returns the list of databases

func (*SnowflakeConn) GetFile added in v0.1.0

func (conn *SnowflakeConn) GetFile(internalStagePath, fPath string) (err error)

GetFile Copies from a staging location to a local file or folder

func (*SnowflakeConn) GetSchemas

func (conn *SnowflakeConn) GetSchemas() (data iop.Dataset, err error)

GetSchemas returns schemas

func (*SnowflakeConn) GetTables

func (conn *SnowflakeConn) GetTables(schema string) (data iop.Dataset, err error)

GetTables returns tables

func (*SnowflakeConn) GetViews added in v0.3.205

func (conn *SnowflakeConn) GetViews(schema string) (data iop.Dataset, err error)

GetViews returns views

func (*SnowflakeConn) Init

func (conn *SnowflakeConn) Init() error

Init initiates the object

func (*SnowflakeConn) PutFile

func (conn *SnowflakeConn) PutFile(fPath string, internalStagePath string) (err error)

PutFile Copies a local file or folder into a staging location

func (*SnowflakeConn) UnloadViaStage added in v0.1.0

func (conn *SnowflakeConn) UnloadViaStage(sqls ...string) (filePath string, err error)

type StarRocksConn added in v0.4.86

type StarRocksConn struct {
	BaseConn
	URL string
}

StarRocksConn is a StarRocks connection

func (*StarRocksConn) BulkImportFlow added in v0.4.86

func (conn *StarRocksConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow inserts a flow of streams into a table.

func (*StarRocksConn) GenerateDDL added in v0.4.86

func (conn *StarRocksConn) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)

GenerateDDL generates a DDL based on a dataset

func (*StarRocksConn) GetURL added in v0.4.86

func (conn *StarRocksConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*StarRocksConn) Init added in v0.4.86

func (conn *StarRocksConn) Init() error

Init initiates the object

func (*StarRocksConn) InsertBatchStream added in v0.4.86

func (conn *StarRocksConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*StarRocksConn) StreamLoad added in v0.4.86

func (conn *StarRocksConn) StreamLoad(feURL, tableFName string, df *iop.Dataflow) (count uint64, err error)

StreamLoad bulk loads data. See https://docs.starrocks.io/docs/loading/StreamLoad/ and https://docs.starrocks.io/docs/sql-reference/sql-statements/data-manipulation/STREAM_LOAD/

type StatFieldSQL added in v0.3.65

type StatFieldSQL struct {
	Name        string
	TemplateSQL string
}

type Table

type Table struct {
	Name     string      `json:"name"`
	Schema   string      `json:"schema"`
	Database string      `json:"database,omitempty"`
	IsView   bool        `json:"is_view,omitempty"` // whether is a view
	SQL      string      `json:"sql,omitempty"`
	Dialect  dbio.Type   `json:"dialect,omitempty"`
	Columns  iop.Columns `json:"columns,omitempty"`
}

Table represents a schemata table

func ParseTableName added in v0.3.111

func ParseTableName(text string, dialect dbio.Type) (table Table, err error)

func (*Table) ColumnsMap

func (t *Table) ColumnsMap() map[string]iop.Column

func (*Table) DatabaseQ added in v0.3.144

func (t *Table) DatabaseQ() string

func (*Table) FDQN added in v0.3.111

func (t *Table) FDQN() string

func (*Table) FullName

func (t *Table) FullName() string

func (*Table) IsQuery added in v0.3.111

func (t *Table) IsQuery() bool

func (*Table) NameQ added in v0.3.144

func (t *Table) NameQ() string

func (*Table) SchemaQ added in v0.3.144

func (t *Table) SchemaQ() string

func (*Table) Select added in v0.3.212

func (t *Table) Select(fields ...string) string

type Template

type Template struct {
	Core           map[string]string
	Metadata       map[string]string
	Analysis       map[string]string
	Function       map[string]string `yaml:"function"`
	GeneralTypeMap map[string]string `yaml:"general_type_map"`
	NativeTypeMap  map[string]string `yaml:"native_type_map"`
	NativeStatsMap map[string]bool   `yaml:"native_stat_map"`
	Variable       map[string]string
}

Template is a database YAML template

func (Template) ToData

func (template Template) ToData() (data iop.Dataset)

ToData converts the template into a dataset

type Transaction added in v0.0.5

type Transaction interface {
	Connection() Connection
	Context() *g.Context
	Commit() (err error)
	Rollback() (err error)
	Prepare(query string) (stmt *sql.Stmt, err error)
	QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
	ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
	ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
}

type User added in v0.0.5

type User struct {
	ModelDbX
	Name string
	Age  int
}

func NewUser added in v0.0.5

func NewUser() *User

type WhereClause added in v0.0.5

type WhereClause []interface{}

WhereClause is the where clause

func (WhereClause) Args added in v0.0.5

func (wc WhereClause) Args() []interface{}

Args returns the where clause arguments

func (WhereClause) Clause added in v0.0.5

func (wc WhereClause) Clause() string

Clause returns the string where clause

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL