Doris跟Clickhouse亿级数据导出,谁更费劲?


来源:安瑞哥是码农


还记得上次我写过一篇,关于把 Doris 一张有着8亿+数据量的表,给导入到 Elasticsearch 里面的尝试不。


Doris写ES,Spark、Flink以及DataX,谁更拉垮?


当时我分别用了 Spark、Flink,以及 DataX,每种方式我都分别尝试了多次,均以失败告终,从每次的报错日志来看,矛头都指向 Doris,都是它在某个时间点,就突然掐断了数据库连接。


比如,Flink 跟 Spark 都会抛出以下一毛一样的异常:



后来,我又尝试着通过 Flink 读这张 Doris 表数据写 Kafka,妄想着会不会出现奇迹,但事实证明,确实是妄想。


看看下面这个截图,你应该就知道,我在这条不归路上走了多远:



这可是十多天,近300次的尝试啊,居然都不能把这8亿+数据给导完,还是那句话,尽力是真的尽力,但没用也是真的没用。


而且,让人惊悚的是,这个进程每小时都要挂一次,跟我每天起床的闹钟一样准时。


关键,根据我的观察,每次它自动重试之后,读取的都是重复数据(后面架了个 Spark 程序在消费 kafka 写 ES,结果这10多天时间里,ES 的数据量硬生生的就没有一点增长(指定了唯一id),而我的Kafka都快被撑爆了),shit。


算球,不折腾了。


想到这,既然 Doris 满足不了这个大数据量的导出,那试试用 Clickhouse(下称CK) 呢,是不是也会这么矬?


(PS,本文对应组件版本为 CK23.4.1,Spark3.2.0,Flink1.15.2)



0. 上 CK


公平起见,我找了张数据量跟之前 Doris 表一样的 CK 表做对比测试。


当前测试的CK表数据量


之前Doris表的数据量

因为之前的 Doris 是分布式表,所以,这张 CK 表,也得给弄成分片表才是:



CK 表准备好了,接下来,就开始各种尝试吧。



1. 尝试 Flink


从 Flink 官网来看,它并没有提供直接读 CK 的接口,只能通过 JDBC。


但扯淡的是,官方提供的这个所谓的 JDBC 连接包



根本就识别不了 CK 数据库,一跑就这熊样:


告诉你目前只支持这几种常见的数据库。


于是呢,我又跑到 CK 的官网,去找 Flink 怎么读取 CK 表,找了半天发现,它目前只支持 Flink 写 CK:


尼玛的。


于是接着,我又找到了个专门针对CK的 ck-jdbc ,


 碰碰运气,看行不行。


结果发现,跟之前一个鸟样,抛的同样异常。


又在网上找了很多资料,说是要改官方提供的 JDBC 源码,我在想,CK 这么主流的数据库都不支持读,那还玩个嘚啊。


果断放弃,投入 Spark 的怀抱。



2. 尝试用 Spark


虽然 Spark 也没有支持直接读 CK 的接口,但人家也有 JDBC ,而且从官网可以得知它支持的版本。


这个0.4.6, 就是上面我引入到 Flink 工程不起作用的那个包,



但是,诶! 人家在 Spark 的工程里,是奏效的。


一番调试,可以顺利跑通的,Spark 读 CK 表,写 Kafka 的代码如下:


package com.anryg.bigdata.jdbc

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * @DESC: Spark通过CK jdbc读取CK表,写 Kafka
  * @Auther: Anryg
  * @Date: 2024/3/19 15:00
  */

object SparkFromCK2Kafka {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("SparkFromCK2Kafka")/*.setMaster("local[*]")*/
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._  /**引入隐式转换对象*/

        /**CK属性配置*/
        val propertiesCK = new Properties()
        propertiesCK.put("username","default")
        propertiesCK.put("password""")

        val rawDF = spark.read
            .option("driver""com.clickhouse.jdbc.ClickHouseDriver"//提交集群运行时,这里需要显示指定driver
            .jdbc("jdbc:ch://192.168.211.107:8123,192.168.211.108:8123,192.168.211.109:8123/cluster_db",
                "dns_logs_cluster_later",
                propertiesCK)

        /**讲读取到的数据,按照Kafka的格式要求做简单处理*/
        val targetDF = rawDF.map(row => {
                        val client_ip = row.getString(0)
                        val domain = row.getString(1)
                        val time = row.getString(2)
                        val target_ip = row.getString(3)
                        var rcode = 99
                        try {
                            rcode = row.getInt(4)
                        } catch {
                            case e:Exception => println(row,e) //捕获异常数据异常情况,防止进程因此退出
                        }
                        var query_type = 99
                        try {
                            query_type = row.getInt(5)
                        } catch {
                            case e:Exception => println(row,e)
                        }
                        val authority_record = row.getString(6)
                        val add_msg = row.getString(7)
                        val dns_ip = row.getString(8)

                      client_ip + "," + domain + "," + time + "," + target_ip + "," + rcode + "," + query_type + "," + authority_record + "," + add_msg + "," + dns_ip
                    }).toDF("value"/**key跟value字段,至少需要提供value*/

        /**数据写Kafka*/
        targetDF.write.format("kafka")
            .option("kafka.bootstrap.servers","192.168.211.107:6667")
            .option("topic""test")
            .save()
    }
}

虽然说代码逻辑非常简单,但是呢,这里面还是有2个值得注意的点:


点1:在写 jdbc 读取 CK 表的代码时,如果提交 YARN 集群运行,则需要显示指定 CK 的 driver (本地运行可以不用),否则,就会抛出下面的异常:


一开始我还以为是 jdbc 的包没打上,心想自己不应该这么没谱呀,后来才发现,包是在的,但需要把下面这行代码给加上:


所以很多时候,像这样的分布式任务,本地能正常跑,不一定代表集群也能跑得很利索,多试错,才能发现问题。


点2:写入到 Kafka 的数据,一定要遵循 Kafka 需要的格式,那就是数据必须得是key, value的形式,或者「value」的形式,也就是只能2个字段,或者1个字段,否则是写不进 Kafka 的。




3. 运行情况


通用对 Spark 的尝试,发现是可以顺利做到通过 JDBC 读取 CK 表,然后写 Kafka的。


提交到集群运行之后,经过 4 个多小时的运行,


任务运行中



任务运行完成


虽然看上去效率有点低(这么大数据量,居然默认只有一个partition),但好在,它导入成功了啊,而且整个过程没有抛任何异常,并且,数据确实是写进 Kafka 里了的。


就冲这一点,也值得点赞。



4. 总结


对于 Doris 和 CK 同是8亿+数据量的表导入到 Kafka,从上次跟这次的的测评对比来看,虽然 Doris 支持的 API 要比 CK 略丰富(多了个 Flink 的导入方式)。


但是,从「搞定问题」这个层面来看,那只是表面上的繁荣,因为我们对 Doris 通过了3种方法(Flink、Spark、DataX)的多次尝试,全都都以失败告终。


而对于 CK,虽然 Flink API 暂时来看行不通(Flink 是不是该反思一下),但是我们用 Spark,一下子就能导入成功,结果比较让人欣喜。


虽然说,通过导出 Doris 或者 CK 表的巨量数据到 Kafka (或者其他地方),这种场景在我们的日常项目并不十分常见,但是,在某些时候,比如数据迁移,数据库升级等情况下,还是很有必要的。


那么对于这次测评的结论,你心里是不是已经有了答案?

请使用浏览器的分享功能分享到微信等