表删除、更新和合并
Delta Lake 支持多种语句,以方便从 Delta 表中删除数据和更新数据。
从表中删除
标题为“从表中删除”的章节你可以从 Delta 表中删除与谓词匹配的数据。例如,在名为 people10m
的表或路径 /tmp/delta/people-10m
中,要删除 birthDate
列中值早于 1955
的所有人员对应的行,你可以运行以下命令
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession。
from delta.tables import *from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.deltaTable.delete(col('birthDate') < '1960-01-01')
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.deltaTable.delete(col("birthDate") < "1955-01-01")
import io.delta.tables.*;import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");
// Declare the predicate by using a SQL-formatted string.deltaTable.delete("birthDate < '1955-01-01'");
// Declare the predicate by using Spark SQL functions.deltaTable.delete(functions.col("birthDate").lt(functions.lit("1955-01-01")));
有关详细信息,请参阅 Delta Lake API。
更新表
标题为“更新表”的章节你可以更新 Delta 表中与谓词匹配的数据。例如,在名为 people10m
的表或路径 /tmp/delta/people-10m
中,要将 gender
列中的缩写从 M
或 F
更改为 Male
或 Female
,你可以运行以下命令
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession。
from delta.tables import *from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.deltaTable.update( condition = "gender = 'F'", set = { "gender": "'Female'" })
# Declare the predicate by using Spark SQL functions.deltaTable.update( condition = col('gender') == 'M', set = { 'gender': lit('Male') })
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.deltaTable.updateExpr( "gender = 'F'", Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.deltaTable.update( col("gender") === "M", Map("gender" -> lit("Male")));
import io.delta.tables.*;import org.apache.spark.sql.functions;import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");
// Declare the predicate by using a SQL-formatted string.deltaTable.updateExpr( "gender = 'F'", new HashMap<String, String>() {{ put("gender", "'Female'"); }});
// Declare the predicate by using Spark SQL functions.deltaTable.update( functions.col(gender).eq("M"), new HashMap<String, Column>() {{ put("gender", functions.lit("Male")); }});
有关详细信息,请参阅 Delta Lake API。
使用合并将数据插入或更新到表中
标题为“使用合并将数据插入或更新到表中”的章节你可以使用 MERGE
SQL 操作将源表、视图或 DataFrame 中的数据插入或更新到目标 Delta 表中。Delta Lake 支持在 MERGE
中进行插入、更新和删除操作,并且支持超出 SQL 标准的扩展语法,以方便高级用例。
假设你有一个名为 people10mupdates
的源表或路径 /tmp/delta/people-10m-updates
,其中包含目标表 people10m
或路径 /tmp/delta/people-10m
的新数据。其中一些新记录可能已存在于目标数据中。要合并新数据,你需要更新其中人员 id
已存在的行,并插入没有匹配 id
的新行。你可以运行以下命令
MERGE INTO people10mUSING people10mupdatesON people10m.id = people10mupdates.idWHEN MATCHED THEN UPDATE SET id = people10mupdates.id, firstName = people10mupdates.firstName, middleName = people10mupdates.middleName, lastName = people10mupdates.lastName, gender = people10mupdates.gender, birthDate = people10mupdates.birthDate, ssn = people10mupdates.ssn, salary = people10mupdates.salaryWHEN NOT MATCHED THEN INSERT ( id, firstName, middleName, lastName, gender, birthDate, ssn, salary ) VALUES ( people10mupdates.id, people10mupdates.firstName, people10mupdates.middleName, people10mupdates.lastName, people10mupdates.gender, people10mupdates.birthDate, people10mupdates.ssn, people10mupdates.salary )
有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession。
from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \ .merge( dfUpdates.alias('updates'), 'people.id = updates.id' ) \ .whenMatchedUpdate(set = { "id": "updates.id", "firstName": "updates.firstName", "middleName": "updates.middleName", "lastName": "updates.lastName", "gender": "updates.gender", "birthDate": "updates.birthDate", "ssn": "updates.ssn", "salary": "updates.salary" } ) \ .whenNotMatchedInsert(values = { "id": "updates.id", "firstName": "updates.firstName", "middleName": "updates.middleName", "lastName": "updates.lastName", "gender": "updates.gender", "birthDate": "updates.birthDate", "ssn": "updates.ssn", "salary": "updates.salary" } ) \ .execute()
import io.delta.tables._import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople .as("people") .merge( dfUpdates.as("updates"), "people.id = updates.id") .whenMatched .updateExpr( Map( "id" -> "updates.id", "firstName" -> "updates.firstName", "middleName" -> "updates.middleName", "lastName" -> "updates.lastName", "gender" -> "updates.gender", "birthDate" -> "updates.birthDate", "ssn" -> "updates.ssn", "salary" -> "updates.salary" )) .whenNotMatched .insertExpr( Map( "id" -> "updates.id", "firstName" -> "updates.firstName", "middleName" -> "updates.middleName", "lastName" -> "updates.lastName", "gender" -> "updates.gender", "birthDate" -> "updates.birthDate", "ssn" -> "updates.ssn", "salary" -> "updates.salary" )) .execute()
import io.delta.tables.*;import org.apache.spark.sql.functions;import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")Dataset<Row> dfUpdates = spark.read("delta").load("/tmp/delta/people-10m-updates")
deltaTable .as("people") .merge( dfUpdates.as("updates"), "people.id = updates.id") .whenMatched() .updateExpr( new HashMap<String, String>() {{ put("id", "updates.id"); put("firstName", "updates.firstName"); put("middleName", "updates.middleName"); put("lastName", "updates.lastName"); put("gender", "updates.gender"); put("birthDate", "updates.birthDate"); put("ssn", "updates.ssn"); put("salary", "updates.salary"); }}) .whenNotMatched() .insertExpr( new HashMap<String, String>() {{ put("id", "updates.id"); put("firstName", "updates.firstName"); put("middleName", "updates.middleName"); put("lastName", "updates.lastName"); put("gender", "updates.gender"); put("birthDate", "updates.birthDate"); put("ssn", "updates.ssn"); put("salary", "updates.salary"); }}) .execute();
有关 Scala、Java 和 Python 语法详细信息,请参阅 Delta Lake API。
使用合并修改所有不匹配的行
标题为“使用合并修改所有不匹配的行”的章节你可以使用 WHEN NOT MATCHED BY SOURCE
子句来 UPDATE
或 DELETE
目标表中没有相应源表记录的记录。我们建议添加一个可选的条件子句,以避免完全重写目标表。
以下代码示例展示了使用此方法进行删除的基本语法,它使用源表的内容覆盖目标表并删除目标表中不匹配的记录。
MERGE INTO targetUSING sourceON source.key = target.keyWHEN MATCHED UPDATE SET *WHEN NOT MATCHED INSERT *WHEN NOT MATCHED BY SOURCE DELETE
(targetDF .merge(sourceDF, "source.key = target.key") .whenMatchedUpdateAll() .whenNotMatchedInsertAll() .whenNotMatchedBySourceDelete() .execute())
targetDF .merge(sourceDF, "source.key = target.key") .whenMatched() .updateAll() .whenNotMatched() .insertAll() .whenNotMatchedBySource() .delete() .execute()
以下示例向 WHEN NOT MATCHED BY SOURCE
子句添加了条件,并指定了要在不匹配的目标行中更新的值。
MERGE INTO targetUSING sourceON source.key = target.keyWHEN MATCHED THEN UPDATE SET target.lastSeen = source.timestampWHEN NOT MATCHED THEN INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN UPDATE SET target.status = 'inactive'
(targetDF .merge(sourceDF, "source.key = target.key") .whenMatchedUpdate( set = {"target.lastSeen": "source.timestamp"} ) .whenNotMatchedInsert( values = { "target.key": "source.key", "target.lastSeen": "source.timestamp", "target.status": "'active'" } ) .whenNotMatchedBySourceUpdate( condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)", set = {"target.status": "'inactive'"} ) .execute())
targetDF .merge(sourceDF, "source.key = target.key") .whenMatched() .updateExpr(Map("target.lastSeen" -> "source.timestamp")) .whenNotMatched() .insertExpr(Map( "target.key" -> "source.key", "target.lastSeen" -> "source.timestamp", "target.status" -> "'active'", ) ) .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)") .updateExpr(Map("target.status" -> "'inactive'")) .execute()
操作语义
标题为“操作语义”的章节以下是 merge
编程操作的详细描述。
-
可以有任意数量的
whenMatched
和whenNotMatched
子句。 -
当源行与目标表行根据匹配条件匹配时,执行
whenMatched
子句。这些子句具有以下语义。-
whenMatched
子句最多可以有一个update
和一个delete
操作。merge
中的update
操作仅更新匹配的目标行的指定列(类似于update
操作)。delete
操作删除匹配的行。 -
每个
whenMatched
子句可以有一个可选条件。如果此子句条件存在,则仅当子句条件为真时,才对任何匹配的源-目标行对执行update
或delete
操作。 -
如果存在多个
whenMatched
子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有whenMatched
子句都必须具有条件。 -
如果对于与合并条件匹配的源和目标行对,所有
whenMatched
条件都不为真,则目标行保持不变。 -
要使用源数据集的相应列更新目标 Delta 表的所有列,请使用
whenMatched(...).updateAll()
。这等效于whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))对于目标 Delta 表的所有列。因此,此操作假定源表具有与目标表中相同的列,否则查询将引发分析错误。
-
-
当源行与目标行不根据匹配条件匹配时,执行
whenNotMatched
子句。这些子句具有以下语义。-
whenNotMatched
子句只能有insert
操作。新行根据指定的列和相应的表达式生成。你无需指定目标表中的所有列。对于未指定的目标列,插入NULL
。 -
每个
whenNotMatched
子句可以有一个可选条件。如果存在子句条件,则仅当该行条件为真时,才插入源行。否则,忽略源列。 -
如果存在多个
whenNotMatched
子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有whenNotMatched
子句都必须具有条件。 -
要使用源数据集的相应列插入目标 Delta 表的所有列,请使用
whenNotMatched(...).insertAll()
。这等效于whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))对于目标 Delta 表的所有列。因此,此操作假定源表具有与目标表中相同的列,否则查询将引发分析错误。
-
-
当目标行根据合并条件不匹配任何源行时,执行
whenNotMatchedBySource
子句。这些子句具有以下语义。whenNotMatchedBySource
子句可以指定delete
和update
操作。- 每个
whenNotMatchedBySource
子句可以有一个可选条件。如果存在子句条件,则仅当该行条件为真时,才修改目标行。否则,目标行保持不变。 - 如果存在多个
whenNotMatchedBySource
子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有whenNotMatchedBySource
子句都必须具有条件。 - 根据定义,
whenNotMatchedBySource
子句没有源行可从中提取列值,因此不能引用源列。对于要修改的每个列,你可以指定文字值或对目标列执行操作,例如SET target.deleted_count = target.deleted_count + 1
。
Schema 验证
标题为“Schema 验证”的章节merge
自动验证插入和更新表达式生成的数据的 Schema 与表的 Schema 兼容。它使用以下规则来确定 merge
操作是否兼容
- 对于
update
和insert
操作,指定的 target 列必须存在于 target Delta 表中。 - 对于
updateAll
和insertAll
操作,源数据集必须包含目标 Delta 表的所有列。源数据集可以有额外的列,它们将被忽略。
如果你不希望忽略额外列,而是希望更新目标表 Schema 以包含新列,请参阅 自动 Schema 演进。
- 对于所有操作,如果生成目标列的表达式生成的数据类型与目标 Delta 表中相应列的数据类型不同,
merge
会尝试将它们转换为表中的类型。
自动 Schema 演进
标题为“自动 Schema 演进”的章节Schema 演进允许用户解决合并时目标表和源表之间的 Schema 不匹配问题。它处理以下两种情况
- 源表中的列不存在于目标表中。新列被添加到目标 Schema,其值使用源值进行插入或更新。
- 目标表中的列不存在于源表中。目标 Schema 保持不变;附加目标列中的值要么保持不变(对于
UPDATE
),要么设置为NULL
(对于INSERT
)。
以下是一些有无 Schema 演进的 merge
操作效果示例。
列 | 查询(在 SQL 中) | 无 Schema 演进时的行为(默认) | 有 Schema 演进时的行为 |
---|---|---|---|
目标: key, value 源: key, value, new_value | sql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * | 表 Schema 保持不变;只更新/插入列 key , value 。 | 表 Schema 更改为 (key, value, new_value) 。现有匹配记录将使用源中的 value 和 new_value 进行更新。新行将使用 Schema (key, value, new_value) 插入。 |
目标: key, old_value 源: key, new_value | sql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * | UPDATE 和 INSERT 操作会抛出错误,因为目标列 old_value 不在源中。 | 表 Schema 更改为 (key, old_value, new_value) 。现有匹配记录将使用源中的 new_value 进行更新,同时 old_value 保持不变。新记录将使用指定的 key 、new_value 插入,并且 old_value 为 NULL 。 |
目标: key, old_value 源: key, new_value | sql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value | UPDATE 抛出错误,因为目标表中不存在列 new_value 。 | 表 Schema 更改为 (key, old_value, new_value) 。现有匹配记录将使用源中的 new_value 进行更新,同时 old_value 保持不变,不匹配的记录将 new_value 设置为 NULL 。 |
目标: key, old_value 源: key, new_value | sql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) | INSERT 抛出错误,因为目标表中不存在列 new_value 。 | 表 Schema 更改为 (key, old_value, new_value) 。新记录将使用指定的 key 、new_value 插入,并且 old_value 为 NULL 。现有记录将 new_value 设置为 NULL ,同时 old_value 保持不变。请参阅注释 (1)。 |
包含结构体数组的 Schema 的特殊注意事项
标题为“包含结构体数组的 Schema 的特殊注意事项”的章节Delta MERGE INTO
支持按名称解析结构体字段,并演进结构体数组的 Schema。启用 Schema 演进后,目标表 Schema 将针对结构体数组进行演进,这也适用于数组内部的任何嵌套结构体。
以下是一些有无 Schema 演进的合并操作对结构体数组的影响示例。
源 Schema | 目标 Schema | 无 Schema 演进时的行为(默认) | 有 Schema 演进时的行为 |
---|---|---|---|
array<struct<b: string, a: string>> | array<struct<a: int, b: int>> | 表 Schema 保持不变。列将按名称解析并进行更新或插入。 | 表 Schema 保持不变。列将按名称解析并进行更新或插入。 |
array<struct<a: int, c: string, d: string>> | array<struct<a: string, b: string>> | update 和 insert 抛出错误,因为 c 和 d 不存在于目标表中。 | 表 Schema 更改为 array<struct<a: string, b: string, c: string, d: string>>。c 和 d 在目标表的现有条目中作为 NULL 插入。update 和 insert 用 a 转换为字符串和 b 作为 NULL 填充源表中的条目。 |
array<struct<a: string, b: struct<c: string, d: string>>> | array<struct<a: string, b: struct<c: string>>> | update 和 insert 抛出错误,因为 d 不存在于目标表中。 | 目标表 Schema 更改为 array<struct<a: string, b: struct<c: string, d: string>>>。d 在目标表的现有条目中作为 NULL 插入。 |
性能调优
标题为“性能调优”的章节你可以使用以下方法减少合并所需的时间
-
减少匹配搜索空间:默认情况下,
merge
操作会搜索整个 Delta 表以查找源表中的匹配项。加快merge
的一种方法是在匹配条件中添加已知约束以减少搜索空间。例如,假设你有一个按country
和date
分区的表,并且你希望使用merge
更新最后一天和特定国家的信息。添加条件events.date = current_date() AND events.country = 'USA'将使查询更快,因为它只在相关的分区中查找匹配项。此外,它还将减少与其他并发操作冲突的机会。有关更多详细信息,请参阅 并发控制。
-
压缩文件:如果数据存储在许多小文件中,读取数据以搜索匹配项可能会变慢。你可以将小文件压缩成更大的文件以提高读取吞吐量。有关详细信息,请参阅 压缩文件。
-
控制写入的 shuffle 分区:
merge
操作会多次 shuffle 数据以计算和写入更新后的数据。用于 shuffle 的任务数由 Spark 会话配置spark.sql.shuffle.partitions
控制。设置此参数不仅控制并行度,还决定输出文件的数量。增加值会增加并行度,但也会生成更多的小数据文件。 -
在写入前重新分区输出数据:对于分区表,
merge
可能会生成比 shuffle 分区数量多得多的文件,这会成为性能瓶颈。这是因为每个 shuffle 任务都可以在多个分区中写入多个文件。在许多情况下,在写入之前,通过表的 partition 列重新分区输出数据会有帮助。可以通过将 Spark 会话配置spark.databricks.delta.merge.repartitionBeforeWrite.enabled
设置为true
来启用此功能。
合并示例
标题为“合并示例”的章节以下是一些如何在不同场景中使用 merge
的示例。
写入 Delta 表时的数据去重
标题为“写入 Delta 表时的数据去重”的章节常见的 ETL 用例是将日志收集到 Delta 表中,通过将其附加到表中。然而,源通常会生成重复的日志记录,需要下游去重步骤来处理它们。使用 merge
,你可以避免插入重复记录。
MERGE INTO logsUSING newDedupedLogsON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYSWHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS THEN INSERT *
deltaTable.alias("logs").merge( newDedupedLogs.alias("newDedupedLogs"), "logs.uniqueId = newDedupedLogs.uniqueId") \.whenNotMatchedInsertAll() \.execute()
deltaTable .as("logs") .merge( newDedupedLogs.as("newDedupedLogs"), "logs.uniqueId = newDedupedLogs.uniqueId") .whenNotMatched() .insertAll() .execute()
deltaTable .as("logs") .merge( newDedupedLogs.as("newDedupedLogs"), "logs.uniqueId = newDedupedLogs.uniqueId") .whenNotMatched() .insertAll() .execute();
如果你知道只有几天可能会出现重复记录,你可以通过按日期对表进行分区,然后指定目标表的日期范围来进一步优化查询。
MERGE INTO logsUSING newDedupedLogsON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYSWHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS THEN INSERT *
deltaTable.alias("logs").merge( newDedupedLogs.alias("newDedupedLogs"), "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \.execute()
deltaTable.as("logs").merge( newDedupedLogs.as("newDedupedLogs"), "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS").whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS").insertAll().execute()
deltaTable.as("logs").merge( newDedupedLogs.as("newDedupedLogs"), "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS").whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS").insertAll().execute();
这比之前的命令更有效,因为它只在最近 7 天的日志中查找重复项,而不是整个表。此外,你可以将此仅插入合并与结构化流式处理结合使用,以对日志执行连续去重。
- 在流式查询中,你可以在
foreachBatch
中使用合并操作,将任何流式数据持续写入到 Delta 表中并进行去重。有关foreachBatch
的更多信息,请参阅以下流式示例。 - 在另一个流式查询中,你可以持续从这个 Delta 表中读取去重后的数据。这是可能的,因为仅插入的合并操作只向 Delta 表中追加新数据。
Delta 表中的慢变数据 (SCD) Type 2 操作
标题为“Delta 表中的慢变数据 (SCD) Type 2 操作”的章节另一个常见的操作是 SCD Type 2,它维护维度表中每个键的所有更改历史记录。此类操作需要更新现有行以将键的先前值标记为旧值,并将新行插入为最新值。给定一个带有更新的源表和带有维度数据的目标表,SCD Type 2 可以用 merge
来表达。
这是一个具体的示例,说明如何维护客户地址的历史记录以及每个地址的活动日期范围。当客户的地址需要更新时,你必须将以前的地址标记为非当前地址,更新其活动日期范围,并将新地址添加为当前地址。
customersTable = ... # DeltaTable with schema (customerId, address, current, effectiveDate, endDate)
updatesDF = ... # DataFrame with schema (customerId, address, effectiveDate)
# Rows to INSERT new addresses of existing customersnewAddressesToInsert = updatesDF \ .alias("updates") \ .join(customersTable.toDF().alias("customers"), "customerid") \ .where("customers.current = true AND updates.address <> customers.address")
# Stage the update by unioning two sets of rows# 1. Rows that will be inserted in the whenNotMatched clause# 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customersstagedUpdates = ( newAddressesToInsert .selectExpr("NULL as mergeKey", "updates.*") # Rows for 1 .union(updatesDF.selectExpr("updates.customerId as mergeKey", "*")) # Rows for 2.)
# Apply SCD Type 2 operation using mergecustomersTable.alias("customers").merge( stagedUpdates.alias("staged_updates"), "customers.customerId = mergeKey") \.whenMatchedUpdate( condition = "customers.current = true AND customers.address <> staged_updates.address", set = { # Set current to false and endDate to source's effective date. "current": "false", "endDate": "staged_updates.effectiveDate" }).whenNotMatchedInsert( values = { "customerid": "staged_updates.customerId", "address": "staged_updates.address", "current": "true", "effectiveDate": "staged_updates.effectiveDate", # Set current to true along with the new address and its effective date. "endDate": "null" }).execute()
val customersTable: DeltaTable = ... // table with schema (customerId, address, current, effectiveDate, endDate)
val updatesDF: DataFrame = ... // DataFrame with schema (customerId, address, effectiveDate)
// Rows to INSERT new addresses of existing customersval newAddressesToInsert = updatesDF .as("updates") .join(customersTable.toDF.as("customers"), "customerid") .where("customers.current = true AND updates.address <> customers.address")
// Stage the update by unioning two sets of rows// 1. Rows that will be inserted in the whenNotMatched clause// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customersval stagedUpdates = newAddressesToInsert .selectExpr("NULL as mergeKey", "updates.*") // Rows for 1. .union( updatesDF.selectExpr("updates.customerId as mergeKey", "*") // Rows for 2. )
// Apply SCD Type 2 operation using mergecustomersTable .as("customers") .merge( stagedUpdates.as("staged_updates"), "customers.customerId = mergeKey") .whenMatched("customers.current = true AND customers.address <> staged_updates.address") .updateExpr(Map( // Set current to false and endDate to source's effective date. "current" -> "false", "endDate" -> "staged_updates.effectiveDate")) .whenNotMatched() .insertExpr(Map( "customerid" -> "staged_updates.customerId", "address" -> "staged_updates.address", "current" -> "true", "effectiveDate" -> "staged_updates.effectiveDate", // Set current to true along with the new address and its effective date. "endDate" -> "null")) .execute()
将变更数据写入 Delta 表
标题为“将变更数据写入 Delta 表”的章节与 SCD 类似,另一个常见的用例,通常称为变更数据捕获 (CDC),是将外部数据库生成的所有数据更改应用于 Delta 表。换句话说,需要将应用于外部表的一组更新、删除和插入应用于 Delta 表。你可以使用 merge
如下进行此操作。
deltaTable = ... # DeltaTable with schema (key, value)
# DataFrame with changes having following columns# - key: key of the change# - time: time of change for ordering between changes (can replaced by other ordering id)# - newValue: updated or inserted value if key was not deleted# - deleted: true if the key was deleted, false if the key was inserted or updatedchangesDF = spark.table("changes")
# Find the latest change for each key based on the timestamp# Note: For nested structs, max on struct is computed as# max on first struct field, if equal fall back to second fields, and so on.latestChangeForEachKey = changesDF \ .selectExpr("key", "struct(time, newValue, deleted) as otherCols") \ .groupBy("key") \ .agg(max("otherCols").alias("latest")) \ .select("key", "latest.*") \
deltaTable.alias("t").merge( latestChangeForEachKey.alias("s"), "s.key = t.key") \ .whenMatchedDelete(condition = "s.deleted = true") \ .whenMatchedUpdate(set = { "key": "s.key", "value": "s.newValue" }) \ .whenNotMatchedInsert( condition = "s.deleted = false", values = { "key": "s.key", "value": "s.newValue" } ).execute()
val deltaTable: DeltaTable = ... // DeltaTable with schema (key, value)
// DataFrame with changes having following columns// - key: key of the change// - time: time of change for ordering between changes (can replaced by other ordering id)// - newValue: updated or inserted value if key was not deleted// - deleted: true if the key was deleted, false if the key was inserted or updatedval changesDF: DataFrame = ...
// Find the latest change for each key based on the timestamp// Note: For nested structs, max on struct is computed as// max on first struct field, if equal fall back to second fields, and so on.val latestChangeForEachKey = changesDF .selectExpr("key", "struct(time, newValue, deleted) as otherCols" ) .groupBy("key") .agg(max("otherCols").as("latest")) .selectExpr("key", "latest.*")
deltaTable.as("t") .merge( latestChangeForEachKey.as("s"), "s.key = t.key") .whenMatched("s.deleted = true") .delete() .whenMatched() .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue")) .whenNotMatched("s.deleted = false") .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue")) .execute()
使用 foreachBatch
从流式查询进行插入更新
标题为“使用 foreachBatch 从流式查询进行插入更新”的章节你可以结合使用 merge
和 foreachBatch
(有关更多信息,请参阅 foreachbatch)将复杂的插入更新从流式查询写入 Delta 表。例如
-
在更新模式下写入流式聚合:这比完整模式效率高得多。
from delta.tables import *deltaTable = DeltaTable.forPath(spark, "/data/aggregates")# Function to upsert microBatchOutputDF into Delta table using mergedef upsertToDelta(microBatchOutputDF, batchId):deltaTable.alias("t").merge(microBatchOutputDF.alias("s"),"s.key = t.key") \.whenMatchedUpdateAll() \.whenNotMatchedInsertAll() \.execute()}# Write the output of a streaming aggregation query into Delta tablestreamingAggregatesDF.writeStream \.format("delta") \.foreachBatch(upsertToDelta) \.outputMode("update") \.start()import io.delta.tables.*val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")// Function to upsert microBatchOutputDF into Delta table using mergedef upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {deltaTable.as("t").merge(microBatchOutputDF.as("s"),"s.key = t.key").whenMatched().updateAll().whenNotMatched().insertAll().execute()}// Write the output of a streaming aggregation query into Delta tablestreamingAggregatesDF.writeStream.format("delta").foreachBatch(upsertToDelta _).outputMode("update").start() -
将数据库变更流写入 Delta 表:用于写入变更数据的合并查询 可以在
foreachBatch
中使用,以将变更流持续应用于 Delta 表。 -
将流数据写入 Delta 表并进行去重:用于去重的仅插入合并查询可以在
foreachBatch
中使用,以持续将数据(包含重复项)写入 Delta 表并自动去重。