欢迎来到 Delta Lake 的 Python 文档页面¶
DeltaTable¶
-
class
delta.tables.
DeltaTable
(spark: pyspark.sql.session.SparkSession, jdt: JavaObject)¶ 用于以编程方式与 Delta 表交互的主要类。您可以使用 Delta 表的路径创建 DeltaTable 实例。
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
此外,您还可以将现有的 Parquet 表就地转换为 Delta 表。
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")
0.4 版新增。
-
toDF
() → pyspark.sql.dataframe.DataFrame¶ 获取此 Delta 表的 DataFrame 表示。
0.4 版新增。
-
alias
(aliasName: str) → delta.tables.DeltaTable¶ 为 Delta 表应用别名。
0.4 版新增。
-
generate
(mode: str) → None¶ 为给定的 delta 表生成清单文件。
- 参数
mode –
要生成的清单文件类型的模式。有效模式如下(不区分大小写)
- “symlink_format_manifest”:这将生成符号链接格式的清单,
支持 Presto 和 Athena 读取。
有关更多信息,请参阅在线文档。
0.5 版新增。
-
delete
(condition: Union[pyspark.sql.column.Column, str, None] = None) → None¶ 从表中删除与给定
condition
匹配的数据。示例
deltaTable.delete("date < '2017-01-01'") # predicate using SQL formatted string deltaTable.delete(col("date") < "2017-01-01") # predicate using Spark SQL functions
- 参数
condition (str 或 pyspark.sql.Column) – 更新条件
0.4 版新增。
-
update
(condition: Union[pyspark.sql.column.Column, str, None] = None, set: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → None¶ 根据
set
定义的规则,更新表中与给定condition
匹配的行中的数据。示例
# condition using SQL formatted string deltaTable.update( condition = "eventType = 'clck'", set = { "eventType": "'click'" } ) # condition using Spark SQL functions deltaTable.update( condition = col("eventType") == "clck", set = { "eventType": lit("click") } )
- 参数
condition (str 或 pyspark.sql.Column) – 更新的可选条件
set (键为 str,值为 str 或 pyspark.sql.Column 的字典) – 定义设置需要更新的列的值的规则。注意:此参数是必需的。存在默认值 None 以允许跨语言以相同顺序的定位参数。
0.4 版新增。
-
merge
(source: pyspark.sql.dataframe.DataFrame, condition: Union[str, pyspark.sql.column.Column]) → delta.tables.DeltaMergeBuilder¶ 根据给定的合并 condition 从 source DataFrame 合并数据。这将返回一个
DeltaMergeBuilder
对象,可用于根据行是否匹配条件来指定要对行执行的更新、删除或插入操作。有关此操作的完整描述以及允许的更新、删除和插入操作的组合,请参见DeltaMergeBuilder
。示例 1:条件和更新表达式为 SQL 格式的字符串
deltaTable.alias("events").merge( source = updatesDF.alias("updates"), condition = "events.eventId = updates.eventId" ).whenMatchedUpdate(set = { "data": "updates.data", "count": "events.count + 1" } ).whenNotMatchedInsert(values = { "date": "updates.date", "eventId": "updates.eventId", "data": "updates.data", "count": "1" } ).execute()
示例 2:条件和更新表达式为 Spark SQL 函数
from pyspark.sql.functions import * deltaTable.alias("events").merge( source = updatesDF.alias("updates"), condition = expr("events.eventId = updates.eventId") ).whenMatchedUpdate(set = { "data" : col("updates.data"), "count": col("events.count") + 1 } ).whenNotMatchedInsert(values = { "date": col("updates.date"), "eventId": col("updates.eventId"), "data": col("updates.data"), "count": lit("1") } ).execute()
- 参数
source (pyspark.sql.DataFrame) – 源 DataFrame
condition (str 或 pyspark.sql.Column) – 将源行与 Delta 表行匹配的条件。
- 返回
构建器对象,用于根据条件是否匹配来指定是否更新、删除或插入行
- 返回类型
0.4 版新增。
-
vacuum
(retentionHours: Optional[float] = None) → pyspark.sql.dataframe.DataFrame¶ 递归删除表中在给定保留阈值之前维护旧版本不需要的文件和目录。此方法在成功完成后将返回一个空的 DataFrame。
示例
deltaTable.vacuum() # vacuum files not required by versions more than 7 days old deltaTable.vacuum(100) # vacuum files not required by versions more than 100 hours old
- 参数
retentionHours – 可选的历史保留小时数。如果未指定,则将使用默认保留期 168 小时(7 天)。
0.4 版新增。
-
history
(limit: Optional[int] = None) → pyspark.sql.dataframe.DataFrame¶ 以 Spark DataFrame 的形式获取此表上最新的 limit 次提交的信息。信息按时间倒序排列。
示例
fullHistoryDF = deltaTable.history() # get the full history of the table lastOperationDF = deltaTable.history(1) # get the last operation
- 参数
limit – 可选,历史记录中要返回的最新提交次数。
- 返回
表的提交历史。有关更多详细信息,请参阅在线 Delta Lake 文档。
- 返回类型
pyspark.sql.DataFrame
0.4 版新增。
-
detail
() → pyspark.sql.dataframe.DataFrame¶ 获取 Delta 表的详细信息,例如格式、名称和大小。
示例
detailDF = deltaTable.detail() # get the full details of the table
:return 表的信息(格式、名称、大小等) :rtype: pyspark.sql.DataFrame
注意
正在演变
2.1 版新增。
-
classmethod
convertToDelta
(sparkSession: pyspark.sql.session.SparkSession, identifier: str, partitionSchema: Union[str, pyspark.sql.types.StructType, None] = None) → delta.tables.DeltaTable¶ 从给定的 parquet 表创建 DeltaTable。获取现有 parquet 表并在表的基路径中构建 delta 事务日志。注意:转换过程中对表的任何更改可能不会在转换结束时导致一致的状态。用户应在转换开始前停止对表的任何更改。
示例
# Convert unpartitioned parquet table at path 'path/to/table' deltaTable = DeltaTable.convertToDelta( spark, "parquet.`path/to/table`") # Convert partitioned parquet table at path 'path/to/table' and partitioned by # integer column named 'part' partitionedDeltaTable = DeltaTable.convertToDelta( spark, "parquet.`path/to/table`", "part int")
- 参数
sparkSession (pyspark.sql.SparkSession) – 用于转换的 SparkSession
identifier (str) – 格式为“parquet.`path`”的 Parquet 表标识符
partitionSchema – Hive DDL 格式的字符串,或 pyspark.sql.types.StructType
- 返回
表示转换后的 Delta 表的 DeltaTable
- 返回类型
0.4 版新增。
-
classmethod
forPath
(sparkSession: pyspark.sql.session.SparkSession, path: str, hadoopConf: Dict[str, str] = {}) → delta.tables.DeltaTable¶ 实例化一个
DeltaTable
对象,表示给定路径中的数据。如果给定路径无效(即不存在表或现有表不是 Delta 表),则会引发“not a Delta table”错误。- 参数
sparkSession (pyspark.sql.SparkSession) – 用于加载表的 SparkSession
hadoopConf (可选字典,键为 str,值为 str。) – 以“fs.”或“dfs.”开头的 Hadoop 配置将被 DeltaTable 拾取,以便在执行查询时访问文件系统。其他配置将不被允许。
- 返回
已加载的 Delta 表
- 返回类型
示例
hadoopConf = {"fs.s3a.access.key" : "<access-key>", "fs.s3a.secret.key": "secret-key"} deltaTable = DeltaTable.forPath( spark, "/path/to/table", hadoopConf)
0.4 版新增。
-
classmethod
forName
(sparkSession: pyspark.sql.session.SparkSession, tableOrViewName: str) → delta.tables.DeltaTable¶ 使用给定的表名或视图名实例化一个
DeltaTable
对象。如果给定的 tableOrViewName 无效(即不存在表或现有表不是 Delta 表),则会引发“not a Delta table”错误。给定的 tableOrViewName 也可以是 delta 数据源的绝对路径(即 delta.`path`)。如果是,则实例化一个
DeltaTable
对象,表示给定路径中的数据(与 forPath 一致)。- 参数
sparkSession – 用于加载表的 SparkSession
tableOrViewName – 表或视图的名称
- 返回
已加载的 Delta 表
- 返回类型
示例
deltaTable = DeltaTable.forName(spark, "tblName")
0.7 版新增。
-
classmethod
create
(sparkSession: Optional[pyspark.sql.session.SparkSession] = None) → delta.tables.DeltaTableBuilder¶ 返回
DeltaTableBuilder
对象,可用于指定表名、位置、列、分区列、表注释和表属性以创建 Delta 表,如果表已存在则报错(与 SQL CREATE TABLE 相同)。有关此操作的完整描述和示例,请参见
DeltaTableBuilder
。- 参数
sparkSession – 用于创建表的 SparkSession
- 返回
DeltaTableBuilder 的实例
- 返回类型
注意
正在演变
1.0 版新增。
-
classmethod
createIfNotExists
(sparkSession: Optional[pyspark.sql.session.SparkSession] = None) → delta.tables.DeltaTableBuilder¶ 返回
DeltaTableBuilder
对象,可用于指定表名、位置、列、分区列、表注释和表属性以创建 Delta 表,如果表不存在(与 SQL CREATE TABLE IF NOT EXISTS 相同)。有关此操作的完整描述和示例,请参见
DeltaTableBuilder
。- 参数
sparkSession – 用于创建表的 SparkSession
- 返回
DeltaTableBuilder 的实例
- 返回类型
注意
正在演变
1.0 版新增。
-
classmethod
replace
(sparkSession: Optional[pyspark.sql.session.SparkSession] = None) → delta.tables.DeltaTableBuilder¶ 返回
DeltaTableBuilder
对象,可用于指定表名、位置、列、分区列、表注释和表属性以替换 Delta 表,如果表不存在则报错(与 SQL REPLACE TABLE 相同)。有关此操作的完整描述和示例,请参见
DeltaTableBuilder
。- 参数
sparkSession – 用于创建表的 SparkSession
- 返回
DeltaTableBuilder 的实例
- 返回类型
注意
正在演变
1.0 版新增。
-
classmethod
createOrReplace
(sparkSession: Optional[pyspark.sql.session.SparkSession] = None) → delta.tables.DeltaTableBuilder¶ 返回
DeltaTableBuilder
对象,可用于指定表名、位置、列、分区列、表注释和表属性以替换 Delta 表,如果表不存在则报错(与 SQL REPLACE TABLE 相同)。有关此操作的完整描述和示例,请参见
DeltaTableBuilder
。- 参数
sparkSession – 用于创建表的 SparkSession
- 返回
DeltaTableBuilder 的实例
- 返回类型
注意
正在演变
1.0 版新增。
-
classmethod
isDeltaTable
(sparkSession: pyspark.sql.session.SparkSession, identifier: str) → bool¶ 使用给定的 SparkSession 检查提供的 identifier 字符串(此处为文件路径)是否是 Delta 表的根目录。
- 参数
sparkSession – 用于执行检查的 SparkSession
path – 表的位置
- 返回
表是否为 delta 表
- 返回类型
bool
示例
DeltaTable.isDeltaTable(spark, "/path/to/table")
0.4 版新增。
-
upgradeTableProtocol
(readerVersion: int, writerVersion: int) → None¶ 更新表的协议版本以利用新功能。升级读取器版本将阻止所有具有较旧 Delta Lake 版本的客户端访问此表。升级写入器版本将阻止较旧的 Delta Lake 版本写入此表。读取器或写入器版本无法降级。
有关更多详细信息,请参阅在线文档和 PROTOCOL.md 中的 Delta 协议规范。
0.8 版新增。
-
restoreToVersion
(version: int) → pyspark.sql.dataframe.DataFrame¶ 将 DeltaTable 恢复到由版本号指定的表的旧版本。
示例
io.delta.tables.DeltaTable.restoreToVersion(1)
- 参数
version – 恢复表的T目标版本
- 返回
包含恢复操作指标的 Dataframe。
- 返回类型
pyspark.sql.DataFrame
1.2 版新增。
-
restoreToTimestamp
(timestamp: str) → pyspark.sql.dataframe.DataFrame¶ 将 DeltaTable 恢复到由时间戳指定的表的旧版本。时间戳的格式可以是 yyyy-MM-dd 或 yyyy-MM-dd HH:mm:ss
示例
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01') io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
- 参数
timestamp – 恢复表的T目标时间戳
- 返回
包含恢复操作指标的 Dataframe。
- 返回类型
pyspark.sql.DataFrame
1.2 版新增。
-
optimize
() → delta.tables.DeltaOptimizeBuilder¶ 优化表的数据布局。这将返回一个
DeltaOptimizeBuilder
对象,可用于指定分区过滤器以限制优化范围,并执行不同的优化技术,例如文件压缩或使用 Z-Order 曲线排序数据。有关此操作的完整描述,请参见
DeltaOptimizeBuilder
。示例
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()
- 返回
DeltaOptimizeBuilder 的一个实例。
- 返回类型
2.0 版新增。
-
-
class
delta.tables.
DeltaMergeBuilder
(spark: pyspark.sql.session.SparkSession, jbuilder: JavaObject)¶ 构建器,用于指定如何将源 DataFrame 中的数据合并到目标 Delta 表中。使用
delta.tables.DeltaTable.merge()
创建此类的对象。使用此构建器,您可以指定任意数量的whenMatched
、whenNotMatched
和whenNotMatchedBySource
子句。以下是这些子句的约束。whenMatched
子句中的约束whenMatched
子句中的条件是可选的。但是,如果有多个whenMatched
子句,则只有最后一个可以省略条件。当有多个
whenMatched
子句并且存在条件(或缺乏条件),使得一行满足多个子句时,将执行第一个满足的子句的操作。换句话说,whenMatched
子句的顺序很重要。如果所有
whenMatched
子句都不匹配满足合并条件的源-目标行对,则目标行将不被更新或删除。如果要使用源 DataFrame 的相应列更新目标 Delta 表的所有列,则可以使用
whenMatchedUpdateAll()
。这等同于whenMatchedUpdate(set = { "col1": "source.col1", "col2": "source.col2", ... # for all columns in the delta table })
whenNotMatched
子句中的约束whenNotMatched
子句中的条件是可选的。但是,如果有多个whenNotMatched
子句,则只有最后一个可以省略条件。当有多个
whenNotMatched
子句并且存在条件(或缺乏条件),使得一行满足多个子句时,将执行第一个满足的子句的操作。换句话说,whenNotMatched
子句的顺序很重要。如果没有
whenNotMatched
子句,或者它存在但非匹配源行不满足条件,则不插入源行。如果要使用源 DataFrame 的相应列插入目标 Delta 表的所有列,则可以使用
whenNotMatchedInsertAll()
。这等同于whenNotMatchedInsert(values = { "col1": "source.col1", "col2": "source.col2", ... # for all columns in the delta table })
whenNotMatchedBySource
子句中的约束whenNotMatchedBySource
子句中的条件是可选的。但是,如果有多个whenNotMatchedBySource
子句,则只有最后一个whenNotMatchedBySource
子句可以省略条件。whenNotMatchedBySource
子句中的条件和更新表达式只能引用目标 Delta 表中的列。当有多个
whenNotMatchedBySource
子句并且存在条件(或缺乏条件),使得一行满足多个子句时,将执行第一个满足的子句的操作。换句话说,whenNotMatchedBySource
子句的顺序很重要。如果没有
whenNotMatchedBySource
子句,或者它存在但非匹配目标行不满足任何whenNotMatchedBySource
子句条件,则目标行将不被更新或删除。
示例 1:条件和更新表达式为 SQL 格式的字符串
deltaTable.alias("events").merge( source = updatesDF.alias("updates"), condition = "events.eventId = updates.eventId" ).whenMatchedUpdate(set = { "data": "updates.data", "count": "events.count + 1" } ).whenNotMatchedInsert(values = { "date": "updates.date", "eventId": "updates.eventId", "data": "updates.data", "count": "1", "missed_count": "0" } ).whenNotMatchedBySourceUpdate(set = { "missed_count": "events.missed_count + 1" } ).execute()
示例 2:条件和更新表达式为 Spark SQL 函数
from pyspark.sql.functions import * deltaTable.alias("events").merge( source = updatesDF.alias("updates"), condition = expr("events.eventId = updates.eventId") ).whenMatchedUpdate(set = { "data" : col("updates.data"), "count": col("events.count") + 1 } ).whenNotMatchedInsert(values = { "date": col("updates.date"), "eventId": col("updates.eventId"), "data": col("updates.data"), "count": lit("1"), "missed_count": lit("0") } ).whenNotMatchedBySourceUpdate(set = { "missed_count": col("events.missed_count") + 1 } ).execute()
0.4 版新增。
-
whenMatchedUpdate
(condition: Union[pyspark.sql.column.Column, str, None] = None, set: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder¶ 根据
set
定义的规则更新匹配的表行。如果指定了condition
,则该行必须评估为 true 才能更新。有关完整的用法详细信息,请参见
DeltaMergeBuilder
。- 参数
condition (str 或 pyspark.sql.Column) – 更新的可选条件
set (键为 str,值为 str 或 pyspark.sql.Column 的字典) – 定义设置需要更新的列的值的规则。注意:此参数是必需的。存在默认值 None 以允许跨语言以相同顺序的定位参数。
- 返回
此构建器
0.4 版新增。
-
whenMatchedUpdateAll
(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder¶ 使用源行中相应列的值更新匹配表行的所有列。如果指定了
condition
,则新行必须为 true 才能更新。有关完整的用法详细信息,请参见
DeltaMergeBuilder
。- 参数
condition (str 或 pyspark.sql.Column) – 插入的可选条件
- 返回
此构建器
0.4 版新增。
-
whenMatchedDelete
(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder¶ 仅当给定
condition
(如果指定)对匹配行成立时,才从表中删除匹配行。有关完整的用法详细信息,请参见
DeltaMergeBuilder
。- 参数
condition (str 或 pyspark.sql.Column) – 删除的可选条件
- 返回
此构建器
0.4 版新增。
-
whenNotMatchedInsert
(condition: Union[pyspark.sql.column.Column, str, None] = None, values: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder¶ 根据
values
定义的规则向目标表插入新行。如果指定了condition
,则新行必须评估为 true 才能插入。有关完整的用法详细信息,请参见
DeltaMergeBuilder
。- 参数
condition (str 或 pyspark.sql.Column) – 插入的可选条件
values (键为 str,值为 str 或 pyspark.sql.Column 的字典) – 定义设置需要更新的列的值的规则。注意:此参数是必需的。存在默认值 None 以允许跨语言以相同顺序的定位参数。
- 返回
此构建器
0.4 版新增。
-
whenNotMatchedInsertAll
(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder¶ 通过将目标列分配给源行中相应列的值来插入新的目标 Delta 表行。如果指定了
condition
,则新行必须评估为 true 才能插入。有关完整的用法详细信息,请参见
DeltaMergeBuilder
。- 参数
condition (str 或 pyspark.sql.Column) – 插入的可选条件
- 返回
此构建器
0.4 版新增。
-
whenNotMatchedBySourceUpdate
(condition: Union[pyspark.sql.column.Column, str, None] = None, set: Optional[Dict[str, Union[str, pyspark.sql.column.Column]]] = None) → delta.tables.DeltaMergeBuilder¶ 根据
set
定义的规则更新源中没有匹配项的目标行。如果指定了condition
,则该行必须评估为 true 才能更新。有关完整的用法详细信息,请参见
DeltaMergeBuilder
。- 参数
condition (str 或 pyspark.sql.Column) – 更新的可选条件
set (键为 str,值为 str 或 pyspark.sql.Column 的字典) – 定义设置需要更新的列的值的规则。注意:此参数是必需的。存在默认值 None 以允许跨语言以相同顺序的定位参数。
- 返回
此构建器
2.3 版新增。
-
whenNotMatchedBySourceDelete
(condition: Union[pyspark.sql.column.Column, str, None] = None) → delta.tables.DeltaMergeBuilder¶ 仅当给定
condition
(如果指定)对目标行成立时,才从表中删除源中没有匹配项的目标行。有关完整的用法详细信息,请参见
DeltaMergeBuilder
。- 参数
condition (str 或 pyspark.sql.Column) – 删除的可选条件
- 返回
此构建器
2.3 版新增。
-
execute
() → None¶ 根据构建的匹配和不匹配操作执行合并操作。
有关完整的用法详细信息,请参见
DeltaMergeBuilder
。0.4 版新增。
-
class
delta.tables.
DeltaTableBuilder
(spark: pyspark.sql.session.SparkSession, jbuilder: JavaObject)¶ 构建器,用于指定如何创建/替换 Delta 表。您必须在执行构建器之前指定表名或路径。您可以指定表列、分区列、数据位置、表注释和属性,以及如何创建/替换 Delta 表。
执行构建器后,将返回一个
DeltaTable
对象。使用
delta.tables.DeltaTable.create()
、delta.tables.DeltaTable.createIfNotExists()
、delta.tables.DeltaTable.replace()
、delta.tables.DeltaTable.createOrReplace()
创建此类的对象。示例 1:使用表名创建具有独立列的 Delta 表
deltaTable = DeltaTable.create(sparkSession) .tableName("testTable") .addColumn("c1", dataType = "INT", nullable = False) .addColumn("c2", dataType = IntegerType(), generatedAlwaysAs = "c1 + 1") .partitionedBy("c1") .execute()
示例 2:使用位置替换具有现有列的 Delta 表
df = spark.createDataFrame([('a', 1), ('b', 2), ('c', 3)], ["key", "value"]) deltaTable = DeltaTable.replace(sparkSession) .tableName("testTable") .addColumns(df.schema) .execute()
1.0 版新增。
注意
正在演变
-
tableName
(identifier: str) → delta.tables.DeltaTableBuilder¶ 指定表名。可选地用数据库名限定 [database_name.] table_name。
- 参数
identifier (str) – 表名
- 返回
此构建器
注意
正在演变
1.0 版新增。
-
location
(location: str) → delta.tables.DeltaTableBuilder¶ 指定存储表数据的目录路径,可以是分布式存储上的路径。
- 参数
location (str) – 数据存储位置
- 返回
此构建器
注意
正在演变
1.0 版新增。
-
comment
(comment: str) → delta.tables.DeltaTableBuilder¶ 描述表的注释。
- 参数
comment (str) – 表注释
- 返回
此构建器
注意
正在演变
1.0 版新增。
-
addColumn
(colName: str, dataType: Union[str, pyspark.sql.types.DataType], nullable: bool = True, generatedAlwaysAs: Optional[str] = None, comment: Optional[str] = None) → delta.tables.DeltaTableBuilder¶ 指定表中的列
- 参数
colName (str) – 列名
dataType (str 或 pyspark.sql.types.DataType) – 列数据类型
nullable (bool) – 列是否可为空
generatedAlwaysAs (str) – 如果列始终作为其他列的函数生成,则为 SQL 表达式。有关生成列的详细信息,请参阅在线文档。
comment (str) – 列注释
- 返回
此构建器
注意
正在演变
1.0 版新增。
-
addColumns
(cols: Union[pyspark.sql.types.StructType, List[pyspark.sql.types.StructField]]) → delta.tables.DeltaTableBuilder¶ 使用现有模式指定表中的列
- 参数
cols (pyspark.sql.types.StructType 或 pyspark.sql.types.StructType 列表。) – 现有模式中的列
- 返回
此构建器
注意
正在演变
1.0 版新增。
-
partitionedBy
(*cols) → delta.tables.DeltaTableBuilder¶ 指定用于分区的列
- 参数
cols (str 或 列名列表) – 分区列
- 返回
此构建器
注意
正在演变
1.0 版新增。
-
property
(key: str, value: str) → delta.tables.DeltaTableBuilder¶ 指定表属性
- 参数
key – 表属性键
- 返回
此构建器
注意
正在演变
1.0 版新增。
-
execute
() → delta.tables.DeltaTable¶ 执行表创建。
- 返回类型
注意
正在演变
1.0 版新增。
-
-
class
delta.tables.
DeltaOptimizeBuilder
(spark: pyspark.sql.session.SparkSession, jbuilder: JavaObject)¶ 用于构建 OPTIMIZE 命令并执行的构建器类。
使用
delta.tables.DeltaTable.optimize()
创建此类的实例。2.0.0 版新增。
-
where
(partitionFilter: str) → delta.tables.DeltaOptimizeBuilder¶ 在此优化命令构建器上应用分区过滤器,以限制在选定分区上的操作。
- 参数
partitionFilter (str) – 要应用的分区过滤器
- 返回
应用了分区过滤器的 DeltaOptimizeBuilder
- 返回类型
2.0 版新增。
-
executeCompaction
() → pyspark.sql.dataframe.DataFrame¶ 压缩选定分区中的小文件。
- 返回
包含 OPTIMIZE 执行指标的 DataFrame
- 返回类型
pyspark.sql.DataFrame
2.0 版新增。
-
executeZOrderBy
(*cols) → pyspark.sql.dataframe.DataFrame¶ 使用给定列对选定分区中的数据进行 Z-Order 排序。
- 参数
cols (str 或 列名列表) – Z-Order 列
- 返回
包含 OPTIMIZE 执行指标的 DataFrame
- 返回类型
pyspark.sql.DataFrame
2.0 版新增。
-
异常¶
-
exception
delta.exceptions.
DeltaConcurrentModificationException
(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)¶ 所有 Delta 提交冲突异常的基本类。
1.0 版新增。
注意
正在演变
-
exception
delta.exceptions.
ConcurrentWriteException
(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)¶ 当并发事务在当前事务读取表后写入数据时抛出。
1.0 版新增。
注意
正在演变
-
exception
delta.exceptions.
MetadataChangedException
(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)¶ 当 Delta 表的元数据在读取时间和提交时间之间发生更改时抛出。
1.0 版新增。
注意
正在演变
-
exception
delta.exceptions.
ProtocolChangedException
(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)¶ 当协议版本在读取时间和提交时间之间发生更改时抛出。
1.0 版新增。
注意
正在演变
-
exception
delta.exceptions.
ConcurrentAppendException
(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)¶ 当添加的文件将被当前事务读取时抛出。
1.0 版新增。
注意
正在演变
-
exception
delta.exceptions.
ConcurrentDeleteReadException
(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)¶ 当当前事务读取被并发事务删除的数据时抛出。
1.0 版新增。
注意
正在演变
-
exception
delta.exceptions.
ConcurrentDeleteDeleteException
(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)¶ 当当前事务删除被并发事务删除的数据时抛出。
1.0 版新增。
注意
正在演变
-
exception
delta.exceptions.
ConcurrentTransactionException
(desc: Optional[str] = None, stackTrace: Optional[str] = None, cause: Optional[py4j.protocol.Py4JJavaError] = None, origin: Optional[py4j.protocol.Py4JJavaError] = None)¶ 当并发事务都尝试更新相同的幂等事务时抛出。
1.0 版新增。
注意
正在演变
其他¶
-
delta.pip_utils.
configure_spark_with_delta_pip
(spark_session_builder: pyspark.sql.session.SparkSession.Builder, extra_packages: Optional[List[str]] = None) → pyspark.sql.session.SparkSession.Builder¶ 实用函数,用于配置 SparkSession 构建器,使其生成的 SparkSession 将自动从 Maven 下载所需的 Delta Lake JAR。当您想要
使用 pip 在本地安装 Delta Lake,以及
直接使用 Delta Lake + Pyspark 执行 Python 代码,即不使用 spark-submit –packages io.delta:… 或 pyspark –packages io.delta:…。
builder = SparkSession.builder .master(“local[*]”) .appName(“test”)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
如果您想添加更多包,请使用 extra_packages 参数。
builder = SparkSession.builder .master(“local[*]”) .appName(“test”) my_packages = [“org.apache.spark:spark-sql-kafka-0-10_2.12:x.y.z”] spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
- 参数
spark_session_builder – 用于配置和创建 SparkSession 的 SparkSession.Builder 对象。
extra_packages – 除了 Delta Lake 之外,设置要添加到 Spark 会话的其他包。
- 返回
已更新的 SparkSession.Builder 对象
1.0 版新增。
注意
正在演变