存储配置
Delta Lake ACID 保证以存储系统的原子性和持久性保证为前提。具体来说,Delta Lake 在与存储系统交互时依赖以下特性:
- 原子可见性:文件必须能够完全可见或完全不可见。
- 互斥:最终目的地只能有一个写入器能够创建(或重命名)文件。
- 一致列表:一旦文件写入目录,该目录的所有未来列表都必须返回该文件。
由于存储系统不一定开箱即用地提供所有这些保证,Delta Lake 事务操作通常通过 LogStore API 而不是直接访问存储系统。为了为不同的存储系统提供 ACID 保证,您可能需要使用不同的 LogStore
实现。本文介绍如何为各种存储系统配置 Delta Lake。存储系统分为两类:
-
内置支持的存储系统:对于某些存储系统,您不需要额外的配置。Delta Lake 使用路径的方案(即
s3a://path
中的s3a
)动态识别存储系统并使用相应的LogStore
实现来提供事务保证。但是,对于 S3,并发写入存在额外的注意事项。有关详细信息,请参阅S3 部分。 -
其他存储系统:
LogStore
类似于 Apache Spark,使用 HadoopFileSystem
API 执行读写操作。因此,Delta Lake 支持对任何提供FileSystem
API 实现的存储系统进行并发读取。对于具有事务保证的并发写入,根据FileSystem
实现提供的保证,有两种情况。如果实现提供一致的列表和原子重命名(不覆盖)(即rename(..., overwrite = false)
将原子地生成目标文件,或者如果目标文件已存在则失败并抛出java.nio.file.FileAlreadyExistsException
),那么使用重命名的默认LogStore
实现将允许具有保证的并发写入。否则,您必须通过设置以下 Spark 配置来配置LogStore
的自定义实现:
spark.delta.logStore.<scheme>.impl=<full-qualified-class-name>
其中 <scheme>
是您的存储系统路径的方案。这将配置 Delta Lake 仅对这些路径动态使用给定的 LogStore
实现。您可以在应用程序中为不同的方案设置多个此类配置,从而允许它同时从不同的存储系统进行读写。
Amazon S3
标题为“Amazon S3”的部分Delta Lake 支持以两种不同模式对 S3 进行读写:单集群和多集群。
单集群 | 多集群 | |
---|---|---|
配置 | Delta Lake 开箱即用 | 是实验性的,需要额外配置 |
读取 | 支持来自多个集群的并发读取 | 支持来自多个集群的并发读取 |
写入 | 支持来自单个 Spark 驱动程序的并发写入 | 支持多集群写入 |
权限 | S3 凭证 | S3 和 DynamoDB 操作权限 |
单集群设置(默认)
标题为“单集群设置(默认)”的部分在此默认模式下,Delta Lake 支持来自多个集群的并发读取,但对 S3 的并发写入必须来自单个 Spark 驱动程序,以便 Delta Lake 提供事务保证。这是因为 S3 目前不提供互斥,即无法确保只有一个写入器能够创建文件。
要求(S3 单集群)
标题为“要求(S3 单集群)”的部分- S3 凭证:IAM 角色(推荐)或访问密钥
- 与相应 Delta Lake 版本关联的 Apache Spark。
- Apache Spark 编译所用的 Hadoop 版本的 Hadoop AWS 连接器 (hadoop-aws)。
快速入门(S3 单集群)
标题为“快速入门(S3 单集群)”的部分本节介绍如何使用单集群模式快速开始在 S3 上读写 Delta 表。有关配置的详细说明,请参阅设置配置(S3 多集群)。
- 使用以下命令启动支持 Delta Lake 和 S3 的 Spark shell(假设您使用预构建用于 Hadoop 3.4.0 的 Spark 4.0.0)
bin/spark-shell \ --packages io.delta:delta-spark_2.13:4.0.0,org.apache.hadoop:hadoop-aws:3.4.0 \ --conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \ --conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key>
- 在 S3 上尝试一些基本的 Delta 表操作(使用 Scala)
// Create a Delta table on S3:spark.range(5).write.format("delta").save("s3a://<your-s3-bucket>/<path-to-delta-table>")
// Read a Delta table on S3:spark.read.format("delta").load("s3a://<your-s3-bucket>/<path-to-delta-table>").show()
有关其他语言和更多 Delta 表操作示例,请参阅快速入门页面。
为了高效列出 S3 上的 Delta Lake 元数据文件,请设置配置 delta.enableFastS3AListFrom=true
。此性能优化处于实验支持模式。它仅适用于 S3A
文件系统,不适用于 Amazon EMR 默认文件系统 S3
。
bin/spark-shell \ --packages io.delta:delta-spark_2.13:4.0.0,org.apache.hadoop:hadoop-aws:3.4.0 \ --conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \ --conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key> \ --conf "spark.hadoop.delta.enableFastS3AListFrom=true
多集群设置
标题为“多集群设置”的部分此模式支持来自多个集群对 S3 的并发写入,并且必须通过配置 Delta Lake 以使用正确的 LogStore
实现来显式启用。此实现使用 DynamoDB 来提供 S3 缺乏的互斥功能。
要求(S3 多集群)
标题为“要求(S3 多集群)”的部分- 要求(S3 单集群)部分中列出的所有要求
- 除了 S3 凭证,您还需要 DynamoDB 操作权限
快速入门(S3 多集群)
标题为“快速入门(S3 多集群)”的部分本节介绍如何使用多集群模式快速开始在 S3 上读写 Delta 表。
- 使用以下命令启动支持 Delta Lake 和 S3 的 Spark shell(假设您使用预构建用于 Hadoop 3.4.0 的 Spark 4.0.0)
bin/spark-shell \ --packages io.delta:delta-spark_2.13:3,org.apache.hadoop:hadoop-aws:3.4.0,io.delta:delta-storage-s3-dynamodb:4.0.0 \ --conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \ --conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key> \ --conf spark.delta.logStore.s3a.impl=io.delta.storage.S3DynamoDBLogStore \ --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=us-west-2
- 在 S3 上尝试一些基本的 Delta 表操作(使用 Scala)
// Create a Delta table on S3:spark.range(5).write.format("delta").save("s3a://<your-s3-bucket>/<path-to-delta-table>")
// Read a Delta table on S3:spark.read.format("delta").load("s3a://<your-s3-bucket>/<path-to-delta-table>").show()
有关其他语言和更多 Delta 表操作示例,请参阅快速入门页面。
设置配置(S3 多集群)
标题为“设置配置(S3 多集群)”的部分-
创建 DynamoDB 表。
您可以选择自己创建 DynamoDB 表(推荐)或让系统自动为您创建。
-
自己创建 DynamoDB 表
此 DynamoDB 表将维护多个 Delta 表的提交元数据,并且其配置的读写容量模式(例如,按需或预置)对于您的用例至关重要。因此,我们强烈建议您自己创建 DynamoDB 表。以下示例使用 AWS CLI。要了解更多信息,请参阅 create-table 命令参考。
-
aws dynamodb create-table \ --region us-east-1 \ --table-name delta_log \ --attribute-definitions AttributeName=tablePath,AttributeType=S \ AttributeName=fileName,AttributeType=S \ --key-schema AttributeName=tablePath,KeyType=HASH \ AttributeName=fileName,KeyType=RANGE \ --billing-mode PAY_PER_REQUEST
-
按照配置(S3 单集群)部分中列出的配置步骤进行操作。
-
在类路径中包含
delta-storage-s3-dynamodb
JAR。 -
在您的 Spark 会话中配置
LogStore
实现。首先,为此方案
s3
配置此LogStore
实现。您也可以为方案s3a
和s3n
复制此命令。
spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore
接下来,指定实例化 DynamoDB 客户端所需的附加信息。您必须在每个 Spark 会话中使用相同的 tableName
和 region
实例化 DynamoDB 客户端,以便此多集群模式正常工作。下面列出了每个会话的配置及其默认值:
spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName=delta_logspark.io.delta.storage.S3DynamoDBLogStore.ddb.region=us-east-1spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider=<AWSCredentialsProvider* used by the client>spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu=5spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu=5
生产配置(S3 多集群)
标题为“生产配置(S3 多集群)”的部分至此,此多集群设置已完全可运行。但是,您还可以进行额外配置以在生产环境中提高性能和优化存储。
-
调整您的读写容量模式。
如果您使用此
LogStore
实现为您创建的默认 DynamoDB 表,其默认 RCU 和 WCU 可能不足以满足您的工作负载。您可以调整预置吞吐量或更新到按需模式。 -
使用生存时间 (TTL) 清理旧的 DynamoDB 条目。
一旦 DynamoDB 元数据条目被标记为已完成,并且经过足够长的时间,我们可以仅依靠 S3 来防止其相应 Delta 文件的意外覆盖,则可以安全地从 DynamoDB 中删除该条目。最便宜的方法是使用 DynamoDB 的 TTL 功能,这是一种免费的自动方法,可以从 DynamoDB 表中删除项目。
在给定的 DynamoDB 表上运行以下命令以启用 TTL
aws dynamodb update-time-to-live \ --region us-east-1 \ --table-name delta_log \ --time-to-live-specification "Enabled=true, AttributeName=expireTime"
默认的 expireTime
将是 DynamoDB 条目被标记为完成后的第一天。
-
使用 S3 生命周期过期清理旧的 AWS S3 临时文件。
在此
LogStore
实现中,会创建一个临时文件,其中包含要提交到 Delta 日志的元数据的副本。一旦 Delta 日志的提交完成,并且相应的 DynamoDB 条目已被删除,就可以安全地删除此临时文件。实际上,在提交失败的恢复过程中,只使用最新的临时文件。这里有两种删除这些临时文件的简单方法
-
使用 S3 CLI 手动删除。
这是最安全的选择。以下命令将删除给定
<bucket>
和<table>
中除最新临时文件之外的所有临时文件
-
aws s3 ls s3://<bucket>/<delta_table_path>/_delta_log/.tmp/ --recursive | awk 'NF>1{print $4}' | grep . | sort | head -n -1 | while read -r line ; do echo "Removing ${line}" aws s3 rm s3://<bucket>/<delta_table_path>/_delta_log/.tmp/${line}done
-
使用 S3 生命周期过期规则删除
一个更自动化的选项是使用 S3 生命周期过期规则,其中过滤器前缀指向表中位于
<delta_table_path>/_delta_log/.tmp/
文件夹,过期值为 30 天。
AWS 文档 此处 描述了配置桶生命周期配置的各种方法。
一种方法是使用 S3 的 put-bucket-lifecycle-configuration
命令。有关详细信息,请参阅 S3 生命周期配置。下面给出了一个示例规则和命令调用:
在名为 file://lifecycle.json
的文件中
{ "Rules":[ { "ID":"expire_tmp_files", "Filter":{ "Prefix":"path/to/table/_delta_log/.tmp/" }, "Status":"Enabled", "Expiration":{ "Days":30 } } ]}
aws s3api put-bucket-lifecycle-configuration \ --bucket my-bucket \ --lifecycle-configuration file://lifecycle.json
Microsoft Azure 存储
标题为“Microsoft Azure 存储”的部分Delta Lake 对各种 Azure 存储系统具有内置支持,并为来自多个集群的并发读写提供完整的事务保证。
Delta Lake 依赖 Hadoop FileSystem
API 访问 Azure 存储服务。具体来说,Delta Lake 要求 FileSystem.rename()
的实现是原子的,这仅在较新的 Hadoop 版本中支持(Hadoop-15156 和 Hadoop-15086)。因此,您可能需要使用较新的 Hadoop 版本构建 Spark 并将其用于部署应用程序。有关使用特定 Hadoop 版本构建 Spark 的信息,请参阅指定 Hadoop 版本和启用 YARN,有关使用 Delta Lake 设置 Spark 的信息,请参阅快速入门。
以下是每种 Azure 存储系统特定的要求列表:
Azure Blob 存储
标题为“Azure Blob 存储”的部分要求(Azure Blob 存储)
标题为“要求(Azure Blob 存储)”的部分- 一个共享密钥或共享访问签名 (SAS)
- Delta Lake 0.2.0 或更高版本
- 用于以下版本部署的 Hadoop Azure Blob 存储库
- Hadoop 2 为 2.9.1+
- Hadoop 3 为 3.0.1+
- 与相应 Delta Lake 版本关联的 Apache Spark,并使用与所选 Hadoop 库兼容的 Hadoop 版本编译。
例如,一个可行的组合是 Delta 0.7.0 或更高版本,以及使用 Hadoop 3.2 编译和部署的 Apache Spark 3.0。
配置(Azure Blob 存储)
标题为“配置(Azure Blob 存储)”的部分以下是在 Azure Blob 存储上配置 Delta Lake 的步骤。
-
在类路径中包含
hadoop-azure
JAR。有关版本详细信息,请参阅上述要求。 -
设置凭证。
您可以在 Spark 配置属性中设置您的凭证。
我们建议您使用 SAS 令牌。在 Scala 中,您可以按如下方式使用:
spark.conf.set( "fs.azure.sas.<your-container-name>.<your-storage-account-name>.blob.core.windows.net", "<complete-query-string-of-your-sas-for-the-container>")
或者您可以指定帐户访问密钥:
spark.conf.set( "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net", "<your-storage-account-access-key>")
用法(Azure Blob 存储)
标题为“用法(Azure Blob 存储)”的部分spark.range(5).write.format("delta").save("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<path-to-delta-table>")spark.read.format("delta").load("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<path-to-delta-table>").show()
Azure Data Lake Storage Gen1
标题为“Azure Data Lake Storage Gen1”的部分要求 (ADLS Gen1)
标题为“要求 (ADLS Gen1)”的部分- 一个用于 OAuth 2.0 访问的服务主体
- Delta Lake 0.2.0 或更高版本
- 用于以下版本部署的 Hadoop Azure Data Lake Storage Gen1 库
- Hadoop 2 为 2.9.1+
- Hadoop 3 为 3.0.1+
- 与相应 Delta Lake 版本关联的 Apache Spark,并使用与所选 Hadoop 库兼容的 Hadoop 版本编译。
配置 (ADLS Gen1)
标题为“配置 (ADLS Gen1)”的部分以下是在 Azure Data Lake Storage Gen1 上配置 Delta Lake 的步骤。
-
在类路径中包含
hadoop-azure-datalake
JAR。有关版本详细信息,请参阅上述要求。 -
设置 Azure Data Lake Storage Gen1 凭证。
您可以使用凭证设置以下Hadoop 配置(在 Scala 中):
spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")spark.conf.set("dfs.adls.oauth2.client.id", "<your-oauth2-client-id>")spark.conf.set("dfs.adls.oauth2.credential", "<your-oauth2-credential>")spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/<your-directory-id>/oauth2/token")
用法 (ADLS Gen1)
标题为“用法 (ADLS Gen1)”的部分spark.range(5).write.format("delta").save("adl://<your-adls-account>.azuredatalakestore.net/<path-to-delta-table>")spark.read.format("delta").load("adl://<your-adls-account>.azuredatalakestore.net/<path-to-delta-table>").show()
Azure Data Lake Storage Gen2
标题为“Azure Data Lake Storage Gen2”的部分要求 (ADLS Gen2)
标题为“要求 (ADLS Gen2)”的部分- 一个用于 OAuth 2.0 访问的服务主体或共享密钥
- Delta Lake 0.2.0 或更高版本
- 用于以下版本部署的 Hadoop Azure Data Lake Storage Gen2 库
- Hadoop 3 为 3.2.0+
- 与相应 Delta Lake 版本关联的 Apache Spark,并使用与所选 Hadoop 库兼容的 Hadoop 版本编译。
配置 (ADLS Gen2)
标题为“配置 (ADLS Gen2)”的部分以下是在 Azure Data Lake Storage Gen2 上配置 Delta Lake 的步骤。
-
在类路径中包含
hadoop-azure
和azure-storage
JAR。有关版本详细信息,请参阅上述要求。 -
设置凭证。
您可以使用 OAuth 2.0 与服务主体或共享密钥身份验证
对于使用服务主体的 OAuth 2.0(推荐)
spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<application-id>")spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", "<service-credential>")spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
对于共享密钥身份验证
spark.conf.set("fs.azure.account.key.<storage-account>.dfs.core.windows.net", "<storage-account-access-key>")
用法 (ADLS Gen2)
标题为“用法 (ADLS Gen2)”的部分spark.range(5).write.format("delta").save("abfss://<container-name>@<storage-account>.dfs.core.windows.net/<path-to-delta-table>")spark.read.format("delta").load("abfss://<container-name>@<storage-account>.dfs.core.windows.net/<path-to-delta-table>").show()
HDFS
标题为“HDFS”的部分Delta Lake 对 HDFS 具有内置支持,并为来自多个集群的并发读写提供完整的事务保证。无需额外配置。
用法 (HDFS)
标题为“用法 (HDFS)”的部分spark.range(5).write.format("delta").save("hdfs://<namenode>:<port>/<path-to-delta-table>")spark.read.format("delta").load("hdfs://<namenode>:<port>/<path-to-delta-table>").show()
Google Cloud Storage
标题为“Google Cloud Storage”的部分Delta Lake 对 Google Cloud Storage (GCS) 具有内置支持,并为来自多个集群的并发读写提供完整的事务保证。
要求 (GCS)
标题为“要求 (GCS)”的部分- Google Cloud Storage 凭证
- Delta Lake 0.2.0 或更高版本
- Apache Spark 编译所用的 Hadoop 版本的 Hadoop GCS 连接器
配置 (GCS)
标题为“配置 (GCS)”的部分-
在类路径中包含 GCS 连接器 JAR。
-
使用以下方法之一设置凭据
- 使用应用程序默认凭证
- 在 Spark 配置中配置服务帐户凭证
spark.conf.set("google.cloud.auth.service.account.json.keyfile", "<path-to-json-key-file>")
用法 (GCS)
标题为“用法 (GCS)”的部分spark.range(5).write.format("delta").save("gs://<bucket>/<path-to-delta-table>")spark.read.format("delta").load("gs://<bucket>/<path-to-delta-table>").show()
Oracle Cloud Infrastructure
标题为“Oracle Cloud Infrastructure”的部分Delta Lake 支持 Oracle Cloud Infrastructure (OCI) 对象存储,并为来自多个集群的并发读写提供完整的事务保证。
要求 (OCI)
标题为“要求 (OCI)”的部分- OCI 凭证
- Delta Lake 0.2.0 或更高版本
- Apache Spark 编译所用的 Hadoop 版本的 Hadoop OCI 连接器
配置 (OCI)
标题为“配置 (OCI)”的部分-
在类路径中包含 OCI 连接器 JAR。
-
在 Spark 配置中设置凭证
spark.conf.set("fs.oci.client.auth.tenantId", "<tenant-ocid>")spark.conf.set("fs.oci.client.auth.userId", "<user-ocid>")spark.conf.set("fs.oci.client.auth.fingerprint", "<api-key-fingerprint>")spark.conf.set("fs.oci.client.auth.pemfilepath", "<path-to-private-key-file>")
用法 (OCI)
标题为“用法 (OCI)”的部分spark.range(5).write.format("delta").save("oci://<bucket>@<namespace>/<path-to-delta-table>")spark.read.format("delta").load("oci://<bucket>@<namespace>/<path-to-delta-table>").show()
IBM Cloud Object Storage
标题为“IBM Cloud Object Storage”的部分Delta Lake 支持 IBM Cloud Object Storage,并为来自多个集群的并发读写提供完整的事务保证。
要求 (IBM COS)
标题为“要求 (IBM COS)”的部分- IBM Cloud Object Storage 凭证
- Delta Lake 0.2.0 或更高版本
- Apache Spark 编译所用的 Hadoop 版本的 Hadoop Stocator 连接器
配置 (IBM COS)
标题为“配置 (IBM COS)”的部分-
在类路径中包含 Stocator 连接器 JAR。
-
在 Spark 配置中设置凭证
spark.conf.set("fs.cos.service.endpoint", "<endpoint-url>")spark.conf.set("fs.cos.service.access.key", "<access-key>")spark.conf.set("fs.cos.service.secret.key", "<secret-key>")
用法 (IBM COS)
标题为“用法 (IBM COS)”的部分spark.range(5).write.format("delta").save("cos://<bucket>.<service>/<path-to-delta-table>")spark.read.format("delta").load("cos://<bucket>.<service>/<path-to-delta-table>").show()