表流式读写
Delta Lake 通过 readStream 和 writeStream 与 Spark Structured Streaming 深度集成。Delta Lake 克服了流系统和文件通常相关的许多限制,包括:
- 
使用多个流(或并发批处理作业)维护“精确一次”处理 
- 
将文件用作流的源时,高效地发现哪些文件是新的 
对于 Delta Lake 对表进行的许多操作,您可以通过在创建新的 SparkSession 时设置配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 3.0 起)的集成。请参阅配置 SparkSession。
Delta 表作为源
标题为“Delta 表作为源”的部分当您将 Delta 表加载为流源并在流查询中使用它时,查询会处理表中所有存在的数据以及流启动后到达的任何新数据。
spark.readStream.format("delta")  .load("/tmp/delta/events")
import io.delta.implicits._spark.readStream.delta("/tmp/delta/events")限制输入速率
标题为“限制输入速率”的部分以下选项可用于控制微批次:
- maxFilesPerTrigger:每个微批次中要考虑的新文件数量。默认值为 1000。
- maxBytesPerTrigger:每个微批次处理的数据量。此选项设置一个“软最大值”,这意味着批次处理的数据量大约为这个值,并且在最小输入单元大于此限制的情况下,为了使流查询向前推进,可能会处理超过此限制的数据。如果您的流式处理使用- Trigger.Once,则此选项将被忽略。默认情况下不设置此选项。
如果将 maxBytesPerTrigger 与 maxFilesPerTrigger 结合使用,则微批次会一直处理数据,直到达到 maxFilesPerTrigger 或 maxBytesPerTrigger 限制。
忽略更新和删除
标题为“忽略更新和删除”的部分Structured Streaming 不处理非追加的输入,如果源表发生任何修改,则会抛出异常。有两种主要策略来处理无法自动传播到下游的更改:
- 您可以删除输出和检查点,然后从头重新启动流。
- 您可以设置以下两个选项之一:- ignoreDeletes:忽略在分区边界删除数据的事务。
- ignoreChanges:如果源表中的文件因数据更改操作(例如- UPDATE、- MERGE INTO、- DELETE(分区内)或- OVERWRITE)而必须重写,则重新处理更新。未更改的行可能仍会发出,因此您的下游使用者应该能够处理重复项。删除不会传播到下游。- ignoreChanges包含了- ignoreDeletes。因此,如果使用- ignoreChanges,您的流不会因源表的删除或更新而中断。
 
例如,假设您有一个 user_events 表,其中包含按 date 分区的 date、user_email 和 action 列。您从 user_events 表中进行流式传输,并且需要根据 GDPR 从中删除数据。
当您在分区边界删除时(即 WHERE 在分区列上),文件已经按值分段,因此删除只是从元数据中删除这些文件。因此,如果您只想从某些分区删除数据,可以使用
spark.readStream.format("delta")  .option("ignoreDeletes", "true")  .load("/tmp/delta/user_events")但是,如果必须根据 user_email 删除数据,则需要使用
spark.readStream.format("delta")  .option("ignoreChanges", "true")  .load("/tmp/delta/user_events")如果您使用 UPDATE 语句更新 user_email,则包含相关 user_email 的文件将被重写。当您使用 ignoreChanges 时,新记录将与同一文件中的所有其他未更改记录一起传播到下游。您的逻辑应该能够处理这些传入的重复记录。
指定初始位置
标题为“指定初始位置”的部分您可以使用以下选项来指定 Delta Lake 流式源的起点,而无需处理整个表。
- startingVersion:Delta Lake 的起始版本。从该版本(包括该版本)开始的所有表更改都将由流式源读取。您可以从 DESCRIBE HISTORY 命令输出的- version列获取提交版本。
- 要仅返回最新更改,请指定 latest。
- startingTimestamp:起始时间戳。从该时间戳(包括该时间戳)或之后提交的所有表更改都将由流式源读取。其中之一是:- 时间戳字符串。例如,"2019-01-01T00:00:00.000Z"。
- 日期字符串。例如,"2019-01-01"。
 
- 时间戳字符串。例如,
不能同时设置这两个选项;只能使用其中一个。它们仅在启动新的流式查询时生效。如果流式查询已启动并且其检查点中已记录进度,则这些选项将被忽略。
例如,假设您有一个 user_events 表。如果您想读取版本 5 以来的更改,请使用
spark.readStream.format("delta")  .option("startingVersion", "5")  .load("/tmp/delta/user_events")如果您想读取 2018-10-18 以来的更改,请使用
spark.readStream.format("delta")  .option("startingTimestamp", "2018-10-18")  .load("/tmp/delta/user_events")处理初始快照而不丢失数据
标题为“处理初始快照而不丢失数据”的部分当使用 Delta 表作为流源时,查询首先处理表中存在的所有数据。此版本的 Delta 表称为初始快照。默认情况下,Delta 表的数据文件是根据文件上次修改时间进行处理的。然而,上次修改时间不一定代表记录事件的时间顺序。
在带有定义水印的有状态流查询中,按修改时间处理文件可能导致记录处理顺序错误。这可能导致记录被水印作为晚期事件丢弃。
您可以通过启用以下选项来避免数据丢失问题:
- withEventTimeOrder:初始快照是否应按事件时间顺序处理。
启用事件时间顺序后,初始快照数据的事件时间范围会划分为时间桶。每个微批次通过过滤时间范围内的数据来处理一个桶。maxFilesPerTrigger 和 maxBytesPerTrigger 配置选项仍然适用于控制微批次大小,但由于处理的性质,只能近似控制。
下图显示了此过程:

关于此功能的重要信息:
- 数据丢失问题仅在以默认顺序处理有状态流查询的初始 Delta 快照时发生。
- 一旦流查询启动,在初始快照仍在处理期间,您就无法更改 withEventTimeOrder。要使用更改后的withEventTimeOrder重新启动,您需要删除检查点。
- 如果您正在运行启用 withEventTimeOrder的流查询,则在初始快照处理完成之前,您不能将其降级到不支持此功能的 Delta 版本。如果需要降级,可以等待初始快照完成,或者删除检查点并重新启动查询。
- 在以下不常见的场景中不支持此功能:- 事件时间列是生成列,并且 Delta 源和水印之间存在非投影转换。
- 流查询中存在一个水印,该水印具有多个 Delta 源。
 
- 启用事件时间排序后,Delta 初始快照处理的性能可能会变慢。
- 每个微批次扫描初始快照以过滤相应事件时间范围内的数据。为了加快过滤操作,建议使用 Delta 源列作为事件时间,以便可以应用数据跳过(检查 _ 何时适用)。此外,沿事件时间列进行表分区可以进一步加快处理速度。您可以在 Spark UI 中查看特定微批次扫描了多少个 delta 文件。
假设您有一个带有 event_time 列的 user_events 表。您的流式查询是一个聚合查询。如果您想确保在初始快照处理期间不会丢失数据,可以使用:
spark.readStream.format("delta")  .option("withEventTimeOrder", "true")  .load("/tmp/delta/user_events")  .withWatermark("event_time", "10 seconds")跟踪非增加模式更改
标题为“跟踪非增加模式更改”的部分您可以提供一个模式跟踪位置,以启用从启用了列映射的 Delta 表进行流式传输。这克服了非增加模式更改可能导致流中断的问题,通过允许流读取与表数据完全相同的模式,就像表进行了时间旅行一样。
对数据源进行的每个流式读取都必须指定其自己的 schemaTrackingLocation。指定的 schemaTrackingLocation 必须包含在用于流式写入目标表的 checkpointLocation 目录中。
选项 schemaTrackingLocation 用于指定模式跟踪的路径,如以下代码示例所示:
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream  .option("schemaTrackingLocation", checkpoint_path)  .table("delta_source_table")  .writeStream  .option("checkpointLocation", checkpoint_path)  .toTable("output_table"))Delta 表作为接收器
标题为“Delta 表作为接收器”的部分您还可以使用 Structured Streaming 将数据写入 Delta 表。事务日志使 Delta Lake 能够保证精确一次的处理,即使有其他流或批处理查询同时针对该表运行。
追加模式
标题为“追加模式”的部分默认情况下,流以追加模式运行,该模式将新记录添加到表中。
您可以使用路径方法
events.writeStream  .format("delta")  .outputMode("append")  .option("checkpointLocation", "/tmp/delta/_checkpoints/")  .start("/delta/events")events.writeStream  .format("delta")  .outputMode("append")  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")  .start("/tmp/delta/events")
import io.delta.implicits._events.writeStream  .outputMode("append")  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")  .delta("/tmp/delta/events")或 toTable 方法(在 Spark 3.1 及更高版本中),如下所示:
events.writeStream  .format("delta")  .outputMode("append")  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")  .toTable("events")events.writeStream  .outputMode("append")  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")  .toTable("events")完整模式
标题为“完整模式”的部分您还可以使用 Structured Streaming 在每个批次中替换整个表。一个示例用例是使用聚合计算摘要:
(spark.readStream  .format("delta")  .load("/tmp/delta/events")  .groupBy("customerId")  .count()  .writeStream  .format("delta")  .outputMode("complete")  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")  .start("/tmp/delta/eventsByCustomer"))spark.readStream  .format("delta")  .load("/tmp/delta/events")  .groupBy("customerId")  .count()  .writeStream  .format("delta")  .outputMode("complete")  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")  .start("/tmp/delta/eventsByCustomer")前面的示例持续更新一个包含按客户聚合的事件数量的表。
对于延迟要求更宽松的应用程序,您可以使用一次性触发器节省计算资源。使用这些触发器可以按给定计划更新摘要聚合表,仅处理自上次更新以来到达的新数据。
foreachBatch 中的幂等表写入
标题为“foreachBatch 中的幂等表写入”的部分命令 foreachBatch 允许您指定一个函数,该函数在流查询中的任意转换之后在每个微批次的输出上执行。这允许实现一个 foreachBatch 函数,该函数可以将微批次输出写入一个或多个目标 Delta 表目的地。但是,foreachBatch 不会使这些写入具有幂等性,因为这些写入尝试缺乏批次是否正在重新执行的信息。例如,重新运行失败的批次可能导致重复的数据写入。
为了解决这个问题,Delta 表支持以下 DataFrameWriter 选项以使写入具有幂等性:
- txnAppId:您可以在每次- DataFrame写入时传递的唯一字符串。例如,您可以使用 StreamingQuery ID 作为- txnAppId。
- txnVersion:一个单调递增的数字,作为事务版本。
Delta 表使用 txnAppId 和 txnVersion 的组合来识别重复写入并忽略它们。
如果批处理写入因失败而中断,重新运行批处理将使用相同的应用程序和批处理 ID,这将有助于运行时正确识别重复写入并忽略它们。应用程序 ID (txnAppId) 可以是任何用户生成的唯一字符串,并且不必与流 ID 相关。
相同的 DataFrameWriter 选项可用于在非流式作业中实现幂等写入。有关详细信息,请参阅 幂等写入。
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2val appId = ... // A unique string that is used as an application ID.streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2}