来源:安瑞哥是码农
之所以想做这么个对比呢,原因在于上周我看到 Flink 官方公众号发了篇文章,其中有一幅 Paimon 跟 Hudi 这两款数据湖产品的写入效率对比图,甚是扎眼。
引用自 Flink 官方公众号
一下子就吸引了我的注意力,从这个对比图来看,Hudi 那是全方位落败,表现得像一个「扶不起的阿斗」。
正当我好奇这个测试结论是基于什么样场景下测试出来时,扒拉遍了全文的内容发现,它好像并没有打算告诉我具体的测试细节,而是直接向你宣布:喏... 这就是我们的结论,就是这么牛逼。
但你们知道,这种没有具体场景交代(数据源什么样的呀?表结构什么样的呀?写入时资源配置如何呀?并行度怎么设置的呀?等等),以及没有过程描述的测试结论,我肯定是不信的。
于是我一冲动,就把对这个测试结论的怀疑,发了条朋友圈,结果很快就引来了阿里云技术专家的注意,问我为啥会怀疑他们的测试结论,我就吧啦吧啦把我的观点说了一遍。
但光怀疑没用啊,为了证实我的以上疑惑,以及表示对人家测试结论的尊重。
我决定,亲自来测一遍,并且,全过程向你们曝光。
(PS:本次测试基于Flink1.15、Hadoop3.1、Paimon0.6、Hudi0.13 展开)
0. 测试前准备
先说数据来源:
既然咱是做写入测试,那么首先得拎清楚数据源从哪来,而且这个数据源的选择,在我看来是有讲究的。
比如,它的读取效率,一定不能成为整个数据处理流程的瓶颈,也就是说,上游数据源的流速,必须要大于下游的数据写入速度,这样,才能真实反映出写入效率,否则,这个测试就是在扯淡。
那为了保证这一点,我提前将一个大文件的数据,先全部灌入到Kafka的 topic 中,然后用 Flink 来消费这个topic(用它的默认速度),这样一来,就不存在数据源读取瓶颈问题。
至于为什么?磁盘的顺序读取效率(读Kafka),一定远大于根据特定字段进行索引的写入效率(写 Paimon 或者 Hudi 表)。
至于数据内容呢,还是我之前的上网行为日志,有9个字段,文件大小为:3.1G,数据行数为:24633164。
1. Flink 写 Paimon 代码准备
对于 Paimon 表,表结构如下:
CREATE TABLE if not exists data_from_flink2paimon01 (
`client_ip` STRING ,
domain STRING,
`time` STRING,
target_ip STRING,
rcode STRING,
query_type STRING,
authority_record STRING,
add_msg STRING,
dns_ip STRING,
PRIMARY KEY(client_ip,domain,`time`,target_ip) NOT ENFORCED
)
为了方便测试,我全部都用 String 类型(别学我)。
Flink 读取 Kafka 写 Paimon 表的代码如下:
package com.anryg.bigdata.paimon;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @DESC: 通过 Flink DS 跟 SQL 混合模式读取 kafka 数据写入 Paimon 数据湖
* @Auther: Anryg
* @Date: 2023/12/19 20:36
*/
public class FlinkFromKafka2Paimon {
public static void main(String[] args) {
//获取流任务的环境变量
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); //打开checkpoint功能
env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Paimon01"); //设置checkpoint的hdfs目录
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置checkpoint记录的保留策略
/**创建flink table环境对象*/
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
/**添加 Kafka 数据源*/
KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder() //一定注意这里的类型指定,否则后面的new SimpleStringSchema()会报错
.setBootstrapServers("192.168.211.107:6667")
.setTopics("test")
.setGroupId("FlinkFromKafka2Paimon01")
.setStartingOffsets(OffsetsInitializer.timestamp(1703474520000L)) //确认消费模式为特定时间
.setValueOnlyDeserializer(new SimpleStringSchema());
/**将数据源生成DataStream对象*/
DataStreamSource kafkaDS = env.fromSource(kafkaSourceBuilder.build(), WatermarkStrategy.noWatermarks(), "kafka-data");
/**将原始DS经过处理后,再添加schema*/
SingleOutputStreamOperator targetDS = kafkaDS.map((MapFunction) line -> {
JSONObject rawJson = JSON.parseObject(line);
String message = rawJson.getString("message"); //获取业务数据部分
String[] msgArray = message.split(","); //指定分隔符进行字段切分
return msgArray;
}).filter((FilterFunction) array -> {
if (array.length == 9) return true;
else return false;
}).map((MapFunction) array -> Row.of( array[0], array[1], array[2], array[3], array[4], array[5], array[6], array[7], array[8]))
.returns(
Types.ROW_NAMED(
new String[]{"client_ip", "domain", "time", "target_ip", "rcode", "query_type", "authority_record", "add_msg", "dns_ip"},
Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
);
/**将目标DS转化成Table对象*/
Table table = tableEnv.fromDataStream(targetDS);
/**创建属于paimon类型的catalog*/
tableEnv.executeSql("CREATE CATALOG hdfs_catalog WITH ('type' = 'paimon', 'warehouse' = 'hdfs://192.168.211.106:8020/tmp/paimon')");
/**使用创建的catalog*/
tableEnv.executeSql("USE CATALOG hdfs_catalog");
/**建paimon表*/
tableEnv.executeSql("CREATE TABLE if not exists data_from_flink2paimon01 (`client_ip` STRING ,domain STRING,`time` STRING,target_ip STRING,rcode STRING,query_type STRING,authority_record STRING,add_msg STRING,dns_ip STRING,PRIMARY KEY(client_ip,domain,`time`,target_ip) NOT ENFORCED)");
/**将 Kafka 数据源登记为当前catalog下的表*/
tableEnv.createTemporaryView("kafka_table", table);
/**将Kafka数据写入到 paimon 表中*/
tableEnv.executeSql("INSERT INTO data_from_flink2paimon01 SELECT * FROM kafka_table");
}
}
注意,代码里面为了能准确消费到我提前写入的所有Kafka数据,我用的时间戳方式来指定数据消费的位置。
2. Flink 写 Hudi 代码准备
对于 Hudi 的表结构,设计如下:
Flink 读取 Kafka 写 Hudi 表的代码如下:
package com.anryg.bigdata.hudi;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
import java.util.HashMap;
import java.util.Map;
/**
* @DESC: Flink 读取 Kafka 写 Hudi 表
* @Auther: Anryg
* @Date: 2023/9/3 14:54
*/
public class FlinkDSFromKafka2Hudi {
public static void main(String[] args) throws Exception {
//获取流任务的环境变量
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.enableCheckpointing(Long.parseLong(args[0]), CheckpointingMode.EXACTLY_ONCE); //打开checkpoint功能
//env.setRuntimeMode(RuntimeExecutionMode.BATCH)
env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Hudi01"); //设置checkpoint的hdfs目录
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置checkpoint记录的保留策略
KafkaSourceBuilder kafkaSource = KafkaSource.builder() //获取kafka数据源
.setBootstrapServers("192.168.211.107:6667")
.setTopics("test")
.setGroupId("FlinkDSFromKafka2Hudi01")
.setStartingOffsets(OffsetsInitializer.timestamp(1703474520000L)) //确认消费模式为特定时间
.setValueOnlyDeserializer(new SimpleStringSchema());
/**将数据源生成DataStream对象*/
DataStreamSource kafkaDS = env.fromSource(kafkaSource.build(), WatermarkStrategy.noWatermarks(), "kafka-data");
DataStream targetDS = kafkaDS.map((MapFunction) line -> {
JSONObject rawJson = JSON.parseObject(line);
String message = rawJson.getString("message"); //获取业务数据部分
String[] msgArray = message.split(","); //指定分隔符进行字段切分
return msgArray;
}).filter((FilterFunction) array -> {
if (array.length == 9) return true;
else return false;
}).map((MapFunction) array -> {
GenericRowData rowData = new GenericRowData(9);
rowData.setField(0, StringData.fromString(array[0]));
rowData.setField(1, StringData.fromString(array[1]));
rowData.setField(2, StringData.fromString(array[2]));
rowData.setField(3, StringData.fromString(array[3]));
rowData.setField(4, StringData.fromString(array[4]));
rowData.setField(5, StringData.fromString(array[5]));
rowData.setField(6, StringData.fromString(array[6]));
rowData.setField(7, StringData.fromString(array[7]));
rowData.setField(8, StringData.fromString(array[8]));
return rowData;
});
Map options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), "hdfs://192.168.211.106:8020/tmp/hudi/data_from_flink2hudi01");
//options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
options.put("table.type", args[1]); //指定写入类型
options.put("file.format", "ORC"); //默认为Parquet
//options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "client_ip,domain,time,target_ip");
HoodiePipeline.Builder builder = HoodiePipeline.builder("data_from_flink2hudi01")
.column("client_ip string")
.column("domain string")
.column("`time` string")
.column("target_ip string")
.column("rcode string")
.column("query_type string")
.column("authority_record string")
.column("add_msg string")
.column("dns_ip string")
.pk("client_ip","domain","`time`","target_ip") //主键字段
.options(options);
builder.sink(targetDS, false);
env.execute();
}
}
本来是准备用 Scala 来写的,结果发现依然存在跟上篇文章描述的那样,Scala API 跟 Java API 在个别地方存在类型无法互用(互转)的问题(还是怀念 Spark 的 API)。
3. 第1次测试对比
测试条件如下:
运行方式 | checkpoint间隔 | jobmanager内存 | taskmanager内存 | 并行度 |
yarn-session | 1 分钟 | 1.6G(默认) | 4G | 8 |
PS: 本来我想着 taskmanager 直接用默认内存(1.7G),然后就用1个并行度就好了,因为考虑到我灌到Kafka的文本数据量才3.1G,但是测试后发现,不行,直接就给我OOM了。
所以,才想着重新调整了参数(本着节省资源的原则,试了几次)。
3.1 Flink 写 Hudi 表
由于数据是提前就在Kafka里面的,所以启动程序之后,数据源就一定存在 backpressure,且写入端很忙。
由于是流式任务,所以即便它把我灌到 Kafka 里面的数据全部消费完了,任务也不会退出,那么为了能准确观察到程序消费完这些数据的准确时间。
我就只能全程盯着这个页面,直到已知的数据量消费完成为止,然后再记录此时程序花费的时间。
于是,我等啊等,等啊等!
一个多小时的漫长等待,24633164 这个我期盼的数字,终于出现了(Kafka里面的所有数据已经全部消费完成)。
这里有个写入特点是,前期数据写入很快,但到后期,因为考虑到 compact 的一些原因,导致数据写入速度会越来越慢
然后,任务的状态也瞬间从之前的“繁忙”,变成了“空闲”。
整个过程耗时约:82分钟。
一共生成260-2=258个数据文件(排除2个元数据目录)。
注意,这里我用的是 COPY_ON_WRITE 表。
3.2 Flink 写 Paimon 表
说出来你们可能不信,当我用同样的数据源,同样的并行度写 Paimon表时,任务提交后居然是这样的:
状态一直在这里转圈圈,当我上个厕所回来之后(至少10分钟),居然还在转。
我就纳了闷了,这问题到底是出在哪了呢?关键我试了多次都不行(集群资源是够的)。
于是,我后续又把并行度分别换成了7,6,5,4,3,2,1,结果从6往后,它就都可以了。
最终,我用6个并行度,总算让它跑起来了:
虽然只有6个并行度,但确实用 Flink 写 Paimon的效率,要明显比写 Hudi 要快。
同样的,在我眼皮子都没眨几次的观察下,它用了约半个小时的时间就完成了所有的数据写入。
总耗时约:29分钟。
一共生成的文件个数为:99个。
小结:
从实测来看,对于当前的对比,Paimon 的写入效率跟写入效果(文件数量),写入速度是 Hudi 的2倍多,而文件数量只有 Hudi数量的一半不到。
但对比 Flink 官方测试出来的,比 Hudi COW 表写入效率快12倍的结论,这里完全没有体现出来(当然,测试的数据量不一样)。
Flink 官方公众号给的对比结论
但是随后,奇怪的事情就来了,当我分别用读 API 读取 Paimon 的表以及Hudi的表数据量时,只有 Hudi 的数据量顺利读取出来了。
而 Paimon 表,则怎么着都读不出来(小数据量,百万级时可以)。
4. 第2次测试对比
测试条件如下:
运行方式 | checkpoint间隔 | jobmanager内存 | taskmanager内存 | 并行度 |
yarn-session | 2 分钟 | 1.6G(默认) | 6G | 8 |
4.1 Flink 写 Hudi 表
这个时候,除了把 checkpoint 的时间延长到2分钟之外,还把 Hudi 的表类型换成了 MERGE_ON_READ。
而至于为什么要把 taskmanger 的内存改为 6G,因为用4G跑的时候,OOM了。
咱再来看下跑完后的效果:
同样,还是我瞪着眼珠子看着它跑完的,所以这个时间掐的,那是老准了。
可以明显看到,换成 MERGE_ON_READ 表之后,这个效率快了不止一星半点,由以前的一个多小时,嗖一下,变成了:9分36秒。
而生成的文件数量,也由之前的258,变成了:41个(43减去2个元数据文件夹目录)。
4.2 Flink 写 Paimon 表
这次用8个并行度总算可以了,其他条件如上表所示,写完 24633164 条 Kafka 记录后,结论如下:
总耗时约:13分08秒。
生成的文件数量为: 58个。
发现没,这次 Hudi 已经逆袭了。
小结:
对比Paimon,Hudi 通过将表类型修改为 MERGE_ON_READ 之后,无论是写入速度,还是生成的文件个数,都要比 Paimon 表现优秀。
此次,Hudi写入效率,比 Paimon 快3分钟以上,文件生成数比 Paimon 少17个。
跟 Flink 官网给的结论,出入也比较大(当然,测试的数据量不一样):
5. 第3次测试对比
测试条件如下:
运行方式 | checkpoint间隔 | jobmanager内存 | taskmanager内存 | 并行度 |
yarn-session | 3 分钟 | 1.6G(默认) | 6G | 8 |
5.1 Flink 写 Hudi 表
我先用 MERGE_ON_READ 的表类型进行测试,直接看结论:
跟上次比,差别不大,耗时:11分20秒。
文件生成的数量也没有变化:
依然是41个。
再用 COPY_ON_WRITE 表类型进行测试,再看结论:
这个时候,写入全部数耗时为:24分30秒。
看生成的文件数量为90-2=88个。
5.2 Flink 写 Paimon 表
直接看结论:
写入全部数据,耗时为:16分27秒(全程眼睛盯着看)。
写入数据文件数量为54个。
小结:
从实测来看,Hudi 的 MOR 表无论是写入速度,还是生成的文件数量,都要比 Paimon 优秀。
而 Hudi 的 COW 表,则正好相反,其无论写入速度,还是文件生成数量,则要比 Paimon 差,但这个差距,貌似在随着 checkpoint 时间的增大,逐渐在缩小。
最后
通过这次实测对比的结果来看,虽然涵盖的范围不够全,但其实已经能说明问题,再次验证了那句话:
任何关于性能测试的结论,都必须建立在具体的场景之下才有意义,否则就是在扯淡,就是在误导人。
那么,基于本次测试场景下的测试结论如下:
测试条件 | 数据量 | Hudi表类型 | Hudi耗时 | Paimon耗时 | Hudi文件数量 | Paimon文件数据 |
JM:1.7G TM: 4G 并行数:8 CK间隔:1min | 24633164 | COW | 82分钟 | 29min | 258 | 99 |
JM:1.7G TM: 6G 并行数:8 CK间隔:2min | 同上 | MOR | 9分36秒 | 13分08秒 | 41 | 58 |
JM:1.7G TM: 6G 并行数:8 CK间隔:3min | 同上 | MOR | 11分20秒 | 16分27秒 | 41 | 54 |
JM:1.7G TM: 6G 并行数:8 CK间隔:3min | 同上 | COW | 24分30秒 | 16分27秒 | 88 | 54 |
虽然没有出现我在朋友圈说的,结论反转的情况,但也跟官方的大不一样。
当然,这可能跟 Hudi 与 Spark 的关系更为亲密有关,下次有机会,咱再用 Spark 测试一次看看(期待真正的反转)。
那么对于本次测试的结论,你怎么看?