表流式读写
Delta Lake 通过 readStream
和 writeStream
与 Spark Structured Streaming 深度集成。Delta Lake 克服了流系统和文件通常相关的许多限制,包括:
-
使用多个流(或并发批处理作业)维护“精确一次”处理
-
将文件用作流的源时,高效地发现哪些文件是新的
对于 Delta Lake 对表进行的许多操作,您可以通过在创建新的 SparkSession
时设置配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 3.0 起)的集成。请参阅配置 SparkSession。
Delta 表作为源
标题为“Delta 表作为源”的部分当您将 Delta 表加载为流源并在流查询中使用它时,查询会处理表中所有存在的数据以及流启动后到达的任何新数据。
spark.readStream.format("delta") .load("/tmp/delta/events")
import io.delta.implicits._spark.readStream.delta("/tmp/delta/events")
限制输入速率
标题为“限制输入速率”的部分以下选项可用于控制微批次:
maxFilesPerTrigger
:每个微批次中要考虑的新文件数量。默认值为 1000。maxBytesPerTrigger
:每个微批次处理的数据量。此选项设置一个“软最大值”,这意味着批次处理的数据量大约为这个值,并且在最小输入单元大于此限制的情况下,为了使流查询向前推进,可能会处理超过此限制的数据。如果您的流式处理使用Trigger.Once
,则此选项将被忽略。默认情况下不设置此选项。
如果将 maxBytesPerTrigger
与 maxFilesPerTrigger
结合使用,则微批次会一直处理数据,直到达到 maxFilesPerTrigger
或 maxBytesPerTrigger
限制。
忽略更新和删除
标题为“忽略更新和删除”的部分Structured Streaming 不处理非追加的输入,如果源表发生任何修改,则会抛出异常。有两种主要策略来处理无法自动传播到下游的更改:
- 您可以删除输出和检查点,然后从头重新启动流。
- 您可以设置以下两个选项之一:
ignoreDeletes
:忽略在分区边界删除数据的事务。ignoreChanges
:如果源表中的文件因数据更改操作(例如UPDATE
、MERGE INTO
、DELETE
(分区内)或OVERWRITE
)而必须重写,则重新处理更新。未更改的行可能仍会发出,因此您的下游使用者应该能够处理重复项。删除不会传播到下游。ignoreChanges
包含了ignoreDeletes
。因此,如果使用ignoreChanges
,您的流不会因源表的删除或更新而中断。
例如,假设您有一个 user_events
表,其中包含按 date
分区的 date
、user_email
和 action
列。您从 user_events
表中进行流式传输,并且需要根据 GDPR 从中删除数据。
当您在分区边界删除时(即 WHERE
在分区列上),文件已经按值分段,因此删除只是从元数据中删除这些文件。因此,如果您只想从某些分区删除数据,可以使用
spark.readStream.format("delta") .option("ignoreDeletes", "true") .load("/tmp/delta/user_events")
但是,如果必须根据 user_email
删除数据,则需要使用
spark.readStream.format("delta") .option("ignoreChanges", "true") .load("/tmp/delta/user_events")
如果您使用 UPDATE
语句更新 user_email
,则包含相关 user_email
的文件将被重写。当您使用 ignoreChanges
时,新记录将与同一文件中的所有其他未更改记录一起传播到下游。您的逻辑应该能够处理这些传入的重复记录。
指定初始位置
标题为“指定初始位置”的部分您可以使用以下选项来指定 Delta Lake 流式源的起点,而无需处理整个表。
startingVersion
:Delta Lake 的起始版本。从该版本(包括该版本)开始的所有表更改都将由流式源读取。您可以从 DESCRIBE HISTORY 命令输出的version
列获取提交版本。- 要仅返回最新更改,请指定
latest
。 startingTimestamp
:起始时间戳。从该时间戳(包括该时间戳)或之后提交的所有表更改都将由流式源读取。其中之一是:- 时间戳字符串。例如,
"2019-01-01T00:00:00.000Z"
。 - 日期字符串。例如,
"2019-01-01"
。
- 时间戳字符串。例如,
不能同时设置这两个选项;只能使用其中一个。它们仅在启动新的流式查询时生效。如果流式查询已启动并且其检查点中已记录进度,则这些选项将被忽略。
例如,假设您有一个 user_events
表。如果您想读取版本 5 以来的更改,请使用
spark.readStream.format("delta") .option("startingVersion", "5") .load("/tmp/delta/user_events")
如果您想读取 2018-10-18 以来的更改,请使用
spark.readStream.format("delta") .option("startingTimestamp", "2018-10-18") .load("/tmp/delta/user_events")
处理初始快照而不丢失数据
标题为“处理初始快照而不丢失数据”的部分当使用 Delta 表作为流源时,查询首先处理表中存在的所有数据。此版本的 Delta 表称为初始快照。默认情况下,Delta 表的数据文件是根据文件上次修改时间进行处理的。然而,上次修改时间不一定代表记录事件的时间顺序。
在带有定义水印的有状态流查询中,按修改时间处理文件可能导致记录处理顺序错误。这可能导致记录被水印作为晚期事件丢弃。
您可以通过启用以下选项来避免数据丢失问题:
- withEventTimeOrder:初始快照是否应按事件时间顺序处理。
启用事件时间顺序后,初始快照数据的事件时间范围会划分为时间桶。每个微批次通过过滤时间范围内的数据来处理一个桶。maxFilesPerTrigger
和 maxBytesPerTrigger
配置选项仍然适用于控制微批次大小,但由于处理的性质,只能近似控制。
下图显示了此过程:
关于此功能的重要信息:
- 数据丢失问题仅在以默认顺序处理有状态流查询的初始 Delta 快照时发生。
- 一旦流查询启动,在初始快照仍在处理期间,您就无法更改
withEventTimeOrder
。要使用更改后的withEventTimeOrder
重新启动,您需要删除检查点。 - 如果您正在运行启用
withEventTimeOrder
的流查询,则在初始快照处理完成之前,您不能将其降级到不支持此功能的 Delta 版本。如果需要降级,可以等待初始快照完成,或者删除检查点并重新启动查询。 - 在以下不常见的场景中不支持此功能:
- 事件时间列是生成列,并且 Delta 源和水印之间存在非投影转换。
- 流查询中存在一个水印,该水印具有多个 Delta 源。
- 启用事件时间排序后,Delta 初始快照处理的性能可能会变慢。
- 每个微批次扫描初始快照以过滤相应事件时间范围内的数据。为了加快过滤操作,建议使用 Delta 源列作为事件时间,以便可以应用数据跳过(检查 _ 何时适用)。此外,沿事件时间列进行表分区可以进一步加快处理速度。您可以在 Spark UI 中查看特定微批次扫描了多少个 delta 文件。
假设您有一个带有 event_time
列的 user_events
表。您的流式查询是一个聚合查询。如果您想确保在初始快照处理期间不会丢失数据,可以使用:
spark.readStream.format("delta") .option("withEventTimeOrder", "true") .load("/tmp/delta/user_events") .withWatermark("event_time", "10 seconds")
跟踪非增加模式更改
标题为“跟踪非增加模式更改”的部分您可以提供一个模式跟踪位置,以启用从启用了列映射的 Delta 表进行流式传输。这克服了非增加模式更改可能导致流中断的问题,通过允许流读取与表数据完全相同的模式,就像表进行了时间旅行一样。
对数据源进行的每个流式读取都必须指定其自己的 schemaTrackingLocation
。指定的 schemaTrackingLocation
必须包含在用于流式写入目标表的 checkpointLocation
目录中。
选项 schemaTrackingLocation
用于指定模式跟踪的路径,如以下代码示例所示:
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream .option("schemaTrackingLocation", checkpoint_path) .table("delta_source_table") .writeStream .option("checkpointLocation", checkpoint_path) .toTable("output_table"))
Delta 表作为接收器
标题为“Delta 表作为接收器”的部分您还可以使用 Structured Streaming 将数据写入 Delta 表。事务日志使 Delta Lake 能够保证精确一次的处理,即使有其他流或批处理查询同时针对该表运行。
追加模式
标题为“追加模式”的部分默认情况下,流以追加模式运行,该模式将新记录添加到表中。
您可以使用路径方法
events.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/tmp/delta/_checkpoints/") .start("/delta/events")
events.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") .start("/tmp/delta/events")
import io.delta.implicits._events.writeStream .outputMode("append") .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") .delta("/tmp/delta/events")
或 toTable
方法(在 Spark 3.1 及更高版本中),如下所示:
events.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") .toTable("events")
events.writeStream .outputMode("append") .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") .toTable("events")
完整模式
标题为“完整模式”的部分您还可以使用 Structured Streaming 在每个批次中替换整个表。一个示例用例是使用聚合计算摘要:
(spark.readStream .format("delta") .load("/tmp/delta/events") .groupBy("customerId") .count() .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/") .start("/tmp/delta/eventsByCustomer"))
spark.readStream .format("delta") .load("/tmp/delta/events") .groupBy("customerId") .count() .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/") .start("/tmp/delta/eventsByCustomer")
前面的示例持续更新一个包含按客户聚合的事件数量的表。
对于延迟要求更宽松的应用程序,您可以使用一次性触发器节省计算资源。使用这些触发器可以按给定计划更新摘要聚合表,仅处理自上次更新以来到达的新数据。
foreachBatch 中的幂等表写入
标题为“foreachBatch 中的幂等表写入”的部分命令 foreachBatch
允许您指定一个函数,该函数在流查询中的任意转换之后在每个微批次的输出上执行。这允许实现一个 foreachBatch
函数,该函数可以将微批次输出写入一个或多个目标 Delta 表目的地。但是,foreachBatch
不会使这些写入具有幂等性,因为这些写入尝试缺乏批次是否正在重新执行的信息。例如,重新运行失败的批次可能导致重复的数据写入。
为了解决这个问题,Delta 表支持以下 DataFrameWriter
选项以使写入具有幂等性:
txnAppId
:您可以在每次DataFrame
写入时传递的唯一字符串。例如,您可以使用 StreamingQuery ID 作为txnAppId
。txnVersion
:一个单调递增的数字,作为事务版本。
Delta 表使用 txnAppId
和 txnVersion
的组合来识别重复写入并忽略它们。
如果批处理写入因失败而中断,重新运行批处理将使用相同的应用程序和批处理 ID,这将有助于运行时正确识别重复写入并忽略它们。应用程序 ID (txnAppId
) 可以是任何用户生成的唯一字符串,并且不必与流 ID 相关。
相同的 DataFrameWriter
选项可用于在非流式作业中实现幂等写入。有关详细信息,请参阅 幂等写入。
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id): batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1 batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
val appId = ... // A unique string that is used as an application ID.streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1 batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2}