今天我们来聊一聊Apache Iceberg数据查询过程,以及它如何进行Skip File的进行加速的。
对于数据湖三剑客来说,在它们创建之初时,定位不同,导致其设计实现与最擅长的方面是不同的。
在之前的文章中,我们其实已经聊过,Iceberg其主要定位是想解决Hive表格式的问题,其擅长对S3数据储存性能的加速。
[LakeHouse] 数据湖之Iceberg一种开放的表格式
其功能建设最早,性能优化最完善的就是Append与Select能力了,那么我们今天聊下Select的执行细节。
1. Iceberg存储架构
传统的Hive表格式是采取目录形式对数据文件直接管理。要读取数据时,访问MetaStore拿到数据的目录直接读取即可。
Apache Iceberg表格式对于数据文件的管理主要分为三层:
Catalog 层
Metadata 层
Data 层
具体如下所示:

这里不是重点,我们就从下往上简单解释下。
1.1 Data层
Data层是存储表的实际数据的地方,这和Hive直接存储的数据是类似的。Data层构成了Iceberg表的树形结构的叶子节点。不过,在Iceberg中,数据文件分为多种类型,包括:
Data Files
数据文件用于存储数据本身, 目前支持 Apache Parquet、Apache ORC 和 Apache Avro。一般我们使用Parquet 可以用于用于大规模 OLAP 分析的表,Avro 可以用于用于更低延迟的流分析的表。
Delete Files
删除文件用于跟踪记录数据集中已删除的记录。由于数据湖存储是不可变的,因此无法直接更新文件中的行 - 所以需要编写一个新文件。
在这里有两种方式:
这个新文件可以是旧文件的副本,并应用了更改(称为写时复制COW);
也可以是生成仅有更改数据的新文件,然后读取数据的引擎将合并(称为读时合并MOR)。
删除文件是Iceberg表对合并读取(MOR)策略的支持,也就是说,删除文件仅适用于MOR表。
当然,在删除文件还包括,Positional Delete Files和Equality Delete Files,它们都用于表示哪些行已被逻辑删除。
为什么这里有位置删除和相等删除两个文件呢?这里我们再讲Iceberg如何更新数据时在详细解释。
Puffin Files
Puffin Files文件格式存储有关表中数据的统计信息和索引,这些统计信息和索引可以提高广泛范围查询的性能,甚至比存储在数据文件和元数据文件中的统计信息还要广泛。
虽然在Iceberg表中的数据文件和删除文件中存在结构以增强与数据交互的性能,但有时您需要更高级的结构以增强特定类型查询的性能。例如,如果您想知道过去30天有多少个客户向您下订单。当然,您可以读30天内的每个订单并在引擎中进行聚合,但这可能需要太长时间。
1.2 Metadata 层
元数据层是 Iceberg 表架构的组成部分,它是一个树结构,用于跟踪数据文件和有关它们的元数据以及创建它们的操作。
该树结构由三种文件类型组成,所有这些文件类型都存储在数据湖存储中:
清单文件(manifest files)
清单列表(manifest lists)
元数据文件(metadata files)
下面我们简单介绍下:
清单文件 (manifest files)
清单文件跟踪数据层中的文件(即数据文件、删除文件和 puffin 文件)以及有关每个文件的其他详细信息和统计信息。
每个清单文件都跟踪一组数据文件,它会记录每个文件的分区信息、Record的计数以及数据列的下限和上限等详细信息。虽然其中一些统计信息也存储在数据文件本身中(Parquet),但单个清单文件存储多个数据文件的这些统计信息,这意味着从单个清单文件中的统计信息即可完成数据查询链路的修剪,大大减少了打开许多数据文件的需要。
清单列表 (manifest lists)
清单列表是 Iceberg 表的快照,其中包含所有清单文件的列表,包括位置、所属分区以及它跟踪的数据文件的分区列的上限和下限。
清单列表包含一个结构体数组,每个结构体跟踪一个清单文件。注意,清单列表其实是位于元数据文件之中,属于其中一部分。
元数据文件(metadata files)
清单列表由元数据文件跟踪。另一个恰当命名的文件是元数据文件,它存储有关 Iceberg 表在某个时间点的元数据。这包括有关表的架构、分区信息、快照以及哪个快照是当前快照的信息。
每次对 Iceberg 表进行更改时,都会创建一个新的元数据文件,并通过目录自动注册为元数据文件的最新版本。
1.3 CataLog 层
任何从表中读取数据的人都需要知道首先要去哪里——他们去哪里找出给定表的读/写数据的位置。对于任何想要与Iceberg表交互的人来说,第一步是找到元数据文件(即当前元数据指针)的位置。
查找当前元数据指针就需要通过Iceberg Catalog层。
Iceberg Catalog的主要要求是它必须是原子操作,以便所有读取器和写入器在给定时间点都能看到表的相同状态。
在Catalog中,每个表都有一个指向该表当前元数据文件的引用或指针。Iceberg表在CataLog中的当前元数据指针的值就是元数据文件的位置。
2. Iceberg Select查询
从 Apache Iceberg 表中读取数据需要遵循一定的步骤,这个步骤就经历了Iceberg整个存储树。
当发起读查询时,首先将其发送到查询引擎(Spark,Flink等)。引擎利用来Catalog来检索最新的元数据文件位置,其中包含有关Iceberg表结构和其他元数据文件(例如最终导致实际数据文件的清单列表)的关键信息。在此过程中使用有关列的统计信息来限制正在读取的文件数量,这有助于提高查询性能。
例如我们运行下面的查询SQL:
## Spark SQL/Dremio Sonar
SELECT *
FROM orders
WHERE order_ts BETWEEN '2023-01-01' AND '2023-01-31'
我们一步步执行下这个查询过程:
1. 向引擎发送查询
SQL查询首先会被发送到解析它的引擎。在此阶段,执行引擎将开始根据元数据文件生成查询计划。
2. 检查Catalog
查询引擎访问 Iceberg Catalog,假如上面例子中是 Hadoop 文件系统。
它向Catalog请求订单表的元数据文件的路径,然后读取它。因为我们在这里使用 Hadoop 目录,这里引擎将先读取该/orders/metadata/version-hint.txt文件,它记录了metadata的版本号。假如该文件的内容是一个整数3. 根据此信息和Catalog的实现逻辑,引擎知道当前元数据文件位置是/orders/metadata/v3.metadata.json。
3. 从元数据文件获取信息
首先,引擎打开并读取元数据文件v3.metadata.json以获取有关的内容信息。
然后,它确定Iceberg表的模式,如下图所示format-version为2,以准备其内部内存数据结构来读取数据。

其次,了解Iceberg表的分区方案以及数据的组织方式。稍后查询引擎可以利用它来跳过不相关的数据文件。
"partition-spec" : [ {
"name" : "order_ts_hour",
"transform" : "hour",
"source-id" : 4,
"field-id" : 1000
} ]
引擎从元数据文件检索的最重要的信息之一是current-snapshot-id. 这表示表的当前状态。基于current-snapshot-id,引擎将从快照数组中定位清单列表文件路径,以便它可以进一步遍历并扫描相关文件,如下面的json所示。
"current-snapshot-id" : 5139476312242609518,
"refs" : {
"main" : {
"snapshot-id" : 5139476312242609518,
"type" : "branch"
}
},
"snapshots" : [{
"snapshot-id" : 7327164675870333694,
"manifest-list" : "s3://datalake/db1/orders/metadata/snap-7327164675870333694-1-f5e79df9-7027-4d0c-a39b-7a3091741d6f.avro",
"schema-id" : 0
},
{
"snapshot-id" : 5139476312242609518,
"parent-snapshot-id" : 8333017788700497002,
"manifest-list" : "s3://datalake/db1/orders/metadata/snap-5139476312242609518-1-e22ff753-2738-4d7d-a810-d65dcc1abe63.avro",
},
{
"snapshot-id" : 8333017788700497002,
"parent-snapshot-id" : 7327164675870333694
"manifest-list" : "s3://datalake/db1/orders/metadata/snap-8333017788700497002-1-4010cc03-5585-458c-9fdc-188de318c3e6.avro",
"schema-id" : 0
}
]
4. 从清单列表里获取信息
从元数据文件获取清单列表文件路径的位置后,查询引擎读取该文件snap-5139476312242609518-1-e22ff753-2738-4d7d-a810-d65dcc1abe63.avro以获取更多详细信息。
引擎从该清单列表获取的最关键信息是每个快照的清单文件路径位置。引擎需要此信息来获取特定查询所需的相关数据文件。
清单列表还包含有关分区的关键信息,例如partition-spec-id. 这告诉引擎用于写入特定快照的特定分区方案。截至目前,该字段的值为 0(如下面的 JSON 所示),这意味着这是为表定义的唯一分区。
还有其他特定于分区的统计信息,例如清单的分区列的下限和上限。当引擎确定要跳过哪些清单文件以更好地修剪文件时,此信息特别有用。
其他详细信息,例如每个快照添加/删除的数据文件总数、添加/删除的行数等,也可以在此文件中找到。
{
"manifest_path" : "s3://datalake/db1/orders/metadata/faf71ac0-3aee-4910-9080-c2e688148066.avro",
"partition_spec_id" : 0,
"added_snapshot_id" : 5139476312242609518,
"added_data_files_count" : 2,
"partitions" : [
{
"contains_null" : false,
"contains_nan" : false,
"lower_bound" : "ShkHAA==",
"upper_bound" : "8BwHAA=="
}
],}
5. 从清单文件中获取信息
引擎打开未修剪的清单文件(即与查询相关)faf71ac0-3aee-4910-9080-c2e688148066.avro
然后,它读取文件以获取详细信息。
首先,查询引擎扫描属于此清单文件的数据文件的每个条目。它将每个数据文件所属的分区值与查询中请求的值进行比较。以下是我们示例的两个分区值。
"partition" : {
"order_ts_hour" : 2023-01-27-10
},
"partition" : {
"order_ts_hour" : 2023-03-07-08
},
现在我们的SQL查询请求是获取“ 2023-01-01”和“ 2023-01-31”之间的所有订单详细信息。
因此,引擎将忽略第二个分区,因为2023-03-07-08它与过滤器值范围不匹配。
由于过滤器值与第一个分区值匹配,引擎将检查该分区中的所有记录,从2023-01-01 00:00:00to23:59:59和2023-01-31 00:00:00to23:59:59。
根据分区值,引擎查找相应的数据文件,在本例中就是文件0_0_0.parquet。
该引擎还收集其他统计信息,例如每列的下限和上限、空值计数等,以跳过任何不相关的文件。
请注意,本例子这种情况下,引擎不需要使用统计信息,因为分区中只有一个数据文件满足查询。
"data_file" : {
"file_path" :
"s3://datalake/db1/orders/data/order_ts_hour=2023-01-27-10/0_0_0.parquet",
"partition" : {
"order_ts_hour" : 2023-01-27-10
},
"lower_bounds" : [{
"key" : 1,
"value" : "fQAAAAAAAAA="
},
{
"key" : 2,
"value" : "QQEAAAAAAAA="
}],
"upper_bounds" : [{
"key" : 1,
"value" : "fQAAAAAAAAA="
},
{
"key" : 2,
"value" : "QQEAAAAAAAA="
}],
},
"data_file" : {
"file_path" :
"s3://datalake/db1/orders/data/order_ts_hour=2023-03-07-08/0_0_1.parquet"
"partition" : {
"order_ts_hour" : 2023-03-07-08
},
},
Iceberg 中默认提供了数据和文件读取优化技术(例如分区过滤和基于min-max列的过滤(列的上/下界)允许引擎避免全表扫描,从而保证了查询的性能。
最后,我们将将查询到的数据记录返回给用户。

整体上,我们的读取过程如上图所示。
3. 总结
下面我们来简单总结下整个查询的过程:
向引擎发送查询;
检查Catalog,返回当前元数据文件的位置;
从元数据文件获取信息;
从清单列表里获取信息,进行数据剪裁;
从清单文件中获取信息,进行数据剪裁;
利用索引对数据进行跳过和过滤,返回最后结果。