来源:安瑞哥是码农
上篇文章写了 Clickhouse 可以在不通过外部工具帮助的情况下,用创建mysql的外部库方式,来同步开启了binlog功能的整个mysql库下的表,实现真正意义上的CDC(历史数据+变化数据)同步。
那么今天这篇文章,就来接着说说,对于Doris这款数据来说,将如何做到对mysql CDC的,官网宣传的一键同步整库,到底靠不靠谱?
0. Doris 对 mysql 数据导入的支持
在开始聊Doris对mysql CDC的支持之前,我们先来看看Doris官方文档对mysql的支持有哪些。
打开官网,通过搜索关键字「mysql」可以得到如下一众结果:

从搜索的结果来看,其中支持mysql CDC的有:
1. MySQL binlog Load:这个除了需要mysql支持binlog功能外,还需要额外借助 cancel 这个工具;
2. MySQL to Doris(生态扩展):利用额外开发的脚本,同步mysql数据到Doris。
对于第1个来说,因为要用到 cancel,所以这算是一个同步mysql数据的“通用”解决方案,根据我的经验,这个肯定是可行的,但个人认为实现步骤有些麻烦,所以暂时不想做尝试。
对于第2种,是针对Doris定制的一种mysql数据导入策略,根据Doris的VP介绍说,这个方案是一个客户贡献的,一会咱使用起来看看好使不。
但是,比较有意思的是,Doris官方目前主推同步 mysql 数据的方案,并没有出现在刚才那个索引推荐里,而是通过Flink Doris Connector来实现对mysql数据的导入。
那么今天这篇文章,就来聊聊这两种可以实现mysql CDC的方案,到底怎么样?
1. MySQL to Doris方案
从官方文档的篇幅安排来看,给了这个方案很大一块描述的内容,看起来好像挺厉害的样子。

但是,当我把对应的脚本下载下来之后,根据文档描述的步骤来操作时,很快就失望了。
文档描述说,可以根据需要,配置你要导入的mysql源表,以及最终的Doris表。
于是我在 mysql 中分别创建了一个表结构简单的小表,跟一张表结构稍微复杂一点的大表,然后根据它的要求,一步步执行配置、导入。
最后,一顿操作猛如虎(各种尝试之后),发现,就导入了那张简单表的表结构,其他啥玩意都没有。
(整个操作过程略...)
总之,该方案很坑爹,强烈建议把文档结构调整一下,既然不靠谱,就不要占用这么大的篇幅,容易误导像我这种严重依赖官方文档的人。
2. Flink CDC 方案趟坑
既然是官方最推荐的,那咱高低得试一试,但是这样一来呢,就凸显出一个问题。
那便是,Doris 想要顺利、高效地同步 mysql 数据源,现阶段必须得借助外力,而对比之下,Clickhouse 是完全可以利用它自身的功能,就能实现对mysql表同步的()。
也就是说,Doris官方推荐的这个数据导入方案,必须要你提前具备Flink环境(开发和运行都要)。
其实通过Flink CDC读取mysql写Doris的案例,我之前写过一篇文章(),描述了如何通过将一张具体的mysql目标表,写入到Doris的过程。
只不过,这个案例当时只能一次同步一张,而且还需要提前在Doris里创建目标表结构,如果导入过程再简单点就最好了。
这不,Doris官方宣称较新版本(1.4及往后版本)的 flink-doris-connector,可以满足所谓「一键整库同步」,这样一来,就只需要指定 mysql 数据源端的连接方式和库名,而到Doris端的目标表结构,则会根据这两个数据库之间,对应的映射关系主动创建(连库名都不用自己建)。

摘自Doris官方公众号
既然都这么说了,那咱必须得来试一波。
从官方文档的描述来看,用这种导入方案是相当简单的:

可以看到,描述中说,只要你使用这个对应的 connector 包,然后输入对应的数据库配置,就能实现对整库表的导入了。
但真的有这么简单吗?我表示严重怀疑。
来,根据我当下的Flink版本(1.15),我从maven中央仓库下载了 flink-doris-connector-1.15-1.4.0.jar 这个包。
然后根据文档要求,启动下面的命令:
flink run -t yarn-session -Dyarn.application.id=application_1696747391642_15889\
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=3 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./flink-doris-connector-1.15-1.4.0.jar \
mysql-sync-database \
--database from_mysql8 \
--mysql-conf hostname=192.168.221.173 \
--mysql-conf port=3306 \
--mysql-conf username=xxx \
--mysql-conf password=xxxxx \
--mysql-conf database-name=test \
--sink-conf fenodes=192.168.221.174:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://192.168.221.174:9030 \
--sink-conf sink.label-prefix=mysql2doris01 \
--table-conf replication_num=2
用的yarn-session运行模式
就是这么一个简单的提交命令,却是引发一系列「噩梦」的开始(就说没那么简单),接下来遇到的坑,让我轻微的有点怀疑人生。
2.1 坑一
刚一提交上面这个命令,还没等我反映过来,就抛出下面这个异常:

很明显,缺包。
一查呢,缺的是 flink-connector-debezium-2.4.0.jar 这个包(至于版本我是怎么知道,因为之前开发的flink软件工程中可以查到),于是就给加上吧,加到哪呢?
因为Flink的提交命令不像spark,可以添加依赖包的路径,这里就只能暂时放到Flink HOME目录的lib子目录中。
再次提交任务,该问题消失。
2.2 坑二
但是很快,另一个问题又来了,又抛出了下面这个错:

再一查呢,又是缺包,缺谁呢?缺 flink-connector-mysql-cdc-2.4.0.jar。
用同样的办法,下载之后给加到Flink的lib目录下,这个问题也消失了。
2.3 坑三
接着,还是缺包:

缺 mysql-connector-java-8.0.28.jar,补上。
2.4 坑四
还没完,还是缺:

这回缺的是 debezium-api-1.9.7.Final.jar,继续补。
2.5 坑五
这回缺的是 connect-json-3.2.0.jar。

补!
2.6 坑六
又缺,缺的是 connect-api-3.2.0.jar。

还补!
2.7 坑七
等第7个坑出现的时候,我实在忍不了了,因为这次抛出的异常,跟第五个坑抛出来的异常是一毛一样的。

也就是说,现在已经不再是缺包导致的问题了,而是引入的jar包太多导致冲突了。
这特喵的可咋整?感觉把自己一步步逼到悬崖边。
3. 正确的玩法
片刻冷静之后,我转念一想,咦... 我上次用Flink CDC同步mysql到Doris,不是用的软件工程开发的吗?而且期间还解决过jar包冲突问题,最关键是,这次同步需要的核心jar包,上次好像已经引入过了耶。
只不过区别在于,之前我用的1.2的版本,现在因为需要同步整库,要用到1.4版本。

于是果断将软件工程的,该connector的版本升级到1.4.0.
然后将该工程编译打包,得到一个已经解决过jar包冲突问题,且含各种必要依赖的fat jar。

这样一来呢,之前那些下载的所有依赖包就都不再需要了,因为这个包已经全部包括在内。
接下来,就将这个fat jar上传到flink服务器(客户端),然后启动下面的命令:
flink run -t yarn-session -Dyarn.application.id=application_1696747391642_15889\
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=3 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./flink-coding-1.0-SNAPSHOT-with-dependencies.jar \
mysql-sync-database \
--database from_mysql8 \
--mysql-conf hostname=192.168.221.173 \
--mysql-conf port=3306 \
--mysql-conf username=xxx \
--mysql-conf password=xxxxx \
--mysql-conf database-name=test \
--sink-conf fenodes=192.168.221.174:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://192.168.221.174:9030 \
--sink-conf sink.label-prefix=mysql2doris01 \
--table-conf replication_num=2
关于这个命令中的一些参数,详情去参考官方文档,说的很清楚,这里就不赘述了。
这一次的运行,就变得非常丝滑,没有任何异常抛出来。

最后可以看到,mysql test库中的6张表全部同步过来了,其中的数据量也是一条不差:

从mysql的test库同步过来的所有表
后续通过分别操作mysql源表的新增,修改,删除之后,Doris表都可以实时捕捉到变化。
(同样,篇幅原因,这里不展示,具体的下次直播给大家看)
这一次,终于靠谱了。
总结
从以上的实践论证来看,通过利用开发环境的IDE,集成Flink CDC必要的依赖(flink-doris-connector-1.4.0),然后将其build成一个fat jar.
之后,再根据官方文档给出的flink提交命令,指定相应的mysql,以及Doris配置,才能完成所谓的mysql「整库一键同步」。
而如果你只是简单的直接根据官网给出的示例来操作,肯定是搞不定的。
要不怎么说,凡事需要自己动手实践呢,你如果完全轻信文档的说明,而不加以思考,会很容易被带沟里。
另外,如果要跟 Clickhouse 做个对比的话(上篇文章内容),在能够满足相同功能的同时,CK的实现显然要更简单一些。
至于Doris跟CK各自提供的 mysql CDC功能的稳定性如何,咱还需要通过时间来验证。