表批处理读写
Delta Lake 支持 Apache Spark DataFrame 读写 API 提供的大多数选项,用于对表执行批量读写操作。
对于表上的许多 Delta Lake 操作,您可以通过在创建新的 SparkSession
时设置配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 3.0 版起)的集成。请参阅配置 SparkSession。
创建表
标题为“创建表”的部分Delta Lake 支持创建两种类型的表——元数据存储中定义的表和按路径定义的表。
要使用元数据存储定义的表,您必须通过在创建新的 SparkSession
时设置配置来启用与 Apache Spark DataSourceV2 和 Catalog API 的集成。请参阅配置 SparkSession。
您可以通过以下方式创建表:
-
SQL DDL 命令:您可以使用 Apache Spark 中支持的标准 SQL DDL 命令(例如,
CREATE TABLE
和REPLACE TABLE
)来创建 Delta 表。CREATE TABLE IF NOT EXISTS default.people10m (id INT,firstName STRING,middleName STRING,lastName STRING,gender STRING,birthDate TIMESTAMP,ssn STRING,salary INT) USING DELTACREATE OR REPLACE TABLE default.people10m (id INT,firstName STRING,middleName STRING,lastName STRING,gender STRING,birthDate TIMESTAMP,ssn STRING,salary INT) USING DELTASQL 还支持在路径上创建表,而无需在 Hive 元数据存储中创建条目。
-- Create or replace table with pathCREATE OR REPLACE TABLE delta.`/tmp/delta/people10m` (id INT,firstName STRING,middleName STRING,lastName STRING,gender STRING,birthDate TIMESTAMP,ssn STRING,salary INT) USING DELTA -
DataFrameWriter
API:如果您想同时创建表并从 Spark DataFrames 或 Datasets 插入数据,您可以使用 SparkDataFrameWriter
(Scala 或 Java 和 Python)。# Create table in the metastore using DataFrame's schema and write data to itdf.write.format("delta").saveAsTable("default.people10m")# Create or replace partitioned table with path using DataFrame's schema and write/overwrite data to itdf.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")// Create table in the metastore using DataFrame's schema and write data to itdf.write.format("delta").saveAsTable("default.people10m")// Create table with path using DataFrame's schema and write data to itdf.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")您还可以使用 Spark
DataFrameWriterV2
API 创建 Delta 表。 -
DeltaTableBuilder
API:您还可以使用 Delta Lake 中的DeltaTableBuilder
API 来创建表。与 DataFrameWriter API 相比,此 API 更易于指定其他信息,例如列注释、表属性和生成列。
有关详细信息,请参阅API 文档。
分区数据
标题为“分区数据”的部分您可以对数据进行分区,以加快包含分区列谓词的查询或 DML。要在创建 Delta 表时对数据进行分区,请指定按列分区。以下示例按性别进行分区。
-- Create table in the metastoreCREATE TABLE default.people10m ( id INT, firstName STRING, middleName STRING, lastName STRING, gender STRING, birthDate TIMESTAMP, ssn STRING, salary INT)USING DELTAPARTITIONED BY (gender)
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")
DeltaTable.create(spark) \ .tableName("default.people10m") \ .addColumn("id", "INT") \ .addColumn("firstName", "STRING") \ .addColumn("middleName", "STRING") \ .addColumn("lastName", "STRING", comment = "surname") \ .addColumn("gender", "STRING") \ .addColumn("birthDate", "TIMESTAMP") \ .addColumn("ssn", "STRING") \ .addColumn("salary", "INT") \ .partitionedBy("gender") \ .execute()
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")
DeltaTable.createOrReplace(spark) .tableName("default.people10m") .addColumn("id", "INT") .addColumn("firstName", "STRING") .addColumn("middleName", "STRING") .addColumn( DeltaTable.columnBuilder("lastName") .dataType("STRING") .comment("surname") .build()) .addColumn("lastName", "STRING", comment = "surname") .addColumn("gender", "STRING") .addColumn("birthDate", "TIMESTAMP") .addColumn("ssn", "STRING") .addColumn("salary", "INT") .partitionedBy("gender") .execute()
要确定表是否包含特定分区,请使用语句 SELECT COUNT(*) > 0 FROM <table-name> WHERE <partition-column> = <value>
。如果分区存在,则返回 true
。例如:
SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = "M"
display(spark.sql("SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = 'M'"))
display(spark.sql("SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = 'M'"))
控制数据位置
标题为“控制数据位置”的部分对于在元数据存储中定义的表,您可以选择将 LOCATION
指定为路径。使用指定 LOCATION
创建的表被视为由元数据存储非托管。与未指定路径的托管表不同,当您 DROP
表时,非托管表的文件不会被删除。
当您运行 CREATE TABLE
并且 LOCATION
已经包含使用 Delta Lake 存储的数据时,Delta Lake 执行以下操作:
-
如果您仅指定表名和位置,例如:
CREATE TABLE default.people10mUSING DELTALOCATION '/tmp/delta/people10m'元数据存储中的表会自动继承现有数据的模式、分区和表属性。此功能可用于将数据“导入”元数据存储。
-
如果您指定任何配置(模式、分区或表属性),Delta Lake 会验证该规范是否与现有数据的配置完全匹配。
使用生成列
标题为“使用生成列”的部分Delta Lake 支持生成列,这是一种特殊类型的列,其值根据用户指定的 Delta 表中其他列上的函数自动生成。当您写入具有生成列的表而未明确为其提供值时,Delta Lake 会自动计算这些值。例如,您可以从时间戳列自动生成日期列(用于按日期对表进行分区);对表的任何写入只需指定时间戳列的数据。但是,如果您明确为其提供值,则这些值必须满足约束 (<value> <=> <generation expression>) IS TRUE
,否则写入将失败并出现错误。
以下示例展示了如何创建具有生成列的表:
DeltaTable.create(spark) \ .tableName("default.people10m") \ .addColumn("id", "INT") \ .addColumn("firstName", "STRING") \ .addColumn("middleName", "STRING") \ .addColumn("lastName", "STRING", comment = "surname") \ .addColumn("gender", "STRING") \ .addColumn("birthDate", "TIMESTAMP") \ .addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \ .addColumn("ssn", "STRING") \ .addColumn("salary", "INT") \ .partitionedBy("gender") \ .execute()
DeltaTable.create(spark) .tableName("default.people10m") .addColumn("id", "INT") .addColumn("firstName", "STRING") .addColumn("middleName", "STRING") .addColumn( DeltaTable.columnBuilder("lastName") .dataType("STRING") .comment("surname") .build()) .addColumn("lastName", "STRING", comment = "surname") .addColumn("gender", "STRING") .addColumn("birthDate", "TIMESTAMP") .addColumn( DeltaTable.columnBuilder("dateOfBirth") .dataType(DateType) .generatedAlwaysAs("CAST(dateOfBirth AS DATE)") .build()) .addColumn("ssn", "STRING") .addColumn("salary", "INT") .partitionedBy("gender") .execute()
生成列像普通列一样存储。也就是说,它们占用存储空间。
以下限制适用于生成列:
- 生成表达式可以使用 Spark 中任何在给定相同参数值时始终返回相同结果的 SQL 函数,除了以下类型的函数:
- 用户定义的函数。
- 聚合函数。
- 窗口函数。
- 返回多行的函数。
- 对于 Delta Lake 1.1.0 及更高版本,当您将
spark.databricks.delta.schema.autoMerge.enabled
设置为 true 时,MERGE
操作支持生成列。
每当分区列由以下表达式之一定义时,Delta Lake 都可以为查询生成分区筛选器:
CAST(col AS DATE)
且col
的类型为TIMESTAMP
。YEAR(col)
且col
的类型为TIMESTAMP
。- 由
YEAR(col), MONTH(col)
定义的两个分区列,且col
的类型为TIMESTAMP
。 - 由
YEAR(col), MONTH(col), DAY(col)
定义的三个分区列,且col
的类型为TIMESTAMP
。 - 由
YEAR(col), MONTH(col), DAY(col), HOUR(col)
定义的四个分区列,且col
的类型为TIMESTAMP
。 SUBSTRING(col, pos, len)
且col
的类型为STRING
DATE_FORMAT(col, format)
且col
的类型为TIMESTAMP
。DATE_TRUNC(format, col)
且col
的类型为TIMESTAMP
或DATE
。TRUNC(col, format)
且col
的类型为TIMESTAMP
或DATE
。
如果分区列由上述表达式之一定义,并且查询使用生成表达式的基础基列筛选数据,则 Delta Lake 会查看基列和生成列之间的关系,并在可能的情况下根据生成的分区列填充分区筛选器。例如,给定下表:
DeltaTable.create(spark) \ .tableName("default.events") \ .addColumn("eventId", "BIGINT") \ .addColumn("data", "STRING") \ .addColumn("eventType", "STRING") \ .addColumn("eventTime", "TIMESTAMP") \ .addColumn("eventDate", "DATE", generatedAlwaysAs="CAST(eventTime AS DATE)") \ .partitionedBy("eventType", "eventDate") \ .execute()
如果您随后运行以下查询:
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" <= "2020-10-01 12:00:00"')
Delta Lake 会自动生成一个分区筛选器,以便即使未指定分区筛选器,上述查询也只读取分区 date=2020-10-01
中的数据。
再举一个例子,给定下表:
DeltaTable.create(spark) \ .tableName("default.events") \ .addColumn("eventId", "BIGINT") \ .addColumn("data", "STRING") \ .addColumn("eventType", "STRING") \ .addColumn("eventTime", "TIMESTAMP") \ .addColumn("year", "INT", generatedAlwaysAs="YEAR(eventTime)") \ .addColumn("month", "INT", generatedAlwaysAs="MONTH(eventTime)") \ .addColumn("day", "INT", generatedAlwaysAs="DAY(eventTime)") \ .partitionedBy("eventType", "year", "month", "day") \ .execute()
如果您随后运行以下查询:
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" <= "2020-10-01 12:00:00"')
Delta Lake 会自动生成一个分区筛选器,以便即使未指定分区筛选器,上述查询也只读取分区 year=2020/month=10/day=01
中的数据。
您可以使用 EXPLAIN 子句并检查提供的计划,以查看 Delta Lake 是否自动生成了任何分区筛选器。
使用标识列
标题为“使用标识列”的部分Delta Lake 3.3 及更高版本支持 Delta Lake 标识列。它们是一种生成列,为插入表中的每条记录分配唯一值。以下示例显示了如何在创建表命令期间声明标识列:
from delta.tables import DeltaTable, IdentityGeneratorfrom pyspark.sql.types import LongType
DeltaTable.create() .tableName("table_name") .addColumn("id_col1", dataType=LongType(), generatedAlwaysAs=IdentityGenerator()) .addColumn("id_col2", dataType=LongType(), generatedAlwaysAs=IdentityGenerator(start=-1, step=1)) .addColumn("id_col3", dataType=LongType(), generatedByDefaultAs=IdentityGenerator()) .addColumn("id_col4", dataType=LongType(), generatedByDefaultAs=IdentityGenerator(start=-1, step=1)) .execute()
import io.delta.tables.DeltaTableimport org.apache.spark.sql.types.LongType
DeltaTable.create(spark) .tableName("table_name") .addColumn( DeltaTable.columnBuilder(spark, "id_col1") .dataType(LongType) .generatedAlwaysAsIdentity().build()) .addColumn( DeltaTable.columnBuilder(spark, "id_col2") .dataType(LongType) .generatedAlwaysAsIdentity(start = -1L, step = 1L).build()) .addColumn( DeltaTable.columnBuilder(spark, "id_col3") .dataType(LongType) .generatedByDefaultAsIdentity().build()) .addColumn( DeltaTable.columnBuilder(spark, "id_col4") .dataType(LongType) .generatedByDefaultAsIdentity(start = -1L, step = 1L).build()) .execute()
您可以选择指定以下内容:
- 起始值。
- 步长,可以是正数或负数。
起始值和步长都默认为 1
。您不能指定步长为 0
。
标识列分配的值是唯一的,并按指定步长的方向递增,并按指定步长的倍数递增,但不保证是连续的。例如,起始值为 0
且步长为 2
,则所有值都是正偶数,但某些偶数可能会被跳过。
当标识列指定为 generated by default as identity
时,插入操作可以为标识列指定值。将其指定为 generated always as identity
以覆盖手动设置值的能力。
标识列仅支持 LongType
,如果分配的值超出 LongType
支持的范围,则操作将失败。
您可以使用 ALTER TABLE table_name ALTER COLUMN column_name SYNC IDENTITY
将标识列的元数据与实际数据同步。当您将自己的值写入标识列时,它可能不符合元数据。此选项评估状态并更新元数据以与实际数据保持一致。在此命令之后,下一个自动分配的标识值将从 start + (n + 1) * step
开始,其中 n
是满足 start + n * step >= max()
的最小值(对于正步长)。
CTAS 和标识列
标题为“CTAS 和标识列”的部分在使用 CREATE TABLE table_name AS SELECT
(CTAS) 语句时,您无法定义模式、标识列约束或任何其他表规范。
要使用标识列创建新表并用现有数据填充它,请执行以下操作:
- 创建一个具有正确模式的表,包括标识列定义和其他表属性。
- 运行插入操作。
以下示例将标识列定义为 generated by default as identity
。如果插入表中的数据包含标识列的有效值,则使用这些值。
from delta.tables import DeltaTable, IdentityGeneratorfrom pyspark.sql.types import LongType, DateType
DeltaTable.create(spark) .tableName("new_table") .addColumn("id", dataType=LongType(), generatedByDefaultAs=IdentityGenerator(start=5, step=1)) .addColumn("event_date", dataType=DateType()) .addColumn("some_value", dataType=LongType()) .execute()
# Insert records including existing IDsold_table_df = spark.table("old_table").select("id", "event_date", "some_value")old_table_df.write .format("delta") .mode("append") .saveAsTable("new_table")
# Insert records and generate new IDsnew_records_df = spark.table("new_records").select("event_date", "some_value")new_records_df.write .format("delta") .mode("append") .saveAsTable("new_table")
import org.apache.spark.sql.types._import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark) .tableName("new_table") .addColumn( DeltaTable.columnBuilder(spark, "id") .dataType(LongType) .generatedByDefaultAsIdentity(start = 5L, step = 1L) .build()) .addColumn( DeltaTable.columnBuilder(spark, "event_date") .dataType(DateType) .nullable(true) .build()) .addColumn( DeltaTable.columnBuilder(spark, "some_value") .dataType(LongType) .nullable(true) .build()) .execute()
// Insert records including existing IDsval oldTableDF = spark.table("old_table").select("id", "event_date", "some_value")oldTableDF.write .format("delta") .mode("append") .saveAsTable("new_table")
// Insert records and generate new IDsval newRecordsDF = spark.table("new_records").select("event_date", "some_value")newRecordsDF.write .format("delta") .mode("append") .saveAsTable("new_table")
标识列限制
标题为“标识列限制”的部分使用标识列时存在以下限制:
- 不支持在启用标识列的表上进行并发事务。
- 您不能按标识列对表进行分区。
- 您不能
ADD
、REPLACE
或CHANGE
标识列。 - 您不能更新现有记录的标识列的值。
指定列的默认值
标题为“指定列的默认值”的部分Delta 允许为 Delta 表中的列指定默认表达式。当用户在不明确提供某些列的值的情况下写入这些表,或者当他们明确使用 SQL 关键字 DEFAULT
来指定列时,Delta 会自动为这些列生成默认值。有关详细信息,请参阅专门的文档页面。
列名中使用特殊字符
标题为“列名中使用特殊字符”的部分默认情况下,列名不支持空格和 ,;{}()\n\t=
等特殊字符。要在表的列名中包含这些特殊字符,请启用列映射。
默认表属性
标题为“默认表属性”的部分在 SparkSession 中设置的 Delta Lake 配置会覆盖会话中创建的新 Delta Lake 表的默认表属性。SparkSession 中使用的前缀与表属性中使用的配置不同。
Delta Lake 配置 | SparkSession 配置 |
---|---|
delta.<conf> | spark.databricks.delta.properties.defaults.<conf> |
例如,要为会话中创建的所有新 Delta Lake 表设置 delta.appendOnly = true
属性,请设置以下内容:
SET spark.databricks.delta.properties.defaults.appendOnly = true
读取表
标题为“读取表”的部分您可以通过指定表名或路径将 Delta 表加载为 DataFrame:
SELECT * FROM default.people10m -- query table in the metastore
SELECT * FROM delta.`/tmp/delta/people10m` -- query table by path
spark.table("default.people10m") # query table in the metastore
spark.read.format("delta").load("/tmp/delta/people10m") # query table by path
spark.table("default.people10m") // query table in the metastore
spark.read.format("delta").load("/tmp/delta/people10m") // create table by path
import io.delta.implicits._spark.read.delta("/tmp/delta/people10m")
返回的 DataFrame 会自动读取表的最新快照以进行任何查询;您无需运行 REFRESH TABLE
。当查询中存在适用谓词时,Delta Lake 会自动使用分区和统计信息来读取最少的数据。
查询表的旧快照(时间旅行)
标题为“查询表的旧快照(时间旅行)”的部分Delta Lake 时间旅行允许您查询 Delta 表的旧快照。时间旅行有许多用例,包括:
- 重新创建分析、报告或输出(例如,机器学习模型的输出)。这对于调试或审计可能很有用,尤其是在受监管的行业中。
- 编写复杂的时态查询。
- 修复数据中的错误。
- 为一组针对快速变化的查询提供快照隔离。
本节描述了查询表旧版本的受支持方法、数据保留问题,并提供了示例。
本节展示了如何查询 Delta 表的旧版本。
SQL AS OF
语法
标题为“SQL AS OF 语法”的部分SELECT * FROM table_name TIMESTAMP AS OF timestamp_expressionSELECT * FROM table_name VERSION AS OF version
-
timestamp_expression
可以是以下之一:'2018-10-18T22:15:12.013Z'
,即可以转换为时间戳的字符串cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
,即日期字符串current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- 任何其他是或可以转换为时间戳的表达式
-
version
是一个长整型值,可以从DESCRIBE HISTORY table_spec
的输出中获取。
timestamp_expression
和 version
都不能是子查询。
SELECT * FROM default.people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123
DataFrameReader 选项
标题为“DataFrameReader 选项”的部分DataFrameReader 选项允许您从固定到表的特定版本的 Delta 表创建 DataFrame。
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")
对于 timestamp_string
,仅接受日期或时间戳字符串。例如,"2019-01-01"
和 "2019-01-01T00:00:00.000Z"
。
一种常见模式是在作业执行期间使用 Delta 表的最新状态来更新下游应用程序。
由于 Delta 表会自动更新,因此从 Delta 表加载的 DataFrame 在底层数据更新时,在不同调用之间可能会返回不同的结果。通过使用时间旅行,您可以修复 DataFrame 在不同调用之间返回的数据:
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/people10m`")latest_version = history.selectExpr("max(version)").collect()df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")
-
修复用户 111 意外删除的表
yesterday = spark.sql("SELECT CAST(date_sub(current_date(), 1) AS STRING)").collect()[0][0]df = spark.read.format("delta").option("timestampAsOf", yesterday).load("/tmp/delta/events")df.where("userId = 111").write.format("delta").mode("append").save("/tmp/delta/events") -
修复表中意外的错误更新
yesterday = spark.sql("SELECT CAST(date_sub(current_date(), 1) AS STRING)").collect()[0][0]df = spark.read.format("delta").option("timestampAsOf", yesterday).load("/tmp/delta/events")df.createOrReplaceTempView("my_table_yesterday")spark.sql('''MERGE INTO delta.`/tmp/delta/events` targetUSING my_table_yesterday sourceON source.userId = target.userIdWHEN MATCHED THEN UPDATE SET *''') -
查询上周新增客户的数量
last_week = spark.sql("SELECT CAST(date_sub(current_date(), 7) AS STRING)").collect()[0][0]df = spark.read.format("delta").option("timestampAsOf", last_week).load("/tmp/delta/events")last_week_count = df.select("userId").distinct().count()count = spark.read.format("delta").load("/tmp/delta/events").select("userId").distinct().count()new_customers_count = count - last_week_count
数据保留
标题为“数据保留”的部分要时间旅行到以前的版本,您必须保留该版本的所有日志文件和数据文件。
支持 Delta 表的数据文件从不自动删除;数据文件仅在您运行 VACUUM 时删除。VACUUM
不会删除 Delta 日志文件;日志文件在检查点写入后会自动清理。
默认情况下,您可以将 Delta 表时间旅行到最多 30 天前,除非您已:
- 在 Delta 表上运行
VACUUM
。 - 使用以下表属性更改了数据或日志文件保留期限:
delta.logRetentionDuration = "interval <interval>"
:控制表历史记录的保留时间。默认值为interval 30 days
。
每次写入检查点时,Delta 都会自动清理超出保留间隔的日志条目。如果您将此配置设置为足够大的值,则会保留许多日志条目。这不会影响性能,因为对日志的操作是常量时间。历史记录操作是并行的,但随着日志大小的增加会变得更加昂贵。
-
delta.deletedFileRetentionDuration = "interval <interval>"
:控制文件必须在多久以前被删除,然后才能成为VACUUM
的候选文件。默认值为interval 7 days
。即使您在 Delta 表上运行
VACUUM
,也要访问 30 天的历史数据,请设置delta.deletedFileRetentionDuration = "interval 30 days"
。此设置可能会导致您的存储成本增加。
写入表
标题为“写入表”的部分要以原子方式将新数据添加到现有 Delta 表,请使用 append
模式:
INSERT INTO default.people10m SELECT * FROM morePeople
df.write.format("delta").mode("append").save("/tmp/delta/people10m")df.write.format("delta").mode("append").saveAsTable("default.people10m")
df.write.format("delta").mode("append").save("/tmp/delta/people10m")df.write.format("delta").mode("append").saveAsTable("default.people10m")
import io.delta.implicits._df.write.mode("append").delta("/tmp/delta/people10m")
要以原子方式替换表中的所有数据,请使用 overwrite
模式:
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
import io.delta.implicits._df.write.mode("overwrite").delta("/tmp/delta/people10m")
您可以选择性地仅覆盖与任意表达式匹配的数据。此功能在 Delta Lake 1.1.0 及更高版本中支持 DataFrame,在 Delta Lake 2.4.0 及更高版本中支持 SQL。
以下命令以原子方式将目标表中(按 start_date
分区)一月份的事件替换为 replace_data
中的数据:
INSERT INTO TABLE events REPLACE WHERE start_data >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
replace_data.write \ .format("delta") \ .mode("overwrite") \ .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \ .save("/tmp/delta/events")
replace_data.write .format("delta") .mode("overwrite") .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") .save("/tmp/delta/events")
此示例代码会写入 replace_data
中的数据,验证其是否全部匹配谓词,并执行原子替换。如果您想写入不完全匹配谓词的数据,以替换目标表中匹配的行,则可以通过将 spark.databricks.delta.replaceWhere.constraintCheck.enabled
设置为 false 来禁用约束检查:
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
在 Delta Lake 1.0.0 及更早版本中,replaceWhere
仅覆盖分区列上匹配谓词的数据。以下命令以原子方式将目标表中(按 date
分区)一月份的数据替换为 df
中的数据:
df.write \ .format("delta") \ .mode("overwrite") \ .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \ .save("/tmp/delta/people10m")
df.write .format("delta") .mode("overwrite") .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") .save("/tmp/delta/people10m")
在 Delta Lake 1.1.0 及更高版本中,如果您想回退到旧行为,可以禁用 spark.databricks.delta.replaceWhere.dataColumns.enabled
标志:
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
动态分区覆盖
标题为“动态分区覆盖”的部分Delta Lake 2.0 及更高版本支持分区表的动态分区覆盖模式。
在动态分区覆盖模式下,我们将覆盖写入将提交新数据的每个逻辑分区中的所有现有数据。写入不包含数据的任何现有逻辑分区将保持不变。此模式仅适用于以覆盖模式写入数据时:SQL 中的 INSERT OVERWRITE
或使用 df.write.mode("overwrite")
的 DataFrame 写入。
通过将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode
设置为 dynamic
来配置动态分区覆盖模式。您还可以通过将 DataFrameWriter
选项 partitionOverwriteMode
设置为 dynamic
来启用此功能。如果存在,查询特定选项会覆盖会话配置中定义的模式。partitionOverwriteMode
的默认值为 static
。
SET spark.sql.sources.partitionOverwriteMode=dynamic;INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
df.write \ .format("delta") \ .mode("overwrite") \ .option("partitionOverwriteMode", "dynamic") \ .saveAsTable("default.people10m")
df.write .format("delta") .mode("overwrite") .option("partitionOverwriteMode", "dynamic") .saveAsTable("default.people10m")
有关 Delta Lake 支持更新表的更多信息,请参阅表删除、更新和合并。
限制写入文件中的行数
标题为“限制写入文件中的行数”的部分您可以使用 SQL 会话配置 spark.sql.files.maxRecordsPerFile
来指定写入 Delta Lake 表的单个文件中的最大记录数。指定零或负值表示无限制。
在使用 DataFrame API 写入 Delta Lake 表时,您还可以使用 DataFrameWriter 选项 maxRecordsPerFile
。指定 maxRecordsPerFile
时,将忽略 SQL 会话配置 spark.sql.files.maxRecordsPerFile
的值。
df.write.format("delta") \ .mode("append") \ .option("maxRecordsPerFile", "10000") \ .save("/tmp/delta/people10m")
df.write.format("delta") .mode("append") .option("maxRecordsPerFile", "10000") .save("/tmp/delta/people10m")
幂等写入
标题为“幂等写入”的部分有时,由于各种原因(例如作业遇到故障),写入 Delta 表的作业会重新启动。失败的作业可能在终止之前已将数据写入 Delta 表,也可能未写入。如果数据已写入 Delta 表,则重新启动的作业会将相同的数据写入 Delta 表,从而导致数据重复。
为了解决这个问题,Delta 表支持以下 DataFrameWriter
选项,以使写入具有幂等性:
txnAppId
:您可以在每次DataFrame
写入时传递的唯一字符串。例如,这可以是作业的名称。txnVersion
:单调递增的数字,用作事务版本。此数字对于写入 Delta 表的数据必须是唯一的。例如,这可以是首次尝试查询时的纪元秒数。同一作业的任何后续重新启动都必须具有相同的txnVersion
值。
上述选项组合对于每次摄取到 Delta 表中的新数据都必须是唯一的,并且 txnVersion
必须高于上次摄取到 Delta 表中的数据。例如:
- 上次成功写入的数据包含选项值
dailyETL:23423
(txnAppId:txnVersion
)。 - 下次写入数据时,
txnAppId
应为dailyETL
,txnVersion
至少应为23424
(比上次写入数据的txnVersion
多一个)。 - 任何尝试写入
txnAppId = dailyETL
且txnVersion
为23422
或更小的数据都将被忽略,因为txnVersion
小于表中上次记录的txnVersion
。 - 尝试写入
txnAppId:txnVersion
为anotherETL:23424
的数据成功写入表,因为它包含与上次摄取数据中相同选项值不同的txnAppId
。
您还可以通过设置 Spark 会话配置 spark.databricks.delta.write.txnAppId
和 spark.databricks.delta.write.txnVersion
来配置幂等写入。此外,您可以将 spark.databricks.delta.write.txnVersion.autoReset.enabled
设置为 true,以便在每次写入后自动重置 spark.databricks.delta.write.txnVersion
。当同时设置写入器选项和会话配置时,我们将使用写入器选项值。
SET spark.databricks.delta.write.txnAppId = ...;SET spark.databricks.delta.write.txnVersion = ...;SET spark.databricks.delta.write.txnVersion.autoReset.enabled = true; -- if set to true, this will reset txnVersion after every write
app_id = ... # A unique string that is used as an application ID.version = ... # A monotonically increasing number that acts as transaction version.
dataFrame.write.format(...).option("txnVersion", version).option("txnAppId", app_id).save(...)
val appId = ... // A unique string that is used as an application ID.version = ... // A monotonically increasing number that acts as transaction version.
dataFrame.write.format(...).option("txnVersion", version).option("txnAppId", appId).save(...)
设置用户定义的提交元数据
标题为“设置用户定义的提交元数据”的部分您可以使用 DataFrameWriter 选项 userMetadata
或 SparkSession 配置 spark.databricks.delta.commitInfo.userMetadata
将用户定义的字符串作为元数据指定在这些操作进行的提交中。如果两者都已指定,则选项优先。此用户定义的元数据可在历史记录操作中读取。
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-dataINSERT OVERWRITE default.people10m SELECT * FROM morePeople
df.write.format("delta") \ .mode("overwrite") \ .option("userMetadata", "overwritten-for-fixing-incorrect-data") \ .save("/tmp/delta/people10m")
df.write.format("delta") .mode("overwrite") .option("userMetadata", "overwritten-for-fixing-incorrect-data") .save("/tmp/delta/people10m")
模式验证
标题为“模式验证”的部分Delta Lake 会自动验证正在写入的 DataFrame 的模式是否与表的模式兼容。Delta Lake 使用以下规则来确定从 DataFrame 到表的写入是否兼容:
- 所有 DataFrame 列都必须存在于目标表中。如果 DataFrame 中存在表中不存在的列,则会引发异常。表中存在但 DataFrame 中不存在的列将设置为 null。
- DataFrame 列数据类型必须与目标表中的列数据类型匹配。如果不匹配,则会引发异常。
- DataFrame 列名不能仅通过大小写来区分。这意味着您不能在同一表中定义“Foo”和“foo”等列。虽然您可以在区分大小写或不区分大小写(默认)模式下使用 Spark,但 Parquet 在存储和返回列信息时是区分大小写的。Delta Lake 在存储模式时保留大小写但忽略大小写,并有此限制以避免潜在的错误、数据损坏或丢失问题。
Delta Lake 支持 DDL 显式添加新列和自动更新模式的能力。
如果您将其他选项(例如 partitionBy
)与追加模式结合使用,Delta Lake 会验证它们是否匹配,并对任何不匹配的情况抛出错误。当不存在 partitionBy
时,追加会自动遵循现有数据的分区。
更新表模式
标题为“更新表模式”的部分Delta Lake 允许您更新表的模式。支持以下类型的更改:
- 添加新列(在任意位置)
- 重新排序现有列
您可以使用 DDL 显式进行这些更改,也可以使用 DML 隐式进行更改。
显式更新模式
标题为“显式更新模式”的部分您可以使用以下 DDL 显式更改表的模式。
添加列
标题为“添加列”的部分ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
默认情况下,可为空性为 true
。
要向嵌套字段添加列,请使用:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
如果在运行 ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
之前的模式是:
- root| - colA| - colB| +-field1| +-field2
之后的模式是:
- root| - colA| - colB| +-field1| +-nested| +-field2
更改列注释或顺序
标题为“更改列注释或顺序”的部分ALTER TABLE table_name ALTER [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
要更改嵌套字段中的列,请使用:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
如果在运行 ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST
之前的模式是:
- root| - colA| - colB| +-field1| +-field2
之后的模式是:
- root| - colA| - colB| +-field2| +-field1
替换列
标题为“替换列”的部分ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
当运行以下 DDL 时:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
如果之前的模式是:
- root| - colA| - colB| +-field1| +-field2
之后的模式是:
- root| - colC| - colB| +-field2| +-nested| +-field1| - colA
重命名列
标题为“重命名列”的部分要重命名列而不重写任何列的现有数据,您必须为表启用列映射。请参阅启用列映射。
重命名列:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
重命名嵌套字段:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
当您运行以下命令时:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
如果之前的模式是:
- root| - colA| - colB| +-field1| +-field2
那么之后的模式是:
- root| - colA| - colB| +-field001| +-field2
删除列
标题为“删除列”的部分要将列作为仅元数据操作删除而不重写任何数据文件,您必须为表启用列映射。请参阅启用列映射。
删除列:
ALTER TABLE table_name DROP COLUMN col_name
删除多个列:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
更改列类型或名称
标题为“更改列类型或名称”的部分您可以通过重写表来更改列的类型或名称,或删除列。为此,请使用 overwriteSchema
选项:
更改列类型
标题为“更改列类型”的部分spark.read.table(...) \ .withColumn("birthDate", col("birthDate").cast("date")) \ .write \ .format("delta") \ .mode("overwrite") .option("overwriteSchema", "true") \ .saveAsTable(...)
更改列名
标题为“更改列名”的部分spark.read.table(...) \ .withColumnRenamed("dateOfBirth", "birthDate") \ .write \ .format("delta") \ .mode("overwrite") \ .option("overwriteSchema", "true") \ .saveAsTable(...)
自动模式更新
标题为“自动模式更新”的部分Delta Lake 可以在 DML 事务(追加或覆盖)中自动更新表的模式,并使模式与正在写入的数据兼容。
添加列
标题为“添加列”的部分当满足以下条件时,DataFrame 中存在但表中缺失的列会自动作为写入事务的一部分添加:
write
或writeStream
具有.option("mergeSchema", "true")
spark.databricks.delta.schema.autoMerge.enabled
为true
当两个选项都指定时,DataFrameWriter
中的选项优先。添加的列将附加到它们所在的结构体的末尾。在附加新列时,会保留大小写。
NullType
列
标题为“NullType 列”的部分由于 Parquet 不支持 NullType
,因此在写入 Delta 表时,NullType
列将从 DataFrame 中删除,但仍存储在模式中。当该列收到不同的数据类型时,Delta Lake 会将模式合并到新的数据类型。如果 Delta Lake 为现有列收到 NullType
,则会保留旧模式,并在写入期间删除新列。
流式传输中的 NullType
不受支持。由于在使用流式传输时必须设置模式,因此这种情况应该非常罕见。NullType
也不接受复杂类型,例如 ArrayType
和 MapType
。
替换表模式
标题为“替换表模式”的部分默认情况下,覆盖表中的数据不会覆盖模式。当使用 overwrite
模式且不带 replaceWhere
覆盖表时,您可能仍希望覆盖正在写入的数据的模式。您可以通过将 overwriteSchema
选项设置为 true
来替换表的模式和分区:
df.write.option("overwriteSchema", "true")
表上的视图
标题为“表上的视图”的部分Delta Lake 支持在 Delta 表之上创建视图,就像您可能使用数据源表一样。
使用视图操作时的核心挑战是解析模式。如果您更改 Delta 表模式,则必须重新创建派生视图以考虑模式的任何添加。例如,如果您向 Delta 表添加新列,则必须确保此列在基于该基表构建的相应视图中可用。
表属性
标题为“表属性”的部分您可以使用 CREATE
和 ALTER
中的 TBLPROPERTIES
将您自己的元数据存储为表属性。然后,您可以 SHOW
该元数据。例如:
ALTER TABLE default.people10m SET TBLPROPERTIES ('department' = 'accounting', 'delta.appendOnly' = 'true');
-- Show the table's properties.SHOW TBLPROPERTIES default.people10m;
-- Show just the 'department' table property.SHOW TBLPROPERTIES default.people10m ('department');
TBLPROPERTIES
作为 Delta 表元数据的一部分存储。如果 Delta 表已存在于给定位置,则您不能在 CREATE
语句中定义新的 TBLPROPERTIES
。
此外,为了调整行为和性能,Delta Lake 支持某些 Delta 表属性:
- 阻止 Delta 表中的删除和更新:
delta.appendOnly=true
。 - 配置时间旅行保留属性:
delta.logRetentionDuration=<interval-string>
和delta.deletedFileRetentionDuration=<interval-string>
。有关详细信息,请参阅数据保留。 - 配置收集统计信息的列数:
delta.dataSkippingNumIndexedCols=n
。此属性指示写入器仅收集表中前n
列的统计信息。此外,数据跳过代码会忽略此列索引之外的任何列的统计信息。此属性仅对写入的新数据生效。
您还可以在首次提交到 Delta 表时使用 Spark 配置设置 delta.
前缀属性。例如,要使用属性 delta.appendOnly=true
初始化 Delta 表,请将 Spark 配置 spark.databricks.delta.properties.defaults.appendOnly
设置为 true
。例如:
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
另请参阅Delta 表属性参考。
将表模式和属性同步到 Hive 元数据存储
标题为“将表模式和属性同步到 Hive 元数据存储”的部分您可以通过将 spark.databricks.delta.catalog.update.enabled
设置为 true
来启用表模式和属性的异步同步到元数据存储。每当 Delta 客户端检测到其中任何一个因更新而更改时,它都会将更改同步到元数据存储。
模式存储在 HMS 的表属性中。如果模式很小,它将直接存储在键 spark.sql.sources.schema
下。
{ "spark.sql.sources.schema": "{'name':'col1','type':'string','nullable':true, 'metadata':{}},{'name':'col2','type':'string','nullable':true,'metadata':{}}"}
如果模式很大,模式将被分解为多个部分。将它们拼接在一起应该会得到正确的模式。例如:
{ "spark.sql.sources.schema.numParts": "4", "spark.sql.sources.schema.part.1": "{'name':'col1','type':'string','nullable':tr", "spark.sql.sources.schema.part.2": "ue, 'metadata':{}},{'name':'co", "spark.sql.sources.schema.part.3": "l2','type':'string','nullable':true,'meta", "spark.sql.sources.schema.part.4": "data':{}}"}
表元数据
标题为“表元数据”的部分Delta Lake 具有探索表元数据的丰富功能。
它支持 SHOW COLUMNS
和 DESCRIBE TABLE
。
它还提供了以下独有命令
DESCRIBE DETAIL
标题为“DESCRIBE DETAIL”的部分提供有关模式、分区、表大小等信息。有关详细信息,请参阅检索 Delta 表详细信息。
DESCRIBE HISTORY
标题为“DESCRIBE HISTORY”的部分提供血缘信息,包括操作、用户等,以及每次对表的写入的操作指标。表历史记录保留 30 天。有关详细信息,请参阅检索 Delta 表历史记录。
配置 SparkSession
标题为“配置 SparkSession”的部分对于许多 Delta Lake 操作,您可以通过在创建新的 SparkSession
时设置以下配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 3.0 起)的集成。
from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .appName("...") \ .master("...") \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ .getOrCreate()
import org.apache.spark.sql.SparkSession
val spark = SparkSession .builder() .appName("...") .master("...") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession .builder() .appName("...") .master("...") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate();
或者,您可以在使用 spark-submit
提交 Spark 应用程序时,或在启动 spark-shell
或 pyspark
时,将它们指定为命令行参数来添加配置。
spark-submit --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ...
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
配置存储凭据
标题为“配置存储凭据”的部分Delta Lake 使用 Hadoop FileSystem API 访问存储系统。存储系统的凭据通常可以通过 Hadoop 配置来设置。Delta Lake 提供了多种设置 Hadoop 配置的方式,类似于 Apache Spark。
Spark 配置
标题为“Spark 配置”的部分当您在集群上启动 Spark 应用程序时,您可以以 spark.hadoop.*
的形式设置 Spark 配置,以传递您的自定义 Hadoop 配置。例如,为 spark.hadoop.a.b.c
设置值将把该值作为 Hadoop 配置 a.b.c
传递,Delta Lake 将使用它来访问 Hadoop FileSystem API。
有关更多详细信息,请参阅Spark 文档。
SQL 会话配置
标题为“SQL 会话配置”的部分Spark SQL 会将所有当前的SQL 会话配置传递给 Delta Lake,Delta Lake 将使用它们来访问 Hadoop FileSystem API。例如,SET a.b.c=x.y.z
将告诉 Delta Lake 将值 x.y.z
作为 Hadoop 配置 a.b.c
传递,Delta Lake 将使用它来访问 Hadoop FileSystem API。
DataFrame 选项
标题为“DataFrame 选项”的部分除了通过 Spark(集群)配置或 SQL 会话配置设置 Hadoop 文件系统配置外,Delta 还支持在使用 DataFrameReader.load(path)
或 DataFrameWriter.save(path)
读取或写入表时,从 DataFrameReader
和 DataFrameWriter
选项(即以 fs.
前缀开头的选项键)读取 Hadoop 文件系统配置。
例如,您可以通过 DataFrame 选项传递存储凭据
df1 = spark.read.format("delta") \ .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>") \ .read("...")df2 = spark.read.format("delta") \ .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>") \ .read("...")df1.union(df2).write.format("delta") \ .mode("overwrite") \ .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>") \ .save("...")
val df1 = spark.read.format("delta") .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>") .read("...")val df2 = spark.read.format("delta") .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>") .read("...")df1.union(df2).write.format("delta") .mode("overwrite") .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>") .save("...")
您可以在存储配置中找到存储的 Hadoop 文件系统配置详细信息。