adapters

package
v1.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2020 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

This section is empty.

Types

type AwsRedshift

type AwsRedshift struct {
	// contains filtered or unexported fields
}

AwsRedshift is an adapter for creating and patching (a schema or table) and copying data from S3 to Redshift.

func NewAwsRedshift

func NewAwsRedshift(ctx context.Context, dsConfig *DataSourceConfig, s3Config *S3Config) (*AwsRedshift, error)

NewAwsRedshift returns a configured AwsRedshift adapter instance.

func (*AwsRedshift) Close

func (ar *AwsRedshift) Close() error

Close closes the underlying sql.DB.

func (*AwsRedshift) Copy

func (ar *AwsRedshift) Copy(wrappedTx *Transaction, fileKey, tableName string) error

Copy transfers data from S3 to Redshift by passing a COPY request to Redshift in the provided wrapped transaction.

func (*AwsRedshift) CreateDbSchema

func (ar *AwsRedshift) CreateDbSchema(dbSchemaName string) error

CreateDbSchema creates the database schema if it doesn't already exist.

func (*AwsRedshift) CreateTable

func (ar *AwsRedshift) CreateTable(tableSchema *schema.Table) error

CreateTable creates a database table with the name and columns provided in the schema.Table representation.

func (*AwsRedshift) GetTableSchema

func (ar *AwsRedshift) GetTableSchema(tableName string) (*schema.Table, error)

GetTableSchema returns the table representation (name, and columns with names and types) wrapped in a schema.Table struct.

func (*AwsRedshift) Insert

func (ar *AwsRedshift) Insert(schema *schema.Table, valuesMap map[string]interface{}) error

Insert inserts the provided object into AwsRedshift in stream mode.

func (AwsRedshift) Name

func (AwsRedshift) Name() string

func (*AwsRedshift) OpenTx

func (ar *AwsRedshift) OpenTx() (*Transaction, error)

OpenTx opens an underlying SQL transaction and returns a wrapped instance.

func (*AwsRedshift) PatchTableSchema

func (ar *AwsRedshift) PatchTableSchema(patchSchema *schema.Table) error

PatchTableSchema adds new columns (from the provided schema.Table) to an existing table.

func (*AwsRedshift) UpdatePrimaryKey added in v1.15.6

func (ar *AwsRedshift) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error

type BQItem

type BQItem struct {
	// contains filtered or unexported fields
}

BQItem struct for streaming inserts to BigQuery

func (BQItem) Save

func (bqi BQItem) Save() (row map[string]bigquery.Value, insertID string, err error)

type BigQuery

type BigQuery struct {
	// contains filtered or unexported fields
}

func NewBigQuery

func NewBigQuery(ctx context.Context, config *GoogleConfig) (*BigQuery, error)

func (*BigQuery) Close

func (bq *BigQuery) Close() error

func (*BigQuery) Copy

func (bq *BigQuery) Copy(fileKey, tableName string) error

Copy transfers data from a Google Cloud Storage file to a Google BigQuery table as one batch.

func (*BigQuery) CreateDataset

func (bq *BigQuery) CreateDataset(dataset string) error

CreateDataset creates the Google BigQuery dataset if it doesn't already exist.

func (*BigQuery) CreateTable

func (bq *BigQuery) CreateTable(tableSchema *schema.Table) error

CreateTable creates a Google BigQuery table from a schema.Table.

func (*BigQuery) GetTableSchema

func (bq *BigQuery) GetTableSchema(tableName string) (*schema.Table, error)

GetTableSchema returns the Google BigQuery table representation (name, and columns with types) as a schema.Table.

func (*BigQuery) Insert

func (bq *BigQuery) Insert(schema *schema.Table, valuesMap map[string]interface{}) error

Insert inserts the provided object into BigQuery in stream mode.

func (*BigQuery) PatchTableSchema

func (bq *BigQuery) PatchTableSchema(patchSchema *schema.Table) error

PatchTableSchema adds schema.Table columns to a Google BigQuery table.

func (*BigQuery) Test added in v1.7.0

func (bq *BigQuery) Test() error

func (*BigQuery) UpdatePrimaryKey added in v1.15.6

func (bq *BigQuery) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error

type ClickHouse

type ClickHouse struct {
	// contains filtered or unexported fields
}

ClickHouse is an adapter for creating and patching (a schema or table) and inserting data into ClickHouse.

func NewClickHouse

func NewClickHouse(ctx context.Context, connectionString, database, cluster string, tlsConfig map[string]string,
	tableStatementFactory *TableStatementFactory, nullableFields map[string]bool) (*ClickHouse, error)

NewClickHouse returns a configured ClickHouse adapter instance.

func (*ClickHouse) Close

func (ch *ClickHouse) Close() error

Close closes the underlying sql.DB.

func (*ClickHouse) CreateDB

func (ch *ClickHouse) CreateDB(dbName string) error

CreateDB creates the database if it doesn't already exist.

func (*ClickHouse) CreateTable

func (ch *ClickHouse) CreateTable(tableSchema *schema.Table) error

CreateTable creates a database table with the name and columns provided in the schema.Table representation. New tables will have the MergeTree() or ReplicatedMergeTree() engine depending on whether config.cluster is empty or not.

func (*ClickHouse) GetTableSchema

func (ch *ClickHouse) GetTableSchema(tableName string) (*schema.Table, error)

GetTableSchema returns the table representation (name, and columns with names and types) wrapped in a schema.Table struct.

func (*ClickHouse) Insert

func (ch *ClickHouse) Insert(schema *schema.Table, valuesMap map[string]interface{}) error

Insert inserts the provided object into ClickHouse in stream mode.

func (*ClickHouse) InsertInTransaction

func (ch *ClickHouse) InsertInTransaction(wrappedTx *Transaction, schema *schema.Table, valuesMap map[string]interface{}) error

InsertInTransaction inserts the provided object into ClickHouse within a transaction.

func (ClickHouse) Name

func (ClickHouse) Name() string

func (*ClickHouse) OpenTx

func (ch *ClickHouse) OpenTx() (*Transaction, error)

OpenTx opens an underlying SQL transaction and returns a wrapped instance.

func (*ClickHouse) PatchTableSchema

func (ch *ClickHouse) PatchTableSchema(patchSchema *schema.Table) error

PatchTableSchema adds new columns (from the provided schema.Table) to an existing table, dropping and recreating the distributed table.

func (*ClickHouse) UpdatePrimaryKey added in v1.15.6

func (ch *ClickHouse) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error

type ClickHouseConfig

type ClickHouseConfig struct {
	Dsns     []string          `mapstructure:"dsns" json:"dsns,omitempty" yaml:"dsns,omitempty"`
	Database string            `mapstructure:"db" json:"db,omitempty" yaml:"db,omitempty"`
	Tls      map[string]string `mapstructure:"tls" json:"tls,omitempty" yaml:"tls,omitempty"`
	Cluster  string            `mapstructure:"cluster" json:"cluster,omitempty" yaml:"cluster,omitempty"`
	Engine   *EngineConfig     `mapstructure:"engine" json:"engine,omitempty" yaml:"engine,omitempty"`
}

ClickHouseConfig dto for deserialized clickhouse config

func (*ClickHouseConfig) Validate

func (chc *ClickHouseConfig) Validate() error

Validate required fields in ClickHouseConfig

type DataSourceConfig

type DataSourceConfig struct {
	Host       string            `mapstructure:"host" json:"host,omitempty" yaml:"host,omitempty"`
	Port       int               `mapstructure:"port" json:"port,omitempty" yaml:"port,omitempty"`
	Db         string            `mapstructure:"db" json:"db,omitempty" yaml:"db,omitempty"`
	Schema     string            `mapstructure:"schema" json:"schema,omitempty" yaml:"schema,omitempty"`
	Username   string            `mapstructure:"username" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string            `mapstructure:"password" json:"password,omitempty" yaml:"password,omitempty"`
	Parameters map[string]string `mapstructure:"parameters" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

DataSourceConfig dto for deserialized datasource config (e.g. in Postgres or AwsRedshift destination)

func (*DataSourceConfig) Validate

func (dsc *DataSourceConfig) Validate() error

Validate required fields in DataSourceConfig

type EngineConfig

type EngineConfig struct {
	RawStatement    string        `mapstructure:"raw_statement" json:"raw_statement,omitempty" yaml:"raw_statement,omitempty"`
	NullableFields  []string      `mapstructure:"nullable_fields" json:"nullable_fields,omitempty" yaml:"nullable_fields,omitempty"`
	PartitionFields []FieldConfig `mapstructure:"partition_fields" json:"partition_fields,omitempty" yaml:"partition_fields,omitempty"`
	OrderFields     []FieldConfig `mapstructure:"order_fields" json:"order_fields,omitempty" yaml:"order_fields,omitempty"`
	PrimaryKeys     []string      `mapstructure:"primary_keys" json:"primary_keys,omitempty" yaml:"primary_keys,omitempty"`
}

EngineConfig dto for deserialized clickhouse engine config

type FieldConfig

type FieldConfig struct {
	Function string `mapstructure:"function" json:"function,omitempty" yaml:"function,omitempty"`
	Field    string `mapstructure:"field" json:"field,omitempty" yaml:"field,omitempty"`
}

FieldConfig dto for deserialized clickhouse engine fields

type GoogleCloudStorage

type GoogleCloudStorage struct {
	// contains filtered or unexported fields
}

func NewGoogleCloudStorage

func NewGoogleCloudStorage(ctx context.Context, config *GoogleConfig) (*GoogleCloudStorage, error)

func (*GoogleCloudStorage) Close

func (gcs *GoogleCloudStorage) Close() error

func (*GoogleCloudStorage) DeleteObject

func (gcs *GoogleCloudStorage) DeleteObject(key string) error

DeleteObject deletes an object from the Google Cloud Storage bucket.

func (*GoogleCloudStorage) GetObject

func (gcs *GoogleCloudStorage) GetObject(key string) ([]byte, error)

GetObject gets an object from the Google Cloud Storage bucket.

func (*GoogleCloudStorage) ListBucket

func (gcs *GoogleCloudStorage) ListBucket(prefix string) ([]string, error)

ListBucket returns Google Cloud Storage bucket file names filtered by prefix.

func (*GoogleCloudStorage) UploadBytes

func (gcs *GoogleCloudStorage) UploadBytes(fileName string, fileBytes []byte) error

UploadBytes creates a named file on Google Cloud Storage with the payload.

type GoogleConfig

type GoogleConfig struct {
	Bucket  string      `mapstructure:"gcs_bucket" json:"gcs_bucket,omitempty" yaml:"gcs_bucket,omitempty"`
	Project string      `mapstructure:"bq_project" json:"bq_project,omitempty" yaml:"bq_project,omitempty"`
	Dataset string      `mapstructure:"bq_dataset" json:"bq_dataset,omitempty" yaml:"bq_dataset,omitempty"`
	KeyFile interface{} `mapstructure:"key_file" json:"key_file,omitempty" yaml:"key_file,omitempty"`
	// contains filtered or unexported fields
}

func (*GoogleConfig) Validate

func (gc *GoogleConfig) Validate(streamMode bool) error

type Postgres

type Postgres struct {
	// contains filtered or unexported fields
}

Postgres is an adapter for creating and patching (a schema or table) and inserting data into Postgres.

func NewPostgres

func NewPostgres(ctx context.Context, config *DataSourceConfig) (*Postgres, error)

NewPostgres returns a configured Postgres adapter instance.

func (*Postgres) Close

func (p *Postgres) Close() error

Close closes the underlying sql.DB.

func (*Postgres) CreateDbSchema

func (p *Postgres) CreateDbSchema(dbSchemaName string) error

CreateDbSchema creates the database schema if it doesn't already exist.

func (*Postgres) CreateTable

func (p *Postgres) CreateTable(tableSchema *schema.Table) error

CreateTable creates a database table with the name and columns provided in the schema.Table representation.

func (*Postgres) GetTableSchema

func (p *Postgres) GetTableSchema(tableName string) (*schema.Table, error)

GetTableSchema returns the table representation (name, and columns with names and types) wrapped in a schema.Table struct.

func (*Postgres) Insert

func (p *Postgres) Insert(schema *schema.Table, valuesMap map[string]interface{}) error

Insert inserts the provided object into Postgres.

func (*Postgres) InsertInTransaction

func (p *Postgres) InsertInTransaction(wrappedTx *Transaction, table *schema.Table, valuesMap map[string]interface{}) error

func (Postgres) Name

func (Postgres) Name() string

func (*Postgres) OpenTx

func (p *Postgres) OpenTx() (*Transaction, error)

OpenTx opens an underlying SQL transaction and returns a wrapped instance.

func (*Postgres) PatchTableSchema

func (p *Postgres) PatchTableSchema(patchSchema *schema.Table) error

PatchTableSchema adds new columns (from the provided schema.Table) to an existing table.

func (*Postgres) TablesList

func (p *Postgres) TablesList() ([]string, error)

TablesList returns a slice of Postgres table names.

func (*Postgres) UpdatePrimaryKey added in v1.15.6

func (p *Postgres) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error

type S3

type S3 struct {
	// contains filtered or unexported fields
}

func NewS3

func NewS3(s3Config *S3Config) (*S3, error)

func (*S3) Close

func (a *S3) Close() error

func (*S3) DeleteObject

func (a *S3) DeleteObject(key string) error

DeleteObject deletes an object from the S3 bucket by key.

func (*S3) GetObject

func (a *S3) GetObject(key string) ([]byte, error)

GetObject returns an S3 file by key.

func (*S3) ListBucket

func (a *S3) ListBucket(fileNamePrefix string) ([]string, error)

ListBucket returns S3 bucket file keys filtered by file name prefix.

func (*S3) UploadBytes

func (a *S3) UploadBytes(fileName string, fileBytes []byte) error

UploadBytes creates a named file on S3 with the payload.

type S3Config

type S3Config struct {
	AccessKeyID string `mapstructure:"access_key_id" json:"access_key_id,omitempty" yaml:"access_key_id,omitempty"`
	SecretKey   string `mapstructure:"secret_access_key" json:"secret_access_key,omitempty" yaml:"secret_access_key,omitempty"`
	Bucket      string `mapstructure:"bucket" json:"bucket,omitempty" yaml:"bucket,omitempty"`
	Region      string `mapstructure:"region" json:"region,omitempty" yaml:"region,omitempty"`
	Endpoint    string `mapstructure:"endpoint" json:"endpoint,omitempty" yaml:"endpoint,omitempty"`
	Folder      string `mapstructure:"folder" json:"folder,omitempty" yaml:"folder,omitempty"`
}

func (*S3Config) Validate

func (s3c *S3Config) Validate() error

type Snowflake

type Snowflake struct {
	// contains filtered or unexported fields
}

Snowflake is an adapter for creating and patching (a schema or table) and inserting data into Snowflake.

func NewSnowflake

func NewSnowflake(ctx context.Context, config *SnowflakeConfig, s3Config *S3Config) (*Snowflake, error)

NewSnowflake returns a configured Snowflake adapter instance.

func (*Snowflake) Close

func (s *Snowflake) Close() (multiErr error)

Close closes the underlying sql.DB.

func (*Snowflake) Copy

func (s *Snowflake) Copy(wrappedTx *Transaction, fileKey, header, tableName string) error

Copy transfers data from S3 to Snowflake by passing a COPY request to Snowflake in the provided wrapped transaction.

func (*Snowflake) CreateDbSchema

func (s *Snowflake) CreateDbSchema(dbSchemaName string) error

CreateDbSchema creates the database schema if it doesn't already exist.

func (*Snowflake) CreateTable

func (s *Snowflake) CreateTable(tableSchema *schema.Table) error

CreateTable creates a database table with the name and columns provided in the schema.Table representation.

func (*Snowflake) GetTableSchema

func (s *Snowflake) GetTableSchema(tableName string) (*schema.Table, error)

GetTableSchema returns the table representation (name, and columns with names and types) wrapped in a schema.Table struct.

func (*Snowflake) Insert

func (s *Snowflake) Insert(schema *schema.Table, valuesMap map[string]interface{}) error

Insert inserts the provided object into Snowflake.

func (*Snowflake) InsertInTransaction

func (s *Snowflake) InsertInTransaction(wrappedTx *Transaction, schema *schema.Table, valuesMap map[string]interface{}) error

func (Snowflake) Name

func (Snowflake) Name() string

func (*Snowflake) OpenTx

func (s *Snowflake) OpenTx() (*Transaction, error)

OpenTx opens an underlying SQL transaction and returns a wrapped instance.

func (*Snowflake) PatchTableSchema

func (s *Snowflake) PatchTableSchema(patchSchema *schema.Table) error

PatchTableSchema adds new columns (from the provided schema.Table) to an existing table.

func (*Snowflake) UpdatePrimaryKey added in v1.15.6

func (s *Snowflake) UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error

type SnowflakeConfig

type SnowflakeConfig struct {
	Account    string             `mapstructure:"account" json:"account,omitempty" yaml:"account,omitempty"`
	Port       int                `mapstructure:"port" json:"port,omitempty" yaml:"port,omitempty"`
	Db         string             `mapstructure:"db" json:"db,omitempty" yaml:"db,omitempty"`
	Schema     string             `mapstructure:"schema" json:"schema,omitempty" yaml:"schema,omitempty"`
	Username   string             `mapstructure:"username" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string             `mapstructure:"password" json:"password,omitempty" yaml:"password,omitempty"`
	Warehouse  string             `mapstructure:"warehouse" json:"warehouse,omitempty" yaml:"warehouse,omitempty"`
	Stage      string             `mapstructure:"stage" json:"stage,omitempty" yaml:"stage,omitempty"`
	Parameters map[string]*string `mapstructure:"parameters" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

SnowflakeConfig dto for deserialized datasource config for Snowflake

func (*SnowflakeConfig) Validate

func (sc *SnowflakeConfig) Validate() error

Validate required fields in SnowflakeConfig

type Stage

type Stage interface {
	io.Closer
	UploadBytes(fileName string, fileBytes []byte) error
	ListBucket(prefix string) ([]string, error)
	GetObject(name string) ([]byte, error)
	DeleteObject(key string) error
}

type TableManager

type TableManager interface {
	GetTableSchema(tableName string) (*schema.Table, error)
	CreateTable(schemaToCreate *schema.Table) error
	PatchTableSchema(schemaToAdd *schema.Table) error
	UpdatePrimaryKey(patchTableSchema *schema.Table, patchConstraint *schema.PKFieldsPatch) error
}

type TableStatementFactory

type TableStatementFactory struct {
	// contains filtered or unexported fields
}

TableStatementFactory is used for creating CREATE TABLE statements depending on the config.

func NewTableStatementFactory

func NewTableStatementFactory(config *ClickHouseConfig) (*TableStatementFactory, error)

func (TableStatementFactory) CreateTableStatement

func (tsf TableStatementFactory) CreateTableStatement(tableName, columnsClause string) string

CreateTableStatement returns the ClickHouse DDL statement for creating a table.

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction is a SQL transaction wrapper. It is used for handling and logging errors, together with the db type (postgres, redshift, clickhouse or snowflake), on Commit() and Rollback() calls. Use DirectCommit() if you don't want an error on commit to be swallowed.

func (*Transaction) Commit

func (t *Transaction) Commit()

func (*Transaction) DirectCommit

func (t *Transaction) DirectCommit() error

func (*Transaction) Rollback

func (t *Transaction) Rollback()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL