跳到内容

Delta Connect(又称 Delta 中的 Spark Connect 支持)

Delta Connect 为 Apache Spark 的 Delta Lake 添加了 Spark Connect 支持。Spark Connect 是一项新举措,它增加了独立客户端-服务器基础设施,允许从任何地方远程连接到 Spark。Delta Connect 允许所有 Delta Lake 操作在作为连接到 Spark 服务器的客户端运行的应用程序中工作。

Delta Connect 预计将带来与 Spark Connect 相同的好处

  1. 升级到 Spark 和 Delta Lake 的最新版本现在更容易,因为客户端接口与服务器完全解耦。
  2. Spark 和 Delta Lake 与开发人员工具的集成更简单。IDE 不再需要与完整的 Spark 和 Delta Lake 实现集成,而是可以与轻量级客户端集成。
  3. 支持 Java/Scala 和 Python 以外的语言。客户端“只需”生成协议缓冲区,因此实现起来更简单。
  4. Spark 和 Delta Lake 将变得更稳定,因为用户代码不再与 Spark 驱动程序在同一个 JVM 中运行。
  5. 远程连接。代码现在可以在任何地方运行,因为用户界面和驱动程序之间有一个 gRPC 层。
  1. Spark 4.0.0 下载 spark-4.0.0-bin-hadoop3.tgz

  2. 使用 Delta Lake Connect 插件启动 Spark Connect 服务器

    终端窗口
    sbin/start-connect-server.sh \
    --packages io.delta:delta-connect-server_2.13:4.0.0,com.google.protobuf:protobuf-java:3.25.1 \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
    --conf "spark.connect.extensions.relation.classes=org.apache.spark.sql.connect.delta.DeltaRelationPlugin" \
    --conf "spark.connect.extensions.command.classes=org.apache.spark.sql.connect.delta.DeltaCommandPlugin"

如何使用 Delta 的 Python Spark Connect 客户端

标题为“如何使用 Delta 的 Python Spark Connect 客户端”的部分

Delta Lake Connect Python 客户端与 Delta Lake Spark 包含在同一个 PyPi 包中。

  1. pip install pyspark==4.0.0.
  2. pip install delta-spark==4.0.0.
  3. 用法与 Spark Connect 相同(例如 ./bin/pyspark --remote "sc://")。我们只需要将远程 SparkSession(而不是本地的)传递给 DeltaTable API。

示例

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "my_table")
deltaTable.toDF().show()
deltaTable.update(
condition = "id % 2 == 0",
set = {"id": "id + 100"}
)

如何使用 Delta 的 Scala Spark Connect 客户端

标题为“如何使用 Delta 的 Scala Spark Connect 客户端”的部分

请确保您使用的是 Java 17!

终端窗口
./bin/spark-shell --remote "sc://" --packages io.delta:delta-connect-client_2.13:4.0.0,com.google.protobuf:protobuf-java:3.25.1

示例

import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "my_table")
deltaTable.toDF.show()
deltaTable.updateExpr(
condition = "id % 2 == 0",
set = Map("id" -> "id + 100")
)