并发控制
Delta Lake 在读写之间提供 ACID 事务保证。这意味着:
- 对于支持的存储系统,跨多个集群的多个写入者可以同时修改表分区,并看到表的一致快照视图,并且这些写入将按顺序进行。
- 即使在作业期间修改了表,读取者仍会看到 Apache Spark 作业开始时表的一致快照视图。
Delta Lake 使用乐观并发控制来提供写入之间的事务保证。在此机制下,写入操作分为三个阶段:
- 读取:读取(如果需要)表的最新可用版本,以识别需要修改(即重写)的文件。
- 写入:通过写入新的数据文件来暂存所有更改。
- 验证和提交:在提交更改之前,检查提议的更改是否与自读取快照以来可能已并发提交的任何其他更改冲突。如果没有冲突,所有暂存的更改都将作为新的版本化快照提交,并且写入操作成功。但是,如果存在冲突,写入操作将因并发修改异常而失败,而不是像 Parquet 表上的写入操作那样损坏表。
写入冲突
Write conflicts下表描述了哪些写入操作对可能发生冲突。压缩是指使用选项 dataChange
设置为 false
写入的文件压缩操作。
INSERT | UPDATE、DELETE、MERGE INTO | COMPACTION | |
---|---|---|---|
INSERT | 不会冲突 | ||
UPDATE、DELETE、MERGE INTO | 可能会冲突 | 可能会冲突 | |
COMPACTION | 不会冲突 | 可能会冲突 | 可能会冲突 |
使用分区和不相交的命令条件避免冲突
使用分区和不相交的命令条件避免冲突在所有标记为“可能冲突”的情况下,两个操作是否会冲突取决于它们是否操作相同的文件集。您可以通过按与操作条件中使用的相同列对表进行分区,使两个文件集不相交。例如,如果表未按日期分区,则命令 UPDATE table WHERE date > '2010-01-01' ...
和 DELETE table WHERE date < '2010-01-01'
将发生冲突,因为两者都可能尝试修改相同的文件集。按 date
分区表将避免冲突。因此,根据命令中常用条件对表进行分区可以显著减少冲突。但是,按高基数列对表进行分区可能会由于大量子目录而导致其他性能问题。
冲突异常
冲突异常当发生事务冲突时,您将看到以下异常之一:
ConcurrentAppendException
ConcurrentAppendException当并发操作在您的操作读取的同一分区(或未分区表中的任何位置)添加文件时,会发生此异常。文件添加可能由 INSERT
、DELETE
、UPDATE
或 MERGE
操作引起。
此异常通常在并发的 DELETE
、UPDATE
或 MERGE
操作期间抛出。虽然并发操作可能正在物理更新不同的分区目录,但其中一个可能读取另一个并发更新的同一分区,从而导致冲突。您可以通过在操作条件中明确分离来避免此问题。考虑以下示例。
// Target 'deltaTable' is partitioned by date and countrydeltaTable.as("t") .merge( source.as("s"), "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country" ) .whenMatched() .updateAll() .whenNotMatched() .insertAll() .execute()
假设您针对不同的日期或国家/地区并发运行上述代码。由于每个作业都在目标 Delta 表的独立分区上工作,因此您不希望发生任何冲突。但是,条件不够明确,可能会扫描整个表并与更新任何其他分区的并发操作发生冲突。相反,您可以重写您的语句以将特定的日期和国家/地区添加到合并条件,如以下示例所示。
// Target 'deltaTable' is partitioned by date and countrydeltaTable.as("t") .merge( source.as("s"), "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'" ) .whenMatched() .updateAll() .whenNotMatched() .insertAll() .execute()
此操作现在可以安全地在不同日期和国家/地区并发运行。
ConcurrentDeleteReadException
ConcurrentDeleteReadException当并发操作删除了您的操作读取的文件时,会发生此异常。常见原因是重写文件的 DELETE
、UPDATE
或 MERGE
操作。
ConcurrentDeleteDeleteException
ConcurrentDeleteDeleteException当并发操作删除了您的操作也删除的文件时,会发生此异常。这可能是由两个并发压缩操作重写相同文件引起的。
MetadataChangedException
MetadataChangedException当并发事务更新 Delta 表的元数据时,会发生此异常。常见原因是 ALTER TABLE
操作或更新表模式的写入 Delta 表的操作。
ConcurrentTransactionException
ConcurrentTransactionException如果使用相同检查点位置的流式查询同时多次启动并尝试同时写入 Delta 表,则会发生此异常。您不应该让两个流式查询使用相同的检查点位置并同时运行。
ProtocolChangedException
ProtocolChangedException此异常可能在以下情况下发生:
- 当您的 Delta 表升级到新版本时。为了使未来的操作成功,您可能需要升级您的 Delta Lake 版本。
- 当多个写入者同时创建或替换表时。
- 当多个写入者同时写入空路径时。