Snowflake 连接器
请访问 Snowflake Delta Lake 支持文档以使用该连接器。
Snowflake 可以使用“清单文件”读取 Delta 表,清单文件是一个文本文件,其中包含查询 Delta 表所需读取的数据文件列表。本文介绍了如何使用清单文件设置 Delta Lake 到 Snowflake 的集成并查询 Delta 表。
设置 Delta Lake 到 Snowflake 的集成并查询 Delta 表
标题为“设置 Delta Lake 到 Snowflake 的集成并查询 Delta 表”的部分您可以通过以下步骤设置 Delta Lake 到 Snowflake 的集成。
步骤 1:使用 Apache Spark 生成 Delta 表的清单
标题为“步骤 1:使用 Apache Spark 生成 Delta 表的清单”的部分在 Delta 表位于 <path-to-delta-table>
的位置上运行 generate
操作
GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);deltaTable.generate("symlink_format_manifest");
deltaTable = DeltaTable.forPath(<path-to-delta-table>)deltaTable.generate("symlink_format_manifest")
有关详细信息,请参阅生成清单文件。
generate
操作会在 <path-to-delta-table>/_symlink_format_manifest/
生成清单文件。换句话说,此目录中的文件包含应读取的数据文件(即 Parquet 文件)的名称,以便读取 Delta 表的快照。
步骤 2:配置 Snowflake 以读取生成的清单
标题为“步骤 2:配置 Snowflake 以读取生成的清单”的部分在您的 Snowflake 环境中运行以下命令。
在清单文件上定义一个外部表
标题为“在清单文件上定义一个外部表”的部分要在 Snowflake 中定义一个外部表,您必须首先定义一个外部阶段 my_staged_table
,该阶段指向 Delta 表。在 Snowflake 中,运行以下命令。
create or replace stage my_staged_table url='<path-to-delta-table>'
将 <path-to-delta-table>
替换为 Delta 表的完整路径。使用此阶段,您可以定义一个表 delta_manifest_table
,该表按如下方式读取清单文件中指定的文件名
VARCHAR AS split_part(VALUE:c1, '/', -1) ) WITH LOCATION =@my_staged_table/_symlink_format_manifest/ FILE_FORMAT = (TYPE = CSV)PATTERN = '.*[/]manifest' AUTO_REFRESH = true; ```</TabItem></Tabs>
<Aside type="note">In this query:
- The location is the manifest subdirectory.- The `filename` column contains the name of the files (not the full path) defined in the manifest.</Aside>
#### Define an external table on Parquet files
You can define a table `my_parquet_data_table` that reads all the Parquet files in the Delta table.
<Tabs><TabItem label="SQL">```sqlCREATE OR REPLACE EXTERNAL TABLE my_parquet_data_table( id INT AS (VALUE:id::INT), part INT AS (VALUE:part::INT), ..., parquet_filename VARCHAR AS split_part(metadata$filename, '/', -1))WITH LOCATION = @my_staged_table/FILE_FORMAT = (TYPE = PARQUET)PATTERN = '.*[/]part-[^/]*[.]parquet'AUTO_REFRESH = true;
如果您的 Delta 表已分区,则您必须在表定义中明确提取分区值。例如,如果表按名为 part
的单个整数列分区,则可以按如下方式提取值
CREATE OR REPLACE EXTERNAL TABLE my_parquet_data_partitioned_table( id INT AS (VALUE:id::INT), part INT AS ( nullif( regexp_replace(metadata$filename, '.*part\\=(.*)\\/.*', '\\1'), '__HIVE_DEFAULT_PARTITION__' )::INT ), ..., parquet_filename VARCHAR AS split_part(metadata$filename, '/', -1))WITH LOCATION = @my_staged_partitioned_table/FILE_FORMAT = (TYPE = PARQUET)PATTERN = '.*[/]part-[^/]*[.]parquet'AUTO_REFRESH = true;
正则表达式用于提取列 part
的分区值。
将 Delta 表查询为 Parquet 表将产生不正确的结果,因为此查询将读取此表中的所有 Parquet 文件,而不仅仅是定义表一致快照的文件。您可以使用清单表获取一致的快照数据。
定义视图以使用清单表获取 Delta 表的正确内容
标题为“定义视图以使用清单表获取 Delta 表的正确内容”的部分要仅读取属于生成清单中定义的一致快照的行,您可以应用过滤器以仅保留 Parquet 表中来自清单表中定义的文件。
CREATE OR REPLACE VIEW my_delta_table ASSELECT id, part, ...FROM my_parquet_data_tableWHERE parquet_filename IN ( SELECT filename FROM delta-manifest-table);
查询此视图将为您提供 Delta 表的一致视图。
步骤 3:更新清单
标题为“步骤 3:更新清单”的部分当 Delta 表中的数据更新时,您必须使用以下任一方法重新生成清单
-
显式更新:所有数据更新后,您可以运行
generate
操作来更新清单。 -
自动更新:您可以配置 Delta 表,以便对表的所有写入操作都会自动更新清单。要启用此自动模式,请使用以下 SQL 命令设置相应的表属性。
ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
Snowflake 集成在行为上存在已知限制。
数据一致性
标题为“数据一致性”的部分每当 Delta Lake 生成更新的清单时,它都会原子地覆盖现有的清单文件。因此,Snowflake 始终会看到数据文件的一致视图;它将看到所有旧版本文件或所有新版本文件。但是,一致性保证的粒度取决于表是否已分区。
- 未分区表:所有文件名都写入一个清单文件,该文件以原子方式更新。在这种情况下,Snowflake 将看到完整的表快照一致性。
- 分区表:清单文件以与原始 Delta 表相同的 Hive 分区样式目录结构进行分区。这意味着每个分区都是原子更新的,Snowflake 将看到每个分区的一致视图,但看不到跨分区的一致视图。此外,由于所有分区的所有清单无法同时更新,并发尝试生成清单可能导致不同分区具有不同版本的清单。
根据您用于 Delta 表的存储系统,当 Snowflake 在清单文件正在重写时并发查询清单,可能会得到不正确的结果。在缺少原子文件覆盖的文件系统实现中,清单文件可能会暂时不可用。因此,如果清单更新可能与 Snowflake 的查询同时发生,请谨慎使用清单。
这是一个实验性集成,其性能和可伸缩性特征尚未经过测试。
模式演变
标题为“模式演变”的部分Delta Lake 支持模式演变,对 Delta 表的查询会自动使用最新模式,无论 Hive 元存储中表的模式如何。但是,Snowflake 使用其表定义中定义的模式,并且在表定义更新为新模式之前不会使用更新后的模式进行查询。