Flink写Doris又有哪些坑呢


上一篇写了Spark如何通过structured streaming消费kafka的数据实时写Doris表,表面上看挺简单的一个逻辑,但是却经历了一波三折,好在,最后通过分析源码搞定了。


那么当我后续把spark换成flink之后,会不会要顺利一点呢?


事实证明并没有。


无论是官方文档给的样例,还是GitHub提供的demo,都没有办法直接解决当前我的问题,也还是只能不断试错,官网和源码满地找的方式,踉踉跄跄地把问题给解决了。


那么这篇文章,咱就来看看把spark换成flink之后,还是基于相同的业务场景,又会遇到哪些意想不到的坑呢?



0. 先看兼容性


Flink作为当下炙手可热的计算引擎之一,Doris自然会提供对其的兼容:


同样,官方文档也给出了对Flink不同版本的兼容情况:


而我当前软件工程里的Flink版本为1.15.2、Java版本也为1.8、对应的Doris版本为1.2.3,于是正好对应Doris的1.2.0版本的connector。



1. 环境准备


官方文档描述了一堆Flink写Doris前的准备工作,但是读完之后,在我看来,大部分其实是没有必要的(原因跟上一篇通过spark写Doris一样)。


因为我是通过idea开发的代码,对于开发环境来说,理论上我只需要额外引入对应的flink maven依赖,doris connector maven依赖就够了。


因为flink的环境我已经具备现成的了,那么暂时就只需要引入connector依赖就可以:


而至于集群运行环境,我有yarn,理论上也就够了,其他一切花里胡哨的配置全都不需要(当然,你最好要清楚为啥)。



2. SQL API第一次试错


从官方文档描述来看,Doris即支持Flink的Table API(SQL API),也支持其普通的DataStream API,这个就有点类似spark的Dataset(DataFrame)和RDD。



由于SQL API的语法更加简洁和便利,于是,我决定先用这个API来试试,代码如下:


package com.anryg.doris

import java.time.Duration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * @DESC: 通过SQL API读取kafka数据,写入到Doris
  * @Auther: Anryg
  * @Date: 2023/8/3 11:29
  */

object FlinkSQLFromKafka2Doris {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        
        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/checkpoint/FlinkSQLFromKafka2Doris")
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH)

        val tableEnv = StreamTableEnvironment.create(env)

        /**第一步:读取kafka数据源*/
        tableEnv.executeSql(
            """
             Create table dns_logs_from_kafka(
             |client_ip STRING,
             |domain STRING,
             |`time` STRING,
             |target_ip STRING,
             |rcode STRING,
             |query_type STRING,
             |authority_record STRING,
             |add_msg STRING,
             |dns_ip STRING
             |)
             |with(
             |'connector' = 'kafka',
             |'topic' = 'test',
             |'properties.bootstrap.servers' = '192.168.211.107:6667',
             |'properties.group.id' = 'FlinkSQLFromKafka2Doris',
             |'scan.startup.mode' = 'latest-offset',
             |'value.format'='csv',                                 //确定数据源为文本格式
             |'value.csv.field-delimiter'='|'                      //确定文本数据源的分隔符
             |)
            "
"".stripMargin)

        //tableEnv.executeSql("select * from dns_logs_from_kafka").print()

        /**第二步:创建Doris映射表*/
        tableEnv.executeSql(
            """
              |CREATE TABLE dns_logs_from_flink02 (
              |`client_ip` STRING
              |)
              |    WITH (
              |      'connector' = 'doris',
              |      'fenodes' = '192.168.221.173:8030',
              |      'table.identifier' = 'example_db.dns_logs_from_flink02',
              |      'username' = 'root',
              |      'password' = '',
              |      'sink.properties.format' = 'json',
              |      'sink.properties.read_json_by_line' = 'true',
              |     'sink.label-prefix' = 'doris_label'
              |)
            "
"".stripMargin)

        /**第三步:数据写入到Doris表中*/
        tableEnv.executeSql(
            """
              |INSERT INTO dns_logs_from_flink02
              |select
              |client_ip
              |from
              |dns_logs_from_kafka
            "
"".stripMargin)

    }
}

首先,我这肯定是严格按照官方文档的要求写的,而且还变着花样,改了多处设置,甚至为了尽可能快速解决问题,我都委屈到把要写入到Doris表的数据,减少到了一个字段。


先后尝试了至少10次以上,日志debug的模式也打开了(需要额外配置log4j2.xml文件),甚至在读取完kafka之后,我还专门把读取结果打印出来,确保了读取kafka这一步没有问题。


但是,即便这样,我还是没法定位出问题,这个奇葩的原因在于,即便我打开了日志的debug设置,程序依然没有任何报错提示,但数据就是写不进去


再次查看官方文档,我看它提到了需要在FE上做这么一个设置,心想,莫非是因为这一步没做吗?


于是赶紧把我的3台FE都增加了这个配置,然后重启集群。


但,然并卵,依然写不进数据,也依然没有任何报错提示。


那我就没有办法了,现在就是想找出问题,都不知道从哪找起,就是这么气人(当然,也可能是因为我水平不行)。


既然这样,那咱就换一棵歪脖子树试试(不想折腾太久了)。



3.  DataStream第二次试错


除了SQL API,官网还给出了DataStream API的样例,既然这样,那就试一下吧。


我根据这个官网样例,写出来这样的代码:


package com.anryg.doris

import java.util.Properties

import com.alibaba.fastjson.JSONObject
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.doris.flink.cfg.DorisOptions
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.cfg.DorisExecutionOptions
import org.apache.doris.flink.cfg.DorisReadOptions
import org.apache.doris.flink.sink.writer.{DorisRecordSerializerSimpleStringSerializer}
import org.apache.flink.api.common.eventtime.WatermarkStrategy

/**
  * @DESC: 通过DataStream API读取kafka数据,写入到Doris
  * @Auther: Anryg
  * @Date: 2023/8/3 17:05
  */

object FlinkDSFromKafka2Doris {

    def main(args: Array[String]): Unit = {
        //获取流任务的环境变量
        val env = StreamExecutionEnvironment.getExecutionEnvironment
                .enableCheckpointing(10000CheckpointingMode.EXACTLY_ONCE//打开checkpoint功能

        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Doris"//设置checkpoint的hdfs目录
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略

        val kafkaSource = KafkaSource.builder()  //获取kafka数据源
                .setBootstrapServers("192.168.211.107:6667")
                .setTopics("test")
                .setGroupId("FlinkDSFromKafka2Doris")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build()

        import org.apache.flink.streaming.api.scala._  //引入隐式转换函数,需要将从kafka读出来的数据做隐式转换
        
        /**配置Doris相关参数*/
        val dorisBuilder = DorisOptions.builder  
        dorisBuilder.setFenodes("192.168.221.173:8030")
                .setTableIdentifier("example_db.dns_logs_from_flink01")
                .setUsername("root")
                .setPassword("")

        /**确定数据写入到Doris的方式,即stream load方式*/
        val executionBuilder = DorisExecutionOptions.builder
        executionBuilder.setLabelPrefix("flink-doris-label03"//streamload label prefix

        /**确定Doris的Sink数据方式*/
        val builder = DorisSink.builder[String//注意这个数据类型String 需要加上
        builder.setDorisReadOptions(DorisReadOptions.builder.build) //确定数据读取策略
                .setDorisExecutionOptions(executionBuilder.build) //确定数据执行策略
                .setSerializer(new SimpleStringSerializer())  //确定数据数据序列化(写入)类型
                .setDorisOptions(dorisBuilder.build) //添加Doris配置

        /**读取数据源生成DataStream对象*/
        val kafkaDS = env.fromSource[String](kafkaSource, WatermarkStrategy.noWatermarks(),"kafka-data"

        /**对读取的数据进行处理*/
        val targetDS = kafkaDS.map(line => {
            val array = line.split("\\|")
            array
        }).filter(_.length == 9)
          .map(array => {
              val json = new JSONObject()  /**将数据封装为json对象*/
              json.put("client_ip",array(0))
              json.put("domain",array(1))
              json.put("time",array(2))
              json.put("target_ip",array(3))
              json.put("rcode",array(4))
              json.put("query_type",array(5))
              json.put("authority_record",array(6))
              json.put("add_msg",array(7))
              json.put("dns_ip",array(8))
              json.toJSONString /**转换为json string*/
          })

        targetDS.sinkTo(builder.build())

        env.execute("FlinkDSFromKafka2Doris")
    }
}

而且吸取了上次用spark写Doris的惨痛经历,我把数据格式都转成json string了。


按理说,应该没什么毛病才对,可是一运行呢,又报错了:


不过好消息是,它终于给我错误提示了,对比上面用SQL api时,没有任何报错的一脸懵逼要强多了。


通过给的这个错误连接,我顺利打开了其中的内容,是这样的:


告诉我说,现在这个数据处理格式(json string)还是不符合要求,这下又有点懵逼了。


心想,这个通过计算引擎写Doris的数据格式,还真有点让人捉摸不透,上一篇spark写Doris时,是必须要把数据转为json string格式的,但到了Flink这里,又不行了


那咋办呢?又只能看源码了。



4. 查看源码,定位问题


既然要决定看源码,从哪开始看呢?


那就是找到它的数据写入逻辑,或者写入前配置,因为Flink写Doris的方式是Steam Load,那么就找到这个对应的类,而这个相关的类在这里:


 怎么说呢,又是一个几乎0注释的源码。


通读这个类之后呢,发现了一个关键函数:



这里面清清楚楚的定义了数据的格式为json string类型,而且从函数命名来看,说它是一个默认的设置方式(函数名为defaults嘛)。


但是,奇葩的是,官网给的这个获取该执行对象的代码,却并没有读到这些所谓的默认配置



看到这里,我似乎一下子就找到了原因。



5. 解决问题


原因既然找到了,是因为源码那个数据格式的设置没有生效。


那么咱就只能手动把这个设置给加上,怎么加呢,这样:



加完之后,整个代码逻辑就是这样的:


package com.anryg.doris

import java.util.Properties

import com.alibaba.fastjson.JSONObject
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.doris.flink.cfg.DorisOptions
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.cfg.DorisExecutionOptions
import org.apache.doris.flink.cfg.DorisReadOptions
import org.apache.doris.flink.sink.writer.{DorisRecordSerializerSimpleStringSerializer}
import org.apache.flink.api.common.eventtime.WatermarkStrategy


/**
  * @DESC: 通过DataStream API读取kafka数据,写入到Doris
  * @Auther: Anryg
  * @Date: 2023/8/3 17:05
  */

object FlinkDSFromKafka2Doris {

    def main(args: Array[String]): Unit = {
        //获取流任务的环境变量
        val env = StreamExecutionEnvironment.getExecutionEnvironment
                .enableCheckpointing(10000CheckpointingMode.EXACTLY_ONCE//打开checkpoint功能

        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Doris"//设置checkpoint的hdfs目录
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略

        val kafkaSource = KafkaSource.builder()  //获取kafka数据源
                .setBootstrapServers("192.168.211.107:6667")
                .setTopics("test")
                .setGroupId("FlinkDSFromKafka2Doris")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build()

        import org.apache.flink.streaming.api.scala._  //引入隐式转换函数,需要将从kafka读出来的数据做隐式转换

        /**配置Doris相关参数*/
        val dorisBuilder = DorisOptions.builder
        dorisBuilder.setFenodes("192.168.221.173:8030")
                .setTableIdentifier("example_db.dns_logs_from_flink01")
                .setUsername("root")
                .setPassword("")
        
        /**确定数据写入到Doris的方式,即stream load方式*/
        val executionBuilder = DorisExecutionOptions.builder
        executionBuilder.setLabelPrefix("flink-doris-label03"//streamload label prefix

        /**添加额外的数据格式设置,否则写不进去*/
        val properties = new Properties
        properties.setProperty("format""json")
        properties.setProperty("read_json_by_line""true")

        executionBuilder.setStreamLoadProp(properties)

        /**确定Doris的Sink数据方式*/
        val builder = DorisSink.builder[String//注意这个数据类型String 需要加上
        builder.setDorisReadOptions(DorisReadOptions.builder.build) //确定数据读取策略
                .setDorisExecutionOptions(executionBuilder.build) //确定数据执行策略
                .setSerializer(new SimpleStringSerializer())  //确定数据数据序列化(写入)类型
                .setDorisOptions(dorisBuilder.build) //添加Doris配置

        /**读取数据源生成DataStream对象*/
        val kafkaDS = env.fromSource[String](kafkaSource, WatermarkStrategy.noWatermarks(),"kafka-data")


        /**对读取的数据进行处理*/
        val targetDS = kafkaDS.map(line => {
            val array = line.split("\\|")
            array
        }).filter(_.length == 9)
          .map(array => {
              val json = new JSONObject()  /**将数据封装为json对象*/
              json.put("client_ip",array(0))
              json.put("domain",array(1))
              json.put("time",array(2))
              json.put("target_ip",array(3))
              json.put("rcode",array(4))
              json.put("query_type",array(5))
              json.put("authority_record",array(6))
              json.put("add_msg",array(7))
              json.put("dns_ip",array(8))
              json.toJSONString /**转换为json string*/
          })

        targetDS.sinkTo(builder.build())

        env.execute("FlinkDSFromKafka2Doris")
    }
}

官网的例子给的是batch模式,我这里是流模式


再执行,之前一直显示数据量为0的表记录,一下子就蹭蹭往上涨了。




最后


跟上次用spark写Doris一样,这次用Flink写Doris,也一样不顺利,磕磕碰碰一路,好在汲取了上次的教训,最终解决问题的效率提高了不少。


这里我想表达一点的就是,官方文档的详尽、严谨程度,源码注释的详细程度,在很大程度上决定了使用者对一项新技术的学习成本


其次就是,只要对出现的问题,框架能够抛出足够详细(一般详细也成)的错误提示,使用者通过对逻辑关系的梳理,也能找到问题的原因并解决。


但是,就怕那种虽然有问题,但是偏偏还不告诉你问题原因的(不报错,不提示),这就让人很惆怅。


最后,希望这篇文章对你有用。



你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...


请使用浏览器的分享功能分享到微信等