common

package module
v0.0.0-...-67ab311 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2026 | License: AGPL-3.0 | Imports: 40 | Imported by: 4

Documentation

Index

Constants

View Source
const (
	VERSION = "1.8.0"

	ENV_LOG_LEVEL                   = "BEMIDB_LOG_LEVEL"
	ENV_DISABLE_ANONYMOUS_ANALYTICS = "BEMIDB_DISABLE_ANONYMOUS_ANALYTICS"

	ENV_CATALOG_DATABASE_URL = "CATALOG_DATABASE_URL"

	ENV_AWS_REGION            = "AWS_REGION"
	ENV_AWS_S3_ENDPOINT       = "AWS_S3_ENDPOINT"
	ENV_AWS_S3_BUCKET         = "AWS_S3_BUCKET"
	ENV_AWS_ACCESS_KEY_ID     = "AWS_ACCESS_KEY_ID"
	ENV_AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"

	DEFAULT_LOG_LEVEL       = "INFO"
	DEFAULT_AWS_S3_ENDPOINT = "s3.amazonaws.com"
)
View Source
const (
	TEMP_TABLE_SUFFIX_SYNCING  = "-bemidb-syncing"
	TEMP_TABLE_SUFFIX_DELETING = "-bemidb-deleting"
)
View Source
const (
	IcebergColumnTypeBoolean   IcebergColumnType = "boolean"
	IcebergColumnTypeString    IcebergColumnType = "string"
	IcebergColumnTypeInteger   IcebergColumnType = "int"
	IcebergColumnTypeDecimal   IcebergColumnType = "decimal"
	IcebergColumnTypeLong      IcebergColumnType = "long"
	IcebergColumnTypeFloat     IcebergColumnType = "float"
	IcebergColumnTypeDouble    IcebergColumnType = "double"
	IcebergColumnTypeDate      IcebergColumnType = "date"
	IcebergColumnTypeTime      IcebergColumnType = "time"
	IcebergColumnTypeTimeTz    IcebergColumnType = "timetz"
	IcebergColumnTypeTimestamp IcebergColumnType = "timestamp"
	IcebergColumnTypeBinary    IcebergColumnType = "binary"

	IcebergLogicalColumnTypeInterval    IcebergLogicalColumnType = "interval"
	IcebergLogicalColumnTypeBpchar      IcebergLogicalColumnType = "bpchar"
	IcebergLogicalColumnTypePoint       IcebergLogicalColumnType = "point"
	IcebergLogicalColumnTypeJson        IcebergLogicalColumnType = "json"
	IcebergLogicalColumnTypeUserDefined IcebergLogicalColumnType = "user_defined"

	BEMIDB_NULL_STRING = "BEMIDB_NULL"

	PARQUET_NAN                    = 0 // DuckDB crashes on NaN, libc++abi: terminating due to uncaught exception of type duckdb::InvalidConfigurationException: {"exception_type":"Invalid Configuration","exception_message":"Column float4_column lower bound deserialization failed: Failed to deserialize blob '' of size 0, attempting to produce value of type 'FLOAT'"}
	PARQUET_MAX_DECIMAL_PRECISION  = 38
	PARQUET_FALLBACK_DECIMAL_SCALE = 6
	PARQUET_NESTED_FIELD_ID_PREFIX = 1000

	EMBEDDING_COLUMN_NAME   = "_bemi_embedding"
	EMBEDDING_COLUMN_LENGTH = "1536"
)
View Source
const (
	MAX_LOAD_BATCH_SIZE   = 1024 * 1024 * 1024 // 1 GB
	MAX_PARQUET_FILE_SIZE = 100 * 1024 * 1024  // 100 MB
)
View Source
const (
	LOG_LEVEL_TRACE = "TRACE"
	LOG_LEVEL_DEBUG = "DEBUG"
	LOG_LEVEL_WARN  = "WARN"
	LOG_LEVEL_INFO  = "INFO"
	LOG_LEVEL_ERROR = "ERROR"
)
View Source
const (
	ICEBERG_MANIFEST_STATUS_EXISTING = 0
	ICEBERG_MANIFEST_STATUS_ADDED    = 1
	ICEBERG_MANIFEST_STATUS_DELETED  = 2

	ICEBERG_MANIFEST_LIST_OPERATION_REPLACE   = "replace"
	ICEBERG_MANIFEST_LIST_OPERATION_APPEND    = "append"
	ICEBERG_MANIFEST_LIST_OPERATION_OVERWRITE = "overwrite"
	ICEBERG_MANIFEST_LIST_OPERATION_DELETE    = "delete"

	ICEBERG_METADATA_INITIAL_FILE_NAME = "v1.metadata.json"
)
View Source
const (
	MANIFEST_SCHEMA = `` /* 5075-byte string literal not displayed */

	MANIFEST_LIST_SCHEMA = `` /* 1909-byte string literal not displayed */

)
View Source
const (
	CONNECTION_TIMEOUT = 30 * time.Second
)
View Source
const DEFAULT_CAPPED_BUFFER_SIZE = 32 * 1024 * 1024 // 32 MB in memory
View Source
const (
	DUCKDB_YES = "YES"
)
View Source
const (
	UUID_LENGTH = 36
)

Variables

View Source
var SYNCER_DUCKDB_BOOT_QUERIES = []string{
	"SET memory_limit='2GB'",
	"SET threads=2",
}

Functions

func Base64ToHex

func Base64ToHex(s string) string

func Float64ToString

func Float64ToString(i float64) string

func HandleUnexpectedPanic

func HandleUnexpectedPanic(config *CommonConfig)

func HexToString

func HexToString(s string) (string, error)

func Int64ToString

func Int64ToString(i int64) string

func IntToString

func IntToString(i int) string

func IsLocalHost

func IsLocalHost(host string) bool

func LogDebug

func LogDebug(config *CommonConfig, message ...interface{})

func LogError

func LogError(config *CommonConfig, message ...interface{})

func LogInfo

func LogInfo(config *CommonConfig, message ...interface{})

func LogTrace

func LogTrace(config *CommonConfig, message ...interface{})

func LogWarn

func LogWarn(config *CommonConfig, message ...interface{})

func Panic

func Panic(config *CommonConfig, message string)

func PanicIfError

func PanicIfError(config *CommonConfig, err error)

func PrintErrorAndExit

func PrintErrorAndExit(config *CommonConfig, message string)

func SendAnonymousAnalytics

func SendAnonymousAnalytics(config *CommonConfig, command string, name string)

func StringDateToTime

func StringDateToTime(str string) time.Time

func StringMsToUtcTime

func StringMsToUtcTime(s string) time.Time

func StringToFloat64

func StringToFloat64(s string) float64

func StringToInt

func StringToInt(s string) int

func StringToInt64

func StringToInt64(s string) int64

func TimeToUtcStringMs

func TimeToUtcStringMs(t time.Time) string

Types

type AnonymousAnalyticsData

type AnonymousAnalyticsData struct {
	Command  string `json:"command"`
	OsName   string `json:"osName"`
	Version  string `json:"version"`
	S3Bucket string `json:"s3Bucket"`
	Name     string `json:"name"`
}

type AnonymousErrorData

type AnonymousErrorData struct {
	Command    string `json:"command"`
	OsName     string `json:"osName"`
	Version    string `json:"version"`
	Error      string `json:"error"`
	StackTrace string `json:"stackTrace"`
	S3Bucket   string `json:"s3Bucket"`
}

type AwsConfig

type AwsConfig struct {
	Region          string
	S3Endpoint      string // optional
	S3Bucket        string
	AccessKeyId     string
	SecretAccessKey string
}

type CappedBuffer

type CappedBuffer struct {
	Config       *CommonConfig
	MaxSizeBytes int
	// contains filtered or unexported fields
}

func NewCappedBuffer

func NewCappedBuffer(config *CommonConfig, maxSizeBytes int) *CappedBuffer

func (*CappedBuffer) Close

func (buf *CappedBuffer) Close() error

func (*CappedBuffer) Read

func (buf *CappedBuffer) Read(payload []byte) (readBytes int, err error)

Read implements io.Reader.

func (*CappedBuffer) Write

func (buf *CappedBuffer) Write(payload []byte) (writtenBytes int, err error)

Write implements io.Writer.

type CatalogTableColumn

type CatalogTableColumn struct {
	Name     string `json:"name"`
	Type     string `json:"type"`
	Position int    `json:"position"`
	List     bool   `json:"list"`
	Required bool   `json:"required"`
}

func (CatalogTableColumn) ToMetadataFieldMap

func (tableColumn CatalogTableColumn) ToMetadataFieldMap() map[string]interface{}

func (CatalogTableColumn) ToSql

func (tableColumn CatalogTableColumn) ToSql() string

type CommonConfig

type CommonConfig struct {
	Aws                       AwsConfig
	LogLevel                  string
	CatalogDatabaseUrl        string
	DisableAnonymousAnalytics bool
}

type CursorValue

type CursorValue struct {
	ColumnName           string
	StringValue          string
	OverrideAppendedRows bool // Override rows that have the same value as this cursor value (if new rows are added with the same value, they will be included in the next sync)
}

type DuckdbClient

type DuckdbClient struct {
	Config      *CommonConfig
	Db          *sql.DB
	Connector   *duckdb.Connector
	BootQueries []string
}

func NewDuckdbClient

func NewDuckdbClient(config *CommonConfig, bootQueries ...[]string) *DuckdbClient

func (*DuckdbClient) Appender

func (client *DuckdbClient) Appender(schema string, table string) (*duckdb.Appender, error)

func (*DuckdbClient) Close

func (client *DuckdbClient) Close()

func (*DuckdbClient) ExecContext

func (client *DuckdbClient) ExecContext(ctx context.Context, query string, args ...map[string]string) (sql.Result, error)

func (*DuckdbClient) ExecTransactionContext

func (client *DuckdbClient) ExecTransactionContext(ctx context.Context, queries []string, args ...[]map[string]string) error

func (*DuckdbClient) PrepareContext

func (client *DuckdbClient) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

func (*DuckdbClient) QueryContext

func (client *DuckdbClient) QueryContext(ctx context.Context, query string) (*sql.Rows, error)

func (*DuckdbClient) QueryRowContext

func (client *DuckdbClient) QueryRowContext(ctx context.Context, query string, args ...map[string]string) *sql.Row

func (*DuckdbClient) RecreateDb

func (client *DuckdbClient) RecreateDb()

type DuckdbSchemaColumn

type DuckdbSchemaColumn struct {
	ColumnName        string
	DataType          string
	IsNullable        string
	OrdinalPosition   string
	NumericPrecision  string
	NumericScale      string
	DatetimePrecision string
	Config            *CommonConfig
}

func (*DuckdbSchemaColumn) ToIcebergSchemaColumn

func (column *DuckdbSchemaColumn) ToIcebergSchemaColumn() *IcebergSchemaColumn

type IcebergCatalog

type IcebergCatalog struct {
	Config *CommonConfig
}

func NewIcebergCatalog

func NewIcebergCatalog(config *CommonConfig) *IcebergCatalog

func (*IcebergCatalog) CreateMaterializedView

func (catalog *IcebergCatalog) CreateMaterializedView(icebergSchemaTable IcebergSchemaTable, definition string, ifNotExists bool) error

func (*IcebergCatalog) CreateTable

func (catalog *IcebergCatalog) CreateTable(icebergSchemaTable IcebergSchemaTable, metadataLocation string, icebergSchemaColumns []*IcebergSchemaColumn)

func (*IcebergCatalog) DropMaterializedView

func (catalog *IcebergCatalog) DropMaterializedView(icebergSchemaTable IcebergSchemaTable, missingOk bool) error

func (*IcebergCatalog) DropTable

func (catalog *IcebergCatalog) DropTable(icebergSchemaTable IcebergSchemaTable)

func (*IcebergCatalog) MaterializedView

func (catalog *IcebergCatalog) MaterializedView(icebergSchemaTable IcebergSchemaTable) (IcebergMaterializedView, error)

func (*IcebergCatalog) MaterializedViews

func (catalog *IcebergCatalog) MaterializedViews() ([]IcebergMaterializedView, error)

func (*IcebergCatalog) MetadataFileS3Path

func (catalog *IcebergCatalog) MetadataFileS3Path(icebergSchemaTable IcebergSchemaTable) string

func (*IcebergCatalog) RenameMaterializedView

func (catalog *IcebergCatalog) RenameMaterializedView(icebergSchemaTable IcebergSchemaTable, newName string, missingOk bool) error

func (*IcebergCatalog) RenameTable

func (catalog *IcebergCatalog) RenameTable(oldIcebergSchemaTable IcebergSchemaTable, newIcebergTableName string)

func (*IcebergCatalog) SchemaTableNames

func (catalog *IcebergCatalog) SchemaTableNames(schemaName string) Set[string]

func (*IcebergCatalog) SchemaTables

func (catalog *IcebergCatalog) SchemaTables() (Set[IcebergSchemaTable], error)

func (*IcebergCatalog) TableColumns

func (catalog *IcebergCatalog) TableColumns(icebergSchemaTable IcebergSchemaTable) ([]CatalogTableColumn, error)

func (*IcebergCatalog) TableS3Path

func (catalog *IcebergCatalog) TableS3Path(icebergTableName IcebergSchemaTable) string

type IcebergColumnType

type IcebergColumnType string

type IcebergLogicalColumnType

type IcebergLogicalColumnType string

type IcebergMaterializedView

type IcebergMaterializedView struct {
	Schema     string
	Table      string
	Definition string
}

func (IcebergMaterializedView) ToIcebergSchemaTable

func (view IcebergMaterializedView) ToIcebergSchemaTable() IcebergSchemaTable

type IcebergSchemaColumn

type IcebergSchemaColumn struct {
	Config              *CommonConfig
	ColumnName          string
	ColumnType          IcebergColumnType
	LogicalColumnType   IcebergLogicalColumnType
	Position            int
	NumericPrecision    int
	NumericScale        int
	DatetimePrecision   int
	IsList              bool
	IsRequired          bool
	IsPartOfUniqueIndex bool
}

func (*IcebergSchemaColumn) CatalogTableColumn

func (col *IcebergSchemaColumn) CatalogTableColumn() CatalogTableColumn

func (*IcebergSchemaColumn) DuckdbType

func (col *IcebergSchemaColumn) DuckdbType() string

func (*IcebergSchemaColumn) DuckdbValueFromCsv

func (col *IcebergSchemaColumn) DuckdbValueFromCsv(value string) interface{}

func (*IcebergSchemaColumn) DuckdbValueFromJson

func (col *IcebergSchemaColumn) DuckdbValueFromJson(value any) interface{}

func (*IcebergSchemaColumn) NormalizedColumnName

func (col *IcebergSchemaColumn) NormalizedColumnName() string

func (*IcebergSchemaColumn) NormalizedPrecision

func (col *IcebergSchemaColumn) NormalizedPrecision() int

func (*IcebergSchemaColumn) NormalizedScale

func (col *IcebergSchemaColumn) NormalizedScale() int

func (*IcebergSchemaColumn) QuotedColumnName

func (col *IcebergSchemaColumn) QuotedColumnName() string

type IcebergSchemaTable

type IcebergSchemaTable struct {
	Schema string
	Table  string
}

func (IcebergSchemaTable) String

func (schemaTable IcebergSchemaTable) String() string

func (IcebergSchemaTable) ToArg

func (schemaTable IcebergSchemaTable) ToArg() string

type IcebergTable

type IcebergTable struct {
	Config             *CommonConfig
	IcebergSchemaTable IcebergSchemaTable
	IcebergCatalog     *IcebergCatalog
	StorageS3          *StorageS3
	DuckdbClient       *DuckdbClient
}

func NewIcebergTable

func NewIcebergTable(config *CommonConfig, storageS3 *StorageS3, duckdbClient *DuckdbClient, icebergSchemaTable IcebergSchemaTable) *IcebergTable

func (*IcebergTable) Create

func (table *IcebergTable) Create(tableS3Path string, icebergSchemaColumns []*IcebergSchemaColumn)

func (*IcebergTable) DropIfExists

func (table *IcebergTable) DropIfExists()

func (*IcebergTable) GenerateTableS3Path

func (table *IcebergTable) GenerateTableS3Path() string

func (*IcebergTable) LastCursorValue

func (table *IcebergTable) LastCursorValue(columnName string) CursorValue

func (*IcebergTable) MetadataFileS3Path

func (table *IcebergTable) MetadataFileS3Path() string

func (*IcebergTable) Rename

func (table *IcebergTable) Rename(newName string)

func (*IcebergTable) ReplaceWith

func (table *IcebergTable) ReplaceWith(callbackFunc func(syncingIcebergTable *IcebergTable))

func (*IcebergTable) String

func (table *IcebergTable) String() string

type IcebergTableWriter

type IcebergTableWriter struct {
	Config               *CommonConfig
	StorageS3            *StorageS3
	DuckdbClient         *DuckdbClient
	IcebergTable         *IcebergTable
	IcebergSchemaColumns []*IcebergSchemaColumn
	CompressionFactor    int64
}

func NewIcebergTableWriter

func NewIcebergTableWriter(
	config *CommonConfig,
	storageS3 *StorageS3,
	duckdbClient *DuckdbClient,
	icebergTable *IcebergTable,
	icebergSchemaColumns []*IcebergSchemaColumn,
	compressionFactor int64,
) *IcebergTableWriter

func (*IcebergTableWriter) AppendFromCsvCappedBuffer

func (writer *IcebergTableWriter) AppendFromCsvCappedBuffer(cursorValue CursorValue, cappedBuffer *CappedBuffer)

func (*IcebergTableWriter) AppendFromJsonCappedBuffer

func (writer *IcebergTableWriter) AppendFromJsonCappedBuffer(cursorValue CursorValue, cappedBuffer *CappedBuffer)

func (*IcebergTableWriter) DeleteFromJsonCappedBuffer

func (writer *IcebergTableWriter) DeleteFromJsonCappedBuffer(cappedBuffer *CappedBuffer)

func (*IcebergTableWriter) InsertFromCsvCappedBuffer

func (writer *IcebergTableWriter) InsertFromCsvCappedBuffer(cappedBuffer *CappedBuffer)

func (*IcebergTableWriter) InsertFromJsonCappedBuffer

func (writer *IcebergTableWriter) InsertFromJsonCappedBuffer(cappedBuffer *CappedBuffer)

func (*IcebergTableWriter) InsertFromQuery

func (writer *IcebergTableWriter) InsertFromQuery(query string) error

func (*IcebergTableWriter) UniqueIndexColumnNames

func (writer *IcebergTableWriter) UniqueIndexColumnNames() []string

func (*IcebergTableWriter) UpdateFromJsonCappedBuffer

func (writer *IcebergTableWriter) UpdateFromJsonCappedBuffer(cappedBuffer *CappedBuffer)

type JsonQueueReader

type JsonQueueReader struct {
	Reader io.Reader
}

func NewJsonQueueReader

func NewJsonQueueReader(r io.Reader) *JsonQueueReader

func (*JsonQueueReader) Close

func (r *JsonQueueReader) Close() error

func (*JsonQueueReader) Read

func (r *JsonQueueReader) Read(value interface{}) (int, error)

type JsonQueueWriter

type JsonQueueWriter struct {
	Writer io.Writer
}

func NewJsonQueueWriter

func NewJsonQueueWriter(w io.Writer) *JsonQueueWriter

func (*JsonQueueWriter) Close

func (w *JsonQueueWriter) Close() error

func (*JsonQueueWriter) Write

func (w *JsonQueueWriter) Write(value interface{}) error

type LogLevel

type LogLevel string

type ManifestFile

type ManifestFile struct {
	Key                string
	Path               string // With s3://bucket/ prefix
	Size               int64
	TotalRecordCount   int64
	TotalDataFileCount int32
	RecordsDeleted     bool
}

type ManifestListFile

type ManifestListFile struct {
	SequenceNumber int
	SnapshotId     int64
	TimestampMs    int64
	Key            string
	Path           string // With s3://bucket/ prefix
	Operation      string
	TotalFilesSize int64
	TotalDataFiles int64
	TotalRecords   int64
}

type ManifestListItem

type ManifestListItem struct {
	SequenceNumber int
	ManifestFile   ManifestFile
}

type ManifestListSequenceStats

type ManifestListSequenceStats struct {
	TotalFilesSize int64
	TotalDataFiles int64
	TotalRecords   int64
}

type ManifestListsJson

type ManifestListsJson struct {
	Snapshots []struct {
		SequenceNumber int    `json:"sequence-number"`
		SnapshotId     int64  `json:"snapshot-id"`
		TimestampMs    int64  `json:"timestamp-ms"`
		Path           string `json:"manifest-list"`
		Summary        struct {
			Operation      string `json:"operation"`
			TotalDataFiles string `json:"total-data-files"`
			TotalFilesSize string `json:"total-files-size"`
			TotalRecords   string `json:"total-records"`
		} `json:"summary"`
	} `json:"snapshots"`
}

type MetadataFile

type MetadataFile struct {
	Version int64
	Key     string
}

type MetadataJson

type MetadataJson struct {
	Schemas []struct {
		Fields []struct {
			ID       int         `json:"id"`
			Name     string      `json:"name"`
			Type     interface{} `json:"type"`
			Required bool        `json:"required"`
		} `json:"fields"`
	} `json:"schemas"`
}

type ParquetFile

type ParquetFile struct {
	Key         string
	Path        string // With s3://bucket/ prefix
	Size        int64
	RecordCount int64
	Stats       ParquetFileStats
}

type ParquetFileStats

type ParquetFileStats struct {
	ColumnSizes     map[int]int64
	ValueCounts     map[int]int64
	NullValueCounts map[int]int64
	LowerBounds     map[int][]byte
	UpperBounds     map[int][]byte
	SplitOffsets    []int64
}

type PostgresClient

type PostgresClient struct {
	Conn   *pgx.Conn
	Config *CommonConfig
}

func NewPostgresClient

func NewPostgresClient(config *CommonConfig, databaseUrl string) *PostgresClient

func (*PostgresClient) Close

func (client *PostgresClient) Close()

func (*PostgresClient) Copy

func (client *PostgresClient) Copy(writer io.Writer, query string) (pgconn.CommandTag, error)

func (*PostgresClient) Exec

func (client *PostgresClient) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)

func (*PostgresClient) Query

func (client *PostgresClient) Query(ctx context.Context, query string, args ...any) (pgx.Rows, error)

func (*PostgresClient) QueryRow

func (client *PostgresClient) QueryRow(ctx context.Context, query string, args ...any) pgx.Row

type S3Client

type S3Client struct {
	Config *CommonConfig
	S3     *s3.Client
}

func NewS3Client

func NewS3Client(Config *CommonConfig) *S3Client

func (*S3Client) BucketS3Prefix

func (s3Client *S3Client) BucketS3Prefix() string

func (*S3Client) DeleteObject

func (s3Client *S3Client) DeleteObject(fileKey string)

func (*S3Client) DeleteObjects

func (s3Client *S3Client) DeleteObjects(fileKeys []*string)

func (*S3Client) GetObject

func (s3Client *S3Client) GetObject(fileKey string) *s3.GetObjectOutput

func (*S3Client) HeadObject

func (s3Client *S3Client) HeadObject(fileKey string) *s3.HeadObjectOutput

func (*S3Client) ListObjects

func (s3Client *S3Client) ListObjects(prefix string) *s3.ListObjectsV2Output

func (*S3Client) ObjectKey

func (s3Client *S3Client) ObjectKey(objectPath string) string

ObjectKey strips the s3://bucket/ prefix from an object path, returning the object key: s3://bucket/some/path -> some/path

func (*S3Client) UploadObject

func (s3Client *S3Client) UploadObject(fileKey string, file *os.File)

type Set

type Set[T comparable] map[T]struct{}

func NewSet

func NewSet[T comparable]() Set[T]

func (Set[T]) Add

func (set Set[T]) Add(item T) Set[T]

func (Set[T]) AddAll

func (set Set[T]) AddAll(items []T) Set[T]

func (Set[T]) Contains

func (set Set[T]) Contains(item T) bool

func (Set[T]) IsEmpty

func (set Set[T]) IsEmpty() bool

func (Set[T]) Remove

func (set Set[T]) Remove(item T) Set[T]

func (Set[T]) Reset

func (set Set[T]) Reset()

func (Set[T]) Values

func (set Set[T]) Values() []T

type StorageS3

type StorageS3 struct {
	S3Client     *S3Client
	Config       *CommonConfig
	StorageUtils *StorageUtils
}

func NewStorageS3

func NewStorageS3(Config *CommonConfig) *StorageS3

func (*StorageS3) CreateManifest

func (storage *StorageS3) CreateManifest(metadataS3Path string, parquetFilesSortedAsc []ParquetFile) (manifestFile ManifestFile)

func (*StorageS3) CreateManifestList

func (storage *StorageS3) CreateManifestList(metadataS3Path string, totalDataFileSize int64, manifestListItemsSortedDesc []ManifestListItem) (manifestListFile ManifestListFile)

func (*StorageS3) CreateMetadata

func (storage *StorageS3) CreateMetadata(metadataS3Path string, icebergSchemaColumns []*IcebergSchemaColumn, manifestListFilesSortedAsc []ManifestListFile) (metadataFile MetadataFile)

func (*StorageS3) CreateParquet

func (storage *StorageS3) CreateParquet(dataS3Path string, duckdbClient *DuckdbClient, tempDuckdbTableName string, icebergSchemaColumns []*IcebergSchemaColumn, rowCount int64) ParquetFile

func (*StorageS3) DeleteTableFiles

func (storage *StorageS3) DeleteTableFiles(tableS3Path string)

func (*StorageS3) LastManifestListFile

func (storage *StorageS3) LastManifestListFile(metadataS3Path string) ManifestListFile

func (*StorageS3) ManifestListItems

func (storage *StorageS3) ManifestListItems(manifestListFile ManifestListFile) []ManifestListItem

func (*StorageS3) ParquetFiles

func (storage *StorageS3) ParquetFiles(manifestFile ManifestFile, icebergSchemaColumns []*IcebergSchemaColumn) []ParquetFile

type StorageUtils

type StorageUtils struct {
	Config *CommonConfig
}

func NewStorageUtils

func NewStorageUtils(config *CommonConfig) *StorageUtils

func (*StorageUtils) ParseLastManifestListFile

func (utils *StorageUtils) ParseLastManifestListFile(bucketS3Prefix string, metadataContent []byte) ManifestListFile

func (*StorageUtils) ParseManifestListItems

func (utils *StorageUtils) ParseManifestListItems(bucketS3Prefix string, manifestListFileContent []byte) []ManifestListItem

func (*StorageUtils) ParseParquetFiles

func (utils *StorageUtils) ParseParquetFiles(manifestContent []byte) []ParquetFile

func (*StorageUtils) ReadParquetStats

func (utils *StorageUtils) ReadParquetStats(fileReader source.ParquetFile, icebergSchemaColumns []*IcebergSchemaColumn) (parquetStats ParquetFileStats)

func (*StorageUtils) WriteManifestFile

func (utils *StorageUtils) WriteManifestFile(filePath string, parquetFilesSortedAsc []ParquetFile) (int64, error)

func (*StorageUtils) WriteManifestListFile

func (utils *StorageUtils) WriteManifestListFile(filePath string, totalDataFileSize int64, manifestListItemsSortedDesc []ManifestListItem) (ManifestListFile, error)

func (*StorageUtils) WriteMetadataFile

func (utils *StorageUtils) WriteMetadataFile(s3TablePath string, filePath string, icebergSchemaColumns []*IcebergSchemaColumn, manifestListFilesSortedAsc []ManifestListFile) (err error)

func (*StorageUtils) WriteParquetFile

func (utils *StorageUtils) WriteParquetFile(fileS3Path string, duckdbClient *DuckdbClient, tempDuckdbTableName string, icebergSchemaColumns []*IcebergSchemaColumn)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL