优化
为了提高查询速度,Delta Lake 支持优化存储中数据布局的能力。有多种方法可以优化布局。
文件合并 (bin-packing)
Section titled “Compaction (bin-packing)”Delta Lake 可以通过将小文件合并为大文件来提高从表中读取查询的速度。
OPTIMIZE '/path/to/delta/table' -- Optimizes the path-based Delta Lake table
OPTIMIZE delta_table_name;
OPTIMIZE delta.`/path/to/delta/table`;
-- If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using `WHERE`:OPTIMIZE delta_table_name WHERE date >= '2017-01-01'
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable) # For path-based tables# For Hive metastore-based tables: deltaTable = DeltaTable.forName(spark, tableName)
deltaTable.optimize().executeCompaction()
# If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using `where`deltaTable.optimize().where("date='2021-11-18'").executeCompaction()
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable) // For path-based tables// For Hive metastore-based tables: val deltaTable = DeltaTable.forName(spark, tableName)
deltaTable.optimize().executeCompaction()
// If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using `where`deltaTable.optimize().where("date='2021-11-18'").executeCompaction()
有关 Scala、Java 和 Python API 语法详细信息,请参阅 Delta Lake API。
Delta 表的读取器使用快照隔离,这意味着当 OPTIMIZE
从事务日志中删除不必要的文件时,它们不会中断。OPTIMIZE
不对表进行数据相关更改,因此 OPTIMIZE
前后的读取具有相同的结果。对作为流源的表执行 OPTIMIZE
不会影响将此表作为源的任何当前或未来流。OPTIMIZE
返回操作删除的文件和添加的文件(最小值、最大值、总数等)的文件统计信息。优化统计信息还包含批次数量和优化的分区数量。
自动文件合并将 Delta 表分区内的小文件组合起来,以自动减少小文件问题。自动文件合并在写入表成功后发生,并在执行写入的集群上同步运行。自动文件合并仅合并以前未合并过的文件。
您可以通过设置配置 spark.databricks.delta.autoCompact.maxFileSize
来控制输出文件大小。
自动文件合并仅针对至少包含一定数量小文件的分区或表触发。您可以通过设置 spark.databricks.delta.autoCompact.minNumFiles
选择性地更改触发自动文件合并所需的最小文件数。
可以使用以下设置在表或会话级别启用自动文件合并
- 表属性:
delta.autoOptimize.autoCompact
- SparkSession 设置:
spark.databricks.delta.autoCompact.enabled
这些设置接受以下选项
选项 | 行为 |
---|---|
true | 启用自动文件合并。默认情况下,将使用 128 MB 作为目标文件大小。 |
false | 关闭自动文件合并。可以在会话级别设置,以覆盖工作负载中修改的所有 Delta 表的自动文件合并。 |
当您将数据写入 Delta Lake 表时,数据跳过信息会自动收集。Delta Lake 在查询时利用此信息(每列的最小值和最大值)以提供更快的查询。您无需配置数据跳过;该功能在适用时会自动激活。然而,其有效性取决于您的数据布局。为了获得最佳结果,请应用 Z-Ordering。
收集包含长值(如 string
或 binary
)的列的统计信息是一项昂贵的操作。为了避免收集此类列的统计信息,您可以配置表属性 delta.dataSkippingNumIndexedCols
。此属性表示表中列在表模式中的位置索引。所有位置索引小于 delta.dataSkippingNumIndexedCols
属性的列都将收集统计信息。为了收集统计信息,嵌套列中的每个字段都被视为一个单独的列。为了避免收集包含长值的列的统计信息,可以设置 delta.dataSkippingNumIndexedCols
属性,使长值列在表模式中位于此索引之后,或者通过使用 ALTER TABLE ALTER COLUMN 将包含长字符串的列移动到大于 delta.dataSkippingNumIndexedCols
属性的索引位置。
Z-Ordering (多维聚类)
Section titled “Z-Ordering (multi-dimensional clustering)”Z-Ordering 是一种技术,用于将相关信息共同定位在同一组文件中。Delta Lake 在数据跳过算法中自动使用此共同定位。此行为显著减少了 Apache Spark 上的 Delta Lake 需要读取的数据量。要对数据进行 Z-Order,您可以在 ZORDER BY
子句中指定要排序的列
OPTIMIZE events ZORDER BY (eventType)
-- If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate by using "where".OPTIMIZE events WHERE date = '2021-11-18' ZORDER BY (eventType)
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable) # path-based table# For Hive metastore-based tables: deltaTable = DeltaTable.forName(spark, tableName)
deltaTable.optimize().executeZOrderBy(eventType)
# If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using `where`deltaTable.optimize().where("date='2021-11-18'").executeZOrderBy(eventType)
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable) // path-based table// For Hive metastore-based tables: val deltaTable = DeltaTable.forName(spark, tableName)
deltaTable.optimize().executeZOrderBy(eventType)
// If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate by using "where".deltaTable.optimize().where("date='2021-11-18'").executeZOrderBy(eventType)
有关 Scala、Java 和 Python API 语法详细信息,请参阅 Delta Lake API
如果您预计某个列将常用于查询谓词,并且该列具有高基数(即大量不同的值),则使用 ZORDER BY
。
您可以将多个列作为逗号分隔列表指定用于 ZORDER BY
。但是,局部性效果会随着每个额外列的增加而降低。对未收集统计信息的列进行 Z-Ordering 将是无效的,并且会浪费资源。这是因为数据跳过需要列局部统计信息,例如最小值、最大值和计数。您可以通过重新排列模式中的列或增加要收集统计信息的列数来配置某些列的统计信息收集。请参阅数据跳过。
Delta Lake 表会定期自动将所有增量更新压缩到 Delta 日志中,形成一个 Parquet 文件。这种“检查点”允许读取查询快速重建表的当前状态(即要处理哪些文件,当前模式是什么),而无需读取太多包含增量更新的文件。
Delta Lake 协议允许将检查点拆分为多个 Parquet 文件。这使得检查点的写入并行化并加速。在 Delta Lake 中,默认情况下每个检查点都写入单个 Parquet 文件。要使用此功能,请设置 SQL 配置 spark.databricks.delta.checkpoint.partSize=<n>
,其中 n
是 Delta Lake on Apache Spark 将开始并行化检查点并尝试为每个检查点文件写入最多此数量的操作(例如 AddFile
)的操作限制。
Delta Lake 协议允许新的日志合并文件,其格式为 <x>.<y>.compact.json
。这些文件包含提交范围 [x, y]
的聚合操作。日志合并减少了频繁检查点的需求,并最大程度地减少了由它们引起的延迟峰值。
Delta Lake 3.0.0 及更高版本支持日志合并文件的读取。它默认启用,可以通过 SQL 配置 spark.databricks.delta.deltaLog.minorCompaction.useForReads=<value>
禁用,其中 value
可以是 true/false
。日志合并的写入支持将在未来的 Delta 版本中添加。
优化写入可在写入数据时改进文件大小,并有利于后续对表的读取。
优化写入对分区表最有效,因为它们减少了写入每个分区的小文件数量。写入少量大文件比写入大量小文件更高效,但您仍然可能会看到写入延迟增加,因为数据在写入之前会进行混洗。
下图演示了优化写入的工作原理
优化写入功能默认_禁用_。它可以在表、SQL 会话和/或 DataFrameWriter 级别启用,使用以下设置(按优先级从低到高排序)
delta.autoOptimize.optimizeWrite
表属性(默认=None);spark.databricks.delta.optimizeWrite.enabled
SQL 配置(默认=None);- DataFrameWriter 选项
optimizeWrite
(默认=None)。
除上述之外,还可以使用以下高级 SQL 配置来进一步微调写入的文件数量和大小
spark.databricks.delta.optimizeWrite.binSize
(默认=512MiB),控制每个输出文件的目标内存大小;spark.databricks.delta.optimizeWrite.numShuffleBlocks
(默认=50,000,000),控制“目标最大混洗块数”;spark.databricks.delta.optimizeWrite.maxShufflePartitions
(默认=2,000),控制“优化写入可使用的最大输出桶(reducer)数”。