Paimon VS Hudi 写入效率大PK

来源:安瑞哥是码农

之所以想做这么个对比呢,原因在于上周我看到 Flink 官方公众号发了篇文章,其中有一 Paimon 跟 Hudi 这两款数据湖产品的写入效率对比图,甚是扎眼。


引用自 Flink 官方公众号


一下子就吸引了我的注意力,从这个对比图来看,Hudi 那是全方位落败,表现得像一个「扶不起的阿斗」。


正当我好奇这个测试结论是基于什么样场景下测试出来时,扒拉遍了全文的内容发现,它好像并没有打算告诉我具体的测试细节,而是直接向你宣布:喏... 这就是我们的结论,就是这么牛逼。


但你们知道,这种没有具体场景交代(数据源什么样的呀?表结构什么样的呀?写入时资源配置如何呀?并行度怎么设置的呀?等等),以及没有过程描述的测试结论,我肯定是不信的。


于是我一冲动,就把对这个测试结论的怀疑,发了条朋友圈,结果很快就引来了阿里云技术专家的注意,问我为啥会怀疑他们的测试结论,我就吧啦吧啦把我的观点说了一遍。


但光怀疑没用啊,为了证实我的以上疑惑,以及表示对人家测试结论的尊重。


我决定,亲自来测一遍,并且,全过程向你们曝光。


(PS:本次测试基于Flink1.15、Hadoop3.1、Paimon0.6、Hudi0.13 展开)



0. 测试前准备


先说数据来源:


既然咱是做写入测试,那么首先得拎清楚数据源从哪来,而且这个数据源的选择,在我看来是有讲究的。


比如,它的读取效率,一定不能成为整个数据处理流程的瓶颈,也就是说,上游数据源的流速,必须要大于下游的数据写入速度,这样,才能真实反映出写入效率,否则,这个测试就是在扯淡。


那为了保证这一点,我提前将一个大文件的数据,先全部灌入到Kafka的 topic 中,然后用 Flink 来消费这个topic(用它的默认速度),这样一来,就不存在数据源读取瓶颈问题。


至于为什么?磁盘的顺序读取效率(读Kafka),一定远大于根据特定字段进行索引的写入效率(写 Paimon 或者 Hudi 表)。


至于数据内容呢,还是我之前的上网行为日志,有9个字段,文件大小为:3.1G,数据行数为:24633164。




1.  Flink 写 Paimon 代码准备


对于 Paimon 表,表结构如下:


CREATE TABLE if not exists data_from_flink2paimon01 (
`client_ip` STRING ,
domain STRING,
`time` STRING,
target_ip STRING,
rcode STRING,
query_type STRING,
authority_record STRING,
add_msg STRING,
dns_ip STRING,
PRIMARY KEY(client_ip,domain,`time`,target_ip) NOT ENFORCED
)

为了方便测试,我全部都用 String 类型(别学我)。


Flink 读取 Kafka 写 Paimon 表的代码如下:


package com.anryg.bigdata.paimon;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @DESC: 通过 Flink DS 跟 SQL 混合模式读取 kafka 数据写入 Paimon 数据湖
 * @Auther: Anryg
 * @Date: 2023/12/19 20:36
 */

public class FlinkFromKafka2Paimon {

    public static void main(String[] args) {
        //获取流任务的环境变量
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                .enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); //打开checkpoint功能

        env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Paimon01"); //设置checkpoint的hdfs目录
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置checkpoint记录的保留策略

        /**创建flink table环境对象*/
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        /**添加 Kafka 数据源*/
        KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder() //一定注意这里的类型指定,否则后面的new SimpleStringSchema()会报错
                                                                    .setBootstrapServers("192.168.211.107:6667")
                                                                    .setTopics("test")
                                                                    .setGroupId("FlinkFromKafka2Paimon01")
                                                                    .setStartingOffsets(OffsetsInitializer.timestamp(1703474520000L)) //确认消费模式为特定时间
                                                                    .setValueOnlyDeserializer(new SimpleStringSchema());



        /**将数据源生成DataStream对象*/
        DataStreamSource kafkaDS = env.fromSource(kafkaSourceBuilder.build(), WatermarkStrategy.noWatermarks(), "kafka-data");

        /**将原始DS经过处理后,再添加schema*/
        SingleOutputStreamOperator targetDS = kafkaDS.map((MapFunction) line -> {
                                                            JSONObject rawJson = JSON.parseObject(line);
                                                            String message = rawJson.getString("message");  //获取业务数据部分
                                                            String[] msgArray = message.split(",");  //指定分隔符进行字段切分
                                                            return msgArray;
                                                        }).filter((FilterFunction) array -> {
                                                            if (array.length == 9return true;
                                                            else return false;
                                                        }).map((MapFunction) array -> Row.of( array[0], array[1], array[2], array[3], array[4], array[5], array[6], array[7], array[8]))
                                                                .returns(
                                                                        Types.ROW_NAMED(
                                                                                new String[]{"client_ip""domain""time""target_ip""rcode""query_type""authority_record""add_msg""dns_ip"},
                                                                                Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
                                                                );
        /**将目标DS转化成Table对象*/
        Table table = tableEnv.fromDataStream(targetDS);

        /**创建属于paimon类型的catalog*/
        tableEnv.executeSql("CREATE CATALOG hdfs_catalog WITH ('type' = 'paimon', 'warehouse' = 'hdfs://192.168.211.106:8020/tmp/paimon')");

        /**使用创建的catalog*/
        tableEnv.executeSql("USE CATALOG hdfs_catalog");

        /**建paimon表*/
        tableEnv.executeSql("CREATE TABLE if not exists data_from_flink2paimon01 (`client_ip` STRING ,domain STRING,`time` STRING,target_ip STRING,rcode STRING,query_type STRING,authority_record STRING,add_msg STRING,dns_ip STRING,PRIMARY KEY(client_ip,domain,`time`,target_ip) NOT ENFORCED)");

        /**将 Kafka 数据源登记为当前catalog下的表*/
        tableEnv.createTemporaryView("kafka_table", table);

        /**将Kafka数据写入到 paimon 表中*/
        tableEnv.executeSql("INSERT INTO data_from_flink2paimon01 SELECT * FROM kafka_table");

    }
}

注意,代码里面为了能准确消费到我提前写入的所有Kafka数据,我用的时间戳方式来指定数据消费的位置。




2. Flink 写 Hudi 代码准备


对于 Hudi 的表结构,设计如下:


Flink 读取 Kafka 写 Hudi 表的代码如下:


package com.anryg.bigdata.hudi;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

/**
 * @DESC: Flink 读取 Kafka 写 Hudi 表
 * @Auther: Anryg
 * @Date: 2023/9/3 14:54
 */

public class FlinkDSFromKafka2Hudi {

    public static void main(String[] args) throws Exception {
        //获取流任务的环境变量
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                .enableCheckpointing(Long.parseLong(args[0]), CheckpointingMode.EXACTLY_ONCE); //打开checkpoint功能
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH)

        env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Hudi01"); //设置checkpoint的hdfs目录
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置checkpoint记录的保留策略


        KafkaSourceBuilder kafkaSource = KafkaSource.builder()  //获取kafka数据源
                                                    .setBootstrapServers("192.168.211.107:6667")
                                                    .setTopics("test")
                                                    .setGroupId("FlinkDSFromKafka2Hudi01")
                                                    .setStartingOffsets(OffsetsInitializer.timestamp(1703474520000L)) //确认消费模式为特定时间
                                                    .setValueOnlyDeserializer(new SimpleStringSchema());


        /**将数据源生成DataStream对象*/
        DataStreamSource kafkaDS = env.fromSource(kafkaSource.build(), WatermarkStrategy.noWatermarks(), "kafka-data");

        DataStream targetDS = kafkaDS.map((MapFunction) line -> {
                                            JSONObject rawJson = JSON.parseObject(line);
                                            String message = rawJson.getString("message");  //获取业务数据部分
                                            String[] msgArray = message.split(",");  //指定分隔符进行字段切分
                                            return msgArray;
                                        }).filter((FilterFunction) array -> {
                                            if (array.length == 9return true;
                                            else return false;
                                        }).map((MapFunction) array -> {
                                            GenericRowData rowData = new GenericRowData(9);
                                            rowData.setField(0, StringData.fromString(array[0]));
                                            rowData.setField(1, StringData.fromString(array[1]));
                                            rowData.setField(2, StringData.fromString(array[2]));
                                            rowData.setField(3, StringData.fromString(array[3]));
                                            rowData.setField(4, StringData.fromString(array[4]));
                                            rowData.setField(5, StringData.fromString(array[5]));
                                            rowData.setField(6, StringData.fromString(array[6]));
                                            rowData.setField(7, StringData.fromString(array[7]));
                                            rowData.setField(8, StringData.fromString(array[8]));
                                            return rowData;
                                        });



        Map options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), "hdfs://192.168.211.106:8020/tmp/hudi/data_from_flink2hudi01");
        //options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
        options.put("table.type", args[1]); //指定写入类型
        options.put("file.format""ORC"); //默认为Parquet
        //options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "client_ip,domain,time,target_ip");

        HoodiePipeline.Builder builder = HoodiePipeline.builder("data_from_flink2hudi01")
                .column("client_ip string")
                .column("domain string")
                .column("`time` string")
                .column("target_ip string")
                .column("rcode string")
                .column("query_type string")
                .column("authority_record string")
                .column("add_msg string")
                .column("dns_ip string")
                .pk("client_ip","domain","`time`","target_ip"//主键字段
                .options(options);

        builder.sink(targetDS, false);

        env.execute();
    }
}

本来是准备用 Scala 来写的,结果发现依然存在跟上篇文章描述的那样,Scala API 跟 Java API 在个别地方存在类型无法互用(互转)的问题(还是怀念 Spark 的 API)。




3. 第1次测试对比


测试条件如下:


运行方式
checkpoint间隔
jobmanager内存
taskmanager内存
并行度
yarn-session
1 分钟
1.6G(默认)
4G
8


PS: 本来我想着 taskmanager 直接用默认内存(1.7G),然后就用1个并行度就好了,因为考虑到我灌到Kafka的文本数据量才3.1G,但是测试后发现,不行,直接就给我OOM了




所以,才想着重新调整了参数(本着节省资源的原则,试了几次)。


3.1 Flink 写 Hudi 表


由于数据是提前就在Kafka里面的,所以启动程序之后,数据源就一定存在 backpressure,且写入端很忙。



由于是流式任务,所以即便它把我灌到 Kafka 里面的数据全部消费完了,任务也不会退出,那么为了能准确观察到程序消费完这些数据的准确时间。


我就只能全程盯着这个页面,直到已知的数据量消费完成为止,然后再记录此时程序花费的时间。


于是,我等啊等,等啊等!


一个多小时的漫长等待,24633164 这个我期盼的数字,终于出现了(Kafka里面的所有数据已经全部消费完成)。

这里有个写入特点是,前期数据写入很快,但到后期,因为考虑到 compact 的一些原因,导致数据写入速度会越来越慢

然后,任务的状态也瞬间从之前的“繁忙”,变成了“空闲”。


整个过程耗时约:82分钟


一共生成260-2=258个数据文件(排除2个元数据目录)。


注意,这里我用的是 COPY_ON_WRITE 表。


3.2  Flink 写 Paimon 表


说出来你们可能不信,当我用同样的数据源,同样的并行度写 Paimon表时,任务提交后居然是这样的:

状态一直在这里转圈圈,当我上个厕所回来之后(至少10分钟),居然还在转。


我就纳了闷了,这问题到底是出在哪了呢?关键我试了多次都不行(集群资源是够的)。


于是,我后续又把并行度分别换成了7,6,5,4,3,2,1,结果从6往后,它就都可以了。


最终,我用6个并行度,总算让它跑起来了:


虽然只有6个并行度,但确实用 Flink 写 Paimon的效率,要明显比写 Hudi 要快。


同样的,在我眼皮子都没眨几次的观察下,它用了约半个小时的时间就完成了所有的数据写入。


总耗时约:29分钟


一共生成的文件个数为:99个



小结:


从实测来看,对于当前的对比,Paimon 的写入效率跟写入效果(文件数量),写入速度是 Hudi 的2倍多,而文件数量只有 Hudi数量的一半不到


但对比 Flink 官方测试出来的,比 Hudi COW 表写入效率快12倍的结论,这里完全没有体现出来(当然,测试的数据量不一)


Flink 官方公众号给的对比结论

但是随后,奇怪的事情就来了,当我分别用读 API 读取 Paimon 的表以及Hudi的表数据量时,只有 Hudi 的数据量顺利读取出来了。


而 Paimon 表,则怎么着都读不出来(小数据量,百万级时可以)。



4. 第2次测试对比


测试条件如下:


运行方式
checkpoint间隔
jobmanager内存
taskmanager内存
并行度
yarn-session
2 分钟
1.6G(默认)
6G
8


4.1 Flink 写 Hudi 表


这个时候,除了把 checkpoint 的时间延长到2分钟之外,还把 Hudi 的表类型换成了 MERGE_ON_READ。


而至于为什么要把 taskmanger 的内存改为 6G,因为用4G跑的时候,OOM了。


咱再来看下跑完后的效果:


同样,还是我瞪着眼珠子看着它跑完的,所以这个时间掐的,那是老准了。


可以明显看到,换成 MERGE_ON_READ 表之后,这个效率快了不止一星半点,由以前的一个多小时,嗖一下,变成了:9分36秒


而生成的文件数量,也由之前的258,变成了:41个(43减去2个元数据文件夹目录)




4.2 Flink 写 Paimon 表


这次用8个并行度总算可以了,其他条件如上表所示,写完 24633164 条 Kafka 记录后,结论如下:


总耗时约:13分08秒


生成的文件数量为: 58个


发现没,这次 Hudi 已经逆袭了。


小结:


对比Paimon,Hudi 通过将表类型修改为 MERGE_ON_READ 之后,无论是写入速度,还是生成的文件个数,都要比 Paimon 表现优秀。


此次,Hudi写入效率,比 Paimon 快3分钟以上,文件生成数比 Paimon 少17个


跟 Flink 官网给的结论,出入也比较大(当然,测试的数据量不一样):




5. 第3次测试对比


测试条件如下:


运行方式
checkpoint间隔
jobmanager内存
taskmanager内存
并行度
yarn-session
3 分钟
1.6G(默认)
6G
8


5.1 Flink 写 Hudi 表


我先用 MERGE_ON_READ 的表类型进行测试,直接看结论:


跟上次比,差别不大,耗时:11分20秒


文件生成的数量也没有变化:


依然是41个


再用 COPY_ON_WRITE 表类型进行测试,再看结论:



这个时候,写入全部数耗时为:24分30秒


看生成的文件数量为90-2=88个




5.2 Flink 写 Paimon 表


直接看结论:


写入全部数据,耗时为:16分27秒(全程眼睛盯着看)。


写入数据文件数量为54个


小结:


从实测来看,Hudi 的 MOR 表无论是写入速度,还是生成的文件数量,都要比 Paimon 优秀


而 Hudi 的 COW 表,则正好相反,其无论写入速度,还是文件生成数量,则要比 Paimon 差,但这个差距,貌似在随着 checkpoint 时间的增大,逐渐在缩小。



最后


通过这次实测对比的结果来看,虽然涵盖的范围不够全,但其实已经能说明问题,再次验证了那句话:


任何关于性能测试的结论,都必须建立在具体的场景之下才有意义,否则就是在扯淡,就是在误导人。


那么,基于本次测试场景下的测试结论如下:


测试条件
数据量
Hudi表类型
Hudi耗时
Paimon耗时
Hudi文件数量
Paimon文件数据
JM:1.7G
TM: 4G
并行数:8
CK间隔:1min
24633164
COW
82分钟
29min
258
99
JM:1.7G
TM: 6G
并行数:8
CK间隔:2min

同上
MOR
9分36秒
13分08秒
41
58
JM:1.7G
TM: 6G
并行数:8
CK间隔:3min
同上MOR
11分20秒
16分27秒
41
54
JM:1.7G
TM: 6G
并行数:8
CK间隔:3min
同上COW
24分30秒
16分27秒88
54


虽然没有出现我在朋友圈说的,结论反转的情况,但也跟官方的大不一样。


当然,这可能跟 Hudi 与 Spark 的关系更为亲密有关,下次有机会,咱再用 Spark 测试一次看看(期待真正的反转)。


那么对于本次测试的结论,你怎么看?


请使用浏览器的分享功能分享到微信等