跳到内容

存储配置

Delta Lake ACID 保证以存储系统的原子性和持久性保证为前提。具体来说,Delta Lake 在与存储系统交互时依赖以下特性:

  • 原子可见性:文件必须能够完全可见或完全不可见。
  • 互斥:最终目的地只能有一个写入器能够创建(或重命名)文件。
  • 一致列表:一旦文件写入目录,该目录的所有未来列表都必须返回该文件。

由于存储系统不一定开箱即用地提供所有这些保证,Delta Lake 事务操作通常通过 LogStore API 而不是直接访问存储系统。为了为不同的存储系统提供 ACID 保证,您可能需要使用不同的 LogStore 实现。本文介绍如何为各种存储系统配置 Delta Lake。存储系统分为两类:

  • 内置支持的存储系统:对于某些存储系统,您不需要额外的配置。Delta Lake 使用路径的方案(即 s3a://path 中的 s3a)动态识别存储系统并使用相应的 LogStore 实现来提供事务保证。但是,对于 S3,并发写入存在额外的注意事项。有关详细信息,请参阅S3 部分

  • 其他存储系统LogStore 类似于 Apache Spark,使用 Hadoop FileSystem API 执行读写操作。因此,Delta Lake 支持对任何提供 FileSystem API 实现的存储系统进行并发读取。对于具有事务保证的并发写入,根据 FileSystem 实现提供的保证,有两种情况。如果实现提供一致的列表和原子重命名(不覆盖)(即 rename(..., overwrite = false) 将原子地生成目标文件,或者如果目标文件已存在则失败并抛出 java.nio.file.FileAlreadyExistsException),那么使用重命名的默认 LogStore 实现将允许具有保证的并发写入。否则,您必须通过设置以下 Spark 配置来配置 LogStore 的自定义实现:

spark.delta.logStore.<scheme>.impl=<full-qualified-class-name>

其中 <scheme> 是您的存储系统路径的方案。这将配置 Delta Lake 仅对这些路径动态使用给定的 LogStore 实现。您可以在应用程序中为不同的方案设置多个此类配置,从而允许它同时从不同的存储系统进行读写。

Delta Lake 支持以两种不同模式对 S3 进行读写:单集群和多集群。

单集群多集群
配置Delta Lake 开箱即用是实验性的,需要额外配置
读取支持来自多个集群的并发读取支持来自多个集群的并发读取
写入支持来自单个 Spark 驱动程序的并发写入支持多集群写入
权限S3 凭证S3 和 DynamoDB 操作权限

在此默认模式下,Delta Lake 支持来自多个集群的并发读取,但对 S3 的并发写入必须来自单个 Spark 驱动程序,以便 Delta Lake 提供事务保证。这是因为 S3 目前不提供互斥,即无法确保只有一个写入器能够创建文件。

  • S3 凭证:IAM 角色(推荐)或访问密钥
  • 与相应 Delta Lake 版本关联的 Apache Spark。
  • Apache Spark 编译所用的 Hadoop 版本的 Hadoop AWS 连接器 (hadoop-aws)

本节介绍如何使用单集群模式快速开始在 S3 上读写 Delta 表。有关配置的详细说明,请参阅设置配置(S3 多集群)

  1. 使用以下命令启动支持 Delta Lake 和 S3 的 Spark shell(假设您使用预构建用于 Hadoop 3.4.0 的 Spark 4.0.0)
终端窗口
bin/spark-shell \
--packages io.delta:delta-spark_2.13:4.0.0,org.apache.hadoop:hadoop-aws:3.4.0 \
--conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key>
  1. 在 S3 上尝试一些基本的 Delta 表操作(使用 Scala)
// Create a Delta table on S3:
spark.range(5).write.format("delta").save("s3a://<your-s3-bucket>/<path-to-delta-table>")
// Read a Delta table on S3:
spark.read.format("delta").load("s3a://<your-s3-bucket>/<path-to-delta-table>").show()

有关其他语言和更多 Delta 表操作示例,请参阅快速入门页面。

为了高效列出 S3 上的 Delta Lake 元数据文件,请设置配置 delta.enableFastS3AListFrom=true。此性能优化处于实验支持模式。它仅适用于 S3A 文件系统,不适用于 Amazon EMR 默认文件系统 S3

bin/spark-shell \
--packages io.delta:delta-spark_2.13:4.0.0,org.apache.hadoop:hadoop-aws:3.4.0 \
--conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key> \
--conf "spark.hadoop.delta.enableFastS3AListFrom=true

此模式支持来自多个集群对 S3 的并发写入,并且必须通过配置 Delta Lake 以使用正确的 LogStore 实现来显式启用。此实现使用 DynamoDB 来提供 S3 缺乏的互斥功能。

本节介绍如何使用多集群模式快速开始在 S3 上读写 Delta 表。

  1. 使用以下命令启动支持 Delta Lake 和 S3 的 Spark shell(假设您使用预构建用于 Hadoop 3.4.0 的 Spark 4.0.0)
终端窗口
bin/spark-shell \
--packages io.delta:delta-spark_2.13:3,org.apache.hadoop:hadoop-aws:3.4.0,io.delta:delta-storage-s3-dynamodb:4.0.0 \
--conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key> \
--conf spark.delta.logStore.s3a.impl=io.delta.storage.S3DynamoDBLogStore \
--conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=us-west-2
  1. 在 S3 上尝试一些基本的 Delta 表操作(使用 Scala)
// Create a Delta table on S3:
spark.range(5).write.format("delta").save("s3a://<your-s3-bucket>/<path-to-delta-table>")
// Read a Delta table on S3:
spark.read.format("delta").load("s3a://<your-s3-bucket>/<path-to-delta-table>").show()

有关其他语言和更多 Delta 表操作示例,请参阅快速入门页面。

  1. 创建 DynamoDB 表。

    您可以选择自己创建 DynamoDB 表(推荐)或让系统自动为您创建。

    • 自己创建 DynamoDB 表

      此 DynamoDB 表将维护多个 Delta 表的提交元数据,并且其配置的读写容量模式(例如,按需或预置)对于您的用例至关重要。因此,我们强烈建议您自己创建 DynamoDB 表。以下示例使用 AWS CLI。要了解更多信息,请参阅 create-table 命令参考。

终端窗口
aws dynamodb create-table \
--region us-east-1 \
--table-name delta_log \
--attribute-definitions AttributeName=tablePath,AttributeType=S \
AttributeName=fileName,AttributeType=S \
--key-schema AttributeName=tablePath,KeyType=HASH \
AttributeName=fileName,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST
  1. 按照配置(S3 单集群)部分中列出的配置步骤进行操作。

  2. 在类路径中包含 delta-storage-s3-dynamodb JAR。

  3. 在您的 Spark 会话中配置 LogStore 实现。

    首先,为此方案 s3 配置此 LogStore 实现。您也可以为方案 s3as3n 复制此命令。

spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore

接下来,指定实例化 DynamoDB 客户端所需的附加信息。您必须在每个 Spark 会话中使用相同的 tableNameregion 实例化 DynamoDB 客户端,以便此多集群模式正常工作。下面列出了每个会话的配置及其默认值:

spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName=delta_log
spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=us-east-1
spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider=<AWSCredentialsProvider* used by the client>
spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu=5
spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu=5

至此,此多集群设置已完全可运行。但是,您还可以进行额外配置以在生产环境中提高性能和优化存储。

  1. 调整您的读写容量模式。

    如果您使用此 LogStore 实现为您创建的默认 DynamoDB 表,其默认 RCU 和 WCU 可能不足以满足您的工作负载。您可以调整预置吞吐量更新到按需模式

  2. 使用生存时间 (TTL) 清理旧的 DynamoDB 条目。

    一旦 DynamoDB 元数据条目被标记为已完成,并且经过足够长的时间,我们可以仅依靠 S3 来防止其相应 Delta 文件的意外覆盖,则可以安全地从 DynamoDB 中删除该条目。最便宜的方法是使用 DynamoDB 的 TTL 功能,这是一种免费的自动方法,可以从 DynamoDB 表中删除项目。

    在给定的 DynamoDB 表上运行以下命令以启用 TTL

终端窗口
aws dynamodb update-time-to-live \
--region us-east-1 \
--table-name delta_log \
--time-to-live-specification "Enabled=true, AttributeName=expireTime"

默认的 expireTime 将是 DynamoDB 条目被标记为完成后的第一天。

  1. 使用 S3 生命周期过期清理旧的 AWS S3 临时文件。

    在此 LogStore 实现中,会创建一个临时文件,其中包含要提交到 Delta 日志的元数据的副本。一旦 Delta 日志的提交完成,并且相应的 DynamoDB 条目已被删除,就可以安全地删除此临时文件。实际上,在提交失败的恢复过程中,只使用最新的临时文件。

    这里有两种删除这些临时文件的简单方法

    1. 使用 S3 CLI 手动删除。

      这是最安全的选择。以下命令将删除给定 <bucket><table> 中除最新临时文件之外的所有临时文件

终端窗口
aws s3 ls s3://<bucket>/<delta_table_path>/_delta_log/.tmp/ --recursive | awk 'NF>1{print $4}' | grep . | sort | head -n -1 | while read -r line ; do
echo "Removing ${line}"
aws s3 rm s3://<bucket>/<delta_table_path>/_delta_log/.tmp/${line}
done
  1. 使用 S3 生命周期过期规则删除

    一个更自动化的选项是使用 S3 生命周期过期规则,其中过滤器前缀指向表中位于 <delta_table_path>/_delta_log/.tmp/ 文件夹,过期值为 30 天。

AWS 文档 此处 描述了配置桶生命周期配置的各种方法。

一种方法是使用 S3 的 put-bucket-lifecycle-configuration 命令。有关详细信息,请参阅 S3 生命周期配置。下面给出了一个示例规则和命令调用:

在名为 file://lifecycle.json 的文件中

{
"Rules":[
{
"ID":"expire_tmp_files",
"Filter":{
"Prefix":"path/to/table/_delta_log/.tmp/"
},
"Status":"Enabled",
"Expiration":{
"Days":30
}
}
]
}
终端窗口
aws s3api put-bucket-lifecycle-configuration \
--bucket my-bucket \
--lifecycle-configuration file://lifecycle.json

Delta Lake 对各种 Azure 存储系统具有内置支持,并为来自多个集群的并发读写提供完整的事务保证。

Delta Lake 依赖 Hadoop FileSystem API 访问 Azure 存储服务。具体来说,Delta Lake 要求 FileSystem.rename() 的实现是原子的,这仅在较新的 Hadoop 版本中支持(Hadoop-15156Hadoop-15086)。因此,您可能需要使用较新的 Hadoop 版本构建 Spark 并将其用于部署应用程序。有关使用特定 Hadoop 版本构建 Spark 的信息,请参阅指定 Hadoop 版本和启用 YARN,有关使用 Delta Lake 设置 Spark 的信息,请参阅快速入门

以下是每种 Azure 存储系统特定的要求列表:

例如,一个可行的组合是 Delta 0.7.0 或更高版本,以及使用 Hadoop 3.2 编译和部署的 Apache Spark 3.0。

以下是在 Azure Blob 存储上配置 Delta Lake 的步骤。

  1. 在类路径中包含 hadoop-azure JAR。有关版本详细信息,请参阅上述要求。

  2. 设置凭证。

    您可以在 Spark 配置属性中设置您的凭证。

    我们建议您使用 SAS 令牌。在 Scala 中,您可以按如下方式使用:

spark.conf.set(
"fs.azure.sas.<your-container-name>.<your-storage-account-name>.blob.core.windows.net",
"<complete-query-string-of-your-sas-for-the-container>")

或者您可以指定帐户访问密钥:

spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
"<your-storage-account-access-key>")
spark.range(5).write.format("delta").save("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<path-to-delta-table>")
spark.read.format("delta").load("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<path-to-delta-table>").show()

以下是在 Azure Data Lake Storage Gen1 上配置 Delta Lake 的步骤。

  1. 在类路径中包含 hadoop-azure-datalake JAR。有关版本详细信息,请参阅上述要求。

  2. 设置 Azure Data Lake Storage Gen1 凭证。

    您可以使用凭证设置以下Hadoop 配置(在 Scala 中):

spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", "<your-oauth2-client-id>")
spark.conf.set("dfs.adls.oauth2.credential", "<your-oauth2-credential>")
spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/<your-directory-id>/oauth2/token")
spark.range(5).write.format("delta").save("adl://<your-adls-account>.azuredatalakestore.net/<path-to-delta-table>")
spark.read.format("delta").load("adl://<your-adls-account>.azuredatalakestore.net/<path-to-delta-table>").show()

以下是在 Azure Data Lake Storage Gen2 上配置 Delta Lake 的步骤。

  1. 在类路径中包含 hadoop-azureazure-storage JAR。有关版本详细信息,请参阅上述要求。

  2. 设置凭证。

    您可以使用 OAuth 2.0 与服务主体或共享密钥身份验证

    对于使用服务主体的 OAuth 2.0(推荐)

spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

对于共享密钥身份验证

spark.conf.set("fs.azure.account.key.<storage-account>.dfs.core.windows.net", "<storage-account-access-key>")
spark.range(5).write.format("delta").save("abfss://<container-name>@<storage-account>.dfs.core.windows.net/<path-to-delta-table>")
spark.read.format("delta").load("abfss://<container-name>@<storage-account>.dfs.core.windows.net/<path-to-delta-table>").show()

Delta Lake 对 HDFS 具有内置支持,并为来自多个集群的并发读写提供完整的事务保证。无需额外配置。

spark.range(5).write.format("delta").save("hdfs://<namenode>:<port>/<path-to-delta-table>")
spark.read.format("delta").load("hdfs://<namenode>:<port>/<path-to-delta-table>").show()

Delta Lake 对 Google Cloud Storage (GCS) 具有内置支持,并为来自多个集群的并发读写提供完整的事务保证。

  • Google Cloud Storage 凭证
  • Delta Lake 0.2.0 或更高版本
  • Apache Spark 编译所用的 Hadoop 版本的 Hadoop GCS 连接器
  1. 在类路径中包含 GCS 连接器 JAR。

  2. 使用以下方法之一设置凭据

spark.conf.set("google.cloud.auth.service.account.json.keyfile", "<path-to-json-key-file>")
spark.range(5).write.format("delta").save("gs://<bucket>/<path-to-delta-table>")
spark.read.format("delta").load("gs://<bucket>/<path-to-delta-table>").show()

Delta Lake 支持 Oracle Cloud Infrastructure (OCI) 对象存储,并为来自多个集群的并发读写提供完整的事务保证。

  • OCI 凭证
  • Delta Lake 0.2.0 或更高版本
  • Apache Spark 编译所用的 Hadoop 版本的 Hadoop OCI 连接器
  1. 在类路径中包含 OCI 连接器 JAR。

  2. 在 Spark 配置中设置凭证

spark.conf.set("fs.oci.client.auth.tenantId", "<tenant-ocid>")
spark.conf.set("fs.oci.client.auth.userId", "<user-ocid>")
spark.conf.set("fs.oci.client.auth.fingerprint", "<api-key-fingerprint>")
spark.conf.set("fs.oci.client.auth.pemfilepath", "<path-to-private-key-file>")
spark.range(5).write.format("delta").save("oci://<bucket>@<namespace>/<path-to-delta-table>")
spark.read.format("delta").load("oci://<bucket>@<namespace>/<path-to-delta-table>").show()

Delta Lake 支持 IBM Cloud Object Storage,并为来自多个集群的并发读写提供完整的事务保证。

  • IBM Cloud Object Storage 凭证
  • Delta Lake 0.2.0 或更高版本
  • Apache Spark 编译所用的 Hadoop 版本的 Hadoop Stocator 连接器
  1. 在类路径中包含 Stocator 连接器 JAR。

  2. 在 Spark 配置中设置凭证

spark.conf.set("fs.cos.service.endpoint", "<endpoint-url>")
spark.conf.set("fs.cos.service.access.key", "<access-key>")
spark.conf.set("fs.cos.service.secret.key", "<secret-key>")
spark.range(5).write.format("delta").save("cos://<bucket>.<service>/<path-to-delta-table>")
spark.read.format("delta").load("cos://<bucket>.<service>/<path-to-delta-table>").show()