快速入门
本指南旨在帮助您快速探索 Delta Lake 的主要功能。它提供了代码片段,展示了如何从交互式、批处理和流式查询中读取和写入 Delta 表。
使用 Delta Lake 设置 Apache Spark
标题为“使用 Delta Lake 设置 Apache Spark”的部分请按照以下说明使用 Spark 设置 Delta Lake。您可以通过以下两种方式在本地计算机上运行本指南中的步骤:
- 交互式运行:启动带有 Delta Lake 的 Spark shell(Scala 或 Python),并在 shell 中交互式运行代码片段。
- 作为项目运行:设置带有 Delta Lake 的 Maven 或 SBT 项目(Scala 或 Java),将代码片段复制到源文件,然后运行该项目。或者,您可以使用 Github 仓库中提供的示例。
先决条件:设置 Java
标题为“先决条件:设置 Java”的部分正如官方 Apache Spark 安装说明此处所述,请确保您已安装有效的 Java 版本(8、11 或 17),并且 Java 已使用系统 PATH 或 JAVA_HOME 环境变量在您的系统上正确配置。
Windows 用户应遵循此博客中的说明,确保使用与 Delta Lake 4.0.0 兼容的正确版本的 Apache Spark。
设置交互式 shell
标题为“设置交互式 shell”的部分要在 Spark SQL、Scala 或 Python shell 中交互式使用 Delta Lake,您需要本地安装 Apache Spark。根据您是想使用 SQL、Python 还是 Scala,您可以分别设置 SQL、PySpark 或 Spark shell。
Spark SQL Shell
标题为“Spark SQL Shell”的部分通过遵循下载 Spark 中的说明,下载兼容版本的 Apache Spark,可以通过 pip 或下载并解压存档并在解压的目录中运行 spark-sql。
bin/spark-sql --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"PySpark Shell
标题为“PySpark Shell”的部分- 
运行以下命令安装与 Delta Lake 版本兼容的 PySpark 版本 终端窗口 pip install pyspark==<compatible-spark-version>
- 
使用 Delta Lake 包和附加配置运行 PySpark 终端窗口 pyspark --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Spark Scala Shell
标题为“Spark Scala Shell”的部分通过遵循下载 Spark 中的说明,下载兼容版本的 Apache Spark,可以通过 pip 或下载并解压存档并在解压的目录中运行 spark-shell。
bin/spark-shell --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"设置项目
标题为“设置项目”的部分如果您想使用 Maven Central Repository 中的 Delta Lake 二进制文件构建项目,可以使用以下 Maven 坐标。
Maven
标题为“Maven”的部分通过将 Delta Lake 作为依赖项添加到 POM 文件中,即可将其包含在您的 Maven 项目中。Delta Lake 使用 Scala 2.13 编译。
<dependency>  <groupId>io.delta</groupId>  <artifactId>delta-spark_2.13</artifactId>  <version>4.0.0</version></dependency>SBT
标题为“SBT”的部分通过将以下行添加到 build.sbt 文件中,将 Delta Lake 包含在 SBT 项目中
libraryDependencies += "io.delta" %% "delta-spark" % "4.0.0"Python
标题为“Python”的部分要设置 Python 项目(例如,用于单元测试),您可以使用 pip install delta-spark==4.0.0 安装 Delta Lake,然后使用 Delta Lake 中的 configure_spark_with_delta_pip() 实用函数配置 SparkSession。
import pysparkfrom delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()创建表
标题为“创建表”的部分要创建 Delta 表,请以 delta 格式写入 DataFrame。您可以使用现有的 Spark SQL 代码,并将格式从 parquet、csv、json 等更改为 delta。
CREATE TABLE delta.`/tmp/delta-table` USING DELTA AS SELECT col1 as id FROM VALUES 0,1,2,3,4;data = spark.range(0, 5)data.write.format("delta").save("/tmp/delta-table")val data = spark.range(0, 5)data.write.format("delta").save("/tmp/delta-table")import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;
SparkSession spark = ...   // create SparkSession
Dataset<Row> data = spark.range(0, 5);data.write().format("delta").save("/tmp/delta-table");这些操作使用从 DataFrame 推断出的 schema 创建新的 Delta 表。有关创建新 Delta 表时可用的所有选项,请参阅创建表和写入表。
读取数据
标题为“读取数据”的部分通过指定文件路径来读取 Delta 表中的数据:"/tmp/delta-table"
SELECT * FROM delta.`/tmp/delta-table`;df = spark.read.format("delta").load("/tmp/delta-table")df.show()val df = spark.read.format("delta").load("/tmp/delta-table")df.show()Dataset<Row> df = spark.read().format("delta").load("/tmp/delta-table");df.show();更新表数据
标题为“更新表数据”的部分Delta Lake 支持使用标准 DataFrame API 修改表的多种操作。此示例运行批处理作业以覆盖表中的数据
INSERT OVERWRITE delta.`/tmp/delta-table` SELECT col1 as id FROM VALUES 5,6,7,8,9;data = spark.range(5, 10)data.write.format("delta").mode("overwrite").save("/tmp/delta-table")val data = spark.range(5, 10)data.write.format("delta").mode("overwrite").save("/tmp/delta-table")df.show()Dataset<Row> data = spark.range(5, 10);data.write().format("delta").mode("overwrite").save("/tmp/delta-table");如果您再次读取此表,您应该只看到您添加的值 5-9,因为您覆盖了之前的数据。
不覆盖的条件更新
标题为“不覆盖的条件更新”的部分Delta Lake 提供编程 API,用于有条件地更新、删除和合并(upsert)表中的数据。以下是一些示例。
-- Update every even value by adding 100 to itUPDATE delta.`/tmp/delta-table` SET id = id + 100 WHERE id % 2 == 0;
-- Delete every even valueDELETE FROM delta.`/tmp/delta-table` WHERE id % 2 == 0;
-- Upsert (merge) new dataCREATE TEMP VIEW newData AS SELECT col1 AS id FROM VALUES 1,3,5,7,9,11,13,15,17,19;
MERGE INTO delta.`/tmp/delta-table` AS oldDataUSING newDataON oldData.id = newData.idWHEN MATCHED  THEN UPDATE SET id = newData.idWHEN NOT MATCHED  THEN INSERT (id) VALUES (newData.id);
SELECT * FROM delta.`/tmp/delta-table`;from delta.tables import *from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# Update every even value by adding 100 to itdeltaTable.update(  condition = expr("id % 2 == 0"),  set = { "id": expr("id + 100") })
# Delete every even valuedeltaTable.delete(condition = expr("id % 2 == 0"))
# Upsert (merge) new datanewData = spark.range(0, 20)
deltaTable.alias("oldData") \  .merge(    newData.alias("newData"),    "oldData.id = newData.id") \  .whenMatchedUpdate(set = { "id": col("newData.id") }) \  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \  .execute()
deltaTable.toDF().show()import io.delta.tables._import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath("/tmp/delta-table")
// Update every even value by adding 100 to itdeltaTable.update(  condition = expr("id % 2 == 0"),  set = Map("id" -> expr("id + 100")))
// Delete every even valuedeltaTable.delete(condition = expr("id % 2 == 0"))
// Upsert (merge) new dataval newData = spark.range(0, 20).toDF
deltaTable.as("oldData")  .merge(    newData.as("newData"),    "oldData.id = newData.id")  .whenMatched  .update(Map("id" -> col("newData.id")))  .whenNotMatched  .insert(Map("id" -> col("newData.id")))  .execute()
deltaTable.toDF.show()import io.delta.tables.*;import org.apache.spark.sql.functions;import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");
// Update every even value by adding 100 to itdeltaTable.update(  functions.expr("id % 2 == 0"),  new HashMap<String, Column>() {{    put("id", functions.expr("id + 100"));  }});
// Delete every even valuedeltaTable.delete(condition = functions.expr("id % 2 == 0"));
// Upsert (merge) new dataDataset<Row> newData = spark.range(0, 20).toDF();
deltaTable.as("oldData")  .merge(    newData.as("newData"),    "oldData.id = newData.id")  .whenMatched()  .update(    new HashMap<String, Column>() {{      put("id", functions.col("newData.id"));    }})  .whenNotMatched()  .insertExpr(    new HashMap<String, Column>() {{      put("id", functions.col("newData.id"));    }})  .execute();
deltaTable.toDF().show();您应该会看到一些现有行已更新,并且已插入新行。
有关这些操作的更多信息,请参阅表删除、更新和合并。
使用时间旅行读取旧版本数据
标题为“使用时间旅行读取旧版本数据”的部分您可以通过使用时间旅行查询 Delta 表的先前快照。如果您想访问已覆盖的数据,您可以使用 versionAsOf 选项查询覆盖第一组数据之前的表快照。
SELECT * FROM delta.`/tmp/delta-table` VERSION AS OF 0;df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")df.show()val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")df.show()Dataset<Row> df = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table");df.show();您应该会看到第一组数据,即在您覆盖它之前的数据。时间旅行利用 Delta Lake 事务日志的强大功能来访问不再位于表中的数据。删除版本 0 选项(或指定版本 1)将使您再次看到较新的数据。有关详细信息,请参阅查询表的旧快照(时间旅行)。
向表中写入数据流
标题为“向表中写入数据流”的部分您还可以使用结构化流式传输写入 Delta 表。Delta Lake 事务日志保证精确一次处理,即使有其他流或批处理查询同时针对该表运行。默认情况下,流以追加模式运行,该模式将新记录添加到表中
streamingDf = spark.readStream.format("rate").load()stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")val streamingDf = spark.readStream.format("rate").load()val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")import org.apache.spark.sql.streaming.StreamingQuery;
Dataset<Row> streamingDf = spark.readStream().format("rate").load();StreamingQuery stream = streamingDf.selectExpr("value as id").writeStream().format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table");在流运行时,您可以使用早期命令读取表。
您可以通过在启动流的同一终端中运行 stream.stop() 来停止流。
有关 Delta Lake 与结构化流式传输集成的更多信息,请参阅表流式读取和写入。另请参阅 Apache Spark 网站上的结构化流式传输编程指南。
从表中读取变更流
标题为“从表中读取变更流”的部分当流正在写入 Delta 表时,您也可以将该表作为流源进行读取。例如,您可以启动另一个流查询,打印对 Delta 表所做的所有更改。您可以通过提供 startingVersion 或 startingTimestamp 选项来指定结构化流式传输应从哪个版本开始,以获取从该点开始的更改。有关详细信息,请参阅结构化流式传输。
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()StreamingQuery stream2 = spark.readStream().format("delta").load("/tmp/delta-table").writeStream().format("console").start();