跳到内容

最佳实践

您可以按列对 Delta 表进行分区。最常用的分区列是 date。请遵循以下两条经验法则来决定按哪个列进行分区

  • 如果某个列的基数非常高,请勿将其用于分区。例如,如果您按列 userId 进行分区,并且可能有 1M 个不同的用户 ID,那么这是一个糟糕的分区策略。
  • 每个分区中的数据量:如果您预计该分区中的数据至少为 1 GB,则可以按列进行分区。

如果您持续向 Delta 表写入数据,随着时间的推移,它会积累大量文件,特别是当您以小批量添加数据时。这可能会对表读取效率产生不利影响,也可能会影响文件系统的性能。理想情况下,大量小文件应定期重写为少量大文件。这被称为压缩。

您可以通过将其重新分区为较少数量的文件来压缩表。此外,您可以将选项 dataChange 指定为 false,表示该操作不更改数据,只重新排列数据布局。这将确保其他并发操作由于此压缩操作而受到的影响最小。

例如,您可以将表压缩成 16 个文件

val path = "..."
val numFiles = 16
spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path)

如果您的表已分区,并且您只想根据谓词重新分区一个分区,您可以使用 where 只读取该分区,然后使用 replaceWhere 写回该分区

val path = "..."
val partition = "year = '2019'"
val numFilesPerPartition = 16
spark.read
.format("delta")
.load(path)
.where(partition)
.repartition(numFilesPerPartition)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.option("replaceWhere", partition)
.save(path)

有时您可能想替换 Delta 表。例如

  • 您发现表中的数据不正确,想替换内容。
  • 您想重写整个表以进行不兼容的模式更改(例如更改列类型)。

虽然您可以删除 Delta 表的整个目录并在同一路径上创建新表,但不推荐这样做,因为

  • 删除目录效率不高。包含非常大文件的目录可能需要数小时甚至数天才能删除。
  • 您会丢失已删除文件中的所有内容;如果您删错了表,将很难恢复。
  • 目录删除不是原子性的。当您删除表时,同时读取表的并发查询可能会失败或看到部分表。

如果您不需要更改表模式,可以从 Delta 表中删除数据并插入新数据,或者更新表以修复不正确的值。

如果您想更改表模式,可以原子地替换整个表。例如

dataframe.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.partitionBy(<your-partition-columns>) \
.saveAsTable("<your-table>") # Managed table
dataframe.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.option("path", "<your-table-path>") \
.partitionBy(<your-partition-columns>) \
.saveAsTable("<your-table>") # External table

这种方法有多种好处

  • 覆盖表要快得多,因为它不需要递归列出目录或删除任何文件。
  • 表的旧版本仍然存在。如果您删错了表,可以使用时间旅行轻松检索旧数据。
  • 这是一个原子操作。当您删除表时,并发查询仍然可以读取表。
  • 由于 Delta Lake ACID 事务保证,如果覆盖表失败,表将恢复到其先前的状态。

此外,如果您想在覆盖表后删除旧文件以节省存储成本,可以使用VACUUM 删除它们。它针对文件删除进行了优化,通常比删除整个目录更快。

您不应使用 Spark 缓存,原因如下

  • 您会丢失因在缓存的 DataFrame 之上添加额外过滤器而可能带来的任何数据跳过。
  • 如果使用不同的标识符访问表(例如,您执行 spark.table(x).cache(),但随后使用 spark.write.save(/some/path) 写入表),则缓存的数据可能不会更新。