变更数据流
更改数据 Feed (CDF) 功能允许 Delta 表跟踪 Delta 表版本之间的行级更改。在 Delta 表上启用后,运行时会记录写入表中所有数据的“更改事件”。这包括行数据以及指示指定行是插入、删除还是更新的元数据。
您可以使用 DataFrame API(即 df.read
)在批处理查询中读取更改事件,并使用 DataFrame API(即 df.readStream
)在流式查询中读取更改事件。
更改数据 Feed 默认未启用。以下用例应促使您启用更改数据 Feed。
- Silver 和 Gold 表:通过仅处理初始
MERGE
、UPDATE
或DELETE
操作后的行级更改来提高 Delta 性能,以加速和简化 ETL 和 ELT 操作。 - 传输更改:将更改数据 Feed 发送到下游系统,例如 Kafka 或 RDBMS,这些系统可以在数据管道的后期阶段使用它来增量处理。
- 审计跟踪表:将更改数据 Feed 捕获为 Delta 表,提供永久存储和高效查询功能,以查看随时间变化的所有更改,包括何时发生删除以及进行了哪些更新。
启用更改数据 Feed
标题为“启用更改数据 Feed”的部分您必须使用以下方法之一明确启用更改数据 Feed 选项
-
新表:在
CREATE TABLE
命令中设置表属性delta.enableChangeDataFeed = true
。CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true) -
现有表:在
ALTER TABLE
命令中设置表属性delta.enableChangeDataFeed = true
。ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true) -
所有新表:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
更改数据存储
标题为“更改数据存储”的部分Delta Lake 在 Delta 表目录下的 _change_data
文件夹中记录 UPDATE
、DELETE
和 MERGE
操作的更改数据。当 Delta Lake 检测到它可以直接从事务日志高效计算更改数据 Feed 时,这些记录可能会被跳过。特别是,仅插入操作和完全分区删除将不会在 _change_data
目录中生成数据。
_change_data
文件夹中的文件遵循表的保留策略。因此,如果您运行 VACUUM 命令,更改数据 Feed 数据也会被删除。
在批处理查询中读取更改
标题为“在批处理查询中读取更改”的部分您可以为开始和结束提供版本或时间戳。开始和结束版本和时间戳在查询中是包含的。要从特定开始版本读取表的最新版本,请仅指定开始版本或时间戳。
您将版本指定为整数,将时间戳指定为 yyyy-MM-dd[ HH:mm:ss[.SSS]]
格式的字符串。
如果您提供的版本低于或时间戳早于已记录更改事件的版本,即更改数据 Feed 启用时,将抛出错误,指示更改数据 Feed 未启用。
-- version as ints or longs e.g. changes from version 0 to 10SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestampsSELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestampSELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special charactersSELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tablesSELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
# version as ints or longsspark.read.format("delta") \ .option("readChangeFeed", "true") \ .option("startingVersion", 0) \ .option("endingVersion", 10) \ .table("myDeltaTable")
# timestamps as formatted timestampspark.read.format("delta") \ .option("readChangeFeed", "true") \ .option("startingTimestamp", '2021-04-21 05:45:46') \ .option("endingTimestamp", '2021-05-21 12:00:00') \ .table("myDeltaTable")
# providing only the startingVersion/timestampspark.read.format("delta") \ .option("readChangeFeed", "true") \ .option("startingVersion", 0) \ .table("myDeltaTable")
# path based tablesspark.read.format("delta") \ .option("readChangeFeed", "true") \ .option("startingTimestamp", '2021-04-21 05:45:46') \ .load("pathToMyDeltaTable")
// version as ints or longsspark.read.format("delta") .option("readChangeFeed", "true") .option("startingVersion", 0) .option("endingVersion", 10) .table("myDeltaTable")
// timestamps as formatted timestampspark.read.format("delta") .option("readChangeFeed", "true") .option("startingTimestamp", "2021-04-21 05:45:46") .option("endingTimestamp", "2021-05-21 12:00:00") .table("myDeltaTable")
// providing only the startingVersion/timestampspark.read.format("delta") .option("readChangeFeed", "true") .option("startingVersion", 0) .table("myDeltaTable")
// path based tablesspark.read.format("delta") .option("readChangeFeed", "true") .option("startingTimestamp", "2021-04-21 05:45:46") .load("pathToMyDeltaTable")
在流式查询中读取更改
标题为“在流式查询中读取更改”的部分# providing a starting versionspark.readStream.format("delta") \ .option("readChangeFeed", "true") \ .option("startingVersion", 0) \ .table("myDeltaTable")
# providing a starting timestampspark.readStream.format("delta") \ .option("readChangeFeed", "true") \ .option("startingTimestamp", "2021-04-21 05:35:43") \ .load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched firstspark.readStream.format("delta") \ .option("readChangeFeed", "true") \ .table("myDeltaTable")
// providing a starting versionspark.readStream.format("delta") .option("readChangeFeed", "true") .option("startingVersion", 0) .table("myDeltaTable")
// providing a starting timestampspark.readStream.format("delta") .option("readChangeFeed", "true") .option("startingVersion", "2021-04-21 05:35:43") .load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched firstspark.readStream.format("delta") .option("readChangeFeed", "true") .table("myDeltaTable")
要在读取表时获取更改数据,请将选项 readChangeFeed
设置为 true
。startingVersion
或 startingTimestamp
是可选的,如果未提供,则流将返回流式处理时表的最新快照作为 INSERT
,以及未来的更改作为更改数据。读取更改数据时,还支持速率限制(maxFilesPerTrigger
、maxBytesPerTrigger
)和 excludeRegex
等选项。
更改数据 Feed 的架构是什么?
标题为“更改数据 Feed 的架构是什么?”的部分当您从表的更改数据 Feed 读取时,将使用最新表版本的架构。
除了 Delta 表架构中的数据列之外,更改数据 Feed 还包含标识更改事件类型的元数据列
列名 | 类型 | 值 |
---|---|---|
_change_type | String | insert 、update_preimage 、update_postimage 、delete (1) |
_commit_version | 长整型 | 包含更改的 Delta 日志或表版本。 |
_commit_timestamp | 时间戳 | 与提交创建时关联的时间戳。 |
(1) preimage
是更新之前的值,postimage
是更新之后的值。
对启用列映射的表的更改数据 Feed 限制
标题为“对启用列映射的表的更改数据 Feed 限制”的部分在 Delta 表上启用列映射后,您可以在不重写现有数据文件的情况下删除或重命名表中的列。启用列映射后,在执行非添加性架构更改(例如重命名或删除列、更改数据类型或可为空性更改)后,更改数据 Feed 存在限制。
常见问题 (FAQ)
标题为“常见问题 (FAQ)”的部分启用更改数据 Feed 的开销是多少?
标题为“启用更改数据 Feed 的开销是多少?”的部分没有显著影响。更改数据记录在查询执行过程中即时生成,通常远小于重写文件的总大小。
更改记录的保留策略是什么?
标题为“更改记录的保留策略是什么?”的部分更改记录遵循与过时表版本相同的保留策略,如果超出指定的保留期,将通过 VACUUM 进行清理。
新记录何时在更改数据 Feed 中可用?
标题为“新记录何时在更改数据 Feed 中可用?”的部分更改数据与 Delta Lake 事务一起提交,并与表中的新数据同时可用。