欢迎来到 Delta Lake 的 Python 文档页面

DeltaTable

class delta.tables.DeltaTable(spark: pyspark.sql.session.SparkSession, jdt: JavaObject)

用于以编程方式与 Delta 表交互的主要类。您可以使用 Delta 表的路径创建 DeltaTable 实例。

deltaTable = DeltaTable.forPath(spark, "/path/to/table")

此外,您还可以将现有的 Parquet 表就地转换为 Delta 表。

deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

0.4 版新增。

toDF() → pyspark.sql.dataframe.DataFrame

获取此 Delta 表的 DataFrame 表示。

0.4 版新增。

alias(aliasName: str) → delta.tables.DeltaTable

为 Delta 表应用别名。

0.4 版新增。

generate(mode: str) → None

为给定的 delta 表生成清单文件。

参数

mode

要生成的清单文件类型的模式。有效模式如下(不区分大小写)

  • “symlink_format_manifest”:这将生成符号链接格式的清单,

    支持 Presto 和 Athena 读取。

有关更多信息,请参阅在线文档。

0.5 版新增。

delete(condition: Union[pyspark.sql.column.Column, str, None] = None) → None

从表中删除与给定 condition 匹配的数据。

示例

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions
参数

condition (strpyspark.sql.Column) – 更新条件

0.4 版新增。

update(condition: Union[pyspark.sql.column.Column, str, None] = None, set: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → None

根据 set 定义的规则,更新表中与给定 condition 匹配的行中的数据。

示例

# condition using SQL formatted string
deltaTable.update(
    condition = "eventType = 'clck'",
    set = { "eventType": "'click'" } )

# condition using Spark SQL functions
deltaTable.update(
    condition = col("eventType") == "clck",
    set = { "eventType": lit("click") } )
参数
  • condition (strpyspark.sql.Column) – 更新的可选条件

  • set (键为 str,值为 str 或 pyspark.sql.Column 的字典) – 定义设置需要更新的列的值的规则。注意:此参数是必需的。存在默认值 None 以允许跨语言以相同顺序的定位参数。

0.4 版新增。

merge(source: pyspark.sql.dataframe.DataFrame, condition: Union[str, pyspark.sql.column.Column]) → delta.tables.DeltaMergeBuilder

根据给定的合并 conditionsource DataFrame 合并数据。这将返回一个 DeltaMergeBuilder 对象,可用于根据行是否匹配条件来指定要对行执行的更新、删除或插入操作。有关此操作的完整描述以及允许的更新、删除和插入操作的组合,请参见 DeltaMergeBuilder

示例 1:条件和更新表达式为 SQL 格式的字符串

deltaTable.alias("events").merge(
    source = updatesDF.alias("updates"),
    condition = "events.eventId = updates.eventId"
  ).whenMatchedUpdate(set =
    {
      "data": "updates.data",
      "count": "events.count + 1"
    }
  ).whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data",
      "count": "1"
    }
  ).execute()

示例 2:条件和更新表达式为 Spark SQL 函数

from pyspark.sql.functions import *

deltaTable.alias("events").merge(
    source = updatesDF.alias("updates"),
    condition = expr("events.eventId = updates.eventId")
  ).whenMatchedUpdate(set =
    {
      "data" : col("updates.data"),
      "count": col("events.count") + 1
    }
  ).whenNotMatchedInsert(values =
    {
      "date": col("updates.date"),
      "eventId": col("updates.eventId"),
      "data": col("updates.data"),
      "count": lit("1")
    }
  ).execute()
参数
  • source (pyspark.sql.DataFrame) – 源 DataFrame

  • condition (strpyspark.sql.Column) – 将源行与 Delta 表行匹配的条件。

返回

构建器对象,用于根据条件是否匹配来指定是否更新、删除或插入行

返回类型

delta.tables.DeltaMergeBuilder

0.4 版新增。

vacuum(retentionHours: Optional[float] = None) → pyspark.sql.dataframe.DataFrame

递归删除表中在给定保留阈值之前维护旧版本不需要的文件和目录。此方法在成功完成后将返回一个空的 DataFrame。

示例

deltaTable.vacuum()     # vacuum files not required by versions more than 7 days old

deltaTable.vacuum(100)  # vacuum files not required by versions more than 100 hours old
参数

retentionHours – 可选的历史保留小时数。如果未指定,则将使用默认保留期 168 小时(7 天)。

0.4 版新增。

history(limit: Optional[int] = None) → pyspark.sql.dataframe.DataFrame

以 Spark DataFrame 的形式获取此表上最新的 limit 次提交的信息。信息按时间倒序排列。

示例

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation
参数

limit – 可选,历史记录中要返回的最新提交次数。

返回

表的提交历史。有关更多详细信息,请参阅在线 Delta Lake 文档。

返回类型

pyspark.sql.DataFrame

0.4 版新增。

detail() → pyspark.sql.dataframe.DataFrame

获取 Delta 表的详细信息,例如格式、名称和大小。

示例

detailDF = deltaTable.detail() # get the full details of the table

:return 表的信息(格式、名称、大小等) :rtype: pyspark.sql.DataFrame

注意

正在演变

2.1 版新增。

classmethod convertToDelta(sparkSession: pyspark.sql.session.SparkSession, identifier: str, partitionSchema: Union[str, pyspark.sql.types.StructType, None] = None) → delta.tables.DeltaTable

从给定的 parquet 表创建 DeltaTable。获取现有 parquet 表并在表的基路径中构建 delta 事务日志。注意:转换过程中对表的任何更改可能不会在转换结束时导致一致的状态。用户应在转换开始前停止对表的任何更改。

示例

# Convert unpartitioned parquet table at path 'path/to/table'
deltaTable = DeltaTable.convertToDelta(
    spark, "parquet.`path/to/table`")

# Convert partitioned parquet table at path 'path/to/table' and partitioned by
# integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(
    spark, "parquet.`path/to/table`", "part int")
参数
  • sparkSession (pyspark.sql.SparkSession) – 用于转换的 SparkSession

  • identifier (str) – 格式为“parquet.`path`”的 Parquet 表标识符

  • partitionSchema – Hive DDL 格式的字符串,或 pyspark.sql.types.StructType

返回

表示转换后的 Delta 表的 DeltaTable

返回类型

DeltaTable

0.4 版新增。

classmethod forPath(sparkSession: pyspark.sql.session.SparkSession, path: str, hadoopConf: Dict[str, str] = {}) → delta.tables.DeltaTable

实例化一个 DeltaTable 对象,表示给定路径中的数据。如果给定路径无效(即不存在表或现有表不是 Delta 表),则会引发“not a Delta table”错误。

参数
  • sparkSession (pyspark.sql.SparkSession) – 用于加载表的 SparkSession

  • hadoopConf (可选字典,键为 str,值为 str。) – 以“fs.”或“dfs.”开头的 Hadoop 配置将被 DeltaTable 拾取,以便在执行查询时访问文件系统。其他配置将不被允许。

返回

已加载的 Delta 表

返回类型

DeltaTable

示例

hadoopConf = {"fs.s3a.access.key" : "<access-key>",
           "fs.s3a.secret.key": "secret-key"}
deltaTable = DeltaTable.forPath(
               spark,
               "/path/to/table",
               hadoopConf)

0.4 版新增。

classmethod forName(sparkSession: pyspark.sql.session.SparkSession, tableOrViewName: str) → delta.tables.DeltaTable

使用给定的表名或视图名实例化一个 DeltaTable 对象。如果给定的 tableOrViewName 无效(即不存在表或现有表不是 Delta 表),则会引发“not a Delta table”错误。

给定的 tableOrViewName 也可以是 delta 数据源的绝对路径(即 delta.`path`)。如果是,则实例化一个 DeltaTable 对象,表示给定路径中的数据(与 forPath 一致)。

参数
  • sparkSession – 用于加载表的 SparkSession

  • tableOrViewName – 表或视图的名称

返回

已加载的 Delta 表

返回类型

DeltaTable

示例

deltaTable = DeltaTable.forName(spark, "tblName")

0.7 版新增。

classmethod create(sparkSession: Optional[pyspark.sql.session.SparkSession] = None) → delta.tables.DeltaTableBuilder

返回 DeltaTableBuilder 对象,可用于指定表名、位置、列、分区列、表注释和表属性以创建 Delta 表,如果表已存在则报错(与 SQL CREATE TABLE 相同)。

有关此操作的完整描述和示例,请参见 DeltaTableBuilder

参数

sparkSession – 用于创建表的 SparkSession

返回

DeltaTableBuilder 的实例

返回类型

DeltaTableBuilder

注意

正在演变

1.0 版新增。

classmethod createIfNotExists(sparkSession: Optional[pyspark.sql.session.SparkSession] = None) → delta.tables.DeltaTableBuilder

返回 DeltaTableBuilder 对象,可用于指定表名、位置、列、分区列、表注释和表属性以创建 Delta 表,如果表不存在(与 SQL CREATE TABLE IF NOT EXISTS 相同)。

有关此操作的完整描述和示例,请参见 DeltaTableBuilder

参数

sparkSession – 用于创建表的 SparkSession

返回

DeltaTableBuilder 的实例

返回类型

DeltaTableBuilder

注意

正在演变

1.0 版新增。

classmethod replace(sparkSession: Optional[pyspark.sql.session.SparkSession] = None) → delta.tables.DeltaTableBuilder

返回 DeltaTableBuilder 对象,可用于指定表名、位置、列、分区列、表注释和表属性以替换 Delta 表,如果表不存在则报错(与 SQL REPLACE TABLE 相同)。

有关此操作的完整描述和示例,请参见 DeltaTableBuilder

参数

sparkSession – 用于创建表的 SparkSession

返回

DeltaTableBuilder 的实例

返回类型

DeltaTableBuilder

注意

正在演变

1.0 版新增。

classmethod createOrReplace(sparkSession: Optional[pyspark.sql.session.SparkSession] = None) → delta.tables.DeltaTableBuilder

返回 DeltaTableBuilder 对象,可用于指定表名、位置、列、分区列、表注释和表属性以替换 Delta 表,如果表不存在则报错(与 SQL REPLACE TABLE 相同)。

有关此操作的完整描述和示例,请参见 DeltaTableBuilder

参数

sparkSession – 用于创建表的 SparkSession

返回

DeltaTableBuilder 的实例

返回类型

DeltaTableBuilder

注意

正在演变

1.0 版新增。

classmethod isDeltaTable(sparkSession: pyspark.sql.session.SparkSession, identifier: str) → bool

使用给定的 SparkSession 检查提供的 identifier 字符串(此处为文件路径)是否是 Delta 表的根目录。

参数
  • sparkSession – 用于执行检查的 SparkSession

  • path – 表的位置

返回

表是否为 delta 表

返回类型

bool

示例

DeltaTable.isDeltaTable(spark, "/path/to/table")

0.4 版新增。

upgradeTableProtocol(readerVersion: int, writerVersion: int) → None

更新表的协议版本以利用新功能。升级读取器版本将阻止所有具有较旧 Delta Lake 版本的客户端访问此表。升级写入器版本将阻止较旧的 Delta Lake 版本写入此表。读取器或写入器版本无法降级。

有关更多详细信息,请参阅在线文档和 PROTOCOL.md 中的 Delta 协议规范。

0.8 版新增。

restoreToVersion(version: int) → pyspark.sql.dataframe.DataFrame

将 DeltaTable 恢复到由版本号指定的表的旧版本。

示例

io.delta.tables.DeltaTable.restoreToVersion(1)
参数

version – 恢复表的T目标版本

返回

包含恢复操作指标的 Dataframe。

返回类型

pyspark.sql.DataFrame

1.2 版新增。

restoreToTimestamp(timestamp: str) → pyspark.sql.dataframe.DataFrame

将 DeltaTable 恢复到由时间戳指定的表的旧版本。时间戳的格式可以是 yyyy-MM-dd 或 yyyy-MM-dd HH:mm:ss

示例

io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
参数

timestamp – 恢复表的T目标时间戳

返回

包含恢复操作指标的 Dataframe。

返回类型

pyspark.sql.DataFrame

1.2 版新增。

optimize() → delta.tables.DeltaOptimizeBuilder

优化表的数据布局。这将返回一个 DeltaOptimizeBuilder 对象,可用于指定分区过滤器以限制优化范围,并执行不同的优化技术,例如文件压缩或使用 Z-Order 曲线排序数据。

有关此操作的完整描述,请参见 DeltaOptimizeBuilder

示例

deltaTable.optimize().where("date='2021-11-18'").executeCompaction()
返回

DeltaOptimizeBuilder 的一个实例。

返回类型

DeltaOptimizeBuilder

2.0 版新增。

class delta.tables.DeltaMergeBuilder(spark: pyspark.sql.session.SparkSession, jbuilder: JavaObject)

构建器,用于指定如何将源 DataFrame 中的数据合并到目标 Delta 表中。使用 delta.tables.DeltaTable.merge() 创建此类的对象。使用此构建器,您可以指定任意数量的 whenMatchedwhenNotMatchedwhenNotMatchedBySource 子句。以下是这些子句的约束。

  • whenMatched 子句中的约束

    • whenMatched 子句中的条件是可选的。但是,如果有多个 whenMatched 子句,则只有最后一个可以省略条件。

    • 当有多个 whenMatched 子句并且存在条件(或缺乏条件),使得一行满足多个子句时,将执行第一个满足的子句的操作。换句话说,whenMatched 子句的顺序很重要。

    • 如果所有 whenMatched 子句都不匹配满足合并条件的源-目标行对,则目标行将不被更新或删除。

    • 如果要使用源 DataFrame 的相应列更新目标 Delta 表的所有列,则可以使用 whenMatchedUpdateAll()。这等同于

      whenMatchedUpdate(set = {
        "col1": "source.col1",
        "col2": "source.col2",
        ...    # for all columns in the delta table
      })
      
  • whenNotMatched 子句中的约束

    • whenNotMatched 子句中的条件是可选的。但是,如果有多个 whenNotMatched 子句,则只有最后一个可以省略条件。

    • 当有多个 whenNotMatched 子句并且存在条件(或缺乏条件),使得一行满足多个子句时,将执行第一个满足的子句的操作。换句话说,whenNotMatched 子句的顺序很重要。

    • 如果没有 whenNotMatched 子句,或者它存在但非匹配源行不满足条件,则不插入源行。

    • 如果要使用源 DataFrame 的相应列插入目标 Delta 表的所有列,则可以使用 whenNotMatchedInsertAll()。这等同于

      whenNotMatchedInsert(values = {
        "col1": "source.col1",
        "col2": "source.col2",
        ...    # for all columns in the delta table
      })
      
  • whenNotMatchedBySource 子句中的约束

    • whenNotMatchedBySource 子句中的条件是可选的。但是,如果有多个 whenNotMatchedBySource 子句,则只有最后一个 whenNotMatchedBySource 子句可以省略条件。

    • whenNotMatchedBySource 子句中的条件和更新表达式只能引用目标 Delta 表中的列。

    • 当有多个 whenNotMatchedBySource 子句并且存在条件(或缺乏条件),使得一行满足多个子句时,将执行第一个满足的子句的操作。换句话说,whenNotMatchedBySource 子句的顺序很重要。

    • 如果没有 whenNotMatchedBySource 子句,或者它存在但非匹配目标行不满足任何 whenNotMatchedBySource 子句条件,则目标行将不被更新或删除。

示例 1:条件和更新表达式为 SQL 格式的字符串

deltaTable.alias("events").merge(
    source = updatesDF.alias("updates"),
    condition = "events.eventId = updates.eventId"
  ).whenMatchedUpdate(set =
    {
      "data": "updates.data",
      "count": "events.count + 1"
    }
  ).whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data",
      "count": "1",
      "missed_count": "0"
    }
  ).whenNotMatchedBySourceUpdate(set =
    {
      "missed_count": "events.missed_count + 1"
    }
  ).execute()

示例 2:条件和更新表达式为 Spark SQL 函数

from pyspark.sql.functions import *

deltaTable.alias("events").merge(
    source = updatesDF.alias("updates"),
    condition = expr("events.eventId = updates.eventId")
  ).whenMatchedUpdate(set =
    {
      "data" : col("updates.data"),
      "count": col("events.count") + 1
    }
  ).whenNotMatchedInsert(values =
    {
      "date": col("updates.date"),
      "eventId": col("updates.eventId"),
      "data": col("updates.data"),
      "count": lit("1"),
      "missed_count": lit("0")
    }
  ).whenNotMatchedBySourceUpdate(set =
    {
      "missed_count": col("events.missed_count") + 1
    }
  ).execute()

0.4 版新增。

whenMatchedUpdate(condition: Union[pyspark.sql.column.Column, str, None] = None, set: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder

根据 set 定义的规则更新匹配的表行。如果指定了 condition,则该行必须评估为 true 才能更新。

有关完整的用法详细信息,请参见 DeltaMergeBuilder

参数
  • condition (strpyspark.sql.Column) – 更新的可选条件

  • set (键为 str,值为 str 或 pyspark.sql.Column 的字典) – 定义设置需要更新的列的值的规则。注意:此参数是必需的。存在默认值 None 以允许跨语言以相同顺序的定位参数。

返回

此构建器

0.4 版新增。

whenMatchedUpdateAll(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder

使用源行中相应列的值更新匹配表行的所有列。如果指定了 condition,则新行必须为 true 才能更新。

有关完整的用法详细信息,请参见 DeltaMergeBuilder

参数

condition (strpyspark.sql.Column) – 插入的可选条件

返回

此构建器

0.4 版新增。

whenMatchedDelete(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder

仅当给定 condition(如果指定)对匹配行成立时,才从表中删除匹配行。

有关完整的用法详细信息,请参见 DeltaMergeBuilder

参数

condition (strpyspark.sql.Column) – 删除的可选条件

返回

此构建器

0.4 版新增。

whenNotMatchedInsert(condition: Union[pyspark.sql.column.Column, str, None] = None, values: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder

根据 values 定义的规则向目标表插入新行。如果指定了 condition,则新行必须评估为 true 才能插入。

有关完整的用法详细信息,请参见 DeltaMergeBuilder

参数
  • condition (strpyspark.sql.Column) – 插入的可选条件

  • values (键为 str,值为 str 或 pyspark.sql.Column 的字典) – 定义设置需要更新的列的值的规则。注意:此参数是必需的。存在默认值 None 以允许跨语言以相同顺序的定位参数。

返回

此构建器

0.4 版新增。

whenNotMatchedInsertAll(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder

通过将目标列分配给源行中相应列的值来插入新的目标 Delta 表行。如果指定了 condition,则新行必须评估为 true 才能插入。

有关完整的用法详细信息,请参见 DeltaMergeBuilder

参数

condition (strpyspark.sql.Column) – 插入的可选条件

返回

此构建器

0.4 版新增。

whenNotMatchedBySourceUpdate(condition: Union[pyspark.sql.column.Column, str, None] = None, set: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder

根据 set 定义的规则更新源中没有匹配项的目标行。如果指定了 condition,则该行必须评估为 true 才能更新。

有关完整的用法详细信息,请参见 DeltaMergeBuilder

参数
  • condition (strpyspark.sql.Column) – 更新的可选条件

  • set (键为 str,值为 str 或 pyspark.sql.Column 的字典) – 定义设置需要更新的列的值的规则。注意:此参数是必需的。存在默认值 None 以允许跨语言以相同顺序的定位参数。

返回

此构建器

2.3 版新增。

whenNotMatchedBySourceDelete(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder

仅当给定 condition(如果指定)对目标行成立时,才从表中删除源中没有匹配项的目标行。

有关完整的用法详细信息,请参见 DeltaMergeBuilder

参数

condition (strpyspark.sql.Column) – 删除的可选条件

返回

此构建器

2.3 版新增。

execute() → None

根据构建的匹配和不匹配操作执行合并操作。

有关完整的用法详细信息,请参见 DeltaMergeBuilder

0.4 版新增。

class delta.tables.DeltaTableBuilder(spark: pyspark.sql.session.SparkSession, jbuilder: JavaObject)

构建器,用于指定如何创建/替换 Delta 表。您必须在执行构建器之前指定表名或路径。您可以指定表列、分区列、数据位置、表注释和属性,以及如何创建/替换 Delta 表。

执行构建器后,将返回一个 DeltaTable 对象。

使用 delta.tables.DeltaTable.create()delta.tables.DeltaTable.createIfNotExists()delta.tables.DeltaTable.replace()delta.tables.DeltaTable.createOrReplace() 创建此类的对象。

示例 1:使用表名创建具有独立列的 Delta 表

deltaTable = DeltaTable.create(sparkSession)
    .tableName("testTable")
    .addColumn("c1", dataType = "INT", nullable = False)
    .addColumn("c2", dataType = IntegerType(), generatedAlwaysAs = "c1 + 1")
    .partitionedBy("c1")
    .execute()

示例 2:使用位置替换具有现有列的 Delta 表

df = spark.createDataFrame([('a', 1), ('b', 2), ('c', 3)], ["key", "value"])

deltaTable = DeltaTable.replace(sparkSession)
    .tableName("testTable")
    .addColumns(df.schema)
    .execute()

1.0 版新增。

注意

正在演变

tableName(identifier: str) → delta.tables.DeltaTableBuilder

指定表名。可选地用数据库名限定 [database_name.] table_name。

参数

identifier (str) – 表名

返回

此构建器

注意

正在演变

1.0 版新增。

location(location: str) → delta.tables.DeltaTableBuilder

指定存储表数据的目录路径,可以是分布式存储上的路径。

参数

location (str) – 数据存储位置

返回

此构建器

注意

正在演变

1.0 版新增。

comment(comment: str) → delta.tables.DeltaTableBuilder

描述表的注释。

参数

comment (str) – 表注释

返回

此构建器

注意

正在演变

1.0 版新增。

addColumn(colName: str, dataType: Union[str, pyspark.sql.types.DataType], nullable: bool = True, generatedAlwaysAs: Optional[str] = None, comment: Optional[str] = None) → delta.tables.DeltaTableBuilder

指定表中的列

参数
  • colName (str) – 列名

  • dataType (strpyspark.sql.types.DataType) – 列数据类型

  • nullable (bool) – 列是否可为空

  • generatedAlwaysAs (str) – 如果列始终作为其他列的函数生成,则为 SQL 表达式。有关生成列的详细信息,请参阅在线文档。

  • comment (str) – 列注释

返回

此构建器

注意

正在演变

1.0 版新增。

addColumns(cols: Union[pyspark.sql.types.StructType, List[pyspark.sql.types.StructField]]) → delta.tables.DeltaTableBuilder

使用现有模式指定表中的列

参数

cols (pyspark.sql.types.StructTypepyspark.sql.types.StructType 列表。) – 现有模式中的列

返回

此构建器

注意

正在演变

1.0 版新增。

partitionedBy(*cols) → delta.tables.DeltaTableBuilder

指定用于分区的列

参数

cols (str列名列表) – 分区列

返回

此构建器

注意

正在演变

1.0 版新增。

property(key: str, value: str) → delta.tables.DeltaTableBuilder

指定表属性

参数

key – 表属性键

返回

此构建器

注意

正在演变

1.0 版新增。

execute() → delta.tables.DeltaTable

执行表创建。

返回类型

DeltaTable

注意

正在演变

1.0 版新增。

class delta.tables.DeltaOptimizeBuilder(spark: pyspark.sql.session.SparkSession, jbuilder: JavaObject)

用于构建 OPTIMIZE 命令并执行的构建器类。

使用 delta.tables.DeltaTable.optimize() 创建此类的实例。

2.0.0 版新增。

where(partitionFilter: str) → delta.tables.DeltaOptimizeBuilder

在此优化命令构建器上应用分区过滤器,以限制在选定分区上的操作。

参数

partitionFilter (str) – 要应用的分区过滤器

返回

应用了分区过滤器的 DeltaOptimizeBuilder

返回类型

DeltaOptimizeBuilder

2.0 版新增。

executeCompaction() → pyspark.sql.dataframe.DataFrame

压缩选定分区中的小文件。

返回

包含 OPTIMIZE 执行指标的 DataFrame

返回类型

pyspark.sql.DataFrame

2.0 版新增。

executeZOrderBy(*cols) → pyspark.sql.dataframe.DataFrame

使用给定列对选定分区中的数据进行 Z-Order 排序。

参数

cols (str列名列表) – Z-Order 列

返回

包含 OPTIMIZE 执行指标的 DataFrame

返回类型

pyspark.sql.DataFrame

2.0 版新增。

异常

exception delta.exceptions.DeltaConcurrentModificationException(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)

所有 Delta 提交冲突异常的基本类。

1.0 版新增。

注意

正在演变

exception delta.exceptions.ConcurrentWriteException(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)

当并发事务在当前事务读取表后写入数据时抛出。

1.0 版新增。

注意

正在演变

exception delta.exceptions.MetadataChangedException(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)

当 Delta 表的元数据在读取时间和提交时间之间发生更改时抛出。

1.0 版新增。

注意

正在演变

exception delta.exceptions.ProtocolChangedException(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)

当协议版本在读取时间和提交时间之间发生更改时抛出。

1.0 版新增。

注意

正在演变

exception delta.exceptions.ConcurrentAppendException(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)

当添加的文件将被当前事务读取时抛出。

1.0 版新增。

注意

正在演变

exception delta.exceptions.ConcurrentDeleteReadException(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)

当当前事务读取被并发事务删除的数据时抛出。

1.0 版新增。

注意

正在演变

exception delta.exceptions.ConcurrentDeleteDeleteException(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)

当当前事务删除被并发事务删除的数据时抛出。

1.0 版新增。

注意

正在演变

exception delta.exceptions.ConcurrentTransactionException(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)

当并发事务都尝试更新相同的幂等事务时抛出。

1.0 版新增。

注意

正在演变

其他

delta.pip_utils.configure_spark_with_delta_pip(spark_session_builder: pyspark.sql.session.SparkSession.Builder, extra_packages: Optional[List[str]] = None) → pyspark.sql.session.SparkSession.Builder

实用函数,用于配置 SparkSession 构建器,使其生成的 SparkSession 将自动从 Maven 下载所需的 Delta Lake JAR。当您想要

  1. 使用 pip 在本地安装 Delta Lake,以及

  2. 直接使用 Delta Lake + Pyspark 执行 Python 代码,即不使用 spark-submit –packages io.delta:…pyspark –packages io.delta:…

    builder = SparkSession.builder .master(“local[*]”) .appName(“test”)

    spark = configure_spark_with_delta_pip(builder).getOrCreate()

  3. 如果您想添加更多包,请使用 extra_packages 参数。

    builder = SparkSession.builder .master(“local[*]”) .appName(“test”) my_packages = [“org.apache.spark:spark-sql-kafka-0-10_2.12:x.y.z”] spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

参数
  • spark_session_builder – 用于配置和创建 SparkSession 的 SparkSession.Builder 对象。

  • extra_packages – 除了 Delta Lake 之外,设置要添加到 Spark 会话的其他包。

返回

已更新的 SparkSession.Builder 对象

1.0 版新增。

注意

正在演变