迁移指南
将工作负载迁移到 Delta Lake
标题为“将工作负载迁移到 Delta Lake”的部分将工作负载迁移到 Delta Lake 时,您应该了解与 Apache Spark 和 Apache Hive 提供的数据源相比的以下简化和差异。
Delta Lake 会自动处理以下操作,您不应手动执行这些操作
- 添加和删除分区:Delta Lake 会自动跟踪表中存在的分区集,并在添加或删除数据时更新列表。因此,无需运行
ALTER TABLE [ADD|DROP] PARTITION
或MSCK
。 - 加载单个分区:作为优化,您有时可能会直接加载您感兴趣的数据分区。例如,
spark.read.format("parquet").load("/data/date=2017-01-01")
。这对于 Delta Lake 来说是不必要的,因为它可以通过事务日志快速读取文件列表以找到相关文件。如果您对单个分区感兴趣,请使用WHERE
子句指定它。例如,spark.read.delta("/data").where("date = '2017-01-01'")
。对于分区中包含许多文件的大型表,这可能比从 Parquet 表加载单个分区(使用直接分区路径或使用WHERE
)快得多,因为列出目录中的文件通常比从事务日志读取文件列表慢。
将现有应用程序移植到 Delta Lake 时,应避免以下绕过事务日志的操作
- 手动修改数据:Delta Lake 使用事务日志以原子方式提交对表的更改。由于日志是真实来源,因此 Spark 不会读取已写入但未添加到事务日志的文件。同样,即使您手动删除文件,事务日志中仍然存在指向该文件的指针。不要手动修改存储在 Delta 表中的文件,而应始终使用本指南中描述的命令。
- 外部读取器:直接读取存储在 Delta Lake 中的数据。有关如何读取 Delta 表的信息,请参阅读取表。
假设您有一个名为 /data-pipeline
的目录中存储的 Parquet 数据,并且您想要创建一个名为 events
的 Delta 表。
第一个示例演示了如何
- 从其原始位置
/data-pipeline
读取 Parquet 数据到 DataFrame 中。 - 将 DataFrame 的内容以 Delta 格式保存在单独的位置
/tmp/delta/data-pipeline/
中。 - 根据该单独位置
/tmp/delta/data-pipeline/
创建events
表。
第二个示例演示了如何使用 CONVERT TO TABLE
将数据从 Parquet 转换为 Delta 格式,而无需更改其原始位置 /data-pipeline/
。
另存为 Delta 表
标题为“另存为 Delta 表”的部分-
将 Parquet 数据读取到 DataFrame 中,然后将 DataFrame 的内容以
delta
格式保存到新目录中data = spark.read.format("parquet").load("/data-pipeline")data.write.format("delta").save("/tmp/delta/data-pipeline/") -
创建名为
events
的 Delta 表,该表引用新目录中的文件spark.sql("CREATE TABLE events USING DELTA LOCATION '/tmp/delta/data-pipeline/'")
转换为 Delta 表
标题为“转换为 Delta 表”的部分您有两种将 Parquet 表转换为 Delta 表的选项
-
将文件转换为 Delta Lake 格式,然后创建 Delta 表
CONVERT TO DELTA parquet.`/data-pipeline/`CREATE TABLE events USING DELTA LOCATION '/data-pipeline/' -
创建 Parquet 表,然后将其转换为 Delta 表
CREATE TABLE events USING PARQUET OPTIONS (path '/data-pipeline/')CONVERT TO DELTA events
有关详细信息,请参阅将 Parquet 表转换为 Delta 表。
将 Delta Lake 工作负载迁移到更高版本
标题为“将 Delta Lake 工作负载迁移到更高版本”的部分本节讨论从旧版本 Delta Lake 迁移到新版本时用户代码可能需要的任何更改。
Delta Lake 3.0 以下版本到 Delta Lake 3.0 或以上版本
标题为“Delta Lake 3.0 以下版本到 Delta Lake 3.0 或以上版本”的部分请注意,Spark Maven 工件上的 Delta Lake 已从 delta-core
(3.0 之前)重命名为 delta-spark
(3.0 及以上)。
Delta Lake 2.1.1 或以下版本到 Delta Lake 2.2 或以上版本
标题为“Delta Lake 2.1.1 或以下版本到 Delta Lake 2.2 或以上版本”的部分Delta Lake 2.2 在将 parquet 表转换为 Delta Lake 表(例如使用 CONVERT TO DELTA
命令)时默认收集统计信息。要选择不收集统计信息并恢复到 2.1.1 或更低版本的默认行为,请使用 NO STATISTICS
SQL API(例如 CONVERT TO DELTA parquet.
/path-to-table NO STATISTICS
)
Delta Lake 1.2.1、2.0.0 或 2.1.0 到 Delta Lake 2.0.1、2.1.1 或以上版本
标题为“Delta Lake 1.2.1、2.0.0 或 2.1.0 到 Delta Lake 2.0.1、2.1.1 或以上版本”的部分Delta Lake 1.2.1、2.0.0 和 2.1.0 在基于 DynamoDB 的 S3 多集群配置实现中存在一个错误,即向 DynamoDB 写入了不正确的时间戳值。这导致 DynamoDB 的 TTL 功能在安全清理完成的项目之前就进行了清理。此问题已在 Delta Lake 2.0.1 和 2.1.1 版本中修复,并且 TTL 属性已从 commitTime
重命名为 expireTime
。
如果您已经在 DynamoDB 表上使用旧属性启用了 TTL,则需要禁用该属性的 TTL,然后为新属性启用它。您可能需要在这两个操作之间等待一小时,因为 TTL 设置更改可能需要一些时间才能传播。请参阅此处 DynamoDB 文档。如果您不这样做,DynamoDB 的 TTL 功能将不会删除任何新的和已过期的条目。没有数据丢失的风险。
# Disable TTL on old attributeaws dynamodb update-time-to-live \ --region <region> \ --table-name <table-name> \ --time-to-live-specification "Enabled=false, AttributeName=commitTime"
# Enable TTL on new attributeaws dynamodb update-time-to-live \ --region <region> \ --table-name <table-name> \ --time-to-live-specification "Enabled=true, AttributeName=expireTime"
Delta Lake 2.0 或以下版本到 Delta Lake 2.1 或以上版本
标题为“Delta Lake 2.0 或以下版本到 Delta Lake 2.1 或以上版本”的部分在对目录表调用 CONVERT TO DELTA
时,Delta Lake 2.1 从目录中推断数据模式。在 2.0 及以下版本中,Delta Lake 从数据中推断数据模式。这意味着在 Delta 2.1 中,未在原始目录表中定义的数据列将不会出现在转换后的 Delta 表中。可以通过将 Spark 会话配置 spark.databricks.delta.convert.useCatalogSchema=false
设置为 false 来禁用此行为。
Delta Lake 1.2 或以下版本到 Delta Lake 2.0 或以上版本
标题为“Delta Lake 1.2 或以下版本到 Delta Lake 2.0 或以上版本”的部分Delta Lake 2.0.0 对 DROP CONSTRAINT 引入了行为更改。在 1.2 及以下版本中,尝试删除不存在的约束时不会抛出错误。在 2.0.0 及以上版本中,行为更改为抛出约束不存在错误。为避免此错误,请使用 IF EXISTS
构造(例如,ALTER TABLE events DROP CONSTRAINT IF EXISTS constraint_name
)。删除现有约束的行为没有变化。
Delta Lake 2.0.0 引入了对动态分区覆盖的支持。在 1.2 及以下版本中,在 Spark 会话配置或 DataFrameWriter
选项中启用动态分区覆盖模式是空操作,并且 overwrite
模式下的写入会替换表中每个分区中的所有现有数据。在 2.0.0 及以上版本中,启用动态分区覆盖模式时,Delta Lake 会替换写入将提交新数据的每个逻辑分区中的所有现有数据。
Delta Lake 1.1 或以下版本到 Delta Lake 1.2 或以上版本
标题为“Delta Lake 1.1 或以下版本到 Delta Lake 1.2 或以上版本”的部分由于 issue #951,为了更好地管理代码,LogStore 相关代码已从 delta-core
Maven 模块中提取到一个新模块 delta-storage
中。这导致 delta-core
依赖于额外的 JAR delta-storage-<version>.jar
。默认情况下,额外的 JAR 作为 delta-core-<version>_<scala-version>.jar
依赖的一部分进行下载。在没有互联网连接的集群中,无法下载 delta-storage-<version>.jar
。建议手动下载 delta-storage-<version>.jar
并将其放置在 Java 类路径中。
Delta Lake 1.0 或以下版本到 Delta Lake 1.1 或以上版本
标题为“Delta Lake 1.0 或以下版本到 Delta Lake 1.1 或以上版本”的部分如果 Delta 表中的分区列名称包含无效字符( ,;{}()\n\t=
),由于 SPARK-36271,您无法在 Delta Lake 1.1 及更高版本中读取它。但是,这种情况应该很少发生,因为您无法使用 Delta Lake 0.6 及更高版本创建此类表。如果您仍然有此类旧表,您可以在将 Delta Lake 升级到 1.1 及更高版本之前,使用 Delta Lake 1.0 及更低版本使用新的有效列名覆盖您的表,例如以下操作
spark.read \ .format("delta") \ .load("/the/delta/table/path") \ .withColumnRenamed("column name", "column-name") \ .write \ .format("delta")\ .mode("overwrite") \ .option("overwriteSchema", "true") \ .save("/the/delta/table/path")
spark.read .format("delta") .load("/the/delta/table/path") .withColumnRenamed("column name", "column-name") .write .format("delta") .mode("overwrite") .option("overwriteSchema", "true") .save("/the/delta/table/path")
Delta Lake 0.6 或以下版本到 Delta Lake 0.7 或以上版本
标题为“Delta Lake 0.6 或以下版本到 Delta Lake 0.7 或以上版本”的部分如果您在 Scala、Java 或 Python 中使用 DeltaTable
API 来更新或运行实用操作,那么您在创建用于执行这些操作的 SparkSession
时可能需要添加以下配置。
from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .appName("...") \ .master("...") \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ .getOrCreate()
import org.apache.spark.sql.SparkSession
val spark = SparkSession .builder() .appName("...") .master("...") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession .builder() .appName("...") .master("...") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate();
或者,您可以在使用 spark-submit
提交 Spark 应用程序时或在启动 spark-shell
/pyspark
时,通过将它们指定为命令行参数来添加其他配置。
spark-submit \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \ ...
pyspark \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \ ...