目前数据湖难就难在,对于普通使用者来说需要很多的配置,配置好就可以提供较好的性能,相反性能则一直提不上去。
数据存储的方式对数据读取与写入性能有很大影响,例如单文件存储大小,数据顺序,分布等。
其次对数据读取与数据写入是存在平衡点的,写入快那么读取可能就慢,读取快那么写入就可能慢,数据存储平台一直在找寻这种平衡。
可以说,虽然现在数据平台的使用方一般偏向于“傻瓜式的”存储和使用数据,但实际上到目前为止还没有一种通用的存储或者优化方式使得所有的任务可能高效的读取写入,这造成了信息的不对称,使得用户在使用数据平台面临性能极其不稳定。
对于Apache Iceberg数据湖来说道理也是存在的,目前其默认配置的优势是快速的写入,要想在默认配置的前提下做到快速读取显然是不可能的。
不过虽然Iceberg定位是表格式,但也提供了很多帮助用户在数据存储后进行表性能优化(数据重分布)的工具或功能,这些功能可以提升Iceberg数据读取的性能。
Compaction (压缩合并)
Iceberg表的每一个操作,都会提交一个commit, 即使当前写入或操作的数据量很小,也会生成至少1个数据文件和1个元数据文件。例如,在Flink进行数据写入时,如果是分钟级的延迟,那么每分钟都会进行commit,相当于每分钟就会生成新的数据文件+一系列的元数据文件。可想,这会造成很严重的“小文件”问题。
大量的小文件不光给HDFS系统造成压力,而且使得Iceberg的所有执行任务都会付出更多的时间成本。例如,当您查询 Apache Iceberg 表时,需要打开、扫描每个文件,并在完成后关闭。查询时扫描的文件越多,就给查询带来的成本越高。
在流数据或“实时”数据的世界中,这个问题会被更加放大,其中很多数据在创建时就被读取,从而创建大量文件,每个文件中只有几条记录。
虽然在读取数据时,我们无法避免固定成本,但我们可以使用不同的策略来避免可变成本。
对于小文件问题,我们可以将其合并压缩到较少的大文件中,如下图所示:

如何进行压缩?
Apache Iceberg 的 actions 包中包含多种类似存储过程的维护表的工具可以直接被使用。
在Spark可以通过编写 Spark程序来实现:
Table table = catalog.loadTable("iceberg_1");
SparkActions
.get()
.rewriteDataFiles(table)
.option("rewrite-job-order", "files-desc")
.execute();
在上面的代码中,我们创建了表的一个实例,然后触发了rewriteDataFiles,这是用于压缩的操作。
压缩是将许多较小文件重写为更少的较大文件的过程。SparkActions 使用的构建器模式不仅可以设置压缩,还允许我们通过设置如何完成压缩的方式。
除了option设置参数外,其还可以设置binPack,Sort,zOrder,filter等策略,binPack是默认的压缩策略,例如下面的用法:
Table table = catalog.loadTable("myTable");
SparkActions
.get()
.rewriteDataFiles(table)
.sort()
.filter(Expressions.and(
Expressions.greaterThanOrEqual("date", "2023-01-01"),
Expressions.lessThanOrEqual("date", "2023-01-31")))
.option("rewrite-job-order", "files-desc")
.execute();
此外options中我们也可以配置,压缩任务的属性性能,例如上面的例子中rewrite-job-order表示的压缩时是写入文件组的顺序,可选的有字节升序、字节降序、文件升序、文件降序、无,上面例子中选择的是文件降序。
下面我们介绍下比较重要的几个参数:
target-file-size-bytes:设置输出文件的预期大小,默认为512M;
max-concurrent-file-group-rewrites:同时写入的文件组数量上限;引擎在执行压缩作业时,会将将这些文件分组为“文件组”,并行写入,这也代表并行写入文件组数量。
partial-progress-enabled: 允许在压缩文件组时进行提交;它允许你在部分“文件组”完成时就创建新快照,那么新的查询就可以直接从已压缩的文件中受益。
rewrite-job-order:写入文件组的顺序;
这里要注意的是,对于 Apache Parquet 文件,有ROW GROUP 大小和FILE大小。row group就是parquet文件中的一块,是parquet的组成部分,一个文件可以有多个组。默认情况下,表的默认配置将允许 128mb 行组和 512mb 文件大小(每个文件四个行组)。举个例子,假如你想增加到每个文件 1 GB,但将行组保持为 128mb(每个文件 8 个行组),这样可以减少需要打开和关闭的文件。不过,如果您经常运行的查询读取大部分数据,那么您会更喜欢较少的行组,因为谓词下推不会加快获取所有数据的速度。行组文件和文件大小都可以设置为表属性(分别为 write.parquet.row-group-size-bytes 和 write.target-file-size-bytes)。
除了上述Spark程序外,我们也可以通过扩展的SparkSQL来实现上述功能:
-- Rewrite Data Files CALL Procedure in SparkSQL
CALL catalog.system.rewrite_data_files(
table => 'iceberg_2',
strategy => 'binpack',
where => 'genre = "rock"',
options => map(
'rewrite-job-order','bytes-asc',
'target-file-size-bytes','1073741824', -- 1GB
'max-file-group-size-bytes','10737418240' -- 10GB
)
)
它类似于SQL中的存储过程,其语法为下面格式:
-- 使用位置参数
CALL Catalog.system.procedure(arg1, arg2, arg3)
-- 使用命名参数
CALL Catalog.system.procedure(argkey1 => argval1, argkey2 => argval2)
下面我们着重介绍下binPack,Sort,zOrder 这次压缩策略的实现以及有优点与缺点。
binPack 压缩策略
binPack算法的本质就是在我们设置压缩的目标文件大小配置后target-file-size-bytes,使用装箱算法,快速的将较小的文件写入较大的文件。
binPack算法是默认的Iceberg压缩策略的算法,也是最快的压缩策略,其不会做全局的排序,可能会在任务内做本地排序。
binPack优势是”快“,缺点是数据并没有被聚集分类起来(数据没被排序),不利于join读取等。
此外,如果你使用了binPack进行压缩,此时同时使用sort或zorder进行排序策略,那么这个排序只会在单个任务中对数据进行排序(本地排序)。
那么如何使用呢?
如果您正在读取流数据,需要对每小时后提取的数据进行快速压缩。你可以这样做:
CALL catalog.system.rewrite_data_files(
table => 'streamingtable',
strategy => 'binpack',
where => 'created_at between "2023-01-26 09:00:00" and "2023-01-26 09:59:59" ',
options => map(
'rewrite-job-order','bytes-asc',
'target-file-size-bytes','1073741824',
'max-file-group-size-bytes','10737418240',
'partial-progress-enabled', 'true'
)
)
这处理数据流任务时,使用binpack可以快速的处理,避免影响数据流任务性能。此外,这里我们启用部分进度提交,在写入文件组时立即提交它们,以便读取时可以立即开始看到性能提升。
如果您想要对超过一小时数据进行压缩,您需要在快速运行压缩作业与优化之间取得平衡。可以在夜间对当天的数据进行额外的压缩工作,并在周末对一周的数据进行压缩,以便以持续优化的时间间隔继续优化,同时尽可能减少对其他操作的干扰。
Sort & Zorder 策略
在读取数据湖数据时,尤其是在读取update的数据时,数据的有序性对读取的性能影响是非常大的。
这个背后的原理我们在之前已经强调过很多遍了,一方面其读取的数据更少了,另一方面在Join时可能会减少或加速Shuffle。
不过在使用Sort策略时,Iceberg还支持在创建表插入数据前设置表的排序字段,提前优化数据的组织方式,提升了读取的效率,不过也影响了写入的速度。
如何在创建表时设置排序策略?
CREATE TABLE catalog.iceberg_03 (
id bigint ,
player_name varchar,
team varchar
)
ALTER TABLE Catalog.iceberg_03 WRITE ORDERED BY team;
如果执行 CTAS,请对 AS 查询中的数据进行排序:
CREATE TABLE catalog.iceberg_03
AS (SELECT * FROM non_iceberg_teams_table ORDER BY team);
ALTER TABLE catalog.iceberg_03 WRITE ORDERED BY team;
另外可以在每个 INSERT 中指定顺序,如下所示:
INSERT INTO Catalog.iceberg_03
SELECT *
FROM staging_table
ORDER BY team
这将确保数据在写入时进行排序,但它并不完美。因为随着数据写入与变更会产生越来越多的小文件,这时候就需要压缩策略进行处理。
通过Sort的压缩策略,将在查询引擎将记录分配到不同任务之前对数据进行排序,从而优化跨任务的数据集群。
Sort排序压缩策略,就是按照一个或多个字段,按照字段的优先级进行依次排序。
Sort排序压缩的优势是数据按经常查询的字段进行”聚集“,加速经常查询一个字段的读取速度,缺点是比binPack需要更长的压缩时间。
那么如何执行排序压缩策略呢?
CALL catalog.system.rewrite_data_files(
table => 'iceberg_03',
strategy => 'sort',
sort_order => 'team ASC NULLS LAST'
)
“NULLS LAST”会将所有具有空值的球员放在排序末尾。
对于多个字段的排序,可以按照下述方式使用:
CALL catalog.system.rewrite_data_files(
table => 'iceberg_03',
strategy => 'sort',
sort_order => 'team ASC NULLS LAST, name ASC NULLS FIRST'
)
那么,这时的数据组织方式,将会按照按team排序,其次是name。
Zorder排序策略,也是一种排序策略,这种策略怎么实现,怎么优化我们之前也分析介绍过了,可以看相关文章:[Delta][SQL] Delta开源付费功能,最全分析ZOrder的源码实现流程
Zorder排序压缩策略使得多个排序字段时,每个字段的优先级是相等的。
Zorder排序压缩策略优点是对于多个经常使用的字段可以提升读取的性能,缺点是合并压缩的时间较长。
我们可以采用下面的语句进行调用:
CALL catalog.system.rewrite_data_files(
table => 'people',
strategy => 'sort',
sort_order => 'zorder(age,height)'
)
使用压缩策略只能减少数据所在的文件数量,通过sort和zorder可以确保这些文件中数据的顺序实现更高效的查询规划。
自动压缩(Automating Compaction)
Iceberg本身的定位是表格式,它提供了上述性能优化功能或工具,用户可以适时的使用,对于自动的调度编排能力不在表格式的范围内。
那么如何实现自动压缩能力,可以采用公司内部的定时调度服务或者采用开源的Airflow,Prefect等编排工具即可。
这种设计的好处是有利于社区的长期发展,属于引擎的能力就让引擎来做,属于存储的能力就该存储系统做。毕竟数据湖目前所处的阶段还不成熟,在未来完全数据湖阶段很容易出现一个系统完全替代掉现有的系统。
如果一个引擎做的过于复杂,属于大包大揽的形式。虽然前期可能会快速的上手和抢占市场,但是这种形式使得项目不容易变革和创新。如果有新的技术和理念出现,对于些什么都融入的工程,则很难掉头跟进,只能慢性死亡。
目前对于Iceberg来说自动的调度编排服务已经非常成熟而且易用,用户可以自己实现或直接采用成熟且性能优化的开源产品进行使用。
不过Iceberg母公司Tabular,也有些产品(Dremio Arctic 和 Tabular)通过托管 Apache Iceberg Catalog服务,同时实现自动的表维护,包括性能优化的压缩能力。
总结
数据湖性能挑战: 用户在使用数据湖时,其配置和优化是普通用户面临的主要问题之一。此外,数据的存储方式、顺序和分布等因素都会影响读写性能。用户需要在数据读取和写入之间找到平衡。
Iceberg 压缩合并策略: Iceberg数据湖可以使用压缩合并策略,来减少小文件问题和提升读取性能。其默认的合并压缩策略是BinPack。
Sort 和 Zorder 策略: Iceberg中可以通过对数据进行Sort或者使用Zorder排序策略,可以使经常查询的字段数据紧凑地存储在一起,从而减少读取开销。
自动压缩和调度: 在数据湖中自动化压缩和调度是非常重要的,它可以确保数据湖中的数据保持高性能。Iceberg本身不提供自动调度功能,但用户通过定时调度服务或使用开源的编排工具来实现。