Delta Connect(又称 Delta 中的 Spark Connect 支持)
Delta Connect 为 Apache Spark 的 Delta Lake 添加了 Spark Connect 支持。Spark Connect 是一项新举措,它增加了独立客户端-服务器基础设施,允许从任何地方远程连接到 Spark。Delta Connect 允许所有 Delta Lake 操作在作为连接到 Spark 服务器的客户端运行的应用程序中工作。
Delta Connect 预计将带来与 Spark Connect 相同的好处
- 升级到 Spark 和 Delta Lake 的最新版本现在更容易,因为客户端接口与服务器完全解耦。
- Spark 和 Delta Lake 与开发人员工具的集成更简单。IDE 不再需要与完整的 Spark 和 Delta Lake 实现集成,而是可以与轻量级客户端集成。
- 支持 Java/Scala 和 Python 以外的语言。客户端“只需”生成协议缓冲区,因此实现起来更简单。
- Spark 和 Delta Lake 将变得更稳定,因为用户代码不再与 Spark 驱动程序在同一个 JVM 中运行。
- 远程连接。代码现在可以在任何地方运行,因为用户界面和驱动程序之间有一个 gRPC 层。
如何使用 Delta 启动 Spark 服务器
标题为“如何使用 Delta 启动 Spark 服务器”的部分-
从 Spark 4.0.0 下载
spark-4.0.0-bin-hadoop3.tgz
。 -
使用 Delta Lake Connect 插件启动 Spark Connect 服务器
终端窗口 sbin/start-connect-server.sh \--packages io.delta:delta-connect-server_2.13:4.0.0,com.google.protobuf:protobuf-java:3.25.1 \--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \--conf "spark.connect.extensions.relation.classes=org.apache.spark.sql.connect.delta.DeltaRelationPlugin" \--conf "spark.connect.extensions.command.classes=org.apache.spark.sql.connect.delta.DeltaCommandPlugin"
如何使用 Delta 的 Python Spark Connect 客户端
标题为“如何使用 Delta 的 Python Spark Connect 客户端”的部分Delta Lake Connect Python 客户端与 Delta Lake Spark 包含在同一个 PyPi 包中。
pip install pyspark==4.0.0
.pip install delta-spark==4.0.0
.- 用法与 Spark Connect 相同(例如
./bin/pyspark --remote "sc://"
)。我们只需要将远程SparkSession
(而不是本地的)传递给DeltaTable
API。
示例
from delta.tables import DeltaTablefrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "my_table")deltaTable.toDF().show()
deltaTable.update( condition = "id % 2 == 0", set = {"id": "id + 100"})
如何使用 Delta 的 Scala Spark Connect 客户端
标题为“如何使用 Delta 的 Scala Spark Connect 客户端”的部分请确保您使用的是 Java 17!
./bin/spark-shell --remote "sc://" --packages io.delta:delta-connect-client_2.13:4.0.0,com.google.protobuf:protobuf-java:3.25.1
示例
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "my_table")deltaTable.toDF.show()
deltaTable.updateExpr( condition = "id % 2 == 0", set = Map("id" -> "id + 100"))