跳到内容

迁移指南

将工作负载迁移到 Delta Lake 时,您应该了解与 Apache Spark 和 Apache Hive 提供的数据源相比的以下简化和差异。

Delta Lake 会自动处理以下操作,您不应手动执行这些操作

  • 添加和删除分区:Delta Lake 会自动跟踪表中存在的分区集,并在添加或删除数据时更新列表。因此,无需运行 ALTER TABLE [ADD|DROP] PARTITIONMSCK
  • 加载单个分区:作为优化,您有时可能会直接加载您感兴趣的数据分区。例如,spark.read.format("parquet").load("/data/date=2017-01-01")。这对于 Delta Lake 来说是不必要的,因为它可以通过事务日志快速读取文件列表以找到相关文件。如果您对单个分区感兴趣,请使用 WHERE 子句指定它。例如,spark.read.delta("/data").where("date = '2017-01-01'")。对于分区中包含许多文件的大型表,这可能比从 Parquet 表加载单个分区(使用直接分区路径或使用 WHERE)快得多,因为列出目录中的文件通常比从事务日志读取文件列表慢。

将现有应用程序移植到 Delta Lake 时,应避免以下绕过事务日志的操作

  • 手动修改数据:Delta Lake 使用事务日志以原子方式提交对表的更改。由于日志是真实来源,因此 Spark 不会读取已写入但未添加到事务日志的文件。同样,即使您手动删除文件,事务日志中仍然存在指向该文件的指针。不要手动修改存储在 Delta 表中的文件,而应始终使用本指南中描述的命令。
  • 外部读取器:直接读取存储在 Delta Lake 中的数据。有关如何读取 Delta 表的信息,请参阅读取表

假设您有一个名为 /data-pipeline 的目录中存储的 Parquet 数据,并且您想要创建一个名为 events 的 Delta 表。

第一个示例演示了如何

  • 从其原始位置 /data-pipeline 读取 Parquet 数据到 DataFrame 中。
  • 将 DataFrame 的内容以 Delta 格式保存在单独的位置 /tmp/delta/data-pipeline/ 中。
  • 根据该单独位置 /tmp/delta/data-pipeline/ 创建 events 表。

第二个示例演示了如何使用 CONVERT TO TABLE 将数据从 Parquet 转换为 Delta 格式,而无需更改其原始位置 /data-pipeline/

  1. 将 Parquet 数据读取到 DataFrame 中,然后将 DataFrame 的内容以 delta 格式保存到新目录中

    data = spark.read.format("parquet").load("/data-pipeline")
    data.write.format("delta").save("/tmp/delta/data-pipeline/")
  2. 创建名为 events 的 Delta 表,该表引用新目录中的文件

    spark.sql("CREATE TABLE events USING DELTA LOCATION '/tmp/delta/data-pipeline/'")

您有两种将 Parquet 表转换为 Delta 表的选项

  • 将文件转换为 Delta Lake 格式,然后创建 Delta 表

    CONVERT TO DELTA parquet.`/data-pipeline/`
    CREATE TABLE events USING DELTA LOCATION '/data-pipeline/'
  • 创建 Parquet 表,然后将其转换为 Delta 表

    CREATE TABLE events USING PARQUET OPTIONS (path '/data-pipeline/')
    CONVERT TO DELTA events

有关详细信息,请参阅将 Parquet 表转换为 Delta 表

将 Delta Lake 工作负载迁移到更高版本

标题为“将 Delta Lake 工作负载迁移到更高版本”的部分

本节讨论从旧版本 Delta Lake 迁移到新版本时用户代码可能需要的任何更改。

Delta Lake 3.0 以下版本到 Delta Lake 3.0 或以上版本

标题为“Delta Lake 3.0 以下版本到 Delta Lake 3.0 或以上版本”的部分

请注意,Spark Maven 工件上的 Delta Lake 已从 delta-core(3.0 之前)重命名为 delta-spark(3.0 及以上)。

Delta Lake 2.1.1 或以下版本到 Delta Lake 2.2 或以上版本

标题为“Delta Lake 2.1.1 或以下版本到 Delta Lake 2.2 或以上版本”的部分

Delta Lake 2.2 在将 parquet 表转换为 Delta Lake 表(例如使用 CONVERT TO DELTA 命令)时默认收集统计信息。要选择不收集统计信息并恢复到 2.1.1 或更低版本的默认行为,请使用 NO STATISTICS SQL API(例如 CONVERT TO DELTA parquet./path-to-table NO STATISTICS

Delta Lake 1.2.1、2.0.0 或 2.1.0 到 Delta Lake 2.0.1、2.1.1 或以上版本

标题为“Delta Lake 1.2.1、2.0.0 或 2.1.0 到 Delta Lake 2.0.1、2.1.1 或以上版本”的部分

Delta Lake 1.2.1、2.0.0 和 2.1.0 在基于 DynamoDB 的 S3 多集群配置实现中存在一个错误,即向 DynamoDB 写入了不正确的时间戳值。这导致 DynamoDB 的 TTL 功能在安全清理完成的项目之前就进行了清理。此问题已在 Delta Lake 2.0.1 和 2.1.1 版本中修复,并且 TTL 属性已从 commitTime 重命名为 expireTime

如果您已经在 DynamoDB 表上使用旧属性启用了 TTL,则需要禁用该属性的 TTL,然后为新属性启用它。您可能需要在这两个操作之间等待一小时,因为 TTL 设置更改可能需要一些时间才能传播。请参阅此处 DynamoDB 文档。如果您不这样做,DynamoDB 的 TTL 功能将不会删除任何新的和已过期的条目。没有数据丢失的风险。

终端窗口
# Disable TTL on old attribute
aws dynamodb update-time-to-live \
--region <region> \
--table-name <table-name> \
--time-to-live-specification "Enabled=false, AttributeName=commitTime"
# Enable TTL on new attribute
aws dynamodb update-time-to-live \
--region <region> \
--table-name <table-name> \
--time-to-live-specification "Enabled=true, AttributeName=expireTime"

Delta Lake 2.0 或以下版本到 Delta Lake 2.1 或以上版本

标题为“Delta Lake 2.0 或以下版本到 Delta Lake 2.1 或以上版本”的部分

在对目录表调用 CONVERT TO DELTA 时,Delta Lake 2.1 从目录中推断数据模式。在 2.0 及以下版本中,Delta Lake 从数据中推断数据模式。这意味着在 Delta 2.1 中,未在原始目录表中定义的数据列将不会出现在转换后的 Delta 表中。可以通过将 Spark 会话配置 spark.databricks.delta.convert.useCatalogSchema=false 设置为 false 来禁用此行为。

Delta Lake 1.2 或以下版本到 Delta Lake 2.0 或以上版本

标题为“Delta Lake 1.2 或以下版本到 Delta Lake 2.0 或以上版本”的部分

Delta Lake 2.0.0 对 DROP CONSTRAINT 引入了行为更改。在 1.2 及以下版本中,尝试删除不存在的约束时不会抛出错误。在 2.0.0 及以上版本中,行为更改为抛出约束不存在错误。为避免此错误,请使用 IF EXISTS 构造(例如,ALTER TABLE events DROP CONSTRAINT IF EXISTS constraint_name)。删除现有约束的行为没有变化。

Delta Lake 2.0.0 引入了对动态分区覆盖的支持。在 1.2 及以下版本中,在 Spark 会话配置或 DataFrameWriter 选项中启用动态分区覆盖模式是空操作,并且 overwrite 模式下的写入会替换表中每个分区中的所有现有数据。在 2.0.0 及以上版本中,启用动态分区覆盖模式时,Delta Lake 会替换写入将提交新数据的每个逻辑分区中的所有现有数据。

Delta Lake 1.1 或以下版本到 Delta Lake 1.2 或以上版本

标题为“Delta Lake 1.1 或以下版本到 Delta Lake 1.2 或以上版本”的部分

由于 issue #951,为了更好地管理代码,LogStore 相关代码已从 delta-core Maven 模块中提取到一个新模块 delta-storage 中。这导致 delta-core 依赖于额外的 JAR delta-storage-<version>.jar。默认情况下,额外的 JAR 作为 delta-core-<version>_<scala-version>.jar 依赖的一部分进行下载。在没有互联网连接的集群中,无法下载 delta-storage-<version>.jar。建议手动下载 delta-storage-<version>.jar 并将其放置在 Java 类路径中。

Delta Lake 1.0 或以下版本到 Delta Lake 1.1 或以上版本

标题为“Delta Lake 1.0 或以下版本到 Delta Lake 1.1 或以上版本”的部分

如果 Delta 表中的分区列名称包含无效字符( ,;{}()\n\t=),由于 SPARK-36271,您无法在 Delta Lake 1.1 及更高版本中读取它。但是,这种情况应该很少发生,因为您无法使用 Delta Lake 0.6 及更高版本创建此类表。如果您仍然有此类旧表,您可以在将 Delta Lake 升级到 1.1 及更高版本之前,使用 Delta Lake 1.0 及更低版本使用新的有效列名覆盖您的表,例如以下操作

spark.read \
.format("delta") \
.load("/the/delta/table/path") \
.withColumnRenamed("column name", "column-name") \
.write \
.format("delta")\
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/the/delta/table/path")

Delta Lake 0.6 或以下版本到 Delta Lake 0.7 或以上版本

标题为“Delta Lake 0.6 或以下版本到 Delta Lake 0.7 或以上版本”的部分

如果您在 Scala、Java 或 Python 中使用 DeltaTable API 来更新运行实用操作,那么您在创建用于执行这些操作的 SparkSession 时可能需要添加以下配置。

from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("...") \
.master("...") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

或者,您可以在使用 spark-submit 提交 Spark 应用程序时或在启动 spark-shell/pyspark 时,通过将它们指定为命令行参数来添加其他配置。

终端窗口
spark-submit \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
...
终端窗口
pyspark \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
...