sql

package
v0.0.0-...-2af46a5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: MIT Imports: 43 Imported by: 1

Documentation

Index

Constants

View Source
// BigqueryBulkerTypeId is the type id string of the BigQuery bulker.
const BigqueryBulkerTypeId = "bigquery"

// BigQueryAutocommitUnsupported is the message text stating that the BigQuery
// bulker does not support auto commit mode.
const BigQueryAutocommitUnsupported = "BigQuery bulker doesn't support auto commit mode as not efficient"
View Source
// SSL mode values.
// NOTE(review): these look like the values of the Postgres `sslmode`
// connection setting — confirm against ProcessSSL.
const (
	// SSLModeNotProvided is the empty string, i.e. no mode was given.
	SSLModeNotProvided string = ""

	SSLModeDisable    string = "disable"
	SSLModeRequire    string = "require"
	SSLModeVerifyCA   string = "verify-ca"
	SSLModeVerifyFull string = "verify-full"
)
View Source
// BulkerManagedPkConstraintPrefix is the "jitsu_pk_" name prefix.
// NOTE(review): presumably prepended to the names of primary key constraints
// that bulker creates itself (see BuildConstraintName) — confirm.
const BulkerManagedPkConstraintPrefix = "jitsu_pk_"
View Source
// ClickHouseBulkerTypeId is the type id string of the ClickHouse bulker.
const ClickHouseBulkerTypeId = "clickhouse"
View Source
// ContextTransactionKey is the string "transaction".
// NOTE(review): presumably the context key under which the active transaction
// is stored — confirm against usages.
const ContextTransactionKey = "transaction"
View Source
// MySQLBulkerTypeId is the type id string of the MySQL bulker.
const MySQLBulkerTypeId = "mysql"
View Source
// PartitonIdKeyword is the "__partition_id" keyword.
// NOTE(review): presumably the name of the column/field that carries a
// partition id — confirm against usages. The identifier's spelling
// ("Partiton") is part of the public API and is kept as is.
const PartitonIdKeyword = "__partition_id"
View Source
// PostgresBulkerTypeId is the type id string of the Postgres bulker.
const PostgresBulkerTypeId = "postgres"
View Source
// RedshiftBulkerTypeId is the type id string of the Redshift bulker.
const RedshiftBulkerTypeId = "redshift"
View Source
// SnowflakeBulkerTypeId is the type id string of the Snowflake bulker.
const SnowflakeBulkerTypeId = "snowflake"
View Source
// SqlTypePrefix is the "__sql_type" prefix string.
// NOTE(review): presumably marks object keys that carry an explicit SQL type
// hint — confirm against usages.
const SqlTypePrefix = "__sql_type"

Variables

View Source
var (
	// ColumnTypesOption is the "columnTypes" stream option: per-column SQL type
	// overrides collected into a types.SQLTypes value (empty by default).
	//
	// The serialized form must be a map[string]any keyed by column name. Each
	// value is one of:
	//   - a string, applied with SQLTypes.With(key, value);
	//   - a list with one element, treated like the plain string form;
	//   - a list with two elements, applied with
	//     SQLTypes.WithDDL(key, first, second).
	//
	// Lists are accepted both as []string and as []any holding strings; the
	// latter is what generic decoders such as encoding/json produce for arrays,
	// so without it the list form would be silently dropped.
	// Values of any other type are skipped. A serialized value that is not a
	// map[string]any, a list of any other length, or a list element that is not
	// a string yields an error.
	ColumnTypesOption = bulker.ImplementationOption[types.SQLTypes]{
		Key:          "columnTypes",
		DefaultValue: types.SQLTypes{},
		AdvancedParseFunc: func(o *bulker.ImplementationOption[types.SQLTypes], serializedValue any) (bulker.StreamOption, error) {
			switch v := serializedValue.(type) {
			case map[string]any:
				sqlTypes := types.SQLTypes{}
				for key, value := range v {
					// Normalize every supported value shape to a list of strings.
					var parts []string
					switch t := value.(type) {
					case string:
						parts = []string{t}
					case []string:
						parts = t
					case []any:
						parts = make([]string, 0, len(t))
						for _, item := range t {
							s, ok := item.(string)
							if !ok {
								return nil, fmt.Errorf("failed to parse 'columnTypes' option: column %q: element %v has incorrect type: %T expected string", key, item, item)
							}
							parts = append(parts, s)
						}
					default:
						// Other value types were always ignored; keep that behavior.
						continue
					}
					switch len(parts) {
					case 1:
						sqlTypes.With(key, parts[0])
					case 2:
						sqlTypes.WithDDL(key, parts[0], parts[1])
					default:
						// Report the offending column and list, not the whole map.
						return nil, fmt.Errorf("failed to parse 'columnTypes' option: column %q: %v incorrect number of elements. expected 1 or 2", key, parts)
					}
				}
				return withColumnTypes(o, sqlTypes), nil
			default:
				return nil, fmt.Errorf("failed to parse 'columnTypes' option: %v incorrect type: %T expected map[string]any", v, v)
			}
		},
	}

	// DeduplicateWindow is the "deduplicateWindow" integer option, parsed with
	// utils.ParseInt and defaulting to 31.
	// NOTE(review): the unit is not visible here (presumably days) — confirm.
	DeduplicateWindow = bulker.ImplementationOption[int]{
		Key:          "deduplicateWindow",
		DefaultValue: 31,
		ParseFunc:    utils.ParseInt,
	}

	// OmitNilsOption is the "omitNils" boolean option, parsed with
	// utils.ParseBool and enabled by default.
	OmitNilsOption = bulker.ImplementationOption[bool]{
		Key:          "omitNils",
		DefaultValue: true,
		ParseFunc:    utils.ParseBool,
	}
)
View Source
// BigQueryPartitonIdRegex matches "<word>/<timestamp>", where the timestamp has
// the form YYYY-MM-DDTHH:MM:SSZ. Group 1 captures the word part and group 2 the
// timestamp.
var BigQueryPartitonIdRegex = regexp.MustCompile(
	`(\w+)/(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ)`,
)
View Source
// DefaultTypeResolver is the package-level resolver instance obtained from
// NewTypeResolver when the package is initialized.
var DefaultTypeResolver = NewTypeResolver()
View Source
// ErrTableNotExist is the sentinel error for a table that doesn't exist.
// Compare against it with errors.Is.
var ErrTableNotExist = errors.New("table doesn't exist")
View Source
// IndexParameterPlaceholder renders a positional placeholder: "$" followed by
// the decimal index i. The name argument is not used.
var IndexParameterPlaceholder = func(i int, name string) string {
	position := strconv.Itoa(i)
	return "$" + position
}
View Source
// NamedParameterPlaceholder renders a named placeholder: "@" followed by name.
// The index argument is not used.
var NamedParameterPlaceholder = func(i int, name string) string {
	const marker = "@"
	return marker + name
}
View Source
// QuestionMarkParameterPlaceholder always renders "?"; both the index and the
// name are ignored.
var QuestionMarkParameterPlaceholder = func(i int, name string) string {
	const marker = "?"
	return marker
}

Functions

func BuildConstraintName

func BuildConstraintName(tableName string) string

func GranularityToPartitionIds

func GranularityToPartitionIds(g Granularity, t time.Time) []string

func NewBigquery

func NewBigquery(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewBigquery returns a configured BigQuery bulker.Bulker instance

func NewClickHouse

func NewClickHouse(bulkerConfig bulkerlib.Config) (bulkerlib.Bulker, error)

NewClickHouse returns configured ClickHouse adapter instance

func NewMySQL

func NewMySQL(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewMySQL returns configured MySQL adapter instance

func NewPostgres

func NewPostgres(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewPostgres returns a configured Postgres bulker.Bulker instance

func NewRedshift

func NewRedshift(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewRedshift returns configured Redshift adapter instance

func NewSnowflake

func NewSnowflake(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewSnowflake returns configured Snowflake adapter instance

func ProcessSSL

func ProcessSSL(dir string, dsc *PostgresConfig) error

ProcessSSL serializes the SSL payload (CA, client cert, key) into files and enriches the input DataSourceConfig parameters with the SSL config. Each SSL configuration value may be either a file path or the string content itself.

func WithColumnType

func WithColumnType(columnName, sqlType string) bulker.StreamOption

WithColumnType provides overrides for column type of single column for current BulkerStream object fields

func WithColumnTypeDDL

func WithColumnTypeDDL(columnName, sqlType, ddlType string) bulker.StreamOption

WithColumnTypeDDL provides overrides for column type and DDL type of single column for current BulkerStream object fields

func WithColumnTypes

func WithColumnTypes(fields types.SQLTypes) bulker.StreamOption

WithColumnTypes provides overrides for column types of current BulkerStream object fields

func WithDeduplicateWindow

func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption

func WithOmitNils

func WithOmitNils() bulker.StreamOption

func WithoutOmitNils

func WithoutOmitNils() bulker.StreamOption

Types

type AbstractSQLStream

type AbstractSQLStream struct {
	// contains filtered or unexported fields
}

type AbstractTransactionalSQLStream

type AbstractTransactionalSQLStream struct {
	*AbstractSQLStream
	// contains filtered or unexported fields
}

func (*AbstractTransactionalSQLStream) Abort

func (ps *AbstractTransactionalSQLStream) Abort(ctx context.Context) (state bulker.State, err error)

func (*AbstractTransactionalSQLStream) Consume

func (ps *AbstractTransactionalSQLStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error)

type AutoCommitStream

// AutoCommitStream is a stream type built on the embedded *AbstractSQLStream;
// it provides its own Consume, Complete and Abort methods.
// NOTE(review): presumably implements the auto commit bulk mode — confirm.
type AutoCommitStream struct {
	*AbstractSQLStream
}

func (*AutoCommitStream) Abort

func (ps *AutoCommitStream) Abort(ctx context.Context) (state bulker.State, err error)

func (*AutoCommitStream) Complete

func (ps *AutoCommitStream) Complete(ctx context.Context) (state bulker.State, err error)

func (*AutoCommitStream) Consume

func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error)

type BQItem

type BQItem struct {
	// contains filtered or unexported fields
}

BQItem struct for streaming inserts to BigQuery

func (*BQItem) Save

func (bqi *BQItem) Save() (row map[string]bigquery.Value, insertID string, err error)

type BigQuery

type BigQuery struct {
	appbase.Service
	// contains filtered or unexported fields
}

BigQuery is an adapter for creating, patching (schema or table), inserting and copying data from GCS to BigQuery

func (*BigQuery) Close

func (bq *BigQuery) Close() error

func (*BigQuery) ColumnName

func (bq *BigQuery) ColumnName(identifier string) string

func (*BigQuery) CopyTables

func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state *bulker.WarehouseState, err error)

func (*BigQuery) Count

func (bq *BigQuery) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)

func (*BigQuery) CreateStream

func (bq *BigQuery) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*BigQuery) CreateTable

func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error)

CreateTable creates google BigQuery table from Table

func (*BigQuery) Delete

func (bq *BigQuery) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) (err error)

func (*BigQuery) DeletePartition

func (bq *BigQuery) DeletePartition(ctx context.Context, tableName string, datePartiton *DatePartition) error

func (*BigQuery) Drop

func (bq *BigQuery) Drop(ctx context.Context, table *Table, ifExists bool) error

func (*BigQuery) DropTable

func (bq *BigQuery) DropTable(ctx context.Context, tableName string, ifExists bool) error

DropTable drops table from BigQuery

func (*BigQuery) GetAvroSchema

func (bq *BigQuery) GetAvroSchema(table *Table) *types2.AvroSchema

func (*BigQuery) GetAvroType

func (bq *BigQuery) GetAvroType(sqlType string) (any, bool)

func (*BigQuery) GetBatchFileCompression

func (bq *BigQuery) GetBatchFileCompression() types2.FileCompression

func (*BigQuery) GetBatchFileFormat

func (bq *BigQuery) GetBatchFileFormat() types2.FileFormat

func (*BigQuery) GetDataType

func (bq *BigQuery) GetDataType(sqlType string) (types2.DataType, bool)

func (*BigQuery) GetSQLType

func (bq *BigQuery) GetSQLType(dataType types2.DataType) (string, bool)

func (*BigQuery) GetTableSchema

func (bq *BigQuery) GetTableSchema(ctx context.Context, tableName string) (*Table, error)

GetTableSchema returns the Google BigQuery table (name, columns) representation wrapped in a Table struct

func (*BigQuery) InitDatabase

func (bq *BigQuery) InitDatabase(ctx context.Context) error

InitDatabase creates the Google BigQuery dataset if it doesn't exist

func (*BigQuery) Insert

func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) (err error)

func (*BigQuery) LoadTable

func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state *bulker.WarehouseState, err error)

func (*BigQuery) OpenTx

func (bq *BigQuery) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

func (*BigQuery) PatchTableSchema

func (bq *BigQuery) PatchTableSchema(ctx context.Context, patchSchema *Table) error

PatchTableSchema adds Table columns to google BigQuery table

func (*BigQuery) Ping

func (bq *BigQuery) Ping(ctx context.Context) error

func (*BigQuery) ReplaceTable

func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*BigQuery) RunJob

func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription string) (job *bigquery.Job, state *bulker.WarehouseState, err error)

func (*BigQuery) Select

func (bq *BigQuery) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

func (*BigQuery) StringifyObjects

func (bq *BigQuery) StringifyObjects() bool

func (*BigQuery) TableHelper

func (bq *BigQuery) TableHelper() *TableHelper

func (*BigQuery) TableName

func (bq *BigQuery) TableName(identifier string) string

func (*BigQuery) TruncateTable

func (bq *BigQuery) TruncateTable(ctx context.Context, tableName string) error

TruncateTable deletes all records in tableName table

func (*BigQuery) Type

func (bq *BigQuery) Type() string

func (*BigQuery) Update

func (bq *BigQuery) Update(ctx context.Context, tableName string, object types2.Object, whenConditions *WhenConditions) (err error)

type ClickHouse

type ClickHouse struct {
	*SQLAdapterBase[ClickHouseConfig]
	// contains filtered or unexported fields
}

ClickHouse is adapter for creating,patching (schema or table), inserting data to clickhouse

func (*ClickHouse) CopyTables

func (ch *ClickHouse) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state *bulkerlib.WarehouseState, err error)

func (*ClickHouse) Count

func (ch *ClickHouse) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)

func (*ClickHouse) CreateStream

func (ch *ClickHouse) CreateStream(id, tableName string, mode bulkerlib.BulkMode, streamOptions ...bulkerlib.StreamOption) (bulkerlib.BulkerStream, error)

func (*ClickHouse) CreateTable

func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error

CreateTable creates a database table with the name and columns provided in the Table representation. New tables will have the MergeTree() or ReplicatedMergeTree() engine, depending on whether config.cluster is empty or not.

func (*ClickHouse) Delete

func (ch *ClickHouse) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error

func (*ClickHouse) Drop

func (ch *ClickHouse) Drop(ctx context.Context, table *Table, ifExists bool) error

func (*ClickHouse) DropTable

func (ch *ClickHouse) DropTable(ctx context.Context, tableName string, ifExists bool) error

func (*ClickHouse) GetTableSchema

func (ch *ClickHouse) GetTableSchema(ctx context.Context, tableName string) (*Table, error)

GetTableSchema returns the table (name, columns with names and types) representation wrapped in a Table struct

func (*ClickHouse) InitDatabase

func (ch *ClickHouse) InitDatabase(ctx context.Context) error

InitDatabase creates the database instance if it doesn't exist

func (*ClickHouse) Insert

func (ch *ClickHouse) Insert(ctx context.Context, table *Table, _ bool, objects ...types.Object) (err error)

func (*ClickHouse) LoadTable

func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state *bulkerlib.WarehouseState, err error)

LoadTable transfer data from local file to ClickHouse table

func (*ClickHouse) OpenTx

func (ch *ClickHouse) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx relies on a ClickHouse session: it creates a new connection and wraps it with TxSQLAdapter, which makes sure that all activity happens in one connection.

func (*ClickHouse) PatchTableSchema

func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table) error

PatchTableSchema adds new columns (from the provided Table) to the existing table, then drops and creates the distributed table

func (*ClickHouse) Ping

func (ch *ClickHouse) Ping(_ context.Context) error

func (*ClickHouse) ReplaceTable

func (ch *ClickHouse) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*ClickHouse) Select

func (ch *ClickHouse) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

func (*ClickHouse) TruncateTable

func (ch *ClickHouse) TruncateTable(ctx context.Context, tableName string) error

TruncateTable deletes all records in tableName table

func (*ClickHouse) Type

func (ch *ClickHouse) Type() string

type ClickHouseConfig

// ClickHouseConfig is the DTO for a deserialized ClickHouse configuration.
// Every field is optional at the decoding level (all tags carry omitempty) and
// is read from the mapstructure/json/yaml key named in its tag; required
// fields are checked by Validate.
//
// Protocol takes one of the ClickHouseProtocol values. Engine, when set,
// carries the table engine settings (see EngineConfig).
// NOTE(review): Parameters and TLS are free-form string maps; their accepted
// keys are not visible here — confirm against the connection code.
type ClickHouseConfig struct {
	Protocol   ClickHouseProtocol `mapstructure:"protocol,omitempty" json:"protocol,omitempty" yaml:"protocol,omitempty"`
	Hosts      []string           `mapstructure:"hosts,omitempty" json:"hosts,omitempty" yaml:"hosts,omitempty"`
	Parameters map[string]string  `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
	Username   string             `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string             `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
	Database   string             `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
	Cluster    string             `mapstructure:"cluster,omitempty" json:"cluster,omitempty" yaml:"cluster,omitempty"`
	TLS        map[string]string  `mapstructure:"tls,omitempty" json:"tls,omitempty" yaml:"tls,omitempty"`
	Engine     *EngineConfig      `mapstructure:"engine,omitempty" json:"engine,omitempty" yaml:"engine,omitempty"`
}

ClickHouseConfig dto for deserialized clickhouse config

func (*ClickHouseConfig) Validate

func (chc *ClickHouseConfig) Validate() error

Validate required fields in ClickHouseConfig

type ClickHouseProtocol

// ClickHouseProtocol is the string-typed protocol setting used by
// ClickHouseConfig.Protocol.
type ClickHouseProtocol string

// Known ClickHouseProtocol values.
const (
	ClickHouseProtocolNative ClickHouseProtocol = "clickhouse"
	ClickHouseProtocolSecure ClickHouseProtocol = "clickhouse-secure"
	ClickHouseProtocolHTTP   ClickHouseProtocol = "http"
	ClickHouseProtocolHTTPS  ClickHouseProtocol = "https"
)

type ColumnDDLFunction

// ColumnDDLFunction generates the column DDL for a CREATE TABLE statement. It
// receives the column's quoted name, its plain name and the table it belongs to.
type ColumnDDLFunction func(quotedName, name string, table *Table) string

ColumnDDLFunction generate column DDL for CREATE TABLE statement based on type (SQLColumn) and whether it is used for PK

type ColumnScanner

type ColumnScanner struct {
	ColumnType *sql.ColumnType
	// contains filtered or unexported fields
}

func (*ColumnScanner) Get

func (s *ColumnScanner) Get() any

func (*ColumnScanner) Scan

func (s *ColumnScanner) Scan(src any) error

type Columns

// Columns maps a column name to its types.SQLColumn description.
type Columns map[string]types.SQLColumn

Columns is a list of columns representation

func (Columns) Clone

func (c Columns) Clone() Columns

func (Columns) ToSimpleMap

func (c Columns) ToSimpleMap() map[string]string

type ConWithDB

type ConWithDB struct {
	// contains filtered or unexported fields
}

func NewConWithDB

func NewConWithDB(db *sql.DB, con *sql.Conn) *ConWithDB

func (*ConWithDB) Close

func (c *ConWithDB) Close() error

func (*ConWithDB) ExecContext

func (c *ConWithDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

func (*ConWithDB) PrepareContext

func (c *ConWithDB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

func (*ConWithDB) QueryContext

func (c *ConWithDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

func (*ConWithDB) QueryRowContext

func (c *ConWithDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

type DB

// DB combines the TxOrDB query interface with io.Closer, i.e. it is a TxOrDB
// that can also be closed.
type DB interface {
	TxOrDB
	io.Closer
}

type DataSourceConfig

// DataSourceConfig is the DTO for a deserialized datasource configuration
// (e.g. in a Postgres or AwsRedshift destination).
//
// Note that two fields are read from keys that differ from their Go names:
// Db comes from "database" and Schema from "defaultSchema". All fields are
// optional at the decoding level; required ones are checked by Validate.
type DataSourceConfig struct {
	Host       string            `mapstructure:"host,omitempty" json:"host,omitempty" yaml:"host,omitempty"`
	Port       int               `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"`
	Db         string            `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
	Schema     string            `mapstructure:"defaultSchema,omitempty" json:"defaultSchema,omitempty" yaml:"defaultSchema,omitempty"`
	Username   string            `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string            `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
	Parameters map[string]string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

DataSourceConfig dto for deserialized datasource config (e.g. in Postgres or AwsRedshift destination)

func (*DataSourceConfig) Validate

func (dsc *DataSourceConfig) Validate() error

Validate required fields in DataSourceConfig

type DatePartition

// DatePartition describes a date-based partition by a field name, a time value
// and a Granularity.
// NOTE(review): presumably Value is interpreted at the given Granularity when
// the partition is selected (see BigQuery.DeletePartition) — confirm.
type DatePartition struct {
	Field       string
	Value       time.Time
	Granularity Granularity
}

type DbConnectFunction

// DbConnectFunction connects to a database described by a configuration of
// type T and returns the resulting *sql.DB or an error.
type DbConnectFunction[T any] func(config *T) (*sql.DB, error)

DbConnectFunction function is used to connect to database

type DummyTypeResolver

// DummyTypeResolver is a resolver type without any fields or state.
type DummyTypeResolver struct {
}

DummyTypeResolver doesn't do anything

func NewDummyTypeResolver

func NewDummyTypeResolver() *DummyTypeResolver

NewDummyTypeResolver return DummyTypeResolver

func (*DummyTypeResolver) Resolve

func (dtr *DummyTypeResolver) Resolve(object map[string]any, sqlTypeHints types2.SQLTypes) (Fields, error)

Resolve returns one dummy field so that types.Fields is not empty (it is used in the Facebook destination).

type EngineConfig

// EngineConfig is the DTO for a deserialized ClickHouse engine configuration,
// referenced from ClickHouseConfig.Engine. All fields are optional at the
// decoding level.
// NOTE(review): presumably RawStatement, when set, replaces the generated
// engine clause, while the remaining fields feed the generated one — confirm
// against the CREATE TABLE code.
type EngineConfig struct {
	RawStatement    string        `mapstructure:"rawStatement,omitempty" json:"rawStatement,omitempty" yaml:"rawStatement,omitempty"`
	NullableFields  []string      `mapstructure:"nullableFields,omitempty" json:"nullableFields,omitempty" yaml:"nullableFields,omitempty"`
	PartitionFields []FieldConfig `mapstructure:"partitionFields,omitempty" json:"partitionFields,omitempty" yaml:"partitionFields,omitempty"`
	OrderFields     []FieldConfig `mapstructure:"orderFields,omitempty" json:"orderFields,omitempty" yaml:"orderFields,omitempty"`
	PrimaryKeys     []string      `mapstructure:"primaryKeys,omitempty" json:"primaryKeys,omitempty" yaml:"primaryKeys,omitempty"`
}

EngineConfig dto for deserialized clickhouse engine config

type ErrorAdapter

// ErrorAdapter converts an error into another error; it is used to extract an
// implementation-specific payload and adapt it to a standard error.
type ErrorAdapter func(error) error

ErrorAdapter is used to extract implementation specific payload and adapt to standard error

type Field

type Field struct {
	// contains filtered or unexported fields
}

Field is a data type holder with sql type suggestion

func NewField

func NewField(t types2.DataType) Field

NewField returns Field instance

func NewFieldWithSQLType

func NewFieldWithSQLType(t types2.DataType, suggestedType *types2.SQLColumn) Field

NewFieldWithSQLType returns Field instance with configured suggested sql types

func (Field) GetSuggestedSQLType

func (f Field) GetSuggestedSQLType() (types2.SQLColumn, bool)

GetSuggestedSQLType returns suggested SQL type if configured

func (Field) GetType

func (f Field) GetType() types2.DataType

GetType gets the field type based on occurrence in one file; it lazily gets the common ancestor type (typing.GetCommonAncestorType)

type FieldConfig

// FieldConfig is the DTO for one deserialized ClickHouse engine field, used by
// EngineConfig.PartitionFields and EngineConfig.OrderFields.
// NOTE(review): presumably Function is an optional SQL function applied to
// Field — confirm against the engine statement builder.
type FieldConfig struct {
	Function string `mapstructure:"function,omitempty" json:"function,omitempty" yaml:"function,omitempty"`
	Field    string `mapstructure:"field,omitempty" json:"field,omitempty" yaml:"field,omitempty"`
}

FieldConfig dto for deserialized clickhouse engine fields

type Fields

// Fields maps a field name to its Field description.
type Fields map[string]Field

func (Fields) Add

func (f Fields) Add(other Fields)

Add adds all new fields from other to the current instance; if a field already exists, it is skipped

func (Fields) Clone

func (f Fields) Clone() Fields

Clone copies fields into a new Fields object

func (Fields) Header

func (f Fields) Header() (header []string)

Header return fields names as a string slice

func (Fields) Merge

func (f Fields) Merge(other Fields)

Merge adds all fields from other to current instance or merge if exists

func (Fields) OverrideTypes

func (f Fields) OverrideTypes(newTypes types2.SQLTypes)

type Granularity

// Granularity is the string-typed granularity of a time interval; see the
// HOUR ... ALL constants for the defined values.
type Granularity string

Granularity is a granularity of TimeInterval

// Defined Granularity values. Each constant's string value equals its name.
const (
	HOUR    Granularity = "HOUR"
	DAY     Granularity = "DAY"
	WEEK    Granularity = "WEEK"
	MONTH   Granularity = "MONTH"
	QUARTER Granularity = "QUARTER"
	YEAR    Granularity = "YEAR"
	ALL     Granularity = "ALL"
)

func ParseGranularity

func ParseGranularity(s string) (Granularity, error)

ParseGranularity returns Granularity value from string

func (Granularity) Format

func (g Granularity) Format(t time.Time) string

Format returns formatted string value representation

func (Granularity) Lower

func (g Granularity) Lower(t time.Time) time.Time

Lower returns the lower value of interval

func (Granularity) String

func (g Granularity) String() string

String returns string value representation

func (Granularity) Upper

func (g Granularity) Upper(t time.Time) time.Time

Upper returns the upper value of interval

type IdentifierFunction

// IdentifierFunction adapts an identifier name to the format required by the
// database (e.g. masks or escapes special characters). It returns the adapted
// identifier and whether that identifier needs to be quoted.
type IdentifierFunction func(identifier string, alphanumeric bool) (adapted string, needQuotes bool)

IdentifierFunction adapts identifier name to format required by database e.g. masks or escapes special characters

type JobRunner

// JobRunner is anything that can be run to start a BigQuery job; it is the
// runner type accepted by BigQuery.RunJob.
type JobRunner interface {
	// Run starts the job and returns its handle or an error.
	Run(ctx context.Context) (*bigquery.Job, error)
}

type LoadSource

// LoadSource describes the data handed to an adapter's LoadTable method: the
// kind of source (Type), the file format, and a path.
// NOTE(review): presumably S3Config is only needed when Type is AmazonS3 —
// confirm against the LoadTable implementations.
type LoadSource struct {
	Type     LoadSourceType
	Format   types2.FileFormat
	Path     string
	S3Config *S3OptionConfig
}

type LoadSourceType

// LoadSourceType is the string-typed kind of a LoadSource.
type LoadSourceType string

// Defined LoadSourceType values.
const (
	LocalFile        LoadSourceType = "local_file"
	GoogleCloudStore LoadSourceType = "google_cloud_store"
	AmazonS3         LoadSourceType = "amazon_s3"
)

type MySQL

type MySQL struct {
	*SQLAdapterBase[DataSourceConfig]
	// contains filtered or unexported fields
}

MySQL is adapter for creating, patching (schema or table), inserting data to mySQL database

func (*MySQL) CopyTables

func (m *MySQL) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (*bulker.WarehouseState, error)

func (*MySQL) CreateStream

func (m *MySQL) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*MySQL) CreateTable

func (m *MySQL) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*MySQL) GetTableSchema

func (m *MySQL) GetTableSchema(ctx context.Context, tableName string) (*Table, error)

GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct

func (*MySQL) InitDatabase

func (m *MySQL) InitDatabase(ctx context.Context) error

InitDatabase creates database instance if doesn't exist

func (*MySQL) Insert

func (m *MySQL) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*MySQL) LoadTable

func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state *bulker.WarehouseState, err error)

func (*MySQL) OpenTx

func (m *MySQL) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance

func (*MySQL) ReplaceTable

func (m *MySQL) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

type ParameterPlaceholder

// ParameterPlaceholder builds the placeholder text for a query parameter from
// its index i and its name. IndexParameterPlaceholder,
// NamedParameterPlaceholder and QuestionMarkParameterPlaceholder all have this
// signature.
type ParameterPlaceholder func(i int, name string) string

type Postgres

type Postgres struct {
	*SQLAdapterBase[PostgresConfig]
	// contains filtered or unexported fields
}

Postgres is adapter for creating,patching (schema or table), inserting data to postgres

func (*Postgres) Close

func (p *Postgres) Close() error

Close underlying sql.DB

func (*Postgres) CopyTables

func (p *Postgres) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (*bulker.WarehouseState, error)

func (*Postgres) CreateStream

func (p *Postgres) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*Postgres) CreateTable

func (p *Postgres) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*Postgres) GetTableSchema

func (p *Postgres) GetTableSchema(ctx context.Context, tableName string) (*Table, error)

GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct

func (*Postgres) InitDatabase

func (p *Postgres) InitDatabase(ctx context.Context) error

InitDatabase creates database schema instance if doesn't exist

func (*Postgres) Insert

func (p *Postgres) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*Postgres) LoadTable

func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state *bulker.WarehouseState, err error)

func (*Postgres) OpenTx

func (p *Postgres) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance

func (*Postgres) Ping

func (p *Postgres) Ping(ctx context.Context) error

func (*Postgres) ReplaceTable

func (p *Postgres) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

type PostgresConfig

// PostgresConfig is the Postgres configuration: the fields of DataSourceConfig
// and SSLConfig, both embedded and squashed by mapstructure so that their keys
// are read from the same level.
type PostgresConfig struct {
	DataSourceConfig `mapstructure:",squash"`
	SSLConfig        `mapstructure:",squash"`
}

type QueryPayload

// QueryPayload is a bag of pre-rendered string fragments.
// NOTE(review): presumably the data passed to SQL statement templates — the
// first group for statements on a single table (insert/merge), the second
// group for statements between two tables — confirm against the templates.
type QueryPayload struct {
	TableName      string
	Columns        string
	Placeholders   string
	PrimaryKeyName string
	UpdateSet      string

	TableTo        string
	TableFrom      string
	JoinConditions string
	SourceColumns  string
}

type Redshift

type Redshift struct {
	//Aws Redshift uses Postgres fork under the hood
	*Postgres
	// contains filtered or unexported fields
}

Redshift adapter for creating,patching (schema or table), inserting and copying data from s3 to redshift

func (*Redshift) CopyTables

func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state *bulker.WarehouseState, err error)

func (*Redshift) CreateStream

func (p *Redshift) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*Redshift) CreateTable

func (p *Redshift) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*Redshift) GetTableSchema

func (p *Redshift) GetTableSchema(ctx context.Context, tableName string) (*Table, error)

GetTableSchema returns the table (name, columns, primary key) representation wrapped in a Table struct

func (*Redshift) Insert

func (p *Redshift) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*Redshift) LoadTable

func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state *bulker.WarehouseState, err error)

LoadTable transfers data from S3 to Redshift by passing a COPY request to Redshift

func (*Redshift) OpenTx

func (p *Redshift) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance

func (*Redshift) ReplaceTable

func (p *Redshift) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*Redshift) Type

func (p *Redshift) Type() string

Type returns Postgres type

type RedshiftConfig

// RedshiftConfig is the Redshift configuration: the fields of DataSourceConfig
// and S3OptionConfig, both embedded and squashed by mapstructure so that their
// keys are read from the same level.
type RedshiftConfig struct {
	DataSourceConfig `mapstructure:",squash"`
	S3OptionConfig   `mapstructure:",squash" yaml:"-,inline"`
}

type ReplacePartitionStream

type ReplacePartitionStream struct {
	*AbstractTransactionalSQLStream
	// contains filtered or unexported fields
}

func (*ReplacePartitionStream) Complete

func (ps *ReplacePartitionStream) Complete(ctx context.Context) (state bulker.State, err error)

func (*ReplacePartitionStream) Consume

func (ps *ReplacePartitionStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObjects types.Object, err error)

type ReplaceTableStream

// ReplaceTableStream is a stream type built on the embedded
// *AbstractTransactionalSQLStream; it adds its own Complete method.
// NOTE(review): presumably implements the replace-table bulk mode — confirm.
type ReplaceTableStream struct {
	*AbstractTransactionalSQLStream
}

func (*ReplaceTableStream) Complete

func (ps *ReplaceTableStream) Complete(ctx context.Context) (state bulker.State, err error)

type RepresentationTable

// RepresentationTable is a JSON-serializable table description: name, a
// string-to-string schema map, and optional primary key and temporary flags
// (omitted from JSON when empty).
// NOTE(review): presumably Schema maps column name to SQL type — confirm.
type RepresentationTable struct {
	Name             string            `json:"name"`
	Schema           map[string]string `json:"schema"`
	PrimaryKeyFields []string          `json:"primaryKeyFields,omitempty"`
	PrimaryKeyName   string            `json:"primaryKeyName,omitempty"`
	Temporary        bool              `json:"temporary,omitempty"`
}

type S3OptionConfig

// S3OptionConfig holds S3 access settings: credentials, bucket, region and
// folder. Note that SecretKey is read from the "secretAccessKey" key, which
// differs from its Go name. All fields are optional at the decoding level.
type S3OptionConfig struct {
	AccessKeyID string `mapstructure:"accessKeyId,omitempty" json:"accessKeyId,omitempty" yaml:"accessKeyId,omitempty"`
	SecretKey   string `mapstructure:"secretAccessKey,omitempty" json:"secretAccessKey,omitempty" yaml:"secretAccessKey,omitempty"`
	Bucket      string `mapstructure:"bucket,omitempty" json:"bucket,omitempty" yaml:"bucket,omitempty"`
	Region      string `mapstructure:"region,omitempty" json:"region,omitempty" yaml:"region,omitempty"`
	Folder      string `mapstructure:"folder,omitempty" json:"folder,omitempty" yaml:"folder,omitempty"`
}

type SQLAdapter

// SQLAdapter is a manager for DWH tables: the set of operations every database
// adapter in this package (BigQuery, ClickHouse, MySQL, Postgres, ...) exposes.
type SQLAdapter interface {
	// Type returns the adapter's type string.
	Type() string

	// GetSQLType returns the mapping from a generic bulker type to the SQL type
	// specific for this database; the bool reports whether a mapping exists.
	GetSQLType(dataType types2.DataType) (string, bool)
	// GetDataType is the reverse lookup: SQL type to generic bulker type.
	GetDataType(sqlType string) (types2.DataType, bool)
	GetAvroType(sqlType string) (any, bool)
	GetAvroSchema(table *Table) *types2.AvroSchema
	GetBatchFileFormat() types2.FileFormat
	GetBatchFileCompression() types2.FileCompression
	StringifyObjects() bool
	OpenTx(ctx context.Context) (*TxSQLAdapter, error)
	Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
	Ping(ctx context.Context) error
	// InitDatabase sets up required db objects like 'schema' or 'dataset' if they don't exist
	InitDatabase(ctx context.Context) error
	TableHelper() *TableHelper
	GetTableSchema(ctx context.Context, tableName string) (*Table, error)
	CreateTable(ctx context.Context, schemaToCreate *Table) error
	CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state *bulker.WarehouseState, err error)
	LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state *bulker.WarehouseState, err error)
	PatchTableSchema(ctx context.Context, patchTable *Table) error
	TruncateTable(ctx context.Context, tableName string) error
	// NOTE(review): the parameter list below is a leftover of a commented-out
	// method; it matches BigQuery.Update, so it was presumably `Update` — confirm
	// before restoring or deleting it.
	//(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error
	Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
	DropTable(ctx context.Context, tableName string, ifExists bool) error
	Drop(ctx context.Context, table *Table, ifExists bool) error

	ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error

	Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
	Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)

	// ColumnName adapts column name to sql identifier rules of database
	ColumnName(rawColumn string) string
	// TableName adapts table name to sql identifier rules of database
	TableName(rawTableName string) string
}

SQLAdapter is a manager for DWH tables

type SQLAdapterBase

type SQLAdapterBase[T any] struct {
	appbase.Service
	// contains filtered or unexported fields
}

func (*SQLAdapterBase[T]) Close

func (b *SQLAdapterBase[T]) Close() error

Close underlying sql.DB

func (*SQLAdapterBase[T]) ColumnName

func (b *SQLAdapterBase[T]) ColumnName(identifier string) string

func (*SQLAdapterBase[T]) Count

func (b *SQLAdapterBase[T]) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)

func (*SQLAdapterBase[T]) CreateTable

func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Table) error

CreateTable creates the table's columns and primary key, overrides the input table's SQL types with the configured cast types, and makes the fields from Table.PKFields 'not null'.

func (*SQLAdapterBase[T]) Delete

func (b *SQLAdapterBase[T]) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error

func (*SQLAdapterBase[T]) Drop

func (b *SQLAdapterBase[T]) Drop(ctx context.Context, table *Table, ifExists bool) error

func (*SQLAdapterBase[T]) DropTable

func (b *SQLAdapterBase[T]) DropTable(ctx context.Context, tableName string, ifExists bool) error

func (*SQLAdapterBase[T]) GetAvroSchema

func (b *SQLAdapterBase[T]) GetAvroSchema(table *Table) *types2.AvroSchema

func (*SQLAdapterBase[T]) GetAvroType

func (b *SQLAdapterBase[T]) GetAvroType(sqlType string) (any, bool)

func (*SQLAdapterBase[T]) GetBatchFileCompression

func (b *SQLAdapterBase[T]) GetBatchFileCompression() types2.FileCompression

func (*SQLAdapterBase[T]) GetBatchFileFormat

func (b *SQLAdapterBase[T]) GetBatchFileFormat() types2.FileFormat

func (*SQLAdapterBase[T]) GetDataType

func (b *SQLAdapterBase[T]) GetDataType(sqlType string) (types2.DataType, bool)

func (*SQLAdapterBase[T]) GetSQLType

func (b *SQLAdapterBase[T]) GetSQLType(dataType types2.DataType) (string, bool)

func (*SQLAdapterBase[T]) PatchTableSchema

func (b *SQLAdapterBase[T]) PatchTableSchema(ctx context.Context, patchTable *Table) error

PatchTableSchema alters the table with the given columns (if not empty), recreates the primary key (if not empty), or deletes the primary key if Table.DeletePkFields is true.

func (*SQLAdapterBase[T]) Ping

func (b *SQLAdapterBase[T]) Ping(ctx context.Context) error

func (*SQLAdapterBase[T]) ReplaceTable

func (b *SQLAdapterBase[T]) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*SQLAdapterBase[T]) Select

func (b *SQLAdapterBase[T]) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

func (*SQLAdapterBase[T]) StringifyObjects

func (b *SQLAdapterBase[T]) StringifyObjects() bool

func (*SQLAdapterBase[T]) TableHelper

func (b *SQLAdapterBase[T]) TableHelper() *TableHelper

func (*SQLAdapterBase[T]) TableName

func (b *SQLAdapterBase[T]) TableName(identifier string) string

func (*SQLAdapterBase[T]) ToWhenConditions

func (b *SQLAdapterBase[T]) ToWhenConditions(conditions *WhenConditions, paramExpression ParameterPlaceholder, valuesShift int) (string, []any)

ToWhenConditions generates WHEN clause for SQL query based on provided WhenConditions

paramExpression - SQLParameterExpression function that produce parameter placeholder for parametrized query, depending on database can be: IndexParameterPlaceholder, QuestionMarkParameterPlaceholder, NamedParameterPlaceholder

valuesShift - for parametrized query index of first when clause value in all values provided to query (for UPDATE queries 'valuesShift' = len(object fields))

func (*SQLAdapterBase[T]) TruncateTable

func (b *SQLAdapterBase[T]) TruncateTable(ctx context.Context, tableName string) error

TruncateTable deletes all records in tableName table

func (*SQLAdapterBase[T]) Type

func (b *SQLAdapterBase[T]) Type() string

Type returns Postgres type

func (*SQLAdapterBase[T]) Update

func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object types2.Object, whenConditions *WhenConditions) error

type SSLConfig

// SSLConfig groups the SSL-related connection settings. Each field is decoded
// via its mapstructure tag and is optional (omitempty).
type SSLConfig struct {
	// SSLMode is read from the "sslMode" key.
	// NOTE(review): presumably one of the package's SSLMode* constants
	// ("require", "disable", "verify-ca", "verify-full", or "" when not
	// provided) — confirm against ValidateSSL.
	SSLMode       string `mapstructure:"sslMode,omitempty"`
	// SSLServerCA is read from the "sslServerCA" key.
	SSLServerCA   string `mapstructure:"sslServerCA,omitempty"`
	// SSLClientCert is read from the "sslClientCert" key.
	SSLClientCert string `mapstructure:"sslClientCert,omitempty"`
	// SSLClientKey is read from the "sslClientKey" key.
	SSLClientKey  string `mapstructure:"sslClientKey,omitempty"`
}

SSLConfig is a dto for deserialized SSL configuration for Postgres

func (*SSLConfig) ValidateSSL

func (sc *SSLConfig) ValidateSSL() error

ValidateSSL returns err if the ssl configuration is invalid

type Snowflake

type Snowflake struct {
	*SQLAdapterBase[SnowflakeConfig]
}

Snowflake is an adapter for creating and patching (schema or table) and for inserting data into Snowflake

func (*Snowflake) CopyTables

func (s *Snowflake) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (*bulker.WarehouseState, error)

func (*Snowflake) CreateStream

func (s *Snowflake) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*Snowflake) CreateTable

func (s *Snowflake) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*Snowflake) GetTableSchema

func (s *Snowflake) GetTableSchema(ctx context.Context, tableName string) (*Table, error)

GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct

func (*Snowflake) InitDatabase

func (s *Snowflake) InitDatabase(ctx context.Context) error

InitDatabase creates the database schema instance if it doesn't exist

func (*Snowflake) Insert

func (s *Snowflake) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

Insert inserts data with InsertContext as a single object or a batch into Snowflake

func (*Snowflake) LoadTable

func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state *bulker.WarehouseState, err error)

LoadTable transfers data from a local file to Snowflake by passing a COPY request to Snowflake

func (*Snowflake) OpenTx

func (s *Snowflake) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns the wrapped instance

func (*Snowflake) ReplaceTable

func (s *Snowflake) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error

func (*Snowflake) Select

func (s *Snowflake) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

type SnowflakeConfig

// SnowflakeConfig carries the Snowflake connection settings. Every field is
// tagged for mapstructure, JSON and YAML decoding under the same key, and every
// key is marked omitempty.
type SnowflakeConfig struct {
	Account    string             `mapstructure:"account,omitempty" json:"account,omitempty" yaml:"account,omitempty"`
	Port       int                `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"`
	// Db is serialized under the "database" key (not "db").
	Db         string             `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
	// Schema is serialized under the "defaultSchema" key.
	Schema     string             `mapstructure:"defaultSchema,omitempty" json:"defaultSchema,omitempty" yaml:"defaultSchema,omitempty"`
	Username   string             `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string             `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
	Warehouse  string             `mapstructure:"warehouse,omitempty" json:"warehouse,omitempty" yaml:"warehouse,omitempty"`
	// Parameters maps a parameter name to an optional string value; values are
	// pointers, so an entry may be nil.
	// NOTE(review): presumably extra connection/session parameters handed to
	// the Snowflake driver — confirm where the connection is built.
	Parameters map[string]*string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

SnowflakeConfig dto for deserialized datasource config for Snowflake

func (*SnowflakeConfig) Validate

func (sc *SnowflakeConfig) Validate() error

Validate required fields in SnowflakeConfig

type Table

// Table describes a warehouse table: its name, columns, primary key and
// partitioning settings.
type Table struct {
	// Name is the table name.
	Name      string
	// NOTE(review): presumably marks the table as a temporary one — confirm.
	Temporary bool
	// NOTE(review): presumably set when the schema was served from the
	// TableHelper cache — confirm.
	Cached    bool

	// Columns holds the table's columns; per the Exists documentation the
	// table is considered existing when there is at least one column.
	Columns         Columns
	// PKFields is the set of primary key fields (exposed through GetPKFields
	// and GetPKFieldsSet).
	PKFields        utils.Set[string]
	// NOTE(review): presumably the name of the primary key constraint — confirm.
	PrimaryKeyName  string
	// NOTE(review): presumably the name of the column holding the event
	// timestamp — confirm.
	TimestampColumn string

	Partition DatePartition

	// DeletePkFields, when true, asks PatchTableSchema to delete the primary
	// key (see the PatchTableSchema documentation).
	DeletePkFields bool
}

Table is a dto for DWH Table representation

func (*Table) Clone

func (t *Table) Clone() *Table

Clone returns clone of current table

func (*Table) ColumnsWithSQLTypes

func (t *Table) ColumnsWithSQLTypes() [][]string

func (*Table) Diff

func (t *Table) Diff(another *Table) *Table

Diff calculates the diff between the current schema and another one. It returns the schema to add to the current schema (to make them equal), or an empty one if 1) the other schema is empty, or 2) all fields from the other schema already exist in the current schema. NOTE: Diff doesn't take types into account.

func (*Table) Exists

func (t *Table) Exists() bool

Exists returns true if there is at least one column

func (*Table) FitsToTable

func (t *Table) FitsToTable(destination *Table) bool

FitsToTable checks that current table fits to the destination table column-wise (doesn't have new columns)

func (*Table) GetPKFields

func (t *Table) GetPKFields() []string

GetPKFields returns primary keys list

func (*Table) GetPKFieldsSet

func (t *Table) GetPKFieldsSet() utils.Set[string]

GetPKFieldsSet returns primary keys set

func (*Table) SortedColumnNames

func (t *Table) SortedColumnNames() []string

SortedColumnNames returns column names sorted in alphabetical order

type TableField

// TableField represents a single table column. All three members are omitted
// from JSON output when empty (omitempty).
type TableField struct {
	// Field is serialized as "field".
	// NOTE(review): presumably the column name — confirm.
	Field string `json:"field,omitempty"`
	// Type is serialized as "type".
	// NOTE(review): presumably the column's type name — confirm.
	Type  string `json:"type,omitempty"`
	// Value is serialized as "value" and may hold any value.
	Value any    `json:"value,omitempty"`
}

TableField is a table column representation

type TableHelper

type TableHelper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TableHelper keeps the tables' schema state in memory and updates it according to incoming new data. It assumes that all tables are in one destination schema. Note: after any external changes in the db, the table version in Service needs to be incremented.

func NewTableHelper

func NewTableHelper(maxIdentifierLength int, identifierQuoteChar rune) TableHelper

NewTableHelper returns a configured TableHelper instance. Note: columnTypesMapping must not be empty (or fields will be ignored).

func (*TableHelper) ColumnName

func (th *TableHelper) ColumnName(columnName string) string

func (*TableHelper) EnsureTableWithCaching

func (th *TableHelper) EnsureTableWithCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, dataSchema *Table) (*Table, error)

EnsureTableWithCaching calls ensureTable with cacheTable = true. It is used in stream destinations (because we don't have time to select the table schema, but there is a retry on error).

func (*TableHelper) EnsureTableWithoutCaching

func (th *TableHelper) EnsureTableWithoutCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, dataSchema *Table) (*Table, error)

EnsureTableWithoutCaching calls ensureTable with cacheTable = false. It is used in batch destinations and syncStore (because we have time to select the table schema).

func (*TableHelper) GetCached

func (th *TableHelper) GetCached(tableName string) (*Table, bool)

func (*TableHelper) MapSchema

func (th *TableHelper) MapSchema(sqlAdapter SQLAdapter, schema types2.Schema) *Table

MapSchema maps types.Schema into types.Table (structure with SQL types)

func (*TableHelper) MapTableSchema

func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesHeader, object types2.Object, pkFields []string, timestampColumn string) (*Table, types2.Object)

MapTableSchema maps types.TypesHeader (a JSON structure with JSON data types) into types.Table (a structure with SQL types), applies the column types mapping, and adjusts object property names to column names.

func (*TableHelper) TableName

func (th *TableHelper) TableName(tableName string) string

type TableStatementFactory

type TableStatementFactory struct {
	// contains filtered or unexported fields
}

TableStatementFactory is used for creating CREATE TABLE statements depending on the config

func NewTableStatementFactory

func NewTableStatementFactory(ch *ClickHouse) *TableStatementFactory

func (TableStatementFactory) CreateTableStatement

func (tsf TableStatementFactory) CreateTableStatement(quotedTableName, tableName, columnsClause string, table *Table) string

CreateTableStatement returns the ClickHouse DDL statement for creating a table

type TransactionalStream

type TransactionalStream struct {
	*AbstractTransactionalSQLStream
}

TODO: Use real temporary tables

func (*TransactionalStream) Complete

func (ps *TransactionalStream) Complete(ctx context.Context) (state bulker.State, err error)

type TxOrDB

// TxOrDB is the set of context-aware statement methods needed to run SQL
// either directly on a database handle or inside a transaction. The four
// signatures match the same-named methods of database/sql's *sql.DB and
// *sql.Tx, so both satisfy this interface.
type TxOrDB interface {
	// ExecContext runs a statement that returns no rows.
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	// QueryContext runs a statement that returns rows.
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	// QueryRowContext runs a statement expected to return at most one row.
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
	// PrepareContext creates a prepared statement.
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}

type TxSQLAdapter

type TxSQLAdapter struct {
	// contains filtered or unexported fields
}

func (*TxSQLAdapter) ColumnName

func (tx *TxSQLAdapter) ColumnName(identifier string) string

func (*TxSQLAdapter) Commit

func (tx *TxSQLAdapter) Commit() error

func (*TxSQLAdapter) CopyTables

func (tx *TxSQLAdapter) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (*bulker.WarehouseState, error)

func (*TxSQLAdapter) Count

func (tx *TxSQLAdapter) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)

func (*TxSQLAdapter) CreateTable

func (tx *TxSQLAdapter) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*TxSQLAdapter) Delete

func (tx *TxSQLAdapter) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
// Update stores the adapter's transaction handle (tx.tx) in ctx under
// ContextTransactionKey and then delegates to the wrapped sqlAdapter's Update,
// returning whatever error that call produces.
func (tx *TxSQLAdapter) Update(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error {
	txCtx := context.WithValue(ctx, ContextTransactionKey, tx.tx)
	return tx.sqlAdapter.Update(txCtx, tableName, object, whenConditions)
}

func (*TxSQLAdapter) Drop

func (tx *TxSQLAdapter) Drop(ctx context.Context, table *Table, ifExists bool) error

func (*TxSQLAdapter) DropTable

func (tx *TxSQLAdapter) DropTable(ctx context.Context, tableName string, ifExists bool) error

func (*TxSQLAdapter) GetAvroSchema

func (tx *TxSQLAdapter) GetAvroSchema(table *Table) *types2.AvroSchema

func (*TxSQLAdapter) GetAvroType

func (tx *TxSQLAdapter) GetAvroType(sqlType string) (any, bool)

func (*TxSQLAdapter) GetBatchFileCompression

func (tx *TxSQLAdapter) GetBatchFileCompression() types2.FileCompression

func (*TxSQLAdapter) GetBatchFileFormat

func (tx *TxSQLAdapter) GetBatchFileFormat() types2.FileFormat

func (*TxSQLAdapter) GetDataType

func (tx *TxSQLAdapter) GetDataType(sqlType string) (types2.DataType, bool)

func (*TxSQLAdapter) GetSQLType

func (tx *TxSQLAdapter) GetSQLType(dataType types2.DataType) (string, bool)

func (*TxSQLAdapter) GetTableSchema

func (tx *TxSQLAdapter) GetTableSchema(ctx context.Context, tableName string) (*Table, error)

func (*TxSQLAdapter) InitDatabase

func (tx *TxSQLAdapter) InitDatabase(ctx context.Context) error

func (*TxSQLAdapter) Insert

func (tx *TxSQLAdapter) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*TxSQLAdapter) LoadTable

func (tx *TxSQLAdapter) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (*bulker.WarehouseState, error)

func (*TxSQLAdapter) OpenTx

func (tx *TxSQLAdapter) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

func (*TxSQLAdapter) PatchTableSchema

func (tx *TxSQLAdapter) PatchTableSchema(ctx context.Context, patchTable *Table) error

func (*TxSQLAdapter) Ping

func (tx *TxSQLAdapter) Ping(ctx context.Context) error

func (*TxSQLAdapter) ReplaceTable

func (tx *TxSQLAdapter) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error

func (*TxSQLAdapter) Rollback

func (tx *TxSQLAdapter) Rollback() error

func (*TxSQLAdapter) Select

func (tx *TxSQLAdapter) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

func (*TxSQLAdapter) StringifyObjects

func (tx *TxSQLAdapter) StringifyObjects() bool

func (*TxSQLAdapter) TableHelper

func (tx *TxSQLAdapter) TableHelper() *TableHelper

func (*TxSQLAdapter) TableName

func (tx *TxSQLAdapter) TableName(identifier string) string

func (*TxSQLAdapter) TruncateTable

func (tx *TxSQLAdapter) TruncateTable(ctx context.Context, tableName string) error

func (*TxSQLAdapter) Type

func (tx *TxSQLAdapter) Type() string

type TxWrapper

type TxWrapper struct {
	// contains filtered or unexported fields
}

TxWrapper is a SQL transaction wrapper. It is used for handling and logging errors with the db type (postgres, mySQL, redshift or snowflake) on Commit() and Rollback() calls.

func NewDbWrapper

func NewDbWrapper(dbType string, db DB, queryLogger *logging.QueryLogger, errorAdapter ErrorAdapter, closeDb bool) *TxWrapper

func NewDummyTxWrapper

func NewDummyTxWrapper(dbType string) *TxWrapper

func NewTxWrapper

func NewTxWrapper(dbType string, tx *sql.Tx, queryLogger *logging.QueryLogger, errorAdapter ErrorAdapter) *TxWrapper

func (*TxWrapper) Commit

func (t *TxWrapper) Commit() error

Commit commits underlying transaction and returns err if occurred

func (*TxWrapper) ExecContext

func (t *TxWrapper) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

ExecContext executes a query that doesn't return rows. For example: an INSERT and UPDATE.

func (*TxWrapper) PrepareContext

func (t *TxWrapper) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

PrepareContext creates a prepared statement for use within a transaction.

The returned statement operates within the transaction and will be closed when the transaction has been committed or rolled back.

To use an existing prepared statement on this transaction, see Tx.Stmt.

The provided context will be used for the preparation of the context, not for the execution of the returned statement. The returned statement will run in the transaction context.

func (*TxWrapper) QueryContext

func (t *TxWrapper) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

QueryContext executes a query that returns rows, typically a SELECT.

func (*TxWrapper) QueryRowContext

func (t *TxWrapper) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

QueryRowContext executes a query that is expected to return at most one row. QueryRowContext always returns a non-nil value. Errors are deferred until Row's Scan method is called. If the query selects no rows, the *Row's Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the first selected row and discards the rest.

func (*TxWrapper) Rollback

func (t *TxWrapper) Rollback() error

Rollback cancels underlying transaction and logs system err if occurred

type TypeCastFunction

type TypeCastFunction func(placeholder string, column types2.SQLColumn) string

TypeCastFunction wraps parameter(or placeholder) to a type cast expression if it is necessary (e.g. on types overrides)

type TypeResolver

type TypeResolver interface {
	Resolve(object map[string]any, sqlTypeHints types2.SQLTypes) (Fields, error)
}

TypeResolver resolves types.Fields from input object

type TypeResolverImpl

type TypeResolverImpl struct {
}

TypeResolverImpl resolves types based on converter.go rules

func NewTypeResolver

func NewTypeResolver() *TypeResolverImpl

NewTypeResolver returns TypeResolverImpl

func (*TypeResolverImpl) Resolve

func (tr *TypeResolverImpl) Resolve(object map[string]any, sqlTypeHints types2.SQLTypes) (Fields, error)

Resolve returns the types.Fields representation of the input object. It applies the default typecast and defines column types: values are reformatted from json.Number into int64 or float64 and put back, and strings containing a timestamp are reformatted into time.Time and put back.

type TypesHeader

// TypesHeader is the schema produced by parsing JSON objects for one table.
type TypesHeader struct {
	// TableName is the name of the table the header belongs to.
	TableName string
	// Fields holds the parsed fields; per the Exists documentation the header
	// is considered existing when there is at least one field.
	Fields    Fields
	Partition DatePartition
}

TypesHeader is the schema result of parsing JSON objects

func ProcessEvents

func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, omitNils bool, stringifyObjects bool) (*TypesHeader, types.Object, error)

ProcessEvents processes an event object without applying mapping rules. It returns the table header and the processed object, or an error if at least one occurred.

func (*TypesHeader) Exists

func (bh *TypesHeader) Exists() bool

Exists returns true if there is at least one field

type ValueMappingFunction

type ValueMappingFunction func(value any, valuePresent bool, column types2.SQLColumn) any

ValueMappingFunction maps an object value to a database value, for cases such as default value substitution for null or missing values.

type WhenCondition

// WhenCondition is a single condition used when building a SQL condition
// clause (see ToWhenConditions).
type WhenCondition struct {
	// NOTE(review): presumably the column name the condition applies to — confirm.
	Field  string
	// Value is the value the field is compared against; it may be of any type.
	Value  any
	// NOTE(review): presumably the comparison operator placed between Field
	// and Value (e.g. "=") — confirm against ToWhenConditions.
	Clause string
}

WhenCondition is a representation of SQL delete condition

type WhenConditions

// WhenConditions bundles several WhenCondition values together with the
// expression used to join them.
type WhenConditions struct {
	// Conditions is the list of individual conditions; per the IsEmpty
	// documentation the set is empty when it has no conditions.
	Conditions    []WhenCondition
	// NOTE(review): presumably the logical operator inserted between
	// conditions (e.g. "AND") — confirm against ToWhenConditions.
	JoinCondition string
}

WhenConditions is a dto for multiple WhenCondition instances with Joiner

func ByPartitionId

func ByPartitionId(partitonId string) *WhenConditions

ByPartitionId returns a delete condition that removes objects based on the __partition_id value, or an empty condition if partitonId is empty

func NewWhenConditions

func NewWhenConditions(field string, clause string, value any) *WhenConditions

func (*WhenConditions) Add

func (dc *WhenConditions) Add(field string, clause string, value any) *WhenConditions

func (*WhenConditions) IsEmpty

func (dc *WhenConditions) IsEmpty() bool

IsEmpty returns true if there are no conditions

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL