AWS Redshift Spectrum 连接器
AWS Redshift Spectrum 可以使用**清单文件**读取 Delta 表。清单文件是一个文本文件,其中包含查询 Delta 表所需读取的数据文件列表。本文介绍了如何使用清单文件设置 AWS Redshift Spectrum 到 Delta Lake 的集成并查询 Delta 表。
设置 AWS Redshift Spectrum 到 Delta Lake 的集成并查询 Delta 表
标题为“设置 AWS Redshift Spectrum 到 Delta Lake 的集成并查询 Delta 表”的部分您可以通过以下步骤设置 AWS Redshift Spectrum 到 Delta Lake 的集成。
步骤 1:使用 Apache Spark 生成 Delta 表的清单
标题为“步骤 1:使用 Apache Spark 生成 Delta 表的清单文件”的部分在位于 `
GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);deltaTable.generate("symlink_format_manifest");
deltaTable = DeltaTable.forPath(<path-to-delta-table>)deltaTable.generate("symlink_format_manifest")
详情请参阅生成清单文件。
`generate` 操作会在 `
第 2 步:配置 AWS Redshift Spectrum 以读取生成的清单文件
标题为“步骤 2:配置 AWS Redshift Spectrum 以读取生成的清单文件”的部分在您的 AWS Redshift Spectrum 环境中运行以下命令。
- 在 AWS Redshift Spectrum 中定义一个新的外部表,使用格式 `SymlinkTextInputFormat` 和清单位置 `
/_symlink_format_manifest/`。
CREATE EXTERNAL TABLE mytable ([(col_name1 col_datatype1, ...)])[PARTITIONED BY (col_name2 col_datatype2, ...)]ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION '<path-to-delta-table>/_symlink_format_manifest/' -- location of the generated manifest
`SymlinkTextInputFormat` 配置 AWS Redshift Spectrum,使其通过读取清单文件而不是使用目录列表来查找数据文件,从而为 `mytable` 计算文件拆分。将 `mytable` 替换为外部表的名称,将 `
- 您不能在 Apache Spark 中使用此表定义;它只能由 AWS Redshift Spectrum 使用。
-
如果 Delta 表已分区,则必须将分区显式添加到 AWS Redshift Spectrum 表中。这是必需的,因为分区表的清单本身以与表相同的目录结构进行分区。
- 对于表中的每个分区,请在 AWS Redshift Spectrum 中运行以下命令,可以直接在 AWS Redshift Spectrum 中运行,也可以使用 AWS CLI 或数据 API。
ALTER TABLE mytable.redshiftdeltatable ADD IF NOT EXISTS PARTITION (col_name=col_value) LOCATION '<path-to-delta-table>/_symlink_format_manifest/col_name=col_value'
这些步骤将为您提供 Delta 表的一致视图。
步骤 3:更新清单
标题为“步骤 3:更新清单文件”的部分当 Delta 表中的数据更新时,您必须使用以下任一方法重新生成清单文件:
-
显式更新:所有数据更新后,您可以运行
generate
操作来更新清单。 -
自动更新:您可以配置 Delta 表,以便对表的所有写入操作都会自动更新清单。要启用此自动模式,请使用以下 SQL 命令设置相应的表属性。
ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
要禁用此自动模式,请将此属性设置为 `false`。
是否自动或显式更新取决于 Delta 表上写入操作的并发性质和所需的数据一致性。例如,如果启用自动模式,并发写入操作会导致清单文件的并发覆盖。在这种无序写入的情况下,写入操作完成后,清单文件不保证指向表的最新版本。因此,如果预期有并发写入并且您希望避免过时的清单文件,您应该考虑在预期写入操作完成后显式更新清单文件。
此外,如果您的表已分区,则必须按照前面步骤中描述的相同过程添加任何新分区或删除已删除的分区。
AWS Redshift Spectrum 集成在其行为方面存在已知限制。
数据一致性
标题为“数据一致性”的部分每当 Delta Lake 生成更新的清单文件时,它都会原子地覆盖现有的清单文件。因此,AWS Redshift Spectrum 将始终看到数据文件的一致视图;它将看到所有旧版本文件或所有新版本文件。但是,一致性保证的粒度取决于表是否已分区。
- **未分区表**:所有文件名都写入一个清单文件中,该文件是原子更新的。在这种情况下,AWS Redshift Spectrum 将看到完整的表快照一致性。
- **分区表**:清单文件以与原始 Delta 表相同的 Hive 分区式目录结构进行分区。这意味着每个分区都是原子更新的,AWS Redshift Spectrum 将看到每个分区的一致视图,但不是跨分区的一致视图。此外,由于所有分区的所有清单文件不能同时更新,并发生成清单文件的尝试可能导致不同分区具有不同版本的清单文件。虽然数据更改下的这种一致性保证弱于使用 Spark 读取 Delta 表,但它仍然强于 Parquet 等格式,因为它们不提供分区级别的一致性。
根据您用于 Delta 表的存储系统,当 AWS Redshift Spectrum 在清单文件正在重写时并发查询清单文件时,可能会得到不正确的结果。在缺少原子文件覆盖的文件系统实现中,清单文件可能会暂时不可用。因此,如果清单文件的更新可能与来自 AWS Redshift Spectrum 的查询同时发生,请谨慎使用清单文件。
这是一个实验性集成,其性能和可扩展性尚未经过测试。
Delta Lake 支持 Schema 演变,并且 Delta 表上的查询会自动使用最新的 Schema,无论 Hive metastore 中表中定义的 Schema 如何。但是,AWS Redshift Spectrum 使用其表定义中定义的 Schema,并且在表定义更新为新 Schema 之前,不会使用更新后的 Schema 进行查询。