跳到内容

表实用命令

Delta 表支持许多实用程序命令。

对于许多 Delta Lake 操作,您可以通过在创建新的 SparkSession 时设置配置来启用与 Apache Spark DataSourceV2 和 Catalog API(从 3.0 开始)的集成。请参阅配置 SparkSession

您可以通过对表运行 vacuum 命令来移除 Delta 表不再引用且早于保留阈值的文件。vacuum 不会自动触发。文件的默认保留阈值为 7 天。要更改此行为,请参阅数据保留

VACUUM eventsTable -- This runs VACUUM in ‘FULL’ mode and deletes data files outside of the retention duration and all files in the table directory not referenced by the table.
VACUUM eventsTable LITE -- This VACUUM in ‘LITE’ mode runs faster.
-- Instead of finding all files in the table directory, `VACUUM LITE` uses the Delta transaction log to identify and remove files no longer referenced by any table versions within the retention duration.
-- If `VACUUM LITE` cannot be completed because the Delta log has been pruned a `DELTA_CANNOT_VACUUM_LITE` exception is raised.
-- This mode is available only in Delta 3.3 and above.
VACUUM '/data/events' -- vacuum files in path-based table
VACUUM delta.`/data/events/`
VACUUM delta.`/data/events/` RETAIN 100 HOURS -- vacuum files not required by versions more than 100 hours old
VACUUM eventsTable DRY RUN -- do dry run to get the list of files to be deleted
VACUUM eventsTable USING INVENTORY inventoryTable —- vacuum files based on a provided reservoir of files as a delta table
VACUUM eventsTable USING INVENTORY (select * from inventoryTable) —- vacuum files based on a provided reservoir of files as spark SQL query

请参阅 Delta Lake API 以获取 Scala、Java 和 Python 语法详细信息。

清单表包含文件路径列表及其大小、类型(是否为目录)和上次修改时间。当提供 INVENTORY 选项时,VACUUM 将考虑其中列出的文件,而不是对表目录进行完整列表,这对于非常大的表可能非常耗时。清单表可以指定为 delta 表或 spark SQL 查询,提供预期的表架构。架构应如下所示:

列名类型描述
路径字符串完全限定 URI
长度整数大小(字节)
isDirboolean布尔值,指示是否为目录
modificationTime整数文件更新时间(毫秒)

您可以通过运行 history 命令检索 Delta 表每次写入操作的信息,例如操作、用户、时间戳等。操作按时间倒序返回。默认情况下,表历史记录保留 30 天。

请参阅配置 SparkSession,了解如何在 Apache Spark 中启用 SQL 命令支持的步骤。

DESCRIBE HISTORY '/data/events/' -- get the full history of the table
DESCRIBE HISTORY delta.`/data/events/`
DESCRIBE HISTORY '/data/events/' LIMIT 1 -- get the last operation only
DESCRIBE HISTORY eventsTable

请参阅 Delta Lake API,了解 Scala/Java/Python 语法详细信息。

history 操作的输出具有以下列。

类型描述
版本long操作生成的表版本。
时间戳时间戳此版本提交的时间。
userId字符串运行该操作的用户 ID。
userName字符串运行该操作的用户姓名。
操作字符串操作名称。
operationParameters地图操作参数(例如,谓词)。
作业结构体运行该操作的作业详细信息。
笔记本结构体运行该操作的笔记本详细信息。
clusterId字符串运行该操作的集群 ID。
readVersionlong执行写入操作时读取的表版本。
isolationLevel字符串此操作使用的隔离级别。
isBlindAppendboolean此操作是否附加数据。
operationMetrics地图操作指标(例如,修改的行数和文件数)。
userMetadata字符串如果指定,用户定义的提交元数据
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| null| null| DELETE|[predicate -> ["(...|null| null| null| 4| Serializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| null| null| UPDATE|[predicate -> (id...|null| null| null| 3| Serializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| null| null| DELETE|[predicate -> ["(...|null| null| null| 2| Serializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| null| null| UPDATE|[predicate -> (id...|null| null| null| 1| Serializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| null| null| DELETE|[predicate -> ["(...|null| null| null| 0| Serializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| null| null| WRITE|[mode -> ErrorIfE...|null| null| null| null| Serializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+

history 操作在 operationMetrics 列映射中返回操作指标的集合。

下表列出了按操作分类的映射键定义。

Operation指标名称描述
写入、创建表 AS SELECT、替换表 AS SELECT、复制到
numFiles写入的文件数。
numOutputBytes写入内容的字节大小。
numOutputRows写入的行数。
流式更新
numAddedFiles添加的文件数。
numRemovedFiles移除的文件数。
numOutputRows写入的行数。
numOutputBytes写入大小(字节)。
删除
numAddedFiles添加的文件数。删除表分区时不提供。
numRemovedFiles移除的文件数。
numDeletedRows删除的行数。删除表分区时不提供。
numCopiedRows删除文件过程中复制的行数。
executionTimeMs执行整个操作所花费的时间。
scanTimeMs扫描文件以查找匹配项所花费的时间。
rewriteTimeMs重写匹配文件所花费的时间。
截断
numRemovedFiles移除的文件数。
executionTimeMs执行整个操作所花费的时间。
合并
numSourceRows源 DataFrame 中的行数。
numTargetRowsInserted插入到目标表中的行数。
numTargetRowsUpdated目标表中更新的行数。
numTargetRowsDeleted目标表中删除的行数。
numTargetRowsCopied复制的目标行数。
numOutputRows写入的总行数。
numTargetFilesAdded添加到接收器(目标)的文件数。
numTargetFilesRemoved从接收器(目标)移除的文件数。
executionTimeMs执行整个操作所花费的时间。
scanTimeMs扫描文件以查找匹配项所花费的时间。
rewriteTimeMs重写匹配文件所花费的时间。
更新
numAddedFiles添加的文件数。
numRemovedFiles移除的文件数。
numUpdatedRows更新的行数。
numCopiedRows更新文件过程中仅复制的行数。
executionTimeMs执行整个操作所花费的时间。
scanTimeMs扫描文件以查找匹配项所花费的时间。
rewriteTimeMs重写匹配文件所花费的时间。
FSCKnumRemovedFiles移除的文件数。
转换numConvertedFiles已转换的 Parquet 文件数。
优化
numAddedFiles添加的文件数。
numRemovedFiles优化的文件数。
numAddedBytes表优化后添加的字节数。
numRemovedBytes删除的字节数。
minFileSize表优化后最小文件的大小。
p25FileSize表优化后第 25 百分位数文件的大小。
p50FileSize表优化后中位数文件大小。
p75FileSize表优化后第 75 百分位数文件的大小。
maxFileSize表优化后最大文件的大小。
VACUUM
numDeletedFiles已删除的文件数。
numVacuumedDirectories已清理的目录数。
numFilesToDelete要删除的文件数。
恢复
tableSizeAfterRestore恢复后表的大小(字节)。
numOfFilesAfterRestore恢复后表中的文件数。
numRemovedFiles恢复操作删除的文件数。
numRestoredFiles由于恢复而添加的文件数。
removedFilesSize恢复删除的文件大小(字节)。
restoredFilesSize恢复添加的文件大小(字节)。

您可以使用 DESCRIBE DETAIL 检索 Delta 表的详细信息(例如,文件数量、数据大小)。

请参阅配置 SparkSession,了解如何在 Apache Spark 中启用 SQL 命令支持的步骤。

DESCRIBE DETAIL '/data/events/'
DESCRIBE DETAIL eventsTable

请参阅 Delta Lake API,了解 Scala/Java/Python 语法详细信息。

此操作的输出只有一行,其架构如下。

类型描述
格式字符串表的格式,即 delta
ID字符串表的唯一 ID。
名称字符串在元存储中定义的表名称。
描述字符串表的描述。
location字符串表的存储位置。
createdAt时间戳表创建时间。
lastModified时间戳表上次修改时间。
partitionColumns字符串数组如果表已分区,则为分区列的名称。
numFileslong表中最新版本的文件数量。
sizeInBytesint表最新快照的大小(字节)。
properties字符串到字符串的映射为此表设置的所有属性。
minReaderVersionint可以读取表的读取器(根据日志协议)的最小版本。
minWriterVersionint可以写入表的写入器(根据日志协议)的最小版本。
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format| id| name|description| location| createdAt| lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable| null|file:/Users/tuor/...|2020-06-05 12:20:...|2020-06-05 12:20:20| []| 10| 12345| []| 1| 2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+

您可以为 Delta 表生成清单文件,其他处理引擎(即 Apache Spark 以外的引擎)可以使用该文件读取 Delta 表。例如,要生成可供 Presto 和 Athena 读取 Delta 表的清单文件,您可以运行以下命令:

GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`

请参阅配置 SparkSession,了解如何在 Apache Spark 中启用 SQL 命令支持的步骤。

将 Parquet 表原地转换为 Delta 表。此命令列出目录中的所有文件,创建一个跟踪这些文件的 Delta Lake 事务日志,并通过读取所有 Parquet 文件的页脚自动推断数据模式。如果您的数据已分区,则必须将分区列的模式指定为 DDL 格式的字符串(即 <column-name1> <type>, <column-name2> <type>, ...)。

默认情况下,此命令将收集每个文件的统计信息(例如,每列的最小值和最大值)。这些统计信息将在查询时用于提供更快的查询。您可以使用 NO STATISTICS 在 SQL API 中禁用此统计信息收集。

-- Convert unpartitioned Parquet table at path '<path-to-table>'
CONVERT TO DELTA parquet.`<path-to-table>`
-- Convert unpartitioned Parquet table and disable statistics collection
CONVERT TO DELTA parquet.`<path-to-table>` NO STATISTICS
-- Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (part int, part2 int)
-- Convert partitioned Parquet table and disable statistics collection
CONVERT TO DELTA parquet.`<path-to-table>` NO STATISTICS PARTITIONED BY (part int, part2 int)

如果 Iceberg 表的基础文件格式是 Parquet,您可以将 Iceberg 表原地转换为 Delta 表。与从 Parquet 表转换类似,此转换是原地的,不会有任何数据复制或数据重写。原始 Iceberg 表和转换后的 Delta 表具有独立的历史记录,因此只要不触及或删除源数据 Parquet 文件,修改 Delta 表就不会影响 Iceberg 表。

以下命令根据 Iceberg 表的本机文件清单、模式和分区信息创建 Delta Lake 事务日志。转换器还在转换期间收集列统计信息,除非指定了 NO STATISTICS

-- Convert the Iceberg table in the path <path-to-table>.
CONVERT TO DELTA iceberg.\`<path-to-table>\`
-- Convert the Iceberg table in the path <path-to-table> without collecting statistics.
CONVERT TO DELTA iceberg.\`<path-to-table>\` NO STATISTICS

您可以使用以下步骤轻松地将 Delta 表转换回 Parquet 表

  1. 如果您执行了可能更改数据文件的 Delta Lake 操作(例如 deletemerge),请运行 vacuum (保留期为 0 小时)以删除所有不属于表最新版本的数据文件。
  2. 删除表目录中的 _delta_log 目录。

您可以使用 RESTORE 命令将 Delta 表恢复到其较早的状态。Delta 表内部维护表的历史版本,使其能够恢复到较早的状态。RESTORE 命令支持与较早状态对应的版本或创建较早状态时的时间戳作为选项。

RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

例如

表版本OperationDelta 日志更新数据更改日志更新中的记录
0插入AddFile(/path/to/file-1, dataChange = true)(name = Viktor, age = 29), (name = George, age = 55)
1插入AddFile(/path/to/file-2, dataChange = true)(name = George, age = 39)
2优化AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2)(无记录,因为优化压缩不会更改表中的数据)
3RESTORE(version=1)RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true)(name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

在前面的示例中,RESTORE 命令导致已在读取 Delta 表版本 0 和 1 时看到的更新。如果流式查询正在读取此表,则这些文件将被视为新添加的数据并再次处理。

RESTORE 在操作完成后将以下指标报告为单行 DataFrame

  • table_size_after_restore: 恢复后表的大小。
  • num_of_files_after_restore: 恢复后表中的文件数。
  • num_removed_files: 从表中删除(逻辑删除)的文件数。
  • num_restored_files: 由于回滚而恢复的文件数。
  • removed_files_size: 从表中删除的文件总大小(字节)。
  • restored_files_size: 恢复的文件总大小(字节)。

Restore metrics example

您可以使用 shallow clone 命令以特定版本创建现有 Delta 表的浅层副本。

对浅层副本所做的任何更改只影响副本本身,而不影响源表,只要它们不触及源数据 Parquet 文件。

克隆的元数据包括:模式、分区信息、不变性、可为空性。对于浅层克隆,流元数据不克隆。未克隆的元数据是表描述和用户定义的提交元数据

CREATE TABLE delta.`/data/target/` SHALLOW CLONE delta.`/data/source/` -- Create a shallow clone of /data/source at /data/target
CREATE OR REPLACE TABLE db.target_table SHALLOW CLONE db.source_table -- Replace the target. target needs to be emptied
CREATE TABLE IF NOT EXISTS delta.`/data/target/` SHALLOW CLONE db.source_table -- No-op if the target table exists
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source`
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` VERSION AS OF version
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` TIMESTAMP AS OF timestamp_expression -- timestamp can be like “2019-01-01” or like date_sub(current_date(), 1)

CLONE 在操作完成后将以下指标作为单行 DataFrame 报告

  • source_table_size: 正在克隆的源表大小(以字节为单位)。
  • source_num_of_files: 源表中的文件数量。

如果您已创建浅层克隆,则任何读取浅层克隆的用户都需要权限才能读取原始表中的文件,因为数据文件保留在克隆源表的目录中。要更改克隆,用户需要对克隆目录具有写入权限。

在进行机器学习时,您可能希望存档训练 ML 模型的某个版本的表。未来的模型可以使用此存档数据集进行测试。

-- Trained model on version 15 of Delta table
CREATE TABLE delta.`/model/dataset` SHALLOW CLONE entire_dataset VERSION AS OF 15

为了在生产表上测试工作流而不损坏表,您可以轻松创建一个浅层克隆。这允许您在包含所有生产数据的克隆表上运行任意工作流,而不会影响任何生产工作负载。

-- Perform shallow clone
CREATE OR REPLACE TABLE my_test SHALLOW CLONE my_prod_table;
UPDATE my_test WHERE user_id is null SET invalid=true;
-- Run a bunch of validations. Once happy:
-- This should leverage the update information in the clone to prune to only
-- changed files in the clone if possible
MERGE INTO my_prod_table
USING my_test
ON my_test.user_id <=> my_prod_table.user_id
WHEN MATCHED AND my_test.user_id is null THEN UPDATE *;
DROP TABLE my_test;

表属性覆盖对于以下情况特别有用

  • 与不同的业务部门共享数据时,使用所有者或用户信息对表进行注释。
  • 归档 Delta 表并需要时间旅行。您可以为归档表独立指定日志保留期。例如
CREATE OR REPLACE TABLE archive.my_table SHALLOW CLONE prod.my_table
TBLPROPERTIES (
delta.logRetentionDuration = '3650 days',
delta.deletedFileRetentionDuration = '3650 days'
)
LOCATION 'xx://archive/my_table'

Parquet 和 Iceberg 的浅层克隆结合了用于克隆 Delta 表和将表转换为 Delta Lake 的功能,您可以使用克隆功能将数据从 Parquet 或 Iceberg 数据源转换为托管或外部 Delta 表,并使用相同的基本语法。

replace 具有与 Delta 浅层克隆相同的限制,即目标表在应用 replace 之前必须清空。

CREATE OR REPLACE TABLE <target_table_name> SHALLOW CLONE parquet.`/path/to/data`;
CREATE OR REPLACE TABLE <target_table_name> SHALLOW CLONE iceberg.`/path/to/data`;