跳到内容

Snowflake 连接器

请访问 Snowflake Delta Lake 支持文档以使用该连接器。

Snowflake 可以使用“清单文件”读取 Delta 表,清单文件是一个文本文件,其中包含查询 Delta 表所需读取的数据文件列表。本文介绍了如何使用清单文件设置 Delta Lake 到 Snowflake 的集成并查询 Delta 表。

设置 Delta Lake 到 Snowflake 的集成并查询 Delta 表

标题为“设置 Delta Lake 到 Snowflake 的集成并查询 Delta 表”的部分

您可以通过以下步骤设置 Delta Lake 到 Snowflake 的集成。

步骤 1:使用 Apache Spark 生成 Delta 表的清单

标题为“步骤 1:使用 Apache Spark 生成 Delta 表的清单”的部分

在 Delta 表位于 <path-to-delta-table> 的位置上运行 generate 操作

GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`

有关详细信息,请参阅生成清单文件

generate 操作会在 <path-to-delta-table>/_symlink_format_manifest/ 生成清单文件。换句话说,此目录中的文件包含应读取的数据文件(即 Parquet 文件)的名称,以便读取 Delta 表的快照。

步骤 2:配置 Snowflake 以读取生成的清单

标题为“步骤 2:配置 Snowflake 以读取生成的清单”的部分

在您的 Snowflake 环境中运行以下命令。

要在 Snowflake 中定义一个外部表,您必须首先定义一个外部阶段 my_staged_table,该阶段指向 Delta 表。在 Snowflake 中,运行以下命令。

create or replace stage my_staged_table url='<path-to-delta-table>'

<path-to-delta-table> 替换为 Delta 表的完整路径。使用此阶段,您可以定义一个表 delta_manifest_table,该表按如下方式读取清单文件中指定的文件名

VARCHAR AS split_part(VALUE:c1, '/', -1) ) WITH LOCATION =
@my_staged_table/_symlink_format_manifest/ FILE_FORMAT = (TYPE = CSV)
PATTERN = '.*[/]manifest' AUTO_REFRESH = true; ```
</TabItem>
</Tabs>
<Aside type="note">
In this query:
- The location is the manifest subdirectory.
- The `filename` column contains the name of the files (not the full path) defined in the manifest.
</Aside>
#### Define an external table on Parquet files
You can define a table `my_parquet_data_table` that reads all the Parquet files in the Delta table.
<Tabs>
<TabItem label="SQL">
```sql
CREATE OR REPLACE EXTERNAL TABLE my_parquet_data_table(
id INT AS (VALUE:id::INT),
part INT AS (VALUE:part::INT),
...,
parquet_filename VARCHAR AS split_part(metadata$filename, '/', -1)
)
WITH LOCATION = @my_staged_table/
FILE_FORMAT = (TYPE = PARQUET)
PATTERN = '.*[/]part-[^/]*[.]parquet'
AUTO_REFRESH = true;

如果您的 Delta 表已分区,则您必须在表定义中明确提取分区值。例如,如果表按名为 part 的单个整数列分区,则可以按如下方式提取值

CREATE OR REPLACE EXTERNAL TABLE my_parquet_data_partitioned_table(
id INT AS (VALUE:id::INT),
part INT AS (
nullif(
regexp_replace(metadata$filename, '.*part\\=(.*)\\/.*', '\\1'),
'__HIVE_DEFAULT_PARTITION__'
)::INT
),
...,
parquet_filename VARCHAR AS split_part(metadata$filename, '/', -1)
)
WITH LOCATION = @my_staged_partitioned_table/
FILE_FORMAT = (TYPE = PARQUET)
PATTERN = '.*[/]part-[^/]*[.]parquet'
AUTO_REFRESH = true;

正则表达式用于提取列 part 的分区值。

将 Delta 表查询为 Parquet 表将产生不正确的结果,因为此查询将读取此表中的所有 Parquet 文件,而不仅仅是定义表一致快照的文件。您可以使用清单表获取一致的快照数据。

定义视图以使用清单表获取 Delta 表的正确内容

标题为“定义视图以使用清单表获取 Delta 表的正确内容”的部分

要仅读取属于生成清单中定义的一致快照的行,您可以应用过滤器以仅保留 Parquet 表中来自清单表中定义的文件。

CREATE OR REPLACE VIEW my_delta_table AS
SELECT id, part, ...
FROM my_parquet_data_table
WHERE parquet_filename IN (
SELECT filename
FROM delta-manifest-table
);

查询此视图将为您提供 Delta 表的一致视图。

当 Delta 表中的数据更新时,您必须使用以下任一方法重新生成清单

  • 显式更新:所有数据更新后,您可以运行 generate 操作来更新清单。

  • 自动更新:您可以配置 Delta 表,以便对表的所有写入操作都会自动更新清单。要启用此自动模式,请使用以下 SQL 命令设置相应的表属性。

ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)

Snowflake 集成在行为上存在已知限制。

每当 Delta Lake 生成更新的清单时,它都会原子地覆盖现有的清单文件。因此,Snowflake 始终会看到数据文件的一致视图;它将看到所有旧版本文件或所有新版本文件。但是,一致性保证的粒度取决于表是否已分区。

  • 未分区表:所有文件名都写入一个清单文件,该文件以原子方式更新。在这种情况下,Snowflake 将看到完整的表快照一致性。
  • 分区表:清单文件以与原始 Delta 表相同的 Hive 分区样式目录结构进行分区。这意味着每个分区都是原子更新的,Snowflake 将看到每个分区的一致视图,但看不到跨分区的一致视图。此外,由于所有分区的所有清单无法同时更新,并发尝试生成清单可能导致不同分区具有不同版本的清单。

根据您用于 Delta 表的存储系统,当 Snowflake 在清单文件正在重写时并发查询清单,可能会得到不正确的结果。在缺少原子文件覆盖的文件系统实现中,清单文件可能会暂时不可用。因此,如果清单更新可能与 Snowflake 的查询同时发生,请谨慎使用清单。

这是一个实验性集成,其性能和可伸缩性特征尚未经过测试。

Delta Lake 支持模式演变,对 Delta 表的查询会自动使用最新模式,无论 Hive 元存储中表的模式如何。但是,Snowflake 使用其表定义中定义的模式,并且在表定义更新为新模式之前不会使用更新后的模式进行查询。