Flink CDC 3.0 耍起来到底怎么样?

来源:安瑞哥是码农

这不前不久 Flink 官方发出喜讯,说Flink CDC 3.0 release 了,从官方介绍来看,相较以往的版本有比较大的改变。


这一下就引起了我想体验一把的兴趣,从之前我测试过的Flink CDC 案例来看,感觉还不错,不知道最新的版本,又做出了哪些升级跟优化。


从 Flink 官方公众号发布的内容来看,Flink CDC 3.0 的主要特征变化有如下3点:


1. 软件架构变了,由原来只是 Flink 代码开发过程中,对数据源的接入部分编码,改为独立于 Flink 代码开发之外的,单独的数据接入工具(软件),类似 sqoop、dataX 的玩法(但依然要依赖基础的 Flink 环境);


2. 数据接入方式的改变,之前 Flink CDC 接入数据,依靠的是在 Flink 程序中,通过编码的方式来指定数据源(Source),以及数据目的地(Sink),现在改为通过配置文件配置的方式就能实现;


3. 通过数据源(数据目的地)连接插件(jar包),也叫pipeline connector 方式,可插拔式添加需要的数据源(Source),和数据目的地(Sink),跟 DataX 的玩法一样。


还是老规矩,虽然说新版本做了这么多的改变,那到底用起来如何?咱拭目以待。



0. 先读官网


虽然3.0之前的版本咱已经通读过一遍官网了,但毕竟新版本跟之前的比,做了较大的改变,所以还是得再看一眼,怎么样才能把它给顺利耍起来。


老规矩,先看兼容性,因为即便新版本的CDC在架构上,跟原来的 Flink 做了分离,但是在使用时,还是要依赖基础的 Flink 环境。



从这个表格来看,新版本的Flink CDC 支持 Flink 1.14 及以上版本(但从我的实际验证来看不是的)。


而我当前的 Flink 是1.15的版本,意味着我不需要做任何额外的调整就可以玩起来了。


事先要说明的是:官网目前针对 Flink CDC 3.0 的 demo 演示,是基于 Flink 1.18 的 standalone 模式展开的,但咱为了更贴合项目实际环境,跟它玩的方式会有些不一样,因为我没有 standalone 环境


接下来,我们以 Flink CDC 3.0 (配合 Flink 1.15) 同步 MySQL8 整库到 Doris 2.0.3 为例展开说明



1. 环境准备


Flink 是匹配的环境,那么现在就只需要下载最新版本的 Flink CDC 安装包。


下载地址:https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-cdc-3.0.0-bin.tar.gz。


可以直接在你的 Linux 命令行界面,用 wget 下载到你的目标目录下:


wget https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-cdc-3.0.0-bin.tar.gz


解压之后,得到这样式的一个文件布局:



整个结构主打的是一个“轻奢简洁风”。


bin 目录:里面就一个脚本,所有数据导入的功能全仰仗它;


conf:除开一个日志的配置文件外,其他的,就是根据你的数据导入要求,配置的一个个从数据源到数据目的地的详细说明;


lib:最开始也只有一个 CDC 的基础 jar 包,后面根据你的导入需要,再添加各个数据source,以及数据sink的 pipeline connector jar 包;


log:记录 CDC 程序导入过程中的详细过程日志。


观察完了整个目录结构之后,接下来,就要告诉这个 CDC 软件,你的 Flink home 搁哪呢,否则你的程序一启动,就会抛出下面的异常:



这里会提示你需要设置「Flink Home」的环境变量,在哪设置呢?


你可以设置全局的,也可以只针对专门的用户,设置局部的,我这里因为只需要针对 flink 这个用户,所以,只需要修改 flink 这个用户下的概要文件(vi /home/flink/.bash_profile)就可以了:



根据它的要求,添加「Flink Home」变量,至于剩下的3个变量,那是之前针对 Flink 环境时添加的,不用动。


PS:修改完之后,记得用 source 命令让它生效哦。


接下来,因为CDC程序需要对接的数据源,跟数据目的地分别是MySQL 跟 Doris,所以需要下载对应的 pipeline connector 到上面的那个 lib 目录下(具体下载地址去官网找):



基础环境准备好了,接下来,咱正式开耍。



2. 跑起来


最新版本 CDC 的一个特点之一,就是根据配置文件来执行数据的导入任务,所以,咱接下来的一步,就是配置数据源(Source),跟数据目的地(Sink)。


配置文件内容如下:


这里面有个特别注意的地方,就是这个时区的设置,官网给的 demo 中用的UTC:


我猜写这个文档人,要么就是没有实际跑过,要么就是他不是咱中国人,否则程序会因为跟MySQL 服务端的时区设置不一致报错滴:



既然这样,那咱给时区设置改过来就好了,但你以为这样就完事了吗?


真正的坑,其实是在后面等着你呢。


2.1 坑1:不能直接用 on yarn 模式


从官网提供的说明案例来看,人家是从头开始 build 了一套全新的Flink standalone 环境的,然后很自然的用下面这个提交命令,就跑起来了(提交到 standalone 环境了)。


bin/flink-cdc.sh conf/mysql2doris.yml

可对于我来说,咱没有 Flink standalone 环境啊,我只有 yarn,是不是就不配了呢?


好像还真是。


对于这个最新版本的 CDC 如何在 yarn 上面运行,我研究了好久,但无果。


一来它官网没有说明怎么设置(我没找到);二来我在它目前提供的脚本程序,以及配置文件里都没有找到相关配置


咋整?我抱着运行上面这个命令试试看的心理,结果,不出意外,就出意外了:


从异常提示来看,显然它要找的这个8081端口就是 standalone 模式下的 http 端口,但咱,没有。


果然是不配!


那既然暂时不能通过 on yarn 以集群方式运行,还有别的招不?


有,本地模式!


矬是矬了点,但好歹能跑起来不是吗?就知足吧!


但是...


2.2 坑2:跟Flink 1.15 不兼容


 我是怎么知道的呢?


上面不是说运行方式改成了本地模式嘛,那么运行命令就变成了这样:


bin/flink-cdc.sh conf/mysql2doris.yml --use-mini-cluster

结果一启动,就给我抛了这个异常:



面对这个错,我是横看竖看、上看下看,研究了半天,最后猜测,大概可能也许是跟当前 Flink 版本(1.15)不兼容导致的。


于是,我果断下载了最新版本的 Flink(1.18),一顿猛如虎的操作之后,我把刚才对 Flink home 的设置改成了下面这样:


修改完,还是别忘记 source 一下让它即时生效。


果不其然,升级完 Flink 之后,刚才那个错误就不见了,所以再次证明,官网提供的信息(Flink 1.15 跟 Flink CDC 3.0 的兼容性),有时候不一定就是对的,你得有自己的怀疑跟判断。



2.3 注意点3:需要提前建库名


这个虽然算不上什么坑,但我觉得还是需要你注意。


那就是,根据官网提供的案例,咱也准备将MySQL(8.0)其中一个库(test库)的所有表,给同步到 Doris (不做表名映射)。


但是我忘记提前给 Doris 创建相同的数据库名了,也就是在 Doris 里,没有 test 这个库。


于是,在CDC程序启动的时候,就会抛出下面的异常:



解决办法就是,把它找不到的这个 test 库名给建起来。



2.4 正常运行时的亚子


一切故障扫清之后,再次启动 CDC 导入进程,就如下图看到的这样:


 

表面看,它是卡在这里一动不动,但实际上,它的后台则在暗流涌动。


打开它的日志,可以清楚的看到它在不间断的滚动,证明程序是在干活的。


这下可以放心了。



3. 查看结果,吓你一跳


由于程序正常启动了,所以数据同步就应该不是问题,很快这个库下面的6张表就同步了过来。



检查之后发现,数据同步的表数量,以及每张表的历史记录条,都能保持一致,并且,数据同步的表结构和字段类型,也能根据各自的映射规范保持一致。


拿其中一张表 test02 来举个栗子:


MySQL的原始记录数


同步到Doris之后的记录数


但是,当我往源 MySQL 表的 test02 再次写入几条记录之后,你猜怎么着?


Doris这边对应表的记录数量,居然丝毫没有变化


MySQL再次写入4条记录之后

Doris对应表的记录数没有增加

关键看它的日志输出,也没有丝毫动静(没有感知到数据变化,当然,程序肯定没挂):


要知道,同样就是这个库,我之前测试过 Flink CDC 2.x版本的时候(也是同步到 Doris),可没有这个问题


随后,我又尝试在 MySQL 这边新建一张表 test07,同样,Doris 依然感知不到


这坑爹的玩意!莫非是有哪个神秘的配置被我漏了吗?


算了,不再继续试错了,测试结果已然低于预期,不满意!



最后


虽说最新的 Flink CDC 3.0 做了很多方面的改变,咱就是说,你要是往着好用,且能保住基本功能的同时做出的改变,那还行。


但你要是改着改着,虽然在编码上免去了使用者的工作量(用配置文件代替),但在功能上却还不如之前的老版本,个人认为会比较遗憾。


从这次测试来看,用新版本 Flink CDC 3.0 同步 MySQL8 整库到 Doris2.0.3,不能使用 on yarn的方式,且只有历史数据部分能同步成功,至于数据的增量修改部分,暂时不行


希望以上提到的不足,能很快得到解决,否则,升级的意义在哪?

请使用浏览器的分享功能分享到微信等