跳到内容

Delta Standalone(已弃用)

Delta Standalone 库是一个单节点 Java 库,可用于读取和写入 Delta 表。具体而言,该库提供与事务日志中表元数据交互的 API,实现 Delta 事务日志协议,以实现 Delta Lake 格式的事务保证。值得注意的是,该项目不依赖 Apache Spark,并且只有少量传递依赖。因此,任何处理引擎或应用程序都可以使用它来访问 Delta 表。

Delta Standalone 针对您希望使用自己选择的非 Spark 引擎读取和写入 Delta 表的场景进行了优化。它是一个“低级”库,我们鼓励开发人员为其所需的引擎贡献开源的高级连接器,这些连接器使用 Delta Standalone 进行所有 Delta Lake 元数据交互。您可以在 Delta Lake 仓库中找到 Hive 源连接器和 Flink sink/源连接器。其他连接器正在开发中。

Delta Standalone 通过使用迭代器递增加载 Delta Lake 事务日志来最大程度地减少 JVM 中的内存使用。然而,Delta Standalone 在单个 JVM 中运行,并且受该 JVM 的处理和内存能力限制。用户必须配置 JVM 以避免内存不足 (OOM) 问题。

Delta Standalone 确实提供了读取 Parquet 数据的基本 API,但不包括写入 Parquet 数据的 API。用户必须自行写入新的 Parquet 数据文件,然后使用 Delta Standalone 将这些更改提交到 Delta 表,并使新数据对读取者可见。

Delta Standalone 提供用于读取数据、查询元数据和提交到事务日志的类和实体。此处将重点介绍其中几个及其主要接口。有关完整的类和实体集,请参阅 Java API 文档

DeltaLog 是以编程方式与 Delta 表事务日志中的元数据交互的主要接口。

  • 使用 DeltaLog.forTable(hadoopConf, path) 实例化一个 DeltaLog,并传入 Delta 表根位置的 path
  • 使用 DeltaLog::snapshot 访问当前快照。
  • 使用 DeltaLog::update 获取最新快照,包括添加到日志中的任何新数据文件。
  • 使用 DeltaLog::getSnapshotForTimestampAsOfDeltaLog::getSnapshotForVersionAsOf 获取日志在某个历史状态下的快照。
  • 使用 DeltaLog::startTransaction 启动一个新的事务以提交到事务日志。
  • 使用 DeltaLog::getChanges 获取所有元数据操作,而无需计算完整的快照。

A Snapshot 表示表在特定版本下的状态。

  • 使用 Snapshot::getAllFiles 获取元数据文件列表。
  • 对于元数据文件上的内存优化迭代器,使用 Snapshot::scan 获取 DeltaScan(如后所述),可选地传入分区过滤的 predicate
  • 使用 Snapshot::open 读取实际数据,它返回 Delta 表行的迭代器。

提交一组更新到事务日志的主要类是 OptimisticTransaction。在事务期间,所有读取都必须通过 OptimisticTransaction 实例而不是 DeltaLog,以便检测逻辑冲突和并发更新。

  • 使用 OptimisticTransaction::markFilesAsRead 在事务期间读取元数据文件,该方法返回与 readPredicate 匹配的文件 DeltaScan
  • 使用 OptimisticTransaction::commit 提交到事务日志。
  • 使用 OptimisticTransaction::txnVersion 获取给定应用程序 ID 提交的最新版本(例如,用于幂等性)。(请注意,此 API 要求用户提交 SetTransaction 操作。)
  • 使用 OptimisticTransaction::updateMetadata 在提交时更新表的元数据。

DeltaScanSnapshot 中与给定 readPredicate 匹配的文件的包装类。

  • 使用 DeltaScan::getFiles 访问与 readPredicate 的分区过滤部分匹配的文件。这返回一个内存优化的表元数据文件迭代器。
  • 要进一步过滤返回的非分区列文件,请使用 DeltaScan::getResidualPredicate 获取未应用的输入谓词部分。

Delta Standalone 当前提供的唯一公共 API 位于 io.delta.standalone 包中。io.delta.standalone.internal 包中的类和方法被认为是内部的,并且在次要版本和补丁版本之间可能会发生变化。

您可以使用首选的构建工具将 Delta Standalone 库添加为依赖项。Delta Standalone 依赖于 hadoop-clientparquet-hadoop 包。以下部分列出了示例构建文件。

  • JDK 8 或更高版本。
  • Scala 2.11 或 2.12。

hadoop-client 的版本替换为您正在使用的版本。

Scala 2.12

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.0</version>
</dependency>

Scala 2.11

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.11</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.0</version>
</dependency>

hadoop-client 的版本替换为您正在使用的版本。

libraryDependencies ++= Seq(
"io.delta" %% "delta-standalone" % "0.5.0",
"org.apache.hadoop" % "hadoop-client" % "3.1.0)

Delta Standalone 屏蔽了其自己的 Parquet 依赖项,以便开箱即用并减少您环境中的依赖项冲突。但是,如果您想使用实用程序类 io.delta.standalone.util.ParquetSchemaConverter,则必须提供您自己的 org.apache.parquet:parquet-hadoop 版本。

Delta Lake ACID 保证基于存储系统的原子性和持久性保证。并非所有存储系统都提供所有必要的保证。

由于存储系统不一定开箱即用提供所有这些保证,Delta Lake 事务操作通常通过 LogStore API 而不是直接访问存储系统。为了为不同的存储系统提供 ACID 保证,您可能需要使用不同的 LogStore 实现。本节介绍如何为各种存储系统配置 Delta Standalone。存储系统分为两类:

  • 内置支持的存储系统:对于某些存储系统,您不需要额外的配置。Delta Standalone 使用路径方案(即 s3a://path 中的 s3a)动态识别存储系统并使用相应的 LogStore 实现来提供事务保证。但是,对于 S3,并发写入存在额外注意事项。有关详细信息,请参阅 S3 部分

  • 其他存储系统LogStore,类似于 Apache Spark,使用 Hadoop FileSystem API 执行读写。Delta Standalone 支持对任何提供 FileSystem API 实现的存储系统进行并发读取。对于具有事务保证的并发写入,根据 FileSystem 实现提供的保证,有两种情况。如果实现提供一致的列表和原子重命名而不覆盖(即 rename(... , overwrite = false) 将原子地生成目标文件,如果它已经存在则会失败并抛出 java.nio.file.FileAlreadyExistsException),那么使用重命名的默认 LogStore 实现将允许具有保证的并发写入。否则,您必须通过在实例化 DeltaLog 时设置以下 Hadoop 配置来配置 LogStore 的自定义实现:DeltaLog.forTable(hadoopConf, path)

    delta.logStore.<scheme>.impl=<full-qualified-class-name>

    在这里,<scheme> 是您的存储系统路径的方案。这会将 Delta Standalone 配置为仅对这些路径动态使用给定的 LogStore 实现。您可以在应用程序中为不同的方案设置多个此类配置,从而允许它同时从不同的存储系统读取和写入。

Delta Standalone 支持两种不同模式对 S3 进行读写:单集群和多集群。

单集群多集群
配置开箱即用是实验性的,需要额外配置
读取支持来自多个集群的并发读取支持来自多个集群的并发读取
写入支持来自单个集群的并发写入支持多集群写入
权限S3 凭据S3 和 DynamoDB 操作权限

默认情况下,Delta Standalone 支持来自多个集群的并发读取。但是,对 S3 的并发写入必须源自单个集群,才能提供事务保证。这是因为 S3 当前不提供互斥,也就是说,无法确保只有一个写入器能够创建文件。

要将 Delta Standalone 与 S3 一起使用,您必须满足以下要求。如果您使用访问密钥进行身份验证和授权,则必须在实例化 DeltaLog 时配置如下指定的 Hadoop Configuration:DeltaLog.forTable(hadoopConf, path)

  1. 在类路径中包含 hadoop-aws JAR。

  2. 设置 S3 凭据。我们建议您使用 IAM 角色 进行身份验证和授权。但是,如果您想使用密钥,请使用以下配置您的 org.apache.hadoop.conf.Configuration

    conf.set("fs.s3a.access.key", "<your-s3-access-key>");
    conf.set("fs.s3a.secret.key", "<your-s3-secret-key>");
  • “要求(S3 单集群)”部分中列出的所有要求
  • 除了 S3 凭据,您还需要 DynamoDB 操作权限
  1. 创建 DynamoDB 表。有关自行创建表(推荐)或自动创建表的更多详细信息,请参阅创建 DynamoDB 表

  2. 按照配置(S3 单集群)部分中列出的配置步骤进行操作。

  3. 在类路径中包含 delta-storage-s3-dynamodb JAR。

  4. 配置 LogStore 实现。

    首先,为方案 s3 配置此 LogStore 实现。您也可以为方案 s3as3n 复制此命令。

    conf.set("delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
配置键描述默认
io.delta.storage.S3DynamoDBLogStore.ddb.tableName要使用的 DynamoDB 表的名称delta_log
io.delta.storage.S3DynamoDBLogStore.ddb.region客户端要使用的区域us-east-1
io.delta.storage.S3DynamoDBLogStore.credentials.provider客户端使用的 AWSCredentialsProvider*DefaultAWSCredentialsProviderChain
io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu(仅限表创建**)读取容量单位5
io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu(仅限表创建**)写入容量单位5

*有关 AWS 凭证提供商的更多详细信息,请参阅 AWS 文档

**这些配置仅在给定的 DynamoDB 表尚不存在且需要自动创建时使用。

至此,此多集群设置已完全运行。但是,在生产环境中运行时,您可以进行额外的配置以提高性能和优化存储。有关更多详细信息,请参阅 Delta Lake 文档

Delta Standalone 支持从多个集群并发读写,并为各种 Azure 存储系统提供完整的事务保证。要使用 Azure 存储系统,您必须满足以下要求,并在实例化 DeltaLog 时配置指定的 Hadoop Configuration:DeltaLog.forTable(hadoopConf, path)

  1. 在类路径中包含 hadoop-azure JAR。

  2. 设置凭据。

    • 对于 SAS 令牌,请配置 org.apache.hadoop.conf.Configuration

      conf.set(
      "fs.azure.sas.<your-container-name>.<your-storage-account-name>.blob.core.windows.net",
      "<complete-query-string-of-your-sas-for-the-container>");
    • 指定账户访问密钥

      conf.set(
      "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
      "<your-storage-account-access-key>");
  1. 在类路径中包含 hadoop-azure-datalake JAR。

  2. 设置 Azure Data Lake Storage Gen1 凭据。配置 org.apache.hadoop.conf.Configuration

    conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential");
    conf.set("dfs.adls.oauth2.client.id", "<your-oauth2-client-id>");
    conf.set("dfs.adls.oauth2.credential", "<your-oauth2-credential>");
    conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/<your-directory-id>/oauth2/token");
  1. 在类路径中包含 hadoop-azure-datalake JAR。此外,您可能还需要包含 Maven 工件 hadoop-azurewildfly-openssl 的 JAR。

  2. 设置 Azure Data Lake Storage Gen2 凭据。配置您的 org.apache.hadoop.conf.Configuration

    conf.set("fs.azure.account.auth.type.<storage-account-name>.dfs.core.windows.net", "OAuth");
    conf.set("fs.azure.account.oauth.provider.type.<storage-account-name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider");
    conf.set("fs.azure.account.oauth2.client.id.<storage-account-name>.dfs.core.windows.net", "<application-id>");
    conf.set("fs.azure.account.oauth2.client.secret.<storage-account-name>.dfs.core.windows.net","<password>");
    conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account-name>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token");

    其中 <storage-account-name><application-id><directory-id><password> 是我们之前设置的要求中的服务主体详细信息。

Delta Standalone 内置支持 HDFS,提供对多个集群的并发读写以及完整的事务保证。有关配置凭据的信息,请参阅 Hadoop 文档

  1. 在类路径中包含 gcs-connector 的 JAR。有关如何使用 GCS 连接器配置项目的详细信息,请参阅文档

本示例演示如何使用 Delta Standalone 进行以下操作:

  • 查找 Parquet 文件。
  • 写入 Parquet 数据。
  • 提交到事务日志。
  • 从事务日志读取。
  • 回读 Parquet 数据。

请注意,本示例使用虚构的非 Spark 引擎 Zappy 写入实际的 Parquet 数据,因为 Delta Standalone 不提供任何数据写入 API。相反,Delta Standalone Writer 允许您在写入数据后将元数据提交到 Delta 日志。这就是 Delta Standalone 与如此多的连接器(例如 Flink、Presto、Trino 等)配合良好的原因,因为它们提供了 Parquet 写入功能。

使用以下 SBT 项目配置:

// <project-root>/build.sbt
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
"io.delta" %% "delta-standalone" % "0.5.0",
"org.apache.hadoop" % "hadoop-client" % "3.1.0")

我们有一个存储销售数据的 Delta 表 Sales,但发现客户 XYZ 在 2021 年 11 月写入的所有数据都有不正确的 total_cost 值。因此,我们需要使用正确的值更新所有这些记录。我们将使用虚构的分布式引擎 Zappy 和 Delta Standalone 来更新我们的 Delta 表。

销售表模式如下所示。

Sales
|-- year: int // partition column
|-- month: int // partition column
|-- day: int // partition column
|-- customer: string
|-- sale_id: string
|-- total_cost: float

由于我们必须读取现有数据才能执行所需的更新操作,因此我们必须使用 OptimisticTransaction::markFilesAsRead 才能自动检测对我们读取的分区进行的任何并发修改。由于 Delta Standalone 仅支持分区剪枝,我们必须应用剩余谓词以进一步过滤返回的文件。

import io.delta.standalone.DeltaLog;
import io.delta.standalone.DeltaScan;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.expressions.And;
import io.delta.standalone.expressions.EqualTo;
import io.delta.standalone.expressions.Literal;
DeltaLog log = DeltaLog.forTable(new Configuration(), "/data/sales");
OptimisticTransaction txn = log.startTransaction();
DeltaScan scan = txn.markFilesAsRead(
new And(
new And(
new EqualTo(schema.column("year"), Literal.of(2021)), // partition filter
new EqualTo(schema.column("month"), Literal.of(11))), // partition filter
new EqualTo(schema.column("customer"), Literal.of("XYZ")) // non-partition filter
)
);
CloseableIterator<AddFile> iter = scan.getFiles();
Map<String, AddFile> addFileMap = new HashMap<String, AddFile>(); // partition filtered files: year=2021, month=11
while (iter.hasNext()) {
AddFile addFile = iter.next();
addFileMap.put(addFile.getPath(), addFile);
}
iter.close();
List<String> filteredFiles = ZappyReader.filterFiles( // fully filtered files: year=2021, month=11, customer=XYZ
addFileMap.keySet(),
toZappyExpression(scan.getResidualPredicate())
);

由于 Delta Standalone 不提供任何 Parquet 数据写入 API,我们使用 Zappy 来写入数据。

ZappyDataFrame correctedSaleIdToTotalCost = ...;
ZappyDataFrame invalidSales = ZappyReader.readParquet(filteredFiles);
ZappyDataFrame correctedSales = invalidSales.join(correctedSaleIdToTotalCost, "id");
ZappyWriteResult dataWriteResult = ZappyWritter.writeParquet("/data/sales", correctedSales);

前面代码写入的数据文件将具有类似于以下的层次结构:

终端窗口
$ tree /data/sales
.
├── _delta_log
└── ...
└── 00000000000000001082.json
└── 00000000000000001083.json
├── year=2019
└── month=1
...
├── year=2020
└── month=1
└── day=1
└── part-00000-195768ae-bad8-4c53-b0c2-e900e0f3eaee-c000.snappy.parquet // previous
└── part-00001-53c3c553-f74b-4384-b9b5-7aa45bc2291b-c000.snappy.parquet // new
| ...
└── day=2
└── part-00000-b9afbcf5-b90d-4f92-97fd-a2522aa2d4f6-c000.snappy.parquet // previous
└── part-00001-c0569730-5008-42fa-b6cb-5a152c133fde-c000.snappy.parquet // new
| ...

现在我们已经写入了正确的数据,我们需要提交到事务日志以添加新文件,并删除旧的不正确文件。

import io.delta.standalone.Operation;
import io.delta.standalone.actions.RemoveFile;
import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
import io.delta.standalone.types.StructType;
List<RemoveFile> removeOldFiles = filteredFiles.stream()
.map(path -> addFileMap.get(path).remove())
.collect(Collectors.toList());
List<AddFile> addNewFiles = dataWriteResult.getNewFiles()
.map(file ->
new AddFile(
file.getPath(),
file.getPartitionValues(),
file.getSize(),
System.currentTimeMillis(),
true, // isDataChange
null, // stats
null // tags
);
).collect(Collectors.toList());
List<Action> totalCommitFiles = new ArrayList<>();
totalCommitFiles.addAll(removeOldFiles);
totalCommitFiles.addAll(addNewFiles);
try {
txn.commit(totalCommitFiles, new Operation(Operation.Name.UPDATE), "Zippy/1.0.0");
} catch (DeltaConcurrentModificationException e) {
// handle exception here
}

Delta Standalone 提供读取元数据和数据的 API,如下所示。

对于大多数用例,特别是当您处理大量数据时,我们建议您将 Delta Standalone 库用作仅元数据读取器,然后自己执行 Parquet 数据读取,最可能以分布式方式。

Delta Standalone 提供了两个 API,用于读取给定表快照中的文件。Snapshot::getAllFiles 返回一个内存列表。自 0.3.0 版本起,我们还提供了 Snapshot::scan(filter)::getFiles,它支持分区剪枝和优化的内部迭代器实现。此处我们将使用后者。

import io.delta.standalone.Snapshot;
DeltaLog log = DeltaLog.forTable(new Configuration(), "/data/sales");
Snapshot latestSnapshot = log.update();
StructType schema = latestSnapshot.getMetadata().getSchema();
DeltaScan scan = latestSnapshot.scan(
new And(
new And(
new EqualTo(schema.column("year"), Literal.of(2021)),
new EqualTo(schema.column("month"), Literal.of(11))),
new EqualTo(schema.column("customer"), Literal.of("XYZ"))
)
);
CloseableIterator<AddFile> iter = scan.getFiles();
try {
while (iter.hasNext()) {
AddFile addFile = iter.next();
// Zappy engine to handle reading data in `addFile.getPath()` and apply any `scan.getResidualPredicate()`
}
} finally {
iter.close();
}

Delta Standalone 允许使用 Snapshot::open 直接读取 Parquet 数据。

import io.delta.standalone.data.RowRecord;
CloseableIterator<RowRecord> dataIter = log.update().open();
try {
while (dataIter.hasNext()) {
RowRecord row = dataIter.next();
int year = row.getInt("year");
String customer = row.getString("customer");
float totalCost = row.getFloat("total_cost");
}
} finally {
dataIter.close();
}

我们使用 GitHub Issues 跟踪社区报告的问题。您也可以联系社区以获取答案。

我们欢迎对 Delta Lake 仓库的贡献。我们使用 GitHub Pull Requests 接受更改。

有两种方式与 Delta Lake 社区沟通:

在 IntelliJ 中本地调试 standalone 测试之前,请运行所有测试:build/sbt standalone/test。这有助于 IntelliJ 将黄金表识别为类资源。