Documentation
¶
Index ¶
- Constants
- Variables
- func Base64ToHex(s string) string
- func Float64ToString(i float64) string
- func HandleUnexpectedPanic(config *CommonConfig)
- func HexToString(s string) (string, error)
- func Int64ToString(i int64) string
- func IntToString(i int) string
- func IsLocalHost(host string) bool
- func LogDebug(config *CommonConfig, message ...interface{})
- func LogError(config *CommonConfig, message ...interface{})
- func LogInfo(config *CommonConfig, message ...interface{})
- func LogTrace(config *CommonConfig, message ...interface{})
- func LogWarn(config *CommonConfig, message ...interface{})
- func Panic(config *CommonConfig, message string)
- func PanicIfError(config *CommonConfig, err error)
- func PrintErrorAndExit(config *CommonConfig, message string)
- func SendAnonymousAnalytics(config *CommonConfig, command string, name string)
- func StringDateToTime(str string) time.Time
- func StringMsToUtcTime(s string) time.Time
- func StringToFloat64(s string) float64
- func StringToInt(s string) int
- func StringToInt64(s string) int64
- func TimeToUtcStringMs(t time.Time) string
- type AnonymousAnalyticsData
- type AnonymousErrorData
- type AwsConfig
- type CappedBuffer
- type CatalogTableColumn
- type CommonConfig
- type CursorValue
- type DuckdbClient
- func (client *DuckdbClient) Appender(schema string, table string) (*duckdb.Appender, error)
- func (client *DuckdbClient) Close()
- func (client *DuckdbClient) ExecContext(ctx context.Context, query string, args ...map[string]string) (sql.Result, error)
- func (client *DuckdbClient) ExecTransactionContext(ctx context.Context, queries []string, args ...[]map[string]string) error
- func (client *DuckdbClient) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
- func (client *DuckdbClient) QueryContext(ctx context.Context, query string) (*sql.Rows, error)
- func (client *DuckdbClient) QueryRowContext(ctx context.Context, query string, args ...map[string]string) *sql.Row
- func (client *DuckdbClient) RecreateDb()
- type DuckdbSchemaColumn
- type IcebergCatalog
- func (catalog *IcebergCatalog) CreateMaterializedView(icebergSchemaTable IcebergSchemaTable, definition string, ifNotExists bool) error
- func (catalog *IcebergCatalog) CreateTable(icebergSchemaTable IcebergSchemaTable, metadataLocation string, ...)
- func (catalog *IcebergCatalog) DropMaterializedView(icebergSchemaTable IcebergSchemaTable, missingOk bool) error
- func (catalog *IcebergCatalog) DropTable(icebergSchemaTable IcebergSchemaTable)
- func (catalog *IcebergCatalog) MaterializedView(icebergSchemaTable IcebergSchemaTable) (IcebergMaterializedView, error)
- func (catalog *IcebergCatalog) MaterializedViews() ([]IcebergMaterializedView, error)
- func (catalog *IcebergCatalog) MetadataFileS3Path(icebergSchemaTable IcebergSchemaTable) string
- func (catalog *IcebergCatalog) RenameMaterializedView(icebergSchemaTable IcebergSchemaTable, newName string, missingOk bool) error
- func (catalog *IcebergCatalog) RenameTable(oldIcebergSchemaTable IcebergSchemaTable, newIcebergTableName string)
- func (catalog *IcebergCatalog) SchemaTableNames(schemaName string) Set[string]
- func (catalog *IcebergCatalog) SchemaTables() (Set[IcebergSchemaTable], error)
- func (catalog *IcebergCatalog) TableColumns(icebergSchemaTable IcebergSchemaTable) ([]CatalogTableColumn, error)
- func (catalog *IcebergCatalog) TableS3Path(icebergTableName IcebergSchemaTable) string
- type IcebergColumnType
- type IcebergLogicalColumnType
- type IcebergMaterializedView
- type IcebergSchemaColumn
- func (col *IcebergSchemaColumn) CatalogTableColumn() CatalogTableColumn
- func (col *IcebergSchemaColumn) DuckdbType() string
- func (col *IcebergSchemaColumn) DuckdbValueFromCsv(value string) interface{}
- func (col *IcebergSchemaColumn) DuckdbValueFromJson(value any) interface{}
- func (col *IcebergSchemaColumn) NormalizedColumnName() string
- func (col *IcebergSchemaColumn) NormalizedPrecision() int
- func (col *IcebergSchemaColumn) NormalizedScale() int
- func (col *IcebergSchemaColumn) QuotedColumnName() string
- type IcebergSchemaTable
- type IcebergTable
- func (table *IcebergTable) Create(tableS3Path string, icebergSchemaColumns []*IcebergSchemaColumn)
- func (table *IcebergTable) DropIfExists()
- func (table *IcebergTable) GenerateTableS3Path() string
- func (table *IcebergTable) LastCursorValue(columnName string) CursorValue
- func (table *IcebergTable) MetadataFileS3Path() string
- func (table *IcebergTable) Rename(newName string)
- func (table *IcebergTable) ReplaceWith(callbackFunc func(syncingIcebergTable *IcebergTable))
- func (table *IcebergTable) String() string
- type IcebergTableWriter
- func (writer *IcebergTableWriter) AppendFromCsvCappedBuffer(cursorValue CursorValue, cappedBuffer *CappedBuffer)
- func (writer *IcebergTableWriter) AppendFromJsonCappedBuffer(cursorValue CursorValue, cappedBuffer *CappedBuffer)
- func (writer *IcebergTableWriter) DeleteFromJsonCappedBuffer(cappedBuffer *CappedBuffer)
- func (writer *IcebergTableWriter) InsertFromCsvCappedBuffer(cappedBuffer *CappedBuffer)
- func (writer *IcebergTableWriter) InsertFromJsonCappedBuffer(cappedBuffer *CappedBuffer)
- func (writer *IcebergTableWriter) InsertFromQuery(query string) error
- func (writer *IcebergTableWriter) UniqueIndexColumnNames() []string
- func (writer *IcebergTableWriter) UpdateFromJsonCappedBuffer(cappedBuffer *CappedBuffer)
- type JsonQueueReader
- type JsonQueueWriter
- type LogLevel
- type ManifestFile
- type ManifestListFile
- type ManifestListItem
- type ManifestListSequenceStats
- type ManifestListsJson
- type MetadataFile
- type MetadataJson
- type ParquetFile
- type ParquetFileStats
- type PostgresClient
- func (client *PostgresClient) Close()
- func (client *PostgresClient) Copy(writer io.Writer, query string) (pgconn.CommandTag, error)
- func (client *PostgresClient) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)
- func (client *PostgresClient) Query(ctx context.Context, query string, args ...any) (pgx.Rows, error)
- func (client *PostgresClient) QueryRow(ctx context.Context, query string, args ...any) pgx.Row
- type S3Client
- func (s3Client *S3Client) BucketS3Prefix() string
- func (s3Client *S3Client) DeleteObject(fileKey string)
- func (s3Client *S3Client) DeleteObjects(fileKeys []*string)
- func (s3Client *S3Client) GetObject(fileKey string) *s3.GetObjectOutput
- func (s3Client *S3Client) HeadObject(fileKey string) *s3.HeadObjectOutput
- func (s3Client *S3Client) ListObjects(prefix string) *s3.ListObjectsV2Output
- func (s3Client *S3Client) ObjectKey(objectPath string) string
- func (s3Client *S3Client) UploadObject(fileKey string, file *os.File)
- type Set
- type StorageS3
- func (storage *StorageS3) CreateManifest(metadataS3Path string, parquetFilesSortedAsc []ParquetFile) (manifestFile ManifestFile)
- func (storage *StorageS3) CreateManifestList(metadataS3Path string, totalDataFileSize int64, ...) (manifestListFile ManifestListFile)
- func (storage *StorageS3) CreateMetadata(metadataS3Path string, icebergSchemaColumns []*IcebergSchemaColumn, ...) (metadataFile MetadataFile)
- func (storage *StorageS3) CreateParquet(dataS3Path string, duckdbClient *DuckdbClient, tempDuckdbTableName string, ...) ParquetFile
- func (storage *StorageS3) DeleteTableFiles(tableS3Path string)
- func (storage *StorageS3) LastManifestListFile(metadataS3Path string) ManifestListFile
- func (storage *StorageS3) ManifestListItems(manifestListFile ManifestListFile) []ManifestListItem
- func (storage *StorageS3) ParquetFiles(manifestFile ManifestFile, icebergSchemaColumns []*IcebergSchemaColumn) []ParquetFile
- type StorageUtils
- func (utils *StorageUtils) ParseLastManifestListFile(bucketS3Prefix string, metadataContent []byte) ManifestListFile
- func (utils *StorageUtils) ParseManifestListItems(bucketS3Prefix string, manifestListFileContent []byte) []ManifestListItem
- func (utils *StorageUtils) ParseParquetFiles(manifestContent []byte) []ParquetFile
- func (utils *StorageUtils) ReadParquetStats(fileReader source.ParquetFile, icebergSchemaColumns []*IcebergSchemaColumn) (parquetStats ParquetFileStats)
- func (utils *StorageUtils) WriteManifestFile(filePath string, parquetFilesSortedAsc []ParquetFile) (int64, error)
- func (utils *StorageUtils) WriteManifestListFile(filePath string, totalDataFileSize int64, ...) (ManifestListFile, error)
- func (utils *StorageUtils) WriteMetadataFile(s3TablePath string, filePath string, ...) (err error)
- func (utils *StorageUtils) WriteParquetFile(fileS3Path string, duckdbClient *DuckdbClient, tempDuckdbTableName string, ...)
Constants ¶
View Source
const (
VERSION = "1.8.0"
ENV_LOG_LEVEL = "BEMIDB_LOG_LEVEL"
ENV_DISABLE_ANONYMOUS_ANALYTICS = "BEMIDB_DISABLE_ANONYMOUS_ANALYTICS"
ENV_CATALOG_DATABASE_URL = "CATALOG_DATABASE_URL"
ENV_AWS_REGION = "AWS_REGION"
ENV_AWS_S3_ENDPOINT = "AWS_S3_ENDPOINT"
ENV_AWS_S3_BUCKET = "AWS_S3_BUCKET"
ENV_AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
ENV_AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"
DEFAULT_LOG_LEVEL = "INFO"
DEFAULT_AWS_S3_ENDPOINT = "s3.amazonaws.com"
)
View Source
const (
TEMP_TABLE_SUFFIX_SYNCING = "-bemidb-syncing"
TEMP_TABLE_SUFFIX_DELETING = "-bemidb-deleting"
)
View Source
const (
IcebergColumnTypeBoolean IcebergColumnType = "boolean"
IcebergColumnTypeString IcebergColumnType = "string"
IcebergColumnTypeInteger IcebergColumnType = "int"
IcebergColumnTypeDecimal IcebergColumnType = "decimal"
IcebergColumnTypeLong IcebergColumnType = "long"
IcebergColumnTypeFloat IcebergColumnType = "float"
IcebergColumnTypeDouble IcebergColumnType = "double"
IcebergColumnTypeDate IcebergColumnType = "date"
IcebergColumnTypeTime IcebergColumnType = "time"
IcebergColumnTypeTimeTz IcebergColumnType = "timetz"
IcebergColumnTypeTimestamp IcebergColumnType = "timestamp"
IcebergColumnTypeBinary IcebergColumnType = "binary"
IcebergLogicalColumnTypeInterval IcebergLogicalColumnType = "interval"
IcebergLogicalColumnTypeBpchar IcebergLogicalColumnType = "bpchar"
IcebergLogicalColumnTypePoint IcebergLogicalColumnType = "point"
IcebergLogicalColumnTypeJson IcebergLogicalColumnType = "json"
IcebergLogicalColumnTypeUserDefined IcebergLogicalColumnType = "user_defined"
BEMIDB_NULL_STRING = "BEMIDB_NULL"
PARQUET_NAN = 0 // DuckDB crashes on NaN, libc++abi: terminating due to uncaught exception of type duckdb::InvalidConfigurationException: {"exception_type":"Invalid Configuration","exception_message":"Column float4_column lower bound deserialization failed: Failed to deserialize blob '' of size 0, attempting to produce value of type 'FLOAT'"}
PARQUET_MAX_DECIMAL_PRECISION = 38
PARQUET_FALLBACK_DECIMAL_SCALE = 6
PARQUET_NESTED_FIELD_ID_PREFIX = 1000
EMBEDDING_COLUMN_NAME = "_bemi_embedding"
EMBEDDING_COLUMN_LENGTH = "1536"
)
View Source
const (
MAX_LOAD_BATCH_SIZE = 1024 * 1024 * 1024 // 1 GB
MAX_PARQUET_FILE_SIZE = 100 * 1024 * 1024 // 100 MB
)
View Source
const (
LOG_LEVEL_TRACE = "TRACE"
LOG_LEVEL_DEBUG = "DEBUG"
LOG_LEVEL_WARN = "WARN"
LOG_LEVEL_INFO = "INFO"
LOG_LEVEL_ERROR = "ERROR"
)
View Source
const (
ICEBERG_MANIFEST_STATUS_EXISTING = 0
ICEBERG_MANIFEST_STATUS_ADDED = 1
ICEBERG_MANIFEST_STATUS_DELETED = 2
ICEBERG_MANIFEST_LIST_OPERATION_REPLACE = "replace"
ICEBERG_MANIFEST_LIST_OPERATION_APPEND = "append"
ICEBERG_MANIFEST_LIST_OPERATION_OVERWRITE = "overwrite"
ICEBERG_MANIFEST_LIST_OPERATION_DELETE = "delete"
ICEBERG_METADATA_INITIAL_FILE_NAME = "v1.metadata.json"
)
View Source
const (
MANIFEST_SCHEMA = `` /* 5075-byte string literal not displayed */
MANIFEST_LIST_SCHEMA = `` /* 1909-byte string literal not displayed */
)
View Source
const (
CONNECTION_TIMEOUT = 30 * time.Second
)
View Source
const DEFAULT_CAPPED_BUFFER_SIZE = 32 * 1024 * 1024 // 32 MB in memory
View Source
const (
DUCKDB_YES = "YES"
)
View Source
const (
UUID_LENGTH = 36
)
Variables ¶
View Source
var LOG_LEVELS = []string{ LOG_LEVEL_TRACE, LOG_LEVEL_DEBUG, LOG_LEVEL_WARN, LOG_LEVEL_INFO, LOG_LEVEL_ERROR, }
View Source
var SYNCER_DUCKDB_BOOT_QUERIES = []string{
"SET memory_limit='2GB'",
"SET threads=2",
}
Functions ¶
func Base64ToHex ¶
func Float64ToString ¶
func HandleUnexpectedPanic ¶
func HandleUnexpectedPanic(config *CommonConfig)
func HexToString ¶
func Int64ToString ¶
func IntToString ¶
func IsLocalHost ¶
func LogDebug ¶
func LogDebug(config *CommonConfig, message ...interface{})
func LogError ¶
func LogError(config *CommonConfig, message ...interface{})
func LogInfo ¶
func LogInfo(config *CommonConfig, message ...interface{})
func LogTrace ¶
func LogTrace(config *CommonConfig, message ...interface{})
func LogWarn ¶
func LogWarn(config *CommonConfig, message ...interface{})
func Panic ¶
func Panic(config *CommonConfig, message string)
func PanicIfError ¶
func PanicIfError(config *CommonConfig, err error)
func PrintErrorAndExit ¶
func PrintErrorAndExit(config *CommonConfig, message string)
func SendAnonymousAnalytics ¶
func SendAnonymousAnalytics(config *CommonConfig, command string, name string)
func StringDateToTime ¶
func StringMsToUtcTime ¶
func StringToFloat64 ¶
func StringToInt ¶
func StringToInt64 ¶
func TimeToUtcStringMs ¶
Types ¶
type AnonymousAnalyticsData ¶
type AnonymousErrorData ¶
type CappedBuffer ¶
type CappedBuffer struct {
Config *CommonConfig
MaxSizeBytes int
// contains filtered or unexported fields
}
func NewCappedBuffer ¶
func NewCappedBuffer(config *CommonConfig, maxSizeBytes int) *CappedBuffer
func (*CappedBuffer) Close ¶
func (buf *CappedBuffer) Close() error
type CatalogTableColumn ¶
type CatalogTableColumn struct {
Name string `json:"name"`
Type string `json:"type"`
Position int `json:"position"`
List bool `json:"list"`
Required bool `json:"required"`
}
func (CatalogTableColumn) ToMetadataFieldMap ¶
func (tableColumn CatalogTableColumn) ToMetadataFieldMap() map[string]interface{}
func (CatalogTableColumn) ToSql ¶
func (tableColumn CatalogTableColumn) ToSql() string
type CommonConfig ¶
type CursorValue ¶
type DuckdbClient ¶
type DuckdbClient struct {
Config *CommonConfig
Db *sql.DB
Connector *duckdb.Connector
BootQueries []string
}
func NewDuckdbClient ¶
func NewDuckdbClient(config *CommonConfig, bootQueries ...[]string) *DuckdbClient
func (*DuckdbClient) Close ¶
func (client *DuckdbClient) Close()
func (*DuckdbClient) ExecContext ¶
func (*DuckdbClient) ExecTransactionContext ¶
func (*DuckdbClient) PrepareContext ¶
func (*DuckdbClient) QueryContext ¶
func (*DuckdbClient) QueryRowContext ¶
func (*DuckdbClient) RecreateDb ¶
func (client *DuckdbClient) RecreateDb()
type DuckdbSchemaColumn ¶
type DuckdbSchemaColumn struct {
ColumnName string
DataType string
IsNullable string
OrdinalPosition string
NumericPrecision string
NumericScale string
DatetimePrecision string
Config *CommonConfig
}
func (*DuckdbSchemaColumn) ToIcebergSchemaColumn ¶
func (column *DuckdbSchemaColumn) ToIcebergSchemaColumn() *IcebergSchemaColumn
type IcebergCatalog ¶
type IcebergCatalog struct {
Config *CommonConfig
}
func NewIcebergCatalog ¶
func NewIcebergCatalog(config *CommonConfig) *IcebergCatalog
func (*IcebergCatalog) CreateMaterializedView ¶
func (catalog *IcebergCatalog) CreateMaterializedView(icebergSchemaTable IcebergSchemaTable, definition string, ifNotExists bool) error
func (*IcebergCatalog) CreateTable ¶
func (catalog *IcebergCatalog) CreateTable(icebergSchemaTable IcebergSchemaTable, metadataLocation string, icebergSchemaColumns []*IcebergSchemaColumn)
func (*IcebergCatalog) DropMaterializedView ¶
func (catalog *IcebergCatalog) DropMaterializedView(icebergSchemaTable IcebergSchemaTable, missingOk bool) error
func (*IcebergCatalog) DropTable ¶
func (catalog *IcebergCatalog) DropTable(icebergSchemaTable IcebergSchemaTable)
func (*IcebergCatalog) MaterializedView ¶
func (catalog *IcebergCatalog) MaterializedView(icebergSchemaTable IcebergSchemaTable) (IcebergMaterializedView, error)
func (*IcebergCatalog) MaterializedViews ¶
func (catalog *IcebergCatalog) MaterializedViews() ([]IcebergMaterializedView, error)
func (*IcebergCatalog) MetadataFileS3Path ¶
func (catalog *IcebergCatalog) MetadataFileS3Path(icebergSchemaTable IcebergSchemaTable) string
func (*IcebergCatalog) RenameMaterializedView ¶
func (catalog *IcebergCatalog) RenameMaterializedView(icebergSchemaTable IcebergSchemaTable, newName string, missingOk bool) error
func (*IcebergCatalog) RenameTable ¶
func (catalog *IcebergCatalog) RenameTable(oldIcebergSchemaTable IcebergSchemaTable, newIcebergTableName string)
func (*IcebergCatalog) SchemaTableNames ¶
func (catalog *IcebergCatalog) SchemaTableNames(schemaName string) Set[string]
func (*IcebergCatalog) SchemaTables ¶
func (catalog *IcebergCatalog) SchemaTables() (Set[IcebergSchemaTable], error)
func (*IcebergCatalog) TableColumns ¶
func (catalog *IcebergCatalog) TableColumns(icebergSchemaTable IcebergSchemaTable) ([]CatalogTableColumn, error)
func (*IcebergCatalog) TableS3Path ¶
func (catalog *IcebergCatalog) TableS3Path(icebergTableName IcebergSchemaTable) string
type IcebergColumnType ¶
type IcebergColumnType string
type IcebergLogicalColumnType ¶
type IcebergLogicalColumnType string
type IcebergMaterializedView ¶
func (IcebergMaterializedView) ToIcebergSchemaTable ¶
func (view IcebergMaterializedView) ToIcebergSchemaTable() IcebergSchemaTable
type IcebergSchemaColumn ¶
type IcebergSchemaColumn struct {
Config *CommonConfig
ColumnName string
ColumnType IcebergColumnType
LogicalColumnType IcebergLogicalColumnType
Position int
NumericPrecision int
NumericScale int
DatetimePrecision int
IsList bool
IsRequired bool
IsPartOfUniqueIndex bool
}
func (*IcebergSchemaColumn) CatalogTableColumn ¶
func (col *IcebergSchemaColumn) CatalogTableColumn() CatalogTableColumn
func (*IcebergSchemaColumn) DuckdbType ¶
func (col *IcebergSchemaColumn) DuckdbType() string
func (*IcebergSchemaColumn) DuckdbValueFromCsv ¶
func (col *IcebergSchemaColumn) DuckdbValueFromCsv(value string) interface{}
func (*IcebergSchemaColumn) DuckdbValueFromJson ¶
func (col *IcebergSchemaColumn) DuckdbValueFromJson(value any) interface{}
func (*IcebergSchemaColumn) NormalizedColumnName ¶
func (col *IcebergSchemaColumn) NormalizedColumnName() string
func (*IcebergSchemaColumn) NormalizedPrecision ¶
func (col *IcebergSchemaColumn) NormalizedPrecision() int
func (*IcebergSchemaColumn) NormalizedScale ¶
func (col *IcebergSchemaColumn) NormalizedScale() int
func (*IcebergSchemaColumn) QuotedColumnName ¶
func (col *IcebergSchemaColumn) QuotedColumnName() string
type IcebergSchemaTable ¶
func (IcebergSchemaTable) String ¶
func (schemaTable IcebergSchemaTable) String() string
func (IcebergSchemaTable) ToArg ¶
func (schemaTable IcebergSchemaTable) ToArg() string
type IcebergTable ¶
type IcebergTable struct {
Config *CommonConfig
IcebergSchemaTable IcebergSchemaTable
IcebergCatalog *IcebergCatalog
StorageS3 *StorageS3
DuckdbClient *DuckdbClient
}
func NewIcebergTable ¶
func NewIcebergTable(config *CommonConfig, storageS3 *StorageS3, duckdbClient *DuckdbClient, icebergSchemaTable IcebergSchemaTable) *IcebergTable
func (*IcebergTable) Create ¶
func (table *IcebergTable) Create(tableS3Path string, icebergSchemaColumns []*IcebergSchemaColumn)
func (*IcebergTable) DropIfExists ¶
func (table *IcebergTable) DropIfExists()
func (*IcebergTable) GenerateTableS3Path ¶
func (table *IcebergTable) GenerateTableS3Path() string
func (*IcebergTable) LastCursorValue ¶
func (table *IcebergTable) LastCursorValue(columnName string) CursorValue
func (*IcebergTable) MetadataFileS3Path ¶
func (table *IcebergTable) MetadataFileS3Path() string
func (*IcebergTable) Rename ¶
func (table *IcebergTable) Rename(newName string)
func (*IcebergTable) ReplaceWith ¶
func (table *IcebergTable) ReplaceWith(callbackFunc func(syncingIcebergTable *IcebergTable))
func (*IcebergTable) String ¶
func (table *IcebergTable) String() string
type IcebergTableWriter ¶
type IcebergTableWriter struct {
Config *CommonConfig
StorageS3 *StorageS3
DuckdbClient *DuckdbClient
IcebergTable *IcebergTable
IcebergSchemaColumns []*IcebergSchemaColumn
CompressionFactor int64
}
func NewIcebergTableWriter ¶
func NewIcebergTableWriter( config *CommonConfig, storageS3 *StorageS3, duckdbClient *DuckdbClient, icebergTable *IcebergTable, icebergSchemaColumns []*IcebergSchemaColumn, compressionFactor int64, ) *IcebergTableWriter
func (*IcebergTableWriter) AppendFromCsvCappedBuffer ¶
func (writer *IcebergTableWriter) AppendFromCsvCappedBuffer(cursorValue CursorValue, cappedBuffer *CappedBuffer)
func (*IcebergTableWriter) AppendFromJsonCappedBuffer ¶
func (writer *IcebergTableWriter) AppendFromJsonCappedBuffer(cursorValue CursorValue, cappedBuffer *CappedBuffer)
func (*IcebergTableWriter) DeleteFromJsonCappedBuffer ¶
func (writer *IcebergTableWriter) DeleteFromJsonCappedBuffer(cappedBuffer *CappedBuffer)
func (*IcebergTableWriter) InsertFromCsvCappedBuffer ¶
func (writer *IcebergTableWriter) InsertFromCsvCappedBuffer(cappedBuffer *CappedBuffer)
func (*IcebergTableWriter) InsertFromJsonCappedBuffer ¶
func (writer *IcebergTableWriter) InsertFromJsonCappedBuffer(cappedBuffer *CappedBuffer)
func (*IcebergTableWriter) InsertFromQuery ¶
func (writer *IcebergTableWriter) InsertFromQuery(query string) error
func (*IcebergTableWriter) UniqueIndexColumnNames ¶
func (writer *IcebergTableWriter) UniqueIndexColumnNames() []string
func (*IcebergTableWriter) UpdateFromJsonCappedBuffer ¶
func (writer *IcebergTableWriter) UpdateFromJsonCappedBuffer(cappedBuffer *CappedBuffer)
type JsonQueueReader ¶
func NewJsonQueueReader ¶
func NewJsonQueueReader(r io.Reader) *JsonQueueReader
func (*JsonQueueReader) Close ¶
func (r *JsonQueueReader) Close() error
func (*JsonQueueReader) Read ¶
func (r *JsonQueueReader) Read(value interface{}) (int, error)
type JsonQueueWriter ¶
func NewJsonQueueWriter ¶
func NewJsonQueueWriter(w io.Writer) *JsonQueueWriter
func (*JsonQueueWriter) Close ¶
func (w *JsonQueueWriter) Close() error
func (*JsonQueueWriter) Write ¶
func (w *JsonQueueWriter) Write(value interface{}) error
type ManifestFile ¶
type ManifestListFile ¶
type ManifestListItem ¶
type ManifestListItem struct {
SequenceNumber int
ManifestFile ManifestFile
}
type ManifestListsJson ¶
type ManifestListsJson struct {
Snapshots []struct {
SequenceNumber int `json:"sequence-number"`
SnapshotId int64 `json:"snapshot-id"`
TimestampMs int64 `json:"timestamp-ms"`
Path string `json:"manifest-list"`
Summary struct {
Operation string `json:"operation"`
TotalDataFiles string `json:"total-data-files"`
TotalFilesSize string `json:"total-files-size"`
TotalRecords string `json:"total-records"`
} `json:"summary"`
} `json:"snapshots"`
}
type MetadataFile ¶
type MetadataJson ¶
type ParquetFile ¶
type ParquetFile struct {
Key string
Path string // With s3://bucket/ prefix
Size int64
RecordCount int64
Stats ParquetFileStats
}
type ParquetFileStats ¶
type PostgresClient ¶
type PostgresClient struct {
Conn *pgx.Conn
Config *CommonConfig
}
func NewPostgresClient ¶
func NewPostgresClient(config *CommonConfig, databaseUrl string) *PostgresClient
func (*PostgresClient) Close ¶
func (client *PostgresClient) Close()
func (*PostgresClient) Copy ¶
func (client *PostgresClient) Copy(writer io.Writer, query string) (pgconn.CommandTag, error)
func (*PostgresClient) Exec ¶
func (client *PostgresClient) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)
type S3Client ¶
type S3Client struct {
Config *CommonConfig
S3 *s3.Client
}
func NewS3Client ¶
func NewS3Client(Config *CommonConfig) *S3Client
func (*S3Client) BucketS3Prefix ¶
func (*S3Client) DeleteObject ¶
func (*S3Client) DeleteObjects ¶
func (*S3Client) GetObject ¶
func (s3Client *S3Client) GetObject(fileKey string) *s3.GetObjectOutput
func (*S3Client) HeadObject ¶
func (s3Client *S3Client) HeadObject(fileKey string) *s3.HeadObjectOutput
func (*S3Client) ListObjects ¶
func (s3Client *S3Client) ListObjects(prefix string) *s3.ListObjectsV2Output
type StorageS3 ¶
type StorageS3 struct {
S3Client *S3Client
Config *CommonConfig
StorageUtils *StorageUtils
}
func NewStorageS3 ¶
func NewStorageS3(Config *CommonConfig) *StorageS3
func (*StorageS3) CreateManifest ¶
func (storage *StorageS3) CreateManifest(metadataS3Path string, parquetFilesSortedAsc []ParquetFile) (manifestFile ManifestFile)
func (*StorageS3) CreateManifestList ¶
func (storage *StorageS3) CreateManifestList(metadataS3Path string, totalDataFileSize int64, manifestListItemsSortedDesc []ManifestListItem) (manifestListFile ManifestListFile)
func (*StorageS3) CreateMetadata ¶
func (storage *StorageS3) CreateMetadata(metadataS3Path string, icebergSchemaColumns []*IcebergSchemaColumn, manifestListFilesSortedAsc []ManifestListFile) (metadataFile MetadataFile)
func (*StorageS3) CreateParquet ¶
func (storage *StorageS3) CreateParquet(dataS3Path string, duckdbClient *DuckdbClient, tempDuckdbTableName string, icebergSchemaColumns []*IcebergSchemaColumn, rowCount int64) ParquetFile
func (*StorageS3) DeleteTableFiles ¶
func (*StorageS3) LastManifestListFile ¶
func (storage *StorageS3) LastManifestListFile(metadataS3Path string) ManifestListFile
func (*StorageS3) ManifestListItems ¶
func (storage *StorageS3) ManifestListItems(manifestListFile ManifestListFile) []ManifestListItem
func (*StorageS3) ParquetFiles ¶
func (storage *StorageS3) ParquetFiles(manifestFile ManifestFile, icebergSchemaColumns []*IcebergSchemaColumn) []ParquetFile
type StorageUtils ¶
type StorageUtils struct {
Config *CommonConfig
}
func NewStorageUtils ¶
func NewStorageUtils(config *CommonConfig) *StorageUtils
func (*StorageUtils) ParseLastManifestListFile ¶
func (utils *StorageUtils) ParseLastManifestListFile(bucketS3Prefix string, metadataContent []byte) ManifestListFile
func (*StorageUtils) ParseManifestListItems ¶
func (utils *StorageUtils) ParseManifestListItems(bucketS3Prefix string, manifestListFileContent []byte) []ManifestListItem
func (*StorageUtils) ParseParquetFiles ¶
func (utils *StorageUtils) ParseParquetFiles(manifestContent []byte) []ParquetFile
func (*StorageUtils) ReadParquetStats ¶
func (utils *StorageUtils) ReadParquetStats(fileReader source.ParquetFile, icebergSchemaColumns []*IcebergSchemaColumn) (parquetStats ParquetFileStats)
func (*StorageUtils) WriteManifestFile ¶
func (utils *StorageUtils) WriteManifestFile(filePath string, parquetFilesSortedAsc []ParquetFile) (int64, error)
func (*StorageUtils) WriteManifestListFile ¶
func (utils *StorageUtils) WriteManifestListFile(filePath string, totalDataFileSize int64, manifestListItemsSortedDesc []ManifestListItem) (ManifestListFile, error)
func (*StorageUtils) WriteMetadataFile ¶
func (utils *StorageUtils) WriteMetadataFile(s3TablePath string, filePath string, icebergSchemaColumns []*IcebergSchemaColumn, manifestListFilesSortedAsc []ManifestListFile) (err error)
func (*StorageUtils) WriteParquetFile ¶
func (utils *StorageUtils) WriteParquetFile(fileS3Path string, duckdbClient *DuckdbClient, tempDuckdbTableName string, icebergSchemaColumns []*IcebergSchemaColumn)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.