来源:安瑞哥是码农
还记得上次我写过一篇,关于把 Doris 一张有着8亿+数据量的表,给导入到 Elasticsearch 里面的尝试不。
当时我分别用了 Spark、Flink,以及 DataX,每种方式我都分别尝试了多次,均以失败告终,从每次的报错日志来看,矛头都指向 Doris,都是它在某个时间点,就突然掐断了数据库连接。
比如,Flink 跟 Spark 都会抛出以下一毛一样的异常:
后来,我又尝试着通过 Flink 读这张 Doris 表数据写 Kafka,妄想着会不会出现奇迹,但事实证明,确实是妄想。
看看下面这个截图,你应该就知道,我在这条不归路上走了多远:
这可是十多天,近300次的尝试啊,居然都不能把这8亿+数据给导完,还是那句话,尽力是真的尽力,但没用也是真的没用。
而且,让人惊悚的是,这个进程每小时都要挂一次,跟我每天起床的闹钟一样准时。
关键,根据我的观察,每次它自动重试之后,读取的都是重复数据(后面架了个 Spark 程序在消费 kafka 写 ES,结果这10多天时间里,ES 的数据量硬生生的就没有一点增长(指定了唯一id),而我的Kafka都快被撑爆了),shit。
算球,不折腾了。
想到这,既然 Doris 满足不了这个大数据量的导出,那试试用 Clickhouse(下称CK) 呢,是不是也会这么矬?
(PS,本文对应组件版本为 CK23.4.1,Spark3.2.0,Flink1.15.2)
0. 上 CK
公平起见,我找了张数据量跟之前 Doris 表一样的 CK 表做对比测试。
当前测试的CK表数据量
之前Doris表的数据量
因为之前的 Doris 是分布式表,所以,这张 CK 表,也得给弄成分片表才是:
CK 表准备好了,接下来,就开始各种尝试吧。
1. 尝试 Flink
从 Flink 官网来看,它并没有提供直接读 CK 的接口,只能通过 JDBC。
但扯淡的是,官方提供的这个所谓的 JDBC 连接包
根本就识别不了 CK 数据库,一跑就这熊样:
告诉你目前只支持这几种常见的数据库。
于是呢,我又跑到 CK 的官网,去找 Flink 怎么读取 CK 表,找了半天发现,它目前只支持 Flink 写 CK:
尼玛的。
于是接着,我又找到了个专门针对CK的 ck-jdbc ,
碰碰运气,看行不行。
结果发现,跟之前一个鸟样,抛的同样异常。
又在网上找了很多资料,说是要改官方提供的 JDBC 源码,我在想,CK 这么主流的数据库都不支持读,那还玩个嘚啊。
果断放弃,投入 Spark 的怀抱。
2. 尝试用 Spark
虽然 Spark 也没有支持直接读 CK 的接口,但人家也有 JDBC ,而且从官网可以得知它支持的版本。
这个0.4.6, 就是上面我引入到 Flink 工程不起作用的那个包,
但是,诶! 人家在 Spark 的工程里,是奏效的。
一番调试,可以顺利跑通的,Spark 读 CK 表,写 Kafka 的代码如下:
package com.anryg.bigdata.jdbc
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* @DESC: Spark通过CK jdbc读取CK表,写 Kafka
* @Auther: Anryg
* @Date: 2024/3/19 15:00
*/
object SparkFromCK2Kafka {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkFromCK2Kafka")/*.setMaster("local[*]")*/
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._ /**引入隐式转换对象*/
/**CK属性配置*/
val propertiesCK = new Properties()
propertiesCK.put("username","default")
propertiesCK.put("password", "")
val rawDF = spark.read
.option("driver", "com.clickhouse.jdbc.ClickHouseDriver") //提交集群运行时,这里需要显示指定driver
.jdbc("jdbc:ch://192.168.211.107:8123,192.168.211.108:8123,192.168.211.109:8123/cluster_db",
"dns_logs_cluster_later",
propertiesCK)
/**讲读取到的数据,按照Kafka的格式要求做简单处理*/
val targetDF = rawDF.map(row => {
val client_ip = row.getString(0)
val domain = row.getString(1)
val time = row.getString(2)
val target_ip = row.getString(3)
var rcode = 99
try {
rcode = row.getInt(4)
} catch {
case e:Exception => println(row,e) //捕获异常数据异常情况,防止进程因此退出
}
var query_type = 99
try {
query_type = row.getInt(5)
} catch {
case e:Exception => println(row,e)
}
val authority_record = row.getString(6)
val add_msg = row.getString(7)
val dns_ip = row.getString(8)
client_ip + "," + domain + "," + time + "," + target_ip + "," + rcode + "," + query_type + "," + authority_record + "," + add_msg + "," + dns_ip
}).toDF("value") /**key跟value字段,至少需要提供value*/
/**数据写Kafka*/
targetDF.write.format("kafka")
.option("kafka.bootstrap.servers","192.168.211.107:6667")
.option("topic", "test")
.save()
}
}
虽然说代码逻辑非常简单,但是呢,这里面还是有2个值得注意的点:
点1:在写 jdbc 读取 CK 表的代码时,如果提交 YARN 集群运行,则需要显示指定 CK 的 driver (本地运行可以不用),否则,就会抛出下面的异常:
一开始我还以为是 jdbc 的包没打上,心想自己不应该这么没谱呀,后来才发现,包是在的,但需要把下面这行代码给加上:
所以很多时候,像这样的分布式任务,本地能正常跑,不一定代表集群也能跑得很利索,多试错,才能发现问题。
点2:写入到 Kafka 的数据,一定要遵循 Kafka 需要的格式,那就是数据必须得是「key, value」的形式,或者「value」的形式,也就是只能2个字段,或者1个字段,否则是写不进 Kafka 的。
3. 运行情况
通用对 Spark 的尝试,发现是可以顺利做到通过 JDBC 读取 CK 表,然后写 Kafka的。
提交到集群运行之后,经过 4 个多小时的运行,
任务运行中
任务运行完成
虽然看上去效率有点低(这么大数据量,居然默认只有一个partition),但好在,它导入成功了啊,而且整个过程没有抛任何异常,并且,数据确实是写进 Kafka 里了的。
就冲这一点,也值得点赞。
4. 总结
对于 Doris 和 CK 同是8亿+数据量的表导入到 Kafka,从上次跟这次的的测评对比来看,虽然 Doris 支持的 API 要比 CK 略丰富(多了个 Flink 的导入方式)。
但是,从「搞定问题」这个层面来看,那只是表面上的繁荣,因为我们对 Doris 通过了3种方法(Flink、Spark、DataX)的多次尝试,全都都以失败告终。
而对于 CK,虽然 Flink API 暂时来看行不通(Flink 是不是该反思一下),但是我们用 Spark,一下子就能导入成功,结果比较让人欣喜。
虽然说,通过导出 Doris 或者 CK 表的巨量数据到 Kafka (或者其他地方),这种场景在我们的日常项目并不十分常见,但是,在某些时候,比如数据迁移,数据库升级等情况下,还是很有必要的。
那么对于这次测评的结论,你心里是不是已经有了答案?