跳到内容

表删除、更新和合并

Delta Lake 支持多种语句,以方便从 Delta 表中删除数据和更新数据。

你可以从 Delta 表中删除与谓词匹配的数据。例如,在名为 people10m 的表或路径 /tmp/delta/people-10m 中,要删除 birthDate 列中值早于 1955 的所有人员对应的行,你可以运行以下命令

DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession

from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

有关详细信息,请参阅 Delta Lake API

你可以更新 Delta 表中与谓词匹配的数据。例如,在名为 people10m 的表或路径 /tmp/delta/people-10m 中,要将 gender 列中的缩写从 MF 更改为 MaleFemale,你可以运行以下命令

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession

from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)

有关详细信息,请参阅 Delta Lake API

你可以使用 MERGE SQL 操作将源表、视图或 DataFrame 中的数据插入或更新到目标 Delta 表中。Delta Lake 支持在 MERGE 中进行插入、更新和删除操作,并且支持超出 SQL 标准的扩展语法,以方便高级用例。

假设你有一个名为 people10mupdates 的源表或路径 /tmp/delta/people-10m-updates,其中包含目标表 people10m 或路径 /tmp/delta/people-10m 的新数据。其中一些新记录可能已存在于目标数据中。要合并新数据,你需要更新其中人员 id 已存在的行,并插入没有匹配 id 的新行。你可以运行以下命令

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)

有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession

from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()

有关 Scala、Java 和 Python 语法详细信息,请参阅 Delta Lake API

你可以使用 WHEN NOT MATCHED BY SOURCE 子句来 UPDATEDELETE 目标表中没有相应源表记录的记录。我们建议添加一个可选的条件子句,以避免完全重写目标表。

以下代码示例展示了使用此方法进行删除的基本语法,它使用源表的内容覆盖目标表并删除目标表中不匹配的记录。

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED
UPDATE SET *
WHEN NOT MATCHED
INSERT *
WHEN NOT MATCHED BY SOURCE
DELETE

以下示例向 WHEN NOT MATCHED BY SOURCE 子句添加了条件,并指定了要在不匹配的目标行中更新的值。

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'

以下是 merge 编程操作的详细描述。

  • 可以有任意数量的 whenMatchedwhenNotMatched 子句。

  • 当源行与目标表行根据匹配条件匹配时,执行 whenMatched 子句。这些子句具有以下语义。

    • whenMatched 子句最多可以有一个 update 和一个 delete 操作。merge 中的 update 操作仅更新匹配的目标行的指定列(类似于 update 操作)。delete 操作删除匹配的行。

    • 每个 whenMatched 子句可以有一个可选条件。如果此子句条件存在,则仅当子句条件为真时,才对任何匹配的源-目标行对执行 updatedelete 操作。

    • 如果存在多个 whenMatched 子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有 whenMatched 子句都必须具有条件。

    • 如果对于与合并条件匹配的源和目标行对,所有 whenMatched 条件都不为真,则目标行保持不变。

    • 要使用源数据集的相应列更新目标 Delta 表的所有列,请使用 whenMatched(...).updateAll()。这等效于

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

      对于目标 Delta 表的所有列。因此,此操作假定源表具有与目标表中相同的列,否则查询将引发分析错误。

  • 当源行与目标行不根据匹配条件匹配时,执行 whenNotMatched 子句。这些子句具有以下语义。

    • whenNotMatched 子句只能有 insert 操作。新行根据指定的列和相应的表达式生成。你无需指定目标表中的所有列。对于未指定的目标列,插入 NULL

    • 每个 whenNotMatched 子句可以有一个可选条件。如果存在子句条件,则仅当该行条件为真时,才插入源行。否则,忽略源列。

    • 如果存在多个 whenNotMatched 子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有 whenNotMatched 子句都必须具有条件。

    • 要使用源数据集的相应列插入目标 Delta 表的所有列,请使用 whenNotMatched(...).insertAll()。这等效于

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

      对于目标 Delta 表的所有列。因此,此操作假定源表具有与目标表中相同的列,否则查询将引发分析错误。

  • 当目标行根据合并条件不匹配任何源行时,执行 whenNotMatchedBySource 子句。这些子句具有以下语义。

    • whenNotMatchedBySource 子句可以指定 deleteupdate 操作。
    • 每个 whenNotMatchedBySource 子句可以有一个可选条件。如果存在子句条件,则仅当该行条件为真时,才修改目标行。否则,目标行保持不变。
    • 如果存在多个 whenNotMatchedBySource 子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有 whenNotMatchedBySource 子句都必须具有条件。
    • 根据定义,whenNotMatchedBySource 子句没有源行可从中提取列值,因此不能引用源列。对于要修改的每个列,你可以指定文字值或对目标列执行操作,例如 SET target.deleted_count = target.deleted_count + 1

merge 自动验证插入和更新表达式生成的数据的 Schema 与表的 Schema 兼容。它使用以下规则来确定 merge 操作是否兼容

  • 对于 updateinsert 操作,指定的 target 列必须存在于 target Delta 表中。
  • 对于 updateAllinsertAll 操作,源数据集必须包含目标 Delta 表的所有列。源数据集可以有额外的列,它们将被忽略。

如果你不希望忽略额外列,而是希望更新目标表 Schema 以包含新列,请参阅 自动 Schema 演进

  • 对于所有操作,如果生成目标列的表达式生成的数据类型与目标 Delta 表中相应列的数据类型不同,merge 会尝试将它们转换为表中的类型。

Schema 演进允许用户解决合并时目标表和源表之间的 Schema 不匹配问题。它处理以下两种情况

  1. 源表中的列不存在于目标表中。新列被添加到目标 Schema,其值使用源值进行插入或更新。
  2. 目标表中的列不存在于源表中。目标 Schema 保持不变;附加目标列中的值要么保持不变(对于 UPDATE),要么设置为 NULL(对于 INSERT)。

以下是一些有无 Schema 演进的 merge 操作效果示例。

查询(在 SQL 中)无 Schema 演进时的行为(默认)有 Schema 演进时的行为
目标: key, value 源: key, value, new_valuesql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *表 Schema 保持不变;只更新/插入列 key, value表 Schema 更改为 (key, value, new_value)。现有匹配记录将使用源中的 valuenew_value 进行更新。新行将使用 Schema (key, value, new_value) 插入。
目标: key, old_value 源: key, new_valuesql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *UPDATEINSERT 操作会抛出错误,因为目标列 old_value 不在源中。表 Schema 更改为 (key, old_value, new_value)。现有匹配记录将使用源中的 new_value 进行更新,同时 old_value 保持不变。新记录将使用指定的 keynew_value 插入,并且 old_valueNULL
目标: key, old_value 源: key, new_valuesql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_valueUPDATE 抛出错误,因为目标表中不存在列 new_value表 Schema 更改为 (key, old_value, new_value)。现有匹配记录将使用源中的 new_value 进行更新,同时 old_value 保持不变,不匹配的记录将 new_value 设置为 NULL
目标: key, old_value 源: key, new_valuesql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value)INSERT 抛出错误,因为目标表中不存在列 new_value表 Schema 更改为 (key, old_value, new_value)。新记录将使用指定的 keynew_value 插入,并且 old_valueNULL。现有记录将 new_value 设置为 NULL,同时 old_value 保持不变。请参阅注释 (1)。

包含结构体数组的 Schema 的特殊注意事项

标题为“包含结构体数组的 Schema 的特殊注意事项”的章节

Delta MERGE INTO 支持按名称解析结构体字段,并演进结构体数组的 Schema。启用 Schema 演进后,目标表 Schema 将针对结构体数组进行演进,这也适用于数组内部的任何嵌套结构体。

以下是一些有无 Schema 演进的合并操作对结构体数组的影响示例。

源 Schema目标 Schema无 Schema 演进时的行为(默认)有 Schema 演进时的行为
array<struct<b: string, a: string>>array<struct<a: int, b: int>>表 Schema 保持不变。列将按名称解析并进行更新或插入。表 Schema 保持不变。列将按名称解析并进行更新或插入。
array<struct<a: int, c: string, d: string>>array<struct<a: string, b: string>>updateinsert 抛出错误,因为 cd 不存在于目标表中。表 Schema 更改为 array<struct<a: string, b: string, c: string, d: string>>。cd 在目标表的现有条目中作为 NULL 插入。updateinserta 转换为字符串和 b 作为 NULL 填充源表中的条目。
array<struct<a: string, b: struct<c: string, d: string>>>array<struct<a: string, b: struct<c: string>>>updateinsert 抛出错误,因为 d 不存在于目标表中。目标表 Schema 更改为 array<struct<a: string, b: struct<c: string, d: string>>>。d 在目标表的现有条目中作为 NULL 插入。

你可以使用以下方法减少合并所需的时间

  • 减少匹配搜索空间:默认情况下,merge 操作会搜索整个 Delta 表以查找源表中的匹配项。加快 merge 的一种方法是在匹配条件中添加已知约束以减少搜索空间。例如,假设你有一个按 countrydate 分区的表,并且你希望使用 merge 更新最后一天和特定国家的信息。添加条件

    events.date = current_date() AND events.country = 'USA'

    将使查询更快,因为它只在相关的分区中查找匹配项。此外,它还将减少与其他并发操作冲突的机会。有关更多详细信息,请参阅 并发控制

  • 压缩文件:如果数据存储在许多小文件中,读取数据以搜索匹配项可能会变慢。你可以将小文件压缩成更大的文件以提高读取吞吐量。有关详细信息,请参阅 压缩文件

  • 控制写入的 shuffle 分区merge 操作会多次 shuffle 数据以计算和写入更新后的数据。用于 shuffle 的任务数由 Spark 会话配置 spark.sql.shuffle.partitions 控制。设置此参数不仅控制并行度,还决定输出文件的数量。增加值会增加并行度,但也会生成更多的小数据文件。

  • 在写入前重新分区输出数据:对于分区表,merge 可能会生成比 shuffle 分区数量多得多的文件,这会成为性能瓶颈。这是因为每个 shuffle 任务都可以在多个分区中写入多个文件。在许多情况下,在写入之前,通过表的 partition 列重新分区输出数据会有帮助。可以通过将 Spark 会话配置 spark.databricks.delta.merge.repartitionBeforeWrite.enabled 设置为 true 来启用此功能。

以下是一些如何在不同场景中使用 merge 的示例。

常见的 ETL 用例是将日志收集到 Delta 表中,通过将其附加到表中。然而,源通常会生成重复的日志记录,需要下游去重步骤来处理它们。使用 merge,你可以避免插入重复记录。

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *

如果你知道只有几天可能会出现重复记录,你可以通过按日期对表进行分区,然后指定目标表的日期范围来进一步优化查询。

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *

这比之前的命令更有效,因为它只在最近 7 天的日志中查找重复项,而不是整个表。此外,你可以将此仅插入合并与结构化流式处理结合使用,以对日志执行连续去重。

  • 在流式查询中,你可以在 foreachBatch 中使用合并操作,将任何流式数据持续写入到 Delta 表中并进行去重。有关 foreachBatch 的更多信息,请参阅以下流式示例
  • 在另一个流式查询中,你可以持续从这个 Delta 表中读取去重后的数据。这是可能的,因为仅插入的合并操作只向 Delta 表中追加新数据。

另一个常见的操作是 SCD Type 2,它维护维度表中每个键的所有更改历史记录。此类操作需要更新现有行以将键的先前值标记为旧值,并将新行插入为最新值。给定一个带有更新的源表和带有维度数据的目标表,SCD Type 2 可以用 merge 来表达。

这是一个具体的示例,说明如何维护客户地址的历史记录以及每个地址的活动日期范围。当客户的地址需要更新时,你必须将以前的地址标记为非当前地址,更新其活动日期范围,并将新地址添加为当前地址。

customersTable = ... # DeltaTable with schema (customerId, address, current, effectiveDate, endDate)
updatesDF = ... # DataFrame with schema (customerId, address, effectiveDate)
# Rows to INSERT new addresses of existing customers
newAddressesToInsert = updatesDF \
.alias("updates") \
.join(customersTable.toDF().alias("customers"), "customerid") \
.where("customers.current = true AND updates.address <> customers.address")
# Stage the update by unioning two sets of rows
# 1. Rows that will be inserted in the whenNotMatched clause
# 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
stagedUpdates = (
newAddressesToInsert
.selectExpr("NULL as mergeKey", "updates.*") # Rows for 1
.union(updatesDF.selectExpr("updates.customerId as mergeKey", "*")) # Rows for 2.
)
# Apply SCD Type 2 operation using merge
customersTable.alias("customers").merge(
stagedUpdates.alias("staged_updates"),
"customers.customerId = mergeKey") \
.whenMatchedUpdate(
condition = "customers.current = true AND customers.address <> staged_updates.address",
set = { # Set current to false and endDate to source's effective date.
"current": "false",
"endDate": "staged_updates.effectiveDate"
}
).whenNotMatchedInsert(
values = {
"customerid": "staged_updates.customerId",
"address": "staged_updates.address",
"current": "true",
"effectiveDate": "staged_updates.effectiveDate", # Set current to true along with the new address and its effective date.
"endDate": "null"
}
).execute()

与 SCD 类似,另一个常见的用例,通常称为变更数据捕获 (CDC),是将外部数据库生成的所有数据更改应用于 Delta 表。换句话说,需要将应用于外部表的一组更新、删除和插入应用于 Delta 表。你可以使用 merge 如下进行此操作。

deltaTable = ... # DeltaTable with schema (key, value)
# DataFrame with changes having following columns
# - key: key of the change
# - time: time of change for ordering between changes (can replaced by other ordering id)
# - newValue: updated or inserted value if key was not deleted
# - deleted: true if the key was deleted, false if the key was inserted or updated
changesDF = spark.table("changes")
# Find the latest change for each key based on the timestamp
# Note: For nested structs, max on struct is computed as
# max on first struct field, if equal fall back to second fields, and so on.
latestChangeForEachKey = changesDF \
.selectExpr("key", "struct(time, newValue, deleted) as otherCols") \
.groupBy("key") \
.agg(max("otherCols").alias("latest")) \
.select("key", "latest.*") \
deltaTable.alias("t").merge(
latestChangeForEachKey.alias("s"),
"s.key = t.key") \
.whenMatchedDelete(condition = "s.deleted = true") \
.whenMatchedUpdate(set = {
"key": "s.key",
"value": "s.newValue"
}) \
.whenNotMatchedInsert(
condition = "s.deleted = false",
values = {
"key": "s.key",
"value": "s.newValue"
}
).execute()

使用 foreachBatch 从流式查询进行插入更新

标题为“使用 foreachBatch 从流式查询进行插入更新”的章节

你可以结合使用 mergeforeachBatch(有关更多信息,请参阅 foreachbatch)将复杂的插入更新从流式查询写入 Delta 表。例如

  • 在更新模式下写入流式聚合:这比完整模式效率高得多。

    from delta.tables import *
    deltaTable = DeltaTable.forPath(spark, "/data/aggregates")
    # Function to upsert microBatchOutputDF into Delta table using merge
    def upsertToDelta(microBatchOutputDF, batchId):
    deltaTable.alias("t").merge(
    microBatchOutputDF.alias("s"),
    "s.key = t.key") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
    }
    # Write the output of a streaming aggregation query into Delta table
    streamingAggregatesDF.writeStream \
    .format("delta") \
    .foreachBatch(upsertToDelta) \
    .outputMode("update") \
    .start()
  • 将数据库变更流写入 Delta 表用于写入变更数据的合并查询 可以在 foreachBatch 中使用,以将变更流持续应用于 Delta 表。

  • 将流数据写入 Delta 表并进行去重:用于去重的仅插入合并查询可以在 foreachBatch 中使用,以持续将数据(包含重复项)写入 Delta 表并自动去重。