catalog

package
v0.0.0-...-5db83a0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

README

Catalog Implementations

Integration Testing

The Catalog implementations can be manually tested using the CLI implemented in the cmd/iceberg folder.

REST Catalog

To test the REST catalog implementation, we provide a Docker configuration for a MinIO container and a tabulario/iceberg-rest container.

You can spin up the local catalog by going to the dev/ folder and running docker-compose up. You can then follow the steps of the Iceberg Quickstart tutorial, which we've summarized below.

Set up your Iceberg catalog

First, launch a PySpark console by running:

docker exec -it spark-iceberg pyspark

Once in the PySpark shell, we create a simple table called "taxis" in the "demo.nyc" namespace:

from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
schema = StructType([
  StructField("vendor_id", LongType(), True),
  StructField("trip_id", LongType(), True),
  StructField("trip_distance", FloatType(), True),
  StructField("fare_amount", DoubleType(), True),
  StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()

Finally, we append a DataFrame containing sample rows to the table, which adds new data files:

schema = spark.table("demo.nyc.taxis").schema
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
  ]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.nyc.taxis").append()
Testing with the CLI

Now that the catalog is running and contains a table, you can use the CLI implemented in the cmd/iceberg folder. You will need to set the following environment variables (which can also be found in docker-compose.yml):

AWS_S3_ENDPOINT=http://localhost:9000
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=admin
AWS_SECRET_ACCESS_KEY=password

With those environment variables set, you can now run the CLI:

$ go run ./cmd/iceberg list --catalog rest --uri http://localhost:8181
┌──────┐
| IDs  |
| ---- |
| demo |
└──────┘

You can retrieve the schema of the table:

$ go run ./cmd/iceberg schema --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Current Schema, id=0
├──1: vendor_id: optional long
├──2: trip_id: optional long
├──3: trip_distance: optional float
├──4: fare_amount: optional double
└──5: store_and_fwd_flag: optional string

You can get the file list:

$ go run ./cmd/iceberg files --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Snapshots: rest.demo.nyc.taxis
└─┬Snapshot 7004656639550124164, schema 0: s3://warehouse/demo/nyc/taxis/metadata/snap-7004656639550124164-1-0d533cd4-f0c1-45a6-a691-f2be3abe5491.avro
  └─┬Manifest: s3://warehouse/demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00004-24-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00009-29-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00014-34-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    └──Datafile: s3://warehouse/demo/nyc/taxis/data/00019-39-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet

And so on for the other options available in the CLI.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoSuchTable is returned when a table does not exist in the catalog.
	ErrNoSuchTable            = errors.New("table does not exist")
	ErrNoSuchNamespace        = errors.New("namespace does not exist")
	ErrNamespaceAlreadyExists = errors.New("namespace already exists")
)
View Source
var (
	ErrRESTError            = errors.New("REST error")
	ErrBadRequest           = fmt.Errorf("%w: bad request", ErrRESTError)
	ErrForbidden            = fmt.Errorf("%w: forbidden", ErrRESTError)
	ErrUnauthorized         = fmt.Errorf("%w: unauthorized", ErrRESTError)
	ErrAuthorizationExpired = fmt.Errorf("%w: authorization expired", ErrRESTError)
	ErrServiceUnavailable   = fmt.Errorf("%w: service unavailable", ErrRESTError)
	ErrServerError          = fmt.Errorf("%w: server error", ErrRESTError)
	ErrCommitFailed         = fmt.Errorf("%w: commit failed, refresh and try again", ErrRESTError)
	ErrCommitStateUnknown   = fmt.Errorf("%w: commit failed due to unknown reason", ErrRESTError)
	ErrOAuthError           = fmt.Errorf("%w: oauth error", ErrRESTError)
)

Functions

func GlueDatabaseIdentifier

func GlueDatabaseIdentifier(database string) table.Identifier

GlueDatabaseIdentifier returns a database identifier for a Glue database in the format [database].

func GlueTableIdentifier

func GlueTableIdentifier(database string, tableName string) table.Identifier

GlueTableIdentifier returns a Glue table identifier for an Iceberg table in the format [database, table].

func NamespaceFromIdent

func NamespaceFromIdent(ident table.Identifier) table.Identifier

func TableNameFromIdent

func TableNameFromIdent(ident table.Identifier) string

func ToRestIdentifier

func ToRestIdentifier(ident ...string) table.Identifier

Types

type Catalog

type Catalog interface {
	// CatalogType returns the type of the catalog.
	CatalogType() CatalogType

	// ListTables returns a list of table identifiers in the catalog, with the returned
	// identifiers containing the information required to load the table via that catalog.
	ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)
	// LoadTable loads a table from the catalog and returns a Table with the metadata.
	LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)
	// DropTable tells the catalog to drop the table entirely
	DropTable(ctx context.Context, identifier table.Identifier) error
	// RenameTable tells the catalog to rename a given table by the identifiers
	// provided, and then loads and returns the destination table
	RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
	// ListNamespaces returns the list of available namespaces, optionally filtering by a
	// parent namespace
	ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)
	// CreateNamespace tells the catalog to create a new namespace with the given properties
	CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error
	// DropNamespace tells the catalog to drop the namespace and all tables in that namespace
	DropNamespace(ctx context.Context, namespace table.Identifier) error
	// LoadNamespaceProperties returns the current properties in the catalog for
	// a given namespace
	LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)
	// UpdateNamespaceProperties allows removing, adding, and/or updating properties of a namespace
	UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
		removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
}

Catalog for iceberg table operations like create, drop, load, list and others.

type CatalogType

type CatalogType string
const (
	REST     CatalogType = "rest"
	Hive     CatalogType = "hive"
	Glue     CatalogType = "glue"
	DynamoDB CatalogType = "dynamodb"
	SQL      CatalogType = "sql"
)

type GlueCatalog

type GlueCatalog struct {
	// contains filtered or unexported fields
}

func NewGlueCatalog

func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog

func (*GlueCatalog) CatalogType

func (c *GlueCatalog) CatalogType() CatalogType

func (*GlueCatalog) CreateNamespace

func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error

func (*GlueCatalog) DropNamespace

func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error

func (*GlueCatalog) DropTable

func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error

func (*GlueCatalog) ListNamespaces

func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)

func (*GlueCatalog) ListTables

func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)

ListTables returns a list of iceberg tables in the given Glue database.

The namespace should just contain the Glue database name.

func (*GlueCatalog) LoadNamespaceProperties

func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)

func (*GlueCatalog) LoadTable

func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)

LoadTable loads a table using the table details stored in the catalog.

The identifier should contain the Glue database name, followed by the Glue table name.

func (*GlueCatalog) RenameTable

func (c *GlueCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)

func (*GlueCatalog) UpdateNamespaceProperties

func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
	removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)

type Option

type Option[T GlueCatalog | RestCatalog] func(*options)

func WithAuthURI

func WithAuthURI(uri *url.URL) Option[RestCatalog]

func WithAwsConfig

func WithAwsConfig(cfg aws.Config) Option[GlueCatalog]

WithAwsConfig sets the AWS configuration for the catalog.

func WithCredential

func WithCredential(cred string) Option[RestCatalog]

func WithMetadataLocation

func WithMetadataLocation(loc string) Option[RestCatalog]

func WithOAuthToken

func WithOAuthToken(token string) Option[RestCatalog]

func WithPrefix

func WithPrefix(prefix string) Option[RestCatalog]

func WithSigV4

func WithSigV4() Option[RestCatalog]

func WithSigV4RegionSvc

func WithSigV4RegionSvc(region, service string) Option[RestCatalog]

func WithTLSConfig

func WithTLSConfig(config *tls.Config) Option[RestCatalog]

func WithWarehouseLocation

func WithWarehouseLocation(loc string) Option[RestCatalog]

type PropertiesUpdateSummary

type PropertiesUpdateSummary struct {
	Removed []string `json:"removed"`
	Updated []string `json:"updated"`
	Missing []string `json:"missing"`
}

type RestCatalog

type RestCatalog struct {
	// contains filtered or unexported fields
}

func NewRestCatalog

func NewRestCatalog(name, uri string, opts ...Option[RestCatalog]) (*RestCatalog, error)

func (*RestCatalog) CatalogType

func (r *RestCatalog) CatalogType() CatalogType

func (*RestCatalog) CreateNamespace

func (r *RestCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error

func (*RestCatalog) DropNamespace

func (r *RestCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error

func (*RestCatalog) DropTable

func (r *RestCatalog) DropTable(ctx context.Context, identifier table.Identifier) error

func (*RestCatalog) ListNamespaces

func (r *RestCatalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)

func (*RestCatalog) ListTables

func (r *RestCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)

func (*RestCatalog) LoadNamespaceProperties

func (r *RestCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)

func (*RestCatalog) LoadTable

func (r *RestCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)

func (*RestCatalog) RenameTable

func (r *RestCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)

func (*RestCatalog) UpdateNamespaceProperties

func (r *RestCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
	removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL