最佳实践
选择正确的分区列
标题为“选择正确的分区列”的部分您可以按列对 Delta 表进行分区。最常用的分区列是 date
。请遵循以下两条经验法则来决定按哪个列进行分区
- 如果某个列的基数非常高,请勿将其用于分区。例如,如果您按列
userId
进行分区,并且可能有 1M 个不同的用户 ID,那么这是一个糟糕的分区策略。 - 每个分区中的数据量:如果您预计该分区中的数据至少为 1 GB,则可以按列进行分区。
压缩文件
标题为“压缩文件”的部分如果您持续向 Delta 表写入数据,随着时间的推移,它会积累大量文件,特别是当您以小批量添加数据时。这可能会对表读取效率产生不利影响,也可能会影响文件系统的性能。理想情况下,大量小文件应定期重写为少量大文件。这被称为压缩。
您可以通过将其重新分区为较少数量的文件来压缩表。此外,您可以将选项 dataChange
指定为 false
,表示该操作不更改数据,只重新排列数据布局。这将确保其他并发操作由于此压缩操作而受到的影响最小。
例如,您可以将表压缩成 16 个文件
val path = "..."val numFiles = 16
spark.read .format("delta") .load(path) .repartition(numFiles) .write .option("dataChange", "false") .format("delta") .mode("overwrite") .save(path)
path = "..."numFiles = 16
(spark.read .format("delta") .load(path) .repartition(numFiles) .write .option("dataChange", "false") .format("delta") .mode("overwrite") .save(path))
如果您的表已分区,并且您只想根据谓词重新分区一个分区,您可以使用 where
只读取该分区,然后使用 replaceWhere
写回该分区
val path = "..."val partition = "year = '2019'"val numFilesPerPartition = 16
spark.read .format("delta") .load(path) .where(partition) .repartition(numFilesPerPartition) .write .option("dataChange", "false") .format("delta") .mode("overwrite") .option("replaceWhere", partition) .save(path)
path = "..."partition = "year = '2019'"numFilesPerPartition = 16
(spark.read .format("delta") .load(path) .where(partition) .repartition(numFilesPerPartition) .write .option("dataChange", "false") .format("delta") .mode("overwrite") .option("replaceWhere", partition) .save(path))
替换表内容或模式
标题为“替换表内容或模式”的部分有时您可能想替换 Delta 表。例如
- 您发现表中的数据不正确,想替换内容。
- 您想重写整个表以进行不兼容的模式更改(例如更改列类型)。
虽然您可以删除 Delta 表的整个目录并在同一路径上创建新表,但不推荐这样做,因为
- 删除目录效率不高。包含非常大文件的目录可能需要数小时甚至数天才能删除。
- 您会丢失已删除文件中的所有内容;如果您删错了表,将很难恢复。
- 目录删除不是原子性的。当您删除表时,同时读取表的并发查询可能会失败或看到部分表。
如果您不需要更改表模式,可以从 Delta 表中删除数据并插入新数据,或者更新表以修复不正确的值。
如果您想更改表模式,可以原子地替换整个表。例如
dataframe.write \ .format("delta") \ .mode("overwrite") \ .option("overwriteSchema", "true") \ .partitionBy(<your-partition-columns>) \ .saveAsTable("<your-table>") # Managed tabledataframe.write \ .format("delta") \ .mode("overwrite") \ .option("overwriteSchema", "true") \ .option("path", "<your-table-path>") \ .partitionBy(<your-partition-columns>) \ .saveAsTable("<your-table>") # External table
REPLACE TABLE <your-table> USING DELTA PARTITIONED BY (<your-partition-columns>) AS SELECT ... -- Managed tableREPLACE TABLE <your-table> USING DELTA PARTITIONED BY (<your-partition-columns>) LOCATION "<your-table-path>" AS SELECT ... -- External table
dataframe.write .format("delta") .mode("overwrite") .option("overwriteSchema", "true") .partitionBy(<your-partition-columns>) .saveAsTable("<your-table>") // Managed tabledataframe.write .format("delta") .mode("overwrite") .option("overwriteSchema", "true") .option("path", "<your-table-path>") .partitionBy(<your-partition-columns>) .saveAsTable("<your-table>") // External table
这种方法有多种好处
- 覆盖表要快得多,因为它不需要递归列出目录或删除任何文件。
- 表的旧版本仍然存在。如果您删错了表,可以使用时间旅行轻松检索旧数据。
- 这是一个原子操作。当您删除表时,并发查询仍然可以读取表。
- 由于 Delta Lake ACID 事务保证,如果覆盖表失败,表将恢复到其先前的状态。
此外,如果您想在覆盖表后删除旧文件以节省存储成本,可以使用VACUUM 删除它们。它针对文件删除进行了优化,通常比删除整个目录更快。
Spark 缓存
标题为“Spark 缓存”的部分您不应使用 Spark 缓存,原因如下
- 您会丢失因在缓存的
DataFrame
之上添加额外过滤器而可能带来的任何数据跳过。 - 如果使用不同的标识符访问表(例如,您执行
spark.table(x).cache()
,但随后使用spark.write.save(/some/path)
写入表),则缓存的数据可能不会更新。