Documentation ¶
Index ¶
- Variables
- type AwsRedshift
- func (ar *AwsRedshift) Close() error
- func (ar *AwsRedshift) Copy(wrappedTx *Transaction, fileKey, tableName string) error
- func (ar *AwsRedshift) CreateDbSchema(dbSchemaName string) error
- func (ar *AwsRedshift) CreateTable(tableSchema *schema.Table) error
- func (ar *AwsRedshift) GetTableSchema(tableName string) (*schema.Table, error)
- func (ar *AwsRedshift) Insert(schema *schema.Table, valuesMap map[string]interface{}) error
- func (AwsRedshift) Name() string
- func (ar *AwsRedshift) OpenTx() (*Transaction, error)
- func (ar *AwsRedshift) PatchTableSchema(patchSchema *schema.Table) error
- func (ar *AwsRedshift) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error
- type BQItem
- type BigQuery
- func (bq *BigQuery) Close() error
- func (bq *BigQuery) Copy(fileKey, tableName string) error
- func (bq *BigQuery) CreateDataset(dataset string) error
- func (bq *BigQuery) CreateTable(tableSchema *schema.Table) error
- func (bq *BigQuery) GetTableSchema(tableName string) (*schema.Table, error)
- func (bq *BigQuery) Insert(schema *schema.Table, valuesMap map[string]interface{}) error
- func (bq *BigQuery) PatchTableSchema(patchSchema *schema.Table) error
- func (bq *BigQuery) Test() error
- func (bq *BigQuery) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error
- type ClickHouse
- func (ch *ClickHouse) Close() error
- func (ch *ClickHouse) CreateDB(dbName string) error
- func (ch *ClickHouse) CreateTable(tableSchema *schema.Table) error
- func (ch *ClickHouse) GetTableSchema(tableName string) (*schema.Table, error)
- func (ch *ClickHouse) Insert(schema *schema.Table, valuesMap map[string]interface{}) error
- func (ch *ClickHouse) InsertInTransaction(wrappedTx *Transaction, schema *schema.Table, valuesMap map[string]interface{}) error
- func (ClickHouse) Name() string
- func (ch *ClickHouse) OpenTx() (*Transaction, error)
- func (ch *ClickHouse) PatchTableSchema(patchSchema *schema.Table) error
- func (ch *ClickHouse) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error
- type ClickHouseConfig
- type DataSourceConfig
- type EngineConfig
- type FieldConfig
- type GoogleCloudStorage
- func (gcs *GoogleCloudStorage) Close() error
- func (gcs *GoogleCloudStorage) DeleteObject(key string) error
- func (gcs *GoogleCloudStorage) GetObject(key string) ([]byte, error)
- func (gcs *GoogleCloudStorage) ListBucket(prefix string) ([]string, error)
- func (gcs *GoogleCloudStorage) UploadBytes(fileName string, fileBytes []byte) error
- type GoogleConfig
- type Postgres
- func (p *Postgres) Close() error
- func (p *Postgres) CreateDbSchema(dbSchemaName string) error
- func (p *Postgres) CreateTable(tableSchema *schema.Table) error
- func (p *Postgres) GetTableSchema(tableName string) (*schema.Table, error)
- func (p *Postgres) Insert(schema *schema.Table, valuesMap map[string]interface{}) error
- func (p *Postgres) InsertInTransaction(wrappedTx *Transaction, table *schema.Table, valuesMap map[string]interface{}) error
- func (Postgres) Name() string
- func (p *Postgres) OpenTx() (*Transaction, error)
- func (p *Postgres) PatchTableSchema(patchSchema *schema.Table) error
- func (p *Postgres) TablesList() ([]string, error)
- func (p *Postgres) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error
- type S3
- type S3Config
- type Snowflake
- func (s *Snowflake) Close() (multiErr error)
- func (s *Snowflake) Copy(wrappedTx *Transaction, fileKey, header, tableName string) error
- func (s *Snowflake) CreateDbSchema(dbSchemaName string) error
- func (s *Snowflake) CreateTable(tableSchema *schema.Table) error
- func (s *Snowflake) GetTableSchema(tableName string) (*schema.Table, error)
- func (s *Snowflake) Insert(schema *schema.Table, valuesMap map[string]interface{}) error
- func (s *Snowflake) InsertInTransaction(wrappedTx *Transaction, schema *schema.Table, valuesMap map[string]interface{}) error
- func (Snowflake) Name() string
- func (s *Snowflake) OpenTx() (*Transaction, error)
- func (s *Snowflake) PatchTableSchema(patchSchema *schema.Table) error
- func (s *Snowflake) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error
- type SnowflakeConfig
- type Stage
- type TableManager
- type TableStatementFactory
- type Transaction
Constants ¶
This section is empty.
Variables ¶
var ( SchemaToBigQuery = map[typing.DataType]bigquery.FieldType{ typing.STRING: bigquery.StringFieldType, typing.INT64: bigquery.IntegerFieldType, typing.FLOAT64: bigquery.FloatFieldType, typing.TIMESTAMP: bigquery.TimestampFieldType, } BigQueryToSchema = map[bigquery.FieldType]typing.DataType{ bigquery.StringFieldType: typing.STRING, bigquery.IntegerFieldType: typing.INT64, bigquery.FloatFieldType: typing.FLOAT64, bigquery.TimestampFieldType: typing.TIMESTAMP, } )
Functions ¶
This section is empty.
Types ¶
type AwsRedshift ¶
type AwsRedshift struct {
// contains filtered or unexported fields
}
AwsRedshift is an adapter for creating and patching schemas or tables, and for copying data from S3 to Redshift
func NewAwsRedshift ¶
func NewAwsRedshift(ctx context.Context, dsConfig *DataSourceConfig, s3Config *S3Config) (*AwsRedshift, error)
NewAwsRedshift returns a configured AwsRedshift adapter instance
func (*AwsRedshift) Copy ¶
func (ar *AwsRedshift) Copy(wrappedTx *Transaction, fileKey, tableName string) error
Copy transfers data from S3 to Redshift by issuing a COPY request to Redshift within the provided wrapped transaction
func (*AwsRedshift) CreateDbSchema ¶
func (ar *AwsRedshift) CreateDbSchema(dbSchemaName string) error
CreateDbSchema creates the database schema if it doesn't exist
func (*AwsRedshift) CreateTable ¶
func (ar *AwsRedshift) CreateTable(tableSchema *schema.Table) error
CreateTable creates a database table with the name and columns provided in the schema.Table representation
func (*AwsRedshift) GetTableSchema ¶
func (ar *AwsRedshift) GetTableSchema(tableName string) (*schema.Table, error)
GetTableSchema returns the table representation (name, and columns with names and types) wrapped in a schema.Table struct
func (*AwsRedshift) Insert ¶
func (ar *AwsRedshift) Insert(schema *schema.Table, valuesMap map[string]interface{}) error
Insert inserts the provided object into AwsRedshift in stream mode
func (AwsRedshift) Name ¶
func (AwsRedshift) Name() string
func (*AwsRedshift) OpenTx ¶
func (ar *AwsRedshift) OpenTx() (*Transaction, error)
OpenTx opens the underlying SQL transaction and returns a wrapped instance
func (*AwsRedshift) PatchTableSchema ¶
func (ar *AwsRedshift) PatchTableSchema(patchSchema *schema.Table) error
PatchTableSchema adds new columns (from the provided schema.Table) to an existing table
func (*AwsRedshift) UpdatePrimaryKey ¶ added in v1.15.6
func (ar *AwsRedshift) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error
type BQItem ¶
type BQItem struct {
// contains filtered or unexported fields
}
BQItem struct for streaming inserts to BigQuery
type BigQuery ¶
type BigQuery struct {
// contains filtered or unexported fields
}
func NewBigQuery ¶
func NewBigQuery(ctx context.Context, config *GoogleConfig) (*BigQuery, error)
func (*BigQuery) Copy ¶
Copy transfers data from a Google Cloud Storage file to a Google BigQuery table as one batch
func (*BigQuery) CreateDataset ¶
CreateDataset creates the Google BigQuery dataset if it doesn't exist
func (*BigQuery) CreateTable ¶
CreateTable creates a Google BigQuery table from a schema.Table
func (*BigQuery) GetTableSchema ¶
GetTableSchema returns the Google BigQuery table representation (name, and columns with types) as a schema.Table
func (*BigQuery) PatchTableSchema ¶
PatchTableSchema adds schema.Table columns to the Google BigQuery table
func (*BigQuery) UpdatePrimaryKey ¶ added in v1.15.6
type ClickHouse ¶
type ClickHouse struct {
// contains filtered or unexported fields
}
ClickHouse is an adapter for creating and patching schemas or tables, and for inserting data into ClickHouse
func NewClickHouse ¶
func NewClickHouse(ctx context.Context, connectionString, database, cluster string, tlsConfig map[string]string, tableStatementFactory *TableStatementFactory, nullableFields map[string]bool) (*ClickHouse, error)
NewClickHouse returns a configured ClickHouse adapter instance
func (*ClickHouse) CreateDB ¶
func (ch *ClickHouse) CreateDB(dbName string) error
CreateDB creates the database if it doesn't exist
func (*ClickHouse) CreateTable ¶
func (ch *ClickHouse) CreateTable(tableSchema *schema.Table) error
CreateTable creates a database table with the name and columns provided in the schema.Table representation. New tables use the MergeTree() or ReplicatedMergeTree() engine depending on whether config.cluster is empty
func (*ClickHouse) GetTableSchema ¶
func (ch *ClickHouse) GetTableSchema(tableName string) (*schema.Table, error)
GetTableSchema returns the table representation (name, and columns with names and types) wrapped in a schema.Table struct
func (*ClickHouse) Insert ¶
func (ch *ClickHouse) Insert(schema *schema.Table, valuesMap map[string]interface{}) error
Insert inserts the provided object into ClickHouse in stream mode
func (*ClickHouse) InsertInTransaction ¶
func (ch *ClickHouse) InsertInTransaction(wrappedTx *Transaction, schema *schema.Table, valuesMap map[string]interface{}) error
InsertInTransaction inserts the provided object into ClickHouse within a transaction
func (ClickHouse) Name ¶
func (ClickHouse) Name() string
func (*ClickHouse) OpenTx ¶
func (ch *ClickHouse) OpenTx() (*Transaction, error)
OpenTx opens the underlying SQL transaction and returns a wrapped instance
func (*ClickHouse) PatchTableSchema ¶
func (ch *ClickHouse) PatchTableSchema(patchSchema *schema.Table) error
PatchTableSchema adds new columns (from the provided schema.Table) to an existing table; it also drops and re-creates the distributed table
func (*ClickHouse) UpdatePrimaryKey ¶ added in v1.15.6
func (ch *ClickHouse) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error
type ClickHouseConfig ¶
type ClickHouseConfig struct { Dsns []string `mapstructure:"dsns" json:"dsns,omitempty" yaml:"dsns,omitempty"` Database string `mapstructure:"db" json:"db,omitempty" yaml:"db,omitempty"` Tls map[string]string `mapstructure:"tls" json:"tls,omitempty" yaml:"tls,omitempty"` Cluster string `mapstructure:"cluster" json:"cluster,omitempty" yaml:"cluster,omitempty"` Engine *EngineConfig `mapstructure:"engine" json:"engine,omitempty" yaml:"engine,omitempty"` }
ClickHouseConfig dto for deserialized clickhouse config
func (*ClickHouseConfig) Validate ¶
func (chc *ClickHouseConfig) Validate() error
Validate required fields in ClickHouseConfig
type DataSourceConfig ¶
type DataSourceConfig struct { Host string `mapstructure:"host" json:"host,omitempty" yaml:"host,omitempty"` Port int `mapstructure:"port" json:"port,omitempty" yaml:"port,omitempty"` Db string `mapstructure:"db" json:"db,omitempty" yaml:"db,omitempty"` Schema string `mapstructure:"schema" json:"schema,omitempty" yaml:"schema,omitempty"` Username string `mapstructure:"username" json:"username,omitempty" yaml:"username,omitempty"` Password string `mapstructure:"password" json:"password,omitempty" yaml:"password,omitempty"` Parameters map[string]string `mapstructure:"parameters" json:"parameters,omitempty" yaml:"parameters,omitempty"` }
DataSourceConfig dto for deserialized datasource config (e.g. in Postgres or AwsRedshift destination)
func (*DataSourceConfig) Validate ¶
func (dsc *DataSourceConfig) Validate() error
Validate required fields in DataSourceConfig
type EngineConfig ¶
type EngineConfig struct { RawStatement string `mapstructure:"raw_statement" json:"raw_statement,omitempty" yaml:"raw_statement,omitempty"` NullableFields []string `mapstructure:"nullable_fields" json:"nullable_fields,omitempty" yaml:"nullable_fields,omitempty"` PartitionFields []FieldConfig `mapstructure:"partition_fields" json:"partition_fields,omitempty" yaml:"partition_fields,omitempty"` OrderFields []FieldConfig `mapstructure:"order_fields" json:"order_fields,omitempty" yaml:"order_fields,omitempty"` PrimaryKeys []string `mapstructure:"primary_keys" json:"primary_keys,omitempty" yaml:"primary_keys,omitempty"` }
EngineConfig dto for deserialized clickhouse engine config
type FieldConfig ¶
type FieldConfig struct { Function string `mapstructure:"function" json:"function,omitempty" yaml:"function,omitempty"` Field string `mapstructure:"field" json:"field,omitempty" yaml:"field,omitempty"` }
FieldConfig dto for deserialized clickhouse engine fields
type GoogleCloudStorage ¶
type GoogleCloudStorage struct {
// contains filtered or unexported fields
}
func NewGoogleCloudStorage ¶
func NewGoogleCloudStorage(ctx context.Context, config *GoogleConfig) (*GoogleCloudStorage, error)
func (*GoogleCloudStorage) Close ¶
func (gcs *GoogleCloudStorage) Close() error
func (*GoogleCloudStorage) DeleteObject ¶
func (gcs *GoogleCloudStorage) DeleteObject(key string) error
DeleteObject deletes the object from the Google Cloud Storage bucket
func (*GoogleCloudStorage) GetObject ¶
func (gcs *GoogleCloudStorage) GetObject(key string) ([]byte, error)
GetObject gets the object from the Google Cloud Storage bucket
func (*GoogleCloudStorage) ListBucket ¶
func (gcs *GoogleCloudStorage) ListBucket(prefix string) ([]string, error)
ListBucket returns the Google Cloud Storage bucket file names filtered by prefix
func (*GoogleCloudStorage) UploadBytes ¶
func (gcs *GoogleCloudStorage) UploadBytes(fileName string, fileBytes []byte) error
UploadBytes creates a named file with the given payload on Google Cloud Storage
type GoogleConfig ¶
type GoogleConfig struct { Bucket string `mapstructure:"gcs_bucket" json:"gcs_bucket,omitempty" yaml:"gcs_bucket,omitempty"` Project string `mapstructure:"bq_project" json:"bq_project,omitempty" yaml:"bq_project,omitempty"` Dataset string `mapstructure:"bq_dataset" json:"bq_dataset,omitempty" yaml:"bq_dataset,omitempty"` KeyFile interface{} `mapstructure:"key_file" json:"key_file,omitempty" yaml:"key_file,omitempty"` // contains filtered or unexported fields }
func (*GoogleConfig) Validate ¶
func (gc *GoogleConfig) Validate(streamMode bool) error
type Postgres ¶
type Postgres struct {
// contains filtered or unexported fields
}
Postgres is an adapter for creating and patching schemas or tables, and for inserting data into Postgres
func NewPostgres ¶
func NewPostgres(ctx context.Context, config *DataSourceConfig) (*Postgres, error)
NewPostgres returns a configured Postgres adapter instance
func (*Postgres) CreateDbSchema ¶
CreateDbSchema creates the database schema if it doesn't exist
func (*Postgres) CreateTable ¶
CreateTable creates a database table with the name and columns provided in the schema.Table representation
func (*Postgres) GetTableSchema ¶
GetTableSchema returns the table representation (name, and columns with names and types) wrapped in a schema.Table struct
func (*Postgres) InsertInTransaction ¶
func (*Postgres) OpenTx ¶
func (p *Postgres) OpenTx() (*Transaction, error)
OpenTx opens the underlying SQL transaction and returns a wrapped instance
func (*Postgres) PatchTableSchema ¶
PatchTableSchema adds new columns (from the provided schema.Table) to an existing table
func (*Postgres) TablesList ¶
TablesList returns a slice of Postgres table names
func (*Postgres) UpdatePrimaryKey ¶ added in v1.15.6
type S3 ¶
type S3 struct {
// contains filtered or unexported fields
}
func (*S3) DeleteObject ¶
DeleteObject deletes an object from the S3 bucket by key
func (*S3) ListBucket ¶
ListBucket returns the S3 bucket file keys filtered by file name prefix
type S3Config ¶
type S3Config struct { AccessKeyID string `mapstructure:"access_key_id" json:"access_key_id,omitempty" yaml:"access_key_id,omitempty"` SecretKey string `mapstructure:"secret_access_key" json:"secret_access_key,omitempty" yaml:"secret_access_key,omitempty"` Bucket string `mapstructure:"bucket" json:"bucket,omitempty" yaml:"bucket,omitempty"` Region string `mapstructure:"region" json:"region,omitempty" yaml:"region,omitempty"` Endpoint string `mapstructure:"endpoint" json:"endpoint,omitempty" yaml:"endpoint,omitempty"` Folder string `mapstructure:"folder" json:"folder,omitempty" yaml:"folder,omitempty"` }
type Snowflake ¶
type Snowflake struct {
// contains filtered or unexported fields
}
Snowflake is an adapter for creating and patching schemas or tables, and for inserting data into Snowflake
func NewSnowflake ¶
func NewSnowflake(ctx context.Context, config *SnowflakeConfig, s3Config *S3Config) (*Snowflake, error)
NewSnowflake returns a configured Snowflake adapter instance
func (*Snowflake) Copy ¶
func (s *Snowflake) Copy(wrappedTx *Transaction, fileKey, header, tableName string) error
Copy transfers data from S3 to Snowflake by issuing a COPY request to Snowflake within the provided wrapped transaction
func (*Snowflake) CreateDbSchema ¶
CreateDbSchema creates the database schema if it doesn't exist
func (*Snowflake) CreateTable ¶
CreateTable creates a database table with the name and columns provided in the schema.Table representation
func (*Snowflake) GetTableSchema ¶
GetTableSchema returns the table representation (name, and columns with names and types) wrapped in a schema.Table struct
func (*Snowflake) InsertInTransaction ¶
func (*Snowflake) OpenTx ¶
func (s *Snowflake) OpenTx() (*Transaction, error)
OpenTx opens the underlying SQL transaction and returns a wrapped instance
func (*Snowflake) PatchTableSchema ¶
PatchTableSchema adds new columns (from the provided schema.Table) to an existing table
func (*Snowflake) UpdatePrimaryKey ¶ added in v1.15.6
type SnowflakeConfig ¶
type SnowflakeConfig struct { Account string `mapstructure:"account" json:"account,omitempty" yaml:"account,omitempty"` Port int `mapstructure:"port" json:"port,omitempty" yaml:"port,omitempty"` Db string `mapstructure:"db" json:"db,omitempty" yaml:"db,omitempty"` Schema string `mapstructure:"schema" json:"schema,omitempty" yaml:"schema,omitempty"` Username string `mapstructure:"username" json:"username,omitempty" yaml:"username,omitempty"` Password string `mapstructure:"password" json:"password,omitempty" yaml:"password,omitempty"` Warehouse string `mapstructure:"warehouse" json:"warehouse,omitempty" yaml:"warehouse,omitempty"` Stage string `mapstructure:"stage" json:"stage,omitempty" yaml:"stage,omitempty"` Parameters map[string]*string `mapstructure:"parameters" json:"parameters,omitempty" yaml:"parameters,omitempty"` }
SnowflakeConfig dto for deserialized datasource config for Snowflake
func (*SnowflakeConfig) Validate ¶
func (sc *SnowflakeConfig) Validate() error
Validate required fields in SnowflakeConfig
type TableManager ¶
type TableStatementFactory ¶
type TableStatementFactory struct {
// contains filtered or unexported fields
}
TableStatementFactory is used for creating CREATE TABLE statements depending on the config
func NewTableStatementFactory ¶
func NewTableStatementFactory(config *ClickHouseConfig) (*TableStatementFactory, error)
func (TableStatementFactory) CreateTableStatement ¶
func (tsf TableStatementFactory) CreateTableStatement(tableName, columnsClause string) string
CreateTableStatement returns the ClickHouse CREATE TABLE DDL statement
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction is a SQL transaction wrapper. It is used for handling and logging errors with the db type (postgres, redshift, clickhouse or snowflake) on Commit() and Rollback() calls. Use DirectCommit() if you do not want errors on commit to be swallowed
func (*Transaction) Commit ¶
func (t *Transaction) Commit()
func (*Transaction) DirectCommit ¶
func (t *Transaction) DirectCommit() error
func (*Transaction) Rollback ¶
func (t *Transaction) Rollback()