跳到内容

表批处理读写

Delta Lake 支持 Apache Spark DataFrame 读写 API 提供的大多数选项,用于对表执行批量读写操作。

对于表上的许多 Delta Lake 操作,您可以通过在创建新的 SparkSession 时设置配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 3.0 版起)的集成。请参阅配置 SparkSession

Delta Lake 支持创建两种类型的表——元数据存储中定义的表和按路径定义的表。

要使用元数据存储定义的表,您必须通过在创建新的 SparkSession 时设置配置来启用与 Apache Spark DataSourceV2 和 Catalog API 的集成。请参阅配置 SparkSession

您可以通过以下方式创建表:

  • SQL DDL 命令:您可以使用 Apache Spark 中支持的标准 SQL DDL 命令(例如,CREATE TABLEREPLACE TABLE)来创建 Delta 表。

    CREATE TABLE IF NOT EXISTS default.people10m (
    id INT,
    firstName STRING,
    middleName STRING,
    lastName STRING,
    gender STRING,
    birthDate TIMESTAMP,
    ssn STRING,
    salary INT
    ) USING DELTA
    CREATE OR REPLACE TABLE default.people10m (
    id INT,
    firstName STRING,
    middleName STRING,
    lastName STRING,
    gender STRING,
    birthDate TIMESTAMP,
    ssn STRING,
    salary INT
    ) USING DELTA

    SQL 还支持在路径上创建表,而无需在 Hive 元数据存储中创建条目。

    -- Create or replace table with path
    CREATE OR REPLACE TABLE delta.`/tmp/delta/people10m` (
    id INT,
    firstName STRING,
    middleName STRING,
    lastName STRING,
    gender STRING,
    birthDate TIMESTAMP,
    ssn STRING,
    salary INT
    ) USING DELTA
  • DataFrameWriter API:如果您想同时创建表并从 Spark DataFrames 或 Datasets 插入数据,您可以使用 Spark DataFrameWriterScala 或 JavaPython)。

    # Create table in the metastore using DataFrame's schema and write data to it
    df.write.format("delta").saveAsTable("default.people10m")
    # Create or replace partitioned table with path using DataFrame's schema and write/overwrite data to it
    df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")

    您还可以使用 Spark DataFrameWriterV2 API 创建 Delta 表。

  • DeltaTableBuilder API:您还可以使用 Delta Lake 中的 DeltaTableBuilder API 来创建表。与 DataFrameWriter API 相比,此 API 更易于指定其他信息,例如列注释、表属性和生成列

有关详细信息,请参阅API 文档

您可以对数据进行分区,以加快包含分区列谓词的查询或 DML。要在创建 Delta 表时对数据进行分区,请指定按列分区。以下示例按性别进行分区。

-- Create table in the metastore
CREATE TABLE default.people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
USING DELTA
PARTITIONED BY (gender)

要确定表是否包含特定分区,请使用语句 SELECT COUNT(*) > 0 FROM <table-name> WHERE <partition-column> = <value>。如果分区存在,则返回 true。例如:

SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = "M"

对于在元数据存储中定义的表,您可以选择将 LOCATION 指定为路径。使用指定 LOCATION 创建的表被视为由元数据存储非托管。与未指定路径的托管表不同,当您 DROP 表时,非托管表的文件不会被删除。

当您运行 CREATE TABLE 并且 LOCATION 已经包含使用 Delta Lake 存储的数据时,Delta Lake 执行以下操作:

  • 如果您仅指定表名和位置,例如:

    CREATE TABLE default.people10m
    USING DELTA
    LOCATION '/tmp/delta/people10m'

    元数据存储中的表会自动继承现有数据的模式、分区和表属性。此功能可用于将数据“导入”元数据存储。

  • 如果您指定任何配置(模式、分区或表属性),Delta Lake 会验证该规范是否与现有数据的配置完全匹配。

Delta Lake 支持生成列,这是一种特殊类型的列,其值根据用户指定的 Delta 表中其他列上的函数自动生成。当您写入具有生成列的表而未明确为其提供值时,Delta Lake 会自动计算这些值。例如,您可以从时间戳列自动生成日期列(用于按日期对表进行分区);对表的任何写入只需指定时间戳列的数据。但是,如果您明确为其提供值,则这些值必须满足约束 (<value> <=> <generation expression>) IS TRUE,否则写入将失败并出现错误。

以下示例展示了如何创建具有生成列的表:

DeltaTable.create(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.partitionedBy("gender") \
.execute()

生成列像普通列一样存储。也就是说,它们占用存储空间。

以下限制适用于生成列:

  • 生成表达式可以使用 Spark 中任何在给定相同参数值时始终返回相同结果的 SQL 函数,除了以下类型的函数:
    • 用户定义的函数。
    • 聚合函数。
    • 窗口函数。
    • 返回多行的函数。
  • 对于 Delta Lake 1.1.0 及更高版本,当您将 spark.databricks.delta.schema.autoMerge.enabled 设置为 true 时,MERGE 操作支持生成列。

每当分区列由以下表达式之一定义时,Delta Lake 都可以为查询生成分区筛选器:

  • CAST(col AS DATE)col 的类型为 TIMESTAMP
  • YEAR(col)col 的类型为 TIMESTAMP
  • YEAR(col), MONTH(col) 定义的两个分区列,且 col 的类型为 TIMESTAMP
  • YEAR(col), MONTH(col), DAY(col) 定义的三个分区列,且 col 的类型为 TIMESTAMP
  • YEAR(col), MONTH(col), DAY(col), HOUR(col) 定义的四个分区列,且 col 的类型为 TIMESTAMP
  • SUBSTRING(col, pos, len)col 的类型为 STRING
  • DATE_FORMAT(col, format)col 的类型为 TIMESTAMP
  • DATE_TRUNC(format, col)col 的类型为 TIMESTAMPDATE
  • TRUNC(col, format)col 的类型为 TIMESTAMPDATE

如果分区列由上述表达式之一定义,并且查询使用生成表达式的基础基列筛选数据,则 Delta Lake 会查看基列和生成列之间的关系,并在可能的情况下根据生成的​​分区列填充分区筛选器。例如,给定下表:

DeltaTable.create(spark) \
.tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("eventDate", "DATE", generatedAlwaysAs="CAST(eventTime AS DATE)") \
.partitionedBy("eventType", "eventDate") \
.execute()

如果您随后运行以下查询:

spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" <= "2020-10-01 12:00:00"')

Delta Lake 会自动生成一个分区筛选器,以便即使未指定分区筛选器,上述查询也只读取分区 date=2020-10-01 中的数据。

再举一个例子,给定下表:

DeltaTable.create(spark) \
.tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("year", "INT", generatedAlwaysAs="YEAR(eventTime)") \
.addColumn("month", "INT", generatedAlwaysAs="MONTH(eventTime)") \
.addColumn("day", "INT", generatedAlwaysAs="DAY(eventTime)") \
.partitionedBy("eventType", "year", "month", "day") \
.execute()

如果您随后运行以下查询:

spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" <= "2020-10-01 12:00:00"')

Delta Lake 会自动生成一个分区筛选器,以便即使未指定分区筛选器,上述查询也只读取分区 year=2020/month=10/day=01 中的数据。

您可以使用 EXPLAIN 子句并检查提供的计划,以查看 Delta Lake 是否自动生成了任何分区筛选器。

Delta Lake 3.3 及更高版本支持 Delta Lake 标识列。它们是一种生成列,为插入表中的每条记录分配唯一值。以下示例显示了如何在创建表命令期间声明标识列:

from delta.tables import DeltaTable, IdentityGenerator
from pyspark.sql.types import LongType
DeltaTable.create()
.tableName("table_name")
.addColumn("id_col1", dataType=LongType(), generatedAlwaysAs=IdentityGenerator())
.addColumn("id_col2", dataType=LongType(), generatedAlwaysAs=IdentityGenerator(start=-1, step=1))
.addColumn("id_col3", dataType=LongType(), generatedByDefaultAs=IdentityGenerator())
.addColumn("id_col4", dataType=LongType(), generatedByDefaultAs=IdentityGenerator(start=-1, step=1))
.execute()

您可以选择指定以下内容:

  • 起始值。
  • 步长,可以是正数或负数。

起始值和步长都默认为 1。您不能指定步长为 0

标识列分配的值是唯一的,并按指定步长的方向递增,并按指定步长的倍数递增,但不保证是连续的。例如,起始值为 0 且步长为 2,则所有值都是正偶数,但某些偶数可能会被跳过。

当标识列指定为 generated by default as identity 时,插入操作可以为标识列指定值。将其指定为 generated always as identity 以覆盖手动设置值的能力。

标识列仅支持 LongType,如果分配的值超出 LongType 支持的范围,则操作将失败。

您可以使用 ALTER TABLE table_name ALTER COLUMN column_name SYNC IDENTITY 将标识列的元数据与实际数据同步。当您将自己的值写入标识列时,它可能不符合元数据。此选项评估状态并更新元数据以与实际数据保持一致。在此命令之后,下一个自动分配的标识值将从 start + (n + 1) * step 开始,其中 n 是满足 start + n * step >= max() 的最小值(对于正步长)。

在使用 CREATE TABLE table_name AS SELECT (CTAS) 语句时,您无法定义模式、标识列约束或任何其他表规范。

要使用标识列创建新表并用现有数据填充它,请执行以下操作:

  1. 创建一个具有正确模式的表,包括标识列定义和其他表属性。
  2. 运行插入操作。

以下示例将标识列定义为 generated by default as identity。如果插入表中的数据包含标识列的有效值,则使用这些值。

from delta.tables import DeltaTable, IdentityGenerator
from pyspark.sql.types import LongType, DateType
DeltaTable.create(spark)
.tableName("new_table")
.addColumn("id", dataType=LongType(), generatedByDefaultAs=IdentityGenerator(start=5, step=1))
.addColumn("event_date", dataType=DateType())
.addColumn("some_value", dataType=LongType())
.execute()
# Insert records including existing IDs
old_table_df = spark.table("old_table").select("id", "event_date", "some_value")
old_table_df.write
.format("delta")
.mode("append")
.saveAsTable("new_table")
# Insert records and generate new IDs
new_records_df = spark.table("new_records").select("event_date", "some_value")
new_records_df.write
.format("delta")
.mode("append")
.saveAsTable("new_table")

使用标识列时存在以下限制:

  • 不支持在启用标识列的表上进行并发事务。
  • 您不能按标识列对表进行分区。
  • 您不能 ADDREPLACECHANGE 标识列。
  • 您不能更新现有记录的标识列的值。

Delta 允许为 Delta 表中的列指定默认表达式。当用户在不明确提供某些列的值的情况下写入这些表,或者当他们明确使用 SQL 关键字 DEFAULT 来指定列时,Delta 会自动为这些列生成默认值。有关详细信息,请参阅专门的文档页面。

默认情况下,列名不支持空格和 ,;{}()\n\t= 等特殊字符。要在表的列名中包含这些特殊字符,请启用列映射。

在 SparkSession 中设置的 Delta Lake 配置会覆盖会话中创建的新 Delta Lake 表的默认表属性。SparkSession 中使用的前缀与表属性中使用的配置不同。

Delta Lake 配置SparkSession 配置
delta.<conf>spark.databricks.delta.properties.defaults.<conf>

例如,要为会话中创建的所有新 Delta Lake 表设置 delta.appendOnly = true 属性,请设置以下内容:

SET spark.databricks.delta.properties.defaults.appendOnly = true

您可以通过指定表名或路径将 Delta 表加载为 DataFrame:

SELECT * FROM default.people10m -- query table in the metastore
SELECT * FROM delta.`/tmp/delta/people10m` -- query table by path

返回的 DataFrame 会自动读取表的最新快照以进行任何查询;您无需运行 REFRESH TABLE。当查询中存在适用谓词时,Delta Lake 会自动使用分区和统计信息来读取最少的数据。

Delta Lake 时间旅行允许您查询 Delta 表的旧快照。时间旅行有许多用例,包括:

  • 重新创建分析、报告或输出(例如,机器学习模型的输出)。这对于调试或审计可能很有用,尤其是在受监管的行业中。
  • 编写复杂的时态查询。
  • 修复数据中的错误。
  • 为一组针对快速变化的查询提供快照隔离。

本节描述了查询表旧版本的受支持方法、数据保留问题,并提供了示例。

本节展示了如何查询 Delta 表的旧版本。

SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version
  • timestamp_expression 可以是以下之一:

    • '2018-10-18T22:15:12.013Z',即可以转换为时间戳的字符串
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18',即日期字符串
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • 任何其他是或可以转换为时间戳的表达式
  • version 是一个长整型值,可以从 DESCRIBE HISTORY table_spec 的输出中获取。

timestamp_expressionversion 都不能是子查询。

SELECT * FROM default.people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

DataFrameReader 选项允许您从固定到表的特定版本的 Delta 表创建 DataFrame。

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")

对于 timestamp_string,仅接受日期或时间戳字符串。例如,"2019-01-01""2019-01-01T00:00:00.000Z"

一种常见模式是在作业执行期间使用 Delta 表的最新状态来更新下游应用程序。

由于 Delta 表会自动更新,因此从 Delta 表加载的 DataFrame 在底层数据更新时,在不同调用之间可能会返回不同的结果。通过使用时间旅行,您可以修复 DataFrame 在不同调用之间返回的数据:

history = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/people10m`")
latest_version = history.selectExpr("max(version)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")
  • 修复用户 111 意外删除的表

    yesterday = spark.sql("SELECT CAST(date_sub(current_date(), 1) AS STRING)").collect()[0][0]
    df = spark.read.format("delta").option("timestampAsOf", yesterday).load("/tmp/delta/events")
    df.where("userId = 111").write.format("delta").mode("append").save("/tmp/delta/events")
  • 修复表中意外的错误更新

    yesterday = spark.sql("SELECT CAST(date_sub(current_date(), 1) AS STRING)").collect()[0][0]
    df = spark.read.format("delta").option("timestampAsOf", yesterday).load("/tmp/delta/events")
    df.createOrReplaceTempView("my_table_yesterday")
    spark.sql('''
    MERGE INTO delta.`/tmp/delta/events` target
    USING my_table_yesterday source
    ON source.userId = target.userId
    WHEN MATCHED THEN UPDATE SET *
    ''')
  • 查询上周新增客户的数量

    last_week = spark.sql("SELECT CAST(date_sub(current_date(), 7) AS STRING)").collect()[0][0]
    df = spark.read.format("delta").option("timestampAsOf", last_week).load("/tmp/delta/events")
    last_week_count = df.select("userId").distinct().count()
    count = spark.read.format("delta").load("/tmp/delta/events").select("userId").distinct().count()
    new_customers_count = count - last_week_count

要时间旅行到以前的版本,您必须保留该版本的所有日志文件和数据文件。

支持 Delta 表的数据文件从不自动删除;数据文件仅在您运行 VACUUM 时删除。VACUUM 不会删除 Delta 日志文件;日志文件在检查点写入后会自动清理。

默认情况下,您可以将 Delta 表时间旅行到最多 30 天前,除非您已:

  • 在 Delta 表上运行 VACUUM
  • 使用以下表属性更改了数据或日志文件保留期限:
    • delta.logRetentionDuration = "interval <interval>":控制表历史记录的保留时间。默认值为 interval 30 days

每次写入检查点时,Delta 都会自动清理超出保留间隔的日志条目。如果您将此配置设置为足够大的值,则会保留许多日志条目。这不会影响性能,因为对日志的操作是常量时间。历史记录操作是并行的,但随着日志大小的增加会变得更加昂贵。

  • delta.deletedFileRetentionDuration = "interval <interval>":控制文件必须在多久以前被删除,然后才能成为 VACUUM 的候选文件。默认值为 interval 7 days

    即使您在 Delta 表上运行 VACUUM,也要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days"。此设置可能会导致您的存储成本增加。

要以原子方式将新数据添加到现有 Delta 表,请使用 append 模式:

INSERT INTO default.people10m SELECT * FROM morePeople

要以原子方式替换表中的所有数据,请使用 overwrite 模式:

INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople

您可以选择性地仅覆盖与任意表达式匹配的数据。此功能在 Delta Lake 1.1.0 及更高版本中支持 DataFrame,在 Delta Lake 2.4.0 及更高版本中支持 SQL。

以下命令以原子方式将目标表中(按 start_date 分区)一月份的事件替换为 replace_data 中的数据:

INSERT INTO TABLE events REPLACE WHERE start_data >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

此示例代码会写入 replace_data 中的数据,验证其是否全部匹配谓词,并执行原子替换。如果您想写入不完全匹配谓词的数据,以替换目标表中匹配的行,则可以通过将 spark.databricks.delta.replaceWhere.constraintCheck.enabled 设置为 false 来禁用约束检查:

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

在 Delta Lake 1.0.0 及更早版本中,replaceWhere 仅覆盖分区列上匹配谓词的数据。以下命令以原子方式将目标表中(按 date 分区)一月份的数据替换为 df 中的数据:

df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
.save("/tmp/delta/people10m")

在 Delta Lake 1.1.0 及更高版本中,如果您想回退到旧行为,可以禁用 spark.databricks.delta.replaceWhere.dataColumns.enabled 标志:

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Delta Lake 2.0 及更高版本支持分区表的动态分区覆盖模式。

在动态分区覆盖模式下,我们将覆盖写入将提交新数据的每个逻辑分区中的所有现有数据。写入不包含数据的任何现有逻辑分区将保持不变。此模式仅适用于以覆盖模式写入数据时:SQL 中的 INSERT OVERWRITE 或使用 df.write.mode("overwrite") 的 DataFrame 写入。

通过将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic 来配置动态分区覆盖模式。您还可以通过将 DataFrameWriter 选项 partitionOverwriteMode 设置为 dynamic 来启用此功能。如果存在,查询特定选项会覆盖会话配置中定义的模式。partitionOverwriteMode 的默认值为 static

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

有关 Delta Lake 支持更新表的更多信息,请参阅表删除、更新和合并

您可以使用 SQL 会话配置 spark.sql.files.maxRecordsPerFile 来指定写入 Delta Lake 表的单个文件中的最大记录数。指定零或负值表示无限制。

在使用 DataFrame API 写入 Delta Lake 表时,您还可以使用 DataFrameWriter 选项 maxRecordsPerFile。指定 maxRecordsPerFile 时,将忽略 SQL 会话配置 spark.sql.files.maxRecordsPerFile 的值。

df.write.format("delta") \
.mode("append") \
.option("maxRecordsPerFile", "10000") \
.save("/tmp/delta/people10m")

有时,由于各种原因(例如作业遇到故障),写入 Delta 表的作业会重新启动。失败的作业可能在终止之前已将数据写入 Delta 表,也可能未写入。如果数据已写入 Delta 表,则重新启动的作业会将相同的数据写入 Delta 表,从而导致数据重复。

为了解决这个问题,Delta 表支持以下 DataFrameWriter 选项,以使写入具有幂等性:

  • txnAppId:您可以在每次 DataFrame 写入时传递的唯一字符串。例如,这可以是作业的名称。
  • txnVersion:单调递增的数字,用作事务版本。此数字对于写入 Delta 表的数据必须是唯一的。例如,这可以是首次尝试查询时的纪元秒数。同一作业的任何后续重新启动都必须具有相同的 txnVersion 值。

上述选项组合对于每次摄取到 Delta 表中的新数据都必须是唯一的,并且 txnVersion 必须高于上次摄取到 Delta 表中的数据。例如:

  • 上次成功写入的数据包含选项值 dailyETL:23423 (txnAppId:txnVersion)。
  • 下次写入数据时,txnAppId 应为 dailyETLtxnVersion 至少应为 23424(比上次写入数据的 txnVersion 多一个)。
  • 任何尝试写入 txnAppId = dailyETLtxnVersion23422 或更小的数据都将被忽略,因为 txnVersion 小于表中上次记录的 txnVersion
  • 尝试写入 txnAppId:txnVersionanotherETL:23424 的数据成功写入表,因为它包含与上次摄取数据中相同选项值不同的 txnAppId

您还可以通过设置 Spark 会话配置 spark.databricks.delta.write.txnAppIdspark.databricks.delta.write.txnVersion 来配置幂等写入。此外,您可以将 spark.databricks.delta.write.txnVersion.autoReset.enabled 设置为 true,以便在每次写入后自动重置 spark.databricks.delta.write.txnVersion。当同时设置写入器选项和会话配置时,我们将使用写入器选项值。

SET spark.databricks.delta.write.txnAppId = ...;
SET spark.databricks.delta.write.txnVersion = ...;
SET spark.databricks.delta.write.txnVersion.autoReset.enabled = true; -- if set to true, this will reset txnVersion after every write

您可以使用 DataFrameWriter 选项 userMetadata 或 SparkSession 配置 spark.databricks.delta.commitInfo.userMetadata 将用户定义的字符串作为元数据指定在这些操作进行的提交中。如果两者都已指定,则选项优先。此用户定义的元数据可在历史记录操作中读取。

SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE default.people10m SELECT * FROM morePeople

Delta Lake 会自动验证正在写入的 DataFrame 的模式是否与表的模式兼容。Delta Lake 使用以下规则来确定从 DataFrame 到表的写入是否兼容:

  • 所有 DataFrame 列都必须存在于目标表中。如果 DataFrame 中存在表中不存在的列,则会引发异常。表中存在但 DataFrame 中不存在的列将设置为 null。
  • DataFrame 列数据类型必须与目标表中的列数据类型匹配。如果不匹配,则会引发异常。
  • DataFrame 列名不能仅通过大小写来区分。这意味着您不能在同一表中定义“Foo”和“foo”等列。虽然您可以在区分大小写或不区分大小写(默认)模式下使用 Spark,但 Parquet 在存储和返回列信息时是区分大小写的。Delta Lake 在存储模式时保留大小写但忽略大小写,并有此限制以避免潜在的错误、数据损坏或丢失问题。

Delta Lake 支持 DDL 显式添加新列和自动更新模式的能力。

如果您将其他选项(例如 partitionBy)与追加模式结合使用,Delta Lake 会验证它们是否匹配,并对任何不匹配的情况抛出错误。当不存在 partitionBy 时,追加会自动遵循现有数据的分区。

Delta Lake 允许您更新表的模式。支持以下类型的更改:

  • 添加新列(在任意位置)
  • 重新排序现有列

您可以使用 DDL 显式进行这些更改,也可以使用 DML 隐式进行更改。

您可以使用以下 DDL 显式更改表的模式。

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

默认情况下,可为空性为 true

要向嵌套字段添加列,请使用:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

如果在运行 ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) 之前的模式是:

- root
| - colA
| - colB
| +-field1
| +-field2

之后的模式是:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
ALTER TABLE table_name ALTER [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

要更改嵌套字段中的列,请使用:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

如果在运行 ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST 之前的模式是:

- root
| - colA
| - colB
| +-field1
| +-field2

之后的模式是:

- root
| - colA
| - colB
| +-field2
| +-field1
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

当运行以下 DDL 时:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

如果之前的模式是:

- root
| - colA
| - colB
| +-field1
| +-field2

之后的模式是:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

要重命名列而不重写任何列的现有数据,您必须为表启用列映射。请参阅启用列映射

重命名列:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

重命名嵌套字段:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

当您运行以下命令时:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

如果之前的模式是:

- root
| - colA
| - colB
| +-field1
| +-field2

那么之后的模式是:

- root
| - colA
| - colB
| +-field001
| +-field2

要将列作为仅元数据操作删除而不重写任何数据文件,您必须为表启用列映射。请参阅启用列映射

删除列:

ALTER TABLE table_name DROP COLUMN col_name

删除多个列:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

您可以通过重写表来更改列的类型或名称,或删除列。为此,请使用 overwriteSchema 选项:

spark.read.table(...) \
.withColumn("birthDate", col("birthDate").cast("date")) \
.write \
.format("delta") \
.mode("overwrite")
.option("overwriteSchema", "true") \
.saveAsTable(...)
spark.read.table(...) \
.withColumnRenamed("dateOfBirth", "birthDate") \
.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable(...)

Delta Lake 可以在 DML 事务(追加或覆盖)中自动更新表的模式,并使模式与正在写入的数据兼容。

当满足以下条件时,DataFrame 中存在但表中缺失的列会自动作为写入事务的一部分添加:

  • writewriteStream 具有 .option("mergeSchema", "true")
  • spark.databricks.delta.schema.autoMerge.enabledtrue

当两个选项都指定时,DataFrameWriter 中的选项优先。添加的列将附加到它们所在的结构体的末尾。在附加新列时,会保留大小写。

由于 Parquet 不支持 NullType,因此在写入 Delta 表时,NullType 列将从 DataFrame 中删除,但仍存储在模式中。当该列收到不同的数据类型时,Delta Lake 会将模式合并到新的数据类型。如果 Delta Lake 为现有列收到 NullType,则会保留旧模式,并在写入期间删除新列。

流式传输中的 NullType 不受支持。由于在使用流式传输时必须设置模式,因此这种情况应该非常罕见。NullType 也不接受复杂类型,例如 ArrayTypeMapType

默认情况下,覆盖表中的数据不会覆盖模式。当使用 overwrite 模式且不带 replaceWhere 覆盖表时,您可能仍希望覆盖正在写入的数据的模式。您可以通过将 overwriteSchema 选项设置为 true 来替换表的模式和分区:

df.write.option("overwriteSchema", "true")

Delta Lake 支持在 Delta 表之上创建视图,就像您可能使用数据源表一样。

使用视图操作时的核心挑战是解析模式。如果您更改 Delta 表模式,则必须重新创建派生视图以考虑模式的任何添加。例如,如果您向 Delta 表添加新列,则必须确保此列在基于该基表构建的相应视图中可用。

您可以使用 CREATEALTER 中的 TBLPROPERTIES 将您自己的元数据存储为表属性。然后,您可以 SHOW 该元数据。例如:

ALTER TABLE default.people10m SET TBLPROPERTIES ('department' = 'accounting', 'delta.appendOnly' = 'true');
-- Show the table's properties.
SHOW TBLPROPERTIES default.people10m;
-- Show just the 'department' table property.
SHOW TBLPROPERTIES default.people10m ('department');

TBLPROPERTIES 作为 Delta 表元数据的一部分存储。如果 Delta 表已存在于给定位置,则您不能在 CREATE 语句中定义新的 TBLPROPERTIES

此外,为了调整行为和性能,Delta Lake 支持某些 Delta 表属性:

  • 阻止 Delta 表中的删除和更新:delta.appendOnly=true
  • 配置时间旅行保留属性:delta.logRetentionDuration=<interval-string>delta.deletedFileRetentionDuration=<interval-string>。有关详细信息,请参阅数据保留
  • 配置收集统计信息的列数:delta.dataSkippingNumIndexedCols=n。此属性指示写入器仅收集表中前 n 列的统计信息。此外,数据跳过代码会忽略此列索引之外的任何列的统计信息。此属性仅对写入的新数据生效。

您还可以在首次提交到 Delta 表时使用 Spark 配置设置 delta. 前缀属性。例如,要使用属性 delta.appendOnly=true 初始化 Delta 表,请将 Spark 配置 spark.databricks.delta.properties.defaults.appendOnly 设置为 true。例如:

spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")

另请参阅Delta 表属性参考

将表模式和属性同步到 Hive 元数据存储

标题为“将表模式和属性同步到 Hive 元数据存储”的部分

您可以通过将 spark.databricks.delta.catalog.update.enabled 设置为 true 来启用表模式和属性的异步同步到元数据存储。每当 Delta 客户端检测到其中任何一个因更新而更改时,它都会将更改同步到元数据存储。

模式存储在 HMS 的表属性中。如果模式很小,它将直接存储在键 spark.sql.sources.schema 下。

{
"spark.sql.sources.schema": "{'name':'col1','type':'string','nullable':true, 'metadata':{}},{'name':'col2','type':'string','nullable':true,'metadata':{}}"
}

如果模式很大,模式将被分解为多个部分。将它们拼接在一起应该会得到正确的模式。例如:

{
"spark.sql.sources.schema.numParts": "4",
"spark.sql.sources.schema.part.1": "{'name':'col1','type':'string','nullable':tr",
"spark.sql.sources.schema.part.2": "ue, 'metadata':{}},{'name':'co",
"spark.sql.sources.schema.part.3": "l2','type':'string','nullable':true,'meta",
"spark.sql.sources.schema.part.4": "data':{}}"
}

Delta Lake 具有探索表元数据的丰富功能。

它支持 SHOW COLUMNSDESCRIBE TABLE

它还提供了以下独有命令

提供有关模式、分区、表大小等信息。有关详细信息,请参阅检索 Delta 表详细信息

提供血缘信息,包括操作、用户等,以及每次对表的写入的操作指标。表历史记录保留 30 天。有关详细信息,请参阅检索 Delta 表历史记录

对于许多 Delta Lake 操作,您可以通过在创建新的 SparkSession 时设置以下配置来启用与 Apache Spark DataSourceV2 和 Catalog API(自 3.0 起)的集成。

from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("...") \
.master("...") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

或者,您可以在使用 spark-submit 提交 Spark 应用程序时,或在启动 spark-shellpyspark 时,将它们指定为命令行参数来添加配置。

终端窗口
spark-submit --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ...
终端窗口
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Delta Lake 使用 Hadoop FileSystem API 访问存储系统。存储系统的凭据通常可以通过 Hadoop 配置来设置。Delta Lake 提供了多种设置 Hadoop 配置的方式,类似于 Apache Spark。

当您在集群上启动 Spark 应用程序时,您可以以 spark.hadoop.* 的形式设置 Spark 配置,以传递您的自定义 Hadoop 配置。例如,为 spark.hadoop.a.b.c 设置值将把该值作为 Hadoop 配置 a.b.c 传递,Delta Lake 将使用它来访问 Hadoop FileSystem API。

有关更多详细信息,请参阅Spark 文档

Spark SQL 会将所有当前的SQL 会话配置传递给 Delta Lake,Delta Lake 将使用它们来访问 Hadoop FileSystem API。例如,SET a.b.c=x.y.z 将告诉 Delta Lake 将值 x.y.z 作为 Hadoop 配置 a.b.c 传递,Delta Lake 将使用它来访问 Hadoop FileSystem API。

除了通过 Spark(集群)配置或 SQL 会话配置设置 Hadoop 文件系统配置外,Delta 还支持在使用 DataFrameReader.load(path)DataFrameWriter.save(path) 读取或写入表时,从 DataFrameReaderDataFrameWriter 选项(即以 fs. 前缀开头的选项键)读取 Hadoop 文件系统配置。

例如,您可以通过 DataFrame 选项传递存储凭据

df1 = spark.read.format("delta") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>") \
.read("...")
df2 = spark.read.format("delta") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>") \
.read("...")
df1.union(df2).write.format("delta") \
.mode("overwrite") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>") \
.save("...")

您可以在存储配置中找到存储的 Hadoop 文件系统配置详细信息。