上篇文章说到用Flink SQL API读取kafka数据写Doris时,数据没有写进去的问题。
后面Doris社区的人找到我,说可能是因为当时代码里面没有设置checkpoint,心想,我明明设置了呀。
保险起见,我又回过头去把代码又仔细看了一遍。
发现,有个关键设置没有enable:

也就是说,少了这个设置,数据虽然可以从kafka读取到,但就是写不进Doris。
但是呢,我其实是设置了checkpoint相关的配置的:

只不过,漏掉了这个enableCheckpoint方法的调用。
附上完整代码如下:
package com.anryg.doris
import java.time.Duration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* @DESC: 通过SQL API读取kafka数据,写入到Doris
* @Auther: Anryg
* @Date: 2023/8/3 11:29
*/
object FlinkSQLFromKafka2Doris {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000L)/**这个必须加上,否则数据写不进Doris*/
env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/checkpoint/FlinkSQLFromKafka2Doris02")
env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val tableEnv = StreamTableEnvironment.create(env)
/**第一步:读取kafka数据源*/
tableEnv.executeSql(
"""
Create table dns_logs_from_kafka(
|client_ip STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING
|)
|with(
|'connector' = 'kafka',
|'topic' = 'test',
|'properties.bootstrap.servers' = '192.168.211.107:6667',
|'properties.group.id' = 'FlinkSQLFromKafka2Doris',
|'scan.startup.mode' = 'latest-offset',
|'value.format'='csv', //确定数据源为文本格式
|'value.csv.field-delimiter'='|' //确定文本数据源的分隔符
|)
""".stripMargin)
//tableEnv.executeSql("select * from dns_logs_from_kafka").print()
/**第二步:创建Doris映射表*/
tableEnv.executeSql(
"""
|CREATE TABLE dns_logs_from_flink02 (
|`client_ip` STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING
|)
| WITH (
| 'connector' = 'doris',
| 'fenodes' = '192.168.221.173:8030',
| 'table.identifier' = 'example_db.dns_logs_from_flink02',
| 'username' = 'root',
| 'password' = '',
| 'sink.label-prefix' = 'doris_label02'
|)
""".stripMargin)
/**第三步:数据写入到Doris表中*/
tableEnv.executeSql(
"""
|INSERT INTO dns_logs_from_flink02
|select
|*
|from
|dns_logs_from_kafka
""".stripMargin)
}
}
也就是说,对于Flink而言,你的checkpoint配置的不管多么花里胡哨,如果你不提前把checkpoint这个功能给enable,依然是无效的(不知道是否有人遇到过同样的坑)。
这就让我想起了spark对checkpoint的设置,相比之下就简单太多了,就这一行:

spark的checkpoint设置
可能是因为习惯了spark的编码方式,导致在写flink API时,惯性思维使然,没有注意到一些细节的部分(给自己找个借口)。
这就很尴尬。
而且上篇文章也说了,虽然写不进去数据,但是运行日志却没有任何的告警,或者报错提示,日志显示一切正常:

这个截图,就是没有enable这个checkpoint的flink日志输出。
其实同样的写外部数据库场景,用spark的structured streaming写Doris时,也是需要打开启checkpoint功能的。
只不过,对于spark来说,忘记打开这个功能的话,它会有非常明显的报错提示:

但凡是个人,你就能通过这个错误提示知道问题出在哪,且知道该如何解决。
但是通过Flink SQL的api来写Doris,那不好意思,如果你不小心漏掉了某个设置而导致数据无法正常写入,那你就只能靠猜。
也不知道这个锅是该耍给Flink呢,还是Doris,我觉得呢,应该是Flink的责任更大一些。
虽然说Flink在流式处理上非常厉害,也有很多的优势,但是从我的一些场景使用过程中跟spark的对比来看,依然有很多明显的缺陷和不完善的地方。
希望社区在后续能真正地,务实地重视这些客观存在的问题,并为用户一一解决,少一些华而不实的广告营销,多做些实事。
