跳到内容

表流式读写

Delta Lake 通过 readStreamwriteStreamSpark Structured Streaming 深度集成。Delta Lake 克服了流系统和文件通常相关的许多限制,包括:

  • 使用多个流(或并发批处理作业)维护“精确一次”处理

  • 将文件用作流的源时,高效地发现哪些文件是新的

对于 Delta Lake 对表进行的许多操作,您可以通过在创建新的 SparkSession 时设置配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 3.0 起)的集成。请参阅配置 SparkSession

当您将 Delta 表加载为流源并在流查询中使用它时,查询会处理表中所有存在的数据以及流启动后到达的任何新数据。

spark.readStream.format("delta")
.load("/tmp/delta/events")
import io.delta.implicits._
spark.readStream.delta("/tmp/delta/events")

以下选项可用于控制微批次:

  • maxFilesPerTrigger:每个微批次中要考虑的新文件数量。默认值为 1000。
  • maxBytesPerTrigger:每个微批次处理的数据量。此选项设置一个“软最大值”,这意味着批次处理的数据量大约为这个值,并且在最小输入单元大于此限制的情况下,为了使流查询向前推进,可能会处理超过此限制的数据。如果您的流式处理使用 Trigger.Once,则此选项将被忽略。默认情况下不设置此选项。

如果将 maxBytesPerTriggermaxFilesPerTrigger 结合使用,则微批次会一直处理数据,直到达到 maxFilesPerTriggermaxBytesPerTrigger 限制。

Structured Streaming 不处理非追加的输入,如果源表发生任何修改,则会抛出异常。有两种主要策略来处理无法自动传播到下游的更改:

  • 您可以删除输出和检查点,然后从头重新启动流。
  • 您可以设置以下两个选项之一:
    • ignoreDeletes:忽略在分区边界删除数据的事务。
    • ignoreChanges:如果源表中的文件因数据更改操作(例如 UPDATEMERGE INTODELETE(分区内)或 OVERWRITE)而必须重写,则重新处理更新。未更改的行可能仍会发出,因此您的下游使用者应该能够处理重复项。删除不会传播到下游。ignoreChanges 包含了 ignoreDeletes。因此,如果使用 ignoreChanges,您的流不会因源表的删除或更新而中断。

例如,假设您有一个 user_events 表,其中包含按 date 分区的 dateuser_emailaction 列。您从 user_events 表中进行流式传输,并且需要根据 GDPR 从中删除数据。

当您在分区边界删除时(即 WHERE 在分区列上),文件已经按值分段,因此删除只是从元数据中删除这些文件。因此,如果您只想从某些分区删除数据,可以使用

spark.readStream.format("delta")
.option("ignoreDeletes", "true")
.load("/tmp/delta/user_events")

但是,如果必须根据 user_email 删除数据,则需要使用

spark.readStream.format("delta")
.option("ignoreChanges", "true")
.load("/tmp/delta/user_events")

如果您使用 UPDATE 语句更新 user_email,则包含相关 user_email 的文件将被重写。当您使用 ignoreChanges 时,新记录将与同一文件中的所有其他未更改记录一起传播到下游。您的逻辑应该能够处理这些传入的重复记录。

您可以使用以下选项来指定 Delta Lake 流式源的起点,而无需处理整个表。

  • startingVersion:Delta Lake 的起始版本。从该版本(包括该版本)开始的所有表更改都将由流式源读取。您可以从 DESCRIBE HISTORY 命令输出的 version 列获取提交版本。
  • 要仅返回最新更改,请指定 latest
  • startingTimestamp:起始时间戳。从该时间戳(包括该时间戳)或之后提交的所有表更改都将由流式源读取。其中之一是:
    • 时间戳字符串。例如,"2019-01-01T00:00:00.000Z"
    • 日期字符串。例如,"2019-01-01"

不能同时设置这两个选项;只能使用其中一个。它们仅在启动新的流式查询时生效。如果流式查询已启动并且其检查点中已记录进度,则这些选项将被忽略。

例如,假设您有一个 user_events 表。如果您想读取版本 5 以来的更改,请使用

spark.readStream.format("delta")
.option("startingVersion", "5")
.load("/tmp/delta/user_events")

如果您想读取 2018-10-18 以来的更改,请使用

spark.readStream.format("delta")
.option("startingTimestamp", "2018-10-18")
.load("/tmp/delta/user_events")

当使用 Delta 表作为流源时,查询首先处理表中存在的所有数据。此版本的 Delta 表称为初始快照。默认情况下,Delta 表的数据文件是根据文件上次修改时间进行处理的。然而,上次修改时间不一定代表记录事件的时间顺序。

在带有定义水印的有状态流查询中,按修改时间处理文件可能导致记录处理顺序错误。这可能导致记录被水印作为晚期事件丢弃。

您可以通过启用以下选项来避免数据丢失问题:

  • withEventTimeOrder:初始快照是否应按事件时间顺序处理。

启用事件时间顺序后,初始快照数据的事件时间范围会划分为时间桶。每个微批次通过过滤时间范围内的数据来处理一个桶。maxFilesPerTriggermaxBytesPerTrigger 配置选项仍然适用于控制微批次大小,但由于处理的性质,只能近似控制。

下图显示了此过程:

Initial Snapshot

关于此功能的重要信息:

  • 数据丢失问题仅在以默认顺序处理有状态流查询的初始 Delta 快照时发生。
  • 一旦流查询启动,在初始快照仍在处理期间,您就无法更改 withEventTimeOrder。要使用更改后的 withEventTimeOrder 重新启动,您需要删除检查点。
  • 如果您正在运行启用 withEventTimeOrder 的流查询,则在初始快照处理完成之前,您不能将其降级到不支持此功能的 Delta 版本。如果需要降级,可以等待初始快照完成,或者删除检查点并重新启动查询。
  • 在以下不常见的场景中不支持此功能:
    • 事件时间列是生成列,并且 Delta 源和水印之间存在非投影转换。
    • 流查询中存在一个水印,该水印具有多个 Delta 源。
  • 启用事件时间排序后,Delta 初始快照处理的性能可能会变慢。
  • 每个微批次扫描初始快照以过滤相应事件时间范围内的数据。为了加快过滤操作,建议使用 Delta 源列作为事件时间,以便可以应用数据跳过(检查 _ 何时适用)。此外,沿事件时间列进行表分区可以进一步加快处理速度。您可以在 Spark UI 中查看特定微批次扫描了多少个 delta 文件。

假设您有一个带有 event_time 列的 user_events 表。您的流式查询是一个聚合查询。如果您想确保在初始快照处理期间不会丢失数据,可以使用:

spark.readStream.format("delta")
.option("withEventTimeOrder", "true")
.load("/tmp/delta/user_events")
.withWatermark("event_time", "10 seconds")

您可以提供一个模式跟踪位置,以启用从启用了列映射的 Delta 表进行流式传输。这克服了非增加模式更改可能导致流中断的问题,通过允许流读取与表数据完全相同的模式,就像表进行了时间旅行一样。

对数据源进行的每个流式读取都必须指定其自己的 schemaTrackingLocation。指定的 schemaTrackingLocation 必须包含在用于流式写入目标表的 checkpointLocation 目录中。

选项 schemaTrackingLocation 用于指定模式跟踪的路径,如以下代码示例所示:

checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)

您还可以使用 Structured Streaming 将数据写入 Delta 表。事务日志使 Delta Lake 能够保证精确一次的处理,即使有其他流或批处理查询同时针对该表运行。

默认情况下,流以追加模式运行,该模式将新记录添加到表中。

您可以使用路径方法

events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/_checkpoints/")
.start("/delta/events")

toTable 方法(在 Spark 3.1 及更高版本中),如下所示:

events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")

您还可以使用 Structured Streaming 在每个批次中替换整个表。一个示例用例是使用聚合计算摘要:

(spark.readStream
.format("delta")
.load("/tmp/delta/events")
.groupBy("customerId")
.count()
.writeStream
.format("delta")
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.start("/tmp/delta/eventsByCustomer")
)

前面的示例持续更新一个包含按客户聚合的事件数量的表。

对于延迟要求更宽松的应用程序,您可以使用一次性触发器节省计算资源。使用这些触发器可以按给定计划更新摘要聚合表,仅处理自上次更新以来到达的新数据。

命令 foreachBatch 允许您指定一个函数,该函数在流查询中的任意转换之后在每个微批次的输出上执行。这允许实现一个 foreachBatch 函数,该函数可以将微批次输出写入一个或多个目标 Delta 表目的地。但是,foreachBatch 不会使这些写入具有幂等性,因为这些写入尝试缺乏批次是否正在重新执行的信息。例如,重新运行失败的批次可能导致重复的数据写入。

为了解决这个问题,Delta 表支持以下 DataFrameWriter 选项以使写入具有幂等性:

  • txnAppId:您可以在每次 DataFrame 写入时传递的唯一字符串。例如,您可以使用 StreamingQuery ID 作为 txnAppId
  • txnVersion:一个单调递增的数字,作为事务版本。

Delta 表使用 txnAppIdtxnVersion 的组合来识别重复写入并忽略它们。

如果批处理写入因失败而中断,重新运行批处理将使用相同的应用程序和批处理 ID,这将有助于运行时正确识别重复写入并忽略它们。应用程序 ID (txnAppId) 可以是任何用户生成的唯一字符串,并且不必与流 ID 相关。

相同的 DataFrameWriter 选项可用于在非流式作业中实现幂等写入。有关详细信息,请参阅 幂等写入

app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2