表删除、更新和合并
Delta Lake 支持多种语句,以方便从 Delta 表中删除数据和更新数据。
从表中删除
标题为“从表中删除”的章节你可以从 Delta 表中删除与谓词匹配的数据。例如,在名为 people10m 的表或路径 /tmp/delta/people-10m 中,要删除 birthDate 列中值早于 1955 的所有人员对应的行,你可以运行以下命令
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession。
from delta.tables import *from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.deltaTable.delete(col('birthDate') < '1960-01-01')import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.deltaTable.delete(col("birthDate") < "1955-01-01")import io.delta.tables.*;import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");
// Declare the predicate by using a SQL-formatted string.deltaTable.delete("birthDate < '1955-01-01'");
// Declare the predicate by using Spark SQL functions.deltaTable.delete(functions.col("birthDate").lt(functions.lit("1955-01-01")));有关详细信息,请参阅 Delta Lake API。
更新表
标题为“更新表”的章节你可以更新 Delta 表中与谓词匹配的数据。例如,在名为 people10m 的表或路径 /tmp/delta/people-10m 中,要将 gender 列中的缩写从 M 或 F 更改为 Male 或 Female,你可以运行以下命令
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession。
from delta.tables import *from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.deltaTable.update(  condition = "gender = 'F'",  set = { "gender": "'Female'" })
# Declare the predicate by using Spark SQL functions.deltaTable.update(  condition = col('gender') == 'M',  set = { 'gender': lit('Male') })import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.deltaTable.updateExpr(  "gender = 'F'",  Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.deltaTable.update(  col("gender") === "M",  Map("gender" -> lit("Male")));import io.delta.tables.*;import org.apache.spark.sql.functions;import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");
// Declare the predicate by using a SQL-formatted string.deltaTable.updateExpr(  "gender = 'F'",  new HashMap<String, String>() {{    put("gender", "'Female'");  }});
// Declare the predicate by using Spark SQL functions.deltaTable.update(  functions.col(gender).eq("M"),  new HashMap<String, Column>() {{    put("gender", functions.lit("Male"));  }});有关详细信息,请参阅 Delta Lake API。
使用合并将数据插入或更新到表中
标题为“使用合并将数据插入或更新到表中”的章节你可以使用 MERGE SQL 操作将源表、视图或 DataFrame 中的数据插入或更新到目标 Delta 表中。Delta Lake 支持在 MERGE 中进行插入、更新和删除操作,并且支持超出 SQL 标准的扩展语法,以方便高级用例。
假设你有一个名为 people10mupdates 的源表或路径 /tmp/delta/people-10m-updates,其中包含目标表 people10m 或路径 /tmp/delta/people-10m 的新数据。其中一些新记录可能已存在于目标数据中。要合并新数据,你需要更新其中人员 id 已存在的行,并插入没有匹配 id 的新行。你可以运行以下命令
MERGE INTO people10mUSING people10mupdatesON people10m.id = people10mupdates.idWHEN MATCHED THEN  UPDATE SET    id = people10mupdates.id,    firstName = people10mupdates.firstName,    middleName = people10mupdates.middleName,    lastName = people10mupdates.lastName,    gender = people10mupdates.gender,    birthDate = people10mupdates.birthDate,    ssn = people10mupdates.ssn,    salary = people10mupdates.salaryWHEN NOT MATCHED  THEN INSERT (    id,    firstName,    middleName,    lastName,    gender,    birthDate,    ssn,    salary  )  VALUES (    people10mupdates.id,    people10mupdates.firstName,    people10mupdates.middleName,    people10mupdates.lastName,    people10mupdates.gender,    people10mupdates.birthDate,    people10mupdates.ssn,    people10mupdates.salary  )有关启用 SQL 命令支持的步骤,请参阅 配置 SparkSession。
from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \  .merge(    dfUpdates.alias('updates'),    'people.id = updates.id'  ) \  .whenMatchedUpdate(set =    {      "id": "updates.id",      "firstName": "updates.firstName",      "middleName": "updates.middleName",      "lastName": "updates.lastName",      "gender": "updates.gender",      "birthDate": "updates.birthDate",      "ssn": "updates.ssn",      "salary": "updates.salary"    }  ) \  .whenNotMatchedInsert(values =    {      "id": "updates.id",      "firstName": "updates.firstName",      "middleName": "updates.middleName",      "lastName": "updates.lastName",      "gender": "updates.gender",      "birthDate": "updates.birthDate",      "ssn": "updates.ssn",      "salary": "updates.salary"    }  ) \  .execute()import io.delta.tables._import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople  .as("people")  .merge(    dfUpdates.as("updates"),    "people.id = updates.id")  .whenMatched  .updateExpr(    Map(      "id" -> "updates.id",      "firstName" -> "updates.firstName",      "middleName" -> "updates.middleName",      "lastName" -> "updates.lastName",      "gender" -> "updates.gender",      "birthDate" -> "updates.birthDate",      "ssn" -> "updates.ssn",      "salary" -> "updates.salary"    ))  .whenNotMatched  .insertExpr(    Map(      "id" -> "updates.id",      "firstName" -> "updates.firstName",      "middleName" -> "updates.middleName",      "lastName" -> "updates.lastName",      "gender" -> "updates.gender",      "birthDate" -> "updates.birthDate",      "ssn" -> "updates.ssn",      "salary" -> "updates.salary"    ))  .execute()import io.delta.tables.*;import org.apache.spark.sql.functions;import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")Dataset<Row> dfUpdates = spark.read("delta").load("/tmp/delta/people-10m-updates")
deltaTable  .as("people")  .merge(    dfUpdates.as("updates"),    "people.id = updates.id")  .whenMatched()  .updateExpr(    new HashMap<String, String>() {{      put("id", "updates.id");      put("firstName", "updates.firstName");      put("middleName", "updates.middleName");      put("lastName", "updates.lastName");      put("gender", "updates.gender");      put("birthDate", "updates.birthDate");      put("ssn", "updates.ssn");      put("salary", "updates.salary");    }})  .whenNotMatched()  .insertExpr(    new HashMap<String, String>() {{      put("id", "updates.id");      put("firstName", "updates.firstName");      put("middleName", "updates.middleName");      put("lastName", "updates.lastName");      put("gender", "updates.gender");      put("birthDate", "updates.birthDate");      put("ssn", "updates.ssn");      put("salary", "updates.salary");    }})  .execute();有关 Scala、Java 和 Python 语法详细信息,请参阅 Delta Lake API。
使用合并修改所有不匹配的行
标题为“使用合并修改所有不匹配的行”的章节你可以使用 WHEN NOT MATCHED BY SOURCE 子句来 UPDATE 或 DELETE 目标表中没有相应源表记录的记录。我们建议添加一个可选的条件子句,以避免完全重写目标表。
以下代码示例展示了使用此方法进行删除的基本语法,它使用源表的内容覆盖目标表并删除目标表中不匹配的记录。
MERGE INTO targetUSING sourceON source.key = target.keyWHEN MATCHED  UPDATE SET *WHEN NOT MATCHED  INSERT *WHEN NOT MATCHED BY SOURCE  DELETE(targetDF  .merge(sourceDF, "source.key = target.key")  .whenMatchedUpdateAll()  .whenNotMatchedInsertAll()  .whenNotMatchedBySourceDelete()  .execute())targetDF  .merge(sourceDF, "source.key = target.key")  .whenMatched()  .updateAll()  .whenNotMatched()  .insertAll()  .whenNotMatchedBySource()  .delete()  .execute()以下示例向 WHEN NOT MATCHED BY SOURCE 子句添加了条件,并指定了要在不匹配的目标行中更新的值。
MERGE INTO targetUSING sourceON source.key = target.keyWHEN MATCHED THEN  UPDATE SET target.lastSeen = source.timestampWHEN NOT MATCHED THEN  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN  UPDATE SET target.status = 'inactive'(targetDF  .merge(sourceDF, "source.key = target.key")  .whenMatchedUpdate(    set = {"target.lastSeen": "source.timestamp"}  )  .whenNotMatchedInsert(    values = {      "target.key": "source.key",      "target.lastSeen": "source.timestamp",      "target.status": "'active'"    }  )  .whenNotMatchedBySourceUpdate(    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",    set = {"target.status": "'inactive'"}  )  .execute())targetDF  .merge(sourceDF, "source.key = target.key")  .whenMatched()  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))  .whenNotMatched()  .insertExpr(Map(    "target.key" -> "source.key",    "target.lastSeen" -> "source.timestamp",    "target.status" -> "'active'",    )  )  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")  .updateExpr(Map("target.status" -> "'inactive'"))  .execute()操作语义
标题为“操作语义”的章节以下是 merge 编程操作的详细描述。
- 
可以有任意数量的 whenMatched和whenNotMatched子句。
- 
当源行与目标表行根据匹配条件匹配时,执行 whenMatched子句。这些子句具有以下语义。- 
whenMatched子句最多可以有一个update和一个delete操作。merge中的update操作仅更新匹配的目标行的指定列(类似于update操作)。delete操作删除匹配的行。
- 
每个 whenMatched子句可以有一个可选条件。如果此子句条件存在,则仅当子句条件为真时,才对任何匹配的源-目标行对执行update或delete操作。
- 
如果存在多个 whenMatched子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有whenMatched子句都必须具有条件。
- 
如果对于与合并条件匹配的源和目标行对,所有 whenMatched条件都不为真,则目标行保持不变。
- 
要使用源数据集的相应列更新目标 Delta 表的所有列,请使用 whenMatched(...).updateAll()。这等效于whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))对于目标 Delta 表的所有列。因此,此操作假定源表具有与目标表中相同的列,否则查询将引发分析错误。 
 
- 
- 
当源行与目标行不根据匹配条件匹配时,执行 whenNotMatched子句。这些子句具有以下语义。- 
whenNotMatched子句只能有insert操作。新行根据指定的列和相应的表达式生成。你无需指定目标表中的所有列。对于未指定的目标列,插入NULL。
- 
每个 whenNotMatched子句可以有一个可选条件。如果存在子句条件,则仅当该行条件为真时,才插入源行。否则,忽略源列。
- 
如果存在多个 whenNotMatched子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有whenNotMatched子句都必须具有条件。
- 
要使用源数据集的相应列插入目标 Delta 表的所有列,请使用 whenNotMatched(...).insertAll()。这等效于whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))对于目标 Delta 表的所有列。因此,此操作假定源表具有与目标表中相同的列,否则查询将引发分析错误。 
 
- 
- 
当目标行根据合并条件不匹配任何源行时,执行 whenNotMatchedBySource子句。这些子句具有以下语义。- whenNotMatchedBySource子句可以指定- delete和- update操作。
- 每个 whenNotMatchedBySource子句可以有一个可选条件。如果存在子句条件,则仅当该行条件为真时,才修改目标行。否则,目标行保持不变。
- 如果存在多个 whenNotMatchedBySource子句,则按照它们指定的顺序进行评估。除了最后一个子句外,所有whenNotMatchedBySource子句都必须具有条件。
- 根据定义,whenNotMatchedBySource子句没有源行可从中提取列值,因此不能引用源列。对于要修改的每个列,你可以指定文字值或对目标列执行操作,例如SET target.deleted_count = target.deleted_count + 1。
 
Schema 验证
标题为“Schema 验证”的章节merge 自动验证插入和更新表达式生成的数据的 Schema 与表的 Schema 兼容。它使用以下规则来确定 merge 操作是否兼容
- 对于 update和insert操作,指定的 target 列必须存在于 target Delta 表中。
- 对于 updateAll和insertAll操作,源数据集必须包含目标 Delta 表的所有列。源数据集可以有额外的列,它们将被忽略。
如果你不希望忽略额外列,而是希望更新目标表 Schema 以包含新列,请参阅 自动 Schema 演进。
- 对于所有操作,如果生成目标列的表达式生成的数据类型与目标 Delta 表中相应列的数据类型不同,merge会尝试将它们转换为表中的类型。
自动 Schema 演进
标题为“自动 Schema 演进”的章节Schema 演进允许用户解决合并时目标表和源表之间的 Schema 不匹配问题。它处理以下两种情况
- 源表中的列不存在于目标表中。新列被添加到目标 Schema,其值使用源值进行插入或更新。
- 目标表中的列不存在于源表中。目标 Schema 保持不变;附加目标列中的值要么保持不变(对于 UPDATE),要么设置为NULL(对于INSERT)。
以下是一些有无 Schema 演进的 merge 操作效果示例。
| 列 | 查询(在 SQL 中) | 无 Schema 演进时的行为(默认) | 有 Schema 演进时的行为 | 
|---|---|---|---|
| 目标: key, value源:key, value, new_value | sql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * | 表 Schema 保持不变;只更新/插入列 key,value。 | 表 Schema 更改为 (key, value, new_value)。现有匹配记录将使用源中的value和new_value进行更新。新行将使用 Schema(key, value, new_value)插入。 | 
| 目标: key, old_value源:key, new_value | sql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * | UPDATE和INSERT操作会抛出错误,因为目标列old_value不在源中。 | 表 Schema 更改为 (key, old_value, new_value)。现有匹配记录将使用源中的new_value进行更新,同时old_value保持不变。新记录将使用指定的key、new_value插入,并且old_value为NULL。 | 
| 目标: key, old_value源:key, new_value | sql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value | UPDATE抛出错误,因为目标表中不存在列new_value。 | 表 Schema 更改为 (key, old_value, new_value)。现有匹配记录将使用源中的new_value进行更新,同时old_value保持不变,不匹配的记录将new_value设置为NULL。 | 
| 目标: key, old_value源:key, new_value | sql MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) | INSERT抛出错误,因为目标表中不存在列new_value。 | 表 Schema 更改为 (key, old_value, new_value)。新记录将使用指定的key、new_value插入,并且old_value为NULL。现有记录将new_value设置为NULL,同时old_value保持不变。请参阅注释 (1)。 | 
包含结构体数组的 Schema 的特殊注意事项
标题为“包含结构体数组的 Schema 的特殊注意事项”的章节Delta MERGE INTO 支持按名称解析结构体字段,并演进结构体数组的 Schema。启用 Schema 演进后,目标表 Schema 将针对结构体数组进行演进,这也适用于数组内部的任何嵌套结构体。
以下是一些有无 Schema 演进的合并操作对结构体数组的影响示例。
| 源 Schema | 目标 Schema | 无 Schema 演进时的行为(默认) | 有 Schema 演进时的行为 | 
|---|---|---|---|
| array<struct<b: string, a: string>> | array<struct<a: int, b: int>> | 表 Schema 保持不变。列将按名称解析并进行更新或插入。 | 表 Schema 保持不变。列将按名称解析并进行更新或插入。 | 
| array<struct<a: int, c: string, d: string>> | array<struct<a: string, b: string>> | update和insert抛出错误,因为c和d不存在于目标表中。 | 表 Schema 更改为 array<struct<a: string, b: string, c: string, d: string>>。 c和d在目标表的现有条目中作为NULL插入。update和insert用a转换为字符串和b作为NULL填充源表中的条目。 | 
| array<struct<a: string, b: struct<c: string, d: string>>> | array<struct<a: string, b: struct<c: string>>> | update和insert抛出错误,因为d不存在于目标表中。 | 目标表 Schema 更改为 array<struct<a: string, b: struct<c: string, d: string>>>。 d在目标表的现有条目中作为NULL插入。 | 
性能调优
标题为“性能调优”的章节你可以使用以下方法减少合并所需的时间
- 
减少匹配搜索空间:默认情况下, merge操作会搜索整个 Delta 表以查找源表中的匹配项。加快merge的一种方法是在匹配条件中添加已知约束以减少搜索空间。例如,假设你有一个按country和date分区的表,并且你希望使用merge更新最后一天和特定国家的信息。添加条件events.date = current_date() AND events.country = 'USA'将使查询更快,因为它只在相关的分区中查找匹配项。此外,它还将减少与其他并发操作冲突的机会。有关更多详细信息,请参阅 并发控制。 
- 
压缩文件:如果数据存储在许多小文件中,读取数据以搜索匹配项可能会变慢。你可以将小文件压缩成更大的文件以提高读取吞吐量。有关详细信息,请参阅 压缩文件。 
- 
控制写入的 shuffle 分区: merge操作会多次 shuffle 数据以计算和写入更新后的数据。用于 shuffle 的任务数由 Spark 会话配置spark.sql.shuffle.partitions控制。设置此参数不仅控制并行度,还决定输出文件的数量。增加值会增加并行度,但也会生成更多的小数据文件。
- 
在写入前重新分区输出数据:对于分区表, merge可能会生成比 shuffle 分区数量多得多的文件,这会成为性能瓶颈。这是因为每个 shuffle 任务都可以在多个分区中写入多个文件。在许多情况下,在写入之前,通过表的 partition 列重新分区输出数据会有帮助。可以通过将 Spark 会话配置spark.databricks.delta.merge.repartitionBeforeWrite.enabled设置为true来启用此功能。
合并示例
标题为“合并示例”的章节以下是一些如何在不同场景中使用 merge 的示例。
写入 Delta 表时的数据去重
标题为“写入 Delta 表时的数据去重”的章节常见的 ETL 用例是将日志收集到 Delta 表中,通过将其附加到表中。然而,源通常会生成重复的日志记录,需要下游去重步骤来处理它们。使用 merge,你可以避免插入重复记录。
MERGE INTO logsUSING newDedupedLogsON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYSWHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS  THEN INSERT *deltaTable.alias("logs").merge(  newDedupedLogs.alias("newDedupedLogs"),  "logs.uniqueId = newDedupedLogs.uniqueId") \.whenNotMatchedInsertAll() \.execute()deltaTable  .as("logs")  .merge(    newDedupedLogs.as("newDedupedLogs"),    "logs.uniqueId = newDedupedLogs.uniqueId")  .whenNotMatched()  .insertAll()  .execute()deltaTable  .as("logs")  .merge(    newDedupedLogs.as("newDedupedLogs"),    "logs.uniqueId = newDedupedLogs.uniqueId")  .whenNotMatched()  .insertAll()  .execute();如果你知道只有几天可能会出现重复记录,你可以通过按日期对表进行分区,然后指定目标表的日期范围来进一步优化查询。
MERGE INTO logsUSING newDedupedLogsON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYSWHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS  THEN INSERT *deltaTable.alias("logs").merge(  newDedupedLogs.alias("newDedupedLogs"),  "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \.execute()deltaTable.as("logs").merge(  newDedupedLogs.as("newDedupedLogs"),  "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS").whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS").insertAll().execute()deltaTable.as("logs").merge(  newDedupedLogs.as("newDedupedLogs"),  "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS").whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS").insertAll().execute();这比之前的命令更有效,因为它只在最近 7 天的日志中查找重复项,而不是整个表。此外,你可以将此仅插入合并与结构化流式处理结合使用,以对日志执行连续去重。
- 在流式查询中,你可以在 foreachBatch中使用合并操作,将任何流式数据持续写入到 Delta 表中并进行去重。有关foreachBatch的更多信息,请参阅以下流式示例。
- 在另一个流式查询中,你可以持续从这个 Delta 表中读取去重后的数据。这是可能的,因为仅插入的合并操作只向 Delta 表中追加新数据。
Delta 表中的慢变数据 (SCD) Type 2 操作
标题为“Delta 表中的慢变数据 (SCD) Type 2 操作”的章节另一个常见的操作是 SCD Type 2,它维护维度表中每个键的所有更改历史记录。此类操作需要更新现有行以将键的先前值标记为旧值,并将新行插入为最新值。给定一个带有更新的源表和带有维度数据的目标表,SCD Type 2 可以用 merge 来表达。
这是一个具体的示例,说明如何维护客户地址的历史记录以及每个地址的活动日期范围。当客户的地址需要更新时,你必须将以前的地址标记为非当前地址,更新其活动日期范围,并将新地址添加为当前地址。
customersTable = ...  # DeltaTable with schema (customerId, address, current, effectiveDate, endDate)
updatesDF = ...       # DataFrame with schema (customerId, address, effectiveDate)
# Rows to INSERT new addresses of existing customersnewAddressesToInsert = updatesDF \  .alias("updates") \  .join(customersTable.toDF().alias("customers"), "customerid") \  .where("customers.current = true AND updates.address <> customers.address")
# Stage the update by unioning two sets of rows# 1. Rows that will be inserted in the whenNotMatched clause# 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customersstagedUpdates = (  newAddressesToInsert  .selectExpr("NULL as mergeKey", "updates.*")   # Rows for 1  .union(updatesDF.selectExpr("updates.customerId as mergeKey", "*"))  # Rows for 2.)
# Apply SCD Type 2 operation using mergecustomersTable.alias("customers").merge(  stagedUpdates.alias("staged_updates"),  "customers.customerId = mergeKey") \.whenMatchedUpdate(  condition = "customers.current = true AND customers.address <> staged_updates.address",  set = {                                      # Set current to false and endDate to source's effective date.    "current": "false",    "endDate": "staged_updates.effectiveDate"  }).whenNotMatchedInsert(  values = {    "customerid": "staged_updates.customerId",    "address": "staged_updates.address",    "current": "true",    "effectiveDate": "staged_updates.effectiveDate",  # Set current to true along with the new address and its effective date.    "endDate": "null"  }).execute()val customersTable: DeltaTable = ...   // table with schema (customerId, address, current, effectiveDate, endDate)
val updatesDF: DataFrame = ...          // DataFrame with schema (customerId, address, effectiveDate)
// Rows to INSERT new addresses of existing customersval newAddressesToInsert = updatesDF  .as("updates")  .join(customersTable.toDF.as("customers"), "customerid")  .where("customers.current = true AND updates.address <> customers.address")
// Stage the update by unioning two sets of rows// 1. Rows that will be inserted in the whenNotMatched clause// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customersval stagedUpdates = newAddressesToInsert  .selectExpr("NULL as mergeKey", "updates.*")   // Rows for 1.  .union(    updatesDF.selectExpr("updates.customerId as mergeKey", "*")  // Rows for 2.  )
// Apply SCD Type 2 operation using mergecustomersTable  .as("customers")  .merge(    stagedUpdates.as("staged_updates"),    "customers.customerId = mergeKey")  .whenMatched("customers.current = true AND customers.address <> staged_updates.address")  .updateExpr(Map(                                      // Set current to false and endDate to source's effective date.    "current" -> "false",    "endDate" -> "staged_updates.effectiveDate"))  .whenNotMatched()  .insertExpr(Map(    "customerid" -> "staged_updates.customerId",    "address" -> "staged_updates.address",    "current" -> "true",    "effectiveDate" -> "staged_updates.effectiveDate",  // Set current to true along with the new address and its effective date.    "endDate" -> "null"))  .execute()将变更数据写入 Delta 表
标题为“将变更数据写入 Delta 表”的章节与 SCD 类似,另一个常见的用例,通常称为变更数据捕获 (CDC),是将外部数据库生成的所有数据更改应用于 Delta 表。换句话说,需要将应用于外部表的一组更新、删除和插入应用于 Delta 表。你可以使用 merge 如下进行此操作。
deltaTable = ... # DeltaTable with schema (key, value)
# DataFrame with changes having following columns# - key: key of the change# - time: time of change for ordering between changes (can replaced by other ordering id)# - newValue: updated or inserted value if key was not deleted# - deleted: true if the key was deleted, false if the key was inserted or updatedchangesDF = spark.table("changes")
# Find the latest change for each key based on the timestamp# Note: For nested structs, max on struct is computed as# max on first struct field, if equal fall back to second fields, and so on.latestChangeForEachKey = changesDF \  .selectExpr("key", "struct(time, newValue, deleted) as otherCols") \  .groupBy("key") \  .agg(max("otherCols").alias("latest")) \  .select("key", "latest.*") \
deltaTable.alias("t").merge(    latestChangeForEachKey.alias("s"),    "s.key = t.key") \  .whenMatchedDelete(condition = "s.deleted = true") \  .whenMatchedUpdate(set = {    "key": "s.key",    "value": "s.newValue"  }) \  .whenNotMatchedInsert(    condition = "s.deleted = false",    values = {      "key": "s.key",      "value": "s.newValue"    }  ).execute()val deltaTable: DeltaTable = ... // DeltaTable with schema (key, value)
// DataFrame with changes having following columns// - key: key of the change// - time: time of change for ordering between changes (can replaced by other ordering id)// - newValue: updated or inserted value if key was not deleted// - deleted: true if the key was deleted, false if the key was inserted or updatedval changesDF: DataFrame = ...
// Find the latest change for each key based on the timestamp// Note: For nested structs, max on struct is computed as// max on first struct field, if equal fall back to second fields, and so on.val latestChangeForEachKey = changesDF  .selectExpr("key", "struct(time, newValue, deleted) as otherCols" )  .groupBy("key")  .agg(max("otherCols").as("latest"))  .selectExpr("key", "latest.*")
deltaTable.as("t")  .merge(    latestChangeForEachKey.as("s"),    "s.key = t.key")  .whenMatched("s.deleted = true")  .delete()  .whenMatched()  .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))  .whenNotMatched("s.deleted = false")  .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))  .execute()使用 foreachBatch 从流式查询进行插入更新
标题为“使用 foreachBatch 从流式查询进行插入更新”的章节你可以结合使用 merge 和 foreachBatch(有关更多信息,请参阅 foreachbatch)将复杂的插入更新从流式查询写入 Delta 表。例如
- 
在更新模式下写入流式聚合:这比完整模式效率高得多。 from delta.tables import *deltaTable = DeltaTable.forPath(spark, "/data/aggregates")# Function to upsert microBatchOutputDF into Delta table using mergedef upsertToDelta(microBatchOutputDF, batchId):deltaTable.alias("t").merge(microBatchOutputDF.alias("s"),"s.key = t.key") \.whenMatchedUpdateAll() \.whenNotMatchedInsertAll() \.execute()}# Write the output of a streaming aggregation query into Delta tablestreamingAggregatesDF.writeStream \.format("delta") \.foreachBatch(upsertToDelta) \.outputMode("update") \.start()import io.delta.tables.*val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")// Function to upsert microBatchOutputDF into Delta table using mergedef upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {deltaTable.as("t").merge(microBatchOutputDF.as("s"),"s.key = t.key").whenMatched().updateAll().whenNotMatched().insertAll().execute()}// Write the output of a streaming aggregation query into Delta tablestreamingAggregatesDF.writeStream.format("delta").foreachBatch(upsertToDelta _).outputMode("update").start()
- 
将数据库变更流写入 Delta 表:用于写入变更数据的合并查询 可以在 foreachBatch中使用,以将变更流持续应用于 Delta 表。
- 
将流数据写入 Delta 表并进行去重:用于去重的仅插入合并查询可以在 foreachBatch中使用,以持续将数据(包含重复项)写入 Delta 表并自动去重。