跳到内容

快速入门

本指南旨在帮助您快速探索 Delta Lake 的主要功能。它提供了代码片段,展示了如何从交互式、批处理和流式查询中读取和写入 Delta 表。

请按照以下说明使用 Spark 设置 Delta Lake。您可以通过以下两种方式在本地计算机上运行本指南中的步骤:

  1. 交互式运行:启动带有 Delta Lake 的 Spark shell(Scala 或 Python),并在 shell 中交互式运行代码片段。
  2. 作为项目运行:设置带有 Delta Lake 的 Maven 或 SBT 项目(Scala 或 Java),将代码片段复制到源文件,然后运行该项目。或者,您可以使用 Github 仓库中提供的示例

正如官方 Apache Spark 安装说明此处所述,请确保您已安装有效的 Java 版本(8、11 或 17),并且 Java 已使用系统 PATHJAVA_HOME 环境变量在您的系统上正确配置。

Windows 用户应遵循此博客中的说明,确保使用与 Delta Lake 4.0.0 兼容的正确版本的 Apache Spark。

要在 Spark SQL、Scala 或 Python shell 中交互式使用 Delta Lake,您需要本地安装 Apache Spark。根据您是想使用 SQL、Python 还是 Scala,您可以分别设置 SQL、PySpark 或 Spark shell。

通过遵循下载 Spark 中的说明,下载兼容版本的 Apache Spark,可以通过 pip 或下载并解压存档并在解压的目录中运行 spark-sql

终端窗口
bin/spark-sql --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
  1. 运行以下命令安装与 Delta Lake 版本兼容的 PySpark 版本

    终端窗口
    pip install pyspark==<compatible-spark-version>
  2. 使用 Delta Lake 包和附加配置运行 PySpark

    终端窗口
    pyspark --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

通过遵循下载 Spark 中的说明,下载兼容版本的 Apache Spark,可以通过 pip 或下载并解压存档并在解压的目录中运行 spark-shell

终端窗口
bin/spark-shell --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

如果您想使用 Maven Central Repository 中的 Delta Lake 二进制文件构建项目,可以使用以下 Maven 坐标。

通过将 Delta Lake 作为依赖项添加到 POM 文件中,即可将其包含在您的 Maven 项目中。Delta Lake 使用 Scala 2.13 编译。

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-spark_2.13</artifactId>
<version>4.0.0</version>
</dependency>

通过将以下行添加到 build.sbt 文件中,将 Delta Lake 包含在 SBT 项目中

libraryDependencies += "io.delta" %% "delta-spark" % "4.0.0"

要设置 Python 项目(例如,用于单元测试),您可以使用 pip install delta-spark==4.0.0 安装 Delta Lake,然后使用 Delta Lake 中的 configure_spark_with_delta_pip() 实用函数配置 SparkSession。

import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

要创建 Delta 表,请以 delta 格式写入 DataFrame。您可以使用现有的 Spark SQL 代码,并将格式从 parquetcsvjson 等更改为 delta

CREATE TABLE delta.`/tmp/delta-table` USING DELTA AS SELECT col1 as id FROM VALUES 0,1,2,3,4;

这些操作使用从 DataFrame 推断出的 schema 创建新的 Delta 表。有关创建新 Delta 表时可用的所有选项,请参阅创建表写入表

通过指定文件路径来读取 Delta 表中的数据:"/tmp/delta-table"

SELECT * FROM delta.`/tmp/delta-table`;

Delta Lake 支持使用标准 DataFrame API 修改表的多种操作。此示例运行批处理作业以覆盖表中的数据

INSERT OVERWRITE delta.`/tmp/delta-table` SELECT col1 as id FROM VALUES 5,6,7,8,9;

如果您再次读取此表,您应该只看到您添加的值 5-9,因为您覆盖了之前的数据。

Delta Lake 提供编程 API,用于有条件地更新、删除和合并(upsert)表中的数据。以下是一些示例。

-- Update every even value by adding 100 to it
UPDATE delta.`/tmp/delta-table` SET id = id + 100 WHERE id % 2 == 0;
-- Delete every even value
DELETE FROM delta.`/tmp/delta-table` WHERE id % 2 == 0;
-- Upsert (merge) new data
CREATE TEMP VIEW newData AS SELECT col1 AS id FROM VALUES 1,3,5,7,9,11,13,15,17,19;
MERGE INTO delta.`/tmp/delta-table` AS oldData
USING newData
ON oldData.id = newData.id
WHEN MATCHED
THEN UPDATE SET id = newData.id
WHEN NOT MATCHED
THEN INSERT (id) VALUES (newData.id);
SELECT * FROM delta.`/tmp/delta-table`;

您应该会看到一些现有行已更新,并且已插入新行。

有关这些操作的更多信息,请参阅表删除、更新和合并

您可以通过使用时间旅行查询 Delta 表的先前快照。如果您想访问已覆盖的数据,您可以使用 versionAsOf 选项查询覆盖第一组数据之前的表快照。

SELECT * FROM delta.`/tmp/delta-table` VERSION AS OF 0;

您应该会看到第一组数据,即在您覆盖它之前的数据。时间旅行利用 Delta Lake 事务日志的强大功能来访问不再位于表中的数据。删除版本 0 选项(或指定版本 1)将使您再次看到较新的数据。有关详细信息,请参阅查询表的旧快照(时间旅行)

您还可以使用结构化流式传输写入 Delta 表。Delta Lake 事务日志保证精确一次处理,即使有其他流或批处理查询同时针对该表运行。默认情况下,流以追加模式运行,该模式将新记录添加到表中

streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

在流运行时,您可以使用早期命令读取表。

您可以通过在启动流的同一终端中运行 stream.stop() 来停止流。

有关 Delta Lake 与结构化流式传输集成的更多信息,请参阅表流式读取和写入。另请参阅 Apache Spark 网站上的结构化流式传输编程指南

当流正在写入 Delta 表时,您也可以将该表作为流源进行读取。例如,您可以启动另一个流查询,打印对 Delta 表所做的所有更改。您可以通过提供 startingVersionstartingTimestamp 选项来指定结构化流式传输应从哪个版本开始,以获取从该点开始的更改。有关详细信息,请参阅结构化流式传输

stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()