跳到内容

变更数据流

更改数据 Feed (CDF) 功能允许 Delta 表跟踪 Delta 表版本之间的行级更改。在 Delta 表上启用后,运行时会记录写入表中所有数据的“更改事件”。这包括行数据以及指示指定行是插入、删除还是更新的元数据。

您可以使用 DataFrame API(即 df.read)在批处理查询中读取更改事件,并使用 DataFrame API(即 df.readStream)在流式查询中读取更改事件。

更改数据 Feed 默认未启用。以下用例应促使您启用更改数据 Feed。

  • Silver 和 Gold 表:通过仅处理初始 MERGEUPDATEDELETE 操作后的行级更改来提高 Delta 性能,以加速和简化 ETL 和 ELT 操作。
  • 传输更改:将更改数据 Feed 发送到下游系统,例如 Kafka 或 RDBMS,这些系统可以在数据管道的后期阶段使用它来增量处理。
  • 审计跟踪表:将更改数据 Feed 捕获为 Delta 表,提供永久存储和高效查询功能,以查看随时间变化的所有更改,包括何时发生删除以及进行了哪些更新。

您必须使用以下方法之一明确启用更改数据 Feed 选项

  • 新表:在 CREATE TABLE 命令中设置表属性 delta.enableChangeDataFeed = true

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
  • 现有表:在 ALTER TABLE 命令中设置表属性 delta.enableChangeDataFeed = true

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
  • 所有新表:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

Delta Lake 在 Delta 表目录下的 _change_data 文件夹中记录 UPDATEDELETEMERGE 操作的更改数据。当 Delta Lake 检测到它可以直接从事务日志高效计算更改数据 Feed 时,这些记录可能会被跳过。特别是,仅插入操作和完全分区删除将不会在 _change_data 目录中生成数据。

_change_data 文件夹中的文件遵循表的保留策略。因此,如果您运行 VACUUM 命令,更改数据 Feed 数据也会被删除。

您可以为开始和结束提供版本或时间戳。开始和结束版本和时间戳在查询中是包含的。要从特定开始版本读取表的最新版本,请仅指定开始版本或时间戳。

您将版本指定为整数,将时间戳指定为 yyyy-MM-dd[ HH:mm:ss[.SSS]] 格式的字符串。

如果您提供的版本低于或时间戳早于已记录更改事件的版本,即更改数据 Feed 启用时,将抛出错误,指示更改数据 Feed 未启用。

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")

要在读取表时获取更改数据,请将选项 readChangeFeed 设置为 truestartingVersionstartingTimestamp 是可选的,如果未提供,则流将返回流式处理时表的最新快照作为 INSERT,以及未来的更改作为更改数据。读取更改数据时,还支持速率限制(maxFilesPerTriggermaxBytesPerTrigger)和 excludeRegex 等选项。

当您从表的更改数据 Feed 读取时,将使用最新表版本的架构。

除了 Delta 表架构中的数据列之外,更改数据 Feed 还包含标识更改事件类型的元数据列

列名类型
_change_typeStringinsertupdate_preimageupdate_postimagedelete (1)
_commit_version长整型包含更改的 Delta 日志或表版本。
_commit_timestamp时间戳与提交创建时关联的时间戳。

(1) preimage 是更新之前的值,postimage 是更新之后的值。

对启用列映射的表的更改数据 Feed 限制

标题为“对启用列映射的表的更改数据 Feed 限制”的部分

在 Delta 表上启用列映射后,您可以在不重写现有数据文件的情况下删除或重命名表中的列。启用列映射后,在执行非添加性架构更改(例如重命名或删除列、更改数据类型或可为空性更改)后,更改数据 Feed 存在限制。

没有显著影响。更改数据记录在查询执行过程中即时生成,通常远小于重写文件的总大小。

更改记录遵循与过时表版本相同的保留策略,如果超出指定的保留期,将通过 VACUUM 进行清理。

新记录何时在更改数据 Feed 中可用?

标题为“新记录何时在更改数据 Feed 中可用?”的部分

更改数据与 Delta Lake 事务一起提交,并与表中的新数据同时可用。