跳到内容

Delta Kernel

Delta 内核项目是一组用于构建 Delta 连接器的库(JavaRust),这些连接器无需了解 Delta 协议详情即可读取和写入 Delta 表。

您可以使用此库执行以下操作

  • 在单个进程中的单个线程中从小型 Delta 表读取数据。
  • 在单个进程中使用多个线程从大型 Delta 表读取数据。
  • 为分布式处理引擎构建一个复杂的连接器,并读取超大型 Delta 表。
  • 从单个进程或复杂的分布式引擎向 Delta 表插入数据。

以下是一个带有过滤器的简单表扫描示例

Engine myEngine = DefaultEngine.create() ; // define a engine (more details below)
Table myTable = Table.forPath("/delta/table/path"); // define what table to scan
Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine); // define which version of table to scan
Scan myScan = mySnapshot.getScanBuilder(myEngine) // specify the scan details
.withFilters(myEngine, scanFilter)
.build();
CloseableIterator<ColumnarBatch> physicalData = // read the Parquet data files
.. read from Parquet data files ...
Scan.transformPhysicalData(...) // returns the table data

上述示例程序的完整版本以及更多从 Delta 表读取和写入的示例可在此处找到:此处

请注意,有两个公共 API 集用于构建连接器。

  • 表 API - 像 TableSnapshot 这样的接口允许您读取(很快也将允许写入)Delta 表
  • 引擎 API - Engine 接口允许您在内核中插入连接器特定的优化,以用于计算密集型组件。例如,Delta 内核通过 DefaultEngine 提供一个*默认* Parquet 文件读取器,但您可以选择用自定义的 Engine 实现替换该默认读取器,该实现具有更快的 Parquet 读取器,适用于您的连接器/处理引擎。

Delta 内核是一个操作 Delta 表的库。具体来说,它提供了简单而狭窄的 API,用于读取和写入 Delta 表,而无需了解 Delta 协议的详细信息。您可以使用此库执行以下操作

  • 从您的应用程序读取和写入 Delta 表。
  • 为分布式引擎(如 Apache Spark™Apache FlinkTrino)构建连接器,用于读取或写入大规模 Delta 表。

您需要 io.delta:delta-kernel-apiio.delta:delta-kernel-defaults 依赖项。以下是 Maven pom 文件依赖项列表的示例。

delta-kernel-api 模块包含内核的核心,它抽象出 Delta 协议以实现 Delta 表的读写。它利用 Engine 接口,该接口由连接器传递给内核 API,用于执行繁重操作,例如读写 Parquet 或 JSON 文件、评估表达式或文件系统操作(例如列出 Delta 日志目录的内容)等。内核在 delta-kernel-defaults 模块中提供了 Engine 的默认实现。连接器可以实现自己的 Engine 版本,以利用其原生实现的 Engine 提供的功能。例如:连接器可以使用其 Parquet 读取器,而不是使用 DefaultEngine 中的读取器。有关此内容的更多详细信息,请参见稍后

<dependencies>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-api</artifactId>
<version>${delta-kernel.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${delta-kernel.version}</version>
</dependency>
</dependencies>

如果您的连接器不使用内核提供的 DefaultEngine,则可以跳过上述列表中的 delta-kernel-defaults 依赖项。

在本节中,我们将介绍如何构建一个非常简单的单进程 Delta 连接器,该连接器可以使用 Delta 内核提供的默认 Engine 实现来读取 Delta 表。

您可以自行在项目中编写此代码,也可以使用 Delta 代码仓库中提供的示例

主要入口点是 io.delta.kernel.Table,它是 Delta 表的程序化表示。假设您在目录 myTablePath 中有一个 Delta 表。您可以按如下方式创建 Table 对象

import io.delta.kernel.*;
import io.delta.kernel.defaults.*;
import org.apache.hadoop.conf.Configuration;
String myTablePath = <my-table-path>; // fully qualified table path. Ex: file:/user/tables/myTable
Configuration hadoopConf = new Configuration();
Engine myEngine = DefaultEngine.create(hadoopConf);
Table myTable = Table.forPath(myEngine, myTablePath);

请注意我们创建的默认 Engine 来引导 myTable 对象。此对象允许您插入自己的库,用于计算密集型操作,例如 Parquet 文件读取、JSON 解析等。您现在可以忽略它。我们稍后在讨论如何为分布式处理引擎构建更复杂的连接器时将详细讨论这一点。

从这个 myTable 对象,您可以创建一个 Snapshot 对象,它表示表中特定版本的一致状态(又名快照一致性)。

Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);

现在我们有了一个表的一致快照视图,我们可以查询更多关于表的详细信息。例如,您可以获取此快照的版本和模式。

long version = mySnapshot.getVersion(myEngine);
StructType tableSchema = mySnapshot.getSchema(myEngine);

接下来,要读取表数据,我们必须*构建*一个 Scan 对象。为了构建一个 Scan 对象,请创建一个 ScanBuilder 对象,它可选地允许选择要读取的列子集或设置查询过滤器。目前,请忽略这些可选设置。

Scan myScan = mySnapshot.getScanBuilder(myEngine).build()
// Common information about scanning for all data files to read.
Row scanState = myScan.getScanState(myEngine)
// Information about the list of scan files to read
CloseableIterator<FilteredColumnarBatch> scanFiles = myScan.getScanFiles(myEngine)

这个 Scan 对象包含了开始读取表所需的所有元数据。从表中的文件读取数据需要两个关键信息。

  • myScan.getScanFiles(Engine):将扫描文件作为列式批处理返回(表示为 FilteredColumnarBatch 的迭代器,稍后会详细介绍),其中批处理中的每个选定行都包含一个文件的信息,该文件包含表数据。
  • myScan.getScanState(Engine):返回读取任何文件所需的快照级信息。请注意,这是一行,并且对所有扫描文件都通用。

对于每个扫描文件,必须从文件中读取物理数据。要读取的列在扫描文件状态中指定。读取物理数据后,您必须调用 ScanFile.transformPhysicalData(…),并传入扫描状态和从扫描文件读取的物理数据。此 API 负责将物理数据(例如添加分区列)转换为表的逻辑数据。以下是在单个线程中读取所有表数据的示例。

CloserableIterator<FilteredColumnarBatch> fileIter = scanObject.getScanFiles(myEngine);
Row scanStateRow = scanObject.getScanState(myEngine);
while(fileIter.hasNext()) {
FilteredColumnarBatch scanFileColumnarBatch = fileIter.next();
// Get the physical read schema of columns to read from the Parquet data files
StructType physicalReadSchema =
ScanStateRow.getPhysicalDataReadSchema(engine, scanStateRow);
try (CloseableIterator<Row> scanFileRows = scanFileColumnarBatch.getRows()) {
while (scanFileRows.hasNext()) {
Row scanFileRow = scanFileRows.next();
// From the scan file row, extract the file path, size and modification time metadata
// needed to read the file.
FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
// Open the scan file which is a Parquet file using connector's own
// Parquet reader or default Parquet reader provided by the Kernel (which
// is used in this example).
CloseableIterator<ColumnarBatch> physicalDataIter =
engine.getParquetHandler().readParquetFiles(
singletonCloseableIterator(fileStatus),
physicalReadSchema,
Optional.empty() /* optional predicate the connector can apply to filter data from the reader */
);
// Now the physical data read from the Parquet data file is converted to a table
// logical data. Logical data may include the addition of partition columns and/or
// subset of rows deleted
try (
CloseableIterator<FilteredColumnarBatch> transformedData =
Scan.transformPhysicalData(
engine,
scanStateRow,
scanFileRow,
physicalDataIter)) {
while (transformedData.hasNext()) {
FilteredColumnarBatch logicalData = transformedData.next();
ColumnarBatch dataBatch = logicalData.getData();
// Not all rows in `dataBatch` are in the selected output.
// An optional selection vector determines whether a row with a
// specific row index is in the final output or not.
Optional<ColumnVector> selectionVector = dataReadResult.getSelectionVector();
// access the data for the column at ordinal 0
ColumnVector column0 = dataBatch.getColumnVector(0);
for (int rowIndex = 0; rowIndex < column0.getSize(); rowIndex++) {
// check if the row is selected or not
if (!selectionVector.isPresent() || // there is no selection vector, all records are selected
(!selectionVector.get().isNullAt(rowId) && selectionVector.get().getBoolean(rowId))) {
// Assuming the column type is String.
// If it is a different type, call the relevant function on the `ColumnVector`
System.out.println(column0.getString(rowIndex));
}
}
// access the data for column at ordinal 1
ColumnVector column1 = dataBatch.getColumnVector(1);
for (int rowIndex = 0; rowIndex < column1.getSize(); rowIndex++) {
// check if the row is selected or not
if (!selectionVector.isPresent() || // there is no selection vector, all records are selected
(!selectionVector.get().isNullAt(rowId) && selectionVector.get().getBoolean(rowId))) {
// Assuming the column type is Long.
// If it is a different type, call the relevant function on the `ColumnVector`
System.out.println(column1.getLong(rowIndex));
}
}
// .. more ..
}
}
}
}
}

此处提供了几个在单个进程中读取 Delta 表的工作示例:此处

我们已经探讨了如何进行全表扫描。然而,使用 Delta 格式的真正优势在于您可以使用查询过滤器跳过文件。为了实现这一点,Delta Kernel 提供了一个表达式框架,用于编码您的过滤器并将其提供给 Delta Kernel,以便在扫描文件生成期间跳过文件。例如,假设您的表按 columnX 分区,您只想查询分区 columnX=1。您可以生成表达式并使用它来构建扫描,如下所示

import io.delta.kernel.expressions.*;
import io.delta.kernel.defaults.engine.*;
Engine myEngine = DefaultEngine.create(new Configuration());
Predicate filter = new Predicate(
"=",
Arrays.asList(new Column("columnX"), Literal.ofInt(1)));
Scan myFilteredScan = mySnapshot.buildScan(engine)
.withFilter(myEngine, filter)
.build()
// Subset of the given filter that is not guaranteed to be satisfied by
// Delta Kernel when it returns data. This filter is used by Delta Kernel
// to do data skipping as much as possible. The connector should use this filter
// on top of the data returned by Delta Kernel in order for further filtering.
Optional<Predicate> remainingFilter = myFilteredScan.getRemainingFilter();

myFilteredScan.getScanFiles(myEngine) 返回的扫描文件将只包含所需分区的文件的行。同样,您可以为非分区列提供过滤器,如果表中的数据通过这些列很好地聚类,则 Delta Kernel 将能够尽可能多地跳过文件。

在本节中,我们将介绍如何构建一个 Delta 连接器,该连接器可以使用 Delta Kernel 提供的默认 Engine 实现来创建 Delta 表。

您可以自行在项目中编写此代码,也可以使用 Delta 代码仓库中提供的示例

主要入口点是 io.delta.kernel.Table,它是 Delta 表的程序化表示。假设您想在目录 myTablePath 创建 Delta 表。您可以按如下方式创建 Table 对象

package io.delta.kernel.examples;
import io.delta.kernel.*;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterable;
String myTablePath = <my-table-path>;
Configuration hadoopConf = new Configuration();
Engine myEngine = DefaultEngine.create(hadoopConf);
Table myTable = Table.forPath(myEngine, myTablePath);

请注意我们创建的默认 Engine 来引导 myTable 对象。此对象允许您插入自己的库,用于计算密集型操作,例如 Parquet 文件读取、JSON 解析等。您现在可以忽略它。我们稍后在讨论如何为分布式处理引擎构建更复杂的连接器时将详细讨论这一点。

从这个 myTable 对象,您可以创建一个 TransactionBuilder 对象,该对象允许您构建一个 Transaction 对象

TransactionBuilder txnBuilder =
myTable.createTransactionBuilder(
myEngine,
"Examples", /* engineInfo - connector can add its own identifier which is noted in the Delta Log */
Operation.CREATE_TABLE /* What is the operation we are trying to perform. This is noted in the Delta Log */
);

现在您有了 TransactionBuilder 对象,您可以设置表的模式和分区列。

StructType mySchema = new StructType()
.add("id", IntegerType.INTEGER)
.add("name", StringType.STRING)
.add("city", StringType.STRING)
.add("salary", DoubleType.DOUBLE);
// Partition columns are optional. Use it only if you are creating a partitioned table.
List<String> myPartitionColumns = Collections.singletonList("city");
// Set the schema of the new table on the transaction builder
txnBuilder = txnBuilder
.withSchema(engine, mySchema);
// Set the partition columns of the new table only if you are creating
// a partitioned table; otherwise, this step can be skipped.
txnBuilder = txnBuilder
.withPartitionColumns(engine, examplePartitionColumns);

TransactionBuilder 允许设置表的其他属性,例如启用某些 Delta 功能或设置幂等写入的标识符。我们将在下一节中介绍这些内容。下一步是使用 TransactionBuilder 对象构建 Transaction

// Build the transaction
Transaction txn = txnBuilder.build(engine);

Transaction 对象允许连接器选择性地添加任何数据并最终提交事务。成功的提交确保使用给定模式创建表。在此示例中,我们只是创建一个表,而不作为表的一部分添加任何数据。

// Commit the transaction.
// As we are just creating the table and not adding any data, the `dataActions` is empty.
TransactionCommitResult commitResult =
txn.commit(
engine,
CloseableIterable.emptyIterable() /* dataActions */
);

TransactionCommitResult 包含事务提交的版本以及表是否已准备好进行检查点。由于我们正在创建表,版本将为 0。我们稍后将讨论什么是检查点以及表准备好进行检查点意味着什么。

此处提供了一些用于创建分区和非分区 Delta 表的工作示例:此处

在本节中,我们将介绍如何构建一个 Delta 连接器,该连接器可以使用 Delta Kernel 提供的默认 Engine 实现来创建 Delta 表并向表中插入数据(类似于 SQL 中的 CREATE TABLE <table> AS <query> 构造)。

您可以自行在项目中编写此代码,也可以使用 Delta 代码仓库中提供的示例

第一步是构建一个 Transaction。下面是相关的代码。有关代码中每个步骤的含义的更多详细信息,请阅读创建表部分。

package io.delta.kernel.examples;
import io.delta.kernel.*;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterable;
String myTablePath = <my-table-path>;
Configuration hadoopConf = new Configuration();
Engine myEngine = DefaultEngine.create(hadoopConf);
Table myTable = Table.forPath(myEngine, myTablePath);
StructType mySchema = new StructType()
.add("id", IntegerType.INTEGER)
.add("name", StringType.STRING)
.add("city", StringType.STRING)
.add("salary", DoubleType.DOUBLE);
// Partition columns are optional. Use it only if you are creating a partitioned table.
List<String> myPartitionColumns = Collections.singletonList("city");
TransactionBuilder txnBuilder =
myTable.createTransactionBuilder(
myEngine,
"Examples", /* engineInfo - connector can add its own identifier which is noted in the Delta Log */
Operation.WRITE /* What is the operation we are trying to perform? This is noted in the Delta Log */
);
// Set the schema of the new table on the transaction builder
txnBuilder = txnBuilder
.withSchema(engine, mySchema);
// Set the partition columns of the new table only if you are creating
// a partitioned table; otherwise, this step can be skipped.
txnBuilder = txnBuilder
.withPartitionColumns(engine, examplePartitionColumns);
// Build the transaction
Transaction txn = txnBuilder.build(engine);

现在我们有了 Transaction 对象,下一步是生成符合表模式并根据表分区进行分区的数据。

StructType dataSchema = txn.getSchema(engine)
// Optional for un-partitioned tables
List<String> partitionColumnNames = txn.getPartitionColumns(engine)

使用数据模式和分区列名称,连接器可以规划查询并生成数据。在实际拥有数据以写入表的任务中,连接器可以要求内核将给定表模式中的数据转换为实际可以写入 Parquet 数据文件的物理数据。对于分区表,数据需要首先按分区列进行分区,然后连接器应要求内核为每个分区单独转换数据。需要分区步骤,因为 Delta 表中的任何给定数据文件都包含属于一个分区的数据。

获取事务的状态。事务状态包含有关如何将表模式中的数据转换为需要写入的物理数据的信息。转换取决于表的协议和特性。

Row txnState = txn.getTransactionState(engine);

准备数据。

// The data generated by the connector to write into a table
CloseableIterator<FilteredColumnarBatch> data = ...
// Create partition value map
Map<String, Literal> partitionValues =
Collections.singletonMap(
"city", // partition column name
// partition value. Depending upon the partition column type, the
// partition value should be created. In this example, the partition
// column is of type StringType, so we are creating a string literal.
Literal.ofString(city)
);

连接器数据作为 FilteredColumnarBatch 的迭代器传递。每个 FilteredColumnarBatch 都包含一个 ColumnarBatch,它实际包含列式访问格式的数据,以及一个可选的选择向量,允许连接器指定从 ColumnarBatch 中写入表哪些行。

分区值作为分区列名称到分区值的映射传递。对于未分区表,该映射应为空,因为它没有分区列。

// Transform the logical data to physical data that needs to be written to the Parquet
// files
CloseableIterator<FilteredColumnarBatch> physicalData =
Transaction.transformLogicalData(engine, txnState, data, partitionValues);

上述代码将给定分区的数据转换为需要写入 Parquet 数据文件的 FilteredColumnarBatch 迭代器。为了写入数据文件,连接器需要从内核获取 WriteContext,它告诉连接器在哪里写入数据文件以及从每个数据文件收集哪些列的统计信息。

// Get the write context
DataWriteContext writeContext = Transaction.getWriteContext(engine, txnState, partitionValues);

现在,连接器拥有需要写入 Parquet 数据文件的物理数据,以及这些文件应该写入的位置,它可以开始写入数据文件。

CloseableIterator<DataFileStatus> dataFiles = engine.getParquetHandler()
.writeParquetFiles(
writeContext.getTargetDirectory(),
physicalData,
writeContext.getStatisticsColumns()
);

在上述代码中,连接器正在利用 Engine 提供的 ParquetHandler 来写入数据,但连接器可以选择自己的 Parquet 文件写入器来写入数据。另请注意,上述调用的返回值是每个写入的数据文件的 DataFileStatus 的迭代器。它基本上包含文件路径、文件元数据和由 WriteContext.getStatisticsColumns() 指定的列的可选文件级统计信息。

将每个 DataFileStatus 转换为可写入 Delta 表日志的 Delta 日志操作。

CloseableIterator<Row> dataActions =
Transaction.generateAppendActions(engine, txnState, dataFiles, writeContext);

下一步是根据上面生成的所有 Delta 日志操作构造 CloseableIterable。构造 Iterable 的原因是事务提交涉及多次访问 Delta 日志操作列表(为了在对表进行多次写入时解决冲突)。Kernel 提供了一个 实用方法 来创建 CloseableIterable 的内存中版本。此接口还为连接器提供了一个选项,可以实现自定义实现,当内容太大而无法放入内存时,将数据操作溢出到磁盘。

// Create a iterable out of the data actions. If the contents are too big to fit in memory,
// the connector may choose to write the data actions to a temporary file and return an
// iterator that reads from the file.
CloseableIterable<Row> dataActionsIterable = CloseableIterable.inMemoryIterable(dataActions);

最后一步是提交事务!

TransactionCommitStatus commitStatus = txn.commit(engine, dataActionsIterable)

TransactionCommitResult 包含事务提交的版本以及表是否已准备好进行检查点。由于我们正在创建表,版本将为 0。我们稍后将讨论什么是检查点以及表准备好进行检查点意味着什么。

此处提供了几个创建分区和非分区 Delta 表并向其插入数据的工作示例:此处

在本节中,我们将介绍如何构建一个 Delta 连接器,该连接器可以使用 Delta Kernel 提供的默认 Engine 实现将数据插入到现有 Delta 表中(类似于 SQL 中的 INSERT INTO <table> <query> 构造)。

您可以自行在项目中编写此代码,也可以使用 Delta 代码仓库中提供的示例。步骤与创建表并插入数据完全相似,只是我们在构建 TransactionBuilder 时不会提供任何模式或分区列。

// Create a `Table` object with the given destination table path
Table table = Table.forPath(engine, tablePath);
// Create a transaction builder to build the transaction
TransactionBuilder txnBuilder =
table.createTransactionBuilder(
engine,
"Examples", /* engineInfo */
Operation.WRITE
);
/ Build the transaction - no need to provide the schema as the table already exists.
Transaction txn = txnBuilder.build(engine);
// Get the transaction state
Row txnState = txn.getTransactionState(engine);
List<Row> dataActions = new ArrayList<>();
// Generate the sample data for three partitions. Process each partition separately.
// This is just an example. In a real-world scenario, the data may come from different
// partitions. Connectors already have the capability to partition by partition values
// before writing to the table
// In the test data `city` is a partition column
for (String city : Arrays.asList("San Francisco", "Campbell", "San Jose")) {
FilteredColumnarBatch batch1 = generatedPartitionedDataBatch(
5 /* offset */, city /* partition value */);
FilteredColumnarBatch batch2 = generatedPartitionedDataBatch(
5 /* offset */, city /* partition value */);
FilteredColumnarBatch batch3 = generatedPartitionedDataBatch(
10 /* offset */, city /* partition value */);
CloseableIterator<FilteredColumnarBatch> data =
toCloseableIterator(Arrays.asList(batch1, batch2, batch3).iterator());
// Create partition value map
Map<String, Literal> partitionValues =
Collections.singletonMap(
"city", // partition column name
// partition value. Depending upon the parition column type, the
// partition value should be created. In this example, the partition
// column is of type StringType, so we are creating a string literal.
Literal.ofString(city));
// First transform the logical data to physical data that needs to be written
// to the Parquet
// files
CloseableIterator<FilteredColumnarBatch> physicalData =
Transaction.transformLogicalData(engine, txnState, data, partitionValues);
// Get the write context
DataWriteContext writeContext =
Transaction.getWriteContext(engine, txnState, partitionValues);
// Now write the physical data to Parquet files
CloseableIterator<DataFileStatus> dataFiles = engine.getParquetHandler()
.writeParquetFiles(
writeContext.getTargetDirectory(),
physicalData,
writeContext.getStatisticsColumns());
// Now convert the data file status to data actions that needs to be written to the Delta
// table log
CloseableIterator<Row> partitionDataActions = Transaction.generateAppendActions(
engine,
txnState,
dataFiles,
writeContext);
// Now add all the partition data actions to the main data actions list. In a
// distributed query engine, the partition data is written to files at tasks on executor
// nodes. The data actions are collected at the driver node and then written to the
// Delta table log using the `Transaction.commit`
while (partitionDataActions.hasNext()) {
dataActions.add(partitionDataActions.next());
}
}
// Create a iterable out of the data actions. If the contents are too big to fit in memory,
// the connector may choose to write the data actions to a temporary file and return an
// iterator that reads from the file.
CloseableIterable<Row> dataActionsIterable = CloseableIterable.inMemoryIterable(
toCloseableIterator(dataActions.iterator()));
// Commit the transaction.
TransactionCommitResult commitResult = txn.commit(engine, dataActionsIterable);

幂等写入允许连接器确保属于特定事务版本和应用程序 ID 的数据最多只插入到表中一次。在增量处理系统(例如流系统)中,使用其自己的特定于应用程序的版本跟踪进度需要记录已取得的进度,以避免在写入期间发生故障和重试时重复数据。通过设置事务标识符,Delta 表可以确保具有相同标识符的数据不会多次写入。更多信息请参阅 Delta 协议部分 事务标识符

要使数据追加幂等,请在 TransactionBuilder 上设置事务标识符

// Set the transaction identifiers for idempotent writes
// Delta/Kernel makes sure that there exists only one transaction in the Delta log
// with the given application id and txn version
txnBuilder =
txnBuilder.withTransactionId(
engine,
"my app id", /* application id */
100 /* monotonically increasing txn version with each new data insert */
);

这就是连接器进行幂等盲追加所需做的一切。

检查点是 Delta Log 中的一种优化,旨在更快地构建 Delta 表的状态。它基本上包含创建检查点时表在版本中的状态。Delta Kernel 允许连接器选择性地进行检查点。它针对表上的每几次提交(可配置的表属性)创建。

Transaction.commit 的结果返回一个 TransactionCommitResult,其中包含事务提交的版本以及表是否已准备好进行检查点。创建检查点需要时间,因为它需要构建表的整个状态。如果连接器不想自行进行检查点,而是使用其他更快创建检查点的连接器,则可以跳过检查点步骤。

如果要进行检查点,Table 对象具有一个 API 来检查点表。

TransactionCommitResult commitResult = txn.commit(engine, dataActionsIterable);
if (commitResult.isReadyForCheckpoint()) {
// Checkpoint the table
Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion());
}

与仅在单个进程中读取表的简单应用程序不同,为 Apache Spark™ 和 Trino 等复杂处理引擎构建连接器可能需要相当多的额外工作。例如,要为 SQL 引擎构建连接器,您必须执行以下操作

  • 了解引擎提供的 API 以构建连接器,以及如何使用 Delta Kernel 提供连接器 + 引擎操作 Delta 表所需的信息。
  • 决定使用哪些库来执行计算密集型操作,例如读取 Parquet 文件、解析 JSON、计算表达式等。Delta Kernel 提供了所有扩展点,允许您插入任何库,而无需了解 Delta 协议的所有低级细节。
  • 处理分布式引擎特有的详细信息。例如,
    • Delta 内核提供的 Delta 表元数据的序列化。
    • 将从 Parquet 读取的数据高效地转换为引擎内存处理格式。

在本节中,我们将概述构建连接器所需的步骤。

在上一节中,我们简要介绍了如何读取简单表中的 Engine。这是主要的扩展点,您可以在其中插入计算密集型操作(如读取 Parquet 文件、解析 JSON 等)的实现。对于简单情况,我们使用了一个在大多数情况下都有效的默认实现。但是,为了为复杂的处理引擎构建高性能连接器,您很可能需要使用与您的引擎兼容的库提供自己的实现。因此,在开始构建连接器之前,了解这些要求并计划构建自己的引擎非常重要。

以下是您需要构建连接器以读取 Delta 表的库/功能

  • 执行文件列表和从存储/文件系统读取文件。
  • 读取列式数据中的 Parquet 文件,最好是以内存中列式格式。
  • 解析 JSON 数据
  • 读取 JSON 文件
  • 在内存中的列式批处理上评估表达式

对于这些功能中的每一个,您可以选择构建自己的实现或重用默认实现。

步骤 1:在连接器项目中设置 Delta 内核

标题为“步骤 1:在连接器项目中设置 Delta 内核”的部分

在 Delta Kernel 项目中,您可以选择依赖多个依赖项。

  1. Delta Kernel 核心 API - 这是必备依赖项,它包含所有主要 API,如 Table、Snapshot 和 Scan,您将使用它们访问 Delta 表的元数据和数据。它的依赖项很少,减少了与您的连接器和引擎中的任何依赖项冲突的可能性。它还提供了 Engine 接口,允许您插入计算密集型操作的实现,但它不提供此接口的任何实现。
  2. Delta Kernel 默认 - 这有一个默认实现,名为 DefaultEngine 和额外的依赖项,例如 Hadoop。如果您希望重用此实现的所有或部分内容,则可以选择依赖它。

如上所述,您可以按以下方式导入一个或两个工件

<!-- Must have dependency -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-api</artifactId>
<version>${delta-kernel.version}</version>
</dependency>
<!-- Optional depdendency -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${delta-kernel.version}</version>
</dependency>

在本节中,我们将探讨 Engine 接口,并逐步介绍如何实现您自己的实现,以便您可以插入您的连接器/引擎特定的计算密集型操作、线程模型、资源管理等。

[!重要] 在验证过程中,如果您认为默认 Engine 实现的所有依赖项都可以与您的连接器和引擎一起工作,那么您可以跳过此步骤,直接进入使用默认引擎实现连接器的步骤 3。如果以后您需要为您的连接器定制帮助程序,您可以重新访问此步骤。

Engine 接口结合了许多子接口,每个子接口都设计用于特定目的。以下是子接口的简要概述。有关更详细的视图,请参阅 API 文档 (Java)。

interface Engine {
/**
* Get the connector provided {@link ExpressionHandler}.
* @return An implementation of {@link ExpressionHandler}.
*/
ExpressionHandler getExpressionHandler();
/**
* Get the connector provided {@link JsonHandler}.
* @return An implementation of {@link JsonHandler}.
*/
JsonHandler getJsonHandler();
/**
* Get the connector provided {@link FileSystemClient}.
* @return An implementation of {@link FileSystemClient}.
*/
FileSystemClient getFileSystemClient();
/**
* Get the connector provided {@link ParquetHandler}.
* @return An implementation of {@link ParquetHandler}.
*/
ParquetHandler getParquetHandler();
}

要构建您自己的 Engine 实现,您可以选择使用每个子接口的默认实现,或者完全从头开始构建每个接口。

class MyEngine extends DefaultEngine {
FileSystemClient getFileSystemClient() {
// Build a new implementation from scratch
return new MyFileSystemClient();
}
// For all other sub-clients, use the default implementations provided by the `DefaultEngine`.
}

接下来,我们将逐步介绍如何实现每个接口。

FileSystemClient 接口包含基本的文件系统操作,例如列出目录、将路径解析为完全限定路径以及从文件中读取字节。此接口的实现必须在与 S3、Hadoop 或 ADLS 等存储系统交互时注意以下事项

  • 凭证和权限:连接器必须使用必要的配置和凭证填充其 FileSystemClient,以便客户端从存储系统检索必要的数据。例如,基于 Hadoop 文件系统抽象的实现可以通过 Hadoop 配置传递 S3 凭证。
  • 解密:如果文件系统对象已加密,则实现在返回数据之前必须解密数据。

顾名思义,此接口包含与读写 Parquet 文件相关的所有内容。它的设计使得连接器可以插入各种实现,从简单的单线程读取器到具有预取和高级连接器特定表达式下推的非常先进的多线程读取器。让我们探讨要实现的方法及其相关的保证。

方法 readParquetFiles(CloseableIterator<FileStatus> fileIter, StructType physicalSchema, java.util.Optional<Predicate> predicate)
标题为“方法 readParquetFiles(CloseableIterator<FileStatus> fileIter, StructType physicalSchema, java.util.Optional<Predicate> predicate)”的部分

方法FileStatus 作为输入,其中包含要读取的 Parquet 文件的文件路径、大小等元数据。要从 Parquet 文件读取的列由物理模式定义。要实现此方法,您可能需要首先实现自己的 ColumnarBatchColumnVector,它们用于表示从 Parquet 文件生成的内存中数据。

在识别要读取的列时,请注意物理模式中有多种类型的列(表示为 StructType)。

  • 数据列:预期从 Parquet 文件中读取的列。根据定义列的 StructField 对象,读取 Parquet 文件中与名称或字段 ID 匹配的列。如果列具有字段 ID(存储为 StructField 元数据中的 parquet.field.id),则应使用字段 ID 来匹配 Parquet 文件中的列。否则,应使用列名称进行匹配。
  • 元数据列:这些是特殊列,必须使用有关 Parquet 文件的元数据填充(StructField#isMetadataColumn 告诉 StructType 中的列是否为元数据列)。要了解如何填充此类列,首先将列名与一组标准元数据列名常量进行匹配。例如,
    • StructFileld#isMetadataColumn() 返回 true 且列名为 StructField.METADATA_ROW_INDEX_COLUMN_NAME,则您必须生成一个列向量,其中填充了 Parquet 文件中每行的实际索引(即,不按 Parquet 数据跳过之后返回的可能行子集索引)。

任何实施都必须遵守以下保证。

  • 返回的 ColumnarBatch 的模式必须与物理模式匹配。
    • 如果未找到数据列且 StructField.isNullable = true,则返回一个包含 null 值的 ColumnVector。如果不可为空,则抛出错误。
  • 输出迭代器必须保持与输入迭代器相同的顺序。也就是说,如果 file1 在输入迭代器中位于 file2 之前,则 file1 的列式批次必须在输出迭代器中位于 file2 的列式批次之前。
方法 writeParquetFiles(String directoryPath, CloseableIterator<FilteredColumnarBatch> dataIter, java.util.List<Column> statsColumns)
标题为“方法 writeParquetFiles(String directoryPath, CloseableIterator<FilteredColumnarBatch> dataIter, java.util.List<Column> statsColumns)”的部分

方法将给定数据写入一个或多个 Parquet 文件到给定目录中。数据以 FilteredColumnarBatches 的迭代器形式给出,其中包含一个 ColumnarBatch 和一个可选的选择向量,其中包含 ColumnarBatch 中每行的条目,指示该行是否被选中。 ColumnarBatch 还包含数据的模式。此模式应转换为 Parquet 模式,包括 FieldMetadata 中每个列 StructField 的任何字段 ID。

还有一个参数 statsColumns,它是 Parquet 写入器的一个提示,指示要为每个文件收集哪些列的统计信息。统计信息包括 statsColumns 列表中每列的 minmaxnull_count。统计信息收集是可选的,但如果存在,内核会将其作为 Delta 表提交的一部分持久化。这将帮助读取查询根据查询谓词修剪不需要的数据文件。

对于每个写入的数据文件,调用者期望一个 DataFileStatus 对象。它包含数据文件路径、大小、修改时间以及可选的列统计信息。

方法 writeParquetFileAtomically(String filePath, CloseableIterator<FilteredColumnarBatch> data)

标题为“方法 writeParquetFileAtomically(String filePath, CloseableIterator<FilteredColumnarBatch> data)”的部分

方法将给定的 data 写入 filePath 位置的 Parquet 文件。写入是原子性的,即要么创建一个包含所有给定内容的 Parquet 文件,要么根本不创建 Parquet 文件。它不应该创建一个包含部分内容的文件。

默认实现利用来自 delta-storage 模块的 LogStore 实现来完成原子性。希望实现自己的 ParquetHandler 版本的连接器可以查看默认实现以获取详细信息。

  • 将数据表示为 ColumnVectorColumnarBatch 会对查询性能产生显著影响,最好直接将 Parquet 文件数据读取到引擎原生格式的向量和批次中,以避免潜在的昂贵的内存数据格式转换。围绕引擎原生格式的等效类创建 Kernel ColumnVectorColumnarBatch 包装器。

ExpressionHandler 接口包含处理可能应用于列式数据的表达式所需的所有方法。

方法 getEvaluator(StructType batchSchema, Expression expresion, DataType outputType)
标题为“方法 getEvaluator(StructType batchSchema, Expression expresion, DataType outputType)”的部分

方法生成一个 ExpressionEvaluator 类型的对象,该对象可以在一批行数据上评估 expression,从而生成单个列向量的结果。为了生成此函数,getEvaluator() 方法将表达式和将应用表达式的数据 ColumnarBatch 的模式作为输入。同一个对象可以用于评估具有相同模式和表达式(创建评估器时指定)的多个列式输入批次。

方法 getPredicateEvaluator(StructType inputSchema, Predicate predicate)
标题为“方法 getPredicateEvaluator(StructType inputSchema, Predicate predicate)”的部分

方法用于为 Predicate 类型表达式创建表达式评估器。Predicate 类型表达式返回布尔值作为输出。

返回的对象类型为 PredicateEvaluator。这是一个特殊接口,用于评估输入批处理上的 Predicate,返回一个选择向量,其中包含输入批处理中每行的值,指示该行是否通过了 Predicate。它还可以选择与输入批处理一起接受一个现有选择向量进行评估。结果选择向量与给定的现有选择向量结合,并返回一个新的选择向量。这种机制允许对输入批处理进行多次 Predicate 评估,而无需在每次 Predicate 评估后重写输入批处理以删除不通过 Predicate 的行。新选择应该与现有选择向量相同或更具选择性。例如,如果一行在现有选择向量中标记为未选择,则即使给定 Predicate 对该行返回 true,它也应在返回的选择向量中保持未选择状态。

方法 createSelectionVector(boolean[] values, int from, int to)
标题为“方法 createSelectionVector(boolean[] values, int from, int to)”的部分

方法允许为作为输入给定的布尔类型值创建 ColumnVector。这允许连接器以所需的内存格式维护所有创建的 ColumnVector

任何实施都必须遵守以下保证。

  • 实现必须处理表达式的所有可能变体。如果实现遇到它不知道如何处理的表达式类型,则必须抛出特定于语言的异常。
  • 将使用生成 ExpressionEvaluatorColumnarBatch 保证具有生成时提供的模式。因此,将表达式评估逻辑绑定到列序号而不是列名是安全的,从而使实际评估更快。

引擎接口允许连接器插入自己的 JSON 处理代码并将其公开给 Delta 内核。

方法 readJsonFiles(CloseableIterator<FileStatus> fileIter, StructType physicalSchema, java.util.Optional<Predicate> predicate)
标题为“方法 readJsonFiles(CloseableIterator<FileStatus> fileIter, StructType physicalSchema, java.util.Optional<Predicate> predicate)”的部分

方法将 JSON 文件的 FileStatus 作为输入,并以一系列列式批处理的形式返回数据。要从 JSON 文件读取的列由物理模式定义,返回的批处理必须与该模式匹配。要实现此方法,您可能需要首先实现自己的 ColumnarBatchColumnVector,它们用于表示从 JSON 文件生成的内存数据。

在识别要读取的列时,请注意物理模式中有多种类型的列(表示为 StructType)。

方法 parseJson(ColumnVector jsonStringVector, StructType outputSchema, java.util.Optional<ColumnVector> selectionVector)
标题为“方法 parseJson(ColumnVector jsonStringVector, StructType outputSchema, java.util.Optional<ColumnVector> selectionVector)”的部分

方法允许解析以 JSON 格式表示的字符串值 ColumnVector,将其转换为 outputSchema 指定的输出格式。如果未找到 outputSchema 中的给定列,则返回 null 值。它还可以选择接受一个选择向量,该向量指示要解析输入字符串 ColumnVector 中的哪些条目。如果某个条目未被选中,则为输出中的该特定条目返回 null 值作为解析结果。

方法 deserializeStructType(String structTypeJson)
标题为“方法 deserializeStructType(String structTypeJson)”的部分

方法允许将 JSON 编码(根据 Delta 模式序列化规则)的 StructType 模式解析为 StructType。大多数 JsonHandler 的实现不需要实现此方法,而是使用 默认 JsonHandler 实现中的方法。

方法 writeJsonFileAtomically(String filePath, CloseableIterator<Row> data, boolean overwrite)

标题为“方法 writeJsonFileAtomically(String filePath, CloseableIterator<Row> data, boolean overwrite)”的部分

方法将给定的 data 写入 filePath 位置的 JSON 文件。写入是原子性的,即要么创建一个包含所有给定内容的 JSON 文件,要么根本不创建 Parquet 文件。它不应创建包含部分内容的文件。

默认实现利用来自 delta-storage 模块的 LogStore 实现来完成原子性。希望实现自己的 JsonHandler 版本的连接器可以查看默认实现以获取详细信息。

实现应处理 API Javadoc 中描述的序列化规则(将 Row 对象转换为 JSON 字符串)。

步骤 2.6:实现 ColumnarBatchColumnVector

Section titled “Step 2.6: Implement ColumnarBatch and ColumnVector”

ColumnarBatchColumnVector 是两个接口,用于表示从文件读取到内存中的数据。这种表示形式对查询性能有显著影响。每个引擎可能都有自己的内存数据原生表示形式,用于应用数据转换操作。例如,在 Apache Spark™ 中,行数据在内部以 UnsafeRow 形式表示,以实现高效处理。因此,最好直接将 Parquet 文件数据读取到原生格式的向量和批次中,以避免潜在昂贵的内存数据格式转换。推荐的方法是构建包装类,这些包装类扩展了这两个接口,但在内部使用引擎原生类来存储数据。当连接器必须将从内核接收到的列式批次转发到引擎时,它必须足够智能,能够跳过转换那些已经是引擎原生格式的向量和批次。

步骤 3:在连接器中构建读取支持

Section titled “Step 3: Build read support in your connector”

在本节中,我们将介绍连接器为了读取表而必须进行的一系列内核 API 调用。在连接器-引擎交互的上下文中,这些调用的确切时机完全取决于引擎-连接器 API,因此超出了本指南的范围。但是,我们将尝试提供可能(但不保证)适用于您的连接器-引擎设置的广泛指导。为此,我们假设引擎在处理读取/扫描查询时会经历以下阶段——逻辑计划分析、物理计划生成和物理计划执行。基于这些广泛的特征,读取 Delta 表的典型控制和数据流将如下所示

步骤此步骤通常发生的查询阶段
解析要查询的表快照逻辑计划分析阶段,此时需要解析和验证计划的模式及其他详细信息
根据查询参数解析要扫描的文件物理计划生成,此时扫描的最终参数可用。例如:剪除未使用列后要读取的数据模式。过滤器重新排列后要应用的查询过滤器
将文件信息分发给工作器物理计划执行,仅当它是分布式引擎时。
使用文件信息读取列式数据物理计划执行,此时数据正由引擎处理

让我们了解每个步骤的详细信息。

第一步是解析一致性快照及其关联的模式。连接器/引擎通常需要这样做,以解析和验证扫描查询的逻辑计划(如果您的引擎中存在逻辑计划的概念)。为此,连接器必须执行以下操作。

  • 从查询中解析表路径:如果路径直接可用,则这很容易。否则,如果它是基于目录表(例如,Hive Metastore 中定义的 Delta 表)的查询,则连接器必须从目录中解析表路径。
  • 初始化 Engine 对象:创建您在步骤 2 中选择的 Engine 的新实例。
  • 初始化 Kernel 对象并获取模式:假设查询是针对表最新可用版本/快照的,您可以按如下方式获取表模式
import io.delta.kernel.*;
import io.delta.kernel.defaults.engine.*;
Engine myEngine = new MyEngine();
Table myTable = Table.forPath(myTablePath);
Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
StructType mySchema = mySnapshot.getSchema(myEngine);

如果您想查询表的特定版本(即不是模式),那么您可以获取所需的快照为 myTable.getSnapshot(version)

步骤 3.2:解析要扫描的文件

Section titled “Step 3.2: Resolve files to scan”

接下来,我们需要使用来自查询的更多信息来构建 Scan 对象。这里我们假设连接器/引擎已经能够从查询中提取以下详细信息(例如,在优化逻辑计划之后)

  • 读取模式:查询需要读取的表中的列。这可能是全部列或列的子集。
  • 查询过滤器:可用于跳过读取表数据的分区或数据列上的过滤器。

要将此信息提供给 Kernel,您必须执行以下操作

  • 将特定于引擎的模式和过滤表达式转换为 Kernel 模式和表达式:对于模式,您必须创建一个 StructType 对象。对于过滤器,您必须使用 Expression 的所有可用子类创建一个 Expression 对象。
  • 使用转换后的信息构建扫描:按如下方式构建扫描
import io.delta.kernel.expressions.*;
import io.delta.kernel.types.*;
StructType readSchema = ... ; // convert engine schema
Predicate filterExpr = ... ; // convert engine filter expression
Scan myScan = mySnapshot.buildScan(engine)
.withFilter(myEngine, filterExpr)
.withReadSchema(myEngine, readSchema)
.build()
  • 解析文件读取所需的信息:生成的 Scan 对象有两组信息。
    • 扫描文件:myScan.getScanFiles() 返回 ColumnarBatches 的迭代器。迭代器中的每个批次都包含行,每行都包含基于查询过滤器选择的单个文件的信息。
    • 扫描状态:myScan.getScanState() 返回一个 Row,其中包含所有需要读取的文件之间通用的所有信息。
Row myScanStateRow = myScan.getScanState();
CloseableIterator<FilteredColumnarBatch> myScanFilesAsBatches = myScan.getScanFiles();
```java
Row myScanStateRow = myScan.getScanState();
CloseableIterator<FilteredColumnarBatch> myScanFilesAsBatches = myScan.getScanFiles();
while (myScanFilesAsBatches.hasNext()) {
FilteredColumnarBatch scanFileBatch = myScanFilesAsBatches.next();
CloseableIterator<Row> myScanFilesAsRows = scanFileBatch.getRows();
}

正如我们很快将看到的,从选定的文件中读取列式数据将需要同时使用扫描状态行和包含文件信息的扫描文件行。

以下是您在定义此扫描时需要确保的详细信息。

  • 提供的 readSchema 必须是引擎在执行查询时所期望的数据的精确模式。此查询规划期间定义的模式与查询执行期间的任何不匹配都将导致运行时故障。因此,您必须仅在引擎在任何优化(如列修剪)之后最终确定逻辑计划之后,才能使用 readSchema 构建扫描。
  • 在适用时(例如,使用 Java Kernel API),您必须确保在消耗扫描文件的 ColumnarBatches 时调用 close() 方法(即,要么序列化行,要么使用它们来读取表数据)。

如果您正在为像 Spark/Presto/Trino/Flink 这样的分布式引擎构建连接器,那么您的连接器必须将所有扫描元数据从查询规划机器(以下称为驱动程序)发送到任务执行机器(以下称为工作器)。您将需要序列化和反序列化扫描状态和扫描文件行。实现 Row 的序列化和反序列化实用程序是连接器的工作。如果连接器希望将读取一个扫描文件拆分为多个任务,它可以向任务添加额外的连接器特定拆分上下文。在任务中,连接器可以使用自己的 Parquet 读取器来读取拆分信息指示的文件的特定部分。

自定义 Row 序列化器/反序列化器
Section titled “Custom Row Serializer/Deserializer”

以下是关于如何构建您自己的序列化器/反序列化器(使其适用于任何模式的任何 Row)的步骤。

  • 序列化
    • 首先序列化行模式,即 StructType 对象。
    • 然后,使用模式识别 Row 中每个列/序号的类型,并使用该类型逐一序列化所有值。
  • 反序列化
    • 定义您自己的扩展 Row 接口的类。它必须能够处理复杂类型,如数组、嵌套结构和映射。
    • 首先反序列化模式。
    • 然后,使用模式反序列化值,并将它们放入自定义 Row 类的一个实例中。
import io.delta.kernel.utils.*;
// In the driver where query planning is being done
Byte[] scanStateRowBytes = RowUtils.serialize(scanStateRow);
Byte[] scanFileRowBytes = RowUtils.serialize(scanFileRow);
// Optionally the connector adds a split info to the task (scan file, scan state) to
// split reading of a Parquet file into multiple tasks. The task gets split info
// along with the scan file row and scan state row.
Split split = ...; // connector specific class, not related to Kernel
// Send these over to the worker
// In the worker when data will be read, after rowBytes have been sent over
Row scanStateRow = RowUtils.deserialize(scanStateRowBytes);
Row scanFileRow = RowUtils.deserialize(scanFileRowBytes);
Split split = ... deserialize split info ...;

最后,我们准备好读取列式数据。您将必须执行以下操作

  • 根据扫描文件行、扫描状态以及可选的拆分信息,从 Parquet 文件中读取物理数据
  • 使用 Kernel 的 API 将物理数据转换为表的逻辑数据。
Row scanStateRow = ... ;
Row scanFileRow = ... ;
Split split = ...;
// Additional option predicate such as dynamic filters the connector wants to
// pass to the reader when reading files.
Predicate optPredicate = ...;
// Get the physical read schema of columns to read from the Parquet data files
StructType physicalReadSchema =
ScanStateRow.getPhysicalDataReadSchema(engine, scanStateRow);
// From the scan file row, extract the file path, size and modification metadata
// needed to read the file.
FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
// Open the scan file which is a Parquet file using connector's own
// Parquet reader which supports reading specific parts (split) of the file.
// If the connector doesn't have its own Parquet reader, it can use the
// default Parquet reader provider which at the moment doesn't support reading
// a specific part of the file, but reads the entire file from the beginning.
CloseableIterator<ColumnarBatch> physicalDataIter =
connectParquetReader.readParquetFile(
fileStatus
physicalReadSchema,
split, // what part of the Parquet file to read data from
optPredicate /* additional predicate the connector can apply to filter data from the reader */
);
// Now the physical data read from the Parquet data file is converted to logical data
// the table represents.
// Logical data may include the addition of partition columns and/or
// subset of rows deleted
CloseableIterator<FilteredColumnarBatch> transformedData =
Scan.transformPhysicalData(
engine,
scanState,
scanFileRow,
physicalDataIter));
  • 解析批次中的数据:每个 FilteredColumnarBatch 包含两个组件
    • 列式批次(由 FilteredColumnarBatch.getData() 返回):这是从文件中读取的数据,其模式与上一步中构建 Scan 对象时提供的 readSchema 相匹配。
    • 可选的选择向量(由 FilteredColumnarBatch.getSelectionVector() 返回):可选的布尔向量,它将定义批次中哪些行是有效的,应该被引擎使用。

如果存在选择向量,则必须将其应用于批次以解析最终可消费的数据。

  • 转换为引擎特定数据格式:每个连接器/引擎都有其自己的原生行/列式批处理格式和接口。要将读取的数据批处理返回给引擎,您必须将其转换为适合这些引擎特定格式和/或接口。以下是一些您可以遵循的提高效率的技巧。
    • 匹配引擎特定格式:某些引擎可能期望数据采用与 getData() 生成的数据不同的内存格式。因此,您需要根据需要对批处理中的每个列向量进行数据转换。
    • 匹配引擎特定接口:您可能需要实现包装类,这些包装类扩展了引擎特定接口并适当地封装了行数据。

为了获得最佳性能,您可以实现自己的 Parquet 读取器和其他 Engine 实现,以确保生成的每个 ColumnVector 都已经是引擎原生格式,从而无需进行任何转换。

现在您应该能够正确读取 Delta 表了。

在本节中,我们将介绍连接器为了将数据追加到表中而必须进行的一系列内核 API 调用。在连接器-引擎交互的上下文中,这些调用的确切时机完全取决于引擎-连接器 API,因此超出了本指南的范围。但是,我们将尝试提供可能(但不保证)适用于您的连接器-引擎设置的广泛指导。为此,我们假设引擎在处理写入查询时会经历以下阶段——逻辑计划分析、物理计划生成和物理计划执行。基于这些广泛的特征,读取 Delta 表的典型控制和数据流将如下所示

步骤此步骤通常发生的查询阶段
确定需要写入表的数据模式。模式是从现有表或查询计划中 write 操作符的父操作中派生的,如果表尚不存在。逻辑计划分析阶段,此时需要解析和验证计划的模式(write 操作符模式与表模式匹配等)及其他详细信息。
根据表模式和分区列(从现有表或查询计划中(对于新表))确定数据的物理分区物理计划生成,其中确定写入器任务的数量、数据模式和分区
将写入器任务定义(包括事务状态)分发给工作器。物理计划执行,仅当它是分布式引擎时。
任务将数据写入数据文件,并将数据文件信息发送给驱动程序。物理计划执行,此时数据实际写入表位置
最终确定查询。在这里,任务写入的所有数据文件信息被聚合并提交到物理执行开始时创建的事务。最终确定查询。这发生在启动查询的驱动程序上。

让我们了解每个步骤的详细信息。

第一步是解析输出数据模式。这通常是连接器/引擎解析和验证查询逻辑计划所必需的(如果您的引擎中存在逻辑计划的概念)。为了实现这一点,连接器必须执行以下操作。从高层次来看,查询计划是一个操作符树,其中叶级操作符从存储/表中生成或读取数据,并将其向上馈送到父操作符节点。这种数据传输一直持续到到达根操作符节点,在此节点上查询最终完成(结果发送到客户端或数据写入另一个表)。

  • 创建 Table 对象
  • Table 对象尝试获取模式。
    • 如果未找到表
      • 查询包括创建表(例如,CREATE TABLE AS SQL 查询);
        • 模式是从 write 操作符上方将数据馈送到 write 操作符的操作符中派生的。
      • 查询不包括创建新表,抛出异常,指示未找到表
    • 如果表已存在
      • 从表中获取模式并检查其是否与 write 操作符的模式匹配。如果不匹配则抛出异常。
  • 创建 TransactionBuilder - 这基本上开始了事务构建的步骤。
import io.delta.kernel.*;
import io.delta.kernel.defaults.engine.*;
Engine myEngine = new MyEngine();
Table myTable = Table.forPath(myTablePath);
StructType writeOperatorSchema = // ... derived from the query operator tree ...
StructType dataSchema;
boolean isNewTable = false;
try {
Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
dataSchema = mySnapshot.getSchema(myEngine);
// .. check dataSchema and writeOperatorSchema match ...
} catch(TableNotFoundException e) {
isNewTable = true;
dataSchema = writeOperatorSchema;
}
TransactionBuilder txnBuilder =
myTable.createTransactionBuilder(
myEngine,
"Examples", /* engineInfo - connector can add its own identifier which is noted in the Delta Log */
Operation /* What is the operation we are trying to perform? This is noted in the Delta Log */
);
if (isNewTable) {
// For a new table set the table schema in the transaction builder
txnBuilder = txnBuilder.withSchema(engine, dataSchema)
}

分区列要么从查询中找到(对于新表,查询定义分区列),要么从现有表中找到。

TransactionBuilder txnBuilder = ... from the last step ...
Transaction txn;
List<String> partitionColumns = ...
if (newTable) {
partitionColumns = ... derive from the query parameters (ex. PARTITION BY clause in SQL) ...
txnBuilder = txnBuilder.withPartitionColumns(engine, partitionColumns);
txn = txnBuilder.build(engine);
} else {
txn = txnBuilder.build(engine);
partitionColumns = txn.getPartitionColumns(engine);
}

在本步骤结束时,我们拥有 Transaction 和要生成的数据模式及其分区。

如果您正在为像 Spark/Presto/Trino/Flink 这样的分布式引擎构建连接器,那么您的连接器必须将所有写入器元数据从查询规划机器(以下称为驱动程序)发送到任务执行机器(以下称为工作器)。您将需要序列化和反序列化事务状态。实现 Row 的序列化和反序列化实用程序是连接器的工作。有关自定义 Row SerDe 的更多详细信息,请参见此处

Row txnState = txn.getState(engine);
String jsonTxnState = serializeToJson(txnState);

步骤 4.4:任务将数据写入数据文件并将数据文件信息发送给驱动程序

Section titled “Step 4.4: Tasks write the data to data files and send the data file info to the driver”

在此步骤中(在每个任务内部的工作节点上执行)

  • 反序列化事务状态
  • 任务内的写入器操作符从其父操作符获取数据。
  • 数据被转换为 FilteredColumnarBatch。每个 FilteredColumnarBatch 包含两个组件
    • 列式批次(由 FilteredColumnarBatch.getData() 返回):这是从文件中读取的数据,其模式与上一步中构建 Scan 对象时提供的 readSchema 相匹配。
    • 可选的选择向量(由 FilteredColumnarBatch.getSelectionVector() 返回):可选的布尔向量,它将定义批次中哪些行是有效的,应该被引擎使用。
  • 连接器可以在其自己的内存中数据格式周围创建 FilteredColumnBatch 包装器。
  • 检查数据是否已分区。如果未分区,则按分区值对数据进行分区。
  • 为每个分区生成分区列到分区值的映射
  • 使用 Kernel 将分区数据转换为应写入数据文件的物理数据
  • 将物理数据写入一个或多个数据文件。
  • 将数据文件状态转换为 Delta 日志操作
  • 序列化 Delta 日志操作 Row 对象并将其发送到驱动程序节点
Row txnState = ... deserialize from JSON string sent by the driver ...
CloseableIterator<FilteredColumnarBatch> data = ... generate data ...
// If the table is un-partitioned then this is an empty map
Map<String, Literal> partitionValues = ... prepare the partition values ...
// First transform the logical data to physical data that needs to be written
// to the Parquet files
CloseableIterator<FilteredColumnarBatch> physicalData =
Transaction.transformLogicalData(engine, txnState, data, partitionValues);
// Get the write context
DataWriteContext writeContext = Transaction.getWriteContext(engine, txnState, partitionValues);
// Now write the physical data to Parquet files
CloseableIterator<DataFileStatus> dataFiles =
engine.getParquetHandler()
.writeParquetFiles(
writeContext.getTargetDirectory(),
physicalData,
writeContext.getStatisticsColumns());
// Now convert the data file status to data actions that needs to be written to the Delta table log
CloseableIterator<Row> partitionDataActions =
Transaction.generateAppendActions(
engine,
txnState,
dataFiles,
writeContext);
.... serialize `partitionDataActions` and send them to driver node

在驱动节点上,从所有任务接收到的 Delta 日志操作被提交到事务中。任务将 Delta 日志操作作为序列化的 JSON 发送,并将其反序列化回 Row 对象。

// Create a iterable out of the data actions. If the contents are too big to fit in memory,
// the connector may choose to write the data actions to a temporary file and return an
// iterator that reads from the file.
CloseableIterable<Row> dataActionsIterable = CloseableIterable.inMemoryIterable(
toCloseableIterator(dataActions.iterator()));
// Commit the transaction.
TransactionCommitResult commitResult = txn.commit(engine, dataActionsIterable);
// Optional step
if (commitResult.isReadyForCheckpoint()) {
// Checkpoint the table
Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion());
}

就是这样。现在您应该能够使用 Kernel API 将数据追加到 Delta 表中。

Kernel API 仍在不断发展,并不断添加新功能。Kernel 作者会尽力在每个新版本中保持 API 更改向后兼容,但对于一个快速发展的项目来说,有时很难保持向后兼容性。

本节提供了关于如何将您的连接器迁移到最新版本的 Delta Kernel 的指导。每个新版本都会更新示例,以反映最新的 API 更改。您可以参考这些示例以了解如何使用新的 API。

从 Delta Lake 3.1.0 版本迁移到 3.2.0 版本

Section titled “Migration from Delta Lake version 3.1.0 to 3.2.0”

以下是 Delta Kernel 3.2.0 中的 API 更改,可能需要您的连接器进行修改。

TableClient 接口已重命名为 Engine。这是此版本中最重大的 API 更改。TableClient 接口名称未能准确表示其提供的功能。从高层次来看,它提供了读取 Parquet 文件、JSON 文件、评估数据上的表达式以及文件系统功能等功能。这些基本上是 Kernel 依赖的繁重操作,作为单独的接口,允许连接器替换自己的自定义实现(例如自定义 Parquet 读取器)。本质上,这些功能是 engine 功能的核心。通过重命名为 Engine,我们使用一个易于理解的适当名称来表示接口功能。

DefaultTableClient 已重命名为 DefaultEngine

之前,当传递不存在的表路径时,API 会抛出 TableNotFoundException。现在它不再抛出异常。相反,它返回一个 Table 对象。当尝试从表对象获取 Snapshot 时,它会抛出 TableNotFoundException

之前,当传递不存在的路径时,API 会抛出 FileNotFoundException。现在它不再抛出异常。它仍然将给定路径解析为完全限定路径。

Rust Kernel 是一组用于以原生语言构建 Delta 连接器的库。正在开发中。

  • 讲解解释了 Kernel 及其 API 设计的原理(幻灯片可在此处获取,并保持与更改同步)。
  • 用户指南详细介绍了在独立 Java 程序或分布式处理连接器中使用 Kernel 读取和写入 Delta 表的分步过程。
  • 示例Java 程序,演示如何使用 Kernel API 读取和写入 Delta 表。
  • 表和默认引擎 API Java 文档
  • 迁移指南