dynamodb

package
v1.0.1-0...-386defc
Published: Oct 18, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

README

Spanner migration tool: DynamoDB-to-Spanner Evaluation and Migration

Spanner migration tool (formerly known as HarbourBridge) is a stand-alone open source tool for Cloud Spanner evaluation and migration, using data from an existing database. This README covers the tool's DynamoDB capabilities. For general Spanner migration tool information, see the main README.

Example DynamoDB Usage

Before running Spanner migration tool, make sure that you have set up your AWS credentials and region correctly (set the environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION). Spanner migration tool accesses your DynamoDB database via the AWS Go SDK. If you use a custom endpoint for DynamoDB, you can specify it using the environment variable DYNAMODB_ENDPOINT_OVERRIDE.
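
For illustration only, here is a minimal, hedged sketch of how such a client can be built with the AWS Go SDK v1; this is not the tool's actual code, and the connectivity check at the end is just an example.

package main

import (
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
	// Credentials come from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY via the
	// SDK's default credential chain; the region comes from AWS_REGION.
	cfg := aws.NewConfig()
	if endpoint := os.Getenv("DYNAMODB_ENDPOINT_OVERRIDE"); endpoint != "" {
		// Optional custom endpoint, e.g. a local DynamoDB emulator.
		cfg = cfg.WithEndpoint(endpoint)
	}
	sess := session.Must(session.NewSession(cfg))
	client := dynamodb.New(sess)

	// Quick connectivity check: list the first page of tables.
	out, err := client.ListTables(&dynamodb.ListTablesInput{})
	if err != nil {
		fmt.Fprintln(os.Stderr, "ListTables failed:", err)
		os.Exit(1)
	}
	for _, t := range out.TableNames {
		fmt.Println(*t)
	}
}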

The following examples assume a spanner-migration-tool alias has been set up as described in the Installing Spanner migration tool section of the main README.

For example, run

export DYNAMODB_ENDPOINT_OVERRIDE=http://dynamodb.us-west-2.amazonaws.com
spanner-migration-tool schema -source=dynamodb 

Instead of setting the environment variables, you can pass the corresponding source-profile connection parameters aws-access-key-id, aws-secret-access-key, and aws-region. A custom endpoint can be specified using the dydb-endpoint param.

For example, to perform schema conversion, run

spanner-migration-tool schema -source=dynamodb -source-profile="aws-access-key-id=<>,aws-secret-access-key=<>,aws-region=<>"

This will generate a session file with a session.json suffix. This file contains the schema mapping from source to destination. You will need to specify this file during data migration. You also need to specify the Spanner instance and database to use during data migration.

For example, run

spanner-migration-tool data -session=mydb.session.json -source=dynamodb -source-profile="aws-access-key-id=<>,..." -target-profile="instance=my-spanner-instance,dbName=my-spanner-database-name"

You can also run Spanner migration tool in schema-and-data mode, where it performs both schema and data migration. This is useful for quick evaluation when the source database is small.

spanner-migration-tool schema-and-data -source=dynamodb -source-profile="aws-access-key-id=<>,..." -target-profile="instance=my-spanner-instance,..."

Spanner migration tool generates a report file, a schema file, and a bad-data file (if there are bad-data rows). You can control where these files are written by specifying a file prefix. For example,

spanner-migration-tool schema -prefix=mydb. -source=dynamodb -source-profile="aws-access-key-id=<>,..."

will write files mydb.report.txt, mydb.schema.txt, and mydb.dropped.txt. The prefix can also be a directory. For example,

spanner-migration-tool schema -prefix=~/spanner-eval-mydb/ -source=dynamodb -source-profile="aws-access-key-id=<>,..."

would write the files into the directory ~/spanner-eval-mydb/. Note that Spanner migration tool will not create directories as it writes these files.

Spanner migration tool accepts an additional -source-profile param for DynamoDB, schema-sample-size. Because DynamoDB is schemaless, the tool infers the schema from a sample of rows; by default it uses at most 100,000 rows per table. The schema-sample-size param lets you change the number of rows used for schema inference.

Sample usage:

spanner-migration-tool schema -source=dynamodb -source-profile="schema-sample-size=500000,aws-access-key-id=<>,..."

DynamoDB Streaming Migration Usage

  • DynamoDB Streams will be used for Change Data Capture in streaming migration.
  • If a DynamoDB Stream already exists for a given table, it must have StreamViewType NEW_IMAGE or NEW_AND_OLD_IMAGES; otherwise the table will not be considered for streaming migration (a hedged check for this is sketched below).
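
As a hedged illustration (not the tool's actual code), an existing stream's view type can be checked with DescribeTable from the AWS Go SDK v1; this snippet assumes the github.com/aws/aws-sdk-go/aws and github.com/aws/aws-sdk-go/service/dynamodb imports:

// usableForStreaming reports whether srcTable either has no stream yet or has
// one with a StreamViewType that streaming migration can use.
func usableForStreaming(client *dynamodb.DynamoDB, srcTable string) (bool, error) {
	out, err := client.DescribeTable(&dynamodb.DescribeTableInput{
		TableName: aws.String(srcTable),
	})
	if err != nil {
		return false, err
	}
	spec := out.Table.StreamSpecification
	if spec == nil || spec.StreamEnabled == nil || !*spec.StreamEnabled {
		// No stream yet; one can be created with NEW_AND_OLD_IMAGES.
		return true, nil
	}
	switch aws.StringValue(spec.StreamViewType) {
	case dynamodb.StreamViewTypeNewImage, dynamodb.StreamViewTypeNewAndOldImages:
		return true, nil
	default:
		return false, nil
	}
}
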
Steps
  1. Start the streaming migration. Example usage
spanner-migration-tool schema-and-data -source=dynamodb -source-profile="aws-access-key-id=<>,...,enableStreaming=<>" -target-profile="instance=my-spanner-instance,..."

Valid choices for enableStreaming: yes, no, true, false

Regular updates: the count of records processed, and whether the current moment is a good time to switch to Cloud Spanner, are reported at an interval of 1 minute.

  2. If you want to switch to Cloud Spanner, stop the writes on the source DynamoDB database and press Ctrl+C. The remaining unprocessed records within DynamoDB Streams will then be processed; wait for this to finish.

  3. Switch to Cloud Spanner once the whole migration process has completed.

Schema Conversion

The Spanner migration tool maps DynamoDB types to Spanner types as follows:

DynamoDB Type   Spanner Type               Notes
Number          NUMERIC or STRING          defaults to NUMERIC, otherwise STRING
String          STRING
Boolean         BOOL
Binary          BYTES
Null            A nullable column type
List            STRING                     JSON encoding
Map             STRING                     JSON encoding
StringSet       ARRAY<STRING>
NumberSet       ARRAY<NUMERIC or STRING>
BinarySet       ARRAY<BYTES>

We discuss these, as well as other limits and notes on schema conversion, in the following sections.

Schema Inference

DynamoDB is a schemaless database: other than a primary index and optional secondary index, column names and types are essentially unconstrained and can vary from one row to the next.

However, many customers use DynamoDB in a consistent, structured way with a fairly well defined set of columns and types. Our Spanner migration tool support for DynamoDB focuses on this use-case, and we construct a Spanner schema by inspecting table data.

For small tables, we inspect all rows of the table. For large tables, scanning the entire table would be extremely expensive and slow, so we inspect only the first N rows (set by the schema-sample-size flag) returned by the table scan. Although DynamoDB doesn't return scan results in any particular order, these rows may not be a truly random sample; however, randomly sampling rows would be much more expensive.

Columns with consistent types are assigned Spanner types as detailed below. Columns without a consistent type are mapped to STRING.

Number

In most cases, we map DynamoDB's Number type to Spanner's NUMERIC type. However, since the range of NUMERIC in Cloud Spanner is smaller than the range of Number in DynamoDB, this conversion can produce out-of-range values, with potential precision loss. To address this possibility, we try to convert the sampled data, and if the conversion consistently fails, we choose STRING for the column.
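
A hedged sketch of such a range check (the helper name is illustrative; the tool's actual logic may differ): Spanner's NUMERIC holds at most 29 integer digits and 9 fractional digits, and the AWS SDK surfaces DynamoDB Numbers as strings, so each sampled value can be parsed and compared against that bound.

import "math/big"

// numericMax is the largest magnitude Cloud Spanner's NUMERIC can hold:
// (10^38 - 1) / 10^9, i.e. 29 integer digits and 9 fractional digits.
var numericMax = new(big.Rat).SetFrac(
	new(big.Int).Sub(new(big.Int).Exp(big.NewInt(10), big.NewInt(38), nil), big.NewInt(1)),
	new(big.Int).Exp(big.NewInt(10), big.NewInt(9), nil),
)

// fitsSpannerNumeric reports whether a DynamoDB Number (a decimal string)
// fits in Spanner's NUMERIC range. It does not detect precision loss beyond
// 9 fractional digits.
func fitsSpannerNumeric(n string) bool {
	r, ok := new(big.Rat).SetString(n)
	if !ok {
		return false
	}
	return new(big.Rat).Abs(r).Cmp(numericMax) <= 0
}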

Null Data Type

In DynamoDB, a column can have a Null data type, representing an unknown or undefined state. In addition, each row defines its own schema (apart from the primary key), so a column can simply be absent from some rows.

We treat both cases as a NULL value in Cloud Spanner: a column that contains Null values, or that is absent from some rows, is an indication that the column should be nullable.

List and Map

In Cloud Spanner, the closest type to List and Map is STRUCT, but STRUCT is not a valid column type (it can be used in queries but not for storage). Therefore, we encode List and Map values as a JSON string.
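
As a hedged sketch (the helper name is illustrative, not the tool's internals), the SDK's dynamodbattribute package can unmarshal a List or Map AttributeValue into a plain Go value, which then marshals cleanly to a JSON string:

import (
	"encoding/json"

	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)

// attrToJSON converts a DynamoDB List or Map attribute into a JSON string
// suitable for storing in a Spanner STRING column.
func attrToJSON(av *dynamodb.AttributeValue) (string, error) {
	var v interface{}
	// dynamodbattribute turns nested AttributeValues into plain Go maps,
	// slices, numbers and strings.
	if err := dynamodbattribute.Unmarshal(av, &v); err != nil {
		return "", err
	}
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return string(b), nil
}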

Occasional Errors

To prevent a few spurious rows from impacting schema construction, we define an error threshold: when building a type for a column, if the percentage of rows with a specific data type is at or below a very low value (0.1%), we treat those rows as likely errors. Such rows are ignored for schema construction: their type is not considered a candidate type for the column.

Multi-type Columns

In some cases a column contains a near-even mix of two data types, e.g. 40% of rows are String and 60% are Number. If we chose Number as its type, the conversion could drop 40% of the data. To handle this, we define a conflicting threshold on rows (after excluding Null values and rows where the column is absent). By default the conflicting threshold is 5%: if two or more data types each exceed it, we consider the column to have conflicting data types and, as a safe choice, map it to STRING in Cloud Spanner.
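
The following sketch shows how the two thresholds above (0.1% and 5%) can be applied to per-column type counts; the function name and type-name strings are illustrative, not the tool's internals:

// inferColumnType picks a column type from the count of each DynamoDB type
// observed in the sampled rows. Null values and rows where the column is
// absent are assumed to be excluded from counts.
func inferColumnType(counts map[string]int64) string {
	var total int64
	for _, c := range counts {
		total += c
	}
	if total == 0 {
		return "String" // nothing observed; STRING is the safe default
	}

	const (
		noiseThreshold    = 0.001 // <= 0.1% of rows: treat as likely errors
		conflictThreshold = 0.05  // > 5% for two or more types: conflict
	)

	var candidates []string
	for t, c := range counts {
		ratio := float64(c) / float64(total)
		if ratio <= noiseThreshold {
			continue // ignore spurious rows for schema construction
		}
		if ratio > conflictThreshold {
			candidates = append(candidates, t)
		}
	}
	if len(candidates) == 1 {
		return candidates[0]
	}
	// Zero or several significant types: map the column to STRING.
	return "String"
}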

Data Conversion

Scanning the Entire Table

Data conversion proceeds table by table. For each table, we use the Scan API to read data. Each read returns at most 1MB of data; using the returned token, we make subsequent calls to continue retrieving data from the table.
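
A hedged sketch of that pagination loop with the AWS Go SDK v1 (the per-item callback is illustrative):

// scanTable reads all items from srcTable. Each Scan call returns at most
// 1MB of data; LastEvaluatedKey is fed back in as ExclusiveStartKey until
// the table is exhausted.
func scanTable(client *dynamodb.DynamoDB, srcTable string,
	process func(map[string]*dynamodb.AttributeValue)) error {
	var lastKey map[string]*dynamodb.AttributeValue
	for {
		out, err := client.Scan(&dynamodb.ScanInput{
			TableName:         aws.String(srcTable),
			ExclusiveStartKey: lastKey, // nil on the first call
		})
		if err != nil {
			return err
		}
		for _, item := range out.Items {
			process(item) // each item carries both the type and the value
		}
		lastKey = out.LastEvaluatedKey
		if len(lastKey) == 0 {
			return nil // no more pages
		}
	}
}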

Each returned row contains the data type as well as the data itself. Based on our inferred schema, we parse the row into a format that Cloud Spanner supports. If value parsing fails, we drop the entire row and record it as bad data in the report. If a column is absent or has a NULL data type, we treat it as a NULL value in Cloud Spanner.

Documentation

Overview

Copyright 2022 Google LLC

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Package dynamodb handles schema and data migrations from DynamoDB.

Index

Constants

View Source
const (
	ESC = 27
)

Variables

This section is empty.

Functions

func NewDynamoDBStream

func NewDynamoDBStream(client dynamodbiface.DynamoDBAPI, srcTable string) (string, error)

NewDynamoDBStream initializes a new DynamoDB Stream for a table with the NEW_AND_OLD_IMAGES StreamViewType. If a stream already exists for the table, it must be of type NEW_IMAGE or NEW_AND_OLD_IMAGES; otherwise streaming changes for the table won't be captured. It returns the table's latest stream ARN, along with any error encountered.

func ProcessDataRow

func ProcessDataRow(m map[string]*dynamodb.AttributeValue, conv *internal.Conv, tableId string, srcSchema schema.Table, colIds []string, spSchema ddl.CreateTable)

func ProcessRecord

func ProcessRecord(conv *internal.Conv, streamInfo *StreamingInfo, record *dynamodbstreams.Record, srcTable string)

ProcessRecord processes records retrieved from shards. It first converts the data to Spanner data (based on the source and Spanner schemas), and then writes that data to Cloud Spanner.

func ProcessShard

func ProcessShard(wgShard *sync.WaitGroup, streamInfo *StreamingInfo, conv *internal.Conv, streamClient dynamodbstreamsiface.DynamoDBStreamsAPI, shard *dynamodbstreams.Shard, streamArn, srcTable string)

ProcessShard processes the records within a shard, starting from the first unexpired record. It doesn't start processing until the parent shard has been processed. For closed shards, processing completes after all records have been handled; for open shards, it keeps polling for new records until the shard is closed or the user requests an exit.

func ProcessStream

func ProcessStream(wgStream *sync.WaitGroup, streamClient dynamodbstreamsiface.DynamoDBStreamsAPI, streamInfo *StreamingInfo, conv *internal.Conv, streamArn, srcTable string)

ProcessStream processes the latest enabled DynamoDB Stream for a table. It searches for shards within the stream and creates a separate worker thread to process the records in each shard.

Types

type InfoSchemaImpl

type InfoSchemaImpl struct {
	DynamoClient        dynamodbiface.DynamoDBAPI
	DynamoStreamsClient dynamodbstreamsiface.DynamoDBStreamsAPI
	SampleSize          int64
}
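
As a hedged usage sketch, an InfoSchemaImpl can be assembled from AWS SDK v1 clients; the import path of this package is assumed for illustration and error handling is elided:

import (
	"github.com/aws/aws-sdk-go/aws/session"
	awsdynamodb "github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodbstreams"

	// Import path assumed for illustration.
	"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/dynamodb"
)

func newInfoSchema() dynamodb.InfoSchemaImpl {
	sess := session.Must(session.NewSession())
	return dynamodb.InfoSchemaImpl{
		DynamoClient:        awsdynamodb.New(sess),
		DynamoStreamsClient: dynamodbstreams.New(sess),
		SampleSize:          100000, // rows sampled per table for schema inference
	}
}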

func (InfoSchemaImpl) GetColumns

func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error)

func (InfoSchemaImpl) GetConstraints

func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, constraints map[string][]string, err error)

func (InfoSchemaImpl) GetForeignKeys

func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error)

func (InfoSchemaImpl) GetIndexes

func (isi InfoSchemaImpl) GetIndexes(conv *internal.Conv, table common.SchemaAndName, colNameIdMap map[string]string) (indexes []schema.Index, err error)

func (InfoSchemaImpl) GetRowCount

func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error)

func (InfoSchemaImpl) GetRowsFromTable

func (isi InfoSchemaImpl) GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error)

func (InfoSchemaImpl) GetTableName

func (isi InfoSchemaImpl) GetTableName(schema string, tableName string) string

func (InfoSchemaImpl) GetTables

func (isi InfoSchemaImpl) GetTables() ([]common.SchemaAndName, error)

func (InfoSchemaImpl) GetToDdl

func (isi InfoSchemaImpl) GetToDdl() common.ToDdl

func (InfoSchemaImpl) ProcessData

func (isi InfoSchemaImpl) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, colIds []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error

ProcessData performs data conversion for DynamoDB database. For each table, we extract data using Scan requests, convert the data to Spanner data (based on the source and Spanner schemas), and write it to Spanner. If we can't get/process data for a table, we skip that table and process the remaining tables.

func (InfoSchemaImpl) StartChangeDataCapture

func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error)

StartChangeDataCapture initializes the DynamoDB Streams for the source database. It returns the latestStreamArn for all tables in the source database.

func (InfoSchemaImpl) StartStreamingMigration

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (internal.DataflowOutput, error)

StartStreamingMigration starts the streaming migration process by creating a separate worker thread/goroutine for each table's DynamoDB Stream. It catches the Ctrl+C signal so the user can stop the process.

type StreamingInfo

type StreamingInfo struct {
	Records        map[string]map[string]int64 // Tablewise count of records received from DynamoDB Streams, broken down by record type i.e. INSERT, MODIFY & REMOVE.
	BadRecords     map[string]map[string]int64 // Tablewise count of records not converted successfully, broken down by record type.
	DroppedRecords map[string]map[string]int64 // Tablewise count of records successfully converted but failed to be written to Spanner, broken down by record type.

	ShardProcessed map[string]bool  // Processing status of a shard, (default false i.e. unprocessed).
	UserExit       bool             // Flag indicating whether the user has requested an exit (false until the user presses Ctrl+C).
	Unexpecteds    map[string]int64 // Count of unexpected conditions, broken down by condition description.

	SampleBadRecords []string // Records that generated errors during conversion.
	SampleBadWrites  []string // Records that faced errors while writing to Cloud Spanner.
	// contains filtered or unexported fields
}

StreamingInfo contains information related to processing of DynamoDB Streams.

func MakeStreamingInfo

func MakeStreamingInfo() *StreamingInfo

func (*StreamingInfo) CollectBadRecord

func (info *StreamingInfo) CollectBadRecord(recordType, srcTable string, srcCols []string, vals []string)

CollectBadRecord collects a record if it is not successfully converted to Cloud Spanner supported data types.

func (*StreamingInfo) CollectDroppedRecord

func (info *StreamingInfo) CollectDroppedRecord(recordType, spTable string, spCols []string, spVals []interface{}, err error)

CollectDroppedRecord collects a record if it encounters an error while being written to Cloud Spanner.

func (*StreamingInfo) SetShardStatus

func (info *StreamingInfo) SetShardStatus(shardId string, status bool)

SetShardStatus changes the processing status of a shard.

true -> shard processed and vice versa.

func (*StreamingInfo) StatsAddBadRecord

func (info *StreamingInfo) StatsAddBadRecord(srcTable, recordType string)

StatsAddBadRecord increases the count of records which are not successfully converted to Cloud Spanner supported data types based on the table name and record type.

func (*StreamingInfo) StatsAddDroppedRecord

func (info *StreamingInfo) StatsAddDroppedRecord(srcTable, recordType string)

StatsAddDroppedRecord increases the count of records which failed while writing to Cloud Spanner based on the table name and record type.

func (*StreamingInfo) StatsAddRecord

func (info *StreamingInfo) StatsAddRecord(srcTable, recordType string)

StatsAddRecord increases the count of records read from DynamoDB Streams based on the table name and record type.

func (*StreamingInfo) StatsAddRecordProcessed

func (info *StreamingInfo) StatsAddRecordProcessed()

StatsAddRecordProcessed increases the count of total records processed to Cloud Spanner.

func (*StreamingInfo) TotalUnexpecteds

func (info *StreamingInfo) TotalUnexpecteds() int64

TotalUnexpecteds returns the total number of distinct unexpected conditions encountered during processing of DynamoDB Streams.

func (*StreamingInfo) Unexpected

func (info *StreamingInfo) Unexpected(u string)

Unexpected records stats about corner-cases and conditions that were not expected.

type ToDdlImpl

type ToDdlImpl struct {
}

ToDdlImpl provides the ToDdl implementation for DynamoDB.

func (ToDdlImpl) ToSpannerType

func (tdi ToDdlImpl) ToSpannerType(conv *internal.Conv, spType string, srcType schema.Type) (ddl.Type, []internal.SchemaIssue)

Functions below implement the common.ToDdl interface. ToSpannerType maps a scalar source schema type (defined by id and mods) into a Spanner type; this is the core source-to-Spanner type mapping. It returns the Spanner type and a list of type conversion issues encountered.
