上次Flink SQL写Doris的坑,这样填


上篇文章说到用Flink SQL API读取kafka数据写Doris时,数据没有写进去的问题。


后面Doris社区的人找到我,说可能是因为当时代码里面没有设置checkpoint,心想,我明明设置了呀。


保险起见,我又回过头去把代码又仔细看了一遍。


发现,有个关键设置没有enable:


也就是说,少了这个设置,数据虽然可以从kafka读取到,但就是写不进Doris。


但是呢,我其实是设置了checkpoint相关的配置的:



只不过,漏掉了这个enableCheckpoint方法的调用。


附上完整代码如下:


package com.anryg.doris

import java.time.Duration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * @DESC: 通过SQL API读取kafka数据,写入到Doris
  * @Auther: Anryg
  * @Date: 2023/8/3 11:29
  */

object FlinkSQLFromKafka2Doris {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(10000L)/**这个必须加上,否则数据写不进Doris*/

        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/checkpoint/FlinkSQLFromKafka2Doris02")
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

        val tableEnv = StreamTableEnvironment.create(env)

        /**第一步:读取kafka数据源*/
        tableEnv.executeSql(
            """
             Create table dns_logs_from_kafka(
             |client_ip STRING,
             |domain STRING,
             |`time` STRING,
             |target_ip STRING,
             |rcode STRING,
             |query_type STRING,
             |authority_record STRING,
             |add_msg STRING,
             |dns_ip STRING
             |)
             |with(
             |'connector' = 'kafka',
             |'topic' = 'test',
             |'properties.bootstrap.servers' = '192.168.211.107:6667',
             |'properties.group.id' = 'FlinkSQLFromKafka2Doris',
             |'scan.startup.mode' = 'latest-offset',
             |'value.format'='csv',                                 //确定数据源为文本格式
             |'value.csv.field-delimiter'='|'                      //确定文本数据源的分隔符
             |)
            "
"".stripMargin)

        //tableEnv.executeSql("select * from dns_logs_from_kafka").print()

        /**第二步:创建Doris映射表*/
        tableEnv.executeSql(
            """
              |CREATE TABLE dns_logs_from_flink02 (
              |`client_ip` STRING,
             |domain STRING,
             |`time` STRING,
             |target_ip STRING,
             |rcode STRING,
             |query_type STRING,
             |authority_record STRING,
             |add_msg STRING,
             |dns_ip STRING
              |)
              |    WITH (
              |      'connector' = 'doris',
              |      'fenodes' = '192.168.221.173:8030',
              |      'table.identifier' = 'example_db.dns_logs_from_flink02',
              |      'username' = 'root',
              |      'password' = '',
              |     'sink.label-prefix' = 'doris_label02'
              |)
            "
"".stripMargin)

        /**第三步:数据写入到Doris表中*/
        tableEnv.executeSql(
            """
              |INSERT INTO dns_logs_from_flink02
              |select
              |*
              |from
              |dns_logs_from_kafka
            "
"".stripMargin)

    }
}

也就是说,对于Flink而言,你的checkpoint配置的不管多么花里胡哨,如果你不提前把checkpoint这个功能给enable,依然是无效的(不知道是否有人遇到过同样的坑)


这就让我想起了spark对checkpoint的设置,相比之下就简单太多了,就这一行:


spark的checkpoint设置

可能是因为习惯了spark的编码方式,导致在写flink API时,惯性思维使然,没有注意到一些细节的部分(给自己找个借口)。


这就很尴尬。


而且上篇文章也说了,虽然写不进去数据,但是运行日志却没有任何的告警,或者报错提示,日志显示一切正常:


这个截图,就是没有enable这个checkpoint的flink日志输出。


其实同样的写外部数据库场景,用spark的structured streaming写Doris时,也是需要打开启checkpoint功能的。


只不过,对于spark来说,忘记打开这个功能的话,它会有非常明显的报错提示


但凡是个人,你就能通过这个错误提示知道问题出在哪,且知道该如何解决。


但是通过Flink SQL的api来写Doris,那不好意思,如果你不小心漏掉了某个设置而导致数据无法正常写入,那你就只能靠猜。


也不知道这个锅是该耍给Flink呢,还是Doris,我觉得呢,应该是Flink的责任更大一些。


虽然说Flink在流式处理上非常厉害,也有很多的优势,但是从我的一些场景使用过程中跟spark的对比来看,依然有很多明显的缺陷和不完善的地方。


希望社区在后续能真正地,务实地重视这些客观存在的问题,并为用户一一解决,少一些华而不实的广告营销,多做些实事。



你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...


请使用浏览器的分享功能分享到微信等