Spark实时写Doris的坑爹之旅


之前Doris文章中,关于数据写Doris表的方式,都是采用Doris内置(自带)的一些工具进行的,比如routine load、stream load(还有一些没用过)等等。


这些数据库本身自带的数据导入方式固然简单、方便,但是在实际生产中你会发发现,当你面临比较复杂一点的数据过滤场景时(比如之前文章中提到的需要根据数据源的内容过滤),这些自带工具的弊端就立马显现出来了。


这些自带工具要么不支持复杂条件的过滤,要么在导入过程中会出现一些不可预知的意外(比如上篇文章就通过routine load的导入方式,在后续导入过程中,出现了莫名其妙的字段数不符合要求的错误)。


以我的生产经验来看,利用技术成熟的计算引擎,并且完全根据数据的场景需要,通过自行编写数据过滤、处理逻辑,才是最靠谱、且最可控的数据入库方式之一


那么今天这篇文章,咱就来聊聊如何通过spark的structured streaming实时将数据源(kafka)数据入库到Doris表中。



0. 与spark的兼容性


作为当下非常火热的数据库之一,Doris无疑一定是会兼容Spark的,对于这一点,从官网中的两处描述就可以看出来。


0.1 从数据导入方式上:


第一处描述在文档的数据导入部分,该部分描述Doris可以通过在其内部配置一系列的spark环境来实现:


这个部分告诉我们,可以在Doris中创建一个spark的load任务,但是呢,同时需要在Doris中添加诸多的相关配置,以及spark依赖的运行环境。


创建这个spark的load任务本身繁琐不说,其本质上还是需要依赖spark的集群环境,yarn或者spark standalone。


所以,既然是这样,那我为什么不直接开发一个spark任务,然后把处理后的结果写入到Doris表中呢?以上这些繁琐的配置不就可以全都省了吗。


所以,我个人认为,这种数据导入方式未必能成为主流。


0.2 从生态扩展方式上


Doris支持spark的第二处描述,是在其生态扩展上,支持spark对其进行读写关联:


而且,还给出了spark对应支持的版本:


正好我当前的spark版本为3.2,scala版本也为2.12、Doris版本为1.2.3,契合版本为1.2.0以及1.1.0的spark Doris Connector(后面实践证明1.2.0的版本有明显bug,或者官方文档描述有误)


于是,我决定用这种方式来测试一下,通过spark将数据写入Doris表,看看效果如何。


PS:之前有篇文章写了如何通过spark的structured streaming方式实时写数据到Clickhouse,由于CK以及spark都没有直接提供这两者的connector,最后是通过spark支持的扩展方式(重写ForeachWriter),以及CK支持的JDBC,两者结合起来才完成的。


而Doris则直接提供了Spark的connector,所以表面上看,在跟spark的兼容性上,Doris显得更友好一些。



1. 初始试错


根据官网的要求,我在软件项目中的pom文件中,引入了1.2.0的doris connector,然后根据官网的API要求,我写的代码如下:


package com.anryg.bigdata.streaming.doris

import java.util
import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.JSONObject
import com.anryg.bigdata.doris.DorisSink
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputModeTrigger}

/**
  * @DESC: 读kafka实时写Doris
  * @Auther: Anryg
  * @Date: 2023/8/1 10:12
  */

object Data2Doris {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Kafka2Doris").setMaster("local[*]")
        val spark = SparkSession.builder().config(conf).getOrCreate()

        val rawDF = spark.readStream //获取数据源
                .format("kafka"//确定数据源的来源格式
                .option("kafka.bootstrap.servers""192.168.211.107:6667"//指定kafka集群的地址,理论上写一个broker就可以了
                .option("subscribe","test")  //指定topic          
                .option("failOnDataLoss",false)  //如果读取数据源时,发现数据突然缺失,比如被删,则是否马上抛出异常
                .option("fetchOffset.numRetries",3)  //获取消息的偏移量时,最多进行的重试次数
                .option("maxOffsetsPerTrigger",1)/**用于限流,限定每次读取数据的最大条数,不指定则是as fast as possible,但是每次只取最新的数据,不取旧的*/
                .option("startingOffsets","latest")  //第一次消费时,读取kafka数据的位置
                .load()

        import spark.implicits._
        val ds = rawDF.selectExpr("CAST(value as STRING)")  //将kafka中的数据的value转为为string,原始为binary类型
                .map(row => {
                    val array = row.getAs[String]("value").split("\\|")
                    array
                }).filter(_.size == 9)
                .map(array =>(array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8)))
                .toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")

        ds.writeStream
                .option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/offset/test/kafka2doris03")
                .outputMode(OutputMode.Append())
                .format("doris")
                .option("doris.table.identifier""example_db.dns_logs_from_spark01")
                .option("doris.fenodes","192.168.221.173:8030")
                .option("user","root")
                .option("password","")
                .start()
                .awaitTermination()
    }

}


引入的pom依赖:


按理说,其实挺简单的代码逻辑,之前用同样的方式写过很多其他数据数据库(比如Elasticsearc、hive等)是非常顺利的。


坑爹情况1:


可是,就是这么个简单的代码逻辑,出幺蛾子了。



报错提示要我用writeSteam.start的方式去写数据,看到这个报错信息,我是瞬间一脸懵逼,用了这么些年的spark,难道我还不知道流式数据要用这种方式写入吗?


关键我明明是这么写的呀,而且为了保证这个写Doris表之前的代码逻辑没有问题,我还专门把处理后的数据它打印到控制台了,显示一切都正常。


在排除了代码本身的问题,以及jar包冲突的可能性之后,我只能怀疑可能是当前的doris connector版本的问题,于是就想着把这个connector的版本从1.2.0改为1.1.0试试(后面论证我的怀疑是对的)


毕竟,从官方文档来看,这个版本也是兼容我当前的spark环境的(我的spark:3.2.0 scala:2.12 Doris:1.2.3)。


坑爹情况2:


然后,再次运行发现,之前那个错误不见了,可又出现了另一个新奇的错误:


说不能识别我数据中的“.”号,可这个“.” 是我的原始数据啊,难道原始数据这个符号都不配出现吗?有点不能理解。


既然这种方式行不通,那咱不能只在一棵树上吊死不是,换一棵树行不行?



2. 再次试错


我突然在文档中看到了这句话,好像一下子抓住了一根救命稻草:


于是果断打开了这里的代码样例,而这个样例代码中,跟spark息息相关的在这块:


这个模块里的所有代码我都看了,发现一个特别有意思的地方就是,这个样例的代码实现里,根本就没有用官网给到的例子(莫非你们内部有啥矛盾)。


也就是说,我刚才费劲扒拉写的那些,写Doris部分的代码,在这个样例里压根就没有出现,人家用的是另一种写Doris的方式:


既然出现了新的写入策略,那咱就再相信它一次,用同样的方法试试看。


结果,发现我天真了,还是不行,报出如下的错误提示:


貌似在告诉我,是跟be节点在通信时候的问题,这让我一度认为是网络问题,但在我排除半天,确定网络以及集群状态完全没有问题之后,我又懵逼了。


怎么办?总不能就此放弃吧,最后只剩一招了:看源码。



3. 看源码,定位问题


不得不说,这个Doris里面有些源码写的吧,我是真的不敢恭维,官方文档该有的关键说明缺失就算了,连源代码写的也如此草率,好歹给个像样的注释啊。


最后发现问题,并解决问题的,是我读了下面这个关键的核心源代码(可以看出,这个spark的写Doris实现,其实用的就是Steam load的实现方式):


而就是这个核心代码,可以说是0注释,连作者是谁都不知


通读完这个代码之后,再结合刚才在github上官网给的样例(同样也是关键信息缺失,几乎0注释),发现一个关键函数:


那就是,它源码写到Doris表的数据结构,全都是string类型的,我就在想:这个string要怎么才能反映出我的字段跟数据的对应信息呢,schema从哪取呢


在我进一步追踪源码后发现,它用的是jackson-databind这个第三方包(并非Doris的源码)来解析这个string的:


这下我才恍然大悟,原来要写入到Doris表的数据,不能封装成跟表结构一样schema的DataFrame(要知道,这才是常规、主流的做法),而要封装成json string


我真是去你大爷的,这些关键信息在官方文档,以及源码注释中全是缺失的,你要是不深入读源码,根本发现不了


关键在以上这些debug过程中,报错的信息提示,也特别有误导性,告诉你各种奇怪的异常,但就是不告诉你问题出在哪(而且网上也根本找不到类似问题的解决办法)



4. 最终解决问题


既然问题定位到了,解决起来就容易了。


将原本封装好的带字段信息的DataFrame:


.map(array =>(array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8)))
.toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")

给改为json string:

.map(array => {
      val json = new JSONObject()
      json.put("client_ip",array(0))
      json.put("domain",array(1))
      json.put("time",array(2))
      json.put("target_ip",array(3))
      json.put("rcode",array(4))
      json.put("query_type",array(5))
      json.put("authority_record",array(6))
      json.put("add_msg",array(7))
      json.put("dns_ip",array(8))
      json.toJSONString
  })

而其他部分不变,整个完整代码如下所示:


package com.anryg.bigdata.streaming.doris

import com.alibaba.fastjson.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode}

/**
  * @DESC: spark structured streaming实时读取kafka写Doris
  * @Auther: Anryg
  * @Date: 2023/8/1 10:12
  */

object Data2Doris {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Kafka2Doris").setMaster("local[*]")
        val spark = SparkSession.builder().config(conf).getOrCreate()

        val rawDF = spark.readStream //获取数据源
                .format("kafka"//确定数据源的来源格式
                .option("kafka.bootstrap.servers""192.168.211.107:6667"//指定kafka集群的地址,理论上写一个broker就可以了
                .option("subscribe","test")  //指定topic
                //.option("group.id","test9999") /**不再用该方式来绑定offset,而是每个程序有个唯一的id,该id跟checkpointLocation绑定,虽然group.id属性在运行中依然保留,但是不再跟offset绑定*/
                .option("failOnDataLoss",false)  //如果读取数据源时,发现数据突然缺失,比如被删,则是否马上抛出异常
                .option("fetchOffset.numRetries",3)  //获取消息的偏移量时,最多进行的重试次数
                .option("maxOffsetsPerTrigger",1)/**用于限流,限定每次读取数据的最大条数,不指定则是as fast as possible,但是每次只取最新的数据,不取旧的*/
                .option("startingOffsets","earliest")  //第一次消费时,读取kafka数据的位置
                .load()

        import spark.implicits._
        val ds = rawDF.selectExpr("CAST(value as STRING)")  //将kafka中的数据的value转为为string,原始为binary类型
                .map(row => {
                    val array = row.getAs[String]("value").split("\\|")
                    array
                }).filter(_.size == 9)
                .map(array => {
                    val json = new JSONObject()
                    json.put("client_ip",array(0))
                    json.put("domain",array(1))
                    json.put("time",array(2))
                    json.put("target_ip",array(3))
                    json.put("rcode",array(4))
                    json.put("query_type",array(5))
                    json.put("authority_record",array(6))
                    json.put("add_msg",array(7))
                    json.put("dns_ip",array(8))
                    json.toJSONString /**转换为json string*/
                })

        ds.writeStream
                .option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/offset/test/kafka2doris04")
                .outputMode(OutputMode.Append())
                .format("doris")
                .option("doris.table.identifier""example_db.dns_logs_from_spark01")
                .option("doris.fenodes","192.168.221.173:8030")
                .option("user","root")
                .option("password","")
                .start()
                .awaitTermination()
    }

}

至此,数据才算顺利写到Doris表中。


(PS:代码已提交到实战项目的GitHub中)



最后


本来觉得呢,这个通过spark structured streaming写Doris表的功能应该挺简单的,官方文档明确支持,然后又给了样例代码,之前写其他数据库的经验也告诉,这个一般不存在什么难度。


结果不尝试不知道,一尝试全是惊吓,各种货不对版,各种隐藏的深坑,虐得你不要不要的,原本以为2-3个小时就可写完这篇文章,结果花了我接近2天时间。


最开始是提供的1.2.0版本的connector根本没法用,至少跟我当下的spark版本是没法匹配的(但官网说老匹配了);


其次是更换了低版本的connector之后,又出现了很多其他意想不到的幺蛾子;


最后把我逼急了,只能去看源码,完了源码还没注释,一番低效的查找后,最终才能找到问题的原因


所以再次说明,作为官方文档或者源代码,还是要写严谨一点,关键信息一定不要缺失,否则对于使用者来说,可能是灾难性的


当然了,如果我的这趟趟坑之旅能帮到你,那也不枉费我的一番心血。




你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...


请使用浏览器的分享功能分享到微信等