读取 Delta Sharing 表
Delta Sharing 是一个用于安全实时交换大型数据集的开放协议,它允许组织实时共享数据,无论他们使用哪种计算平台。它是一个简单的 REST 协议,安全地授予对部分云数据集的访问权限,并利用现代云存储系统(如 S3、ADLS、GCS 或 R2)可靠地传输数据。
在 Delta Sharing 中,数据提供者是拥有原始数据集或表并将其共享给广泛接收者的人。每个表都可以配置为使用不同的选项(历史记录、过滤等)进行共享。本文档将重点介绍如何使用共享表。
Delta Sharing 数据源支持 Apache Spark DataFrame 提供的大部分选项,用于通过批处理、流式处理或表更改 (CDF) API 对共享表执行读取。Delta Sharing 不支持写入共享表。有关更多详细信息,请参阅 Delta Sharing Repo。请按照快速入门来利用 Delta Sharing python 连接器发现共享表。
对于具有高级 Delta Lake 功能(例如删除向量和列映射)的共享表的 Delta Sharing 读取,您需要在创建新的 SparkSession
时,通过设置与 Delta Lake 相同的配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 delta-sharing-spark 3.1 起)的集成。请参阅配置 SparkSession。
读取快照
标题为“读取快照”的部分在本地保存 配置文件 并使用连接器库启动 Spark 后,您可以访问共享表。数据提供者将配置文件提供给数据接收者。
-- A table path is the profile file path followed by `#` and the fully qualified name-- of a table (`<share-name>.<schema-name>.<table-name>`).CREATE TABLE mytable USING deltaSharing LOCATION '<profile-file-path>#<share-name>.<schema-name>.<table-name>';SELECT * FROM mytable;
# A table path is the profile file path followed by `#` and the fully qualified name# of a table (`<share-name>.<schema-name>.<table-name>`).table_path = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"df = spark.read.format("deltaSharing").load(table_path)
// A table path is the profile file path followed by `#` and the fully qualified name// of a table (`<share-name>.<schema-name>.<table-name>`).val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"val df = spark.read.format("deltaSharing").load(tablePath)
// A table path is the profile file path followed by `#` and the fully qualified name// of a table (`<share-name>.<schema-name>.<table-name>`).String tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>";Dataset<Row> df = spark.read.format("deltaSharing").load(tablePath);
返回的 DataFrame 会自动读取表的最新快照以进行任何查询。
Delta Sharing 支持谓词下推,以便在查询中存在适用谓词时高效地从 Delta Sharing 服务器获取数据。
查询共享表的旧快照(时间旅行)
标题为“查询共享表的旧快照(时间旅行)”的部分一旦数据提供者启用共享表的历史记录共享,Delta Sharing 时间旅行允许您查询共享表的旧快照。
SELECT * FROM mytable TIMESTAMP AS OF timestamp_expressionSELECT * FROM mytable VERSION AS OF version
spark.read.format("deltaSharing").option("timestampAsOf", timestamp_string).load(tablePath)spark.read.format("deltaSharing").option("versionAsOf", version).load(tablePath)
spark.read.format("deltaSharing").option("timestampAsOf", timestamp_string).load(tablePath)spark.read.format("deltaSharing").option("versionAsOf", version).load(tablePath)
timestamp_expression
和 version
共享与 Delta 相同的语法。
读取表更改 (CDF)
标题为“读取表更改 (CDF)”的部分一旦数据提供者在原始 Delta Lake 表上启用 CDF 并通过 Delta Sharing 共享历史记录,接收者就可以像Delta 表的 CDF一样查询 Delta Sharing 表的 CDF。
CREATE TABLE mytable USING deltaSharing LOCATION '<profile-file-path>#<share-name>.<schema-name>.<table-name>';
-- version as ints or longs e.g. changes from version 0 to 10SELECT * FROM table_changes('mytable', 0, 10)
-- timestamp as string formatted timestampsSELECT * FROM table_changes('mytable', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestampSELECT * FROM table_changes('mytable', 0)
table_path = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
# version as ints or longsspark.read.format("deltaSharing") \ .option("readChangeFeed", "true") \ .option("startingVersion", 0) \ .option("endingVersion", 10) \ .load(tablePath)
# timestamps as formatted timestampspark.read.format("deltaSharing") \ .option("readChangeFeed", "true") \ .option("startingTimestamp", '2021-04-21 05:45:46') \ .option("endingTimestamp", '2021-05-21 12:00:00') \ .load(tablePath)
# providing only the startingVersion/timestampspark.read.format("deltaSharing") \ .option("readChangeFeed", "true") \ .option("startingVersion", 0) \ .load(tablePath)
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
// version as ints or longsspark.read.format("deltaSharing") .option("readChangeFeed", "true") .option("startingVersion", 0) .option("endingVersion", 10) .load(tablePath)
// timestamps as formatted timestampspark.read.format("deltaSharing") .option("readChangeFeed", "true") .option("startingTimestamp", "2024-01-18 05:45:46") .option("endingTimestamp", "2024-01-18 12:00:00") .load(tablePath)
// providing only the startingVersion/timestampspark.read.format("deltaSharing") .option("readChangeFeed", "true") .option("startingVersion", 0) .load(tablePath)
流式处理
标题为“流式处理”的部分Delta Sharing 流式处理与 Spark Structured Streaming 通过 readStream
深度集成,并能够连接到任何能够执行 writeStream
的接收器。
一旦数据提供者共享带有历史记录的表,接收者就可以对该表执行流式查询。当您将 Delta Sharing 表加载为流源并在流式查询中使用它时,查询将处理共享表中存在的所有数据以及流启动后到达的任何新数据。
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
spark.readStream.format("deltaSharing").load(tablePath)
Delta Sharing 流式处理支持与 Delta Streaming 相同的功能:限制输入速率、忽略更新和删除、指定初始位置
此外,还提供了 maxVersionsPerRpc
来决定每次 Delta Sharing rpc 中从服务器请求多少版本的文件。这有助于减少每个 rpc 的工作负载,使 Delta Sharing 流式处理作业更稳定,尤其是在流式处理从检查点恢复时累积了许多新版本的情况下。默认值为 100。
读取 Delta Sharing 中的高级 Delta Lake 功能
标题为“读取 Delta Sharing 中的高级 Delta Lake 功能”的部分为了支持 Delta Sharing 中的高级 Delta Lake 功能,自 delta-sharing-client 1.0 和 delta-sharing-spark 3.1 起引入了“Delta Format Sharing”,其中共享表的动作以 Delta Lake 格式返回,允许 Delta Lake 库读取它。
请记住设置配置 SparkSession中提到的 Spark 配置,以便读取具有删除向量和列映射的共享表。
读取表功能 | 可用版本 |
---|---|
删除向量 | 3.1.0 |
列映射 | 3.1.0 |
无时区时间戳 | 3.3.0 |
类型扩展(预览) | 3.3.0 |
变体类型(预览) | 3.3.0 |
批处理查询可以按原样执行,因为它能够根据共享表的表功能自动解析 responseFormat
。当读取启用删除向量或列映射的共享表时,对于 cdf 和流式查询,需要设置额外的选项 responseFormat=delta
。
import org.apache.spark.sql.SparkSession
val spark = SparkSession .builder() .appName("...") .master("...") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate()
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
// Batch queryspark.read.format("deltaSharing").load(tablePath)
// CDF queryspark.read.format("deltaSharing") .option("readChangeFeed", "true") .option("responseFormat", "delta") .option("startingVersion", 1) .load(tablePath)
// Streaming queryspark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)