快速入门
本指南旨在帮助您快速探索 Delta Lake 的主要功能。它提供了代码片段,展示了如何从交互式、批处理和流式查询中读取和写入 Delta 表。
使用 Delta Lake 设置 Apache Spark
标题为“使用 Delta Lake 设置 Apache Spark”的部分请按照以下说明使用 Spark 设置 Delta Lake。您可以通过以下两种方式在本地计算机上运行本指南中的步骤:
- 交互式运行:启动带有 Delta Lake 的 Spark shell(Scala 或 Python),并在 shell 中交互式运行代码片段。
- 作为项目运行:设置带有 Delta Lake 的 Maven 或 SBT 项目(Scala 或 Java),将代码片段复制到源文件,然后运行该项目。或者,您可以使用 Github 仓库中提供的示例。
先决条件:设置 Java
标题为“先决条件:设置 Java”的部分正如官方 Apache Spark 安装说明此处所述,请确保您已安装有效的 Java 版本(8、11 或 17),并且 Java 已使用系统 PATH
或 JAVA_HOME
环境变量在您的系统上正确配置。
Windows 用户应遵循此博客中的说明,确保使用与 Delta Lake 4.0.0
兼容的正确版本的 Apache Spark。
设置交互式 shell
标题为“设置交互式 shell”的部分要在 Spark SQL、Scala 或 Python shell 中交互式使用 Delta Lake,您需要本地安装 Apache Spark。根据您是想使用 SQL、Python 还是 Scala,您可以分别设置 SQL、PySpark 或 Spark shell。
Spark SQL Shell
标题为“Spark SQL Shell”的部分通过遵循下载 Spark 中的说明,下载兼容版本的 Apache Spark,可以通过 pip
或下载并解压存档并在解压的目录中运行 spark-sql
。
bin/spark-sql --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
PySpark Shell
标题为“PySpark Shell”的部分-
运行以下命令安装与 Delta Lake 版本兼容的 PySpark 版本
终端窗口 pip install pyspark==<compatible-spark-version> -
使用 Delta Lake 包和附加配置运行 PySpark
终端窗口 pyspark --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Spark Scala Shell
标题为“Spark Scala Shell”的部分通过遵循下载 Spark 中的说明,下载兼容版本的 Apache Spark,可以通过 pip
或下载并解压存档并在解压的目录中运行 spark-shell
。
bin/spark-shell --packages io.delta:delta-spark_2.13:4.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
设置项目
标题为“设置项目”的部分如果您想使用 Maven Central Repository 中的 Delta Lake 二进制文件构建项目,可以使用以下 Maven 坐标。
Maven
标题为“Maven”的部分通过将 Delta Lake 作为依赖项添加到 POM 文件中,即可将其包含在您的 Maven 项目中。Delta Lake 使用 Scala 2.13 编译。
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.13</artifactId> <version>4.0.0</version></dependency>
SBT
标题为“SBT”的部分通过将以下行添加到 build.sbt
文件中,将 Delta Lake 包含在 SBT 项目中
libraryDependencies += "io.delta" %% "delta-spark" % "4.0.0"
Python
标题为“Python”的部分要设置 Python 项目(例如,用于单元测试),您可以使用 pip install delta-spark==4.0.0
安装 Delta Lake,然后使用 Delta Lake 中的 configure_spark_with_delta_pip()
实用函数配置 SparkSession。
import pysparkfrom delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
创建表
标题为“创建表”的部分要创建 Delta 表,请以 delta
格式写入 DataFrame。您可以使用现有的 Spark SQL 代码,并将格式从 parquet
、csv
、json
等更改为 delta
。
CREATE TABLE delta.`/tmp/delta-table` USING DELTA AS SELECT col1 as id FROM VALUES 0,1,2,3,4;
data = spark.range(0, 5)data.write.format("delta").save("/tmp/delta-table")
val data = spark.range(0, 5)data.write.format("delta").save("/tmp/delta-table")
import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;
SparkSession spark = ... // create SparkSession
Dataset<Row> data = spark.range(0, 5);data.write().format("delta").save("/tmp/delta-table");
这些操作使用从 DataFrame 推断出的 schema 创建新的 Delta 表。有关创建新 Delta 表时可用的所有选项,请参阅创建表和写入表。
读取数据
标题为“读取数据”的部分通过指定文件路径来读取 Delta 表中的数据:"/tmp/delta-table"
SELECT * FROM delta.`/tmp/delta-table`;
df = spark.read.format("delta").load("/tmp/delta-table")df.show()
val df = spark.read.format("delta").load("/tmp/delta-table")df.show()
Dataset<Row> df = spark.read().format("delta").load("/tmp/delta-table");df.show();
更新表数据
标题为“更新表数据”的部分Delta Lake 支持使用标准 DataFrame API 修改表的多种操作。此示例运行批处理作业以覆盖表中的数据
INSERT OVERWRITE delta.`/tmp/delta-table` SELECT col1 as id FROM VALUES 5,6,7,8,9;
data = spark.range(5, 10)data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
val data = spark.range(5, 10)data.write.format("delta").mode("overwrite").save("/tmp/delta-table")df.show()
Dataset<Row> data = spark.range(5, 10);data.write().format("delta").mode("overwrite").save("/tmp/delta-table");
如果您再次读取此表,您应该只看到您添加的值 5-9
,因为您覆盖了之前的数据。
不覆盖的条件更新
标题为“不覆盖的条件更新”的部分Delta Lake 提供编程 API,用于有条件地更新、删除和合并(upsert)表中的数据。以下是一些示例。
-- Update every even value by adding 100 to itUPDATE delta.`/tmp/delta-table` SET id = id + 100 WHERE id % 2 == 0;
-- Delete every even valueDELETE FROM delta.`/tmp/delta-table` WHERE id % 2 == 0;
-- Upsert (merge) new dataCREATE TEMP VIEW newData AS SELECT col1 AS id FROM VALUES 1,3,5,7,9,11,13,15,17,19;
MERGE INTO delta.`/tmp/delta-table` AS oldDataUSING newDataON oldData.id = newData.idWHEN MATCHED THEN UPDATE SET id = newData.idWHEN NOT MATCHED THEN INSERT (id) VALUES (newData.id);
SELECT * FROM delta.`/tmp/delta-table`;
from delta.tables import *from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# Update every even value by adding 100 to itdeltaTable.update( condition = expr("id % 2 == 0"), set = { "id": expr("id + 100") })
# Delete every even valuedeltaTable.delete(condition = expr("id % 2 == 0"))
# Upsert (merge) new datanewData = spark.range(0, 20)
deltaTable.alias("oldData") \ .merge( newData.alias("newData"), "oldData.id = newData.id") \ .whenMatchedUpdate(set = { "id": col("newData.id") }) \ .whenNotMatchedInsert(values = { "id": col("newData.id") }) \ .execute()
deltaTable.toDF().show()
import io.delta.tables._import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath("/tmp/delta-table")
// Update every even value by adding 100 to itdeltaTable.update( condition = expr("id % 2 == 0"), set = Map("id" -> expr("id + 100")))
// Delete every even valuedeltaTable.delete(condition = expr("id % 2 == 0"))
// Upsert (merge) new dataval newData = spark.range(0, 20).toDF
deltaTable.as("oldData") .merge( newData.as("newData"), "oldData.id = newData.id") .whenMatched .update(Map("id" -> col("newData.id"))) .whenNotMatched .insert(Map("id" -> col("newData.id"))) .execute()
deltaTable.toDF.show()
import io.delta.tables.*;import org.apache.spark.sql.functions;import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");
// Update every even value by adding 100 to itdeltaTable.update( functions.expr("id % 2 == 0"), new HashMap<String, Column>() {{ put("id", functions.expr("id + 100")); }});
// Delete every even valuedeltaTable.delete(condition = functions.expr("id % 2 == 0"));
// Upsert (merge) new dataDataset<Row> newData = spark.range(0, 20).toDF();
deltaTable.as("oldData") .merge( newData.as("newData"), "oldData.id = newData.id") .whenMatched() .update( new HashMap<String, Column>() {{ put("id", functions.col("newData.id")); }}) .whenNotMatched() .insertExpr( new HashMap<String, Column>() {{ put("id", functions.col("newData.id")); }}) .execute();
deltaTable.toDF().show();
您应该会看到一些现有行已更新,并且已插入新行。
有关这些操作的更多信息,请参阅表删除、更新和合并。
使用时间旅行读取旧版本数据
标题为“使用时间旅行读取旧版本数据”的部分您可以通过使用时间旅行查询 Delta 表的先前快照。如果您想访问已覆盖的数据,您可以使用 versionAsOf
选项查询覆盖第一组数据之前的表快照。
SELECT * FROM delta.`/tmp/delta-table` VERSION AS OF 0;
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")df.show()
val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")df.show()
Dataset<Row> df = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table");df.show();
您应该会看到第一组数据,即在您覆盖它之前的数据。时间旅行利用 Delta Lake 事务日志的强大功能来访问不再位于表中的数据。删除版本 0
选项(或指定版本 1
)将使您再次看到较新的数据。有关详细信息,请参阅查询表的旧快照(时间旅行)。
向表中写入数据流
标题为“向表中写入数据流”的部分您还可以使用结构化流式传输写入 Delta 表。Delta Lake 事务日志保证精确一次处理,即使有其他流或批处理查询同时针对该表运行。默认情况下,流以追加模式运行,该模式将新记录添加到表中
streamingDf = spark.readStream.format("rate").load()stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
val streamingDf = spark.readStream.format("rate").load()val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
import org.apache.spark.sql.streaming.StreamingQuery;
Dataset<Row> streamingDf = spark.readStream().format("rate").load();StreamingQuery stream = streamingDf.selectExpr("value as id").writeStream().format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table");
在流运行时,您可以使用早期命令读取表。
您可以通过在启动流的同一终端中运行 stream.stop()
来停止流。
有关 Delta Lake 与结构化流式传输集成的更多信息,请参阅表流式读取和写入。另请参阅 Apache Spark 网站上的结构化流式传输编程指南。
从表中读取变更流
标题为“从表中读取变更流”的部分当流正在写入 Delta 表时,您也可以将该表作为流源进行读取。例如,您可以启动另一个流查询,打印对 Delta 表所做的所有更改。您可以通过提供 startingVersion
或 startingTimestamp
选项来指定结构化流式传输应从哪个版本开始,以获取从该点开始的更改。有关详细信息,请参阅结构化流式传输。
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
StreamingQuery stream2 = spark.readStream().format("delta").load("/tmp/delta-table").writeStream().format("console").start();