跳到内容

读取 Delta Sharing 表

Delta Sharing 是一个用于安全实时交换大型数据集的开放协议,它允许组织实时共享数据,无论他们使用哪种计算平台。它是一个简单的 REST 协议,安全地授予对部分云数据集的访问权限,并利用现代云存储系统(如 S3、ADLS、GCS 或 R2)可靠地传输数据。

在 Delta Sharing 中,数据提供者是拥有原始数据集或表并将其共享给广泛接收者的人。每个表都可以配置为使用不同的选项(历史记录、过滤等)进行共享。本文档将重点介绍如何使用共享表。

Delta Sharing 数据源支持 Apache Spark DataFrame 提供的大部分选项,用于通过批处理流式处理表更改 (CDF) API 对共享表执行读取。Delta Sharing 不支持写入共享表。有关更多详细信息,请参阅 Delta Sharing Repo。请按照快速入门来利用 Delta Sharing python 连接器发现共享表。

对于具有高级 Delta Lake 功能(例如删除向量列映射)的共享表的 Delta Sharing 读取,您需要在创建新的 SparkSession 时,通过设置与 Delta Lake 相同的配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 delta-sharing-spark 3.1 起)的集成。请参阅配置 SparkSession

在本地保存 配置文件 并使用连接器库启动 Spark 后,您可以访问共享表。数据提供者将配置文件提供给数据接收者。

-- A table path is the profile file path followed by `#` and the fully qualified name
-- of a table (`<share-name>.<schema-name>.<table-name>`).
CREATE TABLE mytable USING deltaSharing LOCATION '<profile-file-path>#<share-name>.<schema-name>.<table-name>';
SELECT * FROM mytable;

返回的 DataFrame 会自动读取表的最新快照以进行任何查询。

Delta Sharing 支持谓词下推,以便在查询中存在适用谓词时高效地从 Delta Sharing 服务器获取数据。

一旦数据提供者启用共享表的历史记录共享,Delta Sharing 时间旅行允许您查询共享表的旧快照。

SELECT * FROM mytable TIMESTAMP AS OF timestamp_expression
SELECT * FROM mytable VERSION AS OF version

timestamp_expressionversion 共享与 Delta 相同的语法。

一旦数据提供者在原始 Delta Lake 表上启用 CDF 并通过 Delta Sharing 共享历史记录,接收者就可以像Delta 表的 CDF一样查询 Delta Sharing 表的 CDF。

CREATE TABLE mytable USING deltaSharing LOCATION '<profile-file-path>#<share-name>.<schema-name>.<table-name>';
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('mytable', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('mytable', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('mytable', 0)

Delta Sharing 流式处理与 Spark Structured Streaming 通过 readStream 深度集成,并能够连接到任何能够执行 writeStream 的接收器。

一旦数据提供者共享带有历史记录的表,接收者就可以对该表执行流式查询。当您将 Delta Sharing 表加载为流源并在流式查询中使用它时,查询将处理共享表中存在的所有数据以及流启动后到达的任何新数据。

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
spark.readStream.format("deltaSharing").load(tablePath)

Delta Sharing 流式处理支持与 Delta Streaming 相同的功能:限制输入速率忽略更新和删除指定初始位置

此外,还提供了 maxVersionsPerRpc 来决定每次 Delta Sharing rpc 中从服务器请求多少版本的文件。这有助于减少每个 rpc 的工作负载,使 Delta Sharing 流式处理作业更稳定,尤其是在流式处理从检查点恢复时累积了许多新版本的情况下。默认值为 100。

读取 Delta Sharing 中的高级 Delta Lake 功能

标题为“读取 Delta Sharing 中的高级 Delta Lake 功能”的部分

为了支持 Delta Sharing 中的高级 Delta Lake 功能,自 delta-sharing-client 1.0 和 delta-sharing-spark 3.1 起引入了“Delta Format Sharing”,其中共享表的动作以 Delta Lake 格式返回,允许 Delta Lake 库读取它。

请记住设置配置 SparkSession中提到的 Spark 配置,以便读取具有删除向量和列映射的共享表。

读取表功能可用版本
删除向量3.1.0
列映射3.1.0
无时区时间戳3.3.0
类型扩展(预览)3.3.0
变体类型(预览)3.3.0

批处理查询可以按原样执行,因为它能够根据共享表的表功能自动解析 responseFormat。当读取启用删除向量或列映射的共享表时,对于 cdf 和流式查询,需要设置额外的选项 responseFormat=delta

import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
// Batch query
spark.read.format("deltaSharing").load(tablePath)
// CDF query
spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("responseFormat", "delta")
.option("startingVersion", 1)
.load(tablePath)
// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)